MultithreadedTableWriter

Java API 提供 MultithreadedTableWriter 类用于批量异步追加数据,并在客户端维护了一个数据缓冲队列。当服务器端忙于网络 I/O 时,客户端写线程仍然可以将数据持续写入缓冲队列(该队列由客户端维护)。写入队列后即可返回,从而避免了写线程的忙等。目前,MultithreadedTableWriter 支持批量写入数据到内存表、分区表和维度表。

注意对于异步写入:

  • API 客户端提交任务到缓冲队列,缓冲队列接到任务后,客户端即认为任务已完成。
  • 提供 getStatus 等接口查看写入状态。

构造方法

public MultithreadedTableWriter(String hostName, int port, String userId, String password,
    String dbName, String tableName, boolean useSSL,
    boolean enableHighAvailability, String[] highAvailabilitySites,
    int batchSize, float throttle,
    int threadCount, String partitionCol,
    int[] compressTypes, Mode mode, String[] pModeOption,
    boolean enableActualSendTime,
    boolean reconnect,
    int tryReconnectNums)

参数说明

hostName String 类型,表示所连接的服务器的地址。

port int 类型,表示服务器端口。

userId String 类型,登录时的用户名。

password String 类型,登录时的密码。

dbPath String 类型,表示分布式数据库地址。内存表时该参数为空。请注意,1.30.17 及以下版本 API,向内存表写入数据时,该参数需填写内存表表名。

tableName String 类型,表示分布式表或内存表的表名。 请注意:

  • 自 3.00.4.0 版本起,该参数支持指定 Orca 流表名称。
  • 1.30.17 及以下版本 API,向内存表写入数据时,该参数需为空。

useSSL boolean 类型,表示是否启用加密通讯。

enableHighAvailability boolean 类型,表示是否开启 API 高可用。

highAvailabilitySites 数组类型,表示所有可用节点的 ip:port 构成的 String 数组。

batchSize int 类型,表示批处理的消息的数量。

  • 如果该参数值为 1,表示客户端写入数据后就立即发送给服务器。
  • 如果该参数大于 1,表示数据量达到 batchSize 时,客户端才会将数据发送给服务器。

throttle 大于 0 的浮点数,单位为秒。若客户端有数据写入,但数据量不足 batchSize,则等待 throttle 的时间再发送数据。

threadCount int 类型,表示创建的工作线程数量,如果值为 1,表示单线程。对于维度表,其值必须为 1。

partitionCol String 类型,默认为空,仅在 threadCount 大于 1 时起效。

  • 对于分区表,必须指定为分区字段名。
  • 如果是流数据表,必须指定为表的字段名。
  • 对于维度表,该参数不起效。

compressTypes 数组类型,用于指定每一列采用的压缩传输方式,为空表示不压缩。每一列可选的压缩方式(大小写不敏感) 包括:

  • Vector.COMPRESS_LZ4:LZ4 压缩。
  • Vector.COMPRESS_DELTA:DELTAOFDELTA 压缩。

mode 写入模式,用于指定 MultithreadedTableWriter 对象写入数据的方式,包括两种:

  • Mode.M_Append:表示以 tableInsert 的方式向追加数据。对于 Orca 流表,仅支持此模式。
  • Mode.M_Upsert:表示以 upsert! 方式更新(或追加)数据。

pModeOption String 类型数组,表示不同模式下的扩展选项,目前,仅当 mode 指定为 Mode.M_Upsert 时有效,表示由 upsert! 可选参数组成的 String 类型数组。

callbackHandler 回调类,默认为空,表示不使用回调。开启回调后,将继承回调接口 Callback 并重载回调方法,将回调的接口对象传入 MultithreadedTableWriter

enableActualSendTime boolean 类型,表示是否开启记录消息发送的时间戳。使用时须保证要写入表的最后一列为 NANOTIMESTAMP 类型;在开启该功能后,用户无需再写入数据时填入最后一列时间戳的数据。注意:自 Java API 3.00.1 版本起,DolphinDB Server 2.00.16/3.00.3 版本起,若通过 Server 的接口 setStreamTableTimestamp 为指定流表设置了时间戳列 ,数据写入流表的系统时间会自动记录到最后一列。此时若 Java API 同时开启 enableActualSendTime,则发送时间会自动向倒数第二列填入。

reconnect boolean 类型,表示是否开启单节点断连后自动重连,可选参数,默认值为 false,表示关闭。

tryReconnectNums 表示重连尝试次数,int 类型。使用方式如下:

  • 若不开启高可用,须与 reconnect 参数搭配使用,对单节点进行有限次重连。若不填写该参数,默认进行无限重连。

  • 当开启 enableHighAvailability 高可用参数时,

    • 若指定该参数,将在断开连接后遍历高可用范围内的每个节点进行有限次重连。一次遍历中,每个节点只会被重连一次,最多进行 tryReconnectNums 次遍历尝试。

    • 若不填写该参数,默认是无限重连。

回调函数 callbackHandler 使用介绍

MultithreadedTableWriter 在开启回调后,用户会在回调方法中获取到一个 BasicTable 类型的回调表,该表由两列构成:

  • 第一列(String 类型),存放的是调用 MultithreadedTableWriter.insert 时增加的每一行的 id。
  • 第二列(boolean 类型),表示每一行写入成功与否,true 表示写入成功,false 表示写入失败。

使用流程:

  1. 继承 Callback 接口并重载 writeCompletion 方法用于获取回调数据:

    Callback callbackHandler = new Callback(){
        public void writeCompletion(Table callbackTable){
            List<String> failedIdList = new ArrayList<>();
            BasicStringVector idVec = (BasicStringVector) callbackTable.getColumn(0);
            BasicBooleanVector successVec = (BasicBooleanVector) callbackTable.getColumn(1);
            for (int i = 0; i < successVec.rows(); i++){
                if (!successVec.getBoolean(i)){
                    failedIdList.add(idVec.getString(i));
                }
            }
        }
    };
  2. 构造 MultithreadedTableWriter 对象并传入回调对象:

    MultithreadedTableWriter mtw = new MultithreadedTableWriter(host, port, userName, password, dbName, tbName, useSSL,
            enableHighAvailability, null, 10000, 1, 1, "price", callbackHandler);
  3. 调用 MultithreadedTableWriterinsert 方法并在第一列中为每一行写入 id:

    String theme = "theme1";
    for (int id = 0; id < 1000000; id++){
        //theme+id 为每一行对应的 id,将在回调时返回
        mtw.insert(theme + id, code, price);
    }

对象方法说明

方法名功能
insert插入单行数据
getUnwrittenData获取未写入服务器的数据
insertUnwrittenData将数据插入数据表
getStatus获取 MTW 对象当前的运行状态
waitForThreadCompletionMTW 会进入等待状态

以下为各方法的详细说明。

insert 方法

ErrorCodeInfo insert(Object... args)

参数说明

args 是变长参数,代表插入一行数据。

函数说明

插入单行数据。返回一个 ErrorCodeInfo 对象,包含 errorCode 和 errorInfo,分别表示错误代码和错误信息。

ErrorCodeInfo 介绍:

  • errorCode:"" 空字符串表示没有错误发生;如果有错误发生,值为错误码,例如:"A2"。
  • errorInfo:表示错误信息。当没有错误发生时,返回 null;发生错误时,返回包含错误信息的字符串。
  • hasError():判断插入待转换队列时是否发生错误。如果发生错误,返回 true;反之,返回 false。
  • succeed():判断是否成功插入数据缓存队列。如成功插入,返回 true;反之,返回 false。

ErrorCodeInfo 类提供了 hasErrorsucceed 方法用于获取数据是否成功放入数据队列的结果。在插入一行数据后,推荐调用 hasError 来判断是否成功将数据插入到待转换队列。

如果构造 MultithreadedTableWriter 时开启了回调(见上述参数说明),则每次调用 insert 时,需要在每行数据前面增加一列 String 类型的数据作为每行的标识符(id),此 id 列仅用于回调时返回给用户,不会写入表中。

示例

下例为演示采用多线程模式向表中插入一行记录。

MultithreadedTableWriter multithreadedTableWriter = new MultithreadedTableWriter("localhost", 8848, "admin", "123456", "dfs://valuedb3", "pdatetest",
                false, false, null, 10000, 1,
                5, "id", new int[]{Vector.COMPRESS_LZ4, Vector.COMPRESS_LZ4, Vector.COMPRESS_DELTA});
ErrorCodeInfo pErrorInfo = multithreadedTableWriter_.insert(new Date(2022, 3, 23), "AAAAAAAB", 10000000L);

getUnwrittenData 方法

List<List<Entity>> getUnwrittenData()

函数说明

返回一个嵌套列表,表示未写入服务器的数据。

注意:该方法获取到数据资源后, MultithreadedTableWriter 将释放这些数据资源。

示例

下例为演示获取未写入服务器的数据。

List<List<Entity>> unwrittenData = multithreadedTableWriter_.getUnwrittenData();

insertUnwrittenData 方法

ErrorCodeInfo insertUnwrittenData(List<List<Entity>> records)

参数说明

records 需要再次写入的数据。可以通过方法 getUnwrittenData 获取该对象。

函数说明

将数据插入数据表。返回值同 insert 方法。与 insert 方法的区别在于,insert 只能插入单行数据,而 insertUnwrittenData 可以同时插入多行数据。

示例

下例为演示插入之前未写入服务器的数据。

List<List<Entity>> unwrittenData = multithreadedTableWriter_.getUnwrittenData();
ErrorCodeInfo ret = multithreadedTableWriter_.insertUnwrittenData(unwrittenData);

getStatus 方法

Status getStatus()

函数说明

获取 MultithreadedTableWriter 对象当前的运行状态。

返回值说明

Status 是 MultithreadedTableWriter.Status 类,继承自 ErrorCodeInfo 实现。

Status 类具有以下属性和方法。

Status 类属性:

  • isExiting:写入线程是否正在退出。
  • errorCode:错误码。
  • errorInfo:错误信息。
  • sentRows:成功发送的总记录数。
  • unsentRows:待发送的总记录数。
  • sendFailedRows:发送失败的总记录数。
  • threadStatus:写入线程状态列表。
    • threadId:线程 Id。
    • sentRows:该线程成功发送的记录数。
    • unsentRows:该线程待发送的记录数。
    • sendFailedRows:该线程发送失败的记录数。

Status 类方法:

  • hasError():true 表示数据写入存在错误;false 表示数据写入无错误。
  • succeed():true 表示数据写入成功;false 表示数据写入失败。

示例

下例为演示获取 Writer 工作状态。

MultithreadedTableWriter.Status writeStatus = new MultithreadedTableWriter.Status();
writeStatus = multithreadedTableWriter_.getStatus();

以一个 MultithreadedTableWriterStatus 打印输出为例:

errorCode     : A1
 errorInfo     : Data conversion error: Cannot convert double to LONG
 isExiting     : True
 sentRows      : 2493
 unsentRows    : 0
 sendFailedRows: 7507
 threadStatus  : 
        threadId        sentRows        unsentRows      sendFailedRows
               0               0                 0                7507
        3567691520           415                 0                   0
        3489658624           831                 0                   0
        3481265920           416                 0                   0
        3472873216           416                 0                   0
        3464480512           415                 0                   0
<dolphindb.session.MultithreadedTableWriterStatus object at 0x7f0102c76d30>

以上输出内容显示,在本次 MTW 写入流程中发生了异常,异常代码是 A1,异常信息为 Data conversion error: Cannot convert double to LONG。

isExiting=True 表示当前线程正在退出。

sentRows=2493 表示当前有 2493 条数据已经写入到 DolphinDB 服务端。

unsentRows=0/sendFailedRows=7507 则表示当前仍在 API 的未成功写入 DolphinDB 服务端的数据一共有 0+7507=7507 条。

在后面的表格中,列出了各个后台线程的处理情况,threadId=0 表示所有线程的统计总数。通常而言,成功写入的结果(sentRows)会分别显示在各个线程中,例如上述输出中的 sentRows 列;写入失败的结果(unsentRows、sendFailedRows)会集中显示在 threadId=0 行,表示整个过程中的写入失败条数。

waitForThreadCompletion 方法

waitForThreadCompletion()

函数说明

调用此方法后,MTW 会进入等待状态,待后台工作线程全部完成后退出等待状态。

示例

下例为演示等待 Writer 写入完毕。

multithreadedTableWriter_.waitForThreadCompletion();

使用示例

示例 1

下例为演示用 MultithreadedTableWriter 向表中写入一系列数据并在发生错误后在控制台打印错误信息。

步骤包括:

  1. 创建 DFS 表。
  2. 创建 MTW 对象。
  3. 写入数据。
  4. 若有异常,进行异常处理。
@Test
public void testMultithreadedTableWriter() {
    DBConnection conn= new DBConnection();
    conn.connect(HOST, PORT, "admin", "123456");
    Random random = new Random();
    
    // 1、创建 DFS 表
    String script =
            "dbName = 'dfs://valuedb3'" +
                    "if (exists(dbName))" +
                    "{" +
                    "dropDatabase(dbName);" +
                    "}" +
                    "datetest = table(1000:0,`date`symbol`id,[DATE, SYMBOL, LONG]);" +
                    "db = database(directory= dbName, partitionType= HASH, partitionScheme=[INT, 10]);" +
                    "pt = db.createPartitionedTable(datetest,'pdatetest','id');";
    conn.run(script);
    
    // 2、创建 MTW 对象
    MultithreadedTableWriter multithreadedTableWriter_ = new MultithreadedTableWriter(HOST, PORT, "admin", "123456", "dfs://valuedb3", "pdatetest",
            false, false, null, 10000, 1,
            5, "id", new int[]{Vector.COMPRESS_LZ4, Vector.COMPRESS_LZ4, Vector.COMPRESS_DELTA});
    ErrorCodeInfo ret;
    
    // 3、写入数据
    try {
        // 插入100行正确数据
        for (int i = 0; i < 100; ++i) {
            ret = multithreadedTableWriter_.insert(new Date(2022, 3, 23), "AAAAAAAB", random.nextInt() % 10000);
        }
    }
    catch (Exception e) {
        // MTW 抛出异常
        System.out.println("MTW exit with exception {0}" + e.getMessage());
    }

    // 等待 MTW 插入完成
    multithreadedTableWriter_.waitForThreadCompletion();
    MultithreadedTableWriter.Status writeStatus = new MultithreadedTableWriter.Status();
    writeStatus = multithreadedTableWriter_.getStatus();
    if (!writeStatus.errorInfo.equals("")) {
        // 如果写入时发生错误
        System.out.println("error in writing !");
    }
    System.out.println("writeStatus: {0}\n" + writeStatus.toString());
    System.out.println(((BasicLong)conn.run("exec count(*) from pt")).getLong());
}

以上代码输出结果为:

writeStatus: {0}
errorCode     : 
errorInfo     : 
isExiting     : true
sentRows      : 100
unsentRows    : 0
sendFailedRows: 0
threadStatus  :
        threadId        sentRows      unsentRows  sendFailedRows
              13              30               0               0
              14              18               0               0
              15              15               0               0
              16              20               0               0
              17              17               0               0

100

以上为多线程写入的结果,因此 threadId 与 sentRows 列的值会根据实际情况而有所不同,用户需要根据实际情况进行判断,正常情况下 sentRows 各数总和应为100。

示例 2

这里介绍 Mode.M_Upsert 模式的使用介绍:

@Test(timeout = 120000)
public void test_mtw_tableUpsert_DP_updateFirst() throws Exception {
    String script = "if(existsDatabase(\"dfs://upsert\")) {\n" +
            "dropDatabase(\"dfs://upsert\")\n" +
            "}\n" +
            "sym=\"A\" \"B\" \"C\" \"A\" \"D\" \"B\" \"A\"\n" +
            "date=take(2021.12.10,3) join take(2021.12.09, 3) join 2021.12.10\n" +
            "price=8.3 7.2 3.7 4.5 6.3 8.4 7.6\n" +
            "val=10 19 13 9 19 16 10\n" +
            "t=table(sym, date, price, val)\n" +
            "db=database(\"dfs://upsert\", VALUE,\"A\" \"B\" \"C\")\n" +
            "pt=db.createPartitionedTable(t, `pt, `sym)\n" +
            "pt.append!(t)";
    conn.run(script);
    MultithreadedTableWriter mtw = new MultithreadedTableWriter(HOST,PORT,"admin","123456","dfs://upsert","pt",
            false,false,null,1000,1,10,"sym",null,
            MultithreadedTableWriter.Mode.M_Upsert, 
            new String[]{"ignoreNull=false", "keyColNames=`sym"});
    mtw.insert(new BasicString("A"),new BasicDate(LocalDate.of(2021,12,9)),new BasicDouble(11.1),new BasicInt(12));
    mtw.insert(new BasicString("B"),new BasicDate(LocalDate.of(2021,12,9)),new BasicDouble(10.5),new BasicInt(9));
    mtw.insert(new BasicString("E"),new BasicDate(LocalDate.of(2021,12,9)),new BasicDouble(6.9),new BasicInt(11));
    mtw.waitForThreadCompletion();
    BasicTable ua = (BasicTable) conn.run("select * from pt;");
    System.out.println(ua.getString());
}

执行结果:

sym date       price val
--- ---------- ----- ---
A   2021.12.09 11.1  12 
A   2021.12.09 4.5   9  
A   2021.12.10 7.6   10 
B   2021.12.09 10.5  9  
B   2021.12.09 8.4   16 
C   2021.12.10 3.7   13 
D   2021.12.09 6.3   19 
E   2021.12.09 6.9   11 

异常返回

在使用 MultithreadedTableWriter 类调用 insert 方法插入数据时,可能会发生如下异常:

  • 在调用 MultithreadedTableWriterinsert 方法时,若插入数据的类型与表对应列的类型不匹配,则 MultithreadedTableWriter 会立刻返回错误信息并打印出堆栈:

    DBConnection conn= new DBConnection();
    conn.connect(HOST, PORT, "admin", "123456");
    Random random = new Random();
    String script =
            "dbName = 'dfs://valuedb3'" +
                    "if (exists(dbName))" +
                    "{" +
                    "dropDatabase(dbName);" +
                    "}" +
                    "datetest = table(1000:0,`date`symbol`id,[DATE, SYMBOL, LONG]);" +
                    "db = database(directory= dbName, partitionType= HASH, partitionScheme=[INT, 10]);" +
                    "pt = db.createPartitionedTable(datetest,'pdatetest','id');";
    conn.run(script);
    MultithreadedTableWriter multithreadedTableWriter_ = new MultithreadedTableWriter(HOST, PORT, "admin", "123456", "dfs://valuedb3", "pdatetest",
            false, false, null, 10000, 1,
            5, "id", new int[]{Vector.COMPRESS_LZ4, Vector.COMPRESS_LZ4, Vector.COMPRESS_DELTA});
    ErrorCodeInfo ret;
    //插入1行类型错误数据,MTW 立刻返回错误信息
    ret = multithreadedTableWriter_.insert(new Date(2022, 3, 23), 222, random.nextInt() % 10000);
    if (!ret.errorInfo.equals(""))
        System.out.println("insert wrong format data: {2}\n" + ret.toString());

    以上代码输出结果为:

    java.lang.RuntimeException: Failed to insert data. Cannot convert int to DT_SYMBOL.
    	at com.xxdb.data.BasicEntityFactory.createScalar(BasicEntityFactory.java:795)
    	at com.xxdb.data.BasicEntityFactory.createScalar(BasicEntityFactory.java:505)
    	at com.xxdb.multithreadedtablewriter.MultithreadedTableWriter.insert(MultithreadedTableWriter.java:594)
    	at com.xxdb.BehaviorTest.testMul(BehaviorTest.java:89)
    	at com.xxdb.BehaviorTest.main(BehaviorTest.java:168)
    insert wrong format data: {2}
    code=A1 info=Invalid object error java.lang.RuntimeException: Failed to insert data. Cannot convert int to DT_SYMBOL.
  • 在调用 MultithreadedTableWriterinsert 方法时,若 insert 插入数据的列数和表的列数不匹配,MultithreadedTableWriter 会立刻返回错误信息:

    DBConnection conn= new DBConnection();
    conn.connect(HOST, PORT, "admin", "123456");
    Random random = new Random();
    String script =
            "dbName = 'dfs://valuedb3'" +
                    "if (exists(dbName))" +
                    "{" +
                    "dropDatabase(dbName);" +
                    "}" +
                    "datetest = table(1000:0,`date`symbol`id,[DATE, SYMBOL, LONG]);" +
                    "db = database(directory= dbName, partitionType= HASH, partitionScheme=[INT, 10]);" +
                    "pt = db.createPartitionedTable(datetest,'pdatetest','id');";
    conn.run(script);
    MultithreadedTableWriter multithreadedTableWriter_ = new MultithreadedTableWriter(HOST, PORT, "admin", "123456", "dfs://valuedb3", "pdatetest",
            false, false, null, 10000, 1,
            5, "id", new int[]{Vector.COMPRESS_LZ4, Vector.COMPRESS_LZ4, Vector.COMPRESS_DELTA});
    ErrorCodeInfo ret;
    //插入1行数据,插入数据的列数和表的列数不匹配,MTW 立刻返回错误信息
    ret = multithreadedTableWriter_.insert(new Date(2022, 3, 23), random.nextInt() % 10000);
    if (!ret.errorInfo.equals(""))
        System.out.println("insert wrong format data: {3}\n" + ret.toString());

    以上代码输出结果为:

    insert wrong format data: {3}
    code=A2 info=Column counts don't match.  
  • 如果 MultithreadedTableWriter 在运行时连接断开,则所有工作线程被终止。继续通过 MultithreadedTableWriter 向服务器写数据时,会因为工作线程终止而抛出异常,且数据不会被写入。此时,可通过调用 MultithreadedTableWritergetUnwrittenData 获取未插入的数据,并重新插入:

    List<List<Entity>> unwriterdata = new ArrayList<>();
    unwriterdata = multithreadedTableWriter_.getUnwrittenData();
    System.out.println("{5} unwriterdata: " + unwriterdata.size());
    //重新获取新的 MTW 对象
    MultithreadedTableWriter newmultithreadedTableWriter = new MultithreadedTableWriter(HOST, PORT, "admin", "123456", "dfs://valuedb3", "pdatetest",
            false, false, null, 10000, 1,
            5, "id", new int[]{Vector.COMPRESS_LZ4, Vector.COMPRESS_LZ4, Vector.COMPRESS_DELTA});
    try
    {
        boolean writesuccess = true;
        //将没有写入的数据写到新的 MTW 中
        ret = newmultithreadedTableWriter.insertUnwrittenData(unwriterdata);
    }
    finally
    {
        newmultithreadedTableWriter.waitForThreadCompletion();
        writeStatus = newmultithreadedTableWriter.getStatus();
        System.out.println("writeStatus: {6}\n" + writeStatus.toString());
    }

    以上代码输出结果为:

      {5} unwriterdata: 10
      writeStatus: {6}
      errorCode     : 
      errorInfo     : 
      isExiting     : true
      sentRows      : 10
      unsentRows    : 0
      sendFailedRows: 0
      threadStatus  :
              threadId        sentRows      unsentRows  sendFailedRows
                    23               3               0               0
                    24               2               0               0
                    25               1               0               0
                    26               3               0               0
                    27               1               0               0

    以上为多线程写入的结果,并且需要先造成断线情况,因此 threadId 与 sentRows列 的值会根据实际情况而有所不同,用户需要根据实际情况进行判断。

重新写入

当 MTW 发生类型转换错误、写入线程写入失败等错误时,API 将会终止所有线程。此时可以通过 writer.getUnwrittenData() 方法获取失败数据,通过 insertUnwrittenData(unwrittendata) 重新写入失败数据。因为原有 MTW 对象的所有线程已经终止,无法再次被用来写入数据,因此需要重新构造一个新的 MTW 对象,将失败数据重新写入新的 MTW 对象中。以下给出重新写入示例:

List<List<Entity>> unwriterdata;
unwriterdata = multithreadedTableWriter_.getUnwrittenData();
System.out.println("{5} unwriterdata: " + unwriterdata.size());

// 重新获取新的 MTW 对象
MultithreadedTableWriter newmultithreadedTableWriter = new MultithreadedTableWriter(HOST, PORT, "admin", "123456", "dfs://valuedb3", "pdatetest",
        false, false, null, 10000, 1,
        5, "id", new int[]{Vector.COMPRESS_LZ4, Vector.COMPRESS_LZ4, Vector.COMPRESS_DELTA});
try {
    boolean writesuccess = true;
    // 将没有写入的数据写到新的MTW中
    ret = newmultithreadedTableWriter.insertUnwrittenData(unwriterdata);

    for (int i = 0; i < 10 - unwriterdata.size(); ++i)
    {
        ret = newmultithreadedTableWriter.insert(pErrorInfo, new Date(2022, 3, 23), "AAAAAAAB", random.nextInt() % 10000);
    }

} catch(Exception e) {
  e.getMessage();
} finally {
    newmultithreadedTableWriter.waitForThreadCompletion();
    writeStatus = newmultithreadedTableWriter.getStatus();
    System.out.println("writeStatus: {6}\n" + writeStatus.toString());
}