作业管理

作业(job)是DolphinDB 中最基本的执行单位,可以简单理解为一段DolphinDB脚本代码在系统中的一次执行。作业根据阻塞与否可分为同步作业和异步作业。

查询同步作业

同步作业创建后,可以使用getConsoleJobs查看作业信息。示例如下:

>getConsoleJobs();

node	userID	rootJobId       jobType	desc	priority    parallelism receiveTime                 sessionId
------  -----   --------------  ------  ------- -------     ----------- ----------------------      -------------
NODE_1	admin	b4ff7490-9a...	unknown	unknown	0	    1           2020.03.07T20:42:04.903     323,818,682
NODE_1	guest	12c0be95-e1...	unknown	unknown	4	    2           2020.03.07T20:42:06.319     1,076,727,822

上述结果中node是作业所在节点名,userID是作业创建者,用户可以根据userID找到自己创建的作业,rootJobId是作业编号,priority是优先级,parallelism是并行度,receiveTime是系统接收到作业的时间,sessionId是会话ID。这里需要注意的是:

  • 同步作业会独占前台界面,需要另起一个会话(例如开启另一个GUI)来连接当前节点并运行getConsoleJobs。
  • 可以用rpc和pnodeRun查看某个节点或全部节点的同步作业。查看全部数据节点/计算节点上同步作业信息的脚本如下所示:
  pnodeRun(getConsoleJobs)

另一种方法是在web集群管理器上查看作业。从1.30.16版本起,DolphinDB在web集群管理界面上增加了作业管理功能,支持查看和取消作业。如下图所示,点击左侧边栏中的作业管理后,右边界面会显示运行中的同步作业、已提交的批处理和定时作业。点击同步作业的 +,可显示同步作业的详细信息,点击停止,可取消作业。


取消同步作业

使用cancelConsoleJob函数取消同步作业。在DolphinDB系统中,每个作业都有一个唯一的编号,即rootJobId,当作业有很多子任务时,每个子任务的rootJobId跟父作业的rootJobId一致。在DolphinDB系统中取消作业的操作是根据rootJobId进行的。作业编号需用上节所述的getConsoleJobs函数获取。例如下面脚本定义了一个取消节点上所有同步作业的函数,并用pnodeRun提交到所有数据节点、计算节点执行:

def cancelAllConsoleJob(){
    cancelConsoleJob(getConsoleJobs()[`rootJobid])
}
pnodeRun(cancelAllConsoleJob)

这里需要注意的是:

  • 首先,DolphinDB的作业是基于线程的,线程不能被直接取消(kill),所以DolphinDB通过设置Cancel标志的方法来实现。系统在收到取消作业的命令后,设置一个Cancel标志,然后在执行作业的某些阶段,例如开始某个子任务、开始每一轮循环前,去检测是否设置了Cancel标志。若设置了Cancel标志,则取消作业。因此取消作业不是马上就可生效,可能会有一点延迟。

  • 其次,实践中可能会遇到几个问题:

    • 问题1:一个节点的连接数已经用完,无法发送取消作业的命令。若DolphinDB节点是前端交互模式启动,可以在DolphinDB console中执行。若部署模式是集群模式,任意两个节点之间如果已经建立了一些连接,这些连接一般是不会释放的,所以可以尝试从其他节点删除指定的作业。DolphinDB是一个分布式系统,无论从哪个节点删除一个作业,该节点都会将任务广播到其它节点。
    • 问题2:节点的作业队列中有大量的作业在排队,cancelConsoleJob或getConsoleJobs无法立即被执行。解决此问题的方法,一是用GUI中的取消作业按钮,这个按钮发送任务时会以urgent(紧急)方式发送,也即最高优先级处理;二是从其他节点删除指定的作业,取消子任务的任务发送时也以最高优先级处理。

查询批处理作业

节点上所有批处理作业的信息,可以用getRecentJobs查看。示例如下:

>getRecentJobs(4);
node	userID	jobId   jobDesc	priority    parallelism	receivedTime	        startTime               endTime                 errorMsg
----    ------  -----   ------- --------    ----------- ------------            ---------               -------                 --------
NODE_0	admin	write   write	4	    2	        2020.03.08T16:09:16.795			
NODE_0	admin	write   check	4	    2	        2020.01.08T16:09:16.795	2020.01.08T16:09:16.797		
NODE_0	guest	query   query	0	    1	        2020.01.10T21:44:16.122	2020.01.10T21:44:16.123	2020.01.10T21:44:16.123	Not granted to read table dfs://FuturesContract/
NODE_0	admin	test    foo     8	    64	        2020.02.25T01:30:23.458	2020.02.25T01:30:23.460	2020.02.25T01:30:23.460	

在作业状态信息列表中,若startTime为空,如上述第一个作业,表示作业还在排队等待执行。若EndTime为空,如上述第二个作业,这意味着作业还在执行中。若errorMsg非空,如上述第三个作业,表示作业有错误。

getRecentJobs函数可查看多个作业状态信息,若只需查一个特定的作业状态信息,可用getJobStatus函数。若需查询集群内其他节点的批处理作业信息,可调用用rpc或pnodeRun。查看全部数据节点/计算节点上批处理作业信息的脚本如下所示:

  pnodeRun(getRecentJobs)

DolphinDB系统把批处理作业的输出结果保存到磁盘文件。文件路径由配置参数 batchJobDir 指定,默认路径是 /batchJobs 。每个批处理作业产生两个文件夹:<job_id>.msg 和 <Job_id>.obj ,分别存储中间消息和返回对象。另外,当系统接收、开始和完成批处理作业时,每个批处理作业会向 /batchJob.log 添加三条信息。DolphinDB提供了以下2个函数查看相关信息:

  • getJobMessage: 取得批处理作业返回的中间消息。
  • getJobReturn: 取得批处理作业返回的对象。

取消批处理作业

对已经提交但尚未完成的批处理作业,可使用cancelJob命令取消。与同步作业一样,系统通过设置Cancel标志的方法来取消作业,因此取消作业不是立即就能生效。若需取消其他节点上的批处理作业,可调用rpc或pnodeRun函数,例如下面的脚本可取消所有数据节点/计算节点上未完成的批处理作业:

def cancelAllBatchJob(){
	jobids=exec jobid from getRecentJobs() where endTime=NULL
	cancelJob(jobids)	
}
pnodeRun(cancelAllBatchJob)

分解子任务

在DolphinDB中,对于大型数据表,一般都需要进行分区处理。如果一个job里含有分区表的查询计算任务(如SQL查询),系统会将其分解成多个子任务并发送到不同的节点上并行执行,待子任务执行完毕之后,再合并结果,继续此job的执行。类似的,DolphinDB 中的分布式计算也会被分解成子任务,以子任务为单位进行调度。因此,job也可以理解成一系列的子任务。

DolphinDB首先是一个数据库,内置的计算引擎主要解决高并发、交互式的计算任务。因此DolphinDB的资源配置方式与Apache Spark 等低并发、批处理的计算引擎有所不同。Apache Spark 以独占方式按应用事先分配计算资源包括CPU核,内存等。DolphinDB则将作业分解成子任务,以子任务为单位进行调度。每一个子任务并不独占CPU核和内存。而是将计算资源放入一个共享的资源池,根据每个计算任务的优先级和并行度来调度子任务。

根据数据源的不同,DolphinDB用不同方法分解作业的子任务:

  • 对大部分的作业如SQL查询,DolphinDB以数据库分区为单位分解,每一个分区分配一个子任务,计算下推到存储引擎执行,计算和存储耦合。这种计算任务的分解是由数据库的分区模式决定的,因此创建数据库时如何合理的分区非常重要。
  • 对另一种涉及到数据清洗的作业,譬如用repartitionDS产生数据源,然后使用mr(map reduce)或imr(iterative map reduce,通常用于机器学习)函数来计算的作业。这类作业计算和数据分离,repartitionDS产生的每一个数据源就是一个子任务。

作业调度

作业优先级

在DolphinDB中,作业是按照优先级进行调度的,优先级的取值范围为0-9,取值越高则优先级越高。对于优先级高的作业,系统会优先给与计算资源。基于作业的优先级,DolphinDB设计了多级反馈队列来调度作业的执行。具体来说,系统维护了10个队列,分别对应10个优先级。系统总是分配线程资源给高优先级的作业,对于处于相同优先级的作业,系统会以round-robin的方式分配线程资源给作业;当一个优先级队列为空的时候,才会处理低优先级的队列中的作业。

DolphinDB作业的优先级可以通过以下方式来设置:

  • 对于同步作业,其优先级取值为min(4,通过setMaxJobPriority函数设定的该用户最高优先级)。
  • 对于通过submitJob提交的批处理作业,系统会给与默认优先级4。用户也可以使用submitJobEx函数来指定优先级。
  • 定时任务的优先级设为4,无法改变。

作业并行度

并行度表示在一个数据节点上,最多同时可以用多少个线程来执行该作业产生的子任务。并行度默认取值为2,只有使用submitJobEx函数提交的批处理作业才可通过其参数parallelism配置并行度。

并行度可以认为是一种时间片单位。举例来说,若一个作业的并行度为2,且该作业产生了100个并行子任务,那么系统只会分配2个线程用于子任务的计算,因此需要50轮调度才能完成整个作业的执行。为了充分利用资源,系统会进行优化。若当前的本地执行线程有16个,只有一个作业在运行,即使并行度是2,系统也会分给其16个线程。

调度策略

当有多个作业时,系统按照作业的优先级来调度子任务,优先级高并行度高的作业会分到更多的计算资源。为了防止处于低优先级的作业长时间等待,DolphinDB会适当降低作业的优先级。具体的做法是,当一个作业的时间片被执行完毕后,如果存在比其低优先级的作业,那么将会自动降低一级优先级。当优先级到达最低点后,又回到初始的优先级。因此低优先级的任务迟早会被调度到。

下面以A,B两个作业举例说明。

A作业优先级6,并行度4,子任务个数16。B作业优先级4,并行度2,子任务个数2。资源池中的线程个数为4。第一轮调度,优先级最高的是6,计算资源全部分给A。执行完毕后,A的优先级降到5,A的子任务还剩12个。第二轮调度,优先级最高是5,计算资源全部分给A。执行完毕后,A的优先级降到4,A的子任务还剩8个。第三轮调度,优先级最高是4,A和B各有2个子任务被执行。执行完毕后,B已经完成,A优先级仍然为4,A的子任务还剩6个。虽然A的优先级和并行度更高,但是因为子任务太多,最终B先完成任务。

计算容错

DolphinDB的分布式计算的容错性得益于分区副本冗余存储。当一个子任务被发送到一个分区副本节点上之后,若节点出现故障或者分区副本发生了数据校验错误(副本损坏),job scheduler(即某个数据节点的一个worker线程)将会发现这个故障,并且选择该分区的另一个副本节点,重新执行子任务。

可以通过配置参数dfsReplicationFactor来调整这种冗余度。

数据和计算资源的均衡

要提高计算效率,最关键的是数据和计算资源的均衡。具体包括:

  • 如果包含多个数据节点,每个节点的计算资源尽量均衡;
  • 数据库表的各个分区大小保持均衡;
  • 如果是多数据节点,每个节点上的分区数量尽量保持均衡。

当计算引擎出现性能问题,或者某些作业耗时太久,可以从下面这些途径排查问题:

  • 首先,查看所有节点的资源使用情况,譬如每个节点平均负载,CPU使用率,网络读写性能,磁盘读写性能,内存使用情况,作业和子任务队列深度等等。这些指标可以从Web界面读取。如果与Prometheus集成,也可以从Prometheus观察。或者通过内置函数:

    • getPerf,用于查看本地节点上的多个性能监控度量值。
    • getClusterPerf,用于在控制节点上运行后查看每个节点的多个配置和性能监控度量值。
    • getJobStat,用于监控正在执行或者队列中的作业和任务的数量。
    • getDiskIOStat,用于查看每个IO队列的深度和总体数量。

    等来观察这些指标。

    高阶函数pnodeRun可以帮助在所有数据节点上运行这些函数。

  • 其次,可以分析作业涉及的数据的分布,包括分区的数量,分区在各个节点上的分布。函数getAllChunks,getClusterChunksStatus,getChunksMeta和getTabletsMeta可以获取分区的分布信息。