diff --git a/processor/src/it/java/com/linecorp/decaton/processor/CoreFunctionalityTest.java b/processor/src/it/java/com/linecorp/decaton/processor/CoreFunctionalityTest.java
index cc53f095..fddb6e72 100644
--- a/processor/src/it/java/com/linecorp/decaton/processor/CoreFunctionalityTest.java
+++ b/processor/src/it/java/com/linecorp/decaton/processor/CoreFunctionalityTest.java
@@ -117,6 +117,37 @@ public void testAsyncTaskCompletion() {
                 .run();
     }
 
+    /*
+     * This test aims to check that we can complete a deferred completion even when we
+     * obtain the completion instance separately from the first deferCompletion() call.
+     *
+     * NOTE: Though it is a valid way to complete a deferred completion like this,
+     * it's recommended to hold only the `Completion` instance if possible, to avoid
+     * unnecessary heap pressure from holding the entire ProcessingContext instance.
+     */
+    @Test(timeout = 30000)
+    public void testGetCompletionInstanceLater() {
+        ExecutorService executorService = Executors.newFixedThreadPool(16);
+        Random rand = randomRule.random();
+        ProcessorTestSuite
+                .builder(rule)
+                .configureProcessorsBuilder(builder -> builder.thenProcess((ctx, task) -> {
+                    ctx.deferCompletion();
+                    executorService.execute(() -> {
+                        try {
+                            Thread.sleep(rand.nextInt(10));
+                        } catch (InterruptedException e) {
+                            Thread.currentThread().interrupt();
+                            throw new RuntimeException(e);
+                        } finally {
+                            ctx.deferCompletion().complete();
+                        }
+                    });
+                }))
+                .build()
+                .run();
+    }
+
     @Test(timeout = 60000)
     public void testSingleThreadProcessing() {
         // Note that this processing semantics is not be considered as Decaton specification which users can rely on.
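Note: the recommended variant mentioned in the comment above can be sketched as follows — capture only the `Completion` handle returned by the first deferCompletion() call, so the ProcessingContext itself need not stay reachable while the async work runs. This is an illustration only, not part of the patch; `TestTask` and `executorService` are assumed from the surrounding test.

    DecatonProcessor<TestTask> processor = (ctx, task) -> {
        // Keep only the Completion handle; drop the context reference to reduce heap pressure.
        Completion completion = ctx.deferCompletion();
        executorService.execute(completion::complete);
    };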
diff --git a/processor/src/it/java/com/linecorp/decaton/processor/RetryQueueingTest.java b/processor/src/it/java/com/linecorp/decaton/processor/RetryQueueingTest.java
index a7c78b45..cf4419a8 100644
--- a/processor/src/it/java/com/linecorp/decaton/processor/RetryQueueingTest.java
+++ b/processor/src/it/java/com/linecorp/decaton/processor/RetryQueueingTest.java
@@ -23,6 +23,8 @@
 import java.util.HashSet;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.junit.After;
@@ -139,6 +141,40 @@ public void testRetryQueuing() throws Exception {
                 .run();
     }
 
+    @Test(timeout = 30000)
+    public void testRetryQueuingOnAsyncProcessor() throws Exception {
+        ExecutorService executorService = Executors.newFixedThreadPool(16);
+        ProcessorTestSuite
+                .builder(rule)
+                .numTasks(1000)
+                .configureProcessorsBuilder(builder -> builder.thenProcess((ctx, task) -> {
+                    ctx.deferCompletion();
+                    executorService.execute(() -> {
+                        try {
+                            if (ctx.metadata().retryCount() == 0) {
+                                ctx.retry();
+                            } else {
+                                ctx.deferCompletion().complete();
+                            }
+                        } catch (InterruptedException e) {
+                            Thread.currentThread().interrupt();
+                            throw new RuntimeException(e);
+                        }
+                    });
+
+                }))
+                .retryConfig(RetryConfig.builder()
+                                        .retryTopic(retryTopic)
+                                        .backoff(Duration.ofMillis(10))
+                                        .build())
+                .excludeSemantics(
+                        GuaranteeType.PROCESS_ORDERING,
+                        GuaranteeType.SERIAL_PROCESSING)
+                .customSemantics(new ProcessRetriedTask())
+                .build()
+                .run();
+    }
+
     /*
      * This test tries to re-produce delivery-loss due to https://github.com/line/decaton/issues/101 by:
      * 1. Retry all tasks once immediately
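Note: the flow the new test drives is Decaton's retry pattern: on the first delivery retryCount() is 0, so the processor calls ctx.retry(), which re-produces the task to the configured retry topic (retry() can throw InterruptedException, hence the handling inside the executor's Runnable); the retried delivery then completes explicitly. A synchronous sketch of the same pattern, for illustration only and assuming Decaton's documented behavior that retry() takes ownership of the current task's completion:

    DecatonProcessor<TestTask> retryOnce = (ctx, task) -> {
        if (ctx.metadata().retryCount() == 0) {
            ctx.retry(); // re-produce the task to the retry topic; throws InterruptedException
        }
        // With no completion deferred, returning normally completes the retried task.
    };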
TestUtils.subscription("subscription-" + id, - rule.bootstrapServers(), - builder -> { - builder.processorsBuilder(processorsBuilder); - if (retryConfig != null) { - builder.enableRetry(retryConfig); - } - if (propertySuppliers != null) { - builder.properties(propertySuppliers); - } - if(tracingProvider != null) { - builder.enableTracing(tracingProvider); - } - builder.stateListener(state -> statesListener.onChange(id, state)); - }); + return TestUtils.subscription( + "subscription-" + id, + rule.bootstrapServers(), + builder -> { + builder.processorsBuilder(processorsBuilder); + if (retryConfig != null) { + RetryConfigBuilder retryConfigBuilder = retryConfig.toBuilder(); + KafkaProducerSupplier innerSupplier = + Optional.ofNullable(retryConfig.producerSupplier()) + .orElseGet(DefaultKafkaProducerSupplier::new); + retryConfigBuilder.producerSupplier(props -> new InterceptingProducer( + innerSupplier.getProducer(props), + meta -> retryOffsets.compute(new TopicPartition(meta.topic(), meta.partition()), + (k, v) -> Math.max(v == null ? -1L : v, meta.offset()))) + ); + builder.enableRetry(retryConfigBuilder.build()); + } + if (propertySuppliers != null) { + builder.properties(propertySuppliers); + } + if (tracingProvider != null) { + builder.enableTracing(tracingProvider); + } + builder.stateListener(state -> statesListener.onChange(id, state)); + }); } private static void performRollingRestart(ProcessorSubscription[] subscriptions, @@ -286,18 +311,14 @@ private static void performRollingRestart(ProcessorSubscription[] subscriptions, } } - private void awaitAllOffsetsCommitted(String topic, - CompletableFuture> produceFuture) { - Map producedOffsets = produceFuture.join(); + private void awaitAllOffsetsCommitted(Map producedOffsets) { TestUtils.awaitCondition("all produced offsets should be committed", () -> { Map committed = rule.admin().consumerGroupOffsets(TestUtils.DEFAULT_GROUP_ID); - for (Entry entry : producedOffsets.entrySet()) { - int partition = entry.getKey(); + for (Entry entry : producedOffsets.entrySet()) { long produced = entry.getValue(); - - OffsetAndMetadata metadata = committed.get(new TopicPartition(topic, partition)); + OffsetAndMetadata metadata = committed.get(entry.getKey()); if (metadata == null || metadata.offset() <= produced) { return false; } @@ -323,7 +344,7 @@ private static void safeClose(AutoCloseable resource) { * @param onProduce Callback which is called when a task is complete to be sent * @return A CompletableFuture of Map, which holds partition as the key and max offset as the value */ - private CompletableFuture> produceTasks( + private CompletableFuture> produceTasks( Producer producer, String topic, Consumer onProduce) { @@ -366,15 +387,34 @@ private CompletableFuture> produceTasks( } return CompletableFuture.allOf(produceFutures).thenApply(notUsed -> { - Map result = new HashMap<>(); + Map result = new HashMap<>(); for (CompletableFuture future : produceFutures) { RecordMetadata metadata = future.join(); - long offset = result.getOrDefault(metadata.partition(), -1L); - if (offset < metadata.offset()) { - result.put(metadata.partition(), metadata.offset()); - } + result.compute(new TopicPartition(metadata.topic(), metadata.partition()), + (k, v) -> Math.max(v == null ? 
diff --git a/testing/src/main/java/com/linecorp/decaton/testing/processor/ProducerAdaptor.java b/testing/src/main/java/com/linecorp/decaton/testing/processor/ProducerAdaptor.java
new file mode 100644
index 00000000..3fe045a2
--- /dev/null
+++ b/testing/src/main/java/com/linecorp/decaton/testing/processor/ProducerAdaptor.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2020 LINE Corporation
+ *
+ * LINE Corporation licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.linecorp.decaton.testing.processor;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ProducerFencedException;
+
+abstract class ProducerAdaptor<K, V> implements Producer<K, V> {
+    protected final Producer<K, V> delegate;
+    protected ProducerAdaptor(Producer<K, V> delegate) {
+        this.delegate = delegate;
+    }
+
+    @Override
+    public void initTransactions() {
+        delegate.initTransactions();
+    }
+
+    @Override
+    public void beginTransaction() throws ProducerFencedException {
+        delegate.beginTransaction();
+    }
+
+    @Override
+    public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId)
+            throws ProducerFencedException {
+        delegate.sendOffsetsToTransaction(offsets, consumerGroupId);
+    }
+
+    @Override
+    public void commitTransaction() throws ProducerFencedException {
+        delegate.commitTransaction();
+    }
+
+    @Override
+    public void abortTransaction() throws ProducerFencedException {
+        delegate.abortTransaction();
+    }
+
+    @Override
+    public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
+        return delegate.send(record);
+    }
+
+    @Override
+    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
+        return delegate.send(record, callback);
+    }
+
+    @Override
+    public void flush() {
+        delegate.flush();
+    }
+
+    @Override
+    public List<PartitionInfo> partitionsFor(String topic) {
+        return delegate.partitionsFor(topic);
+    }
+
+    @Override
+    public Map<MetricName, ? extends Metric> metrics() {
+        return delegate.metrics();
+    }
+
+    @Override
+    public void close() {
+        delegate.close();
+    }
+
+    @Override
+    public void close(Duration timeout) {
+        delegate.close(timeout);
+    }
+}
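Note: ProducerAdaptor concentrates the boilerplate of Kafka's Producer interface into one delegating base class, so decorators such as InterceptingProducer above and TestTracingProducer below override only send(). A hypothetical further decorator, purely to show the intended shape (same package, since the class is package-private; imports as in ProducerAdaptor.java plus java.util.concurrent.atomic.AtomicLong):

    class CountingProducer<K, V> extends ProducerAdaptor<K, V> {
        private final AtomicLong sent = new AtomicLong();

        CountingProducer(Producer<K, V> delegate) {
            super(delegate);
        }

        @Override
        public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
            sent.incrementAndGet(); // count every send attempt, then delegate as usual
            return super.send(record, callback);
        }
    }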
diff --git a/testing/src/main/java/com/linecorp/decaton/testing/processor/TestTracingProducer.java b/testing/src/main/java/com/linecorp/decaton/testing/processor/TestTracingProducer.java
index ce7572bd..96736c51 100644
--- a/testing/src/main/java/com/linecorp/decaton/testing/processor/TestTracingProducer.java
+++ b/testing/src/main/java/com/linecorp/decaton/testing/processor/TestTracingProducer.java
@@ -17,56 +17,21 @@
 package com.linecorp.decaton.testing.processor;
 
 import java.nio.charset.StandardCharsets;
-import java.time.Duration;
-import java.util.List;
-import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.Future;
 
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.Metric;
-import org.apache.kafka.common.MetricName;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.header.internals.RecordHeader;
 
 import com.linecorp.decaton.processor.tracing.TestTracingProvider;
 import com.linecorp.decaton.protocol.Decaton.DecatonTaskRequest;
 
-public class TestTracingProducer implements Producer<byte[], DecatonTaskRequest> {
-    private final Producer<byte[], DecatonTaskRequest> inner;
-
-    public TestTracingProducer(Producer<byte[], DecatonTaskRequest> inner) {this.inner = inner;}
-
-    @Override
-    public void initTransactions() {
-        inner.initTransactions();
-    }
-
-    @Override
-    public void beginTransaction() throws ProducerFencedException {
-        inner.beginTransaction();
-    }
-
-    @Override
-    public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId)
-            throws ProducerFencedException {
-        inner.sendOffsetsToTransaction(offsets, consumerGroupId);
-    }
-
-    @Override
-    public void commitTransaction() throws ProducerFencedException {
-        inner.commitTransaction();
-    }
-
-    @Override
-    public void abortTransaction() throws ProducerFencedException {
-        inner.abortTransaction();
+public class TestTracingProducer extends ProducerAdaptor<byte[], DecatonTaskRequest> {
+    public TestTracingProducer(Producer<byte[], DecatonTaskRequest> delegate) {
+        super(delegate);
     }
 
     private static void propagateCurrentTrace(ProducerRecord<byte[], DecatonTaskRequest> record) {
@@ -82,37 +47,12 @@ private static void propagateCurrentTrace(ProducerRecord<byte[], DecatonTaskRequest> record) {
     @Override
     public Future<RecordMetadata> send(ProducerRecord<byte[], DecatonTaskRequest> record) {
         propagateCurrentTrace(record);
-        return inner.send(record);
+        return super.send(record);
     }
 
     @Override
     public Future<RecordMetadata> send(ProducerRecord<byte[], DecatonTaskRequest> record, Callback callback) {
         propagateCurrentTrace(record);
-        return inner.send(record, callback);
-    }
-
-    @Override
-    public void flush() {
-        inner.flush();
-    }
-
-    @Override
-    public List<PartitionInfo> partitionsFor(String topic) {
-        return inner.partitionsFor(topic);
-    }
-
-    @Override
-    public Map<MetricName, ? extends Metric> metrics() {
-        return inner.metrics();
-    }
-
-    @Override
-    public void close() {
-        inner.close();
-    }
-
-    @Override
-    public void close(Duration timeout) {
-        inner.close(timeout);
+        return super.send(record, callback);
     }
 }
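Note: usage is unchanged by the refactoring; tests keep wrapping whatever producer they would otherwise hand to the suite. A wiring sketch, assuming `props` already carries bootstrap servers and serializers compatible with DecatonTaskRequest:

    Producer<byte[], DecatonTaskRequest> raw = new KafkaProducer<>(props); // hypothetical config
    Producer<byte[], DecatonTaskRequest> traced = new TestTracingProducer(raw);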