From cc342eb28e92dd7a3a332fc51a9cd308ec15665e Mon Sep 17 00:00:00 2001 From: "roy.lei" Date: Thu, 22 Nov 2018 19:15:17 +0800 Subject: [PATCH 1/3] support geo_point --- README.md | 4 ++++ river/sync.go | 32 +++++++++++++++++++++++++++++++- 2 files changed, 35 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 4992d48b..6a3972bf 100644 --- a/README.md +++ b/README.md @@ -107,6 +107,10 @@ type = "t" // If the created_time field type is "int", and you want to convert it to "date" type in es, you can do it as below created_time=",date" + + // This will map mysql lat, lon two field to es location field which is geo_point type. + lat = "location,geo_point.lat" + lon = "location,geo_point.lon" ``` Modifier "list" will translates a mysql string field like "a,b,c" on an elastic array type '{"a", "b", "c"}' this is specially useful if you need to use those fields on filtering on elasticsearch. diff --git a/river/sync.go b/river/sync.go index 3b3854ee..12b06ab9 100644 --- a/river/sync.go +++ b/river/sync.go @@ -28,6 +28,14 @@ const ( // for the mysql int type to es date type // set the [rule.field] created_time = ",date" fieldTypeDate = "date" + // for the mysql lat, lon two field to es geo_point type + /* + set the [rule.field] + lat = "location,geo_point.lat" + lon = "location,geo_point.lon" + */ + fieldTypeGeopointLat = "geo_point.lat" + fieldTypeGeopointLon = "geo_point.lon" ) const mysqlDateFormat = "2006-01-02" @@ -385,7 +393,27 @@ func (r *River) makeInsertReqData(req *elastic.BulkRequest, rule *Rule, values [ mysql, elastic, fieldType := r.getFieldParts(k, v) if mysql == c.Name { mapped = true - req.Data[elastic] = r.getFieldValue(&c, fieldType, values[i]) + if fieldType == fieldTypeGeopointLat { + if req.Data[elastic] == nil { + req.Data[elastic] = make(map[string]float64) + } + d := r.makeReqColumnData(&c, values[i]) + if d == nil { + d = 0.0 + } + req.Data[elastic].(map[string]float64)["lat"] = d.(float64) + } else if fieldType == fieldTypeGeopointLon { + if req.Data[elastic] == nil { + req.Data[elastic] = make(map[string]float64) + } + d := r.makeReqColumnData(&c, values[i]) + if d == nil { + d = 0.0 + } + req.Data[elastic].(map[string]float64)["lon"] = d.(float64) + } else { + req.Data[elastic] = r.getFieldValue(&c, fieldType, values[i]) + } } } if mapped == false { @@ -515,6 +543,8 @@ func (r *River) getFieldValue(col *schema.TableColumn, fieldType string, value i fieldValue = r.makeReqColumnData(col, time.Unix(v.Int(), 0).Format(mysql.TimeFormat)) } } + case fieldTypeGeopointLat, fieldTypeGeopointLon: + fieldValue = r.makeReqColumnData(col, value) } if fieldValue == nil { From ed7d887a9aa82659ef2b53edcd82d7a337ea2a13 Mon Sep 17 00:00:00 2001 From: "roy.lei" Date: Wed, 19 Dec 2018 12:13:28 +0800 Subject: [PATCH 2/3] add geo_point support for update --- river/sync.go | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/river/sync.go b/river/sync.go index 12b06ab9..9d27eb44 100644 --- a/river/sync.go +++ b/river/sync.go @@ -442,7 +442,27 @@ func (r *River) makeUpdateReqData(req *elastic.BulkRequest, rule *Rule, mysql, elastic, fieldType := r.getFieldParts(k, v) if mysql == c.Name { mapped = true - req.Data[elastic] = r.getFieldValue(&c, fieldType, afterValues[i]) + if fieldType == fieldTypeGeopointLat { + if req.Data[elastic] == nil { + req.Data[elastic] = make(map[string]float64) + } + d := r.makeReqColumnData(&c, afterValues[i]) + if d == nil { + d = 0.0 + } + req.Data[elastic].(map[string]float64)["lat"] = d.(float64) + } else if fieldType == fieldTypeGeopointLon { + if req.Data[elastic] == nil { + req.Data[elastic] = make(map[string]float64) + } + d := r.makeReqColumnData(&c, afterValues[i]) + if d == nil { + d = 0.0 + } + req.Data[elastic].(map[string]float64)["lon"] = d.(float64) + } else { + req.Data[elastic] = r.getFieldValue(&c, fieldType, afterValues[i]) + } } } if mapped == false { From 9764ab2d3eadfab9df67d7a503bac1ec2be0b9f9 Mon Sep 17 00:00:00 2001 From: "roy.lei" Date: Wed, 19 Dec 2018 14:26:39 +0800 Subject: [PATCH 3/3] =?UTF-8?q?=E6=B7=BB=E5=8A=A0boolean=E7=B1=BB=E5=9E=8B?= =?UTF-8?q?=E6=94=AF=E6=8C=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- river/sync.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/river/sync.go b/river/sync.go index 9d27eb44..75ddd933 100644 --- a/river/sync.go +++ b/river/sync.go @@ -36,6 +36,8 @@ const ( */ fieldTypeGeopointLat = "geo_point.lat" fieldTypeGeopointLon = "geo_point.lon" + + fieldTypeBoolean = "bool" ) const mysqlDateFormat = "2006-01-02" @@ -563,8 +565,9 @@ func (r *River) getFieldValue(col *schema.TableColumn, fieldType string, value i fieldValue = r.makeReqColumnData(col, time.Unix(v.Int(), 0).Format(mysql.TimeFormat)) } } - case fieldTypeGeopointLat, fieldTypeGeopointLon: - fieldValue = r.makeReqColumnData(col, value) + case fieldTypeBoolean: + v := value.(int8) + fieldValue = r.makeReqColumnData(col, v == 1) } if fieldValue == nil {