enableTableCachePurge
Syntax
enableTableCachePurge(table,
[cacheSize],[cachePurgeTimeColumn],[cachePurgeInterval],[cacheRetentionTime])
Arguments
table is an empty stream table.
tableName is a string indicating the name of the shared table.
cacheSize (optional) is a positive integer used to determine the maximum number of records to retain in memory.
cachePurgeTimeColumn (optional) is a STRING scalar indicating the time column in the stream table.
cachePurgeInterval (optional) is a DURATION scalar indicating the interval to trigger cache purge.
cacheRetentionTime (optional) is a DURATION scalar indicating the retention time of cached data.
Details
Enable cache purge for a non-persisted stream table.
Cache purge can be configured using either of the following methods:
- Cache purge by size: Set cacheSize to specify a threshold for the
number of records retained. Older records exceeding the threshold will be
removed. The threshold is determined as follows:
- If the number of records appended in one batchdoes not exceed cacheSize, the threshold is 2.5 * cacheSize.
- If the number of records appended in one batch exceeds cacheSize, the threshold is 1.2 * (appended records + cacheSize).
- Cache purge by time: Set cachePurgeTimeColumn, cachePurgeInterval and cacheRetentionTime. The system will clean up data based on the cachePurgeTimeColumn. Each time when a new record arrives, the system obtains the time difference between the new record and the oldest record kept in memory. If the time difference exceeds cachePurgeInterval, the system will retain only the data with timestamps within cacheRetentionTime of the new data.
Note: If a record has not been enqueued for publishing, it will not be removed.
Examples
Example 1. Set cacheSize.
t = streamTable(1000:0, `time`sym`volume, [DATETIME, SYMBOL, INT])
enableTableCachePurge(table=t, cacheSize=1000)
time = datetime(2024.01.01T09:00:00) +1..1000*2
sym=take(`a`b`c, 1000)
volume = rand(10,1000)
insert into t values([time, sym, volume])
getStreamTableCacheOffset(t)
// output: 0
time = datetime(2024.01.01T09:35:00) +1..1000*2
sym=take(`a`b`c, 1000)
volume = rand(10,1000)
insert into t values([time, sym, volume])
getStreamTableCacheOffset(t)
// output: 500
t = streamTable(1000:0, `time`sym`volume, [DATETIME, SYMBOL, INT])
enableTableCachePurge(table=t, cachePurgeTimeColumn=`time, cachePurgeInterval=30m, cacheRetentionTime=20m)
time = datetime(2024.01.01T09:00:00) +1..1000*2
sym=take(`a`b`c, 1000)
volume = rand(10,1000)
insert into t values([time, sym, volume])
getStreamTableCacheOffset(t)
// output: 0
time = datetime(2024.01.01T09:35:00) +1..1000*2
sym=take(`a`b`c, 1000)
volume = rand(10,1000)
insert into t values([time, sym, volume])
getStreamTableCacheOffset(t)
// output: 999