/**
createStreamTB.txt
Script to create stream tables for publishing
DolphinDB Inc.
DolphinDB server version: 1.30.18 2022.05.09/2.00.6 2022.05.09
Last modification time: 2022.05.30
*/

/*
 * cleanEnvironment(parallel)
 * Best-effort teardown of the objects this pipeline uses, so the script can be re-run.
 *
 * parallel: number of parallel workers; subscriptions and engines with the
 *           suffixes 1..parallel are removed.
 *
 * Every step is wrapped in its own try/catch: a failure is printed and the
 * remaining steps still run. Steps are executed in this order:
 *   1. per worker: unsubscribe "tradeProcess<i>" from tradeOriginalStream,
 *      unsubscribe "tradeTSAggr<i>" from tradeProcessStream,
 *      drop the engines "tradeProcess<i>" and "tradeTSAggr<i>"
 *   2. unsubscribe "tradeToDatabase" from tradeOriginalStream
 *   3. drop the stream tables tradeOriginalStream, tradeProcessStream, capitalFlowStream
 *   4. undef all
 */
def cleanEnvironment(parallel){
    for(idx in 1..parallel){
        // build the per-worker names once; they are shared by the subscription and the engine
        suffix = string(idx)
        processName = "tradeProcess" + suffix
        aggrName = "tradeTSAggr" + suffix

        try{
            unsubscribeTable(tableName=`tradeOriginalStream, actionName=processName)
        } catch(err){
            print(err)
        }
        try{
            unsubscribeTable(tableName=`tradeProcessStream, actionName=aggrName)
        } catch(err){
            print(err)
        }
        try{
            dropStreamEngine(processName)
        } catch(err){
            print(err)
        }
        try{
            dropStreamEngine(aggrName)
        } catch(err){
            print(err)
        }
    }

    // the single, un-numbered subscription on the original stream
    try{
        unsubscribeTable(tableName=`tradeOriginalStream, actionName="tradeToDatabase")
    } catch(err){
        print(err)
    }

    // drop the stream tables only after their subscriptions have been removed above
    for(tbName in `tradeOriginalStream`tradeProcessStream`capitalFlowStream){
        try{
            dropStreamTable(tbName)
        } catch(err){
            print(err)
        }
    }

    undef all
}

// Degree of parallelism of the calculation; adjust it to match the deployment environment.
parallel = 3
cleanEnvironment(parallel)
go

// ---------------------------------------------------------------------------
// Stream table: tradeOriginalStream
// ---------------------------------------------------------------------------
colName = `SecurityID`Market`TradeTime`TradePrice`TradeQty`TradeAmount`BuyNum`SellNum
colType = `SYMBOL`SYMBOL`TIMESTAMP`DOUBLE`INT`DOUBLE`INT`INT
tradeOriginalStreamTemp = streamTable(1000000:0, colName, colType)
// A failure to share/persist is printed rather than aborting the script.
try{
    enableTableShareAndPersistence(
        table=tradeOriginalStreamTemp,
        tableName="tradeOriginalStream",
        asynWrite=true,
        compress=true,
        cacheSize=1000000,
        retentionMinutes=1440,
        flushMode=0,
        preCache=10000
    )
} catch(ex){
    print(ex)
}
// the temporary handle is released; the table is referenced by its shared name from here on
undef("tradeOriginalStreamTemp")
// `go` ends this segment so that the shared name exists before the next statement is parsed
go
setStreamTableFilterColumn(tradeOriginalStream, `SecurityID)

// ---------------------------------------------------------------------------
// Stream table: tradeProcessStream
// ---------------------------------------------------------------------------
colName = `SecurityID`TradeTime`Num`TradeQty`TradeAmount`BSFlag
colType = `SYMBOL`TIMESTAMP`INT`INT`DOUBLE`SYMBOL
tradeProcessStreamTemp = streamTable(1000000:0, colName, colType)
// same sharing/persistence settings as tradeOriginalStream
try{
    enableTableShareAndPersistence(
        table=tradeProcessStreamTemp,
        tableName="tradeProcessStream",
        asynWrite=true,
        compress=true,
        cacheSize=1000000,
        retentionMinutes=1440,
        flushMode=0,
        preCache=10000
    )
} catch(ex){
    print(ex)
}
undef("tradeProcessStreamTemp")
go
setStreamTableFilterColumn(tradeProcessStream, `SecurityID)

// ---------------------------------------------------------------------------
// Stream table: capitalFlowStream
// ---------------------------------------------------------------------------
colName = `TradeTime`SecurityID`BuySmallAmount`BuyBigAmount`SellSmallAmount`SellBigAmount
colType = `TIMESTAMP`SYMBOL`DOUBLE`DOUBLE`DOUBLE`DOUBLE
capitalFlowStreamTemp = streamTable(1000000:0, colName, colType)
// same sharing/persistence settings as the two tables above
try{
    enableTableShareAndPersistence(
        table=capitalFlowStreamTemp,
        tableName="capitalFlowStream",
        asynWrite=true,
        compress=true,
        cacheSize=1000000,
        retentionMinutes=1440,
        flushMode=0,
        preCache=10000
    )
} catch(ex){
    print(ex)
}
undef("capitalFlowStreamTemp")
go
setStreamTableFilterColumn(capitalFlowStream, `SecurityID)