订阅跨进程共享内存表
C++ API 提供 IPCInMemoryStreamClient 类以实现订阅跨进程共享内存表的功能,来满足对订阅数据时延性要求较高的场景需求。通过订阅跨进程共享内存表,API 端可以直接通过共享内存获取由 Server 端发布的流数据,极大地减少了网络传输的延时。因为进程间会访问同一个共享内存,所以要求发布端和订阅端必须位于同一台服务器。本节主要介绍如何通过 C++ API 实现订阅共享内存表的功能。
订阅接口
using IPCInMemoryTableReadHandler = std::function<void(ConstantSP)>;
ThreadSP subscribe(const string& tableName, const IPCInMemoryTableReadHandler& handler, TableSP outputTable = nullptr, bool overwrite = true);
参数如下:
-
tableName
:表名 -
handler
:回调函数 -
outputTable
:可选参数,用来存储收到的消息内容的表,表结构要求和订阅的表结构相同 -
overwrite
:可选参数,表示收到新消息后是否覆盖outputTable中的原有内容
取消订阅接口
void unsubscribe(const string& tableName);
其中,tableName
为表名。
代码示例
本例使用 2.00.10 版本的DolphinDB Server
DolphinDB 脚本:定义一个流表和一个共享内存表,订阅该流表,在流表中插入数据时,数据会插入到共享内存表中:
share streamTable(10000:0,`timestamp`temperature, [TIMESTAMP,DOUBLE]) as pubTable;
share createIPCInMemoryTable(1000000, "pubTable", `timestamp`temperature, [TIMESTAMP, DOUBLE]) as shm_test;
def shm_append(msg) {
shm_test.append!(msg)
}
topic2 = subscribeTable(tableName="pubTable", actionName="act3", offset=0, handler=shm_append, msgAsTable=true)
C++ 代码:
void print(TableSP table) {
std::cout << table->getString() << std::endl;
}
int main(int argc, const char **argv)
{
string tableName = "pubTable";
//构造对象
IPCInMemoryStreamClient memTable;
//创建一个存储数据的 table,要求和 createIPCInMemoryTable 中列的类型和名称一一对应
vector<string> colNames = {"timestamp", "temperature"};
vector<DATA_TYPE> colTypes = {DT_TIMESTAMP, DT_DOUBLE};
int rowNum = 0, indexCapacity=10000;
TableSP outputTable = Util::createTable(colNames, colTypes, rowNum, indexCapacity); // 创建一个和共享内存表结构相同的表
//overwrite 是否覆盖前面旧的数据
bool overwrite = false;
ThreadSP thread = memTable.subscribe(tableName, print, nullptr, overwrite);
Util::sleep(10000);
//最后取消订阅,结束回调
memTable.unsubscribe(tableName);
}