Distributed Computing
This section describes parallel function calls, remote function calls, parallel remote calls, and the pipeline function.
Parallel Function Call
DolphinDB divides a large task into multiple subtasks for simultaneous execution.
Parallel function calls usually use one of two higher-order functions: peach or ploop, which are the parallel computing versions of each and loop, respectively. For the difference between each and loop, please refer to loop.
There are 3 scenarios of parallel function calls.
(1) same function with different parameters
peach(log, (1..3, 4..6));
#0 | #1 |
---|---|
0 | 1.386294 |
0.693147 | 1.609438 |
1.098612 | 1.791759 |
ploop(log, (1..3, 4..6));
([0,0.693147,1.098612],[1.386294,1.609438,1.791759])
(2) different functions with same parameters
peach(call{, 3 4 5}, (log, sum));
log | sum |
---|---|
1.098612 | 12 |
1.386294 | 12 |
1.609438 | 12 |
ploop(call{, 3 4 5}, (log, sum));
([1.098612,1.386294,1.609438],12)
Note that in the examples above, we cannot write peach((log, sum), 3 4 5), because the first parameter of a template must be a function, not an array of function names. To call multiple functions in peach or ploop, we need to use the template call.
(3) different functions with different parameters
x=[log, exp];
y=[1 2 3, 4 5 6];
peach(call, x, y);
#0 | #1 |
---|---|
0 | 54.59815 |
0.693147 | 148.413159 |
1.098612 | 403.428793 |
ploop(call, x, y);
([0,0.693147,1.098612],[54.59815,148.413159,403.428793])
How Parallel Computing Works in DolphinDB
DolphinDB supports parallel computing through multi-threading. Suppose a task is divided into n subtasks and there are m local executors (for local executors, please refer to the concepts of distributed computing). The calling thread (worker) generates the n subtasks, pushes n*m/(1+m) of them to the task queues of the local executors, and executes the remaining n/(1+m) subtasks itself. For example, with 8 subtasks and 3 local executors, 6 subtasks are pushed to the executor queues and 2 are executed by the worker. After all n subtasks are finished, the calling thread combines the individual results to produce the final result.
To use parallel function calls, we need to make sure the number of local executors set in the configuration file is a positive integer.
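As a sketch, parallel calls are enabled by setting a positive number of local executors in the node's configuration file (the parameter name localExecutors and the file name dolphindb.cfg are assumptions here; consult the configuration reference of your DolphinDB version):

localExecutors=3

With 3 local executors plus the calling worker, up to 4 subtasks can run simultaneously.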
Parallel function calls must not be initiated within subtasks. If they are, the system throws an exception instead of allocating new subtasks to local executors, because doing so could cause a deadlock: when the first-level parallel call is initiated (and n > 1 + m), all local executors have already been assigned subtasks. Since a local executor can only process one task at a time, nested subtasks would wait for executors that can never become free.
Some built-in functions, such as peach, ploop, pnodeRun, ploadText, and loadText, support parallel execution when the number of local executors set in the configuration file is a positive integer.
Remote Function Call
In DolphinDB, we can send a local function that calls other local functions (all of which can be either built-in functions or user-defined functions) to run on a remote node on the fly without compilation or deployment. The system automatically serializes the function definition and the definitions of all dependent functions together with necessary local data to remote nodes.
We must open a connection before a remote call. To open a connection, run conn=xdb(host, port), where host is the host name (IP address or domain name) of the remote node and port is its port number.
There are 3 ways to close a connection:
(1) Call the close command, e.g., close(conn).
(2) Assign NULL to the connection variable: conn=NULL.
(3) The connection will be closed automatically when the current session closes.
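A minimal sketch of the connection lifecycle (assuming a DolphinDB node is listening on localhost:8081):

conn = xdb("localhost", 8081)    // open a connection to the remote node
conn("1+1")                      // run a short script on the remote node
close(conn)                      // close explicitly; alternatively, conn = NULL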
You can use remoteRun, remoteRunWithCompression, and rpc for remote calls. They differ in the following aspects:
- rpc uses the existing asynchronous connections among data nodes in the cluster, whereas remoteRun uses connections explicitly created with the xdb function.
- With rpc, the calling node and the remote node must be located in the same cluster; remoteRun has no such limitation.
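The function remoteRunWithCompression works like remoteRun but compresses the data exchanged with the remote node, which can help when large objects are transferred over a slow network. A sketch, assuming it shares remoteRun's signature:

conn = xdb("localhost", 8081)
remoteRunWithCompression(conn, "avg", rand(1.0, 10000000))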
There are 3 ways to make a remote call:
(1) Execute script on a remote node.
Syntax: remoteRun(conn, script) or conn(script), where script must be a double-quoted string.
conn = xdb("localhost",81);
remoteRun(conn, "x=rand(1.0,10000000); y=x pow 2; avg y");
// output
0.333254
(2) Execute a remote function on a remote node. The function is defined on the remote node, while the parameters are located on the local node.
Syntax: remoteRun(conn, "functionName", param1, param2, ...)
or
conn("function name", param1, param2, ...)
functionName must be quoted. The function could be either built-in or user-defined.
conn = xdb("localhost",81);
remoteRun(conn, "avg", rand(1.0,10000000) pow 2);
// output
0.333446
(3) Execute a local function on a remote node. The function is defined on the local node. It can be a built-in or user-defined function, named or anonymous. The parameters of the function are also located on the local node.
This is the most powerful feature of remote calls in DolphinDB: as noted above, the system serializes the definition of a local function, the definitions of all its dependent functions, and the necessary local data to the remote node on the fly, with no compilation or deployment required. Many other systems can only remotely call functions that have no user-defined dependent functions.
Syntax of remoteRun: remoteRun(conn, functionName, param1, param2, ...) or conn(functionName, param1, param2, ...). Here functionName must not be quoted, and param1, param2, ... are the function arguments. The function can be either a built-in function or a user-defined function on the calling node.
Syntax of rpc: rpc(nodeAlias, function, param1, param2, ...). function must not be quoted, and param1, param2, ... are the function arguments. The function can be either a built-in function or a user-defined function on the calling node. Both the calling node and the remote node must be located in the same cluster; otherwise, use remoteRun.
Example 1: Remotely call a user-defined function with a local dataset.
Assume there is a table EarningsDates with 2 columns on the local node: stock ticker and date. For each of the 3 stocks in the table, we have the date when it announced earnings for the 3rd quarter of 2006. There is a table USPrices on the remote node with machine name "localhost" and port number 8081. It contains daily stock prices for all US stocks. We would like to get the stock prices from the remote node for all stocks in EarningsDates for the week after they announced earnings.
On the remote node, we import the data file to create the table USPrices, and then share it across all nodes as sharedUSPrices.
USPrices = loadText("c:/DolphinDB/Data/USPrices.csv");
share USPrices as sharedUSPrices;
When we create a connection to a remote node, the remote node creates a new session for this connection. This new session is completely isolated from other sessions on the remote node. This is convenient for development as developers don't have to worry about name conflicts. In this case, however, we would like to share data among multiple sessions on the same node. We can use the statement share to share objects. Currently only tables can be shared in DolphinDB.
We create a table EarningsDates on the local node, and send the table along with the script to a remote node. After the execution, the result is sent back to the local node.
EarningsDates=table(`XOM`AAPL`IBM as TICKER, 2006.10.26 2006.10.19 2006.10.17 as date)
def loadDailyPrice(data){
    dateDict = dict(data.TICKER, data.date)
    return select date, TICKER, PRC from objByName("sharedUSPrices") where dateDict[TICKER]<date<=dateDict[TICKER]+7
}
conn = xdb("localhost",8081)
prices = conn(loadDailyPrice, EarningsDates);
prices;
date       TICKER PRC
2006.10.27 XOM    71.46
2006.10.30 XOM    70.84
2006.10.31 XOM    71.42
2006.11.01 XOM    71.06
2006.11.02 XOM    71.19
2006.10.18 IBM    89.82
2006.10.19 IBM    89.86
2006.10.20 IBM    90.48
2006.10.23 IBM    91.56
2006.10.24 IBM    91.49
2006.10.20 AAPL   79.95
2006.10.23 AAPL   81.46
2006.10.24 AAPL   81.05
2006.10.25 AAPL   81.68
2006.10.26 AAPL   82.19
Example 2: Remotely call a built-in function that takes a user-defined function as an argument.
def jobDemo(n){
    s = 0
    for (x in 1 : n) {
        s += sum(sin rand(1.0, 100000000)-0.5)
        print("iteration " + x + " " + s)
    }
    return s
};
Remote call with function remoteRun:
conn = xdb("DFS_NODE2")
conn.remoteRun(submitJob, "jobDemo", "job demo", jobDemo, 10);
// output
jobDemo4

conn.remoteRun(getJobReturn, "jobDemo")
// output
4238.832005
Remote call with function rpc (the equivalent of the remoteRun example above; the calling node and DFS_NODE2 must be in the same cluster):
rpc("DFS_NODE2", submitJob, "jobDemo", "job demo", jobDemo, 10);

Note: Please avoid cyclical calls with remoteRun, as they may cause deadlocks. Consider the following script:
def testRemoteCall() {
    h = xdb("localhost", 8080)
    return h.remoteRun("1+2")
}
h = xdb("localhost", 8081)
h.remoteRun(testRemoteCall)
If we run this script on the node localhost:8080, it sends the locally defined function testRemoteCall to the node 8081, which in turn sends the script "1+2" back to be executed on the node 8080. When a node receives a job, it assigns a worker thread to execute the job. The remote call from 8080 to 8081 and the script "1+2" are both executed on the node 8080 and may be assigned to the same worker. If these 2 jobs share the same worker, a deadlock occurs.
Parallel Remote Call
If a remote call is in blocking mode, the result is not returned until the remote node completes the function call. Parallel remote calls with remoteRun need to be used together with ploop or peach. In the following example, the template each executes the user-defined function experimentPi on node 8081 and then on node 8082, while the template peach executes experimentPi on nodes 8081 and 8082 simultaneously. We can see that peach saves a significant amount of time compared with each.
def simuPi(n){
x=rand(1.0, n)
y=rand(1.0, n)
return 4.0 * sum(x*x + y*y<=1.0) / n
}
def experimentPi(repeat, n): avg each(simuPi, take(n, repeat));
// create 2 connections
conns = each(xdb, "localhost", 8081 8082);
conns;
("Conn[localhost:8081:1166953221]","Conn[localhost:8082:1166953221]")
timer result = each(remoteRun{, experimentPi, 10, 10000000}, conns);
// output
Time elapsed: 6579.82 ms
timer result = peach(remoteRun{, experimentPi, 10, 10000000}, conns);
// output
Time elapsed: 4122.29 ms
// parallel computing saves running time
print avg(result)
// output
3.141691
// close two connections
each(close, conns);
To use remoteRun in parallel remote calls, we need to establish a connection to each remote node with the function xdb. To remotely call nodes within the same cluster as the calling node, we can use pnodeRun.
Syntax: pnodeRun(function, [nodes], [addNodeToResult])
function: the local function to call. It must not be quoted and must take no parameters: either a function defined with no parameters, or a partial application that wraps the original function and its arguments into a function with no parameters. It can be a built-in function or a user-defined function.
nodes: aliases of nodes. It is an optional parameter. If it is not specified, the system will call the function on all live data nodes in the cluster.
addNodeToResult: whether to add aliases of nodes to results. It is an optional parameter. The default value is true. If the returned result from each node already contains the node alias, we can set it to false.
pnodeRun calls a local function on multiple remote nodes in parallel and then merges the results. Both the calling node and the remote nodes must be located in the same cluster.
In the following example, we wrap the function sum and the argument 1..10 into the partial application sum{1..10}.
pnodeRun(sum{1..10}, `DFS_NODE2`DFS_NODE3);
Output:
Node Value
DFS_NODE2 55
DFS_NODE3 55
pnodeRun is very convenient for cluster management. For example, in a cluster with 4 nodes ("DFS_NODE1", "DFS_NODE2", "DFS_NODE3", and "DFS_NODE4"), run the following script on each node:
def jobDemo(n){
s = 0
for (x in 1 : n) {
s += sum(sin rand(1.0, 100000000)-0.5)
print("iteration " + x + " " + s)
}
return s
};
submitJob("jobDemo1","job demo", jobDemo, 10);
submitJob("jobDemo2","job demo", jobDemo, 10);
submitJob("jobDemo3","job demo", jobDemo, 10);
Check the status of the 2 most recently completed batch jobs on each node in the cluster:
pnodeRun(getRecentJobs{2});
Node | UserID | JobID | JobDesc | ReceivedTime | StartTime | EndTime | ErrorMsg |
---|---|---|---|---|---|---|---|
DFS_NODE4 | root | jobDemo2 | job demo | 2017.11.21T15:40:22.026 | 2017.11.21T15:40:22.027 | 2017.11.21T15:40:43.103 | |
DFS_NODE4 | root | jobDemo3 | job demo | 2017.11.21T15:40:22.027 | 2017.11.21T15:40:22.037 | 2017.11.21T15:40:43.115 | |
DFS_NODE1 | root | jobDemo2 | job demo | 2017.11.21T15:39:48.087 | 2017.11.21T15:39:48.088 | 2017.11.21T15:40:03.714 | |
DFS_NODE1 | root | jobDemo3 | job demo | 2017.11.21T15:39:48.088 | 2017.11.21T15:39:48.089 | 2017.11.21T15:40:03.767 | |
DFS_NODE2 | root | jobDemo2 | job demo | 2017.11.21T15:39:58.788 | 2017.11.21T15:39:58.788 | 2017.11.21T15:40:14.114 | |
DFS_NODE2 | root | jobDemo3 | job demo | 2017.11.21T15:39:58.788 | 2017.11.21T15:39:58.791 | 2017.11.21T15:40:14.178 | |
DFS_NODE3 | root | jobDemo2 | job demo | 2017.11.21T15:40:16.945 | 2017.11.21T15:40:16.945 | 2017.11.21T15:40:38.466 | |
DFS_NODE3 | root | jobDemo3 | job demo | 2017.11.21T15:40:16.945 | 2017.11.21T15:40:16.947 | 2017.11.21T15:40:38.789 | |
pnodeRun(getRecentJobs{2}, `DFS_NODE3`DFS_NODE4);
Node | UserID | JobID | JobDesc | ReceivedTime | StartTime | EndTime | ErrorMsg |
---|---|---|---|---|---|---|---|
DFS_NODE3 | root | jobDemo2 | job demo | 2017.11.21T15:40:16.945 | 2017.11.21T15:40:16.945 | 2017.11.21T15:40:38.466 | |
DFS_NODE3 | root | jobDemo3 | job demo | 2017.11.21T15:40:16.945 | 2017.11.21T15:40:16.947 | 2017.11.21T15:40:38.789 | |
DFS_NODE4 | root | jobDemo2 | job demo | 2017.11.21T15:40:22.026 | 2017.11.21T15:40:22.027 | 2017.11.21T15:40:43.103 | |
DFS_NODE4 | root | jobDemo3 | job demo | 2017.11.21T15:40:22.027 | 2017.11.21T15:40:22.037 | 2017.11.21T15:40:43.115 | |
pnodeRun follows these rules to merge the results from multiple nodes:
(1) If the function returns a scalar:
Return a table with 2 columns: node alias and function results.
Continuing with the example above:
pnodeRun(getJobReturn{`jobDemo1})
// output
Node Value
DFS_NODE3 2,123.5508
DFS_NODE2 (42,883.5404)
DFS_NODE1 3,337.4107
DFS_NODE4 (2,267.3681)
(2) If the function returns a vector:
Return a matrix. Each column of the matrix is the function result from the node. The column labels of the matrix are the nodes.
(3) If the function returns a key-value dictionary:
Return a table with each row representing the function result from one node.
(4) If the function returns a table:
Return a table which combines individual tables from multiple nodes.
Please see the aforementioned example of pnodeRun(getRecentJobs{2}).
(5) If the function is a command (that returns nothing):
Return nothing.
(6) For all other cases:
Return a dictionary. The key of the dictionary is node alias and the value is function result.
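As a sketch of rule (2), assuming the nodes DFS_NODE2 and DFS_NODE3 from the earlier examples are alive, a function that returns a vector on each node produces a matrix whose column labels are the node aliases:

pnodeRun(til{3}, `DFS_NODE2`DFS_NODE3);
// expected: a 3-row matrix with one column per node, each column equal to [0,1,2]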
Data Source
A data source is a special type of data object that contains the following information about a data entity:
- Meta descriptions of a data entity. By executing a data source, we can obtain a materialized data entity such as a table, matrix, or vector. In DolphinDB's distributed computing framework, lightweight data source objects, rather than large data entities, are passed to remote sites for computing, dramatically reducing network traffic.
- Execution locations. A data source may have 0, 1, or multiple locations. A data source with 0 locations is a local data source. When there are multiple locations, they back up each other. The system randomly picks a location for distributed computing. When the data source is instructed to cache the materialized data object, the system picks the location where data was successfully retrieved last time.
- Attributes that instruct the system to cache data or clear the cache. For iterative computing algorithms (e.g., machine learning algorithms), data caching can significantly boost computing performance. Cached data is cleared when the system runs out of memory. If this happens, the system can recover the data, since the data source contains all meta descriptions and data transforming functions.
- A data source object may also contain multiple data transforming functions to further process the retrieved data. These functions are executed sequentially, with the output of one function serving as the input (and the only input) of the next. It is generally more efficient to include data transforming functions in the data source rather than in the core computing operation. While there is no performance difference if the retrieved data is needed only once, it makes a huge difference for iterative computing on data sources with cached data objects: if the transformations are performed in the core computing unit, each iteration must execute them; if they are performed in the data source, they are executed only once.
Related functions/commands:
The function sqlDS creates a list of data sources according to the input SQL metacode. If the data table in the SQL query has n partitions, sqlDS generates n data sources. If the SQL query contains no partitioned tables, sqlDS returns a tuple containing one data source.
Syntax: sqlDS(metaCode)
The argument is SQL metacode. For more details about metacode, please refer to Metaprogramming.
db = database("dfs://DBSeq",RANGE,`A`F`Z);
USPrices = loadTextEx(db, "USPrices",`TICKER ,"/home/DolphinDB/Data/USPrices.csv");
ds = sqlDS(<select log(SHROUT*(BID+ASK)/2) as SBA from USPrices where VOL>0>);
typestr ds;
ANY VECTOR
size ds;
2
ds[0];
DataSource< select [15] log(SHROUT * (BID + ASK) / 2) as SBA from USPrices where VOL > 0 [partition = /DBSeq/A_F/40r] >
ds[1];
DataSource< select [15] log(SHROUT * (BID + ASK) / 2) as SBA from USPrices where VOL > 0 [partition = /DBSeq/F_Z/40r] >
The function transDS! adds data transforming functions to a data source or a list of data sources.
Syntax: transDS!(ds, func)
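A minimal sketch of transDS!, reusing the ds list created in the sqlDS example above (the transforming function keepPositive is hypothetical):

def keepPositive(t){
    return select * from t where SBA > 0
}
transDS!(ds, keepPositive)   // each data source now filters the retrieved rows before they reach the computing function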
The function cacheDS! instructs the system to cache the data source. It returns true or false to indicate whether the operation is successful.
Syntax: cacheDS!(ds)
The function clearDSCache! instructs the system to clear the cache after the data source is executed the next time.
Syntax: clearDSCache!(ds)
The function cacheDSNow immediately executes and caches the data source and returns the total number of cached rows.
Syntax: cacheDSNow(ds)
The function clearDSCacheNow immediately clears the data source and cache.
Syntax: clearDSCacheNow(ds)
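A sketch of the caching functions applied to the first data source from the sqlDS example above:

cacheDS!(ds[0])          // cache the materialized data the next time the data source is executed
cacheDSNow(ds[0])        // execute and cache immediately; returns the number of cached rows
clearDSCache!(ds[0])     // clear the cache after the next execution
clearDSCacheNow(ds[0])   // clear the data source and its cache right away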
Map-Reduce
The Map-Reduce function mr is the core of DolphinDB's generic distributed computing framework.
Syntax: mr(ds, mapFunc, [reduceFunc], [finalFunc], [parallel=true])
- ds: the list of data sources. This required parameter must be a tuple, and each element of the tuple is a data source object. Even if there is only one data source, we still need a tuple to wrap it.
- mapFunc: the map function. It accepts one and only one argument: the materialized data entity from the corresponding data source. If we would like the map function to accept more parameters in addition to the materialized data source, we can use Partial Application to convert a multiple-parameter function into a unary function. The number of map function calls equals the number of data sources. The map function returns a regular object (scalar, pair, array, matrix, table, set, or dictionary) or a tuple (containing multiple regular objects).
- reduceFunc: the binary reduce function that combines two map function call results. The reduce function is in most cases trivial; an example is the addition function. It is optional. If it is not specified, the system simply passes all individual map call results to the final function.
- finalFunc: the final function. It accepts one and only one parameter: the output of the last reduce function call. The final function is optional. If it is not specified, the system returns the individual map function call results.
- parallel: an optional Boolean flag indicating whether to execute the map function in parallel locally. The default value is true, i.e., parallel computing is enabled. When available memory is very limited and each map call needs a large amount of memory, we can disable parallel computing to prevent out-of-memory problems. We may also disable the parallel option to ensure thread safety: for example, if multiple threads write to the same file simultaneously, errors may occur.
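Before the regression example below, here is a minimal sketch that reuses the partitioned USPrices table from the Data Source section (which has a VOL column):

ds = sqlDS(<select VOL from USPrices>)
// no reduce or final function: the individual map call results (one row count per partition) are returned
rowCounts = mr(ds, def(t): t.rows())
// with a reduce and a final function: partial sums are added up and the final function returns the total
totalVol = mr(ds, def(t): sum(t.VOL), +, def(x): x)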
The following is an example of distributed linear regression. X is the matrix of independent variables and y is the dependent variable. X and y are stored in multiple data sources. To estimate the least squares parameters, we need to calculate XᵀX and Xᵀy. We can calculate the tuple of (XᵀX, Xᵀy) for each data source, then add up the results from all data sources to get XᵀX and Xᵀy for the entire dataset.
def myOLSMap(table, yColName, xColNames, intercept){
if(intercept)
x = matrix(take(1.0, table.rows()), table[xColNames])
else
x = matrix(table[xColNames])
xt = x.transpose()
return xt.dot(x), xt.dot(table[yColName])
}
def myOLSFinal(result){
xtx = result[0]
xty = result[1]
return xtx.inv().dot(xty)[0]
}
def myOLSEx(ds, yColName, xColNames, intercept){
return mr(ds, myOLSMap{, yColName, xColNames, intercept}, +, myOLSFinal)
}
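A hypothetical invocation of myOLSEx, assuming a partitioned table t1 in a database dfs://demoDB with a dependent column y and independent columns x1 and x2 (the database, table, and column names are placeholders):

ds = sqlDS(<select y, x1, x2 from loadTable("dfs://demoDB", "t1")>)
beta = myOLSEx(ds, `y, `x1`x2, true)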
In the example above, we define the map function and the final function. In practice, we may define transformation functions for data sources as well. These functions only need to be defined on the local instance; there is no need for users to compile them or deploy them to the remote instances before use. DolphinDB's distributed computing framework handles these complicated issues for end users on the fly, which makes it extremely easy to develop distributed analytical functions and applications in DolphinDB.
As a frequently used analytics tool, distributed least squares linear regression is already implemented in the DolphinDB core library. The built-in version (olsEx) provides more features.
Iterative Computing
Iterative computing is a commonly used computing methodology. Many machine learning methods and statistical models use iterative algorithms to estimate model parameters.
DolphinDB offers function imr for iterative computing based on the map-reduce methodology. Each iteration uses the result from the previous iteration and the input dataset. The input dataset for each iteration stays unchanged so that it can be cached. Iterative computing requires initial values for the model parameters and a termination criterion.
Syntax: imr(ds, initValue, mapFunc, [reduceFunc], [finalFunc], terminateFunc, [carryover=false])
- ds: the list of data sources. It must be a tuple with each element being a data source object. Even if there is only one data source, we still need a tuple to wrap it. In iterative computing, data sources are automatically cached, and the cache is cleared after the last iteration.
- initValue: the initial values of the model parameter estimates. The format of the initial values must be the same as the output of the final function.
- mapFunc: the map function. It has two arguments. The first argument is the data entity represented by the corresponding data source. The second is the output of the final function from the previous iteration, i.e., the updated estimate of the model parameters; for the first iteration, it is the initial values given by the user.
- reduceFunc: the binary reduce function that combines two map function call results. If there are M map calls, the reduce function is called M-1 times. The reduce function is in most cases trivial; an example is the addition function. It is optional.
- finalFunc: the final function in each iteration. It accepts two arguments. The first argument is the output of the final function from the previous iteration; for the first iteration, it is the initial values given by the user. The second argument is the output of the reduce function call. If the reduce function is not specified, the second argument is a tuple containing the individual map call results.
- terminateFunc: either a function that determines whether the computation continues, or a specified number of iterations. The termination function accepts two parameters: the first is the output of the final function from the previous iteration and the second is the output of the final function from the current iteration (as in the median example below, where the termination function compares the two ends of the returned range). If the function returns a true value, the iterations end.
- carryover: a Boolean value indicating whether a map function call produces a carryover object to be passed to the next iteration of the map function call. The default value is false. If it is set to true, the map function has 3 arguments, the last of which is the carryover object, and the map function output is a tuple whose last element is the carryover object. In the first iteration, the carryover object is NULL.
Now let's use the example of distributed median calculation to illustrate the function imr. Assume the data is scattered across multiple nodes and we would like to calculate the median of a variable across all the nodes. First, for each data source, we put the data into buckets and use the map function to count the number of data points in each bucket. Then we use the reduce function to merge the bucket counts from multiple data sources and locate the bucket that contains the median. In the next iteration, the chosen bucket is divided into smaller buckets. The iterations finish when the size of the chosen bucket is no more than the specified precision.
def medMap(data, range, colName){
return bucketCount(data[colName], double(range), 1024, true)
}
def medFinal(range, result){
x= result.cumsum()
index = x.asof(x[1025]/2.0)
ranges = range[1] - range[0]
if(index == -1)
return (range[0] - ranges*32):range[1]
else if(index == 1024)
return range[0]:(range[1] + ranges*32)
else{
interval = ranges / 1024.0
startValue = range[0] + (index - 1) * interval
return startValue : (startValue + interval)
}
}
def medEx(ds, colName, range, precision){
termFunc = def(prev, cur): cur[1] - cur[0] <= precision
return imr(ds, range, medMap{,,colName}, +, medFinal, termFunc).avg()
}
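A hypothetical call to medEx, assuming a partitioned table t1 in a database dfs://demoDB with a numeric column x whose values fall roughly within the initial range 0.0 : 1.0 (all names are placeholders):

ds = sqlDS(<select x from loadTable("dfs://demoDB", "t1")>)
med = medEx(ds, `x, 0.0 : 1.0, 0.0001)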
Pipeline Function
The pipeline function uses multithreading to optimize tasks that meet the following conditions:
(1) Can be decomposed into multiple sub-tasks.
(2) Each subtask contains multiple steps.
(3) The k-th step of the i-th subtask can only be executed after the (k-1)-th step of the i-th subtask and the k-th step of the (i-1)-th subtask are completed.
In the following example, we need to convert the partitioned table stockData into a CSV file. The table contains data from 2008 to 2018 and exceeds the available memory of the system, so we cannot load the entire table into memory and then convert it into a CSV file. Instead, the task can be divided into multiple subtasks, each consisting of two steps: load one month of data into memory, and then save that data to the CSV file. Before the data of a month is saved to the CSV file, it must be ensured that it has been loaded into memory and that the data of the previous month has already been saved to the CSV file.
v = 2008.01M..2018.12M
def queryData(m){
return select * from loadTable("dfs://stockDB", "stockData") where TradingTime between datetime(date(m)) : datetime(date(m+1))
}
def saveData(tb){
tb.saveText("/hdd/hdd0/data/stockData.csv",',', true)
}
pipeline(each(partial{queryData}, v), saveData);