连接池 ExclusiveDBConnectionPool

Java API 提供 ExclusiveDBConnectionPool 连接池用以执行任务。用户可以通过 execute 方法执行任务,然后使用 BasicDBTaskgetResult 方法获取该任务的执行结果。

构造

构造方法:

ExclusiveDBConnectionPool(String host, int port, String uid, String pwd, int count, boolean loadBalance, boolean enableHighAvailability, String[] highAvailabilitySites, String initialScript,boolean compress, boolean useSSL, boolean usePython)

参数介绍:

host String 类型,表示主机名。

port int 类型,表示端口。

uid String 类型,可选参数,表示登录的用户名,默认值为空 。

pwd String 类型,可选参数,表示登录的密码,默认值为空。

count 正整数,表示连接池中的连接数量。当提交的任务数量大于连接池的连接数时,部分任务会进行等待,直到出现空闲的连接再进行分配。

loadBalance boolean 类型,表示在初始化连接时是否开启负载均衡。

  • 如果不开启,连接池中的所有连接都按照 hostport 建立连接。
  • 如果开启,在连接池初始化时,API 会获取集群中所有的节点信息,然后将连接池中的连接均匀分配给这些节点。

enableHighAvailability boolean 类型,可选参数,表示是否针对连接池中的连接开启高可用,默认为 false,表示不开启。

highAvailabilitySites String 类型,可选参数,表示所有可用节点的地址和端口,格式为 ip:port。须开启 enableHighAvailability 后才可使用,默认为 null。该参数处理逻辑如下:

  • 如果为空,使用 3.00.0.1 之前版本,则仅支持连接通过 hostport 指定的节点;使用 3.00.0.1 及之后版本,会将集群中所有节点做为高可用节点。
  • 如果不为空,此时会将 highAvailabilitySites 指定的节点做为高可用节点。若当前节点断开,会按照 highAvailabilitySites 中填写的节点顺序进行切换。

initialScript String 类型,可选参数,表示初始化的脚本,默认为空。

compress boolean 类型,可选参数,表示是否在下载时压缩数据,默认值为 false,表示不压缩。该模式适用于大数据量的查询。压缩数据后再传输可以节省网络带宽,但也会增加 DolphinDB 服务器和 API 客户端的计算量。

useSSL boolean 类型,可选参数,表示是否使用 SSL 连接,默认值为 false,表示不使用。注意:若要开启 SSL 连接,服务器端的配置文件(单节点:dolphindb.cfg,集群:cluster.cfg)须同时配置功能参数 enableHTTPS=true

usePython boolean 类型,可选参数,表示是否开启 Python 解析,默认值为 false,表示不开启。

示例

以下构造示例表示建立一个大小为 5 的线程池,开启负载均衡 loadBalance = true,开启高可用 enableHighAvailability = true,指定高可用节点 highAvailablitySites,initialScript 为 ”clearAllCache()” 表示清除缓存数据包含分区表中已经载入内存的数据和分布式计算中 map-reduce 任务的中间结果,开启压缩 compress = true,使用 SSL 链接 useSSL = true,不使用 Python 解析。

@Test
public void testDBConnectionPool() throws IOException {
    String[] highAvailabilitySites = new String[]{"192.168.1.167:18921", "192.168.1.167:18922", "192.168.1.167:18923"};
    DBConnectionPool pool = new ExclusiveDBConnectionPool("192.168.0.68", 8848, "admin","123456", 5, true, true, highAvailabilitySites, "", true, true, false);
}

方法介绍

ExclusiveDBConnectionPool 支持的方法如下表:

方法名详情
ExclusiveDBConnectionPool(string host, int port, string uid,string pwd, int count, bool loadBalance,bool enableHighAvailability, string[] highAvailabilitySites = null, string initialScript, bool compress = false, bool useSSL = false, bool usePython = false)构造方法,参数 count 为连接数,loadBalance 为 true 会连接不同的节点
execute(IDBTask task)执行任务
execute(List tasks)执行批量任务
getConnectionCount()获取连接数
shutdown()关闭连接池。请注意,若当前 ExclusiveDBConnectionPool 线程池不再使用,会自动被释放,但存在释放延时,可以通过调用 shutdown() 等待线程任务执行结束后立即释放连接。

BasicDBTask 包装了需要执行的脚本和参数。

方法名详情
BasicDBTask(string script, List args)script 为需要执行的函数,args 为参数。
BasicDBTask(string script)需要执行的脚本
isSuccessful()任务是否执行成功
getResult()获取脚本运行结果
getErrorMsg()获取任务运行时发生的异常信息

创建一个任务

Java API 提供了 BasicDBTask 对象帮助用户创建一个任务。BasicDBTask 包装了需要执行的脚本、参数、执行的状态和结果。以下为 BasicDBTask 构造方法:

public BasicDBTask(String script)

参数介绍:

script 表示要执行的脚本。

示例代码:

@Test
public void testBasicDBTask() {
    BasicDBTask task = new BasicDBTask("x=1;");
}

提交任务 execute

创建好任务后,您可以使用 excute 方法向连接池提交一个任务,execute 方法参数如下:

task 任务对象。

tasks 批量任务对象,当需要提交多个任务时,使用 tasks 替代 task。

timeOut 超时时间,单位毫秒数,可选参数;默认值为-1,表示一直等待任务结束。

代码示例如下,执行一个简单脚本任务,并设置超时时间为 10000:

@Test
public void testExclusiveDBConnectionPool() throws IOException {
    ExclusiveDBConnectionPool pool = new ExclusiveDBConnectionPool("192.168.0.68",8848,"admin","123456",5,false,false);
    DBTask task = new BasicDBTask("x=1; x;");
    pool.execute(task, 10000);
    pool.shutdown();
}

提交多个任务

当需要同时提交多个任务时,为避免逐个提交的复杂性,用户可以声明一个泛型为BasicDBTask 的 List 以实现一次提交多个任务的目的。示例代码如下(其中 DBTaskBasicDBTask 类的接口):

@Test
public void testExclusiveDBConnectionPool() throws IOException {
    DBConnectionPool pool = new ExclusiveDBConnectionPool("192.168.0.68",8848,"admin","123456",5,false,false);
    List<DBTask> dbTaskList = new ArrayList<>();
    dbTaskList.add(new BasicDBTask("select * from loadTable(\"dfs://test\",`tb1)"));
    dbTaskList.add(new BasicDBTask("x = 1; x;"));
    dbTaskList.add(new BasicDBTask("sum(1 2 3 NULL 4);"));
    pool.execute(dbTaskList);
    pool.shutdown();
}

获取任务的返回值

可以通过 BasicDBTask 类的以下方法获取任务执行的返回值和信息:

  • boolean isSuccessful() :返回一个 boolean 类型,以查询是否执行成功。
  • Entity getResult() :返回一个 Entity 对象,以获取 Task 执行的返回值。
  • String getErrorMsg():返回一个 String 类型变量,以查询执行期间的错误信息。

使用示例:

@Test
public void testExclusiveDBConnectionPool() throws IOException {
    DBConnectionPool pool = new ExclusiveDBConnectionPool("192.168.0.68",8848,"admin","123456",5,false,false);
    DBTask task = new BasicDBTask("sum(1 2 3 NULL 4);");
    pool.execute(task);

    // 打印 task 是否执行成功
    System.out.println(task.isSuccessful());

    // 获取 task 执行结果
    System.out.println(task.getResult());

    // 获取 task 错误信息
    System.out.println(task.getErrorMsg());
}

执行结果:

Connect to 192.168.0.68:8848.
Connect to 192.168.0.68:8848.
Connect to 192.168.0.68:8848.
Connect to 192.168.0.68:8848.
Connect to 192.168.0.68:8848.
true
10
null

关闭线程池

用户可通过调用线程池的 shutdown 方法来关闭一个线程池。示例代码如下:

dbConnectionPool.shutdown()

调用 shutdown() 会等待全部的连接池任务都执行完后再释放连接。