From SQL Server to DolphinDB

This tutorial demonstrates two methods for migrating data from SQL Server to DolphinDB using an example database, and compares their performance and typical use cases.

Migration Methods

DolphinDB provides two migration tools:

  • ODBC Plugin:

The Microsoft Open Database Connectivity (ODBC) interface is a C programming language interface that makes it possible for applications to access data from a variety of database management systems (DBMS). You can use the ODBC plugin to migrate data from SQL Server to DolphinDB.

See DolphinDB ODBC Plugin for detailed instructions on the plugin's methods.

  • DataX:

DataX is a framework for offline data synchronization between heterogeneous data sources. It abstracts synchronization into a Reader plugin that reads from the source and a Writer plugin that writes to the target, enabling efficient data exchange across a wide range of data sources.

DolphinDB provides an open-source driver based on the DataX Reader/Writer framework. Combined with an existing DataX reader plugin, DolphinDBWriter can synchronize data from various sources to DolphinDB. Users can also include the DataX driver package in their Java projects to build data migration software from SQL Server to DolphinDB.

| Method | Data Write Performance | High Availability |
| --- | --- | --- |
| ODBC Plugin | high | supported |
| DataX Driver | medium | not supported |

Migration Requirements

This example uses:

  • DolphinDB: DolphinDB_Linux64_V2.00.8.6
  • SQL Server: Microsoft SQL Server 2017 (RTM-CU31) (KB5016884) - 14.0.3456.2 (X64)

We have 10 years of tick data stored in SQL Server:

| TradeDate | OrigTime | SendTime | RecvTime | LocalTime | ChannelNo | MDStreamID | ApplSeqNum | SecurityID | SecurityIDSource | Price | OrderQty | TransactTime | Side | OrderType | ConfirmID | Contactor | ContactInfo | ExpirationDays | ExpirationType |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 2019-01-02 | 09:15:00.0000000 | 09:15:00.0730000 | 09:15:00.3030000 | 09:15:00.3090000 | 2013 | 11 | 1 | 300104 | 102 | 2.74 | 911100 | 2019-01-02 09:15:00.000 | 1 | 2 | 0.0 | NULL | NULL | 0 | 0 |
| 2019-01-02 | 09:15:00.0000000 | 09:15:00.0730000 | 09:15:00.3030000 | 09:15:00.3130000 | 2023 | 11 | 1 | 159919 | 102 | 3.002 | 100 | 2019-01-02 09:15:00.000 | 1 | 2 | 0.0 | NULL | NULL | 0 | 0 |
| 2019-01-02 | 09:15:00.0000000 | 09:15:00.0730000 | 09:15:00.3030000 | 09:15:00.3140000 | 2023 | 11 | 2 | 159919 | 102 | 3.002 | 100 | 2019-01-02 09:15:00.000 | 1 | 2 | 0.0 | NULL | NULL | 0 | 0 |
| 2019-01-02 | 09:15:00.0000000 | 09:15:00.0730000 | 09:15:00.3030000 | 09:15:00.3140000 | 2023 | 11 | 3 | 159919 | 102 | 3.002 | 100 | 2019-01-02 09:15:00.000 | 1 | 2 | 0.0 | NULL | NULL | 0 | 0 |
| 2019-01-02 | 09:15:00.0000000 | 09:15:00.0730000 | 09:15:00.3030000 | 09:15:00.3140000 | 2023 | 11 | 4 | 159919 | 102 | 3.002 | 100 | 2019-01-02 09:15:00.000 | 1 | 2 | 0.0 | NULL | NULL | 0 | 0 |
| 2019-01-02 | 09:15:00.0000000 | 09:15:00.0730000 | 09:15:00.3030000 | 09:15:00.3140000 | 2023 | 11 | 5 | 159919 | 102 | 3.002 | 100 | 2019-01-02 09:15:00.000 | 1 | 2 | 0.0 | NULL | NULL | 0 | 0 |
| 2019-01-02 | 09:15:00.0000000 | 09:15:00.0730000 | 09:15:00.3030000 | 09:15:00.3140000 | 2023 | 11 | 6 | 159919 | 102 | 3.002 | 100 | 2019-01-02 09:15:00.000 | 1 | 2 | 0.0 | NULL | NULL | 0 | 0 |
| 2019-01-02 | 09:15:00.0000000 | 09:15:00.0730000 | 09:15:00.3030000 | 09:15:00.3140000 | 2013 | 11 | 2 | 002785 | 102 | 10.52 | 664400 | 2019-01-02 09:15:00.000 | 1 | 2 | 0.0 | NULL | NULL | 0 | 0 |
| 2019-01-02 | 09:15:00.0000000 | 09:15:00.0730000 | 09:15:00.3030000 | 09:15:00.3140000 | 2013 | 11 | 3 | 200613 | 102 | 5.12 | 4400 | 2019-01-02 09:15:00.000 | 2 | 2 | 0.0 | NULL | NULL | 0 | 0 |
| 2019-01-02 | 09:15:00.0000000 | 09:15:00.0730000 | 09:15:00.3030000 | 09:15:00.3140000 | 2013 | 11 | 4 | 002785 | 102 | 10.52 | 255500 | 2019-01-02 09:15:00.000 | 1 | 2 | 0.0 | NULL | NULL | 0 | 0 |

The corresponding table schemas in SQL Server and DolphinDB are shown below:

| SQL Server Column Name | Description | Data Type | DolphinDB Column Name | Description | Data Type |
| --- | --- | --- | --- | --- | --- |
| tradedate | trade date | date | | | |
| OrigTime | record creation time | time(7) | | | |
| SendTime | record send time | time(7) | | | |
| Recvtime | record receive time | time(7) | | | |
| LocalTime | local time | time(7) | LocalTime | local time | TIME |
| ChannelNo | channel number | int | ChannelNo | channel number | INT |
| MDStreamID | identifier of the price stream | int | MDStreamID | identifier of the price stream | INT |
| ApplSeqNum | application sequence number | bigint | ApplSeqNum | application sequence number | LONG |
| SecurityID | security ID | varchar(255) | SecurityID | security ID | SYMBOL |
| SecurityIDSource | security ID source | int | SecurityIDSource | security ID source | INT |
| Price | order price | float | Price | order price | DOUBLE |
| OrderQty | order quantity | int | OrderQty | order quantity | INT |
| TransactTime | order time | datetime | TransactTime | order time | TIMESTAMP |
| Side | buy or sell order | varchar(255) | Side | buy or sell order | SYMBOL |
| OrderType | order type | varchar(255) | OrderType | order type | SYMBOL |
| ConfirmID | confirm ID | varchar(255) | | | |
| Contactor | contactor | varchar(255) | | | |
| ContactInfo | contact information | varchar(255) | | | |
| ExpirationDays | expiration days | int | | | |
| ExpirationType | expiration type | varchar(255) | | | |
| | | | BizIndex | business index | LONG |
| | | | DataStatus | data status | INT |
| | | | SeqNo | sequence number | LONG |
| | | | Market | market | SYMBOL |
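The migration steps below assume that the target database dfs://TSDB_Entrust and table entrust already exist in DolphinDB. If they do not, they can be created with a script along the following lines; the composite VALUE/HASH partition scheme and the sortColumns setting shown here are illustrative assumptions and should be tuned to the actual data volume.

login("admin", "123456")
// assumed scheme: VALUE partition by trade date combined with a 10-bucket HASH partition on SecurityID
dbDate = database(, VALUE, 2019.01.01..2019.12.31)
dbSym = database(, HASH, [SYMBOL, 10])
db = database(directory="dfs://TSDB_Entrust", partitionType=COMPO, partitionScheme=[dbDate, dbSym], engine="TSDB")
// empty table defining the target schema, in the column order produced by the transform function used below
schemaTb = table(1:0, `ChannelNo`ApplSeqNum`MDStreamID`SecurityID`SecurityIDSource`Price`OrderQty`Side`TransactTime`OrderType`LocalTime`SeqNo`Market`DataStatus`BizIndex, [INT, LONG, INT, SYMBOL, INT, DOUBLE, INT, SYMBOL, TIMESTAMP, SYMBOL, TIME, LONG, SYMBOL, INT, LONG])
db.createPartitionedTable(schemaTb, "entrust", partitionColumns=`TransactTime`SecurityID, sortColumns=`SecurityID`TransactTime)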

Migration Steps

Migrate with ODBC Plugin

Install ODBC Driver

In this example, the DolphinDB server is deployed on Ubuntu 22.04.

(1) Run the following commands in the terminal to install the FreeTDS library, the unixODBC library, and the SQL Server ODBC driver.

# install FreeTDS
apt install -y freetds-bin freetds-dev

# install unixODBC library
apt-get install unixodbc unixodbc-dev

# install SQL Server ODBC driver
apt-get install tdsodbc

(2) Add the IP and port of SQL Server in /etc/freetds/freetds.conf:

[sqlserver]
host = 127.0.0.1
port = 1433

(3) Configure /etc/odbcinst.ini:

[SQLServer]
Description = ODBC for SQLServer
Driver = /usr/lib/x86_64-linux-gnu/odbc/libtdsodbc.so
Setup = /usr/lib/x86_64-linux-gnu/odbc/libtdsodbc.so
FileUsage = 1

For CentOS, follow the steps below:

(1) Run the following commands in the terminal to install the freeTDS library and the unixODBC library.

# install freeTDS
yum install -y freetds

# install unixODBC library
yum install unixODBC-devel

(2) Add the IP and port of SQL Server in /etc/freetds.conf.

(3) Configure /etc/odbcinst.ini:

[SQLServer]
Description = ODBC for SQLServer
Driver = /usr/lib64/libtdsodbc.so.0.0.0
Setup = /usr/lib64/libtdsodbc.so.0.0.0
FileUsage = 1

Synchronize Data

(1) Run the following command to load the ODBC plugin:

loadPlugin("/home/Linux64_V2.00.8/server/plugins/odbc/PluginODBC.txt")

(2) Establish a connection to SQL Server:

conn = odbc::connect("Driver={SQLServer};Servername=sqlserver;Uid=sa;Pwd=DolphinDB;database=historyData;")

(3) Synchronize data:

def transform(mutable msg){
    // parse the LocalTime strings into DolphinDB TIME values
    msg.replaceColumn!(`LocalTime, time(temporalParse(msg.LocalTime, "HH:mm:ss.nnnnnn")))
    // store Price as DOUBLE
    msg.replaceColumn!(`Price, double(msg.Price))
    // add the columns that exist only in the DolphinDB table
    msg[`SeqNo] = int(NULL)
    msg[`DataStatus] = int(NULL)
    msg[`BizIndex] = long(NULL)
    msg[`Market] = "SZ"
    // match the column order of the target table
    msg.reorderColumns!(`ChannelNo`ApplSeqNum`MDStreamID`SecurityID`SecurityIDSource`Price`OrderQty`Side`TransactTime`OrderType`LocalTime`SeqNo`Market`DataStatus`BizIndex)
    return msg
}
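Before running the full migration, the transform function can be sanity-checked on a small hand-written table that mimics the columns returned by the ODBC query (the table below is hypothetical and its values are made up for illustration):

// one fabricated row: LocalTime arrives as a string and Price as a floating-point value
t = table([2013] as ChannelNo, [1l] as ApplSeqNum, [11] as MDStreamID, ["000001"] as SecurityID, [102] as SecurityIDSource, [10.5] as Price, [100] as OrderQty, ["1"] as Side, [2019.01.02T09:15:00.000] as TransactTime, ["2"] as OrderType, ["09:15:00.314000"] as LocalTime)
// should return a single row with the 15 columns of the target table
transform(t)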

def syncData(conn, dbName, tbName){
    // read from SQL Server in batches of 100,000 rows, apply transform, and append to the DFS table
    odbc::query(conn,"select ChannelNo,ApplSeqNum,MDStreamID,SecurityID,SecurityIDSource,Price,OrderQty,Side,TransactTime,OrderType,LocalTime from data",loadTable(dbName,tbName),100000,transform)
}

dbName = "dfs://TSDB_Entrust"
tbName = "entrust"
submitJob("syncData","syncData",syncData,conn,dbName,tbName)
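submitJob returns immediately and the migration runs in the background. Its status, including the start and end times, can be checked with the built-in function getRecentJobs:

// list recently submitted background jobs on this node, with each job's startTime and endTime
getRecentJobs()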

As the output shows, the elapsed time is around 122 seconds.

startTime                      endTime
2022.11.28 11:51:18.092        2022.11.28 11:53:20.198

Incremental synchronization can be performed with the DolphinDB built-in function scheduleJob. For example, schedule a daily job that synchronizes the previous day's data at 00:05:

def synchronize(){
    login(`admin, `123456)
    conn = odbc::connect("Driver={SQLServer};Servername=sqlserver;Uid=sa;Pwd=DolphinDB;database=historyData;")
    // select only the previous day's records
    sqlStatement = "select ChannelNo,ApplSeqNum,MDStreamID,SecurityID,SecurityIDSource,Price,OrderQty,Side,TransactTime,OrderType,LocalTime from data where TradeDate ='" + string(date(now())-1) + "';"
    odbc::query(conn, sqlStatement, loadTable("dfs://TSDB_Entrust",`entrust), 100000, transform)
}
// run the job every day at 00:05
scheduleJob(jobId=`test, jobDesc="test", jobFunc=synchronize, scheduleTime=00:05m, startDate=2022.11.11, endDate=2023.01.01, frequency='D')
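The registered job can be confirmed, and later removed, with the scheduled-job management functions:

// list all scheduled jobs on the node
getScheduledJobs()
// delete the scheduled job by its jobId when it is no longer needed
deleteScheduledJob(`test)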

Note: To prevent the scheduled job from failing to parse at server startup, add preloadModules=plugins::odbc to the configuration file before starting DolphinDB.

Migrate with DataX

Deploy DataX

Download DataX and extract it to a custom directory.

Deploy DataX-DolphinDBWriter Plugin

Copy all the contents of the ./dist/dolphindbwriter directory from the DataX-Writer project to the DataX/plugin/writer directory.

Synchronize Data

(1) Create the configuration file synchronization.json and place it in the DataX/job directory:

{
    "core": {
        "transport": {
            "channel": {
                "speed": {
                    "byte": 5242880
                }
            }
        }
    },
    "job": {
        "setting": {
            "speed": {
                "byte":10485760
            }
        },
        "content": [
            {
                "reader": {
                    "name": "sqlserverreader",
                    "parameter": {
                        "username": "sa",
                        "password": "DolphinDB123",
                        "column": [
                            "ChannelNo","ApplSeqNum","MDStreamID","SecurityID","SecurityIDSource","Price","OrderQty","Side","TransactTime","OrderType","LocalTime"
                        ],
                        "connection": [
                            {
                                "table": [
                                    "data"
                                ],
                                "jdbcUrl": [
                                    "jdbc:sqlserver://127.0.0.1:1433;databasename=historyData"
                                ]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "dolphindbwriter",
                    "parameter": {
                        "userId": "admin",
                        "pwd": "123456",
                        "host": "127.0.0.1",
                        "port": 8888,
                        "dbPath": "dfs://TSDB_Entrust",
                        "tableName": "entrust",
                        "batchSize": 100000,
                        "saveFunctionDef": "def customTableInsert(dbName, tbName, mutable data) {data.replaceColumn!(`LocalTime,time(temporalParse(data.LocalTime,\"HH:mm:ss.nnnnnn\")));data.replaceColumn!(`Price,double(data.Price));data[`SeqNo]=int(NULL);data[`DataStatus]=int(NULL);data[`BizIndex]=long(NULL);data[`Market]=`SZ;data.reorderColumns!(`ChannelNo`ApplSeqNum`MDStreamID`SecurityID`SecurityIDSource`Price`OrderQty`Side`TransactTime`OrderType`LocalTime`SeqNo`Market`DataStatus`BizIndex);pt = loadTable(dbName,tbName);pt.append!(data)}",
                        "saveFunctionName" : "customTableInsert",
                        "table": [
                            {
                                "type": "DT_INT",
                                "name": "ChannelNo"
                            },
                            {
                                "type": "DT_LONG",
                                "name": "ApplSeqNum"
                            },
                            {
                                "type": "DT_INT",
                                "name": "MDStreamID"
                            },
                            {
                                "type": "DT_SYMBOL",
                                "name": "SecurityID"
                            },
                            {
                                "type": "DT_INT",
                                "name": "SecurityIDSource"
                            },
                            {
                                "type": "DT_DOUBLE",
                                "name": "Price"
                            },
                            {
                                "type": "DT_INT",
                                "name": "OrderQty"
                            },
                            {
                                "type": "DT_SYMBOL",
                                "name": "Side"
                            },
                            {
                                "type": "DT_TIMESTAMP",
                                "name": "TransactTime"
                            },
                            {
                                "type": "DT_SYMBOL",
                                "name": "OrderType"
                            },
                            {
                                "type": "DT_STRING",
                                "name": "LocalTime"
                            }
                        ]

                    }
                }
            }
        ]
    }
}
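For readability, the one-line saveFunctionDef above corresponds to the following DolphinDB function. DolphinDBWriter executes this definition on the server and uses the function named by saveFunctionName, instead of a plain append, to write each batch, reproducing the same transformation applied by the ODBC plugin:

def customTableInsert(dbName, tbName, mutable data) {
    // parse LocalTime strings into TIME values and store Price as DOUBLE
    data.replaceColumn!(`LocalTime, time(temporalParse(data.LocalTime, "HH:mm:ss.nnnnnn")))
    data.replaceColumn!(`Price, double(data.Price))
    // add the columns that exist only in the DolphinDB table
    data[`SeqNo] = int(NULL)
    data[`DataStatus] = int(NULL)
    data[`BizIndex] = long(NULL)
    data[`Market] = `SZ
    // match the column order of the target table and append the batch
    data.reorderColumns!(`ChannelNo`ApplSeqNum`MDStreamID`SecurityID`SecurityIDSource`Price`OrderQty`Side`TransactTime`OrderType`LocalTime`SeqNo`Market`DataStatus`BizIndex)
    pt = loadTable(dbName, tbName)
    pt.append!(data)
}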

(2) Run the following command in the Linux terminal to execute the synchronization task:

cd ./DataX/bin/
python datax.py ../job/synchronization.json

The log is displayed as follows:

Start Time               : 2022-11-28 17:58:52
End Time                 : 2022-11-28 18:02:24
Elapsed Time             :                212s
Average Flow             :            3.62MB/s
Write Speed              :          78779rec/s
Total Read Records       :            16622527
Total Failed Attempts    :                   0
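After the task finishes, a quick consistency check is to compare the number of records in the target table with the Total Read Records reported by DataX:

// the result should match the 16,622,527 records read from SQL Server
select count(*) from loadTable("dfs://TSDB_Entrust", "entrust")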

Similarly, you can add a "where" condition to the "reader" section of synchronization.json to synchronize only the selected data incrementally. For example, with a "where" filter on the trade date as shown below, each execution of the synchronization task migrates only the previous day's data.

"reader": {
                    "name": "sqlserverreader",
                    "parameter": {
                        "username": "sa",
                        "password": "DolphinDB123",
                        "column": [
                            "ChannelNo","ApplSeqNum","MDStreamID","SecurityID","SecurityIDSource","Price","OrderQty","Side","TransactTime","OrderType","LocalTime"
                        ],
                        "connection": [
                            {
                                "table": [
                                    "data"
                                ],
                                "jdbcUrl": [
                                    "jdbc:sqlserver://127.0.0.1:1433;databasename=historyData"
                                ]
                            }
                        ]
                        "where":"TradeDate=(select CONVERT ( varchar ( 12) , dateadd(d,-1,getdate()), 102))"
                    }
                }

Performance Comparison

The table below shows the time consumed by each migration method:

| ODBC Plugin | DataX |
| --- | --- |
| 122s | 212s |

We can see that the ODBC plugin outperforms DataX, and the advantage grows with larger data volumes (especially when more tables are involved). Both methods support full and incremental synchronization, but the ODBC plugin is fully integrated with DolphinDB, whereas DataX requires a separate deployment and JSON configuration for the migration. In addition, when the migrated data needs no further processing, DataX still requires a JSON configuration file for each table, while the ODBC plugin only requires changing the table names.