/**
metacode_derived_features.txt
Script to aggregate data
DolphinDB Inc.
DolphinDB server version: 2.00.6 2022.05.09
Last modification time: 2022.08.31
*/

/**
Attention:
1. The developer needs to import level2 snapshot data into the database in advance
2. There is one place in the script that needs to be modified according to the environment
*/

//login account
login("admin", "123456")
clearAllCache()
undef(all)
go

/**
part1: Load data from database
modified location 1: dbName and tableName
*/
stockList=`601318`600519`600036`600276`601166`600030`600887`600016`601328`601288`600000`600585`601398`600031`601668`600048
dbName = "dfs://SH_TSDB_snapshot_MultiColumn"
tableName = "snapshot"
snapshot = loadTable(dbName, tableName)

/**
part2: define functions
*/
// Log return: log(s) minus the log of the previous element of s.
def logReturn(s){
	return log(s)-log(prev(s))
}

// Realized volatility: square root of the sum of squares of s.
def realizedVolatility(s){
	return sqrt(sum2(s))
}

// Same idea as pandas "group by ... agg": takes a dictionary whose keys are
// column names and whose values are lists of function names, and generates
// the corresponding metacode in batch.
// Returns two values: the list of sqlCol metacode objects and the matching
// output column names ("<column>_<function>"), in the same order.
def createAggMetaCode(aggDict){
	metaCode = []
	metaCodeColName = []
	for(colName in aggDict.keys()){
		for(funcName in aggDict[colName])
		{
			metaCode.append!(sqlCol(colName, funcByName(funcName), colName + `_ + funcName$STRING))
			metaCodeColName.append!(colName + `_ + funcName$STRING)
		}
	}
	return metaCode, metaCodeColName$STRING
}

/**
part3: feature engineering
*/
// Map every derived column to the aggregate functions applied to it.
features = {
	"DateTime":[`count]
}
// One entry per price level 0..9 for the per-level derived columns.
for( i in 0..9)
{
	features["Wap"+i] = [`sum, `mean, `std]
	features["LogReturn"+i] = [`sum, `realizedVolatility, `mean, `std]
	features["LogReturnOffer"+i] = [`sum, `realizedVolatility, `mean, `std]
	features["LogReturnBid"+i] = [`sum, `realizedVolatility, `mean, `std]
}
features["WapBalance"] = [`sum, `mean, `std]
features["PriceSpread"] = [`sum, `mean, `std]
features["BidSpread"] = [`sum, `mean, `std]
features["OfferSpread"] = [`sum, `mean, `std]
features["TotalVolume"] = [`sum, `mean, `std]
features["VolumeImbalance"] = [`sum, `mean, `std]
aggMetaCode, metaCodeColName = createAggMetaCode(features)

/**
part4: define aggregate function
*/
// Aggregate function evaluated once per group of the final query.
//   DateTime      : timestamps of the rows in the group
//   BidPrice, BidOrderQty, OfferPrice, OfferOrderQty :
//                   matrices built by the caller, one column per level
//                   (10 levels are assumed by the rename below)
//   aggMetaCode   : list of sqlCol metacode produced by createAggMetaCode
// Returns the aggregates over the whole group concatenated with the same
// aggregates restricted to rows at or after 150 s, 300 s and 450 s from the
// start of the 10-minute bar.
defg featureEngineering(DateTime, BidPrice, BidOrderQty, OfferPrice, OfferOrderQty, aggMetaCode){
	wap = (BidPrice * OfferOrderQty + BidOrderQty * OfferPrice) \ (BidOrderQty + OfferOrderQty)
	wapBalance = abs(wap[0] - wap[1])
	priceSpread = (OfferPrice[0] - BidPrice[0]) \ ((OfferPrice[0] + BidPrice[0]) \ 2)
	BidSpread = BidPrice[0] - BidPrice[1]
	OfferSpread = OfferPrice[0] - OfferPrice[1]
	totalVolume = OfferOrderQty.rowSum() + BidOrderQty.rowSum()
	volumeImbalance = abs(OfferOrderQty.rowSum() - BidOrderQty.rowSum())
	LogReturnWap = logReturn(wap)
	LogReturnOffer = logReturn(OfferPrice)
	LogReturnBid = logReturn(BidPrice)
	subTable = table(DateTime as `DateTime, BidPrice, BidOrderQty, OfferPrice, OfferOrderQty, wap, wapBalance, priceSpread, BidSpread, OfferSpread, totalVolume, volumeImbalance, LogReturnWap, LogReturnOffer, LogReturnBid)
	// Give every column the name that the keys of the features dictionary refer to.
	colNum = 0..9$STRING
	colName = `DateTime <- (`BidPrice + colNum) <- (`BidOrderQty + colNum) <- (`OfferPrice + colNum) <- (`OfferOrderQty + colNum) <- (`Wap + colNum) <- `WapBalance`PriceSpread`BidSpread`OfferSpread`TotalVolume`VolumeImbalance <- (`LogReturn + colNum) <- (`LogReturnOffer + colNum) <- (`LogReturnBid + colNum)
	subTable.rename!(colName)
	subTable['BarDateTime'] = bar(subTable['DateTime'], 10m)
	result = sql(select = aggMetaCode, from = subTable).eval().matrix()
	// time() values are in milliseconds, so 150*1000 is 150 seconds.
	// NOTE(review): the left-hand side "time(DateTime) >=" of these three
	// conditions was missing from the file and has been reconstructed - confirm.
	result150 = sql(select = aggMetaCode, from = subTable, where = <time(DateTime) >= (time(BarDateTime) + 150*1000) >).eval().matrix()
	result300 = sql(select = aggMetaCode, from = subTable, where = <time(DateTime) >= (time(BarDateTime) + 300*1000) >).eval().matrix()
	result450 = sql(select = aggMetaCode, from = subTable, where = <time(DateTime) >= (time(BarDateTime) + 450*1000) >).eval().matrix()
	return concatMatrix([result, result150, result300, result450])
}

/**
part5: meta conditions and run with timer
*/
// NOTE(review): the first two conditions were missing from the file.
// The date range below is a placeholder and the SecurityID column name is
// assumed - adjust both to the imported data before running.
whereConditions = [<date(DateTime) between 2021.01.04 : 2021.12.31>, <SecurityID in stockList>, <(time(DateTime) between 09:30:00.000 : 11:29:59.999) or (time(DateTime) between 13:00:00.000 : 14:56:59.999)>]

// NOTE(review): the featureEngineering call and the groupBy expressions were
// missing from the file and have been reconstructed. The snapshot table is
// assumed to store the ten levels as separate columns (BidPrice0..BidPrice9,
// BidOrderQty0..9, OfferPrice0..9, OfferOrderQty0..9) - confirm against the schema.
timer result = sql(select = sqlColAlias(<featureEngineering(DateTime,
		matrix(BidPrice0, BidPrice1, BidPrice2, BidPrice3, BidPrice4, BidPrice5, BidPrice6, BidPrice7, BidPrice8, BidPrice9),
		matrix(BidOrderQty0, BidOrderQty1, BidOrderQty2, BidOrderQty3, BidOrderQty4, BidOrderQty5, BidOrderQty6, BidOrderQty7, BidOrderQty8, BidOrderQty9),
		matrix(OfferPrice0, OfferPrice1, OfferPrice2, OfferPrice3, OfferPrice4, OfferPrice5, OfferPrice6, OfferPrice7, OfferPrice8, OfferPrice9),
		matrix(OfferOrderQty0, OfferOrderQty1, OfferOrderQty2, OfferOrderQty3, OfferOrderQty4, OfferOrderQty5, OfferOrderQty6, OfferOrderQty7, OfferOrderQty8, OfferOrderQty9),
		aggMetaCode)>, metaCodeColName <- (metaCodeColName+"_150") <- (metaCodeColName+"_300") <- (metaCodeColName+"_450")), from = snapshot, where = whereConditions, groupBy = [<SecurityID>, <bar(DateTime, 10m) as DateTime>]).eval()