MultithreadedTableWriter
Java API 提供 MultithreadedTableWriter 类用于批量异步追加数据,并在客户端维护了一个数据缓冲队列。当服务器端忙于网络 I/O 时,客户端写线程仍然可以将数据持续写入缓冲队列(该队列由客户端维护)。写入队列后即可返回,从而避免了写线程的忙等。目前,MultithreadedTableWriter
支持批量写入数据到内存表、分区表和维度表。
注意对于异步写入:
- API 客户端提交任务到缓冲队列,缓冲队列接到任务后,客户端即认为任务已完成。
- 提供
getStatus
等接口查看写入状态。
构造方法
public MultithreadedTableWriter(String hostName, int port, String userId, String password,
String dbName, String tableName, boolean useSSL,
boolean enableHighAvailability, String[] highAvailabilitySites,
int batchSize, float throttle,
int threadCount, String partitionCol,
int[] compressTypes, Mode mode, String[] pModeOption,
boolean enableActualSendTime,
boolean reconnect,
int tryReconnectNums)
参数说明
hostName String 类型,表示所连接的服务器的地址。
port int 类型,表示服务器端口。
userId String 类型,登录时的用户名。
password String 类型,登录时的密码。
dbPath String 类型,表示分布式数据库地址。内存表时该参数为空。请注意,1.30.17 及以下版本 API,向内存表写入数据时,该参数需填写内存表表名。
tableName String 类型,表示分布式表或内存表的表名。 请注意:
- 自 3.00.4.0 版本起,该参数支持指定 Orca 流表名称。
- 1.30.17 及以下版本 API,向内存表写入数据时,该参数需为空。
useSSL boolean 类型,表示是否启用加密通讯。
enableHighAvailability boolean 类型,表示是否开启 API 高可用。
highAvailabilitySites 数组类型,表示所有可用节点的 ip:port
构成的 String 数组。
batchSize int 类型,表示批处理的消息的数量。
- 如果该参数值为 1,表示客户端写入数据后就立即发送给服务器。
- 如果该参数大于 1,表示数据量达到 batchSize 时,客户端才会将数据发送给服务器。
throttle 大于 0 的浮点数,单位为秒。若客户端有数据写入,但数据量不足 batchSize,则等待 throttle 的时间再发送数据。
threadCount int 类型,表示创建的工作线程数量,如果值为 1,表示单线程。对于维度表,其值必须为 1。
partitionCol String 类型,默认为空,仅在 threadCount 大于 1 时起效。
- 对于分区表,必须指定为分区字段名。
- 如果是流数据表,必须指定为表的字段名。
- 对于维度表,该参数不起效。
compressTypes 数组类型,用于指定每一列采用的压缩传输方式,为空表示不压缩。每一列可选的压缩方式(大小写不敏感) 包括:
- Vector.COMPRESS_LZ4:LZ4 压缩。
- Vector.COMPRESS_DELTA:DELTAOFDELTA 压缩。
mode 写入模式,用于指定 MultithreadedTableWriter 对象写入数据的方式,包括两种:
- Mode.M_Append:表示以
tableInsert
的方式向追加数据。对于 Orca 流表,仅支持此模式。 - Mode.M_Upsert:表示以
upsert!
方式更新(或追加)数据。
pModeOption String 类型数组,表示不同模式下的扩展选项,目前,仅当 mode 指定为 Mode.M_Upsert 时有效,表示由 upsert! 可选参数组成的 String 类型数组。
callbackHandler 回调类,默认为空,表示不使用回调。开启回调后,将继承回调接口 Callback 并重载回调方法,将回调的接口对象传入 MultithreadedTableWriter
。
enableActualSendTime boolean 类型,表示是否开启记录消息发送的时间戳。使用时须保证要写入表的最后一列为 NANOTIMESTAMP 类型;在开启该功能后,用户无需再写入数据时填入最后一列时间戳的数据。注意:自 Java API 3.00.1 版本起,DolphinDB Server 2.00.16/3.00.3 版本起,若通过 Server 的接口 setStreamTableTimestamp
为指定流表设置了时间戳列 ,数据写入流表的系统时间会自动记录到最后一列。此时若 Java API 同时开启 enableActualSendTime,则发送时间会自动向倒数第二列填入。
reconnect boolean 类型,表示是否开启单节点断连后自动重连,可选参数,默认值为 false,表示关闭。
tryReconnectNums 表示重连尝试次数,int 类型。使用方式如下:
若不开启高可用,须与 reconnect 参数搭配使用,对单节点进行有限次重连。若不填写该参数,默认进行无限重连。
当开启 enableHighAvailability 高可用参数时,
若指定该参数,将在断开连接后遍历高可用范围内的每个节点进行有限次重连。一次遍历中,每个节点只会被重连一次,最多进行 tryReconnectNums 次遍历尝试。
若不填写该参数,默认是无限重连。
回调函数 callbackHandler 使用介绍
MultithreadedTableWriter
在开启回调后,用户会在回调方法中获取到一个 BasicTable 类型的回调表,该表由两列构成:
- 第一列(String 类型),存放的是调用
MultithreadedTableWriter.insert
时增加的每一行的 id。 - 第二列(boolean 类型),表示每一行写入成功与否,true 表示写入成功,false 表示写入失败。
使用流程:
继承 Callback 接口并重载 writeCompletion 方法用于获取回调数据:
Callback callbackHandler = new Callback(){ public void writeCompletion(Table callbackTable){ List<String> failedIdList = new ArrayList<>(); BasicStringVector idVec = (BasicStringVector) callbackTable.getColumn(0); BasicBooleanVector successVec = (BasicBooleanVector) callbackTable.getColumn(1); for (int i = 0; i < successVec.rows(); i++){ if (!successVec.getBoolean(i)){ failedIdList.add(idVec.getString(i)); } } } };
构造
MultithreadedTableWriter
对象并传入回调对象:MultithreadedTableWriter mtw = new MultithreadedTableWriter(host, port, userName, password, dbName, tbName, useSSL, enableHighAvailability, null, 10000, 1, 1, "price", callbackHandler);
调用
MultithreadedTableWriter
的insert
方法并在第一列中为每一行写入 id:String theme = "theme1"; for (int id = 0; id < 1000000; id++){ //theme+id 为每一行对应的 id,将在回调时返回 mtw.insert(theme + id, code, price); }
对象方法说明
方法名 | 功能 |
---|---|
insert | 插入单行数据 |
getUnwrittenData | 获取未写入服务器的数据 |
insertUnwrittenData | 将数据插入数据表 |
getStatus | 获取 MTW 对象当前的运行状态 |
waitForThreadCompletion | MTW 会进入等待状态 |
以下为各方法的详细说明。
insert 方法
ErrorCodeInfo insert(Object... args)
参数说明
args 是变长参数,代表插入一行数据。
函数说明
插入单行数据。返回一个 ErrorCodeInfo
对象,包含 errorCode 和 errorInfo,分别表示错误代码和错误信息。
ErrorCodeInfo
介绍:
- errorCode:"" 空字符串表示没有错误发生;如果有错误发生,值为错误码,例如:"A2"。
- errorInfo:表示错误信息。当没有错误发生时,返回 null;发生错误时,返回包含错误信息的字符串。
- hasError():判断插入待转换队列时是否发生错误。如果发生错误,返回 true;反之,返回 false。
- succeed():判断是否成功插入数据缓存队列。如成功插入,返回 true;反之,返回 false。
ErrorCodeInfo 类提供了 hasError
和 succeed
方法用于获取数据是否成功放入数据队列的结果。在插入一行数据后,推荐调用 hasError
来判断是否成功将数据插入到待转换队列。
如果构造 MultithreadedTableWriter
时开启了回调(见上述参数说明),则每次调用 insert
时,需要在每行数据前面增加一列 String 类型的数据作为每行的标识符(id),此 id 列仅用于回调时返回给用户,不会写入表中。
示例
下例为演示采用多线程模式向表中插入一行记录。
MultithreadedTableWriter multithreadedTableWriter = new MultithreadedTableWriter("localhost", 8848, "admin", "123456", "dfs://valuedb3", "pdatetest",
false, false, null, 10000, 1,
5, "id", new int[]{Vector.COMPRESS_LZ4, Vector.COMPRESS_LZ4, Vector.COMPRESS_DELTA});
ErrorCodeInfo pErrorInfo = multithreadedTableWriter_.insert(new Date(2022, 3, 23), "AAAAAAAB", 10000000L);
getUnwrittenData 方法
List<List<Entity>> getUnwrittenData()
函数说明
返回一个嵌套列表,表示未写入服务器的数据。
注意:该方法获取到数据资源后, MultithreadedTableWriter
将释放这些数据资源。
示例
下例为演示获取未写入服务器的数据。
List<List<Entity>> unwrittenData = multithreadedTableWriter_.getUnwrittenData();
insertUnwrittenData 方法
ErrorCodeInfo insertUnwrittenData(List<List<Entity>> records)
参数说明
records 需要再次写入的数据。可以通过方法 getUnwrittenData
获取该对象。
函数说明
将数据插入数据表。返回值同 insert
方法。与 insert
方法的区别在于,insert
只能插入单行数据,而 insertUnwrittenData
可以同时插入多行数据。
示例
下例为演示插入之前未写入服务器的数据。
List<List<Entity>> unwrittenData = multithreadedTableWriter_.getUnwrittenData();
ErrorCodeInfo ret = multithreadedTableWriter_.insertUnwrittenData(unwrittenData);
getStatus 方法
Status getStatus()
函数说明
获取 MultithreadedTableWriter
对象当前的运行状态。
返回值说明
Status 是 MultithreadedTableWriter.Status
类,继承自 ErrorCodeInfo
实现。
Status 类具有以下属性和方法。
Status 类属性:
- isExiting:写入线程是否正在退出。
- errorCode:错误码。
- errorInfo:错误信息。
- sentRows:成功发送的总记录数。
- unsentRows:待发送的总记录数。
- sendFailedRows:发送失败的总记录数。
- threadStatus:写入线程状态列表。
- threadId:线程 Id。
- sentRows:该线程成功发送的记录数。
- unsentRows:该线程待发送的记录数。
- sendFailedRows:该线程发送失败的记录数。
Status 类方法:
hasError()
:true 表示数据写入存在错误;false 表示数据写入无错误。succeed()
:true 表示数据写入成功;false 表示数据写入失败。
示例
下例为演示获取 Writer 工作状态。
MultithreadedTableWriter.Status writeStatus = new MultithreadedTableWriter.Status();
writeStatus = multithreadedTableWriter_.getStatus();
以一个 MultithreadedTableWriterStatus
打印输出为例:
errorCode : A1
errorInfo : Data conversion error: Cannot convert double to LONG
isExiting : True
sentRows : 2493
unsentRows : 0
sendFailedRows: 7507
threadStatus :
threadId sentRows unsentRows sendFailedRows
0 0 0 7507
3567691520 415 0 0
3489658624 831 0 0
3481265920 416 0 0
3472873216 416 0 0
3464480512 415 0 0
<dolphindb.session.MultithreadedTableWriterStatus object at 0x7f0102c76d30>
以上输出内容显示,在本次 MTW 写入流程中发生了异常,异常代码是 A1,异常信息为 Data conversion error: Cannot convert double to LONG。
isExiting=True
表示当前线程正在退出。
sentRows=2493
表示当前有 2493 条数据已经写入到 DolphinDB 服务端。
unsentRows=0
/sendFailedRows=7507
则表示当前仍在 API 的未成功写入 DolphinDB 服务端的数据一共有 0+7507=7507 条。
在后面的表格中,列出了各个后台线程的处理情况,threadId=0
表示所有线程的统计总数。通常而言,成功写入的结果(sentRows)会分别显示在各个线程中,例如上述输出中的 sentRows 列;写入失败的结果(unsentRows、sendFailedRows)会集中显示在 threadId=0
行,表示整个过程中的写入失败条数。
waitForThreadCompletion 方法
waitForThreadCompletion()
函数说明
调用此方法后,MTW 会进入等待状态,待后台工作线程全部完成后退出等待状态。
示例
下例为演示等待 Writer 写入完毕。
multithreadedTableWriter_.waitForThreadCompletion();
使用示例
示例 1
下例为演示用 MultithreadedTableWriter
向表中写入一系列数据并在发生错误后在控制台打印错误信息。
步骤包括:
- 创建 DFS 表。
- 创建 MTW 对象。
- 写入数据。
- 若有异常,进行异常处理。
@Test
public void testMultithreadedTableWriter() {
DBConnection conn= new DBConnection();
conn.connect(HOST, PORT, "admin", "123456");
Random random = new Random();
// 1、创建 DFS 表
String script =
"dbName = 'dfs://valuedb3'" +
"if (exists(dbName))" +
"{" +
"dropDatabase(dbName);" +
"}" +
"datetest = table(1000:0,`date`symbol`id,[DATE, SYMBOL, LONG]);" +
"db = database(directory= dbName, partitionType= HASH, partitionScheme=[INT, 10]);" +
"pt = db.createPartitionedTable(datetest,'pdatetest','id');";
conn.run(script);
// 2、创建 MTW 对象
MultithreadedTableWriter multithreadedTableWriter_ = new MultithreadedTableWriter(HOST, PORT, "admin", "123456", "dfs://valuedb3", "pdatetest",
false, false, null, 10000, 1,
5, "id", new int[]{Vector.COMPRESS_LZ4, Vector.COMPRESS_LZ4, Vector.COMPRESS_DELTA});
ErrorCodeInfo ret;
// 3、写入数据
try {
// 插入100行正确数据
for (int i = 0; i < 100; ++i) {
ret = multithreadedTableWriter_.insert(new Date(2022, 3, 23), "AAAAAAAB", random.nextInt() % 10000);
}
}
catch (Exception e) {
// MTW 抛出异常
System.out.println("MTW exit with exception {0}" + e.getMessage());
}
// 等待 MTW 插入完成
multithreadedTableWriter_.waitForThreadCompletion();
MultithreadedTableWriter.Status writeStatus = new MultithreadedTableWriter.Status();
writeStatus = multithreadedTableWriter_.getStatus();
if (!writeStatus.errorInfo.equals("")) {
// 如果写入时发生错误
System.out.println("error in writing !");
}
System.out.println("writeStatus: {0}\n" + writeStatus.toString());
System.out.println(((BasicLong)conn.run("exec count(*) from pt")).getLong());
}
以上代码输出结果为:
writeStatus: {0}
errorCode :
errorInfo :
isExiting : true
sentRows : 100
unsentRows : 0
sendFailedRows: 0
threadStatus :
threadId sentRows unsentRows sendFailedRows
13 30 0 0
14 18 0 0
15 15 0 0
16 20 0 0
17 17 0 0
100
以上为多线程写入的结果,因此 threadId 与 sentRows 列的值会根据实际情况而有所不同,用户需要根据实际情况进行判断,正常情况下 sentRows 各数总和应为100。
示例 2
这里介绍 Mode.M_Upsert 模式的使用介绍:
@Test(timeout = 120000)
public void test_mtw_tableUpsert_DP_updateFirst() throws Exception {
String script = "if(existsDatabase(\"dfs://upsert\")) {\n" +
"dropDatabase(\"dfs://upsert\")\n" +
"}\n" +
"sym=\"A\" \"B\" \"C\" \"A\" \"D\" \"B\" \"A\"\n" +
"date=take(2021.12.10,3) join take(2021.12.09, 3) join 2021.12.10\n" +
"price=8.3 7.2 3.7 4.5 6.3 8.4 7.6\n" +
"val=10 19 13 9 19 16 10\n" +
"t=table(sym, date, price, val)\n" +
"db=database(\"dfs://upsert\", VALUE,\"A\" \"B\" \"C\")\n" +
"pt=db.createPartitionedTable(t, `pt, `sym)\n" +
"pt.append!(t)";
conn.run(script);
MultithreadedTableWriter mtw = new MultithreadedTableWriter(HOST,PORT,"admin","123456","dfs://upsert","pt",
false,false,null,1000,1,10,"sym",null,
MultithreadedTableWriter.Mode.M_Upsert,
new String[]{"ignoreNull=false", "keyColNames=`sym"});
mtw.insert(new BasicString("A"),new BasicDate(LocalDate.of(2021,12,9)),new BasicDouble(11.1),new BasicInt(12));
mtw.insert(new BasicString("B"),new BasicDate(LocalDate.of(2021,12,9)),new BasicDouble(10.5),new BasicInt(9));
mtw.insert(new BasicString("E"),new BasicDate(LocalDate.of(2021,12,9)),new BasicDouble(6.9),new BasicInt(11));
mtw.waitForThreadCompletion();
BasicTable ua = (BasicTable) conn.run("select * from pt;");
System.out.println(ua.getString());
}
执行结果:
sym date price val
--- ---------- ----- ---
A 2021.12.09 11.1 12
A 2021.12.09 4.5 9
A 2021.12.10 7.6 10
B 2021.12.09 10.5 9
B 2021.12.09 8.4 16
C 2021.12.10 3.7 13
D 2021.12.09 6.3 19
E 2021.12.09 6.9 11
异常返回
在使用 MultithreadedTableWriter
类调用 insert
方法插入数据时,可能会发生如下异常:
在调用
MultithreadedTableWriter
的insert
方法时,若插入数据的类型与表对应列的类型不匹配,则MultithreadedTableWriter
会立刻返回错误信息并打印出堆栈:DBConnection conn= new DBConnection(); conn.connect(HOST, PORT, "admin", "123456"); Random random = new Random(); String script = "dbName = 'dfs://valuedb3'" + "if (exists(dbName))" + "{" + "dropDatabase(dbName);" + "}" + "datetest = table(1000:0,`date`symbol`id,[DATE, SYMBOL, LONG]);" + "db = database(directory= dbName, partitionType= HASH, partitionScheme=[INT, 10]);" + "pt = db.createPartitionedTable(datetest,'pdatetest','id');"; conn.run(script); MultithreadedTableWriter multithreadedTableWriter_ = new MultithreadedTableWriter(HOST, PORT, "admin", "123456", "dfs://valuedb3", "pdatetest", false, false, null, 10000, 1, 5, "id", new int[]{Vector.COMPRESS_LZ4, Vector.COMPRESS_LZ4, Vector.COMPRESS_DELTA}); ErrorCodeInfo ret; //插入1行类型错误数据,MTW 立刻返回错误信息 ret = multithreadedTableWriter_.insert(new Date(2022, 3, 23), 222, random.nextInt() % 10000); if (!ret.errorInfo.equals("")) System.out.println("insert wrong format data: {2}\n" + ret.toString());
以上代码输出结果为:
java.lang.RuntimeException: Failed to insert data. Cannot convert int to DT_SYMBOL. at com.xxdb.data.BasicEntityFactory.createScalar(BasicEntityFactory.java:795) at com.xxdb.data.BasicEntityFactory.createScalar(BasicEntityFactory.java:505) at com.xxdb.multithreadedtablewriter.MultithreadedTableWriter.insert(MultithreadedTableWriter.java:594) at com.xxdb.BehaviorTest.testMul(BehaviorTest.java:89) at com.xxdb.BehaviorTest.main(BehaviorTest.java:168) insert wrong format data: {2} code=A1 info=Invalid object error java.lang.RuntimeException: Failed to insert data. Cannot convert int to DT_SYMBOL.
在调用
MultithreadedTableWriter
的insert
方法时,若insert
插入数据的列数和表的列数不匹配,MultithreadedTableWriter
会立刻返回错误信息:DBConnection conn= new DBConnection(); conn.connect(HOST, PORT, "admin", "123456"); Random random = new Random(); String script = "dbName = 'dfs://valuedb3'" + "if (exists(dbName))" + "{" + "dropDatabase(dbName);" + "}" + "datetest = table(1000:0,`date`symbol`id,[DATE, SYMBOL, LONG]);" + "db = database(directory= dbName, partitionType= HASH, partitionScheme=[INT, 10]);" + "pt = db.createPartitionedTable(datetest,'pdatetest','id');"; conn.run(script); MultithreadedTableWriter multithreadedTableWriter_ = new MultithreadedTableWriter(HOST, PORT, "admin", "123456", "dfs://valuedb3", "pdatetest", false, false, null, 10000, 1, 5, "id", new int[]{Vector.COMPRESS_LZ4, Vector.COMPRESS_LZ4, Vector.COMPRESS_DELTA}); ErrorCodeInfo ret; //插入1行数据,插入数据的列数和表的列数不匹配,MTW 立刻返回错误信息 ret = multithreadedTableWriter_.insert(new Date(2022, 3, 23), random.nextInt() % 10000); if (!ret.errorInfo.equals("")) System.out.println("insert wrong format data: {3}\n" + ret.toString());
以上代码输出结果为:
insert wrong format data: {3} code=A2 info=Column counts don't match.
如果
MultithreadedTableWriter
在运行时连接断开,则所有工作线程被终止。继续通过MultithreadedTableWriter
向服务器写数据时,会因为工作线程终止而抛出异常,且数据不会被写入。此时,可通过调用MultithreadedTableWriter
的getUnwrittenData
获取未插入的数据,并重新插入:List<List<Entity>> unwriterdata = new ArrayList<>(); unwriterdata = multithreadedTableWriter_.getUnwrittenData(); System.out.println("{5} unwriterdata: " + unwriterdata.size()); //重新获取新的 MTW 对象 MultithreadedTableWriter newmultithreadedTableWriter = new MultithreadedTableWriter(HOST, PORT, "admin", "123456", "dfs://valuedb3", "pdatetest", false, false, null, 10000, 1, 5, "id", new int[]{Vector.COMPRESS_LZ4, Vector.COMPRESS_LZ4, Vector.COMPRESS_DELTA}); try { boolean writesuccess = true; //将没有写入的数据写到新的 MTW 中 ret = newmultithreadedTableWriter.insertUnwrittenData(unwriterdata); } finally { newmultithreadedTableWriter.waitForThreadCompletion(); writeStatus = newmultithreadedTableWriter.getStatus(); System.out.println("writeStatus: {6}\n" + writeStatus.toString()); }
以上代码输出结果为:
{5} unwriterdata: 10 writeStatus: {6} errorCode : errorInfo : isExiting : true sentRows : 10 unsentRows : 0 sendFailedRows: 0 threadStatus : threadId sentRows unsentRows sendFailedRows 23 3 0 0 24 2 0 0 25 1 0 0 26 3 0 0 27 1 0 0
以上为多线程写入的结果,并且需要先造成断线情况,因此 threadId 与 sentRows列 的值会根据实际情况而有所不同,用户需要根据实际情况进行判断。
重新写入
当 MTW 发生类型转换错误、写入线程写入失败等错误时,API 将会终止所有线程。此时可以通过 writer.getUnwrittenData()
方法获取失败数据,通过 insertUnwrittenData(unwrittendata)
重新写入失败数据。因为原有 MTW 对象的所有线程已经终止,无法再次被用来写入数据,因此需要重新构造一个新的 MTW 对象,将失败数据重新写入新的 MTW 对象中。以下给出重新写入示例:
List<List<Entity>> unwriterdata;
unwriterdata = multithreadedTableWriter_.getUnwrittenData();
System.out.println("{5} unwriterdata: " + unwriterdata.size());
// 重新获取新的 MTW 对象
MultithreadedTableWriter newmultithreadedTableWriter = new MultithreadedTableWriter(HOST, PORT, "admin", "123456", "dfs://valuedb3", "pdatetest",
false, false, null, 10000, 1,
5, "id", new int[]{Vector.COMPRESS_LZ4, Vector.COMPRESS_LZ4, Vector.COMPRESS_DELTA});
try {
boolean writesuccess = true;
// 将没有写入的数据写到新的MTW中
ret = newmultithreadedTableWriter.insertUnwrittenData(unwriterdata);
for (int i = 0; i < 10 - unwriterdata.size(); ++i)
{
ret = newmultithreadedTableWriter.insert(pErrorInfo, new Date(2022, 3, 23), "AAAAAAAB", random.nextInt() % 10000);
}
} catch(Exception e) {
e.getMessage();
} finally {
newmultithreadedTableWriter.waitForThreadCompletion();
writeStatus = newmultithreadedTableWriter.getStatus();
System.out.println("writeStatus: {6}\n" + writeStatus.toString());
}