mr
Syntax
mr(ds, mapFunc, [reduceFunc], [finalFunc], [parallel=true])
Details
mr is a general-purpose DolphinDB function for MapReduce computations. When data is distributed across multiple data sources, you can compute on each data source separately, merge the partial results, and then apply any additional processing you need.
What Is MapReduce
MapReduce has two core phases (for a more detailed introduction, see What is MapReduce):
- map: Executes the same computation logic on each data source separately.
- reduce: Merges multiple map results step by step into a single result.
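The two phases can be modeled in a few lines of plain Python. This is an illustration only, not DolphinDB code, and the helper names `map_phase` and `reduce_phase` are hypothetical: map applies one function to each data source independently, and reduce folds the partial results pairwise into a single value.

```python
from functools import reduce

def map_phase(sources, map_func):
    # Apply the same computation to every data source independently
    return [map_func(src) for src in sources]

def reduce_phase(partials, reduce_func):
    # Merge partial results pairwise, left to right:
    # reduce_func(reduce_func(p1, p2), p3), ...
    return reduce(reduce_func, partials)

# Three hypothetical "data sources", each a plain list of numbers
sources = [[1, 2, 3], [4, 5], [6]]
partials = map_phase(sources, sum)                  # [6, 9, 6]
total = reduce_phase(partials, lambda a, b: a + b)  # 21
print(partials, total)
```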
The mr function also supports a third phase, final, which uses the finalFunc parameter to post-process the merged result and produce the desired output.
Applicable Scenarios
- Data is spread across multiple data sources, such as multiple partitions or multiple tables.
- Each dataset can be computed independently first and then aggregated.
- The computation logic is the same for each individual data source.
- You want to improve processing efficiency through parallel computing.
Parameters
ds: The data sources to process. This parameter must be a tuple in which each element is a data source object (for example, the output of sqlDS). Even if there is only one data source, you must still pass it as a tuple.
mapFunc: The computation function applied to each data source. It takes exactly one parameter: the data object loaded from the current data source. To pass additional arguments to the map function, use partial application to fix them so that the resulting function takes a single parameter. The map function is called once per data source and returns either a regular object (scalar, pair, vector, matrix, table, set, or dictionary) or a tuple of regular objects.
reduceFunc: Optional. A binary function that merges two return values. Both values may be map results, or one of them may be the result of the previous reduce call. The system repeatedly merges the running result with the next map result until all map results have been merged.
finalFunc: Optional. A function that generates the final output. It takes only one parameter: the output of the last reduce function call. If you specify a final function but do not specify a reduce function, the system first combines all map function results into a tuple, and then calls the final function with that tuple as input.
parallel: Optional. Specifies whether to execute the map function in parallel. The default value is true. In general, enabling parallel execution can improve computation speed, but you may want to set this parameter to false in the following cases:
- A single map computation uses a very large amount of memory.
- Running multiple threads concurrently may cause thread-safety issues. For example, errors may occur if multiple threads write to the same file at the same time.
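As a rough Python analogy for the parallel flag (it does not model how DolphinDB schedules map tasks), the map calls can run in a thread pool or one after another. Because each map call is independent, both modes produce the same merged result; the function `run_map` is hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import reduce
import operator

def run_map(sources, map_func, parallel=True):
    if parallel:
        # One map call per source; pool.map returns results in input order
        with ThreadPoolExecutor() as pool:
            return list(pool.map(map_func, sources))
    # Sequential mode: at most one map call runs at a time, so calls never
    # compete for memory or write to a shared resource concurrently
    return [map_func(src) for src in sources]

sources = [range(0, 1000), range(1000, 2000), range(2000, 3000)]
par = reduce(operator.add, run_map(sources, sum, parallel=True))
seq = reduce(operator.add, run_map(sources, sum, parallel=False))
print(par == seq, par)  # True 4498500
```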
Returns
- If you specify finalFunc, mr returns the result of finalFunc.
- If you do not specify finalFunc but do specify reduceFunc, mr returns the final result of reduceFunc.
- If neither is specified, mr returns the results of all mapFunc calls as a tuple.
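The three return cases can be summarized with a small Python model of mr's control flow. This sketch is based only on the behavior described above: `mr_model` is a hypothetical name, it runs the map calls sequentially, and the tuple check mirrors the requirement on ds.

```python
from functools import reduce

def mr_model(ds, map_func, reduce_func=None, final_func=None):
    # ds must be a tuple of data sources, even when there is only one
    if not isinstance(ds, tuple):
        raise TypeError("ds must be a tuple of data sources")
    results = [map_func(src) for src in ds]
    if reduce_func is not None:
        # Fold: reduce_func(reduce_func(r1, r2), r3), ... until one value remains
        merged = reduce(reduce_func, results)
    else:
        # No reduce function: keep all map results together as a tuple
        merged = tuple(results)
    return final_func(merged) if final_func is not None else merged

ds = ([1, 2], [3, 4], [5])
add = lambda a, b: a + b
print(mr_model(ds, sum))                        # (3, 7, 5): map only
print(mr_model(ds, sum, add))                   # 15: map + reduce
print(mr_model(ds, sum, add, lambda t: t / 3))  # 5.0: map + reduce + final
print(mr_model(ds, sum, None, max))             # 7: final receives the tuple
```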
Examples
Example 1: Compute the global mean and variance across multiple partitions.
// Create a catalog and switch to it
createCatalog("demo")
go
use catalog demo
// Create a DFS database and a partitioned table in the current catalog
create database mr_demo partitioned by VALUE(2024.01.01 2024.01.02 2024.01.03), engine='OLAP'
go
create table mr_demo.pt (
date DATE,
id INT,
value DOUBLE
)
partitioned by date
// Insert test data
data = table(
2024.01.01 2024.01.01 2024.01.01 2024.01.02 2024.01.02 2024.01.03 2024.01.03 as date,
1 2 3 4 5 6 7 as id,
10.0 20.0 30.0 40.0 50.0 60.0 70.0 as value
)
demo.mr_demo.pt.append!(data)
// Use sqlDS to convert the query into a tuple of data sources (one per partition)
ds = sqlDS(<select * from mr_demo.pt>)
// map: Computes local statistics for each partition
def statsMap(table){
x = table.value // Get the values from the value column of the table
return [x.size(), sum(x), sum(pow(x, 2.0))] // Compute the row count, the sum of the values, and the sum of squares of the values
}
// reduce: Adds two local statistics item by item
def statsReduce(x, y){
return [x[0] + y[0], x[1] + y[1], x[2] + y[2]]
}
// final: Computes the global mean and variance from the aggregated statistics
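def statsFinal(result){
n = result[0]
totalSum = result[1]
totalSquares = result[2]
meanValue = totalSum / n
// Population variance: E[x^2] - (E[x])^2, i.e. the sum of squared deviations divided by n
varValue = totalSquares / n - meanValue * meanValue
return table(meanValue as mean, varValue as variance)
}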
// Call mr
result = mr(ds, statsMap, statsReduce, statsFinal)
// View the result
result
The output is as follows:
| mean | variance |
|---|---|
| 40 | 400 |
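The numbers can be reproduced by hand. The Python sketch below assumes that sqlDS yields one data source per date partition (three here); the final result does not depend on how the rows are grouped, only the intermediate partial statistics do.

```python
# Values per date partition, copied from the test data inserted above
partitions = {
    "2024.01.01": [10.0, 20.0, 30.0],
    "2024.01.02": [40.0, 50.0],
    "2024.01.03": [60.0, 70.0],
}

def stats_map(values):
    # Local statistics: row count, sum, sum of squares
    return [len(values), sum(values), sum(v * v for v in values)]

def stats_reduce(x, y):
    # Element-wise addition of two partial statistics
    return [a + b for a, b in zip(x, y)]

partials = [stats_map(v) for v in partitions.values()]
# [[3, 60.0, 1400.0], [2, 90.0, 4100.0], [2, 130.0, 8500.0]]
total = partials[0]
for p in partials[1:]:
    total = stats_reduce(total, p)

count, total_sum, total_sq = total                # 7, 280.0, 14000.0
mean = total_sum / count                          # 40.0
variance = total_sq / count - mean * mean         # 400.0 (population variance)
sample_variance = variance * count / (count - 1)  # about 466.67
print(mean, variance)
```

Because statsFinal divides by the row count, the result is the population variance; dividing by count - 1 instead would give the sample variance (about 466.67 for this data).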
Example 2: Compute least-squares linear regression across multiple data sources.
Linear regression predicts a dependent variable from one or more independent variables. Let the dependent variable be y and the independent-variable matrix be X. Ordinary least squares finds the vector of regression coefficients beta that minimizes the sum of squared residuals, which is given by:
beta = (X^T X)^(-1) X^T y
Therefore, the key to completing the regression is to obtain X^T X and X^T y for the entire dataset. If the rows of X and y are split across k data sources as X1, ..., Xk and y1, ..., yk, then X^T X = X1^T X1 + ... + Xk^T Xk and X^T y = X1^T y1 + ... + Xk^T yk. You can therefore compute the local Xi^T Xi and Xi^T yi on each data source and then add them up to obtain the global result. Sample code:
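// map: computes the local X^T X and X^T y for one data source
def myOLSMap(table, yColName, xColNames, intercept){
// Build the local independent-variable matrix Xi; prepend a column of 1s for the intercept term
if(intercept)
x = matrix(take(1.0, table.rows()), table[xColNames])
else
x = matrix(table[xColNames])
xt = x.transpose()
// Return the tuple (Xi^T Xi, Xi^T yi)
return xt.dot(x), xt.dot(table[yColName])
}
// final: computes the coefficients from the aggregated X^T X and X^T y
def myOLSFinal(result){
xtx = result[0]
xty = result[1]
return xtx.inv().dot(xty)[0]
}
// Fix every map argument except the data object; + adds the map tuples element by element
def myOLSEx(ds, yColName, xColNames, intercept){
return mr(ds, myOLSMap{, yColName, xColNames, intercept}, +, myOLSFinal)
}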
- map: myOLSMap performs the local computation for a single data source. Its inputs are:
  - table: The data table corresponding to the current data source.
  - yColName: The name of the dependent-variable column.
  - xColNames: The names of the independent-variable columns.
  - intercept: Whether to include an intercept term.
  The function first constructs the local independent-variable matrix Xi, prepending a column of 1s when intercept is true, and then returns the local statistics for the current data source, Xi^T Xi and Xi^T yi, as a tuple.
- reduce: + is used as the reduce function to add the results from individual data sources element by element.
  - For example, if two partitions return (X1^T X1, X1^T y1) and (X2^T X2, X2^T y2), the reduce step produces (X1^T X1 + X2^T X2, X1^T y1 + X2^T y2).
  - This sum is then merged with the next partition's result, and so on, until the global (X^T X, X^T y) is obtained.
- final: myOLSFinal takes the aggregated result from the reduce phase and computes the regression coefficients with the least-squares formula.
  - xtx: The aggregated X^T X.
  - xty: The aggregated X^T y.
  - xtx.inv(): The inverse of the matrix xtx.
  - xtx.inv().dot(xty): Matrix multiplication, corresponding to the formula (X^T X)^(-1) X^T y. The trailing [0] takes the first column of the product so that the coefficients are returned as a vector.
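- myOLSEx wraps the mr function and runs the computation through the map → reduce → final workflow. The braces in myOLSMap{, yColName, xColNames, intercept} are partial application: they fix every argument except the first, so the resulting function takes only the data object, as mapFunc requires.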
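The claim that per-source statistics add up to the global ones can be checked numerically. The pure-Python sketch below uses hypothetical data split across two sources and hypothetical helpers (ols_map, ols_reduce, ols_final); functools.partial plays the role of DolphinDB's braces. Unlike myOLSFinal, it solves (X^T X) beta = X^T y by Gauss-Jordan elimination instead of forming the inverse explicitly, which yields the same coefficients for a well-conditioned system.

```python
from functools import partial, reduce

def ols_map(rows, intercept):
    # Local Xi^T Xi and Xi^T yi for one source; rows are (x_values, y) pairs
    xs = [([1.0] if intercept else []) + list(x) for x, _ in rows]
    ys = [y for _, y in rows]
    k = len(xs[0])
    xtx = [[sum(r[i] * r[j] for r in xs) for j in range(k)] for i in range(k)]
    xty = [sum(r[i] * y for r, y in zip(xs, ys)) for i in range(k)]
    return xtx, xty

def ols_reduce(a, b):
    # Element-wise sum of two (XtX, Xty) pairs, mirroring + on the map tuples
    (xtx1, xty1), (xtx2, xty2) = a, b
    xtx = [[p + q for p, q in zip(r1, r2)] for r1, r2 in zip(xtx1, xtx2)]
    xty = [p + q for p, q in zip(xty1, xty2)]
    return xtx, xty

def ols_final(merged):
    # Solve (X^T X) beta = X^T y by Gauss-Jordan elimination with partial pivoting
    xtx, xty = merged
    n = len(xty)
    m = [row[:] + [xty[i]] for i, row in enumerate(xtx)]
    for col in range(n):
        piv = max(range(col, n), key=lambda r: abs(m[r][col]))
        m[col], m[piv] = m[piv], m[col]
        for r in range(n):
            if r != col:
                f = m[r][col] / m[col][col]
                m[r] = [v - f * w for v, w in zip(m[r], m[col])]
    return [m[i][n] / m[i][i] for i in range(n)]

# Hypothetical data: one independent variable, rows split across two sources
source_1 = [((1.0,), 3.1), ((2.0,), 4.9), ((3.0,), 7.2)]
source_2 = [((4.0,), 8.8), ((5.0,), 11.1), ((6.0,), 13.0)]

# Partial application fixes intercept, so the map function takes one argument
map_func = partial(ols_map, intercept=True)
local = [map_func(src) for src in (source_1, source_2)]
beta_distributed = ols_final(reduce(ols_reduce, local))

# The same regression computed on all rows at once, for comparison
beta_global = ols_final(map_func(source_1 + source_2))
print(beta_distributed, beta_global)  # both about [1.0467, 1.9914]
```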
