/**
createStreamTB.txt
Script to create stream tables for publishing
DolphinDB Inc.
DolphinDB server version: 1.30.18 2022.05.09/2.00.6 2022.05.09
Last modification time: 2022.05.31
*/

/*
 * Clean up the environment left behind by a previous run.
 *
 * parallel: number of parallel pipelines; for each i in 1..parallel the
 *           subscription action "processBuyOrder<i>" on tradeOriginalStream is
 *           removed and the engines "processBuyOrder<i>", "processSellOrder<i>"
 *           and "processCapitalFlow<i>" are dropped.
 *
 * Afterwards the "processCapitalFlow60min" subscription on capitalFlowStream and
 * the engine of the same name are removed, the three stream tables are dropped,
 * and every variable in the session is undefined.
 *
 * Each step is wrapped in its own try/catch that only prints the exception, so a
 * failing step (e.g. the object does not exist yet) does not abort the
 * remaining steps.
 */
def cleanEnvironment(parallel){
    for(i in 1..parallel){
        try{ unsubscribeTable(tableName="tradeOriginalStream", actionName="processBuyOrder" + string(i)) } catch(ex){ print(ex) }
        try{ dropStreamEngine("processBuyOrder" + string(i)) } catch(ex){ print(ex) }
        try{ dropStreamEngine("processSellOrder" + string(i)) } catch(ex){ print(ex) }
        try{ dropStreamEngine("processCapitalFlow" + string(i)) } catch(ex){ print(ex) }
    }
    try{ unsubscribeTable(tableName="capitalFlowStream", actionName="processCapitalFlow60min") } catch(ex){ print(ex) }
    try{ dropStreamEngine("processCapitalFlow60min") } catch(ex){ print(ex) }
    try{ dropStreamTable(`tradeOriginalStream) } catch(ex){ print(ex) }
    try{ dropStreamTable(`capitalFlowStream) } catch(ex){ print(ex) }
    try{ dropStreamTable(`capitalFlowStream60min) } catch(ex){ print(ex) }
    undef all
}

// Degree of parallelism of the calculation; adjust it to match the
// development/deployment environment.
parallel = 3
cleanEnvironment(parallel)

/*
 * Create the three shared, persisted stream tables used by this script:
 *   - tradeOriginalStream     (8 columns,  initial capacity 20000000)
 *   - capitalFlowStream       (15 columns, initial capacity 20000000)
 *   - capitalFlowStream60min  (15 columns, initial capacity 1000000)
 *
 * Every table is first built as a local stream table and then handed to
 * enableTableShareAndPersistence under its public name. A failure of that call
 * is printed and otherwise ignored, so the remaining tables are still attempted.
 * The local temporary variable is undefined after each table is registered.
 */
def createStreamTableFunc(){
    // create stream table: tradeOriginalStream
    colName = `SecurityID`Market`TradeTime`TradePrice`TradeQty`TradeAmount`BuyNum`SellNum
    colType = [SYMBOL, SYMBOL, TIMESTAMP, DOUBLE, INT, DOUBLE, INT, INT]
    tradeOriginalStreamTemp = streamTable(20000000:0, colName, colType)
    try{
        enableTableShareAndPersistence(table=tradeOriginalStreamTemp, tableName="tradeOriginalStream", asynWrite=true, compress=true, cacheSize=20000000, retentionMinutes=1440, flushMode=0, preCache=10000)
    }
    catch(ex){
        print(ex)
    }
    undef("tradeOriginalStreamTemp")

    // create stream table: capitalFlowStream
    colName = `SecurityID`TradeTime`TotalAmount`SellSmallAmount`SellMediumAmount`SellBigAmount`SellSmallCount`SellMediumCount`SellBigCount`BuySmallAmount`BuyMediumAmount`BuyBigAmount`BuySmallCount`BuyMediumCount`BuyBigCount
    colType = [SYMBOL, TIMESTAMP, DOUBLE, DOUBLE, DOUBLE, DOUBLE, INT, INT, INT, DOUBLE, DOUBLE, DOUBLE, INT, INT, INT]
    capitalFlowStreamTemp = streamTable(20000000:0, colName, colType)
    try{
        enableTableShareAndPersistence(table=capitalFlowStreamTemp, tableName="capitalFlowStream", asynWrite=true, compress=true, cacheSize=20000000, retentionMinutes=1440, flushMode=0, preCache=10000)
    }
    catch(ex){
        print(ex)
    }
    undef("capitalFlowStreamTemp")

    // create stream table: capitalFlowStream60min
    // Note: TradeTime comes first here, SecurityID second (the reverse of capitalFlowStream).
    colName = `TradeTime`SecurityID`TotalAmount`SellSmallAmount`SellMediumAmount`SellBigAmount`SellSmallCount`SellMediumCount`SellBigCount`BuySmallAmount`BuyMediumAmount`BuyBigAmount`BuySmallCount`BuyMediumCount`BuyBigCount
    colType = [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, INT, INT, INT, DOUBLE, DOUBLE, DOUBLE, INT, INT, INT]
    capitalFlowStream60minTemp = streamTable(1000000:0, colName, colType)
    try{
        enableTableShareAndPersistence(table=capitalFlowStream60minTemp, tableName="capitalFlowStream60min", asynWrite=true, compress=true, cacheSize=1000000, retentionMinutes=1440, flushMode=0, preCache=10000)
    }
    catch(ex){
        print(ex)
    }
    // Release the temp variable of THIS table (previously the already-released
    // capitalFlowStreamTemp was named here by mistake).
    undef("capitalFlowStream60minTemp")
}

createStreamTableFunc()
// `go` must stand on its own line: the statements above are executed before the
// statement below is parsed, so the shared table tradeOriginalStream created at
// run time can be referenced by name.
go
setStreamTableFilterColumn(tradeOriginalStream, `SecurityID)