Before you start to feed any data into Elasticsearch you have to define an Elasticsearch mapping!
This is really necessary because otherwise, if you try to insert decimal numbers, Elasticsearch may infer them as type long. You must map them as “double” before you insert any data.
This example takes calculated data from PostgreSQL and inserts it into Elasticsearch using bulk inserts. Without parameters it calls the PostgreSQL function with yesterday’s date, or you can supply a start date and an end date. The PostgreSQL connect string must be supplied in the environment variable QUERIES_URI. The program is written for Elasticsearch 2.x and is intended to run on the server/instance hosting Elasticsearch – but you can easily change the address.

package main

// golang interface into elasticsearch
// based on https://gist.github.com/olivere/114347ff9d9cfdca7bdc0ecea8b82263

import (
"database/sql"
"encoding/json"
"fmt"
"log"
"os"
"time"

_ "github.com/lib/pq"
elastic "gopkg.in/olivere/elastic.v3"
)

//this struct must contain all columns returned by PostgreSQL function
type metricsData struct {
ID             string  `json:"-"`
Metricname     string  `json:"@metricname,omitempty"`
Shop           string  `json:"@shopid,omitempty"`
DateText       string  `json:"@datetext,omitempty"`
DateStamp      string  `json:"@date,omitempty"`
UnixMilisec    int64   `json:"@timestamp,omitempty"`
Value          float32 `json:"@value,omitempty"`
.....
}

func main() {

var startDate string
endDate := ""
bulkLimit := 250  //max number of records for bulk insert
if len(os.Args) > 1 {
startDate = os.Args[1]
if len(os.Args) > 2 {
endDate = os.Args[2]
}
} else {
t := time.Now().UTC().AddDate(0, 0, -1)
startDate = fmt.Sprintf("%d-%02d-%02d", t.Year(), t.Month(), t.Day())
}

if endDate == "" {
endDate = startDate
}
pguri := requireEnvVar("QUERIES_URI")

db, err := connect("postgres", pguri)
if err != nil {
log.Fatal(curTime(), "cannot connect to postgresql:", err)
}
defer func() {
if errClose := db.Close(); err != nil {
log.Println(curTime(), "closing database:", errClose.Error())
}
}()

// Create a client
c, err := elastic.NewClient()
if err != nil {
log.Fatal("Cannot create elasticsearch client")
}
fmt.Println("ES client:", c)

fmt.Println(curTime(), "starting query for date:", startDate)
query := fmt.Sprintf(`SELECT * from myschema.myfunction('%s'::date,'%s'::date);`, startDate, endDate)
fmt.Println(query)
var md metricsData
rows, err := db.Query(query)
if err != nil {
log.Fatalln(curTime(), "could not run query:", err)
}
var total int32
bulkData := c.Bulk()
bulkC := 0
curDate := ""
fmt.Println(curTime(), "starting rows scan")
if rows != nil {
for rows.Next() {
if err = rows.Scan(
&md.ID,
&md.Metricname,
&md.Shop,
&md.DateText,
&md.DateStamp,
&md.UnixMilisec,
&md.Value,
.....
); err != nil {
log.Fatalln(curTime(), "cannot parse data:", err)
}
if curDate != md.DateText {
curDate = md.DateText
fmt.Println("Date:", md.DateText)
}
total++
bulkC++
if total%1000 == 0 {
fmt.Println(curTime(), total, "rows inserted")
}

bp, err := json.Marshal(md)
if err != nil {
log.Fatal("cannot create json", err)
}
rec := elastic.NewBulkIndexRequest().Index("myesindex").Type("myestype").Id(md.ID).Doc(string(bp))
bulkData = bulkData.Add(rec)
if bulkC >= bulkLimit {
_, err := bulkData.Do()
if err != nil {
log.Fatal("cannot insert bulk", err)
}
bulkC = 0
bulkData = c.Bulk()
}
//fmt.Println(md.Id)
//fmt.Println(string(bp))

if err != nil {
log.Fatal("cannot write into ES:", err)
}
}
if bulkC > 0 {
_, err := bulkData.Do()
if err != nil {
log.Fatal("cannot insert last bulk", err)
}
}

}
fmt.Println(curTime(), total, "row done")

}

func curTime() string {
return time.Now().UTC().Format(time.RFC3339) + ":"
}

// requireEnvVar returns the value of the environment variable s, terminating
// the program via log.Fatalln when the variable is not set.
func requireEnvVar(s string) string {
	value, found := os.LookupEnv(s)
	if found {
		return value
	}
	log.Fatalln(curTime(), s, "isn't defined")
	return "" // unreachable: log.Fatalln exits the process
}

// connect opens a database handle for the given driver and connect string
// and verifies the connection with a ping before returning it.
//
// On success the caller owns the returned *sql.DB and must Close it.
func connect(driverName string, uri string) (*sql.DB, error) {
	db, err := sql.Open(driverName, uri)
	if err != nil {
		return nil, err
	}
	if err = db.Ping(); err != nil {
		// BUG FIX: release the handle we just opened; the original
		// returned without closing it, leaking the pool on ping failure.
		db.Close()
		return nil, err
	}
	return db, nil
}