Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
support geo_point
  • Loading branch information
roy.lei committed Nov 26, 2018
commit cc342eb28e92dd7a3a332fc51a9cd308ec15665e
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ type = "t"

// If the created_time field type is "int", and you want to convert it to "date" type in es, you can do it as below
created_time=",date"

// This will map mysql lat, lon two field to es location field which is geo_point type.
lat = "location,geo_point.lat"
lon = "location,geo_point.lon"
```

Modifier "list" will translates a mysql string field like "a,b,c" on an elastic array type '{"a", "b", "c"}' this is specially useful if you need to use those fields on filtering on elasticsearch.
Expand Down
32 changes: 31 additions & 1 deletion river/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@ const (
// for the mysql int type to es date type
// set the [rule.field] created_time = ",date"
fieldTypeDate = "date"
// for the mysql lat, lon two field to es geo_point type
/*
set the [rule.field]
lat = "location,geo_point.lat"
lon = "location,geo_point.lon"
*/
fieldTypeGeopointLat = "geo_point.lat"
fieldTypeGeopointLon = "geo_point.lon"
)

const mysqlDateFormat = "2006-01-02"
Expand Down Expand Up @@ -385,7 +393,27 @@ func (r *River) makeInsertReqData(req *elastic.BulkRequest, rule *Rule, values [
mysql, elastic, fieldType := r.getFieldParts(k, v)
if mysql == c.Name {
mapped = true
req.Data[elastic] = r.getFieldValue(&c, fieldType, values[i])
if fieldType == fieldTypeGeopointLat {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not handing this in getFieldValue?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because getFieldValue has no access to already read field values and would overwrite the first part of the geo point.
for long and lat it would do:

  1. getFieldValue for longitude. returns map with { long: n }
  2. getFieldValue for latitude. return map with { lat: n }
  3. writing data would require to merge the both results, because we can't access the already processed data (or next)

if req.Data[elastic] == nil {
req.Data[elastic] = make(map[string]float64)
}
d := r.makeReqColumnData(&c, values[i])
if d == nil {
d = 0.0
}
req.Data[elastic].(map[string]float64)["lat"] = d.(float64)
} else if fieldType == fieldTypeGeopointLon {
if req.Data[elastic] == nil {
req.Data[elastic] = make(map[string]float64)
}
d := r.makeReqColumnData(&c, values[i])
if d == nil {
d = 0.0
}
req.Data[elastic].(map[string]float64)["lon"] = d.(float64)
} else {
req.Data[elastic] = r.getFieldValue(&c, fieldType, values[i])
}
}
}
if mapped == false {
Expand Down Expand Up @@ -515,6 +543,8 @@ func (r *River) getFieldValue(col *schema.TableColumn, fieldType string, value i
fieldValue = r.makeReqColumnData(col, time.Unix(v.Int(), 0).Format(mysql.TimeFormat))
}
}
case fieldTypeGeopointLat, fieldTypeGeopointLon:
fieldValue = r.makeReqColumnData(col, value)
}

if fieldValue == nil {
Expand Down