数据写入
本节介绍如何使用 TableWriter
将数据高效批量写入 DolphinDB 表。当前版本仅支持写入流表(stream
table),不支持分布式表,且不支持多线程并发写入。
使用流程如下:
- 通过
new
方法创建TableWriter
实例; - 通过
append_row
方法逐行写入数据; - 可选调用
flush
方法立即上传缓冲区数据。
接口说明
new
语法
TableWriter::new(client: Client, table_name: &str, batch_size: u32) -> TableWriter
参数
client 数据库连接对象。
table_name 目标流表的表名。
batch_size 写入缓冲区的最大行数,超过后将自动上传。单位为“行”。
详情
创建一个新的 TableWriter
对象。构造函数会从 server 端获取表结构信息,因此必须确保表已创建。
append_row
语法
append_row(row: &mut Vec<PrimitiveType>) -> Result<Option<ConstantImpl>, Error>
参数
row 待插入的数据,长度必须与目标表字段数一致。用户需将原始的 Rust 类型(如 i64
,
f64
, String
,
NaiveDateTime
等)包装为 PrimitiveType
,可参考示例。
详情
向缓冲区中追加一行数据。当缓存行数达到 batch_size
后,TableWriter
会自动将数据上传到 server 端。
flush
语法
flush()
详情
手动将缓冲区中的数据写入目标表。
size
语法
size()
详情
返回当前缓冲区中的行数。
示例
use chrono::Utc;
use dolphindb::{
client::{ClientBuilder, TableWriter},
types::PrimitiveType,
};
use tokio::{
sync::mpsc,
time::{sleep, Duration},
};
#[derive(Clone)]
struct TickerEvent {
event_time: i64,
symbol: String,
event_id: i64,
prices: Vec<f64>,
}
fn build_table_row(event: &TickerEvent) -> Vec<PrimitiveType> {
vec![
event.event_time.into(),
event.symbol.clone().into(),
event.event_id.into(),
event.prices.clone().into(),
]
}
#[tokio::main]
async fn main() {
// connect to DolphinDB
let mut builder = ClientBuilder::new("127.0.0.1:8848");
builder.with_auth(("admin", "123456"));
let mut client = builder.connect().await.unwrap();
// create a stream table
let stream_table = "depthStreamTable";
let script = format!(
r#"
colNames = ["event_time", "symbol", "event_id", "prices"]
colTypes = [TIMESTAMP, SYMBOL, LONG, DOUBLE[]]
if (!existsStreamTable("{stream_table}")) {{
enableTableShareAndCachePurge(streamTable(1000000:0, colNames, colTypes), "{stream_table}", 1000000)
}}
"#
);
client.run_script(&script).await.unwrap().unwrap();
// generate data in rust
let event = TickerEvent {
event_time: Utc::now().timestamp_millis(),
symbol: "BTCUSDT".into(),
event_id: 1000,
prices: vec![5000.0; 100],
};
// TableWriter is NOT thread safe.
// This example show how to insert data from multiple sources.
let (tx, mut rx) = mpsc::unbounded_channel::<TickerEvent>();
let symbol_number = 500;
let tx1 = tx.clone();
let tx2 = tx.clone();
let event1 = event.clone();
let event2 = event.clone();
// This data source generates 500 * 20 rows every second.
tokio::spawn(async move {
loop {
for _ in 0..symbol_number {
let _ = tx1.send(event1.clone());
}
sleep(Duration::from_millis(1000 / 20)).await;
}
});
// This data source generates 500 * 10 rows every second.
tokio::spawn(async move {
loop {
for _ in 0..symbol_number {
let _ = tx2.send(event2.clone());
}
sleep(Duration::from_millis(1000 / 10)).await;
}
});
tokio::spawn(async move {
let mut inserted = 0usize;
let mut writer = TableWriter::new(client, stream_table, 512).await;
while let Some(event) = rx.recv().await {
let mut row = build_table_row(&event);
let res = writer.append_row(&mut row).await;
match res {
Ok(_) => {
inserted += 1;
if inserted % 10000 == 0 {
println!("{} rows inserted and {} rows in buffer", inserted, rx.len());
}
}
Err(e) => {
eprintln!("Insertion failed: {:?}", e);
inserted += 1;
}
}
}
});
println!("Insertion started.");
futures::future::pending::<()>().await;
}