Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] [rest-api] Support Rest Api to upload file and submit task #8442

Merged
merged 1 commit into from
Jan 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions docs/en/seatunnel-engine/rest-api-v2.md
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,40 @@ sink {

------------------------------------------------------------------------------------------

### Submit A Job By Upload Config File

<details>
<summary><code>POST</code> <code><b>/submit-job/upload</b></code> <code>(Returns jobId and jobName if job submitted successfully.)</code></summary>

#### Parameters

> | name | type | data type | description |
> |----------------------|----------|-----------|-----------------------------------|
> | jobId | optional | string | job id |
> | jobName | optional | string | job name |
> | isStartWithSavePoint | optional | string | if job is started with save point |

#### Request Body
The name of the uploaded file key is config_file, and the file suffix json is parsed in json format. The conf or config file suffix is parsed in hocon format

curl Example :
```
curl --location 'http://127.0.0.1:8080/submit-job/upload' --form 'config_file=@"/temp/fake_to_console.conf"'

```
#### Responses

```json
{
"jobId": 733584788375666689,
"jobName": "SeaTunnel_Job"
}
```

</details>

------------------------------------------------------------------------------------------

### Batch Submit Jobs

<details>
Expand Down
33 changes: 33 additions & 0 deletions docs/zh/seatunnel-engine/rest-api-v2.md
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,40 @@ sink {
</details>

------------------------------------------------------------------------------------------
### 提交作业来源上传配置文件

<details>
<summary><code>POST</code> <code><b>/submit-job</b></code> <code>(如果作业提交成功,返回jobId和jobName。)</code></summary>

#### 参数

> | 参数名称 | 是否必传 | 参数类型 | 参数描述 |
> |----------------------|----------|-----------------------------------|-----------------------------------|
> | jobId | optional | string | job id |
> | jobName | optional | string | job name |
> | isStartWithSavePoint | optional | string | if job is started with save point |

#### 请求体
上传文件key的名称是config_file,文件后缀json的按照json格式来解析,conf或config文件后缀按照hocon格式解析

curl Example

```
curl --location 'http://127.0.0.1:8080/submit-job/upload' --form 'config_file=@"/temp/fake_to_console.conf"'

```
#### 响应

```json
{
"jobId": 733584788375666689,
"jobName": "SeaTunnel_Job"
}
```

</details>

------------------------------------------------------------------------------------------

### 批量提交作业

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@
import io.restassured.response.Response;
import scala.Tuple3;

import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
Expand Down Expand Up @@ -253,6 +255,34 @@ public void testStartWithSavePointWithoutJobIdV2() {
});
}

@Test
public void testRestApiSubmitJobByUploadFileV2() {
Arrays.asList(server, secondServer)
.forEach(
container -> {
Tuple3<Integer, String, Long> task = tasks.get(1);
URL resource =
this.getClass().getClassLoader().getResource("upload-file");
File fileDirect = new File(resource.getFile());
File[] files = fileDirect.listFiles();
for (File file : files) {
Response response =
given().multiPart("config_file", file)
.baseUri(
http
+ container.getHost()
+ colon
+ task._1())
.basePath(
RestConstant
.REST_URL_SUBMIT_JOB_BY_UPLOAD_FILE)
.when()
.post();
Assertions.assertEquals(200, response.getStatusCode());
}
});
}

@Test
public void testStopJob() {
AtomicInteger i = new AtomicInteger();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
######
###### This config file is a demonstration of streaming processing in seatunnel config
######

env {
parallelism = 1
job.mode = "BATCH"
}

source {
# This is a example source plugin **only for test and demonstrate the feature source plugin**
FakeSource {
plugin_output = "fake"
parallelism = 1
schema = {
fields {
name = "string"
age = "int"
}
}
}
}

transform {
}

sink {
console {
plugin_input="fake"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
{
"env": {
"job.mode": "batch"
},
"source": [
{
"plugin_name": "FakeSource",
"plugin_output": "fake",
"row.num": 100,
"schema": {
"fields": {
"name": "string",
"age": "int",
"card": "int"
}
}
}
],
"transform": [
],
"sink": [
{
"plugin_name": "Console",
"plugin_input": ["fake"]
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.seatunnel.engine.server.rest.servlet.RunningThreadsServlet;
import org.apache.seatunnel.engine.server.rest.servlet.StopJobServlet;
import org.apache.seatunnel.engine.server.rest.servlet.StopJobsServlet;
import org.apache.seatunnel.engine.server.rest.servlet.SubmitJobByUploadFileServlet;
import org.apache.seatunnel.engine.server.rest.servlet.SubmitJobServlet;
import org.apache.seatunnel.engine.server.rest.servlet.SubmitJobsServlet;
import org.apache.seatunnel.engine.server.rest.servlet.SystemMonitoringServlet;
Expand All @@ -47,6 +48,7 @@
import lombok.extern.slf4j.Slf4j;

import javax.servlet.DispatcherType;
import javax.servlet.MultipartConfigElement;

import java.io.IOException;
import java.net.DatagramSocket;
Expand All @@ -70,6 +72,7 @@
import static org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_STOP_JOBS;
import static org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_SUBMIT_JOB;
import static org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_SUBMIT_JOBS;
import static org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_SUBMIT_JOB_BY_UPLOAD_FILE;
import static org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_SYSTEM_MONITORING_INFORMATION;
import static org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_THREAD_DUMP;
import static org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_UPDATE_TAGS;
Expand Down Expand Up @@ -122,6 +125,9 @@ public void createJettyServer() {
ServletHolder threadDumpHolder = new ServletHolder(new ThreadDumpServlet(nodeEngine));

ServletHolder submitJobHolder = new ServletHolder(new SubmitJobServlet(nodeEngine));
ServletHolder submitJobByUploadFileHolder =
new ServletHolder(new SubmitJobByUploadFileServlet(nodeEngine));

ServletHolder submitJobsHolder = new ServletHolder(new SubmitJobsServlet(nodeEngine));
ServletHolder stopJobHolder = new ServletHolder(new StopJobServlet(nodeEngine));
ServletHolder stopJobsHolder = new ServletHolder(new StopJobsServlet(nodeEngine));
Expand All @@ -147,7 +153,10 @@ public void createJettyServer() {
context.addServlet(jobInfoHolder, convertUrlToPath(REST_URL_JOB_INFO));
context.addServlet(jobInfoHolder, convertUrlToPath(REST_URL_RUNNING_JOB));
context.addServlet(threadDumpHolder, convertUrlToPath(REST_URL_THREAD_DUMP));

MultipartConfigElement multipartConfigElement = new MultipartConfigElement("");
submitJobByUploadFileHolder.getRegistration().setMultipartConfig(multipartConfigElement);
context.addServlet(
submitJobByUploadFileHolder, convertUrlToPath(REST_URL_SUBMIT_JOB_BY_UPLOAD_FILE));
context.addServlet(submitJobHolder, convertUrlToPath(REST_URL_SUBMIT_JOB));
context.addServlet(submitJobsHolder, convertUrlToPath(REST_URL_SUBMIT_JOBS));
context.addServlet(stopJobHolder, convertUrlToPath(REST_URL_STOP_JOB));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ public class RestConstant {
public static final String REST_URL_SYSTEM_MONITORING_INFORMATION =
"/system-monitoring-information";
public static final String REST_URL_SUBMIT_JOB = "/submit-job";

public static final String REST_URL_SUBMIT_JOB_BY_UPLOAD_FILE = "/submit-job/upload";

public static final String REST_URL_SUBMIT_JOBS = "/submit-jobs";
public static final String REST_URL_STOP_JOB = "/stop-job";
public static final String REST_URL_STOP_JOBS = "/stop-jobs";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,15 @@ public JsonObject submitJob(Map<String, String> requestParams, byte[] requestBod
return submitJobInternal(config, requestParams, seaTunnelServer, nodeEngine.getNode());
}

public JsonObject submitJob(Map<String, String> requestParams, Config config) {
if (Boolean.parseBoolean(requestParams.get(RestConstant.IS_START_WITH_SAVE_POINT))
&& requestParams.get(RestConstant.JOB_ID) == null) {
throw new IllegalArgumentException("Please provide jobId when start with save point.");
}
SeaTunnelServer seaTunnelServer = getSeaTunnelServer(false);
return submitJobInternal(config, requestParams, seaTunnelServer, nodeEngine.getNode());
}

public JsonArray submitJobs(byte[] requestBody) {
List<Tuple2<Map<String, String>, Config>> configTuples =
RestUtil.buildConfigList(requestHandle(requestBody), false);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.engine.server.rest.servlet;

import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigParseOptions;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigSyntax;

import org.apache.seatunnel.engine.server.rest.service.JobInfoService;

import org.apache.commons.io.IOUtils;

import com.hazelcast.spi.impl.NodeEngineImpl;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.servlet.http.Part;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class SubmitJobByUploadFileServlet extends BaseServlet {
private final JobInfoService jobInfoService;

public SubmitJobByUploadFileServlet(NodeEngineImpl nodeEngine) {
super(nodeEngine);
this.jobInfoService = new JobInfoService(nodeEngine);
}

@Override
public void doPost(HttpServletRequest req, HttpServletResponse resp)
throws IOException, ServletException {

Part filePart = req.getPart("config_file");
String submittedFileName = filePart.getSubmittedFileName();
String content = IOUtils.toString(filePart.getInputStream(), StandardCharsets.UTF_8);
Config config;
if (submittedFileName.endsWith(".json")) {
config =
ConfigFactory.parseString(
content, ConfigParseOptions.defaults().setSyntax(ConfigSyntax.JSON));
} else {
config = ConfigFactory.parseString(content);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now we only support json and conf file, can we add file name suffix check here, if not conf and json config, throw exception.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now we only support json and conf file, can we add file name suffix check here, if not conf and json config, throw exception.

Okay, thank you for your suggestion. My idea is that the configuration file format supports JSON or HOCON, so the configuration file suffix only needs to distinguish JSON from others, such as. conf, config, etc. This type of content can be parsed as long as it is HOCON, rather than limiting the. conf suffix

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about add a parameters named format? Just like submit job api? Please refer https://github.com/apache/seatunnel/blob/dev/docs/en/seatunnel-engine/rest-api-v2.md#parameters-6

}
writeJson(resp, jobInfoService.submitJob(getParameterMap(req), config));
}
}
Loading