Merge remote-tracking branch 'origin/master' into async-termination
m50d committed Feb 18, 2021
2 parents 29b547e + 4254ae9 commit f26b7ec
Showing 7 changed files with 126 additions and 47 deletions.
25 changes: 25 additions & 0 deletions .github/workflows/ci.yml
@@ -0,0 +1,25 @@
name: CI
on:
push:
branches:
- master
pull_request:
branches:
- master
jobs:
test:
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
java: [8, 11]
steps:
- uses: actions/checkout@v2
- name: Setup java
uses: actions/setup-java@v1
with:
java-version: ${{ matrix.java }}
- name: Execute test
uses: eskatos/gradle-command-action@v1
with:
arguments: build integrationTest
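
Assuming the repository ships the Gradle wrapper (the default setup that gradle-command-action uses), the same checks can be reproduced locally with ./gradlew build integrationTest.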
17 changes: 0 additions & 17 deletions .travis.yml

This file was deleted.

2 changes: 1 addition & 1 deletion README.md
@@ -1,7 +1,7 @@
Decaton
=======

-[![Build Status](https://travis-ci.com/line/decaton.svg?branch=master)](https://travis-ci.com/line/decaton)
+[![Build Status](https://github.com/line/decaton/workflows/CI/badge.svg?branch=master)](https://github.com/line/decaton/actions?query=workflow%3ACI+branch%3Amaster+event%3Apush)

Decaton is a streaming task processing framework built on top of [Apache Kafka](https://kafka.apache.org/).
It is designed to enable "concurrent processing of records consumed from one partition" which isn't possible in many Kafka consumer frameworks.
2 changes: 1 addition & 1 deletion gradle.properties
@@ -15,4 +15,4 @@
#

snapshot=true
-version=1.0.1
+version=1.0.2
ConsumeManager.java
@@ -16,9 +16,10 @@

package com.linecorp.decaton.processor.runtime.internal;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.consumer.Consumer;
@@ -117,7 +118,7 @@ public ConsumeManager(Consumer<String, byte[]> consumer,
* @param subscribeTopics list of topics to fetch records from.
*/
public void init(Collection<String> subscribeTopics) {
-List<TopicPartition> pausedPartitions = new ArrayList<>();
+Set<TopicPartition> pausedPartitions = new HashSet<>();
consumer.subscribe(subscribeTopics, new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
@@ -139,6 +140,7 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// Consumer rebalance resets all pause states of assigned partitions even though they
// haven't moved over from/to a different consumer instance.
// Need to re-call pause with the originally paused partitions to restore a consistent state.
+pausedPartitions.retainAll(partitions);
try {
consumer.pause(pausedPartitions);
} finally {
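
The fix works because of two Kafka consumer behaviors: a rebalance clears all pause state, and pause() may only be called with partitions currently assigned to the consumer (KafkaConsumer throws IllegalStateException otherwise). Below is a minimal, self-contained sketch of the same pattern; the class name is illustrative and not part of Decaton:

import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

// Illustrative sketch (not Decaton's actual listener) of preserving
// pause state across a consumer rebalance.
class PausePreservingRebalanceListener implements ConsumerRebalanceListener {
    private final Consumer<?, ?> consumer;
    private final Set<TopicPartition> pausedPartitions = new HashSet<>();

    PausePreservingRebalanceListener(Consumer<?, ?> consumer) {
        this.consumer = consumer;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Snapshot the paused set before the rebalance wipes it.
        pausedPartitions.addAll(consumer.paused());
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Keep only partitions this consumer still owns; KafkaConsumer.pause
        // throws IllegalStateException for partitions that are not assigned.
        pausedPartitions.retainAll(partitions);
        consumer.pause(pausedPartitions);
        pausedPartitions.clear();
    }
}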
ProcessorSubscriptionTest.java
@@ -23,17 +23,18 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
@@ -52,11 +53,10 @@
import org.apache.kafka.common.TopicPartition;
import org.junit.Rule;
import org.junit.Test;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
+import org.mockito.stubbing.Answer;

import com.linecorp.decaton.processor.DecatonProcessor;
import com.linecorp.decaton.processor.DeferredCompletion;
Expand All @@ -74,9 +74,6 @@ public class ProcessorSubscriptionTest {
@Mock
Consumer<String, byte[]> consumer;

-@Captor
-ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> offsetsCaptor;

/**
* A mock consumer which exposes its rebalance listener so that it can be triggered manually
* ({@link MockConsumer} doesn't simulate rebalance listener invocation; refs: KAFKA-6968).
@@ -110,15 +107,21 @@ private static SubscriptionScope scope(String topic, long waitForProcessingOnClo

private static ProcessorSubscription subscription(Consumer<String, byte[]> consumer,
SubscriptionStateListener listener,
-TopicPartition tp) {
+TopicPartition tp,
+DecatonProcessor<String> processor) {
SubscriptionScope scope = scope(tp.topic(), 0L);
+ProcessorsBuilder<String> builder =
+ProcessorsBuilder.consuming(scope.topic(),
+(byte[] bytes) -> new DecatonTask<>(
+TaskMetadata.builder().build(),
+new String(bytes), bytes));
+if (processor != null) {
+builder.thenProcess(processor);
+}
return new ProcessorSubscription(
scope,
() -> consumer,
-ProcessorsBuilder.consuming(scope.topic(),
-(byte[] bytes) -> new DecatonTask<>(
-TaskMetadata.builder().build(), "dummy", bytes))
-.build(null),
+builder.build(null),
scope.props(),
listener);
}
@@ -135,7 +138,7 @@ public void testStateTransition() throws Exception {
pollLatch.countDown();
});

-ProcessorSubscription subscription = subscription(consumer, states::add, tp);
+ProcessorSubscription subscription = subscription(consumer, states::add, tp, null);

subscription.start();
pollLatch.await();
@@ -159,30 +162,50 @@ public void testOffsetRegression() throws Exception {
return null;
}).when(consumer).subscribe(any(Collection.class), any(ConsumerRebalanceListener.class));

-ProcessorSubscription subscription = subscription(consumer, ignored -> {}, tp);
+BlockingQueue<Long> feedOffsets = new ArrayBlockingQueue<>(4);
+feedOffsets.add(100L);
+feedOffsets.add(99L);
+feedOffsets.add(100L);
+feedOffsets.add(101L);
+CountDownLatch processLatch = new CountDownLatch(1);
+ProcessorSubscription subscription = subscription(consumer, ignored -> {}, tp, (context, task) -> {
+if ("101".equals(task)) {
+processLatch.countDown();
+}
+});

+Map<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<>();
+Answer<?> storeCommitOffsets = invocation -> {
+committedOffsets.putAll(invocation.getArgument(0));
+return null;
+};
+doAnswer(storeCommitOffsets).when(consumer).commitSync(any(Map.class));
+doAnswer(storeCommitOffsets).when(consumer).commitAsync(any(Map.class), any());

AtomicBoolean first = new AtomicBoolean();
CountDownLatch pollLatch = new CountDownLatch(1);
doAnswer(invocation -> {
if (first.compareAndSet(false, true)) {
listener.get().onPartitionsAssigned(singleton(tp));
-return new ConsumerRecords<>(singletonMap(tp, Arrays.asList(
+}
+Long offset = feedOffsets.poll();
+if (offset != null) {
+return new ConsumerRecords<>(singletonMap(tp, Collections.singletonList(
// Feed one record, then a subsequent record of the regressing offset.
-new ConsumerRecord<>(tp.topic(), tp.partition(), 100L, "abc", new byte[0]),
-new ConsumerRecord<>(tp.topic(), tp.partition(), 99L, "abc", new byte[0]))));
+new ConsumerRecord<>(tp.topic(), tp.partition(), offset, "abc",
+String.valueOf(offset).getBytes()))));
} else {
pollLatch.countDown();
Thread.sleep(invocation.getArgument(0));
return ConsumerRecords.empty();
}
}).when(consumer).poll(anyLong());

subscription.start();
-pollLatch.await();
+processLatch.await();
subscription.close();

-verify(consumer, times(1)).commitAsync(offsetsCaptor.capture(), any());
-Map<TopicPartition, OffsetAndMetadata> offsets = offsetsCaptor.getValue();
-OffsetAndMetadata offset = offsets.get(tp);
-assertEquals(100L, offset.offset());
+OffsetAndMetadata offset = committedOffsets.get(tp);
+// 101 + 1 is committed when offset=101 is completed.
+assertEquals(102L, offset.offset());
}

@Test(timeout = 10000L)
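
The regression scenario feeds offsets 100, 99, 100, 101, so a record with a lower offset (99) arrives after 100 has already been seen. The final assertion expects 102 because Kafka's convention is to commit the offset of the next record to consume, i.e. the last completed offset plus one. A minimal sketch of that convention (the class and method names here are illustrative only, not part of the codebase):

import java.util.Collections;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

final class CommitConvention {
    // Commit the position after the last completed record: completing
    // offset 101 therefore commits 102, matching the assertion above.
    static void commitUpTo(Consumer<?, ?> consumer, TopicPartition tp, long lastCompletedOffset) {
        Map<TopicPartition, OffsetAndMetadata> offsets =
                Collections.singletonMap(tp, new OffsetAndMetadata(lastCompletedOffset + 1));
        consumer.commitSync(offsets);
    }
}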
ConsumeManagerTest.java
@@ -16,6 +16,10 @@

package com.linecorp.decaton.processor.runtime.internal;

+import static java.util.Collections.emptySet;
+import static java.util.Collections.singleton;
+import static java.util.Collections.singletonList;
+import static java.util.stream.Collectors.toSet;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
Expand All @@ -26,10 +30,14 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
@@ -44,7 +52,6 @@

import com.linecorp.decaton.processor.metrics.Metrics;
import com.linecorp.decaton.processor.metrics.Metrics.SubscriptionMetrics;
-import com.linecorp.decaton.processor.runtime.internal.ConsumeManager;
import com.linecorp.decaton.processor.runtime.internal.ConsumeManager.ConsumerHandler;
import com.linecorp.decaton.processor.runtime.internal.ConsumeManager.PartitionStates;

@@ -68,19 +75,29 @@ public class ConsumeManagerTest {

SubscriptionMetrics metrics;

+private ConsumerRebalanceListener rebalanceListener;

private ConsumeManager consumeManager;

@Before
public void setUp() {
metrics = Metrics.withTags("subscription", "subsc").new SubscriptionMetrics();
consumeManager = new ConsumeManager(consumer, states, handler, metrics);
-consumeManager.init(Collections.singletonList(TOPIC));
+doAnswer(invocation -> {
+rebalanceListener = invocation.getArgument(1);
+return null;
+}).when(consumer).subscribe(any(Collection.class), any(ConsumerRebalanceListener.class));
+consumeManager.init(singletonList(TOPIC));
}

private static TopicPartition tp(int partition) {
return new TopicPartition(TOPIC, partition);
}

+private static Set<TopicPartition> tpSet(int... partitions) {
+return Arrays.stream(partitions).mapToObj(p -> new TopicPartition(TOPIC, p)).collect(toSet());
+}

@Test
public void poll() {
List<ConsumerRecord<String, byte[]>> records = Arrays.asList(
@@ -121,4 +138,33 @@ public void poll() {
verify(consumer, times(1)).resume(Arrays.asList(tp(2)));
assertEquals(Arrays.asList(tp(1), tp(3)), partitionsPaused);
}

+@Test
+public void pauseStateHandlingAtRebalance() {
+doReturn(tpSet(1, 2)).when(consumer).paused();
+doAnswer(invocation -> {
+// initial partitions: [1, 2]
+// paused partitions: [1, 2]
+// revoked partitions: [2]
+// newly assigned partition: [3]
+rebalanceListener.onPartitionsRevoked(singletonList(tp(2)));
+rebalanceListener.onPartitionsAssigned(Arrays.asList(tp(1), tp(3)));
+return ConsumerRecords.empty();
+}).when(consumer).poll(anyLong());
+
+Set<TopicPartition> pausedPartitions = new HashSet<>();
+doAnswer(invocation -> {
+pausedPartitions.clear();
+pausedPartitions.addAll(invocation.getArgument(0));
+return null;
+}).when(consumer).pause(any());
+
+consumeManager.poll();
+// Do not call pause for the revoked partition "2".
+assertEquals(singleton(tp(1)), pausedPartitions);
+
+doReturn(emptySet()).when(consumer).paused();
+consumeManager.poll();
+assertEquals(emptySet(), pausedPartitions);
+}
}
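
In short, the new test pins down the behavior fixed in ConsumeManager: with partitions [1, 2] paused before the rebalance and partition 2 revoked, only partition 1 may be re-paused, and a later poll in which nothing is reported as paused must re-pause nothing.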
