In [178]:
#1.模块导入
import dolphindb as ddb
import numpy as np
import pandas as pd
import os
from multiprocessing import Queue, Process
import time
import random

In [179]:
#2.参数配置
# parallel 是指定并发数（并发查询进程数）
parallels = [1, 10, 50, 100]
# 单个进程执行多少次循环查询
sqlNum = 100
# DolphinDB 测试节点信息
ddbIP = "192.198.1.38"
ddbPorts = [8073, 8074]
ddbUserId = "admin"
ddbUserPassword = "123456"
dbName = "dfs://l2TLDB"
tbName = "trade"

In [180]:
s = ddb.session(compress=True)
ddbPort = ddbPorts[random.sample(range(0,2), 1)[0]]
s.connect(host=ddbIP, port=ddbPort, userid=ddbUserId, password=ddbUserPassword)
scripts = """
counts = select count(*) from loadTable("dfs://l2TLDB", "trade") where date(TradeTime) = 2023.02.01 group by SecurityID
exec SecurityID from counts order by count limit 6000
"""
SecurityIDs = s.run(scripts)

In [181]:
#3.定义查询函数
# 单个进程的查询函数
countQue = Queue()
costQue = Queue()
def testSQL(processName, sql):
    s = ddb.session(compress=True)
    ddbPort = ddbPorts[random.sample(range(0,2), 1)[0]]
    s.connect(host=ddbIP, port=ddbPort, userid=ddbUserId, password=ddbUserPassword)
    #print(f"The query of process {os.getpid()} begins, the start time is：{pd.Timestamp.now()},"+"IP:'{ip}'".format(ip=ddbIP) , flush=True)
    count = 0
    cost = 0
    for i in range(sqlNum):
        startTime = pd.Timestamp.now()
        data = s.run(sql)
        costTime = (pd.Timestamp.now() - startTime) / np.timedelta64(1, 'ms')
        cost += costTime
        #count += len(data)
    #countQue.put(count)
    costQue.put(cost)
def main():
    for parallel in parallels:
        # 计划查询总次数
        countQueMax = parallel*sqlNum
        print("并发查询进程数是{0}，单个进程的循环查询次数是{1}，计划查询总次数是{2}。".format(parallel, sqlNum, countQueMax))
        taskList = []
        for i in range(parallel):
            ids = SecurityIDs[random.sample(range(0,6000), 1)[0]]
            sql = "select * from loadTable('{db}', '{tb}') where date(TradeTime) = 2023.02.01 and SecurityID = '{id}'".format(db = dbName, tb = tbName, id = ids)
            testTask = Process(target=testSQL, args=("Process" + str(i), sql))
            taskList.append(testTask)
        startTime = pd.Timestamp.now()
        print("开始执行并行查询的任务的时间是{0}".format(startTime))
        for p in taskList:
            p.start()
        for p in taskList:
            p.join()
        endTime = pd.Timestamp.now()
        sumCount = []
        while countQue.empty() == False:
            data = countQue.get()
            sumCount.append(data)
        sumCount = np.sum(sumCount)
        sumCost = []
        while costQue.empty() == False:
            data = costQue.get()
            sumCost.append(data)
        sumCost = np.sum(sumCost)
        print("结束执行并行查询的任务的时间是{0}".format(endTime))
        print("并发查询任务总耗时是{0}毫秒".format((endTime - startTime) / np.timedelta64(1, 'ms')))
        #print("总查询记录数是{0}".format(sumCount))
        #print("查询平均用时是{0}".format(sumCost / sqlNum / parallel))
        #print("并发查询性能RPS是{0}".format(sumCount/((endTime - startTime) / np.timedelta64(1, 's'))))
        print("并发查询性能QPS是{0}".format(countQueMax/((endTime - startTime) / np.timedelta64(1, 's'))))
        print("每次查询的平均耗时是{0}毫秒".format(((endTime - startTime) / np.timedelta64(1, 'ms'))/countQueMax))
        print("end")

In [182]:
#4.运行查询函数
if __name__ == '__main__':
    main()

并发查询进程数是1，单个进程的循环查询次数是100，计划查询总次数是100。
开始执行并行查询的任务的时间是2024-12-16 17:15:01.206837
结束执行并行查询的任务的时间是2024-12-16 17:15:01.734942
并发查询任务总耗时是528.105毫秒
并发查询性能QPS是189.3562833148711
每次查询的平均耗时是5.2810500000000005毫秒
end
并发查询进程数是10，单个进程的循环查询次数是100，计划查询总次数是1000。
开始执行并行查询的任务的时间是2024-12-16 17:15:01.737297
结束执行并行查询的任务的时间是2024-12-16 17:15:04.190957
并发查询任务总耗时是2453.66毫秒
并发查询性能QPS是407.55442889397875
每次查询的平均耗时是2.4536599999999997毫秒
end
并发查询进程数是50，单个进程的循环查询次数是100，计划查询总次数是5000。
开始执行并行查询的任务的时间是2024-12-16 17:15:04.195746
结束执行并行查询的任务的时间是2024-12-16 17:15:07.496616
并发查询任务总耗时是3300.87毫秒
并发查询性能QPS是1514.7521713972376
每次查询的平均耗时是0.6601739999999999毫秒
end
并发查询进程数是100，单个进程的循环查询次数是100，计划查询总次数是10000。
开始执行并行查询的任务的时间是2024-12-16 17:15:07.501431
结束执行并行查询的任务的时间是2024-12-16 17:15:12.291141
并发查询任务总耗时是4789.71毫秒
并发查询性能QPS是2087.8090740357975
每次查询的平均耗时是0.478971毫秒
end
