SchemalessWriter

DolphinDB SchemalessWriter 插件( 无模式写入插件) 可以将 InfluxDB Line protocol、海康 json 格式、海康 key/value 格式的数据写入到 DolphinDB。

安装插件

版本要求

DolphinDB Server: 2.00.10 及更高版本。目前仅支持 Linux 的 X86-64 位版本。

安装步骤

  1. 在 DolphinDB 客户端中使用 listRemotePlugins 命令查看插件仓库中的插件信息。
login("admin", "123456")
listRemotePlugins()
  1. 使用 installPlugin 命令完成插件安装。
installPlugin("SchemalessWriter")
  1. 使用 loadPlugin 命令加载插件。
loadPlugin("SchemalessWriter")

接口说明

userDefinedAppend

语法

userDefinedAppend(protocol, data, [userDefinedArg1], [userDefinedArg2])

参数

protocol STRING 类型标量,表示协议类型。可以取值为 “INFLUXDB”, ”HKJSON”, ”HKLINE”。

data STRING 类型标量,表示对应协议的数据。data 将被写入到自动创建或预先创建的库表中。

userDefinedArg1 protocol 为 INFLUXDB 时不需要,其它时候需要。

userDefinedArg2 protocol 为 INFLUXDB 时不需要,其它时候需要。 下面根据第 protocol 的取值分情况讨论:

  • InfluxDB Line protocol

指定 protocol 为 "INFLUXDB"; data 为一行或多行 InfluxDB Line protocol 数据。

示例

userDefinedAppend("INFLUXDB", "measurement1,location=cn,group=system field1=123,field2=456 1680130350899000000")

以上代码创建一个以 measurement1 为库名和表名,timestamp (NANOTIMESTAMP 类型) 和两个 tag(location 和 group),metricName,metricValue 为列名的库表 , 且写入两条数据:

timestamplocationgroupmetricNamemetricValue
1680130350899000000cnsystemfield1123
1680130350899000000cnsystemfield2456

当缺少 tag 时,会自动补入空值, 当多了 tag 时,会创建新表。

  • 海康 json 格式

protocol 为 "HKJSON"; data 为 海康 json 数据格式的数据,字符串类型; userDefinedArg1 为库名 (不带 "dfs://"); userDefinedArg2 为表名。

使用该格式时,插件不会自动建库建表, 用户需要手动建库建表。

示例

建库建表:

db = database("dfs://test_hkjson", VALUE, 1..3)
dummy = table(array(INT,0) as a, array(INT,0) as b, array(INT,0) as c , array(INT,0) as d)
db.createPartitionedTable(dummy,"test",[`a])

调用 userDefinedAppend 写入数据

lines = "{\"a\":1,\"b\":2,\"Values\":[{\"c\":3,\"d\":4},{\"c\":5,\"d\":6}]}"
userDefinedAppend("HKJSON",lines, "test_hkjson", "test")
  • 海康 key/value line 格式

protocol 为 "HKLINE"; data 为 海康 key/value line 数据格式的数据,字符串类型; userDefinedArg1 为库名 (不带 "dfs://"); userDefinedArg2 为表名。

使用该数据格式,插件不会自动建库建表, 用户需要手动建库建表。 示例

建库建表

if (existsDatabase("dfs://test_hkline")){ dropDatabase("dfs://test_hkline")}
db = database("dfs://test_hkline", HASH, [STRING,3])
dummy = table(array(STRING, 0) as "__name__",
array(STRING, 0) as path, 
array(INT, 0) as count,
array(DOUBLE, 0) as temp,
array(STRING, 0) as method , 
array(STRING, 0) as instance)
db.createPartitionedTable(dummy,"test_line",["__name__"])

调用 userDefinedAppend 写入数据

hklines = array(STRING,0)
hklines.append!("{__name__=\"requests_total\", path=\"/status\", method=\"GET\", instance=\"10.0.0.1:80\"}")
hklines.append!("{__name__=\"requests_total\",path=\"/status\",method=\"POST\",instance=\"10.0.0.1:80\"}")
hklines.append!("{__name__=\"requests_total\",path=\"/status\",count=1,temp=1.2,method=\"POST\",instance=\"10.0.0.1:80\"}")
i = 0
for (hkline in hklines){
    print("i:",i)
    SchemalessWriter::HKLineWriter(hkline, "test_line", "test_line")
    i += 1
}

configWriter

语法

SchemalessWriter::configWriter(config)

详情

配置建库的参数,分区的规则在写数据的接口中配置。

参数

config 一个字典,key 和 value 都是 STRING 类型标量。key 和 value 的取值如下:

key 的取值含义value 取值
"dbEngineType"表示数据库引擎的类型可以是 "OLAP" 或者 "TSDB",默认为 “OLAP”。
"dbNamePreStr"用来区分无模式写入自动创建的库还是系统中的其他场景下创建的库库名前缀

使用示例

cfg = dict(["dbEngineType"],["OLAP"])
SchemalessWriter::configWriter(cfg)