运行 run/tryRun
下文将介绍执行脚本或函数的 run
方法及其参数和使用示例。
以下为使用 run
的完整语法。
public Entity run(String script, ProgressListener listener, int priority, int parallelism, int fetchSize, boolean clearSessionMemory, String tableName, boolean enableSeqNo)
参数介绍
script String 类型,表示要执行的脚本。
listener 表示脚本执行期间可使用的监听器,可选参数,默认值为 null。在脚本执行时 server 端会返回用户打印的信息,一般是用户使用 print 语句输出的结果,此时可以通过 listener 接收。
priority int 类型,表示此次执行脚本的优先级,可选参数,默认值为 4 。priority 取值在 0-9 之间,其值越大表示优先级越高。
parallelism int 类型,表示并行度,即在一个数据节点上,最多同时可以用多少个线程来执行该作业产生的子任务。
- 若使用 3.00.1.2 之前的版本,该参数的默认值是 2。
- 若使用 3.00.1.2 及之后的版本,该参数的默认值是 64。注意:若用户在 server 中通过 setMaxJobParallelism 也设置了任务并行度,将取 parallelism (API 端的并行度)和 MaxJobParallelism (server 端的并行度)的最小值作为任务的并行度。
fetchSize int 类型,表示分块大小,可选参数, 默认值为 0。当要获取的表很大时,往往需要采用分块获取的方式,此时设置 fetchSize 可以减少客户端程序的内存占用。需要注意的是,若您需要配置 fetchSize,fetchSize 取值不能小于 8192(记录条数)。
clearSessionMemory boolean 类型,用来减少执行时的内存占用,可以使 server 在执行完毕后自动释放 run 语句中因创建变量所占用的内存资源,可选参数,默认为 false 。
tableName String 类型,在执行时可以通过设置此参数来快速获取对应表的结构,可选参数,默认值为 ““ (空字符串)。
enableSeqNo boolean 类型,用来选择是否开启 seqNo 功能。seqNo 是一个长整型,代表一个客户端的任务序号。
- 2.00.11.0 版本之前,
run
方法自动开启 seqNo 功能。若当前写入任务失败,则将重复提交该任务。 - 自 2.00.11.0 版本起,
run
方法支持参数 enableSeqNo,用来手动选择是否开启 seqNo 功能。若不填写,则默认开启该功能。
使用示例
示例1:执行 script 脚本
只传入 script,即要执行的脚本。
@Test
public void test_run_script() throws IOException {
DBConnection dbConnection = new DBConnection();
dbConnection.connect("192.168.0.68", 8848);
Entity entiry = dbConnection.run("x = 1; x;");
System.out.println(entiry.getString());
}
执行结果:
1
示例2:执行函数
public Entity run(String function, List<Entity> arguments)
以下为通过 run
方法执行 tableInsert
函数的代码示例:
public void test_run_function_tableInsert() throws Exception {
DBConnection dbConnection = new DBConnection();
dbConnection.connect("192.168.0.68", 8848);
int n=5000000;
List<String> colNames = new ArrayList<>();
colNames.add("date");
colNames.add("val");
int[] time = new int[n*2];
double[] val = new double[n*2];
int baseTime = Utils.countDays(2000,1,1);
for (int i = 0; i < n*2; i++) {
time[i] = baseTime + (i % 15);
val[i] = rand.nextDouble();
}
List<Vector> colVectors = new ArrayList<>();
BasicDateVector dateVector = new BasicDateVector(time);
colVectors.add(dateVector);
dateVector.setCompressedMethod(1);
BasicDoubleVector valVector = new BasicDoubleVector(val);
valVector.setCompressedMethod(2);
colVectors.add(valVector);
BasicTable table = new BasicTable(colNames, colVectors);
List<Entity> args = Arrays.asList(table);
conn.run("t = table(100000:0,`date`val,[DATE,DOUBLE])" +
"share t as st");
// 执行 tableInsert
conn.run("tableInsert{st}", args);
}
示例3:设置 tableName 参数来快速获取对应表的结构
这里指定表名 ts,返回值能够获取对应的表结构。
@Test
public void testRun() throws IOException {
DBConnection dbConnection = new DBConnection();
dbConnection.connect("192.168.0.68", 8848);
dbConnection.run("ts = table(1..100 as a,take(`a`b`c,100) as b)");
BasicTableSchema tableSchema = (BasicTableSchema) dbConnection.run("select * from ts", "ts");
System.out.println(tableSchema.getString());
}
执行结果:
cols :2
rows :100
name typeString typeInt colIndex
a DT_INT 4 0
b DT_STRING 18 1
示例4:设置 clearSessionMemory 以减少执行时的内存占用
使用 run 方法时,有时为减少内存占用,希望 server 能在执行完毕后自动释放 run 语句中创建的变量。此时可通过设置 clearMemory = true
来实现。
以下测试 case 执行一个脚本,并设置 priority 优先级为4,同时开启 clearSessionMemory 功能。
@Test
public void testRun3() throws IOException {
DBConnection dbConnection = new DBConnection();
dbConnection.connect("192.168.0.68", 8848);
dbConnection.run("x=1;", 4, true);
Entity entity = dbConnection.run("x;");
System.out.println(entity.getString());
}
执行结果:
java.io.IOException: 192.168.0.68:8848 Server response: 'Syntax Error: [line #1] Cannot recognize the token x' script: 'x;'
由于 x 在第一次 run 执行完毕后被清除,所以再次执行 run("x") 会抛上述异常。
示例5:设置 fetchSize
对于大数据量的表,Java API 提供了分段读取方法。(仅适用于 DolphinDB 1.20.5 及以上版本)
下例在 Java 客户端创建一个数据量大小为 22486 的表,并设置 fetchSize 为10000 来进行分块获取数据。在 run 方法中使用参数 fetchSize 指定分块大小 ,会返回一个 EntityBlockReader
对象,可通过 read()
方法一段段的读取数据。需要注意的是,fetchSize 取值不能小于 8192(记录条数)。
@Test
public void TestRun() throws IOException {
DBConnection dbConnection = new DBConnection();
dbConnection.connect("192.168.0.68", 8848);
EntityBlockReader blockReader = (EntityBlockReader) dbConnection.run("table(1..22486 as id)", (ProgressListener) null, 4, 4, 10000);
while (blockReader.hasNext()) {
System.out.println(data.rows());
BasicTable t = (BasicTable) blockReader.read();
// 对读出来的数据进行处理
// ...
}
}
执行结果:
10000
20000
2486
注意,使用上述分段读取的方法时,若数据未读取完毕,需要调用 skipAll() 方法来放弃读取后续数据,才能继续执行后续代码。否则会导致套接字缓冲区滞留数据,引发后续数据的反序列化失败。示例代码如下:
@Test
public void testFetchDataInt() throws IOException {
DBConnection dbConnection = new DBConnection();
dbConnection.connect("192.168.0.68", 8848);
EntityBlockReader blockReader = (EntityBlockReader) dbConnection.run("table(1..22486 as id)", (ProgressListener) null, 4, 4, 10000);
// Only read 10000.
BasicTable data = (BasicTable) blockReader.read();
// Skip remain data.
blockReader.skipAll();
// After skip remain data, follow-up code could run. If not, it would throw exception.
dbConnection.run("1==1;");
}
示例6:设置 listener 监听器
可以设置脚本执行期间使用的监听器 listener,在脚本执行时 server 端会返回用户打印的信息,示例如下:
@Test
public void testRun5() throws IOException{
DBConnection dbConnection = new DBConnection();
dbConnection.connect("192.168.1.167", 18921);
ProgressListener listener = new ProgressListener() {
@Override
public void progress(String message) {
System.out.println("message: \n" + message);
}
};
Entity entity = dbConnection.run("t = table(1..5 as id, rand(100, 5) as price); print t; select * from t where id=4;", listener);
System.out.println("select one record: \n" + entity.getString());
}
执行结果:
Connect to 192.168.1.167:18921.
message:
id price
-- -----
1 9
2 11
3 81
4 78
5 1
select one record:
id price
-- -----
4 78
关于 tryRun 方法
在 Java API 中,当多个线程采用同一个连接操作同一张表的时候,可能会因为线程执行顺序的不确定性,导致出现不符合预期的结果。举例来说,假设线程 A 要先更新某张表,然后对这张表进行查询操作,并且在线程 A 操作结束后由线程 B 删除这张表。在没有锁的情况下,A 和 B 的执行顺序是未知的,可能会出现如下情况:线程 A 在执行更新和查询操作的间隙,线程 B 就删除了该表,导致线程 A 查询操作的失败,这是不符合期望的。因此,Java API 的 run
方法内部采用了 ReentrantLock(采用非公平锁)来避免这类情况。
同时,Java API 提供了 tryRun
方法来执行一个"尝试运行"的操作:用户可以通过该方法来尝试执行一个 DolphinDB 脚本。两者的区别是: run
方法会阻塞并等待锁的释放后再执行脚本,而 tryRun
方法在锁已经被其他线程占用的情况下会直接返回 null ,而不是等待锁的释放(实现原理可以参考 ReentrantLock 的 tryLock 方法)。
tryRun
方法定义了如下参数:
public Entity tryRun(String script, int priority, int parallelism,int fetchSize, boolean clearSessionMemory)
其中,script、priority、parallelism、fetchSize、clearSessionMemory 参数含义与作用与 run
方法一致,这里不做重复介绍,以下给出简单使用示例:
@Test
public void testTryRun() throws IOException{
DBConnection dbConnection = new DBConnection();
dbConnection.connect("192.168.0.68", 8848);
Entity entity = dbConnection.tryRun("x=1; x;", 1, 1, true);
System.out.println(entity.getString());
}
此外,tryRun
方法还支持传入函数名 function 和参数 arguments 来执行:
public Entity tryRun(String function, List<Entity> arguments, int priority, int parallelism, int fetchSize)
示例代码如下,调用 sum
方法计算向量 x 中所有元素之和:
@Test
public void testTryRun2() throws IOException{
DBConnection dbConnection = new DBConnection();
dbConnection.connect("192.168.0.68", 8848);
List<Entity> arguments = new ArrayList<>();
dbConnection.run("x=1..10000000");
arguments.add(dbConnection.run("x;"));
Entity entity = dbConnection.tryRun("sum", arguments, 1, 1);
System.out.println(entity.getString());
}
执行结果:
50000005000000
关于 testRun
和 run
方法不同点对比,示例代码如下:
@Test
public void testTryRun3() throws IOException, InterruptedException {
DBConnection dbConnection = new DBConnection();
dbConnection.connect("192.168.0.68", 8848, "admin", "123456");
// thread1:执行一个表的查询操作
Thread thread1 = new Thread(()->{
try {
dbConnection.run("pt1=loadTable(\"dfs://test\", \"tb1\"); sleep(2000);");
} catch (IOException e) {
throw new RuntimeException(e);
}
});
// thread2:执行一个select 1操作,但是使用tryRun方法
Thread thread2 = new Thread(()->{
try {
Entity entity = dbConnection.tryRun("x=1;x;");
System.out.println(entity);
} catch (IOException e) {
throw new RuntimeException(e);
}
});
thread1.start();
thread1.sleep(1);
thread2.start();
}
此时执行结果为:
null
可以看到:线程 1 和线程 2 同时执行时,线程 2 没有获得脚本执行的返回结果,而是得到一个 null 值。说明此时连接的 ReentrantLock 已经被线程 1 占有,线程 2 在执行 tryRun
方法的时候没有进行等待,而是直接返回 null。
如果线程 2 单独执行,此时没有其他的线程与线程 2 争抢锁,则会有以下执行结果(可能会有不同,注意不是null 即可):
com.xxdb.data.BasicAnyVector@155967e6
可以看到:此时没有锁的争抢过程,线程 2 顺利执行。
如果线程 2 执行 run
方法,此时线程 2 会阻塞等待线程 1 执行结束后释放锁,则会有以下执行结果(可能会有不同,注意不是 null 即可):
com.xxdb.data.BasicAnyVector@22b38b67
可以看到:即使有锁争抢的过程,但是线程 2 仍然有返回结果。