PartitionedTableAppender
与 TableAppender 对象类似,使用 PartitionedTableAppender 对象向表中追加时间类型数据时,能够实现对时间类型数据的自动转换和多线程并行写入。其基本原理是在构造时接收一个 DBConnectionPool 对象作为参数,再调用 append 方法,将数据按照分区拆分后传入连接池以实现并发追加。需注意,一个分区在同一时间只能由一个连接写入。
接口说明
PartitionedTableAppender(dbPath=None, tableName=None, partitionColName=None, dbConnectionPool=None)
- dbPath :分布式数据库名字。如为内存表则不需要指定。
- tableName :分区表表名。
- partitionColName :字符串,表示分区字段。
- dbConnectionPool :DBConnectionPool,连接池对象。
注: 指定参数 partitionColName 时,如果分区表中仅包含一个分区列,则该参数必须指定为该分区列;如果存在多个分区,则该参数可指定为任意分区列。在指定该参数后,API 将根据分区字段进行数据分配。
该类仅支持 append
方法,接口如下:
append(table)
- table:待写入数据,通常为 pandas.DataFrame 类型的本地数据。
示例
下例创建了一个分布式数据库 dfs://valuedb 和一个分布式表 pt,同时创建了连接池 pool 并传入 PartitionedTableAppender,然后使用 append 方法向分布式表并发写入本地数据。最后查询写入的数据。
import dolphindb as ddb import pandas as pd import numpy as np import random s = ddb.session() s.connect("localhost", 8848, "admin", "123456") script = """ dbPath = "dfs://valuedb" if(existsDatabase(dbPath)){ dropDatabase(dbPath) } t = table(100:0, `id`date`vol, [SYMBOL, DATE, LONG]) db = database(dbPath, VALUE, `APPL`IBM`AMZN) pt = db.createPartitionedTable(t, `pt, `id) """ s.run(script) pool = ddb.DBConnectionPool("localhost", 8848, 3, "admin", "123456") appender = ddb.PartitionedTableAppender(dbPath="dfs://valuedb", tableName="pt", partitionColName="id", dbConnectionPool=pool) n = 100 dates = [] for i in range(n): dates.append(np.datetime64( "201{:d}-0{:1d}-{:2d}".format(random.randint(0, 9), random.randint(1, 9), random.randint(10, 28)))) data = pd.DataFrame({ "id": np.random.choice(['AMZN', 'IBM', 'APPL'], n), "time": dates, "vol": np.random.randint(100, size=n) }) re = appender.append(data) print(re) print(s.run("pt = loadTable('dfs://valuedb', 'pt'); select * from pt;"))
输出结果如下:
100
id date vol
0 AMZN 2017-01-22 60
1 AMZN 2014-08-12 37
2 AMZN 2012-09-10 68
3 AMZN 2012-03-14 48
4 AMZN 2016-07-12 1
.. ... ... ...
95 IBM 2016-05-15 25
96 IBM 2012-06-19 6
97 IBM 2010-05-10 96
98 IBM 2017-07-10 32
99 IBM 2012-09-23 68
[100 rows x 3 columns]