数据写入

本节介绍如何使用 TableWriter 将数据高效批量写入 DolphinDB 表。当前版本仅支持写入流表(stream table),不支持分布式表,且不支持多线程并发写入。

使用流程如下:

  1. 通过 new 方法创建 TableWriter 实例;
  2. 通过 append_row 方法逐行写入数据;
  3. 可选调用 flush 方法立即上传缓冲区数据。

接口说明

new

语法

TableWriter::new(client: Client, table_name: &str, batch_size: u32) -> TableWriter

参数

client 数据库连接对象。

table_name 目标流表的表名。

batch_size 写入缓冲区的最大行数,超过后将自动上传。单位为“行”。

详情

创建一个新的 TableWriter 对象。构造函数会从 server 端获取表结构信息,因此必须确保表已创建。

append_row

语法

append_row(row: &mut Vec<PrimitiveType>) -> Result<Option<ConstantImpl>, Error>

参数

row 待插入的数据,长度必须与目标表字段数一致。用户需将原始的 Rust 类型(如 i64, f64, String, NaiveDateTime 等)包装为 PrimitiveType,可参考示例

详情

向缓冲区中追加一行数据。当缓存行数达到 batch_size 后,TableWriter 会自动将数据上传到 server 端。

flush

语法

flush()

详情

手动将缓冲区中的数据写入目标表。

size

语法

size()

详情

返回当前缓冲区中的行数。

示例

use chrono::Utc;
use dolphindb::{
    client::{ClientBuilder, TableWriter},
    types::PrimitiveType,
};
use tokio::{
    sync::mpsc,
    time::{sleep, Duration},
};

#[derive(Clone)]
struct TickerEvent {
    event_time: i64,
    symbol: String,
    event_id: i64,
    prices: Vec<f64>,
}

fn build_table_row(event: &TickerEvent) -> Vec<PrimitiveType> {
    vec![
        event.event_time.into(),
        event.symbol.clone().into(),
        event.event_id.into(),
        event.prices.clone().into(),
    ]
}

#[tokio::main]
async fn main() {
    // connect to DolphinDB
    let mut builder = ClientBuilder::new("127.0.0.1:8848");
    builder.with_auth(("admin", "123456"));
    let mut client = builder.connect().await.unwrap();

    // create a stream table
    let stream_table = "depthStreamTable";
    let script = format!(
        r#"
        colNames = ["event_time", "symbol", "event_id", "prices"]
        colTypes = [TIMESTAMP, SYMBOL, LONG, DOUBLE[]]

        if (!existsStreamTable("{stream_table}")) {{
            enableTableShareAndCachePurge(streamTable(1000000:0, colNames, colTypes), "{stream_table}", 1000000)
        }}
    "#
    );
    client.run_script(&script).await.unwrap().unwrap();

    // generate data in rust
    let event = TickerEvent {
        event_time: Utc::now().timestamp_millis(),
        symbol: "BTCUSDT".into(),
        event_id: 1000,
        prices: vec![5000.0; 100],
    };

    // TableWriter is NOT thread safe.
    // This example show how to insert data from multiple sources.
    let (tx, mut rx) = mpsc::unbounded_channel::<TickerEvent>();
    let symbol_number = 500;
    let tx1 = tx.clone();
    let tx2 = tx.clone();
    let event1 = event.clone();
    let event2 = event.clone();

    // This data source generates 500 * 20 rows every second.
    tokio::spawn(async move {
        loop {
            for _ in 0..symbol_number {
                let _ = tx1.send(event1.clone());
            }
            sleep(Duration::from_millis(1000 / 20)).await;
        }
    });
    // This data source generates 500 * 10 rows every second.
    tokio::spawn(async move {
        loop {
            for _ in 0..symbol_number {
                let _ = tx2.send(event2.clone());
            }
            sleep(Duration::from_millis(1000 / 10)).await;
        }
    });

    tokio::spawn(async move {
        let mut inserted = 0usize;
        let mut writer = TableWriter::new(client, stream_table, 512).await;
        while let Some(event) = rx.recv().await {
            let mut row = build_table_row(&event);
            let res = writer.append_row(&mut row).await;

            match res {
                Ok(_) => {
                    inserted += 1;
                    if inserted % 10000 == 0 {
                        println!("{} rows inserted and {} rows in buffer", inserted, rx.len());
                    }
                }
                Err(e) => {
                    eprintln!("Insertion failed: {:?}", e);
                    inserted += 1;
                }
            }
        }
    });

    println!("Insertion started.");
    futures::future::pending::<()>().await;
}