Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GOBBLIN-1652]Add more log in the KafkaJobStatusMonitor in case it fails to process one GobblinTrackingEvent #3513

Merged
merged 5 commits into from
Jun 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -357,6 +358,35 @@ public void testProcessingRetriedForApparentlyTransientErrors() throws IOExcepti
jobStatusMonitor.shutDown();
}

@Test (dependsOnMethods = "testProcessMessageForCancelledAndKilledEvent")
public void testProcessProgressingMessageWhenNoPreviousStatus() throws IOException, ReflectiveOperationException {
  KafkaEventReporter kafkaReporter = builder.build("localhost:0000", "topic5");

  // Submit a progress-only GobblinTrackingEvent to Kafka; no prior job status
  // exists for it, which is exactly the condition that used to trigger an NPE.
  ImmutableList.of(
      createGTE(TimingEvent.JOB_COMPLETION_PERCENTAGE, new HashMap<>())
  ).forEach(event -> {
    context.submitEvent(event);
    kafkaReporter.report();
  });

  // Give the async reporter time to flush the event to the Kafka test broker.
  try {
    Thread.sleep(1000);
  } catch (InterruptedException ex) {
    Thread.currentThread().interrupt();
  }

  MockKafkaAvroJobStatusMonitor jobStatusMonitor = createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), ConfigFactory.empty());
  jobStatusMonitor.buildMetricsContextAndMetrics();
  Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
      this.kafkaTestHelper.getIteratorForTopic(TOPIC),
      this::convertMessageAndMetadataToDecodableKafkaRecord);

  State state = getNextJobStatusState(jobStatusMonitor, recordIterator, this.jobGroup, this.jobName);
  // Verify we are able to process it without NPE: a progress event with no
  // previous status yields a state with no EVENT_NAME_FIELD set.
  Assert.assertNull(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD));
  // Release monitor resources, consistent with the sibling tests in this class.
  jobStatusMonitor.shutDown();
}

@Test (dependsOnMethods = "testProcessingRetriedForApparentlyTransientErrors")
public void testProcessMessageForCancelledAndKilledEvent() throws IOException, ReflectiveOperationException {
KafkaEventReporter kafkaReporter = builder.build("localhost:0000", "topic4");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,56 +221,63 @@ protected void processMessage(DecodeableKafkaRecord<byte[],byte[]> message) {
@VisibleForTesting
static void addJobStatusToStateStore(org.apache.gobblin.configuration.State jobStatus, StateStore stateStore)
throws IOException {
if (!jobStatus.contains(TimingEvent.FlowEventConstants.JOB_NAME_FIELD)) {
jobStatus.setProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, JobStatusRetriever.NA_KEY);
}
if (!jobStatus.contains(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD)) {
jobStatus.setProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, JobStatusRetriever.NA_KEY);
}
String flowName = jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD);
String flowGroup = jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD);
String flowExecutionId = jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD);
String jobName = jobStatus.getProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD);
String jobGroup = jobStatus.getProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD);
String storeName = jobStatusStoreName(flowGroup, flowName);
String tableName = jobStatusTableName(flowExecutionId, jobGroup, jobName);

List<org.apache.gobblin.configuration.State> states = stateStore.getAll(storeName, tableName);
if (states.size() > 0) {
org.apache.gobblin.configuration.State previousJobStatus = states.get(states.size() - 1);
String previousStatus = previousJobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
String currentStatus = jobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
int previousGeneration = previousJobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD, 1);
// This is to make the change backward compatible, as we may not have this info in cluster events
// If we do not have this info, we treat the event as coming from the same attempt as the previous one
int currentGeneration = jobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD, previousGeneration);
int previousAttempts = previousJobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, 1);
int currentAttempts = jobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, previousAttempts);
// We use three things to accurately count and thereby bound retries, even amidst out-of-order events (by skipping late arrivals).
// The generation is monotonically increasing, while the attempts may re-initialize back to 0. This two-part form prevents the composite value from ever repeating.
// And the job status reflects the execution status within one attempt
if (previousStatus != null && currentStatus != null &&
(previousGeneration > currentGeneration
|| (previousGeneration == currentGeneration && previousAttempts > currentAttempts)
|| (previousGeneration == currentGeneration && previousAttempts == currentAttempts
&& ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(currentStatus)) < ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(previousStatus))))){
log.warn(String.format("Received status [generation.attempts] = %s [%s.%s] when already %s [%s.%s] for flow (%s, %s, %s), job (%s, %s)",
currentStatus, currentGeneration, currentAttempts, previousStatus, previousGeneration, previousAttempts, flowGroup, flowName, flowExecutionId, jobGroup, jobName));
jobStatus = mergeState(states.get(states.size() - 1), jobStatus);
} else {
jobStatus = mergeState(jobStatus, states.get(states.size() - 1));
try {
if (!jobStatus.contains(TimingEvent.FlowEventConstants.JOB_NAME_FIELD)) {
jobStatus.setProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, JobStatusRetriever.NA_KEY);
}
if (!jobStatus.contains(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD)) {
jobStatus.setProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, JobStatusRetriever.NA_KEY);
}
String flowName = jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD);
String flowGroup = jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD);
String flowExecutionId = jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD);
String jobName = jobStatus.getProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD);
String jobGroup = jobStatus.getProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD);
String storeName = jobStatusStoreName(flowGroup, flowName);
String tableName = jobStatusTableName(flowExecutionId, jobGroup, jobName);

List<org.apache.gobblin.configuration.State> states = stateStore.getAll(storeName, tableName);
if (states.size() > 0) {
org.apache.gobblin.configuration.State previousJobStatus = states.get(states.size() - 1);
String previousStatus = previousJobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
String currentStatus = jobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
int previousGeneration = previousJobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD, 1);
// This is to make the change backward compatible, as we may not have this info in cluster events
// If we do not have this info, we treat the event as coming from the same attempt as the previous one
int currentGeneration = jobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD, previousGeneration);
int previousAttempts = previousJobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, 1);
int currentAttempts = jobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, previousAttempts);
// We use three things to accurately count and thereby bound retries, even amidst out-of-order events (by skipping late arrivals).
// The generation is monotonically increasing, while the attempts may re-initialize back to 0. This two-part form prevents the composite value from ever repeating.
// And the job status reflects the execution status within one attempt
if (previousStatus != null && currentStatus != null && (previousGeneration > currentGeneration || (
previousGeneration == currentGeneration && previousAttempts > currentAttempts) || (previousGeneration == currentGeneration && previousAttempts == currentAttempts
&& ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(currentStatus))
< ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(previousStatus))))) {
log.warn(String.format(
"Received status [generation.attempts] = %s [%s.%s] when already %s [%s.%s] for flow (%s, %s, %s), job (%s, %s)",
currentStatus, currentGeneration, currentAttempts, previousStatus, previousGeneration, previousAttempts,
flowGroup, flowName, flowExecutionId, jobGroup, jobName));
jobStatus = mergeState(states.get(states.size() - 1), jobStatus);
} else {
jobStatus = mergeState(jobStatus, states.get(states.size() - 1));
}
}
}

modifyStateIfRetryRequired(jobStatus);
stateStore.put(storeName, tableName, jobStatus);
modifyStateIfRetryRequired(jobStatus);
stateStore.put(storeName, tableName, jobStatus);
} catch (Exception e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what kind of exception were we seeing here? curious whether the entire block above needs wrapping or perhaps just the tail end subset of it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are seeing an NPE without any information, so we want to wrap the entire method

log.warn("Meet exception when adding jobStatus to state store at "
+ e.getStackTrace()[0].getClassName() + "line number: " + e.getStackTrace()[0].getLineNumber(), e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

won't the exception stack trace be logged, since you pass it as the final arg?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, based on what we noticed, if it's an NPE, not much info will be logged, so I want to add the line info there in case we hit the same issue next time.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes sense to log that info. I was just thinking the full stack trace (warn outputs from e) should have the info in

e.getStackTrace()[0].getClassName() + "line number: " + e.getStackTrace()[0].getLineNumber()

throw new IOException(e);
}
}

private static void modifyStateIfRetryRequired(org.apache.gobblin.configuration.State state) {
int maxAttempts = state.getPropAsInt(TimingEvent.FlowEventConstants.MAX_ATTEMPTS_FIELD, 1);
int currentAttempts = state.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, 1);
// SHOULD_RETRY_FIELD maybe reset by JOB_COMPLETION_PERCENTAGE event
if ((state.getProp(JobStatusRetriever.EVENT_NAME_FIELD).equals(ExecutionStatus.FAILED.name())
if (state.contains(JobStatusRetriever.EVENT_NAME_FIELD) &&(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD).equals(ExecutionStatus.FAILED.name())
|| state.getProp(JobStatusRetriever.EVENT_NAME_FIELD).equals(ExecutionStatus.PENDING_RETRY.name())
|| (state.getProp(JobStatusRetriever.EVENT_NAME_FIELD).equals(ExecutionStatus.CANCELLED.name()) && state.contains(TimingEvent.FlowEventConstants.DOES_CANCELED_FLOW_MERIT_RETRY))
) && currentAttempts < maxAttempts) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ public void testInvalidColumnName() {
try {
KafkaJobStatusMonitor.addJobStatusToStateStore(jobStatus, this.jobStatusRetriever.getStateStore());
} catch (IOException e) {
Assert.assertTrue(e.getCause().getMessage().contains("Data too long"));
Assert.assertTrue(e.getCause().getCause().getMessage().contains("Data too long"));
return;
}
Assert.fail();
Expand Down