appendMsg

语法

appendMsg(engine, msgBody, msgId)

参数

engine 是内置流数据引擎,即 createReactiveStateEngine 等函数返回的抽象表对象。

msgBody 是将要写入流数据引擎的消息。

msgId 是写入数据之前,流数据引擎已接收到的最后一条消息的 ID。ID 从订阅发布的第一条消息开始计数。

详情

当流数据引擎启用快照机制(snapshot)且未开启 RaftGroup 时,订阅函数 subscribeTable 的handler参数必须为 appendMsg 函数,将数据写入流数据引擎。

例子

share streamTable(10000:0,`time`sym`price, [TIMESTAMP,SYMBOL,DOUBLE]) as trades
output1 =table(10000:0, `time`sym`avgPrice, [TIMESTAMP,SYMBOL,DOUBLE]);

engine1 = createTimeSeriesEngine(name=`engine1, windowSize=100, step=50, metrics=<avg(price)>, dummyTable=trades, outputTable=output1, timeColumn=`time, keyColumn=`sym, snapshotDir="C:/DolphinDB/Data/snapshotDir", snapshotIntervalInMsgCount=100)
subscribeTable(tableName="trades", actionName="engine1", offset=0, handler=appendMsg{engine1}, msgAsTable=true, handlerNeedMsgId=true)

n=500
timev=2021.03.12T15:00:00.000 + (1..n join 1..n)
symv = take(`A, n) join take(`B, n)
pricev = (100+cumsum(rand(1.0,n)-0.5)) join (200+cumsum(rand(1.0,n)-0.5))
t=table(timev as time, symv as sym, pricev as price).sortBy!(`time)
trades.append!(t)

select * from output1