Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GOBBLIN-1744] Improve handling of null value edge cases when querying Helix #3603

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.cluster;

/**
 * Exception to describe situations where Gobblin sees unexpected state from Helix. Historically, we've seen unexpected
 * null values, which bubble up as NPE. This exception is explicitly used to differentiate bad Gobblin code from
 * Helix failures (i.e. seeing a NPE implies Gobblin bug)
 */
public class GobblinHelixUnexpectedStateException extends Exception {

  /**
   * Creates an exception whose message is built with {@link String#format(String, Object...)}.
   *
   * @param message a {@link java.util.Formatter}-style format string, e.g. {@code "workflowId=%s"}
   * @param args arguments substituted into {@code message}
   */
  public GobblinHelixUnexpectedStateException(String message, Object... args) {
    super(String.format(message, args));
  }

  /**
   * Same as {@link #GobblinHelixUnexpectedStateException(String, Object...)} but preserves the underlying
   * cause, so the original Helix/ZooKeeper failure is not lost from stack traces when this exception wraps
   * another error. The cause parameter comes first because the format arguments are variadic.
   *
   * @param cause the underlying exception that triggered this unexpected state
   * @param message a {@link java.util.Formatter}-style format string
   * @param args arguments substituted into {@code message}
   */
  public GobblinHelixUnexpectedStateException(Throwable cause, String message, Object... args) {
    super(String.format(message, args), cause);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,16 +139,24 @@ public void execute() throws CommitStepException {

if (jobContext != null) {
String participant = jobContext.getAssignedParticipant(partitionNum);
if (participant != null) {
boolean isAssignedParticipant = participant.equalsIgnoreCase(helixInstanceName);
if (!isAssignedParticipant) {
log.info("The current helix instance is not the assigned participant. helixInstanceName={}, assignedParticipant={}",
helixInstanceName, participant);
}

return isAssignedParticipant;
if (participant == null) {
Copy link
Contributor Author

@homatthew homatthew Nov 18, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please read the PR description to understand why these null checks are needed and how I tested these changes. These pieces of code are critical and carry a fair amount of nuance related to Helix.

log.error("The current assigned participant is null. This implies that \n"
+ "\t\t(a)Helix failed to write to zookeeper, which is often caused by lack of compression leading / exceeding zookeeper jute max buffer size (Default 1MB)\n"
+ "\t\t(b)Helix reassigned the task (unlikely if this current task has been running without issue. Helix does not have code for reassigning \"running\" tasks)\n"
+ "\t\tNote: This logic is true as of Helix version 1.0.2 and ZK version 3.6");

return false;
}

boolean isAssignedParticipant = participant.equalsIgnoreCase(helixInstanceName);
if (!isAssignedParticipant) {
log.info("The current helix instance is not the assigned participant. helixInstanceName={}, assignedParticipant={}",
helixInstanceName, participant);
}

return isAssignedParticipant;
}

return false;
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,13 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.JobException;
import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.runtime.listeners.JobListener;
import org.apache.gobblin.util.Id;
import org.apache.gobblin.util.JobLauncherUtils;
import org.apache.helix.HelixAdmin;
Expand All @@ -54,13 +57,7 @@
import org.apache.helix.task.WorkflowContext;
import org.apache.helix.tools.ClusterSetup;

import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.runtime.JobException;
import org.apache.gobblin.runtime.listeners.JobListener;

import static org.apache.helix.task.TaskState.STOPPED;
import static org.apache.helix.task.TaskState.*;


/**
Expand Down Expand Up @@ -393,13 +390,28 @@ private static void deleteStoppedHelixJob(HelixManager helixManager, String work
*
* @param jobNames a list of Gobblin job names.
* @return a map from jobNames to their Helix Workflow Ids.
* @throws GobblinHelixUnexpectedStateException when there is inconsistent helix state. This implies that we should retry the call
* to avoid acting on stale data
*/
public static Map<String, String> getWorkflowIdsFromJobNames(HelixManager helixManager, Collection<String> jobNames) {
Map<String, String> jobNameToWorkflowId = new HashMap<>();
/**
 * Resolves each Gobblin job name to the id of the Helix workflow currently containing it.
 *
 * @param helixManager connection used to build a {@link TaskDriver} for querying Helix
 * @param jobNames the Gobblin job names to look up
 * @return a map from Gobblin job name to Helix workflow id
 * @throws GobblinHelixUnexpectedStateException when Helix returns inconsistent state; the caller
 *         should retry to avoid acting on stale data
 */
public static Map<String, String> getWorkflowIdsFromJobNames(HelixManager helixManager, Collection<String> jobNames)
    throws GobblinHelixUnexpectedStateException {
  // Delegate to the TaskDriver-based overload so both entry points share a single implementation.
  return getWorkflowIdsFromJobNames(new TaskDriver(helixManager), jobNames);
}

public static Map<String, String> getWorkflowIdsFromJobNames(TaskDriver taskDriver, Collection<String> jobNames)
throws GobblinHelixUnexpectedStateException {
Map<String, String> jobNameToWorkflowId = new HashMap<>();
Map<String, WorkflowConfig> workflowConfigMap = taskDriver.getWorkflows();
for (String workflow : workflowConfigMap.keySet()) {
WorkflowConfig workflowConfig = taskDriver.getWorkflowConfig(workflow);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The previous version made 2 calls to ZooKeeper when only 1 is necessary. Making 2 calls increases the likelihood of observing inconsistent state.

for (Map.Entry<String, WorkflowConfig> entry : workflowConfigMap.entrySet()) {
String workflow = entry.getKey();
WorkflowConfig workflowConfig = entry.getValue();
if (workflowConfig == null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here, if this is not expected, throw exception instead of just log it out

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made change to throw exception. I made the choice to create a new custom exception because there are other places where make API calls to ZK / Helix, and I want to use this exception to explicitly say there is a Helix issue.

The caller of this API (Job scheduler) is using a retryer that will automatically retry this API call again if there is an exception.

// As of Helix 1.0.2 implementation, this in theory shouldn't happen. But this null check is here in case implementation changes
// because the API doesn't technically prohibit null configs, maps allowing null values is implementation based, and we want to fail loudly with a clear root cause.
// the caller of this API should retry this API call
throw new GobblinHelixUnexpectedStateException("Received null workflow config from Helix. We should not see any null configs when reading all workflows. workflowId=%s", workflow);
}
//Filter out any stale Helix workflows which are not running.
if (workflowConfig.getTargetState() != TargetState.START) {
continue;
Expand Down Expand Up @@ -450,4 +462,4 @@ public static void dropInstanceIfExists(HelixAdmin admin, String clusterName, St
log.error("Could not drop instance: {} due to: {}", helixInstanceName, e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,36 @@

import java.io.IOException;
import java.net.URL;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;

import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.token.Token;
import org.apache.helix.task.JobConfig;
import org.apache.helix.task.JobDag;
import org.apache.helix.task.TargetState;
import org.apache.helix.task.TaskConfig;
import org.apache.helix.task.TaskDriver;
import org.apache.helix.task.WorkflowConfig;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import com.google.common.collect.ImmutableMap;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

import org.apache.gobblin.util.ConfigUtils;
import static org.testng.Assert.*;


/**
Expand Down Expand Up @@ -66,18 +80,77 @@ public void testConfigToProperties() {
Assert.assertNotNull(url, "Could not find resource " + url);

Config config = ConfigFactory.parseURL(url).resolve();
Assert.assertEquals(config.getString("k1"), "v1");
Assert.assertEquals(config.getString("k2"), "v1");
Assert.assertEquals(config.getInt("k3"), 1000);
assertEquals(config.getString("k1"), "v1");
assertEquals(config.getString("k2"), "v1");
assertEquals(config.getInt("k3"), 1000);
Assert.assertTrue(config.getBoolean("k4"));
Assert.assertEquals(config.getLong("k5"), 10000);
assertEquals(config.getLong("k5"), 10000);

Properties properties = ConfigUtils.configToProperties(config);
Assert.assertEquals(properties.getProperty("k1"), "v1");
Assert.assertEquals(properties.getProperty("k2"), "v1");
Assert.assertEquals(properties.getProperty("k3"), "1000");
Assert.assertEquals(properties.getProperty("k4"), "true");
Assert.assertEquals(properties.getProperty("k5"), "10000");
assertEquals(properties.getProperty("k1"), "v1");
assertEquals(properties.getProperty("k2"), "v1");
assertEquals(properties.getProperty("k3"), "1000");
assertEquals(properties.getProperty("k4"), "true");
assertEquals(properties.getProperty("k5"), "10000");
}

// Verifies the happy path of HelixUtils.getWorkflowIdsFromJobNames: a single Helix workflow whose
// task config carries the Gobblin job name should map that job name back to the workflow id.
@Test
public void testGetWorkunitIdForJobNames() throws GobblinHelixUnexpectedStateException {
final String HELIX_JOB = "job";
final String GOBBLIN_JOB_NAME = "gobblin-job-name";

// Mock the whole Helix query chain: driver -> workflow config -> job dag -> job config -> task config.
TaskDriver driver = Mockito.mock(TaskDriver.class);
WorkflowConfig workflowCfg = Mockito.mock(WorkflowConfig.class);
JobDag dag = Mockito.mock(JobDag.class);
JobConfig jobCfg = Mockito.mock(JobConfig.class);
TaskConfig taskCfg = Mockito.mock(TaskConfig.class);

/**
 * Mocks for setting up the workflow, job dag, job names, etc.
 *
 * Example of task cfg
 * "mapFields" : {
 * "006d6d2b-4b8b-4c1b-877b-b7fb51d9295c" : {
 * "TASK_SUCCESS_OPTIONAL" : "true",
 * "job.id" : "job_KafkaHdfsStreamingTracking_1668738617409",
 * "job.name" : "KafkaHdfsStreamingTracking",
 * "task.id" : "task_KafkaHdfsStreamingTracking_1668738617409_179",
 * "gobblin.cluster.work.unit.file.path" : "<SOME PATH>",
 * "TASK_ID" : "006d6d2b-4b8b-4c1b-877b-b7fb51d9295c"
 * },
 */
// One workflow visible to the driver.
Mockito.when(driver.getWorkflows()).thenReturn(ImmutableMap.of(
"workflow-1", workflowCfg
));

// TargetState.START keeps the workflow from being filtered out as stale; the dag exposes the single
// Helix job whose task config holds the Gobblin job name under ConfigurationKeys.JOB_NAME_KEY.
Mockito.when(workflowCfg.getTargetState()).thenReturn(TargetState.START);
Mockito.when(workflowCfg.getJobDag()).thenReturn(dag);
Mockito.when(dag.getAllNodes()).thenReturn(new HashSet<>(Arrays.asList(HELIX_JOB)));
Mockito.when(driver.getJobConfig(HELIX_JOB)).thenReturn(jobCfg);
Mockito.when(jobCfg.getTaskConfigMap()).thenReturn(ImmutableMap.of("stub-guid", taskCfg));
Mockito.when(taskCfg.getConfigMap()).thenReturn(ImmutableMap.of(ConfigurationKeys.JOB_NAME_KEY, GOBBLIN_JOB_NAME));

// The Gobblin job name should resolve to the one workflow that contains it.
assertEquals(
HelixUtils.getWorkflowIdsFromJobNames(driver, Arrays.asList(GOBBLIN_JOB_NAME)),
ImmutableMap.of(GOBBLIN_JOB_NAME, "workflow-1"));
}

// Verifies that a null WorkflowConfig returned by Helix surfaces as GobblinHelixUnexpectedStateException
// rather than an NPE, so callers can distinguish a Helix/ZK consistency problem and retry.
@Test(expectedExceptions = GobblinHelixUnexpectedStateException.class)
public void testGetWorkunitIdForJobNamesWithInvalidHelixState() throws GobblinHelixUnexpectedStateException {
  final String GOBBLIN_JOB_NAME = "gobblin-job-name";

  TaskDriver driver = Mockito.mock(TaskDriver.class);

  // Use a HashMap (not ImmutableMap) because the stubbed workflow config must be null,
  // and ImmutableMap rejects null values.
  Map<String, WorkflowConfig> workflowConfigMap = new HashMap<>();
  workflowConfigMap.put("null-workflow-to-throw-exception", null);
  Mockito.when(driver.getWorkflows()).thenReturn(workflowConfigMap);

  // expectedExceptions already asserts the throw; the former try/catch existed only to call
  // e.printStackTrace(), which pollutes test output without adding coverage.
  HelixUtils.getWorkflowIdsFromJobNames(driver, Arrays.asList(GOBBLIN_JOB_NAME));
}

@AfterClass
Expand Down