MultithreadedTableWriter

本小节介绍如何通过 DolphinDB C++ API 中的 MultithreadedTableWriter (MTW) 类来将数据高效的写入到 DolphinDB 中。MTW 是一种多线程异步写入数据的方式,它按行写入,内置一个数据缓冲队列,在满足一定的条件时会将一批数据提交到DolphinDB Server。

其方法如下:

构造方法

方法一:

MultithreadedTableWriter(const std::string& host,int port,
                         const std::string& userId, const std::string& password,
                 const string& dbPath, const string& tableName,
                         bool useSSL,
                         bool enableHighAvailability = false,
                         const vector<string> *pHighAvailabilitySites = nullptr,
                         int batchSize = 1,
                         float throttle = 0.01f,
                         int threadCount = 1,
                         const string& partitionCol ="",
                         const vector<COMPRESS_METHOD> *pCompressMethods = nullptr,
                         Mode mode = M_Append,
                 vector<string> *pModeOption = nullptr,
                         const std::function<void(ConstantSP)> &callbackFunc = nullptr,
                         bool enableStreamTableTimestamp = false);

参数:

  • host:字符串,表示所连接的服务器的地址。
  • port:整数,表示服务器端口。
  • userId / password:字符串,登录时的用户名和密码。
  • dbPath:字符串,表示分布式数据库地址。内存表时该参数为空。
  • tableName:字符串,表示分布式表或内存表的表名。
  • useSSL:布尔值,默认值为 false。表示是否启用加密通讯。
  • enableHighAvailability:布尔值,默认为 false。若要开启 API 高可用,则需要指定该参数为true。
  • pHighAvailabilitySites:列表类型,表示所有可用节点的 ip:port 构成的 list。
  • batchSize:整数,表示批处理的消息的数量,默认值是 1,表示客户端写入数据后就立即发送给服务器。如果该参数大于 1,表示数据量达到 batchSize 时,客户端才会将数据发送给服务器。
  • throttle:大于 0 的数,单位为秒。若客户端有数据写入,但数据量不足 batchSize,则等待 throttle的时间再发送数据。
  • threadCount:整数,表示创建的工作线程数量,默认为 1,表示单线程。对于维度表,其值必须为1。
  • partitionCol:字符串类型,默认为空,仅在 threadCount 大于1时起效。对于分区表,必须指定为分区字段名;如果是流表,必须指定为表的某一列名;对于维度表,该参数不起效。
  • pCompressMethods:列表类型,用于指定每一列采用的压缩传输方式,为空表示不压缩。每一列可选的压缩方式包括:
    • COMPRESS_LZ4: LZ4 压缩。
    • COMPRESS_DELTA: DELTAOFDELTA 压缩。
  • mode:表示数据写入的方式,可选值为:M_Append 或 M_Upsert。M_Upsert 表示以 upsert! 方式追加或更新表数据;M_Append 表示以 append! 方式追加表数据。
  • modeOption:字符串数组,表示不同模式下的扩展选项,目前,仅当 mode 指定为 M_Upsert 时有效,表示由 可选参数组成的字符串数组。
  • callbackFunc:回调函数,插入一行数据成功之后会进行回调。回调参数是一个表,第一列为字符串类型,表示之前插入的数据中某一行的ID,第二列为BOOL类型,表示该行是否插入成功
  • enableStreamTableTimestamp:布尔类型,为true时只能写入流表,并且必须已对该表的最后一列调用了 setStreamTableTimestamp 函数。在插入数据时,不用填最后一列的数据,dolphindb会根据数据插入时间自动填写。该参数主要用来测试插入时延。

方法二:

MultithreadedTableWriter(const MTWConfig &config);

其中类 MTWConfig 的接口定义如下:

enum class WriteMode {
    Append,
    Upsert,
};

enum class MTWState {
    Initializing,      // MTW 初始化阶段,内部使用,不会回调
    ConnectedOne,      // 一个连接已建立
    ConnectedAll,      // 所有连接均已建立
    ReconnectingOne,   // 一个连接正在重试
    ReconnectingAll,   // 所有连接正在重试
    Terminated         // MTW 退出,内部使用,不会回调
};

class MTWConfig {
public:
  // 对应原参数 host, port, userId, password, tableName, useSSL, enableHighAvailability, pHighAvailabilitySites
  MTWConfig(const std::shared_ptr<DBConnection> conn, const std::string &tableName);
  // 对应原参数 batchSize, throttle
  template<typename Rep, typename Period>
  MTWConfig& setBatching(const size_t batchSize, const std::chrono::duration<Rep,Period> throttle);
  // 对应原参数 pCompressMethods
  MTWConfig& setCompression(const std::vector<COMPRESS_METHOD> &compressMethods);
  // 对应原参数 threadCount、partitionCol
  MTWConfig& setThreads(const size_t threadNum, const std::string& partitionColumnName);
  // 对应原参数 mode, pModeOption
  MTWConfig& setWriteMode(const WriteMode mode, const std::vector<std::string> &option = std::vector<std::string>());
  // 对应原参数 enableStreamTableTimestamp
  MTWConfig& setStreamTableTimestamp();
  // 新增接口,连接状态回调
  using stateCallbackT = std::function<bool(const MTWState state, const std::string &host, const int port)>;
  MTWConfig& onConnectionStateChange(const stateCallbackT &callback);
  // 对应原参数 callbackFunc(数据写入回调)
  using dataCallbackT = std::function<void(ConstantSP)>;
  MTWConfig& onDataWrite(const dataCallbackT &callback);
};
Note: 当前回调函数的返回值(bool)尚未生效,请返回 true。后续将支持通过返回 false 来中断写入。

使用示例:

int main(){
    // 客户自行创建连接并校验服务器状态
    std::shared_ptr conn = std::make_shared<DBConnection>();
    conn->connect("localhost", 8848, "admin", "123456", "", false, {}, 7200, true);
    // 客户结合自身业务处理连接状况
    auto cb = [&conn](const MTWState state, const std::string &host, const int port) {
        std::cout << "This is a callback: state " << static_cast<int>(state) << ", server " << host << ":" << port << std::endl;
        return true;
    };
    // 创建并配置MTW
    MTWConfig config(conn, "myTable");
    config.onConnectionStateChange(cb).setThreads(5, "myColumn");
    MultithreadedTableWriter mtw(config);

    // 后续MTW操作与构造接口无关,支持当前MTW的所有功能。
    ErrorCodeInfo pErrorInfo;
    string sym[] = {"A", "B", "C", "D"};
    for (int i = 0; i < 10; i++) {
        mtw.insert(pErrorInfo, sym[i % 4], i * 12, i + 64);
        std::this_thread::sleep_for(100ms);
    }
    mtw.waitForThreadCompletion();
    return 0;
}

数据插入方法

template<typename... TArgs> 
bool insert(ErrorCodeInfo &errorInfo, TArgs... args)

插入单行数据。返回一个bool 类型数据。true 表示插入成功,false 表示失败。

参数说明:

  • errorInfo 是 ErrorCodeInfo 类,包含 errorCode 和 errorInfo,分别表示错误代码和错误信息。当 insert 接口返回 false 时表示数据写入失败,此时,errorInfo 会显示失败的详细信息。
  • args 是变长参数,代表插入的一行数据。
    Note:
    • 如果在创建 MTW 类时提供了回调函数,那么要求可变长参数中的第一个参数是一个字符串类型的参数,表示这一行的 ID,在回调函数中可根据 ID 来判断该行是否插入成功。
    • 方法 insert 会在执行后释放 args 中包含的指针,因此建议传入 ConstantSP 类型或 C++ 基本类型参数。

获取未写入数据方法

void getUnwrittenData(std::vector<std::vector<ConstantSP>*> &unwrittenData);

返回一个列表,表示未写入服务器的数据。

Note: 该方法获取到数据资源后,MultithreadedTableWriter 内部将不再持有这些数据。

参数说明:

  • unwrittenData 嵌套列表,表示未写入服务器的数据,包含发送失败的数据以及待发送的数据两部分

重新插入未写入数据

bool insertUnwrittenData(std::vector<std::vector<ConstantSP>*> &records, ErrorCodeInfo &errorInfo)

将数据插入数据表。返回值同 insert 方法。与 insert 方法的区别在于,insert 只能插入单行数据,而 insertUnwrittenData 可以同时插入多行数据。

参数说明:

  • records 需要再次写入的数据。可以通过方法 getUnwrittenData 获取该对象。
  • errorInfo 是 ErrorCodeInfo 类,包含 errorCode 和 errorInfo,分别表示错误代码和错误信息。当insert接口返回false时表示数据写入失败,此时,errorInfo 会显示失败的详细信息。

获取状态方法

void getStatus(Status &status);

获取 MultithreadedTableWriter 对象当前的运行状态。

参数说明:

  • status 是 MultithreadedTableWriter::Status 类

等待插入完成方法

void waitForThreadCompletion();

该函数会阻塞直至后台线程完成全部工作后才会返回。

示例:常规用法

int main(int argc, const char **argv)
{
    DBConnection conn;
    conn.connect("127.0.0.1", 8848, "admin", "123456");
    //创建表
    string script = "t = table(1000:0, `col1`col2`col3, [LONG, DOUBLE, STRING]); share t as t1;";
    conn.run(script);
    
    //构造MTW对象
    ErrorCodeInfo errorInfo;
    MultithreadedTableWriter writer("127.0.0.1", 8848, "admin", "123456", "", "t1", false);
    char msg[] = "123456msg";
    if(!writer.insert(errorInfo, 1, 2.3, msg)){
        std::cout << "insert fail " << errorInfo.errorInfo << std::endl;
        return -1;
    }
    writer.waitForThreadCompletion();
    //插入完成
    MultithreadedTableWriter::Status status;
    writer.getStatus(status);
    if (status.hasError()) {
        std::cout << "error in writing: " << status.errorInfo << std::endl;
    }
    //验证插入结果
    std::cout << conn.run("t1")->getString() << std::endl;
}

示例:使用回调函数

int main(int argc, const char **argv)
{
    DBConnection conn;
    conn.connect("127.0.0.1", 8848, "admin", "123456");
    //创建表
    string script = "t = table(1000:0, `col1`col2`col3, [LONG, DOUBLE, STRING]); share t as t1;";
    conn.run(script);

    auto cb = [](ConstantSP callbackTable)
	{
		/***
			callbackTable schema:
			column 0: id->string
			column 1: success->bool
		***/
		std::cout << callbackTable->getString() << std::endl;
	};

    //构造MTW对象
    ErrorCodeInfo errorInfo;
    MultithreadedTableWriter writer("127.0.0.1", 8848, "admin", "123456", "", "t1", false, false, nullptr, 1, 0.1, 5, "col1", nullptr, MultithreadedTableWriter::M_Append, nullptr, cb);
    char msg[] = "123456msg";
    if(!writer.insert(errorInfo, "row1", 1, 2.3, msg)){
        std::cout << "insert fail " << errorInfo.errorInfo << std::endl;
        return -1;
    }
    writer.waitForThreadCompletion();
    //插入完成
    MultithreadedTableWriter::Status status;
    writer.getStatus(status);
    if (status.hasError()) {
        std::cout << "error in writing: " << status.errorInfo << std::endl;
    }
    //验证插入结果
    std::cout << conn.run("t1")->getString() << std::endl;
}

示例:写入分布式表

int main(int argc, const char **argv)
{
    DBConnection conn;
	conn.connect("127.0.0.1", 8848, "admin", "123456");
	conn.run(R"(dbName = 'dfs://valuedb3'
                if(exists(dbName)){
                    dropDatabase(dbName);
                }
                datetest=table(1000:0,`date`symbol`id,[DATE,SYMBOL,LONG]);
                db = database(directory=dbName, partitionType=HASH, partitionScheme=[INT, 10]);
                pt=db.createPartitionedTable(datetest,'pdatetest','id');)");

    vector<COMPRESS_METHOD> compress;
    compress.push_back(COMPRESS_LZ4);
    compress.push_back(COMPRESS_LZ4);
    compress.push_back(COMPRESS_DELTA);
    MultithreadedTableWriter writer("127.0.0.1", 8848, "admin", "123456", "dfs://valuedb3", "pdatetest", false, false, NULL, 10000, 1, 5, "id", &compress);
    
    ErrorCodeInfo errorInfo;
    //插入100行正确数据 (类型和列数都正确),MTW正常运行
    for (int i = 0; i < 100; i++) {
        if (writer.insert(errorInfo, rand() % 10000, "AAAAAAAB", rand() % 10000) == false) {
            //此处不会执行到
            cout << "insert failed: " << errorInfo.errorInfo << endl;
            break;
        }
    }
    //插入1行数据(类型不匹配),MTW 立刻发现待插入数据类型不匹配,立刻返回错误信息
    if (writer.insert(errorInfo, rand() % 10000, 222, rand() % 10000) == false) {
        cout << "insert failed: " << errorInfo.errorInfo << endl;// insert failed: Column counts don't match 2
    }
    //插入1行数据(列数不匹配),MTW 立刻发现待插入数据列数与待插入表的列数不匹配,立刻返回错误信息
    if (writer.insert(errorInfo, rand() % 10000, "AAAAAAAB") == false) {
        cout << "insert failed: " << errorInfo.errorInfo << endl;
    }
    writer.waitForThreadCompletion();
    cout << conn.run("select count(*) from pt")->getString() << endl;
}