Skip to content

Commit

Permalink
Work
Browse files Browse the repository at this point in the history
# Conflicts:
#	zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/SleepingJob.java
#	zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
#	zeppelin-zengine/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
Reamer committed Dec 20, 2024

Verified

This commit was signed with the committer’s verified signature.
1 parent b6e40d4 commit b71b9ff
Showing 8 changed files with 162 additions and 126 deletions.
Original file line number Diff line number Diff line change
@@ -869,7 +869,7 @@ public InterpreterResult jobRun() throws Throwable {

@Override
protected boolean jobAbort() {
return false;
return true;
}

@Override
@@ -884,13 +884,22 @@ public void cancel(String sessionId,
String className,
RemoteInterpreterContext interpreterContext)
throws InterpreterRPCException, TException {
LOGGER.info("cancel {} {}", className, interpreterContext.getParagraphId());
LOGGER.info("cancel classname[{}] paragraphId[{}] sessionId[{}]", className,
interpreterContext.getParagraphId(), sessionId);
Interpreter intp = getInterpreter(sessionId, className);
String jobId = interpreterContext.getParagraphId();
Job<?> job = intp.getScheduler().getJob(jobId);

if (job != null) {
LOGGER.info("JobStatus {}", job.getStatus());
}
if (job != null && job.getStatus() == Status.PENDING) {
// Abort the job to inform the scheduler that this job does not need to be executed
job.abort();
LOGGER.info("isJobaborted {}", job.isAborted());
// Set Abort Status to inform the Zeppelin server about the abort.
job.setStatus(Status.ABORT);
LOGGER.info("Set job {} to abort", job);
} else {
Thread thread = new Thread( ()-> {
try {
Original file line number Diff line number Diff line change
@@ -75,6 +75,9 @@ public void submit(Job<?> job) {
}

@Override
/**
* Do not remove the job from the queue, because the job is reseted during runJob
*/
public Job<?> cancel(String jobId) {
Job<?> job = jobs.remove(jobId);
job.abort();
@@ -121,6 +124,7 @@ public void stop() {
* @param runningJob
*/
protected void runJob(Job<?> runningJob) {
LOGGER.info("Begin job {}", runningJob.getId());
if (runningJob.isAborted()) {
LOGGER.info("Job {} is aborted", runningJob.getId());
runningJob.setStatus(Job.Status.ABORT);
@@ -129,10 +133,7 @@ protected void runJob(Job<?> runningJob) {
}

LOGGER.info("Job {} started by scheduler {}", runningJob.getId(), name);
// Don't set RUNNING status when it is RemoteScheduler, update it via JobStatusPoller
if (!getClass().getSimpleName().equals("RemoteScheduler")) {
runningJob.setStatus(Job.Status.RUNNING);
}
runningJob.setStatus(Job.Status.RUNNING);
runningJob.run();
Object jobResult = runningJob.getReturn();
synchronized (runningJob) {
Original file line number Diff line number Diff line change
@@ -178,6 +178,10 @@ public void onJobStarted() {
}

public void onJobEnded() {
// Do not set a finish date, if job is aborted
if (isAborted()) {
return;
}
dateFinished = new Date();
}

@@ -194,9 +198,14 @@ public void run() {
}

private void completeWithSuccess(T result) {
setResult(result);
// Do not set a anything if the job is aborted or failed
if (isAborted() || status.isFailed()) {
return;
}
exception = null;
errorMessage = null;
setResult(result);

}

private void completeWithError(Throwable error) {
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@
package org.apache.zeppelin.scheduler;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import org.apache.zeppelin.scheduler.Job.Status;
@@ -37,8 +38,8 @@ static void setUp() {
void testRun() throws InterruptedException {
Scheduler s = schedulerSvc.createOrGetFIFOScheduler("testRun");

Job<?> job1 = new SleepingJob("job1", null, 500);
Job<?> job2 = new SleepingJob("job2", null, 500);
SleepingJob job1 = new SleepingJob("job1", null, 500);
SleepingJob job2 = new SleepingJob("job2", null, 500);

s.submit(job1);
s.submit(job2);
@@ -50,16 +51,16 @@ void testRun() throws InterruptedException {
Thread.sleep(500);
assertEquals(Status.FINISHED, job1.getStatus());
assertEquals(Status.RUNNING, job2.getStatus());
assertTrue((500 < (Long) job1.getReturn()));
assertTrue((500 < job1.getReturn()));
schedulerSvc.removeScheduler(s.getName());
}

@Test
void testAbort() throws InterruptedException {
Scheduler s = schedulerSvc.createOrGetFIFOScheduler("testAbort");

Job<?> job1 = new SleepingJob("job1", null, 500);
Job<?> job2 = new SleepingJob("job2", null, 500);
SleepingJob job1 = new SleepingJob("job1", null, 500);
SleepingJob job2 = new SleepingJob("job2", null, 500);

s.submit(job1);
s.submit(job2);
@@ -74,8 +75,8 @@ void testAbort() throws InterruptedException {
assertEquals(Status.ABORT, job1.getStatus());
assertEquals(Status.ABORT, job2.getStatus());

assertTrue((500 > (Long) job1.getReturn()));
assertEquals(null, job2.getReturn());
assertNull(job1.getReturn());
assertNull(job2.getReturn());
schedulerSvc.removeScheduler(s.getName());
}
}
Original file line number Diff line number Diff line change
@@ -26,15 +26,14 @@
/**
*
*/
public class SleepingJob extends Job {
public class SleepingJob extends Job<Long> {

private int time;
boolean abort = false;
private long start;
private int count;

private static final Logger LOGGER = LoggerFactory.getLogger(SleepingJob.class);
private Object results;
private Long results;


public SleepingJob(String jobName, JobListener listener, int time) {
@@ -44,9 +43,9 @@ public SleepingJob(String jobName, JobListener listener, int time) {
}

@Override
public Object jobRun() {
public Long jobRun() {
start = System.currentTimeMillis();
while (abort == false) {
while (!isAborted()) {
count++;
try {
Thread.sleep(10);
@@ -62,17 +61,16 @@ public Object jobRun() {

@Override
public boolean jobAbort() {
abort = true;
return true;
}

@Override
public void setResult(Object results) {
public void setResult(Long results) {
this.results = results;
}

@Override
public Object getReturn() {
public Long getReturn() {
return results;
}

Original file line number Diff line number Diff line change
@@ -346,12 +346,12 @@ public Scheduler getScheduler() {
if (executionMode.equals("paragraph")) {
String name = RemoteInterpreter.class.getSimpleName() + "-" + getInterpreterGroup().getId()
+ "-" + sessionId;
Scheduler s = new RemoteScheduler(name, this);
Scheduler s = new RemoteScheduler(name, SchedulerFactory.singleton().getExecutor(), this);
return SchedulerFactory.singleton().createOrGetScheduler(s);
} else if (executionMode.equals("note")) {
String noteId = getProperty(".noteId");
String name = RemoteInterpreter.class.getSimpleName() + "-" + noteId;
Scheduler s = new RemoteScheduler(name, this);
Scheduler s = new RemoteScheduler(name, SchedulerFactory.singleton().getExecutor(), this);
return SchedulerFactory.singleton().createOrGetScheduler(s);
} else {
throw new RuntimeException("Invalid execution mode: " + executionMode);
Loading

0 comments on commit b71b9ff

Please sign in to comment.