Stateless Operators

Overview

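A stateless operator performs stateless computation: its output depends only on its current input, not on any previous state or history. No matter what inputs were processed before, the same input always produces the same output.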
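The following small Python sketch (not DolphinDB script) contrasts a stateless function with a stateful one. The names notional and CumulativeVolume are hypothetical and chosen only for illustration: calling notional twice with the same input returns the same value, while the running total kept by CumulativeVolume changes from call to call.

```python
def notional(price: float, qty: int) -> float:
    # Stateless: the result depends only on the arguments of this call.
    return price * qty


class CumulativeVolume:
    # Stateful (for contrast): the result also depends on earlier calls.
    def __init__(self) -> None:
        self.total = 0

    def __call__(self, qty: int) -> int:
        self.total += qty
        return self.total


cum = CumulativeVolume()
print([notional(10.5, 100), notional(10.5, 100)])  # [1050.0, 1050.0]
print([cum(100), cum(100)])                         # [100, 200]
```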

Built-in Stateless Operators in DolphinDB

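DolphinDB provides more than 1,600 built-in functions, including a rich set of computational functions that let users implement complex data analysis and quantitative financial factors with little code. All built-in transformation functions can be used as stateless operators for stateless computation on streaming data.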
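One practical consequence of statelessness is that a stream can be cut into batches at any point without changing the output. The Python sketch below uses a hypothetical per-row function mid_price and a hypothetical run_in_batches helper standing in for a streaming engine, and checks that processing the data in batches of different sizes gives the same result as processing it all at once. It illustrates the property only; it is not how DolphinDB executes operators.

```python
from typing import Callable, Dict, List


def mid_price(row: Dict[str, float]) -> float:
    # Stateless per-row transform: best bid and best ask -> mid price.
    return (row["bid"] + row["ask"]) / 2


def run_in_batches(rows: List[Dict[str, float]],
                   f: Callable[[Dict[str, float]], float],
                   batch_size: int) -> List[float]:
    # Simulate a stream: apply f to each batch independently, then concatenate.
    out: List[float] = []
    for start in range(0, len(rows), batch_size):
        batch = rows[start:start + batch_size]
        out.extend(f(r) for r in batch)
    return out


rows = [{"bid": 10.0 + i, "ask": 11.0 + i} for i in range(7)]
whole = [mid_price(r) for r in rows]
for size in (1, 3, 7):
    # Any batch size yields the same output because mid_price keeps no state.
    assert run_in_batches(rows, mid_price, size) == whole
print(whole[:3])  # [10.5, 11.5, 12.5]
```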

Example: Data ETL with a Custom Callback Function

This example uses Level-2 stock snapshot data to show how to clean and transform raw market data.

Step 1: Create the publishing stream table

colNames = `SecurityID`DateTime`PreClosePx`OpenPx`HighPx`LowPx`LastPx`TotalVolumeTrade`TotalValueTrade`InstrumentStatus`BidPrice0`BidPrice1`BidPrice2`BidPrice3`BidPrice4`BidPrice5`BidPrice6`BidPrice7`BidPrice8`BidPrice9`BidOrderQty0`BidOrderQty1`BidOrderQty2`BidOrderQty3`BidOrderQty4`BidOrderQty5`BidOrderQty6`BidOrderQty7`BidOrderQty8`BidOrderQty9`OfferPrice0`OfferPrice1`OfferPrice2`OfferPrice3`OfferPrice4`OfferPrice5`OfferPrice6`OfferPrice7`OfferPrice8`OfferPrice9`OfferOrderQty0`OfferOrderQty1`OfferOrderQty2`OfferOrderQty3`OfferOrderQty4`OfferOrderQty5`OfferOrderQty6`OfferOrderQty7`OfferOrderQty8`OfferOrderQty9
colTypes = [SYMBOL, TIMESTAMP, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, INT, DOUBLE, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, INT, INT, INT, INT, INT, INT, INT, INT, INT, INT, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, INT, INT, INT, INT, INT, INT, INT, INT, INT, INT]
share(table=streamTable(1:0, colNames, colTypes), sharedName="pubTable")
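The 50 names in colNames and the 50 types in colTypes must line up position by position, which is easy to get wrong when typing them by hand. As a cross-check, this Python sketch rebuilds both lists from the 10 header fields plus four groups of 10 order-book levels; snapshot_schema is a hypothetical helper, not part of DolphinDB.

```python
from typing import List, Tuple

# Ten header columns, in the same order as colNames/colTypes in step 1.
HEADER: List[Tuple[str, str]] = [
    ("SecurityID", "SYMBOL"), ("DateTime", "TIMESTAMP"),
    ("PreClosePx", "DOUBLE"), ("OpenPx", "DOUBLE"), ("HighPx", "DOUBLE"),
    ("LowPx", "DOUBLE"), ("LastPx", "DOUBLE"), ("TotalVolumeTrade", "INT"),
    ("TotalValueTrade", "DOUBLE"), ("InstrumentStatus", "SYMBOL"),
]

# Four groups of order-book levels (level 0 is the best price).
LEVEL_GROUPS: List[Tuple[str, str]] = [
    ("BidPrice", "DOUBLE"), ("BidOrderQty", "INT"),
    ("OfferPrice", "DOUBLE"), ("OfferOrderQty", "INT"),
]


def snapshot_schema(levels: int = 10) -> Tuple[List[str], List[str]]:
    # Expand each group into prefix0..prefix(levels-1), keeping group order.
    pairs = HEADER + [(f"{prefix}{i}", typ)
                      for prefix, typ in LEVEL_GROUPS
                      for i in range(levels)]
    names = [n for n, _ in pairs]
    types = [t for _, t in pairs]
    return names, types


names, types = snapshot_schema()
print(len(names), len(types))  # 50 50
```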

Step 2: Create a shared stream table to store the processed data

colNames = ["SecurityID", "TradeDate", "TradeTime", "avgBidPrice", "sumBidQty", "minAskPrice", "maxAskQty"]
colTypes = [SYMBOL, DATE, TIME, DOUBLE, INT, DOUBLE, INT]
share(table=streamTable(1:0, colNames, colTypes), sharedName=`result)

Step 3: Define the callback function for data cleaning and transformation

def dataETL(mutable result, msg){
	// data ETL
	tmp = 	select 	SecurityID,
			date(DateTime) as TradeDate,
			time(DateTime) as TradeTime,
			rowAvg(BidPrice0, BidPrice1, BidPrice2, BidPrice3, BidPrice4, BidPrice5, BidPrice6, BidPrice7, BidPrice8, BidPrice9) as avgBidPrice,
			rowSum(BidOrderQty0, BidOrderQty1, BidOrderQty2, BidOrderQty3, BidOrderQty4, BidOrderQty5, BidOrderQty6, BidOrderQty7, BidOrderQty8, BidOrderQty9) as sumBidQty,
			rowMin(OfferPrice0, OfferPrice1, OfferPrice2, OfferPrice3, OfferPrice4, OfferPrice5, OfferPrice6, OfferPrice7, OfferPrice8, OfferPrice9) as minAskPrice,
			rowMax(OfferOrderQty0, OfferOrderQty1, OfferOrderQty2, OfferOrderQty3, OfferOrderQty4, OfferOrderQty5, OfferOrderQty6, OfferOrderQty7, OfferOrderQty8, OfferOrderQty9) as maxAskQty
		from msg
		where LastPx>0, time(DateTime)>=09:30:00.000
	// result storage
	result.append!(tmp)
}
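Note:

The parameter result of the custom callback function dataETL is the shared stream table that stores the processed data. It must be marked as mutable; otherwise it is a read-only variable and cannot be modified, for example by appending data with append!.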
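Each output record is meant to summarize a single snapshot, so every statistic is computed across the 10 levels of one row (row-wise), not down a column across rows. The Python sketch below models that per-row logic and the where filter, using the level values from the step 5 simulated data. The names etl_row and make_snapshot are hypothetical, and Python's sum, min and max stand in for DolphinDB's rowSum, rowMin and rowMax.

```python
from datetime import datetime, time
from typing import Dict, List, Optional

LEVELS = 10
# Level values copied from the step 5 simulated data.
BID_PX = [10.4, 10.3, 10.2, 10.1, 10.0, 9.9, 9.8, 9.7, 9.6, 9.5]
QTY = [10000, 20000, 30000, 40000, 50000, 60000, 50000, 40000, 30000, 20000]
ASK_PX = [10.6, 10.7, 10.8, 10.9, 11.0, 11.1, 11.2, 11.3, 11.4, 11.5]


def make_snapshot(dt: datetime, last_px: float) -> Dict:
    # Build one snapshot row as a dict keyed by the step 1 column names.
    snap: Dict = {"SecurityID": "000001SZ", "DateTime": dt, "LastPx": last_px}
    for i in range(LEVELS):
        snap[f"BidPrice{i}"] = BID_PX[i]
        snap[f"BidOrderQty{i}"] = QTY[i]
        snap[f"OfferPrice{i}"] = ASK_PX[i]
        snap[f"OfferOrderQty{i}"] = QTY[i]
    return snap


def etl_row(snap: Dict) -> Optional[Dict]:
    """Row-wise model of dataETL: one snapshot in, at most one record out."""
    ts: datetime = snap["DateTime"]
    # Same filter as the where clause: LastPx>0 and time >= 09:30:00.000.
    if snap["LastPx"] <= 0 or ts.time() < time(9, 30):
        return None

    def levels(prefix: str) -> List:
        return [snap[f"{prefix}{i}"] for i in range(LEVELS)]

    return {
        "SecurityID": snap["SecurityID"],
        "TradeDate": ts.date(),
        "TradeTime": ts.time(),
        "avgBidPrice": sum(levels("BidPrice")) / LEVELS,  # like rowAvg
        "sumBidQty": sum(levels("BidOrderQty")),          # like rowSum
        "minAskPrice": min(levels("OfferPrice")),         # like rowMin
        "maxAskQty": max(levels("OfferOrderQty")),        # like rowMax
    }


out = etl_row(make_snapshot(datetime(2023, 12, 15, 9, 30, 9), 10.5))
print(round(out["avgBidPrice"], 2), out["sumBidQty"], out["minAskPrice"], out["maxAskQty"])
# 9.95 350000 10.6 60000
```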

Step 4: Subscribe to the publishing stream table

subscribeTable(tableName="pubTable", actionName="dataETL", offset=-1, handler=dataETL{result}, msgAsTable=true, batchSize=2000, throttle=0.01, reconnect=true)
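Note:

The handler is the custom callback function dataETL, which takes two parameters (result, msg). result is a variable defined outside the function and must be bound through partial application with {}, as in dataETL{result}. msg is the incremental data published by the subscribed stream table; it is passed in automatically on each invocation and does not need to be specified in handler. Because msgAsTable=true, msg arrives as a table, which is why dataETL can query it with select.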
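Partial application in DolphinDB (dataETL{result}) fixes the first argument and leaves a function of msg alone, which is the one-argument shape the subscription calls. Python's functools.partial behaves analogously, so the sketch below uses it to show the pattern; data_etl is a hypothetical, much-simplified stand-in for dataETL. Python has no counterpart to mutable, so the bound list can always be extended in place, whereas DolphinDB requires the explicit mutable marker.

```python
from functools import partial
from typing import Dict, List


def data_etl(result: List[Dict], msg: List[Dict]) -> None:
    # Hypothetical, simplified stand-in for dataETL: filter the batch and
    # append the surviving records to the bound result list in place.
    result.extend({"SecurityID": r["SecurityID"], "LastPx": r["LastPx"]}
                  for r in msg if r["LastPx"] > 0)


result: List[Dict] = []
# Bind the first argument, like dataETL{result}; handler now takes only msg.
handler = partial(data_etl, result)

# A streaming engine would make these calls, passing only the new batch.
handler([{"SecurityID": "000001SZ", "LastPx": 10.5}])
handler([{"SecurityID": "000001SZ", "LastPx": 0.0},
         {"SecurityID": "000001SZ", "LastPx": 10.6}])
print(len(result))  # 2
```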

Step 5: Simulate a batch write

rowNums = 10
simulateData = table(
	take(`000001SZ, rowNums) as SecurityID,
	take(0..(rowNums-1), rowNums)*1000*3+2023.12.15T09:30:00.000 as DateTime,
	take(10.6, rowNums) as PreClosePx,
	take(10.8, rowNums) as OpenPx,
	take(10.8, rowNums) as HighPx,
	take(10.2, rowNums) as LowPx,
	take(10.5, rowNums) as LastPx,
	take(0..(rowNums-1), rowNums)*1000+100000 as TotalVolumeTrade,
	take(0..(rowNums-1), rowNums)*1000*10.6+100000*10.6 as TotalValueTrade,
	take(`s, rowNums) as InstrumentStatus,
	take(10.4, rowNums) as BidPrice0,
	take(10.3, rowNums) as BidPrice1,
	take(10.2, rowNums) as BidPrice2,
	take(10.1, rowNums) as BidPrice3,
	take(10.0, rowNums) as BidPrice4,
	take(9.9, rowNums) as BidPrice5,
	take(9.8, rowNums) as BidPrice6,
	take(9.7, rowNums) as BidPrice7,
	take(9.6, rowNums) as BidPrice8,
	take(9.5, rowNums) as BidPrice9,
	take(10000, rowNums) as BidOrderQty0,
	take(20000, rowNums) as BidOrderQty1,
	take(30000, rowNums) as BidOrderQty2,
	take(40000, rowNums) as BidOrderQty3,
	take(50000, rowNums) as BidOrderQty4,
	take(60000, rowNums) as BidOrderQty5,
	take(50000, rowNums) as BidOrderQty6,
	take(40000, rowNums) as BidOrderQty7,
	take(30000, rowNums) as BidOrderQty8,
	take(20000, rowNums) as BidOrderQty9,
	take(10.6, rowNums) as OfferPrice0,
	take(10.7, rowNums) as OfferPrice1,
	take(10.8, rowNums) as OfferPrice2,
	take(10.9, rowNums) as OfferPrice3,
	take(11.0, rowNums) as OfferPrice4,
	take(11.1, rowNums) as OfferPrice5,
	take(11.2, rowNums) as OfferPrice6,
	take(11.3, rowNums) as OfferPrice7,
	take(11.4, rowNums) as OfferPrice8,
	take(11.5, rowNums) as OfferPrice9,
	take(10000, rowNums) as OfferOrderQty0,
	take(20000, rowNums) as OfferOrderQty1,
	take(30000, rowNums) as OfferOrderQty2,
	take(40000, rowNums) as OfferOrderQty3,
	take(50000, rowNums) as OfferOrderQty4,
	take(60000, rowNums) as OfferOrderQty5,
	take(50000, rowNums) as OfferOrderQty6,
	take(40000, rowNums) as OfferOrderQty7,
	take(30000, rowNums) as OfferOrderQty8,
	take(20000, rowNums) as OfferOrderQty9)
tableInsert(pubTable, simulateData)
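In the simulated data, DateTime adds multiples of 3,000 milliseconds to 2023.12.15T09:30:00.000, so the 10 rows fall at 3-second intervals from 09:30:00 to 09:30:27. The Python sketch below reproduces that arithmetic with datetime and timedelta to show that all 10 rows pass the time(DateTime)>=09:30:00.000 filter and that exactly one row, the fourth, has the time 09:30:09.000 queried in step 6.

```python
from datetime import datetime, time, timedelta

row_nums = 10
base = datetime(2023, 12, 15, 9, 30, 0)
# Same arithmetic as (0..(rowNums-1))*1000*3 + 2023.12.15T09:30:00.000,
# where the integer offset is in milliseconds.
date_times = [base + timedelta(milliseconds=i * 1000 * 3) for i in range(row_nums)]

print(date_times[0].time(), date_times[-1].time())  # 09:30:00 09:30:27

# Rows kept by the time filter in dataETL.
passed = [dt for dt in date_times if dt.time() >= time(9, 30)]
print(len(passed))  # 10

# Row indexes matching the step 6 query time.
matches = [i for i, dt in enumerate(date_times) if dt.time() == time(9, 30, 9)]
print(matches)  # [3]
```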

Step 6: Query the result table

res = select * from result where TradeTime=09:30:09.000

The returned res shows that the raw market data has been successfully cleaned and transformed.

Step 7: Cancel the subscription

unsubscribeTable(tableName="pubTable", actionName="dataETL")

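Step 8: Drop the publishing and result stream tables

Note:

All subscriptions to a publishing stream table must be cancelled before the table can be dropped.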

dropStreamTable(tableName="pubTable")
dropStreamTable(tableName="result")
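The ordering rule in steps 7 and 8 (cancel subscriptions first, then drop tables) can be pictured with a toy registry. StreamRegistry is a hypothetical Python class, not DolphinDB's implementation: it simply refuses to drop a table that still has subscribers, mirroring the constraint stated in the note.

```python
from typing import Dict, Set


class StreamRegistry:
    """Toy model: a table cannot be dropped while it still has subscribers."""

    def __init__(self) -> None:
        self.subscribers: Dict[str, Set[str]] = {}

    def share(self, table: str) -> None:
        self.subscribers[table] = set()

    def subscribe(self, table: str, action: str) -> None:
        self.subscribers[table].add(action)

    def unsubscribe(self, table: str, action: str) -> None:
        self.subscribers[table].discard(action)

    def drop(self, table: str) -> None:
        if self.subscribers[table]:
            raise RuntimeError(
                f"{table} still has subscriptions: {sorted(self.subscribers[table])}")
        del self.subscribers[table]


reg = StreamRegistry()
reg.share("pubTable")
reg.share("result")
reg.subscribe("pubTable", "dataETL")

try:
    reg.drop("pubTable")                # dropping before unsubscribing is rejected
except RuntimeError as err:
    print(err)                          # pubTable still has subscriptions: ['dataETL']

reg.unsubscribe("pubTable", "dataETL")  # step 7
reg.drop("pubTable")                    # step 8 now succeeds
reg.drop("result")
print(sorted(reg.subscribers))          # []
```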