Table

在 Python API 中,可以使用 DolphinDB Python API 的原生方法来创建、使用数据库及数据表,本节将介绍如何创建数据表、使用 SQL 操作数据表。

Table 与 session.table

Python API 将 DolphinDB 服务端的数据表对象句柄,在 API 包装为 Table 类,封装实现部分功能。通常使用 session.table 或 session.loadTable 方法构造,也可以通过 loadText 等函数获得。

接口如下:

session.table(dbPath=None, data=None, tableAliasName=None, inMem=False, partitions=None)
  • dbPath:数据库路径,内存表或流数据表无需指定该参数。
  • data:数据表的数据,可以为 dict、pd.DataFrame 或 DolphinDB 服务端数据表名。
  • tableAliasName:表的别名。
  • inMem:是否加载表数据到 DolphinDB 服务端内存中。
  • partitions:将被加载到 DolphinDB 服务端内存中的分区。

上传数据为临时表

如果 data 参数传入 dict 或 pd.DataFrame,则表示上传该对象到 DolphinDB 服务端为临时表,此时无需指定 dbPath、inMem、partitions。

代码示例如下:

data1 = pd.DataFrame({
    'a': [1, 2, 3],
    'b': [4, 5, 6],
})
t1 = s.table(data=data1)
print(t1, t1.tableName())
data2 = {
    'a': ['a', 'b', 'c'],
    'b': [1, 2, 3],
}
t2 = s.table(data=data2)
print(t2, t2.tableName())

输出结果如下:

<dolphindb.table.Table object at 0x7fbd5f02bd60> TMP_TBL_3cc57246
<dolphindb.table.Table object at 0x7fbd3205fc70> TMP_TBL_dbae4978

data1 和 data2 都作为临时表上传至服务端,对应表名分别为 TMP_TBL_3cc57246 和 TMP_TBL_dbae4978。

获取服务端数据表句柄

data 参数传入字符串,则表示获取服务端数据表句柄。

  1. 如果同时指定 dbPath 和 data,执行函数则表示从 dbPath 对应的数据库中加载表名为 data 的数据表。

    dbPath = "dfs://testTable"
    if s.existsDatabase(dbPath):
        s.dropDatabase(dbPath)
    db = s.database(partitionType=keys.VALUE, partitions=[1, 2, 3], dbPath=dbPath, engine="TSDB")
    s.run("schema_t = table(100:0, `ctime`csymbol`price`qty, [TIMESTAMP, SYMBOL, DOUBLE, INT])")
    schema_t = s.table(data="schema_t")
    db.createTable(schema_t, "pt", ["csymbol"])
    pt = s.table(dbPath=dbPath, data="pt")
    print(pt, pt.tableName())
    print(pt.toDF())

    输出结果如下:

    <dolphindb.table.Table object at 0x7f5036bcd040> pt_TMP_TBL_5229a3cc
    Empty DataFrame
    Columns: [ctime, csymbol, price, qty]
    Index: []
  2. 如果仅指定 data,执行函数则表示获取名为 data 的内存表句柄。

    s.run("test_t = table(100:0, `ctime`csymbol`price`qty, [TIMESTAMP, SYMBOL, DOUBLE, INT])")
    t = s.table(data="test_t")
    print(t, t.tableName())
    print(t.toDF())

    输出结果如下:

    <dolphindb.table.Table object at 0x7f11ffb3c070> test_t
    Empty DataFrame
    Columns: [ctime, csymbol, price, qty]
    Index: []

tableAliasName

如果指定该参数,则加载表时不会使用随机表名作为句柄名称。

  1. 上传本地变量至服务端时指定该参数。

    data1 = pd.DataFrame({
        'a': [1, 2, 3],
        'b': [4, 5, 6],
    })
    t1 = s.table(data=data1, tableAliasName="data1")
    print(t1, t1.tableName())
    data2 = {
        'a': ['a', 'b', 'c'],
        'b': [1, 2, 3],
    }
    t2 = s.table(data=data2, tableAliasName="data2")
    print(t2, t2.tableName())

    输出结果如下:

    <dolphindb.table.Table object at 0x7f167ecb69d0> data1
    <dolphindb.table.Table object at 0x7f1651d0bc40> data2
  2. 获取 DolphinDB 服务端数据库中数据表时指定该参数。

    dbPath = "dfs://testTable"
    if s.existsDatabase(dbPath):
        s.dropDatabase(dbPath)
    db = s.database(partitionType=keys.VALUE, partitions=[1, 2, 3], dbPath=dbPath, engine="TSDB")
    s.run("schema_t = table(100:0, `ctime`csymbol`price`qty, [TIMESTAMP, SYMBOL, DOUBLE, INT])")
    schema_t = s.table(data="schema_t")
    db.createTable(schema_t, "pt", ["csymbol"])
    pt = s.table(dbPath=dbPath, data="pt", tableAliasName="tmp_pt")
    print(pt, pt.tableName())
    print(pt.toDF())

    输出结果如下:

    <dolphindb.table.Table object at 0x7f3350edc040> tmp_pt
    Empty DataFrame
    Columns: [ctime, csymbol, price, qty]
    Index: []
  3. 获取 DolphinDB 服务端内存表句柄时指定该参数。

    s.run("test_t = table(100:0, `ctime`csymbol`price`qty, [TIMESTAMP, SYMBOL, DOUBLE, INT])")
    t = s.table(data="test_t", tableAliasName="test_t2")
    print(t, t.tableName())
    print(t.toDF())

    输出结果如下:

    <dolphindb.table.Table object at 0x7f9fb55b4070> test_t
    Empty DataFrame
    Columns: [ctime, csymbol, price, qty]
    Index: []

    从上述例子中可以看出,如果加载分区表或上传本地数据时,可以通过指定别名来避免使用临时表名;如果直接使用表名加载数据表句柄,则该参数无效。

inMem, partitions

该参数仅在加载磁盘数据库中的数据表时有效,参数详细使用方式请参考 loadTable

session.loadTable

该函数和 session.table 方法类似,返回值为 Table,但无法上传本地数据,仅能获取 DolphinDB 服务端数据表句柄。

session.loadTable(tableName, dbPath=None, partitions=None, memoryMode=False)
  • tableName:内存表名或数据库中数据表表名。
  • dbPath:数据库路径。
  • partitions:将被加载到 DolphinDB 服务端内存中的分区。
  • memoryMode:是否加载表数据到 DolphinDB 服务端内存中。

加载内存表

代码示例如下:

s.run("test_t = table(100:0, `ctime`csymbol`price`qty, [TIMESTAMP, SYMBOL, DOUBLE, INT])")
t = s.loadTable("test_t")
print(t, t.tableName())
print(t.toDF())

输出结果如下:

<dolphindb.table.Table object at 0x7fd1c90a4c10> test_t
Empty DataFrame
Columns: [ctime, csymbol, price, qty]
Index: []

加载数据库表

代码示例如下:

dbPath = "dfs://testTable"
if s.existsDatabase(dbPath):
    s.dropDatabase(dbPath)
db = s.database(partitionType=keys.VALUE, partitions=[1, 2, 3], dbPath=dbPath, engine="TSDB")
s.run("schema_t = table(100:0, `ctime`csymbol`price`qty, [TIMESTAMP, SYMBOL, DOUBLE, INT])")
schema_t = s.table(data="schema_t")
db.createTable(schema_t, "pt", ["csymbol"])
pt = s.loadTable("pt", dbPath=dbPath)
print(pt, pt.tableName())
print(pt.toDF())

输出结果如下:

<dolphindb.table.Table object at 0x7fdaf7885eb0> pt_TMP_TBL_0dfdc80a
Empty DataFrame
Columns: [ctime, csymbol, price, qty]
Index: []

上传的数据表的生命周期

tableloadTable 方法返回一个 Python 本地变量,如果上传一个本地数据对象到服务端,且未指定别名,则使用随机表名作为该变量的句柄名。下例中,上传本地 data 对象到服务端,对应的 Python 本地变量为 t。服务端表名可以通过 Table.tableName 方法获取。

data = pd.DataFrame({
    'a': [1, 2, 3],
    'b': [4, 5, 6],
})
t = s.table(data=data)
print(t.tableName())

数据结果如下:

TMP_TBL_e03723c9

其中 TMP_TBL_ 开头,表示该句柄为临时表的句柄,会随着 Python 端 Table 对象的析构而析构。

此时释放 DolphinDB 服务端对象有三种方法:

  1. undef 方法

    s.undef(t.tableName(), "VAR")
  2. 将服务端对象置空

    s.run(f"{t.tableName()}=NULL")
  3. 析构本地变量以取消本地对象对服务端对象的引用

    del t

注意: 如果在获取句柄时指定别名,或者获取服务端已存在的非临时表句柄,则 Python 端对象的析构并不会影响服务端数据。

如果一个表对象只是一次性使用,建议不使用上传机制,可以直接通过函数调用来完成,将表对象作为函数的一个参数。函数调用不会缓存数据。函数调用结束后,所有数据都释放,而且只有一次网络传输,降低网络延迟。

表操作

通过 session.table/session.loadTable 方法获得的对象通常为 Table,API 提供一系列方法用于处理 Table,使得能够在 API 端对表进行操作。例如,调用 select、where等方法执行一次查询;调用 update、delete 等方法更新数据……

需要注意的是,调用 update/delete 方法返回的对象并非 Table 类,而是对使用者透明的 TableUpdate/TableDelete 对象,因此,在调用这些方法后,需要执行 execute 方法才能将表数据的更新、修改同步至服务器。详情请参考对应函数的描述以及示例。

本节将用到 example.csv 文件。

获取表属性

提供 rows, cols, colNames 和 schema 方法可获取表属性。

调用 rows/cols 属性可以获取当前表的行数或列数。

>>> s.run("t = table(1..5 as a, 2..6 as b)    ")
>>> t = s.table(data="t")
>>> t.rows
5
>>> t.cols
2

调用 colNames 属性可以获取当前表的列名。

>>> t.colNames
['a', 'b']

调用 schema 属性返回一个 pd.DataFrame,表示当前表的结构(返回结果与服务端函数 schema 的结果中的 colDefs 属性一致)。

>>> t.schema
  name typeString  typeInt  extra comment
0    a        INT        4    NaN        
1    b        INT        4    NaN   

select

该方法类似 SQL 语句中的 select 子句,用于选取部分列。下例中使用 toDF 方法获取表对应的 pd.DataFrame 对象,详细使用请参考本文 4.6 toDF 方法。

  1. 使用一系列的列名作为输入内容。

    >>> trade=s.loadText(WORK_DIR+"/example.csv")
    >>> trade.select(["ticker", "date"]).toDF()
          ticker       date
    0       AMZN 1997-05-15
    1       AMZN 1997-05-16
    2       AMZN 1997-05-19
    3       AMZN 1997-05-20
    4       AMZN 1997-05-21
    ...
  2. 使用字符串作为输入内容。

    >>> trade.select("ticker, date, bid").toDF()
          ticker       date        bid
    0       AMZN 1997-05-15   23.50000
    1       AMZN 1997-05-16   20.50000
    2       AMZN 1997-05-19   20.50000
    3       AMZN 1997-05-20   19.62500
    4       AMZN 1997-05-21   17.12500
    ...

update

使用 update 可以更新表,其中 cols 表示待更新的列,vals 表示待更新的值。需要注意,必须和 execute 一起使用才能将 Python 端的修改同步至服务端。

从下述示例中可以看出,只有执行 execute 后才会将 update 更新的数据同步至服务器。

>>> trade = s.loadText(WORK_DIR+"/example.csv")
>>> t1 = trade.update(["VOL"],["999999"]).where("TICKER=`AMZN").where(["date=2015.12.16"])
>>> t2 = trade.where("ticker=`AMZN").where("date=2015.12.16")
>>> t2.toDF()
  TICKER       date      VOL        PRC        BID        ASK
0   AMZN 2015-12-16  3964470  675.77002  675.76001  675.83002
>>> t1 = trade.update(["VOL"],["999999"]).where("TICKER=`AMZN").where(["date=2015.12.16"]).execute()
>>> t2.toDF()
  TICKER       date     VOL        PRC        BID        ASK
0   AMZN 2015-12-16  999999  675.77002  675.76001  675.83002

delete

使用 delete 可以删除表中的记录,但必须和 execute 一起使用才能将 Python 端的修改同步至服务端。

>>> trade = s.loadText(WORK_DIR+"/example.csv")
>>> trade.rows
13136
>>> t = trade.delete().where('date<2013.01.01')
>>> trade.rows
13136
>>> t = trade.delete().where('date<2013.01.01').execute()
>>> trade.rows
3024

groupby

groupby 表示根据一定规则进行分组,该方法和 服务端 函数 groupby 一致,调用后,在SQL 中添加 groupby 子句,接收参数为字符串列表或者单个字符串,表示需要分组的列。如果后续追加 having 子句,having子句需包含聚合函数,表示为符合聚合函数条件的每组产生一条记录。

注: groupby 后面需要使用聚合函数,如 count, sum, aggagg2 等。

准备数据库的脚本如下:

>>> dbPath = "dfs://valuedb"
>>> if s.existsDatabase(dbPath):
...     s.dropDatabase(dbPath)
>>> s.database(dbName='mydb', partitionType=keys.VALUE, partitions=["AMZN","NFLX","NVDA"], dbPath=dbPath)
>>> trade = s.loadTextEx(dbPath=dbPath, partitionColumns=["TICKER"], tableName='trade', remoteFilePath=WORK_DIR+"/example.csv")

单独使用 groupby,分别计算每个股票的 vol 总和与 prc 总和:

>>> trade.select(['sum(vol)','sum(prc)']).groupby(['ticker']).toDF()
  ticker      sum_vol       sum_prc
0   AMZN  33706396492  772503.81377
1   NFLX  14928048887  421568.81674
2   NVDA  46879603806  127139.51092

同时使用 groupbyhaving

>>> trade.select('count(ask)').groupby(['vol']).having('count(ask)>1').toDF()
       vol  count_ask
0   579392          2
1  3683504          2
2  5732076          2
3  6299736          2
4  6438038          2
5  6946976          2
6  8160197          2
7  8924303          2

contextby

contextby 是 DolphinDB 独有的功能,是对标准 SQL 语句的拓展。使用 context by 子句可以简化对时间序列数据的操作。contextbygroupby 相似,区别在于 groupby 为每个组返回一个标量,但是 contextby 为每个组返回一个向量,且向量的长度与该组的行数相同。详细使用方式请参考 context by

与 having 子句连用时,如果只与聚合函数一起使用,则结果是符合聚合函数条件的分组,每组记录与输入数据中记录数一致;如果与非聚合函数一起使用,结果是符合指定条件的分组。

>>> trade.contextby('ticker').top(3).toDF()
  TICKER       date      VOL      PRC      BID      ASK
0   AMZN 1997-05-15  6029815  23.5000  23.5000  23.6250
1   AMZN 1997-05-16  1232226  20.7500  20.5000  21.0000
2   AMZN 1997-05-19   512070  20.5000  20.5000  20.6250
3   NFLX 2002-05-23  7507079  16.7500  16.7500  16.8500
4   NFLX 2002-05-24   797783  16.9400  16.9400  16.9500
5   NFLX 2002-05-28   474866  16.2000  16.2000  16.3700
6   NVDA 1999-01-22  5702636  19.6875  19.6250  19.6875
7   NVDA 1999-01-25  1074571  21.7500  21.7500  21.8750
8   NVDA 1999-01-26   719199  20.0625  20.0625  20.1250
>>> trade.select("TICKER, month(date) as month, cumsum(VOL)").contextby("TICKER,month(date)").toDF()
      TICKER      month  cumsum_VOL
0       AMZN 1997-05-01     6029815
1       AMZN 1997-05-01     7262041
2       AMZN 1997-05-01     7774111
3       AMZN 1997-05-01     8230468
4       AMZN 1997-05-01     9807882
...

contextbyhaving 一起使用:

>>> trade.contextby('ticker').having("sum(VOL)>40000000000").toDF()
     TICKER       date       VOL       PRC       BID       ASK
0      NVDA 1999-01-22   5702636   19.6875   19.6250   19.6875
1      NVDA 1999-01-25   1074571   21.7500   21.7500   21.8750
2      NVDA 1999-01-26    719199   20.0625   20.0625   20.1250
3      NVDA 1999-01-27    510637   20.0000   19.8750   20.0000
4      NVDA 1999-01-28    476094   19.9375   19.8750   20.0000
...

pivotby

pivotby 是 DolphinDB 的独有功能,是对标准 SQL 语句的拓展。它将表中一列或多列的内容按照两个维度重新排列,亦可配合数据转换函数使用。详细使用请参考 pivotby

pivotbyselect 子句一起使用时返回一个表。

>>> trade = s.table("dfs://valuedb", "trade")
>>> t1 = trade.select("VOL").pivotby("TICKER", "date")
>>> t1.toDF()
  TICKER  1997.05.15  1997.05.16  ...  2016.12.28  2016.12.29  2016.12.30
0   AMZN   6029815.0   1232226.0  ...     3301025     3158299     4139451
1   NFLX         NaN         NaN  ...     4388956     3444729     4455012
2   NVDA         NaN         NaN  ...    57384116    54384676    30323259

pivotbyexec 语句一起使用时返回一个 DolphinDB 的矩阵对象。

>>> trade.exec("VOL").pivotby("TICKER", "date").toDF()
[array([[ 6029815.,  1232226.,   512070., ...,  3301025.,  3158299.,
         4139451.],
       [      nan,       nan,       nan, ...,  4388956.,  3444729.,
         4455012.],
       [      nan,       nan,       nan, ..., 57384116., 54384676.,
        30323259.]]), array(['AMZN', 'NFLX', 'NVDA'], dtype=object), array(['1997-05-15T00:00:00.000000000', '1997-05-16T00:00:00.000000000',
       '1997-05-19T00:00:00.000000000', ...,
       '2016-12-28T00:00:00.000000000', '2016-12-29T00:00:00.000000000',
       '2016-12-30T00:00:00.000000000'], dtype='datetime64[ns]')]

sort, csort

可使用 csort 关键字排序。

>>> trade = s.loadTable("trade", "dfs://valuedb")
>>> trade.contextby('ticker').csort('date desc').toDF()
      TICKER       date      VOL        PRC        BID        ASK
0       AMZN 2016-12-30  4139451  749.87000  750.02002  750.40002
1       AMZN 2016-12-29  3158299  765.15002  764.66998  765.15997
2       AMZN 2016-12-28  3301025  772.13000  771.92999  772.15997
3       AMZN 2016-12-27  2638725  771.40002  771.40002  771.76001
4       AMZN 2016-12-23  1981616  760.59003  760.33002  760.59003
...

除了在排序函数 sortcsort 中指定 asc 和 desc 关键字来决定排序顺序外,也可以通过传参的方式实现。

sort(by, ascending=True)
csort(by, ascending=True)

参数 ascending 表示是否进行升序排序,默认值为 True。可以通过传入一个 list 来定义多列的不同排序方式。如以下脚本:

>>> trade.select("*").contextby('ticker').csort(["TICKER", "VOL"], True).limit(5).toDF()
   TICKER       date    VOL      PRC      BID     ASK
0    AMZN 1997-12-26  40721  54.2500  53.8750  54.625
1    AMZN 1997-08-12  47939  26.3750  26.3750  26.750
2    AMZN 1997-07-21  48325  26.1875  26.1250  26.250
3    AMZN 1997-08-13  49690  26.3750  26.0000  26.625
4    AMZN 1997-06-02  49764  18.1250  18.1250  18.375
5    NFLX 2002-09-05  20725  12.8500  12.8500  12.950
6    NFLX 2002-11-11  26824   8.4100   8.3000   8.400
7    NFLX 2002-09-04  27319  13.0000  12.8200  13.000
8    NFLX 2002-06-10  35421  16.1910  16.1900  16.300
9    NFLX 2002-09-06  54951  12.8000  12.7900  12.800
10   NVDA 1999-05-10  41250  17.5000  17.5000  17.750
11   NVDA 1999-05-07  52310  17.5000  17.3750  17.625
12   NVDA 1999-05-14  59807  18.0000  17.7500  18.000
13   NVDA 1999-04-01  63997  20.5000  20.1875  20.500
14   NVDA 1999-04-19  65940  19.0000  19.0000  19.125
>>> trade.select("*").contextby('ticker').csort(["TICKER", "VOL"], [True, False]).limit(5).toDF()
   TICKER       date        VOL       PRC     BID       ASK
0    AMZN 2007-04-25  104463043   56.8100   56.80   56.8100
1    AMZN 1999-09-29   80380734   80.7500   80.75   80.8125
2    AMZN 2006-07-26   76996899   26.2600   26.17   26.1800
3    AMZN 2007-04-26   62451660   62.7810   62.77   62.8300
4    AMZN 2005-02-03   60580703   35.7500   35.74   35.7300
5    NFLX 2015-07-16   63461015  115.8100  115.85  115.8600
6    NFLX 2015-08-24   59952448   96.8800   96.85   96.8800
7    NFLX 2016-04-19   55728765   94.3400   94.30   94.3100
8    NFLX 2016-07-19   55685209   85.8400   85.81   85.8300
9    NFLX 2016-01-20   53009419  107.7400  107.73  107.7800
10   NVDA 2011-01-06   87693472   19.3300   19.33   19.3400
11   NVDA 2011-02-17   87117555   25.6800   25.68   25.7000
12   NVDA 2011-01-12   86197484   23.3525   23.34   23.3600
13   NVDA 2011-08-12   80488616   12.8800   12.86   12.8700
14   NVDA 2003-05-09   77604776   21.3700   21.39   21.3700

top, limit

top 用于取表中的前 n 条记录。详细使用请参考 top

>>> trade = s.table("dfs://valuedb", "trade")
>>> trade.top(5).toDF()
  TICKER       date      VOL     PRC     BID     ASK
0   AMZN 1997-05-15  6029815  23.500  23.500  23.625
1   AMZN 1997-05-16  1232226  20.750  20.500  21.000
2   AMZN 1997-05-19   512070  20.500  20.500  20.625
3   AMZN 1997-05-20   456357  19.625  19.625  19.750
4   AMZN 1997-05-21  1577414  17.125  17.125  17.250

limit 子句和 top 子句功能类似。两者的区别在于:

  • top 子句中的整型常量不能为负数。在与 context by 子句一同使用时,limit 子句的标量值可以为负整数,返回每个组最后指定数目的记录。其他情况 limit 子句标量值为非负整数。
  • 可使用 limit 子句从某行开始选择一定数量的行。

limit 的详细使用请参考 limit

>>> trade.select("*").contextby('ticker').limit(-2).toDF()
  TICKER       date       VOL        PRC        BID        ASK
0   AMZN 2016-12-29   3158299  765.15002  764.66998  765.15997
1   AMZN 2016-12-30   4139451  749.87000  750.02002  750.40002
2   NFLX 2016-12-29   3444729  125.33000  125.31000  125.33000
3   NFLX 2016-12-30   4455012  123.80000  123.80000  123.83000
4   NVDA 2016-12-29  54384676  111.43000  111.26000  111.42000
5   NVDA 2016-12-30  30323259  106.74000  106.73000  106.75000
>>> trade.select("*").limit([2, 5]).toDF()
  TICKER       date      VOL     PRC     BID     ASK
0   AMZN 1997-05-19   512070  20.500  20.500  20.625
1   AMZN 1997-05-20   456357  19.625  19.625  19.750
2   AMZN 1997-05-21  1577414  17.125  17.125  17.250
3   AMZN 1997-05-22   983855  16.750  16.625  16.750
4   AMZN 1997-05-23  1330026  18.000  18.000  18.125

merge, merge_asof, merge_window 和 merge_cross

merge 用于内部连接、左连接、左半连接和外部连接,merge_asof 为 asof join,merge_window 为窗口连接。

merge

merge 的参数包括两个索引列和一个字符串参数。如果两个连接列的名称相同,则使用 on 参数指定连接列;如果两个连接列的名称不同,使用 left_onright_on 参数指定连接列。可选参数 how 表示表连接的类型,默认的连接类型为内部连接。

  1. 当连接列名称相同时,使用示例如下:

    >>> trade = s.table("dfs://valuedb", "trade")
    >>> t1 = s.table(data={
    ...     'TICKER': ['AMZN', 'AMZN', 'AMZN'], 
    ...     'date': np.array(['2015-12-31', '2015-12-30', '2015-12-29'], dtype='datetime64[D]'), 
    ...     'open': [695, 685, 674],
    ... })
    ...
    >>> t1 = t1.select("TICKER, date(date) as date, open")
    >>> trade.merge(t1,on=["TICKER","date"]).toDF()
      TICKER       date      VOL        PRC        BID        ASK  open
    0   AMZN 2015-12-29  5734996  693.96997  693.96997  694.20001   674
    1   AMZN 2015-12-30  3519303  689.07001  689.07001  689.09998   685
    2   AMZN 2015-12-31  3749860  675.89001  675.85999  675.94000   695
  2. 当连接列名称不相同时,需要指定 left_on 参数和 right_on 参数:

    >>> trade = s.table("dfs://valuedb", "trade")
    >>> t1 = s.table(data={
    ...     'TICKER': ['AMZN', 'AMZN', 'AMZN'], 
    ...     'date': np.array(['2015-12-31', '2015-12-30', '2015-12-29'], dtype='datetime64[D]'), 
    ...     'open': [695, 685, 674],
    ... })
    ...
    >>> t1 = t1.select("TICKER as TICKER1, date(date) as date1, open")
    >>> trade.merge(t1, left_on=["TICKER","date"], right_on=["TICKER1", "date1"]).toDF()
      TICKER       date      VOL        PRC        BID        ASK  open
    0   AMZN 2015-12-29  5734996  693.96997  693.96997  694.20001   674
    1   AMZN 2015-12-30  3519303  689.07001  689.07001  689.09998   685
    2   AMZN 2015-12-31  3749860  675.89001  675.85999  675.94000   695
  3. 当要选择左连接时,把 how 参数设置为 left

    >>> trade = s.table("dfs://valuedb", "trade")
    >>> t1 = s.table(data={
    ...     'TICKER': ['AMZN', 'AMZN', 'AMZN'], 
    ...     'date': np.array(['2015-12-31', '2015-12-30', '2015-12-29'], dtype='datetime64[D]'), 
    ...     'open': [695, 685, 674],
    ... })
    ...
    >>> t1 = t1.select("TICKER, date(date) as date, open")
    >>> trade.merge(t1,how="left", on=["TICKER","date"]).where('TICKER=`AMZN').where('2015.12.23<=date<=2015.12.31').toDF()
      TICKER       date      VOL        PRC        BID        ASK   open
    0   AMZN 2015-12-23  2722922  663.70001  663.48999  663.71002    NaN
    1   AMZN 2015-12-24  1092980  662.78998  662.56000  662.79999    NaN
    2   AMZN 2015-12-28  3783555  675.20001  675.00000  675.21002    NaN
    3   AMZN 2015-12-29  5734996  693.96997  693.96997  694.20001  674.0
    4   AMZN 2015-12-30  3519303  689.07001  689.07001  689.09998  685.0
    5   AMZN 2015-12-31  3749860  675.89001  675.85999  675.94000  695.0
  4. 当要选择外部连接时,把 how 参数设置为 outer

    >>> t1 = s.table(data={'TICKER': ['AMZN', 'AMZN', 'NFLX'], 'date': ['2015.12.29', '2015.12.30', '2015.12.31'], 'open': [674, 685, 942]})
    >>> t2 = s.table(data={'TICKER': ['AMZN', 'NFLX', 'NFLX'], 'date': ['2015.12.29', '2015.12.30', '2015.12.31'], 'close': [690, 936, 951]})
    >>> t1.merge(t2, how="outer", on=["TICKER","date"]).toDF()
      TICKER        date   open tmp_TICKER    tmp_date  close
    0   AMZN  2015.12.29  674.0       AMZN  2015.12.29  690.0
    1   AMZN  2015.12.30  685.0                           NaN
    2   NFLX  2015.12.31  942.0       NFLX  2015.12.31  951.0
    3                       NaN       NFLX  2015.12.30  936.0

注意: 分区表只能与分区表进行外部连接,内存表只能与内存表进行外部连接。

merge_asof

merge_asof 对应 DolphinDB 中的 asof join (aj)。asof join 与 left join 非常相似,主要有以下区别:

  1. asof join 的最后一个连接列通常是时间类型。对于左表中某行的时间 t,在右表最后一个连接列之外的其它连接列一致的记录中,如果右表没有与 t 对应的时间,asof join 会取右表中 t 之前的最近时间对应的记录;如果有多个相同的时间,会取最后一个时间对应的记录。

  2. 如果只有一个连接列,右表必须按照连接列排好序。如果有多个连接列,右表必须在其它连接列决定的每个组内根据最后一个连接列排好序。如果右表不满足这些条件,计算结果将会不符合预期。右表不需要按照其他连接列排序,左表不需要排序。

本节与下节的例子使用了 trades.csvquotes.csv,它们含有 NYSE 网站下载的 AAPL 和 FB 的 2016 年 10 月 24 日的交易与报价数据。

>>> dbPath = "dfs://tickDB"
>>> if s.existsDatabase(dbPath):
...     s.dropDatabase(dbPath)
... 
>>> s.database(partitionType=keys.VALUE, partitions=["AAPL","FB"], dbPath=dbPath)
>>> trades = s.loadTextEx(dbPath, tableName='trades', partitionColumns=["Symbol"], remoteFilePath=WORK_DIR+"/trades.csv")
>>> quotes = s.loadTextEx(dbPath, tableName='quotes', partitionColumns=["Symbol"], remoteFilePath=WORK_DIR+"/quotes.csv")
>>> trades.top(5).toDF()
                        Time  Exchange  Symbol  Trade_Volume  Trade_Price
0 1970-01-01 08:00:00.022239        75    AAPL           300        27.00
1 1970-01-01 08:00:00.022287        75    AAPL           500        27.25
2 1970-01-01 08:00:00.022317        75    AAPL           335        27.26
3 1970-01-01 08:00:00.022341        75    AAPL           100        27.27
4 1970-01-01 08:00:00.022368        75    AAPL            31        27.40
>>> quotes.where("second(Time)>=09:29:59").top(5).toDF()
                         Time  Exchange  Symbol  Bid_Price  Bid_Size  Offer_Price  Offer_Size
0  1970-01-01 09:30:00.005868        90    AAPL      26.89         1        27.10           6
1  1970-01-01 09:30:00.011058        90    AAPL      26.89        11        27.10           6
2  1970-01-01 09:30:00.031523        90    AAPL      26.89        13        27.10           6
3  1970-01-01 09:30:00.284623        80    AAPL      26.89         8        26.98           8
4  1970-01-01 09:30:00.454066        80    AAPL      26.89         8        26.98           1
>>> trades.merge_asof(quotes,on=["Symbol","Time"]).select(["Symbol","Time","Trade_Volume","Trade_Price","Bid_Price", "Bid_Size","Offer_Price", "Offer_Size"]).top(5).toDF()
  Symbol                        Time          Trade_Volume  Trade_Price  Bid_Price  Bid_Size  \
0   AAPL  1970-01-01 08:00:00.022239                   300        27.00       26.9         1
1   AAPL  1970-01-01 08:00:00.022287                   500        27.25       26.9         1
2   AAPL  1970-01-01 08:00:00.022317                   335        27.26       26.9         1
3   AAPL  1970-01-01 08:00:00.022341                   100        27.27       26.9         1
4   AAPL  1970-01-01 08:00:00.022368                    31        27.40       26.9         1

  Offer_Price   Offer_Size
0       27.49           10
1       27.49           10
2       27.49           10
3       27.49           10
4       27.49           10

merge_window

merge_window 对应 DolphinDB 中的 window join(wj),它是 asof join 的扩展。leftBound 参数和 rightBound 参数用于指定窗口的边界 w1 和 w2,对左表中最后一个连接列对应的时间为 t 的记录,在右表中选择 (t+w1) 到 (t+w2) 的时间并且其他连接列匹配的记录,然后对这些记录使用指定的聚合函数。

window join 和 prevailing window join 的唯一区别是,如果右表中没有与窗口左边界时间(即 t+w1)匹配的值,prevailing window join 会选择右表中 (t+w1) 之前的最近时间的记录作为 t+w1 时的记录。如果要使用 prevailing window join,需将 prevailing 参数设置为 True。

>>> trades.merge_window(quotes, -5000000000, 0, aggFunctions=["avg(Bid_Price)","avg(Offer_Price)"], on=["Symbol","Time"]).where("Time>=07:59:59").top(10).toDF()
 Time                          Exchange Symbol  Trade_Volume  Trade_Price  avg_Bid_Price  avg_Offer_Price
0 1970-01-01 08:00:00.022239        75   AAPL           300        27.00          26.90            27.49
1 1970-01-01 08:00:00.022287        75   AAPL           500        27.25          26.90            27.49
2 1970-01-01 08:00:00.022317        75   AAPL           335        27.26          26.90            27.49
3 1970-01-01 08:00:00.022341        75   AAPL           100        27.27          26.90            27.49
4 1970-01-01 08:00:00.022368        75   AAPL            31        27.40          26.90            27.49
5 1970-01-01 08:00:02.668076        68   AAPL          2434        27.42          26.75            27.36
6 1970-01-01 08:02:20.116025        68   AAPL            66        27.00            NaN              NaN
7 1970-01-01 08:06:31.149930        75   AAPL           100        27.25            NaN              NaN
8 1970-01-01 08:06:32.826399        75   AAPL           100        27.25            NaN              NaN
9 1970-01-01 08:06:33.168833        75   AAPL            74        27.25            NaN              NaN

[10 rows x 6 columns]

merge_cross

merge_cross 对应 DolphinDB 中的 cross join。交叉连接函数返回两张表的笛卡尔积的结果集。如果左表有 n 行,右表有 m 行,那么笛卡尔积结果集包含 n*m 行。

>>> s.run("""
...     t1 = table(2010 2011 2012 as year);
...     t2 = table(`IBM`C`AAPL as Ticker);
... """)
...
>>> t1 = s.table(data="t1")
>>> t2 = s.table(data="t2")
>>> t1.merge_cross(t2).toDF()
   year Ticker
0  2010    IBM
1  2010      C
2  2010   AAPL
3  2011    IBM
4  2011      C
5  2011   AAPL
6  2012    IBM
7  2012      C
8  2012   AAPL

rename

调用该函数可以给表重新设置名称。

>>> t.tableName()
t
>>> t.rename("xx")
>>> t.tableName()
xx

注意: 如果给临时表重新设置名称,会导致临时表无法被及时析构,导致内存泄漏。

drop

调用该函数可以删除表的指定元素。

>>> trade = s.loadText(WORK_DIR+"/example.csv")
>>> trade.colNames
['TICKER', 'date', 'VOL', 'PRC', 'BID', 'ASK']
>>> t1 = trade.drop(['ask', 'bid'])
>>> t1.colNames
['TICKER', 'date', 'VOL', 'PRC']

exec

由于使用 select 子句总是生成一张表,即使只选择一列数据亦是如此。若需要生成一个标量或者一个向量,建议使用 exec 子句。

exec 只选择一列时将生成一个 DolphinDB 的向量。在 Python 中使用 toDF() 加载该对象,可以打印出一个 np.ndarray 对象:

>>> trade = s.loadText(WORK_DIR+"/example.csv")
>>> trade.exec("ticker").toDF()
['AMZN' 'AMZN' 'AMZN' ... 'NFLX' 'NFLX' 'NFLX']

如果 exec 语句选择了多列,则结果和 select 语句一致,将生成一个 DolphinDB 的 table 类型。在 Python 中使用 toDF() 加载该对象,可以打印出一个 pd.DataFrame 对象:

>>> trade.exec(["ticker", "date", "bid"]).toDF()
      ticker       date        bid
0       AMZN 1997-05-15   23.50000
1       AMZN 1997-05-16   20.50000
2       AMZN 1997-05-19   20.50000
3       AMZN 1997-05-20   19.62500
4       AMZN 1997-05-21   17.12500
...

where

where 子句用于过滤数据。

  1. 多个过滤条件

    where 子句在使用时,如果指定了多个过滤条件,则各个条件之间默认使用 and 连接;如果需要以 or 来连接过滤条件,则需要将 or 连接的条件作为一个条件传入 where 方法。

    注: 1.30.21.2 及之前版本的 API,在添加多个 where 子句时,不会使用括号保证运算优先级,因此,使用时需在筛选条件外添加括号,以保证运算正确性。

    >>> trade = s.loadText(WORK_DIR+"/example.csv")
    >>> t1 = trade.select(['date','bid','ask','prc','vol']).where('TICKER=`AMZN').where('bid!=NULL').where('ask!=NULL').where('vol>10000000')
    >>> t1.toDF()
              date        bid        ask        prc       vol
    0   1998-09-01   79.93750   80.25000   79.95313  11321844
    1   1998-11-17  148.68750  149.00000  148.50000  10279448
    2   1998-11-20  179.62500  179.75000  180.62500  11314228
    3   1998-11-23  217.75000  217.81250  218.00000  11559042
    4   1998-11-24  214.25000  214.62500  214.50000  13820992
    ...
    >>> t1.rows
    765
    >>> t1.showSQL()
    select date,bid,ask,prc,vol from TMP_TBL_2744917d where (TICKER=`AMZN) and (bid!=NULL) and (ask!=NULL) and (vol>10000000)
  2. 输入内容为字符串

    where 的输入内容可以是包含多个条件的字符串。

    >>> t1 = trade.select("ticker, date, vol").where("bid!=NULL, ask!=NULL, vol>50000000")
    >>> t1.toDF()
      ticker       date        vol
    0    AMZN 1999-09-29   80380734
    1    AMZN 2000-06-23   52221978
    2    AMZN 2001-11-26   51543686
    3    AMZN 2002-01-22   57235489
    4    AMZN 2005-02-03   60580703
    ...
    >>> t1.rows
    41

execute

  1. Table 类中,该方法表示以 exec 方式选取传入列的数据,传入参数 expr 为字符串或字符串列表,表示选取的列。

    >>> trade = s.loadText(WORK_DIR+"/example.csv")
    >>> trade.execute("TICKER")
    ['AMZN' 'AMZN' 'AMZN' ... 'NFLX' 'NFLX' 'NFLX']
    >>> trade.execute(["VOL", "PRC"])
               VOL        PRC
    0      6029815   23.50000
    1      1232226   20.75000
    2       512070   20.50000
    3       456357   19.62500
    4      1577414   17.12500
    ...        ...        ...
    13131  2010120  125.59000
    13132  5287520  128.35001
    13133  4388956  125.89000
    13134  3444729  125.33000
    13135  4455012  123.80000
    
    [13136 rows x 2 columns]
  2. 如果对 Table 类对象执行 update/delete 操作后,必须执行 execute 才能确保在服务端正确执行,此时无需传入参数,也不会以 exec 的形式执行,仅将修改语句提交至服务端执行,并返回一个 Table 对象作为结果。

executeAs

executeAs 可以把结果保存为 server 端的表对象,表名由参数 newTableName 指定,执行后将返回一个 Table 对象管理新创建的表。

注意: 通过执行该方法创建的表,其生存周期不受 Python 端控制,而是与 session 的生存周期保持一致。

>>> trade = s.loadText(WORK_DIR+"/example.csv")
>>> t1 = trade.select(['date','bid','ask','prc','vol']).where('TICKER=`AMZN').where('bid!=NULL').where('ask!=NULL').where('vol>10000000').executeAs("AMZN")
>>> t1.tableName()
AMZN

showSQL

可以使用 showSQL 展示 SQL 语句。

>>> trade=s.loadText(WORK_DIR+"/example.csv")
>>> trade.select(["ticker", "date"]).showSQL()
select ticker,date from TMP_TBL_fb11c541

toDF, toList

toDF 和 toList 功能类似,都是执行缓存在 Python 端的 SQL 语句(可通过 showSQL 方法获取),并返回执行结果。两者的区别在于,toDF 与 session.run(sql) 一致,toList 则与 session.run(sql, pickleTableToList=True) 行为一致。session 在构造时,如果指定 protocol=PROTOCOL_PICKLE/PROTOCOL_DDB,则 toDF 将返回一个 pd.DataFrame,toList 返回一个由 np.ndarray 构成的 list,每个 np.ndarray 表示表中的一列。详细区别请参考章节 PROTOCOL_DDBPROTOCOL_PICKLE