Skip to content

Commit

Permalink
[Improve][SeaTunnel-Web] Change JobStatus to enum type to avoid hard …
Browse files Browse the repository at this point in the history
…coding (#205)
  • Loading branch information
wuchunfu authored Sep 7, 2024
1 parent 528746d commit 94d8a39
Show file tree
Hide file tree
Showing 18 changed files with 88 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.seatunnel.app.dal.entity;

import org.apache.seatunnel.engine.core.job.JobStatus;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
Expand All @@ -42,7 +44,7 @@ public class JobInstance {
private Long jobDefineId;

@TableField("job_status")
private String jobStatus;
private JobStatus jobStatus;

@TableField("job_config")
private String jobConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.seatunnel.app.dal.entity;

import org.apache.seatunnel.engine.core.job.JobStatus;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
Expand Down Expand Up @@ -65,7 +67,7 @@ public class JobMetrics {
private long recordDelay;

@TableField("status")
private String status;
private JobStatus status;

@TableField("create_user_id")
private Integer createUserId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.seatunnel.app.domain.response.executor;

import org.apache.seatunnel.engine.core.job.JobStatus;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
Expand All @@ -26,7 +28,7 @@
@NoArgsConstructor
public class JobExecutionStatus {

private String jobStatus;
private JobStatus jobStatus;

private String errorMessage;
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.seatunnel.app.domain.response.metrics;

import org.apache.seatunnel.engine.core.job.JobStatus;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
Expand Down Expand Up @@ -45,5 +47,5 @@ public class JobPipelineDetailMetricsRes {

private long recordDelay;

private String status;
private JobStatus status;
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.seatunnel.app.domain.response.metrics;

import org.apache.seatunnel.engine.core.job.JobStatus;

import lombok.AllArgsConstructor;
import lombok.Data;

Expand All @@ -28,5 +30,5 @@ public class JobPipelineSummaryMetricsRes {

private long writeRowCount;

private String status;
private JobStatus status;
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.seatunnel.app.domain.response.metrics;

import org.apache.seatunnel.engine.core.job.JobStatus;

import lombok.AllArgsConstructor;
import lombok.Data;

Expand All @@ -30,5 +32,5 @@ public class JobSummaryMetricsRes {

private long writeRowCount;

private String status;
private JobStatus status;
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.seatunnel.app.domain.response.task;

import org.apache.seatunnel.engine.core.job.JobStatus;

import com.fasterxml.jackson.annotation.JsonProperty;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
Expand All @@ -40,7 +42,7 @@ public class JobSimpleInfoRes {
private String jobName;

@ApiModelProperty(value = "job status", dataType = "String")
private String jobStatus;
private JobStatus jobStatus;

@ApiModelProperty(value = "job plan", dataType = "String")
private String jobPlan;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
import java.io.FileWriter;
import java.io.IOException;
import java.util.Date;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -128,7 +127,7 @@ private void executeJobBySeaTunnel(Integer userId, String filePath, Long jobInst
} catch (Throwable e) {
log.error("Job execution submission failed.", e);
JobInstance jobInstance = jobInstanceDao.getJobInstance(jobInstanceId);
jobInstance.setJobStatus(JobStatus.FAILED.name());
jobInstance.setJobStatus(JobStatus.FAILED);
jobInstance.setEndTime(new Date());
String jobInstanceErrorMessage = JobUtils.getJobInstanceErrorMessage(e.getMessage());
jobInstance.setErrorMessage(jobInstanceErrorMessage);
Expand Down Expand Up @@ -183,14 +182,14 @@ private SeaTunnelClient createSeaTunnelClient() {
@Override
public Result<Void> jobPause(Integer userId, Long jobInstanceId) {
JobInstance jobInstance = jobInstanceDao.getJobInstance(jobInstanceId);
if (Objects.equals(
getJobStatusFromEngine(jobInstance, jobInstance.getJobEngineId()), "RUNNING")) {
if (getJobStatusFromEngine(jobInstance, jobInstance.getJobEngineId())
== JobStatus.RUNNING) {
pauseJobInEngine(jobInstance.getJobEngineId());
}
return Result.success();
}

private String getJobStatusFromEngine(@NonNull JobInstance jobInstance, String jobEngineId) {
private JobStatus getJobStatusFromEngine(@NonNull JobInstance jobInstance, String jobEngineId) {

Engine engine = new Engine(jobInstance.getEngineName(), jobInstance.getEngineVersion());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ public void complete(
funcPermissionCheck(SeatunnelFuncPermissionKeyConstant.JOB_EXECUTOR_COMPLETE, userId);
JobInstance jobInstance = jobInstanceDao.getJobInstanceMapper().selectById(jobInstanceId);
jobMetricsService.syncJobDataToDb(jobInstance, userId, jobEngineId);
jobInstance.setJobStatus(jobResult.getStatus().name());
jobInstance.setJobStatus(jobResult.getStatus());
jobInstance.setJobEngineId(jobEngineId);
jobInstance.setUpdateUserId(userId);
jobInstance.setErrorMessage(JobUtils.getJobInstanceErrorMessage(jobResult.getError()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.seatunnel.app.thirdparty.metrics.EngineMetricsExtractorFactory;
import org.apache.seatunnel.app.thirdparty.metrics.IEngineMetricsExtractor;
import org.apache.seatunnel.app.utils.JobUtils;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.server.common.CodeGenerateUtils;
import org.apache.seatunnel.server.common.Constants;
import org.apache.seatunnel.server.common.SeatunnelErrorEnum;
Expand Down Expand Up @@ -89,7 +90,7 @@ public JobSummaryMetricsRes getJobSummaryMetrics(
Engine engine = new Engine(jobInstance.getEngineName(), jobInstance.getEngineVersion());
IEngineMetricsExtractor engineMetricsExtractor =
(new EngineMetricsExtractorFactory(engine)).getEngineMetricsExtractor();
String jobStatus = engineMetricsExtractor.getJobStatus(jobEngineId);
JobStatus jobStatus = engineMetricsExtractor.getJobStatus(jobEngineId);

List<JobMetrics> jobPipelineDetailMetrics =
getJobPipelineDetailMetrics(jobInstance, userId);
Expand Down Expand Up @@ -160,8 +161,8 @@ private Map<Long, JobSummaryMetricsRes> getMatricsListIfTaskTypeIsBatch(
log.info("jobEngineId={}", jobInstance.getJobEngineId());

if (jobInstance.getJobStatus() == null
|| jobInstance.getJobStatus().equals("FAILED")
|| jobInstance.getJobStatus().equals("RUNNING")) {
|| jobInstance.getJobStatus() == JobStatus.FAILED
|| jobInstance.getJobStatus() == JobStatus.RUNNING) {
// Obtain monitoring information from the collection of running jobs returned from
// the engine
if (!allRunningJobMetricsFromEngine.isEmpty()
Expand Down Expand Up @@ -192,15 +193,15 @@ private Map<Long, JobSummaryMetricsRes> getMatricsListIfTaskTypeIsBatch(
if (jobMetricsFromDb != null) {
jobSummaryMetricsResMap.put(jobInstance.getId(), jobMetricsFromDb);
}
if ("RUNNING".equals(jobInstance.getJobStatus())) {
if (jobInstance.getJobStatus() == JobStatus.RUNNING) {
// Set the job status of jobInstance and jobMetrics in the database to
// finished
jobInstance.setJobStatus("FINISHED");
jobInstance.setJobStatus(JobStatus.FINISHED);
jobInstanceDao.getJobInstanceMapper().updateById(jobInstance);
}
}
} else if (jobInstance.getJobStatus().equals("FINISHED")
|| jobInstance.getJobStatus().equals("CANCELED")) {
} else if (jobInstance.getJobStatus() == JobStatus.FINISHED
|| jobInstance.getJobStatus() == JobStatus.CANCELED) {
// If the status of the job is finished or cancelled, the monitoring information is
// directly obtained from MySQL
JobSummaryMetricsRes jobMetricsFromDb =
Expand All @@ -222,7 +223,7 @@ private void modifyAndUpdateJobInstanceAndJobMetrics(
Map<Long, HashMap<Integer, JobMetrics>> allRunningJobMetricsFromEngine,
Map<Long, Long> jobInstanceIdAndJobEngineIdMap,
Integer userId) {
jobInstance.setJobStatus("RUNNING");
jobInstance.setJobStatus(JobStatus.RUNNING);
HashMap<Integer, JobMetrics> jobMetricsFromEngine =
allRunningJobMetricsFromEngine.get(
jobInstanceIdAndJobEngineIdMap.get(jobInstance.getId()));
Expand All @@ -246,7 +247,7 @@ private void modifyAndUpdateJobInstanceAndJobMetrics(
jobMetricsFromEngine
.get(jobMetrics.getPipelineId())
.getWriteRowCount()));
jobMetricsFromDb.forEach(jobMetrics -> jobMetrics.setStatus("RUNNING"));
jobMetricsFromDb.forEach(jobMetrics -> jobMetrics.setStatus(JobStatus.RUNNING));

updateJobInstanceAndMetrics(jobInstance, jobMetricsFromDb);
}
Expand All @@ -265,7 +266,7 @@ private Map<Long, JobSummaryMetricsRes> getMatricsListIfTaskTypeIsStreaming(

try {
if (jobInstance.getJobStatus() != null
&& jobInstance.getJobStatus().equals("CANCELED")) {
&& jobInstance.getJobStatus() == JobStatus.CANCELED) {
// If the status of the job is finished or cancelled
// the monitoring information is directly obtained from MySQL
JobSummaryMetricsRes jobMetricsFromDb =
Expand All @@ -278,8 +279,8 @@ private Map<Long, JobSummaryMetricsRes> getMatricsListIfTaskTypeIsStreaming(
jobSummaryMetricsResMap.put(jobInstance.getId(), jobMetricsFromDb);

} else if (jobInstance.getJobStatus() != null
&& (jobInstance.getJobStatus().equals("FINISHED")
|| jobInstance.getJobStatus().equals("FAILED"))) {
&& (jobInstance.getJobStatus() == JobStatus.FINISHED
|| jobInstance.getJobStatus() == JobStatus.FAILED)) {
// Obtain monitoring information from the collection of running jobs returned
// from
// the engine
Expand Down Expand Up @@ -332,9 +333,9 @@ private Map<Long, JobSummaryMetricsRes> getMatricsListIfTaskTypeIsStreaming(
jobInstance);
jobSummaryMetricsResMap.put(jobInstance.getId(), jobMetricsFromEngineRes);
} else {
String jobStatusByJobEngineId = null;
JobStatus jobStatus = null;
try {
jobStatusByJobEngineId =
jobStatus =
getJobStatusByJobEngineId(
String.valueOf(
jobInstanceIdAndJobEngineIdMap.get(
Expand All @@ -345,8 +346,8 @@ private Map<Long, JobSummaryMetricsRes> getMatricsListIfTaskTypeIsStreaming(
jobInstance.getId());
}

if (jobStatusByJobEngineId != null) {
jobInstance.setJobStatus(jobStatusByJobEngineId);
if (jobStatus != null) {
jobInstance.setJobStatus(jobStatus);
jobInstanceDao.update(jobInstance);
JobSummaryMetricsRes jobSummaryMetricsResByDb =
getJobSummaryMetricsResByDb(
Expand All @@ -365,7 +366,7 @@ private Map<Long, JobSummaryMetricsRes> getMatricsListIfTaskTypeIsStreaming(
jobInstanceIdAndJobEngineIdMap.get(
jobInstance.getId())));
if (!jobMetricsFromDb.isEmpty()) {
String finalJobStatusByJobEngineId = jobStatusByJobEngineId;
JobStatus finalJobStatusByJobEngineId = jobStatus;
jobMetricsFromDb.forEach(
jobMetrics ->
jobMetrics.setStatus(finalJobStatusByJobEngineId));
Expand Down Expand Up @@ -402,7 +403,8 @@ private JobSummaryMetricsRes getRunningJobMetricsFromEngine(

log.info("jobInstance={}", jobInstance);

return new JobSummaryMetricsRes(jobInstance.getId(), 1L, readCount, writeCount, "RUNNING");
return new JobSummaryMetricsRes(
jobInstance.getId(), 1L, readCount, writeCount, JobStatus.RUNNING);
}

private JobSummaryMetricsRes getJobSummaryMetricsResByDb(
Expand Down Expand Up @@ -442,7 +444,7 @@ private void updateJobInstanceAndMetrics(JobInstance jobInstance, List<JobMetric
}
}

private String getJobStatusByJobEngineId(String jobEngineId) {
private JobStatus getJobStatusByJobEngineId(String jobEngineId) {
return SeaTunnelEngineProxy.getInstance().getJobStatus(jobEngineId);
}

Expand Down Expand Up @@ -581,7 +583,7 @@ private void syncMetricsToDb(
jobMetricsDao.getJobMetricsMapper().insertBatchMetrics(jobMetricsFromEngine);
}
} else {
String jobStatus = getJobStatusByJobEngineId(jobEngineId);
JobStatus jobStatus = getJobStatusByJobEngineId(jobEngineId);
for (JobMetrics jobMetrics : jobMetricsFromDb) {
Integer pipelineId = jobMetrics.getPipelineId();
JobMetrics currentPiplinejobMetricsFromEngine =
Expand Down
Loading

0 comments on commit 94d8a39

Please sign in to comment.