Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add deployment usage in java quickstart #3220

Merged
merged 6 commits into from
Apr 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ thirdparty-fast:
if [ "$$new_zetasql_version" != "$(ZETASQL_VERSION)" ] ; then \
echo "[deps]: thirdparty up-to-date. reinstall zetasql from $(ZETASQL_VERSION) to $$new_zetasql_version"; \
$(MAKE) thirdparty-configure; \
$(CMAKE_PRG) --build $(THIRD_PARTY_BUILD_DIR) --target zetasql; \
$(CMAKE_PRG) --build $(THIRD_PARTY_BUILD_DIR) -j $(NPROC) --target zetasql; \
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does NPROC improve speed ? this is the parallelism of build all thirdparty deps, parallelism of each dep is handled inside each cmake/Fetch*.cmake

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, speed faster. I can't find -j or other options in Fetch*.cmake, where?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

zetasql uses bazel, which has its own parallelism strategy builtin, no need to specify NPROC ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can’t recall about zetasql, I'll revert this change, and check it when I build the old stdc++ capable image.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

revert here as well

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

already done 631f6ea, but seems we can't see it

else \
echo "[deps]: all up-to-date. zetasql already installed with version: $(ZETASQL_VERSION)"; \
fi; \
Expand All @@ -160,7 +160,7 @@ thirdparty-fast:

# third party compiled code install to 'OpenMLDB/.deps/usr', source code install to 'OpenMLDB/thirdsrc'
thirdparty: thirdparty-configure
$(CMAKE_PRG) --build $(THIRD_PARTY_BUILD_DIR)
$(CMAKE_PRG) --build $(THIRD_PARTY_BUILD_DIR) -j $(NPROC)

thirdparty-configure:
$(CMAKE_PRG) -S third-party -B $(THIRD_PARTY_BUILD_DIR) -DSRC_INSTALL_DIR=$(THIRD_PARTY_SRC_DIR) -DDEPS_INSTALL_DIR=$(THIRD_PARTY_DIR) $(THIRD_PARTY_CMAKE_FLAGS)
Expand Down
20 changes: 9 additions & 11 deletions demo/docker-compose.test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,16 @@ services:
- /bin/bash
- -cx
- |
# TODO(tobe): Remove this after 0.7.3 is released
rm -f /work/openmldb/spark-3.2.1-bin-openmldbspark/jars/curator-* # curator NoSuchMethodError error
# # TODO(tobe): Remove this after 0.7.3 is released
# rm -f /work/openmldb/spark-3.2.1-bin-openmldbspark/jars/curator-* # curator NoSuchMethodError error
./init.sh
sleep 5
# quickstart test
cd /work/quick_start
/work/openmldb/bin/openmldb --zk_cluster=127.0.0.1:2181 --zk_root_path=/openmldb --role=sql_client < cluster_quickstart.sql
python3 request_test.py

# java/python sdk
# java/python sdk, no jar in ci, so we should check the result manually
cd /work/java_quickstart
java -cp demo-1.0-SNAPSHOT.jar com.openmldb.demo.App
cd /work/python_quickstart
Expand All @@ -46,16 +46,14 @@ services:
python3 train_and_serve.py
python3 predict.py || (cat predict.log && return -1)

# oneflow sql test
# oneflow sql test # will be failed before https://github.com/4paradigm/OpenMLDB/issues/3151 fixed
cd /work/oneflow_demo/sql_scripts
# 10 min
cat sql_order | xargs cat | /work/openmldb/bin/openmldb --zk_cluster=127.0.0.1:2181 --zk_root_path=/openmldb --role=sql_client

python3 /work/job_checker.py

# open it after new diag tool released, or you can test in local by USE_ADD_WHL
# cd /work
# openmldb_tool status --diff -f /work/openmldb/conf/hosts
# openmldb_tool inspect
# openmldb_tool test
# openmldb_tool static-check --conf_file=/work/openmldb/conf/hosts -VCL --local
cd /work
openmldb_tool status --diff -f /work/openmldb/conf/hosts
openmldb_tool inspect
openmldb_tool test
openmldb_tool static-check --conf_file=/work/openmldb/conf/hosts -VCL --local
7 changes: 4 additions & 3 deletions demo/java_quickstart/demo/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>

<dependencies>
Expand All @@ -34,7 +34,8 @@
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<version>7.7.0</version>
<version>6.14.3</version>
<!-- <scope>test</scope> we use it in main-->
</dependency>
</dependencies>

Expand Down
92 changes: 89 additions & 3 deletions demo/java_quickstart/demo/src/main/java/com/openmldb/demo/App.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.sql.Types;
import java.util.ArrayList;
Expand All @@ -39,14 +40,20 @@ public class App {
private String zkPath = "/openmldb";
private String db = "mydb16";
private String table = "trans";
private String deploymentName = "d1";

public static void main(String[] args) {
// 如果中途出现异常,将无法清理掉创建的数据库和表,需要手动清理,否则重试将会与预期不符
App demo = new App();
try {
// 初始化构造SqlExecutor
// 初始化构造 SqlExecutor
demo.init();
demo.createDatabase();
// set online and default db in SqlExecutor, no need to set in the below sql
demo.setOnlineAndDB();

demo.createTable();
demo.createDeployment();
// 通过insert语句插入
demo.insertWithoutPlaceholder();
// 通过placeholder的方式插入。placeholder方式不会重复编译sql, 在性能上会比直接insert好很多
Expand All @@ -55,6 +62,9 @@ public static void main(String[] args) {
demo.select();
// 在request模式下执行sql
demo.requestSelect();
// 执行 deployment
demo.executeDeployment();
demo.dropDeployment();
// 删除表
demo.dropTable();
// 删除数据库
Expand Down Expand Up @@ -93,6 +103,16 @@ private void dropDatabase() {
}
}

private void setOnlineAndDB() {
java.sql.Statement state = sqlExecutor.getStatement();
try {
state.execute("SET @@execute_mode='online';");
state.execute("use " + db + ";");
} catch (Exception e) {
e.printStackTrace();
}
}

private void createTable() {
String createTableSql = "create table " + table + "(c1 string,\n" +
" c3 int,\n" +
Expand All @@ -104,7 +124,6 @@ private void createTable() {
" index(key=c1, ts=c7));";
java.sql.Statement state = sqlExecutor.getStatement();
try {
state.execute("use " + db + ";");
state.execute(createTableSql);
} catch (Exception e) {
e.printStackTrace();
Expand All @@ -120,6 +139,30 @@ private void dropTable() {
}
}

private void createDeployment() {
java.sql.Statement state = sqlExecutor.getStatement();
try {
String selectSql = String.format("SELECT c1, c3, sum(c4) OVER w1 as w1_c4_sum FROM %s WINDOW w1 AS " +
"(PARTITION BY %s.c1 ORDER BY %s.c7 ROWS_RANGE BETWEEN 2d PRECEDING AND CURRENT ROW);", table,
table, table);
// 上线一个Deployment
String deploySql = String.format("DEPLOY %s %s", deploymentName, selectSql);
// set return null rs, don't check the returned value, it's false
state.execute(deploySql);
} catch (Exception e) {
e.printStackTrace();
}
}

private void dropDeployment() {
java.sql.Statement state = sqlExecutor.getStatement();
try {
state.execute("DROP DEPLOYMENT " + deploymentName + ";");
} catch (Exception e) {
e.printStackTrace();
}
}

private void insertWithoutPlaceholder() {
String insertSql = String.format("insert into %s values(\"aa\",23,33,1.4,2.4,1590738993000,\"2020-05-04\");",
table);
Expand Down Expand Up @@ -171,7 +214,6 @@ private void select() {
int num = 0;
java.sql.Statement state = sqlExecutor.getStatement();
try {
state.execute("SET @@execute_mode='online';");
boolean ret = state.execute(selectSql);
Assert.assertTrue(ret);
result = state.getResultSet();
Expand Down Expand Up @@ -234,6 +276,50 @@ private void requestSelect() {
}
}

private void executeDeployment() {
PreparedStatement pstmt = null;
ResultSet resultSet = null;
try {
pstmt = sqlExecutor.getCallablePreparedStmt(db, deploymentName);
// 如果是执行deployment, 可以通过名字获取preparedstatement
// pstmt = sqlExecutor.getCallablePreparedStmt(db, deploymentName);
ResultSetMetaData metaData = pstmt.getMetaData();
// 执行request模式需要在RequestPreparedStatement设置一行请求数据
setData(pstmt, metaData);
// 调用executeQuery会执行这个select sql, 然后将结果放在了resultSet中
resultSet = pstmt.executeQuery();

Assert.assertTrue(resultSet.next());
Assert.assertEquals(resultSet.getMetaData().getColumnCount(), 3);
Assert.assertEquals(resultSet.getString(1), "bb");
Assert.assertEquals(resultSet.getInt(2), 24);
Assert.assertEquals(resultSet.getLong(3), 34);
Assert.assertFalse(resultSet.next());

// reuse way
for (int i = 0; i < 5; i++) {
setData(pstmt, metaData);
pstmt.executeQuery();
// skip result check
}
} catch (SQLException e) {
e.printStackTrace();
Assert.fail();
} finally {
try {
if (resultSet != null) {
// result用完之后需要close
resultSet.close();
}
if (pstmt != null) {
pstmt.close();
}
} catch (SQLException throwables) {
throwables.printStackTrace();
}
}
}

private void batchRequestSelect() {
String selectSql = String.format("SELECT c1, c3, sum(c4) OVER w1 as w1_c4_sum FROM %s WINDOW w1 AS " +
"(PARTITION BY %s.c1 ORDER BY %s.c7 ROWS BETWEEN 2 PRECEDING AND CURRENT ROW);", table, table, table);
Expand Down
20 changes: 0 additions & 20 deletions demo/job_checker.py

This file was deleted.

72 changes: 71 additions & 1 deletion docs/zh/quickstart/sdk/java_sdk.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@

注意:由于 openmldb-native 中包含了 OpenMLDB 编译的 C++ 静态库,默认是 Linux 静态库,macOS 上需将上述 openmldb-native 的 version 改成 `0.7.3-macos`,openmldb-jdbc 的版本保持不变。

openmldb-native 的 macOS 版本只支持 macOS 12,如需在 macOS 11 或 macOS 10.15上运行,需在相应 OS 上源码编译 openmldb-native 包,详细编译方法见[并发编译 Java SDK](https://openmldb.ai/docs/zh/main/deploy/compile.html#java-sdk)。
openmldb-native 的 macOS 版本只支持 macOS 12,如需在 macOS 11 或 macOS 10.15上运行,需在相应 OS 上源码编译 openmldb-native 包,详细编译方法见[并发编译 Java SDK](https://openmldb.ai/docs/zh/main/deploy/compile.html#java-sdk)。使用自编译的 openmldb-native 包,推荐使用`mvn install`安装到本地仓库,然后在 pom 中引用本地仓库的 openmldb-native 包,不建议用`scope=system`的方式引用。

Java SDK 连接 OpenMLDB 服务,可以使用 JDBC 的方式(推荐),也可以通过 SqlClusterExecutor 的方式直连。如果需要使用在线请求模式,只能使用 SqlClusterExecutor 。下面将依次演示两种连接方式。

Expand Down Expand Up @@ -106,6 +106,8 @@ PreparedStatement insertStatement = connection.prepareStatement("DELETE FROM t1

## SqlClusterExecutor 方式

SqlClusterExecutor 是最全面的Java SDK连接方式,不仅有JDBC可以使用的增删查功能,还可以使用请求模式等额外功能。

### 创建 SqlClusterExecutor

首先,进行 OpenMLDB 连接参数配置。
Expand Down Expand Up @@ -388,6 +390,74 @@ try {
}
```

### 执行 Deployment

执行 Deployment ,是通过 `SqlClusterExecutor::getCallablePreparedStmt(db, deploymentName)` 接口获取 CallablePreparedStatement 。区别于上文的 SQL 请求式查询,Deployment 在服务端已上线,速度会快于 SQL 请求式查询。

Deployment 使用过程分为两步:

- 上线Deployment
```java
// 上线一个Deployment(此处使用上文的selectSql),实际生产环境通常已经上线成功
java.sql.Statement state = sqlExecutor.getStatement();
try {
String selectSql = String.format("SELECT c1, c3, sum(c4) OVER w1 as w1_c4_sum FROM %s WINDOW w1 AS " +
"(PARTITION BY %s.c1 ORDER BY %s.c7 ROWS_RANGE BETWEEN 2d PRECEDING AND CURRENT ROW);", table,
table, table);
// 上线一个Deployment
String deploySql = String.format("DEPLOY %s %s", deploymentName, selectSql);
// set return null rs, don't check the returned value, it's false
state.execute(deploySql);
} catch (Exception e) {
e.printStackTrace();
}
```
- 执行Deployment。重新创建 CallablePreparedStmt 有一定耗时,建议尽可以复用 CallablePreparedStmt,`executeQuery()`将会自动清除`setXX`的请求行缓存。
```java
// 执行Deployment
PreparedStatement pstmt = null;
ResultSet resultSet = null;
try {
pstmt = sqlExecutor.getCallablePreparedStmt(db, deploymentName);
// 如果是执行deployment, 可以通过名字获取preparedstatement
// pstmt = sqlExecutor.getCallablePreparedStmt(db, deploymentName);
ResultSetMetaData metaData = pstmt.getMetaData();
// 执行request模式需要在RequestPreparedStatement设置一行请求数据
setData(pstmt, metaData);
// 调用executeQuery会执行这个select sql, 然后将结果放在了resultSet中
resultSet = pstmt.executeQuery();

Assert.assertTrue(resultSet.next());
Assert.assertEquals(resultSet.getMetaData().getColumnCount(), 3);
Assert.assertEquals(resultSet.getString(1), "bb");
Assert.assertEquals(resultSet.getInt(2), 24);
Assert.assertEquals(resultSet.getLong(3), 34);
Assert.assertFalse(resultSet.next());

// reuse way
for (int i = 0; i < 5; i++) {
setData(pstmt, metaData);
pstmt.executeQuery();
// skip result check
}
} catch (SQLException e) {
e.printStackTrace();
Assert.fail();
} finally {
try {
if (resultSet != null) {
// result用完之后需要close
resultSet.close();
}
if (pstmt != null) {
pstmt.close();
}
} catch (SQLException throwables) {
throwables.printStackTrace();
}
}
```

### 删除指定索引下某个 key 的所有数据

通过 Java SDK 可以有以下两种方式删除数据:
Expand Down