ETL Tuning: From 4.5 Hours to 3.5 Minutes
Extract, transform, and load (ETL) is a data integration process that extracts data from various sources, transforms it according to business rules, and loads it into a destination data warehouse.
When using traditional tech stacks such as Python, MySQL, and Java as ETL tools, it is common to hit performance limits as the data volume grows, particularly when processing high-frequency trading data. In this tutorial, we show how to implement an ETL process in DolphinDB and enhance its performance with SQL tuning techniques. The optimized script reduces the processing time from 4.5 hours to 3.5 minutes, a more than 70-fold improvement.
1. Data Preparation
In this tutorial, we prepare one year's raw trade data (up to 1.7 TB before compression) for ETL processing. The table "trade" contains tick data of around 3,000 stocks, with about 60 million records per day.
Field | Data Type | Description
---|---|---
securityID | STRING | stock ID
tradingdate | DATE | trading date
tradingtime | TIMESTAMP | trading time
tradetype | SYMBOL | trade type
recid | INT | record ID
tradeprice | DOUBLE | trading price
tradevolume | INT | trading volume
buyorderid | INT | ID of the buy order
sellorderid | INT | ID of the sell order
unix | TIMESTAMP | Unix timestamp
We will write the processed data to the target table. Both the source and target tables use the OLAP storage engine and adopt a composite partitioning scheme that combines a value partition on trading date with a hash partition on stock ID (20 hash partitions).
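The database-creation script is not part of this tutorial; the following is a minimal sketch of how the source database and table might be created under the partitioning scheme described above (the target database "dfs://formatData" would be set up analogously). Note that securityID is declared as SYMBOL here, the usual choice for a partitioning column:

```
// Hypothetical setup: VALUE partition on date composed with a 20-bucket
// HASH partition on stock ID, stored with the default OLAP engine.
dbDate = database("", VALUE, 2022.05.01..2022.05.31)
dbSym = database("", HASH, [SYMBOL, 20])
db = database("dfs://originData", COMPO, [dbDate, dbSym])
schema = table(1:0,
    `securityID`tradingdate`tradingtime`tradetype`recid`tradeprice`tradevolume`buyorderid`sellorderid`unix,
    [SYMBOL, DATE, TIMESTAMP, SYMBOL, INT, DOUBLE, INT, INT, INT, TIMESTAMP])
db.createPartitionedTable(schema, "trade", `tradingdate`securityID)
```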
To meet the following requirements for quantitative analysis, the ETL process converts field types, appends an exchange suffix to the stock ID, adds calculated fields, filters out failed trades, etc.
- Convert the column "tradingdate" from DATE to INT (via the helper toIntDate; see the sketch after this list);
- Convert the column "tradingtime" from TIMESTAMP to STRING, and the column "unix" from TIMESTAMP to LONG;
- Add a column "buysellflag" (buy/sell flag);
- Add a column "tradeamount" (total value traded).
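The scripts below call toIntDate for the DATE-to-INT conversion, but its definition is not included in this tutorial. A minimal sketch of one possible implementation, mapping a DATE to an integer of the form yyyyMMdd:

```
// Hypothetical helper: convert a DATE (e.g. 2022.05.10) to an INT (20220510).
def toIntDate(d){
    return year(d) * 10000 + monthOfYear(d) * 100 + dayOfMonth(d)
}
```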
1.1. Environment
- CPU: Intel(R) Xeon(R) Silver 4216 CPU @ 2.10GHz
- Logical processors: 16
- Memory: 256 GB
- OS: 64-bit CentOS Linux 7 (Core)
- DolphinDB Server version: 2.00.6
- Deployment: high-availability cluster (with 3 controllers and 3 data nodes)
2. ETL Process
2.1. ETL Design
A typical ETL design operates on each shard of the source dataset and consolidates the outcomes into the target database. In DolphinDB, the ETL process works as follows:
(1) Split the original dataset into partitions by trading date and stock ID:
```
data = [cut1, cut2, ... , cutN]
```
(2) Clean and transform the data within each partition, and save the processed data to an in-memory table "tradingdf".
(3) Append the in-memory table "tradingdf" to the target DFS table.
```
def genDataV1(date1, dateN){
    tradeSrc = loadTable("dfs://originData", "trade")
    tradeTgt = loadTable("dfs://formatData", "trade")
    for (aDate in date1..dateN){
        // fetch the IDs of all stocks traded on aDate
        tradeSecurityID = (exec distinct(securityID) from tradeSrc where tradingdate = aDate).shuffle()
        for (m in tradeSecurityID){
            // load one stock's records for one day into memory
            tradingdf = select * from tradeSrc where securityID = m and tradingdate = aDate
            tradingdf["symbol"] = m + "SZ"
            tradingdf["buysellflag"] = iif(tradingdf["sellorderid"] > tradingdf["buyorderid"], "S", "B")
            tradingdf["tradeamount"] = tradingdf["tradevolume"] * tradingdf["tradeprice"]
            // keep only successful trades (tradetype "0" or "F")
            tradingdf = tradingdf[(tradingdf["tradetype"] == "0") || (tradingdf["tradetype"] == "F")]
            tradingdf = select symbol, tradingdate, tradingtime, recid, tradeprice, tradevolume, tradeamount, buyorderid, sellorderid, buysellflag, unix from tradingdf
            tradingdf = select * from tradingdf order by symbol, tradingtime, recid
            // type conversions: DATE -> INT, TIMESTAMP -> STRING, TIMESTAMP -> LONG
            tradingdf.replaceColumn!("tradingdate", toIntDate(date(tradingdf.tradingdate)))
            tradingtime = string(exec tradingtime from tradingdf)
            tradingdf.replaceColumn!(`tradingtime, tradingtime)
            unix = long(exec unix from tradingdf)
            tradingdf.replaceColumn!(`unix, unix)
            tradeTgt.append!(tradingdf)
        }
    }
}
```
When using ETL tools like Python, MySQL, and Java, or middleware like Kettle, processing is often limited by their single-threaded execution. When similar logic is applied to the ETL process in DolphinDB, as in the script above, it takes up to 4.5 hours to process trade data of 20 trading days. In the following sections we analyze the performance bottlenecks and demonstrate how to optimize the script.
2.2. Bottleneck Analysis
The performance bottlenecks of the ETL process mainly lie in:
(1) Nested loops
The code is executed in nested loops over trading dates and stock IDs. The total execution time t can be estimated as:
t = N * M * t0
- N: trading days
- M: number of stocks
- t0: execution time (in seconds) of the innermost loop
The innermost loop takes about 0.4 seconds per iteration, so the execution time of the full script is estimated as t ≈ 20 * 3000 * 0.4 s = 24,000 s ≈ 6.7 hours.
(2) Repeated data access
As shown above, the transformation script queries the same dataset repeatedly, incurring extra round trips to storage. Some of these operations, such as filtering and sorting, can be combined into a single query.
(3) Computing is conducted on only one data node
Starting from the assignment statement for "tradingdf":
tradingdf = select * from tradeSrc where securityID = m and tradingdate = aDate
The subsequent script fails to leverage DolphinDB's distributed and concurrent computing capabilities, as it uses the resources of only one data node. As a result, performance deteriorates as the volume of data continues to increase.
3. Performance Tuning
We reference the following formula for execution time to guide the optimization of the ETL process in DolphinDB:
t = S / V
- t: execution time.
- S: space complexity, i.e., the amount of data to be processed.
- V: speed of data processing, i.e., how many records can be processed per second.
Therefore, the key to reducing execution time t for an ETL process lies in reducing space complexity and increasing processing speed.
3.1. Space Complexity
In DolphinDB, space complexity can be reduced by partition pruning, columnar storage, indexing, and so on.
Partition pruning
For time series data that is partitioned on a temporal column, if a filtering condition in the where clause specifies the partitioning column, only the needed partitions are accessed.
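For example, with the source table of this tutorial partitioned by trading date, the following sketch reads only the partitions under the specified date:

```
// Only the 20 hash partitions under 2022.05.10 are scanned;
// partitions for all other dates are pruned before any data is read.
trade = loadTable("dfs://originData", "trade")
select count(*) from trade where tradingdate = 2022.05.10
```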
Columnar storage
Consider a table that contains hundreds of columns, of which only a few are needed for an aggregate query. DolphinDB's OLAP storage engine adopts columnar storage, so only the required columns are read from disk, which greatly reduces disk I/O.
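As a sketch against the "trade" table, the following aggregation reads only two of its ten columns:

```
// Columnar storage: only tradeprice and tradevolume are loaded from disk;
// the remaining columns are never touched.
trade = loadTable("dfs://originData", "trade")
select avg(tradeprice), sum(tradevolume) from trade where tradingdate = 2022.05.10 group by securityID
```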
Indexing
If a DFS table uses the TSDB engine and a query filters on the specified sort columns, the corresponding block IDs can be located quickly by scanning the sparse index. Only the required blocks are accessed, avoiding a full table scan.
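The tables in this tutorial use the OLAP engine, but as a hedged sketch, a TSDB variant of the source table could be created as follows; queries filtering on securityID and tradingtime would then benefit from the sparse index:

```
// Hypothetical TSDB setup: sortColumns builds a sparse index over
// (securityID, tradingtime) within each partition.
dbTSDB = database("dfs://originDataTSDB", VALUE, 2022.05.01..2022.05.31, engine="TSDB")
schema = table(1:0, `securityID`tradingtime`tradeprice`tradevolume, [SYMBOL, TIMESTAMP, DOUBLE, INT])
dbTSDB.createPartitionedTable(schema, "trade", `tradingtime, sortColumns=`securityID`tradingtime)
```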
3.2. Processing Speed
To improve the efficiency of batch processing, you can set a proper batch size and apply multithreaded and distributed computing.
Proper batch size
DolphinDB manages historical data for batch processing on a partition basis. It is recommended to keep each partition between 100 MB and 500 MB (before compression).
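As a rough check against the dataset in this tutorial: 1.7 TB per year, assuming about 250 trading days, is roughly 6.8 GB per day; split across 20 hash partitions per date, each partition holds about 340 MB before compression, which falls within the recommended range.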
Multithreading
DolphinDB makes extensive use of multithreading. In particular, a distributed SQL query is processed by multiple threads operating on the partitioned data concurrently.
Distributed computing
DolphinDB leverages distributed computing in multi-machine clusters and coordinates distributed transactions. When a distributed SQL query is executed, it uses a Map-Reduce-Merge model to perform parallel computation. In the Map phase, the query is automatically split into tasks that are assigned to the nodes in the cluster to maximize the utilization of the cluster's hardware resources.
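The Map-Reduce model is also exposed directly through the mr function. As a small sketch over the schema used in this tutorial, the following computes a day's total traded volume with one map task per partition:

```
// Each partition touched by the query becomes a data source, i.e. one map task.
ds = sqlDS(<select tradevolume from loadTable("dfs://originData", "trade") where tradingdate = 2022.05.10>)
// Map: sum the volume within a partition; Reduce: add up the partial sums.
totalVolume = mr(ds, def(t){ return sum(t.tradevolume) }, +)
```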
4. Optimized ETL Process
Based on the performance bottleneck analysis, we optimize the code in the following aspects:
- Improve parallelism
- Reduce query times
- Adopt vectorized processing
4.1. Optimized Script
The optimized script processes one day's market data of 3,000 stocks (distributed across 20 hash partitions) through a single distributed SQL query that is split into 20 tasks. Each task is assigned to the node holding the corresponding partition, allowing for parallel execution.
The optimized code is as follows:
```
def transformData(tradeDate){
    tradeSrc = loadTable("dfs://originData", "trade")
    tradeTgt = loadTable("dfs://formatData", "trade")
    // one distributed query performs all filtering, type conversion,
    // and derived-column computation for a single trading day
    data = select
            securityID + "SZ" as securityID
            ,toIntDate(tradingdate) as tradingdate
            ,tradingtime$STRING as tradingtime
            ,recid as recid
            ,tradeprice
            ,tradevolume
            ,tradevolume * tradeprice as tradeamount
            ,buyorderid as buyrecid
            ,sellorderid as sellrecid
            ,iif(sellorderid > buyorderid, "S", "B") as buysellflag
            ,unix$LONG as unix
        from tradeSrc
        where tradingdate = tradeDate and tradetype in ["0", "F"]
    tradeTgt.append!(data)
    // flush the OLAP cache engine on every node so the writes are persisted
    pnodeRun(flushOLAPCache)
}

allDays = 2022.05.01..2022.05.20
for (aDate in allDays){
    jobId = "transform_" + strReplace(aDate$STRING, ".", "")
    jobDesc = "transform data"
    // run each day's transformation as a background batch job
    submitJob(jobId, jobDesc, transformData, aDate)
}
```
The script takes about 40 seconds to process one day's market data of 3,000 stocks. Data of 20 trading days can be processed in concurrent batch jobs submitted via the function submitJob. With the configuration parameter maxBatchJobWorker = 16 (generally set to the number of CPU cores), the full run takes only 210 seconds, improving performance by 74 times.
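Once submitted, the batch jobs can be monitored with the built-in job functions, e.g.:

```
// List the submitted transformation jobs with their start and end times.
select jobId, startTime, endTime, errorMsg from getRecentJobs() where jobDesc = "transform data"
// Fetch the return value (or rethrow the error) of one job, assuming its ID.
getJobReturn("transform_20220501")
```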
4.2. Performance Analysis
The performance improvement is mainly due to:
Distributed computing with high parallelism
The select statement is executed in parallel, with the degree of parallelism determined by the number of partitions in the source dataset and the number of configured workers. Given the partitioning scheme of the source table "trade", the parallelism is up to 20, which in theory speeds up execution 20-fold compared with single-threaded processing; in practical tests, an 18-fold improvement is observed when handling data for a single trading day. Furthermore, multiple jobs can be executed in parallel through submitJob.
Efficient queries
All the processing logic, including data filtering, data type conversion, and adding derived fields, can be done within one query. The data does not need to be accessed repeatedly.
Vectorization
The OLAP engine adopts columnar storage, and each column is loaded into memory in the form of a vector. Therefore, vectorization is applied to improve the efficiency of SQL queries.
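For instance, the iif call in the optimized query (reproduced here as a standalone sketch with toy vectors) compares the two order-ID columns element-wise in a single vectorized pass instead of looping over rows:

```
// Vectorized comparison over whole columns: no per-row loop is needed.
sellorderid = 1 2 3
buyorderid = 2 1 4
iif(sellorderid > buyorderid, "S", "B")   // => ["B", "S", "B"]
```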
5. Conclusion
In this tutorial we explored the optimization of an ETL process in DolphinDB through a practical example of SQL query tuning. By eliminating nested loops and adopting distributed and vectorized computing, the optimized SQL query achieves a remarkable 74-fold speedup: the ETL processing of market data for 3,000 stocks over 20 trading days, which previously took over 4.5 hours, now completes in just 210 seconds. By combining vectorization with the distributed, parallel processing capabilities of a columnar database, DolphinDB can serve as a highly efficient ETL tool for preprocessing large datasets.
6. Recommended Reading
- Partitions and performance tuning in DolphinDB: DolphinDB Partitioned Database Tutorial
- Threading Model: Overview of Threading Model in DolphinDB
- TSDB Storage Engine: Introduction to TSDB