Skip to content

Commit

Permalink
[GOBBLIN-1744] Improve logging in null cases when querying from Helix
Browse files Browse the repository at this point in the history
  • Loading branch information
homatthew committed Nov 18, 2022
1 parent deec15e commit ea6d6e8
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,16 +139,24 @@ public void execute() throws CommitStepException {

if (jobContext != null) {
String participant = jobContext.getAssignedParticipant(partitionNum);
if (participant != null) {
boolean isAssignedParticipant = participant.equalsIgnoreCase(helixInstanceName);
if (!isAssignedParticipant) {
log.info("The current helix instance is not the assigned participant. helixInstanceName={}, assignedParticipant={}",
helixInstanceName, participant);
}

return isAssignedParticipant;
if (participant == null) {
log.error("The current assigned participant is null. This implies that \n"
+ "\t\t(a)Helix failed to write to zookeeper, which is often caused by lack of compression leading / exceeding zookeeper jute max buffer size (Default 1MB)\n"
+ "\t\t(b)Helix reassigned the task (unlikely if this current task has been running without issue. Helix does not have code for reassigning \"running\" tasks)\n"
+ "\t\tNote: This logic is true as of Helix version 1.0.2 and ZK version 3.6");

return false;
}

boolean isAssignedParticipant = participant.equalsIgnoreCase(helixInstanceName);
if (!isAssignedParticipant) {
log.info("The current helix instance is not the assigned participant. helixInstanceName={}, assignedParticipant={}",
helixInstanceName, participant);
}

return isAssignedParticipant;
}

return false;
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,13 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.JobException;
import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.runtime.listeners.JobListener;
import org.apache.gobblin.util.Id;
import org.apache.gobblin.util.JobLauncherUtils;
import org.apache.helix.HelixAdmin;
Expand All @@ -54,13 +57,7 @@
import org.apache.helix.task.WorkflowContext;
import org.apache.helix.tools.ClusterSetup;

import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.runtime.JobException;
import org.apache.gobblin.runtime.listeners.JobListener;

import static org.apache.helix.task.TaskState.STOPPED;
import static org.apache.helix.task.TaskState.*;


/**
Expand Down Expand Up @@ -395,11 +392,22 @@ private static void deleteStoppedHelixJob(HelixManager helixManager, String work
* @return a map from jobNames to their Helix Workflow Ids.
*/
public static Map<String, String> getWorkflowIdsFromJobNames(HelixManager helixManager, Collection<String> jobNames) {
Map<String, String> jobNameToWorkflowId = new HashMap<>();
TaskDriver taskDriver = new TaskDriver(helixManager);
return getWorkflowIdsFromJobNames(taskDriver, jobNames);
}

public static Map<String, String> getWorkflowIdsFromJobNames(TaskDriver taskDriver, Collection<String> jobNames) {
Map<String, String> jobNameToWorkflowId = new HashMap<>();
Map<String, WorkflowConfig> workflowConfigMap = taskDriver.getWorkflows();
for (String workflow : workflowConfigMap.keySet()) {
WorkflowConfig workflowConfig = taskDriver.getWorkflowConfig(workflow);
for (Map.Entry<String, WorkflowConfig> entry : workflowConfigMap.entrySet()) {
String workflow = entry.getKey();
WorkflowConfig workflowConfig = entry.getValue();
if (workflowConfig == null) {
// As of the Helix 1.0.2 implementation, this in theory won't happen. But this null check is here in case the implementation changes:
// the API doesn't technically prohibit null configs, whether a map allows null values is implementation-dependent, and we want to fail gracefully
log.error("Workflow config is null. Skipping the following workflow while searching for workflow ID. workflow={}, jobNames={}", workflow, jobNames);
continue;
}
//Filter out any stale Helix workflows which are not running.
if (workflowConfig.getTargetState() != TargetState.START) {
continue;
Expand Down Expand Up @@ -450,4 +458,4 @@ public static void dropInstanceIfExists(HelixAdmin admin, String clusterName, St
log.error("Could not drop instance: {} due to: {}", helixInstanceName, e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,34 @@

import java.io.IOException;
import java.net.URL;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Properties;

import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.token.Token;
import org.apache.helix.task.JobConfig;
import org.apache.helix.task.JobDag;
import org.apache.helix.task.TargetState;
import org.apache.helix.task.TaskConfig;
import org.apache.helix.task.TaskDriver;
import org.apache.helix.task.WorkflowConfig;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import com.google.common.collect.ImmutableMap;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

import org.apache.gobblin.util.ConfigUtils;
import static org.testng.Assert.*;


/**
Expand Down Expand Up @@ -66,18 +78,59 @@ public void testConfigToProperties() {
Assert.assertNotNull(url, "Could not find resource " + url);

Config config = ConfigFactory.parseURL(url).resolve();
Assert.assertEquals(config.getString("k1"), "v1");
Assert.assertEquals(config.getString("k2"), "v1");
Assert.assertEquals(config.getInt("k3"), 1000);
assertEquals(config.getString("k1"), "v1");
assertEquals(config.getString("k2"), "v1");
assertEquals(config.getInt("k3"), 1000);
Assert.assertTrue(config.getBoolean("k4"));
Assert.assertEquals(config.getLong("k5"), 10000);
assertEquals(config.getLong("k5"), 10000);

Properties properties = ConfigUtils.configToProperties(config);
Assert.assertEquals(properties.getProperty("k1"), "v1");
Assert.assertEquals(properties.getProperty("k2"), "v1");
Assert.assertEquals(properties.getProperty("k3"), "1000");
Assert.assertEquals(properties.getProperty("k4"), "true");
Assert.assertEquals(properties.getProperty("k5"), "10000");
assertEquals(properties.getProperty("k1"), "v1");
assertEquals(properties.getProperty("k2"), "v1");
assertEquals(properties.getProperty("k3"), "1000");
assertEquals(properties.getProperty("k4"), "true");
assertEquals(properties.getProperty("k5"), "10000");
}

@Test
public void testGetWorkunitIdForJobNames() {
  final String helixJobName = "job";
  final String gobblinJobName = "gobblin-job-name";

  // Collaborators are mocked: a single workflow ("workflow-1") whose dag contains one
  // Helix job, whose task config carries the Gobblin job name we will look up.
  TaskDriver taskDriver = Mockito.mock(TaskDriver.class);
  WorkflowConfig workflowConfig = Mockito.mock(WorkflowConfig.class);
  JobDag jobDag = Mockito.mock(JobDag.class);
  JobConfig jobConfig = Mockito.mock(JobConfig.class);
  TaskConfig taskConfig = Mockito.mock(TaskConfig.class);

  // Shape of a real task config map, for reference:
  //   "mapFields" : {
  //     "006d6d2b-4b8b-4c1b-877b-b7fb51d9295c" : {
  //       "TASK_SUCCESS_OPTIONAL" : "true",
  //       "job.id" : "job_KafkaHdfsStreamingTracking_1668738617409",
  //       "job.name" : "KafkaHdfsStreamingTracking",
  //       "task.id" : "task_KafkaHdfsStreamingTracking_1668738617409_179",
  //       "gobblin.cluster.work.unit.file.path" : "<SOME PATH>",
  //       "TASK_ID" : "006d6d2b-4b8b-4c1b-877b-b7fb51d9295c"
  //     },
  Mockito.when(taskDriver.getWorkflows())
      .thenReturn(ImmutableMap.of("workflow-1", workflowConfig));
  Mockito.when(workflowConfig.getTargetState()).thenReturn(TargetState.START);
  Mockito.when(workflowConfig.getJobDag()).thenReturn(jobDag);
  Mockito.when(jobDag.getAllNodes()).thenReturn(new HashSet<>(Arrays.asList(helixJobName)));
  Mockito.when(taskDriver.getJobConfig(helixJobName)).thenReturn(jobConfig);
  Mockito.when(jobConfig.getTaskConfigMap()).thenReturn(ImmutableMap.of("stub-guid", taskConfig));
  Mockito.when(taskConfig.getConfigMap())
      .thenReturn(ImmutableMap.of(ConfigurationKeys.JOB_NAME_KEY, gobblinJobName));

  // The lookup should resolve the Gobblin job name to the workflow that owns it.
  assertEquals(
      HelixUtils.getWorkflowIdsFromJobNames(taskDriver, Arrays.asList(gobblinJobName)),
      ImmutableMap.of(gobblinJobName, "workflow-1"));
}

@AfterClass
Expand Down

0 comments on commit ea6d6e8

Please sign in to comment.