/** streamComputingArrayVector.txt Script about real time stream computing with array vector DolphinDB Inc. DolphinDB server version: 2.00.6 2022.05.09 Last modification time: 2022.05.10 */ /** Attention: 1. There are one places in the script that need to be modified according to the environment */ //clean up environment def cleanEnvironment(){ try{ unsubscribeTable(tableName=`snapshotStream, actionName="aggrFeatures10min") } catch(ex){ print(ex) } try{ unsubscribeTable(tableName=`aggrFeatures10min, actionName="predictRV") } catch(ex){ print(ex) } try{ dropStreamEngine("aggrFeatures10min") } catch(ex){ print(ex) } try{ dropStreamTable(`snapshotStream) } catch(ex){ print(ex) } try{ dropStreamTable(`aggrFeatures10min) } catch(ex){ print(ex) } try{ dropStreamTable(`result1min) } catch(ex){ print(ex) } undef all } cleanEnvironment() go //login account login("admin", "123456") /** modified location 1: modelSavePath, dbName and tbName */ modelSavePath = "/hdd/hdd9/machineLearning/realizedVolatilityModel_2.00.6.bin" dbName = "dfs://snapshot_SH_L2_TSDB_ArrayVector" tableName = "snapshot_SH_L2_TSDB_ArrayVector" //load model model = loadModel(modelSavePath) //define stream table name = `SecurityID`TradeTime`PreClosePx`OpenPx`HighPx`LowPx`LastPx`TotalVolumeTrade`TotalValueTrade`BidPrice`BidOrderQty`OfferPrice`OfferOrderQty type =`SYMBOL`TIMESTAMP`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`INT`DOUBLE "DOUBLE[]" "INT[]" "DOUBLE[]" "INT[]" share streamTable(100000:0, name, type) as snapshotStream share streamTable(100000:0 , `TradeTime`SecurityID`BAS`DI0`DI1`DI2`DI3`DI4`DI5`DI6`DI7`DI8`DI9`Press`RV,`TIMESTAMP`SYMBOL`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE) as aggrFeatures10min share streamTable(100000:0 , `TradeTime`SecurityID`PredictRV`CostTime, `TIMESTAMP`SYMBOL`DOUBLE`INT) as result1min go //define function to process data with matrix operation defg featureEngine(bidPrice,bidQty,offerPrice,offerQty){ bas = offerPrice[0]\bidPrice[0]-1 wap = (bidPrice[0]*offerQty[0] + offerPrice[0]*bidQty[0])\(bidQty[0]+offerQty[0]) di = (bidQty-offerQty)\(bidQty+offerQty) bidw=(1.0\(bidPrice-wap)) bidw=bidw\(bidw.rowSum()) offerw=(1.0\(offerPrice-wap)) offerw=offerw\(offerw.rowSum()) press=log((bidQty*bidw).rowSum())-log((offerQty*offerw).rowSum()) rv=std(log(wap)-log(prev(wap)))*sqrt(24*252*size(wap)) return avg(bas),avg(di[0]),avg(di[1]),avg(di[2]),avg(di[3]),avg(di[4]),avg(di[5]),avg(di[6]),avg(di[7]),avg(di[8]),avg(di[9]),avg(press),rv } metrics= //register stream computing engine createTimeSeriesEngine(name="aggrFeatures10min", windowSize=600000, step=60000, metrics=metrics, dummyTable=snapshotStream, outputTable=aggrFeatures10min, timeColumn=`TradeTime, useWindowStartTime=true, keyColumn=`SecurityID) //subscribe data subscribeTable(tableName="snapshotStream", actionName="aggrFeatures10min", offset=-1, handler=getStreamEngine("aggrFeatures10min"), msgAsTable=true, batchSize=2000, throttle=1, hash=0, reconnect=true) //define handler def predictRV(mutable result1min, model, msg){ startTime = now() predicted = model.predict(msg) temp = select TradeTime, SecurityID, predicted as PredictRV, (now()-startTime) as CostTime from msg result1min.append!(temp) } //subscribe data subscribeTable(tableName="aggrFeatures10min", actionName="predictRV", offset=-1, handler=predictRV{result1min, model}, msgAsTable=true, hash=1, reconnect=true) go //replay history data, the day is 2020.10.19 stockList=`601318`600519`600036`600276`601166`600030`600887`600016`601328`601288`600000`600585`601398`600031`601668`600048`601888`600837`601601`601012`603259`601688`600309`601988`601211`600009`600104`600690`601818`600703`600028`601088`600050`601628`601857`601186`600547`601989`601336`600196`603993`601138`601066`601236`601319`603160`600588`601816`601658`600745 data = select SecurityID, TradeTime, PreClosePx, OpenPx, HighPx, LowPx, LastPx, TotalVolumeTrade, TotalValueTrade, BidPrice, BidOrderQty, OfferPrice, OfferOrderQty from loadTable(dbName, tableName) where date(TradeTime)=2020.10.19, SecurityID in stockList, (time(TradeTime) between 09:30:00.000 : 11:29:59.999) || (time(TradeTime) between 13:00:00.000 : 14:56:59.999) order by TradeTime, SecurityID submitJob("replay_trade", "trade", replay{data, snapshotStream, `TradeTime, `TradeTime, 20000, true, 1}) data = NULL getRecentJobs()