Skip to content

Commit

Permalink
[Fix](Job)Fix some issues in the Insert job. (apache#44543)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?
- The job does not account for tasks in the Canceled state. 
- When a job is canceled, its status is marked as FAILED, and a
NullPointerException (NPE) occurs because resources have already been
released.
```
java.lang.NullPointerException: Cannot invoke "org.apache.doris.qe.ConnectContext.getState()" because "this.ctx" is null
	at org.apache.doris.job.extensions.insert.InsertTask.run(InsertTask.java:200) ~[classes/:?]
	at org.apache.doris.job.task.AbstractTask.runTask(AbstractTask.java:167) ~[classes/:?]
	at org.apache.doris.job.executor.DefaultTaskExecutorHandler.onEvent(DefaultTaskExecutorHandler.java:50) ~[classes/:?]
	at org.apache.doris.job.executor.DefaultTaskExecutorHandler.onEvent(DefaultTaskExecutorHandler.java:33) ~[classes/:?]
	at com.lmax.disruptor.WorkProcessor.run(WorkProcessor.java:143) ~[disruptor-3.4.4.jar:?]
	at java.lang.Thread.run(Thread.java:833) ~[?:?]
```

- Resuming a job (RESUME JOB) does not immediately schedule it.
  • Loading branch information
CalvinKirs authored Nov 26, 2024
1 parent 137ed95 commit 447a3e9
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ public void cancelAllTasks() throws JobException {
}
for (T task : runningTasks) {
task.cancel();
canceledTaskCount.incrementAndGet();
}
runningTasks = new CopyOnWriteArrayList<>();
logUpdateOperation();
Expand Down Expand Up @@ -185,6 +186,7 @@ public void cancelTaskById(long taskId) throws JobException {
runningTasks.stream().filter(task -> task.getTaskId().equals(taskId)).findFirst()
.orElseThrow(() -> new JobException("Not found task id: " + taskId)).cancel();
runningTasks.removeIf(task -> task.getTaskId().equals(taskId));
canceledTaskCount.incrementAndGet();
if (jobConfig.getExecuteType().equals(JobExecuteType.ONE_TIME)) {
updateJobStatus(JobStatus.FINISHED);
}
Expand Down Expand Up @@ -418,13 +420,13 @@ public TRow getTvfInfo() {
/**
* Generates a common error message when the execution queue is full.
*
* @param taskId The ID of the task.
* @param queueConfigName The name of the queue configuration.
* @param taskId The ID of the task.
* @param queueConfigName The name of the queue configuration.
* @param executeThreadConfigName The name of the execution thread configuration.
* @return A formatted error message.
*/
protected String commonFormatMsgWhenExecuteQueueFull(Long taskId, String queueConfigName,
String executeThreadConfigName) {
String executeThreadConfigName) {
return String.format("Dispatch task failed, jobId: %d, jobName: %s, taskId: %d, the queue size is full, "
+ "you can increase the queue size by setting the property "
+ "%s in the fe.conf file or increase the value of "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,9 @@ public void run() throws JobException {

@Override
public void onFail() throws JobException {
if (isCanceled.get()) {
return;
}
isFinished.set(true);
super.onFail();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,9 @@ private void dropJob(T dropJob, String jobName) throws JobException {
public void alterJobStatus(Long jobId, JobStatus status) throws JobException {
checkJobExist(jobId);
jobMap.get(jobId).updateJobStatus(status);
if (status.equals(JobStatus.RUNNING)) {
jobScheduler.scheduleOneJob(jobMap.get(jobId));
}
jobMap.get(jobId).logUpdateOperation();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,9 @@ public void runTask() throws JobException {
run();
onSuccess();
} catch (Exception e) {
if (TaskStatus.CANCELED.equals(status)) {
return;
}
this.errMsg = e.getMessage();
onFail();
log.warn("execute task error, job id is {}, task id is {}", jobId, taskId, e);
Expand Down
11 changes: 10 additions & 1 deletion regression-test/suites/job_p0/test_base_insert_job.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -219,9 +219,11 @@ suite("test_base_insert_job") {
RESUME JOB where jobname = '${jobName}'
"""
println(tasks.size())
// test resume job success
Awaitility.await("resume-job-test").atMost(60, SECONDS).until({
def afterResumeTasks = sql """ select status from tasks("type"="insert") where JobName= '${jobName}' """
println "resume tasks :" + afterResumeTasks
//resume tasks size should be greater than before pause
afterResumeTasks.size() > tasks.size()
})

Expand All @@ -247,7 +249,6 @@ suite("test_base_insert_job") {
CREATE JOB ${jobName} ON SCHEDULE at '2023-11-13 14:18:07' comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213');
"""
} catch (Exception e) {
println e.getMessage()
assert e.getMessage().contains("startTimeMs must be greater than current time")
}
// assert end time less than start time
Expand Down Expand Up @@ -281,6 +282,14 @@ suite("test_base_insert_job") {
} catch (Exception e) {
assert e.getMessage().contains("Invalid interval time unit: years")
}
// assert a negative interval value (-1) is rejected
try {
sql """
CREATE JOB test_error_starts ON SCHEDULE every -1 second comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213');
"""
} catch (Exception e) {
assert e.getMessage().contains("expecting INTEGER_VALUE")
}

// test keyword as job name
sql """
Expand Down

0 comments on commit 447a3e9

Please sign in to comment.