Skip to content

Commit

Permalink
* throws a descriptive exception when seeing invalid state from helix…
Browse files Browse the repository at this point in the history
… / zk
  • Loading branch information
homatthew committed Jan 10, 2023
1 parent e1c4196 commit cc5ec87
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package org.apache.gobblin.cluster;

/**
 * Exception describing situations where Gobblin sees unexpected state from Helix. Historically, we've seen unexpected
 * null values, which bubble up as an NPE. This exception is explicitly used to differentiate bad Gobblin code from
 * Helix failures (i.e. seeing an NPE implies a Gobblin bug).
 */
public class GobblinHelixUnexpectedStateException extends Exception {
  /**
   * Creates the exception with a message rendered from a {@link String#format(String, Object...)} template.
   *
   * <p>Building the message never fails: if the template is {@code null} the exception carries a {@code null}
   * message, and if the template and arguments do not match, the raw template and the arguments are reported
   * instead of letting a formatting error replace the exception the caller intended to throw.
   *
   * @param message format template for the exception message
   * @param args arguments referenced by the format specifiers in {@code message}
   */
  public GobblinHelixUnexpectedStateException(String message, Object... args) {
    super(formatMessage(message, args));
  }

  /** Renders the template, falling back to the raw template plus arguments when it cannot be formatted. */
  private static String formatMessage(String message, Object... args) {
    if (message == null) {
      return null;
    }
    try {
      return String.format(message, args);
    } catch (java.util.IllegalFormatException e) {
      // A malformed template must not hide the Helix problem being reported.
      return message + " (unformatted; args=" + java.util.Arrays.toString(args) + ")";
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -391,22 +391,23 @@ private static void deleteStoppedHelixJob(HelixManager helixManager, String work
* @param jobNames a list of Gobblin job names.
* @return a map from jobNames to their Helix Workflow Ids.
*/
public static Map<String, String> getWorkflowIdsFromJobNames(HelixManager helixManager, Collection<String> jobNames) {
public static Map<String, String> getWorkflowIdsFromJobNames(HelixManager helixManager, Collection<String> jobNames)
    throws GobblinHelixUnexpectedStateException {
  // Wrap the manager in a TaskDriver and hand off to the TaskDriver-based overload.
  return getWorkflowIdsFromJobNames(new TaskDriver(helixManager), jobNames);
}

public static Map<String, String> getWorkflowIdsFromJobNames(TaskDriver taskDriver, Collection<String> jobNames) {
public static Map<String, String> getWorkflowIdsFromJobNames(TaskDriver taskDriver, Collection<String> jobNames)
throws GobblinHelixUnexpectedStateException {
Map<String, String> jobNameToWorkflowId = new HashMap<>();
Map<String, WorkflowConfig> workflowConfigMap = taskDriver.getWorkflows();
for (Map.Entry<String, WorkflowConfig> entry : workflowConfigMap.entrySet()) {
String workflow = entry.getKey();
WorkflowConfig workflowConfig = entry.getValue();
if (workflowConfig == null) {
// As of Helix 1.0.2 implementation, this in theory won't happen. But this null check is here in case implementation changes
// because the API doesn't technically prohibit null configs, maps allowing null values is implementation based, and we want to fail gracefully
log.error("Workflow config is null. Skipping the following workflow while searching for workflow ID. workflow={}, jobNames={}", workflow, jobNames);
continue;
// As of Helix 1.0.2 implementation, this in theory shouldn't happen. But this null check is here in case implementation changes
// because the API doesn't technically prohibit null configs, maps allowing null values is implementation based, and we want to fail loudly with a clear root cause
throw new GobblinHelixUnexpectedStateException("Received null workflow config from Helix. We should not see any null configs when reading all workflows. workflowId=%s", workflow);
}
//Filter out any stale Helix workflows which are not running.
if (workflowConfig.getTargetState() != TargetState.START) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
import java.io.IOException;
import java.net.URL;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;

import org.apache.gobblin.configuration.ConfigurationKeys;
Expand Down Expand Up @@ -93,7 +95,7 @@ public void testConfigToProperties() {
}

@Test
public void testGetWorkunitIdForJobNames(){
public void testGetWorkunitIdForJobNames() throws GobblinHelixUnexpectedStateException {
final String HELIX_JOB = "job";
final String GOBBLIN_JOB_NAME = "gobblin-job-name";

Expand Down Expand Up @@ -133,6 +135,24 @@ public void testGetWorkunitIdForJobNames(){
ImmutableMap.of(GOBBLIN_JOB_NAME, "workflow-1"));
}

/**
 * Feeds {@code HelixUtils.getWorkflowIdsFromJobNames} a workflow map containing a null config and expects a
 * {@link GobblinHelixUnexpectedStateException}.
 */
@Test(expectedExceptions = GobblinHelixUnexpectedStateException.class)
public void testGetWorkunitIdForJobNamesWithInvalidHelixState() throws GobblinHelixUnexpectedStateException {
  final String GOBBLIN_JOB_NAME = "gobblin-job-name";

  TaskDriver driver = Mockito.mock(TaskDriver.class);

  // A HashMap is used because it permits the null value under test.
  Map<String, WorkflowConfig> workflowConfigMap = new HashMap<>();
  workflowConfigMap.put("null-workflow-to-throw-exception", null);
  Mockito.when(driver.getWorkflows()).thenReturn(workflowConfigMap);

  // No try/catch needed: expectedExceptions on @Test verifies the thrown type, and the test
  // framework reports the stack trace if the expectation is not met.
  HelixUtils.getWorkflowIdsFromJobNames(driver, Arrays.asList(GOBBLIN_JOB_NAME));
}

@AfterClass
public void tearDown() throws IOException {
if (this.fileSystem.exists(this.tokenFilePath.getParent())) {
Expand Down

0 comments on commit cc5ec87

Please sign in to comment.