mapr

在分布式环境中,使用 mapr 语句可以定义 用户自定义聚合函数(UDAF)<udaf> 的 map reduce 实现。此外,若要在 cgroup by 中使用用户自定义聚合函数,亦需要在 mapr 语句中进行声明。

语法

mapr udaf(args..){mapF1, mapF2, ..., mapFn -> reduceFunc[; cumF1, cumF2, ..., cumFn -> runningReduceFunc]}

参数

udaf(args...) 是用户自定义函数的函数名和参数。

mapF1~mapFn 是 map 函数,该部分计算将被 map 到各个数据节点进行。

reduceFunc 是 reduce 函数,map 的计算结果将被传入 reduceFunc 用于进一步计算。

[] 部分为可选参数,用于自定义聚合函数的 cgroup by 实现。cgroup(cumulative group)即累积分组,常用于按时间维度分组后累积计算每个组的统计值。与 group by 不同的是,将数据分组后,group by 每组的数据单独进行计算,而 cgroup by 每组参与计算的数据包含当前组和其之前所有组的数据,详见 cgroup by。

cumF1~cumFnmapF1~mapFn 一一对应,用于对 map 的结果进行进一步计算。cumF1~cumFn 通常指定为内置的累积函数,如 cumsum;或者指定为内置的 copy 函数,将 map 的结果直接传给 runningReduceFunc,然后在 runningReduceFunc 中定义累积聚合的逻辑。

注:

mapr 语句及相关的函数需要添加到 init 配置项指定的系统初始化脚本(默认 dolphindb.dos)中,重启 server 后才会生效。

例子

定义一个计算调和平均数的自定义聚合函数,调和平均数对应公式如下:

上述公式对 的倒数求平均后再取倒数。基于 mapr,将求倒数平均的部分 map 到各个分区并行计算,最后在 reduce 阶段,用加权平均汇总结果,然后再求一次倒数即可。


// 先声明普通的聚合函数
defg harmonicMean(x){
    return x.reciprocal().avg().reciprocal()
}

// 分布式实现
// 先定义一个 map 函数 reciprocalAvg,用于求倒数的平均
def reciprocalAvg(x) : reciprocal(x).avg()

// 定义一个 reduce 函数 harmonicMeanReduce 用于进行 reduce 部分汇总和倒数计算
defg harmonicMeanReduce(reciprocalAvgs, counts) : wavg(reciprocalAvgs, counts).reciprocal()

// mapr 语句,声明 harmonicMean 的 map-reduce 实现
mapr harmonicMean(x){reciprocalAvg(x), count(x) -> harmonicMeanReduce}

将上述脚本追加到 init 指定的初始化脚本中。重启 server,然后在客户端运行以下脚本:


// 定义一个普通聚合函数,用于和 mapr 实现进行对比
defg harmonicMean_norm(x){
    return x.reciprocal().avg().reciprocal()
}

// 创建分区内存表
n = 100000
t = table((2020.09.01 + rand(9, n)).sort!() as date, take(`IBM`MSFT, n) as sym, rand(10.0, n) as value)
db = database("", RANGE, [2020.09.01, 2020.09.03, 2020.09.10])
stock = db.createPartitionedTable(t, "stock", "date").append!(t)

// 对比查询分区内存表的耗时
timer(1000) select harmonicMean_norm(value) as gval from stock group by sym // Time elapsed: 4932.661 ms
timer(1000) select harmonicMean(value) as gval from stock group by sym // Time elapsed: 2490.347 ms

可以看出 map-reduce 的并行计算提升了查询的性能。

在上例的基础上,对 mapr 语句进行改写,定义调和平均数的 cgroup by 实现。

// 定义一个累积聚合函数 harmonicMeanRunning
defg harmonicMeanRunning(reciprocalAvgs, counts) : cumwavg(reciprocalAvgs, counts).reciprocal()

// 在上述 mapr 语句的基础上,声明 harmonicMean 的 cgroup by 实现
mapr harmonicMean(x){reciprocalAvg(x), count(x) -> harmonicMeanReduce; copy, copy -> harmonicMeanRunning}

将上述脚本继续更新到 init 指定的初始化脚本中,重启 server,然后在客户端运行以下脚本(复用上例创建的分区内存表):

// 在 cgroup by 语句中调用自定义聚合函数 harmonicMean
select harmonicMean(value) as gval from stock cgroup by date order by date

更多例子和说明请参考教程:自定义聚合函数