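Stateless Operators
Overview
Stateless operators perform stateless computation: the output of a computation depends only on its current input, not on any previous state or history. In other words, no matter which inputs were processed before, the same input always produces the same output.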
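The Python illustration below makes the distinction concrete. It is not DolphinDB code, and the function and class names are hypothetical. The stateless function returns the same value for the same input every time. The stateful counter, shown only for contrast, returns a different value for the same input because it remembers earlier calls.

```python
def spread_ticks(bid_ticks: int, ask_ticks: int) -> int:
    """Stateless: the result depends only on the current input."""
    return ask_ticks - bid_ticks


class RunningVolume:
    """Stateful (for contrast): the result also depends on earlier inputs."""

    def __init__(self) -> None:
        self.total = 0

    def __call__(self, volume: int) -> int:
        self.total += volume
        return self.total


running = RunningVolume()
first = (spread_ticks(1040, 1060), running(100))
second = (spread_ticks(1040, 1060), running(100))
print(first, second)  # (20, 100) (20, 200)
```

The same input `(1040, 1060)` gives 20 both times. The same input `100` gives 100 and then 200, so `RunningVolume` is not a stateless operator.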
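DolphinDB Built-in Stateless Operators
DolphinDB provides more than 1,600 built-in functions, including a rich set of computational functions that let users implement complex data analysis and quantitative financial factor calculations with little code. All built-in transformation functions can serve as stateless operators in stateless stream computation.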
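Because a stateless operator looks at one input at a time, it can be applied to incoming micro-batches of any size without changing the result. The Python sketch below is an illustration, not DolphinDB's streaming engine. It splits timestamps into date and time parts, similar in spirit to the `date()` and `time()` calls used later in this example, and checks that batch-by-batch processing matches one-shot processing. `apply_in_batches` is a hypothetical helper.

```python
from datetime import datetime


def split_timestamp(ts: datetime) -> tuple:
    """Stateless transform: split a timestamp into (date, time)."""
    return ts.date(), ts.time()


def apply_in_batches(rows: list, batch_size: int, fn) -> list:
    """Apply fn row by row, one batch at a time, as a subscriber might."""
    out = []
    for start in range(0, len(rows), batch_size):
        out.extend(fn(row) for row in rows[start:start + batch_size])
    return out


rows = [datetime(2023, 12, 15, 9, 30, 3 * i) for i in range(10)]
one_shot = [split_timestamp(row) for row in rows]

# Batch size does not affect a stateless transform's output
assert apply_in_batches(rows, 3, split_timestamp) == one_shot
assert apply_in_batches(rows, 7, split_timestamp) == one_shot
```

This independence from batch boundaries is what lets the subscription in this example use any `batchSize` and `throttle` setting without affecting the values it produces.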
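Example: Data ETL with a User-Defined Callback Function
This example takes Level-2 stock snapshot data and cleans and transforms the raw market data inside a user-defined subscription callback: invalid and pre-open snapshots are filtered out, the timestamp is split into a trade date and a trade time, and summary fields are derived from the 10 bid and offer levels.
step 1: Create the publishing stream table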
colNames = `SecurityID`DateTime`PreClosePx`OpenPx`HighPx`LowPx`LastPx`TotalVolumeTrade`TotalValueTrade`InstrumentStatus`BidPrice0`BidPrice1`BidPrice2`BidPrice3`BidPrice4`BidPrice5`BidPrice6`BidPrice7`BidPrice8`BidPrice9`BidOrderQty0`BidOrderQty1`BidOrderQty2`BidOrderQty3`BidOrderQty4`BidOrderQty5`BidOrderQty6`BidOrderQty7`BidOrderQty8`BidOrderQty9`OfferPrice0`OfferPrice1`OfferPrice2`OfferPrice3`OfferPrice4`OfferPrice5`OfferPrice6`OfferPrice7`OfferPrice8`OfferPrice9`OfferOrderQty0`OfferOrderQty1`OfferOrderQty2`OfferOrderQty3`OfferOrderQty4`OfferOrderQty5`OfferOrderQty6`OfferOrderQty7`OfferOrderQty8`OfferOrderQty9
colTypes = [SYMBOL, TIMESTAMP, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, INT, DOUBLE, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, INT, INT, INT, INT, INT, INT, INT, INT, INT, INT, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, INT, INT, INT, INT, INT, INT, INT, INT, INT, INT]
share(table=streamTable(1:0, colNames, colTypes), sharedName="pubTable")
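The 50 columns follow a regular pattern: 10 basic fields, then 10 levels each of bid price, bid quantity, offer price and offer quantity. The Python sketch below is purely illustrative and does not connect to DolphinDB. It rebuilds the same name and type lists from that pattern, which can help when checking that `colNames` and `colTypes` line up position by position.

```python
LEVELS = 10

# The 10 basic snapshot fields and their DolphinDB type names
col_names = ["SecurityID", "DateTime", "PreClosePx", "OpenPx", "HighPx",
             "LowPx", "LastPx", "TotalVolumeTrade", "TotalValueTrade",
             "InstrumentStatus"]
col_types = ["SYMBOL", "TIMESTAMP"] + ["DOUBLE"] * 5 + ["INT", "DOUBLE", "SYMBOL"]

# Four groups of 10 order-book levels each
for prefix, dtype in [("BidPrice", "DOUBLE"), ("BidOrderQty", "INT"),
                      ("OfferPrice", "DOUBLE"), ("OfferOrderQty", "INT")]:
    col_names += [f"{prefix}{i}" for i in range(LEVELS)]
    col_types += [dtype] * LEVELS

print(len(col_names), len(col_types))  # 50 50
```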
step 2: Create a shared stream table to store the processed data
colNames = ["SecurityID", "TradeDate", "TradeTime", "avgBidPrice", "sumBidQty", "minAskPrice", "maxAskQty"]
colTypes = [SYMBOL, DATE, TIME, DOUBLE, INT, DOUBLE, DOUBLE]
share(table=streamTable(1:0, colNames, colTypes), sharedName=`result)
step 3: Define the callback function for data cleaning and transformation
def dataETL(mutable result, msg){
    // data ETL: drop invalid and pre-open snapshots, then derive per-row fields from the 10 levels
    tmp = select SecurityID, date(DateTime) as TradeDate, time(DateTime) as TradeTime,
        rowAvg(BidPrice0, BidPrice1, BidPrice2, BidPrice3, BidPrice4, BidPrice5, BidPrice6, BidPrice7, BidPrice8, BidPrice9) as avgBidPrice,
        rowSum(BidOrderQty0, BidOrderQty1, BidOrderQty2, BidOrderQty3, BidOrderQty4, BidOrderQty5, BidOrderQty6, BidOrderQty7, BidOrderQty8, BidOrderQty9) as sumBidQty,
        rowMin(OfferPrice0, OfferPrice1, OfferPrice2, OfferPrice3, OfferPrice4, OfferPrice5, OfferPrice6, OfferPrice7, OfferPrice8, OfferPrice9) as minAskPrice,
        rowMax(OfferOrderQty0, OfferOrderQty1, OfferOrderQty2, OfferOrderQty3, OfferOrderQty4, OfferOrderQty5, OfferOrderQty6, OfferOrderQty7, OfferOrderQty8, OfferOrderQty9) as maxAskQty
        from msg where LastPx>0, time(DateTime)>=09:30:00.000
    // result storage
    result.append!(tmp)
}
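To check the logic outside DolphinDB, the Python sketch below reproduces the per-row cleaning and transformation. It assumes that each output row summarizes the 10 levels of its own snapshot (row-wise average, sum, minimum and maximum), so every row is handled independently, as a stateless operator requires. Rows are plain dicts and `result` is a list mutated in place, loosely mirroring the `mutable result` parameter. `data_etl` and `make_row` are hypothetical names, and `make_row` uses the level values from step 5.

```python
from datetime import datetime, time

MARKET_OPEN = time(9, 30)
LEVELS = 10


def data_etl(result: list, msg: list) -> None:
    """Clean and transform each snapshot row on its own; append to result."""
    for row in msg:
        ts = row["DateTime"]
        # Cleaning: drop rows without a valid last price or before 09:30:00.000
        if row["LastPx"] <= 0 or ts.time() < MARKET_OPEN:
            continue
        bids = [row[f"BidPrice{i}"] for i in range(LEVELS)]
        bid_qty = [row[f"BidOrderQty{i}"] for i in range(LEVELS)]
        asks = [row[f"OfferPrice{i}"] for i in range(LEVELS)]
        ask_qty = [row[f"OfferOrderQty{i}"] for i in range(LEVELS)]
        # Transformation: split the timestamp and summarize the 10 levels
        result.append({
            "SecurityID": row["SecurityID"],
            "TradeDate": ts.date(),
            "TradeTime": ts.time(),
            "avgBidPrice": sum(bids) / LEVELS,
            "sumBidQty": sum(bid_qty),
            "minAskPrice": min(asks),
            "maxAskQty": max(ask_qty),
        })


def make_row(ts: datetime, last_px: float) -> dict:
    """Build one snapshot row using the level values from step 5."""
    row = {"SecurityID": "000001SZ", "DateTime": ts, "LastPx": last_px}
    qty = [10000, 20000, 30000, 40000, 50000, 60000, 50000, 40000, 30000, 20000]
    for i in range(LEVELS):
        row[f"BidPrice{i}"] = round(10.4 - 0.1 * i, 2)    # 10.4 down to 9.5
        row[f"OfferPrice{i}"] = round(10.6 + 0.1 * i, 2)  # 10.6 up to 11.5
        row[f"BidOrderQty{i}"] = qty[i]
        row[f"OfferOrderQty{i}"] = qty[i]
    return row


result = []
data_etl(result, [
    make_row(datetime(2023, 12, 15, 9, 29, 57), 10.5),  # before the open: dropped
    make_row(datetime(2023, 12, 15, 9, 30, 9), 10.5),   # kept
    make_row(datetime(2023, 12, 15, 9, 30, 12), 0.0),   # no valid price: dropped
])
print(result)
```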
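Note:
The parameter result of the user-defined callback function dataETL is the shared stream table that stores the processed data. It must be declared with mutable; otherwise it is a read-only variable and cannot be modified, for example by appending data with append!.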
step 4: Subscribe to the publishing stream table
subscribeTable(tableName="pubTable", actionName="dataETL", offset=-1, handler=dataETL{result}, msgAsTable=true, batchSize=2000, throttle=0.01, reconnect=true)
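Note:
handler is the user-defined callback function dataETL, which takes two parameters, result and msg. result is a variable defined outside the function and must be bound in advance through partial application ({}). msg is the incremental data published by the subscribed stream table; it is passed in automatically on each call, so it does not need to be specified in handler. Because msgAsTable=true, msg arrives as a table. With batchSize=2000 and throttle=0.01, the handler is triggered once 2,000 unprocessed messages have accumulated, or after 0.01 seconds if fewer have arrived.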
step 5: Simulate a batch data write
rowNums = 10
simulateData = table(
    take(`000001SZ, rowNums) as SecurityID,
    take(0..(rowNums-1), rowNums)*1000*3+2023.12.15T09:30:00.000 as DateTime,
    take(10.6, rowNums) as PreClosePx, take(10.8, rowNums) as OpenPx, take(10.8, rowNums) as HighPx, take(10.2, rowNums) as LowPx, take(10.5, rowNums) as LastPx,
    take(0..(rowNums-1), rowNums)*1000+100000 as TotalVolumeTrade,
    take(0..(rowNums-1), rowNums)*1000*10.6+100000*10.6 as TotalValueTrade,
    take(`s, rowNums) as InstrumentStatus,
    take(10.4, rowNums) as BidPrice0, take(10.3, rowNums) as BidPrice1, take(10.2, rowNums) as BidPrice2, take(10.1, rowNums) as BidPrice3, take(10.0, rowNums) as BidPrice4,
    take(9.9, rowNums) as BidPrice5, take(9.8, rowNums) as BidPrice6, take(9.7, rowNums) as BidPrice7, take(9.6, rowNums) as BidPrice8, take(9.5, rowNums) as BidPrice9,
    take(10000, rowNums) as BidOrderQty0, take(20000, rowNums) as BidOrderQty1, take(30000, rowNums) as BidOrderQty2, take(40000, rowNums) as BidOrderQty3, take(50000, rowNums) as BidOrderQty4,
    take(60000, rowNums) as BidOrderQty5, take(50000, rowNums) as BidOrderQty6, take(40000, rowNums) as BidOrderQty7, take(30000, rowNums) as BidOrderQty8, take(20000, rowNums) as BidOrderQty9,
    take(10.6, rowNums) as OfferPrice0, take(10.7, rowNums) as OfferPrice1, take(10.8, rowNums) as OfferPrice2, take(10.9, rowNums) as OfferPrice3, take(11.0, rowNums) as OfferPrice4,
    take(11.1, rowNums) as OfferPrice5, take(11.2, rowNums) as OfferPrice6, take(11.3, rowNums) as OfferPrice7, take(11.4, rowNums) as OfferPrice8, take(11.5, rowNums) as OfferPrice9,
    take(10000, rowNums) as OfferOrderQty0, take(20000, rowNums) as OfferOrderQty1, take(30000, rowNums) as OfferOrderQty2, take(40000, rowNums) as OfferOrderQty3, take(50000, rowNums) as OfferOrderQty4,
    take(60000, rowNums) as OfferOrderQty5, take(50000, rowNums) as OfferOrderQty6, take(40000, rowNums) as OfferOrderQty7, take(30000, rowNums) as OfferOrderQty8, take(20000, rowNums) as OfferOrderQty9
)
tableInsert(pubTable, simulateData)
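In the `DateTime` expression, row `i` is offset from 2023.12.15T09:30:00.000 by `i*1000*3`; since TIMESTAMP values count milliseconds, that is one snapshot every 3 seconds, covering 09:30:00 to 09:30:27. The Python sketch below reproduces only that arithmetic, plus the `TotalVolumeTrade` column, to show which timestamps exist, for example the 09:30:09 row queried in step 6. It is an illustration, not part of the DolphinDB script.

```python
from datetime import datetime, timedelta

row_nums = 10
start = datetime(2023, 12, 15, 9, 30, 0)

# take(0..(rowNums-1), rowNums)*1000*3 + start: a 3,000 ms step per row
date_times = [start + timedelta(milliseconds=i * 1000 * 3) for i in range(row_nums)]
# take(0..(rowNums-1), rowNums)*1000 + 100000
total_volume = [i * 1000 + 100000 for i in range(row_nums)]

for ts, vol in zip(date_times, total_volume):
    print(ts.time(), vol)
```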
step 6: Query the result table
res = select * from result where tradetime=09:30:09.000
The returned result res:
As shown, the raw market data has been successfully cleaned and transformed.
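Every simulated row carries the same order book, so each output row should contain the same four derived values. Assuming the callback computes the row-wise average, sum, minimum and maximum across the 10 levels, these values can be worked out by hand. The Python snippet below does that arithmetic as an independent check on `res`; the list names are illustrative.

```python
bid_prices = [10.4, 10.3, 10.2, 10.1, 10.0, 9.9, 9.8, 9.7, 9.6, 9.5]
offer_prices = [10.6, 10.7, 10.8, 10.9, 11.0, 11.1, 11.2, 11.3, 11.4, 11.5]
# Step 5 uses the same 10-level quantity profile for bids and offers
order_qty = [10000, 20000, 30000, 40000, 50000, 60000, 50000, 40000, 30000, 20000]

expected = {
    "avgBidPrice": round(sum(bid_prices) / len(bid_prices), 4),  # 9.95
    "sumBidQty": sum(order_qty),                                 # 350000
    "minAskPrice": min(offer_prices),                            # 10.6
    "maxAskQty": max(order_qty),                                 # 60000
}
print(expected)
```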
step 7: Cancel the subscription
unsubscribeTable(tableName="pubTable", actionName="dataETL")
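step 8: Drop the publishing and result stream tables
Note:
Before a publishing stream table can be dropped, all of its subscriptions must be cancelled.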
dropStreamTable(tableName="pubTable")
dropStreamTable(tableName="result")
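The rule that a stream table can be dropped only after all of its subscriptions are cancelled can be modeled with a small registry. This Python class is a hypothetical illustration of the ordering constraint between steps 7 and 8, not DolphinDB's implementation.

```python
class ToyStreamRegistry:
    """Hypothetical model: tracks shared stream tables and their subscriptions."""

    def __init__(self) -> None:
        self.subscriptions = {}  # table name -> set of action names

    def share(self, table_name: str) -> None:
        self.subscriptions[table_name] = set()

    def subscribe(self, table_name: str, action_name: str) -> None:
        self.subscriptions[table_name].add(action_name)

    def unsubscribe(self, table_name: str, action_name: str) -> None:
        self.subscriptions[table_name].discard(action_name)

    def drop(self, table_name: str) -> None:
        active = self.subscriptions[table_name]
        if active:
            raise RuntimeError(f"{table_name} still has subscriptions: {sorted(active)}")
        del self.subscriptions[table_name]


reg = ToyStreamRegistry()
reg.share("pubTable")
reg.share("result")
reg.subscribe("pubTable", "dataETL")
try:
    reg.drop("pubTable")  # refused: dataETL is still subscribed
except RuntimeError as err:
    print(err)
reg.unsubscribe("pubTable", "dataETL")  # step 7
reg.drop("pubTable")                    # step 8
reg.drop("result")
```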