TableUpserter

Python API 提供 TableUpserter 对象,可以通过 upsert 方式向索引内存表、键值内存表以及分布式表中追加数据。与 TableAppender 对象类似,使用 TableUpserter 对象向表中添加本地的 DataFrame 数据,能够对时间类型进行自动转换,用户无须再进行额外的手动转换。

注: 1.30.19.4 及之后版本的 API 同时支持使用 TableUpserter 进行 UUID 等特殊类型的自动转换。

接口说明

TableUpserter(dbPath=None, tableName=None, ddbSession=None, ignoreNull=False, keyColNames=[], sortColumns=[])
  • dbPath :分布式数据库地址。若待写入表为内存表,可以不指定该参数。
  • tableName :分布式表或索引内存表、键值内存表的表名。
  • ddbSession :已连接 DolphinDB 的 session 对象。
  • ignoreNull :布尔值。若追加的新数据中某元素为 NULL 值,是否对目标表中的相应数据进行更新。默认值为 False。
  • keyColNames :字符串列表。由于 DFS 表不包含键值列,在更新 DFS 表时,会将该参数指定的列视为键值列。
  • sortColumns :字符串列表。设置该参数后,更新的分区内的所有数据会根据指定的列进行排序。排序在每个分区内部进行,不会跨分区排序。

该类仅支持upsert方法,接口如下:

upsert(table)
  • table:待写入数据,通常为 pandas.DataFrame 类型的本地数据。

示例 1

下例创建了一个以 id 列为 key 的共享键值内存表 keyed_t,然后构造 TableAppender 对象,调用其 upsert 方法向表 keyed_t 中添加数据。在构造的数据中,id 列在 0-9 之间循环,text 列则不断递增。最后查询写入的数据,仅保留每个 id 下最后一条写入的数据。

import dolphindb as ddb
import dolphindb.settings as keys
import numpy as np
import pandas as pd
s = ddb.session()
s.connect("localhost", 8848, "admin", "123456")


script_KEYEDTABLE = """
    testtable=keyedTable(`id,1000:0,`date`text`id,[DATETIME,STRING,LONG])
    share testtable as keyed_t
    """
s.run(script_KEYEDTABLE)
upserter = ddb.TableUpserter(tableName="keyed_t", ddbSession=s)
dates=[]
texts=[]
ids=[]
for i in range(1000):
    dates.append(np.datetime64('2012-06-13 13:30:10.008'))
    texts.append(f"test_i_{i}")
    ids.append(i%10)
df = pd.DataFrame({
    'date': dates,
    'text': texts,
    'id': ids,
})
upserter.upsert(df)
keyed_t = s.run("keyed_t")
print(keyed_t)

输出结果符合预期:

                 date        text  id
0 2012-06-13 13:30:10  test_i_990   0
1 2012-06-13 13:30:10  test_i_991   1
2 2012-06-13 13:30:10  test_i_992   2
3 2012-06-13 13:30:10  test_i_993   3
4 2012-06-13 13:30:10  test_i_994   4
5 2012-06-13 13:30:10  test_i_995   5
6 2012-06-13 13:30:10  test_i_996   6
7 2012-06-13 13:30:10  test_i_997   7
8 2012-06-13 13:30:10  test_i_998   8
9 2012-06-13 13:30:10  test_i_999   9

示例 2

若要写入没有键值列的分区表或者内存表,则需要在构造 TableUpserter 时指定键值列。

下例中,首先定义一个 VALUE 分区方式的分区表 p_table,然后构造 TableUpserter 对象并指定 id 为键值列,调用其 upsert 方法向表 p_table 中添加数据。最后查询写入的数据。

import dolphindb as ddb
import dolphindb.settings as keys
import numpy as np
import pandas as pd
import datetime

s = ddb.session()
s.connect("localhost", 8848, "admin", "123456")
script_DFS_VALUE = """
    if(existsDatabase("dfs://valuedb")){
        dropDatabase("dfs://valuedb")
    }
    db = database("dfs://valuedb", VALUE, 0..9)
    t = table(1000:0, `date`text`id`flag, [DATETIME, STRING, LONG, INT])
    p_table = db.createPartitionedTable(t, `pt, `flag)
"""
s.run(script_DFS_VALUE)
upserter = ddb.TableUpserter(dbPath="dfs://valuedb", tableName="pt", ddbSession=s, keyColNames=["id"])

for i in range(10):
    dates = [np.datetime64(datetime.datetime.now()) for _ in range(100)]
    texts = [f"test_{i}_{_}" for _ in range(100)]
    ids = np.array([ _ % 10 for _ in range(100)], dtype="int32")
    flags = [ _ % 10 for _ in range(100)]
    df = pd.DataFrame({
        'date': dates,
        'text': texts,
        'id': ids,
        'flag': flags,
    })
    upserter.upsert(df)

p_table = s.run("select * from p_table")
print(p_table)

输出结果如下所示:

                  date       text  id  flag
0  2023-03-16 10:09:33  test_9_90   0     0
1  2023-03-16 10:09:26  test_0_10   0     0
2  2023-03-16 10:09:26  test_0_20   0     0
3  2023-03-16 10:09:26  test_0_30   0     0
4  2023-03-16 10:09:26  test_0_40   0     0
..                 ...        ...  ..   ...
95 2023-03-16 10:09:26  test_0_59   9     9
96 2023-03-16 10:09:26  test_0_69   9     9
97 2023-03-16 10:09:26  test_0_79   9     9
98 2023-03-16 10:09:26  test_0_89   9     9
99 2023-03-16 10:09:26  test_0_99   9     9

[100 rows x 4 columns]

由上述结果可知,当键值列在某分区中值不唯一时,执行 upsert 时仅会覆盖分区中当前键值列下该键值对应的第一个数据。

注: TableUpserter 实际上调用了 DolphinDB 的 upsert! 函数,同时传入 pandas.DataFrame 作为参数以实现其功能。upsert! 函数的详细使用方式请参考 upsert!