流数据订阅
Rust API 提供的 stream_client 模块支持订阅流数据。
流订阅步骤
本节通过一个简单示例介绍如何订阅流数据表。相关接口的详细说明将在后续章节介绍。
配置订阅
使用 Request 配置订阅信息,包括:
-
表名:shared_stream_table
-
任务名:example_action
-
起始数据偏移:0
-
用户名与密码:admin,123456
示例代码:
let mut req = Request::new("shared_stream_table".into(), "example_action".into());
req.with_offset(0).with_auth(("admin", "123456"));
创建 SubscribeBuilder
let mut builder = SubscriberBuilder::new();
开始订阅
通过 SubscribeBuilder 提供的 subscribe
接口进行订阅。
let mut subscriber = builder.subscribe("127.0.0.1:8848", req).await.unwrap();
获取数据
通过 subscriber 获取数据。
Subscriber 实现了 futures::Stream ,因此可以使用 futures::StreamExt 所提供的方法进行数据处理。
while let Some(msg) = subscriber.next().await {
println!(
"topic: {}, offset: {}, content: {}",
msg.topic(),
msg.offset(),
msg.msg()
);
}
结束订阅
Subscriber 在生命周期结束时会自动取消订阅,无需手动操作。
订阅配置接口
new
语法
pub fn new(table_name: String, action_name: String) -> Self
参数
table_name是被订阅的数据表名。该表必须为共享的流数据表。
action_name 是一个字符串,表示订阅任务的名称。它可以包含字母,数字和下划线,并以字母开头。如果一个节点有多个订阅任务均订阅了同一张表,则每个订阅必须指定唯一的 actionName。
详情
创建一个新的订阅请求。
with_auth
语法
pub fn with_auth(&mut self, auth: (impl Into<String>, impl Into<String>)) -> &mut Self
参数
auth 用户名与密码。
详情
配置登录信息,即用户名与密码 。如未调用该接口进行配置,则后续将以 guest 用户身份进行订阅。
with_offset
语法
pub fn with_offset(&mut self, offset: i64) -> &mut Self
参数
offset 订阅任务开始后的第一条消息所在的位置。
详情
消息是流数据表中的行。如果未调用该接口,或设为-1,订阅将会从流数据表的当前行开始。
获取数据接口
Subscriber 类型实现 futures::Stream 时的 Item 类型为 Arc<Message> 。
Message 类型提供了如下获取数据的接口:
topic
语法
pub fn topic(&self) -> &String
详情
返回订阅主题(topic),即订阅的名称,它是一个字符串,由订阅表所在节点的信息、流数据表名称和订阅任务名称(如果指定了 actionName)组合而成,使用 "/" 分隔。
例如
localhost:8848:local8848/shared_stream_table/example_action
,其中节点地址为
localhost:8848,节点别名 local8848,表名 shared_stream_table ,任务名 example_action。
offset
语法
pub fn offset(&self) -> i64
详情
获取当前数据所在位置。
msg
语法
pub fn msg(&self) -> &VectorImpl
详情
获取数据的内容。返回的 VectorImpl 是表的一行。
get
语法
pub fn get(&self, index: usize) -> Option<ConstantImpl>
详情
获取 msg
返回的数据中的某一列的值。