流数据表
概述
流数据表是 DolphinDB 内部的一种特殊的内存表,用于存储和发布流数据,其被订阅后会把增量数据及时发布给订阅者,通常在发布端创建,用于接收生产者实时产生的流数据。
流数据表支持并发读写,但是与普通内存表不同,流数据表不支持修改或删除记录。向流数据表插入一条记录等价于数据源发布一条消息。
流数据表的查询接口与普通内存表一致,可使用 SQL 语句对流数据表进行查询和分析。
为了应对不同的流处理场景,DolphinDB 的流数据表可以分为以下几种:
表类型 | 对应函数 | 是否支持键值 | 是否可以设置内存中保存最大记录数 | |
---|---|---|---|---|
1 | 普通流数据表 | streamTable | 不支持 | 不支持 |
2 | 键值流数据表 | keyedStreamTable | 支持 | 不支持 |
3 | 持久化流数据表 | enableTableShareAndPersistence | 不支持 | 支持 |
4 | 高可用流数据表 | haStreamTable | 支持 | 支持 |
为什么流数据表需要共享
DolphinDB 的数据节点和计算节点可以充当发布者的角色,通过 streamTable 函数可以在数据节点或者计算节点上创建普通流数据表,作为发布者。
对于一个发布者,通常会被不同会话中的多个订阅端同时订阅,因此必须使用 share 函数把流数据表共享,才能被不同会话访问。共享后的流数据表线程安全,支持并发写入。
以下代码是创建一个共享的普通流数据表:
colNames = ["code", "tradetime", "price"] colTypes = [SYMBOL, TIMESTAMP, DOUBLE] share(table=streamTable(1:0, colNames, colTypes), sharedName="pubTable")
可以模拟写入几行数据,测试流数据表的写入功能:
tableInsert(pubTable, "000001SZ", 2023.12.15T09:30:00.000, 10.5) tableInsert(pubTable, "000001SZ", 2023.12.15T09:31:00.000, 10.6) tableInsert(pubTable, "000001SZ", 2023.12.15T09:32:00.000, 10.7) tableInsert(pubTable, "000001SZ", 2023.12.15T09:33:00.000, 10.8) tableInsert(pubTable, "000001SZ", 2023.12.15T09:34:00.000, 10.7) tableInsert(pubTable, "000001SZ", 2023.12.15T09:35:00.000, 10.6)
可以执行以下 SQL 语句查询流数据表:
res = select * from pubTable where price>=10.7
返回结果 res:
可以执行以下语句进行绘图:
price = exec price from pubTable
tradetime = exec tradetime from pubTable
plot(price, tradetime, title="minute price", chartType=LINE)
可以执行以下语句删除流数据表:
dropStreamTable(tableName="pubTable")
共享普通流数据表
共享普通流数据表的创建方式如下:
colNames = ["code", "tradetime", "price"] colTypes = [SYMBOL, TIMESTAMP, DOUBLE] share(table=streamTable(1:0, colNames, colTypes), sharedName="pubTable")
其特点为:
-
写入的每一条数据都会被发布,不会去重
-
写入的每一条数据都会保留在内存中,不能自动清除已发布消费的过期数据
-
测试场景和内存充裕的生产场景会使用共享普通流数据表
可以执行以下语句删除流数据表:
dropStreamTable(tableName="pubTable")
共享键值流数据表
共享键值流数据表的创建方式如下:
colNames = ["code", "tradetime", "price"] colTypes = [SYMBOL, TIMESTAMP, DOUBLE] share(table=keyedStreamTable(`code`tradetime, 1:0, colNames, colTypes), sharedName="pubTable")
其特点为:
-
写入的数据如果主键值重复,会被丢弃,只会发布主键值相同的第一条数据
-
写入的每一条数据都会保留在内存中,不能自动清除已发布消费的过期数据
-
测试场景和内存充裕的生产场景会使用共享键值流数据表
测试键值重复数据被丢弃的功能,上述键值流数据表的主键字段为 code 和 tradetime,写入主键相同的测试数据:
tableInsert(pubTable, "000001SZ", 2023.12.15T09:30:00.000, 10.5) tableInsert(pubTable, "000001SZ", 2023.12.15T09:30:00.000, 10.6)
可以执行以下 SQL 语句查询流数据表:
res = select * from pubTable where code="000001SZ"
返回结果 res:
说明键值去重功能生效,键值流数据表只保留相同主键值的第一条数据,其发布也是同理,只会发布主键值相同的第一条数据。
可以执行以下语句删除流数据表:
dropStreamTable(tableName="pubTable")
共享持久化流数据表
共享普通流数据表和共享键值流数据表都有一个特点,写入的每一条数据都会保留在内存中,不能自动清除已发布消费的过期数据。且节点重启时,全部在内存中的普通流数据表和键值流数据表中的数据是无法恢复的。
因此,DolphinDB 开发了持久化流数据表,可以把普通流数据表和键值流数据表设置为持久化流数据表,其特点为:
-
通过 enableTableShareAndPersistence 设置流数据表为共享的持久化流数据表,不需要再使用 share 函数声明其为共享变量
-
写入数据是否支持主键去重,取决于持久化的流数据表是普通流数据表还是键值流数据表
-
可以设置内存中最大保留行数,保留的是最新的记录
-
重启节点时,可以恢复已经写入流数据表的数据
-
内存有限的生产场景会使用共享持久化流数据表
共享持久化普通流数据表
共享持久化普通流数据表的创建方式如下:
colNames = ["code", "tradetime", "price"] colTypes = [SYMBOL, TIMESTAMP, DOUBLE] enableTableShareAndPersistence(table=streamTable(1:0, colNames, colTypes), tableName="pubTable", cacheSize=10000, preCache=1000)
-
cacheSize 表示流数据表在内存中最多保留多少行,上述例子中设置为 10,000,当写入数据大于 10,000 行时,系统会自动把时间更早的约 5,000 行数据从内存中清除,表中保留约一半最新的行数据
-
preCache 表示从磁盘加载到内存的记录条数。如果没有指定该参数,默认会把所有记录加载到内存中。
测试设置内存中最大保留行数为 10,000 的功能,先批量写入 10,000 行数据:
rowNums = 10000 simulateData = table( take(`000001SZ, rowNums) as code, take(0..(rowNums-1), rowNums)*1000*3+2023.12.15T09:30:00.000 as tradetime, take(10.6, rowNums) as price) tableInsert(pubTable, simulateData)
查询表中行数:
select count(*) from pubTable
返回:
count 10,000
再批量写入 3,000 行数据:
rowNums = 3000 simulateData = table( take(`000001SZ, rowNums) as code, take(0..(rowNums-1), rowNums)*1000*3+2023.12.15T17:49:57.000 as tradetime, take(10.6, rowNums) as price) tableInsert(pubTable, simulateData)
查询表中行数:
select count(*) from pubTable
返回:
count 8,000
可以查看 pubTable 中的数据,发现持久化流数据表中保留的 8,000 行数据是最新的记录,最早的 5,000 行记录已经自动从内存中清除。
可以执行以下语句删除持久化流数据表:
dropStreamTable(tableName="pubTable")
共享持久化键值流数据表
共享持久化键值流数据表的创建方式如下:
colNames = ["code", "tradetime", "price"] colTypes = [SYMBOL, TIMESTAMP, DOUBLE] enableTableShareAndPersistence(table=keyedStreamTable(`code`tradetime, 1:0, colNames, colTypes), tableName="pubTable", cacheSize=10000, preCache=1000)
可以执行以下语句删除持久化流数据表:
dropStreamTable(tableName="pubTable")
高可用流数据表
高可用流数据特点为:
-
通过 haStreamTable 创建的高可用流数据本身就是共享变量,不需要再使用 share 函数声明其为共享变量
-
高可用流数据表创建时就是持久化的,重启节点时,可以恢复已经写入高可用流数据表的数据
-
可以设置内存中最大保留行数,保留的是最新的记录
-
需要提供高可用流数据功能的场景会使用高可用流数据表
高可用流数据表使用方法相对复杂,具体使用方法可以参考流数据高可用功能章节。