Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix][Zeta] Fix task can not end cause by lock metrics failed #7357

Merged
merged 6 commits into from
Aug 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,15 @@
import org.apache.seatunnel.engine.client.SeaTunnelClient;
import org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment;
import org.apache.seatunnel.engine.client.job.ClientJobProxy;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.core.job.JobResult;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
import org.apache.seatunnel.engine.server.metrics.SeaTunnelMetricsContext;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
Expand All @@ -36,8 +39,10 @@

import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.instance.impl.HazelcastInstanceImpl;
import com.hazelcast.map.IMap;
import lombok.extern.slf4j.Slf4j;

import java.util.HashMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -73,11 +78,17 @@ public void testSayHello() {

@Test
public void testExecuteJob() throws Exception {
runJobFileWithAssertEndStatus(
"batch_fakesource_to_file.conf", "fake_to_file", JobStatus.FINISHED);
}

private static void runJobFileWithAssertEndStatus(
String confFile, String name, JobStatus finished)
throws ExecutionException, InterruptedException {
Common.setDeployMode(DeployMode.CLIENT);
String filePath = TestUtils.getResource("batch_fakesource_to_file.conf");
String filePath = TestUtils.getResource(confFile);
JobConfig jobConfig = new JobConfig();
jobConfig.setName("fake_to_file");

jobConfig.setName(name);
ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
clientConfig.setClusterName(TestUtils.getClusterName("JobExecutionIT"));
try (SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig)) {
Expand All @@ -94,11 +105,25 @@ public void testExecuteJob() throws Exception {
() ->
Assertions.assertTrue(
objectCompletableFuture.isDone()
&& JobStatus.FINISHED.equals(
&& finished.equals(
objectCompletableFuture.get())));
}
}

@Test
public void testExecuteJobWithLockMetrics() throws Exception {
// lock metrics map
IMap<Long, HashMap<TaskLocation, SeaTunnelMetricsContext>> metricsImap =
hazelcastInstance.getMap(Constant.IMAP_RUNNING_JOB_METRICS);
metricsImap.lock(Constant.IMAP_RUNNING_JOB_METRICS_KEY);
try {
runJobFileWithAssertEndStatus(
"batch_fakesource_to_file.conf", "fake_to_file", JobStatus.FINISHED);
} finally {
metricsImap.unlock(Constant.IMAP_RUNNING_JOB_METRICS_KEY);
}
}

@Test
public void cancelJobTest() throws Exception {
Common.setDeployMode(DeployMode.CLIENT);
Expand Down Expand Up @@ -229,29 +254,9 @@ void afterClass() {

@Test
public void testLastCheckpointErrorJob() throws Exception {
Common.setDeployMode(DeployMode.CLIENT);
String filePath = TestUtils.getResource("batch_last_checkpoint_error.conf");
JobConfig jobConfig = new JobConfig();
jobConfig.setName("batch_last_checkpoint_error");

ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
clientConfig.setClusterName(TestUtils.getClusterName("JobExecutionIT"));
try (SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig)) {
ClientJobExecutionEnvironment jobExecutionEnv =
engineClient.createExecutionContext(filePath, jobConfig, SEATUNNEL_CONFIG);

final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();

CompletableFuture<JobStatus> objectCompletableFuture =
CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);

await().atMost(600000, TimeUnit.MILLISECONDS)
.untilAsserted(
() ->
Assertions.assertTrue(
objectCompletableFuture.isDone()
&& JobStatus.FAILED.equals(
objectCompletableFuture.get())));
}
runJobFileWithAssertEndStatus(
"batch_last_checkpoint_error.conf",
"batch_last_checkpoint_error",
JobStatus.FAILED);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -968,10 +968,14 @@ void taskDone(Task task) {
cancellationFutures.remove(taskGroupLocation);
try {
cancelAsyncFunction(taskGroupLocation);
} catch (Throwable e) {
throw new RuntimeException(e);
} catch (Throwable t) {
logger.severe("cancel async function failed", t);
}
try {
updateMetricsContextInImap();
} catch (Throwable t) {
logger.severe("update metrics context in imap failed", t);
}
updateMetricsContextInImap();
if (ex == null) {
logger.info(
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,14 @@ private void subPlanDone(PipelineStatus pipelineStatus) {
RetryUtils.retryWithException(
() -> {
jobMaster.savePipelineMetricsToHistory(getPipelineLocation());
jobMaster.removeMetricsContext(getPipelineLocation(), pipelineStatus);
try {
jobMaster.removeMetricsContext(getPipelineLocation(), pipelineStatus);
} catch (Throwable e) {
log.error(
"Remove metrics context for pipeline {} failed, with exception: {}",
pipelineFullName,
ExceptionUtils.getMessage(e));
}
notifyCheckpointManagerPipelineEnd(pipelineStatus);
jobMaster.releasePipelineResource(this);
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static com.hazelcast.jet.impl.util.ExceptionUtil.withTryCatch;
Expand Down Expand Up @@ -678,8 +679,13 @@ public void removeMetricsContext(

boolean lockedIMap = false;
try {
metricsImap.lock(Constant.IMAP_RUNNING_JOB_METRICS_KEY);
lockedIMap = true;
lockedIMap =
metricsImap.tryLock(
Constant.IMAP_RUNNING_JOB_METRICS_KEY, 5, TimeUnit.SECONDS);
if (!lockedIMap) {
LOGGER.severe("lock imap failed in update metrics");
return;
}

HashMap<TaskLocation, SeaTunnelMetricsContext> centralMap =
metricsImap.get(Constant.IMAP_RUNNING_JOB_METRICS_KEY);
Expand All @@ -697,6 +703,8 @@ public void removeMetricsContext(
collect.forEach(centralMap::remove);
metricsImap.put(Constant.IMAP_RUNNING_JOB_METRICS_KEY, centralMap);
}
} catch (Exception e) {
LOGGER.warning("failed to remove metrics context", e);
} finally {
if (lockedIMap) {
boolean unLockedIMap = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.seatunnel.engine.server.execution.TestTask;

import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -65,8 +64,6 @@ public void before() {
}

@Test
@Disabled(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reopen test case which we should open!

"As we have more and more test cases the test the load of the test container will up, the test case may failed")
public void testCancel() {
TaskExecutionService taskExecutionService = server.getTaskExecutionService();

Expand All @@ -92,8 +89,6 @@ public void testCancel() {
}

@Test
@Disabled(
"As we have more and more test cases the test the load of the test container will up, the test case may failed")
public void testCancelBlockTask() throws InterruptedException {
TaskExecutionService taskExecutionService = server.getTaskExecutionService();

Expand All @@ -118,8 +113,6 @@ public void testCancelBlockTask() throws InterruptedException {
}

@Test
@Disabled(
"As we have more and more test cases the test the load of the test container will up, the test case may failed")
public void testFinish() {
TaskExecutionService taskExecutionService = server.getTaskExecutionService();

Expand Down Expand Up @@ -150,8 +143,6 @@ public void testFinish() {

/** Test task execution time is the same as the timer timeout */
@Test
@Disabled(
"As we have more and more test cases the test the load of the test container will up, the test case may failed")
public void testCriticalCallTime() throws InterruptedException {
AtomicBoolean stopMark = new AtomicBoolean(false);
CopyOnWriteArrayList<Long> stopTime = new CopyOnWriteArrayList<>();
Expand Down Expand Up @@ -189,8 +180,6 @@ public void testCriticalCallTime() throws InterruptedException {
}

@Test
@Disabled(
"As we have more and more test cases the test the load of the test container will up, the test case may failed")
public void testThrowException() throws InterruptedException {
TaskExecutionService taskExecutionService = server.getTaskExecutionService();

Expand Down Expand Up @@ -264,8 +253,6 @@ public void testThrowException() throws InterruptedException {
}

@RepeatedTest(2)
@Disabled(
"As we have more and more test cases the test the load of the test container will up, the test case may failed")
public void testDelay() throws InterruptedException {

long lowLagSleep = 10;
Expand Down
Loading