Calculating High-Frequency Factors in Real-Time

DolphinDB is a real-time analytics and stream processing platform powered by distributed time-series databases. It delivers high-performance solutions for real-time high-frequency factor calculation using its built-in streaming engines. This tutorial provides a systematic introduction to various high-frequency factor implementation methods in DolphinDB, covering stateless factors (which rely only on the latest data), sliding window factors (based on time-window aggregation), stateful factors (requiring historical state maintenance), and pipeline factors (involving cascaded stream processing). We will explore core components such as streaming subscription mechanisms, time-series aggregation engines, and reactive state engines, while sharing performance optimization techniques like dictionary optimization and parallel computing. These insights aim to empower developers to build low-latency, high-throughput analysis systems for high-frequency trading.

The sample code in this tutorial is compatible with DolphinDB version 1.20.0 and above.

1. Overview

DolphinDB’s stream processing framework consists of three key components: message publishers, brokers, and subscribers. Publishers and subscribers can be on local or remote data nodes, or they can be external applications using DolphinDB APIs (e.g., Python, C++, or Java). Message brokers are implemented through DolphinDB stream tables, where publishers insert messages that are then distributed to subscribers.

High-frequency factor calculations typically use trade and quote data as the input stream, and other types of reference data may also be incorporated. Messages input into streaming engines can be organized in tables or tuples, as specified by the msgAsTable parameter of the subscribeTable function. The output of factor calculations is usually routed to a stream table or an in-memory table for further processing.
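For example, the same per-message logic could be written against either format. Below is a minimal sketch; the output table out and the column positions assumed for the tuple case (0 = symbol, 1 = bidPrice1, 2 = askPrice1) are hypothetical and depend on the subscribed table's schema.

// msgAsTable=true: each batch arrives as a table, so SQL can be applied directly
def handlerAsTable(mutable out, msg){
	out.append!(select symbol, (bidPrice1+askPrice1)\2 as midPrice from msg)
}

// msgAsTable=false: each message arrives as a tuple, with fields accessed by position
def handlerAsTuple(mutable out, msg){
	out.tableInsert([msg[0], (msg[1]+msg[2])\2])
}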

2. Environment Setup

This tutorial uses a DolphinDB server deployed in standalone mode, with the configuration file (dolphindb.cfg) set up as follows:

mode=single
workerNum=4
maxPubConnections=8
subPort=20001
persistenceDir=dbCache
subThrottle=1
subExecutors=2

Parameter Explanations:

  • In standalone mode, the node acts as both publisher and subscriber. The maxPubConnections parameter is required for the publisher node.
  • To mitigate OOM risks, set the configuration parameter persistenceDir to enable stream table persistence and prevent memory exhaustion from unbounded stream table growth.
  • subThrottle=1 enables millisecond-level throttling (used in Section 5.1).
  • subExecutors and workerNum configure the number of subscription threads (Section 6) and the size of the global thread pool (Section 7.3), respectively.
  • For more details, refer to the user manual: Configuration > Reference > Streaming.

Download the sample CSV files used in this tutorial from here and save them to the yourDir directory before executing the scripts.

3. Stateless Factors

Stateless factors are calculated using only the most recent message without requiring any historical data. For stateless factor computation, it is recommended to use table format for messages and compute the factors using SQL statements.

Assume the input stream table contains market identifier columns including symbol, date, and time, along with order book data columns including 5-level quote prices and volumes.

Factor Definition

We define two stateless factors for demonstration.

  • Factor 1: Calculates (askVolume1-bidVolume1)/(askVolume1+bidVolume1) for each latest quote
    select symbol,time(now()) as time, (askVolume1-bidVolume1)/(askVolume1+bidVolume1) as factorValue from msg
  • Factor 2: Calculates the weighted bid-ask volume ratio
    w = exp(-1 * 0..4)
    select symbol, time(now()) as time, 0.5*log(rowSum([bidVolume1,bidVolume2,bidVolume3,bidVolume4,bidVolume5]*w)/rowSum([askVolume1,askVolume2,askVolume3,askVolume4,askVolume5]*w)) as factorValue from msg

To calculate these stateless factors in real-time, encapsulate the factor definition into a function and pass it as the handler of the subscribeTable function. The message handler can be a unary function, and its only parameter is the subscribed messages.

Example: Real-Time Calculation of Factor 2

quotesData = loadText(yourDir + "sampleQuotes.csv")

def factorHandler(mutable factor2, msg){
    w = exp(-1 * 0..4)
    x=select symbol, datetime(now()) as datetime, 0.5*log(rowSum([bidVolume1,bidVolume2,bidVolume3,bidVolume4,bidVolume5]*w)/rowSum([askVolume1,askVolume2,askVolume3,askVolume4,askVolume5]*w)) as factorValue from msg
    factor2.tableInsert(x)
}
factor2=table(1000000:0, `symbol`datetime`factorValue, [SYMBOL,DATETIME,DOUBLE]) 
x=quotesData.schema().colDefs
share(streamTable(1000000:0, x.name, x.typeString), "quotes")

subscribeTable(tableName="quotes", actionName="hfFactor", handler=factorHandler{factor2}, msgAsTable=true)

Use the replay function to replay the sample data to simulate real-time ingestion of market streams.

replay(inputTables=quotesData, outputTables=quotes, dateColumn=`date, timeColumn=`time)

You can then check the results in the factor2 table. Note that the datetime column in the results reflects processing time (now()) rather than the original message time; to stamp factors with event time instead, reference the input data's date and time columns.
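As an illustration, a variant of factorHandler could build the timestamp from the message itself. This is a minimal sketch assuming the sample quote schema, which includes date and time columns (as the replay call above suggests):

def factorHandlerEventTime(mutable factor2, msg){
	w = exp(-1 * 0..4)
	// use the message's own date and time columns instead of the processing time
	x = select symbol, datetime(concatDateTime(date, time)) as datetime, 0.5*log(rowSum([bidVolume1,bidVolume2,bidVolume3,bidVolume4,bidVolume5]*w)/rowSum([askVolume1,askVolume2,askVolume3,askVolume4,askVolume5]*w)) as factorValue from msg
	factor2.tableInsert(x)
}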

4. Sliding Window Factors

Sliding-window aggregation is a common technique in stream processing that operates over continuously advancing time windows, where adjacent windows may overlap. DolphinDB’s time-series engines are designed specifically for sliding-window aggregations at specified frequencies. These engines implement incremental optimizations for aggregate operators and support complex metric expressions, including combinations of aggregate functions such as <[avg(price1)-avg(price2)]>, or aggregate functions involving multiple columns such as <[std(price1-price2)]>.
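For instance, a sketch of an engine combining such metric expressions (the input columns price1 and price2 here are hypothetical) might look like:

dummy = table(1:0, `symbol`datetime`price1`price2, [SYMBOL,TIMESTAMP,DOUBLE,DOUBLE])
spreadStats = table(1:0, `datetime`symbol`avgSpread`stdSpread, [TIMESTAMP,SYMBOL,DOUBLE,DOUBLE])
// 1-minute windows computed every minute; both metrics are expressions over aggregate functions
demoEngine = createTimeSeriesEngine(name="demo_metrics", windowSize=60000, step=60000, metrics=<[avg(price1)-avg(price2), std(price1-price2)]>, dummyTable=dummy, outputTable=spreadStats, timeColumn=`datetime, keyColumn=`symbol)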

Example: Calculating minute-level OHLC

The following example uses the time-series engine to calculate minute-level OHLC.

tradesData = loadText(yourDir + "sampleTrades1.csv")

// define the stream table Trade
x=tradesData.schema().colDefs
share streamTable(100:0, x.name, x.typeString) as Trade

// define the output table OHLC
share streamTable(100:0, `datetime`symbol`open`high`low`close`volume`updatetime,[TIMESTAMP,SYMBOL,DOUBLE,DOUBLE,DOUBLE,DOUBLE,LONG,TIMESTAMP]) as OHLC

// define the time-series engine: calculate 5-minute OHLC
tsAggrOHLC = createTimeSeriesEngine(name="aggr_ohlc", windowSize=300000, step=60000, metrics=<[first(Price),max(Price),min(Price),last(Price),sum(Volume),now()]>, dummyTable=Trade, outputTable=OHLC, timeColumn=`Datetime, keyColumn=`Symbol)

// subscribe to the Trade data
subscribeTable(tableName="Trade", actionName="minuteOHLC1", offset=0, handler=append!{tsAggrOHLC}, msgAsTable=true)

replay(inputTables=tradesData, outputTables=Trade, dateColumn=`Datetime)

Query results:

select top 10 * from OHLC

Example: Defining Message Handler for Pre-Processing

In the example above, the stream table Trade is directly used as input to the engine. However, certain scenarios require preprocessing of streaming data before engine ingestion. For example, if the Volume column contains cumulative trading volumes from market open, we need to convert these to incremental volumes for proper aggregation. The following example defines the function calcVolume as a preprocessing handler function for data subscription. This function transforms cumulative volumes into delta volumes before feeding them to the aggregation engine. It maintains a dictionary, dictVol, to track each stock's previous cumulative volume for delta calculation.

When implementing such handlers, note that while the subscribeTable function expects a unary handler (accepting only the message parameter msg), we can bind additional parameters like dictVol and tsAggrOHLC through partial application.
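As a minimal illustration of partial application (the function addThree is hypothetical), fixing some parameters of a function yields a new function with fewer parameters:

def addThree(a, b, c){
	return a + b + c
}
g = addThree{1, 2}      // fix a=1 and b=2; g takes the remaining parameter c
g(10)                   // returns 13
h = addThree{, 2, 3}    // skip a, fix b=2 and c=3; h takes only a
h(10)                   // returns 15

This is exactly how calcVolume{dictVol, tsAggrOHLC} below turns the three-parameter function calcVolume into the unary handler required by subscribeTable.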

def calcVolume(mutable dictVolume, mutable tsAggrOHLC, msg){
	t = select Symbol, DateTime, Price, Volume from msg context by Symbol limit -1 
	update t set prevVolume = dictVolume[Symbol]
	dictVolume[t.Symbol] = t.Volume
	tsAggrOHLC.append!(t.update!("Volume", <Volume - prevVolume>).dropColumns!("prevVolume"))
}

tradesData = loadText(yourDir + "sampleTrades2.csv")

// define the stream table Trade
x=tradesData.schema().colDefs
share streamTable(100:0, x.name, x.typeString) as Trade

// define the output table OHLC
share streamTable(100:0, `datetime`symbol`open`high`low`close`volume`updatetime,[TIMESTAMP,SYMBOL,DOUBLE,DOUBLE,DOUBLE,DOUBLE,LONG,TIMESTAMP]) as OHLC

// define the time-series engine: calculate 5-minute OHLC
tsAggrOHLC = createTimeSeriesEngine(name="aggr_ohlc", windowSize=300000, step=60000, metrics=<[first(Price),max(Price),min(Price),last(Price),sum(Volume),now()]>, dummyTable=Trade, outputTable=OHLC, timeColumn=`Datetime, keyColumn=`Symbol)

// subscribe to the Trade data
dictVol = dict(STRING, DOUBLE)
subscribeTable(tableName="Trade", actionName="minuteOHLC2", offset=0, handler=calcVolume{dictVol,tsAggrOHLC}, msgAsTable=true)

replay(inputTables=tradesData, outputTables=Trade, dateColumn=`Datetime)

Query results:

select top 10 * from OHLC

Example: Chaining Streaming Engines

An alternative approach implements the volume conversion through an engine pipeline, chaining the reactive state engine and time-series engine. The reactive state engine provides built-in support for order-sensitive operations, allowing us to automatically compute volume differences using the deltas function. This eliminates the need for manual preprocessing while maintaining the same calculation accuracy.

tradesData = loadText(yourDir + "sampleTrades2.csv")

// define the stream table Trade
x=tradesData.schema().colDefs
share streamTable(100:0, x.name, x.typeString) as Trade

// define the output table OHLC
share streamTable(100:0, `datetime`symbol`open`high`low`close`volume`updatetime,[TIMESTAMP,SYMBOL,DOUBLE,DOUBLE,DOUBLE,DOUBLE,LONG,TIMESTAMP]) as OHLC

// define the time-series engine: calculate 5-minute OHLC
tsAggrOHLC = createTimeSeriesEngine(name="aggr_ohlc", windowSize=300000, step=60000, metrics=<[first(Price),max(Price),min(Price),last(Price),sum(Volume),now()]>, dummyTable=Trade, outputTable=OHLC, timeColumn=`Datetime, keyColumn=`Symbol)

// define the reactive state engine: preprocess volume
rsAggrOHLC = createReactiveStateEngine(name="calc_vol", metrics=<[Datetime, Price, deltas(Volume)]>, dummyTable=Trade, outputTable=tsAggrOHLC, keyColumn=`Symbol)

// subscribe to the Trade data
subscribeTable(tableName="Trade", actionName="minuteOHLC2", offset=0, handler=append!{rsAggrOHLC}, msgAsTable=true)

replay(inputTables=tradesData, outputTables=Trade, dateColumn=`Datetime)

Query results:

select top 10 * from OHLC

5. Stateful Factors

Stateful factor calculation requires both the latest record and maintained states that include previous records and intermediate results. These states are continuously updated and preserved within the streaming engines for ongoing processing. The stateful factor calculation workflow consists of three key steps: saving current batch messages to historical data, computing factors using the updated historical data, and writing results to the output table while purging obsolete records.

To maintain historical states and access them in message handlers, use partial application by defining a multi-parameter function where one parameter receives incoming messages while others store fixed, handler-specific states. States can be stored either in in-memory tables or dictionaries as demonstrated in the following sections.

5.1 Calculation Using In-Memory Tables

This example calculates the ratio between the current best ask price and the best ask price from 30 quotes earlier, requiring retention of at least 30 historical quotes per stock. An in-memory table 'history' is created to store all stock states.

quotesData = loadText(yourDir + "sampleQuotes.csv")

defg factorAskPriceRatio(x){
	cnt = x.size()
	if(cnt < 31) return double()
	else return x[cnt - 1]/x[cnt - 31]
}

def factorHandler(mutable history, mutable factors, msg){
	history.append!(select symbol, askPrice1 from msg)
	syms = msg.symbol
	t = select factorAskPriceRatio(askPrice1) as factor from history where symbol in syms group by symbol
	factors.append!(t.update!("timestamp", now()).reorderColumns!("timestamp"))
}

x=quotesData.schema().colDefs
share streamTable(100:0, x.name, x.typeString) as quotes1
history = table(1000000:0, `symbol`askPrice1, [SYMBOL,DOUBLE])
share streamTable(100000:0, `timestamp`symbol`factor, [TIMESTAMP,SYMBOL,DOUBLE]) as factors
subscribeTable(tableName = "quotes1", offset=0, handler=factorHandler{history, factors}, msgAsTable=true, batchSize = 3000, throttle=0.005)

replay(inputTables=quotesData, outputTables=quotes1, dateColumn=`date, timeColumn=`time)

Key code explanation:

  • The user-defined aggregate function factorAskPriceRatio is used to calculate the factor for each stock.
  • The function factorHandler is defined as the message handler:
    • history.append!(select symbol, askPrice1 from msg) saves the current batch of messages to the in-memory table history.
    • syms = msg.symbol extracts the stock symbols from the current messages.
    • The SQL statement with group by symbol calculates the factor for each stock in the batch.
    • factors.append!(...) writes the timestamped results to the factors table.
  • The parameters history and factors, which hold the historical states and the generated factors, are fixed through partial application (factorHandler{history, factors}).

Query results:

select top 10 * from factors where isValid(factor)

5.2 Calculation Using Partitioned In-Memory Tables

Partitioned in-memory tables enable parallel factor computation: a SQL query generates one task per partition, and the tasks are executed by the subscription thread together with the global worker threads. When the number of partitions exceeds workerNum, the maximum parallelism is workerNum + 1.

The SQL syntax for in-memory and partitioned in-memory tables is mostly the same, with the primary difference being the table creation.

history_model = table(1000000:0, `symbol`askPrice1, [SYMBOL,DOUBLE])
syms = format(600000..601000, "000000")
db = database(partitionType=VALUE, partitionScheme=syms)
history = db.createPartitionedTable(table=history_model, tableName=`history, partitionColumns=`symbol)

Note: The syms variable covers only the 1,001 stock symbols in the sample data. Modify the partition scheme as needed.

For value-partitioned tables with simple factor logic, you can compute the factor directly on each partition instead of using SQL. The following code redefines factorHandler: it uses the built-in function getTablet to retrieve the tablets (partitions) corresponding to the stock symbols in the current batch, then loops through each stock to calculate the factor. Although single-threaded, this looped approach can outperform SQL when working with a large number of small partitions.

def factorHandler(mutable history, mutable factors, msg){
	history.append!(select symbol, askPrice1 from msg)
	syms = msg.symbol
	tables = getTablet(history, syms)
	cnt = syms.size()
	v = array(DOUBLE, cnt)
	for(i in 0:cnt){
		v[i] = factorAskPriceRatio(tables[i].askPrice1)
	}
	factors.tableInsert([take(now(), cnt), syms, v])
}
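The subscription setup mirrors Section 5.1. A sketch, assuming the same sample data, the partitioned history table created above, and the shared factors table from Section 5.1:

x = quotesData.schema().colDefs
share streamTable(100:0, x.name, x.typeString) as quotes2
// reuse the shared factors table from Section 5.1 (or create it the same way if absent)
subscribeTable(tableName="quotes2", offset=0, handler=factorHandler{history, factors}, msgAsTable=true, batchSize=3000, throttle=0.005)

replay(inputTables=quotesData, outputTables=quotes2, dateColumn=`date, timeColumn=`time)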

5.3 Calculation Using Dictionaries

The following example uses a dictionary with STRING keys and tuple values, storing quote data per stock. The dictUpdate! function is used to update this dictionary, and then the factor for each stock is calculated in a loop. Since each stock's historical data is stored separately, no grouping is required, making this method highly efficient for large-scale or per-stock processing.

defg factorAskPriceRatio(x){
	cnt = x.size()
	if(cnt < 31) return double()
	else return x[cnt - 1]/x[cnt - 31]
}
def factorHandler(mutable historyDict, mutable factors, msg){
	historyDict.dictUpdate!(function=append!, keys=msg.symbol, parameters=msg.askPrice1, initFunc=x->array(x.type(), 0, 512).append!(x))
	syms = msg.symbol.distinct()
	cnt = syms.size()
	v = array(DOUBLE, cnt)
	for(i in 0:cnt){
	    v[i] = factorAskPriceRatio(historyDict[syms[i]])
	}
	factors.tableInsert([take(now(), cnt), syms, v])
}

x=quotesData.schema().colDefs
share streamTable(100:0, x.name, x.typeString) as quotes1
history = dict(STRING, ANY)
share streamTable(100000:0, `timestamp`symbol`factor, [TIMESTAMP,SYMBOL,DOUBLE]) as factors
subscribeTable(tableName = "quotes1", offset=0, handler=factorHandler{history, factors}, msgAsTable=true, batchSize=3000, throttle=0.005)

replay(inputTables=quotesData, outputTables=quotes1, dateColumn=`date, timeColumn=`time)

Query results:

select top 10 * from factors where isValid(factor)

5.4 Summary

Each method has distinct advantages suited to different scenarios based on factor complexity, data volume, and performance needs:

  • In-Memory Table: Simple to use and allows factor calculation via SQL. However, it has lower performance, especially for per-stock calculations.
  • Partitioned In-Memory Table: Enables parallel computation, offering better efficiency than regular in-memory tables, especially for larger datasets. It can handle both simple and complex factors, offering a balance between SQL-based calculations and direct partition-based calculations.
  • Dictionary: Most efficient for simple factors, excelling at both bulk and individual stock processing, though less suitable for complex factor calculations.

6. Factor Calculation Pipeline

Complex factor calculations often require pipeline processing. A common scenario involves first generating minute-level OHLC data from raw market feeds using a time-series engine, then computing stateful factors based on the OHLC results. This pipeline approach can be implemented by chaining streaming engines, where the output of one message handler becomes the input to another processing stage.

The following example demonstrates pipeline processing for factor calculation: first, calculate minute-level OHLC bars for each stock; then, based on each stock's latest 10 OHLC records, calculate the ratio of the net money flow (volume × price direction) over the first 5 records to that over the last 5 records.

tradesData = loadText(yourDir + "sampleTrades1.csv")

defg factorMoneyFlowRatio(x){
	n = x.size()
	if(n < 10) return double()
	else return x.subarray((n-10):(n-5)).sum()\x.subarray((n-5):n).sum()
}

def factorHandler2(mutable historyDict, mutable factors, msg){
	netAmount = exec volume * iif(close>=open, 1, -1) from msg
	dictUpdate!(historyDict, append!, msg.symbol, netAmount, x->array(x.type(), 0, 500).append!(x))
	syms = msg.symbol.distinct()
	cnt = syms.size()
	v = array(DOUBLE, cnt)
	for(i in 0:cnt){
	    v[i] = factorMoneyFlowRatio(historyDict[syms[i]])
	}
	factors.tableInsert([take(now(), cnt), syms, v])
}

// define the stream table Trade
x=tradesData.schema().colDefs
share streamTable(100:0, x.name, x.typeString) as Trade

// define the output table for OHLC
share streamTable(100:0, `datetime`symbol`open`high`low`close`volume`updatetime,[TIMESTAMP,SYMBOL,DOUBLE,DOUBLE,DOUBLE,DOUBLE,LONG,TIMESTAMP]) as OHLC

// define time-series engine: calculate 1-min OHLC
tsAggrOHLC = createTimeSeriesEngine(name="aggr_ohlc", windowSize=60000, step=60000, metrics=<[first(Price),max(Price),min(Price),last(Price),sum(Volume),now()]>, dummyTable=Trade, outputTable=OHLC, timeColumn=`Datetime, keyColumn=`Symbol)

// subscribe to the Trade data
subscribeTable(tableName="Trade", actionName="minuteOHLC3", offset=0, handler=append!{tsAggrOHLC}, msgAsTable=true)

// subscribe to the OHLC data and calculate factors
dictHistory = dict(STRING, ANY)
share streamTable(100000:0, `timestamp`symbol`factor, [TIMESTAMP,SYMBOL,DOUBLE]) as factors
subscribeTable(tableName="OHLC", actionName="calcMoneyFlowRatio", offset=0, handler=factorHandler2{dictHistory,factors}, msgAsTable=true)

replay(inputTables=tradesData, outputTables=Trade, dateColumn=`Datetime)

Query results:

select top 10 * from factors where isValid(factor)

For improved pipeline performance, each processing stage can be assigned to a separate thread for parallel execution. This is achieved through the hash parameter of subscribeTable: given N executor threads (configured via subExecutors), a subscription with a given hash is handled by thread hash % N (0-based). This threading model helps reduce overall processing latency, as sketched below.
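For example, the two subscriptions of the pipeline above could be pinned to different executor threads. A sketch, assuming subExecutors=2 so that hash values 0 and 1 map to different threads:

// stage 1: raw trades -> OHLC aggregation, handled by executor thread 0
subscribeTable(tableName="Trade", actionName="minuteOHLC3", offset=0, handler=append!{tsAggrOHLC}, msgAsTable=true, hash=0)
// stage 2: OHLC bars -> factor calculation, handled by executor thread 1
subscribeTable(tableName="OHLC", actionName="calcMoneyFlowRatio", offset=0, handler=factorHandler2{dictHistory,factors}, msgAsTable=true, hash=1)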

7. Performance Optimization

7.1 Data Pre-Grouping

Real-time factor calculation frequently requires processing grouped by stock. Pre-grouping data by stock can substantially reduce computation time, achievable through dictionaries and partitioned in-memory tables (as detailed in Section 5).

We tested three state storage methods with 4,000 stocks, each maintaining 300 historical records. The batch processing time (including data insertion, factor calculation, and result insertion) for 4,000 messages (one per stock) was measured as:

  • Regular in-memory tables: 180 ms
  • Partitioned in-memory tables: 32 ms
  • Dictionaries: 20 ms

This shows dictionaries and partitioned in-memory tables significantly enhance computation efficiency. However, two considerations arise when using these methods:

  • Handling New Stock Symbols: Intraday streaming may introduce new stocks not accounted for during initialization.
    • For dictionaries, we can use the initFunc parameter of the dictUpdate! function to specify an initialization function. For example, in the factorHandler definition, use the following lambda function to create a pre-allocated vector for 512 elements.
      x->array(x.type(), 0, 512).append!(x)
    • For value-partitioned in-memory tables, you can configure newValuePartitionPolicy=add to automatically create new value partitions.
  • Grouping by Multiple Fields: Currently, neither dictionaries nor partitioned in-memory tables support composite grouping keys. A workaround is to concatenate multiple fields into a single string key, as sketched below.
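A minimal sketch of the concatenation workaround, assuming the messages carry both symbol and exchange columns (the exchange column is hypothetical):

def factorHandlerMultiKey(mutable historyDict, mutable factors, msg){
	// build a composite key by concatenating the grouping fields
	keys = string(msg.symbol) + "_" + string(msg.exchange)
	historyDict.dictUpdate!(function=append!, keys=keys, parameters=msg.askPrice1, initFunc=x->array(x.type(), 0, 512).append!(x))
	// ... per-key factor calculation then proceeds as in Section 5.3 ...
}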

7.2 Balancing Latency and Throughput

High-frequency calculations demand low latency, but processing messages individually reduces throughput. The subscribeTable function provides two optional parameters to adjust this balance:

  • throttle: Maximum wait time (in seconds) before processing a batch of messages (minimum: 0.001s).
  • batchSize: Maximum number of messages accumulated before processing.

When either condition is met, unprocessed messages are sent to the task queue for handling by the specified message handler.
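For instance, a single subscription can trade off the two parameters. A sketch, assuming the quotes1 table and factorHandler from Section 5.1 (recall that throttle values below 1 second require subThrottle to be configured, as in Section 2):

// flush when 2000 messages have accumulated or 10 ms have elapsed, whichever comes first
subscribeTable(tableName="quotes1", actionName="balanced", offset=0, handler=factorHandler{history, factors}, msgAsTable=true, batchSize=2000, throttle=0.01)

Smaller values of batchSize and throttle favor latency; larger values favor throughput.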

7.3 Parallel Computing

Parallel computing increases system throughput, enabling more messages to be processed per unit of time. In streaming computations, parallelism can be implemented in two ways.

7.3.1 Subscription Thread Configuration

The number of threads handling subscribed messages can be specified using the configuration parameter subExecutors. By default, only one thread is used, meaning this single thread handles all subscribed messages sequentially. When multiple subscription threads are configured, two parallel modes are supported:

By default, each subscription is assigned to one thread, and a thread may handle multiple subscriptions. To explicitly assign a subscription to a specific thread, set the hash parameter in subscribeTable. This approach guarantees that a subscription's message handler is executed by only one thread, eliminating thread-safety and synchronization issues and offering high efficiency. However, if message volume and processing complexity vary significantly across subscriptions, some subscriptions may become processing bottlenecks.

The other mode uses the subscription thread pool. In this mode, all message processing tasks are placed into a single queue and dispatched to whichever threads are available, so the same subscription's handler may be executed concurrently by multiple threads. To ensure correctness, the message handler must be thread-safe, and the processing of individual messages should be logically independent. Among DolphinDB's data structures, only synchronized dictionaries and shared tables are thread-safe for concurrent access by multiple threads. To enable the thread pool mode, set the configuration parameter subExecutorPooling to true.
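A sketch of the pool mode, assuming the configuration file sets subExecutors=4 and subExecutorPooling=true. The handler below maintains an order-independent per-stock message count in a synchronized dictionary; both the handler and the statistic are hypothetical:

// a synchronized dictionary can be updated concurrently by multiple executor threads
msgCount = syncDict(STRING, LONG)

def countHandler(mutable msgCount, msg){
	// add 1 to each stock's counter; new stocks are initialized to 1
	msgCount.dictUpdate!(add, msg.symbol, take(1, msg.size()), x->1)
}

subscribeTable(tableName="quotes1", actionName="pooled", offset=0, handler=countHandler{msgCount}, msgAsTable=true)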

7.3.2 Global Thread Pooling

When processing numerous stocks or multiple factors within a message handler, task decomposition combined with multithreading can significantly reduce execution time. This approach utilizes both the current subscription thread and DolphinDB's global thread pool, with the latter configured through the workerNum parameter.

The following example modifies the factorHandler function from Section 5.3 for parallel execution. It encapsulates factor calculation and result insertion into a nested function, then splits the work by the number of stocks: if fewer than 400 stocks are involved, the processing runs as a single job; otherwise, it is split into two jobs. This optimization reduces processing time for 4,000 stocks from approximately 20 ms to about 13 ms. The cut function splits the stock list, and the higher-order function ploop executes the jobs in parallel.

def factorHandler(mutable historyDict, mutable factors, msg){
	historyDict.dictUpdate!(append!, msg.symbol, msg.ap, x->array(x.type(), 0, 512).append!(x))
	syms = msg.symbol.distinct()
	cnt = syms.size()
	f = def(syms, d, mutable factors){
		cnt = syms.size()
		signals = array(DOUBLE, cnt)
		for(i in 0:cnt)	signals[i] = factorAskPriceRatio(d[syms[i]])
		factors.tableInsert([take(now(), cnt), syms, signals])
	}
	if(cnt < 400) f(syms, historyDict, factors)
	else ploop(f{,historyDict, factors}, syms.cut(ceil(cnt/2.0)))
}

7.4 Just-In-Time Compilation

For message processing functions containing complex logic with loops and conditional statements that cannot be vectorized, DolphinDB's Just-In-Time (JIT) compilation version provides substantial performance improvements. For details, refer to DolphinDB Tutorial: JIT Compilation.

The following example demonstrates JIT compilation for a factor that requires examining the top 5 bid/ask quote levels and the corresponding order book volumes record by record. Since this processing cannot be vectorized, JIT compilation provides a substantial speedup.

@jit
def sum_diff(x, y){
    return 1.0 * (x-y)/(x+y)
}

@jit
def wbvol(bp, bv, price, jump) {
    return exp(0.6*(bp-price)/jump) * bv
}

@jit
def wavol(ap, av, price, jump) {
    return exp(0.6 * (price - ap)/jump) * av
}

@jit
def factor1(ap1, ap2, ap3, ap4, ap5, av1, av2, av3, av4, av5, bp1, bp2, bp3, bp4, bp5, bv1, bv2, bv3, bv4, bv5, mp, initMP, delta){
    n = ap1.size()
    re = array(DOUBLE, n)
    for(i in 0:n){
        jump = ceil(initMP[i] * 15.0 / 100.0) / 100.0
        w_av1 = 0.0
        w_bv1 = 0.0
        w_av1 = wavol(ap1[i], av1[i], mp[i], jump) + wavol(ap2[i], av2[i], mp[i], jump) + wavol(ap3[i], av3[i], mp[i], jump) + wavol(ap4[i], av4[i], mp[i], jump) + wavol(ap5[i], av5[i], mp[i], jump)
        w_bv1 = wbvol(bp1[i], bv1[i], mp[i], jump) + wbvol(bp2[i], bv2[i], mp[i], jump) + wbvol(bp3[i], bv3[i], mp[i], jump) + wbvol(bp4[i], bv4[i], mp[i], jump) + wbvol(bp5[i], bv5[i], mp[i], jump)
        if(delta[i]>0){
            re[i] = sum_diff(w_bv1*1.1, w_av1)
        }else{
            re[i] = sum_diff(w_bv1, w_av1 * 1.1)
        }
    }
    return re
}

// message handler
def factor1_handler(mutable d, mutable factor, msg){
    start = now(true)
    n = msg.size()
    // mid price of the current quotes
    mp = (msg.bidPrice1+msg.askPrice1)/2
    // append the latest mid prices to each stock's history
    dictUpdate!(d, append!, msg.symbol, mp)
    delta = array(DOUBLE, n)
    initMP = array(DOUBLE, n)
    sym = msg.symbol
    for(i in 0:n){
        &his_mp = d[sym[i]]
        initMP[i] = his_mp[0]
        delta[i] = nullFill(his_mp.tail()-his_mp.tail(5).avg(), 0)
    }
    factorValue = factor1(msg.askPrice1, msg.askPrice2, msg.askPrice3, msg.askPrice4, msg.askPrice5, msg.askVolume1, msg.askVolume2, msg.askVolume3, msg.askVolume4, msg.askVolume5, msg.bidPrice1, msg.bidPrice2, msg.bidPrice3, msg.bidPrice4, msg.bidPrice5, msg.bidVolume1, msg.bidVolume2, msg.bidVolume3, msg.bidVolume4, msg.bidVolume5, mp, initMP, delta)
    factor.tableInsert(take(start,n), take(now(true), n), sym, take("factor1", n), factorValue)
}

def clear(){
	try{
		unsubscribeTable(, `quotes, `act_factor)
		undef(`quotes, SHARED)
		undef(`factor_result, SHARED)
	}catch(ex){}
}

login("admin","123456")
clear()
go

quotesData = loadText(yourDir + "sampleQuotes.csv")

x=quotesData.schema().colDefs
share(streamTable(1000000:0, x.name, x.typeString), "quotes")

d = dict(STRING, ANY)
for(id in quotesData[`symbol].distinct())
     d[id]= array(DOUBLE,0,0)
dictUpdate!(d,append!, quotesData.symbol, (quotesData.bidPrice1+quotesData.askPrice1)/2)
share streamTable(100:0,`starttime`endtime`symbol`factorName`orderbook_factor_15, [LONG,LONG,SYMBOL,SYMBOL,DOUBLE]) as factor_result
subscribeTable(tableName="quotes", actionName="act_factor", offset=-1, handler=factor1_handler{d, factor_result}, msgAsTable=true);

replay(inputTables=quotesData, outputTables=quotes, dateColumn=`date, timeColumn=`time)

8. Stream Processing Debugging

8.1 Debugging Message Handlers

Message handlers are the core of stream processing, with two primary debugging approaches available:

(1) Debug the message handling function separately;

(2) Print logs within the function.

The main input to a message handler is the message itself, which can be either a table or a tuple. Debugging can be initiated by constructing appropriate test data structures matching these formats. For complex functions requiring step-by-step analysis, a recommended practice involves creating variables mirroring the function parameters and executing the function body sequentially. Using the factorHandler function as an example, we construct three test variables historyDict, factors, and msg:

k=4000
syms = string(1..k)
n = 1200000
min_history = table(take(syms, n) as symbol, rand(10.0, n) as ap)
msg = table(string(1..k) as symbol, rand(10.0, k) as ap)
historyDict = dict(STRING, ANY)
historyDict.dictUpdate!(append!, min_history.symbol, min_history.ap, x->array(x.type(), 0, 512).append!(x))
factors = streamTable(100000:0, `timestamp`symbol`factor, [TIMESTAMP,SYMBOL,DOUBLE])
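With these variables in place, the handler can be invoked directly and timed. A sketch, assuming the factorHandler definition from Section 7.3.2 (which reads the ap column used above):

// run the handler once against the constructed test message
factorHandler(historyDict, factors, msg)
// or measure the per-batch processing time with the timer statement
timer factorHandler(historyDict, factors, msg)
select top 10 * from factors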

An alternative debugging method involves inserting diagnostic logging using the writeLog function, with the output visible in DolphinDB's system logs.
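For instance, a few writeLog calls added to the handler can trace batch sizes in the server's system log (a sketch based on the Section 5.3 handler):

def factorHandlerLogged(mutable historyDict, mutable factors, msg){
	writeLog("factorHandler: received " + string(msg.size()) + " rows")
	historyDict.dictUpdate!(append!, msg.symbol, msg.askPrice1, x->array(x.type(), 0, 512).append!(x))
	// ... factor calculation and result insertion as in Section 5.3 ...
	writeLog("factorHandler: batch done")
}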

8.2 Backtesting With Historical Data

While this tutorial uses small sample datasets, production deployments should be backtested with substantial volumes of historical data. This validation helps identify potential issues in program logic, business logic, and system performance. DolphinDB provides the functions replay and replayDS for replaying tick data from historical databases; refer to the user manual for more information.
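For example, a full trading day could be replayed from a DFS database into the quotes stream table defined in Section 3. This is a sketch; the database path dfs://tickDB, the table name quotes, and the date used here are hypothetical:

// split one day of tick quotes from a DFS database into replay data sources
ds = replayDS(sqlObj=<select * from loadTable("dfs://tickDB", "quotes") where date = 2020.01.06>, dateColumn=`date, timeColumn=`time)
// replay the data sources into the quotes stream table at 10,000 rows per second
replay(inputTables=ds, outputTables=quotes, dateColumn=`date, timeColumn=`time, replayRate=10000)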