订阅跨进程共享内存表

C++ API 提供 IPCInMemoryStreamClient 类以实现订阅跨进程共享内存表的功能,来满足对订阅数据时延性要求较高的场景需求。通过订阅跨进程共享内存表,API 端可以直接通过共享内存获取由 Server 端发布的流数据,极大地减少了网络传输的延时。因为进程间会访问同一个共享内存,所以要求发布端和订阅端必须位于同一台服务器。本节主要介绍如何通过 C++ API 实现订阅共享内存表的功能。

订阅接口

using IPCInMemoryTableReadHandler = std::function<void(ConstantSP)>;
ThreadSP subscribe(const string& tableName, const IPCInMemoryTableReadHandler& handler, TableSP outputTable = nullptr, bool overwrite = true);

参数如下:

  • tableName:表名

  • handler:回调函数

  • outputTable :可选参数,用来存储收到的消息内容的表,表结构要求和订阅的表结构相同

  • overwrite :可选参数,表示收到新消息后是否覆盖outputTable中的原有内容

取消订阅接口

void unsubscribe(const string& tableName);

其中,tableName 为表名。

代码示例

本例使用 2.00.10 版本的DolphinDB Server

DolphinDB 脚本:定义一个流表和一个共享内存表,订阅该流表,在流表中插入数据时,数据会插入到共享内存表中:

share streamTable(10000:0,`timestamp`temperature, [TIMESTAMP,DOUBLE]) as pubTable;
share createIPCInMemoryTable(1000000, "pubTable", `timestamp`temperature, [TIMESTAMP, DOUBLE]) as shm_test;

def shm_append(msg) {
    shm_test.append!(msg)
}
topic2 = subscribeTable(tableName="pubTable", actionName="act3", offset=0, handler=shm_append, msgAsTable=true)

C++ 代码:

void print(TableSP table) {
    std::cout << table->getString() << std::endl;
}

int main(int argc, const char **argv)
{
    string tableName = "pubTable";
    //构造对象
    IPCInMemoryStreamClient memTable;

    //创建一个存储数据的 table,要求和 createIPCInMemoryTable 中列的类型和名称一一对应
    vector<string> colNames = {"timestamp", "temperature"};
    vector<DATA_TYPE> colTypes = {DT_TIMESTAMP, DT_DOUBLE};
    int rowNum = 0, indexCapacity=10000;
    TableSP outputTable = Util::createTable(colNames, colTypes, rowNum, indexCapacity); // 创建一个和共享内存表结构相同的表

    //overwrite 是否覆盖前面旧的数据
    bool overwrite = false;
    ThreadSP thread = memTable.subscribe(tableName, print, nullptr, overwrite);
    Util::sleep(10000);
    //最后取消订阅,结束回调
    memTable.unsubscribe(tableName);
}