快速开始

本节将展示通过 dolphindb 连接、使用及操作单节点 DolphinDB 服务器的完整操作。

通过阅读,您将了解到如何使用 dolphindb 连接 DolphinDB 并与之交互、以及如何操作数据库表。

建立连接

session(会话控制)可以实现客户端与服务器之间的信息交互。DolphinDB Python API 通过 session 在 DolphinDB 服务器上执行脚本和函数,同时实现双向的数据传递。

注意: dolphindb 自 1.30.22.1 版本调整 session 类名为 Session,同时增加别名 session 以确保兼容性。

session 连接及使用示例

在如下脚本中,先通过 import 语句导入 dolphindb ,在 Python 中创建一个 session,然后使用指定的域名(或 IP 地址)和端口号把该会话连接到 DolphinDB 服务器。最后展示一个简单的计算示例,执行 1+1 的脚本,得到返回值 2

>>> import dolphindb as ddb
>>> s = ddb.session()
>>> s.connect("localhost", 8848)
True
>>> s.run("1+1;")
2
>>> s.close()

注意

  • 建立连接前,须先启动 DolphinDB 服务器。
  • 若当前 session 不再使用,建议立即调用 close() 关闭会话。否则可能出现因连接数过多,导致其它会话无法连接服务器的情况。

数据交互

dolphindb 目前支持多种数据交互的方法,本节仅介绍通过 run, upload 等方式上传和下载数据。更多交互方式请参考章节 session 的常用方法DBConnectionPool 的常用方法

下表为 DolphinDB 的各种数据形式与 Python 对象的对应关系。

DolphinDB DataFormPython
Scalarint/str/float/...
Vectornumpy.ndarray
Pairlist
Matrix[numpy.ndarray, numpy.ndarray, numpy.ndarray]
Setset
Dictionarydict
Tablepandas.DataFrame

注意

  • 由于 DolphinDB 中的 Matrix 对象可以设置行名和列名,故下载的 Matrix 形式的数据将转换成 list 类型的 Python 对象。该 list 对象中将包含一个表示数据的二维数组和两个分别表示行名、列名的一维数组。
  • 若 API 使用不同的下载方式,同样的 DolphinDB 数据将会对应不同的 Python 对象。详情请参考章节类型转换

在下载数据前,首先导入 dolphindb,然后创建一个 session 并连接到 DolphinDB 服务端。

>>> import dolphindb as ddb
>>> s = ddb.session()
>>> s.connect("localhost", 8848)
True

下载数据

使用 s.run 的方式运行脚本,成功执行后将下载具体数据。

以下依次创建并下载数据形式分别为标量(Scalar)、向量(Vector)、数据对(Pair)、矩阵(Matrix)、集合(Set)、字典(Dictionary)和表(Table)的数据。

标量(Scalar)

>>> s.run("1;")
1

向量(Vector)

>>> s.run("1..10;")
array([ 1,  2,  3,  4,  5,  6,  7,  8,  9, 10], dtype=int32)

数据对(Pair)

>>> s.run("1:5;")
[1, 5]

矩阵(Matrix)

>>> s.run("1..6$2:3;")
[array([[1, 3, 5],
       [2, 4, 6]], dtype=int32), None, None]

集合(Set)

>>> s.run("set([1,5,9]);")
{1, 5, 9}

字典(Dictionary)

>>> s.run("dict(1 2 3, 4.5 7.8 4.3);")
{2: 7.8, 1: 4.5, 3: 4.3}

表(Table)

>>> s.run("table(`XOM`GS`AAPL as id, 102.1 33.4 73.6 as x);")
     id      x
0   XOM  102.1
1    GS   33.4
2  AAPL   73.6

上传数据

导入库 numpy 和 pandas,示例如下:

>>> import numpy as np
>>> import pandas as pd

通过 s.upload 的方式上传数据,输入参数 变量名称:数据,成功执行后将返回该变量在服务器端的地址。

以下将依次展示上传 Python 对象对应 DolphinDB 数据形式为标量(Scalar)、向量(Vector)、矩阵(Matrix)、集合(Set)、字典(Dictionary)和表(Table)的数据。

注意: Python API 暂不支持上传数据对(Pair)形式的数据。

以下脚本中将展示成功上传后,通过 typestr 函数查询数据的 DolphinDB 类型名称,以及使用 s.run 的方式重新下载数据。

标量(Scalar)

>>> s.upload({'scalar_sample': 1})
62776640
>>> s.run("typestr(scalar_sample);")
'LONG'
>>> s.run("scalar_sample;")
1

向量(Vector)

>>> s.upload({'vector_sample': np.array([1, 3])})
65583680
>>> s.run("typestr(vector_sample);")
'FAST LONG VECTOR'
>>> s.run("vector_sample;")
array([1, 3])

矩阵(Matrix)

>>> s.upload({'matrix_sample': np.array([[1, 2, 3], [4, 5, 6]])})
65484832
>>> s.run("typestr(matrix_sample);")
'FAST LONG MATRIX'
>>> s.run("matrix_sample;")
[array([[1, 2, 3],
       [4, 5, 6]]), None, None]

集合(Set)

>>> s.upload({'set_sample': {1, 4, 7}})
65578432
>>> s.run("typestr(set_sample);")
'LONG SET'
>>> s.run("set_sample;")
{1, 4, 7}

字典(Dictionary)

>>> s.upload({'dict_sample': {'a': 1}})
58318576
>>> s.run("typestr(dict_sample);")
'STRING->LONG DICTIONARY'
>>> s.run("dict_sample;")
{'a': 1}

表(Table)

>>> df = pd.DataFrame({'a': [1, 2, 3], 'b': ['a', 'b', 'c']})
>>> s.upload({'table_sample': df})
63409760
>>> s.run("typestr(table_sample);")
'IN-MEMORY TABLE'
>>> s.run("table_sample;")
   a  b
0  1  a
1  2  b
2  3  c

数据库操作

Python API 支持通过如下两种方式操作 DolphinDB:

  • 方式一:通过 session.run() 执行 DolphinDB 脚本。
  • 方式二:使用 Python API 封装的方法操作服务器。

以下展示两种方式的使用示例。

方式一:执行 DolphinDB 脚本

在以下脚本中,先导入 dolphindb,创建一个 session 并连接到 DolphinDB 服务端。由于后续脚本中涉及到执行数据库相关的操作,须在连接服务端的并且登录具有相应权限的账户。

在完成上述操作后,通过 s.run() 执行创建变量、数据库 db 和数据表 pt,并向表 pt 中写入数据等操作,最后执行 SQL 语句返回表 pt 的行数。

import dolphindb as ddb
s = ddb.session()
s.connect("localhost", 8848, "admin", "123456")
s.run("""
    n=1000000
    ID=rand(10, n)
    x=rand(1.0, n)
    t=table(ID, x)
    db=database(directory="dfs://hashdb", partitionType=HASH, partitionScheme=[INT, 2])
    pt = db.createPartitionedTable(t, `pt, `ID)
    pt.append!(t);
""")
re = s.run("select count(x) from pt;")
print(re)

# output
   count_x
0  1000000

方式二:使用 API 接口

在以下脚本中,首先创建一个 session 并连接到 DolphinDB 服务端,同时登录具有相应权限的账户。然后创建 schema_t 用于定义分布式表的表结构。使用 API 接口执行 session.table, session.databaseDatabase.createPartitionedTable 以创建一个内存表、数据库 db 和 分区表 pt,再通过 pt.append 方法向表中追加数据。最后使用 Table.toDF 获取表 pt 的数据。

import pandas as pd
import numpy as np
import dolphindb as ddb
import dolphindb.settings as keys

s = ddb.session("192.168.1.113", 8848, "admin", "123456")
n = 1000000
df = pd.DataFrame({
    'ID':   np.random.randint(0, 10, n),
    'x':    np.random.rand(n),
})
s.run("schema_t = table(100000:0, `ID`x,[INT, DOUBLE])")
schema_t = s.table(data="schema_t")
if s.existsDatabase("dfs://hashdb"):
    s.dropDatabase("dfs://hashdb")
db = s.database(dbPath="dfs://hashdb", partitionType=keys.HASH, partitions=[keys.DT_INT, 2])
pt: ddb.Table = db.createPartitionedTable(table=schema_t, tableName="pt", partitionColumns=["ID"])
data = s.table(data=df)
pt.append(data)
print(pt.toDF())

# output
        ID         x
0        4  0.320935
1        8  0.426056
2        8  0.505221
3        4  0.692984
4        4  0.709175
...     ..       ...
999995   5  0.479531
999996   3  0.805629
999997   5  0.873164
999998   7  0.158090
999999   5  0.530824

[1000000 rows x 2 columns]