dynamicGroupCumsum

Syntax

dynamicGroupCumsum(cumValue, prevCumValue, membership, prevMembership, groupCount)

Arguments

cumValue is a numeric vector that records the cumulative value of the event at the current timestamp.

prevCumValue is a numeric vector holding the cumulative value of the event at the timestamp preceding that of cumValue. Its elements can be NULL (for the first record of each group).

membership is an INT vector holding the tags of the records at the current timestamp. Its elements must be integers in the interval [0, groupCount).

prevMembership is an INT vector holding the tags of the records at the previous timestamp of membership. Its elements can be NULL (for the first record of each group).

groupCount is an integer in the interval [2, 8], indicating the number of tags.
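The argument constraints above can be summarized as a small input check. This is a hypothetical plain-Python sketch, not part of DolphinDB: the name validate_args is illustrative, None stands in for NULL, and two checks are assumptions not stated above (that all four vectors have the same length, and that non-NULL prevMembership tags fall in the same [0, groupCount) range as membership).

```python
from typing import List, Optional, Sequence


def validate_args(cum_value: Sequence[float],
                  prev_cum_value: Sequence[Optional[float]],
                  membership: Sequence[int],
                  prev_membership: Sequence[Optional[int]],
                  group_count: int) -> List[str]:
    """Return a list of constraint violations (empty if the inputs look valid)."""
    problems = []
    # Documented: groupCount is an integer in [2, 8].
    if not 2 <= group_count <= 8:
        problems.append("groupCount must be in [2, 8]")
    # Assumption: the four vectors are aligned row by row, so lengths must match.
    n = len(membership)
    if not (len(cum_value) == len(prev_cum_value) == len(prev_membership) == n):
        problems.append("vectors must have the same length")
    # Documented: membership tags are integers in [0, groupCount).
    if any(m is None or not 0 <= m < group_count for m in membership):
        problems.append("membership must be non-NULL integers in [0, groupCount)")
    # None stands in for NULL (first record of a group).
    # Assumption: non-NULL previous tags share the range of membership.
    if any(p is not None and not 0 <= p < group_count for p in prev_membership):
        problems.append("prevMembership tags must be in [0, groupCount)")
    return problems
```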

Details

In most cases, the attribute and category of an event are fixed. In some scenarios, however, the category of an event changes dynamically. For example, when processing real-time tick data, users may classify an order (attribute) as large or small (category) based on its cumulative volume to analyze capital flow. As new data keeps arriving, the cumulative volume of an order grows, so a small order may become a large one.

The dynamicGroupCumsum function is used in such scenarios to calculate the cumulative sum of an indicator for each category of events.

The calculation rules are as follows:

  • If membership = prevMembership, the values of all groups remain unchanged.

  • If membership ≠ prevMembership, the value of the group corresponding to membership increases by cumValue, and the value of the group corresponding to prevMembership decreases by prevCumValue.

  • If prevMembership is NULL (the first record of each group), the value of the group corresponding to membership increases by cumValue.
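The three rules can be traced with a plain-Python model that processes one partition (for example, all rows of one sym) in order. This is a hypothetical sketch, not DolphinDB's implementation: the name dynamic_group_cumsum is illustrative, None stands in for NULL, and the equal-tag case follows the first rule exactly as stated above.

```python
from typing import List, Optional, Sequence, Tuple


def dynamic_group_cumsum(cum_value: Sequence[float],
                         prev_cum_value: Sequence[Optional[float]],
                         membership: Sequence[int],
                         prev_membership: Sequence[Optional[int]],
                         group_count: int) -> Tuple[List[float], ...]:
    totals = [0] * group_count               # running value of each tag
    out = [[] for _ in range(group_count)]   # one output vector per tag
    for cur, prev, tag, prev_tag in zip(cum_value, prev_cum_value,
                                        membership, prev_membership):
        if prev_tag is None:
            # First record of the group: add cumValue to its tag.
            totals[tag] += cur
        elif tag != prev_tag:
            # Tag changed: add cumValue to the new tag, remove prevCumValue from the old one.
            totals[tag] += cur
            totals[prev_tag] -= prev
        # Tag unchanged: totals stay as they are (first rule as documented).
        for g in range(group_count):
            out[g].append(totals[g])
    return tuple(out)


# Rows of sym st0 from the example below: the order 10001 moves from tag 0 to tag 1.
st0 = dynamic_group_cumsum([2, 4, 8], [None, None, 2], [0, 0, 1], [None, None, 0], 3)
```

For st0 this yields the same three columns as the groupId0, groupId1 and groupId2 values in the historical-data example.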

It returns a tuple of length groupCount. Each element is a vector of the same length as membership that records, row by row, the cumulative sum of the indicator (cumValue) for the corresponding tag.

Note: The tuple index matches the tag; that is, the cumulative sum of tag 0 is returned at index 0 of the tuple.
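To illustrate the layout, the plain-Python sketch below takes the st0 values from the historical-data example later in this section, written as a tuple with one vector per tag. Indexing by tag and then by row picks a single value, and transposing the tuple gives one (groupId0, groupId1, groupId2) triple per row, which is how the tuple lines up with the three output columns. The variable names are illustrative only.

```python
# st0 result from the example, one vector per tag (index = tag).
st0_result = ([2, 6, 4], [0, 0, 8], [0, 0, 0])

# Index by tag first, then by row: value of tag 1 after the third row.
tag1_after_row3 = st0_result[1][2]

# Transpose: one (tag 0, tag 1, tag 2) triple per input row.
rows = list(zip(*st0_result))
```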

Examples

Data preparation:

// Define a function that tags a cumulative volume: 0 for v <= 5, 1 for 5 < v <= 10, 2 for v > 10
def tag_func(v){
    return iif(v <= 5, 0, iif(v <= 10, 1, 2))
}
// original table
time = take(2022.01.01T09:00:00.000 + 1..3, 6)
sym=`st0`st0`st0`st1`st1`st1
orderNo = `10001`10002`10001`10002`10003`10002
volume = 2 4 6 3 2 9
t = table(sym, time, orderNo, volume)

// calculate cumulative sums and tag the results
t1 = select *, cumsum(volume) as sumVolume from t context by sym, orderNo
t2 = lj(t, t1,`sym`time`orderNo)
t3 = select sym, time, orderNo, volume, sumVolume, tag_func(sumVolume) as groupId from t2
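The data preparation can be traced by hand with the following plain-Python sketch, which reproduces the sumVolume and groupId columns of t3. It is an illustration only: the helper add_sum_and_tag is hypothetical, the rows are assumed to be in time order within each (sym, orderNo), as they are in t, and a running sum per key stands in for cumsum with context by followed by lj.

```python
from collections import defaultdict


def tag_func(v):
    # Same thresholds as the DolphinDB tag_func: <= 5 -> 0, (5, 10] -> 1, > 10 -> 2
    return 0 if v <= 5 else (1 if v <= 10 else 2)


# Rows of the original table t: (sym, time, orderNo, volume)
t = [
    ("st0", "2022.01.01T09:00:00.001", "10001", 2),
    ("st0", "2022.01.01T09:00:00.002", "10002", 4),
    ("st0", "2022.01.01T09:00:00.003", "10001", 6),
    ("st1", "2022.01.01T09:00:00.001", "10002", 3),
    ("st1", "2022.01.01T09:00:00.002", "10003", 2),
    ("st1", "2022.01.01T09:00:00.003", "10002", 9),
]


def add_sum_and_tag(rows):
    """Append sumVolume (running volume per sym and orderNo) and groupId to each row."""
    running = defaultdict(int)
    out = []
    for sym, time, order_no, volume in rows:
        running[(sym, order_no)] += volume
        s = running[(sym, order_no)]
        out.append((sym, time, order_no, volume, s, tag_func(s)))
    return out


t3 = add_sum_and_tag(t)
```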

For historical data, you can use SQL statements to calculate the cumulative volume of each group:

t4 = select sym, time, orderNo, prev(groupId) as prevGroupId, groupId, prev(sumVolume) as prevSumVolume, sumVolume from t3 context by sym,orderNo
t5 = lj(t3, t4,`sym`time`orderNo)
re = select sym, time, orderNo, dynamicGroupCumsum(sumVolume, prevSumVolume, groupId, prevGroupId, 3) as `groupId0`groupId1`groupId2 from t5 context by sym
re
sym time orderNo groupId0 groupId1 groupId2
st0 2022.01.01T09:00:00.001 10001 2 0 0
st0 2022.01.01T09:00:00.002 10002 6 0 0
st0 2022.01.01T09:00:00.003 10001 4 8 0
st1 2022.01.01T09:00:00.001 10002 3 0 0
st1 2022.01.01T09:00:00.002 10003 5 0 0
st1 2022.01.01T09:00:00.003 10002 2 0 12
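The two SQL steps can be mirrored in plain Python to check the table above. First, the previous sumVolume and groupId are attached within each (sym, orderNo), which is what t4 and t5 do. Then the documented rules are applied per sym, which is what the final select with context by sym does. The helper names add_prev and group_cumsum_by_sym are hypothetical, None stands in for NULL, and rows are assumed to be in time order.

```python
from typing import Dict, List, Tuple

# t3 reduced to the needed columns: (sym, time, orderNo, sumVolume, groupId)
t3 = [
    ("st0", "2022.01.01T09:00:00.001", "10001", 2, 0),
    ("st0", "2022.01.01T09:00:00.002", "10002", 4, 0),
    ("st0", "2022.01.01T09:00:00.003", "10001", 8, 1),
    ("st1", "2022.01.01T09:00:00.001", "10002", 3, 0),
    ("st1", "2022.01.01T09:00:00.002", "10003", 2, 0),
    ("st1", "2022.01.01T09:00:00.003", "10002", 12, 2),
]


def add_prev(rows):
    """Like t4/t5: attach the previous sumVolume/groupId of the same (sym, orderNo)."""
    last: Dict[Tuple[str, str], Tuple[int, int]] = {}
    out = []
    for sym, time, order_no, cum, tag in rows:
        prev_cum, prev_tag = last.get((sym, order_no), (None, None))  # None = NULL
        last[(sym, order_no)] = (cum, tag)
        out.append((sym, time, order_no, prev_cum, prev_tag, cum, tag))
    return out


def group_cumsum_by_sym(rows, group_count=3):
    """Like the final select ... context by sym: one running total per tag and sym."""
    totals: Dict[str, List[int]] = {}
    out = []
    for sym, time, order_no, prev_cum, prev_tag, cum, tag in rows:
        tot = totals.setdefault(sym, [0] * group_count)
        if prev_tag is None:        # first record of this order
            tot[tag] += cum
        elif tag != prev_tag:       # order moved to another tag
            tot[tag] += cum
            tot[prev_tag] -= prev_cum
        out.append((sym, time, order_no, *tot))
    return out


re_rows = group_cumsum_by_sym(add_prev(t3))
```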

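For real-time data, you can cascade two reactive state engines to calculate the cumulative volume of each group. The first engine (reactive_prev, keyed by sym and orderNo) obtains the previous tag and cumulative volume of each order; the second engine (reactive_csum, keyed by sym) applies dynamicGroupCumsum and writes to the table result: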

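// Output table of the second engine
result = table(1000:0, `sym`time`orderNo`groupId0`groupId1`groupId2, [SYMBOL, TIMESTAMP, SYMBOL, INT, INT, INT])
// Metrics of the first engine: previous tag and previous cumulative volume of each order
factor0 = [<time>, <prev(groupId) as prevGroupId>, <groupId>, <prev(sumVolume) as prevSumVolume>, <sumVolume>]
// Metrics of the second engine: time, orderNo, and the cumulative volume of each tag
factor1 = [<time>, <orderNo>, <dynamicGroupCumsum(sumVolume, prevSumVolume, groupId, prevGroupId, 3)>]
// Input schema of the first engine (same columns as t3)
dm1 = table(1000:0, `sym`time`orderNo`volume`sumVolume`groupId, [SYMBOL, TIMESTAMP, SYMBOL, INT, INT, INT])
// Input schema of the second engine: key columns of the first engine followed by its metrics
dm2 = table(1000:0, `sym`orderNo`time`prevGroupId`groupId`prevSumVolume`sumVolume, [SYMBOL, SYMBOL, TIMESTAMP, INT, INT, INT, INT])
// Second engine: groups by sym and writes to result
res1 = createReactiveStateEngine(name="reactive_csum", metrics=factor1, dummyTable=dm2, outputTable=result, keyColumn=`sym, keepOrder=true)
// First engine: groups by sym and orderNo and feeds res1
res0 = createReactiveStateEngine(name="reactive_prev", metrics=factor0, dummyTable=dm1, outputTable=res1, keyColumn=`sym`orderNo, keepOrder=true)
res0.append!(t3)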

select * from result
sym time orderNo groupId0 groupId1 groupId2
st0 2022.01.01T09:00:00.001 10001 2 0 0
st0 2022.01.01T09:00:00.002 10002 6 0 0
st0 2022.01.01T09:00:00.003 10001 4 8 0
st1 2022.01.01T09:00:00.001 10002 3 0 0
st1 2022.01.01T09:00:00.002 10003 5 0 0
st1 2022.01.01T09:00:00.003 10002 2 0 12
dropStreamEngine("reactive_csum")
dropStreamEngine("reactive_prev")
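The cascade can be mimicked by two plain-Python stages that process one record at a time and keep keyed state, as the two engines do. This is an illustrative model only: the classes PrevStage and CumsumStage are hypothetical, None stands in for NULL, and nothing here describes how createReactiveStateEngine is actually implemented.

```python
class CumsumStage:
    """Like reactive_csum: keyed by sym, keeps one running total per tag."""

    def __init__(self, group_count, output):
        self.group_count = group_count
        self.totals = {}        # sym -> running value per tag
        self.output = output    # list that collects output rows

    def append(self, sym, time, order_no, prev_cum, prev_tag, cum, tag):
        tot = self.totals.setdefault(sym, [0] * self.group_count)
        if prev_tag is None:        # first record of this order
            tot[tag] += cum
        elif tag != prev_tag:       # order moved to another tag
            tot[tag] += cum
            tot[prev_tag] -= prev_cum
        self.output.append((sym, time, order_no, *tot))


class PrevStage:
    """Like reactive_prev: keyed by (sym, orderNo), forwards previous values downstream."""

    def __init__(self, downstream):
        self.last = {}          # (sym, orderNo) -> (sumVolume, groupId)
        self.downstream = downstream

    def append(self, sym, time, order_no, cum, tag):
        prev_cum, prev_tag = self.last.get((sym, order_no), (None, None))
        self.last[(sym, order_no)] = (cum, tag)
        self.downstream.append(sym, time, order_no, prev_cum, prev_tag, cum, tag)


result = []
stage1 = PrevStage(CumsumStage(3, result))
# Records of t3 as (sym, time, orderNo, sumVolume, groupId), fed one at a time.
for record in [
    ("st0", "2022.01.01T09:00:00.001", "10001", 2, 0),
    ("st0", "2022.01.01T09:00:00.002", "10002", 4, 0),
    ("st0", "2022.01.01T09:00:00.003", "10001", 8, 1),
    ("st1", "2022.01.01T09:00:00.001", "10002", 3, 0),
    ("st1", "2022.01.01T09:00:00.002", "10003", 2, 0),
    ("st1", "2022.01.01T09:00:00.003", "10002", 12, 2),
]:
    stage1.append(*record)
```

Because each stage keeps only the latest state per key, the output for a record never depends on later records, which is why the streaming result in this model matches the historical one.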

Related function: dynamicGroupCumcount