haStreamTable

语法

haStreamTable(raftGroup, table, tableName, cacheLimit, [keyColumn], [retentionMinutes=1440])

参数

raftGroup 是一个大于1的整数,表示 Raft 组的 ID。

table 是一个表对象。它必须是一个由 table 函数创建的空表。

tableName 是一个字符串,表示高可用流数据表的名称。

cacheLimit 是一个整数,表示高可用流数据表在内存中最多保留多少行。如果 cacheLimit 是小于100,000的正整数,它会被自动调整为100,000。

keyColumn 是一个字符串标量或向量,表示主键。它是一个可选参数。

retentionMinutes 是一个整数,表示保留大小超过 1GB 的 log 文件的时间(从文件的最后修改时间开始计算),单位是分钟。默认值是1440,即一天。

详情

创建高可用流数据表。该函数只能在启用流数据高可用后使用。要启用流数据高可用,用户需要在集群配置文件 cluster.cfg 中指定配置参数 streamingHAModestreamingRaftGroups。系统启动时,配置参数 streamingRaftGroup 指定的数据节点/计算节点组成 Raft 组,一个数据节点/计算节点作为 Leader,其他数据节点/计算节点作为 Follower。Raft 组的每个数据节点/计算节点上都有流数据表的副本。

客户端只需订阅 Raft 组中任意一个数据节点/计算节点上的高可用流数据表,并启用订阅的自动重连功能,即把 reconnect 参数设置为 true。Leader 上的高可用流数据表会向客户端发布数据。如果 Raft 组中的 Leader 宕机,系统会选举出新的 Leader 继续发布数据,客户端会自动切换订阅到新的 Leader 上的高可用流数据表。

一个 Raft 组可以包含多个高可用流数据表。

例子

假设配置参数 streamingRaftGroup=11:NODE1:NODE2:NODE3,在 Raft 组的任意一个节点如 NODE1 上执行以下脚本创建高可用流数据表 trades:

colNames = `timestamp`sym`qty`price
colTypes = [TIMESTAMP,SYMBOL,INT,DOUBLE]
t=table(1:0,colNames,colTypes)
haStreamTable(11,t,`trades,100000);

在集群的另外一个节点 NODE4 上执行以下脚本订阅表 trades,把订阅的数据保存至分布式数据库中。

if(existsDatabase("dfs://stock")){
   dropDatabase("dfs://stock")
}
db=database(directory="dfs://stock",partitionType=VALUE,partitionScheme=2018.08.01..2019.12.30)
t=table(1:0,`timestamp`sym`qty`price,[TIMESTAMP,SYMBOL,INT,DOUBLE])
trades_slave=db.createPartitionedTable(table=t,tableName=`trades_slave,partitionColumns=`timestamp);
subscribeTable(server=NODE2,tableName=`trades,actionName=`sub_trades,offset=-1,handler=append!{trades_slave},msgAsTable=true,batchSize=1000,throttle=1,hash=-1,reconnect=true);

// 这里subscribeTable函数的第一个参数可以是NODE1,NODE2,NODE3中的任意一个,reconnect参数必须为true。

在 NODE4 上执行以下脚本取消订阅:

unsubscribeTable(server=NODE2,tableName=`trades,actionName=`sub_trades);
// 这里unsubscribeTable函数的第一个参数可以是 NODE1, NODE2, NODE3 中的任意一个。

相关函数:dropStreamTable, getStreamingLeader, getStreamingRaftGroups