StreamingSQLClient
Java API 提供了 StreamingSQLClient,用于操作流式 SQL。用户可通过它声明流式 SQL 表并注册流式 SQL 查询以获取结果表。StreamingSQLClient 会订阅对应的结果变更日志(log),并基于这些增量日志在本地维护实时更新的结果表。
StreamingSQLClient
public StreamingSQLClient(String host, int port, String userName, String password)
参数说明:
- host String 类型,指定启用流式 SQL 的节点 IP。
- port int 类型,指定启用流式 SQL 的节点端口号。
- userName String 类型,用于登录启用流式 SQL 的节点的用户名。
- password String 类型,用于登录启用流式 SQL 的节点的密码。
declareStreamingSQLTable
public void declareStreamingSQLTable(String tableName)
参数说明:
- tableName String 类型,指定流式 SQL 表的表名。目前仅支持共享内存表。
revokeStreamingSQLTable
注销一个通过 declareStreamingSQLTable
声明的流式 SQL 表。
- 只能取消当前用户自己声明的表。
- 注销前,必须先取消在该表上已注册的所有流式 SQL 查询。
- 注销操作仅移除表的流式 SQL 功能,不会删除表本身或表中的数据。
public void revokeStreamingSQLTable(String tableName)
参数说明:
- tableName String 类型,指定已声明的流式 SQL 表的名称。
listStreamingSQLTables
declareTableForStreamingSQL
声明的所有流式 SQL
表。如果调用者是管理员(admin),则会返回系统中所有用户声明的流式 SQL
表。public BasicTable listStreamingSQLTables()
返回一个表,包含如下字段:表名(tableName)、是否共享(shared)、声明用户(users)。
registerStreamingSQL
public String registerStreamingSQL(String sqlQuery, String queryId, int logTableCacheSize)
参数说明:
sqlQuery String 类型,表示要注册的流式 SQL 查询语句。
queryId String 类型,可选参数,表示 query 对应的 ID 名称。格式要求与变量名相同,只能包含字母、数字及下划线,且必须以字母开头。
- 若指定的 queryId 已存在,系统会自动在其后追加时间戳生成唯一 ID。
- 若不指定,系统自动生成唯一 ID。
logTableCacheSize int 类型,可选参数,表示结果变更日志在内存中的最大缓存条数,默认缓存全部。只要存在订阅,缓存中未发布的数据就会被保留,不会被清理。
revokeStreamingSQL
public void revokeStreamingSQL(String queryId)
参数:
queryId String 类型,表示已注册的流式 SQL 查询的 ID。
getStreamingSQLStatus
public BasicTable getStreamingSQLStatus(String queryId)
返回一个表,包含如下字段:
- queryId:流式 SQL 查询的 ID。
- user:注册该查询的用户名。
- registerTime:注册该查询的时间。
- status:当前查询的状态,取值包括:
- SQL_REGISTERED:已注册,未运行。
- SQL_RUNNING:正常运行中,结果实时更新。
- SQL_STOPPED:已暂停。
- INTERNAL_ERROR:运行异常。
- sqlQuery:流式 SQL 查询的语句。
- involvedTables:查询涉及的表。
- lastErrorMessage:最近一次错误的信息(如有)。
参数说明:
queryId String 类型,可选参数,表示已注册的流式 SQL 查询 ID。若不指定,返回当前用户注册的所有流式 SQL 查询的状态。如果调用者是管理员(admin),则返回所有用户注册的流式 SQL 查询的状态。
subscribeStreamingSQL
public BasicTable subscribeStreamingSQL(String queryId, int batchSize, float throttle)
参数说明:
queryId String 类型,需要订阅的流式 SQL 查询 ID。
batchSize int 类型,可选参数。
- 正数:累计未处理日志数达到 batchSize 时才处理。
- 非正数或未指定:每批日志到达即处理。
throttle float 类型,可选参数,单位为秒。表示继上次处理变更 日志消息之后,若 batchSize 条件一直未达到,多久后再次处理消息。如果没有指定 batchSize,throttle 即使指定,也不起作用。
unsubscribeStreamingSQL
public void unsubscribeStreamingSQL(String queryId)
参数说明:
queryId String 类型,表示要取消订阅的流式 SQL 查询 ID。
使用示例
DBConnection conn = new DBConnection();
conn.connect(HOST, PORT, "admin", "123456");
// 定义 keyedTables
String script = "share keyedTable(`id, 1:0, `id`time`value, [SYMBOL, TIMESTAMP, DOUBLE[]]) as t1;\n" +
"share keyedTable(`id, 1:0, `id`time`value, [SYMBOL, TIMESTAMP, DOUBLE[]]) as t2;";
conn.run(script);
// 创建 StreamingSQLClient
StreamingSQLClient streamingSQLClient = new StreamingSQLClient(HOST, PORT, "admin","123456");
// 声明 t1、t2 为流式 SQL 表
streamingSQLClient.declareStreamingSQLTable("t1");
streamingSQLClient.declareStreamingSQLTable("t2");
// 查看已声明的流式 SQL 表
BasicTable streamingSQLTables = streamingSQLClient.listStreamingSQLTables();
// 注册流式 SQL
String sqlStr1 = "SELECT id, time,t1.value, rowSum(value) FROM t1 FULL JOIN t2 ON t1.id=t2.id\n" ;
String queryId = streamingSQLClient.registerStreamingSQL(sqlStr1);
// 查看指定的、已声明的流式 SQL 状态
BasicTable streamingSQLStatus = streamingSQLClient.getStreamingSQLStatus(queryId);
// 订阅流式 SQL
BasicTable bt = streamingSQLClient.subscribeStreamingSQL(queryId);
// 往已声明的流式 SQL 表写数据
conn.run("n=100;\n" +
"data = table(take(\"A\"+string(1..30), n) as id, timestamp(2025.08.26T12:36:23.438+1..n) as timestamp, take([take(1..10,10),take(1..10,10),take(1..10,10)], n) as value)\n" +
"t1.append!(data)\n");
conn.run("n=100;\n" +
"data = table(take(\"A\"+string(1..30), n) as id, timestamp(2025.08.26T12:36:23.438+1..n) as timestamp, take([take(1..10,10),take(1..10,10),take(1..10,10)], n) as value)\n" +
"t2.append!(data)\n");
Thread.sleep(5000);
// 根据业务需要,对本地的结果表 bt 进行相应的业务操作
System.out.println(bt.getString());
// 取消订阅
streamingSQLClient.unsubscribeStreamingSQL(queryId);