Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][Seatunnel-web]Add support to configure placeholder with default value in the job config. #208

Merged
merged 1 commit into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,8 @@
@AllArgsConstructor
// Job execution parameters
public class JobExecParam {
// job name -> key -> value
private Map<String, String> env;
// task name -> key -> value
private Map<String, Map<String, String>> tasks;
// job config placeholder name -> value
private Map<String, String> placeholderValues;
// task name -> new datasource id
private Map<String, String> datasource;
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public Result<Long> jobExecute(Integer userId, Long jobDefineId, JobExecParam ex
return Result.success(executeResource.getJobInstanceId());
} catch (RuntimeException e) {
Result<Long> failure =
Result.failure(SeatunnelErrorEnum.JUB_EXEC_SUBMISSION_ERROR, e.getMessage());
Result.failure(SeatunnelErrorEnum.JOB_EXEC_SUBMISSION_ERROR, e.getMessage());
// Even though job execution submission failed, we still need to return the
// jobInstanceId to the user
// as the job instance has been created in the database.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,6 @@ public String generateJobConfig(
BusinessMode businessMode =
BusinessMode.valueOf(jobDefinitionDao.getJob(jobId).getJobType());
Config envConfig = filterEmptyValue(ConfigFactory.parseString(envStr));
envConfig = JobUtils.updateEnvConfig(executeParam, envConfig);
JobUtils.updateDataSource(executeParam, tasks);

Map<String, List<Config>> sourceMap = new LinkedHashMap<>();
Expand Down Expand Up @@ -230,8 +229,6 @@ public String generateJobConfig(
ParsingMode.SHARDING.name()));
}

config =
JobUtils.updateTaskConfig(executeParam, config, task.getName());
Config mergeConfig =
mergeTaskConfig(
task,
Expand All @@ -240,9 +237,6 @@ public String generateJobConfig(
businessMode,
config,
optionRule);
mergeConfig =
JobUtils.updateQueryTaskConfig(
executeParam, mergeConfig, task.getName());
sourceMap
.get(task.getConnectorType())
.add(filterEmptyValue(mergeConfig));
Expand Down Expand Up @@ -272,9 +266,6 @@ public String generateJobConfig(
}
List<TableSchemaReq> inputSchemas = findInputSchemas(tasks, lines, task);
Config transformConfig = buildTransformConfig(task, config, inputSchemas);
transformConfig =
JobUtils.updateTaskConfig(
executeParam, transformConfig, task.getName());
transformMap
.get(task.getConnectorType())
.add(filterEmptyValue(transformConfig));
Expand All @@ -289,8 +280,6 @@ public String generateJobConfig(
if (!sinkMap.containsKey(task.getConnectorType())) {
sinkMap.put(task.getConnectorType(), new ArrayList<>());
}
config =
JobUtils.updateTaskConfig(executeParam, config, task.getName());
Config mergeConfig =
mergeTaskConfig(
task,
Expand Down Expand Up @@ -341,7 +330,8 @@ public String generateJobConfig(
.setJson(false)
.setComments(false)
.setOriginComments(false));
return SeaTunnelConfigUtil.generateConfig(env, sources, transforms, sinks);
String jobConfig = SeaTunnelConfigUtil.generateConfig(env, sources, transforms, sinks);
return JobUtils.replaceJobConfigPlaceholders(jobConfig, executeParam);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,23 @@
*/
package org.apache.seatunnel.app.utils;

import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;

import org.apache.seatunnel.app.dal.entity.JobTask;
import org.apache.seatunnel.app.domain.request.job.JobExecParam;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.server.common.SeatunnelErrorEnum;
import org.apache.seatunnel.server.common.SeatunnelException;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class JobUtils {

// The maximum length of the job execution error message, 4KB
private static final int ERROR_MESSAGE_MAX_LENGTH = 4096;
private static final Pattern placeholderPattern = Pattern.compile("\\$\\{(\\w+)(?::(.*?))?\\}");

public static String getJobInstanceErrorMessage(String message) {
if (message == null) {
Expand All @@ -40,46 +43,6 @@ public static String getJobInstanceErrorMessage(String message) {
: message;
}

public static Config updateEnvConfig(JobExecParam jobExecParam, Config envConfig) {
if (jobExecParam == null || jobExecParam.getEnv() == null) {
return envConfig;
}
return updateConfig(envConfig, jobExecParam.getEnv());
}

private static Config updateConfig(Config config, Map<String, String> properties) {
for (Map.Entry<String, String> entry : properties.entrySet()) {
config =
config.withValue(
entry.getKey(), ConfigValueFactory.fromAnyRef(entry.getValue()));
}
return config;
}

public static Config updateTaskConfig(
JobExecParam jobExecParam, Config taskConfig, String taskName) {
if (jobExecParam == null
|| jobExecParam.getTasks() == null
|| jobExecParam.getTasks().get(taskName) == null) {
return taskConfig;
}
return updateConfig(taskConfig, jobExecParam.getTasks().get(taskName));
}

public static Config updateQueryTaskConfig(
JobExecParam jobExecParam, Config taskConfig, String taskName) {
if (jobExecParam == null
|| jobExecParam.getTasks() == null
|| jobExecParam.getTasks().get(taskName) == null) {
return taskConfig;
}
String query = jobExecParam.getTasks().get(taskName).get("query");
if (query != null) {
return taskConfig.withValue("query", ConfigValueFactory.fromAnyRef(query));
}
return taskConfig;
}

public static void updateDataSource(JobExecParam jobExecParam, List<JobTask> tasks) {
if (jobExecParam == null || jobExecParam.getDatasource() == null) {
return;
Expand All @@ -104,4 +67,29 @@ public static boolean isJobEndStatus(JobStatus jobStatus) {
|| JobStatus.CANCELED == jobStatus
|| JobStatus.FAILED == jobStatus;
}

// Replace placeholders in job config with actual values
public static String replaceJobConfigPlaceholders(
String jobConfigString, JobExecParam jobExecParam) {
Map<String, String> placeholderValues =
(jobExecParam != null && jobExecParam.getPlaceholderValues() != null)
? jobExecParam.getPlaceholderValues()
: Collections.emptyMap();

Matcher matcher = placeholderPattern.matcher(jobConfigString);
StringBuffer result = new StringBuffer();

while (matcher.find()) {
String placeholderName = matcher.group(1);
String replacement = placeholderValues.getOrDefault(placeholderName, matcher.group(2));
if (replacement == null) {
throw new SeatunnelException(
SeatunnelErrorEnum.JOB_NO_VALUE_FOUND_FOR_PLACEHOLDER, placeholderName);
}
matcher.appendReplacement(result, replacement);
}

matcher.appendTail(result);
return result.toString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.app.utils;

import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;

import org.apache.seatunnel.app.domain.request.job.JobExecParam;
import org.apache.seatunnel.server.common.SeatunnelException;

import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Test;

import java.util.HashMap;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;

public class JobUtilsTests {

@Test
public void testReplaceJobConfigPlaceholders_AllJobConfigPlaceholdersReplaced() {
String jobConfigContent =
"job.mode=${jobModeParam:BATCH}\ncheckpoint.interval=30\njob.name=${jobNameParam}";
Map<String, String> paramValues = new HashMap<>();
paramValues.put("jobModeParam", "STREAMING");
paramValues.put("jobNameParam", "newJob");
JobExecParam jobExecParam = getJobExecParam(paramValues);

String expected = "job.mode=STREAMING\ncheckpoint.interval=30\njob.name=newJob";
String actual = JobUtils.replaceJobConfigPlaceholders(jobConfigContent, jobExecParam);

assertEquals(expected, actual);
}

@Test
public void testReplaceJobConfigPlaceholders_JobConfig_PlaceholdersRepeat() {
String jobConfigContent =
"job.mode=${jobModeParam:BATCH}\ncheckpoint.interval=30\njob.name=${jobModeParam}";
Map<String, String> paramValues = new HashMap<>();
paramValues.put("jobModeParam", "STREAMING");
JobExecParam jobExecParam = getJobExecParam(paramValues);

String expected = "job.mode=STREAMING\ncheckpoint.interval=30\njob.name=STREAMING";
String actual = JobUtils.replaceJobConfigPlaceholders(jobConfigContent, jobExecParam);

assertEquals(expected, actual);
}

@Test
public void testReplaceJobConfigPlaceholdersUsed() {
String jobConfigContent =
"job.mode=${jobModeParam:BATCH}\ncheckpoint.interval=30\njob.name=${jobNameParam:DefaultJob}";
Map<String, String> paramValues = new HashMap<>();
paramValues.put("jobModeParam", "STREAMING");
JobExecParam jobExecParam = getJobExecParam(paramValues);

String expected = "job.mode=STREAMING\ncheckpoint.interval=30\njob.name=DefaultJob";
String actual = JobUtils.replaceJobConfigPlaceholders(jobConfigContent, jobExecParam);

assertEquals(expected, actual);
}

@Test
public void testReplaceJobConfigPlaceholders_NoDefaultValueThrowsException() {
String jobConfigContent =
"job.mode=${jobModeParam}\ncheckpoint.interval=30\njob.name=${jobNameParam}";
Map<String, String> paramValues = new HashMap<>();
paramValues.put("jobModeParam", "STREAMING");
JobExecParam jobExecParam = getJobExecParam(paramValues);

assertThrows(
SeatunnelException.class,
() -> {
JobUtils.replaceJobConfigPlaceholders(jobConfigContent, jobExecParam);
});
}

@Test
public void testReplaceJobConfigPlaceholders_NoJobConfigPlaceholders() {
String jobConfigContent = "job.mode=STREAMING\ncheckpoint.interval=30\njob.name=newJob";
Map<String, String> paramValues = new HashMap<>();
JobExecParam jobExecParam = getJobExecParam(paramValues);

String expected = "job.mode=STREAMING\ncheckpoint.interval=30\njob.name=newJob";
String actual = JobUtils.replaceJobConfigPlaceholders(jobConfigContent, jobExecParam);

assertEquals(expected, actual);
}

@Test
public void testParseConfigWithPlaceHolders() {
String transformConfig =
"{\"log.print.data\":\"true\",\"log.print.delay.ms\":\"${logPrintDelayMs:100}\"}";
Config config = ConfigFactory.parseString(transformConfig);
assertNotNull(config);
}

private static @NotNull JobExecParam getJobExecParam(Map<String, String> paramValues) {
JobExecParam jobExecParam = new JobExecParam();
jobExecParam.setPlaceholderValues(paramValues);
return jobExecParam;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,11 @@ public enum SeatunnelErrorEnum {
"load job state from engine error",
"load job statue from engine [%s] error, error msg is [%s]"),
UNSUPPORTED_ENGINE(40003, "unsupported engine", "unsupported engine [%s] version [%s]"),
JUB_EXEC_SUBMISSION_ERROR(40004, "Job execution submission error.", "%s"),
JOB_EXEC_SUBMISSION_ERROR(40004, "Job execution submission error.", "%s"),
LOAD_ENGINE_METRICS_ERROR(
40005, "load engine metrics error", "load engine metrics error. error msg is [%s]"),
JOB_NO_VALUE_FOUND_FOR_PLACEHOLDER(
40006, "No value found for placeholder", "No value found for placeholder: [%s]"),

JOB_RUN_GENERATE_UUID_ERROR(50001, "generate uuid error", "generate uuid error"),
/* datasource and virtual table */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,6 @@

import com.fasterxml.jackson.core.type.TypeReference;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

public class JobControllerWrapper extends SeatunnelWebTestingBase {

public Result<Long> createJob(JobCreateReq jobCreateRequest) {
Expand All @@ -48,15 +44,4 @@ public Result<JobRes> getJob(long jobVersionId) {
String response = sendRequest(urlWithParam("job/get/" + jobVersionId + "?"), null, "GET");
return JSONTestUtils.parseObject(response, new TypeReference<Result<JobRes>>() {});
}

public JobCreateReq populateJobCreateReqFromFile() {
String filePath = "src/test/resources/jobs/fake_source_console_job.json";
String jsonContent;
try {
jsonContent = new String(Files.readAllBytes(Paths.get(filePath)));
} catch (IOException e) {
throw new RuntimeException(e);
}
return JSONTestUtils.parseObject(jsonContent, JobCreateReq.class);
}
}
Loading
Loading