MultithreadedTableWriter
Java API 提供 MultithreadedTableWriter 类用于批量异步追加数据,并在客户端维护了一个数据缓冲队列。当服务器端忙于网络 I/O 时,客户端写线程仍然可以将数据持续写入缓冲队列(该队列由客户端维护)。写入队列后即可返回,从而避免了写线程的忙等。目前,MultithreadedTableWriter
支持批量写入数据到内存表、分区表和维度表。
注意对于异步写入:
- API 客户端提交任务到缓冲队列,缓冲队列接到任务后,客户端即认为任务已完成。
- 提供
getStatus
等接口查看写入状态。
构造方法
public MultithreadedTableWriter(String hostName, int port, String userId, String password,
String dbName, String tableName, boolean useSSL,
boolean enableHighAvailability, String[] highAvailabilitySites,
int batchSize, float throttle,
int threadCount, String partitionCol,
int[] compressTypes, Mode mode, String[] pModeOption,
boolean enableActualSendTime)
参数说明
hostName String 类型,表示所连接的服务器的地址。
port int 类型,表示服务器端口。
userId String 类型,登录时的用户名。
password String 类型,登录时的密码。
dbPath String 类型,表示分布式数据库地址。内存表时该参数为空。请注意,1.30.17 及以下版本 API,向内存表写入数据时,该参数需填写内存表表名。
tableName String 类型,表示分布式表或内存表的表名。请注意,1.30.17 及以下版本 API,向内存表写入数据时,该参数需为空。
useSSL boolean 类型,表示是否启用加密通讯。
enableHighAvailability boolean 类型,表示是否开启 API 高可用。
highAvailabilitySites 数组类型,表示所有可用节点的 ip:port
构成的 String 数组。
batchSize int 类型,表示批处理的消息的数量。
- 如果该参数值为 1,表示客户端写入数据后就立即发送给服务器。
- 如果该参数大于 1,表示数据量达到 batchSize 时,客户端才会将数据发送给服务器。
throttle 大于 0 的浮点数,单位为秒。若客户端有数据写入,但数据量不足 batchSize,则等待 throttle 的时间再发送数据。
threadCount int 类型,表示创建的工作线程数量,如果值为 1,表示单线程。对于维度表,其值必须为 1。
partitionCol String 类型,默认为空,仅在 threadCount 大于 1 时起效。
- 对于分区表,必须指定为分区字段名。
- 如果是流数据表,必须指定为表的字段名。
- 对于维度表,该参数不起效。
compressTypes 数组类型,用于指定每一列采用的压缩传输方式,为空表示不压缩。每一列可选的压缩方式(大小写不敏感) 包括:
- Vector.COMPRESS_LZ4:LZ4 压缩。
- Vector.COMPRESS_DELTA:DELTAOFDELTA 压缩。
mode 写入模式,用于指定 MultithreadedTableWriter 对象写入数据的方式,包括两种:
- Mode.M_Append:表示以
tableInsert
的方式向追加数据。 - Mode.M_Upsert:表示以
upsert!
方式更新(或追加)数据。
pModeOption String 类型数组,表示不同模式下的扩展选项,目前,仅当 mode 指定为 Mode.M_Upsert 时有效,表示由 upsert! 可选参数组成的 String 类型数组。
callbackHandler 回调类,默认为空,表示不使用回调。开启回调后,将继承回调接口 Callback 并重载回调方法,将回调的接口对象传入 MultithreadedTableWriter
。
enableActualSendTime boolean 类型,表示是否开启记录消息发送的时间戳。注意,使用时须保证要写入表的最后一列为 NANOTIMESTAMP 类型;在开启该功能后,用户无需再写入数据时填入最后一列时间戳的数据。
回调函数 callbackHandler 使用介绍
MultithreadedTableWriter
在开启回调后,用户会在回调方法中获取到一个 BasicTable 类型的回调表,该表由两列构成:
- 第一列(String 类型),存放的是调用
MultithreadedTableWriter.insert
时增加的每一行的 id。 - 第二列(boolean 类型),表示每一行写入成功与否,true 表示写入成功,false 表示写入失败。
使用流程:
继承 Callback 接口并重载 writeCompletion 方法用于获取回调数据:
Callback callbackHandler = new Callback(){ public void writeCompletion(Table callbackTable){ List<String> failedIdList = new ArrayList<>(); BasicStringVector idVec = (BasicStringVector) callbackTable.getColumn(0); BasicBooleanVector successVec = (BasicBooleanVector) callbackTable.getColumn(1); for (int i = 0; i < successVec.rows(); i++){ if (!successVec.getBoolean(i)){ failedIdList.add(idVec.getString(i)); } } } };
构造
MultithreadedTableWriter
对象并传入回调对象:MultithreadedTableWriter mtw = new MultithreadedTableWriter(host, port, userName, password, dbName, tbName, useSSL, enableHighAvailability, null, 10000, 1, 1, "price", callbackHandler);
调用
MultithreadedTableWriter
的insert
方法并在第一列中为每一行写入 id:String theme = "theme1"; for (int id = 0; id < 1000000; id++){ //theme+id 为每一行对应的 id,将在回调时返回 mtw.insert(theme + id, code, price); }
对象方法说明
方法名 | 功能 |
---|---|
insert | 插入单行数据 |
getUnwrittenData | 获取未写入服务器的数据 |
insertUnwrittenData | 将数据插入数据表 |
getStatus | 获取 MTW 对象当前的运行状态 |
waitForThreadCompletion | MTW 会进入等待状态 |
以下为各方法的详细说明。
insert 方法
ErrorCodeInfo insert(Object... args)
参数说明
args 是变长参数,代表插入一行数据。
函数说明
插入单行数据。返回一个 ErrorCodeInfo
对象,包含 errorCode 和 errorInfo,分别表示错误代码和错误信息。
ErrorCodeInfo
介绍:
- errorCode:"" 空字符串表示没有错误发生;如果有错误发生,值为错误码,例如:"A2"。
- errorInfo:表示错误信息。当没有错误发生时,返回 null;发生错误时,返回包含错误信息的字符串。
- hasError():判断插入待转换队列时是否发生错误。如果发生错误,返回 true;反之,返回 false。
- succeed():判断是否成功插入数据缓存队列。如成功插入,返回 true;反之,返回 false。
ErrorCodeInfo 类提供了 hasError
和 succeed
方法用于获取数据是否成功放入数据队列的结果。在插入一行数据后,推荐调用 hasError
来判断是否成功将数据插入到待转换队列。
如果构造 MultithreadedTableWriter
时开启了回调(见上述参数说明),则每次调用 insert
时,需要在每行数据前面增加一列 String 类型的数据作为每行的标识符(id),此 id 列仅用于回调时返回给用户,不会写入表中。
示例
下例为演示采用多线程模式向表中插入一行记录。
MultithreadedTableWriter multithreadedTableWriter = new MultithreadedTableWriter("localhost", 8848, "admin", "123456", "dfs://valuedb3", "pdatetest",
false, false, null, 10000, 1,
5, "id", new int[]{Vector.COMPRESS_LZ4, Vector.COMPRESS_LZ4, Vector.COMPRESS_DELTA});
ErrorCodeInfo pErrorInfo = multithreadedTableWriter_.insert(new Date(2022, 3, 23), "AAAAAAAB", 10000000L);
getUnwrittenData 方法
List<List<Entity>> getUnwrittenData()
函数说明
返回一个嵌套列表,表示未写入服务器的数据。
注意:该方法获取到数据资源后, MultithreadedTableWriter
将释放这些数据资源。
示例
下例为演示获取未写入服务器的数据。
List<List<Entity>> unwrittenData = multithreadedTableWriter_.getUnwrittenData();
insertUnwrittenData 方法
ErrorCodeInfo insertUnwrittenData(List<List<Entity>> records)
参数说明
records 需要再次写入的数据。可以通过方法 getUnwrittenData
获取该对象。
函数说明
将数据插入数据表。返回值同 insert
方法。与 insert
方法的区别在于,insert
只能插入单行数据,而 insertUnwrittenData
可以同时插入多行数据。
示例
下例为演示插入之前未写入服务器的数据。
List<List<Entity>> unwrittenData = multithreadedTableWriter_.getUnwrittenData();
ErrorCodeInfo ret = multithreadedTableWriter_.insertUnwrittenData(unwrittenData);
getStatus 方法
Status getStatus()
函数说明
获取 MultithreadedTableWriter
对象当前的运行状态。
返回值说明
Status 是 MultithreadedTableWriter.Status
类,继承自 ErrorCodeInfo
实现。
Status 类具有以下属性和方法。
Status 类属性:
- isExiting:写入线程是否正在退出。
- errorCode:错误码。
- errorInfo:错误信息。
- sentRows:成功发送的总记录数。
- unsentRows:待发送的总记录数。
- sendFailedRows:发送失败的总记录数。
- threadStatus:写入线程状态列表。
- threadId:线程 Id。
- sentRows:该线程成功发送的记录数。
- unsentRows:该线程待发送的记录数。
- sendFailedRows:该线程发送失败的记录数。
Status 类方法:
hasError()
:true 表示数据写入存在错误;false 表示数据写入无错误。succeed()
:true 表示数据写入成功;false 表示数据写入失败。
示例
下例为演示获取 Writer 工作状态。
MultithreadedTableWriter.Status writeStatus = new MultithreadedTableWriter.Status();
writeStatus = multithreadedTableWriter_.getStatus();
以一个 MultithreadedTableWriterStatus
打印输出为例:
errorCode : A1
errorInfo : Data conversion error: Cannot convert double to LONG
isExiting : True
sentRows : 2493
unsentRows : 0
sendFailedRows: 7507
threadStatus :
threadId sentRows unsentRows sendFailedRows
0 0 0 7507
3567691520 415 0 0
3489658624 831 0 0
3481265920 416 0 0
3472873216 416 0 0
3464480512 415 0 0
<dolphindb.session.MultithreadedTableWriterStatus object at 0x7f0102c76d30>
以上输出内容显示,在本次 MTW 写入流程中发生了异常,异常代码是 A1,异常信息为 Data conversion error: Cannot convert double to LONG。
isExiting=True
表示当前线程正在退出。
sentRows=2493
表示当前有 2493 条数据已经写入到 DolphinDB 服务端。
unsentRows=0
/sendFailedRows=7507
则表示当前仍在 API 的未成功写入 DolphinDB 服务端的数据一共有 0+7507=7507 条。
在后面的表格中,列出了各个后台线程的处理情况,threadId=0
表示所有线程的统计总数。通常而言,成功写入的结果(sentRows)会分别显示在各个线程中,例如上述输出中的 sentRows 列;写入失败的结果(unsentRows、sendFailedRows)会集中显示在 threadId=0
行,表示整个过程中的写入失败条数。
waitForThreadCompletion 方法
waitForThreadCompletion()
函数说明
调用此方法后,MTW 会进入等待状态,待后台工作线程全部完成后退出等待状态。
示例
下例为演示等待 Writer 写入完毕。
multithreadedTableWriter_.waitForThreadCompletion();
使用示例
示例 1
下例为演示用 MultithreadedTableWriter
向表中写入一系列数据并在发生错误后在控制台打印错误信息。
步骤包括:
- 创建 DFS 表。
- 创建 MTW 对象。
- 写入数据。
- 若有异常,进行异常处理。
@Test
public void testMultithreadedTableWriter() {
DBConnection conn= new DBConnection();
conn.connect(HOST, PORT, "admin", "123456");
Random random = new Random();
// 1、创建 DFS 表
String script =
"dbName = 'dfs://valuedb3'" +
"if (exists(dbName))" +
"{" +
"dropDatabase(dbName);" +
"}" +
"datetest = table(1000:0,`date`symbol`id,[DATE, SYMBOL, LONG]);" +
"db = database(directory= dbName, partitionType= HASH, partitionScheme=[INT, 10]);" +
"pt = db.createPartitionedTable(datetest,'pdatetest','id');";
conn.run(script);
// 2、创建 MTW 对象
MultithreadedTableWriter multithreadedTableWriter_ = new MultithreadedTableWriter(HOST, PORT, "admin", "123456", "dfs://valuedb3", "pdatetest",
false, false, null, 10000, 1,
5, "id", new int[]{Vector.COMPRESS_LZ4, Vector.COMPRESS_LZ4, Vector.COMPRESS_DELTA});
ErrorCodeInfo ret;
// 3、写入数据
try {
// 插入100行正确数据
for (int i = 0; i < 100; ++i) {
ret = multithreadedTableWriter_.insert(new Date(2022, 3, 23), "AAAAAAAB", random.nextInt() % 10000);
}
}
catch (Exception e) {
// MTW 抛出异常
System.out.println("MTW exit with exception {0}" + e.getMessage());
}
// 等待 MTW 插入完成
multithreadedTableWriter_.waitForThreadCompletion();
MultithreadedTableWriter.Status writeStatus = new MultithreadedTableWriter.Status();
writeStatus = multithreadedTableWriter_.getStatus();
if (!writeStatus.errorInfo.equals("")) {
// 如果写入时发生错误
System.out.println("error in writing !");
}
System.out.println("writeStatus: {0}\n" + writeStatus.toString());
System.out.println(((BasicLong)conn.run("exec count(*) from pt")).getLong());
}
以上代码输出结果为:
writeStatus: {0}
errorCode :
errorInfo :
isExiting : true
sentRows : 100
unsentRows : 0
sendFailedRows: 0
threadStatus :
threadId sentRows unsentRows sendFailedRows
13 30 0 0
14 18 0 0
15 15 0 0
16 20 0 0
17 17 0 0
100
以上为多线程写入的结果,因此 threadId 与 sentRows 列的值会根据实际情况而有所不同,用户需要根据实际情况进行判断,正常情况下 sentRows 各数总和应为100。
示例 2
这里介绍 Mode.M_Upsert 模式的使用介绍:
@Test(timeout = 120000)
public void test_mtw_tableUpsert_DP_updateFirst() throws Exception {
String script = "if(existsDatabase(\"dfs://upsert\")) {\n" +
"dropDatabase(\"dfs://upsert\")\n" +
"}\n" +
"sym=\"A\" \"B\" \"C\" \"A\" \"D\" \"B\" \"A\"\n" +
"date=take(2021.12.10,3) join take(2021.12.09, 3) join 2021.12.10\n" +
"price=8.3 7.2 3.7 4.5 6.3 8.4 7.6\n" +
"val=10 19 13 9 19 16 10\n" +
"t=table(sym, date, price, val)\n" +
"db=database(\"dfs://upsert\", VALUE,\"A\" \"B\" \"C\")\n" +
"pt=db.createPartitionedTable(t, `pt, `sym)\n" +
"pt.append!(t)";
conn.run(script);
MultithreadedTableWriter mtw = new MultithreadedTableWriter(HOST,PORT,"admin","123456","dfs://upsert","pt",
false,false,null,1000,1,10,"sym",null,
MultithreadedTableWriter.Mode.M_Upsert,
new String[]{"ignoreNull=false", "keyColNames=`sym"});
mtw.insert(new BasicString("A"),new BasicDate(LocalDate.of(2021,12,9)),new BasicDouble(11.1),new BasicInt(12));
mtw.insert(new BasicString("B"),new BasicDate(LocalDate.of(2021,12,9)),new BasicDouble(10.5),new BasicInt(9));
mtw.insert(new BasicString("E"),new BasicDate(LocalDate.of(2021,12,9)),new BasicDouble(6.9),new BasicInt(11));
mtw.waitForThreadCompletion();
BasicTable ua = (BasicTable) conn.run("select * from pt;");
System.out.println(ua.getString());
}
执行结果:
sym date price val
--- ---------- ----- ---
A 2021.12.09 11.1 12
A 2021.12.09 4.5 9
A 2021.12.10 7.6 10
B 2021.12.09 10.5 9
B 2021.12.09 8.4 16
C 2021.12.10 3.7 13
D 2021.12.09 6.3 19
E 2021.12.09 6.9 11
异常返回
在使用 MultithreadedTableWriter
类调用 insert
方法插入数据时,可能会发生如下异常:
在调用
MultithreadedTableWriter
的insert
方法时,若插入数据的类型与表对应列的类型不匹配,则MultithreadedTableWriter
会立刻返回错误信息并打印出堆栈:DBConnection conn= new DBConnection(); conn.connect(HOST, PORT, "admin", "123456"); Random random = new Random(); String script = "dbName = 'dfs://valuedb3'" + "if (exists(dbName))" + "{" + "dropDatabase(dbName);" + "}" + "datetest = table(1000:0,`date`symbol`id,[DATE, SYMBOL, LONG]);" + "db = database(directory= dbName, partitionType= HASH, partitionScheme=[INT, 10]);" + "pt = db.createPartitionedTable(datetest,'pdatetest','id');"; conn.run(script); MultithreadedTableWriter multithreadedTableWriter_ = new MultithreadedTableWriter(HOST, PORT, "admin", "123456", "dfs://valuedb3", "pdatetest", false, false, null, 10000, 1, 5, "id", new int[]{Vector.COMPRESS_LZ4, Vector.COMPRESS_LZ4, Vector.COMPRESS_DELTA}); ErrorCodeInfo ret; //插入1行类型错误数据,MTW 立刻返回错误信息 ret = multithreadedTableWriter_.insert(new Date(2022, 3, 23), 222, random.nextInt() % 10000); if (!ret.errorInfo.equals("")) System.out.println("insert wrong format data: {2}\n" + ret.toString());
以上代码输出结果为:
java.lang.RuntimeException: Failed to insert data. Cannot convert int to DT_SYMBOL. at com.xxdb.data.BasicEntityFactory.createScalar(BasicEntityFactory.java:795) at com.xxdb.data.BasicEntityFactory.createScalar(BasicEntityFactory.java:505) at com.xxdb.multithreadedtablewriter.MultithreadedTableWriter.insert(MultithreadedTableWriter.java:594) at com.xxdb.BehaviorTest.testMul(BehaviorTest.java:89) at com.xxdb.BehaviorTest.main(BehaviorTest.java:168) insert wrong format data: {2} code=A1 info=Invalid object error java.lang.RuntimeException: Failed to insert data. Cannot convert int to DT_SYMBOL.
在调用
MultithreadedTableWriter
的insert
方法时,若insert
插入数据的列数和表的列数不匹配,MultithreadedTableWriter
会立刻返回错误信息:DBConnection conn= new DBConnection(); conn.connect(HOST, PORT, "admin", "123456"); Random random = new Random(); String script = "dbName = 'dfs://valuedb3'" + "if (exists(dbName))" + "{" + "dropDatabase(dbName);" + "}" + "datetest = table(1000:0,`date`symbol`id,[DATE, SYMBOL, LONG]);" + "db = database(directory= dbName, partitionType= HASH, partitionScheme=[INT, 10]);" + "pt = db.createPartitionedTable(datetest,'pdatetest','id');"; conn.run(script); MultithreadedTableWriter multithreadedTableWriter_ = new MultithreadedTableWriter(HOST, PORT, "admin", "123456", "dfs://valuedb3", "pdatetest", false, false, null, 10000, 1, 5, "id", new int[]{Vector.COMPRESS_LZ4, Vector.COMPRESS_LZ4, Vector.COMPRESS_DELTA}); ErrorCodeInfo ret; //插入1行数据,插入数据的列数和表的列数不匹配,MTW 立刻返回错误信息 ret = multithreadedTableWriter_.insert(new Date(2022, 3, 23), random.nextInt() % 10000); if (!ret.errorInfo.equals("")) System.out.println("insert wrong format data: {3}\n" + ret.toString());
以上代码输出结果为:
insert wrong format data: {3} code=A2 info=Column counts don't match.
如果
MultithreadedTableWriter
在运行时连接断开,则所有工作线程被终止。继续通过MultithreadedTableWriter
向服务器写数据时,会因为工作线程终止而抛出异常,且数据不会被写入。此时,可通过调用MultithreadedTableWriter
的getUnwrittenData
获取未插入的数据,并重新插入:List<List<Entity>> unwriterdata = new ArrayList<>(); unwriterdata = multithreadedTableWriter_.getUnwrittenData(); System.out.println("{5} unwriterdata: " + unwriterdata.size()); //重新获取新的 MTW 对象 MultithreadedTableWriter newmultithreadedTableWriter = new MultithreadedTableWriter(HOST, PORT, "admin", "123456", "dfs://valuedb3", "pdatetest", false, false, null, 10000, 1, 5, "id", new int[]{Vector.COMPRESS_LZ4, Vector.COMPRESS_LZ4, Vector.COMPRESS_DELTA}); try { boolean writesuccess = true; //将没有写入的数据写到新的 MTW 中 ret = newmultithreadedTableWriter.insertUnwrittenData(unwriterdata); } finally { newmultithreadedTableWriter.waitForThreadCompletion(); writeStatus = newmultithreadedTableWriter.getStatus(); System.out.println("writeStatus: {6}\n" + writeStatus.toString()); }
以上代码输出结果为:
{5} unwriterdata: 10 writeStatus: {6} errorCode : errorInfo : isExiting : true sentRows : 10 unsentRows : 0 sendFailedRows: 0 threadStatus : threadId sentRows unsentRows sendFailedRows 23 3 0 0 24 2 0 0 25 1 0 0 26 3 0 0 27 1 0 0
以上为多线程写入的结果,并且需要先造成断线情况,因此 threadId 与 sentRows列 的值会根据实际情况而有所不同,用户需要根据实际情况进行判断。
重新写入
当 MTW 发生类型转换错误、写入线程写入失败等错误时,API 将会终止所有线程。此时可以通过 writer.getUnwrittenData()
方法获取失败数据,通过 insertUnwrittenData(unwrittendata)
重新写入失败数据。因为原有 MTW 对象的所有线程已经终止,无法再次被用来写入数据,因此需要重新构造一个新的 MTW 对象,将失败数据重新写入新的 MTW 对象中。以下给出重新写入示例:
List<List<Entity>> unwriterdata;
unwriterdata = multithreadedTableWriter_.getUnwrittenData();
System.out.println("{5} unwriterdata: " + unwriterdata.size());
// 重新获取新的 MTW 对象
MultithreadedTableWriter newmultithreadedTableWriter = new MultithreadedTableWriter(HOST, PORT, "admin", "123456", "dfs://valuedb3", "pdatetest",
false, false, null, 10000, 1,
5, "id", new int[]{Vector.COMPRESS_LZ4, Vector.COMPRESS_LZ4, Vector.COMPRESS_DELTA});
try {
boolean writesuccess = true;
// 将没有写入的数据写到新的MTW中
ret = newmultithreadedTableWriter.insertUnwrittenData(unwriterdata);
for (int i = 0; i < 10 - unwriterdata.size(); ++i)
{
ret = newmultithreadedTableWriter.insert(pErrorInfo, new Date(2022, 3, 23), "AAAAAAAB", random.nextInt() % 10000);
}
} catch(Exception e) {
e.getMessage();
} finally {
newmultithreadedTableWriter.waitForThreadCompletion();
writeStatus = newmultithreadedTableWriter.getStatus();
System.out.println("writeStatus: {6}\n" + writeStatus.toString());
}