tableInsert/insert into
本小节介绍如何在 C++ API 端通过调用 DolphinDB 的 tableInsert 函数以及 insert into 语句来插入数据。这是一种同步的写入方式，支持单条以及批量写入。
保存数据到内存表
环境准备
通过DolphinDB脚本创建一个共享内存表
// Create an empty in-memory table with the columns name, date and price.
t = table(100:0, `name`date`price, [STRING, DATE, DOUBLE]);
// Publish the table under the shared name tglobal.
share t as tglobal;
使用 insert into 语句保存数据
保存单条数据:
// Builds a DolphinDB `insert into` statement that appends one row to the
// shared table `tglobal`.
//
//   name  - spliced into the script right after a backtick, i.e. as a
//           backtick string literal
//   date  - written as a plain decimal integer
//   price - written via std::to_string (fixed six decimals)
//
// Returns the complete script text, e.g.
//   insert into tglobal values(`Tom,0,1.200000)
//
// NOTE(review): `name` is inserted unescaped; a value containing a comma,
// a parenthesis or whitespace would change the meaning of the statement.
// Only pass trusted, identifier-like names - confirm against callers.
std::string assembleScript(const std::string& name, int date, double price){
    std::string script("insert into tglobal values(`");
    script.append(name).append(",");
    script.append(std::to_string(date)).append(",");
    script.append(std::to_string(price)).append(")");
    return script;
}
// Inserts three rows into the shared table `tglobal`, one `insert into`
// statement per row, then prints the resulting table.
// Returns 0 on success and -1 when the server connection cannot be opened.
int main(int argc, const char **argv)
{
    DBConnection conn;
    // connect() reports failure through its boolean result; running scripts
    // on an unconnected session would only fail later with a less clear error.
    if (!conn.connect("127.0.0.1", 8848, "admin", "123456")) {
        std::cout << "Failed to connect to the server" << std::endl;
        return -1;
    }

    conn.run(assembleScript("Tom", 0, 1.2));
    conn.run(assembleScript("Lily", 1, 2.2));
    conn.run(assembleScript("Lucy", 2, 3.2));

    // Fetch the whole table back and print its string representation.
    std::cout << conn.run("tglobal")->getString() << std::endl;
    return 0;
}
保存多条数据:
int main(int argc, const char **argv)
{
DBConnection conn;
conn.connect("127.0.0.1", 8848, "admin", "123456");
string script;
int rowNum=10000, indexCapacity=10000;
VectorSP names = Util::createVector(DT_STRING, rowNum, indexCapacity);
VectorSP dates = Util::createVector(DT_DATE, rowNum, indexCapacity);
VectorSP prices = Util::createVector(DT_DOUBLE, rowNum, indexCapacity);
int array_dt_buf[Util:: BUF_SIZE]; //定义date列缓冲区数组
double array_db_buf[Util:: BUF_SIZE]; //定义price列缓冲区数组
int start = 0;
int no=0;
while (start < rowNum) {
size_t len = std::min(Util::BUF_SIZE, rowNum - start);
int *dtp = dates->getIntBuffer(start, len, array_dt_buf); //dtp指向每次通过 `getIntBuffer` 得到的缓冲区的头部
double *dbp = prices->getDoubleBuffer(start, len, array_db_buf); //dbp指向每次通过 `getDoubleBuffer` 得到的缓冲区的头部
for (size_t i = 0; i < len; i++) {
names->setString(i+start, "name_"+std::to_string(++no)); //对string类型的name列直接进行赋值,不采用getbuffer的方式
dtp[i] = 17898+i;
dbp[i] = (rand()%100)/3.0;
}
dates->setInt(start, len, dtp); //写完后使用 `setInt` 将缓冲区写回数组
prices->setDouble(start, len, dbp); //写完后使用 `setDouble` 将缓冲区写回数组
start += len;
}
vector<string> allnames = {"names", "dates", "prices"};
vector<ConstantSP> allcols = {names, dates, prices};
conn.upload(allnames, allcols);
script += "insert into tglobal values(names, dates, prices); tglobal";
TableSP table = conn.run(script);
}
使用 tableInsert
保存 TableSP 对象
// Creates a 50-row demo table with the columns name (STRING), date (DATE)
// and price (DOUBLE) and fills it with generated values:
//   name  = "name_<row>"
//   date  = a day in January 2010
//   price = (rand() % 100) / 3.0
TableSP createDemoTable(){
    vector<string> colNames = {"name", "date","price"};
    vector<DATA_TYPE> colTypes = {DT_STRING, DT_DATE, DT_DOUBLE};
    int colNum = 3, rowNum = 50, indexCapacity = 50;
    ConstantSP table = Util::createTable(colNames, colTypes, rowNum, indexCapacity);

    // Grab the column vectors once so the fill loop can write into them directly.
    vector<VectorSP> columnVecs;
    for(int i = 0; i < colNum; ++i)
        columnVecs.push_back(table->getColumn(i));

    for(int i = 0; i < rowNum; ++i){
        columnVecs[0]->set(i, Util::createString("name_"+std::to_string(i)));
        // rowNum exceeds the 31 days of January, so wrap the day of month;
        // passing i+1 directly would request non-existent dates (Jan 32..50).
        columnVecs[1]->set(i, Util::createDate(2010, 1, i % 31 + 1));
        columnVecs[2]->set(i, Util::createDouble((rand()%100)/3.0));
    }
    return table;
}
// Appends the table built by createDemoTable() to the shared table `tglobal`
// by calling the server-side partial application `tableInsert{tglobal}`.
// Returns 0 on success and -1 when the server connection cannot be opened.
int main(int argc, const char **argv)
{
    DBConnection conn;
    // connect() reports failure through its boolean result.
    if (!conn.connect("127.0.0.1", 8848, "admin", "123456")) {
        std::cout << "Failed to connect to the server" << std::endl;
        return -1;
    }

    TableSP table = createDemoTable();
    // The table is passed as the single remaining argument of tableInsert.
    vector<ConstantSP> args;
    args.push_back(table);
    conn.run("tableInsert{tglobal}", args);
    return 0;
}
保存数据到分布式表
单线程写入
- 环境准备。通过 DolphinDB
脚本创建一个分布式表:
// Database path and table name used by the C++ sample.
dbPath = "dfs://SAMPLE_TRDDB";
tableName = `demoTable
// VALUE-partitioned database with one partition per day, 2010.01.01 to 2010.01.30.
db = database(dbPath, VALUE, 2010.01.01..2010.01.30)
// Partitioned table with the columns name/date/price, partitioned on the date column.
pt = db.createPartitionedTable(table(1000000:0, `name`date`price, [STRING,DATE,DOUBLE]), tableName, `date)
- C++
代码如下:
TableSP createDemoTable(){ vector<string> colNames = {"name", "date","price"}; vector<DATA_TYPE> colTypes = {DT_STRING, DT_DATE, DT_DOUBLE}; int colNum = 3, rowNum = 10, indexCapacity = 10; ConstantSP table = Util::createTable(colNames, colTypes, rowNum, indexCapacity); vector<VectorSP> columnVecs; for(int i = 0; i < colNum; ++i) columnVecs.push_back(table->getColumn(i)); for(int i = 0; i < rowNum; ++i){ columnVecs[0]->set(i, Util::createString("name_"+std::to_string(i))); columnVecs[1]->set(i, Util::createDate(2010, 1, i+1)); columnVecs[2]->set(i, Util::createDouble((rand()%100)/3.0)); } return table; } int main(int argc, const char **argv) { DBConnection conn; conn.connect("127.0.0.1", 8848, "admin", "123456"); TableSP table = createDemoTable(); vector<ConstantSP> args; args.push_back(table); conn.run("tableInsert{loadTable('dfs://SAMPLE_TRDDB', `demoTable)}", args); }
并发写入
- 环境准备。通过 DolphinDB 脚本创建分布式数据库 "dfs://natlog" 和分布式表 "natlogrecords"。其中，数据库按照 VALUE-HASH-HASH 的组合进行三级分区：
dbName="dfs://natlog"
tableName="natlogrecords"
db1 = database("", VALUE, datehour(2019.09.11T00:00:00)..datehour(2019.09.13T00:00:00) ) //starttime, newValuePartitionPolicy=add
db2 = database("", HASH, [IPADDR, 5]) //source_address
db3 = database("", HASH, [IPADDR, 5]) //destination_address
// Combine the three levels into one COMPO-partitioned database.
db = database(dbName, COMPO, [db1,db2,db3])
data = table(1:0, ["fwname","filename","source_address","source_port","destination_address","destination_port","nat_source_address","nat_source_port","starttime","stoptime","elapsed_time"], [SYMBOL,STRING,IPADDR,INT,IPADDR,INT,IPADDR,INT,DATETIME,DATETIME,INT])
db.createPartitionedTable(data,tableName,`starttime`source_address`destination_address)
- DolphinDB 不允许多个 writer
同时将数据写入到同一个分区,因此在客户端多线程并行写入数据时,需要确保每个线程分别写入不同的分区。对于按哈希值进行分区的分布式表, DolphinDB
C++ API 提供了
getHash
函数来数据的 hash 值。在客户端设计多线程并发写入分布式表时,可根据哈希分区字段数据的哈希值分组,每组指定一个写线程。这样就能保证每个线程同时将数据写到不同的哈希分区。#include "Concurrent.h" #include "DolphinDB.h" #include "Util.h" #include <iostream> #include <sstream> #include <string> #include <sys/time.h> #include <thread> #include <arpa/inet.h> using namespace dolphindb; using namespace std; #define BUCKETS 50 #define MAX_THREAD_NUM BUCKETS DBConnection conn[MAX_THREAD_NUM]; SmartPointer<BoundedBlockingQueue<TableSP>> tableQueue[MAX_THREAD_NUM]; //参数结构体,存放传入的各项参数 struct parameter { int index; int count; long cLong; long nLong; long nTime; long nStarttime; }; void showUsage() { cout << "DolpinDB Multi-threaded performance test program" << endl; cout << "Usage example:--h=127.0.0.1 --p=8921 --c=1000 --n=5 --t=5 --s=1579080800000" << endl; cout << "Options :" << endl; cout << " --h=127.0.0.1 Mandatory,dolphindb host, Multiple hosts separated by commas" << endl; cout << " --p=8921 Mandatory,dolphindb port, Multiple ports separated by commas.The number of ports should be the same of hosts!" << endl; cout << " --c=1000 Mandatory,The number of records inserted per thread" << endl; cout << " --n=5 Optional,Batches insertions per thread,default is 1" << endl; cout << " --t=5 Optional,Threads number,default is 1,max is " << BUCKETS << endl; cout << " --s=1574380800 Optional,start time,default is " << Util::getEpochTime() / 1000 << endl; cout << " --help Print this help." 
<< endl; } //为不同的分区生成不同的表 TableSP createDemoTable(long rows, long startPartition, long partitionCount, long startTime, int timeInc) { vector<string> colNames = {"fwname", "filename", "source_address", "source_port", "destination_address", "destination_port", "nat_source_address", "nat_source_port", "starttime", "stoptime", "elapsed_time"}; vector<DATA_TYPE> colTypes = {DT_SYMBOL, DT_STRING, DT_IP, DT_INT, DT_IP, DT_INT, DT_IP, DT_INT, DT_DATETIME, DT_DATETIME, DT_INT}; int colNum = 11, rowNum = rows, indexCapacity = rows; ConstantSP table = Util::createTable(colNames, colTypes, rowNum, indexCapacity); vector<VectorSP> columnVecs; for (int i = 0; i < colNum; i++) columnVecs.push_back(table->getColumn(i)); unsigned char sip[16] = {0}; sip[3] = 192; sip[2] = startPartition; sip[1] = partitionCount; ConstantSP spIP = Util::createConstant(DT_IP); for (int j = 1; j < 255; j++) { sip[0] = j; spIP->setBinary(0, 16, sip); //不同的哈希值会写入到不同分区 if (spIP->getHash(BUCKETS) >= startPartition && spIP->getHash(BUCKETS) < startPartition + partitionCount) { break; } } unsigned char ip[16] = {0}; for (int i = 0; i < rowNum; i++) { columnVecs[0]->setString(i, "10.189.45.2:9000"); columnVecs[1]->setString(i, std::to_string(startPartition)); columnVecs[2]->setBinary(i, 16, sip); columnVecs[3]->setInt(i, 1 * i); memcpy(ip, (unsigned char *)&i, 4); columnVecs[4]->setBinary(i, 16, ip); columnVecs[5]->setInt(i, 2 * i); columnVecs[6]->set(i, Util::parseConstant(DT_IP, "192.168.1.1")); columnVecs[7]->setInt(i, 3 * i); columnVecs[8]->setLong(i, startTime + timeInc); columnVecs[9]->setLong(i, i + startTime + 100); columnVecs[10]->setInt(i, i); } return table; } //用tableInsert写数据 void *writeData(void *arg) { struct parameter *pParam; pParam = (struct parameter *)arg; TableSP table; for (unsigned int i = 0; i < pParam->nLong; i++) { tableQueue[pParam->index]->pop(table); long long startTime = Util::getEpochTime(); vector<ConstantSP> args; args.push_back(table); 
conn[pParam->index].run("tableInsert{loadTable('dfs://natlog', `natlogrecords)}", args); pParam->nTime += Util::getEpochTime() - startTime; } printf("Thread %d,insert %ld rows %ld times, used %ld ms.\n", pParam->index, pParam->cLong, pParam->nLong, pParam->nTime); return NULL; } //生成模拟数据 void *genData(void *arg) { struct parameter *pParam; pParam = (struct parameter *)arg; long partitionCount = BUCKETS / pParam->count; for (unsigned int i = 0; i < pParam->nLong; i++) { TableSP table = createDemoTable(pParam->cLong, partitionCount * pParam->index, partitionCount, pParam->nStarttime, i * 5); tableQueue[pParam->index]->push(table); } return NULL; } int main(int argc, char *argv[]) { if (argc < 2) { cout << "No arguments, you MUST give an argument at least!" << endl; showUsage(); return -1; } //解析参数 int nOptionIndex = 1; string cString, nString, hString, pString, tString, sString; stringstream cSS, nSS, pSS, tSS, sSS; long cLong, nLong, pLong, tLong, sLong; vector<string> vHost, vPort; while (nOptionIndex < argc) { if (strncmp(argv[nOptionIndex], "--c=", 4) == 0) { // get records number per threads cString = &argv[nOptionIndex][4]; } else if (strncmp(argv[nOptionIndex], "--h=", 4) == 0) { // get host hString = &argv[nOptionIndex][4]; } else if (strncmp(argv[nOptionIndex], "--p=", 4) == 0) { // get port pString = &argv[nOptionIndex][4]; } else if (strncmp(argv[nOptionIndex], "--n=", 4) == 0) { // get batches nString = &argv[nOptionIndex][4]; } else if (strncmp(argv[nOptionIndex], "--t=", 4) == 0) { // get thread tString = &argv[nOptionIndex][4]; } else if (strncmp(argv[nOptionIndex], "--s=", 4) == 0) { // get start time sString = &argv[nOptionIndex][4]; } else if (strncmp(argv[nOptionIndex], "--help", 6) == 0) { // help showUsage(); return 0; } else { cout << "Options '" << argv[nOptionIndex] << "' not valid. Run '" << argv[0] << "' for details." 
<< endl; return -1; } nOptionIndex++; } if (cString.empty()) { cout << "--c is required" << endl; showUsage(); return -1; } else { cSS << cString; cSS >> cLong; } if (pString.empty()) { cout << "--p is required" << endl; showUsage(); return -1; } else { vPort = Util::split(pString, ','); } if (hString.empty()) { cout << "--h is required" << endl; showUsage(); return -1; } else { vHost = Util::split(hString, ','); } if (nString.empty()) { nLong = 1; } else { nSS << nString; nSS >> nLong; } if (tString.empty()) { tLong = 1; } else { tSS << tString; tSS >> tLong; } if (sString.empty()) { sLong = Util::getEpochTime() / 1000; // 1574380800; cout << "starttime=" << sLong << endl; } else { sSS << sString; sSS >> sLong; } if (tLong > BUCKETS) { cout << "The number of threads must be less than " << BUCKETS << endl; showUsage(); return -1; } if (vHost.size() != vPort.size()) { cout << "The number of host and port must be the same! " << vHost.size() << ":" << vPort.size() << endl; showUsage(); return -1; } try { for (int i = 0; i < tLong; ++i) { hString = vHost[i % vHost.size()]; pLong = std::stol(vPort[i % vPort.size()]); bool ret = conn[i].connect(hString, pLong, "admin", "123456"); if (!ret) { cout << "Failed to connect to the server" << endl; return 0; } tableQueue[i] = new BoundedBlockingQueue<TableSP>(2); } } catch (exception &ex) { cout << "Failed to connect with error: " << ex.what(); return -1; } cout << "Please waiting..." 
<< endl; //启动生成数据线程以及写入线程 long long startTime = Util::getEpochTime(); struct parameter arg[tLong]; std::thread genThreads[tLong]; std::thread writeThreads[tLong]; for (int i = 0; i < tLong; ++i) { arg[i].index = i; arg[i].count = tLong; arg[i].nLong = nLong; arg[i].cLong = cLong; arg[i].nTime = 0; arg[i].nStarttime = sLong; genThreads[i] = std::thread(genData, (void *)&arg[i]); writeThreads[i] = std::thread(writeData, (void *)&arg[i]); } for (int i = 0; i < tLong; ++i) { genThreads[i].join(); writeThreads[i].join(); } //工作完成后,统计耗时 long long endTime = Util::getEpochTime(); long long rowCount = cLong * nLong * tLong; cout << "Inserted " << rowCount << " rows, took a total of " + std::to_string(endTime - startTime) + " ms. " << rowCount / (endTime - startTime) * 1000 / 10000 << " w/s " << endl; long timeSum = arg[0].nTime; for (int i = 1; i < tLong; ++i) { timeSum += arg[i].nTime; } cout << "Total time minus data preparation time: " << std::to_string(timeSum / (double)tLong) + " ms" << endl; return 0; }