Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GOBBLIN-1822]Logging Abnormal Helix Task States #3685

Merged
merged 12 commits into from
Apr 26, 2023

Conversation

ZihanLi58
Copy link
Contributor

Dear Gobblin maintainers,

Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!

JIRA

Description

  • Here are some details about my PR, including screenshots (if applicable):
    Currently, in the autoScalingManager, we iterate through all Helix tasks without logging their statuses. This means that if any issues occur and we need to restart the pipeline, we lose the Helix status information, making it difficult to investigate the problem further.

Tests

  • My PR adds the following unit tests OR does not need testing for this extremely good reason:
    Unit test

Commits

  • My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "How to write a good git commit message":
    1. Subject is separated from body by a blank line
    2. Subject is limited to 50 characters
    3. Subject does not end with a period
    4. Subject uses the imperative mood ("add", not "adding")
    5. Body wraps at 72 characters
    6. Body explains "what" and "why", not "how"

@codecov-commenter
Copy link

codecov-commenter commented Apr 21, 2023

Codecov Report

Merging #3685 (c4daa78) into master (3dd82ac) will increase coverage by 0.02%.
The diff coverage is 41.66%.

@@             Coverage Diff              @@
##             master    #3685      +/-   ##
============================================
+ Coverage     46.85%   46.88%   +0.02%     
- Complexity    10756    10758       +2     
============================================
  Files          2138     2138              
  Lines         84029    84040      +11     
  Branches       9338     9340       +2     
============================================
+ Hits          39374    39400      +26     
+ Misses        41059    41043      -16     
- Partials       3596     3597       +1     
Impacted Files Coverage Δ
...rg/apache/gobblin/yarn/YarnAutoScalingManager.java 56.61% <41.66%> (-1.79%) ⬇️

... and 4 files with indirect coverage changes

📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more

Copy link
Contributor

@homatthew homatthew left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this change. There are some edge cases that I am curious about

return jobContext.getAssignedParticipant(i);
}
if (!jobContext.getPartitionState(i).equals(
TaskPartitionState.ERROR) && !jobContext.getPartitionState(i).equals(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TaskPartitionState:
https://github.com/apache/helix/blob/53e583d4ff16cfafdcb06d2164cdd7a8a6a81245/helix-core/src/main/java/org/apache/helix/task/TaskPartitionState.java#L25-L44

Seems like your code considers 3 edge cases:

  1. Null partition state (This may happen when the job is not fully initialized). And then we return the assigned participant (which will probably return null)
  2. Non-error / non-dropped state. i.e. the 2 "internal error" states in the comments. This behavior remains unchanged
  3. Internal Helix error. Filter out the assigned participant by returning null.

The null edge case feels redundant because we are still calling get assigned participant.

Also, for case 2 recall that in Helix the assigned participant is responsible for modifying the task state and assigning itself the assigned participant. If Helix does not think we have enough containers (eg lost container), then it will still be counted as an assigned participant and it would be an error to still consider that container an assigned participant.

From the linked enum class, I think we should ignore assigned participants for STOPPED, COMPLETED, TIMED_OUT, TASK_ERROR, TASK_ABORTED, ERROR, DROPPED. I.e. anything that isn't INIT or RUNNING. Especially since the autoscaling manager is currently only used for streaming tasks and not distributed helix mode. Also we do not expect tasks outside of INIT and RUNNING (tasks go to DROPPED if there is any task failure like OOM or KafkaIngestionHealthCheck).

The one thing to consider is whether or not we want to log for all of these cases. I think in streaming state transitions are not that noisy. Tasks do not tend to exit much once running. (And if they are, then we should really know about it). Because right now, there is no transparency on helix state after submitting

To simplify this code I don't think we need these 3 if else statements. What do you think about this code block instead?

List<TaskPartitionState> validStates = Arrays.asList(TaskPartitionState.RUNNING, TaskPartitionState.INIT);
inUseInstances.addAll(jobContext.getPartitionSet().stream().map(i -> {
              if(validStates.contains(jobContext.getPartitionState(i))) {
                return jobContext.getAssignedParticipant(i);
              }
              
              // adding log here now for debugging
              //todo: if this happens frequently, we should reset to status to retriable or at least report the error earlier
              log.error("Helix task {} is in {} state which is unexpected, please watch out to see if this get recovered", jobContext.getTaskIdForPartition(i), jobContext.getPartitionState(i));
                return null;
            }).filter(Objects::nonNull).collect(Collectors.toSet()));

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here are some of my consideration

  1. null check is to prevent the NPE exception when I try to compare the task state with a specific value and try to return getPartitionState specifically to maintain backward compatibility. But I'll refactor the code a little bit to make it look clean
  2. To ensure that logs are helpful and not noisy, I will reduce the amount of information logged for retriable task states. Even if the instances are added to the in-use map, they will be removed automatically during the next run of the method as retry assigns them to new instances, causing old ones to be removed automatically.
  3. To address the issue of tasks failing multiple times, I will add a log for tasks that have a high number of attempts. This will be clearer than logging the unusual task state every time.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change makes sense if Helix always assigns the task to a new participant on the next run. I still have some concerns about visibility when Helix doesn't reassign the task

}
if (!UNUSUAL_HELIX_TASK_STATES.contains(jobContext.getPartitionState(i))) {
return jobContext.getAssignedParticipant(i);
} else {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: This else is redundant, IMO it makes sense to remove it especially since we are 5 indentation levels deep already.

Nit2: This method is a super method, and I'd encourage extracting some of the logic here. But as-is it's still readable to me (maybe not to a new comer)

Copy link
Contributor

@homatthew homatthew left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed offline, and you are planning to do the retriable states / live instances issues in another change

@ZihanLi58 ZihanLi58 merged commit 6338910 into apache:master Apr 26, 2023
phet added a commit to phet/gobblin that referenced this pull request Aug 15, 2023
* upstream/master:
  [GOBBLIN-1832] Emit warning instead of failing job for retention of Hive Table Views (apache#3695)
  [GOBBLIN-1831] Use flowexecutionid in kafka monitor and jobnames (apache#3694)
  [GOBBLIN-1824]Improving the Efficiency of Work Planning in Manifest-Based DistCp Jobs (apache#3686)
  [GOBBLIN-1829] Fixes bug where the wrong workunit event was being tracked for keepin… (apache#3691)
  [GOBBLIN-1828] Implement Timeout for Creating Writer Functionality (apache#3690)
  [GOBBLIN-1827] Add check that if nested field is optional and has a non-null default… (apache#3689)
  [GOBBLIN-1826] Change isAssignableFrom() to isSuperTypeOf() per Guava 20 javadocs to… (apache#3688)
  [GOBBLIN-1822]Logging Abnormal Helix Task States (apache#3685)
  [GOBBLIN-1819] Log helix workflow information and timeout information during submission wait / polling (apache#3681)
  [GOBBLIN-1821] Let flow execution ID propagate to the Job ID if it exists (apache#3684)
  [GOBBLIN-1810] Support general iceberg catalog (support configurable behavior for metadata retention policy) (apache#3680)
  Add null default value to observability events that are additionally added (apache#3682)
  [GOBBLIN-1816] Add job properties and GaaS instance ID to observability event (apache#3676)
  [GOBBLIN-1785] add MR_JARS_BASE_DIR and logic to delete old mr jar dirs (apache#3642)
  initiliaze yarn clients in yarn app launcher so that a child class can override the yarn client creation logic (apache#3679)
  [GOBBLIN-1811]Fix Iceberg Registration Serialization (apache#3673)
  [GOBBLIN-1817] change some deprecated code and fix minor codestyle (apache#3678)
  [GOBBLIN-1812] Mockito should only be test compile (apache#3674)
  [GOBBLIN-1813] Helix workflows submission timeouts are configurable (apache#3677)
  [GOBBLIN-1810] Support general iceberg catalog in icebergMetadataWriter (apache#3672)
  Refactor yarn app launchers to support extending these classes (apache#3671)
  [GOBBLIN-1808] Bump Guava version from 15.0 to 20.0 (apache#3669)
  [GOBBLIN-1806] Submit dataset summary event post commit and integrate them into GaaSObservabilityEvent (apache#3667)
  [GOBBLIN-1814] Add `MRJobLauncher` configurability for any failing mapper to be fatal to the MR job (apache#3675)
  Add new lookback version finder for use with iceberg retention (apache#3670)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants