BatchTableWriter

注意 :BatchTableWriter 现已不再维护,不推荐使用。

工作原理

BatchTableWriter 以下简称 BTW,用于批量写入单条数据。以下为简要的使用说明:

  1. BTW 提供接口 addTable 方法,用于添加一个写入表,执行该方法后,将会创建一个写入队列和一个 C++ 工作线程。

  2. 写入数据时,需要调用 insert 方法,并指定写入表的信息和待写入的一行数据。将数据转换为 C++ 对象后放入后台写入队列中。

  3. 工作线程每隔 100 ms 取出写入队列中的所有数据,一次性将这一批数据全部写入 DolphinDB。

  4. 如果写入数据时(insert)发生类型转换错误或者列数不匹配等错误,会立刻抛出异常信息,该条数据不会放入写入队列中;如果在工作线程中将数据发送到 DolphinDB 的过程中发生错误,则会将当前写入队列中的数据和发送失败的数据保存起来,调用 getUnwrittenData 即可获得。

  5. 如果需要查看当前各表的写入状态,可以调用 getAllStatus 方法,该方法将返回一张表,表示各个待写入表的工作状态,包含写入队列长度、发送行数、是否已销毁、是否结束。也可以调用 getStatus,传入表名和数据库名,就可以获得指定表当前的写入状态。

  6. 不需要再往表中写数据时,可以调用 removeTable。该方法将停止工作线程,然后销毁相关的工作线程、连接、写入队列。

BatchTableWriter

BatchTableWriter(host, port, userid=None, password=None, acquireLock=True)

参数说明

  • host :字符串,必填,表示所连接的服务器的 IP 地址。
  • port :整数,必填,表示所连接的服务器的端口号。
  • userName :字符串,可选,表示登录时的用户名。
  • password :字符串,可选,表示登录时用户名对应的密码。
  • acquireLock :布尔值,表示在使用过程中是否对 BTW 内部加锁。默认为 True,表示需要加锁。若在并发调用 API 的场景下,建议加锁。

构造示例

writer = ddb.BatchTableWriter("localhost", 8848, "admin", "123456", acquireLock=True)

addTable

writer.addTable(dbPath=None, tableName=None, partitioned=True)

功能说明

向 BTW 中添加一个待写入的表,如果是分区表则需要设置 partitioned=True

参数说明

  • dbName :表示数据库名。若待写入表为磁盘表时,须填写该参数;若为内存表,则不需要填写该参数。
  • tableName :表示数据表的表名。
  • partitioned :表示添加的表是否为分区表。若设置该参数为 True,则表示为分区表。如果添加的表是磁盘未分区表,须设置 partitioned=False

注意:

  • 如果添加的是内存表,则需要共享该表。
  • 表名不可重复添加。如果重复添加,则需要先移除之前添加的表,否则会抛出异常。

insert

writer.insert(dbPath=None, tableName=None, *args)

功能说明

向指定表中插入单行数据。

参数说明

  • dbPath :表示数据库名。若待写入表为磁盘表时,须填写该参数;若为内存表,则不需要填写该参数。
  • tableName :表示数据表的表名。
  • args :变长参数,表示插入的一行数据。

注意:

  • 在调用 insert 前须先使用 addTable 添加表,否则会抛出异常。
  • 变长参数的个数和数据类型必须与 insert 表的列数及类型匹配。
  • 若在插入过程中出现异常进而导致后台线程退出,此时再次调用 insert 将会抛出异常。建议使用 getUnwrittenData 来获取之前所有已经写入缓冲队列但是没有成功写入服务器的数据(不包括本次 insert 的数据),然后使用 removeTable 释放资源。如果需要再次插入数据,则须重新调用 addTable。
  • 在移除指定表的过程中调用 insert 仍能成功插入数据,但插入的这部分数据并不会发送到服务器。该操作属于未定义行为,不建议用户进行类似操作。

removeTable

writer.removeTable(dbPath=None, tableName=None)

功能说明

释放由 addTable 添加的表所占用的资源。

第一次调用该函数,若该函数返回即表示后台线程已退出。再次写入需要重新调用 addTable。

getUnwrittenData

data:pandas.DataFrame = writer.getUnwrittenData(dbPath=None, tableName=None)

功能说明

获取还未写入的数据。

当写入出现错误时,使用该函数可以获取剩余未写入的数据。但是这些未写入的数据不会尝试重写,若需要重新写入,则需要使用 removeTable 后重新调用 addTable,再通过 insert 写入数据。

getStatus

res:list = writer.getStatus(dbPath=None, tableName=None)

功能说明

获取当前的写入状态。详细信息可参考下方返回值说明。

返回值说明

返回值是由一个整型和两个布尔型组合的列表,分别表示当前写入队列的深度、当前表是否被移除(True: 表示正在被移除),以及后台写入线程是否因为出错而退出(True: 表示后台线程因出错而退出)。

getAllStatus

res:pandas.DataFrame = writer.getAllStatus()

功能说明

获取所有当前存在的表的信息,不包含被移除的表。

返回值说明

该函数的返回值是一个 2*6 的表格,包含数据库名(DatabaseName)、数据表名(TableName)、写入队列长度(WriteQueueDepth)、发送行数(sendedRows)、是否已销毁(Removing)和是否结束(Finished)。

返回值的示例如下表:

DatabaseNameTableNameWriteQueueDepthsendedRowsRemovingFinished
0tglobal05FalseFalse

使用示例

本例中,首先定义一个共享表 tglobal,然后构造 BTW,调用 addTable 将 tglobal 加入待写入表。随后调用 insert 方法写入 5 条数据,然后立刻调用 getUnwritternData、getStatus、getAllStatus 获取尚未写入的数据、当前表的写入状态和所有表的写入状态。最后,使用 session 查询待写入表中的当前写入数据。

注意: 示例中,执行 insert 后立刻调用 getUnwrittenData,写入队列中的数据全部被取出,因此,此时 BTW 的写入队列中没有数据,仅有之前已经从写入队列取出、进入工作线程的数据成功写入 DolphinDB。

import dolphindb as ddb
import numpy as np
import pandas as pd

s = ddb.session()
s.connect("localhost", 8848, "admin", "123456")

script = """
    t = table(1000:0,`id`date`ticker`price, [INT,DATE,SYMBOL,DOUBLE]);
    share t as tglobal;
"""
s.run(script)

writer = ddb.BatchTableWriter("localhost", 8848)
writer.addTable(tableName="tglobal")
writer.insert("","tglobal", 1, np.datetime64("2019-01-01"),'AAPL', 5.6)
writer.insert("","tglobal", 2, np.datetime64("2019-01-01"),'GOOG', 8.3)
writer.insert("","tglobal", 3, np.datetime64("2019-01-02"),'GOOG', 4.2)
writer.insert("","tglobal", 4, np.datetime64("2019-01-03"),'AMZN', 1.4)
writer.insert("","tglobal", 5, np.datetime64("2019-01-05"),'AAPL', 6.9)

print(writer.getUnwrittenData(dbPath="", tableName="tglobal"))
print(writer.getStatus(tableName="tglobal"))
print(writer.getAllStatus())

print("rows:", s.run("tglobal.rows()"))
print(s.run("select * from tglobal"))

输出结果如下:

   id       date ticker  price
0   2 2019-01-01   GOOG    8.3
1   3 2019-01-02   GOOG    4.2
2   4 2019-01-03   AMZN    1.4
3   5 2019-01-05   AAPL    6.9
[0, False, False]
  DatabaseName TableName  WriteQueueDepth  SendedRows  Removing  Finished
0                tglobal                0           1     False     False
rows: 1
   id       date ticker  price
0   1 2019-01-01   AAPL    5.6