工作原理

在构造时,DDBDataLoader 接收用户提供的 SQL 查询语句,并将其拆分为多个子查询组。DDBDataLoader 使用后台线程从服务端获取数据,并将其处理为 PyTorch Tensor 格式的批量数据。

DDBDataLoader 提供了 groupColgroupScheme 参数,用于将单个查询 SQL 分成多组查询。其中,每一个组定义了一个时间序列,例如一支股票的交易数据。若不指定,则认为所有的数据定义了一个时间序列。例如,一种典型的情况是表里的数据包含了全部的股票,而我们在训练模型的时候只希望每支股票仅利用自己的历史数据来对未来进行预测。在这种情况下,我们需要将 groupCol 设置为股票标的列,将 groupScheme 设置为所有的股票标的,也即每一个组是一支股票的交易数据。换言之,假如原始的查询 SQL 为:

select * from loadTable(dbName, tableName)

假设 groupSchemestockID, groupScheme["apple", "google", "amazon"],则原始的查询会被拆分成以下三组查询:

select * from loadTable(dbName, tableName) where stockID = "apple"
select * from loadTable(dbName, tableName) where stockID = "google"
select * from loadTable(dbName, tableName) where stockID = "amazon"

模型训练的时候,每组训练数据则只会来源于以上三组中的某一组,而不会出现跨组的情况。

另一方面,为了避免加载大量数据至内存中,DDBDataLoader 采用分区粒度管理数据。也就是说,每组查询每次只加载一个分区到内存中。通常来说, DolphinDB 的一个分区的大小在 100MB 到 1GB 之间,因此这种设计可以很好地限制内存的使用量。另一方面,基于这种分区粒度的管理方式,使得 DDBDataLoader 实现的 shuffle 和传统的 dataloader 实现的 shuffle 会有一些区别。具体地说,传统的 shuffle 是将数据进行完全随机地打乱,但这样会引入大量的随机 IO,会使得效率偏低。而 DDBDataLoader 使用的 shuffle 方式,则是先随机地选取一个数据的分区并读取,随后在这个分区内部进行 shuffle。使用这种方式,可以最大化地减少随机的 IO,以提升整个训练过程的效率。

另一组要说明的参数是 repartitionColrepartitionScheme 。这组参数主要处理的是单个查询没办法直接拆成多个分区的情况。例如,一种常见的使用 DolphinDB 管理因子数据的方法是使用纵表来对因子数据进行管理(详见 中高频多因子库存储最佳实践)。使用这种方法,则在获取可用的训练数据之前,需要先对所有的数据做一个 pivot by 将数据从纵表转化为宽表。然而,用户的数据量可能非常大,例如几百 G 甚至更多,在这种情况下,服务器的内存完全可能不够完成对所有数据的 pivot by 。针对这类情况,DDBDataLoader 提供了 repartitionColrepartitionScheme 参数这组参数,这组参数的作用是可以对全表的数据做进一步的切分,将全表数据按照 repartitionColrepartitionScheme 拆成多个子表,然后对于每一个子表再做单独的 pivot by。换言之,假设原始的查询 SQL 为:

select val from loadTable(dbName, tableName) pivot by datetime, stockID, factorName

假设 repartitionColdate(datetime)repartitionScheme["2023.09.05", "2023.09.06", "2023.09.07"],则上述 的查询相当于会被拆成以下几个子查询:

select val from loadTable(dbName, tableName) pivot by datetime, stockID, factorName where date(datetime) = 2023.09.05
select val from loadTable(dbName, tableName) pivot by datetime, stockID, factorName where date(datetime) = 2023.09.06
select val from loadTable(dbName, tableName) pivot by datetime, stockID, factorName where date(datetime) = 2023.09.07

可以认为,设置了 repartitionColrepartitionScheme 参数之后,相当于对上述的 pivot by 查询语句的结果又做了一个“重分区”,使得每个分区占用的空间不至于特别大。

下面介绍 DDBDataLoader 的各个组件。在 DDBDataLoader 内部,每组子查询由数据管理器 DataManager 管理,每个 DataManager 又对应一个 DataSource。这里的 DataSource,可视为一个分区的元数据。DataSource 通过传入的 Session 会话从 DolphinDB 服务端获取一个分区的数据,并将该分区的数据放入一个预载队列中。DataManager 则根据选取数据的顺序从 DataSource 产生的预载队列中获取预载的分区粒度数据,并将其根据滑动窗口大小和步长处理为相应的 PyTorch 的 Tensor 格式。DDBDataLoader 维护了一个包含多个数据管理器 DataManager 的数据池,数据池的大小由参数 groupPoolSize 控制。后台工作线程从这些数据管理器中提取批量数据,并将其组装成用于训练的数据格式和形状,然后放入整个 AI Dataloader 的预准备队列中。最后,迭代时,DDBDataLoader 从预准备队列中获取已准备好的批量数据,将其传递给客户端,供神经网络训练使用。