构造 DBConnectionPool

DBConnectionPool (连接池)可以实现并发执行脚本。由前一章节的内容可知,session(会话控制)可以实现 API 客户端与 DolphinDB 之间的信息交互。Python API 通过 session 在 DolphinDB 上执行脚本和函数,同时实现双向的数据传递。但由于 session 只能调用 run() 方法来串行执行脚本,且无法在多线程中使用同一 session 执行脚本。因此,若需要并发地执行脚本,建议使用 DBConnectionPool 以提高任务运行的效率。

DBConnectionPool 通过创建多个线程以实现并发执行任务。如下展示创建一个 DBConnectionPool 的完整示例:

DBConnectionPool(host, port, threadNum=10, userid=None, password=None, 
                 loadBalance=False, highAvailability=False, compress=False,  
                 reConnect=False, python=False, protocol=PROTOCOL_DEFAULT,
                 *,
                 show_output=True, sqlStd=SqlStd.DolphinDB, tryReconnectNums=None)

通过调用方法函数 getSessionId() 来获取 DBConnectionPool 对象创建的所有线程会话的 session id。若不再使用当前 DBConnectionPool,API 会在析构时自动释放连接。

以下内容将对创建 DBConnectionPool 的相关参数进行详细说明。

连接参数 host, port, threadNum, userid, password

  • host :所连接服务器的地址。
  • port :所连接服务器的端口。
  • threadNum :建立连接的数量,默认为10。
  • userid :登录时的用户名。
  • password :登录时用户名对应的密码。

用户可以使用指定的域名(或 IP 地址)和端口号把 DBConnectionPool 连接到 DolphinDB,并且在建立连接的同时登录账号。使用示例如下:

import dolphindb as ddb

# 连接地址为localhost,端口为8848的DolphinDB,连接数为10
pool = ddb.DBConnectionPool("localhost", 8848)

# 连接地址为localhost,端口为8848的DolphinDB,登录用户名为admin,密码为123456的账户,连接数为8
pool = ddb.DBConnectionPool("localhost", 8848, 8, "admin", "123456")

注意

  • 在构造 DBConnectionPool 时,必须指定参数 host, port

负载均衡参数 loadBalance

  • loadBalance:连接池负载均衡相关配置参数,默认值为 False。

该参数的默认值为 False,表示不开启负载均衡。若要开启负载均衡,则将参数设置为 True。示例脚本如下:

import dolphindb as ddb

# 创建连接池;开启负载均衡
pool = ddb.DBConnectionPool("localhost", 8848, 8, loadBalance=True)

注意,在负载均衡模式下:

  • 如果开启高可用,则可连接节点为集群中所有数据节点。此时负载均衡参数无效。
  • 如果不开启高可用模式,则 DBConnectionPool 会向所有可连接的数据节点均匀建立连接。例如,集群中有 3 个节点,当前连接数分别为[5, 12, 13],DBConnectionPool 的连接数为 6,则在建立连接后,集群中 3 个节点的连接数分别为[7, 14, 15],即每个节点均增加 2 个连接数。

高可用参数 highAvailability

  • highAvailability :是否在集群所有节点上进行高可用配置,默认值为 False。

在高可用模式下,如果不启用负载均衡模式,DBConnectionPool 会和当前集群中负载最小的节点建立连接。但由于 DBConnectionPool 中的连接为同时建立,每个连接计算出的负载值几乎一致,导致所有连接会和同一个节点建立连接,故无法保证节点资源的负载均衡。

示例脚本如下:

import dolphindb as ddb

# 创建连接池;开启高可用,使用集群所有节点作为高可用节点
pool = ddb.DBConnectionPool("localhost", 8848, 8, "admin", "123456", highAvailability=True)

压缩参数 compress

  • compress:当前连接是否开启压缩,默认参数为 False。

该模式适用于大数据量的写入或查询。压缩数据后再传输,这可以节省网络带宽,但会增加 DolphinDB 和 API 端的计算量。使用示例如下:

import dolphindb as ddb
import dolphindb.settings as keys

# api version >= 1.30.21.1,开启压缩,需指定协议为PROTOCOL_DDB
pool = ddb.DBConnectionPool("localhost", 8848, 8, compress=True, protocol=keys.PROTOCOL_DDB)

# api version <= 1.30.19.4,开启压缩,默认使用协议为PROTOCOL_DDB,即enablePickle=False
pool = ddb.DBConnectionPool("localhost", 8848, 8, compress=True)

注意

  • DolphinDB 自1.30.6版本起支持压缩参数 compress
  • 目前仅在配置协议参数 protocol 为 PROTOCOL_DDB 的情况下支持开启压缩。(API version<=1.30.19.4 时,默认协议使用PROTOCOL_DDB,支持开启压缩)

重连参数 reConnect, tryReconnectNums

  • reConnect:bool 类型,,默认值为 False.在不开启高可用的情况下,是否在 API 检测到连接异常时进行重连。
  • tryReconnectNums:int 类型,表示重连尝试次数。
    • 若不开启高可用,须与 reconnect 参数搭配使用,对单节点进行有限次重连。若不填写该参数,默认进行无限重连。
    • 当开启 highAvailability 高可用参数时,
      • 若指定该参数,将在断开连接后遍历可用节点列表内的每个节点进行有限次重连。一次遍历中,每个节点只会被重连一次,最多进行 tryReconnectNums 次遍历尝试。
      • 若不填写该参数,默认是无限重连。

若开启高可用模式,则 API 在检测到连接异常时将自动进行重连,不需要设置参数 reConnect。若未开启高可用,通过配置 reConnect = True,即可实现 API 在检测到连接异常时进行重连。使用示例如下:

import dolphindb as ddb

# 创建连接池;开启重连
pool = ddb.DBConnectionPool("localhost", 8848, 8, reConnect=True, tryReconnectNums=5)

协议参数 protocol

  • protocol:API 与 DolphinDB 交互时使用的数据格式协议,默认值为 PROTOCOL_DEFAULT,表示使用 PROTOCOL_DDB。注:3.0.1.1 及之前版本,protocol 默认使用 PROTOCOL_PICKLE。

目前 DolphinDB 支持三种协议:PROTOCOL_DDB, PROTOCOL_PICKLE, PROTOCOL_ARROW。使用不同的协议,会影响 API 执行 DolphinDB 脚本后接收到的数据格式。有关协议的详细说明请参考章节类型转换

import dolphindb.settings as keys

# 使用协议 PROTOCOL_DDB
pool = ddb.DBConnectionPool("localhost", 8848, 10, protocol=keys.PROTOCOL_DDB)

# 使用协议 PROTOCOL_PICKLE
pool = ddb.DBConnectionPool("localhost", 8848, 10, protocol=keys.PROTOCOL_PICKLE)

# 使用协议 PROTOCOL_ARROW
pool = ddb.DBConnectionPool("localhost", 8848, 10, protocol=keys.PROTOCOL_ARROW)

注意:在 1.30.21.1 版本及之后,API 支持使用 protocol 来指定数据格式协议。1.30.19.4 版本及之前,默认 API 内部使用 PROTOCOL_DDB,即 enablePickle=False

其他参数 show_output

  • show_output:是否在执行后打印脚本中 print 语句的输出。默认值为 True,表示打印 print 语句输出。

使用示例如下:

# 启用 show_output
pool = ddb.DBConnectionPool("localhost", 8848, 8, show_output=True)
taskid = 12
pool.addTask("print(1);2", taskId=taskid)
while True:
    if pool.isFinished(taskId=taskid):
        break
    time.sleep(0.01)

res = pool.getData(taskId=taskid)
print(res)

// output:
1
2


# 不启用 show_output
pool = ddb.DBConnectionPool("localhost", 8848, 8, show_output=False)
taskid = 12
pool.addTask("print(1);2", taskId=taskid)
while True:
    if pool.isFinished(taskId=taskid):
        break
    time.sleep(0.01)

res = pool.getData(taskId=taskid)
print(res)

// output:
2

SQL 方言参数 sqlStd

  • sqlStd :执行脚本时采用的 SQL 方言标准。现支持三种方言:DolphinDB(默认值),Oracle 和 MySQL。

注:DolphinDB 服务端自 2.00.10、1.30.22 起开始支持 Oracle 和 MySQL 方言。Python AIP 自 3.0.2.0 版本起开始支持该参数, 方便用户选择方言。

在使用时,需要从 dolphindb.settings 中引入 SqlStd,并通过 SqlStd 枚举类型来指定该参数。

import dolphindb as ddb
from dolphindb.settings import SqlStd

pool = ddb.DBConnectionPool("localhost", 8848, 10, sqlStd=SqlStd.Oracle)

其他参数 python

  • python:是否启用 python parser 特性。

指定该参数后,可以在 DBConnectionPool.run 执行脚本时启用 python parser 特性.使用示例如下:

import dolphindb as ddb

# 启用 python parser 特性
pool = ddb.DBConnectionPool("localhost", 8848, 10, python=True)

注意: 仅支持 DolphinDB 3.00 版本。