Factor Computing Platform with Python Celery and DolphinDB

Factor discovery is at the core of research and trading in quantitative finance. A traditional development workflow typically uses Python to read data from a relational database such as SQL Server and then performs factor calculations on it. As trading volumes and market data continue to grow, such platforms run into performance bottlenecks.

This tutorial focuses on improving factor computation performance by introducing DolphinDB as the core computing tool in a traditional Python-based platform. The proposed DolphinDB-based factor computing platform consists of a data synchronization module, a factor computing module, and a task scheduling module. It can provide business departments with real-time factor computation, batch processing, and historical factor query services. DolphinDB is well-suited for factor computation on high, medium, and low frequency data, and it offers extensive APIs and ETL tools that make it possible to adopt DolphinDB as the computational engine while retaining existing Python workflows. We will walk through the process of building a complete factor computing platform, using the first factor, WQAlpha1, in the WorldQuant 101 Alpha library as an example.

1. Architecture Overview

The unified factor computing platform built in this tutorial consists of the following key components:

  • Data synchronization module:

DataX is a widely used offline data synchronization tool. Here it migrates both the full historical data and the incremental data from the relational database SQL Server to DolphinDB.

  • Factor computing and data storage module:

DolphinDB serves as the core framework for factor computing and data storage. It provides function views that allow predefined functions to be invoked from a Python program via the DolphinDB Python API.

  • Task scheduling module:

Celery is an open-source distributed task queue that improves the performance and scalability of applications. It serves as the task scheduling framework, with Redis acting as both the message broker and the result backend.

  • Result presentation: the calculation results are returned as DataFrames and can be visualized in a web-based interface.

2. Environment Setup

A single-node cluster is deployed as the test environment for this tutorial:

Hardware     Description
--------     -----------
Host         cnserver9
IP           xxx.xxx.xxx.122
OS           Linux kernel 3.10 or higher
RAM          64 GB
CPU          x86_64 (12 cores)

Software                                                     Version
--------                                                     -------
DolphinDB                                                    V2.00.7
SQL Server                                                   2019 (latest release)
DataX                                                        3.0
JDK (required by DataX)                                      1.8.0_xxx
Maven (required by DataX)                                    3.6.1+
Python (required by DataX)                                   2.x
Celery                                                       4.3.0
Python (with NumPy, pandas, Celery, DolphinDB Python API)    3.7.9
Redis                                                        6.2.7

3. Platform Building

3.1 Table Structures

This tutorial uses the daily closing prices of several stocks from 2020.01.01 to 2021.01.01, with a total of 544,174 records. Below are the corresponding table structures in SQL Server and DolphinDB:

Column       Description      Data Type (SQL Server)    Data Type (DolphinDB)
------       -----------      ----------------------    ---------------------
SecurityID   stock ID         varchar                   SYMBOL
TradeDate    trade date       date                      DATE
Value        closing price    float                     DOUBLE

3.2 Metrics

This example calculates the factor WQAlpha1 defined in the DolphinDB WorldQuant 101 Alpha module.
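
For reference, Alpha#1 is defined in the paper 101 Formulaic Alphas as:

rank(Ts_ArgMax(SignedPower(((returns < 0) ? stddev(returns, 20) : close), 2.), 5)) - 0.5

The WQAlpha1 function in the module implements this logic. Note the cross-sectional rank in the final step: it is the reason the factor values in the sample output later in this tutorial fall within [-0.5, 0.5].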

3.3 Synchronizing Data from SQL Server to DolphinDB

Note: The following migration steps assume that the SQL Server database and table already exist. The DolphinDB server listens on port 8848.

(1) Creating DolphinDB databases and tables

Before importing data, the target database and table must be created on the DolphinDB server. Create the database dfs://tick_close and the table tick_close:

dbName = "dfs://tick_close"
tbName = "tick_close"
if(existsDatabase(dbName)){
	dropDatabase(dbName)
}
db = database(dbName, RANGE, date(datetimeAdd(2000.01M,0..50*12,'M')))
name = `SecurityID`TradeDate`Value
type = `SYMBOL`DATE`DOUBLE
schemaTable = table(1:0, name, type)
db.createPartitionedTable(table=schemaTable, tableName=tbName, partitionColumns=`TradeDate)
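
Alternatively, the same script can be submitted from Python through the DolphinDB Python API, so the entire setup is driven from one place. A minimal sketch, assuming the server runs on 127.0.0.1:8848 with the default admin account:

import dolphindb as ddb

s = ddb.session()
s.connect("127.0.0.1", 8848, "admin", "123456")
# Submit the DDL script above to the DolphinDB server
s.run("""
dbName = "dfs://tick_close"
if(existsDatabase(dbName)){
    dropDatabase(dbName)
}
db = database(dbName, RANGE, date(datetimeAdd(2000.01M,0..50*12,'M')))
schemaTable = table(1:0, `SecurityID`TradeDate`Value, `SYMBOL`DATE`DOUBLE)
db.createPartitionedTable(table=schemaTable, tableName="tick_close", partitionColumns=`TradeDate)
""")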

(2) Configuring data import

A DataX import job is driven by a .json configuration file that specifies the source and target details. Each table to be synchronized generally has its own configuration file. The configuration for the table tick_close is as follows:

{
    "job": {
        "content": [
            {
                "writer": {
                    "parameter": {
                        "dbPath": "dfs://tick_close",
                        "tableName": "tick_close",
                        "batchSize": 100,
                        "userId": "admin",
                        "pwd": "123456",
                        "host": "127.0.0.1",
                        "table": [
                            {
                                "type": "DT_SYMBOL",
                                "name": "SecurityID"
                            },
                            {   "type": "DT_DATE",
                                "name": "TradeDate"
                            },
                            {
                                "type": "DT_DOUBLE",
                                "name": "Value"
                            }
],
                        "port": 8848
                    },
                    "name": "dolphindbwriter"
                },
                "reader": {
                    "name": "sqlserverreader",
                    "parameter": {
                        "username": "SA",
                        "password": "Sa123456",
                        "column": [
                            "*"
                        ],
                        "connection": [
                            {
                                "table": [
                                    "tick_close"
                                ],
                                "jdbcUrl": [
                                "jdbc:sqlserver://127.0.0.1:1234;DatabaseName=tick_close"
                                ]
                                
                            }
                        ]
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": 1
            }
        }
    }
}

Note: This tutorial performs only a full synchronization of historical data. To synchronize incremental data, two additional parameters, saveFunctionName and saveFunctionDef, must be added to the writer configuration.

(3) Running data import

Go to the bin directory of DataX and execute the following command to import data into the table tick_close:

python datax.py ../conf/tick_close.json

Parameters:

  • datax.py (required): the startup script of DataX.
  • ../conf/tick_close.json (required): the path to the configuration file.

Expected output:

2022-12-14 00:50:04.125 [job-0] INFO  JobContainer -
Start Time                      : 2022-12-14 00:49:33
End Time                        : 2022-12-14 00:50:04
Elapsed Time                    :                 30s
Average Flow                    :          555.88KB/s
Write Speed                     :          18139rec/s
Total Read Records              :              544174
Total Failed Attempts           :                   0
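
To verify the migration, compare the record count in DolphinDB against the 544,174 rows in the source table. A quick check through the DolphinDB Python API (connection details as above):

import dolphindb as ddb

s = ddb.session()
s.connect("127.0.0.1", 8848, "admin", "123456")
# Count the rows written to the partitioned table
count = s.run("exec count(*) from loadTable('dfs://tick_close', 'tick_close')")
print(count)  # expected: 544174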

3.4 Scheduling Factor Computing With Celery

This section describes how to use Celery to schedule factor computation tasks and invoke them asynchronously.

3.4.1 Setting up Redis for message brokering and result backend

The Celery framework requires a message broker to deliver task messages, and a result backend to store execution results. We recommend Redis for both roles; it listens on port 6379 in this example. You can substitute other tools and configurations as needed.
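
Before wiring up Celery, it is worth confirming that Redis is reachable on that port. A minimal check with the redis-py client, assuming a local instance with no password:

import redis

# db 1 will serve as the Celery broker and db 2 as the result backend
r = redis.Redis(host="localhost", port=6379, db=1)
print(r.ping())  # True if Redis is up and accepting connections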

3.4.2 Implementing factor computing

First, define the factor calculation on the DolphinDB server. The function wrapping WQAlpha1 is implemented as follows:

use wq101alpha
defg get_alpha1(security_id, begin_date, end_date){
    if (typestr(security_id) == 'STRING VECTOR' && typestr(begin_date) == `DATE && typestr(end_date) == `DATE){
        tick_list = select * from loadTable("dfs://tick_close", "tick_close") where TradeDate >= begin_date and TradeDate <= end_date and SecurityID in security_id
        alpha1_list = WQAlpha1(panel(tick_list.TradeDate, tick_list.SecurityID, tick_list.Value))
        return table(alpha1_list.rowNames() as TradeDate, alpha1_list)
    }
    else {
        print("What you have entered is a wrong type")
        return `NULLValue
    }
}

Input parameters:

Parameter     Required   Description     DolphinDB Data Type
---------     --------   -----------     -------------------
security_id   Yes        stock IDs       STRING VECTOR
begin_date    Yes        begin date      DATE
end_date      Yes        end date        DATE

The function returns an in-memory table containing the WQAlpha1 factor values of the specified stocks over the given time range. If the input types are invalid, the string NULLValue is returned instead.

A Python program connects to DolphinDB through the Python API and invokes predefined functions. Since these functions are defined on the server in a separate session, we add them as function views and grant the user the privilege to execute them, which makes them callable from Python.

// add the function as a function view
addFunctionView(get_alpha1)
// grant a user xxx the execution privilege
grant("xxx", VIEW_EXEC, "get_alpha1")

3.4.3 Scheduling Tasks with Celery

Execute the following pip command to install Celery:

pip install celery==4.3.0 && pip install redis==3.2.0

To avoid errors such as TypeError: __init__() got an unexpected keyword argument 'username' when using Celery, it is recommended to uninstall the kombu library installed by default with Celery and install version 5.1.0 instead.

After all required libraries are installed, build the project directory and files:

mkdir celery_project && touch celery_project/tasks.py celery_project/app.py

Execute tree ./celery_project to check the directory tree:

./celery_project
├── app.py
└── tasks.py

0 directories, 2 files

  • The tasks.py file constructs a session to DolphinDB and declares the function invocation as an asynchronous task to be scheduled.

    Import required Python packages:

    from celery import Celery
    import dolphindb as ddb
    import numpy as np
    import pandas as pd
    from datetime import datetime

    Construct a session object:

    s = ddb.session()
    s.connect("127.0.0.1", 8848, "admin", "123456")

    Instantiate a Celery object and set up its configuration:

    app = Celery(
        'celeryApp',
        broker='redis://localhost:6379/1',
        backend='redis://localhost:6379/2'
    )
    app.conf.update(
        task_serializer='pickle',
        accept_content=['pickle'], 
        result_serializer='pickle',
        timezone='Asia/Shanghai',
        enable_utc=True,
    )

    The factor calculation passes datetime parameters in and returns a DataFrame. Since the default json serializer does not support these data types, the parameters task_serializer, accept_content, and result_serializer must be set to pickle.

    To declare the factor calculation as an asynchronous task, we encapsulate it in a function and decorate it with @app.task():

    @app.task()
    def get_alpha1(security_id, begin_date, end_date):
        # Forward the call to the DolphinDB function view and return its result
        return s.run("get_alpha1", security_id, begin_date, end_date)

    Note: Python data types are used here for parameter passing. Refer to the DolphinDB Python API Reference Guide for the mappings between Python and DolphinDB data types.

  • The file app.py uses a for loop to call Celery's delay() function, sending two task requests and printing each task id.

    import numpy as np
    from tasks import get_alpha1

    security_id_list = [["600020", "600021"], ["600022", "600023"]]

    if __name__ == '__main__':
        # Send one asynchronous task per stock group and print its task id
        for i in security_id_list:
            result = get_alpha1.delay(i, np.datetime64('2020-01-01'), np.datetime64('2020-01-31'))
            print(result)

3.4.4 Processing Tasks with Celery

Run the Celery worker server to process tasks:

celery -A tasks worker --loglevel=info

Expected output:

 -------------- celery@cnserver9 v4.3.0 (rhubarb)
---- **** -----
--- * ***  * -- Linux-3.10.0-1160.53.1.el7.x86_64-x86_64-with-centos-7.9.2009-Core 2022-11-11 00:10:34
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         celeryApp:0x7f597a1d4e48
- ** ---------- .> transport:   redis://localhost:6379/1
- ** ---------- .> results:     redis://localhost:6379/2
- *** --- * --- .> concurrency: 64 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery


[tasks]
  . tasks.get_alpha1

[2022-11-11 00:10:37,413: INFO/MainProcess] Connected to redis://localhost:6379/1
[2022-11-11 00:10:37,437: INFO/MainProcess] mingle: searching for neighbors
[2022-11-11 00:10:38,465: INFO/MainProcess] mingle: all alone
[2022-11-11 00:10:38,488: INFO/MainProcess] celery@cnserver9 ready.

After this command is run, the worker stays in the foreground in interactive mode. Open a new terminal session on the machine to continue.

Go to the Celery project directory and execute the following command to send the asynchronous invocation requests:

python3 app.py

Expected output:

400a3024-65a1-4ba6-b8a9-66f6558be242
cd830360-e866-4850-aba0-3a07e8738f78

Switching back to the worker terminal, we can observe the execution status of the asynchronous tasks:

 -------------- celery@cnserver9 v4.3.0 (rhubarb)
---- **** -----
--- * ***  * -- Linux-3.10.0-1160.53.1.el7.x86_64-x86_64-with-centos-7.9.2009-Core 2022-11-11 00:10:34
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         celeryApp:0x7f597a1d4e48
- ** ---------- .> transport:   redis://localhost:6379/1
- ** ---------- .> results:     redis://localhost:6379/2
- *** --- * --- .> concurrency: 64 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery


[tasks]
  . tasks.get_alpha1

[2022-11-11 00:10:37,413: INFO/MainProcess] Connected to redis://localhost:6379/1
[2022-11-11 00:10:37,437: INFO/MainProcess] mingle: searching for neighbors
[2022-11-11 00:10:38,465: INFO/MainProcess] mingle: all alone
[2022-11-11 00:10:38,488: INFO/MainProcess] celery@cnserver9 ready.
[2022-11-11 00:12:44,365: INFO/MainProcess] Received task: tasks.get_alpha1[400a3024-65a1-4ba6-b8a9-66f6558be242]
[2022-11-11 00:12:44,369: INFO/MainProcess] Received task: tasks.get_alpha1[cd830360-e866-4850-aba0-3a07e8738f78]
[2022-11-11 00:12:44,846: INFO/ForkPoolWorker-63] Task tasks.get_alpha1[400a3024-65a1-4ba6-b8a9-66f6558be242] succeeded in 0.04292269051074982s:    TradeDate  600020  600021
0  2020-01-01     NaN     NaN
1  2020-01-02     NaN     NaN
2  2020-01-03     NaN     NaN
3  2020-01-06     NaN     NaN
4  2020-01-07     0.5     0.0
5  2020-01-08     0.5     0.0
6  2020-01-09     0.0     0.5
7  2020-01-10     0.0     0.5
8  2020-01-13     0.0     0.5
9  2020-01-14     0.0     0.5
10 2020-01-15     0.5     0.0
11 2020-01-16     0.5     0.0
12 2020-01-17     0.5     0.0
13 2020-01-20     0.5     0.0
14 2020-01-21     0.0     0.5
15 2020-01-22     0.5     0.0
16 2020-01-23     0.5     0.0
17 2020-01-24     0.5     0.0
18 2020-01-27     0.5     0.0
19 2020-01-28     0.0     0.5
20 2020-01-29     0.0     0.5
21 2020-01-30     0.0     0.5
22 2020-01-31     0.0     0.5

[2022-11-11 00:12:45,054: INFO/ForkPoolWorker-1] Task tasks.get_alpha1[cd830360-e866-4850-aba0-3a07e8738f78] succeeded in 0.06510275602340698s:     TradeDate  600022  600023
0  2020-01-01     NaN     NaN
1  2020-01-02     NaN     NaN
2  2020-01-03     NaN     NaN
3  2020-01-06     NaN     NaN
4  2020-01-07     0.0     0.0
5  2020-01-08     0.0     0.0
6  2020-01-09     0.0     0.0
7  2020-01-10     0.0     0.0
8  2020-01-13     0.0     0.0
9  2020-01-14     0.0     0.0
10 2020-01-15     0.0     0.5
11 2020-01-16     0.0     0.0
12 2020-01-17     0.0     0.5
13 2020-01-20     0.5     0.0
14 2020-01-21     0.5     0.0
15 2020-01-22     0.5     0.0
16 2020-01-23     0.5     0.0
17 2020-01-24     0.0     0.5
18 2020-01-27     0.0     0.0
19 2020-01-28     0.5     0.0
20 2020-01-29     0.5     0.0
21 2020-01-30     0.5     0.0
22 2020-01-31     0.5     0.0

After the tasks are executed, the results can also be retrieved from the Redis result backend.
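
For example, the task ids printed by app.py can be used to fetch the stored results through Celery's AsyncResult interface. A minimal sketch, run from the celery_project directory (using one of the ids shown above):

from celery.result import AsyncResult
from tasks import app

# Look up a task by the id that app.py printed
res = AsyncResult("400a3024-65a1-4ba6-b8a9-66f6558be242", app=app)
if res.ready():
    df = res.get()  # the factor table, deserialized back into a pandas DataFrame
    print(df.head())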

Note: The asynchronous requests can be sent before the worker is started, but in that case only a task id is returned; the execution status and results cannot be retrieved until a worker picks up the tasks.

4. Conclusion

This tutorial demonstrates how DolphinDB can be integrated into a traditional factor computing platform to relieve its performance bottlenecks. By combining DolphinDB's computing and storage capabilities with the asynchronous task scheduling of the Celery framework, the platform described here provides a practical blueprint for production use.