Before you start feeding any data into Elasticsearch, you have to define the Elasticsearch mapping!
This is really necessary because otherwise, when you insert decimal numbers, Elasticsearch may map them as type long (for example, when the first value it sees happens to be a whole number). You must map them as “double” before you insert any data.
This example takes calculated data from PostgreSQL and inserts it into Elasticsearch using bulk inserts. Without parameters it calls the PostgreSQL function with yesterday’s date; alternatively, you can supply a start date and an end date (if only a start date is given, it is also used as the end date). The PostgreSQL connection string must be supplied in the environment variable QUERIES_URI. The program is intended for Elasticsearch 2.x and is intended to run on the server/instance that hosts Elasticsearch – but you can simply change the address.
package main // golang interface into elasticsearch // based on https://gist.github.com/olivere/114347ff9d9cfdca7bdc0ecea8b82263 import ( "database/sql" "encoding/json" "fmt" "log" "os" "time" _ "github.com/lib/pq" elastic "gopkg.in/olivere/elastic.v3" ) //this struct must contain all columns returned by PostgreSQL function type metricsData struct { ID string `json:"-"` Metricname string `json:"@metricname,omitempty"` Shop string `json:"@shopid,omitempty"` DateText string `json:"@datetext,omitempty"` DateStamp string `json:"@date,omitempty"` UnixMilisec int64 `json:"@timestamp,omitempty"` Value float32 `json:"@value,omitempty"` ..... } func main() { var startDate string endDate := "" bulkLimit := 250 //max number of records for bulk insert if len(os.Args) > 1 { startDate = os.Args[1] if len(os.Args) > 2 { endDate = os.Args[2] } } else { t := time.Now().UTC().AddDate(0, 0, -1) startDate = fmt.Sprintf("%d-%02d-%02d", t.Year(), t.Month(), t.Day()) } if endDate == "" { endDate = startDate } pguri := requireEnvVar("QUERIES_URI") db, err := connect("postgres", pguri) if err != nil { log.Fatal(curTime(), "cannot connect to postgresql:", err) } defer func() { if errClose := db.Close(); err != nil { log.Println(curTime(), "closing database:", errClose.Error()) } }() // Create a client c, err := elastic.NewClient() if err != nil { log.Fatal("Cannot create elasticsearch client") } fmt.Println("ES client:", c) fmt.Println(curTime(), "starting query for date:", startDate) query := fmt.Sprintf(`SELECT * from myschema.myfunction('%s'::date,'%s'::date);`, startDate, endDate) fmt.Println(query) var md metricsData rows, err := db.Query(query) if err != nil { log.Fatalln(curTime(), "could not run query:", err) } var total int32 bulkData := c.Bulk() bulkC := 0 curDate := "" fmt.Println(curTime(), "starting rows scan") if rows != nil { for rows.Next() { if err = rows.Scan( &md.ID, &md.Metricname, &md.Shop, &md.DateText, &md.DateStamp, &md.UnixMilisec, &md.Value, ..... 
); err != nil { log.Fatalln(curTime(), "cannot parse data:", err) } if curDate != md.DateText { curDate = md.DateText fmt.Println("Date:", md.DateText) } total++ bulkC++ if total%1000 == 0 { fmt.Println(curTime(), total, "rows inserted") } bp, err := json.Marshal(md) if err != nil { log.Fatal("cannot create json", err) } rec := elastic.NewBulkIndexRequest().Index("myesindex").Type("myestype").Id(md.ID).Doc(string(bp)) bulkData = bulkData.Add(rec) if bulkC >= bulkLimit { _, err := bulkData.Do() if err != nil { log.Fatal("cannot insert bulk", err) } bulkC = 0 bulkData = c.Bulk() } //fmt.Println(md.Id) //fmt.Println(string(bp)) if err != nil { log.Fatal("cannot write into ES:", err) } } if bulkC > 0 { _, err := bulkData.Do() if err != nil { log.Fatal("cannot insert last bulk", err) } } } fmt.Println(curTime(), total, "row done") } func curTime() string { return time.Now().UTC().Format(time.RFC3339) + ":" } func requireEnvVar(s string) string { env, ok := os.LookupEnv(s) if !ok { log.Fatalln(curTime(), s, "isn't defined") } return (env) } func connect(driverName string, uri string) (*sql.DB, error) { db, err := sql.Open(driverName, uri) if err != nil { return nil, err } if err = db.Ping(); err != nil { return nil, err } return db, nil }