From 1d9a8b3fcd9ebb7aec6e24f8b2a34ba797f97ac1 Mon Sep 17 00:00:00 2001
From: fcb-xiaobo <60566194+fcb-xiaobo@users.noreply.github.com>
Date: Fri, 10 Jan 2025 09:14:37 +0800
Subject: [PATCH] [Feature] [rest-api] Support Rest Api to upload file and
submit task (#8442)
---
docs/en/seatunnel-engine/rest-api-v2.md | 34 ++++++++++
docs/zh/seatunnel-engine/rest-api-v2.md | 33 ++++++++++
.../e2e/ClusterSeaTunnelEngineContainer.java | 30 +++++++++
.../upload-file/fake_to_console.conf | 48 ++++++++++++++
.../upload-file/fake_to_console.json | 27 ++++++++
.../seatunnel/engine/server/JettyService.java | 11 +++-
.../engine/server/rest/RestConstant.java | 3 +
.../server/rest/service/JobInfoService.java | 9 +++
.../servlet/SubmitJobByUploadFileServlet.java | 64 +++++++++++++++++++
9 files changed, 258 insertions(+), 1 deletion(-)
create mode 100644 seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/upload-file/fake_to_console.conf
create mode 100644 seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/upload-file/fake_to_console.json
create mode 100644 seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/SubmitJobByUploadFileServlet.java
diff --git a/docs/en/seatunnel-engine/rest-api-v2.md b/docs/en/seatunnel-engine/rest-api-v2.md
index 27ab552f2dd..01dc9479117 100644
--- a/docs/en/seatunnel-engine/rest-api-v2.md
+++ b/docs/en/seatunnel-engine/rest-api-v2.md
@@ -483,6 +483,40 @@ sink {
------------------------------------------------------------------------------------------
+### Submit A Job By Upload Config File
+
+
+POST
/submit-job/upload
(Returns jobId and jobName if job submitted successfully.)
+
+#### Parameters
+
+> | name | type | data type | description |
+> |----------------------|----------|-----------|-----------------------------------|
+> | jobId | optional | string | job id |
+> | jobName | optional | string | job name |
+> | isStartWithSavePoint | optional | string | if job is started with save point |
+
+#### Request Body
+The name of the uploaded file key is config_file, and the file suffix json is parsed in json format. The conf or config file suffix is parsed in hocon format
+
+curl Example :
+```
+curl --location 'http://127.0.0.1:8080/submit-job/upload' --form 'config_file=@"/temp/fake_to_console.conf"'
+
+```
+#### Responses
+
+```json
+{
+ "jobId": 733584788375666689,
+ "jobName": "SeaTunnel_Job"
+}
+```
+
+
+
+------------------------------------------------------------------------------------------
+
### Batch Submit Jobs
diff --git a/docs/zh/seatunnel-engine/rest-api-v2.md b/docs/zh/seatunnel-engine/rest-api-v2.md
index 7a3679b2160..bf118fe1c15 100644
--- a/docs/zh/seatunnel-engine/rest-api-v2.md
+++ b/docs/zh/seatunnel-engine/rest-api-v2.md
@@ -463,7 +463,40 @@ sink {
------------------------------------------------------------------------------------------
+### 提交作业来源上传配置文件
+
+POST
/submit-job
(如果作业提交成功,返回jobId和jobName。)
+
+#### 参数
+
+> | 参数名称 | 是否必传 | 参数类型 | 参数描述 |
+> |----------------------|----------|-----------------------------------|-----------------------------------|
+> | jobId | optional | string | job id |
+> | jobName | optional | string | job name |
+> | isStartWithSavePoint | optional | string | if job is started with save point |
+
+#### 请求体
+上传文件key的名称是config_file,文件后缀json的按照json格式来解析,conf或config文件后缀按照hocon格式解析
+
+curl Example
+
+```
+curl --location 'http://127.0.0.1:8080/submit-job/upload' --form 'config_file=@"/temp/fake_to_console.conf"'
+
+```
+#### 响应
+
+```json
+{
+ "jobId": 733584788375666689,
+ "jobName": "SeaTunnel_Job"
+}
+```
+
+
+
+------------------------------------------------------------------------------------------
### 批量提交作业
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelEngineContainer.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelEngineContainer.java
index 671249f28c9..718d725d91d 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelEngineContainer.java
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelEngineContainer.java
@@ -37,7 +37,9 @@
import io.restassured.response.Response;
import scala.Tuple3;
+import java.io.File;
import java.io.IOException;
+import java.net.URL;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
@@ -253,6 +255,34 @@ public void testStartWithSavePointWithoutJobIdV2() {
});
}
+ @Test
+ public void testRestApiSubmitJobByUploadFileV2() {
+ Arrays.asList(server, secondServer)
+ .forEach(
+ container -> {
+ Tuple3 task = tasks.get(1);
+ URL resource =
+ this.getClass().getClassLoader().getResource("upload-file");
+ File fileDirect = new File(resource.getFile());
+ File[] files = fileDirect.listFiles();
+ for (File file : files) {
+ Response response =
+ given().multiPart("config_file", file)
+ .baseUri(
+ http
+ + container.getHost()
+ + colon
+ + task._1())
+ .basePath(
+ RestConstant
+ .REST_URL_SUBMIT_JOB_BY_UPLOAD_FILE)
+ .when()
+ .post();
+ Assertions.assertEquals(200, response.getStatusCode());
+ }
+ });
+ }
+
@Test
public void testStopJob() {
AtomicInteger i = new AtomicInteger();
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/upload-file/fake_to_console.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/upload-file/fake_to_console.conf
new file mode 100644
index 00000000000..393c5c2e52f
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/upload-file/fake_to_console.conf
@@ -0,0 +1,48 @@
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ # This is a example source plugin **only for test and demonstrate the feature source plugin**
+ FakeSource {
+ plugin_output = "fake"
+ parallelism = 1
+ schema = {
+ fields {
+ name = "string"
+ age = "int"
+ }
+ }
+ }
+}
+
+transform {
+}
+
+sink {
+ console {
+ plugin_input="fake"
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/upload-file/fake_to_console.json b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/upload-file/fake_to_console.json
new file mode 100644
index 00000000000..73ab4447e7f
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/upload-file/fake_to_console.json
@@ -0,0 +1,27 @@
+{
+ "env": {
+ "job.mode": "batch"
+ },
+ "source": [
+ {
+ "plugin_name": "FakeSource",
+ "plugin_output": "fake",
+ "row.num": 100,
+ "schema": {
+ "fields": {
+ "name": "string",
+ "age": "int",
+ "card": "int"
+ }
+ }
+ }
+ ],
+ "transform": [
+ ],
+ "sink": [
+ {
+ "plugin_name": "Console",
+ "plugin_input": ["fake"]
+ }
+ ]
+}
\ No newline at end of file
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/JettyService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/JettyService.java
index 8af15a33250..4d9b75abf55 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/JettyService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/JettyService.java
@@ -37,6 +37,7 @@
import org.apache.seatunnel.engine.server.rest.servlet.RunningThreadsServlet;
import org.apache.seatunnel.engine.server.rest.servlet.StopJobServlet;
import org.apache.seatunnel.engine.server.rest.servlet.StopJobsServlet;
+import org.apache.seatunnel.engine.server.rest.servlet.SubmitJobByUploadFileServlet;
import org.apache.seatunnel.engine.server.rest.servlet.SubmitJobServlet;
import org.apache.seatunnel.engine.server.rest.servlet.SubmitJobsServlet;
import org.apache.seatunnel.engine.server.rest.servlet.SystemMonitoringServlet;
@@ -47,6 +48,7 @@
import lombok.extern.slf4j.Slf4j;
import javax.servlet.DispatcherType;
+import javax.servlet.MultipartConfigElement;
import java.io.IOException;
import java.net.DatagramSocket;
@@ -70,6 +72,7 @@
import static org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_STOP_JOBS;
import static org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_SUBMIT_JOB;
import static org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_SUBMIT_JOBS;
+import static org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_SUBMIT_JOB_BY_UPLOAD_FILE;
import static org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_SYSTEM_MONITORING_INFORMATION;
import static org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_THREAD_DUMP;
import static org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_UPDATE_TAGS;
@@ -122,6 +125,9 @@ public void createJettyServer() {
ServletHolder threadDumpHolder = new ServletHolder(new ThreadDumpServlet(nodeEngine));
ServletHolder submitJobHolder = new ServletHolder(new SubmitJobServlet(nodeEngine));
+ ServletHolder submitJobByUploadFileHolder =
+ new ServletHolder(new SubmitJobByUploadFileServlet(nodeEngine));
+
ServletHolder submitJobsHolder = new ServletHolder(new SubmitJobsServlet(nodeEngine));
ServletHolder stopJobHolder = new ServletHolder(new StopJobServlet(nodeEngine));
ServletHolder stopJobsHolder = new ServletHolder(new StopJobsServlet(nodeEngine));
@@ -147,7 +153,10 @@ public void createJettyServer() {
context.addServlet(jobInfoHolder, convertUrlToPath(REST_URL_JOB_INFO));
context.addServlet(jobInfoHolder, convertUrlToPath(REST_URL_RUNNING_JOB));
context.addServlet(threadDumpHolder, convertUrlToPath(REST_URL_THREAD_DUMP));
-
+ MultipartConfigElement multipartConfigElement = new MultipartConfigElement("");
+ submitJobByUploadFileHolder.getRegistration().setMultipartConfig(multipartConfigElement);
+ context.addServlet(
+ submitJobByUploadFileHolder, convertUrlToPath(REST_URL_SUBMIT_JOB_BY_UPLOAD_FILE));
context.addServlet(submitJobHolder, convertUrlToPath(REST_URL_SUBMIT_JOB));
context.addServlet(submitJobsHolder, convertUrlToPath(REST_URL_SUBMIT_JOBS));
context.addServlet(stopJobHolder, convertUrlToPath(REST_URL_STOP_JOB));
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
index e55f76cf54e..810e08453eb 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
@@ -75,6 +75,9 @@ public class RestConstant {
public static final String REST_URL_SYSTEM_MONITORING_INFORMATION =
"/system-monitoring-information";
public static final String REST_URL_SUBMIT_JOB = "/submit-job";
+
+ public static final String REST_URL_SUBMIT_JOB_BY_UPLOAD_FILE = "/submit-job/upload";
+
public static final String REST_URL_SUBMIT_JOBS = "/submit-jobs";
public static final String REST_URL_STOP_JOB = "/stop-job";
public static final String REST_URL_STOP_JOBS = "/stop-jobs";
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/JobInfoService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/JobInfoService.java
index 7d21c2023cd..22d3138aee4 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/JobInfoService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/JobInfoService.java
@@ -173,6 +173,15 @@ public JsonObject submitJob(Map requestParams, byte[] requestBod
return submitJobInternal(config, requestParams, seaTunnelServer, nodeEngine.getNode());
}
+ public JsonObject submitJob(Map requestParams, Config config) {
+ if (Boolean.parseBoolean(requestParams.get(RestConstant.IS_START_WITH_SAVE_POINT))
+ && requestParams.get(RestConstant.JOB_ID) == null) {
+ throw new IllegalArgumentException("Please provide jobId when start with save point.");
+ }
+ SeaTunnelServer seaTunnelServer = getSeaTunnelServer(false);
+ return submitJobInternal(config, requestParams, seaTunnelServer, nodeEngine.getNode());
+ }
+
public JsonArray submitJobs(byte[] requestBody) {
List, Config>> configTuples =
RestUtil.buildConfigList(requestHandle(requestBody), false);
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/SubmitJobByUploadFileServlet.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/SubmitJobByUploadFileServlet.java
new file mode 100644
index 00000000000..2db376da068
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/SubmitJobByUploadFileServlet.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.rest.servlet;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigParseOptions;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigSyntax;
+
+import org.apache.seatunnel.engine.server.rest.service.JobInfoService;
+
+import org.apache.commons.io.IOUtils;
+
+import com.hazelcast.spi.impl.NodeEngineImpl;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.servlet.http.Part;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+public class SubmitJobByUploadFileServlet extends BaseServlet {
+ private final JobInfoService jobInfoService;
+
+ public SubmitJobByUploadFileServlet(NodeEngineImpl nodeEngine) {
+ super(nodeEngine);
+ this.jobInfoService = new JobInfoService(nodeEngine);
+ }
+
+ @Override
+ public void doPost(HttpServletRequest req, HttpServletResponse resp)
+ throws IOException, ServletException {
+
+ Part filePart = req.getPart("config_file");
+ String submittedFileName = filePart.getSubmittedFileName();
+ String content = IOUtils.toString(filePart.getInputStream(), StandardCharsets.UTF_8);
+ Config config;
+ if (submittedFileName.endsWith(".json")) {
+ config =
+ ConfigFactory.parseString(
+ content, ConfigParseOptions.defaults().setSyntax(ConfigSyntax.JSON));
+ } else {
+ config = ConfigFactory.parseString(content);
+ }
+ writeJson(resp, jobInfoService.submitJob(getParameterMap(req), config));
+ }
+}