AutoFitTableUpsert

Java API 提供了 AutoFitTableUpsert 类来支持更新并写入 DolphinDB 表。

构造方法

AutoFitTableUpsert 提供了以下构造方法:

AutoFitTableUpsert(String dbUrl, String tableName, DBConnection connection, boolean ignoreNull, String[] pkeyColNames, String[] psortColumns)

参数说明

dbUrl String 类型,表示分布式数据库地址。若要写入内存表时,该参数须为空。

tableName String 类型,表示分布式表或内存表的表名。

connection 表示 DBConnection 对象,用于连接 server 并 upsert 数据。

ignoreNull boolean 类型,表示 upsert! 的一个参数。其含义为若 upsert! 的新数据表中某元素为 NULL 值,是否对目标表中的相应数据进行更新。

pkeyColNames String 类型,表示 upsert! 的一个参数。用于指定 DFS 表(目标表)的键值列。

psortColumns String 类型,表示 upsert! 的一个参数。设置该参数,更新的分区内的所有数据会根据指定的列进行排序。排序在每个分区内部进行,不会跨分区排序。

upsert 方法说明

AutoFitTableUpsert 写入并更新数据的方法:

int upsert(BasicTable table)

函数说明

将一个 BasicTable 对象更新到目标表中。若写入成功则返回 0。

使用示例 1

本例首先生成一张分区表,然后使用 AutoFitTableUpsert 向表中更新数据,设置 pkeyColNames 为 id 做为键值列,最后获取更新后的表并在控制台打印该表内容。

@Test
public void testAutoFitTableUpsert() {
    DBConnection conn = new DBConnection(false, false, false);
    conn.connect("192.168.1.116", 18999, "admin", "123456");
    String dbName ="dfs://upsertTable";
    String tableName = "pt";
    String script = "dbName = \"dfs://upsertTable\"\n"+
            "if(exists(dbName)){\n"+
            "\tdropDatabase(dbName)\t\n"+
            "}\n"+
            "db  = database(dbName, RANGE,1 10000,,'TSDB')\n"+
            "t = table(1000:0, `id`value,[ INT, INT[]])\n"+
            "pt = db.createPartitionedTable(t,`pt,`id,,`id)";
    conn.run(script);

    BasicIntVector v1 = new BasicIntVector(3);
    v1.setInt(0, 1);
    v1.setInt(1, 100);
    v1.setInt(2, 9999);

    BasicArrayVector ba = new BasicArrayVector(Entity.DATA_TYPE.DT_INT_ARRAY, 1);
    ba.Append(v1);
    ba.Append(v1);
    ba.Append(v1);

    List<String> colNames = new ArrayList<>();
    colNames.add("id");
    colNames.add("value");
    List<Vector> cols = new ArrayList<>();
    cols.add(v1);
    cols.add(ba);
    BasicTable bt = new BasicTable(colNames, cols);
    String[] keyColName = new String[]{"id"};
    AutoFitTableUpsert aftu = new AutoFitTableUpsert(dbName, tableName, conn, false, keyColName, null);
    aftu.upsert(bt);
    BasicTable res = (BasicTable) conn.run("select * from pt;");
    System.out.println(res.getString());
}

代码输出:

id   value       
---- ------------
1    [1,100,9999]
100  [1,100,9999]
9999 [1,100,9999]

使用示例 2

本例首先生成一张分区表,然后使用 AutoFitTableUpsert 向表中更新数据,设置 pkeyColNames 为 sym 做为键值列、psortColumns 为 date、val 做为排序列,最后获取更新后的表并在控制台打印该表内容。

@Test
public void test_tableUpsert_DP_sortColumns() throws Exception {
    DBConnection conn = new DBConnection();
    conn.connect("loaclhost", 8848, "admin", "123456");
    String script = "if(existsDatabase(\"dfs://upsert\")) {\n" +
            "dropDatabase(\"dfs://upsert\")\n" +
            "}\n" +
            "sym=`A`B`C`A`D`B`A\n" +
            "date=take(2021.12.10,3) join take(2021.12.09, 3) join 2021.12.10\n" +
            "price=8.3 7.2 3.7 4.5 6.3 8.4 7.6\n" +
            "val=10 19 13 9 19 16 10\n" +
            "t=table(sym, date, price, val)\n" +
            "db=database(\"dfs://upsert\", VALUE,  `A`B`C)\n" +
            "pt=db.createPartitionedTable(t, `pt, `sym)\n" +
            "pt.append!(t)";
    conn.run(script);
    BasicTable bt = (BasicTable) conn.run("t1=table(`A`B`E as sym, take(2021.12.11, 3) as date, 11.1 10.5 6.9 as price, 12 9 11 as val);t1;");
    String[] keyColName = new String[]{"sym"};
    String[] psortColumns = new String[]{"date","val"};
    AutoFitTableUpsert aftu = new AutoFitTableUpsert("dfs://upsert","pt",conn,false,keyColName,psortColumns);
    aftu.upsert(bt);
    BasicTable ua = (BasicTable) conn.run("select * from pt");
    System.out.println(ua.getString());
}

执行结果:

sym date       price val
--- ---------- ----- ---
A   2021.12.09 4.5   9  
A   2021.12.10 7.6   10 
A   2021.12.11 11.1  12 
B   2021.12.09 8.4   16 
B   2021.12.11 10.5  9  
C   2021.12.10 3.7   13 
D   2021.12.09 6.3   19 
E   2021.12.11 6.9   11