enableTableShareAndCachePurge

Syntax

enableTableShareAndCachePurge(table, tableName, [cacheSize],[cachePurgeTimeColumn],[cachePurgeInterval],[cacheRetentionTime])

Arguments

table is an empty stream table.

tableName is a string indicating the name of the shared table.

cacheSize (optional) is a positive integer used to determine the maximum number of records to retain in memory.

cachePurgeTimeColumn (optional) is a STRING scalar indicating the time column in the stream table.

cachePurgeInterval (optional) is a DURATION scalar indicating the interval to trigger cache purge.

cacheRetentionTime (optional) is a DURATION scalar indicating the retention time of cached data.

Details

Share a non-persisted stream table with cache purge.

Cache purge can be configured using either of the following methods:

  • Cache purge by size: Set cacheSize to specify a threshold for the number of records retained. Older records exceeding the threshold will be removed. The threshold is determined as follows:
    • If the number of records appended in one batchdoes not exceed cacheSize, the threshold is 2.5 * cacheSize.
    • If the number of records appended in one batch exceeds cacheSize, the threshold is 1.2 * (appended records + cacheSize).
  • Cache purge by time: Set cachePurgeTimeColumn, cachePurgeInterval and cacheRetentionTime. The system will clean up data based on the cachePurgeTimeColumn. Each time when a new record arrives, the system obtains the time difference between the new record and the oldest record kept in memory. If the time difference exceeds cachePurgeInterval, the system will retain only the data with timestamps within cacheRetentionTime of the new data.

Note: If a record has not been enqueued for publishing, it will not be removed.

Examples

Example 1. Set cacheSize.

t = streamTable(1000:0, `time`sym`volume, [DATETIME, SYMBOL, INT])
enableTableShareAndCachePurge(table=t, tableName=`st, cacheSize=1000)
time = datetime(2024.01.01T09:00:00) +1..1000*2
sym=take(`a`b`c, 1000)
volume = rand(10,1000)

insert into t values([time, sym, volume])
getStreamTableCacheOffset(t)
// output: 0

time = datetime(2024.01.01T09:35:00) +1..1000*2
sym=take(`a`b`c, 1000)
volume = rand(10,1000)
insert into t values([time, sym, volume])
getStreamTableCacheOffset(t)
// output: 500
Example 2. Set cachePurgeTimeColumn, cachePurgeInterval and cacheRetentionTime.
t = streamTable(1000:0, `time`sym`volume, [DATETIME, SYMBOL, INT])
enableTableShareAndCachePurge(table=t, tableName=`st, cachePurgeTimeColumn=`time,
 cachePurgeInterval=30m, cacheRetentionTime=20m)

time = datetime(2024.01.01T09:00:00) +1..1000*2
sym=take(`a`b`c, 1000)
volume = rand(10,1000)

insert into t values([time, sym, volume])
getStreamTableCacheOffset(t)
// output: 0

time = datetime(2024.01.01T09:35:00) +1..1000*2
sym=take(`a`b`c, 1000)
volume = rand(10,1000)
insert into t values([time, sym, volume])
getStreamTableCacheOffset(t)
// output: 999