多种写入方案
Python API 提供多种写入方案,可以适配不同场景的写入需求,下面将详细介绍各种写入方案之间的区别。
场景条件 | 建议使用 | 说明 |
---|---|---|
上传变量 | upload | 直接上传变量,适用于所有类型,类型转换更为自由 |
执行服务端函数时附带参数 | run | 作为参数上传,同样适用所有类型 |
面向对象地在 Python 端操作服务端数据库、数据表 | table 等相关方法 | 便捷地在 Python 端使用服务端数据 |
写入批量数据,且数据结构可以较为便捷地转换为 pandas.DataFrame | TableAppenderTableUpserterPartitionedTableAppender | 自动类型转换,不需要关心类型对应的问题,具体使用请参考本文第二节第四小节 |
写入流式数据 | MultithreadedTableWriter | 自动类型转换,将流式数据批量发送至服务端 |
API资源较为紧张,服务端资源较为充裕 | 异步模式 session | 将写入压力转移至服务端,通常不推荐该方式写入 |
upload, table 与 tableInsert
upload
session.upload 方法可以直接上传数据。这种方案适用于上传各种数据形式,例如 TABLE、DICTIONARY、VECTOR 等。在调用 upload 时,须指定数据上传后的变量名。代码示例如下:
>>> data = pd.DataFrame({· ... 'clong': [1, 2, 3], ... }) ... >>> s.upload({'data': data})
此外,在 upload 上传数据时,因为 DolphinDB 的数据类型和 Python 的原生数据类型、numpy 以及 pandas 的数据类型无法一一对应,因此会出现某些数据类型无法直接上传的情况,例如 UUID、MINUTE 等数据类型。1.30.22.1 及之后版本的 Python API 新增强制类型转换,可以在调用 upload 上传 pd.DataFrame 时,通过添加 __DolphinDB_Type__ 指定待上传列的类型。
table
session.table 方法可以传入一个本地数据对象,例如 pandas.DataFrame/dict/...,将该数据对象作为一个临时表上传到 DolphinDB,其生存周期由 Python API 进行维护。该种方法仅支持上传 TABLE 数据形式的对象。其内部的实现调用了 session.upload,因此也可以通过指定 __DolphinDB_Type__ 以实现强制类型转换。
但和 upload 方法稍有不同,upload 方法上传的变量需要手动析构上传变量的生存周期,若处理不当可能会导致 session 占用内存过大。而 table 方法返回一个 Python API 定义的 Table 类对象,在析构时会同时析构 DolphinDB 服务端的该临时表,不需要手动维护生存周期。
代码示例如下:
>>> data = pd.DataFrame({ ... 'clong': [1, 2, 3], ... }) ... >>> tb = s.table(data=data) >>> tb <dolphindb.table.Table object at 0x7faf42e67a00>
tableInsert
不同于前两种方法,tableInsert 并非 Python API 提供的方法,而是通过 run 方法上传参数的功能以实现写入的方式。
- 从数据序列化的角度来看,该方法和 upload, table 没有区别。
- 从使用的角度来说,在某些不需要指定上传变量名的流程中,将数据作为 run 方法的参数上传更为简单直接。例如在写入表时,无需先上传临时表到服务端,然后调用 tableInsert 写入,而是直接作为 tableInsert 的参数上传并写入,以此可简化流程。
代码示例如下:
>>> data = pd.DataFrame({ ... 'clong': [1, 2, 3], ... }) ... >>> s.run("t = table(100:0, [`clong], [LONG])") >>> s.run("tableInsert{t}", data) 3
此外,如果代码写入部分涉及到访问权限问题,或者写入时有较长步骤,则用户可以将这些内容封装为 functionview,再将需要上传的内容作为 functionview 的参数上传至服务端。
参考链接:
upload, table 与 tableInsert 的对比
这三种方式本质上都是同一种写入流程,即先判断待上传变量的数据形式、类型,类型转换后作为函数的参数/变量上传至服务端。
方法 | 实现原理 | 适用范围 |
---|---|---|
upload | 直接上传 | 各种数据形式 |
table | 封装 upload 方法 | Table 数据形式对象 |
run | 通过 run 方法上传参数的功能 | 任何需要传入参数的服务器函数、函数视图 |
TableAppender, TableUpserter 与 PartitionedTableAppender
TableAppender
TableAppender 的内部实现等价于 run("tableInsert{tableName}", data)
。和直接调用不同的是,TableAppender 在构造时通过获得待写入表的列类型,能够根据列类型实现自动类型转换。
代码示例:
>>> s.run("t = table(100:0, `csymbol`cvalue, [SYMBOL, LONG])") >>> tbAppender = ddb.TableAppender(tableName="t", ddbSession=s) >>> data = pd.DataFrame({ ... 'csymbol': ["aaa", "bbb", "aaa"], ... 'cvalue': [1, 2, 3], ... }) ... >>> tbAppender.append(data) 3
TableUpserter
TableUpserter 同样会在构造时获取待更新表的列类型,再根据列类型实现自动类型转换。其内部实现等价于 upsert! 方法,故在构造 TableUpserter 时需指定键值列。
代码示例:
>>> s.run("t = keyedTable(`csymbol, 100:0, `csymbol`cvalue, [SYMBOL, LONG])") >>> tbUpserter = ddb.TableUpserter(tableName="t", ddbSession=s, keyColNames=["csymbol"]) >>> data = pd.DataFrame({ ... 'csymbol': ["aaa", "bbb", "aaa"], ... 'cvalue': [1, 2, 3], ... }) ... >>> tbUpserter.upsert(data)
参考:upsert!
PartitionedTableAppender
不同于前两种都是基于 session 写入数据的方法,PartitionedTableAppender 需要在构造时传入 DBConnectionPool 对象,进而将数据并发地写入分区表中。同样的,PartitionedTableAppender 也支持写入数据时的自动类型转换。
代码示例:
>>> if s.existsDatabase("dfs://test"): ... s.dropDatabase("dfs://test") ... >>> db = s.database(dbPath="dfs://test", partitionType=keys.VALUE, partitions=[1, 2, 3]) >>> s.run("schema_table = table(100:0, `cindex`cvalue, [INT, DOUBLE]);") >>> schema_table = s.table(data="schema_table") >>> tb = db.createPartitionedTable(table=schema_table, tableName="pt", partitionColumns="cindex") >>> pool = ddb.DBConnectionPool("localhost", 8848, 3, "admin", "123456") >>> ptableAppender = ddb.PartitionedTableAppender(dbPath="dfs://test", tableName="pt", partitionColName="cindex", dbConnectionPool=pool) >>> data = pd.DataFrame({ ... 'cindex': [1, 2, 3, 4, 5], ... 'cvalue': [1.1, 2.2, 3.3, 4.4, 5.5] ... }) ... >>> ptableAppender.append(data) 5
TableAppender, TableUpserter 与 PartitionedTableAppender 的对比
这三种方式都能够将 pandas.DataFrame 形式的数据自动类型转换后写入到指定表中,但是三者的适用场景有一定区别。
方法 | 实现原理 | 适用范围 |
---|---|---|
TableAppender | 内部实现等价于 run("tableInsert{tableName}", data) | 所有表的写入 |
TableUpserter | 内部实现等价于 upsert! 方法 | 键值表、索引表、分区表的更新写入 |
PartitionedTableAppender | 在构造时传入 DBConnectionPool 对象,再将数据并发地写入分区表中 | 分区表等支持同时写入的表 |
MTW, BTW 与 Async tableInsert
MultithreadedTableWriter
MTW 在后台启用多个 C++ 线程,异步地进行数据的类型转换和上传写入。对于每个表,都需要构造一个对应的 MTW 对象进行写入。在写入时,前台调用 insert 后,并不立刻将数据进行转换,而是先将数据放入待转换队列,等待转换线程将数据转换完毕后放入写入队列。最后由多个写入队列向服务端写入数据。
代码示例:
>>> if s.existsDatabase("dfs://test"): ... s.dropDatabase("dfs://test") >>> db = s.database(dbPath="dfs://test", partitionType=keys.VALUE, partitions=[1, 2, 3]) >>> s.run("schema_table = table(100:0, `cindex`cvalue, [INT, DOUBLE])") >>> schema_table = s.table(data="schema_table") >>> pt = db.createPartitionedTable(table=schema_table, tableName="pt", partitionColumns="cindex") >>> writer = ddb.MultithreadedTableWriter("localhost", 8848, "admin", "123456", dbPath="dfs://test", tableName="pt", threadCount=1) >>> for i in range(100): ... writer.insert(i, i*1.1) >>> writer.waitForThreadCompletion() >>> res = writer.getStatus() >>> if res.succeed(): ... print("Data successfully written.") ... Data successfully written.
BatchTableWriter
BTW 仅为每张表创建一个写入线程,不同于 MTW,BTW 在 insert 时进行类型转换,总体性能较差。
注意: 目前已经停止维护 BTW。
Async tableInsert
和 tableInsert 方法类似,Async tableInsert 并非 API 提供的方法,而是在异步模式 session 中调用 run 方法,将待上传数据作为参数上传的一种方式。参考 session 异步提交,该方法的工作原理是 session 的异步模式执行脚本时,仅需将脚本发送至服务端,方法立刻返回,而无需等待脚本执行完毕再返回。
代码示例:
>>> s = ddb.session(enableASYNC=True) >>> s.connect("localhost", 8848, "admin", "123456") >>> s.run("t = table(100:0, `cindex`cvalue, [INT, DOUBLE]);") >>> data = pd.DataFrame({ ... 'cindex': [1, 2, 3, 4, 5], ... 'cvalue': [1.1, 2.2, 3.3, 4.4, 5.5] ... }) ... >>> for i in range(100): ... s.run("tableInsert{t}", data) ...
MTW, BTW 与 Async tableInsert 的对比
这三种写入方式都是异步写入,工作原理上稍有不同。
方法名 | 实现原理 | 适用范围 | 优点 | 缺点 |
---|---|---|---|---|
MTW | 采用后台 C++ 写入线程的处理方式,提供 batchSize, throttle 参数用于指定批数据处理的粒度和等待时间 | 适用于流式数据场景 | 根据列类型自动转换,将流式数据批量发送至服务端,减少网络影响 | 受制于 Python 本身的全局解释器锁,MTW 在类型转换时难以利用多线程提速。 |
BTW | 采用后台 C++ 写入线程的处理方式,每隔 100 ms 将待写入数据发送至服务端 | 适用于流式数据场景 | 批量写入流式数据,减少网络影响 | 无法根据待写入表的列类型进行自动类型转换 |
Async tableInsert | 本质上利用了 session 的异步模式,和 run 方法传入参数 | 适用于网络带宽资源紧张的情况 | 有效降低网络占用和等待时间 | 将写入压力转移至服务端,可能会造成服务端资源占用过多。 如果单次写入仅为一条数据,非批量数据,则可能会占用大量服务器资源。 |
- 在写入阶段,后台多个 C++ 工作线程可以有效进行数据分流和批量上传,降低网络状况带来的影响。
- 当前版本,BTW 已经停止维护,MTW 基本可以替代所有 BTW 的写入场景。