Skip to content

Commit

Permalink
* throws a descriptive exception when seeing invalid state from helix…
Browse files Browse the repository at this point in the history
… / zk
  • Loading branch information
homatthew committed Jan 11, 2023
1 parent e1c4196 commit 8ce90a9
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.cluster;

/**
* Exception to describe situations where Gobblin sees unexpected state from Helix. Historically, we've seen unexpected
* null values, which bubble up as NPE. This exception is explicitly used to differentiate bad Gobblin code from
* Helix failures (i.e. seeing a NPE implies Gobblin bug)
*/
public class GobblinHelixUnexpectedStateException extends Exception {
public GobblinHelixUnexpectedStateException(String message, Object... args) {
super(String.format(message, args));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -390,23 +390,27 @@ private static void deleteStoppedHelixJob(HelixManager helixManager, String work
*
* @param jobNames a list of Gobblin job names.
* @return a map from jobNames to their Helix Workflow Ids.
* @throws GobblinHelixUnexpectedStateException when there is inconsistent helix state. This implies that we should retry the call
* to avoid acting on stale data
*/
public static Map<String, String> getWorkflowIdsFromJobNames(HelixManager helixManager, Collection<String> jobNames) {
public static Map<String, String> getWorkflowIdsFromJobNames(HelixManager helixManager, Collection<String> jobNames)
throws GobblinHelixUnexpectedStateException {
TaskDriver taskDriver = new TaskDriver(helixManager);
return getWorkflowIdsFromJobNames(taskDriver, jobNames);
}

public static Map<String, String> getWorkflowIdsFromJobNames(TaskDriver taskDriver, Collection<String> jobNames) {
public static Map<String, String> getWorkflowIdsFromJobNames(TaskDriver taskDriver, Collection<String> jobNames)
throws GobblinHelixUnexpectedStateException {
Map<String, String> jobNameToWorkflowId = new HashMap<>();
Map<String, WorkflowConfig> workflowConfigMap = taskDriver.getWorkflows();
for (Map.Entry<String, WorkflowConfig> entry : workflowConfigMap.entrySet()) {
String workflow = entry.getKey();
WorkflowConfig workflowConfig = entry.getValue();
if (workflowConfig == null) {
// As of Helix 1.0.2 implementation, this in theory won't happen. But this null check is here in case implementation changes
// because the API doesn't technically prohibit null configs, maps allowing null values is implementation based, and we want to fail gracefully
log.error("Workflow config is null. Skipping the following workflow while searching for workflow ID. workflow={}, jobNames={}", workflow, jobNames);
continue;
// As of Helix 1.0.2 implementation, this in theory shouldn't happen. But this null check is here in case implementation changes
// because the API doesn't technically prohibit null configs, maps allowing null values is implementation based, and we want to fail loudly with a clear root cause.
// the caller of this API should retry this API call
throw new GobblinHelixUnexpectedStateException("Received null workflow config from Helix. We should not see any null configs when reading all workflows. workflowId=%s", workflow);
}
//Filter out any stale Helix workflows which are not running.
if (workflowConfig.getTargetState() != TargetState.START) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
import java.io.IOException;
import java.net.URL;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;

import org.apache.gobblin.configuration.ConfigurationKeys;
Expand Down Expand Up @@ -93,7 +95,7 @@ public void testConfigToProperties() {
}

@Test
public void testGetWorkunitIdForJobNames(){
public void testGetWorkunitIdForJobNames() throws GobblinHelixUnexpectedStateException {
final String HELIX_JOB = "job";
final String GOBBLIN_JOB_NAME = "gobblin-job-name";

Expand Down Expand Up @@ -133,6 +135,24 @@ public void testGetWorkunitIdForJobNames(){
ImmutableMap.of(GOBBLIN_JOB_NAME, "workflow-1"));
}

@Test(expectedExceptions = GobblinHelixUnexpectedStateException.class)
public void testGetWorkunitIdForJobNamesWithInvalidHelixState() throws GobblinHelixUnexpectedStateException {
final String GOBBLIN_JOB_NAME = "gobblin-job-name";

TaskDriver driver = Mockito.mock(TaskDriver.class);

Map<String, WorkflowConfig> workflowConfigMap = new HashMap<>();
workflowConfigMap.put("null-workflow-to-throw-exception", null);
Mockito.when(driver.getWorkflows()).thenReturn(workflowConfigMap);

try {
HelixUtils.getWorkflowIdsFromJobNames(driver, Arrays.asList(GOBBLIN_JOB_NAME));
} catch (GobblinHelixUnexpectedStateException e) {
e.printStackTrace();
throw e;
}
}

@AfterClass
public void tearDown() throws IOException {
if (this.fileSystem.exists(this.tokenFilePath.getParent())) {
Expand Down

0 comments on commit 8ce90a9

Please sign in to comment.