session 异步提交
在高吞吐率的场景下,尤其是典型的高速小数据写入,使用 API 的异步调用功能可以有效提高 API 的任务吞吐量。异步方式提交有如下几个特点:
- API 客户端提交任务后,DolphinDB 接到任务后客户端即认为任务已完成。
- API 客户端无法得知任务在 DolphinDB 执行的情况和结果。
- API 客户端的异步任务提交时间取决于提交参数的序列化及其网络传输时间。
注意:异步方式不适用于前后任务之间有依赖的场景。比如有两个任务,前一个任务向分布式数据库写入数据,后一个任务将新写入的数据结合历史数据做分析。像这类后一个任务对前一任务有依赖的场景,不能使用异步提交的方式。
Python API 开启 ASYNC(异步)模式的操作可以参照 session 建立 DolphinDB 连接的部分,即设置 session 的 enableASYNC 参数为 True。通过这种方式异步写入数据可以节省 API 端检测返回值的时间。
s = ddb.session(enableASYNC=True)
以追加数据到分布式表为例,在 Python 中可以参考如下脚本使用异步方式追加数据。
import dolphindb as ddb import dolphindb.settings as keys import numpy as np import pandas as pd s = ddb.session(enableASYNC=True) # 打开异步模式 s.connect("localhost", 8848, "admin", "123456") dbPath = "dfs://testDB" tbName = "tb1" script = """ dbPath="dfs://testDB" tbName=`tb1 if(existsDatabase(dbPath)) dropDatabase(dbPath) db=database(dbPath, VALUE, ["AAPL", "AMZN", "A"]) testDictSchema=table(5:0, `id`ticker`price, [INT,SYMBOL,DOUBLE]) tb1=db.createPartitionedTable(testDictSchema, tbName, `ticker) """ s.run(script) #此处脚本可以在服务器端运行 tb = pd.DataFrame({ 'id': np.array([1, 2, 2, 3], dtype="int32"), 'ticker': ['AAPL', 'AMZN', 'AMZN', 'A'], 'price': [22, 3.5, 21, 26], }) s.run(f"append!{{loadTable('{dbPath}', `{tbName})}}", tb)
注意 :异步通讯的条件下,只能通过 session.run()
方法与 DolphinDB 实现通讯,并无返回值。
示例 1
由于在数据吞吐量较高的情况下使用异步的效果更佳,下面给出一个 Python API 写入流数据表的示例。
import dolphindb as ddb import numpy as np import pandas as pd import random import datetime s = ddb.session(enableASYNC=True) s.connect("localhost", 8848, "admin", "123456") n = 100 script = """ share streamTable(10000:0,`time`sym`price`id, [TIMESTAMP,SYMBOL,DOUBLE,INT]) as trades """ s.run(script) # 此处的脚本可以在服务端直接运行 # 生成一个 DataFrame time_list = [np.datetime64(datetime.date(2020, random.randint(1, 12), random.randint(1, 20))) for _ in range(n)] sym_list = np.random.choice(['IBN', 'GTYU', 'FHU', 'DGT', 'FHU', 'YUG', 'EE', 'ZD', 'FYU'], n) price_list = [round(np.random.uniform(1, 100), 1) for _ in range(n)] id_list = np.random.choice([1, 2, 3, 4, 5], n) tb = pd.DataFrame({ 'time': time_list, 'sym': sym_list, 'price': price_list, 'id': id_list, }) for _ in range(50000): s.run("tableInsert{trades}", tb)
Linux 系统中可以使用 time 命令来统计代码执行的时间,Windows 系统中则可以使用 Measure-Command 命令。在本例中使用 time 来统计代码执行时间。
同步模式下,即 enableAsync=False
时,测试上述代码的执行时间为 8.39 秒;异步模式下,即 enableAsync=True
时,测试上述代码的执行之间为 4.89 秒。对比可见,在同步模式下,频繁写入大量小数据,会有较大的网络 IO 开销;而使用异步模式则能解决这一问题,只需要将数据上传至服务端,而不需要返回值。在网络性能较差、任务数量较多的情况下,使用异步模式能够显著提升总体性能,但其缺点是无法保证数据能够正常写入,没有异常报错,因此难以发现问题。
示例 2
由于异步模式的特殊性,在异步追加数据时,如果追加的数据需要进行时间类型的转换,不能直接调用 upload 提交数据到 DolphinDB,然后再在 DolphinDB 用 SQL 脚本进行类型转换,因为异步可能导致数据提交还未完成却已经开始执行 SQL 的情况。为了解决这个问题,建议先在 DolphinDB 中定义好函数视图,后续操作只需要调用该函数视图即可。
以下为简单示例。
首先,在 DolphihinDB 中定义一个视图函数 appendStreamingData:
login("admin","123456")
share streamTable(10000:0,`time`sym`price`id, [DATE,SYMBOL,DOUBLE,INT]) as trades
def appendStreamingData(mutable data){
tableInsert(trades, data.replaceColumn!(`time, date(data.time)))
}
addFunctionView(appendStreamingData)
然后在 Python API 异步追加数据:
import dolphindb as ddb import numpy as np import pandas as pd import random import datetime s = ddb.session(enableASYNC=True) s.connect("localhost", 8848, "admin", "123456") n = 100 # 生成一个 DataFrame time_list = [np.datetime64(datetime.date(2020, random.randint(1, 12), random.randint(1, 20))) for _ in range(n)] sym_list = np.random.choice(['IBN', 'GTYU', 'FHU', 'DGT', 'FHU', 'YUG', 'EE', 'ZD', 'FYU'], n) price_list = [round(np.random.uniform(1, 100), 1) for _ in range(n)] id_list = np.random.choice([1, 2, 3, 4, 5], n) tb = pd.DataFrame({ 'time': time_list, 'sym': sym_list, 'price': price_list, 'id': id_list, }) for _ in range(50000): s.run("appendStreamingData", tb)
使用函数视图可以将数据作为函数参数传递给 DolphinDB,这也将极大方便 DolphinDB 中的数据清洗、类型转换等任务,结合使用异步模式可以进一步提高总体的性能。