/**
calTradeCost_asofJoin.txt
Script to use asof join engine to calculate trade cost
DolphinDB Inc.
DolphinDB server version: 2.00.6 2022.05.31
Storage engine: TSDB
Last modification time: 2022.07.07
*/

//login account
//NOTE(review): these look like default credentials - replace them before running outside a sandbox
login("admin", "123456")

/**
Create the two shared, persisted stream tables used by this script:
  - messageStream:    input table that receives the replayed messages
                      (msgTime, msgType, msgBody)
  - prevailingQuotes: output table of the asof join engine; one row per trade
                      with the joined quote columns and the computed TradeCost
Takes no arguments and returns nothing; the tables are referenced afterwards
by their shared names.
*/
def createStreamTableFunc(){
	//create stream table: messageStream
	colName = `msgTime`msgType`msgBody
	colType = [TIMESTAMP, SYMBOL, BLOB]
	messageTemp = streamTable(5000000:0, colName, colType)
	enableTableShareAndPersistence(table=messageTemp, tableName="messageStream", asynWrite=true, compress=true, cacheSize=5000000, retentionMinutes=1440, flushMode=0, preCache=10000)
	//drop the local handle; the table is used through its shared name from here on
	messageTemp = NULL

	//create stream table: prevailingQuotes
	colName = `TradeTime`SecurityID`Price`TradeQty`BidPX1`OfferPX1`TradeCost`SnapshotTime
	colType = [TIME, SYMBOL, DOUBLE, INT, DOUBLE, DOUBLE, DOUBLE, TIME]
	prevailingQuotesTemp = streamTable(1000000:0, colName, colType)
	enableTableShareAndPersistence(table=prevailingQuotesTemp, tableName="prevailingQuotes", asynWrite=true, compress=true, cacheSize=1000000, retentionMinutes=1440, flushMode=0, preCache=10000)
	prevailingQuotesTemp = NULL
}
createStreamTableFunc()
go

//get table schema
/**
Build an empty in-memory table that has the same column names and types as
the given DFS table.
  dbName:    database path, e.g. "dfs://trade"
  tableName: name of the table inside that database
Returns an empty table (capacity 1, size 0) with the source table's columns.
*/
def createSchemaTable(dbName, tableName){
	schema = loadTable(dbName, tableName).schema().colDefs
	return table(1:0, schema.name, schema.typeString)
}
tradeSchema = createSchemaTable("dfs://trade", "trade")
snapshotSchema = createSchemaTable("dfs://snapshot", "snapshot")

//register asof join stream computing engine
//left stream = trades, right stream = snapshots, matched on SecurityID and ordered by Time.
//The fifth metric, abs(Price-(BidPX1+OfferPX1)/2), is the distance of the trade price from
//the bid/offer midpoint; it lands in the TradeCost column of prevailingQuotes.
//snapshotSchema.Time selects the Time column of the right table (SnapshotTime in the output).
joinEngine = createAsofJoinEngine(name="tradeJoinSnapshot", leftTable=tradeSchema, rightTable=snapshotSchema, outputTable=prevailingQuotes, metrics=<[Price, TradeQty, BidPX1, OfferPX1, abs(Price-(BidPX1+OfferPX1)/2), snapshotSchema.Time]>, matchingColumn=`SecurityID, timeColumn=`Time, useSystemTime=false, delayedTime=1)

/**
Handler for "trade" messages: keep only rows with Price > 0 and
Time >= 09:30:00.000, then insert them into the left stream of the
tradeJoinSnapshot engine.
  msg: table of trade records
*/
def appendLeftStream(msg){
	tempMsg = select * from msg where Price > 0 and Time >= 09:30:00.000
	getLeftStream(getStreamEngine(`tradeJoinSnapshot)).tableInsert(tempMsg)
}

//register filter stream computing engine and subscribe the stream tables
/**
Create the stream filter engine that splits messageStream into "trade" and
"snapshot" messages, and subscribe it to messageStream.
  tradeSchema:    empty table describing the layout of "trade" messages
  snapshotSchema: empty table describing the layout of "snapshot" messages
"trade" messages go through appendLeftStream; "snapshot" messages are written
directly to the right stream of the tradeJoinSnapshot engine.
*/
def filterAndParseStreamFunc(tradeSchema, snapshotSchema){
	filter1 = dict(STRING, ANY)
	filter1["condition"] = "trade"
	filter1["handler"] = appendLeftStream
	filter2 = dict(STRING, ANY)
	filter2["condition"] = "snapshot"
	filter2["handler"] = getRightStream(getStreamEngine(`tradeJoinSnapshot))
	schema = dict(["trade", "snapshot"], [tradeSchema, snapshotSchema])
	engine = streamFilter(name="streamFilter", dummyTable=messageStream, filter=[filter1, filter2], msgSchema=schema)
	//offset=-1: start from messages that arrive after the subscription is created
	subscribeTable(tableName="messageStream", actionName="tradeJoinSnapshot", offset=-1, handler=engine, msgAsTable=true, reconnect=true)
}
filterAndParseStreamFunc(tradeSchema, snapshotSchema)

//replay history data
/**
Submit a background job that replays the trade and snapshot tables into
messageStream, keyed as "trade" and "snapshot" to match the stream filter.
Takes no arguments; the job id is "replay".

NOTE(review): the SQL text of both data sources was missing from this script
(sqlObj was empty and snapshotDS was never defined). The queries below are
reconstructed from the tables used for the schemas above; the single-day
filter (2020.12.31) is an assumption - set it to the trading day you want
to replay.
*/
def replayStockMarketData(){
	//split the trading session into 100 time slices so each data source is loaded in chunks
	timeRS = cutPoints(09:15:00.000..15:00:00.000, 100)
	tradeDS = replayDS(sqlObj=<select * from loadTable("dfs://trade", "trade") where Date = 2020.12.31>, dateColumn=`Date, timeColumn=`Time, timeRepartitionSchema=timeRS)
	snapshotDS = replayDS(sqlObj=<select * from loadTable("dfs://snapshot", "snapshot") where Date = 2020.12.31>, dateColumn=`Date, timeColumn=`Time, timeRepartitionSchema=timeRS)
	inputDict = dict(["trade", "snapshot"], [tradeDS, snapshotDS])
	//arguments after `replay` are forwarded to it: input, output, date column, time column,
	//then presumably replayRate=100000, absoluteRate=true, parallelLevel=2 - confirm against the replay docs
	submitJob("replay", "replay for factor calculation", replay, inputDict, messageStream, `Date, `Time, 100000, true, 2)
}
replayStockMarketData()

//getRecentJobs()
//cancelJob("your jobId")
//select * from prevailingQuotes limit 100