createPartitionedTable

语法

createPartitionedTable(dbHandle, table, tableName, [partitionColumns], [compressMethods], [sortColumns], [keepDuplicates=ALL], [sortKeyMappingFunction], [softDelete=false])

详情

根据 table 的结构创建一个空的分区表。

  • 对于分布式数据库和本地磁盘数据库,table 参数只能是一个表。

  • 对于内存数据库,table 参数可以是一个表或包含多个表的元组。如果 table 是一个元组,每个 table 表示一个分区。

如果参数 table 是一个表,则根据该表的结构创建一个分区表。通过 append!tableInsert给新创建的分区表插入数据。它不能用于创建顺序分区的分区表。

如果参数 table 是一系列表,则创建一个分区的内存表。参数 table 中表的数量与数据库中分区的数量相同。
注:
  • 创建分区表时只会使用参数 table 的结构,并不会把 table 中的数据插入到新的分区表中。

  • OLAP 引擎允许集群每个节点创建的不同的分布式分区表句柄(包含临时句柄)上限为 8192;TSDB 引擎没有上限。临时句柄的说明请参考注释。

  • 调用函数 createPartitionedTable 创建分布式分区表时,若用户没有创建一个句柄变量来接收函数的返回值,则每个数据库会创建一个临时句柄。若在同一数据库下多次创表,则该数据库的临时句柄会被覆盖。

参数

注: 参数 sortColumns, keepDuplicatessortKeyMappingFunction 仅在 databaseengine 参数指定为 TSDB 时才有效。

dbHandle database 函数返回的数据库句柄。它可以是本地磁盘数据库,也可以是分布式数据库。dbHandle 为空字符串或没有指定时,表示内存数据库的句柄。

table 一个表或包含多个表的元组。系统将会根据该表的结构创建新的分区表。

tableName 一个字符串,表示的分区表的名称。

partitionColumns 一个字符串或字符串向量,表示分区列。对于组合分区,partitionColumns 是一个字符串向量。对非顺序分区,此参数为必选参数。

compressMethods 一个字典,指定某些列使用 lz4,delta 或 zstd 压缩算法存储。key 为字段名,value 为压缩算法("lz4","delta" 或 “zstd“)。若未指定,默认采用 lz4 压缩算法。有关 delta 压缩算法,亦称为 delta-of-delta encoding,参考:Delta Compression Techniques.
  • 对于 DECIMAL, SHORT, INT, LONG 与时间或日期类型数据,建议采用 delta 算法压缩。

  • 将字符串存储为 SYMBOL 类型数据,实现对字符串类型的压缩。

sortColumns 字符串标量或向量,用于指定每一分区内的排序列,写入的数据在每一分区内将按 sortColumns 进行排序。系统默认 sortColumns (指定多列时) 最后一列为时间列,其余列字段作为排序的索引列,称作 sort key。每一分区内,同一个 sort key 组合值对应的数据将按时间列顺序连续存放在一起。查询时,若查询条件包含索引列,可以快速定位数据所在的数据块位置,提高查询性能。

  • sortColumns 只能是 INTEGER, TEMPORAL, LITERAL 类别(除 BLOB) 或 DECIMAL 类型。

    • sortColumns 指定为多列,则 sortColumns 的最后一列必须为时间列,其余列为索引列,且索引列不能为为 TIME, TIMESTAMP, NANOTIME, NANOTIMESTAMP 类型。

    • sortColumns 仅指定一列,则该列作为 sort key,其类型不能为TIME, TIMESTAMP, NANOTIME, NANOTIMESTAMP。若 sortColumns 指定为一列时间列 (非分区列),且同时指定了 sortKeyMappingFunction,则查询的过滤条件中 sortColumns 只能与相同时间类型的值进行比较。

  • 频繁查询的字段适合设置为 sortColumns(建议不超过 4 列),且建议优先把查询频率高的字段作为 sortColumns 中位置靠前的列。
  • 为保证性能最优,建议每个分区内索引列的组合数(sort key)不超过 2000 个。
  • sortColumns 是每个分区内部 level file 内数据的排序依据,与其是否为分区字段无关。
keepDuplicates 指定在每个分区内如何处理所有 sortColumns 之值皆相同的数据。提供以下选项:
  • ALL: 保留所有数据,为默认值。

  • LAST:仅保留最新数据

  • FIRST:仅保留第一条数据

在 TSDB 引擎单个分区 sort key 组合数过多,但每个 sort key 组合值对应的记录数较少的场景下,建议配置 sortKeyMappingFunction 参数以对 sort key 组合数进行降维。降维后单个 TSDB level File 内的数据块可以存储更多的数据,查询时既减少了读数据块的次数(降低了 I/O 开销),又提升了数据的压缩率。

sortKeyMappingFunction 由一元函数对象组成的向量,其长度与索引列一致,即 sortColumns 的长度 - 1,若只指定一个映射函数 mapfunc,必须写为 sortKeyMappingFunction=[mapfunc]。用于指定应用在索引列中各列的映射函数,以减少 sort key 的组合数,该过程称为 sort key 降维。

索引列中的各列被对应的映射函数降维后,原本的多个 sort key 组合值会被重新映射到一个新的 sort key 组合值上。而每个新 sort key 组合值对应的数据仍将根据 sortColumns 的最后一列(时间列)进行排序。降维在写入磁盘时进行,因此指定该参数一定程度上将影响写入性能。

注:
  • sortKeyMappingFunction 指定的函数对象与索引列中的各列一一对应,若其中某列无需降维,则函数对象置为空。

  • sortKeyMappingFunction 中的函数对象为 hashBucket,且需要对采用 Hash 分区的分区字段进行降维时,应确保 Hash 分区的数量和 hashBucket 中的 buckets 之间不存在整除关系(buckets=1 除外),否则会导致同一分区内的所有 Hash 值得到的 key 都相同。

softDelete 用于启用或禁用软删除功能。默认为 false,即禁用。该参数适于在行数多但删除量小的场景下使用。使用该参数需要同时满足以下条件:

  • 由TSDB 存储引擎创建的数据库内的表

  • keepDuplicates 已设置为 LAST

例子

例1. 在分布式数据库中创建一个分区表

例1.1 创建一张 OLAP 引擎下的分区表。

n=1000000;
t=table(2020.01.01T00:00:00 + 0..(n-1) as timestamp, rand(`IBM`MS`APPL`AMZN,n) as symbol, rand(10.0, n) as value)
db = database("dfs://rangedb_tradedata", RANGE, `A`F`M`S`ZZZZ)
Trades = db.createPartitionedTable(table=t, tableName="Trades", partitionColumns="symbol", compressMethods={timestamp:"delta"});

createPartitionedTable 只是建立一张空的表格 Trades,该表复制了表 t 的字段。接着用 append! 函数将数据追加到 Trades 表里。

Trades.append!(t);

查询分区表:

Trades=loadTable(db,`Trades);
select min(value) from Trades;

0

在分布式数据库中,初次创建表后,可以跳过 loadTable 把表载入内存的步骤,因为分布式文件系统会动态刷新表的内容。系统重启后,需要再次执行 loadTable 函数加载表。

例1.2 创建一张 TSDB 引擎下的分区表。

n = 10000
SecurityID = rand(`st0001`st0002`st0003`st0004`st0005, n)
sym = rand(`A`B, n)
TradeDate = 2022.01.01 + rand(100,n)
TotalVolumeTrade = rand(1000..3000, n)
TotalValueTrade = rand(100.0, n)
schemaTable_snap = table(SecurityID, TradeDate, TotalVolumeTrade, TotalValueTrade).sortBy!(`SecurityID`TradeDate)

dbPath = "dfs://TSDB_STOCK"
if(existsDatabase(dbPath)){dropDatabase(dbPath)}
db_snap = database(dbPath, VALUE, 2022.01.01..2022.01.05, engine='TSDB')
snap=createPartitionedTable(dbHandle=db_snap, table=schemaTable_snap, tableName="snap", partitionColumns=`TradeDate, sortColumns=`SecurityID`TradeDate, keepDuplicates=ALL, sortKeyMappingFunction=[hashBucket{,5}])
snap.append!(schemaTable_snap)
flushTSDBCache()
snap = loadTable(dbPath, `snap)
select * from snap

例2. 在内存数据库中创建一个分区表

例2.1 创建分区常规内存表

n = 200000
colNames = `time`sym`qty`price
colTypes = [TIME,SYMBOL,INT,DOUBLE]
t = table(n:0, colNames, colTypes)
db = database(, RANGE, `A`D`F)
pt = db.createPartitionedTable(table=t, tableName=`pt, partitionColumns=`sym)

insert into pt values(09:30:00.001,`AAPL,100,56.5)
insert into pt values(09:30:01.001,`DELL,100,15.5)
例2.2 创建分区键值内存表
n = 200000
colNames = `time`sym`qty`price
colTypes = [TIME,SYMBOL,INT,DOUBLE]
t = keyedTable(`time`sym, n:0, colNames, colTypes)
db = database(, RANGE, `A`D`F)
pt = db.createPartitionedTable(table=t, tableName=`pt, partitionColumns=`sym)

insert into pt values(09:30:00.001,`AAPL,100,56.5)
insert into pt values(09:30:01.001,`DELL,100,15.5)

例2.3 创建分区流数据表

注意,创建分区流数据表时 createPartitionedTable 的第二个参数必须是元组,并且其长度必须与分区数量相等,每个表对应一个分区。下例中,trades_stream1 和 trades_stream2 组成一个分区流数据表 trades。写入数据时,只能分别往 trades_stream1 和 trades_stream2 写入,不能直接写入到 trades。查询 trades 可以获取到两个表的数据。

n=200000
colNames = `time`sym`qty`price
colTypes = [TIME,SYMBOL,INT,DOUBLE]
trades_stream1 = streamTable(n:0, colNames, colTypes)
trades_stream2 = streamTable(n:0, colNames, colTypes)
db=database(, RANGE, `A`D`F)
trades = createPartitionedTable(db,table=[trades_stream1, trades_stream2], tableName="", partitionColumns=`sym)

insert into trades_stream1 values(09:30:00.001,`AAPL,100,56.5)
insert into trades_stream2 values(09:30:01.001,`DELL,100,15.5)

select * from trades;
time sym qty price
09:30:00.001 AAPL 100 56.5
09:30:01.001 DELL 100 15.5

例2.4 创建分区 MVCC 内存表

创建分区 MVCC 内存表的方式与创建分区流数据表的方式相同。

n=200000
colNames = `time`sym`qty`price
colTypes = [TIME,SYMBOL,INT,DOUBLE]
trades_mvcc1 = mvccTable(n:0, colNames, colTypes)
trades_mvcc2 = mvccTable(n:0, colNames, colTypes)
db=database(, RANGE, `A`D`F)
trades = createPartitionedTable(db,table=[trades_mvcc1, trades_mvcc2], tableName="", partitionColumns=`sym)

insert into trades_mvcc1 values(09:30:00.001,`AAPL,100,56.5)
insert into trades_mvcc2 values(09:30:01.001,`DELL,100,15.5)

select * from trades;
time sym qty price
09:30:00.001 AAPL 100 56.5
09:30:01.001 DELL 100 15.5