PartitionedTableAppender
与 TableAppender 对象类似,使用 PartitionedTableAppender 对象向表中追加时间类型数据时,能够实现对时间类型数据的自动转换和多线程并行写入。其基本原理是在构造时接收一个 DBConnectionPool 对象作为参数,再调用 append 方法,将数据按照分区拆分后传入连接池以实现并发追加。需注意,一个分区在同一时间只能由一个连接写入。
接口说明
PartitionedTableAppender(dbPath=None, tableName=None, partitionColName=None, dbConnectionPool=None)- dbPath :分布式数据库名字。如为内存表则不需要指定。
- tableName :分区表表名。自 3.0.4.0 版本起,也支持指定 Orca 流表名称。
- partitionColName :字符串,表示分区字段。
- dbConnectionPool :DBConnectionPool,连接池对象。
注: 指定参数 partitionColName 时,如果分区表中仅包含一个分区列,则该参数必须指定为该分区列;如果存在多个分区,则该参数可指定为任意分区列。在指定该参数后,API 将根据分区字段进行数据分配。
该类仅支持 append 方法,接口如下:
append(table)- table:待写入数据,通常为 pandas.DataFrame 类型的本地数据。
示例
下例创建了一个分布式数据库 dfs://valuedb 和一个分布式表 pt,同时创建了连接池 pool 并传入 PartitionedTableAppender,然后使用 append 方法向分布式表并发写入本地数据。最后查询写入的数据。
import dolphindb as ddb
import pandas as pd
import numpy as np
import random
s = ddb.session()
s.connect("localhost", 8848, "admin", "123456")
script = """
dbPath = "dfs://valuedb"
if(existsDatabase(dbPath)){
dropDatabase(dbPath)
}
t = table(100:0, `id`date`vol, [SYMBOL, DATE, LONG])
db = database(dbPath, VALUE, `APPL`IBM`AMZN)
pt = db.createPartitionedTable(t, `pt, `id)
"""
s.run(script)
pool = ddb.DBConnectionPool("localhost", 8848, 3, "admin", "123456")
appender = ddb.PartitionedTableAppender(dbPath="dfs://valuedb", tableName="pt", partitionColName="id", dbConnectionPool=pool)
n = 100
dates = []
for i in range(n):
dates.append(np.datetime64(
"201{:d}-0{:1d}-{:2d}".format(random.randint(0, 9), random.randint(1, 9), random.randint(10, 28))))
data = pd.DataFrame({
"id": np.random.choice(['AMZN', 'IBM', 'APPL'], n),
"time": dates,
"vol": np.random.randint(100, size=n)
})
re = appender.append(data)
print(re)
print(s.run("pt = loadTable('dfs://valuedb', 'pt'); select * from pt;"))输出结果如下:
100
id date vol
0 AMZN 2017-01-22 60
1 AMZN 2014-08-12 37
2 AMZN 2012-09-10 68
3 AMZN 2012-03-14 48
4 AMZN 2016-07-12 1
.. ... ... ...
95 IBM 2016-05-15 25
96 IBM 2012-06-19 6
97 IBM 2010-05-10 96
98 IBM 2017-07-10 32
99 IBM 2012-09-23 68
[100 rows x 3 columns]