方法介绍
本节将介绍连接池 DBConnectionPool 的三类常用方法,该三类方法兼可用于异步执行脚本,用户可根据实际需求选用不同的方法。
- run 是一个协程函数,其内部维护递增的 taskId,可以使用协程方式进行调用。
- addTask,isFinished,getData 方法是将脚本任务提交给 DBConnectionPool,由 DBConnectionpool 直接维护任务的异步进行和获得返回值,在使用时需要用户自行指定 taskId。
- runTaskAsync 是在 DBConnectionPool 的内部维护一个事件循环,被调用后将使用 run 方法在该内部事件循环中执行脚本任务,并返回一个
concurrent.futures.Future
对象。
注意: 由于第一种和第三种方法的 taskId 由系统自动生成,而第二种方法的 taskId 由用户自行指定,故为了避免 taskId 冲突,建议用户在实际使用中不要将第二种方法和第一/三种方法混用。
run
run(script, *args, **kwargs)
- script:待执行的 DolphinDB 脚本。
- *args:传递给 DolphinDB 函数的参数。
- **kwargs:
- clearMemory:是否在查询后释放变量。默认值为 True,表示释放。
- pickleTableToList:是否将结果中的 Table 类型对象转换为 list 类型对象。True 表示转换为 list 类型对象,False 表示转换为 DataFrame 类型对象,该参数默认值为 False。
- priority:设置任务执行的优先级,默认值为 4。取值范围为 0-9,取值越高表示任务执行的优先级越高。Python API 自 1.30.22.2 版本起,开始支持该参数。
- parallelism:设置任务的并行度。用于设置在单个数据节点上同时执行该作业产生的子任务的最大线程数量,默认值为 2。Python API 自 1.30.22.2 版本起,开始支持该参数。
addTask 根据 taskId 将任务提交至 DBConnectionPool 的连接池中,由连接池分配连接执行脚本任务。如下所示,调用
addTask
向连接池中添加一个 taskId 为 12 的任务。
为了提高效率,DBConnectionPool 中的 run 方法被包装成了协程函数,通过 run 方法将脚本传入线程池中调用线程运行,因此在 Python 中调用 run 方法时需要使用协程以进行使用。
示例 1
以下内容介绍一个简单的固定任务示例。
首先,创建一个最大连接数为8的连接池 DBConnectionPool。和通常连接池有所不同,当不再使用连接时,API 不会立刻销毁该连接,而是直到析构 DBConnectionPool 时才进行销毁,或者手动执行 shutDown()
关闭 DBConnectionPool 时才会销毁连接。
import dolphindb as ddb import time import asyncio pool = ddb.DBConnectionPool("localhost", 8848, 8)
创建一个协程任务函数,使用 sleep
模拟一段运行的时间。
async def test_run(i):
try:
return await pool.run(f"sleep(2000);1+{i}")
except Exception as e:
print(e)
定义任务列表,并创建一个事件循环对象,运行任务列表直到完成全部任务。
tasks = [ asyncio.ensure_future(test_run(1)), asyncio.ensure_future(test_run(3)), asyncio.ensure_future(test_run(5)), asyncio.ensure_future(test_run(7)), ] loop = asyncio.get_event_loop() try: time_st = time.time() loop.run_until_complete(asyncio.wait(tasks)) time_ed = time.time() except Exception as e: print("catch e:") print(e)
任务结束后,打印执行时间和各个任务的结果,并关闭连接池对象。
print("time: ", time_ed-time_st)
for task in tasks:
print(task.result())
pool.shutDown()
期望的输出结果如下所示:
time: 2.0017542839050293
2
4
6
8
上述例子展示了已固定脚本任务调用 DBConnectionPool 的用法,在 Python 中只有一个主线程,但使用了协程创建子任务并调用 DBConnectionPool 以实现运行。须注意,实际上 Python API 的底层实现是通过使用 C++ 线程以维护每一个连接。若提交任务数超出实际线程数,则可能出现任务迟迟没有执行的情况,与通常的协程并发有一定区别。
此外,用户也可以自定义传入脚本的对象,可参考下述示例2。
示例 2
下例定义了一个可以传入自定义脚本作为参数的类,并配合 Python 的多线程机制动态添加子任务。
import dolphindb as ddb import time import asyncio import threading # 在该例子中主线程负责创建协程对象传入自定义脚本并调用自定义的对象去运行,并新起子线程运行事件循环防止阻塞主线程。 class DolphinDBHelper(object): pool = ddb.DBConnectionPool("localhost", 8848, 10) @classmethod async def test_run(cls,script): print(f"run script: [{script}]") return await cls.pool.run(script) @classmethod async def runTest(cls,script): start = time.time() task = loop.create_task(cls.test_run(script)) result = await asyncio.gather(task) print(f"""[{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())}] time: {time.time()-start} result: {result}""") return result # 定义一个跑事件循环的线程函数 def start_thread_loop(loop): asyncio.set_event_loop(loop) loop.run_forever() if __name__=="__main__": start = time.time() print("In main thread",threading.current_thread()) loop = asyncio.get_event_loop() # 在子线程中运行事件循环, 让它 run_forever t = threading.Thread(target= start_thread_loop, args=(loop,)) t.start() task1 = asyncio.run_coroutine_threadsafe(DolphinDBHelper.runTest("sleep(1000);1+1"),loop) task2 = asyncio.run_coroutine_threadsafe(DolphinDBHelper.runTest("sleep(3000);1+2"),loop) task3 = asyncio.run_coroutine_threadsafe(DolphinDBHelper.runTest("sleep(5000);1+3"),loop) task4 = asyncio.run_coroutine_threadsafe(DolphinDBHelper.runTest("sleep(1000);1+4"),loop) end = time.time() print("main thread time: ", end - start)
运行结果如下:
In main thread <_MainThread(MainThread, started 139838803788160)>
main thread time: 0.00039839744567871094
run script: [sleep(1000);1+1]
run script: [sleep(3000);1+2]
run script: [sleep(5000);1+3]
run script: [sleep(1000);1+4]
[2023-03-14 16:46:56] time: 1.0044968128204346 result: [2]
[2023-03-14 16:46:56] time: 1.0042989253997803 result: [5]
[2023-03-14 16:46:58] time: 3.0064148902893066 result: [3]
[2023-03-14 16:47:00] time: 5.005709409713745 result: [4]
上述例子中,在主线程中创建子线程开启事件循环,并指定该事件循环一直保持运行。随后向该事件循环中加入四个脚本执行任务,每个任务分别需要耗时 1s、3s、5s和1s。从主线程打印 main thread time: 0.00039839744567871094
可以看出,四个事件放入事件循环后实现了异步执行,随后每个协程都打印自身执行的结束时间和时长。由于任务1和任务4耗时一致,因此同时打印结果;2s后任务2执行结束;再过2s后任务3也执行结束。由结果可知,四个任务的执行结果符合并发执行的预期。
addTask, isFinished, getData
不同于协程函数 run,addTask 会将用户脚本任务按照 taskId 直接提交给 DBConnectionPool 执行。用户可以通过 isFinished
判断线程池中的任务是否结束,并使用 getData
获取任务的返回结果。下述内容将依次介绍三个函数。
addTask
addTask(script, taskId, *args, **kwargs)
- script:待执行的 DolphinDB 脚本。
- taskId:指定的任务 Id。
- *args:传递给 DolphinDB 函数的参数。
- **kwargs:
- clearMemory:是否查询后释放变量。True 表示释放,默认值为 True。
- pickleTableToList:是否将结果的 Table 类型对象转换为 list 类型对象。True 表示转换为 list 类型对象,False 表示转换为 DataFrame 类型对象,默认值为 False。
addTask 根据 taskId 将任务提交至 DBConnectionPool 的连接池中,由连接池分配连接执行脚本任务。如下所示,调用 addTask
向连接池中添加一个 taskId 为 12 的任务。
pool.addTask("sleep(1000);1+2", taskId=12)
isFinished
isFinished(taskId)
- taskId:查询的任务 Id。
该函数通过 taskId 来查询对应任务是否已经完成。如果已完成,则返回 True;反之返回 False。
简单使用示例如下:
if pool.isFinished(taskId=12): print("task has done!")
getData
getData(taskId)
- taskId:查询的任务 Id。
该函数通过 taskId 来查询对应任务的返回结果。
简单使用示例如下:
res = pool.getData(taskId=12)
注意 : 每次执行 addTask
指定 taskId 并创建任务后,使用 getData
方法只能对该 taskID 对应任务的返回结果执行一次查询。若在创建任务后未调用 getData
方法,则在下次使用 addTask
指定同一 taskId 并创建任务时,其执行结果将覆盖掉前一次该 taskId 对应任务的执行结果。
综合示例
在如下脚本中,首先创建一个 DBConnectionPool 连接池对象,然后调用 addTask
向连接池中添加一个 taskId 为12的任务,随后通过 isFinished
方法判断任务是否执行完毕,执行完毕后跳出循环,调用 getData
方法获取任务结果。
import dolphindb as ddb import time pool = ddb.DBConnectionPool("localhost", 8848, 8) taskid = 12 pool.addTask("sleep(1500);1+2", taskId=taskid) while True: if pool.isFinished(taskId=taskid): break time.sleep(0.01) res = pool.getData(taskId=taskid) print(res) # output: 3
runTaskAsync
runTaskAsync(script, *args, **kwargs)
- script:待执行的 DolphinDB 脚本。
- args:传递给 DolphinDB 函数的参数。
- kwargs:
- clearMemory:是否查询后释放变量。True 表示释放,默认值为 True。
- pickleTableToList:是否将结果中的 Table 类型对象转换为 list 类型对象。True 表示转换为 list 类型对象,False 表示转换为 DataFrame 类型对象,默认值为 False。
注意:
- 在1.30.17.4及以前的版本中,该函数的名称为
runTaskAsyn
。 - 若使用该方法异步执行脚本,在任务结束后,用户需要手动调用
pool.shutDown()
才能正确析构连接池对象。
除了使用 run 和 addTask 的方法来执行脚本,DBConnectionPool 还提供了 runTaskAsync
的方法以实现异步执行脚本。
用户可以调用 runTaskAsync
方法向连接池中添加任务,返回一个 concurrent.futures.Future
对象。然后调用这个对象的 result(timeout=None)
方法获得结果(timeout,单位为秒)。如果在 result()
方法中设置了 timeout 参数,任务还未完成,则继续等待 timeout 时间;在 timeout 时间内若任务完成,则将返回结果,否则将抛出 timeoutError 异常。下面演示如何使用 runTaskAsync
创建异步任务。
import dolphindb as ddb import time pool = ddb.DBConnectionPool("localhost", 8848, 10) t1 = time.time() task1 = pool.runTaskAsync("sleep(1000); 1+0;") task2 = pool.runTaskAsync("sleep(2000); 1+1;") task3 = pool.runTaskAsync("sleep(4000); 1+2;") task4 = pool.runTaskAsync("sleep(1000); 1+3;") t2 = time.time() print("Task1 Result: ", task1.result()) t3 = time.time() print("Task2 Result: ", task2.result()) t4 = time.time() print("Task4 Result: ", task4.result()) t5 = time.time() print("Task3 Result: ", task3.result()) t6 = time.time() print("Add Tasks: ", t2-t1) print("Get Task1: ", t3-t1) print("Get Task2: ", t4-t1) print("Get Task4: ", t5-t1) print("Get Task3: ", t6-t1) pool.shutDown()
输出结果如下:
Task1 Result: 1
Task2 Result: 2
Task4 Result: 4
Task3 Result: 3
Add Tasks: 0.0015881061553955078
Get Task1: 1.0128183364868164
Get Task2: 2.0117716789245605
Get Task4: 2.0118134021759033
Get Task3: 4.012163162231445
如上示例中,首先创建一个 DBConnectionPool 连接池对象,调用 runTaskAsync
方法执行 4 个耗时不同的脚本,并分别返回 4 个 concurrent.futures.Future
对象。再调用其 result
方法依次阻塞地获得各个任务的返回值,并将耗时打印出来。如下为打印说明:
- t2 - t1:表示添加任务所耗时间。
- t3 - t1:表示获取task1结果的总耗时。
- t4 - t1:表示获取task2结果的总耗时。
- t5 - t1:表示获取task4结果的总耗时。
- t6 - t1:表示获取task3结果的总耗时。
Task1 执行耗时 1s,因此 t3-t1=1s;Task2 执行耗时 2s,因此 t4-t1=2s;Task4 执行耗时 1s,在等待获取 Task2 结果的时候,Task4 任务已经执行结束,因此 t5-t1=2s;Task3 执行耗时 4s,因此 t6-t1=4s。
其他方法
shutDown
pool.shutDown()
该方法用于关闭不再使用的 DBConnectionPool,停止线程池中使用的事件循环,并且中止所有的异步任务。调用 shutDown 方法后,关闭的线程池不可继续使用。
注意 : 如果使用了 runTaskAsync()
的方式创建异步任务,必须在不使用 DBConnectionPool 时调用该函数。
getSessionId
sessionids = pool.getSessionId()
该方法用于获得当前线程池中所有 session 会话中的 session Id。