订阅异构流表

DolphinDB server 自 1.30.17 及 2.00.5 版本开始,支持通过 replay 函数将多个结构不同的流数据表,回放(序列化)到一个流表里,这个流表被称为异构流表。

在 C++ API 中,需要在回调函数中通过 symbol 值来区分该条消息具体来自哪个表。

注: 在订阅异构流表时,如果表中某一列是 ArrayVector,那么在回调函数的参数 Message 中,该列对应的元素也是一个 ArrayVector,长度为 1。如果订阅的是普通流表,表中某一列是 ArrayVector,则在回调参数 Message 中,该列对应的元素是一个 Vector。

创建反序列化器

如果要通过 C++ API 来订阅异构流表,则需要定义一个异构流表反序列化器,也就是 StreamDeserializer 类的一个对象。创建该类对象的几种方式如下:

  • 通过symbol与表的对应关系来创建,其中,表由一个pair<string, string>来表示,前面的字符串表示库名,后面的字符串表示表名,如果是内存表, 库名为空字符串。第二个参数pconn目前可以忽略。

    StreamDeserializer(const unordered_map<string, pair<string, string>> &sym2tableName, DBConnection *pconn = nullptr);
  • 通过symbol与表的Schema的对应关系来创建。

    StreamDeserializer(const unordered_map<string, DictionarySP> &sym2schema);
  • 通过symbol与表中每一列的类型的关系来创建。

示例代码

本例使用2.00.10版本的 DolphinDB Server

DolphinDB 脚本,创建两个结构不同的表,将两个表的内容回放到一个异构流表中:

st2 = streamTable(100:0, `timestampv`sym`blob`price1,[TIMESTAMP,SYMBOL,BLOB,DOUBLE])
share st2 as SDoutTables

n = 5;
table1 = table(100:0, `datetimev`timestampv`sym`price1`price2, [DATETIME, TIMESTAMP, SYMBOL, DOUBLE, DOUBLE]);
table2 = table(100:0, `datetimev`timestampv`sym`price1, [DATETIME, TIMESTAMP, SYMBOL, DOUBLE]);
share table1 as table1_SDPT
share table2 as table2_SDPT
tableInsert(table1_SDPT, 2012.01.01T01:21:23 + 1..n, 2018.12.01T01:21:23.000 + 1..n, take(`a`b`c,n), rand(100,n)+rand(1.0, n), rand(100,n)+rand(1.0, n));
tableInsert(table2_SDPT, 2012.01.01T01:21:23 + 1..n, 2018.12.01T01:21:23.000 + 1..n, take(`a`b`c,n), rand(100,n)+rand(1.0, n));
d = dict(['msg1','msg2'], [table1_SDPT, table2_SDPT]);
replay(inputTables=d, outputTables=`SDoutTables, dateColumn=`timestampv, timeColumn=`timestampv)

C++ 代码 1(用第一个构造函数来创建反序列化器):

int main(int argc, const char **argv)
{
    std::unordered_map<std::string, std::pair<std::string, std::string>> sym2table{{"msg1", {"", "table1_SDPT"}}, {"msg2", {"", "table2_SDPT"}}};
    StreamDeserializerSP sdsp = new StreamDeserializer(sym2table);
    auto onehandler = [&](Message msg)
    {
        std::cout << "symbol: " << msg.getSymbol() << " content: " << msg->getString() << std::endl;
    };
    ThreadedClient threadedClient(0);
    threadedClient.subscribe("127.0.0.1", 8848, onehandler, "SDoutTables", "test_SD", 0, true, nullptr, false, false, "admin", "123456", sdsp);
    Util::sleep(1000);
    threadedClient.unsubscribe("127.0.0.1", 8848, "SDoutTables", "test_SD");
}

C++ 代码 2(用第二个构造函数来创建反序列化器):

int main(int argc, const char **argv)
{
    DBConnection conn;
    conn.connect("127.0.0.1", 8848, "admin", "123456");
    DictionarySP t1s = conn.run("schema(table1_SDPT)");
    DictionarySP t2s = conn.run("schema(table2_SDPT)");
    unordered_map<string, DictionarySP> sym2schema;
    sym2schema["msg1"] = t1s;
    sym2schema["msg2"] = t2s;
    StreamDeserializerSP sdsp = new StreamDeserializer(sym2schema);
    auto onehandler = [&](Message msg)
    {
        std::cout << "symbol: " << msg.getSymbol() << " content: " << msg->getString() << std::endl;
    };
    ThreadedClient threadedClient(0);
    threadedClient.subscribe("127.0.0.1", 8848, onehandler, "SDoutTables", "test_SD", 0, true, nullptr, false, false, "admin", "123456", sdsp);
    Util::sleep(1000);
    threadedClient.unsubscribe("127.0.0.1", 8848, "SDoutTables", "test_SD");
}

C++ 代码 3(用第三个构造函数来创建反序列化器):

int main(int argc, const char **argv)
{
    unordered_map<string, std::vector<DATA_TYPE>> sym2cols{{"msg1", {DT_DATETIME, DT_TIMESTAMP, DT_SYMBOL, DT_DOUBLE, DT_DOUBLE}}, {"msg2", {DT_DATETIME, DT_TIMESTAMP, DT_SYMBOL, DT_DOUBLE}}};
    StreamDeserializerSP sdsp = new StreamDeserializer(sym2cols);
    auto onehandler = [&](Message msg)
    {
        std::cout << "symbol: " << msg.getSymbol() << " content: " << msg->getString() << std::endl;
    };
    ThreadedClient threadedClient(0);
    threadedClient.subscribe("127.0.0.1", 8848, onehandler, "SDoutTables", "test_SD", 0, true, nullptr, false, false, "admin", "123456", sdsp);
    Util::sleep(1000);
    threadedClient.unsubscribe("127.0.0.1", 8848, "SDoutTables", "test_SD");
}