TableUpserter
Python API 提供 TableUpserter 对象,可以通过 upsert 方式向索引内存表、键值内存表以及分布式表中追加数据。与 TableAppender 对象类似,使用 TableUpserter 对象向表中添加本地的 DataFrame 数据,能够对时间类型进行自动转换,用户无须再进行额外的手动转换。
注: 1.30.19.4 及之后版本的 API 同时支持使用 TableUpserter 进行 UUID 等特殊类型的自动转换。
接口说明
TableUpserter(dbPath=None, tableName=None, ddbSession=None, ignoreNull=False, keyColNames=[], sortColumns=[])- dbPath :分布式数据库地址。若待写入表为内存表,可以不指定该参数。
- tableName :分布式表或索引内存表、键值内存表的表名。自 3.0.4.0 版本起,也支持指定 Orca 流表名称。
- ddbSession :已连接 DolphinDB 的 session 对象。
- ignoreNull :布尔值。若追加的新数据中某元素为 NULL 值,是否对目标表中的相应数据进行更新。默认值为 False。
- keyColNames :字符串列表。由于 DFS 表不包含键值列,在更新 DFS 表时,会将该参数指定的列视为键值列。
- sortColumns :字符串列表。设置该参数后,更新的分区内的所有数据会根据指定的列进行排序。排序在每个分区内部进行,不会跨分区排序。
该类仅支持upsert方法,接口如下:
upsert(table)- table:待写入数据,通常为 pandas.DataFrame 类型的本地数据。
示例 1
下例创建了一个以 id 列为 key 的共享键值内存表 keyed_t,然后构造 TableAppender 对象,调用其 upsert 方法向表 keyed_t 中添加数据。在构造的数据中,id 列在 0-9 之间循环,text 列则不断递增。最后查询写入的数据,仅保留每个 id 下最后一条写入的数据。
import dolphindb as ddb
import dolphindb.settings as keys
import numpy as np
import pandas as pd
s = ddb.session()
s.connect("localhost", 8848, "admin", "123456")
script_KEYEDTABLE = """
testtable=keyedTable(`id,1000:0,`date`text`id,[DATETIME,STRING,LONG])
share testtable as keyed_t
"""
s.run(script_KEYEDTABLE)
upserter = ddb.TableUpserter(tableName="keyed_t", ddbSession=s)
dates=[]
texts=[]
ids=[]
for i in range(1000):
dates.append(np.datetime64('2012-06-13 13:30:10.008'))
texts.append(f"test_i_{i}")
ids.append(i%10)
df = pd.DataFrame({
'date': dates,
'text': texts,
'id': ids,
})
upserter.upsert(df)
keyed_t = s.run("keyed_t")
print(keyed_t)输出结果符合预期:
date text id
0 2012-06-13 13:30:10 test_i_990 0
1 2012-06-13 13:30:10 test_i_991 1
2 2012-06-13 13:30:10 test_i_992 2
3 2012-06-13 13:30:10 test_i_993 3
4 2012-06-13 13:30:10 test_i_994 4
5 2012-06-13 13:30:10 test_i_995 5
6 2012-06-13 13:30:10 test_i_996 6
7 2012-06-13 13:30:10 test_i_997 7
8 2012-06-13 13:30:10 test_i_998 8
9 2012-06-13 13:30:10 test_i_999 9示例 2
若要写入没有键值列的分区表或者内存表,则需要在构造 TableUpserter 时指定键值列。
下例中,首先定义一个 VALUE 分区方式的分区表 p_table,然后构造 TableUpserter 对象并指定 id 为键值列,调用其 upsert 方法向表 p_table 中添加数据。最后查询写入的数据。
import dolphindb as ddb
import dolphindb.settings as keys
import numpy as np
import pandas as pd
import datetime
s = ddb.session()
s.connect("localhost", 8848, "admin", "123456")
script_DFS_VALUE = """
if(existsDatabase("dfs://valuedb")){
dropDatabase("dfs://valuedb")
}
db = database("dfs://valuedb", VALUE, 0..9)
t = table(1000:0, `date`text`id`flag, [DATETIME, STRING, LONG, INT])
p_table = db.createPartitionedTable(t, `pt, `flag)
"""
s.run(script_DFS_VALUE)
upserter = ddb.TableUpserter(dbPath="dfs://valuedb", tableName="pt", ddbSession=s, keyColNames=["id"])
for i in range(10):
dates = [np.datetime64(datetime.datetime.now()) for _ in range(100)]
texts = [f"test_{i}_{_}" for _ in range(100)]
ids = np.array([ _ % 10 for _ in range(100)], dtype="int32")
flags = [ _ % 10 for _ in range(100)]
df = pd.DataFrame({
'date': dates,
'text': texts,
'id': ids,
'flag': flags,
})
upserter.upsert(df)
p_table = s.run("select * from p_table")
print(p_table)输出结果如下所示:
date text id flag
0 2023-03-16 10:09:33 test_9_90 0 0
1 2023-03-16 10:09:26 test_0_10 0 0
2 2023-03-16 10:09:26 test_0_20 0 0
3 2023-03-16 10:09:26 test_0_30 0 0
4 2023-03-16 10:09:26 test_0_40 0 0
.. ... ... .. ...
95 2023-03-16 10:09:26 test_0_59 9 9
96 2023-03-16 10:09:26 test_0_69 9 9
97 2023-03-16 10:09:26 test_0_79 9 9
98 2023-03-16 10:09:26 test_0_89 9 9
99 2023-03-16 10:09:26 test_0_99 9 9
[100 rows x 4 columns]由上述结果可知,当键值列在某分区中值不唯一时,执行 upsert 时仅会覆盖分区中当前键值列下该键值对应的第一个数据。
注: TableUpserter 实际上调用了 DolphinDB 的 upsert! 函数,同时传入 pandas.DataFrame 作为参数以实现其功能。upsert! 函数的详细使用方式请参考 upsert!。
