enableTableShareAndPersistence

Syntax

enableTableShareAndPersistence(table, tableName, [asynWrite=true], [compress=true], [cacheSize], [retentionMinutes=1440], [flushMode=0], [preCache], [cachePurgeTimeColumn], [cachePurgeInterval], [cacheRetentionTime])

Arguments

table is an empty stream table.

tableName is a string indicating the name of the shared table.

asynWrite (optional) is a Boolean value indicating whether persistence is enabled in asynchronous mode. The default value is true, meaning asynchronous persistence is enabled. In this case, once data is written into memory, the write is deemed complete. The data stored in memory is then persisted to disk by another thread.

compress (optional) is a Boolean value indicating whether to save a table to disk in compression mode. The default value is true.

cacheSize (optional) is a LONG integer specifying the maximum number of rows of the table to keep in memory. If not provided, all rows are kept.

  • The minimum valid value for this parameter is 1,000.

  • If cacheSize is greater than 1,000:

    • When each batch of appended data does not exceed cacheSize, the data volume in memory will not exceed 1.5 times cacheSize.

    • When a batch exceeds cacheSize, the data volume in memory will not exceed 2 times the maximum number of appended rows.

retentionMinutes (optional) is an integer indicating how long (in minutes) a log file larger than 1 GB is kept after its last update. The default value is 1440, which means the log file is kept for 1440 minutes, i.e., 1 day.

flushMode (optional) is an integer indicating whether to enable synchronous disk flush. It can be 0 or 1. The persistence process first writes data from memory to the page cache, then flushes the cached data to disk. If flushMode is 0 (default), asynchronous disk flushing is enabled. In this case, once data is written from memory to the page cache, the flush is deemed complete and the next batch of data can be written to the table. If flushMode is set to 1, the current batch of data must be flushed to disk before the next batch can be written.

preCache (optional) is an integer indicating the number of records to be loaded into memory from the persisted stream table on disk when DolphinDB restarts. If it is not specified, all records are loaded into memory when DolphinDB restarts.

cachePurgeTimeColumn (optional) is a STRING scalar indicating the time column in the stream table on which cache purging is based.

cachePurgeInterval (optional) is a DURATION scalar indicating the interval to trigger cache purge.

cacheRetentionTime (optional) is a DURATION scalar indicating the retention time of cached data.

Details

Share a stream table, and enable it to be persisted to disk.

For this command to work, the configuration parameter persistenceDir must be specified in the configuration file (dolphindb.cfg in standalone mode and cluster.cfg in cluster mode). For details of this configuration parameter, see Standalone Mode. The table is persisted to <PERSISTENCE_DIR>/<TABLE_NAME>. The directory contains 2 types of files: data files (named data0.log, data1.log, ...) and an index file, index.log. After the system restarts, the data that has been persisted to disk is loaded into memory.
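
As a minimal sketch, the setting is a single key=value line in the configuration file. The path below is hypothetical; substitute a directory that exists on your server.

```
persistenceDir=/home/DolphinDB/Data/Persistence
```

With this hypothetical path, a table shared as st would be persisted under /home/DolphinDB/Data/Persistence/st.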

The parameter asynWrite determines whether table persistence is asynchronous. In asynchronous mode, new data is pushed to a queue and persistence workers (threads) write the data to disk later. In synchronous mode, the table append operation does not return until the new data has been persisted to disk. The default value is true (asynchronous mode). In general, asynchronous mode achieves higher throughput.

In asynchronous mode, each table is persisted by a single persistence worker (thread), and one persistence worker may handle multiple tables. If there is only one table to be persisted, increasing the number of persistence workers does not improve performance.

Stream tables keep all data in memory by default. To prevent excessive memory usage, you can clear cached data using either of the following methods:

  • Set cacheSize that specifies the maximum number of rows to be kept in memory. When the row count reaches the cacheSize limit, the system will automatically remove the oldest 50% of rows to free up space.

  • Set cachePurgeTimeColumn, cachePurgeInterval and cacheRetentionTime. The system will clear data based on the time column specified by cachePurgeTimeColumn. Each time a new record arrives, the system obtains the time difference between the new record and the oldest record kept in memory. If the time difference exceeds cachePurgeInterval, the system will retain only the data with timestamps within cacheRetentionTime of the new data, and clear the rest.
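
The cacheSize rule can be traced with plain arithmetic. The sketch below is only a model of the rule as described above, not the server's implementation: it assumes the purge fires as soon as the in-memory row count is greater than or equal to cacheSize, and that exactly half of the rows are dropped. The variable names are hypothetical, and the figures are those used in Example 1.

```
// Model only: assumes purge triggers when inMemory >= cacheSize and drops exactly 50%
cacheSize = 1200000
n = 10000            // rows appended per batch
inMemory = 0         // modeled number of rows kept in memory
for(s in 0:200){
    inMemory = inMemory + n
    if(inMemory >= cacheSize){
        inMemory = inMemory / 2   // integer division: drop the oldest half
    }
}
print(inMemory)
```

Under these assumptions the count is halved after batch 120 and again after batch 180, so the loop ends with 800,000 rows, which agrees with the sizeInMemory value reported in Example 1.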

Note:
  • It is recommended to call the fflush command to write data in the page cache to disk before you terminate a DolphinDB process (with kill -15) and restart it.

  • If asynWrite is set to true, streaming data is written at the fastest speed, but data loss may occur if the server crashes.

  • If asynWrite is set to false and flushMode to 0, data loss may occur if the operating system crashes.

  • If asynWrite is set to false and flushMode to 1, streaming data is written at the slowest speed, but a server or operating system crash will not cause data loss.

  • A stream table cannot be shared multiple times under different shared table names.
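
As a minimal sketch of the safest combination listed above, the following enables synchronous persistence with synchronous disk flush. The table name stSync is hypothetical, and the script assumes persistenceDir has already been configured.

```
// Hypothetical table name stSync; assumes persistenceDir is configured
colName=["time","x"]
colType=["timestamp","int"]
t = streamTable(100:0, colName, colType);
// asynWrite=false: the append does not return until the data is persisted
// flushMode=1: each batch is flushed to disk before the next batch can be written
enableTableShareAndPersistence(table=t, tableName=`stSync, asynWrite=false, flushMode=1)
go;
insert into stSync values(2019.01.01T00:00:00.001, 1)
getPersistenceMeta(stSync);
```

This trades write speed for durability, so it is best reserved for tables where losing even the most recent batch is unacceptable.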

Examples

Example 1:

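// Define an empty stream table with a TIMESTAMP column and an INT column
colName=["time","x"]
colType=["timestamp","int"]
t = streamTable(100:0, colName, colType);
// Share the table as st with cacheSize set to 1,200,000 rows
enableTableShareAndPersistence(table=t, tableName=`st, cacheSize=1200000)
go;
// Append 200 batches of 10,000 rows each (2,000,000 rows in total)
for(s in 0:200){
    n=10000
    time=2019.01.01T00:00:00.000+s*n+1..n
    x=rand(10, n)   // integer values to match the INT column
    insert into st values(time, x)
}
// Inspect the persistence metadata of the shared table
getPersistenceMeta(st);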

// output
sizeInMemory->800000
asynWrite->true
totalSize->2000000
compress->true
memoryOffset->1200000
retentionMinutes->1440
sizeOnDisk->2000000
persistenceDir->/home/llin/hzy/server1/pst/st
hashValue->0
diskOffset->0

Example 2: Illustrate how to use cachePurgeTimeColumn, cachePurgeInterval, and cacheRetentionTime.

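colName=["time","x"]
colType=["timestamp","int"]
t1 = streamTable(100:0, colName, colType);

// Purge the cache based on the time column: once the new record is more than 7 hours
// later than the oldest record in memory, keep only the last 2 hours of data
enableTableShareAndPersistence(table=t1,tableName=`st1, cachePurgeTimeColumn=`time, cachePurgeInterval=duration("7H"),cacheRetentionTime=duration("2H"))

go;

// Append one record per iteration, with timestamps 1 minute apart
time=2019.01.01T00:00:00.000
for(s in 0:6000){
  time = temporalAdd(time,1,"m");
  x=rand(10, 1)   // integer value to match the INT column
  insert into st1 values(time, x)
}

getPersistenceMeta(st1);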
/* output:
lastLogSeqNum->-1
sizeInMemory->300
totalSize->12000
asynWrite->true
compress->true
raftGroup->-1
memoryOffset->11700
retentionMinutes->1440
sizeOnDisk->11879
persistenceDir->/home/ffliu/jjxu/DolphinDB_Linux64_V3.0/server/persistence/st1
hashValue->0
diskOffset->0
*/

Related commands: disableTablePersistence, clearTablePersistence, enableTablePersistence
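
As a brief sketch of how the related commands fit in, the following cleans up the table created in Example 1. It assumes the shared stream table st still exists and has no remaining subscribers. The comments reflect the general behavior of these commands as understood here; consult each command's own page for details.

```
// Assumes the shared stream table st from Example 1 exists
// Stop persistence of st and delete its persisted files on disk
clearTablePersistence(st)
// Remove the shared table from memory
undef(`st, SHARED)
```

To stop persistence while keeping the files already written to disk, disableTablePersistence(st) can be called in place of clearTablePersistence(st).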