StreamingSQLClient

Java API 提供了 StreamingSQLClient,用于操作流式 SQL。用户可通过它声明流式 SQL 表并注册流式 SQL 查询以获取结果表。StreamingSQLClient 会订阅对应的结果变更日志(log),并基于这些增量日志在本地维护实时更新的结果表。

StreamingSQLClient

用于创建操作流式 SQL 的客户端。
public StreamingSQLClient(String host, int port, String userName, String password)

参数说明:

  • host String 类型,指定启用流式 SQL 的节点 IP。
  • port int 类型,指定启用流式 SQL 的节点端口号。
  • userName String 类型,用于登录启用流式 SQL 的节点的用户名。
  • password String 类型,用于登录启用流式 SQL 的节点的密码。

declareStreamingSQLTable

将指定表声明为流式 SQL 输入表,表示希望在该表上注册和执行流式 SQL 查询。
public void declareStreamingSQLTable(String tableName)

参数说明:

  • tableName String 类型,指定流式 SQL 表的表名。目前仅支持共享内存表。

revokeStreamingSQLTable

注销一个通过 declareStreamingSQLTable 声明的流式 SQL 表。

  • 只能取消当前用户自己声明的表。
  • 注销前,必须先取消在该表上已注册的所有流式 SQL 查询。
  • 注销操作仅移除表的流式 SQL 功能,不会删除表本身或表中的数据。
public void revokeStreamingSQLTable(String tableName)

参数说明:

  • tableName String 类型,指定已声明的流式 SQL 表的名称。

listStreamingSQLTables

列举当前用户已通过 declareTableForStreamingSQL 声明的所有流式 SQL 表。如果调用者是管理员(admin),则会返回系统中所有用户声明的流式 SQL 表。
public BasicTable listStreamingSQLTables()

返回一个表,包含如下字段:表名(tableName)、是否共享(shared)、声明用户(users)。

registerStreamingSQL

注册流式 SQL 查询,注册成功会返回一个 query ID。系统会基于该查询生成一个与 query ID 同名的共享流表,用于写入结果变更日志。
public String registerStreamingSQL(String sqlQuery, String queryId, int logTableCacheSize)

参数说明:

sqlQuery String 类型,表示要注册的流式 SQL 查询语句。

queryId String 类型,可选参数,表示 query 对应的 ID 名称。格式要求与变量名相同,只能包含字母、数字及下划线,且必须以字母开头。

  • 若指定的 queryId 已存在,系统会自动在其后追加时间戳生成唯一 ID。
  • 若不指定,系统自动生成唯一 ID。

logTableCacheSize int 类型,可选参数,表示结果变更日志在内存中的最大缓存条数,默认缓存全部。只要存在订阅,缓存中未发布的数据就会被保留,不会被清理。

revokeStreamingSQL

注销一条已经注册的流式 SQL,只能注销当前用户注册的查询。
public void revokeStreamingSQL(String queryId)

参数:

queryId String 类型,表示已注册的流式 SQL 查询的 ID。

getStreamingSQLStatus

查看流式 SQL 查询的运行状态。
public BasicTable getStreamingSQLStatus(String queryId)

返回一个表,包含如下字段:

  • queryId:流式 SQL 查询的 ID。
  • user:注册该查询的用户名。
  • registerTime:注册该查询的时间。
  • status:当前查询的状态,取值包括:
    • SQL_REGISTERED:已注册,未运行。
    • SQL_RUNNING:正常运行中,结果实时更新。
    • SQL_STOPPED:已暂停。
    • INTERNAL_ERROR:运行异常。
  • sqlQuery:流式 SQL 查询的语句。
  • involvedTables:查询涉及的表。
  • lastErrorMessage:最近一次错误的信息(如有)。

参数说明:

queryId String 类型,可选参数,表示已注册的流式 SQL 查询 ID。若不指定,返回当前用户注册的所有流式 SQL 查询的状态。如果调用者是管理员(admin),则返回所有用户注册的流式 SQL 查询的状态。

subscribeStreamingSQL

订阅相应的 StreamingSQL 的结果变更 log,并返回当前计算结果的结果表,结果会根据订阅到的变更 log 实时更新。
public BasicTable subscribeStreamingSQL(String queryId, int batchSize, float throttle)

参数说明:

queryId String 类型,需要订阅的流式 SQL 查询 ID。

batchSize int 类型,可选参数。

  • 正数:累计未处理日志数达到 batchSize 时才处理。
  • 非正数或未指定:每批日志到达即处理。

throttle float 类型,可选参数,单位为秒。表示继上次处理变更 日志消息之后,若 batchSize 条件一直未达到,多久后再次处理消息。如果没有指定 batchSizethrottle 即使指定,也不起作用。

unsubscribeStreamingSQL

取消订阅指定的流式 SQL 查询结果。取消订阅后,订阅端的实时结果表将停止更新。
public void unsubscribeStreamingSQL(String queryId)

参数说明:

queryId String 类型,表示要取消订阅的流式 SQL 查询 ID。

使用示例

注:使用该功能前需要在 server 端启用流式 SQL。相关配置项为 streamingSQLExecutorsmaxStreamingSQLQueriesPerTable。详情请参见用户手册
DBConnection conn = new DBConnection();
conn.connect(HOST, PORT, "admin", "123456");
// 定义 keyedTables
String script = "share keyedTable(`id, 1:0, `id`time`value, [SYMBOL, TIMESTAMP, DOUBLE[]]) as t1;\n" +
        "share keyedTable(`id, 1:0, `id`time`value, [SYMBOL, TIMESTAMP, DOUBLE[]]) as t2;";
conn.run(script);

// 创建 StreamingSQLClient
StreamingSQLClient streamingSQLClient = new StreamingSQLClient(HOST, PORT, "admin","123456");

// 声明 t1、t2 为流式 SQL 表
streamingSQLClient.declareStreamingSQLTable("t1");
streamingSQLClient.declareStreamingSQLTable("t2");

// 查看已声明的流式 SQL 表
BasicTable streamingSQLTables = streamingSQLClient.listStreamingSQLTables();

// 注册流式 SQL
String sqlStr1 = "SELECT id, time,t1.value, rowSum(value) FROM t1 FULL JOIN t2 ON t1.id=t2.id\n" ;
String queryId = streamingSQLClient.registerStreamingSQL(sqlStr1);

// 查看指定的、已声明的流式 SQL 状态
BasicTable streamingSQLStatus = streamingSQLClient.getStreamingSQLStatus(queryId);

// 订阅流式 SQL
BasicTable bt = streamingSQLClient.subscribeStreamingSQL(queryId);

// 往已声明的流式 SQL 表写数据
conn.run("n=100;\n" +
        "data = table(take(\"A\"+string(1..30), n) as id, timestamp(2025.08.26T12:36:23.438+1..n) as timestamp, take([take(1..10,10),take(1..10,10),take(1..10,10)], n) as value)\n" +
        "t1.append!(data)\n");
conn.run("n=100;\n" +
        "data = table(take(\"A\"+string(1..30), n) as id, timestamp(2025.08.26T12:36:23.438+1..n) as timestamp, take([take(1..10,10),take(1..10,10),take(1..10,10)], n) as value)\n" +
        "t2.append!(data)\n");

Thread.sleep(5000);
// 根据业务需要,对本地的结果表 bt 进行相应的业务操作
System.out.println(bt.getString());

// 取消订阅
streamingSQLClient.unsubscribeStreamingSQL(queryId);