pnodeRun

Syntax

pnodeRun(function,[nodes],[addNodeAlias=true])

Arguments

function is the local function to call. Pass the function itself, not its name as a quoted string. It must take no parameters: either a function defined with no parameters, or a partial application that binds the original function to its arguments and yields a function with no parameters. It can be a built-in function or a user-defined function.

nodes (optional) is a STRING scalar or vector of node aliases. If not specified, the function is called on all active data nodes and compute nodes in the cluster; see Details for how compute groups affect this.

addNodeAlias (optional) is a Boolean value specifying whether to add node aliases to the results. The default value is true. Set it to false if the results returned by the nodes already contain node aliases.

Details

Calls a local function on the specified nodes of a cluster in parallel and merges the results.

  • If nodes is specified, function is called on the specified nodes.
  • If nodes is not specified:
    • When pnodeRun is called on a compute node that belongs to a compute group, function is executed on all compute nodes in that group.
    • Otherwise, function is executed on all data nodes and on all compute nodes that do not belong to any compute group.
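The node-selection rules above amount to a small decision procedure. The following Python sketch models it under stated assumptions: Node, select_target_nodes, and the sample cluster are hypothetical names for illustration only, not part of DolphinDB, and filtering out inactive nodes in every branch is an assumption extrapolated from the description of the nodes argument.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Node:
    """Hypothetical description of a cluster node (not a DolphinDB API)."""
    alias: str
    kind: str                            # "data" or "compute"; other kinds are never targeted
    active: bool = True
    compute_group: Optional[str] = None  # only meaningful for compute nodes


def select_target_nodes(cluster, caller, nodes=None):
    """Return the aliases a pnodeRun call would target, per the rules in Details."""
    if nodes is not None:
        # Explicit aliases (scalar or vector) are used as given.
        return [nodes] if isinstance(nodes, str) else list(nodes)
    # Assumption: only active nodes are considered when nodes is omitted.
    candidates = [n for n in cluster if n.active]
    if caller.kind == "compute" and caller.compute_group is not None:
        # Caller belongs to a compute group: target every compute node in that group.
        return [n.alias for n in candidates
                if n.kind == "compute" and n.compute_group == caller.compute_group]
    # Otherwise: all data nodes plus compute nodes outside any compute group.
    return [n.alias for n in candidates
            if n.kind == "data"
            or (n.kind == "compute" and n.compute_group is None)]


cluster = [
    Node("dn1", "data"),
    Node("dn2", "data", active=False),
    Node("cn1", "compute"),
    Node("cn2", "compute", compute_group="g1"),
    Node("cn3", "compute", compute_group="g1"),
]
print(select_target_nodes(cluster, caller=cluster[0]))  # ['dn1', 'cn1']
print(select_target_nodes(cluster, caller=cluster[3]))  # ['cn2', 'cn3']
```

Called from the data node dn1, the sketch skips the inactive dn2 and the grouped compute nodes; called from cn2, it stays inside compute group g1.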

Examples

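Ex. 1 Call getChunksMeta on all nodes (nodes is omitted). addNodeAlias is set to false because each row returned by getChunksMeta already contains the node alias in the site column.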

pnodeRun(getChunksMeta,,false);
site chunkId path dfsPath type flag size version state versionList
local8848 bd13090e-7177-01a7-4ac4-840e1b977dcf D:130DolphinDB_Win64_Vserverlocal8848storage/CHUNKS/compo/20190605/GOOG /compo/20190605/GOOG 1 0 0 1 0 cid : 40,pt2=>40:6729; #
local8848 b4935730-6372-b2a1-4f24-6c323037e576 e:data/CHUNKS/compo/20190605/AAPL /compo/20190605/AAPL 1 0 0 1 0 cid : 40,pt2=>40:6613; #
local8848 f8ee72c9-dad3-f49e-430e-5ddb3c61ae18 D:130DolphinDB_Win64_Vserverlocal8848storage/CHUNKS/compo/20190604/MSFT /compo/20190604/MSFT 1 0 0 1 0 cid : 40,pt2=>40:6664; #
local8848 08e26b5a-dfac-799f-4979-0dd3902eae6e D:130DolphinDB_Win64_Vserverlocal8848storage/CHUNKS/compo/20190604/GOOG /compo/20190604/GOOG 1 0 0 1 0 cid : 40,pt2=>40:6635; #
local8848 f9e53a3d-af3e-018d-4bfa-a2b4980f3561 D:130DolphinDB_Win64_Vserverlocal8848storage/CHUNKS/compo/20190604/AAPL /compo/20190604/AAPL 1 0 0 1 0 cid : 40,pt2=>40:6783; #
local8848 417e49e9-5c61-cf9e-4b21-4b35f8e57273 D:130DolphinDB_Win64_Vserverlocal8848storage/CHUNKS/compo/20190601/MSFT /compo/20190601/MSFT 1 0 0 1 0 cid : 40,pt2=>40:6602; #
local8848 3ee64942-1d72-bea7-4bc1-f720132d9288 D:130DolphinDB_Win64_Vserverlocal8848storage/CHUNKS/compo/20190602/AAPL /compo/20190602/AAPL 1 0 0 1 0 cid : 40,pt2=>40:6749; #

Ex. 2 In the following example, the function sum and its argument 1..10 are wrapped into the partial application sum{1..10}, which takes no parameters.

pnodeRun(sum{1..10}, `DFS_NODE2`DFS_NODE3);
Node Value
DFS_NODE2 55
DFS_NODE3 55
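For readers more familiar with Python, functools.partial plays a role similar to DolphinDB's sum{1..10}: it binds a function to its arguments and returns a callable with no parameters. The sketch below is an analogy only; run_on_nodes is a hypothetical stand-in that calls the function sequentially once per alias, whereas pnodeRun runs it remotely and in parallel.

```python
from functools import partial

# Analogue of sum{1..10}: bind sum to the values 1..10 (range end is exclusive).
job = partial(sum, range(1, 11))


def run_on_nodes(func, node_aliases):
    """Hypothetical stand-in for remote execution: call func once per node alias."""
    return [(alias, func()) for alias in node_aliases]


rows = run_on_nodes(job, ["DFS_NODE2", "DFS_NODE3"])
for alias, value in rows:
    print(alias, value)
# DFS_NODE2 55
# DFS_NODE3 55
```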

Ex. 3 pnodeRun is convenient for cluster management. For example, in a cluster of four nodes, "DFS_NODE1", "DFS_NODE2", "DFS_NODE3", and "DFS_NODE4", run the following script on each node:

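// CPU-intensive demo job that accumulates a sum over large random vectors
def jobDemo(n){
    s = 0
    for (x in 1 : n) {
        s += sum(sin rand(1.0, 100000000)-0.5)
        // report progress for each iteration
        print("iteration " + x + " " + s)
    }
    return s
};

// submit three batch jobs, each running jobDemo(10)
submitJob("jobDemo1","job demo", jobDemo, 10);
submitJob("jobDemo2","job demo", jobDemo, 10);
submitJob("jobDemo3","job demo", jobDemo, 10);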

To check the status of the two most recently completed batch jobs on each of the four nodes in the cluster:

pnodeRun(getRecentJobs{2});
Node UserID JobID JobDesc ReceivedTime StartTime EndTime ErrorMsg
DFS_NODE1 root jobDemo2 job demo 2017.11.16T13:04:38.841 2017.11.16T13:04:38.841 2017.11.16T13:04:51.660
DFS_NODE1 root jobDemo3 job demo 2017.11.16T13:04:38.841 2017.11.16T13:04:38.843 2017.11.16T13:04:51.447
DFS_NODE2 root jobDemo2 job demo 2017.11.16T13:04:56.431 2017.11.16T13:04:56.432 2017.11.16T13:05:11.992
DFS_NODE2 root jobDemo3 job demo 2017.11.16T13:04:56.432 2017.11.16T13:04:56.434 2017.11.16T13:05:11.670
DFS_NODE3 root jobDemo2 job demo 2017.11.16T13:05:08.418 2017.11.16T13:05:08.419 2017.11.16T13:05:29.176
DFS_NODE3 root jobDemo3 job demo 2017.11.16T13:05:08.419 2017.11.16T13:05:08.421 2017.11.16T13:05:29.435
DFS_NODE4 root jobDemo2 job demo 2017.11.16T13:05:16.324 2017.11.16T13:05:16.325 2017.11.16T13:05:34.729
DFS_NODE4 root jobDemo3 job demo 2017.11.16T13:05:16.325 2017.11.16T13:05:16.328 2017.11.16T13:05:34.716

To check only DFS_NODE3 and DFS_NODE4:

pnodeRun(getRecentJobs{2}, `DFS_NODE3`DFS_NODE4);
Node UserID JobID JobDesc ReceivedTime StartTime EndTime ErrorMsg
DFS_NODE3 root jobDemo2 job demo 2017.11.16T13:05:08.418 2017.11.16T13:05:08.419 2017.11.16T13:05:29.176
DFS_NODE3 root jobDemo3 job demo 2017.11.16T13:05:08.419 2017.11.16T13:05:08.421 2017.11.16T13:05:29.435
DFS_NODE4 root jobDemo2 job demo 2017.11.16T13:05:16.324 2017.11.16T13:05:16.325 2017.11.16T13:05:34.729
DFS_NODE4 root jobDemo3 job demo 2017.11.16T13:05:16.325 2017.11.16T13:05:16.328 2017.11.16T13:05:34.716

How pnodeRun merges the results from multiple nodes:
  1. If function returns a scalar:

    Returns a table with two columns: the node alias (Node) and the function result (Value).

    Continuing with the example above:
    pnodeRun(getJobReturn{`jobDemo1});
    Node Value
    DFS_NODE3 2,123.5508
    DFS_NODE2 (42,883.5404)
    DFS_NODE1 3,337.4107
    DFS_NODE4 (2,267.3681)
  2. If function returns a vector:

    Returns a matrix in which each column holds the vector returned by one node. The column labels are the node aliases.

  3. If function returns a key-value dictionary:

    Returns a table in which each row holds the dictionary returned by one node.

  4. If function returns a table:

    Returns a table that is the union of the tables returned by all nodes.

  5. If function is a command (a command returns nothing):

    Returns nothing.

  6. For all other cases:

    Returns a dictionary whose keys are the node aliases and whose values are the corresponding function results.
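The six merge rules can be summarized as a dispatch on the type of value each node returns. The following Python sketch is a model of those rules, not DolphinDB's implementation. Table and merge_results are hypothetical names; a matrix is represented as a dict mapping each node alias to its column; and the placement of a Node column (always for scalars, controlled by add_node_alias for dictionaries and tables) is an assumption based on the example outputs above.

```python
from dataclasses import dataclass, field
from numbers import Number


@dataclass
class Table:
    """Hypothetical stand-in for a DolphinDB table: a list of row dicts."""
    rows: list = field(default_factory=list)


def merge_results(results, add_node_alias=True):
    """Merge per-node results (alias -> returned value) following rules 1-6.

    Assumes every node returns the same kind of value.
    """
    values = list(results.values())
    first = values[0] if values else None
    if all(v is None for v in values):
        return None                                   # rule 5: command, nothing returned
    if isinstance(first, (Number, str)):
        # rule 1: two-column table of node alias and scalar result
        return Table([{"Node": a, "Value": v} for a, v in results.items()])
    if isinstance(first, list):
        # rule 2: matrix with one column per node, labelled by node alias
        return {a: list(v) for a, v in results.items()}
    if isinstance(first, dict):
        # rule 3: one row per node (Node column is an assumption)
        return Table([({"Node": a, **v} if add_node_alias else dict(v))
                      for a, v in results.items()])
    if isinstance(first, Table):
        # rule 4: union of all tables, optionally tagging each row with its node
        rows = []
        for a, t in results.items():
            for r in t.rows:
                rows.append({"Node": a, **r} if add_node_alias else dict(r))
        return Table(rows)
    return dict(results)                              # rule 6: everything else


print(merge_results({"DFS_NODE2": 55, "DFS_NODE3": 55}).rows)
print(merge_results({"n1": [1, 2], "n2": [3, 4]}))
print(merge_results({"n1": None, "n2": None}))
```

Setting add_node_alias=False in the table branch mirrors Ex. 1, where each row already carries its node alias in the site column.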