常用方法
本节将介绍 session 中常用的方法。
执行脚本
在 session 中执行脚本可以调用 session.run 方法,方法接口如下:
run(script, *args, clearMemory=None, pickleTableToList=None, priority=None, parallelism=None, fetchSize=None, disableDecimal=None)
基础功能参数
script:待执行脚本或函数名。
args:待执行函数的参数。
script:执行脚本
单独传入 script 且不传入 args 不定长位置参数时,表示执行脚本。
示例如下:
>>> s.run("x = 1;")
script + args:执行函数和传参
除了运行脚本之外,run 命令可以直接在 DolphinDB 服务器上执行 DolphinDB 内置或用户自定义的函数。对于这种用法,run 方法的第一个参数是 DolphinDB 中的函数名,之后的参数则是该函数的参数。
下面的示例展示 Python 程序通过 run 调用 DolphinDB 内置的 add 函数。add 函数有 x 和 y 两个参数。根据参数是否已在 DolphinDB 服务端被赋值,有以下三种调用方式:
(1) 所有参数均已在 DolphinDB 服务端被赋值
若变量 x 和 y 已经通过 Python 程序在 DolphinDB 服务端被赋值:
>>> s.run("x = [1,3,5];y = [2,4,6]")
那么在 Python 端要对这两个向量做加法运算,只需直接使用 run(script)
即可:
>>> s.run("add(x,y)") array([3, 7, 11], dtype=int32)
(2) 仅有一个参数 DolphinDB 服务端被赋值
若仅变量 x 已通过 Python 程序在服务器端被赋值:
>>> s.run("x = [1,3,5];")
而参数 y 要在调用 add 函数时一并赋值,需要使用 “部分应用” 方式把参数 x 固化在 add 函数内。具体请参考 部分应用。
>>> import numpy as np >>> y = np.array([1,2,3]) >>> result = s.run("add{x,}", y) >>> result array([2,5,8]) >>> result.dtype dtype('int64')
(3) 两个参数都待由 Python 客户端赋值
>>> import numpy as np >>> x = np.array([1.5,2.5,7]) >>> y = np.array([8.5,7.5,3]) >>> result = s.run("add", x, y) >>> result array([10., 10., 10.]) >>> result.dtype dtype('float64')
通过 run 调用 DolphinDB 的内置函数时,客户端上传参数的数据结构可以是标量 (scalar),列表 (list),字典 (dict),NumPy 的对象,pandas 的 DataFrame 和 Series 等等。
注意:
- NumPy array 的维度不能超过 2。
- pandas 的 DataFrame 和 Series 若有 index,在上传到 DolphinDB 以后会丢失。如果需要保留 index 列,则需要使用 pandas 的 DataFrame 函数
reset_index
。 - 如果 DolphinDB 函数的参数是时间或日期类型,Python 客户端上传时,参数应该先转换为 numpy.datetime64 类型。
高级功能参数
clearMemory
使用 run 方法时,有时为减少内存占用,希望 server 能在执行完毕后自动释放 run 语句中创建的变量。此时可通过指定 session 以及 DBConnectionPool 对象的 run 方法的参数 clearMemory=True
来实现。
>>> s.run("t=1", clearMemory = True) >>> s.run("t")
由于 t 在执行完毕后就被清除了,所以执行 s.run("t")
会抛出异常:
RuntimeError: <Exception> in run: Server response: 'Syntax Error: [line #1] Cannot recognize the token t' script: 't'
pickleTableToList
当 session 构造时指定的 protocol 为 PROTOCOL_DDB 或者 PROTOCOL_PICKLE 时,该参数有效。开启该参数后,如果返回值为 Table,则对应 Python 对象为一个 numpy.ndarray 的列表,列表中每一个元素表示原 Table 中的一列。有关数据格式的相关内容,请参考章节类型转换。
>>> import dolphindb.settings as keys >>> s = ddb.session(protocol=keys.PROTOCOL_DDB) >>> s.connect("localhost", 8848) True >>> s.run("table(1..3 as a)") a 0 1 1 2 2 3 >>> s.run("table(1..3 as a)", pickleTableToList=True) [array([1, 2, 3], dtype=int32)]
priority
优先级表示系统赋予作业的优先数,用于决定作业调度的先后顺序。实际生效的优先级由该参数和 DolphinDB server 端 setMaxJobPriority 的参数 maxPriority 决定,取值为 min(priority, maxPriority)。
在 DolphinDB 中,作业按照优先级进行调度。优先级的取值越高则表明优先级越高。对于优先级高的作业,系统会优先给与计算资源。基于作业的优先级,DolphinDB 设计了多级反馈队列来调度作业的执行。具体来说,系统共维护了10个队列,分别对应10个优先级。系统总是将线程资源分配给高优先级的作业;当一个高优先级队列为空时,系统才会处理低优先级队列中的作业;对于处于相同优先级的作业,系统会以 round-robin 的方式将线程资源分配给作业。
Python API 自 1.30.22.2 版本起,session 和 DBConnectionPool 中 run 方法提供 priority 参数,其用于指定任务的优先级,默认值为 4。使用示例如下:
>>> s.run("1+1", priority=7)
parallelism
并行度表示在一个数据节点上,最多同时可以用多少个线程来执行该作业产生的子任务,即作业的最大并行度。实际生效的并行度由该参数和 DolphinDB server 端 setMaxJobParallelism 的参数 maxParallelism 决定,取值为 min(parallelism, maxParallelism)。
Python API 自 1.30.22.2 版本起,session 和 DBConnectionPool 中 run 方法提供 parallelism 参数,默认值为 2。自 3.0.1.1 版本起,该参数默认值修改为 64。使用示例如下:
>>> s.run("1+1", parallelism=16)
可参考 DolphinDB 关于作业并行度的介绍。
fetchSize
对于大数据量的表,API 提供了分段读取方法。(仅适用于 DolphinDB 1.20.5 及以上版本,Python API 1.30.0.6 及以上版本)
在 Python 客户端执行以下代码创建一个大数据量的表:
>>> s = ddb.session() >>> s.connect("localhost", 8848, "admin", "123456") True >>> script = """ ... rows=100000; ... testblock=table(take(1,rows) as id,take(`A,rows) as symbol,take(2020.08.01..2020.10.01,rows) as date, rand(50,rows) as size,rand(50.5,rows) as price); ... """ >>> s.run(script)
在 run 方法中使用参数 fetchSize 指定分段大小 ,会返回一个 BlockReader 对象,可通过 read()
方法一段段的读取数据。需要注意的是 fetchSize 取值不能小于 8192(记录条数),示例如下:
>>> script1 = "select * from testblock" >>> block= s.run(script1, fetchSize = 8192) >>> total = 0 >>> while block.hasNext(): ... tem = block.read() ... total+=len(tem) ... >>> total 100000
使用上述分段读取的方法时,若数据未读取完毕,需要调用 skipAll
方法来放弃读取后续数据,才能继续执行后续代码。否则会导致套接字缓冲区滞留数据,引发后续数据的反序列化失败。示例代码如下:
>>> block= s.run(script1, fetchSize = 8192) >>> re = block.read() >>> block.skipAll() >>> s.run("1+1;") # 若没有调用 skipAll,执行此代码会抛出异常。 2
disableDecimal
该参数用于控制是否自动将返回的结果表中的 DECIMAL 类型列转换为 DOUBLE 类型列。如果指定该参数为 True,则执行脚本的结果表中的 DECIMAL 类型列将被转换为 DOUBLE 类型列并返回。
自 Python API 3.0.0.2 版本起,session/Session 和 DBConnectionPool 类中的 run
方法开始支持该参数。必须配合 2.00.12.3/3.00.1 及以上的 DolphinDB 服务器使用该参数。使用示例如下:
>>> df = s.run("table(decimal32([1.23, 2.3456], 3) as a)", disableDecimal=True)
>>> df.dtypes
a float64
dtype: object
相关方法
runFile
runFile
方法可读取文件所有内容作为脚本执行,可以如 run 方法一样传入不定长位置参数和不定长关键字参数。
注意: 该文件路径为客户端的本地路径。
使用示例如下:
>>> with open("./test.dos", "w+") as f: ... f.write(""" ... t = table(1..3 as a); ... t; ... """) ... 47 >>> s.runFile("./test.dos") a 0 1 1 2 2 3
上传变量
upload(nameObjectDict)
upload
方法用于上传 Python 对象到服务端,接收一个 dict 对象,其中字典的键表示待上传变量的变量名,字典的值则表示待上传的变量,可以是 int, str, pd.DataFrame, np.ndarray, dict 和 set 等。上传成功则返回上传对象在服务端的内存地址。关于如何上传各类型的 Python 对象,以及对应的服务端数据类型,请参考数据类型。
>>> s.upload({'a': 8, 'b': "abc", 'c': {'a':1, 'b':2}}) [59763200, 60161968, 54696752] >>> s.run("a") 8 >>> s.run("b") abc >>> s.run("c") {'a': 1, 'b': 2}
加载数据
本节将用到 example.csv 文件。
table
table(dbPath=None, data=None, tableAliasName=None, inMem=False, partitions=None)
- data :字符串或字典、DataFrame。如果 data 为字符串,表示服务端数据表表名;如果 data 为字典或 DataFrame,则表示将本地数据作为临时表上传到服务器。
- dbPath :字符串,表示待加载数据表所在的数据库地址。
- tableAliasName :用于指定待加载表的别名。
- inMem :是否将数据从服务器磁盘加载到服务器内存中。
- partitions :表示要加载的分区。
注意: table 函数在 data 为字符串时,实际是封装了 DolphinDB 的 loadTable 函数,从指定数据库中加载对应表,并获取其句柄。关于 inMem 和 partitions 参数的详细含义请参考loadTable。
对数据表句柄的操作以及面向对象的 SQL 查询,请参考章节面向对象操作。
loadTable
loadTable(tableName, dbPath=None, partitions=None, memoryMode=False)
loadTable
方法的使用与 table
方法相似,但该方法仅用于加载服务器端指定表,获取其句柄。
loadTableBySQL
loadTableBySQL(tableName, dbPath, sql)
该方法封装 DolphinDB loadTableBySQL
函数,将满足 SQL 查询中筛选条件的记录加载为内存中的分区表,返回内存表句柄给 API。函数详细说明请参考 loadTableBySQL。
- tableName/dbPath :根据 tableName 和 dbPath 加载 sql 中使用到的分区表。
- sql :SQL 查询的元代码。它可以用 WHERE 子句来过滤分区或记录行,也可以用 SELECT 语句选择包括计算列在内的列,但不能包含 TOP 子句、GROUP BY 子句、 ORDER BY 子句、CONTEXT BY 子句和 LIMIT 子句。
loadText
loadText(remoteFilePath, delimiter=",")
可使用 loadText
方法把文本文件导入到服务端的内存表中,文本文件必须和服务端在同一个服务器。该方法会在 Python 中返回一个 DolphinDB 内存表句柄。可使用 toDF
方法把 Python 中的内存表句柄对象 Table 转换为 pandas.DataFrame。
注意: 使用 loadText 方法时,载入的内存表数据量必须小于 DolphinDB 服务器可用内存。
>>> WORK_DIR = "C:/DolphinDB/Data" >>> trade = s.loadText(WORK_DIR+"/example.csv")
将返回的 DolphinDB 表对象转化为 pandas DataFrame。表的数据传输发生在此步骤。
>>> trade.toDF() TICKER date VOL PRC BID ASK 0 AMZN 1997.05.16 6029815 23.50000 23.50000 23.6250 1 AMZN 1997.05.17 1232226 20.75000 20.50000 21.0000 2 AMZN 1997.05.20 512070 20.50000 20.50000 20.6250 3 AMZN 1997.05.21 456357 19.62500 19.62500 19.7500 4 AMZN 1997.05.22 1577414 17.12500 17.12500 17.2500 5 AMZN 1997.05.23 983855 16.75000 16.62500 16.7500 ... 13134 NFLX 2016.12.29 3444729 125.33000 125.31000 125.3300 13135 NFLX 2016.12.30 4455012 123.80000 123.80000 123.8300
loadText 函数导入文件时的默认分隔符是','。用户也可指定其他符号作为分隔符。例如,导入'\t'分割的表格形式文本文件:
>>> t1 = s.loadText(WORK_DIR+"/t1.tsv", '\t')
注意 :loadText
/ ploadText
/ loadTextEx
都是将文件加载到服务端,并非加载本地文件。
ploadText
ploadText
函数可以并行加文本文件到内存分区表中。其加载速度比 loadText 函数快。
>>> trade = s.ploadText(WORK_DIR+"/example.csv") >>> trade.rows 13136
loadTextEx
loadTextEx(dbPath, tableName, partitionColumns=None, remoteFilePath=None, delimiter=",", sortColumns=None)
可使用函数 loadTextEx
把文本文件导入到分区数据库的分区表中。如果分区表不存在,函数会自动生成该分区表并把数据追加到表中。如果分区表已经存在,则直接把数据追加到分区表中。
函数 loadTextEx
的各个参数如下:
- dbPath :数据库路径。
- tableName :分区表的名称。
- partitionColumns :分区列。
- remoteFilePath :文本文件在 DolphinDB 服务器上的绝对路径。
- delimiter :文本文件的分隔符(默认分隔符是逗号)。
- sortColumns:字符串标量或向量,用于指定表的排序列,写入的数据将按 sortColumns 进行排序。注意,该参数仅适用于 TSDB 引擎。
下面的例子使用函数 loadTextEx
创建了分区表 trade,并把 example.csv 中的数据加载到表中。
>>> import dolphindb.settings as keys >>> if s.existsDatabase("dfs://valuedb"): ... s.dropDatabase("dfs://valuedb") ... >>> s.database(dbName='mydb', partitionType=keys.VALUE, partitions=["AMZN","NFLX", "NVDA"], dbPath="dfs://valuedb") >>> trade = s.loadTextEx(dbPath="mydb", tableName='trade',partitionColumns=["TICKER"], remoteFilePath=WORK_DIR + "/example.csv") >>> trade.toDF() TICKER date VOL PRC BID ASK 0 AMZN 1997-05-15 6029815 23.500 23.500 23.625 1 AMZN 1997-05-16 1232226 20.750 20.500 21.000 2 AMZN 1997-05-19 512070 20.500 20.500 20.625 3 AMZN 1997-05-20 456357 19.625 19.625 19.750 4 AMZN 1997-05-21 1577414 17.125 17.125 17.250 ... ... ... ... ... ... ... 13131 NVDA 2016-12-23 16193331 109.780 109.770 109.790 13132 NVDA 2016-12-27 29857132 117.320 117.310 117.320 13133 NVDA 2016-12-28 57384116 109.250 109.250 109.290 13134 NVDA 2016-12-29 54384676 111.430 111.260 111.420 13135 NVDA 2016-12-30 30323259 106.740 106.730 106.750 [13136 rows x 6 columns]
返回表中的行数:
>>> trade.rows
13136
返回表中的列数:
>>> trade.cols
6
展示表的结构:
>>> trade.schema name typeString typeInt comment 0 TICKER SYMBOL 17 1 date DATE 6 2 VOL INT 4 3 PRC DOUBLE 16 4 BID DOUBLE 16 5 ASK DOUBLE 16
数据库表管理
特别的,Python API 封装了部分服务端常用数据库表管理函数作为 session 的方法,用户可以调用这些方法对数据库表进行管理。
database
database(dbName=None, partitionType=None, partitions=None, dbPath=None, engine=None, atomic=None, chunkGranularity=None)
如果需要持久保存导入数据,或者需要导入的文件超过可用内存,可将数据导入 DFS 分区数据库保存。下面将使用一个例子来介绍如何创建分区数据库。
本节例子中会使用数据库 valuedb。首先检查该数据库是否存在,如果存在,将其删除:
>>> if s.existsDatabase("dfs://valuedb"): ... s.dropDatabase("dfs://valuedb") ...
使用 database 方法创建值分区(VALUE)的数据库,使用股票代码作为分区字段。参数 partitions 表示分区方案。下例中,先导入 DolphinDB 的关键字,再创建数据库。
>>> import dolphindb.settings as keys >>> s.database(dbName='mydb', partitionType=keys.VALUE, partitions=['AMZN','NFLX', 'NVDA'], dbPath='dfs://valuedb')
上述语句等同于在DolphinDB中执行脚本 db=database('dfs://valuedb', VALUE, ['AMZN','NFLX','NVDA'])
。
除了值分区(VALUE),DolphinDB 还支持哈希分区(HASH)、范围分区(RANGE)、列表分区(LIST)与组合分区(COMPO),具体请参见 database。
创建了分区数据库后,不可更改分区类型,一般也不可更改分区方案,但是创建值分区或范围分区(或者复合分区中的值分区或范围分区)后,DolphinDB 脚本中可以分别使用 addValuePartitions 与 addRangePartitions 函数添加分区。
database 方法的详细参数以及使用方法,请参考章节面向对象操作。
existsDatabase
>>> s.existsDatabase(dbUrl="dfs://testDB")
False
该函数用于判断 DolphinDB 服务端是否存在 dbUrl 对应的数据库,函数使用请参考 existsDatabase。
existsTable
>>> s.existsTable(dbUrl="dfs://valuedb", tableName="trade") True
检查指定表是否存在于指定数据库中,函数使用请参考 existsTable。
dropDatabase
>>> s.dropDatabase(dbPath="dfs://valuedb")
删除指定数据库的所有物理文件,函数使用请参考 dropDatabase。
dropPartition
>>> s.dropPartition(dbPath="dfs://valuedb", partitionPaths="AMZN", tableName="trade")
删除数据库中指定分区的数据。如果指定了 tableName,则删除指定表中符合指定条件的分区数据。否则,删除指定数据库所有表中符合指定条件的分区数据。函数使用请参考 dropPartition。
注意:如果创建数据库时指定分区粒度为 DATABASE,则可以不填 tableName,否则必须指定表名。
dropTable
>>> s.dropTable(dbPath="dfs://valuedb", tableName="trade")
删除指定数据库中的数据表,函数使用请参考 dropTable。
其他方法
除了上述方法,session 中还提供封装了一些其他常用的函数。
undef/undefAll
>>> s.undef("t1", "VAR") >>> s.undefAll()
undef 方法释放 session 中的指定对象;undefAll 方法释放 session 中的全部对象。undef 支持的对象类型包括:"VAR"(变量)、"SHARED"(共享变量) 与 "DEF"(函数定义)。默认类型为变量 "VAR"。"SHARED" 指内存中跨 session 的共享变量,例如流数据表。
假设 session 中有一个 DolphinDB 的表对象 t1,可以通过 undef("t1","VAR")
将该表释放掉。释放后,并不一定能够看到内存在服务端马上释放。这与 DolphinDB 的内存管理机制有关。DolphinDB 从操作系统申请的内存,释放后不会立即还给操作系统,因为这些释放的内存在 DolphinDB 中可以立即使用。申请内存首先从 DolphinDB 内部的池中申请内存,不足时才会向操作系统去申请。配置文件 (dolphindb.cfg) 中参数 maxMemSize 设置的内存上限会尽量保证。譬如设置为 8GB,那么 DolphinDB 会尽可能利用 8GB 内存。所以若用户需要反复 undef 内存中的一个变量以释放内存,为后续 server 的运行腾出更多内存空间,则需要将 maxMemSize 调整到一个合理的数值(不超过内存上限),否则当前内存没有释放,而后面需要的内存超过了系统的最大内存,DolphinDB 的进程就有可能被操作系统杀掉或者出现 out of memory
的错误。
clearAllCache
>>> s.clearAllCache() >>> s.clearAllCache(dfs=True)
clearAllCache
方法会调用服务端同名方法来清理服务端缓存,如果 dfs 参数为 True,将会在所有节点上清理缓存;否则只会在连接节点上清理。
setTimeout
与 session 建立连接时使用的参数 keepAliveTime 不同,setTimeout
是 session 的类方法,用于设置 TCP 连接 TCP_USER_TIMEOUT 选项。可以设置用户允许 TCP 连接在没有端到端连接的情况下的生存时间(单位 秒/s)。参考 Linux Socket options。
>>> ddb.session.setTimeout(3600)
注意: 本方法仅在 Linux 系统生效。默认时间为 30 秒。