createPartitionedTable
Syntax
createPartitionedTable(dbHandle, table, tableName,
[partitionColumns], [compressMethods], [sortColumns], [keepDuplicates=ALL],
[sortKeyMappingFunction], [softDelete=false])
Arguments
database().engine
= TSDB).dbHandle (optional) is a DFS database handle returned by function database. The database can be either in the local file system, or in the distributed file system. If the dbHandle is empty or unspecified, the table is an in-memory partitioned table.
table (optional) is a table or a tuple of tables. The table schema will be used to construct the new partitioned table.
tableName (optional) is a string indicating the name of the partitioned table.
partitionColumns (optional) is a STRING scalar/vector indicating the partitioning column(s). For non-sequential partitioning, this parameter is required. For a composite partition, this parameter is a STRING vector.
compressMethods (optional) is a dictionary indicating which compression methods are used for specified columns. The keys are columns name and the values are compression methods ("lz4", "delta", "zstd" or "chimp"). If unspecified, use LZ4 compression method. Note:
- The delta compression method can be used for DECIMAL, SHORT, INT, LONG or temporal data types.
- Save strings as SYMBOL type to enable compression of strings.
- The chimp compression method can be used for DOUBLE type data with decimal parts not exceeding three digits in length.
sortColumns (optional) is a STRING scalar/vector that specifies the column(s) used to sort the ingested data within each partition of the TSDB database. The sort columns must be of Integral, Temporal, STRING, SYMBOL, or DECIMAL type. Note that sortColumns is not necessarily consistent with the partitioning column.
- If multiple columns are specified for sortColumns, the last column must be a time column. The preceding columns are used as the sort keys and they cannot be of TIME, TIMESTAMP, NANOTIME, or NANOTIMESTAMP type.
- If only one column is specified for sortColumns, the column is used as the sort key, and it can be a time column or not. If the sort column is a time column and sortKeyMappingFunction is specified, the sort column specified in a SQL where condition can only be compared with temporal values of the same data type.
- It is recommended to specify frequently-queried columns for sortColumns and sort them in the descending order of query frequency, which ensures that frequently-used data is readily available during query processing.
- The number of sort key entries (which are the combinations of the values of the sort keys) within each partition may not exceed 1000 for optimal performance. This limitation prevents excessive memory usage and ensures efficient query processing.
keepDuplicates (optional) specifies how to deal with records with duplicate sortColumns values. It can have the following values:
- ALL: keep all records;
- LAST: only keep the last record;
- FIRST: only keep the first record.
It is recommended to specify the sortKeyMappingFunction parameter if there are many sort keys in a partition of a TSDB database and a small number of records with the same sort key. After dimensionality reduction, the blocks in a TSDB level file can store more data, which not only reduces the frequency of reading data blocks and disk I/O during query, but also improves the data compression ratio.
sortKeyMappingFunction (optional) is a vector of unary functions. It has the same length as the number of sort keys. The specified mapping functions are applied to each sort key (i.e., the sort columns except for the temporal column) for dimensionality reduction. After the dimensionality reduction for the sort keys, records with a new sort key entry will be sorted based on the last column of sortColumns (the temporal column).
Note:
- Dimensionality reduction is performed when writing to disk, so specifying this parameter may affect write performance.
- The functions specified in
sortKeyMappingFunction
correspond to each and every sort key. If a sort key does not require dimensionality reduction, leave the corresponding element empty in the vector. - If a mapping function is
hashBucket
AND the sort key to which it applies is a HASH partitioning column, make sure the number of hash partitions and buckets are not divisible by each other (except when buckets=1). Otherwise the column values from the same HASH partition would be mapped to the same hash bucket after dimensionality reduction.
softDelete (optional) determines whether to enable soft delete for TSDB databases. The default value is false. To use it, keepDuplicates must be set to 'LAST'. It is recommended to enable soft delete for databases where the row count is large and delete operations are infrequent.
Details
Create an empty partitioned table with the same schema as the specified table.
- To create a DFS table or a table on disk, parameter table must be a table. This function is used with append! or tableInsert to generate a partitioned table. It cannot be used to create a partitioned table with sequential domain.
- To create an in-memory partitioned table, parameter table can be a table or a tuple of tables. The number of tables given by the parameter table must be the same as the number of partitions in the database.
Note:
- Only the schema of table is used. None of the rows in table is imported to the newly created partitioned table.
- For a DFS database in an OLAP engine, the maximum number of handles (including temporary handles*) to partitioned tables is 8,192 per node. For the TSDB storage engine, there is no limit.
*temporary handles: If no handle is specified when you create a partitioned table in
a DFS database with createPartitionedTable
, each database creates a
temporary handle to hold the return value. If you create multiple tables under the
same database, the temporary handle for the database is overwritten each time.
Examples
Example 1. Create a DFS table
Example 1.1. Create a DFS table in OLAP database
n=10000
t=table(2020.01.01T00:00:00 + 0..(n-1) as timestamp, rand(`IBM`MS`APPL`AMZN,n) as symbol, rand(10.0, n) as value)
db = database("dfs://rangedb_tradedata", RANGE, `A`F`M`S`ZZZZ)
Trades = db.createPartitionedTable(table=t, tableName="Trades", partitionColumns="symbol", compressMethods={timestamp:"delta"});
At this point, the table Trades is empty. The schema of Trades is the same as the schema of table t. Then we append table t to table Trades.
Trades.append!(t);
Now the contents of table Trades have been updated on disk. In the DFS system, the system doesn't dynamically refresh the contents of tables. We need to load the table into memory before we can work with it interactively.
Trades=loadTable(db,`Trades);
select min(value) from Trades;
// output
0
After appending data to a DFS table, we don't need to use function loadTable to load the table before querying
the table, as the system automatically refreshes the table after appending
operations. After system restarts, however, we need to use
loadTable
to load a DFS table before querying the table.
Example 1.2. Create a DFS table in TSDB database
n = 10000
SecurityID = rand(`st0001`st0002`st0003`st0004`st0005, n)
sym = rand(`A`B, n)
TradeDate = 2022.01.01 + rand(100,n)
TotalVolumeTrade = rand(1000..3000, n)
TotalValueTrade = rand(100.0, n)
schemaTable_snap = table(SecurityID, TradeDate, TotalVolumeTrade, TotalValueTrade).sortBy!(`SecurityID`TradeDate)
dbPath = "dfs://TSDB_STOCK"
if(existsDatabase(dbPath)){dropDatabase(dbPath)}
db_snap = database(dbPath, VALUE, 2022.01.01..2022.01.05, engine='TSDB')
snap=createPartitionedTable(dbHandle=db_snap, table=schemaTable_snap, tableName="snap", partitionColumns=`TradeDate, sortColumns=`SecurityID`TradeDate, keepDuplicates=ALL, sortKeyMappingFunction=[hashBucket{,5}])
snap.append!(schemaTable_snap)
flushTSDBCache()
snap = loadTable(dbPath, `snap)
select * from snap
Example 2. Create in-memory partitioned tables
Example 2.1. Create a partitioned in-memory table
n = 20000
colNames = `time`sym`qty`price
colTypes = [TIME,SYMBOL,INT,DOUBLE]
t = table(n:0, colNames, colTypes)
db = database(, RANGE, `A`D`F)
pt = db.createPartitionedTable(t, `pt, `sym)
insert into pt values(09:30:00.001,`AAPL,100,56.5)
insert into pt values(09:30:01.001,`DELL,100,15.5)
Example 2.2. Create a partitioned keyed table
n = 20000
colNames = `time`sym`qty`price
colTypes = [TIME,SYMBOL,INT,DOUBLE]
t = keyedTable(`time`sym, n:0, colNames, colTypes)
db = database(, RANGE, `A`D`F)
pt = db.createPartitionedTable(t, `pt, `sym)
insert into pt values(09:30:00.001,`AAPL,100,56.5)
insert into pt values(09:30:01.001,`DELL,100,15.5)
Example 2.3. Create a partitioned stream table
When creating a partitioned stream table, the second parameter of
createPartitionedTable
must be a tuple of tables, and its
length must be equal to the number of partitions. Each table in the tuple represents
a partition. In the following example, trades_stream1 and trades_stream2 form a
partitioned stream table trades. We cannot directly write data to trades. Instead,
we need to write to trades_stream1 and trades_stream2.
n=200000
colNames = `time`sym`qty`price
colTypes = [TIME,SYMBOL,INT,DOUBLE]
trades_stream1 = streamTable(n:0, colNames, colTypes)
trades_stream2 = streamTable(n:0, colNames, colTypes)
db=database(, RANGE, `A`D`F)
trades = createPartitionedTable(db,[trades_stream1, trades_stream2], "", `sym)
insert into trades_stream1 values(09:30:00.001,`AAPL,100,56.5)
insert into trades_stream2 values(09:30:01.001,`DELL,100,15.5)
select * from trades;
time | sym | qty | price |
---|---|---|---|
09:30:00.001 | AAPL | 100 | 56.5 |
09:30:01.001 | DELL | 100 | 15.5 |
Example 2.4. Create a partitioned MVCC table
Similar to a partitioned stream table, to create a partitioned MVCC table, the second
parameter of createPartitionedTable
must be a tuple of tables, and
its length must be equal to the number of partitions. Each table in the tuple
represents a partition. In the following example, trades_mvcc1 and trades_mvcc2 form
a partitioned MVCC table trades. We cannot directly write data to trades. Instead,
we need to write to trades_mvcc1 and trades_mvcc2.
n=200000
colNames = `time`sym`qty`price
colTypes = [TIME,SYMBOL,INT,DOUBLE]
trades_mvcc1 = mvccTable(n:0, colNames, colTypes)
trades_mvcc2 = mvccTable(n:0, colNames, colTypes)
db=database(, RANGE, `A`D`F)
trades = createPartitionedTable(db,[trades_mvcc1, trades_mvcc2], "", `sym)
insert into trades_mvcc1 values(09:30:00.001,`AAPL,100,56.5)
insert into trades_mvcc2 values(09:30:01.001,`DELL,100,15.5)
select * from trades;
time | sym | qty | price |
---|---|---|---|
09:30:00.001 | AAPL | 100 | 56.5 |
09:30:01.001 | DELL | 100 | 15.5 |