MemSQL is something like very fast MySQL database but to feed data from MySQL dumps into it is not exactly straight forward and you do not have always the opportunity to create immediate dump. Therefore I prepared this simple golang program.

It is simple feeding program which reads data from MySQL and inserts them into MemSQL. Old tables are dropped and new created on fly to avoid problems with possible changes in MySQL. Right now (2017/02) MemSQL does not implement bulk inserts or prepared statements and also not foreign keys. Feeder reads table DDL and constructs insert statement. Masking NULL values in columns which are real pain in golang is done in SQL select. Requires env variables to be set with connect string into MySQL and MemSQL + db names.

You can use one command line parameter – table name (without database/ schema name).

format like follows:
export MYSQL_URI=login:passwd@tcp(IP_address:port)/databasename
export MYSQL_DB=databasename
export MEMSQL_URI=login:passwd@tcp(IP_address:port)/databasename
export MEMSQL_DB=databasename

 

package main

import (
    "database/sql"
    "fmt"
    "log"
    "os"
    "time"

    _ "github.com/go-sql-driver/mysql"
)

type tabledata struct {
    tableStructure      string
    columnListForSelect string
    columnListForInsert string
    stringForSprint     string
}

var (
    useDatabase = `use %s;`

    tablesQuery = `select TABLE_NAME from information_schema.TABLES where TABLE_SCHEMA = '%s' order by 1;`

    dropMemSQLTable = `DROP TABLE IF EXISTS ` + "`" + `%s.%s` + "`" + `;`

    tableStructureQuery = `
    select 
    concat('CREATE TABLE ` + "`" + `',TABLE_NAME, '` + "`" + ` (', 
    (select group_concat( distinct '` + "`" + `',column_name,'` + "`" + `',' ', column_type order by ordinal_position separator ', ') 
    from information_schema.COLUMNS where TABLE_SCHEMA = t.TABLE_SCHEMA and TABLE_NAME = t.TABLE_NAME 
    group by TABLE_NAME),') COLLATE ', TABLE_COLLATION,';' ) 
    as table_ddl, 
    (select group_concat( 
        case when lower(data_type) in ('varchar','text','longtext','char','enum','set','mediumtext','longblob', 'blob') 
        then concat('replace(replace(coalesce(` + "`" + `',column_name,'` + "`" + `,''NULL''),'''''''',''''''''''''),''\\\\'','''') as ` + "`" + `',column_name,'` + "`" + `')
        else concat('coalesce(` + "`" + `',column_name,'` + "`" + `,''NULL'') as ` + "`" + `',column_name,'` + "`" + `') end order by ordinal_position separator ', ') 
    from information_schema.COLUMNS where TABLE_SCHEMA = t.TABLE_SCHEMA and TABLE_NAME = t.TABLE_NAME group by TABLE_NAME) 
    as list_for_select,
    (select group_concat( 
        case when lower(data_type) in ('varchar','text','longtext','char','enum','set','mediumtext','longblob', 'blob') 
        then concat('` + "`" + `',column_name,'` + "`" + `') 
        else concat('` + "`" + `',column_name,'` + "`" + `') end order by ordinal_position separator ', ') 
    from information_schema.COLUMNS where TABLE_SCHEMA = t.TABLE_SCHEMA and TABLE_NAME = t.TABLE_NAME group by TABLE_NAME) 
    as list_for_insert,
    (select group_concat( 
        case when lower(data_type) in ('varchar','text','longtext','char','enum','set','mediumtext','longblob', 'blob') 
        then 'NULLIF(''%s'', ''NULL'')' 
        when lower(data_type) in ('decimal','bigint','double','int','tinyint','float','smallint') 
        then '%s' 
        when lower(data_type) in ('datetime','timestamp','time','date') 
        then '''%s''' 
    end order by ordinal_position separator ', ') 
    from information_schema.COLUMNS where TABLE_SCHEMA = t.TABLE_SCHEMA and TABLE_NAME = t.TABLE_NAME group by TABLE_NAME) 
    as string_for_sprint
    from information_schema.TABLES t 
    where TABLE_SCHEMA = '%s' and TABLE_NAME = '%s';`

    queryTableData = `select %s from ` + "`" + `%s` + "`" + `;`

    query string

    results []string
)

func main() {

    fmt.Println(curTime(), "Settings:")
    mysqlURI := requireEnvVar("MYSQL_URI")
    mysqlDatabase := requireEnvVar("MYSQL_DB")
    memsqlURI := requireEnvVar("MEMSQL_URI")
    memsqlDatabase := requireEnvVar("MEMSQL_DB")

    tableToDo := ""
    if len(os.Args) > 1 {
        tableToDo = os.Args[1]
    }

    mysqlDB, err := sql.Open("mysql", mysqlURI)
    if err != nil {
        log.Fatal(curTime(), "cannot connect to mysql:", err)
    }

    defer func() {
        if errClose := mysqlDB.Close(); err != nil {
            log.Println(curTime(), "closing mysql database:", errClose.Error())
        }
    }()

    memsqlDB, err := sql.Open("mysql", memsqlURI)
    if err != nil {
        log.Fatal(curTime(), "cannot connect to memsql:", err)
    }

    defer func() {
        if errClose := memsqlDB.Close(); err != nil {
            log.Println(curTime(), "closing memsql database:", errClose.Error())
        }
    }()

    // set mysql database
    query = fmt.Sprintf(useDatabase, mysqlDatabase)
    _, err = mysqlDB.Query(query)
    if err != nil {
        log.Fatalln("cannot use mysql database:", err, " Query: ", query)
    }

    // set memsql database
    query = fmt.Sprintf(useDatabase, memsqlDatabase)
    _, err = memsqlDB.Query(query)
    if err != nil {
        log.Fatalln("cannot use memsql database:", err, " Query: ", query)
    }

    // read from master
    query = fmt.Sprintf(tablesQuery, mysqlDatabase)

    mysqlTables, err := mysqlDB.Query(query)
    if err != nil {
        log.Fatalln("cannot get list of tables from mysql:", err, " Query: ", query)
    }

    if tableToDo == "" {
        var tableName string
        if mysqlTables != nil {
            for mysqlTables.Next() {
                if err = mysqlTables.Scan(
                    &tableName,
                ); err != nil {
                    log.Fatalln("cannot parse data:", err)
                }

                processTable(tableName, mysqlDatabase, memsqlDatabase, mysqlDB, memsqlDB)
            }
        }
    } else {
        processTable(tableToDo, mysqlDatabase, memsqlDatabase, mysqlDB, memsqlDB)
    }
    fmt.Println(curTime(), "ALL DONE")

}

func curTime() string {
    return time.Now().UTC().Format(time.RFC3339) + ": "
}

func requireEnvVar(s string) string {
    env, ok := os.LookupEnv(s)
    if !ok {
        log.Fatalln(curTime(), "ERROR: ", s, "isn't defined!")
    }
    fmt.Println(curTime(), s, ": ", env)
    return (env)
}

func processTable(tableName string, mysqlDatabase string, memsqlDatabase string, mysqlDB *sql.DB, memsqlDB *sql.DB) {

    fmt.Println(curTime(), "Starting table: ", tableName)
    var t tabledata
    query = fmt.Sprintf(tableStructureQuery, "%s", "%s", "%s", mysqlDatabase, tableName)

    err := mysqlDB.QueryRow(query).Scan(&t.tableStructure, &t.columnListForSelect, &t.columnListForInsert, &t.stringForSprint)
    if err != nil {
        log.Fatalln("Cannot get mysql table structure:", err, " Table: ", tableName, " Query: ", query)
    }

    //drop memsql table if EXISTS
    query = fmt.Sprintf(dropMemSQLTable, memsqlDatabase, tableName)

    _, err = memsqlDB.Query(query)
    if err != nil {
        log.Fatalln("Cannot drop old memsql table:", err, " Query: ", query)
    }

    //create table
    query = fmt.Sprintf(t.tableStructure)

    _, err = memsqlDB.Query(query)
    if err != nil {
        log.Fatalln("Cannot create new memsql table:", err, " Query: ", query)
    }

    //feed data
    query = fmt.Sprintf(queryTableData, t.columnListForSelect, tableName)

    masterData, err := mysqlDB.Query(query)
    if err != nil {
        log.Fatalln("Cannot get mysql data:", err, " Query: ", query)
    }

    sourceColumns, err := masterData.Columns()
    if err != nil {
        log.Fatalln("cannot read mysql data columns:", err)
    }

    columnPointers := make([]interface{}, len(sourceColumns))
    columnContainers := make([]interface{}, len(sourceColumns))
    for i := 0; i < len(sourceColumns); i++ {
        columnPointers[i] = &columnContainers[i]
    }

    memsqlTrans, err := memsqlDB.Begin()
    if err != nil {
        log.Fatal("cannot open new transaction in memsql db:", err)
    }

    memsqlInsert := "insert into `" + tableName + "` (" + t.columnListForInsert + ") values (" + t.stringForSprint + ");"
    rowCount := 0
    if masterData != nil {
        for masterData.Next() {
            if err := masterData.Scan(columnPointers...); err != nil {
                log.Fatalln("Cannot scan source rows structure: ", err)
            }
            rowCount++

            query = fmt.Sprintf(memsqlInsert, columnContainers...)
            _, err = memsqlTrans.Query(query)
            if err != nil {
                log.Fatal("cannot add row number:", rowCount, " ERROR: ", err, " Query: ", query)
            }

        }
        if masterData.Err() != nil {
            log.Fatal("Cannot read source rows: ", masterData.Err(), " Query: ", memsqlInsert)
        }
    }
    fmt.Println(curTime(), "Table: ", tableName, " Rows processed:", rowCount)

    err = memsqlTrans.Commit()
    if err != nil {
        log.Fatal("Cannot commit target transaction:", err)
    }

}