Distributed Computing

This section describes parallel function calls, remote function calls, parallel remote calls, data sources, map-reduce, iterative computing, and the pipeline function.

Parallel Function Call

DolphinDB can divide a large task into multiple subtasks for simultaneous execution.

Parallel function calls usually utilize one of two higher-order functions: peach or ploop. peach and ploop are the parallel computing versions of each and loop, respectively. For the difference between each and loop, please refer to the section about loop.

There are 3 scenarios of parallel function calls.

(1) same function but different parameters

peach(log, (1..3, 4..6));
#0 #1
0 1.386294
0.693147 1.609438
1.098612 1.791759
ploop(log, (1..3, 4..6));
([0,0.693147,1.098612],[1.386294,1.609438,1.791759])

(2) different functions but same parameters

peach(call{, 3 4 5}, (log, sum));
log sum
1.098612 12
1.386294 12
1.609438 12
ploop(call{, 3 4 5}, (log, sum));
([1.098612,1.386294,1.609438],12)

Note that in the examples above, we cannot write peach((log, sum), 3 4 5). This is because the first parameter of a template must be a function, not an array of functions. To call multiple functions with peach or ploop, we need to use the template call.

(3) different functions and different parameters

x=[log, exp];
y=[1 2 3, 4 5 6];
peach(call, x, y);
#0 #1
0 54.59815
0.693147 148.413159
1.098612 403.428793
ploop(call, x, y);
([0,0.693147,1.098612],[54.59815,148.413159,403.428793])

How Parallel Computing Works in DolphinDB

DolphinDB supports parallel computing through multi-threading. Suppose a task is divided into n sub tasks and there are m local executors (for local executors, please refer to the section about distributed computing concepts). The calling thread (worker) generates the n sub tasks, pushes n*m/(1+m) of them to the task queues of the local executors, and executes the remaining n/(1+m) sub tasks itself. After all n sub tasks are finished, the calling thread combines the individual results to produce the final result. For example, with n=8 sub tasks and m=3 local executors, the worker pushes 8*3/(1+3)=6 sub tasks to the local executors and executes the remaining 8/(1+3)=2 itself.

To use parallel function calls, we need to make sure the number of local executors is set to a positive integer in the configuration file.

The system throws an exception if a parallel function call is initiated within a sub task, as it may cause a deadlock. If parallel function calls were allowed within a sub task, the new sub tasks would be assigned to local executors, but all of the local executors may already be occupied by the first round of sub tasks (when n > 1 + m). Since a local executor can only process one task at a time, the sub tasks could end up waiting for one another and never be executed.

Some built-in functions, such as peach, ploop, pnodeRun, ploadText and loadText, enable parallel function calls when the number of local executors is set to a positive integer in the configuration file.

Remote Function Call

In DolphinDB, we can send a local function that calls other local functions (all of which can be either built-in functions or user-defined functions) to run on a remote node on the fly without compilation or deployment. The system automatically serializes the function definition and the definitions of all dependent functions together with necessary local data to remote nodes.

We must open a connection before a remote call. To open a connection, run conn=xdb(host, port), where host is the host name (IP address or domain name) of the remote node and port is its port number.

There are 3 ways to close a connection:

(1) call the close command.

(2) assign NULL to the connection variable: conn = NULL.

(3) the connection will be closed automatically when the current session closes.
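For example (a minimal sketch, assuming a remote node listening on localhost:8081):

conn = xdb("localhost", 8081)   // open a connection to the remote node
// ... remote calls with conn ...
close(conn)                     // close it explicitly; alternatively, conn = NULL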

You can use remoteRun, remoteRunWithCompression and rpc for remote calls. Their differences are:

  • rpc utilizes existing asynchronous connections among data nodes in the cluster; remoteRun uses connections explicitly created with the xdb function.

  • The calling node and the remote node of the rpc function must be located in the same cluster; there is no such limitation for remoteRun.

There are 3 ways to make a remote call:

(1) Execute script on a remote node.

Syntax: remoteRun(conn, script) or conn(script) where script must be double quoted (a string).

conn =  xdb("localhost",81);
remoteRun(conn, "x=rand(1.0,10000000); y=x pow 2; avg y");
// output
0.333254

(2) Execute a remote function on a remote node. The function is defined on the remote node, while the parameters are located on the local node.

Syntax: remoteRun(conn, "functionName", param1, param2, ...) or conn("function name", param1, param2, ...)

functionName must be quoted. The function could be either a built-in or user-defined function.

conn =  xdb("localhost",81);
remoteRun(conn, "avg", rand(1.0,10000000) pow 2);
// output
0.333446

(3) Execute a local function on a remote node. The function is defined on the local node. It can be a built-in or user-defined function, named or anonymous. The parameters of the function are also located on the local node.

This is the most powerful feature of remote calls in DolphinDB. We can send a local function that calls other local functions (all of which can be either built-in or user-defined functions) to run on a remote node on the fly, without compilation or deployment. The system automatically serializes the function definition and the definitions of all dependent functions, together with necessary local data, to the remote node. Some other systems can only remotely call functions without any user-defined dependent functions.

Syntax of remoteRun: remoteRun(conn, functionName, param1, param2, ...) or conn(functionName, param1, param2, ...)

functionName must not be quoted. param1, param2, ... are function arguments. The function could be either a built-in function or a user-defined function on the calling node.

Syntax of rpc: rpc(nodeAlias, function, param1, param2, ...)

function must not be quoted. param1, param2, ... are function arguments. The function can be either a built-in function or a user-defined function on the calling node. Both the calling node and the remote node must be located in the same cluster. Otherwise, we need to use the remoteRun function.

  • Example 1: remote call a user-defined function with a local dataset

Assume at the local node we have a table EarningsDates with 2 columns: stock ticker and date. For each of the 3 stocks in the table, we have the date when it announced earnings for the 3rd quarter of 2006. There is a table USPrices at a remote node with machine name "localhost" and port number 8081. It contains daily stock prices for all US stocks. We would like to get the stock prices from the remote node for all stocks in EarningsDates for the week after they announced earnings.

At the remote node, we import the data file to create the table USPrices, and then share it across all nodes as sharedUSPrices.

USPrices = loadText("c:/DolphinDB/Data/USPrices.csv");
share USPrices as sharedUSPrices;

When we create a connection to a remote node, the remote node creates a new session for this connection. This new session is completely isolated from other sessions on the remote node. This is convenient for development, as developers don't have to worry about name conflicts. In this case, however, we do want to share data among multiple sessions on the same node. We can use the share statement to share objects. Currently only tables can be shared in DolphinDB.

We create a table EarningsDates at the local node, and send the table with the script over to a remote node. After the execution, the result is sent back to the local node.

EarningsDates=table(`XOM`AAPL`IBM as TICKER, 2006.10.26 2006.10.19 2006.10.17 as date)

def loadDailyPrice(data){
    dateDict = dict(data.TICKER, data.date)
    return select date, TICKER, PRC from objByName("sharedUSPrices") where dateDict[TICKER]<date<=dateDict[TICKER]+7
}
conn = xdb("localhost",8081)
prices = conn(loadDailyPrice, EarningsDates);

prices;
date TICKER PRC
2006.10.27 XOM 71.46
2006.10.30 XOM 70.84
2006.10.31 XOM 71.42
2006.11.01 XOM 71.06
2006.11.02 XOM 71.19
2006.10.18 IBM 89.82
2006.10.19 IBM 89.86
2006.10.20 IBM 90.48
2006.10.23 IBM 91.56
2006.10.24 IBM 91.49
2006.10.20 AAPL 79.95
2006.10.23 AAPL 81.46
2006.10.24 AAPL 81.05
2006.10.25 AAPL 81.68
2006.10.26 AAPL 82.19
  • Example 2: remote call a built-in function that quotes a user-defined function

def jobDemo(n){
    s = 0
    for (x in 1 : n) {
        s += sum(sin rand(1.0, 100000000)-0.5)
        print("iteration " + x + " " + s)
    }
    return s
};

Remote call with function remoteRun:

conn = xdb("DFS_NODE2")
conn.remoteRun(submitJob, "jobDemo", "job demo", jobDemo, 10);
// output
jobDemo4

conn.remoteRun(getJobReturn, "jobDemo")
// output
4238.832005

Remote call with function rpc:
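A sketch of the equivalent calls with rpc (assuming the calling node and DFS_NODE2 are located in the same cluster):

rpc("DFS_NODE2", submitJob, "jobDemo", "job demo", jobDemo, 10);
rpc("DFS_NODE2", getJobReturn, "jobDemo");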

  • Please avoid circular remote calls with remoteRun, as they may cause a deadlock. For example, if we run the following script on localhost:8080:

def testRemoteCall() {
    h=xdb("localhost", 8080)
    return h.remoteRun("1+2")
}
h = xdb("localhost", 8081)
h.remoteRun(testRemoteCall)

Node 8080 sends the locally defined function testRemoteCall to node 8081, which then sends the script "1+2" back to be executed on node 8080. When a node receives a job, it assigns a worker thread to execute it. The remote call from 8080 to 8081 and the script "1+2" are both executed on node 8080 and may be assigned to the same worker. If these 2 jobs share the same worker, a deadlock occurs.

Parallel Remote Call

A remote call is in blocking mode, i.e., it does not return until the remote node completes the function call. To make parallel remote calls, remoteRun needs to be used together with ploop or peach. In the following example, the template each executes the user-defined function experimentPi on node 8081 and then on node 8082, while the template peach executes experimentPi on nodes 8081 and 8082 simultaneously. We can see that peach saves a significant amount of time compared with each.

// estimate pi with a Monte Carlo simulation of n random points
def simuPi(n){
    x=rand(1.0, n)
    y=rand(1.0, n)
    return 4.0 * sum(x*x + y*y<=1.0) / n
}
def experimentPi(repeat, n): avg each(simuPi, take(n, repeat));

// create 2 connections
conns = each(xdb, "localhost", 8081 8082);
conns;
("Conn[localhost:8081:1166953221]","Conn[localhost:8082:1166953221]")

timer result = each(remoteRun{, experimentPi, 10, 10000000}, conns);
// output
Time elapsed: 6579.82 ms

timer result = peach(remoteRun{, experimentPi, 10, 10000000}, conns);
// output
Time elapsed: 4122.29 ms
// parallel computing saves running time

print avg(result)
// output
3.141691

// close two connections
each(close, conns);

To use remoteRun for parallel remote calls, we need to establish a connection to each remote node with the xdb function. To remotely call nodes within the same cluster as the calling node, we can use pnodeRun.

Syntax: pnodeRun(function, [nodes], [addNodeToResult])

function: the local function to call. It must not be quoted and must take no parameters. It can be a function defined without parameters, or a partial application that wraps the original function and its arguments into a function with no parameters. It can be a built-in function or a user-defined function.

nodes: aliases of nodes. It is an optional parameter. If it is not specified, the system will call the function on all live data nodes in the cluster.

addNodeToResult: whether to add aliases of nodes to results. It is an optional parameter. The default value is true. If the returned result from each node already contains the node alias, we can set it to false.

pnodeRun calls a local function on multiple remote nodes in parallel and then merges the results. Both the calling node and the remote nodes must be located in the same cluster.

In the following simple example, we wrap the function sum and the argument 1..10 into a partial application sum{1..10}.

pnodeRun(sum{1..10}, `DFS_NODE2`DFS_NODE3);
// output
Node          Value
DFS_NODE2        55
DFS_NODE3        55

pnodeRun is a very convenient tool for cluster management. For example, in a cluster with 4 nodes ("DFS_NODE1", "DFS_NODE2", "DFS_NODE3", and "DFS_NODE4"), run the following script on each of the nodes:

def jobDemo(n){
    s = 0
    for (x in 1 : n) {
        s += sum(sin rand(1.0, 100000000)-0.5)
        print("iteration " + x + " " + s)
    }
    return s
};

submitJob("jobDemo1","job demo", jobDemo, 10);
submitJob("jobDemo2","job demo", jobDemo, 10);
submitJob("jobDemo3","job demo", jobDemo, 10);

To check the status of the 2 most recently completed batch jobs on each of the 4 nodes in the cluster:

pnodeRun(getRecentJobs{2});
Node UserID JobID JobDesc ReceivedTime StartTime EndTime ErrorMsg
DFS_NODE4 root jobDemo2 job demo 2017.11.21T15:40:22.026 2017.11.21T15:40:22.027 2017.11.21T15:40:43.103
DFS_NODE4 root jobDemo3 job demo 2017.11.21T15:40:22.027 2017.11.21T15:40:22.037 2017.11.21T15:40:43.115
DFS_NODE1 root jobDemo2 job demo 2017.11.21T15:39:48.087 2017.11.21T15:39:48.088 2017.11.21T15:40:03.714
DFS_NODE1 root jobDemo3 job demo 2017.11.21T15:39:48.088 2017.11.21T15:39:48.089 2017.11.21T15:40:03.767
DFS_NODE2 root jobDemo2 job demo 2017.11.21T15:39:58.788 2017.11.21T15:39:58.788 2017.11.21T15:40:14.114
DFS_NODE2 root jobDemo3 job demo 2017.11.21T15:39:58.788 2017.11.21T15:39:58.791 2017.11.21T15:40:14.178
DFS_NODE3 root jobDemo2 job demo 2017.11.21T15:40:16.945 2017.11.21T15:40:16.945 2017.11.21T15:40:38.466
DFS_NODE3 root jobDemo3 job demo 2017.11.21T15:40:16.945 2017.11.21T15:40:16.947 2017.11.21T15:40:38.789

pnodeRun(getRecentJobs{2}, `DFS_NODE3`DFS_NODE4);
Node UserID JobID JobDesc ReceivedTime StartTime EndTime ErrorMsg
DFS_NODE3 root jobDemo2 job demo 2017.11.21T15:40:16.945 2017.11.21T15:40:16.945 2017.11.21T15:40:38.466
DFS_NODE3 root jobDemo3 job demo 2017.11.21T15:40:16.945 2017.11.21T15:40:16.947 2017.11.21T15:40:38.789
DFS_NODE4 root jobDemo2 job demo 2017.11.21T15:40:22.026 2017.11.21T15:40:22.027 2017.11.21T15:40:43.103
DFS_NODE4 root jobDemo3 job demo 2017.11.21T15:40:22.027 2017.11.21T15:40:22.037 2017.11.21T15:40:43.115

pnodeRun follows these rules to merge the results from multiple nodes:

(1) If the function returns a scalar:

Return a table with 2 columns: node alias and function results.

Continuing with the example above:

pnodeRun(getJobReturn{`jobDemo1})
// output
Node          Value
DFS_NODE3        2,123.5508
DFS_NODE2        (42,883.5404)
DFS_NODE1        3,337.4107
DFS_NODE4        (2,267.3681)

(2) If the function returns a vector:

Return a matrix. Each column of the matrix is the function result from one node, and the column labels are the node aliases (see the sketch after this list).

(3) If the function returns a key-value dictionary:

Return a table with each row representing the function return from one node.

(4) If the function returns a table:

Return a table which is the union of individual tables from multiple nodes.

Please see the aforementioned example of pnodeRun(getRecentJobs{2}).

(5) If the function is a command (a command returns nothing):

Return nothing

(6) For all other cases:

Return a dictionary. The key would be node alias and the value would be the function return.
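As a hypothetical illustration of rule (2), assuming two data nodes with the aliases DFS_NODE1 and DFS_NODE2, and a function that returns a numeric vector on each node:

// each node returns a 3-element random vector; the merged result is a 3x2 matrix
// whose column labels are the node aliases
pnodeRun(rand{1.0, 3}, `DFS_NODE1`DFS_NODE2);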

Data Source

A data source is a special type of data object that contains the following information about a data entity:

  1. Meta descriptions of a data entity. By executing a data source, we can obtain a materialized data entity such as a table, matrix, or vector. In DolphinDB's distributed computing framework, lightweight data source objects, instead of large data entities, are passed to remote sites for computing, which dramatically reduces network traffic.

  2. The locations of the execution venues. A data source can have zero, one, or multiple locations. A data source with zero locations is a local data source. When there are multiple locations, they back each other up. The system randomly picks a location for distributed computing. When the data source is instructed to cache the materialized data object, the system picks the location where data was successfully retrieved last time.

  3. An attribute to instruct the system to cache the data or clear the cache. For iterative computing algorithms (e.g. machine learning algorithms), data caching could significantly boost computing performance. Cached data will be cleared when the system runs out of memory. If this happens, the system can recover the data since the data source contains all meta descriptions and data transforming functions.

  4. A data source object may also contain multiple data transforming functions to further process the retrieved data. These data transforming functions are executed sequentially, with the output of one function serving as the (only) input of the next. It is generally more efficient to include data transforming functions in the data source rather than in the core computing operation. While there is no performance difference if the retrieved data is needed only once, it makes a huge difference for iterative computing on data sources with cached data objects: if the transforming operations are in the core computing unit, each iteration needs to execute the transformation; if they are in the data source, they are executed only once.
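A minimal sketch of creating data sources, assuming the DFS table dfs://stockDB/stockData used later in this section; sqlDS generates data sources from a SQL metacode object, and transDS! (assumed here) attaches a transforming function to each data source:

// one data source is generated per partition of the table
ds = sqlDS(<select * from loadTable("dfs://stockDB", "stockData")>)
// attach a transforming function that is executed when a data source is materialized
// (the filter column Volume is hypothetical)
def filterPositive(t){
    return select * from t where Volume > 0
}
transDS!(ds, filterPositive)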

Map-Reduce

The Map-Reduce function is the core function of DolphinDB's generic distributed computing framework.

Syntax: mr(ds, mapFunc, [reduceFunc], [finalFunc], [parallel=true])

  • ds: the list of data sources. This required parameter must be a tuple and each element of the tuple is a data source object. Even if there is only one data source, we still need a tuple to wrap the data source.

  • mapFunc: the map function. It accepts one and only one argument, which is the materialized data entity from the corresponding data source. If we would like the map function to accept more parameters in addition to the materialized data source, we can use a partial application to convert a multiple-parameter function into a unary function. The number of map function calls equals the number of data sources. The map function returns a regular object (scalar, pair, array, matrix, table, set, or dictionary) or a tuple (containing multiple regular objects).

  • reduceFunc: the binary reduce function that combines two map function call results. The reduce function in most cases is trivial. An example is the addition function. The reduce function is optional. If the reduce function is not specified, the system simply returns all individual map call results to the final function.

  • finalFunc: the final function accepts one and only one parameter. The output of the last reduce function call is the input of the final function. The final function is optional. If it is not specified, the system returns the individual map function call results.

  • parallel: an optional boolean flag indicating whether to execute the map function in parallel locally. The default value is true, i.e. enabling parallel computing. When there is very limited available memory and each map call needs a large amount of memory, we can disable parallel computing to prevent the out-of-memory problem. We may also want to disable the parallel option to ensure thread safety. For example, if multiple threads write to the same file simultaneously, errors may occur.
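As a minimal sketch before the regression example below (assuming the DFS table dfs://stockDB/stockData and a hypothetical numeric column Volume), the map function sums one data source and the + function reduces the partial sums:

ds = sqlDS(<select Volume from loadTable("dfs://stockDB", "stockData")>)   // one data source per partition
sumVol = def(t): sum(t.Volume)                                             // map: sum within one data source
totalVolume = mr(ds, sumVol, +)                                            // reduce: add up the partial sums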

The following is an example of distributed linear regression. X is the matrix of independent variables and y is the dependent variable. X and y are stored in multiple data sources. To estimate the least squares parameters, we need to calculate XᵀX and Xᵀy. We can calculate the tuple of (XᵀX, Xᵀy) from each data source, then add up the results from all data sources to get XᵀX and Xᵀy for the entire dataset.

def myOLSMap(table, yColName, xColNames, intercept){
    if(intercept)
        x = matrix(take(1.0, table.rows()), table[xColNames])
    else
        x = matrix(table[xColNames])
    xt = x.transpose()
    return xt.dot(x), xt.dot(table[yColName])
}

def myOLSFinal(result){
    xtx = result[0]
    xty = result[1]
    return xtx.inv().dot(xty)[0]
}

def myOLSEx(ds, yColName, xColNames, intercept){
    return mr(ds, myOLSMap{, yColName, xColNames, intercept}, +, myOLSFinal)
}
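A hypothetical usage sketch, assuming a DFS database dfs://regressionDB with a table sample that contains the columns y, x1 and x2:

ds = sqlDS(<select y, x1, x2 from loadTable("dfs://regressionDB", "sample")>)  // data sources, one per partition
beta = myOLSEx(ds, `y, `x1`x2, true)                                           // least squares coefficients with an intercept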

In the example above, we define the map function and the final function. In practice, we may define transformation functions for data sources as well. These functions are defined on the local instance only; they don't exist on the remote instances. Users don't need to compile them or deploy them to the remote instances before using them. DolphinDB's distributed computing framework handles these complicated issues for end users on the fly. It is extremely easy to develop distributed analytical functions and applications in DolphinDB.

As a frequently used analytics tool, distributed least squares linear regression is already implemented in the DolphinDB core library. The built-in version (olsEx) provides more features.

Iterative Computing

Iterative computing is a commonly used computing methodology. Many machine learning methods and statistical models use iterative algorithms to estimate model parameters.

DolphinDB offers function imr for iterative computing based on the map-reduce methodology. Each iteration uses the result from the previous iteration and the input dataset. The input dataset for each iteration is unchanged so that it can be cached. Iterative computing requires initial values for the model parameters and a termination criterion.

Syntax: imr(ds, initValue, mapFunc, [reduceFunc], [finalFunc], terminateFunc, [carryover=false])

  • ds: the list of data sources. It must be a tuple with each element as a data source object. Even if there is only one data source, we still need a tuple to wrap the data source. In iterative computing, data sources are automatically cached and the cache will be cleared after the last iteration.

  • initValue: the initial values of model parameter estimates. The format of the initial values must be the same as the output of the final function.

  • mapFunc: the map function. It has two arguments. The first argument is the data entity represented by the corresponding data source. The second argument is the output of the final function in the previous iteration, which is an updated estimate of the model parameter. For the first iteration, it is the initial values given by the user.

  • reduceFunc: the binary reduce function combines two map function call results. If there are M map calls, the reduce function will be called M-1 times. The reduce function in most cases is trivial. An example is the addition function. The reduce function is optional.

  • finalFunc: the final function in each iteration. It accepts two arguments. The first argument is the output of the final function in the previous iteration. For the first iteration, it is the initial values given by the user. The second argument is the output of the reduce function call. If the reduce function is not specified, a tuple representing the collection of individual map call results would be the second argument.

  • terminateFunc: either a function that determines whether the computation should continue, or a specified number of iterations. The termination function accepts two parameters: the first is the output of the reduce function in the previous iteration and the second is the output of the reduce function in the current iteration. If the function returns true, the iterations end.

  • carryover: a Boolean value indicating whether a map function call produces a carryover object to be passed to the next iteration of the map function call. The default value is false. If it is set to true, the map function has 3 arguments and the last argument is the carryover object, and the map function output is a tuple whose last element is the carryover object. In the first iteration, the carryover object is the NULL object.

Now let's use the example of distributed median calculation to illustrate the function imr. Assume the data is scattered across multiple nodes and we would like to calculate the median of a variable over all the nodes. First, for each data source, put the data into buckets and use the map function to count the number of data points in each bucket. Then use the reduce function to add up the bucket counts from multiple data sources and locate the bucket that contains the median. In the next iteration, the chosen bucket is divided into smaller buckets. The iterations finish when the size of the chosen bucket is no larger than the specified precision.

def medMap(data, range, colName){
    return bucketCount(data[colName], double(range), 1024, true)
}

def medFinal(range, result){
    x= result.cumsum()
    index = x.asof(x[1025]/2.0)
    ranges = range[1] - range[0]
    if(index == -1)
        return (range[0] - ranges*32):range[1]
    else if(index == 1024)
        return range[0]:(range[1] + ranges*32)
    else{
        interval = ranges / 1024.0
        startValue = range[0] + (index - 1) * interval
        return startValue : (startValue + interval)
    }
}

def medEx(ds, colName, range, precision){
    termFunc = def(prev, cur): cur[1] - cur[0] <= precision
    return imr(ds, range, medMap{,,colName}, +, medFinal, termFunc).avg()
}
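A hypothetical usage sketch of medEx, assuming a numeric column price in the DFS table dfs://stockDB/stockData and an initial search range of 0.0 to 1000.0:

ds = sqlDS(<select price from loadTable("dfs://stockDB", "stockData")>)  // data sources over the price column
m = medEx(ds, `price, 0.0 : 1000.0, 0.001)                               // estimate the median to a precision of 0.001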

The pipeline function

The pipeline function uses multithreading to optimize tasks that meet the following conditions:

(1) The task can be decomposed into multiple subtasks.

(2) Each subtask contains multiple steps.

(3) The k-th step of the i-th subtask can only be executed after the (k-1)-th step of the i-th subtask and the k-th step of the (i-1)-th subtask are completed.

In the following example, we need to convert the partitioned table stockData into a csv file. This table contains data from 2000 to 2018 and exceeds the available memory of the system, so we cannot load the entire table into memory and then convert it into a csv file. The task can be divided into multiple subtasks, each of which consists of two steps: load one month of data into memory, then append the data to the csv file. Before a month of data is written to the csv file, that month must have been loaded into memory and the previous month must have already been written to the csv file.

v = 2000.01M..2018.12M
def queryData(m){
    // load one month of data from the partitioned table into memory
    return select * from loadTable("dfs://stockDB", "stockData") where TradingTime between datetime(date(m)) : datetime(date(m+1))
}
def saveData(tb){
    // append one month of data to the csv file
    tb.saveText("/hdd/hdd0/data/stockData.csv", ',', true)
}
pipeline(each(partial{queryData}, v), saveData);
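The first argument of pipeline must be a tuple of functions that take no parameters; each(partial{queryData}, v) builds one such function per month. A sketch that spells this out (functionally identical to the call above):

initTasks = each(partial{queryData}, v)   // tuple of zero-argument functions: queryData{2000.01M}, queryData{2000.02M}, ...
pipeline(initTasks, saveData)             // loading month i overlaps with saving month i-1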