流订阅

Java API 支持流订阅,每当流表数据有更新时,Java 客户端可以通过流订阅方式获取更新的数据。Java API 有三种获取流数据的方式:ThreadedClient、ThreadPooledClient 以及 PollingClient。

三种流订阅工具的构造方式

构造说明:

  • 2.00.9 及之后的版本支持发布端通过订阅端的请求连接推送数据。因此,订阅端无需指定端口(默认值为0);如果指定,该参数无效,会被 API 忽略。
  • 1.30 版本、2.00.9 之前的版本在订阅端提交订阅请求后,发布端需要重新发起一个 TCP 连接用于传输数据。

ThreadedClient

每个订阅会新建一个数据处理线程,需要用户设置回调函数。每当有新的数据,对应的订阅线程就会用收到的数据来执行回调函数。

构造方法:

public ThreadedClient()
public ThreadedClient(String subscribeHost, int subscribePort)

参数说明:

subscribeHost String 类型,表示订阅的主机 IP,可选参数,默认为 localhost。若 Java 客户端机器有多个IP,这里可以指定使用某个 IP。

subscribePort int 类型,表示订阅的端口,可选参数,默认为 8849。

ThreadPooledClient

多个订阅共用一个线程池。

构造方法:

public ThreadPooledClient()
public ThreadPooledClient(String subscribeHost, int subscribePort, int threadCount)

参数说明:

subscribeHost String 类型,表示订阅的主机 IP,可选参数,默认为空。

subscribePort int 类型,表示订阅的端口,可选参数,默认为 8849。

threadCount int 类型,表示订阅线程数量,可选参数;如果不填,默认为当前计算机可用的线程资源数(Runtime.getRuntime().availableProcessors())。

PollingClient

订阅时返回一个队列,然后 poll 这个队列获取数据即可。

构造方法:

public PollingClient()
public PollingClient(String subscribeHost, int subscribePort)

参数说明:

subscribeHost String 类型,表示订阅的主机 IP,可选参数,默认为空。

subscribePort int 类型,表示订阅的端口。

三种流订阅工具的订阅方式

以上介绍了 Java API 提供的三种流订阅的构造方式,接下来介绍它们的订阅方式。它们都是通过 subscribe 方法来进行订阅,且具有相同的订阅参数。

subscribe(String host, int port, String tableName, String actionName, MessageHandler handler, long offset, boolean reconnect, IVector filter, int batchSize, float throttle, StreamDeserializer deserializer, String user, String password, List<String> backupSites, int resubTimeout, boolean subOnce)

**参数介绍 **

host String 类型,表示发布端节点的 IP 地址。

port int 类型,表示发布端节点的端口号。

tableName String 类型,表示发布表的名称。

actionName String 类型,表示订阅任务的名称,选填参数,默认为 ”javaStreamingApi” 。

handler 表示用户自定义的回调函数,用于处理每次流入的数据。

offset 表示订阅任务开始后的第一条消息所在的位置,选填参数,默认为-1。一条消息对应流数据表中的一行。如果没有指定 offset,或它为负数,或超过了流数据表的记录行数,订阅将会从流数据表的当前行开始。offset 与流数据表创建时的第一行对应。如果某些行因为内存限制被删除,在决定订阅开始的位置时,这些行仍然考虑在内。具体可参考文档 DolphinDB-subscribeTable

reconnect boolean 类型,表示订阅中断后,是否会自动重订阅,选填参数,默认为 false 。

filter 向量,表示过滤条件,参数函数可以参见 DolphinDB-subscribeTablefilter 描述;选填参数,默认为 null 。流数据表过滤列在 filter 中的数据才会发布到订阅端,不在 filter 中的数据不会发布。

batchSize int 类型,表示批处理的消息的数量。

  • 如果它是正数,直到消息的数量达到 batchSize 时,handler 才会处理进来的消息。
  • 如果它没有指定或者是非正数,消息到达之后,handler 就会马上处理消息。

注意 batchSizethrottle 参数要求同时填写,两者都为可选参数,但没有默认值。

throttle float 类型,表示 handler 处理到达的消息之前等待的时间。注意 batchSizethrottle 参数要求同时填写,两者都为可选参数,但没有默认值。

deserializer 表示订阅的异构流表对应的反序列化器,可选参数,默认为 null。

user String 类型,表示 Java API 所连接服务器的登录用户名,默认为 “” 即空字符串。

password String 类型,表示 Java API 所连接服务器的登录密码,默认为“”即空字符串。

backupSites String 类型列表,可选参数,表示备用的发布端节点列表,由节点 IP 和端口号组成,例如 [“192.168.0.1:8848“, “192.168.0.2:8849“]。

  • 指定 backupSites,表示启动主备节点切换。如果发生节点切换(例如连接断开),会在可用节点列表中不断轮询订阅,直至订阅成功。
  • 若配置该参数,用户须保证主节点(由 hostport 参数指定)和备用节点上存在相同结构、相同数据的同名流数据表。否则,可能出现订阅的数据不符合预期。
  • 若订阅的是高可用流表,重连时将从 backupSites 指定的节点列表中选择节点。
  • 取消订阅须使用主节点 IP 和端口号进行取消。

resubTimeout int 类型,可选参数,表示重订阅距离断线最小时间间隔,单位为毫秒,默认值为 100。

subOnce boolean 类型,可选参数,表示是否在节点切换时尝试之前成功连接过的节点。默认值为 false,表示不尝试。

示例代码

下面分别介绍以上三种方式订阅流数据示例。

  • 使用 MessageHandler 回调的方式获取新数据

首先,用户需要自定义数据处理器 handler。handler 需要实现 com.xxdb.streaming.client.MessageHandler 接口。以下为自定义 handler 示例:

public static MessageHandler MessageHandler_handler = new MessageHandler() {
    @Override
    public void doEvent(IMessage msg) {
        try {
            String script = String.format("insert into Receive values(%d,%s,%f)", Integer.parseInt(msg.getEntity(0).getString()), msg.getEntity(1).getString(), Double.valueOf(msg.getEntity(2).toString()));
            conn.run(script);
            //  System.out.println(msg.getEntity(0).getString());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
};

在启动订阅时,把 handler 实例作为参数传入订阅函数。

包括单线程回调或多线程回调两种方式:

  1. 单线程回调 ThreadedClient

    @Test
    public void testThreadedClient() throws SocketException {
        // 1.30 版本、2.00.9 之前的版本,开启订阅,指定端口8000
        ThreadedClient client = new ThreadedClient(8000);
        // 2.00.9 及之后的版本,开启订阅,无需指定端口
        ThreadedClient client = new ThreadedClient();
        client.subscribe("192.168.0.21", 8848, "Trades", handler, -1);
    }

    当流数据表有新增数据时,系统会通知 Java API 调用 MyHandler 方法,将新数据通过 msg 参数传入。

  2. 多线程回调 ThreadPooledClient

    @Test
    public void testThreadPooledClient() throws SocketException {
        // 1.30 版本、2.00.9 之前的版本,开启订阅,指定端口8000
        ThreadPooledClient client = new ThreadPooledClient(8000);
        // 2.00.9 及之后的版本,开启订阅,无需指定端口
        ThreadPooledClient client = new ThreadPooledClient();
        client.subscribe("192.168.0.21", 9002, "Trades", handler, -1);
    }
  • 通过客户机上的应用程序定期去流数据表查询是否有新增数据

    该种情况下,推荐使用 PollingClient,具体使用方式见如下代码。

    @Test
    public void testPollingClient() {
        // 1.30 版本、2.00.9 之前的版本,开启订阅,指定端口8000
        PollingClient client = new PollingClient(subscribePort);
        // 2.00.9 及之后的版本,开启订阅,无需指定端口
        PollingClient client = new PollingClient();
        tableName, TopicPoller poller1 = client.subscribe("192.168.0.21", 9002, "Trades", -1);
    
        while (true) {
            ArrayList<IMessage> msgs = poller1.poll(1000);
            if (msgs.size() > 0) {
                // 取数据中第一行第三个字段
                BasicInt value = (BasicInt) msgs.get(0).getEntity(2);
            }
        }
    }

​ poller1 探测到流数据表有新增数据后,会拉取到新数据。无新数据发布时,Java 程序会阻塞在 poller1.poll 方法等待。

重连 reconnect

若重连参数 reconnect 设置为 true、订阅意外中断后,系统是否以及如何自动重新订阅,取决于订阅中断由以下哪种原因导致:

  • 如果发布端与订阅端处于正常状态,但是网络中断,那么订阅端会在网络正常时自动从中断位置重新订阅。
  • 如果发布端崩溃,订阅端会在发布端重启后不断尝试重新订阅。
    • 如果发布端对流数据表启动了持久化,发布端重启后会首先读取硬盘上的数据,直到发布端读取到订阅中断位置的数据,订阅端才能成功重新订阅。
    • 如果发布端没有对流数据表启用持久化,那么订阅端将自动重新订阅失败。
  • 如果订阅端崩溃,订阅端重启后不会自动重新订阅,需要重新执行 subscribe 函数。

以下为设置 reconnect 为 true 的例子:

@Test
public void testPollingClient() {
    PollingClient client = new PollingClient(9002);
    TopicPoller poller1 = client.subscribe("192.168.0.21", 9002, "Trades", -1, true);
}

启用 filter

filter 为过滤器,可以对发布到订阅端的数据进行过滤,参数为一个向量。该参数需要发布端配合setStreamTableFilterColumn函数一起使用。使用setStreamTableFilterColumn指定流数据表的过滤列,如果列在 filter 中,流数据表才将该列会发布到订阅端,不在 filter 中的数据不会发布。

以下例子将一个包含元素1和2的整数类型向量作为subscribe的filter参数值:

@Test
public void testEnableFilter() throws IOException {
    BasicIntVector filter = new BasicIntVector(2);
    filter.setInt(0, 1);
    filter.setInt(1, 2);
    PollingClient client = new PollingClient(9002);
    TopicPoller poller1 = client.subscribe("192.168.0.21", 9002,"Trades1","subTread1",-1,filter);
    ArrayList<IMessage> msg1 = poller1.poll(500, 10000);
}

订阅异构流表

DolphinDB server 自 1.30.17 及 2.00.5 版本开始,支持通过 replay 函数将多个结构不同的流数据表回放(序列化)到一个流表里,这个流表被称为异构流表。Java API 自 1.30.19 版本开始,新增 StreamDeserializer 类,用于构造异构流表反序列化器,以实现对异构流表的订阅和反序列化操作。

Java API 通过 StreamDeserializer 类来构造异构流表反序列化器,可参考:DolphinDB-streamFilter ,语法如下:

  1. 通过指定表的 schema 进行构造,包含以下两种方式,指定表的 schema 信息或指定表的各列类型:

1)指定表的 schema 信息:

StreamDeserializer(Map<String, BasicDictionary> filters)

​ 其中,key 为 condition,value 为表的 schema。

​ 2)指定表的各列类型:

```
StreamDeserializer(HashMap<String, List<Entity.DATA_TYPE>> filters)
```

​ 其中,key 为 condition,value 为各列的类型。

  1. 通过指定表进行构造:
StreamDeserializer(Map<String, Pair<String, String>> tableNames, DBConnection conn)

其中,key 为表名,value 为 DBConnection。

订阅示例:

public class TestStreamDeserializer {
    // 假设异构流表回放时inputTables如下:
    // d = dict(['msg1', 'msg2'], [table1, table2]);
    // replay(inputTables = d, outputTables = `outTables, dateColumn = `timestampv, timeColumn = `timestampv)";

    // 异构流表解析器的创建方法如下:
    // 指定schema的方式
    BasicDictionary table1_schema = (BasicDictionary)conn.run("table1.schema()");
    BasicDictionary table2_schema = (BasicDictionary)conn.run("table2.schema()");
    Map<String,BasicDictionary > tables = new HashMap<>();
    tables.put("msg1", table1_schema);
    tables.put("msg2", table2_schema);
    StreamDeserializer streamFilter = new StreamDeserializer(tables);

    // 指定表的各列类型
    Entity.DATA_TYPE[] array1 = {DT_DATETIME,DT_TIMESTAMP,DT_SYMBOL,DT_DOUBLE,DT_DOUBLE};
    Entity.DATA_TYPE[] array2 = {DT_DATETIME,DT_TIMESTAMP,DT_SYMBOL,DT_DOUBLE};
    List<Entity.DATA_TYPE> filter1 = new ArrayList<>(Arrays.asList(array1));
    List<Entity.DATA_TYPE> filter2 = new ArrayList<>(Arrays.asList(array2));
    HashMap<String, List<Entity.DATA_TYPE>> filter = new HashMap<>();
    filter.put("msg1",filter1);
    filter.put("msg2",filter2);
    StreamDeserializer streamFilter = new StreamDeserializer(filter);

    // 指定表的方式
    Map<String, Pair<String, String>> tables = new HashMap<>();
    tables.put("msg1", new Pair<>("", "table1"));
    tables.put("msg2", new Pair<>("", "table2"));
    // conn是可选参数,如果不传入,在订阅的时候会自动使用订阅的conn进行构造
    StreamDeserializer streamFilter = new StreamDeserializer(tables, conn);
}
下面分别介绍如何通过 `ThreadedClient`、`ThreadPooledClient` 和 `PollingClient` 三种方式订阅异构流表:
  1. 通过 ThreadedClient 订阅异构流表:通过两种方式完成订阅时对异构流表的解析操作。

    • 通过指定 subscribe 函数的 deserialize 参数,实现在订阅时直接解析异构流表:

      ThreadedClient client = new ThreadedClient(8676);
      client.subscribe("192.168.0.21", 9002,"Trades1","subTread1", handler, -1, true, filter, streamFilter);
    • 异构流表(streamFilter)也可以写入用户自定义的 Handler 中,在回调时被解析:

public class CustomHandler implements MessageHandler {
    private StreamDeserializer deserializer_;
    private List<BasicMessage> msg1 = new ArrayList<>();
    private List<BasicMessage> msg2 = new ArrayList<>();

    public CustomHandler(StreamDeserializer deserializer) {
        deserializer_ = deserializer;
    }
    public void batchHandler(List<IMessage> msgs) {
    }

    public void doEvent(IMessage msg) {
        try {
                BasicMessage message = deserializer_.parse(msg);
                if (message.getSym().equals("msg1")) {
                    msg1.add(message);
                } else if (message.getSym().equals("msg2")) {
                    msg2.add(message);
                }
        } catch (Exception e) {
                e.printStackTrace();
        }
    }

    public List<BasicMessage> getMsg1() {
        return msg1;
    }
    public List<BasicMessage> getMsg2() {
        return msg2;
    }
}

// 创建流表
conn.run("st11 = streamTable(100:0, `timestampv`id`blob`price1,[TIMESTAMP,INT,BLOB,DOUBLE])\n" +
                "enableTableShareAndPersistence(table=st11, tableName=`outTables, asynWrite=true, compress=true, cacheSize=200000, retentionMinutes=180, preCache = 0)\t\n");

Map<String, Pair<String, String>> tables = new HashMap<>();
StreamDeserializer streamFilter = new StreamDeserializer(tables, conn);
CustomHandler handler = new CustomHandler(streamFilter);
ThreadedClient client = new ThreadedClient(8848);
client.subscribe("192.168.0.21", 9002, "outTables", "mutiSchema", handler, 0, true);
conn.run("t = table(timestamp(1 2) as a,1 2 as b,blob(`a`b) as c,1.1 2.1 as d)\n" +
                "outTables.append!(t)");
List<BasicMessage> msg1 = handler.getMsg1();
List<BasicMessage> msg2 = handler.getMsg2();
client.unsubscribe("192.168.0.21", 9002, "outTables", "mutiSchema");
  1. 通过 ThreadPooledClient 订阅异构流表的方法和 ThreadedClient 一致。

    • 指定 subscribe 函数的 deserialize 参数:

      CustomHandler handler = new CustomHandler(streamFilter);
      ThreadPooledClient client1 = new ThreadPooledClient(9050, 10);
      // client.subscribe(hostName, port, tableName, actionName, handler, 0, true);
      // client.subscribe("192.168.0.21", 9002, "Trades", handler);
      // client.subscribe("192.168.0.21", 9002, "outTables", "mutiSchema", handler, 0, true);
      client.subscribe("192.168.0.21", 9002,"Trades1","subTread1", handler, -1, true, filter, streamFilter);
    • 异构流表(streamFilter)也可以写入客户自定义的 Handler 中,在回调时被解析:

      CustomHandler handler = new CustomHandler(streamFilter);
      ThreadPooledClient client = new ThreadPooledClient(9050, 10);
      client.subscribe("192.168.0.21", 9002, "outTables", "mutiSchema", handler, 0, true);

    由于 PollingClient 没有回调函数,只能通过为 subscribe 函数的 deserialize 参数传入 streamFilter 的方式进行解析:

     ```
     PollingClient client = new PollingClient(9050);
     TopicPoller poller1 = client.subscribe("192.168.0.21", 9002, "Trades", "subTread1", 0, true, null, streamFilter,"","",true);
     ArrayList<IMessage> msg1;
     List<String> colNames =  Arrays.asList("tag","ts","data");
     List<Vector> colData = Arrays.asList(new BasicIntVector(0),new BasicTimestampVector(0),new BasicDoubleVector(0));
     BasicTable bt = new BasicTable(colNames,colData);
     conn.run("n=5000;t=table(1..n as tag,now()+1..n as ts,rand(100.0,n) as data);" +
             "Trades.append!(t)");
     msg1 = poller1.poll(1000, 10000);
     ```

取消订阅

停止订阅数据使用 unsubscribe 方法。

public void unsubscribe(String host, int port, String tableName, String actionName)

示例代码:

client.unsubscribe("192.168.0.21", 9002, "outTables", "mutiSchema");