/** streamComputing.txt Script about real time stream computing DolphinDB Inc. DolphinDB server version: 1.30.18 2022.05.09 / 2.00.6 2022.05.09 Last modification time: 2022.05.10 */ /** Attention: 1. There are one places in the script that need to be modified according to the environment */ //clean up environment def cleanEnvironment(){ try{ unsubscribeTable(tableName=`snapshotStream, actionName="aggrFeatures10min") } catch(ex){ print(ex) } try{ unsubscribeTable(tableName=`aggrFeatures10min, actionName="predictRV") } catch(ex){ print(ex) } try{ dropStreamEngine("aggrFeatures10min") } catch(ex){ print(ex) } try{ dropStreamTable(`snapshotStream) } catch(ex){ print(ex) } try{ dropStreamTable(`aggrFeatures10min) } catch(ex){ print(ex) } try{ dropStreamTable(`result1min) } catch(ex){ print(ex) } undef all } cleanEnvironment() go //login account login("admin", "123456") /** modified location 1: modelSavePath, dbName and tbName */ modelSavePath = "/hdd/hdd9/machineLearning/realizedVolatilityModel_1.30.18.bin" //modelSavePath = "/hdd/hdd9/machineLearning/realizedVolatilityModel_2.00.6.bin" dbName = "dfs://snapshot_SH_L2_OLAP" tableName = "snapshot_SH_L2_OLAP" //load model model = loadModel(modelSavePath) //define stream table name = `SecurityID`TradeTime`PreClosePx`OpenPx`HighPx`LowPx`LastPx`TotalVolumeTrade`TotalValueTrade`BidPrice0`BidPrice1`BidPrice2`BidPrice3`BidPrice4`BidPrice5`BidPrice6`BidPrice7`BidPrice8`BidPrice9`BidOrderQty0`BidOrderQty1`BidOrderQty2`BidOrderQty3`BidOrderQty4`BidOrderQty5`BidOrderQty6`BidOrderQty7`BidOrderQty8`BidOrderQty9`OfferPrice0`OfferPrice1`OfferPrice2`OfferPrice3`OfferPrice4`OfferPrice5`OfferPrice6`OfferPrice7`OfferPrice8`OfferPrice9`OfferOrderQty0`OfferOrderQty1`OfferOrderQty2`OfferOrderQty3`OfferOrderQty4`OfferOrderQty5`OfferOrderQty6`OfferOrderQty7`OfferOrderQty8`OfferOrderQty9 type =`SYMBOL`TIMESTAMP`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`INT`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`INT`INT`INT`INT`INT`INT`INT`INT`INT`INT`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`INT`INT`INT`INT`INT`INT`INT`INT`INT`INT share streamTable(100000:0, name, type) as snapshotStream share streamTable(100000:0 , `TradeTime`SecurityID`BAS`DI0`DI1`DI2`DI3`DI4`DI5`DI6`DI7`DI8`DI9`Press`RV,`TIMESTAMP`SYMBOL`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE) as aggrFeatures10min share streamTable(100000:0 , `TradeTime`SecurityID`PredictRV`CostTime, `TIMESTAMP`SYMBOL`DOUBLE`INT) as result1min go //define function to process data with matrix operation defg featureEngine(bidPrice,bidQty,offerPrice,offerQty){ bas = offerPrice[0]\bidPrice[0]-1 wap = (bidPrice[0]*offerQty[0] + offerPrice[0]*bidQty[0])\(bidQty[0]+offerQty[0]) di = (bidQty-offerQty)\(bidQty+offerQty) bidw=(1.0\(bidPrice-wap)) bidw=bidw\(bidw.rowSum()) offerw=(1.0\(offerPrice-wap)) offerw=offerw\(offerw.rowSum()) press=log((bidQty*bidw).rowSum())-log((offerQty*offerw).rowSum()) rv=std(log(wap)-log(prev(wap)))*sqrt(24*252*size(wap)) return avg(bas),avg(di[0]),avg(di[1]),avg(di[2]),avg(di[3]),avg(di[4]),avg(di[5]),avg(di[6]),avg(di[7]),avg(di[8]),avg(di[9]),avg(press),rv } metrics= //register stream computing engine createTimeSeriesEngine(name="aggrFeatures10min", windowSize=600000, step=60000, metrics=metrics, dummyTable=snapshotStream, outputTable=aggrFeatures10min, timeColumn=`TradeTime, useWindowStartTime=true, keyColumn=`SecurityID) //subscribe data subscribeTable(tableName="snapshotStream", actionName="aggrFeatures10min", offset=-1, handler=getStreamEngine("aggrFeatures10min"), msgAsTable=true, batchSize=2000, throttle=1, hash=0, reconnect=true) //define handler def predictRV(mutable result1min, model, msg){ startTime = now() predicted = model.predict(msg) temp = select TradeTime, SecurityID, predicted as PredictRV, (now()-startTime) as CostTime from msg result1min.append!(temp) } //subscribe data subscribeTable(tableName="aggrFeatures10min", actionName="predictRV", offset=-1, handler=predictRV{result1min, model}, msgAsTable=true, hash=1, reconnect=true) go //replay history data, the day is 2020.10.19 stockList=`601318`600519`600036`600276`601166`600030`600887`600016`601328`601288`600000`600585`601398`600031`601668`600048`601888`600837`601601`601012`603259`601688`600309`601988`601211`600009`600104`600690`601818`600703`600028`601088`600050`601628`601857`601186`600547`601989`601336`600196`603993`601138`601066`601236`601319`603160`600588`601816`601658`600745 data = select SecurityID, TradeTime, PreClosePx, OpenPx, HighPx, LowPx, LastPx, TotalVolumeTrade, TotalValueTrade, BidPrice0, BidPrice1, BidPrice2, BidPrice3, BidPrice4, BidPrice5, BidPrice6, BidPrice7, BidPrice8, BidPrice9, BidOrderQty0, BidOrderQty1, BidOrderQty2, BidOrderQty3, BidOrderQty4, BidOrderQty5, BidOrderQty6, BidOrderQty7, BidOrderQty8, BidOrderQty9, OfferPrice0, OfferPrice1, OfferPrice2, OfferPrice3, OfferPrice4, OfferPrice5, OfferPrice6, OfferPrice7, OfferPrice8, OfferPrice9, OfferOrderQty0, OfferOrderQty1, OfferOrderQty2, OfferOrderQty3, OfferOrderQty4, OfferOrderQty5, OfferOrderQty6, OfferOrderQty7, OfferOrderQty8, OfferOrderQty9 from loadTable(dbName, tableName) where date(TradeTime)=2020.10.19, SecurityID in stockList, (time(TradeTime) between 09:30:00.000 : 11:29:59.999) || (time(TradeTime) between 13:00:00.000 : 14:56:59.999) order by TradeTime, SecurityID submitJob("replay_trade", "trade", replay{data, snapshotStream, `TradeTime, `TradeTime, 20000, true, 1}) data = NULL getRecentJobs()