tableInsert/insert into

本小节介绍如何在 C++ API 端通过调用DolphinDB 的 tableInsert 函数以及 insert into 语句来插入数据。这是一种同步的写入方式,支持单条以及批量写入。

保存数据到内存表

环境准备

通过DolphinDB脚本创建一个共享内存表

t = table(100:0, `name`date`price, [STRING, DATE, DOUBLE]); 
share t as tglobal; 

使用 insert into 语句保存数据

保存单条数据:

std::string assembleScript(const std::string& name, int date, double price){
    return string("insert into tglobal values(`").append(name).append(",").append(std::to_string(date)).append(",").append(std::to_string(price).append(")"));
}

int main(int argc, const char **argv)
{
    DBConnection conn;
	conn.connect("127.0.0.1", 8848, "admin", "123456");
	conn.run(assembleScript("Tom", 0, 1.2));
    conn.run(assembleScript("Lily", 1, 2.2));
    conn.run(assembleScript("Lucy", 2, 3.2));
    std::cout << conn.run("tglobal")->getString() << std::endl;
}

保存多条数据:

int main(int argc, const char **argv)
{
    DBConnection conn;
    conn.connect("127.0.0.1", 8848, "admin", "123456");

    string script; 
    int rowNum=10000, indexCapacity=10000; 
    VectorSP names = Util::createVector(DT_STRING, rowNum, indexCapacity); 
    VectorSP dates = Util::createVector(DT_DATE, rowNum, indexCapacity); 
    VectorSP prices = Util::createVector(DT_DOUBLE, rowNum, indexCapacity); 

    int array_dt_buf[Util:: BUF_SIZE]; //定义date列缓冲区数组
    double array_db_buf[Util:: BUF_SIZE]; //定义price列缓冲区数组
    int start = 0; 
    int no=0; 
    while (start < rowNum) {
        size_t len = std::min(Util::BUF_SIZE, rowNum - start);
        int *dtp = dates->getIntBuffer(start, len, array_dt_buf); //dtp指向每次通过 `getIntBuffer` 得到的缓冲区的头部
        double *dbp = prices->getDoubleBuffer(start, len, array_db_buf); //dbp指向每次通过 `getDoubleBuffer` 得到的缓冲区的头部
        for (size_t i = 0; i < len; i++) {
            names->setString(i+start, "name_"+std::to_string(++no)); //对string类型的name列直接进行赋值,不采用getbuffer的方式
            dtp[i] = 17898+i;
            dbp[i] = (rand()%100)/3.0;
        }
        dates->setInt(start, len, dtp); //写完后使用 `setInt` 将缓冲区写回数组
        prices->setDouble(start, len, dbp); //写完后使用 `setDouble` 将缓冲区写回数组
        start += len;

    }
    vector<string> allnames = {"names", "dates", "prices"}; 
    vector<ConstantSP> allcols = {names, dates, prices}; 
    conn.upload(allnames, allcols); 

    script += "insert into tglobal values(names, dates, prices); tglobal"; 
    TableSP table = conn.run(script); 
}

使用 tableInsert 保存 TableSP 对象

TableSP createDemoTable(){
    vector<string> colNames = {"name", "date","price"};
    vector<DATA_TYPE> colTypes = {DT_STRING, DT_DATE, DT_DOUBLE};
    int colNum = 3, rowNum = 50, indexCapacity = 50;
    ConstantSP table = Util::createTable(colNames, colTypes, rowNum, indexCapacity);
    vector<VectorSP> columnVecs;
    for(int i = 0; i < colNum; ++i)
        columnVecs.push_back(table->getColumn(i));

    for(int i = 0; i < rowNum; ++i){
        columnVecs[0]->set(i, Util::createString("name_"+std::to_string(i)));
        columnVecs[1]->set(i, Util::createDate(2010, 1, i+1));
        columnVecs[2]->set(i, Util::createDouble((rand()%100)/3.0));
    }
    return table;
}

int main(int argc, const char **argv)
{
    DBConnection conn;
	conn.connect("127.0.0.1", 8848, "admin", "123456");
	vector<ConstantSP> args; 
    TableSP table = createDemoTable(); 
    args.push_back(table); 
    conn.run("tableInsert{tglobal}", args); 
}

保存数据到分布式表

单线程写入

  1. 环境准备。通过 DolphinDB 脚本创建一个分布式表:
    dbPath = "dfs://SAMPLE_TRDDB";
    tableName = `demoTable
    db = database(dbPath, VALUE, 2010.01.01..2010.01.30)
    pt=db.createPartitionedTable(table(1000000:0, `name`date `price, [STRING,DATE,DOUBLE]), tableName, `date) 
  2. C++ 代码如下:
    TableSP createDemoTable(){
        vector<string> colNames = {"name", "date","price"};
        vector<DATA_TYPE> colTypes = {DT_STRING, DT_DATE, DT_DOUBLE};
        int colNum = 3, rowNum = 10, indexCapacity = 10;
        ConstantSP table = Util::createTable(colNames, colTypes, rowNum, indexCapacity);
        vector<VectorSP> columnVecs;
        for(int i = 0; i < colNum; ++i)
            columnVecs.push_back(table->getColumn(i));
    
        for(int i = 0; i < rowNum; ++i){
            columnVecs[0]->set(i, Util::createString("name_"+std::to_string(i)));
            columnVecs[1]->set(i, Util::createDate(2010, 1, i+1));
            columnVecs[2]->set(i, Util::createDouble((rand()%100)/3.0));
        }
        return table;
    }
    
    int main(int argc, const char **argv)
    {
        DBConnection conn;
    	conn.connect("127.0.0.1", 8848, "admin", "123456");
        TableSP table = createDemoTable(); 
        vector<ConstantSP> args; 
        args.push_back(table); 
        conn.run("tableInsert{loadTable('dfs://SAMPLE_TRDDB', `demoTable)}", args); 
    }

并发写入

  1. 环境准备。 通过 DolphinDB 脚本创建一个分布式表,创建分布式数据库 "dfs://natlog" 和分布式表 "natlogrecords"。其中,数据库按照 VALUE-HASH-HASH 的组合进行三级分区:
    dbName="dfs://natlog"
    tableName="natlogrecords"
    db1 = database("", VALUE, datehour(2019.09.11T00:00:00)..datehour(2019.09.13T00:00:00) )//starttime,  newValuePartitionPolicy=add
    db2 = database("", HASH, [IPADDR, 5]) //source_address 
    db3 = database("", HASH,  [IPADDR, 5]) //destination_address
    db = database(dbName, COMPO, [db1,db2,db3])
    data = table(1:0, ["fwname","filename","source_address","source_port","destination_address","destination_port","nat_source_address","nat_source_port","starttime","stoptime","elapsed_time"], [SYMBOL,STRING,IPADDR,INT,IPADDR,INT,IPADDR,INT,DATETIME,DATETIME,INT])
    db.createPartitionedTable(data,tableName,`starttime`source_address`destination_address)
  2. DolphinDB 不允许多个 writer 同时将数据写入到同一个分区,因此在客户端多线程并行写入数据时,需要确保每个线程分别写入不同的分区。对于按哈希值进行分区的分布式表, DolphinDB C++ API 提供了 getHash 函数来数据的 hash 值。在客户端设计多线程并发写入分布式表时,可根据哈希分区字段数据的哈希值分组,每组指定一个写线程。这样就能保证每个线程同时将数据写到不同的哈希分区。
    #include "Concurrent.h"
    #include "DolphinDB.h"
    #include "Util.h"
    #include <iostream>
    #include <sstream>
    #include <string>
    #include <sys/time.h>
    #include <thread>
    #include <arpa/inet.h>
    
    using namespace dolphindb;
    using namespace std;
    #define BUCKETS 50
    #define MAX_THREAD_NUM BUCKETS
    DBConnection conn[MAX_THREAD_NUM];
    SmartPointer<BoundedBlockingQueue<TableSP>> tableQueue[MAX_THREAD_NUM];
    
    //参数结构体,存放传入的各项参数
    struct parameter {
        int index;
        int count;
        long cLong;
        long nLong;
        long nTime;
        long nStarttime;
    };
    
    void showUsage() {
        cout << "DolpinDB Multi-threaded performance test program" << endl;
        cout << "Usage example:--h=127.0.0.1 --p=8921 --c=1000 --n=5 --t=5 --s=1579080800000" << endl;
        cout << "Options :" << endl;
        cout << " --h=127.0.0.1 Mandatory,dolphindb host, Multiple hosts separated by commas" << endl;
        cout << " --p=8921 Mandatory,dolphindb port, Multiple ports separated by commas.The number of ports should be the same of hosts!" << endl;
        cout << " --c=1000 Mandatory,The number of records inserted per thread" << endl;
        cout << " --n=5 Optional,Batches  insertions per thread,default is 1" << endl;
        cout << " --t=5 Optional,Threads number,default is 1,max is " << BUCKETS << endl;
        cout << " --s=1574380800 Optional,start time,default is " << Util::getEpochTime() / 1000 << endl;
        cout << " --help Print this help." << endl;
    }
    
    //为不同的分区生成不同的表
    TableSP createDemoTable(long rows, long startPartition, long partitionCount, long startTime, int timeInc) {
        vector<string> colNames = {"fwname", "filename", "source_address", "source_port", "destination_address", "destination_port",
                                    "nat_source_address", "nat_source_port", "starttime", "stoptime", "elapsed_time"};
        vector<DATA_TYPE> colTypes = {DT_SYMBOL, DT_STRING, DT_IP, DT_INT, DT_IP, DT_INT, DT_IP, DT_INT, DT_DATETIME, DT_DATETIME, DT_INT};
        int colNum = 11, rowNum = rows, indexCapacity = rows;
        ConstantSP table = Util::createTable(colNames, colTypes, rowNum, indexCapacity);
        vector<VectorSP> columnVecs;
        for (int i = 0; i < colNum; i++)
        columnVecs.push_back(table->getColumn(i));
    
        unsigned char sip[16] = {0};
        sip[3] = 192;
        sip[2] = startPartition;
        sip[1] = partitionCount;
        ConstantSP spIP = Util::createConstant(DT_IP);
        for (int j = 1; j < 255; j++) {
            sip[0] = j;
            spIP->setBinary(0, 16, sip);
            //不同的哈希值会写入到不同分区
            if (spIP->getHash(BUCKETS) >= startPartition && spIP->getHash(BUCKETS) < startPartition + partitionCount) {
                break;
            }
        }
    
        unsigned char ip[16] = {0};
        for (int i = 0; i < rowNum; i++) {
            columnVecs[0]->setString(i, "10.189.45.2:9000");
            columnVecs[1]->setString(i, std::to_string(startPartition)); 
            columnVecs[2]->setBinary(i, 16, sip);
            columnVecs[3]->setInt(i, 1 * i);
            memcpy(ip, (unsigned char *)&i, 4);
            columnVecs[4]->setBinary(i, 16, ip);
            columnVecs[5]->setInt(i, 2 * i);
            columnVecs[6]->set(i, Util::parseConstant(DT_IP, "192.168.1.1"));
            columnVecs[7]->setInt(i, 3 * i);
            columnVecs[8]->setLong(i, startTime + timeInc);
            columnVecs[9]->setLong(i, i + startTime + 100);
            columnVecs[10]->setInt(i, i);
        }
        return table;
    }
    //用tableInsert写数据
    void *writeData(void *arg) {
        struct parameter *pParam;
        pParam = (struct parameter *)arg;
    
        TableSP table;
        for (unsigned int i = 0; i < pParam->nLong; i++) {
            tableQueue[pParam->index]->pop(table);
            long long startTime = Util::getEpochTime();
            vector<ConstantSP> args;
            args.push_back(table);
            conn[pParam->index].run("tableInsert{loadTable('dfs://natlog', `natlogrecords)}", args);
            pParam->nTime += Util::getEpochTime() - startTime;
        }
        printf("Thread %d,insert %ld rows %ld times, used %ld ms.\n", pParam->index, pParam->cLong, pParam->nLong, pParam->nTime);
        return NULL;
    }
    //生成模拟数据
    void *genData(void *arg) {
        struct parameter *pParam;
        pParam = (struct parameter *)arg;
        long partitionCount = BUCKETS / pParam->count;
    
        for (unsigned int i = 0; i < pParam->nLong; i++) {
            TableSP table = createDemoTable(pParam->cLong, partitionCount * pParam->index, partitionCount, pParam->nStarttime, i * 5);
            tableQueue[pParam->index]->push(table);
        }
        return NULL;
    }
    
    int main(int argc, char *argv[]) {
        if (argc < 2) {
            cout << "No arguments, you MUST give an argument at least!" << endl;
            showUsage();
            return -1;
        }
        //解析参数
        int nOptionIndex = 1;
        string cString, nString, hString, pString, tString, sString;
        stringstream cSS, nSS, pSS, tSS, sSS;
        long cLong, nLong, pLong, tLong, sLong;
        vector<string> vHost, vPort;
    
        while (nOptionIndex < argc) {
            if (strncmp(argv[nOptionIndex], "--c=", 4) == 0) { // get records number per threads
                cString = &argv[nOptionIndex][4];
            } else if (strncmp(argv[nOptionIndex], "--h=", 4) == 0) { // get host
                hString = &argv[nOptionIndex][4];
            } else if (strncmp(argv[nOptionIndex], "--p=", 4) == 0) { // get port
                pString = &argv[nOptionIndex][4];
            } else if (strncmp(argv[nOptionIndex], "--n=", 4) == 0) { // get batches
                nString = &argv[nOptionIndex][4];
            } else if (strncmp(argv[nOptionIndex], "--t=", 4) == 0) { // get thread
                tString = &argv[nOptionIndex][4];
            } else if (strncmp(argv[nOptionIndex], "--s=", 4) == 0) { // get start time
                sString = &argv[nOptionIndex][4];
            } else if (strncmp(argv[nOptionIndex], "--help", 6) == 0) { // help
                showUsage();
                return 0;
            } else {
                cout << "Options '" << argv[nOptionIndex] << "' not valid. Run '" << argv[0] << "' for details." << endl;
                return -1;
            }
            nOptionIndex++;
        }
    
        if (cString.empty()) {
            cout << "--c is required" << endl;
            showUsage();
            return -1;
        } else {
            cSS << cString;
            cSS >> cLong;
        }
        if (pString.empty()) {
            cout << "--p is required" << endl;
            showUsage();
            return -1;
        } else {
            vPort = Util::split(pString, ',');
        }
        if (hString.empty()) {
            cout << "--h is required" << endl;
            showUsage();
            return -1;
        } else {
            vHost = Util::split(hString, ',');
        }
        if (nString.empty()) {
            nLong = 1;
        } else {
            nSS << nString;
            nSS >> nLong;
        }
        if (tString.empty()) {
            tLong = 1;
        } else {
            tSS << tString;
            tSS >> tLong;
        }
        if (sString.empty()) {
            sLong = Util::getEpochTime() / 1000; // 1574380800;
            cout << "starttime=" << sLong << endl;
        } else {
            sSS << sString;
            sSS >> sLong;
        }
        if (tLong > BUCKETS) {
            cout << "The number of threads must be less than " << BUCKETS << endl;
            showUsage();
            return -1;
        }
    
        if (vHost.size() != vPort.size()) {
            cout << "The number of host and port must be the same! " << vHost.size() << ":" << vPort.size() << endl;
            showUsage();
            return -1;
        }
        try {
            for (int i = 0; i < tLong; ++i) {
                hString = vHost[i % vHost.size()];
                pLong = std::stol(vPort[i % vPort.size()]);
                bool ret = conn[i].connect(hString, pLong, "admin", "123456");
                if (!ret) {
                    cout << "Failed to connect to the server" << endl;
                    return 0;
                }
                tableQueue[i] = new BoundedBlockingQueue<TableSP>(2);
            }
        } catch (exception &ex) {
            cout << "Failed to  connect  with error: " << ex.what();
            return -1;
        }
        cout << "Please waiting..." << endl;
        //启动生成数据线程以及写入线程
        long long startTime = Util::getEpochTime();
        struct parameter arg[tLong];
        std::thread genThreads[tLong];
        std::thread writeThreads[tLong];
        for (int i = 0; i < tLong; ++i) {
            arg[i].index = i;
            arg[i].count = tLong;
            arg[i].nLong = nLong;
            arg[i].cLong = cLong;
            arg[i].nTime = 0;
            arg[i].nStarttime = sLong;
            genThreads[i] = std::thread(genData, (void *)&arg[i]);
            writeThreads[i] = std::thread(writeData, (void *)&arg[i]);
        }
    
        for (int i = 0; i < tLong; ++i) {
            genThreads[i].join();
            writeThreads[i].join();
        }
        //工作完成后,统计耗时
        long long endTime = Util::getEpochTime();
        long long rowCount = cLong * nLong * tLong;
        cout << "Inserted " << rowCount << " rows, took a total of  " + std::to_string(endTime - startTime) + " ms.  "
            << rowCount / (endTime - startTime) * 1000 / 10000 << " w/s " << endl;
        long timeSum = arg[0].nTime;
        for (int i = 1; i < tLong; ++i) {
            timeSum += arg[i].nTime;
        }
        cout << "Total time minus data preparation time:  " << std::to_string(timeSum / (double)tLong) + " ms" << endl;
        return 0;
    }