/** createEngineSub.txt Script to register stream computing engine and subscribe the stream tables DolphinDB Inc. DolphinDB server version: 1.30.18 2022.05.09/2.00.6 2022.05.09 Storage engine: OLAP Last modification time: 2022.05.30 */ //real time calculation of minute index defg calCapitalFlow(Num, BSFlag, TradeQty, TradeAmount){ // You can define the smallBigBoundary by yourself smallBigBoundary = 50000 tempTable1 = table(Num as `Num, BSFlag as `BSFlag, TradeQty as `TradeQty, TradeAmount as `TradeAmount) tempTable2 = select sum(TradeQty) as TradeQty, sum(TradeAmount) as TradeAmount from tempTable1 group by Num, BSFlag BuySmallAmount = exec sum(TradeAmount) from tempTable2 where TradeQty<=smallBigBoundary && BSFlag==`B BuyBigAmount = exec sum(TradeAmount) from tempTable2 where TradeQty>smallBigBoundary && BSFlag==`B SellSmallAmount = exec sum(TradeAmount) from tempTable2 where TradeQty<=smallBigBoundary && BSFlag==`S SellBigAmount = exec sum(TradeAmount) from tempTable2 where TradeQty>smallBigBoundary && BSFlag==`S return nullFill([BuySmallAmount, BuyBigAmount, SellSmallAmount, SellBigAmount], 0) } //real time calculation of capitalFlow //calculation parallel, developers need to modify according to the development environment parallel = 3 for(i in 1..parallel){ //create ReactiveStateEngine: tradeProcess createReactiveStateEngine(name="tradeProcess"+string(i), metrics=[, SellNum, BuyNum, SellNum)>, , , SellNum, "B", "S")>], dummyTable=tradeOriginalStream, outputTable=tradeProcessStream, keyColumn="SecurityID") subscribeTable(tableName="tradeOriginalStream", actionName="tradeProcess"+string(i), offset=-1, handler=getStreamEngine("tradeProcess"+string(i)), msgAsTable=true, hash=i-1, filter = (parallel, i-1), reconnect=true) //create DailyTimeSeriesEngine: tradeTSAggr createDailyTimeSeriesEngine(name="tradeTSAggr"+string(i), windowSize=60000, step=60000, metrics=[], dummyTable=tradeProcessStream, outputTable=capitalFlowStream, timeColumn="TradeTime", useSystemTime=false, keyColumn=`SecurityID, useWindowStartTime=true, forceTriggerTime=60000) subscribeTable(tableName="tradeProcessStream", actionName="tradeTSAggr"+string(i), offset=-1, handler=getStreamEngine("tradeTSAggr"+string(i)), msgAsTable=true, batchSize=2000, throttle=1, hash=parallel+i-1, filter = (parallel, i-1), reconnect=true) } //real time data to database subscribeTable(tableName="tradeOriginalStream", actionName="tradeToDatabase", offset=-1, handler=loadTable("dfs://trade_stream", "trade"), msgAsTable=true, batchSize=20000, throttle=1, hash=6, reconnect=true)