从 Postgre/Greenplum 迁移到 DolphinDB

PostgreSQL 是一种开源的关系型数据库管理系统(DBMS),是最广泛使用的开源数据库之一。它允许用户通过添加自定义函数、数据类型和索引等方式扩展其功能,支持 ACID 事务,并使用多版本并发控制(MVCC)来管理并发访问,具有优良的安全性和可扩展性。但是,由于存在并发问题和扩容问题等,PostgreSQL 的发展仍面临着许多瓶颈。

DolphinDB 是一种高效、分布式的数据管理和分析平台,集成了功能强大的编程语言如 SQL、Python、Java 等和高容量高速度的流数据分析系统,为海量数据(特别是时间序列数据)的快速存储、检索、分析及计算提供一站式解决方案。其操作简单,可扩展性强,具有良好的容错能力及优异的多用户并发访问能力,适合于各种大规模数据处理场景。

本文旨在为有从 PostgreSQL 数据库迁移到 DolphinDB 需求的用户提供一份简明的教程参考。本教程也适用于在 PostgreSQL 基础上研发的其他类型数据库,比如 Greenplum ,具体实施可能有所区别,以实际数据库官方说明为准。

PostgreSQL 迁移数据到 DolphinDB 的整体框架如下:


实现方法

从 PostgreSQL 迁移数据到 DolphinDB 的方法有以下二种:

ODBC 插件

ODBC(Open Database Connectivity)插件是 DolphinDB 提供的通过 ODBC 接口访问 PostgreSQL 的开源产品。使用插件配合 DolphinDB 脚本使用,与服务器在同一个进程空间内运行,能高效地完成 Oracle 数据到 DolphinDB 的数据写入。

ODBC 提供如下函数,函数的具体使用请参考 odbc 插件使用说明

  • odbc::connect(connStr, [dataBaseType])
  • odbc::close(conn)
  • odbc::query(connHandle or connStr, querySql, [t], [batchSize], [tranform])
  • odbc::execute(connHandle or connStr, SQLstatements)
  • odbc::append(connHandle, tableData, tablename, [createTableIfNotExist], [insertIgnore])

DataX 驱动

DataX 是可扩展的数据同步框架,将不同数据源的同步抽象为从源头数据源读取数据的 Reader 插件,以及向目标端写入数据的 Writer 插件,理论上 DataX 框架可以支持任意数据源类型的数据同步工作。

DolphinDB 提供基于 DataXReader 和 DataXWriter 的开源驱动。DolphinDBWriter 插件实现了向 DolphinDB 写入数据,使用 DataX 的现有 reader 插件结合 DolphinDBWriter 插件,即可实现从不同数据源向 DolphinDB 同步数据。用户可以在 Java 项目中包含 DataX 的驱动包,开发从 Oracle 数据源到 DolphinDB 的数据迁移软件。

应用需求

很多存储在 PostgreSQL 中的数据,可以通过上述的两种方式将数据同步到 DolphinDB 中。本文的实践案例基于 2021.01.04 一天的逐笔成交数据,数据量约 2721 万。其部分数据示例如下:

SecurityIDTradeTimeTradePriceTradeQtyTradeAmountBuyNoSellNoTradeIndexChannelNoTradeBSFlagBizIndex
6000202021.01.04T09:25:00.6303.3921007,119.8596178316197191N123713
6000202021.01.04T09:25:00.6303.3910003,390.12994578316197201N123714
6000202021.01.04T09:25:00.6303.399003,051.15803978316197211N123715
6000202021.01.04T09:25:00.6303.39500016,950.158039158092197221N123716
6000202021.01.04T09:25:00.6303.39660022,374.158039118594197231N123717
6000202021.01.04T09:25:00.6303.3925008,475.158039138050197241N123718
6000202021.01.04T09:25:00.6303.394001,356.42283138050197251N123719
6000202021.01.04T09:25:00.6303.39100339.4228338902197261N123720
6000202021.01.04T09:25:00.6303.39388313,163.374228345597197271N123721
6000202021.01.04T09:25:00.6303.391200040,680.42283168862197281N123722

迁移案例与操作步骤

环境配置

本次案例中使用了以下数据库及插件,各版本型号如下:

2.00.10 版本 Server 自带 ODBC 插件,位于 Server 的 <HomeDir>/plugins 目录,可直接加载使用。如果 <HomeDir>/plugins 目录下不存在 ODBC 文件夹,则通过如下链接下载:

DolphinDBPlugin 【分支:release200.10】

请注意,DolphinDB ODBC 插件版本号必须与 Server 版本号一致,否则可能出现报错。例如,DolphinDB Server 版本号为 2.00.10.X,则必须使用 release200.10 分支的 ODBC 插件。

注意:如果想自行编译,请参考:odbc 插件使用说明 文章中的编译部分。

建库建表

PostgreSQL 建表语句如下:

create table ticksh(
  SecurityID         varchar(20),
  TradeTime       	 TIMESTAMP,
  TradePrice         NUMERIC(38,4),
  TradeQty 	         NUMERIC(38),
  TradeAmount        NUMERIC(38,4),
  BuyNo              NUMERIC(38),
  SellNo             NUMERIC(38),
  TradeIndex         NUMERIC(38),
  ChannelNo          NUMERIC(38),
  TradeBSFlag        varchar(10),
  BizIndex           integer
);

设计建库建表方案时,需要综合考虑实际数据的字段、类型、数据量以及在 DolphinDB 中是否需要分区、分区方案以及使用 OLAP 还是 TSDB 引擎等因素。一些数据存储库表设计实践,可以参考 DolphinDB 数据库分区教程

本例 DolphinDB 建库建表文件 createTable.dos 内容如下:

def createTick(dbName, tbName){
	if(existsDatabase(dbName)){
		dropDatabase(dbName)
	}
	db1 = database(, VALUE, 2020.01.01..2021.01.01)
	db2 = database(, HASH, [SYMBOL, 10])
	db = database(dbName, COMPO, [db1, db2], , "TSDB")
	db = database(dbName)
	name = `SecurityID`TradeTime`TradePrice`TradeQty`TradeAmount`BuyNo`SellNo`ChannelNo`TradeIndex`TradeBSFlag`BizIndex
	type = `SYMBOL`TIMESTAMP`DOUBLE`INT`DOUBLE`INT`INT`INT`INT`SYMBOL`INT
	schemaTable = table(1:0, name, type)
	db.createPartitionedTable(table=schemaTable, tableName=tbName, partitionColumns=`TradeTime`SecurityID, compressMethods={TradeTime:"delta"}, sortColumns=`SecurityID`TradeTime, keepDuplicates=ALL)
}

dbName="dfs://TSDB_tick"
tbName="tick"
createTick(dbName, tbName)

从 PostgreSQL 迁移到 DolphinDB 的数据字段映射关系如下表:

PostgreSQL 字段含义PostgreSQL 字段PostgreSQL 数据类型DolphinDB 字段含义DolphinDB 字段DolphinDB 数据类型
证券代码SecurityIDVARCHAR(20)证券代码SecurityIDSYMBOL
交易时间TradeTimeTIMESTAMP交易时间TradeTimeTIMESTAMP
交易价格TradePriceNUMERIC(38,4)交易价格TradePriceDOUBLE
交易数量TradeQtyNUMERIC(38)交易数量TradeQtyINT
交易金额TradeAmountNUMERIC(38,4)交易金额TradeAmountDOUBLE
买方委托索引BuyNoNUMERIC(38)买方委托索引BuyNoINT
卖方委托索引SellNoNUMERIC(38)卖方委托索引SellNoINT
成交编号TradeIndexNUMERIC(38)成交编号TradeIndexINT
频道代码ChannelNoNUMERIC(38)频道代码ChannelNoINT
成交方向TradeBSFlagVARCHAR(10)成交方向TradeBSFlagSYMBOL
业务序列号BizIndexINTEGER业务序列号BizIndexINT

通过 ODBC 迁移

安装 ODBC 驱动

本例中部署 DolphinDB 的服务器操作系统为 Centos。

step1:在安装 PostgreSQL 的 ODBC 驱动前,需先安装 unixODBC 库,使用以下命令安装

Centos 系统:

# 安装 unixODBC 库
yum install unixODBC unixODBC-devel

如果是 Ubuntu 系统,使用以下命令安装

# 安装 unixODBC 库
apt-get install unixodbc unixodbc-dev

注意:安装完成后可用odbcinst -j命令查看安装配置文件所在的位置,有两个比较常用的配置,一个是 ODBC 驱动配置,默认在/etc/odbcinst.ini,另一个是系统数据源配置,默认在/etc/odbc.ini

step2:下载 PostgreSQL 的 ODBC 驱动并安装

以 Centos 系统为例,可以通过以下方式直接在 Linux 系统中直接安装 Postgresql-ODBC

yum install -y postgresql-odbc.x86_64

step3:配置 ODBC 配置文件

1)odbcinst.ini 文件用于设置 ODBC 的某个 Driver 要用到的 ODBC 驱动库的文件路径。 配置 odbcinst.ini 文件的内容(如果配置文件不存在,需手工创建) :

[PostgreSQL]
Description     = ODBC for PostgreSQL
Driver          = /usr/lib/psqlodbcw.so
Setup           = /usr/lib/libodbcpsqlS.so
Driver64        = /usr/lib64/psqlodbcw.so
Setup64         = /usr/lib64/libodbcpsqlS.so
FileUsage       = 1

2)/etc/odbc.ini 文件用于设置 ODBC 中所用到的 Driver 、所要使用的数据库等配置,更多配置项可参考 ODBC 连接字符串配置。其中的 Driver 为 /etc/odbcinst.ini 文件中配置的第一行方括号中的内容。添加内容如下(如果配置文件不存在,需手工创建):

[postgresql] 					# ODBC 数据源名称
Description = PostgresSQLODBC	# ODBC 的描述
Driver = PostgreSQL				# 驱动名称
Database = postgres				# 数据库名称
Servername = 127.0.0.1			# 布置了 Postgresql 数据库的服务器 IP 地址
UserName = postgres				# 数据库相关的用户名
Password = postgres				# 数据库密码
Port = 5432					    # 布置了 Postgresql 数据库的服务器的端口号
ReadOnly = 0					# 关闭只读特性
ConnSettings = set client_encoding to UTF8	# 客户端编码

step4:测试 ODBC 连接

使用 ODBC 登录 Postgresql 数据库,测试连接及预期结果如下:

isql postgresql postgres postgres  # 后两位分别是用户名和密码

/**********output********/
+---------------------------------------+
| Connected!                            |
|                                       |
| sql-statement                         |
| help [tablename]                      |
| quit                                  |
|                                       |
+---------------------------------------+

登录成功如下所示,可输入 SQL 操作数据库

SQL>

注意:如有其他常见问题,可参考 ODBC插件常见问题 中第5章注意事项和常见问题。

同步数据

step1:运行以下命令加载 ODBC 插件( ServerPath 请自行修改)

loadPlugin("ServerPath/plugins/odbc/PluginODBC.txt")

step2:运行以下命令建立与 Oracle 的连接 (第一个参数为 ODBC 连接字符串 connStr)

connstr 请参阅连接字符串参考自行修改。

conn = odbc::connect("Driver={PostgreSQL};Server=*;Port=5432;Database=postgres;Uid=postgres;Pwd=postgres;", `PostgreSQL)

step3:运行以下命令开始同步数据

def transForm(mutable msg){
	msg.replaceColumn!(`TradeQty, int(msg[`TradeQty]))
	msg.replaceColumn!(`BuyNo, int(msg[`BuyNo]))
	msg.replaceColumn!(`SellNo, int(msg[`SellNo]))
	msg.replaceColumn!(`ChannelNo, int(msg[`ChannelNo]))
	msg.replaceColumn!(`TradeIndex, int(msg[`TradeIndex]))
	msg.replaceColumn!(`BizIndex, int(msg[`BizIndex]))
	return msg
}

def syncData(conn, dbName, tbName, dt){
	sql = "select SecurityID, TradeTime, TradePrice, TradeQty, TradeAmount, BuyNo, SellNo, ChannelNo, TradeIndex, TradeBSFlag, BizIndex from ticksh"
	if(!isNull(dt)) {
		sql = sql + " where to_date(TradeTime,'yyyy-MM-dd hh24:mi:ss') = dt"
	}
    odbc::query(conn,sql, loadTable(dbName,tbName), 100000, transForm)
}

dbName="dfs://TSDB_tick"
tbName="tick"
syncData(conn, dbName, tbName, NULL)

数据共 27211975 条,同步数据耗时约 597 秒。

step4:后台多任务同步数据

本教程中 DolphinDB 的数据库按天分区,如果需要同步多天的数据,可以提交多个任务到后台:

for(dt in 2021.01.04..2021.01.05){
	submitJob(`syncPostgreTick, `syncPostgreTick, syncData, conn, dbName, tbName, dt)
}
// 查看后台任务
select * from getRecentJobs() where jobDesc = `syncPostgreTick

通过 DataX 迁移

部署 DataX

DataX 下载地址 下载 DataX 压缩包后,解压至自定义目录。执行 datax 自检如下:

cd datax/bin
python datax.py /opt/datax/job/job.json

可能出现报错,报错为 [您提供的配置文件存在错误信息... plugin.json ] 不存在:


该问题是因为在 readerwriter 目录下存在临时文件,影响了 datax 的运行,解决方法如下:

# 这里的/datax/plugin/writer/应修改为实际目录
find /datax/plugin/reader/ -type f -name "._*er" | xargs rm -rf
find /datax/plugin/writer/ -type f -name "._*er" | xargs rm -rf

自检成功后,将 DataX-DolphinDBWriter中源码的 ./dist/dolphindbwriter 目录下所有内容拷贝到 DataX/plugin/writer 目录下,即可使用。

执行 DataX 任务

step1:配置 json 文件

配置文件 pgddb.json 的具体内容如下,并将 json 文件置于自定义目录下,本教程中放置于 datax/job 目录下。

{
    "job": {
            "content": [{
                    "writer": {
                            "name": "dolphindbwriter",
                            "parameter": {
                                    "userId": "admin",
                                    "pwd": "123456",
                                    "host": "10.0.0.80",
                                    "port": 8848,
                                    "dbPath": "dfs://TSDB_tick",
                                    "tableName": "Tick",
                                    "table": [
                                        {
                                             "type": "DT_SYMBOL",
                                             "name": "SecurityID"
                                        },
                                        {
                                            "type": "DT_TIMESTAMP",
                                            "name": "TradeTime"
                                        },
                                        {
                                            "type": "DT_DOUBLE",
                                            "name": "TradePrice"
                                        },
                                        {
                                            "type": "DT_INT",
                                            "name": "TradeQty"
                                        },
                                        {
                                            "type": "DT_DOUBLE",
                                            "name": "TradeAmount"
                                        },
                                        {
                                            "type": "DT_INT",
                                            "name": "BuyNo"
                                        },
                                        {
                                            "type": "DT_INT",
                                            "name": "SellNo"
                                        },
                                        {
                                            "type": "DT_INT",
                                            "name": "TradeIndex"
                                        },
                                        {
                                            "type": "DT_INT",
                                            "name": "ChannelNo"
                                        },
                                        {
                                            "type": "DT_SYMBOL",
                                            "name": "TradeBSFlag"
                                        },
                                        {
                                            "type": "DT_INT",
                                            "name": "BizIndex"
                                        }
                                    ]                            
                            }
                    },
                    "reader": {
                            "name": "postgresqlreader",
                            "parameter": {
                                    "username": "postgres",
                                    "column": ["SecurityID", "TradeTime", "TradePrice", "TradeQty", "TradeAmount", "BuyNo", "SellNo", "ChannelNo", "TradeIndex", "TradeBSFlag", "BizIndex"],
                                    "connection": [{
                                            "table": ["ticksh"],
                                            "jdbcUrl": ["jdbc:postgresql:postgres"]
                                    }],
                                    "password": "postgres",
                                    "where": ""
                            }                            
                    }
            }],
            "setting": {
                    "speed": {
                            "channel": 1
                    }
            }
    }
}

step2:Linux 终端中执行以下命令以执行 DataX 任务

cd ./datax
python bin/datax.py --jvm=-Xmx8g job/ddbtopg.json

step3:查看 DataX 同步结果

任务启动时刻                    : 2023-08-29 14:19:53
任务结束时刻                    : 2023-08-29 14:26:33
任务总计耗时                    :                400s
任务平均流量                    :            4.08MB/s
记录写入速度                    :          68029rec/s
读出记录总数                    :            27211975
读写失败总数                    :                   0

基准性能

分别使用 ODBC 插件和 DataX 驱动进行数据迁移, 数据量 2721 万条,迁移耗时对比如下表所示:

ODBC 插件DataX
597.54s400s

综上,ODBC 插件与 DataX 均能实现将 PostgreSql 中数据迁移到 DolphinDB 中,但是各有优缺点:

  • ODBC 使用简单,适合定制数据的导入,但是运维管理不便。
  • DataX 导入数据需要编写复杂的导入配置,但是其扩展灵活,适合批量导入,方便监控,社区支持丰富。

用户可以根据自己数据量的大小以及工程化的便捷性选择合适的导入方式。