/** createEngineSub.txt Script to register stream computing engine and subscribe the stream tables DolphinDB Inc. DolphinDB server version: 1.30.18 2022.05.09/2.00.6 2022.05.09 Last modification time: 2022.05.31 */ /* * Label small, medium and large order * small : 0 * medium : 1 * large : 2 */ @state def tagFunc(qty){ return iif(qty <= 20000, 0, iif(qty <= 200000 and qty > 20000, 1, 2)) } def processBuyOrderFunc(parallel){ metricsBuy = [ , , , , , , , ] for(i in 1..parallel){ createReactiveStateEngine(name = "processBuyOrder"+string(i), metrics = metricsBuy, dummyTable = tradeOriginalStream, outputTable = getStreamEngine("processSellOrder"+string(i)), keyColumn=`SecurityID`BuyNum, keepOrder =true) subscribeTable(tableName = "tradeOriginalStream", actionName = "processBuyOrder"+string(i), offset = -1, handler = getStreamEngine("processBuyOrder"+string(i)), msgAsTable = true, hash = i, filter = (parallel, i-1)) } } def processSellOrderFunc(parallel){ colName = `SecurityID`BuyNum`TradeTime`SellNum`TradeAmount`TradeQty`TotalBuyAmount`BuyOrderFlag`PrevTotalBuyAmount`PrevBuyOrderFlag colType = [SYMBOL, INT, TIMESTAMP, INT, DOUBLE, INT, DOUBLE, INT, DOUBLE, INT] processBuyOrder = table(1:0, colName, colType) metricsSell = [ , , , , , , , , , , ] for(i in 1..parallel){ createReactiveStateEngine(name = "processSellOrder"+string(i), metrics = metricsSell, dummyTable = processBuyOrder, outputTable = getStreamEngine("processCapitalFlow"+string(i)), keyColumn = `SecurityID`SellNum, keepOrder = true) } } def processCapitalFlowFunc(parallel){ colName = `SecurityID`SellNum`TradeTime`TradeAmount`TotalSellAmount`SellOrderFlag`PrevTotalSellAmount`PrevSellOrderFlag`BuyNum`TotalBuyAmount`BuyOrderFlag`PrevTotalBuyAmount`PrevBuyOrderFlag colType = [SYMBOL, INT, TIMESTAMP, DOUBLE, DOUBLE, INT, DOUBLE, INT, INT, DOUBLE, INT, DOUBLE, INT] processSellOrder = table(1:0, colName, colType) metrics1 = metrics2 = metrics3 = metrics4 = for(i in 1..parallel){ createReactiveStateEngine(name = "processCapitalFlow"+string(i), metrics = [, , metrics1, metrics2, metrics3, metrics4], dummyTable =processSellOrder, outputTable = capitalFlowStream, keyColumn = `SecurityID, keepOrder = true) } } def processCapitalFlow60minFunc(){ aggrMetrics = <[ last(TotalAmount), last(SellSmallAmount), last(SellMediumAmount), last(SellBigAmount), last(SellSmallCount), last(SellMediumCount), last(SellBigCount), last(BuySmallAmount), last(BuyMediumAmount), last(BuyBigAmount), last(BuySmallCount), last(BuyMediumCount), last(BuyBigCount)]> createDailyTimeSeriesEngine(name = "processCapitalFlow60min", windowSize = 60000*60, step = 60000*60, metrics = aggrMetrics, dummyTable = capitalFlowStream, outputTable =capitalFlowStream60min, timeColumn="TradeTime", useSystemTime=false, keyColumn=`SecurityID, useWindowStartTime=false) subscribeTable(tableName = "capitalFlowStream", actionName = "processCapitalFlow60min", offset = -1, handler = getStreamEngine("processCapitalFlow60min"), msgAsTable = true, batchSize = 10000, throttle=1, hash = 0) } parallel = 3 processCapitalFlowFunc(parallel) go processSellOrderFunc(parallel) go processBuyOrderFunc(parallel) processCapitalFlow60minFunc()