Functional Programming Cases

DolphinDB supports functional programming, a declarative programming style that solves problems by applying a series of functions to data. With higher-order functions, DolphinDB allows functions to be passed as parameters, enhancing code expressiveness, simplifying logic, and enabling complex tasks to be accomplished in just a few lines.

This tutorial explores common use cases of functional programming in DolphinDB, with a focus on higher-order functions and their applications.

1. Data Import Use Cases

In many CSV files, time is represented as integers — for example, 93100000 corresponds to “9:31:00.000”. To make querying and analysis more convenient, it’s recommended to convert such values into the TIME data type before storing them in a DolphinDB database.

To handle this, the loadTextEx function's transform parameter can be used to specify how to convert the time column during data import.

1.1 Converting Integer Timestamps to TIME Type

In the following example, we’ll use a sample CSV file named candle_201801.csv, with data like the following:

symbol,exchange,cycle,tradingDay,date,time,open,high,low,close,volume,turnover,unixTime
000001,SZSE,1,20180102,20180102,93100000,13.35,13.39,13.35,13.38,2003635,26785576.72,1514856660000
000001,SZSE,1,20180102,20180102,93200000,13.37,13.38,13.33,13.33,867181
......

(1) Creating the Database

Create a VALUE-partitioned database on dates:

login(`admin,`123456)
dataFilePath="/home/data/candle_201801.csv"
dbPath="dfs://DolphinDBdatabase"
db=database(dbPath,VALUE,2018.01.02..2018.01.30)

(2) Creating the Table

First, use the extractTextSchema function to infer the table schema from the CSV file. The “time” column is recognized as INT type. To store it as TIME, use an UPDATE statement to change its type in the schema, and then use this modified schema to create a partitioned table (partitioned by the “date” column).

schemaTB=extractTextSchema(dataFilePath)
update schemaTB set type="TIME" where name="time"
tb=table(1:0, schemaTB.name, schemaTB.type)
tb=db.createPartitionedTable(tb, `tb1, `date);

Note: Instead of using extractTextSchema, you may also define the schema manually.

(3) Importing the Data

First define a transformation function i2t to preprocess the “time” column, converting it from INT to TIME type. This function is then passed to loadTextEx using the transform parameter so the conversion happens automatically during import.

def i2t(mutable t){
    return t.replaceColumn!(`time, t.time.format("000000000").temporalParse("HHmmssSSS"))
}
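For example, 93100000 is zero-padded by format to the 9-character string "093100000", which temporalParse then reads as 09:31:00.000 according to the pattern "HHmmssSSS".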

Note: When modifying data inside a function, use in-place operations (functions ending with !) to boost performance.

Import the data using:

tmpTB=loadTextEx(dbHandle=db, tableName=`tb1, partitionColumns=`date, filename=dataFilePath, transform=i2t);

(4) Querying the Data

To verify the results, check the first two rows:

select top 2 * from loadTable(dbPath,`tb1);

symbol exchange cycle tradingDay date       time         open  high  low   close volume  turnover   unixTime
------ -------- ----- ---------- ---------- ------------ ----- ----- ----- ----- ------- ---------- -------------
000001 SZSE     1     2018.01.02 2018.01.02 09:31:00.000 13.35 13.39 13.35 13.38 2003635 2.678558E7 1514856660000
000001 SZSE     1     2018.01.02 2018.01.02 09:32:00.000 13.37 13.38 13.33 13.33 867181  1.158757E7 1514856720000

Complete Code

login(`admin,`123456)
dataFilePath="/home/data/candle_201801.csv"
dbPath="dfs://DolphinDBdatabase"
db=database(dbPath,VALUE,2018.01.02..2018.01.30)
schemaTB=extractTextSchema(dataFilePath)
update schemaTB set type="TIME" where name="time"
tb=table(1:0,schemaTB.name,schemaTB.type)
tb=db.createPartitionedTable(tb,`tb1,`date);

def i2t(mutable t){
    return t.replaceColumn!(`time,t.time.format("000000000").temporalParse("HHmmssSSS"))
}

tmpTB=loadTextEx(dbHandle=db,tableName=`tb1,partitionColumns=`date,filename=dataFilePath,transform=i2t);

1.2 Converting Integer Timestamps to NANOTIMESTAMP Type

In this example, we'll import data where nanosecond timestamps are stored as integers, and convert them into the NANOTIMESTAMP type. The example uses a text file named nx.txt, with sample data like this:

SendingTimeInNano#securityID#origSendingTimeInNano#bidSize
1579510735948574000#27522#1575277200049000000#1
1579510735948606000#27522#1575277200049000000#2
...

Each line uses # as the delimiter. The “SendingTimeInNano” and “origSendingTimeInNano” columns contain timestamps in nanosecond format.

(1) Creating the Database and Table

We start by defining a COMPO-partitioned DFS database db and a partitioned table nx with specified column names and types:

dbSendingTimeInNano = database(, VALUE, 2020.01.20..2020.02.22);
dbSecurityIDRange = database(, RANGE,  0..10001);
db = database("dfs://testdb", COMPO, [dbSendingTimeInNano, dbSecurityIDRange]);

nameCol = `SendingTimeInNano`securityID`origSendingTimeInNano`bidSize;
typeCol = [`NANOTIMESTAMP,`INT,`NANOTIMESTAMP,`INT];
schemaTb = table(1:0,nameCol,typeCol);

db = database("dfs://testdb");   // reopen the existing database by its path
nx = db.createPartitionedTable(schemaTb, `nx, `SendingTimeInNano`securityID);

(2) Importing the Data

To import the data, we define a transformation function that converts integer timestamps into the NANOTIMESTAMP type using the nanotimestamp function:

def dataTransform(mutable t){
  return t.replaceColumn!(`SendingTimeInNano, nanotimestamp(t.SendingTimeInNano)).replaceColumn!(`origSendingTimeInNano, nanotimestamp(t.origSendingTimeInNano))
}

Finally, we use loadTextEx to import the data.

Complete Code

dbSendingTimeInNano = database(, VALUE, 2020.01.20..2020.02.22);
dbSecurityIDRange = database(, RANGE,  0..10001);
db = database("dfs://testdb", COMPO, [dbSendingTimeInNano, dbSecurityIDRange]);

nameCol = `SendingTimeInNano`securityID`origSendingTimeInNano`bidSize;
typeCol = [`NANOTIMESTAMP,`INT,`NANOTIMESTAMP,`INT];
schemaTb = table(1:0,nameCol,typeCol);

db = database("dfs://testdb");   // reopen the existing database by its path
nx = db.createPartitionedTable(schemaTb, `nx, `SendingTimeInNano`securityID);

def dataTransform(mutable t){
  return t.replaceColumn!(`SendingTimeInNano, nanotimestamp(t.SendingTimeInNano)).replaceColumn!(`origSendingTimeInNano, nanotimestamp(t.origSendingTimeInNano))
}

pt=loadTextEx(dbHandle=db,tableName=`nx , partitionColumns=`SendingTimeInNano`securityID,filename="nx.txt",delimiter='#',transform=dataTransform);

For more on importing text files, see the user manual.

2. Lambda Expressions

In DolphinDB, you can define two types of functions: named functions and anonymous functions. Anonymous functions are often implemented as lambda expressions, which are concise function definitions containing just a single statement.

The following example uses the lambda expression x -> x + 1 as the input function for the each operation:

x = 1..10
each(x -> x + 1, x)

This lambda expression takes a value x and returns x + 1. When used with the each function, it applies this operation to every element in the vector 1 to 10.
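The result is the vector 2 3 4 5 6 7 8 9 10 11.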

Additional examples of lambda expressions will be introduced in the following chapters.

3. Use Cases of Higher-order Functions

3.1 cross (:C)

3.1.1 Applying a Function Pairwise to Two Vectors or Matrices

The cross function applies a binary function to every pair of elements from two input vectors or matrices (i.e., their Cartesian product). The pseudocode is as follows:

for(i:0~(size(X)-1)){
   for(j:0~(size(Y)-1)){
       result[i,j]=<function>(X[i], Y[j]);
   }
}
return result;
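As a quick illustration, applying mul to a small vector against itself produces a multiplication table (expected output shown in comments):

cross(mul, 1 2 3, 1 2 3)
// 1 2 3
// 2 4 6
// 3 6 9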

For example, computing a covariance matrix traditionally requires nested for loops, as shown below:

def matlab_cov(mutable matt){
	nullFill!(matt, 0.0)
	rowss, colss = matt.shape()
	msize = colss                      // the covariance matrix is colss x colss
	df = matrix(DOUBLE, msize, msize)
	for (r in 0..(msize-1)){
		for (c in 0..(msize-1)){
			df[r,c] = covar(matt[:,r], matt[:,c])
		}
	}
	return df
}

While the logic here is straightforward, the code is verbose, harder to maintain, and more error-prone.

In DolphinDB, you can use the higher-order function cross (or pcross for parallel computing) to achieve the same result in a much simpler way:

cross(covar, matt)

3.2 each (:E)

In some scenarios, we may need to apply a function to every element within a given parameter. Without functional programming, this often requires a for loop. DolphinDB offers higher-order functions such as each, peach, loop, and ploop, which simplify the code significantly.

3.2.1 Counting NULL Values in Each Column

In DolphinDB:

  • size returns the total number of elements in a vector or matrix.
  • count returns only the number of non-NULL elements.

Therefore, the difference between size and count gives the number of NULLs. For tables, we can efficiently count NULL values in each column using the higher-order function each:

each(x->x.size() - x.count(), t.values())

This works because t.values() returns a tuple containing all columns of table t, and the lambda expression x->x.size() - x.count() calculates the NULL count for each column.
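A minimal sketch with a two-column table, one NULL per column:

t = table(1 2 NULL 4 as id, NULL 2.5 3.3 4.4 as val)
each(x -> x.size() - x.count(), t.values())   // expected: [1, 1]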

3.2.2 Removing Rows Containing NULL values

Let's start by creating a sample table with some NULL values:

sym = take(`a`b`c, 110)
id = 1..100 join take(int(),10)
id2 =  take(int(),10) join 1..100
t = table(sym, id,id2)

There are two ways to remove rows with NULL values.

Method 1: Row-wise Filtering

This approach checks each row individually:

t[each(x -> !(isNull(x.id) || isNull(x.id2)), t)]

Note that for row operations on tables, each row is represented as a dictionary. In this expression, the lambda uses isNull to check the specified columns id and id2, keeping only rows where neither is NULL.

If the table has many columns and listing each one is impractical, use this expression instead:

t[each(x -> all(isValid(x.values())), t)]

This works by:

  • Getting all values in a row as a dictionary with x.values()
  • Checking for valid (non-NULL) values with isValid() in each row
  • Using all() to ensure every value in the row is valid

However, when working with large datasets, row-wise operations can be inefficient.

Method 2: Column-wise Filtering for Better Performance

Since DolphinDB uses columnar storage, column operations typically outperform row operations. We can leverage this with:

t[each(isValid, t.values()).rowAnd()]

each applies isValid to every column, producing a boolean matrix; rowAnd then marks the rows that contain no NULLs.

For extremely large datasets that might trigger the error:

The number of cells in a matrix can't exceed 2 billions.

Use reduce to perform iterative column-wise validation:

t[reduce(def(x,y){return x and isValid(y)}, t.values(), true)]

3.2.3 Performance Comparison: Row-wise vs Column-wise Processing

The following example demonstrates how to transform a column in a table by changing "aaaa_bbbb" to "bbbb_aaaa".

First, create a table t:

t=table(take("aaaa_bbbb", 1000000) as str);

There are two approaches to perform this transformation: row-wise and column-wise processing.

Row-wise Processing: Use the higher-order function each to iterate through each row, splitting the string and reversing the segments:

each(x -> split(x, '_').reverse().concat('_'), t[`str])

Column-wise Processing: Locate the position of the underscore and then use string operations on the entire column:

pos = strpos(t[`str], "_")
substr(t[`str], pos+1)+"_"+t[`str].left(pos)

When comparing performance between the two approaches, row-wise processing requires approximately 2.3 seconds to complete, while column-wise processing executes significantly faster at just 100 milliseconds.

Complete Code

t=table(take("aaaa_bbbb", 1000000) as str);

timer r = each(x -> split(x, '_').reverse().concat('_'), t[`str])

timer {
	pos = strpos(t[`str], "_")
	r = substr(t[`str], pos+1)+"_"+t[`str].left(pos)
}

3.2.4 Checking if Two Tables are Identical

To determine whether two tables contain exactly the same data, use the higher-order function each to compare each column in both tables. For example, for tables t1 and t2:

all(each(eqObj, t1.values(), t2.values()))
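For instance, two identical in-memory tables compare equal:

t1 = table(1 2 3 as id, `a`b`c as sym)
t2 = table(1 2 3 as id, `a`b`c as sym)
all(each(eqObj, t1.values(), t2.values()))   // expected: true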

3.3 loop (:U)

The higher-order functions loop and each are similar, but they differ in the format and type of their return values.

The each function determines the data form of its return based on the type and form of the result from each subtask:

  • If each subtask returns a scalar, each returns a vector.
  • If each subtask returns a vector, each returns a matrix.
  • If each subtask returns a dictionary, each returns a table.
  • If the return types or forms of the subtasks vary, each returns a tuple; otherwise, it returns a vector or matrix as appropriate.

For example:

m=1..12$4:3;
m;
each(add{1 2 3 4}, m);

This yields:

col1 col2 col3
2    6    10
4    8    12
6    10   14
8    12   16

On the other hand, loop always returns a tuple. For instance, to calculate the maximum value of each column:

t = table(1 2 3 as id, 4 5 6 as value, `IBM`MSFT`GOOG as name);
t;
loop(max, t.values());

This yields:

offset 0 1 2
0      3 6 MSFT

3.3.1 Importing Multiple Files

Suppose we have multiple CSV files with the same structure stored in a directory, and we want to import them into a single in-memory table in DolphinDB. We can use loop to achieve this:

loop(loadText, fileDir + "/" + files(fileDir).filename).unionAll(false)
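Here loop applies loadText to each file path and returns a tuple of in-memory tables, which unionAll(false) then merges into a single table. fileDir is assumed to be a directory containing only CSV files that share the same structure.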

3.4 moving / rolling

3.4.1 moving

This example shows how to use the moving function to determine whether the values of a column fall within a specified range.

For each record, the columns “UpAvgPrice” and “DownAvgPrice” define a range. We then look at the previous 20 values of the “close” column and check how many fall within the range [DownAvgPrice, UpAvgPrice]. If at least 75% of those values are within the range, the “signal” column is set to true; otherwise, it's false.

For instance, for the record where “trade_date” is 2019.06.17, the range is [11.5886533, 12.8061868]. We examine the 20 preceding “close” values; if at least 75% of them fall within this range, the corresponding “signal” value is set to true.

Solution:

We define a function rangeTest that checks whether at least 75% of the “close” values in a given window fall within the range [DownAvgPrice, UpAvgPrice]. Then we use the higher-order function moving to apply rangeTest to each sliding window in the dataset.

defg rangeTest(close, downlimit, uplimit){
  size = close.size() - 1
  return between(close.subarray(0:size), downlimit.last():uplimit.last()).sum() >= size*0.75
}

update t set signal = moving(rangeTest, [close, downAvgPrice, upAvgPrice], 21)

In this example:

  • Since we're checking the 20 rows before the current one, the window size is set to 21 (20 previous + current row).
  • The between function checks whether each value falls within the inclusive range given by the pair downlimit.last():uplimit.last(), i.e., the current row's DownAvgPrice and UpAvgPrice.

We can simulate market data to validate the script above. For example (n sets the number of simulated rows):

n = 100
t=table(rand("d"+string(1..n),n) as ts_code, nanotimestamp(2008.01.10+1..n) as trade_date, rand(n,n) as open, rand(n,n) as high, rand(n,n) as low, rand(n,n) as close, rand(n,n) as pre_close, rand(n,n) as change, rand(n,n) as pct_chg, rand(n,n) as vol, rand(n,n) as amount, rand(n,n) as downAvgPrice, rand(n,n) as upAvgPrice, rand(1 0, n) as signal)

Note on rolling vs moving

Both rolling and moving apply a function to a sliding window over a dataset. However, there are some key differences:

  • rolling allows you to set a custom step size (step), while moving always uses a step of 1.
  • Their behavior in handling NULL values differs. For more information, refer to Window Calculations in the user manual.

3.4.2 Performance Comparison: moving(sum) vs msum

Although DolphinDB provides the moving higher-order function, for performance-critical operations, it’s recommended to use specialized m- functions (such as msum, mcount, mavg, etc.) whenever possible.

Example:

x=1..1000000
timer moving(sum, x, 10)
timer msum(x, 10)

Depending on data size, msum can be 50 to 200 times faster than moving(sum).

Why the performance difference?

  • Data access: msum loads data into memory once and reuses it; moving(sum) creates a new sub-object for each window, which involves memory allocation and cleanup.
  • Computation method: msum computes incrementally, reusing the previous result by adding the value that enters the window and subtracting the value that leaves it. In contrast, moving(sum) recalculates the entire window each time, as the sketch below illustrates.
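The incremental idea can be sketched as a user-defined function (an illustration only; incrementalMsum is a hypothetical name, and DolphinDB's built-in msum is implemented natively):

def incrementalMsum(x, w){
    n = x.size()
    r = take(double(), n)          // result vector, initialized with NULLs
    s = 0.0
    for(i in 0:n){
        s += x[i]                  // add the value entering the window
        if(i >= w) s -= x[i - w]   // drop the value leaving the window
        if(i >= w - 1) r[i] = s    // emit once a full window is available
    }
    return r
}

incrementalMsum(1..10, 3) should match moving(sum, 1..10, 3).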

3.5 eachPre (:P)

In this case, we first create a table t with two columns: sym and bidPrice:

t = table(take(`a`b`c`d`e ,100) as sym, rand(100.0,100) as bidPrice)

We want to perform the following calculations:

  1. Compute a new column “ln”: This column stores the result of the following calculation: For each row, divide the current bidPrice by the average bidPrice of the previous 3 rows (excluding the current row), and then take the natural logarithm of the result.
  2. Compute a new column “clean” based on “ln”: This column filters out abnormal values in “ln”. The logic is:
    1. Take the absolute value of “ln”.
    2. If it exceeds a fluctuation threshold F, then use the previous “ln” value instead. Otherwise, treat the current value as normal and keep it.

Since the computation of “ln” involves a sliding window of size 3, we can refer to the example in section 3.4.1. The corresponding script is:

t2 = select *, log(bidPrice / prev(moving(avg, bidPrice,3))) as ln from t

However, for better performance, we can use built-in function mavg, which is more efficient than the moving higher-order function. The code can be rewritten in two equivalent ways:

//method 1
t2 = select *, log(bidPrice / prev(mavg(bidPrice,3))) as ln from t

//method 2
t22 = select *, log(bidPrice / mavg(prev(bidPrice),3)) as ln from t

Here, prev returns the data from the previous row.

Both approaches give equivalent results. The only difference is that in t22, a value is generated starting from the third row.

Now we clean the abnormal values in “ln”. Assuming the fluctuation threshold F is 0.02, we define a function cleanFun as follows:

F = 0.02
def cleanFun(F, x, y): iif(abs(x) > F, y, x)

In this function:

  • x is the current “ln” value.
  • y is the previous “ln” value.
  • If the absolute value of x exceeds F, the function returns y instead of x.

Then we use the higher-order function eachPre to apply this logic pairwise across the column. eachPre is equivalent to: F(X[0], pre), F(X[1], X[0]), ..., F(X[n], X[n-1]). The corresponding code:

t2[`clean] = eachPre(cleanFun{F}, t2[`ln])

Here is the complete script:

F = 0.02
t = table(take(`a`b`c`d`e ,100) as sym, rand(100.0,100) as bidPrice)
t2 = select *, log(bidPrice / prev(mavg(bidPrice,3))) as ln from t
def cleanFun(F,x,y) : iif(abs(x) > F, y,x)
t2[`clean] = eachPre(cleanFun{F}, t2[`ln])

3.6 byRow (:H)

This example demonstrates how to find the index of the maximum value in each row of a matrix. First, we define a matrix m as follows:

a1 = 2 3 4
a2 = 1 2 3
a3 = 1 4 5
a4 = 5 3 2
m = matrix(a1,a2,a3,a4)

One approach is to calculate the index of the maximum value for each row individually. This can be done using the imax function. By default, imax operates on each column of a matrix and returns a vector.

To compute the result by row instead, we can first transpose the matrix and then apply imax:

imax(m.transpose())

Alternatively, DolphinDB provides a higher-order function byRow, which applies a specified function to each row of a matrix. This avoids the need to transpose the matrix:

byRow(imax, m)

The same operation can also be performed using the built-in row function rowImax:

print rowImax(m)
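All three approaches return the vector [3, 2, 2]; for example, the first row of m is (2, 1, 1, 5), whose maximum value 5 sits at index 3.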

3.7 segmentby

segmentby is a higher-order function that applies a computation to segments of data, where each segment is defined by consecutive identical values in another vector. It has the following syntax:

segmentby(func, funcArgs, segment)

func specifies the operation to perform on each segment, funcArgs provides the data to be processed, and segment determines how the data is divided into groups. When consecutive elements in segment have the same value, they form a single group. Each group of values from funcArgs is processed independently by func, and the results are combined to produce an output vector with the same length as the original segment vector.

x = 1 2 3 0 3 2 1 4 5  
y = 1 1 1 -1 -1 -1 1 1 1  
segmentby(cumsum, x, y)

In the above example, y defines three groups: 1 1 1, -1 -1 -1, and 1 1 1. Based on this grouping, x is divided accordingly into: 1 2 3, 0 3 2, and 1 4 5. The cumsum function is then applied to each group of x, computing the cumulative sum within each segment and yielding 1 3 6 0 3 5 1 5 10.

DolphinDB also provides the built-in segment function, which is used for grouping in SQL queries. Unlike segmentby, it only returns group identifiers without performing calculations.

The following example shows how to group data based on a threshold value. Continuous values that are either above or below the threshold are grouped together. For groups where values exceed the threshold, we retain only the record with the maximum value within each group (if there are duplicates, only the first is kept).

Given the table below with the threshold set to 0.3, the rows dated 2021.09.03, 2021.09.08, and 2021.09.11 should be kept:

Define the table:

dated = 2021.09.01..2021.09.12  
v = 0 0 0.3 0.3 0 0.5 0.3 0.7 0 0 0.3 0  
t = table(dated as date, v)

To group the data based on whether values are continuously greater than or equal to the threshold minV, use the segment function:

minV = 0.3
segment(v >= minV)

In SQL, we can combine segment with the CONTEXT BY clause to perform group calculations. Then, filter for groups with the maximum value using the HAVING clause. Since filtering might return multiple rows, use the LIMIT clause to keep only the first matching record per group.

Here is the complete SQL query:

select * from t 
context by segment(v >= minV) 
having (v = max(v) and v >= minV) 
limit 1
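With minV = 0.3, the query should return:

date       v
---------- ---
2021.09.03 0.3
2021.09.08 0.7
2021.09.11 0.3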

3.8 pivot

The higher-order function pivot reorganizes data along two specified dimensions and returns the result as a matrix.

Suppose we have a table t1 with four columns:

syms=`600300`600400`600500$SYMBOL
sym=syms[0 0 0 0 0 0 0 1 1 1 1 1 1 1 2 2 2 2 2 2 2]
time=09:40:00+1 30 65 90 130 185 195 10 40 90 140 160 190 200 5 45 80 140 170 190 210
price=172.12 170.32 172.25 172.55 175.1 174.85 174.5 36.45 36.15 36.3 35.9 36.5 37.15 36.9 40.1 40.2 40.25 40.15 40.1 40.05 39.95
volume=100 * 10 3 7 8 25 6 10 4 5 1 2 8 6 10 2 2 5 5 4 4 3
t1=table(sym, time, price, volume);
t1;

We want to reorganize the data in t1 by the columns “time” (at the minute level) and “sym”, and compute the volume-weighted average price (VWAP) for each stock per minute.

To do this, we use the pivot function:

stockprice = pivot(wavg, [t1.price, t1.volume], minute(t1.time), t1.sym)
stockprice.round(2)
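The result is a matrix whose row labels are the minutes and whose column labels are the stock symbols, with each cell holding that stock's VWAP for that minute.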

3.9 contextby (:X)

The higher-order function contextby groups data based on specified column(s) and applies a given function within each group.

Example:

sym = `IBM`IBM`IBM`MS`MS`MS
price = 172.12 170.32 175.25 26.46 31.45 29.43
qty = 5800 700 9000 6300 2100 5300
trade_date = 2013.05.08 2013.05.06 2013.05.07 2013.05.08 2013.05.06 2013.05.07
contextby(avg, price, sym)
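This returns a vector of the same length as price in which every element is replaced by its group's average: 172.563333 for the three IBM rows and 29.113333 for the three MS rows.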

The contextby function can also be used in SQL queries. For example, the following query uses contextby to select trades where the price is above the group average:

t1 = table(trade_date, sym, qty, price)
select trade_date, sym, qty, price from t1 where price > contextby(avg, price, sym)

3.10 call / unifiedCall

When we need to apply different functions to the same set of inputs in batch, we can use the higher-order functions call/unifiedCall together with each/loop.

Note: call and unifiedCall serve the same purpose but differ in how they accept parameters. For details, refer to the function reference.

In the example below, the call function is used within a partial application to apply the functions sin and log to a fixed vector [1, 2, 3] using the higher-order function each:

each(call{, 1..3}, (sin, log));

Functions can also be invoked dynamically using metaprogramming with funcByName. The previous example can be rewritten as:

each(call{, 1..3}, (funcByName("sin"), funcByName("log")));

Note: This is just a simplified example to demonstrate the concept of dynamic function invocation using funcByName. In real-world applications, funcByName enables dynamic passing of functions at runtime.

You can also use makeCall or makeUnifiedCall to generate executable code representations of function calls, which can then be evaluated (using eval) later:

each(eval, each(makeCall{, 1..3}, (sin, log)));

3.11 accumulate (:A)

Given minute-level trading data, the goal is to split the data for a stock into segments such that each segment contains approximately 1.5 million shares traded. The length of each time window will vary depending on how many trading records are needed to reach this volume. The rule for segmentation is: if adding the current data point brings the total volume closer to the 1.5 million share target, include it in the current segment; otherwise, begin a new segment starting from that point.

Generate a sample dataset:

timex = 13:03:00+(0..27)*60
volume = 288658 234804 182714 371986 265882 174778 153657 201388 175937 138388 169086 203013 261230 398871 692212 494300 581400 348160 250354 220064 218116 458865 673619 477386 454563 622870 458177 880992
t = table(timex as time, volume)

We define a grouping function to accumulate volume and split segments when the cumulative volume gets closest to 1.5 million:

def calcCumVol(target, preResult, x){
    result = preResult + x
    if(result - target > target - preResult) return x
    else return result
}
accumulate(calcCumVol{1500000}, volume)

calcCumVol accumulates the volume values; when adding the current value would move the running total farther above 1.5 million than the previous total was below it, the segment is cut and a new group starts from the current value.

To identify the start of each group, we compare the accumulated volume with the original volume values. If they are equal, that point marks the start of a new group. We then use ffill() to forward-fill these start times:

iif(accumulate(calcCumVol{1500000}, volume) == volume, timex, NULL).ffill()

Finally, we group the data by these start times and perform aggregations.

output = select
    sum(volume) as sum_volume,
    last(time) as endTime
from t
group by iif(accumulate(calcCumVol{1500000}, volume) == volume, time, NULL).ffill() as startTime

Complete Script:

timex = 13:03:00+(0..27)*60
volume = 288658 234804 182714 371986 265882 174778 153657 201388 175937 138388 169086 203013 261230 398871 692212 494300 581400 348160 250354 220064 218116 458865 673619 477386 454563 622870 458177 880992
t = table(timex as time, volume)

def calcCumVol(target, preResult, x){
    result = preResult + x
    if(result - target > target - preResult) return x
    else return result
}

output = select sum(volume) as sum_volume, last(time) as endTime from t group by iif(accumulate(calcCumVol{1500000}, volume)==volume, time, NULL).ffill() as startTime

3.12 window

This example demonstrates how to use the window function to process a column in a table. The goal is to determine whether each value is the minimum among the 4 values before it, itself, and the 4 values after it (a 9-element window). If it is, mark it as 1; otherwise, mark it as 0.

First, create a table t:

t = table(rand(1..100,20) as id)

Next, apply the window function to define a sliding window of 9 elements centered on each row (4 before, the current one, and 4 after). Within this window, use the min function to find the minimum value. Note: The window function includes the boundary values in the window range.

The implementation is as follows:

select *, iif(id==window(min, id, -4:4), 1, 0) as mid from t

3.13 reduce (:T)

Some of the previous examples also made use of the higher-order function reduce. Here's the pseudocode for it:

result = <function>(init, X[0])
for (i = 1 to size(X) - 1) {
  result = <function>(result, X[i])
}
return result

Unlike accumulate, which returns all intermediate results, reduce only returns the final result.

For example, in the following factorial calculation:

r1 = reduce(mul, 1..10)
r2 = accumulate(mul, 1..10)[9]

Both r1 and r2 produce the same final result.

4. Partial Application Use Cases

Partial application is a technique where some of a function’s arguments are fixed in advance, creating a new function that takes fewer parameters. It’s especially useful with higher-order functions that expect functions with a specific number of arguments.

4.1 Scheduling a Parameterized Job

Suppose we need to set up a scheduled job that runs daily at midnight to calculate the maximum temperature recorded by a specific device on the previous day.

Assume that the temperature data is stored in a table named “sensor” located in the distributed database dfs://dolphindb. The table contains a timestamp column ts of type DATETIME. The following script defines a function getMaxTemperature to perform the required calculation:

def getMaxTemperature(deviceID){
    maxTemp = exec max(temperature) from loadTable("dfs://dolphindb","sensor")
              where ID=deviceID, date(ts) = today()-1
    return maxTemp
}

Once the function is defined, we can use the scheduleJob function to submit the scheduled job. However, since scheduleJob does not allow passing arguments to the job function directly, and getMaxTemperature requires deviceID as a parameter, we can use partial application to bind the argument in advance, effectively turning it into a no-argument function:

scheduleJob(`testJob, "getMaxTemperature", getMaxTemperature{1}, 00:00m, today(), today()+30, 'D');

This example schedules the job to calculate the maximum temperature for device ID 1.

The complete script looks like this:

def getMaxTemperature(deviceID){
    maxTemp = exec max(temperature) from loadTable("dfs://dolphindb","sensor")
              where ID=deviceID, date(ts) = today()-1
    return maxTemp
}

scheduleJob(`testJob, "getMaxTemperature", getMaxTemperature{1}, 00:00m, today(), today()+30, 'D');

4.2 Retrieving Job Information from Other Nodes

In DolphinDB, after submitting scheduled jobs, we can use the getRecentJobs function to check the status of recent batch jobs (including scheduled jobs) on the local node. For example, to view the statuses of the three most recent batch jobs on the local node, use the following script:

getRecentJobs(3);

To retrieve job information from other nodes in the cluster, use the rpc function to call getRecentJobs on the remote node. For example, to get job information from the node with the alias P1-node1, we can do the following:

rpc("P1-node1",getRecentJobs)

However, if we try to retrieve the three most recent jobs from P1-node1 using the following script, it will result in an error:

rpc("P1-node1",getRecentJobs(3))

This is because the second argument of the rpc function must be a function (either built-in or user-defined), not the result of a function call. To work around this, we can use partial application to bind the argument to getRecentJobs, creating a new function that can be passed to rpc:

rpc("P1-node1",getRecentJobs{3})

4.3 Using Stateful Functions in Stream Computing

In stream computing, users typically need to define a message processing function that handles incoming messages. Such a function accepts only a single argument, the incoming message (delivered as a table when msgAsTable=true), which makes it challenging to maintain state between processing events.

However, partial application offers a solution to this limitation. The following example uses partial application to define a stateful message handler called cumulativeAverage, which calculates the cumulative average of incoming data.

We first define a stream table trades. For each incoming message, we calculate the cumulative average of the price column and output the result to the table avgT. Here's the script:

share streamTable(10000:0,`time`symbol`price, [TIMESTAMP,SYMBOL,DOUBLE]) as trades
avgT=table(10000:0,[`avg_price],[DOUBLE])

def cumulativeAverage(mutable avgTable, mutable stat, trade){
   newVals = exec price from trade;

   for(val in newVals) {
      stat[0] = (stat[0] * stat[1] + val )/(stat[1] + 1)
      stat[1] += 1
      insert into avgTable values(stat[0])
   }
}

subscribeTable(tableName="trades", actionName="action30", handler=cumulativeAverage{avgT,0.0 0.0}, msgAsTable=true)

In the user-defined function cumulativeAverage, avgTable is the output table that stores the results. stat is a vector holding two values: stat[0] stores the current cumulative average, and stat[1] keeps track of the count of processed values. The function updates these values as it processes each message and inserts the updated average into the output table.

When subscribing to the stream table, we use partial application to pre-bind the first two parameters (avgTable and stat), allowing us to create a “stateful” function.
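To verify the handler, we can feed in two hypothetical ticks and then inspect avgT (subscription processing is asynchronous, so we wait briefly before querying):

insert into trades values(now(), `IBM, 10.0)
insert into trades values(now(), `IBM, 20.0)
sleep(100)            // give the subscription worker time to process
select * from avgT    // expected rows: 10.0, then (10 + 20) / 2 = 15.0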

5. Financial Use Cases

5.1 Downsampling Tick Data Using MapReduce

The following example demonstrates how to use the mr (MapReduce) function to convert high-frequency tick data into minute-level data.

In DolphinDB, we can use a SQL query to aggregate tick data into minute-level data:

minuteQuotes = select avg(bid) as bid, avg(ofr) as ofr from t group by symbol, date, minute(time) as minute

This can also be achieved with the mr (MapReduce) function, a core feature of DolphinDB's distributed computing framework.

Here is the full script:

login(`admin, `123456)
db = database("dfs://TAQ")
quotes = db.loadTable("quotes")

//create a new table quotes_minute
model = select top 1 symbol, date, minute(time) as minute, bid, ofr from quotes where date=2007.08.01, symbol=`EBAY
if(existsTable("dfs://TAQ", "quotes_minute"))
    db.dropTable("quotes_minute")
db.createPartitionedTable(model, "quotes_minute", `date`symbol)

//populate data for table quotes_minute
def saveMinuteQuote(t){
    minuteQuotes = select avg(bid) as bid, avg(ofr) as ofr from t group by symbol, date, minute(time) as minute
    loadTable("dfs://TAQ", "quotes_minute").append!(minuteQuotes)
    return minuteQuotes.size()
}

ds = sqlDS(<select symbol,date,time,bid,ofr from quotes where date between 2007.08.01 : 2007.08.31>)
timer mr(ds, saveMinuteQuote, +)

5.2 Data Replay and High-Frequency Factor Calculation

Stateful factors rely not only on current data but also on historical data for their calculations. Calculating such factors generally involves the following steps:

  1. Save the current batch of incoming messages.
  2. Compute the factor using the updated historical data.
  3. Write the computed factor to an output table.
  4. Optionally, clean up historical data that will no longer be needed.

In DolphinDB, message processing functions must be unary—they can only take the current message as input. To preserve historical state and still compute factors within the message handler, partial application is used. This technique fixes some parameters (to store state) and leaves one open for incoming messages. These fixed parameters are contained only within that specific message handler function.

The historical state can be stored in in-memory tables, dictionaries, or in-memory partitioned tables. This example uses a dictionary to store quote data and compute a factor using DolphinDB's streaming engines. For implementations using in-memory tables or in-memory partitioned tables, refer to Calculating High Frequency Factors in Real-Time.

Defining the Factor: We define a factor that calculates the ratio between the current ask price (askPrice1) and the ask price from 30 quotes ago.

The corresponding function is:

defg factorAskPriceRatio(x){
	cnt = x.size()
	if(cnt < 31) return double()
	else return x[cnt - 1] / x[cnt - 31]
}
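As a quick check, factorAskPriceRatio(1..40) returns 4 (the latest price 40 divided by the price 30 quotes earlier, 10), while any input with fewer than 31 elements yields a NULL double.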

Once the data is loaded and the stream table is set up, we can simulate a real-time stream processing scenario using the replay function.

quotesData = loadText("/data/ddb/data/sampleQuotes.csv")

x = quotesData.schema().colDefs
share streamTable(100:0, x.name, x.typeString) as quotes1

Setting Up the Historical State: We use a dictionary to store historical ask prices keyed by stock symbol:

history = dict(STRING, ANY)

Each key is a stock symbol (STRING), and the value is a vector holding that symbol's ask price history.

Message Handler: The message handler completes three key tasks: it updates the dictionary with the most recent ask price, calculates the factor for each unique stock symbol, and records the results in an output table.

Subscribe to the stream table and inject data into it using data replay. Each time a new record arrives, it triggers the factor calculation.

def factorHandler(mutable historyDict, mutable factors, msg){
	historyDict.dictUpdate!(
	    function=append!, 
	    keys=msg.symbol, 
	    parameters=msg.askPrice1, 
	    initFunc=x->array(x.type(), 0, 512).append!(x)
	)

	syms = msg.symbol.distinct()
	cnt = syms.size()
	v = array(DOUBLE, cnt)

	for(i in 0:cnt){
	    v[i] = factorAskPriceRatio(historyDict[syms[i]])
	}

	factors.tableInsert([take(now(), cnt), syms, v])
}

“historyDict” stores the price history; “factors” is the output table for storing calculated results.

Full Example

quotesData = loadText("/data/ddb/data/sampleQuotes.csv")

defg factorAskPriceRatio(x){
	cnt = x.size()
	if(cnt < 31) return double()
	else return x[cnt - 1]/x[cnt - 31]
}
def factorHandler(mutable historyDict, mutable factors, msg){
	historyDict.dictUpdate!(function=append!, keys=msg.symbol, parameters=msg.askPrice1, initFunc=x->array(x.type(), 0, 512).append!(x))
	syms = msg.symbol.distinct()
	cnt = syms.size()
	v = array(DOUBLE, cnt)
	for(i in 0:cnt){
	    v[i] = factorAskPriceRatio(historyDict[syms[i]])
	}
	factors.tableInsert([take(now(), cnt), syms, v])
}

x=quotesData.schema().colDefs
share streamTable(100:0, x.name, x.typeString) as quotes1
history = dict(STRING, ANY)
share streamTable(100000:0, `timestamp`symbol`factor, [TIMESTAMP,SYMBOL,DOUBLE]) as factors
subscribeTable(tableName = "quotes1", offset=0, handler=factorHandler{history, factors}, msgAsTable=true, batchSize=3000, throttle=0.005)

replay(inputTables=quotesData, outputTables=quotes1, dateColumn=`date, timeColumn=`time)

View the results:

select top 10 * from factors where isValid(factor)

5.3 Dictionary-Based Computation

In the following example, we create a table “orders” that contains some simple stock order data:

orders = table(`IBM`IBM`IBM`GOOG as SecID, 1 2 3 4 as Value, 4 5 6 7 as Vol)

Next, we create a dictionary where the key is the stock symbol and the value is a subtable of “orders” that contains only the records for that particular stock.

The dictionary is defined as:

historyDict = dict(STRING, ANY)

Then, we use the dictUpdate! function to update the values for each key in the dictionary:

historyDict.dictUpdate!(function=def(x,y){tableInsert(x,y);return x}, keys=orders.SecID, parameters=orders, initFunc=def(x){t = table(100:0, x.keys(), each(type, x.values())); tableInsert(t, x); return t})

The dictUpdate! process can be understood as iterating over each row in parameters, using the specified function to update the dictionary based on the corresponding key defined by keys.

If a key does not yet exist in the dictionary, the system will call initFunc to initialize the value for that key.
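The update logic can be sketched as follows (a conceptual illustration only, with a hypothetical name dictUpdateSketch; it assumes keys and parameters are vectors, whereas dictUpdate! passes each row of a table parameter as a dictionary):

def dictUpdateSketch(mutable d, f, keys, parameters, initFunc){
    for(i in 0:size(keys)){
        k = keys[i]
        p = parameters[i]
        if(k in d.keys())
            d[k] = f(d[k], p)      // existing key: combine the old value with the new record
        else
            d[k] = initFunc(p)     // new key: initialize the value from the first record
    }
    return d
}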

In this example, the dictionary's key is the stock code, and the value is a sub-table of orders. We use orders.SecID as the keys. For the update logic, we define an anonymous function that inserts the current record into the corresponding table:

def(x,y){tableInsert(x,y);return x}

Note: We wrap tableInsert in an anonymous function rather than simply passing function=tableInsert because tableInsert returns the number of rows inserted, not the updated table itself.

If we use function=tableInsert directly, the value in the dictionary would be overwritten with an integer (the row count). For example, when inserting the second record for IBM, the value would incorrectly become a number. Then, when inserting a third record, an error would be thrown because the value is no longer a table.

Initially, historyDict is empty. We use the initFunc parameter to initialize the value for a new key:

def(x){
  t = table(100:0, x.keys(), each(type, x.values()));
  tableInsert(t, x);
  return t
}
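Here x is the first row for a new key, passed in as a dictionary: x.keys() supplies the column names and x.values() the row values, so the function creates an empty table with the matching schema and inserts that first record.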

Complete code:

orders = table(`IBM`IBM`IBM`GOOG as SecID, 1 2 3 4 as Value, 4 5 6 7 as Vol)
historyDict = dict(STRING, ANY)
historyDict.dictUpdate!(function=def(x,y){tableInsert(x,y);return x}, keys=orders.SecID, parameters=orders,
            initFunc=def(x){t = table(100:0, x.keys(), each(type, x.values())); tableInsert(t, x); return t})

After execution, the contents of historyDict will be as follows:

GOOG->
Vol Value SecID
--- ----- -----
7   4     GOOG

IBM->
Vol Value SecID
--- ----- -----
4   1     IBM
5   2     IBM
6   3     IBM

6. Machine Learning Use Case: OLS Residuals

Create a sample table t as follows:

t=table(2020.11.01 2020.11.02 as date, `IBM`MSFT as ticker, 1.0 2 as past1, 2.0 2.5 as past3, 3.5 7 as past5, 4.2 2.4 as past10, 5.0 3.7 as past20, 5.5 6.2 as past30, 7.0 8.0 as past60)

We want to compute the regression residual between each row of the table and a benchmark vector benchX, and store the result in a new column.

The benchmark vector is defined as:

benchX = 10 15 7 8 9 1 2.0

DolphinDB provides an ordinary least squares (OLS) regression function called ols.

First, convert the relevant fields from the table into a matrix:

mt = matrix(t[`past1`past3`past5`past10`past20`past30`past60]).transpose()
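Transposing is necessary because each iterates over the columns of a matrix; after the transpose, each row of the original table becomes a column of mt.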

Then, define a function to compute the residuals:

def(y, x) {
    return ols(y, x, true, 2).ANOVA.SS[1]
}

Finally, use the higher-order function each with partial application to apply the residual computation function to each row:

t[`residual] = each(def(y, x){ return ols(y, x, true, 2).ANOVA.SS[1]}{,benchX}, mt)

Complete code:

t=table(2020.11.01 2020.11.02 as date, `IBM`MSFT as ticker, 1.0 2 as past1, 2.0 2.5 as past3, 3.5 7 as past5, 4.2 2.4 as past10, 5.0 3.7 as past20, 5.5 6.2 as past30, 7.0 8.0 as past60)
benchX = 10 15 7 8 9 1 2.0

mt = matrix(t[`past1`past3`past5`past10`past20`past30`past60]).transpose()
t[`residual] = each(def(y, x){ return ols(y, x, true, 2).ANOVA.SS[1]}{,benchX}, mt)

7. Summary

Beyond the functions and higher-order functions already discussed, DolphinDB provides a rich set of built-in libraries covering areas such as mathematics, statistics, distributions, hypothesis testing, machine learning, logic, strings, time, data manipulation, windowing, metaprogramming, distributed and stream computing, task scheduling, performance monitoring, and user access control. Together, these tools offer robust support for data analysis, modeling, and real-time computation in DolphinDB.