Add tests for deferred completion #125

Merged 2 commits on Sep 28, 2021
@@ -117,6 +117,37 @@ public void testAsyncTaskCompletion() {
.run();
}

/*
 * This test aims to check that we can complete a deferred completion even when we
 * obtain the completion instance separately from the first deferCompletion() call.
 *
 * NOTE: Though completing a deferred completion like this is valid, it's recommended
 * to hold only the `Completion` instance when possible, to avoid the unnecessary
 * heap pressure caused by retaining the entire ProcessingContext instance.
 */
@Test(timeout = 30000)
public void testGetCompletionInstanceLater() {
ExecutorService executorService = Executors.newFixedThreadPool(16);
Random rand = randomRule.random();
ProcessorTestSuite
.builder(rule)
.configureProcessorsBuilder(builder -> builder.thenProcess((ctx, task) -> {
ctx.deferCompletion();
Contributor commented:
I'm +1 for adding a test case to check that this usage is valid, but IMO we should not advertise this as the primary usage of deferred completion, because unless users require the ProcessingContext later in async processing, they should prefer using only Completion so that memory pressure is dramatically reduced?

Contributor (Author) replied:
> they should prefer using only Completion so that memory pressure is dramatically reduced

Good point. I'll revert this and add another test case with a caution note.
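For reference, a minimal sketch of the Completion-only pattern suggested above (illustrative, not part of this diff; it relies only on deferCompletion() returning a Completion, which the test's ctx.deferCompletion().complete() call already demonstrates; handle() is a hypothetical async task):

builder.thenProcess((ctx, task) -> {
    // Hold only the Completion instance, not the whole ProcessingContext.
    Completion completion = ctx.deferCompletion();
    executorService.execute(() -> {
        try {
            handle(task); // hypothetical async work
        } finally {
            completion.complete(); // the ProcessingContext is not retained on the heap
        }
    });
})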

executorService.execute(() -> {
try {
Thread.sleep(rand.nextInt(10));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
} finally {
ctx.deferCompletion().complete();
}
});
}))
.build()
.run();
}

@Test(timeout = 60000)
public void testSingleThreadProcessing() {
// Note that these processing semantics are not to be considered part of the Decaton specification which users can rely on.
@@ -23,6 +23,8 @@
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

import org.junit.After;
@@ -139,6 +141,40 @@ public void testRetryQueuing() throws Exception {
.run();
}

@Test(timeout = 30000)
public void testRetryQueuingOnAsyncProcessor() throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(16);
ProcessorTestSuite
.builder(rule)
.numTasks(1000)
.configureProcessorsBuilder(builder -> builder.thenProcess((ctx, task) -> {
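// Defer completion before handing the task off to another thread; the async
// task below then either retries the task or completes it.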
ctx.deferCompletion();
executorService.execute(() -> {
try {
if (ctx.metadata().retryCount() == 0) {
ctx.retry();
} else {
ctx.deferCompletion().complete();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
});

}))
.retryConfig(RetryConfig.builder()
.retryTopic(retryTopic)
.backoff(Duration.ofMillis(10))
.build())
.excludeSemantics(
GuaranteeType.PROCESS_ORDERING,
GuaranteeType.SERIAL_PROCESSING)
.customSemantics(new ProcessRetriedTask())
.build()
.run();
}

/*
* This test tries to re-produce delivery-loss due to https://github.com/line/decaton/issues/101 by:
* 1. Retry all tasks once immediately
@@ -25,24 +25,31 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.function.Consumer;
import java.util.function.Function;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;

import com.google.protobuf.ByteString;

import com.linecorp.decaton.client.DecatonClientBuilder.DefaultKafkaProducerSupplier;
import com.linecorp.decaton.client.KafkaProducerSupplier;
import com.linecorp.decaton.processor.DecatonProcessor;
import com.linecorp.decaton.processor.runtime.ProcessorProperties;
import com.linecorp.decaton.processor.runtime.ProcessorSubscription;
import com.linecorp.decaton.processor.runtime.ProcessorsBuilder;
import com.linecorp.decaton.processor.runtime.PropertySupplier;
import com.linecorp.decaton.processor.runtime.RetryConfig;
import com.linecorp.decaton.processor.runtime.RetryConfig.RetryConfigBuilder;
import com.linecorp.decaton.processor.runtime.SubscriptionStateListener;
import com.linecorp.decaton.processor.runtime.TaskExtractor;
import com.linecorp.decaton.processor.tracing.TracingProvider;
@@ -212,15 +219,19 @@ public void run() {

try {
producer = producerSupplier.apply(rule.bootstrapServers());
ConcurrentMap<TopicPartition, Long> retryOffsets = new ConcurrentHashMap<>();
for (int i = 0; i < subscriptions.length; i++) {
subscriptions[i] = newSubscription(i, topic, Optional.of(rollingRestartLatch));
subscriptions[i] = newSubscription(i, topic, Optional.of(rollingRestartLatch), retryOffsets);
}
CompletableFuture<Map<Integer, Long>> produceFuture =
CompletableFuture<Map<TopicPartition, Long>> produceFuture =
produceTasks(producer, topic, record -> semantics.forEach(g -> g.onProduce(record)));

rollingRestartLatch.await();
performRollingRestart(subscriptions, i -> newSubscription(i, topic, Optional.empty()));
awaitAllOffsetsCommitted(topic, produceFuture);
performRollingRestart(subscriptions, i -> newSubscription(i, topic, Optional.empty(), retryOffsets));
awaitAllOffsetsCommitted(produceFuture.join());
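// If any tasks were routed to the retry topic, wait until their offsets are committed too.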
if (!retryOffsets.isEmpty()) {
awaitAllOffsetsCommitted(retryOffsets);
}

for (ProcessingGuarantee guarantee : semantics) {
guarantee.doAssert();
@@ -237,7 +248,11 @@ public void run() {
}
}

private ProcessorSubscription newSubscription(int id, String topic, Optional<CountDownLatch> processLatch) {
private ProcessorSubscription newSubscription(
int id,
String topic,
Optional<CountDownLatch> processLatch,
ConcurrentMap<TopicPartition, Long> retryOffsets) {
DecatonProcessor<TestTask> preprocessor = (context, task) -> {
long startTime = System.nanoTime();
try {
@@ -258,21 +273,31 @@
ProcessorsBuilder<TestTask> processorsBuilder =
configureProcessorsBuilder.apply(sourceBuilder.thenProcess(preprocessor));

return TestUtils.subscription("subscription-" + id,
rule.bootstrapServers(),
builder -> {
builder.processorsBuilder(processorsBuilder);
if (retryConfig != null) {
builder.enableRetry(retryConfig);
}
if (propertySuppliers != null) {
builder.properties(propertySuppliers);
}
if(tracingProvider != null) {
builder.enableTracing(tracingProvider);
}
builder.stateListener(state -> statesListener.onChange(id, state));
});
return TestUtils.subscription(
"subscription-" + id,
rule.bootstrapServers(),
builder -> {
builder.processorsBuilder(processorsBuilder);
if (retryConfig != null) {
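// Wrap the retry producer so the test can record the max offset produced per retry-topic partition.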
RetryConfigBuilder retryConfigBuilder = retryConfig.toBuilder();
KafkaProducerSupplier innerSupplier =
Optional.ofNullable(retryConfig.producerSupplier())
.orElseGet(DefaultKafkaProducerSupplier::new);
retryConfigBuilder.producerSupplier(props -> new InterceptingProducer(
innerSupplier.getProducer(props),
meta -> retryOffsets.compute(new TopicPartition(meta.topic(), meta.partition()),
(k, v) -> Math.max(v == null ? -1L : v, meta.offset())))
);
builder.enableRetry(retryConfigBuilder.build());
}
if (propertySuppliers != null) {
builder.properties(propertySuppliers);
}
if (tracingProvider != null) {
builder.enableTracing(tracingProvider);
}
builder.stateListener(state -> statesListener.onChange(id, state));
});
}

private static void performRollingRestart(ProcessorSubscription[] subscriptions,
@@ -286,18 +311,14 @@ private static void performRollingRestart(ProcessorSubscription[] subscriptions,
}
}

private void awaitAllOffsetsCommitted(String topic,
CompletableFuture<Map<Integer, Long>> produceFuture) {
Map<Integer, Long> producedOffsets = produceFuture.join();
private void awaitAllOffsetsCommitted(Map<TopicPartition, Long> producedOffsets) {
TestUtils.awaitCondition("all produced offsets should be committed", () -> {
Map<TopicPartition, OffsetAndMetadata> committed =
rule.admin().consumerGroupOffsets(TestUtils.DEFAULT_GROUP_ID);

for (Entry<Integer, Long> entry : producedOffsets.entrySet()) {
int partition = entry.getKey();
for (Entry<TopicPartition, Long> entry : producedOffsets.entrySet()) {
long produced = entry.getValue();

OffsetAndMetadata metadata = committed.get(new TopicPartition(topic, partition));
OffsetAndMetadata metadata = committed.get(entry.getKey());
if (metadata == null || metadata.offset() <= produced) {
return false;
}
@@ -323,7 +344,7 @@ private static void safeClose(AutoCloseable resource) {
* @param onProduce Callback which is called each time a task has been successfully sent
* @return A CompletableFuture of a Map which holds the topic-partition as the key and the max produced offset as the value
*/
private CompletableFuture<Map<Integer, Long>> produceTasks(
private CompletableFuture<Map<TopicPartition, Long>> produceTasks(
Producer<String, DecatonTaskRequest> producer,
String topic,
Consumer<ProducedRecord> onProduce) {
@@ -366,15 +387,34 @@ private CompletableFuture<Map<Integer, Long>> produceTasks(
}

return CompletableFuture.allOf(produceFutures).thenApply(notUsed -> {
Map<Integer, Long> result = new HashMap<>();
Map<TopicPartition, Long> result = new HashMap<>();
for (CompletableFuture<RecordMetadata> future : produceFutures) {
RecordMetadata metadata = future.join();
long offset = result.getOrDefault(metadata.partition(), -1L);
if (offset < metadata.offset()) {
result.put(metadata.partition(), metadata.offset());
}
result.compute(new TopicPartition(metadata.topic(), metadata.partition()),
(k, v) -> Math.max(v == null ? -1L : v, metadata.offset()));
}
return result;
});
}

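/**
 * Producer wrapper that passes the metadata of every successfully produced record
 * to the given interceptor; used above to track max offsets on the retry topic.
 */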
private static class InterceptingProducer extends ProducerAdaptor<String, DecatonTaskRequest> {
private final Consumer<RecordMetadata> interceptor;

InterceptingProducer(Producer<String, DecatonTaskRequest> delegate,
Consumer<RecordMetadata> interceptor) {
super(delegate);
this.interceptor = interceptor;
}

@Override
public Future<RecordMetadata> send(ProducerRecord<String, DecatonTaskRequest> record,
Callback callback) {
return super.send(record, (meta, e) -> {
if (meta != null) {
interceptor.accept(meta);
}
callback.onCompletion(meta, e);
});
}
}
}
@@ -0,0 +1,101 @@
/*
* Copyright 2020 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.decaton.testing.processor;

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;

abstract class ProducerAdaptor<K, V> implements Producer<K, V> {
protected final Producer<K, V> delegate;

protected ProducerAdaptor(Producer<K, V> delegate) {
this.delegate = delegate;
}

@Override
public void initTransactions() {
delegate.initTransactions();
}

@Override
public void beginTransaction() throws ProducerFencedException {
delegate.beginTransaction();
}

@Override
public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId)
throws ProducerFencedException {
delegate.sendOffsetsToTransaction(offsets, consumerGroupId);
}

@Override
public void commitTransaction() throws ProducerFencedException {
delegate.commitTransaction();
}

@Override
public void abortTransaction() throws ProducerFencedException {
delegate.abortTransaction();
}

@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
return delegate.send(record);
}

@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
return delegate.send(record, callback);
}

@Override
public void flush() {
delegate.flush();
}

@Override
public List<PartitionInfo> partitionsFor(String topic) {
return delegate.partitionsFor(topic);
}

@Override
public Map<MetricName, ? extends Metric> metrics() {
return delegate.metrics();
}

@Override
public void close() {
delegate.close();
}

@Override
public void close(Duration timeout) {
delegate.close(timeout);
}
}