批处理作业管理
针对某些特别耗时的任务,DolphinDB 支持批处理作业,在与常规交互作业独立的工作线程池中执行这些任务。批处理作业工作线程数的最大值是由配置参数 maxBatchJobWorker 设置的。如果批处理作业的数量超过了限制,新的批处理作业将会进入队列等待。批处理作业工作线程在闲置超过 60 秒后会自动销毁。
批处理作业的输出结果存于由配置参数 batchJobDir 指定的文件夹。如果没有指定,默认路径是 <HomeDir>/batchJobs。每个批处理作业产生两个文件夹:<job_id>.msg 和 < Job_id>.obj,分别存储中间消息和返回对象。另外,当系统接收、开始和完成批处理作业时,每个批处理作业会向 < BatchJobDir>/batchJob.log 添加三条信息。
相关函数
以上两个函数将批处理作业提交到本地节点,并且返回作业的 ID。其中,jobDesc 为可选参数,如果没有指定,函数名就是作业的描述。
-
getJobStatus(jobId):取得批处理作业的状态。
-
getRecentJobs([top]):取得本地节点上最近 N 个作业的状态。
-
getJobMessage(jobId):取得批处理作业返回的中间消息。
-
getJobReturn(jobId):取得批处理作业返回的对象。
-
cancelJob(jobId):取消已经提交但尚未完成的批处理作业。
例子
-
把作业提交到本地节点:
def job1(n){ s = 0 for (x in 1 : n) { s += sum(sin rand(1.0, 100000000)-0.5) print("iteration" + x + " " + s) } return s }; job1_ID=submitJob("job1_ID","", job1, 100); getJobStatus(job1_ID);
node userID jobId jobDesc receivedTime startTime endTime errorMsg local8848 root job1_ID job1 2018.06.16T10:44:34.066 2018.06.16T10:44:34.066 在作业状态中,EndTime 是空的。这意味着作业还在执行中。作业完成后,就能在状态中看到 EndTime。
getJobStatus(job1_ID);
node userID jobId jobDesc receivedTime startTime endTime errorMsg local8848 root job1_ID job1 2018.06.16T10:44:34.066 2018.06.16T10:44:34.066 2018.06.16T10:46:10.389 getJobMessage(job1_ID); 2018-06-16 10:44:34.066064 Start the job [job1_ID]: job1 2018-06-16 10:44:35.377095 iteration 1 1412.431451 2018-06-16 10:44:36.624907 iteration 2 2328.917258 2018-06-16 10:44:37.577107 iteration 3 5491.651822 2018-06-16 10:44:38.530187 iteration 4 6332.907608 2018-06-16 10:44:39.488295 iteration 5 8404.393113 ...... 2018-06-16 10:46:06.655519 iteration 95 -13124.624482 2018-06-16 10:46:07.562855 iteration 96 -14878.269863 2018-06-16 10:46:08.520555 iteration 97 -9842.451424 2018-06-16 10:46:09.456576 iteration 98 -8149.660675 2018-06-16 10:46:10.373218 iteration 99 -10639.329897 2018-06-16 10:46:10.389147 The job is done. getJobReturn(job1_ID); // output -4291.91147
-
把作业提交到远程节点:
使用 rpc 函数("DFS_NODE2" 与本地节点在同一集群):
def jobDemo(n){ s = 0 for (x in 1 : n) { s += sum(sin rand(1.0, 100000000)-0.5) print("iteration" + x + " " + s) } return s }; rpc("DFS_NODE2", submitJob, "job1_1", "", job1{10}); rpc("DFS_NODE2", getJobReturn, "jobDemo3") // output Output: -3426.577521
使用 remoteRun 函数:
conn = xdb("DFS_NODE2") conn.remoteRun(submitJob, "job1_2", "", job1, 10); conn.remoteRun(getJobReturn, "job1_2"); // output Output: 4238.832005