BatchTableWriter
注意 :BatchTableWriter 现已不再维护,不推荐使用。
工作原理
BatchTableWriter 以下简称 BTW,用于批量写入单条数据。以下为简要的使用说明:
BTW 提供接口 addTable 方法,用于添加一个写入表,执行该方法后,将会创建一个写入队列和一个 C++ 工作线程。
写入数据时,需要调用 insert 方法,并指定写入表的信息和待写入的一行数据。将数据转换为 C++ 对象后放入后台写入队列中。
工作线程每隔 100 ms 取出写入队列中的所有数据,一次性将这一批数据全部写入 DolphinDB。
如果写入数据时(insert)发生类型转换错误或者列数不匹配等错误,会立刻抛出异常信息,该条数据不会放入写入队列中;如果在工作线程中将数据发送到 DolphinDB 的过程中发生错误,则会将当前写入队列中的数据和发送失败的数据保存起来,调用 getUnwrittenData 即可获得。
如果需要查看当前各表的写入状态,可以调用 getAllStatus 方法,该方法将返回一张表,表示各个待写入表的工作状态,包含写入队列长度、发送行数、是否已销毁、是否结束。也可以调用 getStatus,传入表名和数据库名,就可以获得指定表当前的写入状态。
不需要再往表中写数据时,可以调用 removeTable。该方法将停止工作线程,然后销毁相关的工作线程、连接、写入队列。
BatchTableWriter
BatchTableWriter(host, port, userid=None, password=None, acquireLock=True)
参数说明
- host :字符串,必填,表示所连接的服务器的 IP 地址。
- port :整数,必填,表示所连接的服务器的端口号。
- userName :字符串,可选,表示登录时的用户名。
- password :字符串,可选,表示登录时用户名对应的密码。
- acquireLock :布尔值,表示在使用过程中是否对 BTW 内部加锁。默认为 True,表示需要加锁。若在并发调用 API 的场景下,建议加锁。
构造示例
writer = ddb.BatchTableWriter("localhost", 8848, "admin", "123456", acquireLock=True)
addTable
writer.addTable(dbPath=None, tableName=None, partitioned=True)
功能说明
向 BTW 中添加一个待写入的表,如果是分区表则需要设置 partitioned=True
。
参数说明
- dbName :表示数据库名。若待写入表为磁盘表时,须填写该参数;若为内存表,则不需要填写该参数。
- tableName :表示数据表的表名。
- partitioned :表示添加的表是否为分区表。若设置该参数为 True,则表示为分区表。如果添加的表是磁盘未分区表,须设置
partitioned=False
。
注意:
- 如果添加的是内存表,则需要共享该表。
- 表名不可重复添加。如果重复添加,则需要先移除之前添加的表,否则会抛出异常。
insert
writer.insert(dbPath=None, tableName=None, *args)
功能说明
向指定表中插入单行数据。
参数说明
- dbPath :表示数据库名。若待写入表为磁盘表时,须填写该参数;若为内存表,则不需要填写该参数。
- tableName :表示数据表的表名。
- args :变长参数,表示插入的一行数据。
注意:
- 在调用 insert 前须先使用 addTable 添加表,否则会抛出异常。
- 变长参数的个数和数据类型必须与 insert 表的列数及类型匹配。
- 若在插入过程中出现异常进而导致后台线程退出,此时再次调用 insert 将会抛出异常。建议使用 getUnwrittenData 来获取之前所有已经写入缓冲队列但是没有成功写入服务器的数据(不包括本次 insert 的数据),然后使用 removeTable 释放资源。如果需要再次插入数据,则须重新调用 addTable。
- 在移除指定表的过程中调用 insert 仍能成功插入数据,但插入的这部分数据并不会发送到服务器。该操作属于未定义行为,不建议用户进行类似操作。
removeTable
writer.removeTable(dbPath=None, tableName=None)
功能说明
释放由 addTable 添加的表所占用的资源。
第一次调用该函数,若该函数返回即表示后台线程已退出。再次写入需要重新调用 addTable。
getUnwrittenData
data:pandas.DataFrame = writer.getUnwrittenData(dbPath=None, tableName=None)
功能说明
获取还未写入的数据。
当写入出现错误时,使用该函数可以获取剩余未写入的数据。但是这些未写入的数据不会尝试重写,若需要重新写入,则需要使用 removeTable 后重新调用 addTable,再通过 insert 写入数据。
getStatus
res:list = writer.getStatus(dbPath=None, tableName=None)
功能说明
获取当前的写入状态。详细信息可参考下方返回值说明。
返回值说明
返回值是由一个整型和两个布尔型组合的列表,分别表示当前写入队列的深度、当前表是否被移除(True: 表示正在被移除),以及后台写入线程是否因为出错而退出(True: 表示后台线程因出错而退出)。
getAllStatus
res:pandas.DataFrame = writer.getAllStatus()
功能说明
获取所有当前存在的表的信息,不包含被移除的表。
返回值说明
该函数的返回值是一个 2*6 的表格,包含数据库名(DatabaseName)、数据表名(TableName)、写入队列长度(WriteQueueDepth)、发送行数(sendedRows)、是否已销毁(Removing)和是否结束(Finished)。
返回值的示例如下表:
DatabaseName | TableName | WriteQueueDepth | sendedRows | Removing | Finished |
---|---|---|---|---|---|
0 | tglobal | 0 | 5 | False | False |
使用示例
本例中,首先定义一个共享表 tglobal,然后构造 BTW,调用 addTable 将 tglobal 加入待写入表。随后调用 insert 方法写入 5 条数据,然后立刻调用 getUnwritternData、getStatus、getAllStatus 获取尚未写入的数据、当前表的写入状态和所有表的写入状态。最后,使用 session 查询待写入表中的当前写入数据。
注意: 示例中,执行 insert 后立刻调用 getUnwrittenData,写入队列中的数据全部被取出,因此,此时 BTW 的写入队列中没有数据,仅有之前已经从写入队列取出、进入工作线程的数据成功写入 DolphinDB。
import dolphindb as ddb import numpy as np import pandas as pd s = ddb.session() s.connect("localhost", 8848, "admin", "123456") script = """ t = table(1000:0,`id`date`ticker`price, [INT,DATE,SYMBOL,DOUBLE]); share t as tglobal; """ s.run(script) writer = ddb.BatchTableWriter("localhost", 8848) writer.addTable(tableName="tglobal") writer.insert("","tglobal", 1, np.datetime64("2019-01-01"),'AAPL', 5.6) writer.insert("","tglobal", 2, np.datetime64("2019-01-01"),'GOOG', 8.3) writer.insert("","tglobal", 3, np.datetime64("2019-01-02"),'GOOG', 4.2) writer.insert("","tglobal", 4, np.datetime64("2019-01-03"),'AMZN', 1.4) writer.insert("","tglobal", 5, np.datetime64("2019-01-05"),'AAPL', 6.9) print(writer.getUnwrittenData(dbPath="", tableName="tglobal")) print(writer.getStatus(tableName="tglobal")) print(writer.getAllStatus()) print("rows:", s.run("tglobal.rows()")) print(s.run("select * from tglobal"))
输出结果如下:
id date ticker price
0 2 2019-01-01 GOOG 8.3
1 3 2019-01-02 GOOG 4.2
2 4 2019-01-03 AMZN 1.4
3 5 2019-01-05 AAPL 6.9
[0, False, False]
DatabaseName TableName WriteQueueDepth SendedRows Removing Finished
0 tglobal 0 1 False False
rows: 1
id date ticker price
0 1 2019-01-01 AAPL 5.6