PartitionedTableAppender

Java API 提供 PartitionedTableAppender 类,可以将数据从 Java 客户端并发写入到 DolphinDB 分布式表中。

PartitionedTableAppender 的基本原理是构造一个连接池用于多线程写入,然后调用 server 的 schema 函数以获取分布式表的分区信息,按指定的分区列将用户写入的数据进行分类分别交给不同的连接来并行写入。

构造方法

DolphinDB 不允许多个 writer 同时将数据写入到同一个分区,因此在客户端多线程并行写入数据时,需要确保每个线程分别写入不同的分区。Java API 提供了自动按分区分流数据并行写入的简便方法,构造方法如下:

public PartitionedTableAppender(String dbUrl, String tableName, String partitionColName, String appendFunction, DBConnectionPool pool)

参数介绍

dbUrl String 类型,表示分布式数据库地址。

tableName String 类型,表示分布式表名。

partitionColName String 类型,表示分区字段。

appendFunction String 类型,表示自定义写入函数名,可选参数。若不填该参数,则调用内置 tableInsert 函数。

pool 表示连接池,能够并行写入数据。

构造示例代码

DBConnectionPool pool = new ExclusiveDBConnectionPool(HOST, PORT, "admin", "123456", 3, true, true);
PartitionedTableAppender appender = new PartitionedTableAppender(dbUrl, tableName , "sym", pool);

使用示例

本示例为创建表与线程池后向一个分区表中插入 1000 万条数据。

注意,本例采用较大数据量,如果 DolphinDB 服务器配置较低则可能会出现死机现象,请谨慎运行。推荐配置为 CPU 8 核,内存 16G 来运行此段代码。

DBConnection conn = new DBConnection();
conn.connect(HOST,PORT,USER,PASSWORD);

//创建分区表
String script = "\n" +
        "t = table(timestamp(1..10)  as date,int(1..10) as sym,string(1..10) as str)\n" +
        "db1=database(\"\",VALUE,date(now())+0..100)\n" +
        "db2=database(\"\",RANGE,int(1..10))\n" +
        "if(existsDatabase(\"dfs://demohash\")){\n" +
        "\tdropDatabase(\"dfs://demohash\")\n" +
        "}\n" +
        "db =database(\"dfs://demohash\",COMPO,[db1,db2])\n" +
        "pt = db.createPartitionedTable(t,`pt,`date`sym)\n";
conn.run(script);

//创建线程池
ExclusiveDBConnectionPool pool = new ExclusiveDBConnectionPool(HOST, PORT, "admin", "123456", 3, false, true);
PartitionedTableAppender appender = new PartitionedTableAppender(DBPATH, TABLE_NAME, "sym", pool);
List<String> colNames = new ArrayList<String>(3);
colNames.add("date");
colNames.add("sym");
colNames.add("str");
List<Vector> cols = new ArrayList<Vector>(3);
BasicTimestampVector date = new BasicTimestampVector(10000);
for (int i =0 ;i<10000;i++)
    date.setTimestamp(i, LocalDateTime.now());
cols.add(date);
BasicIntVector sym = new BasicIntVector(10000);
for (int i =0 ;i<10000;i+=4) {
    sym.setInt(i, 1);
    sym.setInt(i + 1, 2);
    sym.setInt(i + 2, 3);
    sym.setInt(i + 3, 4);
}
cols.add(sym);
BasicStringVector str = new BasicStringVector(10000);
for (int i =0 ;i<10000;i++) {
    str.setString(i,"32");
}
cols.add(str);
for (int i =0 ;i<1000;i++) {
    int m = appender.append(new BasicTable(colNames, cols));
    assertEquals(10000,m);
}
BasicLong re = (BasicLong) conn.run("pt= loadTable(\"dfs://demohash\",`pt)\n" +
        "exec count(*) from pt");
System.out.println(re);
pool.shutdown();

示例输出为:

10000000