流数据订阅

Rust API 提供的 stream_client 模块支持订阅流数据

流订阅步骤

本节通过一个简单示例介绍如何订阅流数据表。相关接口的详细说明将在后续章节介绍。

配置订阅

使用 Request 配置订阅信息,包括:

  • 表名:shared_stream_table

  • 任务名:example_action

  • 起始数据偏移:0

  • 用户名与密码:admin,123456

示例代码:

let mut req = Request::new("shared_stream_table".into(), "example_action".into());
req.with_offset(0).with_auth(("admin", "123456"));

创建 SubscribeBuilder

let mut builder = SubscriberBuilder::new();

开始订阅

通过 SubscribeBuilder 提供的 subscribe 接口进行订阅。

let mut subscriber = builder.subscribe("127.0.0.1:8848", req).await.unwrap();

获取数据

通过 subscriber 获取数据。

Subscriber 实现了 futures::Stream ,因此可以使用 futures::StreamExt 所提供的方法进行数据处理。

while let Some(msg) = subscriber.next().await {
    println!(
        "topic: {}, offset: {}, content: {}",
        msg.topic(),
        msg.offset(),
        msg.msg()
    );
}

结束订阅

Subscriber 在生命周期结束时会自动取消订阅,无需手动操作。

订阅配置接口

new

语法

pub fn new(table_name: String, action_name: String) -> Self

参数

table_name是被订阅的数据表名。该表必须为共享的流数据表。

action_name 是一个字符串,表示订阅任务的名称。它可以包含字母,数字和下划线,并以字母开头。如果一个节点有多个订阅任务均订阅了同一张表,则每个订阅必须指定唯一的 actionName

详情

创建一个新的订阅请求。

with_auth

语法

pub fn with_auth(&mut self, auth: (impl Into<String>, impl Into<String>)) -> &mut Self

参数

auth 用户名与密码。

详情

配置登录信息,即用户名与密码 。如未调用该接口进行配置,则后续将以 guest 用户身份进行订阅。

with_offset

语法

pub fn with_offset(&mut self, offset: i64) -> &mut Self

参数

offset 订阅任务开始后的第一条消息所在的位置。

详情

消息是流数据表中的行。如果未调用该接口,或设为-1,订阅将会从流数据表的当前行开始。

获取数据接口

Subscriber 类型实现 futures::Stream 时的 Item 类型为 Arc<Message> 。

Message 类型提供了如下获取数据的接口:

topic

语法

pub fn topic(&self) -> &String

详情

返回订阅主题(topic),即订阅的名称,它是一个字符串,由订阅表所在节点的信息、流数据表名称和订阅任务名称(如果指定了 actionName)组合而成,使用 "/" 分隔。

例如 localhost:8848:local8848/shared_stream_table/example_action,其中节点地址为 localhost:8848,节点别名 local8848,表名 shared_stream_table ,任务名 example_action。

offset

语法

pub fn offset(&self) -> i64

详情

获取当前数据所在位置。

msg

语法

pub fn msg(&self) -> &VectorImpl

详情

获取数据的内容。返回的 VectorImpl 是表的一行。

get

语法

pub fn get(&self, index: usize) -> Option<ConstantImpl>

详情

获取 msg 返回的数据中的某一列的值。