From 36a3f58b45a1faf1fd6fdcafb738ab7b5f579139 Mon Sep 17 00:00:00 2001 From: Urmi Mustafi Date: Tue, 16 Jul 2024 13:10:54 -0700 Subject: [PATCH 1/3] Process Heartbeat DagAction CDC messages with empty FlowExecutionId str --- .../DagActionStoreChangeMonitorTest.java | 14 +++++----- ...gementDagActionStoreChangeMonitorTest.java | 8 +++--- .../modules/orchestration/Orchestrator.java | 3 ++ .../FlowCompilationValidationHelper.java | 28 +++++++++++-------- .../DagActionStoreChangeMonitor.java | 8 ++++-- 5 files changed, 36 insertions(+), 25 deletions(-) diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java index 40c930ea462..9fbebdb42ba 100644 --- a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java +++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java @@ -70,7 +70,7 @@ public class DagActionStoreChangeMonitorTest { private final String FLOW_GROUP = "flowGroup"; private final String FLOW_NAME = "flowName"; - private final long FLOW_EXECUTION_ID = 123L; + private final String FLOW_EXECUTION_ID = "123456789"; private MockDagActionStoreChangeMonitor mockDagActionStoreChangeMonitor; private int txidCounter = 0; @@ -135,7 +135,7 @@ public void tearDown() throws Exception { @Test public void testProcessMessageWithHeartbeatAndNullDagAction() throws SpecNotFoundException { Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord = - wrapDagActionStoreChangeEvent(OperationType.HEARTBEAT, "", "", FLOW_EXECUTION_ID, null); + wrapDagActionStoreChangeEvent(OperationType.HEARTBEAT, "", "", "", null); mockDagActionStoreChangeMonitor.processMessageForTest(consumerRecord); verify(mockDagActionStoreChangeMonitor.getDagManager(), times(0)).handleResumeFlowRequest(anyString(), anyString(), anyLong()); verify(mockDagActionStoreChangeMonitor.getDagManager(), times(0)).handleKillFlowRequest(anyString(), anyString(), anyLong()); @@ -150,7 +150,7 @@ public void testProcessMessageWithHeartbeatAndNullDagAction() throws SpecNotFoun @Test (dependsOnMethods = "testProcessMessageWithHeartbeatAndNullDagAction") public void testProcessMessageWithHeartbeatAndFlowInfo() throws SpecNotFoundException { Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord = - wrapDagActionStoreChangeEvent(OperationType.HEARTBEAT, FLOW_GROUP, FLOW_NAME, FLOW_EXECUTION_ID, DagActionValue.RESUME); + wrapDagActionStoreChangeEvent(OperationType.HEARTBEAT, "", "", "", DagActionValue.RESUME); mockDagActionStoreChangeMonitor.processMessageForTest(consumerRecord); verify(mockDagActionStoreChangeMonitor.getDagManager(), times(0)).handleResumeFlowRequest(anyString(), anyString(), anyLong()); verify(mockDagActionStoreChangeMonitor.getDagManager(), times(0)).handleKillFlowRequest(anyString(), anyString(), anyLong()); @@ -251,12 +251,12 @@ public void testStartupSequenceHandlesFailures() throws Exception { * Util to create a general DagActionStoreChange type event */ private DagActionStoreChangeEvent createDagActionStoreChangeEvent(OperationType operationType, - String flowGroup, String flowName, long flowExecutionId, DagActionValue dagAction) { + String flowGroup, String flowName, String flowExecutionId, DagActionValue dagAction) { String key = getKeyForFlow(flowGroup, flowName, flowExecutionId); GenericStoreChangeEvent genericStoreChangeEvent = new GenericStoreChangeEvent(key, String.valueOf(txidCounter), System.currentTimeMillis(), operationType); txidCounter++; - return new DagActionStoreChangeEvent(genericStoreChangeEvent, flowGroup, flowName, String.valueOf(flowExecutionId), + return new DagActionStoreChangeEvent(genericStoreChangeEvent, flowGroup, flowName, flowExecutionId, DagActionStore.NO_JOB_NAME_DEFAULT, dagAction); } @@ -264,7 +264,7 @@ private DagActionStoreChangeEvent createDagActionStoreChangeEvent(OperationType * Form a key for events using the flow identifiers * @return a key formed by adding an '_' delimiter between the flow identifiers */ - public static String getKeyForFlow(String flowGroup, String flowName, long flowExecutionId) { + public static String getKeyForFlow(String flowGroup, String flowName, String flowExecutionId) { return flowGroup + "_" + flowName + "_" + flowExecutionId; } @@ -272,7 +272,7 @@ public static String getKeyForFlow(String flowGroup, String flowName, long flowE * Util to create wrapper around DagActionStoreChangeEvent */ private Kafka09ConsumerClient.Kafka09ConsumerRecord wrapDagActionStoreChangeEvent(OperationType operationType, - String flowGroup, String flowName, long flowExecutionId, DagActionValue dagAction) { + String flowGroup, String flowName, String flowExecutionId, DagActionValue dagAction) { DagActionStoreChangeEvent eventToProcess = null; try { eventToProcess = diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagManagementDagActionStoreChangeMonitorTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagManagementDagActionStoreChangeMonitorTest.java index 8d2b6bab9d3..3dbf0a96120 100644 --- a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagManagementDagActionStoreChangeMonitorTest.java +++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagManagementDagActionStoreChangeMonitorTest.java @@ -60,7 +60,7 @@ public class DagManagementDagActionStoreChangeMonitorTest { private final int OFFSET = 1; private final String FLOW_GROUP = "flowGroup"; private final String FLOW_NAME = "flowName"; - private final long FLOW_EXECUTION_ID = 123L; + private final String FLOW_EXECUTION_ID = "987654321"; private final String JOB_NAME = "jobName"; private MockDagManagementDagActionStoreChangeMonitor mockDagManagementDagActionStoreChangeMonitor; private int txidCounter = 0; @@ -109,7 +109,7 @@ public void setUp() throws Exception { public void testProcessMessageWithDelete() throws SchedulerException { Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord = wrapDagActionStoreChangeEvent(OperationType.DELETE, FLOW_GROUP, FLOW_NAME, FLOW_EXECUTION_ID, JOB_NAME, DagActionValue.ENFORCE_JOB_START_DEADLINE); - DagActionStore.DagAction dagAction = new DagActionStore.DagAction(FLOW_GROUP, FLOW_NAME, FLOW_EXECUTION_ID, JOB_NAME, + DagActionStore.DagAction dagAction = new DagActionStore.DagAction(FLOW_GROUP, FLOW_NAME, Long.parseLong(FLOW_EXECUTION_ID), JOB_NAME, DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE); mockDagManagementDagActionStoreChangeMonitor.processMessageForTest(consumerRecord); /* TODO: skip deadline removal for now and let them fire @@ -124,7 +124,7 @@ public void testProcessMessageWithDelete() throws SchedulerException { * Util to create a general DagActionStoreChange type event */ private DagActionStoreChangeEvent createDagActionStoreChangeEvent(OperationType operationType, - String flowGroup, String flowName, long flowExecutionId, String jobName, DagActionValue dagAction) { + String flowGroup, String flowName, String flowExecutionId, String jobName, DagActionValue dagAction) { String key = DagActionStoreChangeMonitorTest.getKeyForFlow(flowGroup, flowName, flowExecutionId); GenericStoreChangeEvent genericStoreChangeEvent = new GenericStoreChangeEvent(key, String.valueOf(txidCounter), System.currentTimeMillis(), operationType); @@ -137,7 +137,7 @@ private DagActionStoreChangeEvent createDagActionStoreChangeEvent(OperationType * Util to create wrapper around DagActionStoreChangeEvent */ private Kafka09ConsumerClient.Kafka09ConsumerRecord wrapDagActionStoreChangeEvent( - OperationType operationType, String flowGroup, String flowName, long flowExecutionId, String jobName, DagActionValue dagAction) { + OperationType operationType, String flowGroup, String flowName, String flowExecutionId, String jobName, DagActionValue dagAction) { DagActionStoreChangeEvent eventToProcess = null; try { eventToProcess = diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java index f53b156b7b9..04e12c13ed1 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java @@ -290,6 +290,9 @@ public void compileAndSubmitFlowToDagManager(FlowSpec flowSpec) throws IOExcepti _log.warn("Flow: {} submitted to dagManager failed to compile and produce a job execution plan dag", flowSpec); Instrumented.markMeter(this.flowOrchestrationFailedMeter); } + } catch (IOException | InterruptedException e) { + Instrumented.markMeter(this.flowOrchestrationFailedMeter); + throw e; } finally { this.dagManager.removeFlowSpecIfAdhoc(flowSpec); } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java index 553fee27185..5695cc5eabe 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java @@ -112,20 +112,26 @@ public Optional> createExecutionPlanIfValid(FlowSpec flowS TimingEvent flowCompilationTimer = new TimingEvent(this.eventSubmitter, TimingEvent.FlowTimings.FLOW_COMPILED); Map flowMetadata = TimingEventUtils.getFlowMetadata(flowSpec); - Optional> jobExecutionPlanDagOptional = - validateAndHandleConcurrentExecution(flowConfig, flowSpec, flowGroup, flowName, flowMetadata); - if (!jobExecutionPlanDagOptional.isPresent()) { - return Optional.absent(); - } + try { + Optional> jobExecutionPlanDagOptional = + validateAndHandleConcurrentExecution(flowConfig, flowSpec, flowGroup, flowName, flowMetadata); - if (jobExecutionPlanDagOptional.get().isEmpty()) { - populateFlowCompilationFailedEventMessage(eventSubmitter, flowSpec, flowMetadata); - return Optional.absent(); - } + if (!jobExecutionPlanDagOptional.isPresent()) { + return Optional.absent(); + } + + if (jobExecutionPlanDagOptional.get().isEmpty()) { + populateFlowCompilationFailedEventMessage(eventSubmitter, flowSpec, flowMetadata); + return Optional.absent(); + } - flowCompilationTimer.stop(flowMetadata); - return jobExecutionPlanDagOptional; + flowCompilationTimer.stop(flowMetadata); + return jobExecutionPlanDagOptional; + } catch (IOException e) { + log.error("Encountered exception when attempting to compile and perform checks for flow: {}", flowSpec); + throw e; + } } /** diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java index 3f2c86771a9..d9d0913bcd6 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java @@ -199,7 +199,7 @@ protected void processMessage(DecodeableKafkaRecord Date: Tue, 16 Jul 2024 14:48:49 -0700 Subject: [PATCH 2/3] Update unit test for HB and non empty flow info --- .../apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java index 9fbebdb42ba..a5de624ceda 100644 --- a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java +++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java @@ -150,7 +150,7 @@ public void testProcessMessageWithHeartbeatAndNullDagAction() throws SpecNotFoun @Test (dependsOnMethods = "testProcessMessageWithHeartbeatAndNullDagAction") public void testProcessMessageWithHeartbeatAndFlowInfo() throws SpecNotFoundException { Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord = - wrapDagActionStoreChangeEvent(OperationType.HEARTBEAT, "", "", "", DagActionValue.RESUME); + wrapDagActionStoreChangeEvent(OperationType.HEARTBEAT, FLOW_GROUP, "", "", DagActionValue.RESUME); mockDagActionStoreChangeMonitor.processMessageForTest(consumerRecord); verify(mockDagActionStoreChangeMonitor.getDagManager(), times(0)).handleResumeFlowRequest(anyString(), anyString(), anyLong()); verify(mockDagActionStoreChangeMonitor.getDagManager(), times(0)).handleKillFlowRequest(anyString(), anyString(), anyLong()); From 15da707862baf392d9d4f13827b849455a16b531 Mon Sep 17 00:00:00 2001 From: Urmi Mustafi Date: Tue, 16 Jul 2024 15:55:32 -0700 Subject: [PATCH 3/3] Only log flow name and group in error log --- .../service/modules/utils/FlowCompilationValidationHelper.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java index 5695cc5eabe..b1bace47854 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java @@ -129,7 +129,8 @@ public Optional> createExecutionPlanIfValid(FlowSpec flowS flowCompilationTimer.stop(flowMetadata); return jobExecutionPlanDagOptional; } catch (IOException e) { - log.error("Encountered exception when attempting to compile and perform checks for flow: {}", flowSpec); + log.error("Encountered exception when attempting to compile and perform checks for flowGroup: {} flowName: {}", + flowGroup, flowName); throw e; } }