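Stateless Operators
Overview
Stateless operators perform stateless computation: the output of each computation depends only on its current input, not on any previous state or history. In other words, no matter what inputs were processed before, the same input always produces the same output.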
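The definition can be illustrated with a plain-Python sketch. This is not DolphinDB code, and the names stateless_double and RunningSum are hypothetical. Feeding the same input twice gives the same output from a stateless function, while a stateful running sum returns different results because it remembers earlier inputs.

```python
def stateless_double(batch):
    """Stateless: each output depends only on the corresponding input."""
    return [2 * x for x in batch]


class RunningSum:
    """Stateful: each output depends on every input seen so far."""

    def __init__(self):
        self.total = 0

    def process(self, batch):
        out = []
        for x in batch:
            self.total += x
            out.append(self.total)
        return out


# The same input twice: the stateless function repeats its output ...
print(stateless_double([3]), stateless_double([3]))  # [6] [6]

# ... while the stateful operator's output changes because it keeps history.
rs = RunningSum()
print(rs.process([3]), rs.process([3]))  # [3] [6]
```

Because a stateless function keeps nothing between calls, splitting the stream into batches at any boundary does not change its results.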
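Built-in Stateless Operators in DolphinDB
DolphinDB provides more than 1,600 built-in functions, in particular a rich set of computation functions that let users implement complex data analysis and quantitative financial factor calculations with little code. All built-in transformation functions can be used as stateless operators for stateless computation on streaming data.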
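As a plain-Python illustration (an analogue, not DolphinDB's API; split_timestamp is a hypothetical name), the conversion that the ETL example in this section performs with date(DateTime) and time(DateTime) is stateless: each timestamp is converted on its own, so a batch can be processed element by element in any grouping.

```python
from datetime import datetime


def split_timestamp(ts):
    """Stateless conversion: derive (date, time) from one timestamp.

    Analogue of applying date() and time() to a TIMESTAMP value;
    the result depends only on ts itself.
    """
    return ts.date(), ts.time()


d, t = split_timestamp(datetime(2023, 12, 15, 9, 30, 9))
print(d, t)  # 2023-12-15 09:30:09

# Applying the conversion to a batch is just an element-wise map.
batch = [datetime(2023, 12, 15, 9, 30, 0), datetime(2023, 12, 15, 9, 30, 3)]
converted = [split_timestamp(x) for x in batch]
```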
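Example: Data ETL with a Custom Callback Function
This example uses Level-2 stock snapshot data to clean and transform raw market data.
Step 1: Create the publishing stream table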
colNames = `SecurityID`DateTime`PreClosePx`OpenPx`HighPx`LowPx`LastPx`TotalVolumeTrade`TotalValueTrade`InstrumentStatus`BidPrice0`BidPrice1`BidPrice2`BidPrice3`BidPrice4`BidPrice5`BidPrice6`BidPrice7`BidPrice8`BidPrice9`BidOrderQty0`BidOrderQty1`BidOrderQty2`BidOrderQty3`BidOrderQty4`BidOrderQty5`BidOrderQty6`BidOrderQty7`BidOrderQty8`BidOrderQty9`OfferPrice0`OfferPrice1`OfferPrice2`OfferPrice3`OfferPrice4`OfferPrice5`OfferPrice6`OfferPrice7`OfferPrice8`OfferPrice9`OfferOrderQty0`OfferOrderQty1`OfferOrderQty2`OfferOrderQty3`OfferOrderQty4`OfferOrderQty5`OfferOrderQty6`OfferOrderQty7`OfferOrderQty8`OfferOrderQty9
colTypes = [SYMBOL, TIMESTAMP, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, INT, DOUBLE, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, INT, INT, INT, INT, INT, INT, INT, INT, INT, INT, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, INT, INT, INT, INT, INT, INT, INT, INT, INT, INT]
share(table=streamTable(1:0, colNames, colTypes), sharedName="pubTable")
Step 2: Create a shared stream table to store the processed data
colNames = ["SecurityID", "TradeDate", "TradeTime", "avgBidPrice", "sumBidQty", "minAskPrice", "maxAskQty"]
colTypes = [SYMBOL, DATE, TIME, DOUBLE, INT, DOUBLE, DOUBLE]
share(table=streamTable(1:0, colNames, colTypes), sharedName=`result)
Step 3: Define the callback function for data cleaning and transformation
def dataETL(mutable result, msg){
    // data ETL: keep valid records from 09:30:00 onward, split DateTime into date and time,
    // and compute row-wise statistics across the 10 price levels of each snapshot
    tmp = select SecurityID,
        date(DateTime) as TradeDate,
        time(DateTime) as TradeTime,
        rowAvg(BidPrice0, BidPrice1, BidPrice2, BidPrice3, BidPrice4, BidPrice5, BidPrice6, BidPrice7, BidPrice8, BidPrice9) as avgBidPrice,
        rowSum(BidOrderQty0, BidOrderQty1, BidOrderQty2, BidOrderQty3, BidOrderQty4, BidOrderQty5, BidOrderQty6, BidOrderQty7, BidOrderQty8, BidOrderQty9) as sumBidQty,
        rowMin(OfferPrice0, OfferPrice1, OfferPrice2, OfferPrice3, OfferPrice4, OfferPrice5, OfferPrice6, OfferPrice7, OfferPrice8, OfferPrice9) as minAskPrice,
        rowMax(OfferOrderQty0, OfferOrderQty1, OfferOrderQty2, OfferOrderQty3, OfferOrderQty4, OfferOrderQty5, OfferOrderQty6, OfferOrderQty7, OfferOrderQty8, OfferOrderQty9) as maxAskQty
    from msg
    where LastPx>0, time(DateTime)>=09:30:00.000
    // result storage: append the transformed rows to the shared result table
    result.append!(tmp)
}
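The logic of dataETL can be mirrored in plain Python to check what each output row should contain. This is an analogue, not DolphinDB code: rows are modelled as dicts, data_etl and make_row are hypothetical names, and the statistics are assumed to be computed per snapshot across the 10 levels, as the result column names suggest. Each output row depends on exactly one input row, which is what makes the callback stateless.

```python
from datetime import datetime, time
from statistics import mean

LEVELS = range(10)
OPEN_TIME = time(9, 30)


def data_etl(result, msg):
    """Filter, convert and aggregate each row of one incoming batch."""
    for row in msg:
        ts = row["DateTime"]
        # Same filter as the where clause: LastPx > 0 and time >= 09:30:00.
        if row["LastPx"] <= 0 or ts.time() < OPEN_TIME:
            continue
        result.append({
            "SecurityID": row["SecurityID"],
            "TradeDate": ts.date(),
            "TradeTime": ts.time(),
            "avgBidPrice": mean(row[f"BidPrice{i}"] for i in LEVELS),
            "sumBidQty": sum(row[f"BidOrderQty{i}"] for i in LEVELS),
            "minAskPrice": min(row[f"OfferPrice{i}"] for i in LEVELS),
            "maxAskQty": max(row[f"OfferOrderQty{i}"] for i in LEVELS),
        })


# Level values taken from the simulated data in Step 5.
bid_px = [10.4, 10.3, 10.2, 10.1, 10.0, 9.9, 9.8, 9.7, 9.6, 9.5]
ask_px = [10.6, 10.7, 10.8, 10.9, 11.0, 11.1, 11.2, 11.3, 11.4, 11.5]
qty = [10000, 20000, 30000, 40000, 50000, 60000, 50000, 40000, 30000, 20000]


def make_row(ts, last_px):
    row = {"SecurityID": "000001SZ", "DateTime": ts, "LastPx": last_px}
    for i in LEVELS:
        row[f"BidPrice{i}"] = bid_px[i]
        row[f"BidOrderQty{i}"] = qty[i]
        row[f"OfferPrice{i}"] = ask_px[i]
        row[f"OfferOrderQty{i}"] = qty[i]
    return row


result = []
data_etl(result, [
    make_row(datetime(2023, 12, 15, 9, 29, 57), 10.5),  # before 09:30: dropped
    make_row(datetime(2023, 12, 15, 9, 30, 9), 10.5),   # kept
    make_row(datetime(2023, 12, 15, 9, 30, 12), 0.0),   # LastPx = 0: dropped
])
```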
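Note:
The parameter result of the custom callback function dataETL is the shared stream table that stores the processed data. It must be declared with mutable; otherwise it is a read-only variable and cannot be modified by operations such as append!.
Step 4: Subscribe to the publishing stream table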
subscribeTable(tableName="pubTable", actionName="dataETL", offset=-1, handler=dataETL{result}, msgAsTable=true, batchSize=2000, throttle=0.01, reconnect=true)
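Note:
The handler is the custom callback function dataETL, which takes two parameters, result and msg. result is a variable defined outside the function and must be bound in advance through partial application ({}); msg is the incremental data published by the subscribed stream table. It is supplied automatically on each call, so it does not need to be specified when passing the handler.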
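The role of partial application can be sketched with Python's functools.partial as an analogue of dataETL{result}. The names append_batch and deliver are hypothetical, and deliver is only a stand-in for the subscription engine, which calls the handler with a single argument: the new batch of messages.

```python
from functools import partial


def append_batch(result, msg):
    # Hypothetical minimal handler: copy the incoming batch into result.
    result.extend(msg)


def deliver(handler, batches):
    """Stand-in for the subscription engine: it passes only msg."""
    for batch in batches:
        handler(batch)


result = []
handler = partial(append_batch, result)  # analogous to dataETL{result}
deliver(handler, [[1, 2], [3]])
print(result)  # [1, 2, 3]
```

Binding result ahead of time leaves a one-argument function, which is exactly the shape the engine expects.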
Step 5: Simulate a batch data write
rowNums = 10
simulateData = table(
take(`000001SZ, rowNums) as SecurityID,
take(0..(rowNums-1), rowNums)*1000*3+2023.12.15T09:30:00.000 as DateTime,
take(10.6, rowNums) as PreClosePx,
take(10.8, rowNums) as OpenPx,
take(10.8, rowNums) as HighPx,
take(10.2, rowNums) as LowPx,
take(10.5, rowNums) as LastPx,
take(0..(rowNums-1), rowNums)*1000+100000 as TotalVolumeTrade,
take(0..(rowNums-1), rowNums)*1000*10.6+100000*10.6 as TotalValueTrade,
take(`s, rowNums) as InstrumentStatus,
take(10.4, rowNums) as BidPrice0,
take(10.3, rowNums) as BidPrice1,
take(10.2, rowNums) as BidPrice2,
take(10.1, rowNums) as BidPrice3,
take(10.0, rowNums) as BidPrice4,
take(9.9, rowNums) as BidPrice5,
take(9.8, rowNums) as BidPrice6,
take(9.7, rowNums) as BidPrice7,
take(9.6, rowNums) as BidPrice8,
take(9.5, rowNums) as BidPrice9,
take(10000, rowNums) as BidOrderQty0,
take(20000, rowNums) as BidOrderQty1,
take(30000, rowNums) as BidOrderQty2,
take(40000, rowNums) as BidOrderQty3,
take(50000, rowNums) as BidOrderQty4,
take(60000, rowNums) as BidOrderQty5,
take(50000, rowNums) as BidOrderQty6,
take(40000, rowNums) as BidOrderQty7,
take(30000, rowNums) as BidOrderQty8,
take(20000, rowNums) as BidOrderQty9,
take(10.6, rowNums) as OfferPrice0,
take(10.7, rowNums) as OfferPrice1,
take(10.8, rowNums) as OfferPrice2,
take(10.9, rowNums) as OfferPrice3,
take(11.0, rowNums) as OfferPrice4,
take(11.1, rowNums) as OfferPrice5,
take(11.2, rowNums) as OfferPrice6,
take(11.3, rowNums) as OfferPrice7,
take(11.4, rowNums) as OfferPrice8,
take(11.5, rowNums) as OfferPrice9,
take(10000, rowNums) as OfferOrderQty0,
take(20000, rowNums) as OfferOrderQty1,
take(30000, rowNums) as OfferOrderQty2,
take(40000, rowNums) as OfferOrderQty3,
take(50000, rowNums) as OfferOrderQty4,
take(60000, rowNums) as OfferOrderQty5,
take(50000, rowNums) as OfferOrderQty6,
take(40000, rowNums) as OfferOrderQty7,
take(30000, rowNums) as OfferOrderQty8,
take(20000, rowNums) as OfferOrderQty9)
tableInsert(pubTable, simulateData)
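To see which rows the query in Step 6 can match, the DateTime column can be reproduced in plain Python (an analogue of the DolphinDB expression, not DolphinDB code): row i is 09:30:00.000 shifted by i × 3000 milliseconds, so the 10 rows run from 09:30:00 to 09:30:27, and 09:30:09 is row index 3. Every simulated row has LastPx = 10.5 and a time no earlier than 09:30:00, so none is filtered out by dataETL.

```python
from datetime import datetime, timedelta

row_nums = 10
start = datetime(2023, 12, 15, 9, 30, 0)

# Mirrors take(0..(rowNums-1), rowNums)*1000*3 + 2023.12.15T09:30:00.000:
# row i is shifted by i * 3000 milliseconds.
date_times = [start + timedelta(milliseconds=i * 1000 * 3) for i in range(row_nums)]

print(date_times[0].time(), date_times[-1].time())  # 09:30:00 09:30:27

target = datetime(2023, 12, 15, 9, 30, 9)
print(date_times.index(target))  # 3
```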
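Step 6: Query the result table
res = select * from result where TradeTime=09:30:09.000
Returned result res:
As the result shows, the raw market data has been cleaned and transformed successfully.
Step 7: Cancel the subscription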
unsubscribeTable(tableName="pubTable", actionName="dataETL")
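Step 8: Drop the publishing stream table and the result stream table
Note:
All subscriptions to the publishing stream table must be cancelled before it can be dropped.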
dropStreamTable(tableName="pubTable")
dropStreamTable(tableName="result")
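The teardown order required by the note can be modelled with a toy registry in plain Python. StreamRegistry and its methods are hypothetical and only mimic the rule; they are not DolphinDB's API, and the real error message differs. Dropping a table that still has a subscriber fails, while unsubscribing first (Step 7) lets both drops in Step 8 succeed.

```python
class StreamRegistry:
    """Toy model: a stream table cannot be dropped while subscribed."""

    def __init__(self):
        self.subscriptions = {}  # table name -> set of action names

    def share(self, table):
        self.subscriptions.setdefault(table, set())

    def subscribe(self, table, action):
        self.subscriptions[table].add(action)

    def unsubscribe(self, table, action):
        self.subscriptions[table].discard(action)

    def drop(self, table):
        active = self.subscriptions[table]
        if active:
            raise RuntimeError(f"{table} still has subscriptions: {sorted(active)}")
        del self.subscriptions[table]


reg = StreamRegistry()
reg.share("pubTable")
reg.share("result")
reg.subscribe("pubTable", "dataETL")
try:
    reg.drop("pubTable")  # fails: dataETL is still subscribed
except RuntimeError as e:
    print(e)  # pubTable still has subscriptions: ['dataETL']
reg.unsubscribe("pubTable", "dataETL")  # Step 7
reg.drop("pubTable")                    # Step 8 now succeeds
reg.drop("result")
print(reg.subscriptions)  # {}
```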
