基础写入
DolphinDB 数据表按存储方式分为两种:
- 内存表: 数据仅保存在内存中,存取速度最快,但是节点关闭后数据就不存在了。
- 分布式表:数据分布在不同的节点,通过DolphinDB的分布式计算引擎,逻辑上仍然可以像本地表一样做统一查询。
1 保存数据到 DolphinDB 内存表
DolphinDB 提供多种方式来保存数据到内存表:
- 通过
insert into
保存单条数据 - 通过
tableInsert
函数批量保存多条数据 - 通过
tableInsert
函数保存数据表
一般不建议通过append!
函数保存数据,因为append!
函数会返回表的 schema,产生不必要的通信量。
下面分别介绍三种方式保存数据的实例,在例子中使用到的数据表有 4 个列,分别是 STRING, INT, TIMESTAMP, DOUBLE 类型,列名分别为 cstring, cint, ctimestamp, cdouble。
t = table(10000:0,`cstring`cint`ctimestamp`cdouble,[STRING,INT,TIMESTAMP,DOUBLE]) share t as sharedTable
由于内存表是会话隔离的,所以该内存表只有当前会话可见。如果需要在其它会话中访问,需要通过share
在会话间共享内存表。
1.1 使用 insert into
保存单条数据
若将单条数据记录保存到 DolphinDB 内存表,可以使用类似 SQL 语句 insert into。
public void test_save_Insert(String str,int i, long ts,double dbl) throws IOException{
conn.run(String.format("insert into sharedTable values('%s',%s,%s,%s)",str,i,ts,dbl));
}
1.2 使用tableInsert
函数批量保存数组对象
tableInsert
函数比较适合用来批量保存数据,它可将多个数组追加到 DolphinDB 内存表中。若 Java 程序获取的数据可以组织成 List 方式,可使用 tableInsert
函数保存。
public void test_save_TableInsert(List<String> strArray,List<Integer> intArray,List<Long> tsArray,List<Double> dblArray) throws IOException{
//用数组构造参数
List<Entity> args = Arrays.asList(new BasicStringVector(strArray),new BasicIntVector(intArray),new BasicTimestampVector(tsArray),new BasicDoubleVector(dblArray));
conn.run("tableInsert{sharedTable}", args);
}
在本例中,使用了 DolphinDB 中的“部分应用”这一特性,将服务端表名以 tableInsert{sharedTable}
的方式固化到 tableInsert
中,作为一个独立函数来使用。具体文档请参考部分应用文档。
1.3 使用 tableInsert
函数保存 BasicTable 对象
若 Java 程序获取的数据处理后组织成 BasicTable 对象,tableInsert
函数也可以接受一个表对象作为参数,批量添加数据。
public void test_save_table(BasicTable table1) throws IOException {
List<Entity> args = Arrays.asList(table1);
conn.run("tableInsert{shareTable}", args);
}
2. 保存数据到分布式表
分布式表是 DolphinDB 推荐在生产环境下使用的数据存储方式,它支持快照级别的事务隔离,保证数据一致性。分布式表支持多副本机制,既提供了数据容错能力,又能作为数据访问的负载均衡。
2.1 使用 tableInsert
函数保存 BasicTable 对象
dbPath = 'dfs://testDatabase' tbName = 'tb1' if(existsDatabase(dbPath)){dropDatabase(dbPath)} db = database(dbPath,RANGE,2018.01.01..2018.12.31) db.createPartitionedTable(t,tbName,'ctimestamp')
DolphinDB 提供 loadTable
方法可以加载分布式表,通过 tableInsert
方式追加数据,具体的脚本示例如下:
public void test_save_table(String dbPath, BasicTable table1) throws IOException{ List<Entity> args = new ArrayList<Entity>(1); args.add(table1); conn.run(String.format("tableInsert{loadTable('%s','tb1')}",dbPath), args); }
Java 程序中的数组或列表,也可以很方便的构造出 BasicTable 用于追加数据。例如若有 boolArray, intArray, dblArray, dateArray, strArray 这 5 个列表对象(List<T>),可以通过以下语句构造 BasicTable 对象:
List<String> colNames = Arrays.asList("cbool","cint","cdouble","cdate","cstring"); List<Vector> cols = Arrays.asList(new BasicBooleanVector(boolArray),new BasicIntVector(intArray),new BasicDoubleVector(dblArray),new BasicDateVector(dateArray),new BasicStringVector(strArray)); BasicTable table1 = new BasicTable(colNames,cols);
2.2 分布式表的并发写入
DolphinDB 的分布式表支持并发读写,下面展示如何在 Java 客户端中将数据并发写入 DolphinDB 的分布式表。
注意:DolphinDB 不允许多个 writer 同时将数据写入到同一个分区,因此在客户端多线程并行写入数据时,需要确保每个线程分别写入不同的分区。Java API 提供了自动按分区分流数据并行写入的简便方法,函数定义如下:
public PartitionedTableAppender(String dbUrl, String tableName, String partitionColName, String appendFunction, DBConnectionPool pool)
- dbUrl 必填,表示分布式数据库地址。
- tableName 必填,表示分布式表名。
- partitionColName 必填,表示分区字段。
- appendFunction 可选,表示自定义写入函数名,不填此参数则调用内置tableInsert函数。
- pool 表示连接池,并行写入数据。
DBConnectionPool pool = new ExclusiveDBConnectionPool(HOST, PORT, "admin", "123456", 3, true, true); PartitionedTableAppender appender = new PartitionedTableAppender(dbUrl, tableName , "sym", pool);
首先,在 DolphinDB 服务端执行以下脚本,创建分布式数据库"dfs://DolphinDBUUID"和分布式表"device_status"。其中,数据库按照 VALUE-HASH-HASH 的组合进行三级分区。
t = table(timestamp(1..10) as date,string(1..10) as sym) db1=database(\"\",HASH,[DATETIME,10]) db2=database(\"\",HASH,[STRING,5]) if(existsDatabase(\"dfs://demohash\")){ dropDatabase(\"dfs://demohash\") } db =database(\"dfs://demohash\",COMPO,[db2,db1]) pt = db.createPartitionedTable(t,`pt,`sym`date)
请注意:DolphinDB 不允许多个 writer 同时将数据写入到同一个分区,因此在客户端多线程并行写入数据时,需要确保每个线程分别写入不同的分区。
使用 1.30 版本以上的 server,可以使用 Java API 中的 PartitionedTableAppender
类来写入分布式表,其基本原理是设计一个连接池用于多线程写入,然后利用 server 的 schema
函数获取分布式表的分区信息,按指定的分区列将用户写入的数据进行分类分别交给不同的连接来并行写入。
使用示例脚本如下:
DBConnectionPool conn = new ExclusiveDBConnectionPool(host, Integer.parseInt(port), "admin", "123456", Integer.parseInt(threadCount), false, false); PartitionedTableAppender appender = new PartitionedTableAppender(dbPath, tableName, "gid", "saveGridData{'" + dbPath + "','" + tableName + "'}", conn); BasicTable table1 = createTable(); appender.append(table1);
3 读取和使用数据表
3.1 读取分布式表
- 在 Java API 中读取分布式表使用如下代码一次性读取数据
String dbPath = "dfs://testDatabase"; String tbName = "tb1"; DBConnection conn = new DBConnection(); conn.connect(SERVER, PORT, USER, PASSWORD); BasicTable table = (BasicTable)conn.run(String.format("select * from loadTable('%s','%s') where cdate = 2017.05.03",dbPath,tbName));
- 对于大数据量的表,API 提供了分段读取方法。
Java API 提供了 EntityBlockReader 对象,在 run
方法中使用参数 fetchSize 指定分段大小,通过 read()
方法一段段的读取数据,示例如下:
DBConnection conn = new DBConnection(); conn.connect(SERVER, PORT, USER, PASSWORD); EntityBlockReader v = (EntityBlockReader)conn.run("table(1..22486 as id)",(ProgressListener) null,4,4,10000); BasicTable data = (BasicTable)v.read(); while(v.hasNext()){ BasicTable t = (BasicTable)v.read(); data = data.combine(t); }
在使用上述分段读取的方法时,若数据未读取完毕,需要放弃后续数据的读取时,必须调用 skipAll
方法来显示忽略后续数据,否则会导致套接字缓冲区滞留数据,引发后续数据的反序列化失败。
正确使用的示例代码如下:
EntityBlockReader v = (EntityBlockReader)conn.run("table(1..12486 as id)",(ProgressListener) null,4,4,10000); BasicTable data = (BasicTable)v.read(); v.skipAll(); BasicTable t1 = (BasicTable)conn.run("table(1..100 as id1)"); //若没有skipAll此段会抛出异常。
3.2 使用 BasicTable 对象
在 Java API 中,数据表保存为 BasicTable 对象。由于 BasicTable 是列式存储,所以若要在 Java API 中读取行数据需要先取出需要的列,再取出行。
以下例子中参数 BasicTable 的有 4 个列,列名分别为 cstring, cint, ctimestamp, cdouble,数据类型分别是 STRING, INT, TIMESTAMP, DOUBLE。
public void test_loop_basicTable(BasicTable table1) throws Exception{ BasicStringVector stringv = (BasicStringVector) table1.getColumn("cstring"); BasicIntVector intv = (BasicIntVector) table1.getColumn("cint"); BasicTimestampVector timestampv = (BasicTimestampVector) table1.getColumn("ctimestamp"); BasicDoubleVector doublev = (BasicDoubleVector) table1.getColumn("cdouble"); for(int ri=0; ri<table1.rows(); ri++){ System.out.println(stringv.getString(ri)); System.out.println(intv.getInt(ri)); LocalDateTime timestamp = timestampv.getTimestamp(ri); System.out.println(timestamp); System.out.println(doublev.getDouble(ri)); } }