// Simple Go program that calls a PostgreSQL procedure and inserts the returned rows into InfluxDB. Connection strings are supplied via environment variables.

package main

import (
"database/sql"
"fmt"
"log"
"os"
"time"

"github.com/influxdata/influxdb/client/v2"
_ "github.com/lib/pq"
)

// connect opens a database handle for the given driver and verifies that the
// server is reachable.
//
// driverName is the name of a registered database/sql driver and uri is its
// connection string. On success the caller owns the returned *sql.DB and is
// responsible for closing it. On failure the returned handle is nil; a handle
// that was opened but could not be pinged is closed before returning so it
// does not leak. The uri is deliberately kept out of error messages because
// it may contain credentials.
func connect(driverName string, uri string) (*sql.DB, error) {
	db, err := sql.Open(driverName, uri)
	if err != nil {
		return nil, fmt.Errorf("opening %s database: %w", driverName, err)
	}
	if err = db.Ping(); err != nil {
		// Best-effort cleanup: the ping failure is the error worth reporting.
		_ = db.Close()
		return nil, fmt.Errorf("pinging %s database: %w", driverName, err)
	}
	return db, nil
}

// metricRate holds the values scanned from one row of the feed query and is
// the unit that insertGrafanaDashboards writes to InfluxDB as a single point.
type metricRate struct {
// Metricname is passed to client.NewPoint as the point's name.
Metricname string
// Shop is written as the "shop" tag of the point.
Shop       string
// Unixdate is the point's timestamp in Unix seconds; it is converted with
// time.Unix(Unixdate, 0).
Unixdate   int64
// Value is written as the "rate" field of the point.
Value      float32
....
}

// requireEnvVar returns the value of the environment variable named s.
//
// A variable that is set to the empty string counts as defined and yields "".
// If the variable is not present in the environment at all, the function
// panics with the message "<name> isn't defined".
func requireEnvVar(s string) string {
	if value, found := os.LookupEnv(s); found {
		return value
	}
	panic(fmt.Sprintf("%s isn't defined", s))
}

func influxDBClient(influxuri string, influxuser string, influxpass string) client.Client {
c, err := client.NewHTTPClient(client.HTTPConfig{
Addr:     influxuri,
Username: influxuser,
Password: influxpass,
})
if err != nil {
log.Fatalln("InfluxDB error: ", err)
}
return c
}

// main runs the PostgreSQL feed procedure for one date and writes every
// returned row into InfluxDB.
//
// The date is taken from the first command-line argument; when no argument is
// given it defaults to yesterday (UTC) formatted as YYYY-MM-DD. Connection
// settings are read from the INFLUX_URL, INFLUX_DATABASE, INFLUX_USER,
// INFLUX_PASS and QUERIES_URI environment variables; a missing variable makes
// requireEnvVar panic. Any database or scan failure terminates the program
// through log.Fatal, which exits immediately, so the deferred closes below
// only run on the success path.
func main() {
	var startDate string
	if len(os.Args) > 1 {
		startDate = os.Args[1]
	} else {
		startDate = time.Now().UTC().AddDate(0, 0, -1).Format("2006-01-02")
	}

	influxURI := requireEnvVar("INFLUX_URL")
	influxDb := requireEnvVar("INFLUX_DATABASE")
	influxUser := requireEnvVar("INFLUX_USER")
	influxPass := requireEnvVar("INFLUX_PASS")
	pguri := requireEnvVar("QUERIES_URI")

	db, err := connect("postgres", pguri)
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		// Check the error returned by Close itself, not the outer err.
		if errClose := db.Close(); errClose != nil {
			log.Printf("closing database: %s\n", errClose.Error())
		}
	}()

	c := influxDBClient(influxURI, influxUser, influxPass)
	fmt.Println("starting query:", startDate)

	// The date comes from the command line, so it is passed as a bound
	// parameter instead of being spliced into the SQL text.
	const query = `SELECT * from dashboards.feed_influxdb($1::date,false::boolean,null::text)`
	rows, err := db.Query(query, startDate)
	if err != nil {
		log.Fatalf("could not run query: %v", err)
	}
	// Release the result set (and its connection) once iteration is done.
	defer rows.Close()

	var total int
	fmt.Println("starting rows scan")
	for rows.Next() {
		// A fresh value per row so nothing carries over from the previous one.
		var mr metricRate
		if err = rows.Scan(
			&mr.Metricname,
			&mr.Shop,
			&mr.Unixdate,
			&mr.Value,
			.....
		); err != nil {
			log.Fatalf("cannot parse data: %v", err)
		}
		total++
		insertGrafanaDashboards(c, influxDb, mr)
	}
	// rows.Next returns false both at end-of-data and on error; distinguish
	// the two so a broken connection is not reported as success.
	if err = rows.Err(); err != nil {
		log.Fatalf("reading rows: %v", err)
	}
	fmt.Println("rows inserted: ", total)
}

func insertGrafanaDashboards(c client.Client, influxdb string, data metricRate) {
bp, err := client.NewBatchPoints(client.BatchPointsConfig{
Database:  influxdb,
Precision: "s",
})

if err != nil {
log.Fatalln("NewBatchPoints Error: ", err)
}

tags := map[string]string{
"shop":      data.Shop,
.....
}

fields := map[string]interface{}{
"rate":       data.Value,
.....
}

point, err := client.NewPoint(
data.Metricname,
tags,
fields,
time.Unix(data.Unixdate, 0),
)
if err != nil {
log.Fatalln("NewPoint Error: ", err)
}

bp.AddPoint(point)
if err != nil {
log.Fatal("addpoint error:", err)
}

//writing into influxdb
err = c.Write(bp)
if err != nil {
log.Fatal("influx write error:", err)
}

}