SchemalessWriter
DolphinDB SchemalessWriter 插件( 无模式写入插件) 配合 server 的 loadSchemalessPlugin 和 userDefinedAppend 等函数可将 InfluxDB Line protocol, 海康 json 格式 ,海康 key/value 格式数据写入到 DolphinDB
函数介绍:
loadSchemalessPlugin
和 loadPlugin
参数一致, 为 PluginSchemaless.txt 的路径; 需要在调用 userDefinedAppend
之前调用
例如
loadSchemalessPlugin("/path/PluginSchemaless.txt")
userDefinedAppend
参数:
- 第 0 个参数为协议类型:可能的取值有:
INFLUXDB
或HKJSON
或HKLINE
; - 第 1 个参数为对应协议的数据,会写入到自动创建或预先创建的库表中;
- 第 2 个 和 第 3 个参数在 第 0 个参数为
INFLUXDB
时不需要,其它时候需要;
以下将根据第 0 个参数的取值分情况讨论。
InfluxDB Line protocol
- 参数 0 为 "INFLUXDB"
- 参数 1 为字符串,为一行或多行 InfluxDB Line protocol 数据 ,例如:
userDefinedAppend("INFLUXDB", "measurement1,location=cn,group=system field1=123,field2=456 1680130350899000000")
会以 measurement1 为库名,表名, timestamp (NANOTIMESTAMP 类型), 两个 tag (location 和 group) metricName,metricValue 为列名,自动建库建表 , 且写入两条数据:
timestamp | location | group | metricName | metricValue |
---|---|---|---|---|
1680130350899000000 | cn | system | field1 | 123 |
1680130350899000000 | cn | system | field2 | 456 |
当缺少 tag 时,会自动补入空值;当多了 tag 时,会创建新表。
海康 json 格式
- 参数 0 为 "HKJSON"
- 参数 1 为 海康 json 数据格式的数据,字符串类型
- 参数 2 为库名 (不带 "dfs://")
- 参数 3 为表名
此时插件不会自动建库建表, 用户需要手动建库建表, 示例:
建库建表:
db = database("dfs://test_hkjson", VALUE, 1..3)
dummy = table(array(INT,0) as a, array(INT,0) as b, array(INT,0) as c , array(INT,0) as d)
db.createPartitionedTable(dummy,"test",[`a])
调用 userDefinedAppend
写入数据:
lines = "{\"a\":1,\"b\":2,\"Values\":[{\"c\":3,\"d\":4},{\"c\":5,\"d\":6}]}"
userDefinedAppend("HKJSON",lines, "test_hkjson", "test")
海康 key/value line 格式
- 参数 0 为 "HKLINE"
- 参数 1 为 海康 key/value line 数据格式的数据,字符串类型
- 参数 2 为库名 (不带 "dfs://")
- 参数 3 为表名
此时插件不会自动建库建表, 用户需要手动建库建表,例如:
建库建表:
if (existsDatabase("dfs://test_hkline")){ dropDatabase("dfs://test_hkline")}
db = database("dfs://test_hkline", HASH, [STRING,3])
dummy = table(array(STRING, 0) as "__name__",
array(STRING, 0) as path,
array(INT, 0) as count,
array(DOUBLE, 0) as temp,
array(STRING, 0) as method ,
array(STRING, 0) as instance)
db.createPartitionedTable(dummy,"test_line",["__name__"])
调用 userDefinedAppend
写入数据:
hklines = array(STRING,0)
hklines.append!("{__name__=\"requests_total\", path=\"/status\", method=\"GET\", instance=\"10.0.0.1:80\"}")
hklines.append!("{__name__=\"requests_total\",path=\"/status\",method=\"POST\",instance=\"10.0.0.1:80\"}")
hklines.append!("{__name__=\"requests_total\",path=\"/status\",count=1,temp=1.2,method=\"POST\",instance=\"10.0.0.1:80\"}")
i = 0
for (hkline in hklines){
print("i:",i)
SchemalessWriter::HKLineWriter(hkline, "test_line", "test_line")
i += 1
}
configWriter
这个接口可以配置一些可选的参数,分区的规则在写数据的接口中配置。
- 函数功能: 配置建库的参数
- 可见:对用户可见,可以在server启动脚本中运行,加载插件后也可以在GUI中运行.
- 唯一参数:参数是一个字典,
- 可选参数key1, “dbEngineType“, value是引擎的类型,字符串,OLAP或者TSDB,默认为OLAP。
- 可选参数key2, “dbNamePreStr“, value 是库名字的前缀,用来区分无模式写入自动创建的库还是系统中的其他场景下创建的库,默认没有前缀。
返回值: 无
例如:
cfg = dict(["dbEngineType"],["OLAP"])
SchemalessWriter::configWriter(cfg)
版本更迭
- v2.00.10:新增插件