MultithreadedTableWriter

本小节介绍如何通过 DolphinDB C++ API 中的 MultithreadedTableWriter (MTW) 类来将数据高效的写入到 DolphinDB 中。MTW 是一种多线程异步写入数据的方式,它按行写入,内置一个数据缓冲队列,在满足一定的条件时会将一批数据提交到DolphinDB Server。

其方法如下:

构造方法

MultithreadedTableWriter(const std::string& host,int port,
                         const std::string& userId, const std::string& password,
                 const string& dbPath, const string& tableName,
                         bool useSSL,
                         bool enableHighAvailability = false,
                         const vector<string> *pHighAvailabilitySites = nullptr,
                         int batchSize = 1,
                         float throttle = 0.01f,
                         int threadCount = 1,
                         const string& partitionCol ="",
                         const vector<COMPRESS_METHOD> *pCompressMethods = nullptr,
                         Mode mode = M_Append,
                 vector<string> *pModeOption = nullptr,
                         const std::function<void(ConstantSP)> &callbackFunc = nullptr,
                         bool enableStreamTableTimestamp = false);

参数:

  • host:字符串,表示所连接的服务器的地址。
  • port:整数,表示服务器端口。
  • userId / password:字符串,登录时的用户名和密码。
  • dbPath:字符串,表示分布式数据库地址。内存表时该参数为空。
  • tableName:字符串,表示分布式表或内存表的表名。
  • useSSL:布尔值,默认值为 false。表示是否启用加密通讯。
  • enableHighAvailability:布尔值,默认为 false。若要开启 API 高可用,则需要指定该参数为true。
  • pHighAvailabilitySites:列表类型,表示所有可用节点的 ip:port 构成的 list。
  • batchSize:整数,表示批处理的消息的数量,默认值是 1,表示客户端写入数据后就立即发送给服务器。如果该参数大于 1,表示数据量达到 batchSize 时,客户端才会将数据发送给服务器。
  • throttle:大于 0 的数,单位为秒。若客户端有数据写入,但数据量不足 batchSize,则等待 throttle的时间再发送数据。
  • threadCount:整数,表示创建的工作线程数量,默认为 1,表示单线程。对于维度表,其值必须为1。
  • partitionCol:字符串类型,默认为空,仅在 threadCount 大于1时起效。对于分区表,必须指定为分区字段名;如果是流表,必须指定为表的某一列名;对于维度表,该参数不起效。
  • pCompressMethods:列表类型,用于指定每一列采用的压缩传输方式,为空表示不压缩。每一列可选的压缩方式包括:
    • COMPRESS_LZ4: LZ4 压缩。
    • COMPRESS_DELTA: DELTAOFDELTA 压缩。
  • mode:表示数据写入的方式,可选值为:M_Append 或 M_Upsert。M_Upsert 表示以 upsert! 方式追加或更新表数据;M_Append 表示以 append! 方式追加表数据。
  • modeOption:字符串数组,表示不同模式下的扩展选项,目前,仅当 mode 指定为 M_Upsert 时有效,表示由 可选参数组成的字符串数组。
  • callbackFunc:回调函数,插入一行数据成功之后会进行回调。回调参数是一个表,第一列为字符串类型,表示之前插入的数据中某一行的ID,第二列为BOOL类型,表示该行是否插入成功
  • enableStreamTableTimestamp:布尔类型,为true时只能写入流表,并且必须已对该表的最后一列调用了 setStreamTableTimestamp 函数。在插入数据时,不用填最后一列的数据,dolphindb会根据数据插入时间自动填写。该参数主要用来测试插入时延。

数据插入方法

template<typename... TArgs> 
bool insert(ErrorCodeInfo &errorInfo, TArgs... args)

插入单行数据。返回一个bool 类型数据。true 表示插入成功,false 表示失败。

参数说明:

  • errorInfo 是 ErrorCodeInfo 类,包含 errorCode 和 errorInfo,分别表示错误代码和错误信息。当 insert 接口返回 false 时表示数据写入失败,此时,errorInfo 会显示失败的详细信息。
  • args 是变长参数,代表插入的一行数据。
    Note: 如果在创建 MTW 类时提供了回调函数,那么要求可变长参数中的第一个参数是一个字符串类型的参数,表示这一行的 ID,在回调函数中可根据 ID 来判断该行是否插入成功。

获取未写入数据方法

void getUnwrittenData(std::vector<std::vector<ConstantSP>*> &unwrittenData);

返回一个列表,表示未写入服务器的数据。

Note: 该方法获取到数据资源后,MultithreadedTableWriter 内部将不再持有这些数据。

参数说明:

  • unwrittenData 嵌套列表,表示未写入服务器的数据,包含发送失败的数据以及待发送的数据两部分

重新插入未写入数据

bool insertUnwrittenData(std::vector<std::vector<ConstantSP>*> &records, ErrorCodeInfo &errorInfo)

将数据插入数据表。返回值同 insert 方法。与 insert 方法的区别在于,insert 只能插入单行数据,而 insertUnwrittenData 可以同时插入多行数据。

参数说明:

  • records 需要再次写入的数据。可以通过方法 getUnwrittenData 获取该对象。
  • errorInfo 是 ErrorCodeInfo 类,包含 errorCode 和 errorInfo,分别表示错误代码和错误信息。当insert接口返回false时表示数据写入失败,此时,errorInfo 会显示失败的详细信息。

获取状态方法

void getStatus(Status &status);

获取 MultithreadedTableWriter 对象当前的运行状态。

参数说明:

  • status 是 MultithreadedTableWriter::Status 类

等待插入完成方法

void waitForThreadCompletion();

该函数会阻塞直至后台线程完成全部工作后才会返回。

示例:常规用法

int main(int argc, const char **argv)
{
    DBConnection conn;
    conn.connect("127.0.0.1", 8848, "admin", "123456");
    //创建表
    string script = "t = table(1000:0, `col1`col2`col3, [LONG, DOUBLE, STRING]); share t as t1;";
    conn.run(script);
    
    //构造MTW对象
    ErrorCodeInfo errorInfo;
    MultithreadedTableWriter writer("127.0.0.1", 8848, "admin", "123456", "", "t1", false);
    char msg[] = "123456msg";
    if(!writer.insert(errorInfo, 1, 2.3, msg)){
        std::cout << "insert fail " << errorInfo.errorInfo << std::endl;
        return -1;
    }
    writer.waitForThreadCompletion();
    //插入完成
    MultithreadedTableWriter::Status status;
    writer.getStatus(status);
    if (status.hasError()) {
        std::cout << "error in writing: " << status.errorInfo << std::endl;
    }
    //验证插入结果
    std::cout << conn.run("t1")->getString() << std::endl;
}

示例:使用回调函数

int main(int argc, const char **argv)
{
    DBConnection conn;
    conn.connect("127.0.0.1", 8848, "admin", "123456");
    //创建表
    string script = "t = table(1000:0, `col1`col2`col3, [LONG, DOUBLE, STRING]); share t as t1;";
    conn.run(script);

    auto cb = [](ConstantSP callbackTable)
	{
		/***
			callbackTable schema:
			column 0: id->string
			column 1: success->bool
		***/
		std::cout << callbackTable->getString() << std::endl;
	};

    //构造MTW对象
    ErrorCodeInfo errorInfo;
    MultithreadedTableWriter writer("127.0.0.1", 8848, "admin", "123456", "", "t1", false, false, nullptr, 1, 0.1, 5, "col1", nullptr, MultithreadedTableWriter::M_Append, nullptr, cb);
    char msg[] = "123456msg";
    if(!writer.insert(errorInfo, "row1", 1, 2.3, msg)){
        std::cout << "insert fail " << errorInfo.errorInfo << std::endl;
        return -1;
    }
    writer.waitForThreadCompletion();
    //插入完成
    MultithreadedTableWriter::Status status;
    writer.getStatus(status);
    if (status.hasError()) {
        std::cout << "error in writing: " << status.errorInfo << std::endl;
    }
    //验证插入结果
    std::cout << conn.run("t1")->getString() << std::endl;
}

示例:写入分布式表

int main(int argc, const char **argv)
{
    DBConnection conn;
	conn.connect("127.0.0.1", 8848, "admin", "123456");
	conn.run(R"(dbName = 'dfs://valuedb3'
                if(exists(dbName)){
                    dropDatabase(dbName);
                }
                datetest=table(1000:0,`date`symbol`id,[DATE,SYMBOL,LONG]);
                db = database(directory=dbName, partitionType=HASH, partitionScheme=[INT, 10]);
                pt=db.createPartitionedTable(datetest,'pdatetest','id');)");

    vector<COMPRESS_METHOD> compress;
    compress.push_back(COMPRESS_LZ4);
    compress.push_back(COMPRESS_LZ4);
    compress.push_back(COMPRESS_DELTA);
    MultithreadedTableWriter writer("127.0.0.1", 8848, "admin", "123456", "dfs://valuedb3", "pdatetest", false, false, NULL, 10000, 1, 5, "id", &compress);
    
    ErrorCodeInfo errorInfo;
    //插入100行正确数据 (类型和列数都正确),MTW正常运行
    for (int i = 0; i < 100; i++) {
        if (writer.insert(errorInfo, rand() % 10000, "AAAAAAAB", rand() % 10000) == false) {
            //此处不会执行到
            cout << "insert failed: " << errorInfo.errorInfo << endl;
            break;
        }
    }
    //插入1行数据(类型不匹配),MTW 立刻发现待插入数据类型不匹配,立刻返回错误信息
    if (writer.insert(errorInfo, rand() % 10000, 222, rand() % 10000) == false) {
        cout << "insert failed: " << errorInfo.errorInfo << endl;// insert failed: Column counts don't match 2
    }
    //插入1行数据(列数不匹配),MTW 立刻发现待插入数据列数与待插入表的列数不匹配,立刻返回错误信息
    if (writer.insert(errorInfo, rand() % 10000, "AAAAAAAB") == false) {
        cout << "insert failed: " << errorInfo.errorInfo << endl;
    }
    writer.waitForThreadCompletion();
    cout << conn.run("select count(*) from pt")->getString() << endl;
}