多种写入方案

Python API 提供多种写入方案,可以适配不同场景的写入需求,下面将详细介绍各种写入方案之间的区别。

场景条件建议使用说明
上传变量upload直接上传变量,适用于所有类型,类型转换更为自由
执行服务端函数时附带参数run作为参数上传,同样适用所有类型
面向对象地在 Python 端操作服务端数据库、数据表table 等相关方法便捷地在 Python 端使用服务端数据
写入批量数据,且数据结构可以较为便捷地转换为 pandas.DataFrameTableAppenderTableUpserterPartitionedTableAppender自动类型转换,不需要关心类型对应的问题,具体使用请参考本文第二节第四小节
写入流式数据MultithreadedTableWriter自动类型转换,将流式数据批量发送至服务端
API资源较为紧张,服务端资源较为充裕异步模式 session将写入压力转移至服务端,通常不推荐该方式写入

upload, table 与 tableInsert

upload

session.upload 方法可以直接上传数据。这种方案适用于上传各种数据形式,例如 TABLE、DICTIONARY、VECTOR 等。在调用 upload 时,须指定数据上传后的变量名。代码示例如下:

>>> data = pd.DataFrame({·
...     'clong': [1, 2, 3],
... })
...
>>> s.upload({'data': data})

此外,在 upload 上传数据时,因为 DolphinDB 的数据类型和 Python 的原生数据类型、numpy 以及 pandas 的数据类型无法一一对应,因此会出现某些数据类型无法直接上传的情况,例如 UUID、MINUTE 等数据类型。1.30.22.1 及之后版本的 Python API 新增强制类型转换,可以在调用 upload 上传 pd.DataFrame 时,通过添加 __DolphinDB_Type__ 指定待上传列的类型。

table

session.table 方法可以传入一个本地数据对象,例如 pandas.DataFrame/dict/...,将该数据对象作为一个临时表上传到 DolphinDB,其生存周期由 Python API 进行维护。该种方法仅支持上传 TABLE 数据形式的对象。其内部的实现调用了 session.upload,因此也可以通过指定 __DolphinDB_Type__ 以实现强制类型转换。

但和 upload 方法稍有不同,upload 方法上传的变量需要手动析构上传变量的生存周期,若处理不当可能会导致 session 占用内存过大。而 table 方法返回一个 Python API 定义的 Table 类对象,在析构时会同时析构 DolphinDB 服务端的该临时表,不需要手动维护生存周期。

代码示例如下:

>>> data = pd.DataFrame({
...     'clong': [1, 2, 3],
... })
...
>>> tb = s.table(data=data)
>>> tb
<dolphindb.table.Table object at 0x7faf42e67a00>

tableInsert

不同于前两种方法,tableInsert 并非 Python API 提供的方法,而是通过 run 方法上传参数的功能以实现写入的方式。

  • 从数据序列化的角度来看,该方法和 upload, table 没有区别。
  • 从使用的角度来说,在某些不需要指定上传变量名的流程中,将数据作为 run 方法的参数上传更为简单直接。例如在写入表时,无需先上传临时表到服务端,然后调用 tableInsert 写入,而是直接作为 tableInsert 的参数上传并写入,以此可简化流程。

代码示例如下:

>>> data = pd.DataFrame({
...     'clong': [1, 2, 3],
... })
...
>>> s.run("t = table(100:0, [`clong], [LONG])")
>>> s.run("tableInsert{t}", data)
3

此外,如果代码写入部分涉及到访问权限问题,或者写入时有较长步骤,则用户可以将这些内容封装为 functionview,再将需要上传的内容作为 functionview 的参数上传至服务端。

参考链接:

upload, table 与 tableInsert 的对比

这三种方式本质上都是同一种写入流程,即先判断待上传变量的数据形式、类型,类型转换后作为函数的参数/变量上传至服务端。

方法实现原理适用范围
upload直接上传各种数据形式
table封装 upload 方法Table 数据形式对象
run通过 run 方法上传参数的功能任何需要传入参数的服务器函数、函数视图

TableAppender, TableUpserter 与 PartitionedTableAppender

TableAppender

TableAppender 的内部实现等价于 run("tableInsert{tableName}", data)。和直接调用不同的是,TableAppender 在构造时通过获得待写入表的列类型,能够根据列类型实现自动类型转换。

代码示例:

>>> s.run("t = table(100:0, `csymbol`cvalue, [SYMBOL, LONG])")
>>> tbAppender = ddb.TableAppender(tableName="t", ddbSession=s)
>>> data = pd.DataFrame({
...     'csymbol': ["aaa", "bbb", "aaa"],
...     'cvalue': [1, 2, 3],
... })
...
>>> tbAppender.append(data)
3

TableUpserter

TableUpserter 同样会在构造时获取待更新表的列类型,再根据列类型实现自动类型转换。其内部实现等价于 upsert! 方法,故在构造 TableUpserter 时需指定键值列。

代码示例:

>>> s.run("t = keyedTable(`csymbol, 100:0, `csymbol`cvalue, [SYMBOL, LONG])")
>>> tbUpserter = ddb.TableUpserter(tableName="t", ddbSession=s, keyColNames=["csymbol"])
>>> data = pd.DataFrame({
...     'csymbol': ["aaa", "bbb", "aaa"],
...     'cvalue': [1, 2, 3],
... })
...
>>> tbUpserter.upsert(data)

参考:upsert!

PartitionedTableAppender

不同于前两种都是基于 session 写入数据的方法,PartitionedTableAppender 需要在构造时传入 DBConnectionPool 对象,进而将数据并发地写入分区表中。同样的,PartitionedTableAppender 也支持写入数据时的自动类型转换。

代码示例:

>>> if s.existsDatabase("dfs://test"):
...     s.dropDatabase("dfs://test")
...
>>> db = s.database(dbPath="dfs://test", partitionType=keys.VALUE, partitions=[1, 2, 3])
>>> s.run("schema_table = table(100:0, `cindex`cvalue, [INT, DOUBLE]);")
>>> schema_table = s.table(data="schema_table")
>>> tb = db.createPartitionedTable(table=schema_table, tableName="pt", partitionColumns="cindex")
>>> pool = ddb.DBConnectionPool("localhost", 8848, 3, "admin", "123456")
>>> ptableAppender = ddb.PartitionedTableAppender(dbPath="dfs://test", tableName="pt", partitionColName="cindex", dbConnectionPool=pool)
>>> data = pd.DataFrame({
...     'cindex': [1, 2, 3, 4, 5],
...     'cvalue': [1.1, 2.2, 3.3, 4.4, 5.5]
... })
...
>>> ptableAppender.append(data)
5

TableAppender, TableUpserter 与 PartitionedTableAppender 的对比

这三种方式都能够将 pandas.DataFrame 形式的数据自动类型转换后写入到指定表中,但是三者的适用场景有一定区别。

方法实现原理适用范围
TableAppender内部实现等价于 run("tableInsert{tableName}", data)所有表的写入
TableUpserter内部实现等价于 upsert! 方法键值表、索引表、分区表的更新写入
PartitionedTableAppender在构造时传入 DBConnectionPool 对象,再将数据并发地写入分区表中分区表等支持同时写入的表

MTW, BTW 与 Async tableInsert

MultithreadedTableWriter

MTW 在后台启用多个 C++ 线程,异步地进行数据的类型转换和上传写入。对于每个表,都需要构造一个对应的 MTW 对象进行写入。在写入时,前台调用 insert 后,并不立刻将数据进行转换,而是先将数据放入待转换队列,等待转换线程将数据转换完毕后放入写入队列。最后由多个写入队列向服务端写入数据。

代码示例:

>>> if s.existsDatabase("dfs://test"):
...     s.dropDatabase("dfs://test")
>>> db = s.database(dbPath="dfs://test", partitionType=keys.VALUE, partitions=[1, 2, 3])
>>> s.run("schema_table = table(100:0, `cindex`cvalue, [INT, DOUBLE])")
>>> schema_table = s.table(data="schema_table")
>>> pt = db.createPartitionedTable(table=schema_table, tableName="pt", partitionColumns="cindex")
>>> writer = ddb.MultithreadedTableWriter("localhost", 8848, "admin", "123456", dbPath="dfs://test", tableName="pt", threadCount=1)
>>> for i in range(100):
...     writer.insert(i, i*1.1)
>>> writer.waitForThreadCompletion()
>>> res = writer.getStatus()
>>> if res.succeed():
...     print("Data successfully written.")
... 
Data successfully written.

BatchTableWriter

BTW 仅为每张表创建一个写入线程,不同于 MTW,BTW 在 insert 时进行类型转换,总体性能较差。

注意: 目前已经停止维护 BTW。

Async tableInsert

和 tableInsert 方法类似,Async tableInsert 并非 API 提供的方法,而是在异步模式 session 中调用 run 方法,将待上传数据作为参数上传的一种方式。参考session 异步提交,该方法的工作原理是 session 的异步模式执行脚本时,仅需将脚本发送至服务端,方法立刻返回,而无需等待脚本执行完毕再返回。

代码示例:

>>> s = ddb.session(enableASYNC=True)
>>> s.connect("localhost", 8848, "admin", "123456")
>>> s.run("t = table(100:0, `cindex`cvalue, [INT, DOUBLE]);")
>>> data = pd.DataFrame({
...     'cindex': [1, 2, 3, 4, 5],
...     'cvalue': [1.1, 2.2, 3.3, 4.4, 5.5]
... })
... 
>>> for i in range(100):
...     s.run("tableInsert{t}", data)
... 

MTW, BTW 与 Async tableInsert 的对比

这三种写入方式都是异步写入,工作原理上稍有不同。

方法名实现原理适用范围优点缺点
MTW采用后台 C++ 写入线程的处理方式,提供 batchSize, throttle 参数用于指定批数据处理的粒度和等待时间适用于流式数据场景根据列类型自动转换,将流式数据批量发送至服务端,减少网络影响受制于 Python 本身的全局解释器锁,MTW 在类型转换时难以利用多线程提速。
BTW采用后台 C++ 写入线程的处理方式,每隔 100 ms 将待写入数据发送至服务端适用于流式数据场景批量写入流式数据,减少网络影响无法根据待写入表的列类型进行自动类型转换
Async tableInsert本质上利用了 session 的异步模式,和 run 方法传入参数适用于网络带宽资源紧张的情况有效降低网络占用和等待时间将写入压力转移至服务端,可能会造成服务端资源占用过多。 如果单次写入仅为一条数据,非批量数据,则可能会占用大量服务器资源。
  • 在写入阶段,后台多个 C++ 工作线程可以有效进行数据分流和批量上传,降低网络状况带来的影响。
  • 当前版本,BTW 已经停止维护,MTW 基本可以替代所有 BTW 的写入场景。