diff --git a/README.md b/README.md index 4992d48b..6a3972bf 100644 --- a/README.md +++ b/README.md @@ -107,6 +107,10 @@ type = "t" // If the created_time field type is "int", and you want to convert it to "date" type in es, you can do it as below created_time=",date" + + // This will map mysql lat, lon two field to es location field which is geo_point type. + lat = "location,geo_point.lat" + lon = "location,geo_point.lon" ``` Modifier "list" will translates a mysql string field like "a,b,c" on an elastic array type '{"a", "b", "c"}' this is specially useful if you need to use those fields on filtering on elasticsearch. diff --git a/river/sync.go b/river/sync.go index 3b3854ee..75ddd933 100644 --- a/river/sync.go +++ b/river/sync.go @@ -28,6 +28,16 @@ const ( // for the mysql int type to es date type // set the [rule.field] created_time = ",date" fieldTypeDate = "date" + // for the mysql lat, lon two field to es geo_point type + /* + set the [rule.field] + lat = "location,geo_point.lat" + lon = "location,geo_point.lon" + */ + fieldTypeGeopointLat = "geo_point.lat" + fieldTypeGeopointLon = "geo_point.lon" + + fieldTypeBoolean = "bool" ) const mysqlDateFormat = "2006-01-02" @@ -385,7 +395,27 @@ func (r *River) makeInsertReqData(req *elastic.BulkRequest, rule *Rule, values [ mysql, elastic, fieldType := r.getFieldParts(k, v) if mysql == c.Name { mapped = true - req.Data[elastic] = r.getFieldValue(&c, fieldType, values[i]) + if fieldType == fieldTypeGeopointLat { + if req.Data[elastic] == nil { + req.Data[elastic] = make(map[string]float64) + } + d := r.makeReqColumnData(&c, values[i]) + if d == nil { + d = 0.0 + } + req.Data[elastic].(map[string]float64)["lat"] = d.(float64) + } else if fieldType == fieldTypeGeopointLon { + if req.Data[elastic] == nil { + req.Data[elastic] = make(map[string]float64) + } + d := r.makeReqColumnData(&c, values[i]) + if d == nil { + d = 0.0 + } + req.Data[elastic].(map[string]float64)["lon"] = d.(float64) + } else { + req.Data[elastic] = r.getFieldValue(&c, fieldType, values[i]) + } } } if mapped == false { @@ -414,7 +444,27 @@ func (r *River) makeUpdateReqData(req *elastic.BulkRequest, rule *Rule, mysql, elastic, fieldType := r.getFieldParts(k, v) if mysql == c.Name { mapped = true - req.Data[elastic] = r.getFieldValue(&c, fieldType, afterValues[i]) + if fieldType == fieldTypeGeopointLat { + if req.Data[elastic] == nil { + req.Data[elastic] = make(map[string]float64) + } + d := r.makeReqColumnData(&c, afterValues[i]) + if d == nil { + d = 0.0 + } + req.Data[elastic].(map[string]float64)["lat"] = d.(float64) + } else if fieldType == fieldTypeGeopointLon { + if req.Data[elastic] == nil { + req.Data[elastic] = make(map[string]float64) + } + d := r.makeReqColumnData(&c, afterValues[i]) + if d == nil { + d = 0.0 + } + req.Data[elastic].(map[string]float64)["lon"] = d.(float64) + } else { + req.Data[elastic] = r.getFieldValue(&c, fieldType, afterValues[i]) + } } } if mapped == false { @@ -515,6 +565,9 @@ func (r *River) getFieldValue(col *schema.TableColumn, fieldType string, value i fieldValue = r.makeReqColumnData(col, time.Unix(v.Int(), 0).Format(mysql.TimeFormat)) } } + case fieldTypeBoolean: + v := value.(int8) + fieldValue = r.makeReqColumnData(col, v == 1) } if fieldValue == nil {