MultithreadedTableWriter
针对单条数据批量写入的场景,DolphinDB Python API 提供 MultithreadedTableWriter(推荐)和 BatchTableWrite (不推荐) 两个类对象用于批量异步追加数据,并在客户端维护了一个数据缓冲队列。当服务器端忙于网络 I/O 时,客户端的写入线程仍然可以将数据持续写入缓冲队列(该队列由客户端维护),写入队列后即可返回,从而避免了写线程的忙等。若指定 threadCount 不为 1,则须确保待写入表支持并发写入。
工作原理
MultithreadedTableWriter,以下简称 MTW,主要用于批量写入单条数据。构造 MTW 时,需要指定后台写入线程数 threadCount,MTW 将会创建 threadCount+1 个后台 C++线程,包含1个转换线程和 threadCount 个写入线程。以下为简要的使用说明:
MTW 提供接口
insert
方法,该方法一次接收一条数据,将输入的 Python 对象放到待转换队列中,然后返回 ErrorCodeInfo。该过程中仅会校验写入数据列数是否和待写入表一致,并不会进行类型检查。如遇到写入数据列数不匹配、或写入线程已退出等情况,ErrorCodeInfo.hasError()
将会返回 True,表示执行 insert 时发生错误。MTW 在构造时,会创建 1 个转换线程,用于将 Python 对象转换为 C++ 对象,减弱 Python GIL 锁在 C++ 多线程中的影响。在转换过程中,如果发生转换失败等错误,将会立刻结束所有后台线程(包括写入线程),并将错误信息记录至 MultithreadedTableWriterStatus,可以通过 MTW 的 getStatus 方法获得。转换成功的 C++对象,将分发至不同的写入线程中;转换失败的 Python 对象将会被收集起来,可以通过 MTW 的
getUnwrittenData
方法获得。后台创建的 threadCount 个写入线程按照构造 MTW 时指定的 batchSize 和 throttle 参数,将数据和写入脚本发送至 DolphinDB。同样的,如果在此过程中发生错误,也会终止所有后台线程,并将错误信息记录至 MultithreadedTableWriterStatus。
调用 MTW 的 waitForThreadCompletion 方法时,类似于线程的 join,会阻塞等待后台写入任务全部执行完毕后,结束工作线程。注意:调用结束后,原 MTW 将无法再次使用
insert
方法写入数据,需要新建一个 MTW 对象才能继续写入。
MultithreadedTableWriter
MultithreadedTableWriter(host, port, userId, password, dbPath, tableName, useSSL=False,
enableHighAvailability=False, highAvailabilitySites=[], batchSize=1,
throttle=1, threadCount=1, partitionCol="", compressMethods=[],
mode="", modeOption=[])
参数介绍详见下方内容。
连接参数
- host :字符串,必填,表示所连接的服务器的 IP 地址。
- port :整数,必填,表示所连接的服务器的端口号。
- userName :字符串,可选,表示登录时的用户名。
- password :字符串,可选,表示登录时用户名对应的密码。
- dbPath :字符串,必填,表示分布式数据库地址。内存表时该参数为空。请注意,若使用 1.30.17.4 及以下版本的 API 向内存表写入数据时,该参数须填写内存表表名。
- tableName :字符串,必填,表示分布式表或内存表的表名。请注意,若使用 1.30.17.4 及以下版本的 API 向内存表写入数据时,该参数须为空。
- useSSL :布尔值,默认值为 False。表示是否启用加密通讯。和 session 类似,当 MultithreadedTableWriter 启用加密通讯时,DolphinDB 需配置
enableHTTPS=true
。 - enableHighAvailability / highAvailabilitySites :高可用配置参数。有关配置方式,请参考章节 Connect。
配置参数
- batchSize :整数,表示批处理的消息的数量,默认值是1,表示客户端写入数据后就立即发送给服务器。如果该参数大于 1,表示数据量达到 batchSize 时,客户端才会将数据发送给服务器。
- throttle :大于0的浮点数,单位为秒。若客户端有数据写入,但数据量小于 batchSize,则等待 throttle 的时间再发送数据。
- threadCount :整数,表示创建的工作线程数量,默认为1,表示单线程。若写入对象为不支持并发写入的表,则该参数值必须为 1
- partitionCol :字符串类型,默认为空,仅在 threadCount 大于1 时起效。若写入对象为分区表,必须指定任意一个分区字段;其他情况需指定为表的字段名。
- compressMethods :列表,用于指定每一列采用的压缩传输方式,若该参数为空则表示不压缩。每一列可选的压缩方式(大小写不敏感)包括:
- "LZ4": LZ4 压缩。
- "DELTA": DELTAOFDELTA 压缩。
- mode :字符串,表示数据写入的方式,可选值为 "Upsert" 或 "Append"。"Upsert" 表示以
upsert!
方式更新表数据;"Append" 表示以tableInsert
方式更新表数据。 - modeOption:字符串列表,表示
upsert!
可选参数组成的 list,仅当 mode 指定为 "Upsert" 时有效。
构造示例
writer = ddb.MultithreadedTableWriter( "localhost", 8848, "admin", "123456", "dfs://testMTW", "pt", batchSize=2, throttle=0.01, threadCount=3, partitionCol="date", compressMethods=["LZ4", "DELTA", "LZ4", "DELTA"], mode="Upsert", modeOption=["true", "`index"] )
以上示例展示了如何构造一个 MultithreadedTableWriter,其中设置了3个写入线程,用于写入数据库 dfs://testMTW 中的分区表 pt。设置参数 batchSize=2
、throttle=0.01
,一旦当前待写入队列的长度超过2或者等待时间超过0.01秒,就会立即触发写入线程,将写入队列中的所有数据发送到 DolphinDB。其中 partitionCol 被指定为 date 列,当数据分配到待写入队列时,将根据 date 列将数据分别分配到三个写入线程对应的队列中。由于待写入表有四列数据,分别指定了压缩方式为 LZ4、DELTA、LZ4、DELTA。写入模式被指定为 Upsert,并传入了upsert!
函数对应的参数 ["true", "`index"],即每次写入时调用的 upsert! 函数的 ignoreNull 参数为 true,keyColNames 为 `index。
insert
res:ErrorCodeInfo = writer.insert(*args)
功能说明
插入单行数据。
返回一个 ErrorCodeInfo 对象,包含 errorCode 和 errorInfo,分别表示错误代码和错误信息。
ErrorCodeInfo 介绍
- errorCode:
""
表示没有错误发生;如果有错误发生,值为错误码,例如:"A2"
。 - errorInfo:表示错误信息。当没有错误发生时,返回
None
;发生错误时,返回包含错误信息的字符串。 - hasError():判断插入待转换队列时是否发生错误。如果发生错误,返回
True
;反之,返回False
。 - succeed():判断是否成功插入待转换队列。如成功插入,返回
True
;反之,返回False
。
ErrorCodeInfo 类提供了 hasError 和 succeed 方法用于获取数据是否成功放入数据队列的结果。在插入一行数据后,推荐调用 hasError 来判断是否成功将数据插入到待转换队列。
参数说明
- args:不定长位置参数,代表插入的一行数据。
示例
下面展示一个简单的写入示例,在 insert 执行结束后,再使用 hasError 方法判断数据是否成功写入后台待转换队列。
import numpy as np
import pandas as pd
import dolphindb as ddb
import random
s = ddb.session()
s.connect("localhost", 8848, "admin", "123456")
script = """
t = table(1000:0, `date`ticker`price, [DATE,SYMBOL,LONG]);
share t as tglobal;
"""
s.run(script)
writer = ddb.MultithreadedTableWriter(
"localhost", 8848, "admin", "123456", dbPath="", tableName="tglobal",
batchSize=10, throttle=1, threadCount=5, partitionCol="date"
)
for i in range(10):
if i == 3:
res = writer.insert(np.datetime64(f'2022-03-2{i%6}'), random.randint(1,10000))
else:
res = writer.insert(np.datetime64(f'2022-03-2{i%6}'), "AAAA", random.randint(1,10000))
if res.hasError():
print("insert error: ", res.errorInfo)
writer.waitForThreadCompletion()
print(s.run("""select count(*) from tglobal"""))
# output
"""
insert error: Column counts don't match 3
count
0 9
"""
在上例中,共写入 10 条数据,其中包含一条列数为2的数据,在插入这条数据时,res.hasError()
返回 True,表示插入的数据有误,打印后显示插入数据与待写入表列数不匹配。查询 tglobal 中数据条数为 9,和预期一致。
getUnwrittenData
data:list = writer.getUnwrittenData()
功能说明
返回一个嵌套列表,表示未写入服务器的数据,包含未转换的数据以及待发送的数据两部分。
注意 :通过该方法获取到数据资源后,MultithreadedTableWriter 将释放这些数据资源。
insertUnwrittenData
res:ErrorCodeInfo = insertUnwrittenData(unwrittenData)
功能说明
将数据插入数据表。
返回值同 insert 方法。与 insert 方法的区别在于,insert 只能插入单行数据,而 insertUnwrittenData 可以同时插入多行数据。通常 insertUnwrittenData 用于批量写回之前未成功写入的数据。
参数说明
unwrittenData :需要再次写入的数据。一般通过方法 getUnwrittenData 获取该对象。
getStatus
status:MultithreadedTableWriterStatus = writer.getStatus()
功能说明
获取 MultithreadedTableWriter
对象当前的运行状态。返回 MultithreadedTableWriterStatus
类对象。
MultithreadedTableWriterStatus 介绍
- isExiting:表示写入线程是否正在退出。
- errorCode:表示错误码。
- errorInfo:表示错误信息。
- sentRows:表示成功发送(写入表中)的总记录数。
- unsentRows:表示待发送的总记录数(包含待转换和已经转换但还未进入发送队列的数据)。
- sendFailedRows:表示发送失败的总记录数(发送队列中的数据)。
- threadStatus:表示写入线程状态列表。
- threadId:表示线程 Id。
- sentRows:表示该线程成功发送的记录数。
- unsentRows:表示该线程待发送的记录数。
- sendFailedRows:表示发送失败的记录数。
- hasError():判断写入中是否发生错误,如果发生错误,返回
True
;反之,返回False
。 - succeed():判断是否写入成功,如写入成功,返回
True
;反之,返回False
。
以一个 MultithreadedTableWriterStatus 打印输出为例:
errorCode : A1
errorInfo : Data conversion error: Cannot convert double to LONG
isExiting : True
sentRows : 2493
unsentRows : 0
sendFailedRows: 7507
threadStatus :
threadId sentRows unsentRows sendFailedRows
0 0 0 7507
3567691520 415 0 0
3489658624 831 0 0
3481265920 416 0 0
3472873216 416 0 0
3464480512 415 0 0
<dolphindb.session.MultithreadedTableWriterStatus object at 0x7f0102c76d30>
以上输出内容显示,在本次 MTW 写入流程中发生了异常,异常代码是 A1
,异常信息为 Data conversion error: Cannot convert double to LONG
。isExiting=True
表示当前线程正在退出。sentRows=2493
表示当前有 2493 条数据已经写入到 DolphinDB 服务端;unsentRows=0/sendFailedRows=7507
则表示当前仍在 API 的未成功写入 DolphinDB 服务端的数据一共有 0+7507=7507
条。在后面的表格中,列出了各个后台线程的处理情况,threadId=0
表示所有线程的统计总数。通常而言,成功写入的结果(sentRows)会分别显示在各个线程中,例如上述输出中的 sentRows 列;写入失败的结果(unsentRows、sendFailedRows)会集中显示在 threadId=0
行,表示整个过程中的写入失败条数。
waitForThreadCompletion
writer.waitForThreadCompletion()
功能说明
调用此方法后,MTW 会进入等待状态,待后台工作线程全部完成后再退出等待状态。
常规流程
MultithreadedTableWriter 的常规处理流程如下:
# 准备数据 data prepre_Data() # 构造 MTW 对象 writer = ddb.MultithreadedTableWriter(...) try: for data in datas: # 插入数据 res = writer.insert(data) # 判断数据是否成功放入转换队列 if res.hasError(): print(res.errorInfo) # 等待 MTW 工作完成 writer.waitForThreadCompletion() except Exception as e: # 获取 MTW 工作状态,通过 writeStatus.hasError() 和 writeStatus.succeed() 判断是否正常执行完成 writeStatus = writer.getStatus() print(e) if writeStatus.hasError(): # 获取并修正失败数据后将失败数据重新写入MTW print(writeStatus) unwrittendata = writer.getUnwrittenData() # 调用预先定义的修正函数来修复错误数据 unwrittendata = revise(unwrittendata) newwriter = ddb.MultithreadedTableWriter(...) newwriter.insertUnwrittenData(unwrittendata)
使用示例
下例以写入分布式表为例,介绍 MultithreadedTableWriter 插入数据的总体流程:
步骤一: 创建 DolphinDB 数据库分布式表
import numpy as np import pandas as pd import dolphindb as ddb import time import random s = ddb.session() s.connect("localhost", 8848, "admin", "123456") script = """ dbName = 'dfs://valuedb3'; if(exists(dbName)){ dropDatabase(dbName); } datetest=table(1000:0,`date`symbol`id,[DATE,SYMBOL,INT]); db = database(directory=dbName, partitionType=HASH, partitionScheme=[INT, 10]); pt=db.createPartitionedTable(datetest,'pdatetest','id'); """ s.run(script)
步骤二: 创建 MultithreadedTableWriter 对象
writer = ddb.MultithreadedTableWriter( "localhost", 8848, "admin", "123456", dbPath="dfs://valuedb3", tableName="pdatetest", batchSize=10000, throttle=1, threadCount=5, partitionCol="id", compressMethods=["LZ4","LZ4","DELTA"] )
在这一步中,创建一个包含 5 个线程的 MTW 对象,同时设置 batchSize=10000
,throttle=1
,指定分区列为 "id"、各列压缩方式分别为 LZ4、LZ4、DELTA,向 dfs://valuedb3 分布式数据库中的分区表 pdatetest 写入数据。
步骤三: 写入数据
try: # 插入100行正确数据 for i in range(100): res = writer.insert(random.randint(1,10000),"AAAAAAAB", random.randint(1,10000)) if res.hasError(): print("MTW insert error: ", res.errorInfo) except Exception as ex: # MTW 抛出异常 print("MTW exit with exception: ", ex) # 等待 MTW 插入完成 writer.waitForThreadCompletion() writeStatus = writer.getStatus() if writeStatus.succeed(): print("Write successfully!") print("writeStatus: \n", writeStatus) print(s.run("select count(*) from pt"))
调用 writer.insert()
方法向 writer 中写入数据,并通过 writer.getStatus()
获取 writer 的状态。
输出结果
Write successfully!
writeStatus:
errorCode : None
errorInfo :
isExiting : True
sentRows : 100
unsentRows : 0
sendFailedRows: 0
threadStatus :
threadId sentRows unsentRows sendFailedRows
0 0 0 0
1677719296 22 0 0
1669326592 20 0 0
1660933888 19 0 0
1652541184 14 0 0
1644148480 25 0 0
<dolphindb.session.MultithreadedTableWriterStatus object at 0x7fe896260d30>
count
0 100
由上述结果可知,MultithreadedTableWriter 对象中的所有数据均成功写入分布式表,此时 errorCode 的输出为 "None"。
步骤四:错误处理
下述示例将介绍如果写入过程中发生错误,该如何通过 getStatus 和 try-catch 进行错误处理。
注意 :
- 此时分区表中已有 100 条记录。
- 因为步骤三中 MTW 已经调用
waitForThreadCompletion
方法结束后台线程,所以在后续代码中需要重新构造一个新的 MTW 对象,才能继续写入数据。
writer = ddb.MultithreadedTableWriter( "localhost", 8848, "admin", "123456", dbPath="dfs://valuedb3", tableName="pdatetest", batchSize=10000, throttle=1, threadCount=5, partitionCol="id", compressMethods=["LZ4","LZ4","DELTA"] ) try: # 插入100行正确数据 (类型和列数都正确),MTW正常运行 for i in range(100): res = writer.insert(np.datetime64('2022-03-23'),"AAAAAAAB", random.randint(1,10000)) # 插入10行类型错误数据,此时 MTW 并不会进行类型判断,这些数据能够进入 MTW 待转换队列 # 直到转换线程对这些数据进行转换时,检测到类型不匹配,就会立刻终止 MTW 所有后台线程 for i in range(10): res = writer.insert(np.datetime64('2022-03-23'),222, random.randint(1,10000)) if res.hasError(): # 此处不会执行到 print("Insert wrong format data:\n", res) # 插入1行数据(列数不匹配),MTW 立刻发现待插入数据列数与待插入表的列数不匹配,立刻返回错误信息, # 本条数据并不会进入待转换队列 res = writer.insert(np.datetime64('2022-03-23'),"AAAAAAAB") if res.hasError(): # 数据错误,插入列数不匹配数据 print("Column counts don't match:\n", res) # sleep 1秒,等待 MTW 转换线程处理数据直至检测到第2次插入的10行数据类型不匹配 # 此时 MTW 立刻终止所有线程,并修改状态为错误状态 time.sleep(1) # 再插入1行正确数据,MTW 会因为工作线程终止而抛出异常,且不会写入该行数据 res = writer.insert(np.datetime64('2022-03-23'),"AAAAAAAB", random.randint(1,10000)) print("MTW has exited") except Exception as ex: # MTW 抛出异常 print("MTW exit with exception %s" % ex) # 等待 MTW 插入完成 writer.waitForThreadCompletion() writeStatus = writer.getStatus() if writeStatus.hasError(): print("Error in writing:") print(writeStatus) print(s.run("select count(*) from pt")) """ Column counts don't match: errorCode: A2 errorInfo: Column counts don't match 3 <dolphindb.session.ErrorCodeInfo object at 0x7f0d52947040> MTW exit with exception <Exception> in insert: thread is exiting. Error in writing: errorCode : A1 errorInfo : Data conversion error: Cannot convert long to SYMBOL isExiting : True sentRows : 0 unsentRows : 4 sendFailedRows: 106 threadStatus : threadId sentRows unsentRows sendFailedRows 0 0 0 106 511690496 0 2 0 520083200 0 1 0 528475904 0 0 0 618751744 0 0 0 536868608 0 1 0 <dolphindb.session.MultithreadedTableWriterStatus object at 0x7f0d25d68910> count 0 100 """
步骤五:重新写入
当 MTW 发生类型转换错误、写入线程写入失败等错误时,API 将会终止所有线程。此时可以通过 writer.getUnwrittenData()
方法获取失败数据,通过 insertUnwrittenData(unwrittendata) 重新写入失败数据。因为原有 MTW 对象的所有线程已经终止,无法再次被用来写入数据,因此需要重新构造一个新的 MTW 对象,将失败数据重新写入新的 MTW 对象中。
if writeStatus.hasError(): print("Error in writing:") unwrittendata = writer.getUnwrittenData() print("Unwrittendata: %d" % len(unwrittendata)) # 重新构造新的 MTW 对象 newwriter = ddb.MultithreadedTableWriter( "localhost", 8848, "admin", "123456", dbPath="dfs://valuedb3", tableName="pdatetest", batchSize=10000, throttle=1, threadCount=5, partitionCol="id", compressMethods=["LZ4","LZ4","DELTA"] ) try: # 修正失败数据后将失败数据重新写入 MTW for row in unwrittendata: row[1]="aaaaa" res = newwriter.insertUnwrittenData(unwrittendata) if res.hasError(): print("Failed to write data again: \n", res) except Exception as ex: # MTW 抛出异常 print("MTW exit with exception %s" % ex) finally: # 确保 newwriter工作线程结束运行 newwriter.waitForThreadCompletion() writeStatus = newwriter.getStatus() print("Write again:\n", writeStatus) else: print("Write successfully:\n", writeStatus) print(s.run("select count(*) from pt")) """ Error in writing: Unwrittendata: 110 Write again: errorCode : None errorInfo : isExiting : True sentRows : 110 unsentRows : 0 sendFailedRows: 0 threadStatus : threadId sentRows unsentRows sendFailedRows 0 0 0 0 618751744 17 0 0 528475904 21 0 0 520083200 24 0 0 511690496 25 0 0 503297792 23 0 0 <dolphindb.session.MultithreadedTableWriterStatus object at 0x7f0d52947040> count 0 210 """
Python 多线程调用
MTW 内部使用 C++ 多线程完成数据转换和写入任务,同时也保证 Python 多线程调用 MTW 的线程安全。
具体示例如下:
import numpy as np import pandas as pd import dolphindb as ddb import time import threading import random s = ddb.session() s.connect("localhost", 8848, "admin", "123456") script = """ dbName = 'dfs://valuedb3'; if(exists(dbName)){ dropDatabase(dbName); } datetest=table(1000:0,`date`symbol`id,[DATE,SYMBOL,INT]); db = database(directory=dbName, partitionType=HASH, partitionScheme=[INT, 10]); pt=db.createPartitionedTable(datetest,'pdatetest','id'); """ s.run(script) writer = ddb.MultithreadedTableWriter( "localhost", 8848, "admin", "123456", dbPath="dfs://valuedb3", tableName="pdatetest", batchSize=10000, throttle=1, threadCount=5, partitionCol="id", compressMethods=["LZ4","LZ4","DELTA"] ) def insert_MTW(writer): try: # 插入100行正确数据 for i in range(100): res = writer.insert(random.randint(1,10000),"AAAAAAAB", random.randint(1,10000)) except Exception as ex: # MTW 抛出异常 print("MTW exit with exception %s" % ex) # 创建 10 个线程,在线程中将数据写入MTW threads = [] for i in range(10): threads.append(threading.Thread(target=insert_MTW, args=(writer,))) for thread in threads: thread.start() # 完成其他任务,此处用 sleep 模拟 time.sleep(10) # 现在需要结束任务 # 1 - 等待线程退出 for thread in threads: thread.join() # 2 - 等待 MTW 线程结束 writer.waitForThreadCompletion() # 3 - 检查写入结果 writeStatus = writer.getStatus() print("writeStatus:\n", writeStatus) print(s.run("select count(*) from pt"))
writeStatus:
errorCode : None
errorInfo :
isExiting : True
sentRows : 1000
unsentRows : 0
sendFailedRows: 0
threadStatus :
threadId sentRows unsentRows sendFailedRows
0 0 0 0
1309443840 194 0 0
1301051136 188 0 0
1292658432 215 0 0
1284265728 208 0 0
1275873024 195 0 0
<dolphindb.session.MultithreadedTableWriterStatus object at 0x7ff2802bf9a0>
count
0 1000
本例中演示了如何在 Python 多线程中使用 MTW 对象写入数据。其中开启10个 Python 线程,并且在每个线程中写入 100 条数据。从结果可以看出,成功写入 1000 条数据。