MultithreadedTableWriter

针对单条数据批量写入的场景,DolphinDB Python API 提供 MultithreadedTableWriter(推荐)和 BatchTableWrite (不推荐) 两个类对象用于批量异步追加数据,并在客户端维护了一个数据缓冲队列。当服务器端忙于网络 I/O 时,客户端的写入线程仍然可以将数据持续写入缓冲队列(该队列由客户端维护),写入队列后即可返回,从而避免了写线程的忙等。若指定 threadCount 不为 1,则须确保待写入表支持并发写入。

工作原理

MultithreadedTableWriter,以下简称 MTW,主要用于批量写入单条数据。构造 MTW 时,需要指定后台写入线程数 threadCount,MTW 将会创建 threadCount+1 个后台 C++线程,包含1个转换线程和 threadCount 个写入线程。以下为简要的使用说明:

  1. MTW 提供接口 insert 方法,该方法一次接收一条数据,将输入的 Python 对象放到待转换队列中,然后返回 ErrorCodeInfo。该过程中仅会校验写入数据列数是否和待写入表一致,并不会进行类型检查。如遇到写入数据列数不匹配、或写入线程已退出等情况,ErrorCodeInfo.hasError() 将会返回 True,表示执行 insert 时发生错误。

  2. MTW 在构造时,会创建 1 个转换线程,用于将 Python 对象转换为 C++ 对象,减弱 Python GIL 锁在 C++ 多线程中的影响。在转换过程中,如果发生转换失败等错误,将会立刻结束所有后台线程(包括写入线程),并将错误信息记录至 MultithreadedTableWriterStatus,可以通过 MTW 的 getStatus 方法获得。转换成功的 C++对象,将分发至不同的写入线程中;转换失败的 Python 对象将会被收集起来,可以通过 MTW 的 getUnwrittenData 方法获得。

  3. 后台创建的 threadCount 个写入线程按照构造 MTW 时指定的 batchSizethrottle 参数,将数据和写入脚本发送至 DolphinDB。同样的,如果在此过程中发生错误,也会终止所有后台线程,并将错误信息记录至 MultithreadedTableWriterStatus。

  4. 调用 MTW 的 waitForThreadCompletion 方法时,类似于线程的 join,会阻塞等待后台写入任务全部执行完毕后,结束工作线程。注意:调用结束后,原 MTW 将无法再次使用 insert 方法写入数据,需要新建一个 MTW 对象才能继续写入。

MultithreadedTableWriter

MultithreadedTableWriter(host, port, userId, password, dbPath, tableName, useSSL=False, 
                         enableHighAvailability=False, highAvailabilitySites=[], batchSize=1, 
                         throttle=1, threadCount=1, partitionCol="", compressMethods=[],
                         mode="", modeOption=[])

参数介绍详见下方内容。

连接参数

  • host :字符串,必填,表示所连接的服务器的 IP 地址。
  • port :整数,必填,表示所连接的服务器的端口号。
  • userName :字符串,可选,表示登录时的用户名。
  • password :字符串,可选,表示登录时用户名对应的密码。
  • dbPath :字符串,必填,表示分布式数据库地址。内存表时该参数为空。请注意,若使用 1.30.17.4 及以下版本的 API 向内存表写入数据时,该参数须填写内存表表名。
  • tableName :字符串,必填,表示分布式表或内存表的表名。请注意,若使用 1.30.17.4 及以下版本的 API 向内存表写入数据时,该参数须为空。
  • useSSL :布尔值,默认值为 False。表示是否启用加密通讯。和 session 类似,当 MultithreadedTableWriter 启用加密通讯时,DolphinDB 需配置 enableHTTPS=true
  • enableHighAvailability / highAvailabilitySites :高可用配置参数。有关配置方式,请参考章节 Connect

配置参数

  • batchSize :整数,表示批处理的消息的数量,默认值是1,表示客户端写入数据后就立即发送给服务器。如果该参数大于 1,表示数据量达到 batchSize 时,客户端才会将数据发送给服务器。
  • throttle :大于0的浮点数,单位为秒。若客户端有数据写入,但数据量小于 batchSize,则等待 throttle 的时间再发送数据。
  • threadCount :整数,表示创建的工作线程数量,默认为1,表示单线程。若写入对象为不支持并发写入的表,则该参数值必须为 1
  • partitionCol :字符串类型,默认为空,仅在 threadCount 大于1 时起效。若写入对象为分区表,必须指定任意一个分区字段;其他情况需指定为表的字段名。
  • compressMethods :列表,用于指定每一列采用的压缩传输方式,若该参数为空则表示不压缩。每一列可选的压缩方式(大小写不敏感)包括:
    • "LZ4": LZ4 压缩。
    • "DELTA": DELTAOFDELTA 压缩。
  • mode :字符串,表示数据写入的方式,可选值为 "Upsert" 或 "Append"。"Upsert" 表示以 upsert!方式更新表数据;"Append" 表示以tableInsert方式更新表数据。
  • modeOption:字符串列表,表示upsert!可选参数组成的 list,仅当 mode 指定为 "Upsert" 时有效。

构造示例

writer = ddb.MultithreadedTableWriter(
    "localhost", 8848, "admin", "123456", "dfs://testMTW", "pt", batchSize=2, throttle=0.01,  
    threadCount=3, partitionCol="date", compressMethods=["LZ4", "DELTA", "LZ4", "DELTA"],
    mode="Upsert", modeOption=["true", "`index"]
)

以上示例展示了如何构造一个 MultithreadedTableWriter,其中设置了3个写入线程,用于写入数据库 dfs://testMTW 中的分区表 pt。设置参数 batchSize=2throttle=0.01,一旦当前待写入队列的长度超过2或者等待时间超过0.01秒,就会立即触发写入线程,将写入队列中的所有数据发送到 DolphinDB。其中 partitionCol 被指定为 date 列,当数据分配到待写入队列时,将根据 date 列将数据分别分配到三个写入线程对应的队列中。由于待写入表有四列数据,分别指定了压缩方式为 LZ4、DELTA、LZ4、DELTA。写入模式被指定为 Upsert,并传入了upsert!函数对应的参数 ["true", "`index"],即每次写入时调用的 upsert! 函数的 ignoreNull 参数为 true,keyColNames 为 `index。

insert

res:ErrorCodeInfo = writer.insert(*args)

功能说明

插入单行数据。

返回一个 ErrorCodeInfo 对象,包含 errorCodeerrorInfo,分别表示错误代码和错误信息。

ErrorCodeInfo 介绍

  • errorCode:"" 表示没有错误发生;如果有错误发生,值为错误码,例如:"A2"
  • errorInfo:表示错误信息。当没有错误发生时,返回 None;发生错误时,返回包含错误信息的字符串。
  • hasError():判断插入待转换队列时是否发生错误。如果发生错误,返回 True;反之,返回 False
  • succeed():判断是否成功插入待转换队列。如成功插入,返回 True;反之,返回 False

ErrorCodeInfo 类提供了 hasError 和 succeed 方法用于获取数据是否成功放入数据队列的结果。在插入一行数据后,推荐调用 hasError 来判断是否成功将数据插入到待转换队列。

参数说明

  • args:不定长位置参数,代表插入的一行数据。

示例

下面展示一个简单的写入示例,在 insert 执行结束后,再使用 hasError 方法判断数据是否成功写入后台待转换队列。

import numpy as np
import pandas as pd
import dolphindb as ddb
import random

s = ddb.session()
s.connect("localhost", 8848, "admin", "123456")

script = """
    t = table(1000:0, `date`ticker`price, [DATE,SYMBOL,LONG]);
    share t as tglobal;
"""
s.run(script)

writer = ddb.MultithreadedTableWriter(
    "localhost", 8848, "admin", "123456", dbPath="", tableName="tglobal", 
    batchSize=10, throttle=1, threadCount=5, partitionCol="date"
)

for i in range(10):
    if i == 3:
        res = writer.insert(np.datetime64(f'2022-03-2{i%6}'), random.randint(1,10000))
    else:
        res = writer.insert(np.datetime64(f'2022-03-2{i%6}'), "AAAA", random.randint(1,10000))
    if res.hasError():
        print("insert error: ", res.errorInfo)

writer.waitForThreadCompletion()
print(s.run("""select count(*) from tglobal"""))

# output
"""
insert error:  Column counts don't match 3
   count
0      9
"""

在上例中,共写入 10 条数据,其中包含一条列数为2的数据,在插入这条数据时,res.hasError() 返回 True,表示插入的数据有误,打印后显示插入数据与待写入表列数不匹配。查询 tglobal 中数据条数为 9,和预期一致。

getUnwrittenData

data:list = writer.getUnwrittenData()

功能说明

返回一个嵌套列表,表示未写入服务器的数据,包含未转换的数据以及待发送的数据两部分。

注意 :通过该方法获取到数据资源后,MultithreadedTableWriter 将释放这些数据资源。

insertUnwrittenData

res:ErrorCodeInfo = insertUnwrittenData(unwrittenData)

功能说明

将数据插入数据表。

返回值同 insert 方法。与 insert 方法的区别在于,insert 只能插入单行数据,而 insertUnwrittenData 可以同时插入多行数据。通常 insertUnwrittenData 用于批量写回之前未成功写入的数据。

参数说明

unwrittenData :需要再次写入的数据。一般通过方法 getUnwrittenData 获取该对象。

getStatus

status:MultithreadedTableWriterStatus = writer.getStatus()

功能说明

获取 MultithreadedTableWriter 对象当前的运行状态。返回 MultithreadedTableWriterStatus 类对象。

MultithreadedTableWriterStatus 介绍

  • isExiting:表示写入线程是否正在退出。
  • errorCode:表示错误码。
  • errorInfo:表示错误信息。
  • sentRows:表示成功发送(写入表中)的总记录数。
  • unsentRows:表示待发送的总记录数(包含待转换和已经转换但还未进入发送队列的数据)。
  • sendFailedRows:表示发送失败的总记录数(发送队列中的数据)。
  • threadStatus:表示写入线程状态列表。
    • threadId:表示线程 Id。
    • sentRows:表示该线程成功发送的记录数。
    • unsentRows:表示该线程待发送的记录数。
    • sendFailedRows:表示发送失败的记录数。
  • hasError():判断写入中是否发生错误,如果发生错误,返回 True;反之,返回 False
  • succeed():判断是否写入成功,如写入成功,返回 True;反之,返回 False

以一个 MultithreadedTableWriterStatus 打印输出为例:

errorCode     : A1
 errorInfo     : Data conversion error: Cannot convert double to LONG
 isExiting     : True
 sentRows      : 2493
 unsentRows    : 0
 sendFailedRows: 7507
 threadStatus  : 
        threadId        sentRows        unsentRows      sendFailedRows
               0               0                 0                7507
        3567691520           415                 0                   0
        3489658624           831                 0                   0
        3481265920           416                 0                   0
        3472873216           416                 0                   0
        3464480512           415                 0                   0
<dolphindb.session.MultithreadedTableWriterStatus object at 0x7f0102c76d30>

以上输出内容显示,在本次 MTW 写入流程中发生了异常,异常代码是 A1,异常信息为 Data conversion error: Cannot convert double to LONGisExiting=True 表示当前线程正在退出。sentRows=2493 表示当前有 2493 条数据已经写入到 DolphinDB 服务端;unsentRows=0/sendFailedRows=7507 则表示当前仍在 API 的未成功写入 DolphinDB 服务端的数据一共有 0+7507=7507 条。在后面的表格中,列出了各个后台线程的处理情况,threadId=0 表示所有线程的统计总数。通常而言,成功写入的结果(sentRows)会分别显示在各个线程中,例如上述输出中的 sentRows 列;写入失败的结果(unsentRows、sendFailedRows)会集中显示在 threadId=0 行,表示整个过程中的写入失败条数。

waitForThreadCompletion

writer.waitForThreadCompletion()

功能说明

调用此方法后,MTW 会进入等待状态,待后台工作线程全部完成后再退出等待状态。

常规流程

MultithreadedTableWriter 的常规处理流程如下:

# 准备数据 data
prepre_Data()
# 构造 MTW 对象
writer = ddb.MultithreadedTableWriter(...)

try:
    for data in datas:
        # 插入数据
        res = writer.insert(data)
        # 判断数据是否成功放入转换队列
        if res.hasError():
            print(res.errorInfo)
    # 等待 MTW 工作完成
    writer.waitForThreadCompletion()
except Exception as e:
    # 获取 MTW 工作状态,通过 writeStatus.hasError() 和 writeStatus.succeed() 判断是否正常执行完成
    writeStatus = writer.getStatus()
    print(e)
    if writeStatus.hasError():
        # 获取并修正失败数据后将失败数据重新写入MTW
        print(writeStatus)
        unwrittendata = writer.getUnwrittenData()
        # 调用预先定义的修正函数来修复错误数据
        unwrittendata = revise(unwrittendata)
        newwriter = ddb.MultithreadedTableWriter(...)
        newwriter.insertUnwrittenData(unwrittendata)

使用示例

下例以写入分布式表为例,介绍 MultithreadedTableWriter 插入数据的总体流程:

步骤一: 创建 DolphinDB 数据库分布式表

import numpy as np
import pandas as pd
import dolphindb as ddb
import time
import random

s = ddb.session()
s.connect("localhost", 8848, "admin", "123456")

script = """
    dbName = 'dfs://valuedb3';
    if(exists(dbName)){
        dropDatabase(dbName);
    }
    datetest=table(1000:0,`date`symbol`id,[DATE,SYMBOL,INT]);
    db = database(directory=dbName, partitionType=HASH, partitionScheme=[INT, 10]);
    pt=db.createPartitionedTable(datetest,'pdatetest','id');
"""
s.run(script)

步骤二: 创建 MultithreadedTableWriter 对象

writer = ddb.MultithreadedTableWriter(
    "localhost", 8848, "admin", "123456", dbPath="dfs://valuedb3", tableName="pdatetest",
    batchSize=10000, throttle=1, threadCount=5, partitionCol="id", compressMethods=["LZ4","LZ4","DELTA"]
)

在这一步中,创建一个包含 5 个线程的 MTW 对象,同时设置 batchSize=10000throttle=1,指定分区列为 "id"、各列压缩方式分别为 LZ4、LZ4、DELTA,向 dfs://valuedb3 分布式数据库中的分区表 pdatetest 写入数据。

步骤三: 写入数据

try:
    # 插入100行正确数据 
    for i in range(100):
        res = writer.insert(random.randint(1,10000),"AAAAAAAB", random.randint(1,10000))
        if res.hasError():
            print("MTW insert error: ", res.errorInfo)
except Exception as ex:
    # MTW 抛出异常
    print("MTW exit with exception: ", ex)
# 等待 MTW 插入完成
writer.waitForThreadCompletion()
writeStatus = writer.getStatus()
if writeStatus.succeed():
    print("Write successfully!")
print("writeStatus: \n", writeStatus)
print(s.run("select count(*) from pt"))

调用 writer.insert() 方法向 writer 中写入数据,并通过 writer.getStatus() 获取 writer 的状态。

输出结果

Write successfully!
writeStatus: 
 errorCode     : None
 errorInfo     : 
 isExiting     : True
 sentRows      : 100
 unsentRows    : 0
 sendFailedRows: 0
 threadStatus  : 
        threadId        sentRows        unsentRows      sendFailedRows
               0               0                 0                   0
        1677719296            22                 0                   0
        1669326592            20                 0                   0
        1660933888            19                 0                   0
        1652541184            14                 0                   0
        1644148480            25                 0                   0
<dolphindb.session.MultithreadedTableWriterStatus object at 0x7fe896260d30>
   count
0    100

由上述结果可知,MultithreadedTableWriter 对象中的所有数据均成功写入分布式表,此时 errorCode 的输出为 "None"。

步骤四:错误处理

下述示例将介绍如果写入过程中发生错误,该如何通过 getStatus 和 try-catch 进行错误处理。

注意

  • 此时分区表中已有 100 条记录。
  • 因为步骤三中 MTW 已经调用 waitForThreadCompletion 方法结束后台线程,所以在后续代码中需要重新构造一个新的 MTW 对象,才能继续写入数据。
writer = ddb.MultithreadedTableWriter(
    "localhost", 8848, "admin", "123456", dbPath="dfs://valuedb3", tableName="pdatetest",
    batchSize=10000, throttle=1, threadCount=5, partitionCol="id", compressMethods=["LZ4","LZ4","DELTA"]
)

try:
    # 插入100行正确数据 (类型和列数都正确),MTW正常运行
    for i in range(100):
        res = writer.insert(np.datetime64('2022-03-23'),"AAAAAAAB", random.randint(1,10000))
    # 插入10行类型错误数据,此时 MTW 并不会进行类型判断,这些数据能够进入 MTW 待转换队列
    # 直到转换线程对这些数据进行转换时,检测到类型不匹配,就会立刻终止 MTW 所有后台线程
    for i in range(10):
        res = writer.insert(np.datetime64('2022-03-23'),222, random.randint(1,10000))
        if res.hasError():
            # 此处不会执行到
            print("Insert wrong format data:\n", res)
    # 插入1行数据(列数不匹配),MTW 立刻发现待插入数据列数与待插入表的列数不匹配,立刻返回错误信息,
    # 本条数据并不会进入待转换队列
    res = writer.insert(np.datetime64('2022-03-23'),"AAAAAAAB")
    if res.hasError():
        # 数据错误,插入列数不匹配数据
        print("Column counts don't match:\n", res)
    # sleep 1秒,等待 MTW 转换线程处理数据直至检测到第2次插入的10行数据类型不匹配
    # 此时 MTW 立刻终止所有线程,并修改状态为错误状态
    time.sleep(1)

    # 再插入1行正确数据,MTW 会因为工作线程终止而抛出异常,且不会写入该行数据
    res = writer.insert(np.datetime64('2022-03-23'),"AAAAAAAB", random.randint(1,10000))
    print("MTW has exited")
except Exception as ex:
    # MTW 抛出异常
    print("MTW exit with exception %s" % ex)
# 等待 MTW 插入完成
writer.waitForThreadCompletion()
writeStatus = writer.getStatus()
if writeStatus.hasError():
    print("Error in writing:")
print(writeStatus)
print(s.run("select count(*) from pt"))
"""
Column counts don't match:
 errorCode: A2
 errorInfo: Column counts don't match 3
<dolphindb.session.ErrorCodeInfo object at 0x7f0d52947040>
MTW exit with exception <Exception> in insert: thread is exiting.
Error in writing:
errorCode     : A1
 errorInfo     : Data conversion error: Cannot convert long to SYMBOL
 isExiting     : True
 sentRows      : 0
 unsentRows    : 4
 sendFailedRows: 106
 threadStatus  : 
        threadId        sentRows        unsentRows      sendFailedRows
               0               0                 0                 106
        511690496              0                 2                   0
        520083200              0                 1                   0
        528475904              0                 0                   0
        618751744              0                 0                   0
        536868608              0                 1                   0
<dolphindb.session.MultithreadedTableWriterStatus object at 0x7f0d25d68910>
   count
0    100
"""

步骤五:重新写入

当 MTW 发生类型转换错误、写入线程写入失败等错误时,API 将会终止所有线程。此时可以通过 writer.getUnwrittenData() 方法获取失败数据,通过 insertUnwrittenData(unwrittendata) 重新写入失败数据。因为原有 MTW 对象的所有线程已经终止,无法再次被用来写入数据,因此需要重新构造一个新的 MTW 对象,将失败数据重新写入新的 MTW 对象中。

if writeStatus.hasError():
    print("Error in writing:")
    unwrittendata = writer.getUnwrittenData()
    print("Unwrittendata: %d" % len(unwrittendata))
    # 重新构造新的 MTW 对象
    newwriter = ddb.MultithreadedTableWriter(
        "localhost", 8848, "admin", "123456", dbPath="dfs://valuedb3", tableName="pdatetest",
        batchSize=10000, throttle=1, threadCount=5, partitionCol="id", compressMethods=["LZ4","LZ4","DELTA"]
    )
    try:
        # 修正失败数据后将失败数据重新写入 MTW
        for row in unwrittendata:
            row[1]="aaaaa"
        res = newwriter.insertUnwrittenData(unwrittendata)
        if res.hasError():
            print("Failed to write data again: \n", res)
    except Exception as ex:
        # MTW 抛出异常
        print("MTW exit with exception %s" % ex)
    finally:
        # 确保 newwriter工作线程结束运行
        newwriter.waitForThreadCompletion()
        writeStatus = newwriter.getStatus()
        print("Write again:\n", writeStatus)
else:
    print("Write successfully:\n", writeStatus)

print(s.run("select count(*) from pt"))
"""
Error in writing:
Unwrittendata: 110
Write again:
 errorCode     : None
 errorInfo     : 
 isExiting     : True
 sentRows      : 110
 unsentRows    : 0
 sendFailedRows: 0
 threadStatus  : 
        threadId        sentRows        unsentRows      sendFailedRows
               0               0                 0                   0
        618751744             17                 0                   0
        528475904             21                 0                   0
        520083200             24                 0                   0
        511690496             25                 0                   0
        503297792             23                 0                   0
<dolphindb.session.MultithreadedTableWriterStatus object at 0x7f0d52947040>
   count
0    210
"""

Python 多线程调用

MTW 内部使用 C++ 多线程完成数据转换和写入任务,同时也保证 Python 多线程调用 MTW 的线程安全。

具体示例如下:

import numpy as np
import pandas as pd
import dolphindb as ddb
import time
import threading
import random

s = ddb.session()
s.connect("localhost", 8848, "admin", "123456")

script = """
    dbName = 'dfs://valuedb3';
    if(exists(dbName)){
        dropDatabase(dbName);
    }
    datetest=table(1000:0,`date`symbol`id,[DATE,SYMBOL,INT]);
    db = database(directory=dbName, partitionType=HASH, partitionScheme=[INT, 10]);
    pt=db.createPartitionedTable(datetest,'pdatetest','id');
"""
s.run(script)

writer = ddb.MultithreadedTableWriter(
    "localhost", 8848, "admin", "123456", dbPath="dfs://valuedb3", tableName="pdatetest",
    batchSize=10000, throttle=1, threadCount=5, partitionCol="id", compressMethods=["LZ4","LZ4","DELTA"]
)

def insert_MTW(writer):
    try:
        # 插入100行正确数据 
        for i in range(100):
            res = writer.insert(random.randint(1,10000),"AAAAAAAB", random.randint(1,10000))
    except Exception as ex:
        # MTW 抛出异常
        print("MTW exit with exception %s" % ex)

# 创建 10 个线程,在线程中将数据写入MTW
threads = []
for i in range(10):
    threads.append(threading.Thread(target=insert_MTW, args=(writer,)))

for thread in threads:
    thread.start()

# 完成其他任务,此处用 sleep 模拟
time.sleep(10)

# 现在需要结束任务
# 1 - 等待线程退出
for thread in threads:
    thread.join()
# 2 - 等待 MTW 线程结束
writer.waitForThreadCompletion()
# 3 - 检查写入结果
writeStatus = writer.getStatus()
print("writeStatus:\n", writeStatus)
print(s.run("select count(*) from pt"))
writeStatus:
 errorCode     : None
 errorInfo     : 
 isExiting     : True
 sentRows      : 1000
 unsentRows    : 0
 sendFailedRows: 0
 threadStatus  : 
        threadId        sentRows        unsentRows      sendFailedRows
               0               0                 0                   0
        1309443840           194                 0                   0
        1301051136           188                 0                   0
        1292658432           215                 0                   0
        1284265728           208                 0                   0
        1275873024           195                 0                   0
<dolphindb.session.MultithreadedTableWriterStatus object at 0x7ff2802bf9a0>
   count
0   1000

本例中演示了如何在 Python 多线程中使用 MTW 对象写入数据。其中开启10个 Python 线程,并且在每个线程中写入 100 条数据。从结果可以看出,成功写入 1000 条数据。