/**
createEngineSub.txt
Script to register the stream computing engines and subscribe to the stream tables
DolphinDB Inc.
DolphinDB server version: 1.30.18 2022.05.09/2.00.6 2022.05.09
Last modification time: 2022.05.31
*/

/*
 * Classify an order by its (cumulative) traded quantity.
 *   0 : small   (qty <= 20000)
 *   1 : medium  (20000 < qty <= 200000)
 *   2 : large   (qty > 200000)
 * Marked @state because it is used inside the metrics of the reactive
 * state engines created below, wrapped around cumsum(TradeQty).
 */
@state
def tagFunc(qty){
    // Decide between medium (1) and large (2) first; the small bucket is picked by the final iif.
    mediumOrLarge = iif((qty > 20000) and (qty <= 200000), 1, 2)
    return iif(qty <= 20000, 0, mediumOrLarge)
}

/*
 * First stage of the cascade: per-buy-order running totals.
 *
 * For every slot 1..parallel this creates a reactive state engine named
 * "processBuyOrder<slot>", keyed by (SecurityID, BuyNum), whose output is fed
 * into the already-registered engine "processSellOrder<slot>", and subscribes
 * that engine to the stream table "tradeOriginalStream".
 *
 * parallel : number of engine/subscription pairs to create.
 *
 * The "processSellOrder<slot>" engines must exist before this function runs,
 * because they are looked up with getStreamEngine().
 */
def processBuyOrderFunc(parallel){
	// Columns emitted after the two key columns: four pass-through fields, then the
	// running amount and order tag, then the same two values from the previous row of the key.
	buyMetrics = [
		<TradeTime>,
		<SellNum>,
		<TradeAmount>,
		<TradeQty>,
		<cumsum(TradeAmount)>,
		<tagFunc(cumsum(TradeQty))>,
		<prev(cumsum(TradeAmount))>,
		<prev(tagFunc(cumsum(TradeQty)))>]
	for(slot in 1..parallel){
		buyEngineName = "processBuyOrder" + string(slot)
		sellEngineName = "processSellOrder" + string(slot)
		createReactiveStateEngine(name = buyEngineName, metrics = buyMetrics, dummyTable = tradeOriginalStream, outputTable = getStreamEngine(sellEngineName), keyColumn = `SecurityID`BuyNum, keepOrder = true)
		// filter = (parallel, slot - 1) passes a (count, index) tuple so each subscription gets its own index.
		// NOTE(review): presumably a hash filter that relies on a filter column set on tradeOriginalStream elsewhere - confirm.
		subscribeTable(tableName = "tradeOriginalStream", actionName = buyEngineName, offset = -1, handler = getStreamEngine(buyEngineName), msgAsTable = true, hash = slot, filter = (parallel, slot - 1))
	}
}

/*
 * Second stage of the cascade: per-sell-order running totals.
 *
 * For every slot 1..parallel this creates a reactive state engine named
 * "processSellOrder<slot>", keyed by (SecurityID, SellNum), whose output is fed
 * into the already-registered engine "processCapitalFlow<slot>".
 * No subscription is made here; the engines are looked up by name and used as
 * the outputTable of the "processBuyOrder<slot>" engines.
 *
 * parallel : number of engines to create.
 */
def processSellOrderFunc(parallel){
	// Empty table describing the rows received from the buy-order stage:
	// its two key columns followed by its eight metric columns.
	inputColNames = `SecurityID`BuyNum`TradeTime`SellNum`TradeAmount`TradeQty`TotalBuyAmount`BuyOrderFlag`PrevTotalBuyAmount`PrevBuyOrderFlag
	inputColTypes = [SYMBOL, INT, TIMESTAMP, INT, DOUBLE, INT, DOUBLE, INT, DOUBLE, INT]
	buyStageSchema = table(1:0, inputColNames, inputColTypes)
	// Sell-side running amount/tag (current and previous row), then the buy-side
	// values carried through unchanged for the next stage.
	sellMetrics = [
		<TradeTime>,
		<TradeAmount>,
		<cumsum(TradeAmount)>,
		<tagFunc(cumsum(TradeQty))>,
		<prev(cumsum(TradeAmount))>,
		<prev(tagFunc(cumsum(TradeQty)))>,
		<BuyNum>,
		<TotalBuyAmount>,
		<BuyOrderFlag>,
		<PrevTotalBuyAmount>,
		<PrevBuyOrderFlag>]
	for(slot in 1..parallel){
		sellEngineName = "processSellOrder" + string(slot)
		flowEngine = getStreamEngine("processCapitalFlow" + string(slot))
		createReactiveStateEngine(name = sellEngineName, metrics = sellMetrics, dummyTable = buyStageSchema, outputTable = flowEngine, keyColumn = `SecurityID`SellNum, keepOrder = true)
	}
}

/*
 * Third stage of the cascade: per-security capital-flow aggregates.
 *
 * For every slot 1..parallel this creates a reactive state engine named
 * "processCapitalFlow<slot>", keyed by SecurityID, that writes to the table
 * capitalFlowStream.
 *
 * parallel : number of engines to create.
 */
def processCapitalFlowFunc(parallel){
	// Empty table describing the rows received from the sell-order stage:
	// its two key columns followed by its eleven metric columns.
	inputColNames = `SecurityID`SellNum`TradeTime`TradeAmount`TotalSellAmount`SellOrderFlag`PrevTotalSellAmount`PrevSellOrderFlag`BuyNum`TotalBuyAmount`BuyOrderFlag`PrevTotalBuyAmount`PrevBuyOrderFlag
	inputColTypes = [SYMBOL, INT, TIMESTAMP, DOUBLE, DOUBLE, INT, DOUBLE, INT,  INT, DOUBLE, INT, DOUBLE, INT]
	sellStageSchema = table(1:0, inputColNames, inputColTypes)
	// Trade time and running total amount, then amount and count per order tag for the
	// sell side, then the same for the buy side. The trailing 3 is the number of
	// groups, matching the three values (0, 1, 2) that tagFunc returns.
	flowMetrics = [
		<TradeTime>,
		<cumsum(TradeAmount)>,
		<dynamicGroupCumsum(TotalSellAmount, PrevTotalSellAmount, SellOrderFlag, PrevSellOrderFlag, 3)>,
		<dynamicGroupCumcount(SellOrderFlag, PrevSellOrderFlag, 3)>,
		<dynamicGroupCumsum(TotalBuyAmount, PrevTotalBuyAmount, BuyOrderFlag, PrevBuyOrderFlag, 3)>,
		<dynamicGroupCumcount(BuyOrderFlag, PrevBuyOrderFlag, 3)>]
	for(slot in 1..parallel){
		flowEngineName = "processCapitalFlow" + string(slot)
		createReactiveStateEngine(name = flowEngineName, metrics = flowMetrics, dummyTable = sellStageSchema, outputTable = capitalFlowStream, keyColumn = `SecurityID, keepOrder = true)
	}
}

/*
 * Windowed snapshot of the capital-flow stream.
 *
 * Creates the daily time-series engine "processCapitalFlow60min", keyed by
 * SecurityID and windowed on TradeTime, which emits the last value of every
 * capital-flow column per window into capitalFlowStream60min, and subscribes
 * it to the stream table "capitalFlowStream".
 */
def processCapitalFlow60minFunc(){
	engineName = "processCapitalFlow60min"
	// Window length and step are equal, so windows do not overlap.
	// NOTE(review): 60000*60 is 60 minutes only if TradeTime has millisecond precision - confirm against capitalFlowStream's schema.
	windowLength = 60000*60
	lastValueMetrics = <[
		last(TotalAmount),
		last(SellSmallAmount),
		last(SellMediumAmount),
		last(SellBigAmount),
		last(SellSmallCount),
		last(SellMediumCount),
		last(SellBigCount),
		last(BuySmallAmount),
		last(BuyMediumAmount),
		last(BuyBigAmount),
		last(BuySmallCount),
		last(BuyMediumCount),
		last(BuyBigCount)]>
	createDailyTimeSeriesEngine(name = engineName, windowSize = windowLength, step = windowLength, metrics = lastValueMetrics, dummyTable = capitalFlowStream, outputTable = capitalFlowStream60min, timeColumn = "TradeTime", useSystemTime = false, keyColumn = `SecurityID, useWindowStartTime = false)
	subscribeTable(tableName = "capitalFlowStream", actionName = engineName, offset = -1, handler = getStreamEngine(engineName), msgAsTable = true, batchSize = 10000, throttle = 1, hash = 0)
}

// Number of parallel engine chains (buy -> sell -> capital flow) to create.
parallel = 3
// Engines are created downstream-first: each upstream stage resolves its outputTable
// with getStreamEngine("<downstream name>"), so the downstream engine must already exist.
processCapitalFlowFunc(parallel)
// `go` separates the script into segments so the statements above are executed
// before the following ones are parsed.
go
processSellOrderFunc(parallel)
go
// The buy-order stage also subscribes to tradeOriginalStream, so it is set up after the stages it feeds.
processBuyOrderFunc(parallel)
// Independent of the chain above: windows capitalFlowStream into capitalFlowStream60min.
processCapitalFlow60minFunc()