Commit 71da34b

[GOBBLIN-1716] refactor HighLevelConsumer to make consumer initialization configurable (#3570)

* [GOBBLIN-1716] refactor HighLevelConsumer to make consumer initialization configurable

* allow SpecStoreChangeMonitor to initialize partition and offsets
* create unique group_id for each host's consumer client monitor

* remove extra call to helper function

* rename methods and add comment

* move topic assignment and offset logic here

Co-authored-by: Urmi Mustafi <[email protected]>
umustafi and Urmi Mustafi authored Sep 30, 2022
1 parent fbc11ce commit 71da34b
Showing 3 changed files with 64 additions and 15 deletions.
HighLevelConsumer.java
@@ -123,19 +123,7 @@ public HighLevelConsumer(String topic, Config config, int numThreads) {
this.numThreads = numThreads;
this.config = config.withFallback(FALLBACK);
this.gobblinKafkaConsumerClient = createConsumerClient(this.config);
// On Partition rebalance, commit exisiting offsets and reset.
this.gobblinKafkaConsumerClient.subscribe(this.topic, new GobblinConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<KafkaPartition> partitions) {
copyAndCommit();
partitionOffsetsToCommit.clear();
}

@Override
public void onPartitionsAssigned(Collection<KafkaPartition> partitions) {
// No op
}
});
assignTopicPartitions();
this.consumerExecutor = Executors.newSingleThreadScheduledExecutor(ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("HighLevelConsumerThread")));
this.queueExecutor = Executors.newFixedThreadPool(this.numThreads, ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("QueueProcessor-%d")));
this.queues = new LinkedBlockingQueue[numThreads];
@@ -164,6 +152,27 @@ protected GobblinKafkaConsumerClient createConsumerClient(Config config) {
}
}

/*
The default implementation subscribes to the given topic and relies on Kafka's group management to split the topic's
partitions among all consumers in the group, with each consumer starting from the last committed offset of its
assigned partitions. Override this method to assign partitions and initialize offsets using different logic.
*/
protected void assignTopicPartitions() {
// On Partition rebalance, commit existing offsets and reset.
this.gobblinKafkaConsumerClient.subscribe(this.topic, new GobblinConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<KafkaPartition> partitions) {
copyAndCommit();
partitionOffsetsToCommit.clear();
}

@Override
public void onPartitionsAssigned(Collection<KafkaPartition> partitions) {
// No op
}
});
}

/**
* Called once on {@link #startUp()} to start metrics.
*/
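With the subscribe logic moved into an overridable hook, a subclass can now take over partition assignment entirely. Below is a hedged sketch, not part of this commit, of a consumer that pins every partition of its topic to offset 0. It reuses only the client calls visible in this diff (getGobblinKafkaConsumerClient, getFilteredTopics, assignAndSeek); the generic parameters, the processMessage signature, and the DecodeableKafkaRecord import are assumptions about the surrounding Gobblin code base.

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

import com.google.common.collect.Lists;
import com.typesafe.config.Config;

import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
import org.apache.gobblin.runtime.kafka.HighLevelConsumer;
import org.apache.gobblin.source.extractor.extract.LongWatermark;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic;

// Illustrative subclass only; the class name and generic parameters are hypothetical.
public class FromEarliestConsumer extends HighLevelConsumer<String, String> {

  public FromEarliestConsumer(String topic, Config config, int numThreads) {
    super(topic, config, numThreads);
  }

  @Override
  protected void assignTopicPartitions() {
    // Discover all partitions of the topic, mirroring the lookup this commit
    // performs in SpecStoreChangeMonitor.
    List<KafkaTopic> topics = getGobblinKafkaConsumerClient().getFilteredTopics(
        Collections.emptyList(), Lists.newArrayList(Pattern.compile(this.topic)));
    List<KafkaPartition> partitions = new ArrayList<>();
    Map<KafkaPartition, LongWatermark> offsets = new HashMap<>();
    for (KafkaTopic t : topics) {
      for (KafkaPartition p : t.getPartitions()) {
        partitions.add(p);
        offsets.put(p, new LongWatermark(0L)); // seek every partition to the beginning
      }
    }
    getGobblinKafkaConsumerClient().assignAndSeek(partitions, offsets);
  }

  @Override
  protected void processMessage(DecodeableKafkaRecord<String, String> record) {
    // Application-specific handling of each consumed record goes here.
  }
}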
RuntimeMetrics.java
@@ -39,7 +39,7 @@ public class RuntimeMetrics {
public static final String GOBBLIN_SPEC_STORE_MONITOR_SUCCESSFULLY_ADDED_SPECS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.specStoreMonitor.successful.added.specs";
public static final String GOBBLIN_SPEC_STORE_MONITOR_FAILED_ADDED_SPECS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.specStoreMonitor.failed.added.specs";
public static final String GOBBLIN_SPEC_STORE_MONITOR_DELETED_SPECS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.specStoreMonitor.deleted.specs";
public static final String GOBBLIN_SPEC_STORE_MONITOR_UNEXPECTED_ERRORS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.specStoreMonitor.unexpected.errorss";
public static final String GOBBLIN_SPEC_STORE_MONITOR_UNEXPECTED_ERRORS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.specStoreMonitor.unexpected.errors";

// Metadata keys
public static final String TOPIC = "topic";
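The one-line change above simply corrects a misspelled metric name ("...errorss" to "...errors"). For context, here is a hedged sketch of how a monitor might register and use such a constant; contextAwareMeter is believed to be the gobblin-metrics MetricContext API, but the import paths and wiring are illustrative, not from this commit.

import org.apache.gobblin.metrics.ContextAwareMeter;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.runtime.metrics.RuntimeMetrics;

// Hypothetical holder class; only the RuntimeMetrics constant comes from this diff.
public class MonitorMetricsDemo {
  private final ContextAwareMeter unexpectedErrors;

  public MonitorMetricsDemo(MetricContext metricContext) {
    // Meters registered under the old constant were published with the
    // misspelled "...errorss" suffix; this uses the corrected name.
    this.unexpectedErrors = metricContext.contextAwareMeter(
        RuntimeMetrics.GOBBLIN_SPEC_STORE_MONITOR_UNEXPECTED_ERRORS);
  }

  public void markUnexpectedError() {
    this.unexpectedErrors.mark();
  }
}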
SpecStoreChangeMonitor.java
@@ -19,16 +19,25 @@

import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;

import org.apache.commons.text.StringEscapeUtils;

import com.codahale.metrics.Meter;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.Lists;
import com.google.inject.Inject;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigValueFactory;

import lombok.extern.slf4j.Slf4j;

@@ -40,6 +49,10 @@
import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
import org.apache.gobblin.source.extractor.extract.LongWatermark;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaOffsetRetrievalFailureException;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic;


/**
@@ -74,7 +87,34 @@ public String load(String key) throws Exception {
protected GobblinServiceJobScheduler scheduler;

public SpecStoreChangeMonitor(String topic, Config config, int numThreads) {
super(topic, config, numThreads);
// Differentiate group id for each host
super(topic, config.withValue(GROUP_ID_KEY,
ConfigValueFactory.fromAnyRef(SPEC_STORE_CHANGE_MONITOR_PREFIX + UUID.randomUUID().toString())),
numThreads);
}

@Override
protected void assignTopicPartitions() {
// The consumer client assigns itself to all partitions of this topic and consumes from each partition's latest
// offset (falling back to the earliest offset if retrieval fails).
List<KafkaTopic> kafkaTopicList = this.getGobblinKafkaConsumerClient().getFilteredTopics(Collections.emptyList(),
Lists.newArrayList(Pattern.compile(this.topic)));

List<KafkaPartition> kafkaPartitions = new ArrayList<>();
for (KafkaTopic topic : kafkaTopicList) {
kafkaPartitions.addAll(topic.getPartitions());
}

Map<KafkaPartition, LongWatermark> partitionLongWatermarkMap = new HashMap<>();
for (KafkaPartition partition : kafkaPartitions) {
try {
partitionLongWatermarkMap.put(partition, new LongWatermark(this.getGobblinKafkaConsumerClient().getLatestOffset(partition)));
} catch (KafkaOffsetRetrievalFailureException e) {
log.warn("Failed to retrieve latest Kafka offset; consuming from the beginning for partition {}", partition, e);
partitionLongWatermarkMap.put(partition, new LongWatermark(0L));
}
}
this.getGobblinKafkaConsumerClient().assignAndSeek(kafkaPartitions, partitionLongWatermarkMap);
}

@Override
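Two details in this file are worth spelling out. The constructor appends a random UUID to the group id, so every host lands in its own Kafka consumer group and receives every message on the change topic instead of sharing partitions with other hosts; combined with the latest-offset seek in assignTopicPartitions(), a newly started host then reacts only to spec-store changes made after it came up. Below is a minimal standalone sketch of the group-id technique with the Typesafe Config API; the literal "group.id" key and the prefix string are assumptions, since the class resolves them through GROUP_ID_KEY and SPEC_STORE_CHANGE_MONITOR_PREFIX.

import java.util.UUID;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;

public class GroupIdDemo {
  public static void main(String[] args) {
    // A shared base config, as every host would receive it.
    Config base = ConfigFactory.parseString("group.id = shared-monitor-group");
    // Overriding with a per-host UUID puts each host in its own consumer group,
    // so Kafka delivers all partitions of the topic to every host.
    Config perHost = base.withValue("group.id",
        ConfigValueFactory.fromAnyRef("specStoreChangeMonitor-" + UUID.randomUUID()));
    System.out.println(perHost.getString("group.id"));
  }
}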
