TableUpserter
Python API 提供 TableUpserter 对象,可以通过 upsert 方式向索引内存表、键值内存表以及分布式表中追加数据。与 TableAppender 对象类似,使用 TableUpserter 对象向表中添加本地的 DataFrame 数据,能够对时间类型进行自动转换,用户无须再进行额外的手动转换。
注: 1.30.19.4 及之后版本的 API 同时支持使用 TableUpserter 进行 UUID 等特殊类型的自动转换。
接口说明
TableUpserter(dbPath=None, tableName=None, ddbSession=None, ignoreNull=False, keyColNames=[], sortColumns=[])
- dbPath :分布式数据库地址。若待写入表为内存表,可以不指定该参数。
- tableName :分布式表或索引内存表、键值内存表的表名。
- ddbSession :已连接 DolphinDB 的 session 对象。
- ignoreNull :布尔值。若追加的新数据中某元素为 NULL 值,是否对目标表中的相应数据进行更新。默认值为 False。
- keyColNames :字符串列表。由于 DFS 表不包含键值列,在更新 DFS 表时,会将该参数指定的列视为键值列。
- sortColumns :字符串列表。设置该参数后,更新的分区内的所有数据会根据指定的列进行排序。排序在每个分区内部进行,不会跨分区排序。
该类仅支持upsert
方法,接口如下:
upsert(table)
- table:待写入数据,通常为 pandas.DataFrame 类型的本地数据。
示例 1
下例创建了一个以 id 列为 key 的共享键值内存表 keyed_t,然后构造 TableAppender 对象,调用其 upsert
方法向表 keyed_t 中添加数据。在构造的数据中,id 列在 0-9 之间循环,text 列则不断递增。最后查询写入的数据,仅保留每个 id 下最后一条写入的数据。
import dolphindb as ddb import dolphindb.settings as keys import numpy as np import pandas as pd s = ddb.session() s.connect("localhost", 8848, "admin", "123456") script_KEYEDTABLE = """ testtable=keyedTable(`id,1000:0,`date`text`id,[DATETIME,STRING,LONG]) share testtable as keyed_t """ s.run(script_KEYEDTABLE) upserter = ddb.TableUpserter(tableName="keyed_t", ddbSession=s) dates=[] texts=[] ids=[] for i in range(1000): dates.append(np.datetime64('2012-06-13 13:30:10.008')) texts.append(f"test_i_{i}") ids.append(i%10) df = pd.DataFrame({ 'date': dates, 'text': texts, 'id': ids, }) upserter.upsert(df) keyed_t = s.run("keyed_t") print(keyed_t)
输出结果符合预期:
date text id
0 2012-06-13 13:30:10 test_i_990 0
1 2012-06-13 13:30:10 test_i_991 1
2 2012-06-13 13:30:10 test_i_992 2
3 2012-06-13 13:30:10 test_i_993 3
4 2012-06-13 13:30:10 test_i_994 4
5 2012-06-13 13:30:10 test_i_995 5
6 2012-06-13 13:30:10 test_i_996 6
7 2012-06-13 13:30:10 test_i_997 7
8 2012-06-13 13:30:10 test_i_998 8
9 2012-06-13 13:30:10 test_i_999 9
示例 2
若要写入没有键值列的分区表或者内存表,则需要在构造 TableUpserter 时指定键值列。
下例中,首先定义一个 VALUE 分区方式的分区表 p_table,然后构造 TableUpserter 对象并指定 id 为键值列,调用其 upsert 方法向表 p_table 中添加数据。最后查询写入的数据。
import dolphindb as ddb import dolphindb.settings as keys import numpy as np import pandas as pd import datetime s = ddb.session() s.connect("localhost", 8848, "admin", "123456") script_DFS_VALUE = """ if(existsDatabase("dfs://valuedb")){ dropDatabase("dfs://valuedb") } db = database("dfs://valuedb", VALUE, 0..9) t = table(1000:0, `date`text`id`flag, [DATETIME, STRING, LONG, INT]) p_table = db.createPartitionedTable(t, `pt, `flag) """ s.run(script_DFS_VALUE) upserter = ddb.TableUpserter(dbPath="dfs://valuedb", tableName="pt", ddbSession=s, keyColNames=["id"]) for i in range(10): dates = [np.datetime64(datetime.datetime.now()) for _ in range(100)] texts = [f"test_{i}_{_}" for _ in range(100)] ids = np.array([ _ % 10 for _ in range(100)], dtype="int32") flags = [ _ % 10 for _ in range(100)] df = pd.DataFrame({ 'date': dates, 'text': texts, 'id': ids, 'flag': flags, }) upserter.upsert(df) p_table = s.run("select * from p_table") print(p_table)
输出结果如下所示:
date text id flag
0 2023-03-16 10:09:33 test_9_90 0 0
1 2023-03-16 10:09:26 test_0_10 0 0
2 2023-03-16 10:09:26 test_0_20 0 0
3 2023-03-16 10:09:26 test_0_30 0 0
4 2023-03-16 10:09:26 test_0_40 0 0
.. ... ... .. ...
95 2023-03-16 10:09:26 test_0_59 9 9
96 2023-03-16 10:09:26 test_0_69 9 9
97 2023-03-16 10:09:26 test_0_79 9 9
98 2023-03-16 10:09:26 test_0_89 9 9
99 2023-03-16 10:09:26 test_0_99 9 9
[100 rows x 4 columns]
由上述结果可知,当键值列在某分区中值不唯一时,执行 upsert
时仅会覆盖分区中当前键值列下该键值对应的第一个数据。
注: TableUpserter 实际上调用了 DolphinDB 的 upsert!
函数,同时传入 pandas.DataFrame 作为参数以实现其功能。upsert!
函数的详细使用方式请参考 upsert!。