subscribeTable
详情
从客户端节点订阅本地或远程服务器的流数据表。可在 handler 调用函数来处理订阅数据。
返回一个订阅主题(topic),即一个订阅的名称。它是一个字符串,由订阅表所在节点的别名、流数据表名称和订阅任务名称(如果指定了 actionName)组合而成,使用 "/" 分隔。如果订阅主题已经存在,函数将会抛出异常。
-
如果指定了 batchSize,当未处理消息数量达到 batchSize 或距离上次 handler 被触发已过去 throttle 秒,handler 将会被触发。
-
如果订阅的表被重定义了,为了保证订阅能够正常使用,需要使用 unsubscribeTable 命令取消订阅,然后重新创建订阅。
-
在高可用订阅流数据写入分布式表的场景下,订阅节点构成的 raft 组发生 leader 切换或集群重启时,可能导致之前登录的用户退出。由于 guest 用户无权限写入分布式表,写入会被中断(可以通过 getCurrentSessionAndUser 函数查看当前的用户)。若配置 userId 和 password 参数,用户退出后系统会自动尝试重新登录,以保证订阅数据成功写入分布式表。需要注意的是,从 2.00.10.10 开始,用户可以通过配置项 enhancedSecurityVerification 控制在登录时是否约束密码重试的次数。若不设置 enhancedSecurityVerification,则不约束;若设置 enhancedSecurityVerification=true,则当用户登录时,在1分钟内连续5次使用了错误密码,会导致用户被锁定,必须等待10分钟后才可以再次登录。
语法
subscribeTable([server],tableName,[actionName],[offset=-1],handler,[msgAsTable=false],[batchSize=0],[throttle=1],[hash=-1],[reconnect=false],[filter],[persistOffset=false],[timeTrigger=false],[handlerNeedMsgId=false],[raftGroup],[userId=""],[password=""])
参数
只有 tableName 和 handler 两个参数是必选参数。其他所有参数都是可选参数。
server 是一个字符串,表示服务器的别名或远程连接的句柄。如果未指定或者为空字符串,表示流数据所在的服务器是本地实例。
tableName 是被订阅的数据表名。该表必须为共享的流数据表。
actionName 是一个字符串,表示订阅任务的名称。它可以包含字母,数字和下划线,并以字母开头。如果一个节点有多个订阅任务均订阅了同一张表,则每个订阅必须指定唯一的 actionName。
offset 是订阅任务开始后的第一条消息所在的位置。消息是流数据表中的行。如果未指定,或设为-1,订阅将会从流数据表的当前行开始。如果 offset = -2,系统会获取持久化到磁盘上的 offset,并从该位置开始订阅。offset 与流数据表创建时的第一行对应。如果某些行因为内存限制被删除,在决定订阅开始的位置时,这些行仍然考虑在内。
- 一元函数:它唯一的参数是订阅的数据。订阅的数据可以是一个表或元组,订阅数据表的每个列是元组的一个元素。
- handlerNeedMsgId = true 时,handler 必须是二元函数,其两个参数分别是订阅的数据(msgBody)和数据偏移量(msgId)。详见 handlerNeedMsgId 参数说明。
- 数据表:可以是流数据引擎、共享流数据表、共享内存表、共享键值表、共享索引表或 DFS 表。订阅数据会直接插入到该表中。
msgAsTable 是布尔值,表示订阅的数据是否为表。默认值是 false,表示订阅的数据是由列组成的元组。
batchSize 是一个整数。若为正数,表示未处理消息的数量达到 batchSize 时,handler 才会处理消息。若未指定或为非正数,每一批次的消息到达之后,handler 就会马上处理。
throttle 是一个浮点数,单位为秒,默认值为1。表示继上次 handler 处理消息之后,若 batchSize 条件一直未达到,多久后再次处理消息。如果没有指定 batchSize,throttle 即使指定,也不起作用。 若 throttle 需要设置小于1秒,则需要先修改配置项 subThrottle。
hash 是一个非负整数,指定某个订阅线程处理进来的消息。如果没有指定该参数,系统会自动分配一个线程。如果需要使用同一个线程来处理多个订阅任务的消息,可把这些订阅任务的 hash 设置为相同的值。
- 如果发布端与订阅端处于正常状态,但是网络中断,那么订阅端会在网络正常时,自动从中断位置重新订阅。
- 如果发布端崩溃,订阅端会在发布端重启后不断尝试重新订阅。
- 如果发布端对流数据表启用了持久化,发布端重启后会首先读取硬盘上的数据,直到发布端读取到订阅中断位置的数据,订阅端才能成功重新订阅。
- 如果发布端没有对流数据表启用持久化,那么订阅端将自动重新订阅失败。
- 如果订阅端崩溃,订阅端重启后不会自动重新订阅,需要重新执行 subscribeTable 函数。
filter 参数需要配合 setStreamTableFilterColumn 函数一起使用。使用
setStreamTableFilterColumn
指定流数据表的过滤列,流数据表过滤列在 filter
中的数据才会发布到订阅端,不在 filter 中的数据不会发布。filter 不支持过滤 BOOL 类型数据。
- 值过滤:一个向量。
- 范围过滤:一个数据对。范围包含下限值,但不包括上限值。
- 哈希过滤:一个元组。第一个元素表示 bucket 的个数;第二个元素是一个标量或数据对,其中标量表示 bucket 的索引(从0开始),数据对表示 bucket 的索引范围(包含下限值,但不包括上限值)。
- 若要订阅高可用流数据表,需要设置 persistOffset 为 true,以防止订阅端丢失数据。
- 设置 persistOffset 为
true,且取消订阅(
unsubscribeTable
)时,设置 removeOffset = false,再次订阅时才会从持久化保存的偏移量开始订阅。
timeTrigger 是一个布尔值。若设为 true,表示即使没有新的消息进入,handler 也会在 throttle 参数所设定的时间间隔被触发。
- 若设为 true,handler 必须支持两个参数:一个是 msgBody(传入的消息),一个是 msgId(消息的偏移量)。如:以部分应用的形式固定 appendMsg 的 engine 参数后,将其作为二元应用传入 handler。
- 若设为 false,handler 仅支持一个参数:msgBody。调用 handler 时,只传入消息本身。
subscribeTable
函数如果指定了
raftGroup,则只能在 leader 上执行。若同时指定 handlerNeedMsgId = true,则
handler 只能是计算引擎,即 handler = engine(创建引擎时的句柄变量)或 handler
= getStreamEngine(engineName)。userId 字符串,表示用户名。
password 字符串,表示用户密码。
例子
下面是关于流计算的例子。在本例中,集群有两个节点:DFS_NODE1 和 DFS_NODE2。我们需要在 cluster.cfg 中指定 maxPubConnections和subPort 参数来启动发布/订阅功能。例如:
maxPubConnections=32
DFS_NODE1.subPort=9010
DFS_NODE1.persistenceDir=C:/DolphinDB/Data
DFS_NODE2.subPort=9011
在 DFS_NODE1 上执行以下脚本:
- 创建一个共享的流数据表 trades_stream,并以同步模式保存。此时,表 trades_stream
有0行记录。
n=20000000 colNames = `time`sym`qty`price colTypes = [TIME,SYMBOL,INT,DOUBLE] enableTableShareAndPersistence(table=streamTable(n:0, colNames, colTypes), tableName="trades_stream", asynWrite=false, cacheSize=n) go
- 创建一个分布式表 trades。此时,表 trades
有0行记录。
if(existsDatabase("dfs://STREAM_TEST")){ dropDatabase("dfs://STREAM_TEST") } dbDate = database(directory="", partitionType=VALUE, partitionScheme=temporalAdd(date(today()),0..30,'d')) dbSym= database(directory="", partitionType=RANGE, partitionScheme=string('A'..'Z') join "ZZZZ") db = database(directory="dfs://STREAM_TEST", partitionType=COMPO, partitionScheme=[dbDate, dbSym]) colNames = `date`time`sym`qty`price colTypes = [DATE,TIME,SYMBOL,INT,DOUBLE] trades = db.createPartitionedTable(table=table(1:0, colNames, colTypes), tableName="trades", partitionColumns=`date`sym)
- 创建表 trades_stream 的本地订阅。使用 saveTradesToDFS 函数把表 trades_stream 的流数据和今天的日期保存至表
trades。
def saveTradesToDFS(mutable dfsTrades, msg): dfsTrades.append!(select today() as date,* from msg) subscribeTable(tableName="trades_stream", actionName="trades", offset=0, handler=saveTradesToDFS{trades}, msgAsTable=true, batchSize=100000, throttle=60)
- 创建表 trades_stream 的另一个本地订阅。使用每分钟的流数据计算成交量加权平均价格(vwap),并以异步模式把结果保存至共享的流数据表
vwap_stream
中。
n=1000000 tmpTrades = table(n:0, colNames, colTypes) lastMinute = [00:00:00.000] colNames = `time`sym`vwap colTypes = [MINUTE,SYMBOL,DOUBLE] enableTableShareAndPersistence(table=streamTable(n:0, colNames, colTypes), tableName="vwap_stream") go def calcVwap(mutable vwap, mutable tmpTrades, mutable lastMinute, msg){ tmpTrades.append!(msg) curMinute = time(msg.time.last().minute()*60000l) t = select wavg(price, qty) as vwap from tmpTrades where time < curMinute, time >= lastMinute[0] group by time.minute(), sym if(t.size() == 0) return vwap.append!(t) t = select * from tmpTrades where time >= curMinute tmpTrades.clear!() lastMinute[0] = curMinute if(t.size() > 0) tmpTrades.append!(t) } subscribeTable(tableName="trades_stream", actionName="vwap", offset=0, handler=calcVwap{vwap_stream, tmpTrades, lastMinute}, msgAsTable=true, batchSize=100000, throttle=60)
在 DFS_NODE2 上执行以下脚本,创建表 trades_stream 的远程订阅,并以异步模式把流数据保存至表 trades_stream_slave 中。
n=20000000
colNames = `time`sym`qty`price
colTypes = [TIME,SYMBOL,INT,DOUBLE]
enableTableShareAndPersistence(table=streamTable(n:0, colNames, colTypes), tableName="trades_stream_slave", cacheSize=n)
go
subscribeTable(server="DFS_NODE1", tableName="trades_stream", actionName="slave", offset=0, handler=trades_stream_slave)
在 DFS_NODE1 上执行以下脚本,模拟3支股票在10分钟内的流数据。每支股票每分钟生成2,000,000条记录。每分钟的数据被插入到流数据表 trades_stream 的600个数据块中。每两个数据块有100毫秒的时间间隔。
n=10
ticks = 2000000
rows = ticks*3
startMinute = 09:30:00.000
blocks=600
for(x in 0:n){
time = startMinute + x*60000 + rand(60000, rows)
indices = isort(time)
time = time[indices]
sym = array(SYMBOL,0,rows).append!(take(`IBM,ticks)).append!(take(`MSFT,ticks)).append!(take(`GOOG,ticks))[indices]
price = array(DOUBLE,0,rows).append!(norm(153,1,ticks)).append!(norm(91,1,ticks)).append!(norm(1106,20,ticks))[indices]
indices = NULL
blockSize = rows / blocks
for(y in 0:blocks){
range =pair(y * blockSize, (y+1)* blockSize)
insert into trades_stream values(subarray(time,range), subarray(sym,range), 10+ rand(100, blockSize), subarray(price,range))
sleep(100)
}
blockSize = rows % blocks
if(blockSize > 0){
range =pair(rows - blockSize, rows)
insert into trades_stream values(subarray(time,range), subarray(sym,range), 10+ rand(100, blockSize), subarray(price,range))
}
}
在 DFS_NODE1 上执行以下脚本来检查结果:
trades=loadTable("dfs://STREAM_TEST", `trades)
select count(*) from trades
结果预期是有60,000,000条记录。
select * from vwap_stream
表 vwap_stream 预期是有27条记录。
在 DFS_NODE2 上执行以下脚本:
select count(*) from trades_stream_slave
我们看到的结果小于60,000,000行,因为部分表的记录已经保存到磁盘中。