Merge remote-tracking branch 'upstream/master'
* upstream/master:
  Refactor `MysqlSpecStore` into a generalization, `MysqlNonFlowSpecStore` (not limited to `FlowSpec`s), also useable for `TopologySpec`s (apache#3414)
  [GOBBLIN-1563]Collect more information to analyze the RC for some job cannot emit kafka events to update job status (apache#3416)
  [GOBBLIN-1521] Create local mode of streaming kafka job to help user quickly onboard (apache#3372)
  [GOBBLIN-1559] Support wildcard for input paths (apache#3410)
  [GOBBLIN-1561]Improve error message when flow compilation fails (apache#3412)
  [GOBBLIN-1556]Add shutdown logic in FsJobConfigurationManager (apache#3407)
  [GOBBLIN-1542] Integrate with Helix API to add/remove task from a running helix job (apache#3393)
phet committed Oct 26, 2021
2 parents 384dc09 + 255bdc1 commit 045af8e
Showing 40 changed files with 1,403 additions and 536 deletions.
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gobblin.source;

import com.google.common.eventbus.EventBus;
import org.apache.gobblin.annotation.Alpha;


/**
 * An interface for an infinite (streaming) source, which should be able to detect work unit changes
 * and post them through the {@link EventBus}.
 *
 * @author Zihan Li
 *
 * @param <S> output schema type
 * @param <D> output record type
 */
@Alpha
public interface InfiniteSource<S, D> extends Source<S, D> {

/**
 * Returns the {@link EventBus} on which the source will post {@link org.apache.gobblin.stream.WorkUnitChangeEvent}s when work units change.
 */
EventBus getEventBus();

}
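For orientation, here is a minimal sketch of what an implementor of this new interface might look like. It assumes the standard Gobblin Source contract (getWorkunits/getExtractor/shutdown); the class name, package, and the onWorkUnitsChanged hook are hypothetical and not part of this commit.

package org.apache.gobblin.example; // hypothetical package, for illustration only

import java.io.IOException;
import java.util.Collections;
import java.util.List;

import com.google.common.eventbus.EventBus;

import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.source.InfiniteSource;
import org.apache.gobblin.source.extractor.Extractor;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.stream.WorkUnitChangeEvent;

public class ExampleInfiniteSource implements InfiniteSource<String, String> {

  private final EventBus eventBus = new EventBus(ExampleInfiniteSource.class.getSimpleName());

  @Override
  public List<WorkUnit> getWorkunits(SourceState state) {
    // Initial work-unit assignment; a real source would derive this from, e.g., Kafka partitions.
    return Collections.emptyList();
  }

  @Override
  public Extractor<String, String> getExtractor(WorkUnitState state) throws IOException {
    throw new UnsupportedOperationException("Extractor creation is omitted from this sketch");
  }

  @Override
  public void shutdown(SourceState state) {
    // Nothing to clean up in this sketch.
  }

  @Override
  public EventBus getEventBus() {
    return this.eventBus;
  }

  // Invoked by whatever change-detection loop the source runs internally (hypothetical hook).
  void onWorkUnitsChanged(List<String> staleTaskIds, List<WorkUnit> replacementWorkUnits) {
    this.eventBus.post(new WorkUnitChangeEvent(staleTaskIds, replacementWorkUnits));
  }
}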
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.stream;

import java.util.List;
import lombok.Getter;
import org.apache.gobblin.source.workunit.WorkUnit;

/**
 * The event posted by an {@link org.apache.gobblin.source.InfiniteSource} to indicate that its work units have changed.
 * The job launcher should then be able to handle this event.
 */
public class WorkUnitChangeEvent {
@Getter
private final List<String> oldTaskIds;
@Getter
private final List<WorkUnit> newWorkUnits;
public WorkUnitChangeEvent(List<String> oldTaskIds, List<WorkUnit> newWorkUnits) {
this.oldTaskIds = oldTaskIds;
this.newWorkUnits = newWorkUnits;
}
}
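On the consuming side, a sketch of how a launcher-side component might subscribe to these events via the source's EventBus; the listener class below is hypothetical, while the actual handling lives in Gobblin's cluster/launcher code.

import java.util.List;

import com.google.common.eventbus.Subscribe;

import org.apache.gobblin.source.InfiniteSource;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.stream.WorkUnitChangeEvent;

public class WorkUnitChangeListenerSketch {

  // Register on the bus exposed by the source; Guava routes WorkUnitChangeEvent instances
  // to the @Subscribe method below.
  public void attachTo(InfiniteSource<?, ?> source) {
    source.getEventBus().register(this);
  }

  @Subscribe
  public void handleWorkUnitChange(WorkUnitChangeEvent event) {
    List<String> staleTaskIds = event.getOldTaskIds();
    List<WorkUnit> replacementWorkUnits = event.getNewWorkUnits();
    // A real launcher would remove the stale tasks and schedule the new work units here,
    // e.g. via the Helix add/delete task helpers introduced elsewhere in this commit.
    System.out.println("Replacing tasks " + staleTaskIds + " with " + replacementWorkUnits.size() + " new work units");
  }
}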
4 changes: 3 additions & 1 deletion gobblin-cluster/build.gradle
@@ -47,7 +47,9 @@ dependencies {
compile externalDependency.hadoopCommon
compile externalDependency.avroMapredH2
compile externalDependency.findBugsAnnotations
compile externalDependency.helix
compile (externalDependency.helix) {
exclude group: 'io.dropwizard.metrics', module: 'metrics-core'
}

runtimeOnly project(":gobblin-modules:gobblin-service-kafka")

@@ -88,6 +88,12 @@ public void run() {
}, 0, this.refreshIntervalInSeconds, TimeUnit.SECONDS);
}

@Override
protected void shutDown() throws Exception {
ExecutorsUtils.shutdownExecutorService(this.fetchJobSpecExecutor, Optional.of(log));
super.shutDown();
}

void fetchJobSpecs() throws ExecutionException, InterruptedException {
List<Pair<SpecExecutor.Verb, JobSpec>> jobSpecs =
(List<Pair<SpecExecutor.Verb, JobSpec>>) this.specConsumer.changedSpecs().get();

Large diffs are not rendered by default.

@@ -36,6 +36,7 @@
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.task.JobConfig;
import org.apache.helix.task.JobContext;
import org.apache.helix.task.TargetState;
import org.apache.helix.task.TaskConfig;
import org.apache.helix.task.TaskDriver;
@@ -145,6 +146,33 @@ static void waitJobInitialization(
log.info("Work flow {} initialized", workFlowName);
}

protected static boolean deleteTaskFromHelixJob(String workFlowName,
String jobName, String taskID, TaskDriver helixTaskDriver) {
try {
log.info(String.format("try to delete task %s from workflow %s, job %s", taskID, workFlowName, jobName));
helixTaskDriver.deleteTask(workFlowName, jobName, taskID);
} catch (Exception e) {
e.printStackTrace();
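// The delete call failed; report success only if the task is already absent from the job config.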
return !helixTaskDriver.getJobConfig(TaskUtil.getNamespacedJobName(workFlowName, jobName)).getMapConfigs().containsKey(taskID);
}
return true;
}

protected static boolean addTaskToHelixJob(String workFlowName,
String jobName, TaskConfig taskConfig, TaskDriver helixTaskDriver) {
String taskId = taskConfig.getId();
try {
log.info(String.format("try to add task %s to workflow %s, job %s", taskId, workFlowName, jobName));
helixTaskDriver.addTask(workFlowName, jobName, taskConfig);
} catch (Exception e) {
e.printStackTrace();
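// The add call failed; report success only if the task already appears in the job context.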
JobContext jobContext =
helixTaskDriver.getJobContext(TaskUtil.getNamespacedJobName(workFlowName, jobName));
return jobContext.getTaskIdPartitionMap().containsKey(taskId);
}
return true;
}

public static void submitJobToWorkFlow(JobConfig.Builder jobConfigBuilder,
String workFlowName,
String jobName,
@@ -29,12 +29,12 @@
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.ZNRecord;
import org.apache.helix.manager.zk.ChainedPathZkSerializer;
import org.apache.helix.manager.zk.PathBasedZkSerializer;
import org.apache.helix.manager.zk.ZNRecordStreamingSerializer;
import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.task.TargetState;
import org.apache.helix.task.TaskDriver;
import org.apache.helix.zookeeper.datamodel.serializer.ChainedPathZkSerializer;
import org.apache.helix.zookeeper.zkclient.serialize.PathBasedZkSerializer;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;
@@ -89,8 +89,8 @@ void testJobShouldGetCancelled() throws Exception {
IntegrationJobCancelSuite.TASK_STATE_FILE)
.withValue(SleepingTask.SLEEP_TIME_IN_SECONDS, ConfigValueFactory.fromAnyRef(100));
this.suite = new IntegrationJobCancelSuite(jobConfigOverrides);
HelixManager helixManager = getHelixManager();
suite.startCluster();
HelixManager helixManager = getHelixManager();
helixManager.connect();

ExecutorService executor = Executors.newSingleThreadExecutor();
@@ -210,12 +210,13 @@ public void testTaskAssignmentAfterHelixConnectionRetry()
Config jobConfigOverrides = ClusterIntegrationTestUtils.buildSleepingJob(JOB_ID, TASK_STATE_FILE);
this.suite = new TaskAssignmentAfterConnectionRetry(jobConfigOverrides);

suite.startCluster();

String zkConnectString = suite.getManagerConfig().getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
String clusterName = suite.getManagerConfig().getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
//A test manager instance for observing the state of the cluster
HelixManager helixManager = HelixManagerFactory.getZKHelixManager(clusterName, "TestManager", InstanceType.SPECTATOR, zkConnectString);

suite.startCluster();

helixManager.connect();

@@ -50,16 +50,16 @@ public void setUp()
suite = new IntegrationBasicSuite(jobConfigOverrides);

helixConfig = suite.getManagerConfig();
String clusterName = helixConfig.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
String zkConnectString = helixConfig.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
helixManager = HelixManagerFactory.getZKHelixManager(clusterName, "TestManager",
InstanceType.SPECTATOR, zkConnectString);
}

@Test (groups = {"disabledOnCI"})
//Test disabled on Travis because cluster integration tests are generally flaky on Travis.
public void testExecute() throws Exception {
suite.startCluster();
String clusterName = helixConfig.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
String zkConnectString = helixConfig.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
helixManager = HelixManagerFactory.getZKHelixManager(clusterName, "TestManager",
InstanceType.SPECTATOR, zkConnectString);

//Connect to the previously started Helix cluster
helixManager.connect();
@@ -0,0 +1,31 @@
# Table of Contents

[TOC]

# Introduction

Gobblin supports a streaming mode that allows continuous ingestion of data from Kafka to HDFS. The streaming mode has been deployed in production at LinkedIn as a Gobblin cluster that uses YARN for container allocation and Helix for task coordination.

Here, we describe how to set up a Kafka -> HDFS pipeline in local mode for users to easily start and test out a streaming ingestion pipeline.


# Set up a local Kafka cluster

Follow the [Kafka quick start](https://kafka.apache.org/quickstart) to set up your Kafka cluster, and create the test topic "testEvents".

# Run EmbeddedGobblin to start the job

We use the configuration file /gobblin-modules/gobblin-kafka-09/src/test/resources/kafkaHDFSStreaming.conf to execute the job.

To run the job in IntelliJ, run the test in /gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/KafkaStreamingLocalTest
after removing the line '(enabled=false)'. To run the test in the IDE, you may also need to manually remove the log4j-over-slf4j jar from the classpath.
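The '(enabled=false)' mentioned above is the standard TestNG annotation attribute that keeps the test from running in automated builds; a sketch of the toggle is shown below (class and method names are illustrative, not the actual test's).

import org.testng.annotations.Test;

public class ExampleStreamingLocalTest {

  // Checked in as @Test(enabled = false) so automated builds skip it;
  // drop the attribute to let TestNG run the test from your IDE.
  @Test
  public void testStreamingIngestionLocally() throws Exception {
    // See KafkaStreamingLocalTest for the actual setup.
  }
}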

From your Kafka directory, run the following command to produce data into your Kafka topic:

`bin/kafka-console-producer.sh --topic testEvents --bootstrap-server localhost:9092`

The job will continually consume from testEvents and write the data out as text files to your local file system (/tmp/gobblin/kafka/publish). It publishes data every 60 seconds and runs until you manually kill it.

If you want the job to ingest data as Avro/ORC, you will need a schema registry as the schema source and must change the job configuration to control that behavior; a sample configuration can be found [here](https://github.com/apache/gobblin/blob/master/gobblin-modules/gobblin-azkaban/src/main/resources/conf/gobblin_jobs/kafka-hdfs-streaming-avro.conf).

@@ -33,9 +33,6 @@

import javax.annotation.Nullable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Histogram;
@@ -78,12 +75,12 @@ public abstract class EventReporter extends ScheduledReporter implements Closeable
protected static final Joiner JOINER = Joiner.on('.').skipNulls();
protected static final String METRIC_KEY_PREFIX = "gobblin.metrics";
protected static final String EVENTS_QUALIFIER = "events";
private static final Logger LOGGER = LoggerFactory.getLogger(EventReporter.class);
public static final int DEFAULT_QUEUE_CAPACITY = 100;
public static final String QUEUE_CAPACITY_KEY = ConfigurationKeys.METRICS_REPORTING_EVENTS_CONFIGURATIONS_PREFIX + ".queue.capacity";
public static final int DEFAULT_QUEUE_OFFER_TIMEOUT_SECS = 10;
public static final String QUEUE_OFFER_TIMOUT_SECS_KEY = ConfigurationKeys.METRICS_REPORTING_EVENTS_CONFIGURATIONS_PREFIX + ".queue.offer.timeout.secs";
private static final String NULL_STRING = "null";
public static final int REPORT_TIMEOUT_SECS = 60;

private final MetricContext metricContext;
private final BlockingQueue<GobblinTrackingEvent> reportingQueue;
@@ -108,7 +105,7 @@ public EventReporter(Builder builder) {
this.closer = Closer.create();
this.immediateReportExecutor = MoreExecutors.getExitingExecutorService(
(ThreadPoolExecutor) Executors.newFixedThreadPool(1,
ExecutorsUtils.newThreadFactory(Optional.of(LOGGER), Optional.of("EventReporter-" + builder.name + "-%d"))),
ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("EventReporter-" + builder.name + "-%d"))),
5, TimeUnit.MINUTES);

this.metricContext = builder.context;
@@ -144,9 +141,11 @@ public void notificationCallback(Notification notification) {
*/
public void addEventToReportingQueue(GobblinTrackingEvent event) {
if (this.reportingQueue.size() > this.queueCapacity * 2 / 3) {
log.info("Trigger immediate run to report the event since queue is almost full");
immediatelyScheduleReport();
}
try {
log.debug(String.format("Offering one event to the metrics queue with event name: %s", event.getName()));
if (!this.reportingQueue.offer(sanitizeEvent(event), this.queueOfferTimeoutSecs, TimeUnit.SECONDS)) {
log.error("Enqueuing of event {} at reporter with class {} timed out. Sending of events is probably stuck.",
event, this.getClass().getCanonicalName());
@@ -235,11 +234,14 @@ public T withConfig(Config config) {
@Override
public void close() {
try {
log.info(String.format("Closing event reporter %s", this.getClass().getCanonicalName()));
this.metricContext.removeNotificationTarget(this.notificationTargetKey);
this.immediateReportExecutor.awaitTermination(REPORT_TIMEOUT_SECS, TimeUnit.SECONDS);
log.info(String.format("Flush out %s events before closing the reporter", this.reportingQueue.size()));
report();
this.closer.close();
} catch (Exception e) {
LOGGER.warn("Exception when closing EventReporter", e);
log.warn("Exception when closing EventReporter", e);
} finally {
super.close();
}
@@ -14,7 +14,7 @@ fork.record.queue.capacity=1

# Streaming-source specific configurations
source.class=org.apache.gobblin.source.extractor.extract.kafka.UniversalKafkaSource
gobblin.source.kafka.extractorType=org.apache.gobblin.prototype.kafka.KafkaAvroBinaryStreamingExtractor
gobblin.source.kafka.extractorType=org.apache.gobblin.source.extractor.extract.kafka.KafkaStreamingExtractor
kafka.workunit.size.estimator.type=CUSTOM
kafka.workunit.size.estimator.customizedType=org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.UnitKafkaWorkUnitSizeEstimator
kafka.workunit.packer.type=CUSTOM
@@ -27,9 +27,10 @@ streaming.watermark.commitIntervalMillis=2000

# Converter configs
# Default Generic Record based pipeline
recordStreamProcessor.classes="org.apache.gobblin.prototype.kafka.GenericRecordBasedKafkaSchemaChangeInjector,org.apache.gobblin.prototype.kafka.LiKafkaConsumerRecordToGenericRecordConverter"
recordStreamProcessor.classes="org.apache.gobblin.converter.GenericRecordBasedKafkaSchemaChangeInjector"

# Record-metadata decoration into main record
# This is not supported in OSS yet, since decoration would require rebuilding the generic record, which is expensive
gobblin.kafka.converter.recordMetadata.enable=true

# Writer configs
@@ -32,7 +32,6 @@
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.commons.lang.ArrayUtils;
import org.apache.hadoop.io.Text;
import org.apache.helix.AccessOption;
@@ -47,6 +46,7 @@

import org.apache.gobblin.configuration.State;
import org.apache.gobblin.util.io.StreamUtils;
import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;


/**
@@ -20,13 +20,15 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.stream.Collectors;

import org.apache.gobblin.source.extractor.extract.LongWatermark;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -144,18 +146,36 @@ public KafkaTopic apply(Entry<String, List<PartitionInfo>> filteredTopicEntry) {
public long getEarliestOffset(KafkaPartition partition) throws KafkaOffsetRetrievalFailureException {
TopicPartition topicPartition = new TopicPartition(partition.getTopicName(), partition.getId());
this.consumer.assign(Collections.singletonList(topicPartition));
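// Remember the current position, seek to the beginning to read the earliest offset, then restore the original position.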
long previousPosition = this.consumer.position(topicPartition);
this.consumer.seekToBeginning(topicPartition);
long earliestOffset = this.consumer.position(topicPartition);
this.consumer.seek(topicPartition, previousPosition);

return this.consumer.position(topicPartition);
return earliestOffset;
}

@Override
public long getLatestOffset(KafkaPartition partition) throws KafkaOffsetRetrievalFailureException {
TopicPartition topicPartition = new TopicPartition(partition.getTopicName(), partition.getId());
this.consumer.assign(Collections.singletonList(topicPartition));
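// Remember the current position, seek to the end to read the latest offset, then restore the original position.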
long previousPosition = this.consumer.position(topicPartition);
this.consumer.seekToEnd(topicPartition);
long latestOffset = this.consumer.position(topicPartition);
this.consumer.seek(topicPartition, previousPosition);

return this.consumer.position(topicPartition);
return latestOffset;
}

@Override
public void assignAndSeek(List<KafkaPartition> topicPartitions,
Map<KafkaPartition, LongWatermark> topicWatermarksMap) {
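// Assign each requested partition and seek the consumer to the offset recorded in that partition's watermark.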
HashSet<KafkaPartition> topicPartitionSet = new HashSet(topicPartitions);
topicWatermarksMap.entrySet().stream().filter(entry -> topicPartitionSet.contains(entry.getKey()))
.forEach(entry -> {
TopicPartition topicPartition = new TopicPartition(entry.getKey().getTopicName(), entry.getKey().getId());
this.consumer.assign(Collections.singletonList(topicPartition));
this.consumer.seek(topicPartition, entry.getValue().getValue());
});
}

@Override