# Patch notes (free-form preamble placed before the first "diff --git" header).
#
# What the hunks below show:
#  * KafkaAvroJobStatusMonitorTest: imports java.util.HashMap and adds
#    testProcessProgressingMessageWhenNoPreviousStatus, which reports a single
#    JOB_COMPLETION_PERCENTAGE event built with an empty metadata map and asserts
#    that EVENT_NAME_FIELD of the resulting state is null.
#  * KafkaJobStatusMonitor.addJobStatusToStateStore: the existing body is wrapped in
#    try / catch (Exception); the handler logs a warning and rethrows as IOException.
#  * KafkaJobStatusMonitor.modifyStateIfRetryRequired: the retry condition now checks
#    state.contains(EVENT_NAME_FIELD) before calling equals() on that property.
#  * MysqlJobStatusRetrieverTest.testInvalidColumnName: looks for "Data too long"
#    one getCause() level deeper.
#
# Review fixes applied to added lines only (the number of added lines is unchanged,
# so the @@ hunk counts are unaffected):
#  * The catch handler no longer indexes e.getStackTrace()[0] unconditionally.
#    Throwable.getStackTrace() may return a zero-length array, in which case the
#    handler itself would throw ArrayIndexOutOfBoundsException and hide the
#    original failure instead of rethrowing it as IOException.
#  * Added the missing space before "line number: " in the warning message.
#  * Added the missing space in "&&(".
#
# NOTE(review): catch (Exception) also re-wraps IOExceptions thrown inside the try
# block, which is presumably why the Mysql test now needs the extra getCause();
# confirm that callers inspecting e.getCause() are fine with the deeper nesting.
diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java index eb86ebfb14b..fc5b87802c3 100644 --- a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java +++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java @@ -19,6 +19,7 @@ import java.io.File; import java.io.IOException; import java.util.Arrays; +import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -357,6 +358,35 @@ public void testProcessingRetriedForApparentlyTransientErrors() throws IOExcepti jobStatusMonitor.shutDown(); } + @Test (dependsOnMethods = "testProcessMessageForCancelledAndKilledEvent") + public void testProcessProgressingMessageWhenNoPreviousStatus() throws IOException, ReflectiveOperationException { + KafkaEventReporter kafkaReporter = builder.build("localhost:0000", "topic5"); + + //Submit GobblinTrackingEvents to Kafka + ImmutableList.of( + createGTE(TimingEvent.JOB_COMPLETION_PERCENTAGE, new HashMap<>()) + ).forEach(event -> { + context.submitEvent(event); + kafkaReporter.report(); + }); + + try { + Thread.sleep(1000); + } catch(InterruptedException ex) { + Thread.currentThread().interrupt(); + } + + MockKafkaAvroJobStatusMonitor jobStatusMonitor = createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), ConfigFactory.empty()); + jobStatusMonitor.buildMetricsContextAndMetrics(); + Iterator recordIterator = Iterators.transform( + this.kafkaTestHelper.getIteratorForTopic(TOPIC), + this::convertMessageAndMetadataToDecodableKafkaRecord); + + State state = getNextJobStatusState(jobStatusMonitor, recordIterator, this.jobGroup, this.jobName); + // Verify we are able to process it without NPE + 
Assert.assertNull(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD)); + } + @Test (dependsOnMethods = "testProcessingRetriedForApparentlyTransientErrors") public void testProcessMessageForCancelledAndKilledEvent() throws IOException, ReflectiveOperationException { KafkaEventReporter kafkaReporter = builder.build("localhost:0000", "topic4"); diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java index 363d5c74450..272c949775c 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java @@ -221,56 +221,63 @@ protected void processMessage(DecodeableKafkaRecord message) { @VisibleForTesting static void addJobStatusToStateStore(org.apache.gobblin.configuration.State jobStatus, StateStore stateStore) throws IOException { - if (!jobStatus.contains(TimingEvent.FlowEventConstants.JOB_NAME_FIELD)) { - jobStatus.setProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, JobStatusRetriever.NA_KEY); - } - if (!jobStatus.contains(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD)) { - jobStatus.setProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, JobStatusRetriever.NA_KEY); - } - String flowName = jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD); - String flowGroup = jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD); - String flowExecutionId = jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD); - String jobName = jobStatus.getProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD); - String jobGroup = jobStatus.getProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD); - String storeName = jobStatusStoreName(flowGroup, flowName); - String tableName = jobStatusTableName(flowExecutionId, jobGroup, jobName); - - List states = 
stateStore.getAll(storeName, tableName); - if (states.size() > 0) { - org.apache.gobblin.configuration.State previousJobStatus = states.get(states.size() - 1); - String previousStatus = previousJobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD); - String currentStatus = jobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD); - int previousGeneration = previousJobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD, 1); - // This is to make the change backward compatible as we may not have this info in cluster events - // If we does not have those info, we treat the event as coming from the same attempts as previous one - int currentGeneration = jobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD, previousGeneration); - int previousAttempts = previousJobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, 1); - int currentAttempts = jobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, previousAttempts); - // We use three things to accurately count and thereby bound retries, even amidst out-of-order events (by skipping late arrivals). - // The generation is monotonically increasing, while the attempts may re-initialize back to 0. this two-part form prevents the composite value from ever repeating. 
- // And job status reflect the execution status in one attempt - if (previousStatus != null && currentStatus != null && - (previousGeneration > currentGeneration - || (previousGeneration == currentGeneration && previousAttempts > currentAttempts) - || (previousGeneration == currentGeneration && previousAttempts == currentAttempts - && ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(currentStatus)) < ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(previousStatus))))){ - log.warn(String.format("Received status [generation.attempts] = %s [%s.%s] when already %s [%s.%s] for flow (%s, %s, %s), job (%s, %s)", - currentStatus, currentGeneration, currentAttempts, previousStatus, previousGeneration, previousAttempts, flowGroup, flowName, flowExecutionId, jobGroup, jobName)); - jobStatus = mergeState(states.get(states.size() - 1), jobStatus); - } else { - jobStatus = mergeState(jobStatus, states.get(states.size() - 1)); + try { + if (!jobStatus.contains(TimingEvent.FlowEventConstants.JOB_NAME_FIELD)) { + jobStatus.setProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, JobStatusRetriever.NA_KEY); + } + if (!jobStatus.contains(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD)) { + jobStatus.setProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, JobStatusRetriever.NA_KEY); + } + String flowName = jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD); + String flowGroup = jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD); + String flowExecutionId = jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD); + String jobName = jobStatus.getProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD); + String jobGroup = jobStatus.getProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD); + String storeName = jobStatusStoreName(flowGroup, flowName); + String tableName = jobStatusTableName(flowExecutionId, jobGroup, jobName); + + List states = stateStore.getAll(storeName, tableName); + if (states.size() > 0) { + 
org.apache.gobblin.configuration.State previousJobStatus = states.get(states.size() - 1); + String previousStatus = previousJobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD); + String currentStatus = jobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD); + int previousGeneration = previousJobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD, 1); + // This is to make the change backward compatible as we may not have this info in cluster events + // If we does not have those info, we treat the event as coming from the same attempts as previous one + int currentGeneration = jobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD, previousGeneration); + int previousAttempts = previousJobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, 1); + int currentAttempts = jobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, previousAttempts); + // We use three things to accurately count and thereby bound retries, even amidst out-of-order events (by skipping late arrivals). + // The generation is monotonically increasing, while the attempts may re-initialize back to 0. this two-part form prevents the composite value from ever repeating. 
+ // And job status reflect the execution status in one attempt + if (previousStatus != null && currentStatus != null && (previousGeneration > currentGeneration || ( + previousGeneration == currentGeneration && previousAttempts > currentAttempts) || (previousGeneration == currentGeneration && previousAttempts == currentAttempts + && ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(currentStatus)) + < ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(previousStatus))))) { + log.warn(String.format( + "Received status [generation.attempts] = %s [%s.%s] when already %s [%s.%s] for flow (%s, %s, %s), job (%s, %s)", + currentStatus, currentGeneration, currentAttempts, previousStatus, previousGeneration, previousAttempts, + flowGroup, flowName, flowExecutionId, jobGroup, jobName)); + jobStatus = mergeState(states.get(states.size() - 1), jobStatus); + } else { + jobStatus = mergeState(jobStatus, states.get(states.size() - 1)); + } } - } - modifyStateIfRetryRequired(jobStatus); - stateStore.put(storeName, tableName, jobStatus); + modifyStateIfRetryRequired(jobStatus); + stateStore.put(storeName, tableName, jobStatus); + } catch (Exception e) { + log.warn("Meet exception when adding jobStatus to state store at " + + (e.getStackTrace().length > 0 ? e.getStackTrace()[0].getClassName() + " line number: " + e.getStackTrace()[0].getLineNumber() : "unknown location"), e); + throw new IOException(e); + } } private static void modifyStateIfRetryRequired(org.apache.gobblin.configuration.State state) { int maxAttempts = state.getPropAsInt(TimingEvent.FlowEventConstants.MAX_ATTEMPTS_FIELD, 1); int currentAttempts = state.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, 1); // SHOULD_RETRY_FIELD maybe reset by JOB_COMPLETION_PERCENTAGE event - if ((state.getProp(JobStatusRetriever.EVENT_NAME_FIELD).equals(ExecutionStatus.FAILED.name()) + if (state.contains(JobStatusRetriever.EVENT_NAME_FIELD) && (state.getProp(JobStatusRetriever.EVENT_NAME_FIELD).equals(ExecutionStatus.FAILED.name()) || 
state.getProp(JobStatusRetriever.EVENT_NAME_FIELD).equals(ExecutionStatus.PENDING_RETRY.name()) || (state.getProp(JobStatusRetriever.EVENT_NAME_FIELD).equals(ExecutionStatus.CANCELLED.name()) && state.contains(TimingEvent.FlowEventConstants.DOES_CANCELED_FLOW_MERIT_RETRY)) ) && currentAttempts < maxAttempts) { diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java index 765b526df3d..d226c83222c 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java @@ -160,7 +160,7 @@ public void testInvalidColumnName() { try { KafkaJobStatusMonitor.addJobStatusToStateStore(jobStatus, this.jobStatusRetriever.getStateStore()); } catch (IOException e) { - Assert.assertTrue(e.getCause().getMessage().contains("Data too long")); + Assert.assertTrue(e.getCause().getCause().getMessage().contains("Data too long")); return; } Assert.fail();