常用方法

本节将介绍 session 中常用的方法。

执行脚本

在 session 中执行脚本可以调用 session.run 方法,方法接口如下:

run(script, *args,
    clearMemory=None, pickleTableToList=None,
    priority=None, parallelism=None,
    fetchSize=None, disableDecimal=None)

基础功能参数

  • script:待执行脚本或函数名。

  • args:待执行函数的参数。

script:执行脚本

单独传入 script 且不传入 args 不定长位置参数时,表示执行脚本。

示例如下:

>>> s.run("x = 1;")

script + args:执行函数和传参

除了运行脚本之外,run 命令可以直接在 DolphinDB 服务器上执行 DolphinDB 内置或用户自定义的函数。对于这种用法,run 方法的第一个参数是 DolphinDB 中的函数名,之后的参数则是该函数的参数。

下面的示例展示 Python 程序通过 run 调用 DolphinDB 内置的 add 函数。add 函数有 x 和 y 两个参数。根据参数是否已在 DolphinDB 服务端被赋值,有以下三种调用方式:

(1) 所有参数均已在 DolphinDB 服务端被赋值

若变量 x 和 y 已经通过 Python 程序在 DolphinDB 服务端被赋值:

>>> s.run("x = [1,3,5];y = [2,4,6]")

那么在 Python 端要对这两个向量做加法运算,只需直接使用 run(script) 即可:

>>> s.run("add(x,y)")
array([3, 7, 11], dtype=int32)

(2) 仅有一个参数 DolphinDB 服务端被赋值

若仅变量 x 已通过 Python 程序在服务器端被赋值:

>>> s.run("x = [1,3,5];")

而参数 y 要在调用 add 函数时一并赋值,需要使用 “部分应用” 方式把参数 x 固化在 add 函数内。具体请参考 部分应用

>>> import numpy as np
>>> y = np.array([1,2,3])
>>> result = s.run("add{x,}", y)
>>> result
array([2,5,8])
>>> result.dtype
dtype('int64')

(3) 两个参数都待由 Python 客户端赋值

>>> import numpy as np
>>> x = np.array([1.5,2.5,7])
>>> y = np.array([8.5,7.5,3])
>>> result = s.run("add", x, y)
>>> result
array([10., 10., 10.])
>>> result.dtype
dtype('float64')

通过 run 调用 DolphinDB 的内置函数时,客户端上传参数的数据结构可以是标量 (scalar),列表 (list),字典 (dict),NumPy 的对象,pandas 的 DataFrame 和 Series 等等。

注意

  • NumPy array 的维度不能超过 2。
  • pandas 的 DataFrame 和 Series 若有 index,在上传到 DolphinDB 以后会丢失。如果需要保留 index 列,则需要使用 pandas 的 DataFrame 函数 reset_index
  • 如果 DolphinDB 函数的参数是时间或日期类型,Python 客户端上传时,参数应该先转换为 numpy.datetime64 类型。

高级功能参数

clearMemory

使用 run 方法时,有时为减少内存占用,希望 server 能在执行完毕后自动释放 session 中创建的变量。此时可通过指定 session 以及 DBConnectionPool 对象的 run 方法的参数 clearMemory=True 来实现。

>>> s.run("t=1", clearMemory = True)
>>> s.run("t")

由于 t 在执行完毕后就被清除了,所以执行 s.run("t") 会抛出异常:

RuntimeError: <Exception> in run: Server response: 'Syntax Error: [line #1] Cannot recognize the token t' script: 't'

pickleTableToList

当 session 构造时指定的 protocol 为 PROTOCOL_DDB 或者 PROTOCOL_PICKLE 时,该参数有效。开启该参数后,如果返回值为 Table,则对应 Python 对象为一个 numpy.ndarray 的列表,列表中每一个元素表示原 Table 中的一列。有关数据格式的相关内容,请参考章节类型转换

>>> import dolphindb.settings as keys
>>> s = ddb.session(protocol=keys.PROTOCOL_DDB)
>>> s.connect("localhost", 8848)
True
>>> s.run("table(1..3 as a)")
   a
0  1
1  2
2  3
>>> s.run("table(1..3 as a)", pickleTableToList=True)
[array([1, 2, 3], dtype=int32)]

priority

优先级表示系统赋予作业的优先数,用于决定作业调度的先后顺序。实际生效的优先级由该参数和 DolphinDB server 端 setMaxJobPriority 的参数 maxPriority 决定,取值为 min(priority, maxPriority)。

在 DolphinDB 中,作业按照优先级进行调度。优先级的取值越高则表明优先级越高。对于优先级高的作业,系统会优先给与计算资源。基于作业的优先级,DolphinDB 设计了多级反馈队列来调度作业的执行。具体来说,系统共维护了10个队列,分别对应10个优先级。系统总是将线程资源分配给高优先级的作业;当一个高优先级队列为空时,系统才会处理低优先级队列中的作业;对于处于相同优先级的作业,系统会以 round-robin 的方式将线程资源分配给作业。

Python API 自 1.30.22.2 版本起,session 和 DBConnectionPool 中 run 方法提供 priority 参数,其用于指定任务的优先级,默认值为 4。使用示例如下:

>>> s.run("1+1", priority=7)

parallelism

并行度表示在一个数据节点上,最多同时可以用多少个线程来执行该作业产生的子任务,即作业的最大并行度。实际生效的并行度由该参数和 DolphinDB server 端 setMaxJobParallelism 的参数 maxParallelism 决定,取值为 min(parallelism, maxParallelism)。

Python API 自 1.30.22.2 版本起,session 和 DBConnectionPool 中 run 方法提供 parallelism 参数,默认值为 2。自 3.0.1.1 版本起,该参数默认值修改为 64。使用示例如下:

>>> s.run("1+1", parallelism=16)

可参考 DolphinDB 关于作业并行度的介绍

fetchSize

对于大数据量的表,API 提供了分段读取方法。(仅适用于 DolphinDB 1.20.5 及以上版本,Python API 1.30.0.6 及以上版本)

在 Python 客户端执行以下代码创建一个大数据量的表:

>>> s = ddb.session()
>>> s.connect("localhost", 8848, "admin", "123456")
True
>>> script = """
... rows=100000;
... testblock=table(take(1,rows) as id,take(`A,rows) as symbol,take(2020.08.01..2020.10.01,rows) as date, rand(50,rows) as size,rand(50.5,rows) as price);
... """
>>> s.run(script)

在 run 方法中使用参数 fetchSize 指定分段大小 ,会返回一个 BlockReader 对象,可通过 read() 方法一段段的读取数据。需要注意的是 fetchSize 取值不能小于 8192(记录条数),示例如下:

>>> script1 = "select * from testblock"
>>> block= s.run(script1, fetchSize = 8192)
>>> total = 0
>>> while block.hasNext():
...     tem = block.read()
...     total+=len(tem)
... 
>>> total
100000

使用上述分段读取的方法时,若数据未读取完毕,需要调用 skipAll 方法来放弃读取后续数据,才能继续执行后续代码。否则会导致套接字缓冲区滞留数据,引发后续数据的反序列化失败。示例代码如下:

>>> block= s.run(script1, fetchSize = 8192)
>>> re = block.read()
>>> block.skipAll()
>>> s.run("1+1;") # 若没有调用 skipAll,执行此代码会抛出异常。
2

disableDecimal

该参数用于控制是否自动将返回的结果表中的 DECIMAL 类型列转换为 DOUBLE 类型列。如果指定该参数为 True,则执行脚本的结果表中的 DECIMAL 类型列将被转换为 DOUBLE 类型列并返回。

自 Python API 3.0.0.2 版本起,session/Session 和 DBConnectionPool 类中的 run 方法开始支持该参数。必须配合 2.00.12.3/3.00.1 及以上的 DolphinDB 服务器使用该参数。使用示例如下:

>>> df = s.run("table(decimal32([1.23, 2.3456], 3) as a)", disableDecimal=True)
>>> df.dtypes
a    float64
dtype: object

相关方法

runFile

runFile 方法可读取文件所有内容作为脚本执行,可以如 run 方法一样传入不定长位置参数和不定长关键字参数。

注意: 该文件路径为客户端的本地路径。

使用示例如下:

>>> with open("./test.dos", "w+") as f:
...     f.write("""
...         t = table(1..3 as a);
...         t;
...     """)
... 
47
>>> s.runFile("./test.dos")
    a
0   1
1   2
2   3

上传变量

upload(nameObjectDict)

upload 方法用于上传 Python 对象到服务端,接收一个 dict 对象,其中字典的键表示待上传变量的变量名,字典的值则表示待上传的变量,可以是 int, str, pd.DataFrame, np.ndarray, dict 和 set 等。上传成功则返回上传对象在服务端的内存地址。关于如何上传各类型的 Python 对象,以及对应的服务端数据类型,请参考数据类型

>>> s.upload({'a': 8, 'b': "abc", 'c': {'a':1, 'b':2}})
[59763200, 60161968, 54696752]
>>> s.run("a")
8
>>> s.run("b")
abc
>>> s.run("c")
{'a': 1, 'b': 2}

加载数据

本节将用到 example.csv 文件。

table

table(dbPath=None, data=None, tableAliasName=None, inMem=False, partitions=None)
  • data :字符串或字典、DataFrame。如果 data 为字符串,表示服务端数据表表名;如果 data 为字典或 DataFrame,则表示将本地数据作为临时表上传到服务器。
  • dbPath :字符串,表示待加载数据表所在的数据库地址。
  • tableAliasName :用于指定待加载表的别名。
  • inMem :是否将数据从服务器磁盘加载到服务器内存中。
  • partitions :表示要加载的分区。

注意: table 函数在 data 为字符串时,实际是封装了 DolphinDB 的 loadTable 函数,从指定数据库中加载对应表,并获取其句柄。关于 inMempartitions 参数的详细含义请参考loadTable

对数据表句柄的操作以及面向对象的 SQL 查询,请参考章节面向对象操作

loadTable

loadTable(tableName, dbPath=None, partitions=None, memoryMode=False)

loadTable 方法的使用与 table 方法相似,但该方法仅用于加载服务器端指定表,获取其句柄。

loadTableBySQL

loadTableBySQL(tableName, dbPath, sql)

该方法封装 DolphinDB loadTableBySQL 函数,将满足 SQL 查询中筛选条件的记录加载为内存中的分区表,返回内存表句柄给 API。函数详细说明请参考 loadTableBySQL

  • tableName/dbPath :根据 tableNamedbPath 加载 sql 中使用到的分区表。
  • sql :SQL 查询的元代码。它可以用 WHERE 子句来过滤分区或记录行,也可以用 SELECT 语句选择包括计算列在内的列,但不能包含 TOP 子句、GROUP BY 子句、 ORDER BY 子句、CONTEXT BY 子句和 LIMIT 子句。

loadText

loadText(remoteFilePath, delimiter=",")

可使用 loadText 方法把文本文件导入到服务端的内存表中,文本文件必须和服务端在同一个服务器。该方法会在 Python 中返回一个 DolphinDB 内存表句柄。可使用 toDF 方法把 Python 中的内存表句柄对象 Table 转换为 pandas.DataFrame。

注意: 使用 loadText 方法时,载入的内存表数据量必须小于 DolphinDB 服务器可用内存。

>>> WORK_DIR = "C:/DolphinDB/Data"
>>> trade = s.loadText(WORK_DIR+"/example.csv")

将返回的 DolphinDB 表对象转化为 pandas DataFrame。表的数据传输发生在此步骤。

>>> trade.toDF()
      TICKER        date       VOL        PRC        BID       ASK
0       AMZN  1997.05.16   6029815   23.50000   23.50000   23.6250
1       AMZN  1997.05.17   1232226   20.75000   20.50000   21.0000
2       AMZN  1997.05.20    512070   20.50000   20.50000   20.6250
3       AMZN  1997.05.21    456357   19.62500   19.62500   19.7500
4       AMZN  1997.05.22   1577414   17.12500   17.12500   17.2500
5       AMZN  1997.05.23    983855   16.75000   16.62500   16.7500
...
13134   NFLX  2016.12.29   3444729  125.33000  125.31000  125.3300
13135   NFLX  2016.12.30   4455012  123.80000  123.80000  123.8300

loadText 函数导入文件时的默认分隔符是','。用户也可指定其他符号作为分隔符。例如,导入'\t'分割的表格形式文本文件:

>>> t1 = s.loadText(WORK_DIR+"/t1.tsv", '\t')

注意loadText / ploadText / loadTextEx 都是将文件加载到服务端,并非加载本地文件。

ploadText

ploadText 函数可以并行加文本文件到内存分区表中。其加载速度比 loadText 函数快。

>>> trade = s.ploadText(WORK_DIR+"/example.csv")
>>> trade.rows
13136

loadTextEx

loadTextEx(dbPath, tableName,  partitionColumns=None, remoteFilePath=None, delimiter=",", sortColumns=None)

可使用函数 loadTextEx 把文本文件导入到分区数据库的分区表中。如果分区表不存在,函数会自动生成该分区表并把数据追加到表中。如果分区表已经存在,则直接把数据追加到分区表中。

函数 loadTextEx 的各个参数如下:

  • dbPath :数据库路径。
  • tableName :分区表的名称。
  • partitionColumns :分区列。
  • remoteFilePath :文本文件在 DolphinDB 服务器上的绝对路径。
  • delimiter :文本文件的分隔符(默认分隔符是逗号)。
  • sortColumns:字符串标量或向量,用于指定表的排序列,写入的数据将按 sortColumns 进行排序。注意,该参数仅适用于 TSDB 引擎。

下面的例子使用函数 loadTextEx 创建了分区表 trade,并把 example.csv 中的数据加载到表中。

>>> import dolphindb.settings as keys
>>> if s.existsDatabase("dfs://valuedb"):
...     s.dropDatabase("dfs://valuedb")
...
>>> s.database(dbName='mydb', partitionType=keys.VALUE, partitions=["AMZN","NFLX", "NVDA"], dbPath="dfs://valuedb")
>>> trade = s.loadTextEx(dbPath="mydb", tableName='trade',partitionColumns=["TICKER"], remoteFilePath=WORK_DIR + "/example.csv")
>>> trade.toDF()
      TICKER       date       VOL      PRC      BID      ASK
0       AMZN 1997-05-15   6029815   23.500   23.500   23.625
1       AMZN 1997-05-16   1232226   20.750   20.500   21.000
2       AMZN 1997-05-19    512070   20.500   20.500   20.625
3       AMZN 1997-05-20    456357   19.625   19.625   19.750
4       AMZN 1997-05-21   1577414   17.125   17.125   17.250
...      ...        ...       ...      ...      ...      ...
13131   NVDA 2016-12-23  16193331  109.780  109.770  109.790
13132   NVDA 2016-12-27  29857132  117.320  117.310  117.320
13133   NVDA 2016-12-28  57384116  109.250  109.250  109.290
13134   NVDA 2016-12-29  54384676  111.430  111.260  111.420
13135   NVDA 2016-12-30  30323259  106.740  106.730  106.750

[13136 rows x 6 columns]

返回表中的行数:

>>> trade.rows
13136

返回表中的列数:

>>> trade.cols
6

展示表的结构:

>>> trade.schema
     name typeString  typeInt comment
0  TICKER     SYMBOL       17        
1    date       DATE        6        
2     VOL        INT        4        
3     PRC     DOUBLE       16        
4     BID     DOUBLE       16        
5     ASK     DOUBLE       16

数据库表管理

特别的,Python API 封装了部分服务端常用数据库表管理函数作为 session 的方法,用户可以调用这些方法对数据库表进行管理。

database

database(dbName=None, partitionType=None, partitions=None, dbPath=None, engine=None, atomic=None, chunkGranularity=None)

如果需要持久保存导入数据,或者需要导入的文件超过可用内存,可将数据导入 DFS 分区数据库保存。下面将使用一个例子来介绍如何创建分区数据库。

本节例子中会使用数据库 valuedb。首先检查该数据库是否存在,如果存在,将其删除:

>>> if s.existsDatabase("dfs://valuedb"):
...     s.dropDatabase("dfs://valuedb")
...

使用 database 方法创建值分区(VALUE)的数据库,使用股票代码作为分区字段。参数 partitions 表示分区方案。下例中,先导入 DolphinDB 的关键字,再创建数据库。

>>> import dolphindb.settings as keys
>>> s.database(dbName='mydb', partitionType=keys.VALUE, partitions=['AMZN','NFLX', 'NVDA'], dbPath='dfs://valuedb')

上述语句等同于在DolphinDB中执行脚本 db=database('dfs://valuedb', VALUE, ['AMZN','NFLX','NVDA'])

除了值分区(VALUE),DolphinDB 还支持哈希分区(HASH)、范围分区(RANGE)、列表分区(LIST)与组合分区(COMPO),具体请参见 database

创建了分区数据库后,不可更改分区类型,一般也不可更改分区方案,但是创建值分区或范围分区(或者复合分区中的值分区或范围分区)后,DolphinDB 脚本中可以分别使用 addValuePartitions 与 addRangePartitions 函数添加分区。

database 方法的详细参数以及使用方法,请参考章节面向对象操作

existsDatabase

>>> s.existsDatabase(dbUrl="dfs://testDB")
False

该函数用于判断 DolphinDB 服务端是否存在 dbUrl 对应的数据库,函数使用请参考 existsDatabase

existsTable

>>> s.existsTable(dbUrl="dfs://valuedb", tableName="trade")
True

检查指定表是否存在于指定数据库中,函数使用请参考 existsTable

dropDatabase

>>> s.dropDatabase(dbPath="dfs://valuedb")

删除指定数据库的所有物理文件,函数使用请参考 dropDatabase

dropPartition

>>> s.dropPartition(dbPath="dfs://valuedb", partitionPaths="AMZN", tableName="trade")

删除数据库中指定分区的数据。如果指定了 tableName,则删除指定表中符合指定条件的分区数据。否则,删除指定数据库所有表中符合指定条件的分区数据。函数使用请参考 dropPartition

注意:如果创建数据库时指定分区粒度为 DATABASE,则可以不填 tableName,否则必须指定表名。

dropTable

>>> s.dropTable(dbPath="dfs://valuedb", tableName="trade")

删除指定数据库中的数据表,函数使用请参考 dropTable

其他方法

除了上述方法,session 中还提供封装了一些其他常用的函数。

undef/undefAll

>>> s.undef("t1", "VAR")
>>> s.undefAll()

undef 方法释放 session 中的指定对象;undefAll 方法释放 session 中的全部对象。undef 支持的对象类型包括:"VAR"(变量)、"SHARED"(共享变量) 与 "DEF"(函数定义)。默认类型为变量 "VAR"。"SHARED" 指内存中跨 session 的共享变量,例如流数据表。

假设 session 中有一个 DolphinDB 的表对象 t1,可以通过 undef("t1","VAR") 将该表释放掉。释放后,并不一定能够看到内存在服务端马上释放。这与 DolphinDB 的内存管理机制有关。DolphinDB 从操作系统申请的内存,释放后不会立即还给操作系统,因为这些释放的内存在 DolphinDB 中可以立即使用。申请内存首先从 DolphinDB 内部的池中申请内存,不足时才会向操作系统去申请。配置文件 (dolphindb.cfg) 中参数 maxMemSize 设置的内存上限会尽量保证。譬如设置为 8GB,那么 DolphinDB 会尽可能利用 8GB 内存。所以若用户需要反复 undef 内存中的一个变量以释放内存,为后续 server 的运行腾出更多内存空间,则需要将 maxMemSize 调整到一个合理的数值(不超过内存上限),否则当前内存没有释放,而后面需要的内存超过了系统的最大内存,DolphinDB 的进程就有可能被操作系统杀掉或者出现 out of memory 的错误。

clearAllCache

>>> s.clearAllCache()
>>> s.clearAllCache(dfs=True)

clearAllCache 方法会调用服务端同名方法来清理服务端缓存,如果 dfs 参数为 True,将会在所有节点上清理缓存;否则只会在连接节点上清理。

setTimeout

与 session 建立连接时使用的参数 keepAliveTime 不同,setTimeout 是 session 的类方法,用于设置 TCP 连接 TCP_USER_TIMEOUT 选项。可以设置用户允许 TCP 连接在没有端到端连接的情况下的生存时间(单位 秒/s)。参考 Linux Socket options

>>> ddb.session.setTimeout(3600)

注意: 本方法仅在 Linux 系统生效。默认时间为 30 秒。

流订阅相关

dolphindb 提供流数据订阅接口,可以从服务器订阅流数据表,并获取其数据,详细介绍请参考章节流订阅基础流订阅(进阶)

相关方法:enableStreaming / subscribe / unsubscribe / getSubscriptionTopics