AutoFitTableUpsert
Java API 提供了 AutoFitTableUpsert
类来支持更新并写入 DolphinDB 表。
构造方法
AutoFitTableUpsert 提供了以下构造方法:
AutoFitTableUpsert(String dbUrl, String tableName, DBConnection connection, boolean ignoreNull, String[] pkeyColNames, String[] psortColumns)
参数说明
dbUrl String 类型,表示分布式数据库地址。若要写入内存表时,该参数须为空。
tableName String 类型,表示分布式表或内存表的表名。
connection 表示 DBConnection 对象,用于连接 server 并 upsert 数据。
ignoreNull boolean 类型,表示 upsert! 的一个参数。其含义为若 upsert! 的新数据表中某元素为 NULL 值,是否对目标表中的相应数据进行更新。
pkeyColNames String 类型,表示 upsert! 的一个参数。用于指定 DFS 表(目标表)的键值列。
psortColumns String 类型,表示 upsert! 的一个参数。设置该参数,更新的分区内的所有数据会根据指定的列进行排序。排序在每个分区内部进行,不会跨分区排序。
upsert 方法说明
AutoFitTableUpsert
写入并更新数据的方法:
int upsert(BasicTable table)
函数说明
将一个 BasicTable 对象更新到目标表中。若写入成功则返回 0。
使用示例 1
本例首先生成一张分区表,然后使用 AutoFitTableUpsert
向表中更新数据,设置 pkeyColNames 为 id 做为键值列,最后获取更新后的表并在控制台打印该表内容。
@Test
public void testAutoFitTableUpsert() {
DBConnection conn = new DBConnection(false, false, false);
conn.connect("192.168.1.116", 18999, "admin", "123456");
String dbName ="dfs://upsertTable";
String tableName = "pt";
String script = "dbName = \"dfs://upsertTable\"\n"+
"if(exists(dbName)){\n"+
"\tdropDatabase(dbName)\t\n"+
"}\n"+
"db = database(dbName, RANGE,1 10000,,'TSDB')\n"+
"t = table(1000:0, `id`value,[ INT, INT[]])\n"+
"pt = db.createPartitionedTable(t,`pt,`id,,`id)";
conn.run(script);
BasicIntVector v1 = new BasicIntVector(3);
v1.setInt(0, 1);
v1.setInt(1, 100);
v1.setInt(2, 9999);
BasicArrayVector ba = new BasicArrayVector(Entity.DATA_TYPE.DT_INT_ARRAY, 1);
ba.Append(v1);
ba.Append(v1);
ba.Append(v1);
List<String> colNames = new ArrayList<>();
colNames.add("id");
colNames.add("value");
List<Vector> cols = new ArrayList<>();
cols.add(v1);
cols.add(ba);
BasicTable bt = new BasicTable(colNames, cols);
String[] keyColName = new String[]{"id"};
AutoFitTableUpsert aftu = new AutoFitTableUpsert(dbName, tableName, conn, false, keyColName, null);
aftu.upsert(bt);
BasicTable res = (BasicTable) conn.run("select * from pt;");
System.out.println(res.getString());
}
代码输出:
id value
---- ------------
1 [1,100,9999]
100 [1,100,9999]
9999 [1,100,9999]
使用示例 2
本例首先生成一张分区表,然后使用 AutoFitTableUpsert
向表中更新数据,设置 pkeyColNames 为 sym 做为键值列、psortColumns 为 date、val 做为排序列,最后获取更新后的表并在控制台打印该表内容。
@Test
public void test_tableUpsert_DP_sortColumns() throws Exception {
DBConnection conn = new DBConnection();
conn.connect("loaclhost", 8848, "admin", "123456");
String script = "if(existsDatabase(\"dfs://upsert\")) {\n" +
"dropDatabase(\"dfs://upsert\")\n" +
"}\n" +
"sym=`A`B`C`A`D`B`A\n" +
"date=take(2021.12.10,3) join take(2021.12.09, 3) join 2021.12.10\n" +
"price=8.3 7.2 3.7 4.5 6.3 8.4 7.6\n" +
"val=10 19 13 9 19 16 10\n" +
"t=table(sym, date, price, val)\n" +
"db=database(\"dfs://upsert\", VALUE, `A`B`C)\n" +
"pt=db.createPartitionedTable(t, `pt, `sym)\n" +
"pt.append!(t)";
conn.run(script);
BasicTable bt = (BasicTable) conn.run("t1=table(`A`B`E as sym, take(2021.12.11, 3) as date, 11.1 10.5 6.9 as price, 12 9 11 as val);t1;");
String[] keyColName = new String[]{"sym"};
String[] psortColumns = new String[]{"date","val"};
AutoFitTableUpsert aftu = new AutoFitTableUpsert("dfs://upsert","pt",conn,false,keyColName,psortColumns);
aftu.upsert(bt);
BasicTable ua = (BasicTable) conn.run("select * from pt");
System.out.println(ua.getString());
}
执行结果:
sym date price val
--- ---------- ----- ---
A 2021.12.09 4.5 9
A 2021.12.10 7.6 10
A 2021.12.11 11.1 12
B 2021.12.09 8.4 16
B 2021.12.11 10.5 9
C 2021.12.10 3.7 13
D 2021.12.09 6.3 19
E 2021.12.11 6.9 11