diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java index c49929586d5..b5d02c03443 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java @@ -49,7 +49,7 @@ public class ServerConfigOptions { public static final Option JOB_METRICS_BACKUP_INTERVAL = Options.key("job-metrics-backup-interval") .intType() - .defaultValue(60) + .defaultValue(10) .withDescription("The interval (in seconds) of job metrics backups"); public static final Option TASK_EXECUTION_THREAD_SHARE_MODE = diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java index 3e18a458e39..971266e091a 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java @@ -269,11 +269,20 @@ public boolean isSavePointEnd() { } protected InvocationFuture sendOperationToMemberNode(TaskOperation operation) { + /* Guard the debug log: its arguments call jobMaster.queryTaskGroupAddress, so skip that lookup unless debug is enabled. */ + if (log.isDebugEnabled()) { + log.debug( + "Send Operation : {} to {} for task group:{}", + operation.getClass().getSimpleName(), + jobMaster.queryTaskGroupAddress( + operation.getTaskLocation().getTaskGroupLocation()), + operation.getTaskLocation().getTaskGroupLocation()); + } return NodeEngineUtil.sendOperationToMemberNode( nodeEngine, operation, jobMaster.queryTaskGroupAddress( - 
operation.getTaskLocation().getTaskGroupLocation().getTaskGroupId())); + operation.getTaskLocation().getTaskGroupLocation())); } /** diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java index 0b840e22cb1..23d4010d31e 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java @@ -78,7 +78,6 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; @@ -400,23 +399,23 @@ public void cleanJob() { removeJobIMap(); } - public Address queryTaskGroupAddress(long taskGroupId) { - for (PipelineLocation pipelineLocation : ownedSlotProfilesIMap.keySet()) { - Optional currentVertex = - ownedSlotProfilesIMap.get(pipelineLocation).keySet().stream() - .filter( - taskGroupLocation -> - taskGroupLocation.getTaskGroupId() == taskGroupId) - .findFirst(); - if (currentVertex.isPresent()) { - return ownedSlotProfilesIMap - .get(pipelineLocation) - .get(currentVertex.get()) - .getWorker(); + public Address queryTaskGroupAddress(TaskGroupLocation taskGroupLocation) { + + PipelineLocation pipelineLocation = + new PipelineLocation( + taskGroupLocation.getJobId(), taskGroupLocation.getPipelineId()); + + Map taskGroupLocationSlotProfileMap = + ownedSlotProfilesIMap.get(pipelineLocation); + + if (null != taskGroupLocationSlotProfileMap) { + SlotProfile slotProfile = taskGroupLocationSlotProfileMap.get(taskGroupLocation); + if (null != slotProfile) { + return slotProfile.getWorker(); } } throw new IllegalArgumentException( - "can't find task group address from task group id: " + taskGroupId); + 
"can't find task group address from taskGroupLocation: " + taskGroupLocation); } public ClassLoader getClassLoader() { diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java index 2bc3b2a5be2..fc6aff87f9a 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java @@ -97,7 +97,10 @@ private CompletableFuture schedulerPipeline(SubPlan pipeline) { pipeline, jobMaster.getOwnedSlotProfiles(pipeline.getPipelineLocation())); - log.debug("slotProfiles: {}", slotProfiles); + log.debug( + "slotProfiles: {}, PipelineLocation: {}", + slotProfiles, + pipeline.getPipelineLocation()); // To ensure release pipeline resource after new master node active, we need store // slotProfiles first and then deploy tasks. 
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/GetTaskGroupAddressOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/GetTaskGroupAddressOperation.java index 50a887176d9..ecb94e85c0f 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/GetTaskGroupAddressOperation.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/GetTaskGroupAddressOperation.java @@ -52,10 +52,7 @@ public void run() throws Exception { () -> server.getCoordinatorService() .getJobMaster(taskLocation.getJobId()) - .queryTaskGroupAddress( - taskLocation - .getTaskGroupLocation() - .getTaskGroupId()), + .queryTaskGroupAddress(taskLocation.getTaskGroupLocation()), new RetryUtils.RetryMaterial( Constant.OPERATION_RETRY_TIME, true,