appendMsg
Syntax
appendMsg(engine, msgBody, msgId)
Arguments
engine is a built-in streaming engine, i.e., the abstract table object return by functions such as createReactiveStateEngine
msgBody is the messages to be ingested into the streaming engine.
msgId is the ID of the last message that has been ingested into the streaming engine. The ID starts from the beginning of the subscription.
Details
If snapshot is enabled and RaftGroup is disabled for a streaming
engine, the handler parameter of function subscribeTable must be
appendMsg
to inject the data into the engine.
Examples
share streamTable(10000:0,`time`sym`price, [TIMESTAMP,SYMBOL,DOUBLE]) as trades
output1 =table(10000:0, `time`sym`avgPrice, [TIMESTAMP,SYMBOL,DOUBLE]);
engine1 = createTimeSeriesEngine(name=`engine1, windowSize=100, step=50, metrics=<avg(price)>, dummyTable=trades, outputTable=output1, timeColumn=`time, keyColumn=`sym, snapshotDir="C:/DolphinDB/Data/snapshotDir", snapshotIntervalInMsgCount=100)
subscribeTable(tableName="trades", actionName="engine1", offset=0, handler=appendMsg{engine1}, msgAsTable=true, handlerNeedMsgId=true)
n=500
timev=2021.03.12T15:00:00.000 + (1..n join 1..n)
symv = take(`A, n) join take(`B, n)
pricev = (100+cumsum(rand(1.0,n)-0.5)) join (200+cumsum(rand(1.0,n)-0.5))
t=table(timev as time, symv as sym, pricev as price).sortBy!(`time)
trades.append!(t)
select * from output1