MultithreadedTableWriter
本小节介绍如何通过 DolphinDB C++ API 中的 MultithreadedTableWriter (MTW) 类来将数据高效的写入到 DolphinDB 中。MTW 是一种多线程异步写入数据的方式,它按行写入,内置一个数据缓冲队列,在满足一定的条件时会将一批数据提交到DolphinDB Server。
其方法如下:
构造方法
MultithreadedTableWriter(const std::string& host,int port,
const std::string& userId, const std::string& password,
const string& dbPath, const string& tableName,
bool useSSL,
bool enableHighAvailability = false,
const vector<string> *pHighAvailabilitySites = nullptr,
int batchSize = 1,
float throttle = 0.01f,
int threadCount = 1,
const string& partitionCol ="",
const vector<COMPRESS_METHOD> *pCompressMethods = nullptr,
Mode mode = M_Append,
vector<string> *pModeOption = nullptr,
const std::function<void(ConstantSP)> &callbackFunc = nullptr,
bool enableStreamTableTimestamp = false);
参数:
-
host:字符串,表示所连接的服务器的地址。
-
port:整数,表示服务器端口。
-
userId / password:字符串,登录时的用户名和密码。
-
dbPath:字符串,表示分布式数据库地址。内存表时该参数为空。
-
tableName:字符串,表示分布式表或内存表的表名。
-
useSSL:布尔值,默认值为 false。表示是否启用加密通讯。
-
enableHighAvailability:布尔值,默认为 false。若要开启 API 高可用,则需要指定该参数为true。
-
pHighAvailabilitySites:列表类型,表示所有可用节点的 ip:port 构成的 list。
-
batchSize:整数,表示批处理的消息的数量,默认值是 1,表示客户端写入数据后就立即发送给服务器。如果该参数大于 1,表示数据量达到 batchSize 时,客户端才会将数据发送给服务器。
-
throttle:大于 0 的数,单位为秒。若客户端有数据写入,但数据量不足 batchSize,则等待 throttle的时间再发送数据。
-
threadCount:整数,表示创建的工作线程数量,默认为 1,表示单线程。对于维度表,其值必须为1。
-
partitionCol:字符串类型,默认为空,仅在 threadCount 大于1时起效。对于分区表,必须指定为分区字段名;如果是流表,必须指定为表的某一列名;对于维度表,该参数不起效。
-
pCompressMethods:列表类型,用于指定每一列采用的压缩传输方式,为空表示不压缩。每一列可选的压缩方式包括:
-
COMPRESS_LZ4: LZ4 压缩。
-
COMPRESS_DELTA: DELTAOFDELTA 压缩。
-
-
mode:表示数据写入的方式,可选值为:M_Append 或 M_Upsert。M_Upsert 表示以 upsert! 方式追加或更新表数据;M_Append 表示以 append! 方式追加表数据。
-
modeOption:字符串数组,表示不同模式下的扩展选项,目前,仅当 mode 指定为 M_Upsert 时有效,表示由 upsert! 可选参数组成的字符串数组。
-
callbackFunc:回调函数,插入一行数据成功之后会进行回调。回调参数是一个表,第一列为字符串类型,表示之前插入的数据中某一行的ID,第二列为BOOL类型,表示该行是否插入成功
-
enableStreamTableTimestamp:布尔类型,为true时只能写入流表,并且必须已对该表的最后一列调用了
setStreamTableTimestamp
函数。在插入数据时,不用填最后一列的数据,dolphindb会根据数据插入时间自动填写。该参数主要用来测试插入时延。
数据插入方法
template<typename... TArgs>
bool insert(ErrorCodeInfo &errorInfo, TArgs... args)
插入单行数据。返回一个bool 类型数据。true 表示插入成功,false 表示失败。
参数说明:
-
errorInfo 是 ErrorCodeInfo 类,包含 errorCode 和 errorInfo,分别表示错误代码和错误信息。当 insert 接口返回 false 时表示数据写入失败,此时,errorInfo 会显示失败的详细信息。
-
args 是变长参数,代表插入的一行数据。
注: 如果在创建 MTW 类时提供了回调函数,那么要求可变长参数中的第一个参数是一个字符串类型的参数,表示这一行的 ID,在回调函数中可根据 ID 来判断该行是否插入成功。
获取未写入数据方法
void getUnwrittenData(std::vector<std::vector<ConstantSP>*> &unwrittenData);
返回一个列表,表示未写入服务器的数据。
MultithreadedTableWriter
内部将不再持有这些数据。参数说明:
-
unwrittenData 嵌套列表,表示未写入服务器的数据,包含发送失败的数据以及待发送的数据两部分
重新插入未写入数据
bool insertUnwrittenData(std::vector<std::vector<ConstantSP>*> &records, ErrorCodeInfo &errorInfo)
将数据插入数据表。返回值同 insert 方法。与 insert 方法的区别在于,insert 只能插入单行数据,而 insertUnwrittenData 可以同时插入多行数据。
参数说明:
-
records 需要再次写入的数据。可以通过方法 getUnwrittenData 获取该对象。
-
errorInfo 是 ErrorCodeInfo 类,包含 errorCode 和 errorInfo,分别表示错误代码和错误信息。当insert接口返回false时表示数据写入失败,此时,errorInfo 会显示失败的详细信息。
获取状态方法
void getStatus(Status &status);
获取 MultithreadedTableWriter
对象当前的运行状态。
参数说明:
-
status 是 MultithreadedTableWriter::Status 类
等待插入完成方法
void waitForThreadCompletion();
该函数会阻塞直至后台线程完成全部工作后才会返回。
示例:常规用法
int main(int argc, const char **argv)
{
DBConnection conn;
conn.connect("127.0.0.1", 8848, "admin", "123456");
//创建表
string script = "t = table(1000:0, `col1`col2`col3, [LONG, DOUBLE, STRING]); share t as t1;";
conn.run(script);
//构造MTW对象
ErrorCodeInfo errorInfo;
MultithreadedTableWriter writer("127.0.0.1", 8848, "admin", "123456", "", "t1", false);
char msg[] = "123456msg";
if(!writer.insert(errorInfo, 1, 2.3, msg)){
std::cout << "insert fail " << errorInfo.errorInfo << std::endl;
return -1;
}
writer.waitForThreadCompletion();
//插入完成
MultithreadedTableWriter::Status status;
writer.getStatus(status);
if (status.hasError()) {
std::cout << "error in writing: " << status.errorInfo << std::endl;
}
//验证插入结果
std::cout << conn.run("t1")->getString() << std::endl;
}
示例:使用回调函数
int main(int argc, const char **argv)
{
DBConnection conn;
conn.connect("127.0.0.1", 8848, "admin", "123456");
//创建表
string script = "t = table(1000:0, `col1`col2`col3, [LONG, DOUBLE, STRING]); share t as t1;";
conn.run(script);
auto cb = [](ConstantSP callbackTable)
{
/***
callbackTable schema:
column 0: id->string
column 1: success->bool
***/
std::cout << callbackTable->getString() << std::endl;
};
//构造MTW对象
ErrorCodeInfo errorInfo;
MultithreadedTableWriter writer("127.0.0.1", 8848, "admin", "123456", "", "t1", false, false, nullptr, 1, 0.1, 5, "col1", nullptr, MultithreadedTableWriter::M_Append, nullptr, cb);
char msg[] = "123456msg";
if(!writer.insert(errorInfo, "row1", 1, 2.3, msg)){
std::cout << "insert fail " << errorInfo.errorInfo << std::endl;
return -1;
}
writer.waitForThreadCompletion();
//插入完成
MultithreadedTableWriter::Status status;
writer.getStatus(status);
if (status.hasError()) {
std::cout << "error in writing: " << status.errorInfo << std::endl;
}
//验证插入结果
std::cout << conn.run("t1")->getString() << std::endl;
}
示例:写入分布式表
int main(int argc, const char **argv)
{
DBConnection conn;
conn.connect("127.0.0.1", 8848, "admin", "123456");
conn.run(R"(dbName = 'dfs://valuedb3'
if(exists(dbName)){
dropDatabase(dbName);
}
datetest=table(1000:0,`date`symbol`id,[DATE,SYMBOL,LONG]);
db = database(directory=dbName, partitionType=HASH, partitionScheme=[INT, 10]);
pt=db.createPartitionedTable(datetest,'pdatetest','id');)");
vector<COMPRESS_METHOD> compress;
compress.push_back(COMPRESS_LZ4);
compress.push_back(COMPRESS_LZ4);
compress.push_back(COMPRESS_DELTA);
MultithreadedTableWriter writer("127.0.0.1", 8848, "admin", "123456", "dfs://valuedb3", "pdatetest", false, false, NULL, 10000, 1, 5, "id", &compress);
ErrorCodeInfo errorInfo;
//插入100行正确数据 (类型和列数都正确),MTW正常运行
for (int i = 0; i < 100; i++) {
if (writer.insert(errorInfo, rand() % 10000, "AAAAAAAB", rand() % 10000) == false) {
//此处不会执行到
cout << "insert failed: " << errorInfo.errorInfo << endl;
break;
}
}
//插入1行数据(类型不匹配),MTW 立刻发现待插入数据类型不匹配,立刻返回错误信息
if (writer.insert(errorInfo, rand() % 10000, 222, rand() % 10000) == false) {
cout << "insert failed: " << errorInfo.errorInfo << endl;// insert failed: Column counts don't match 2
}
//插入1行数据(列数不匹配),MTW 立刻发现待插入数据列数与待插入表的列数不匹配,立刻返回错误信息
if (writer.insert(errorInfo, rand() % 10000, "AAAAAAAB") == false) {
cout << "insert failed: " << errorInfo.errorInfo << endl;
}
writer.waitForThreadCompletion();
cout << conn.run("select count(*) from pt")->getString() << endl;
}