Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow to bind KafkaClientMetrics for each decaton consumer #132

Merged
merged 7 commits into from
Dec 8, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public void init(Config config, Recording recording, ResourceTracker resourceTra
subscription = SubscriptionBuilder
.newBuilder("decaton-benchmark")
.consumerConfig(props)
.properties(StaticPropertySupplier.of(properties))
.addProperties(StaticPropertySupplier.of(properties))
.processorsBuilder(
ProcessorsBuilder.consuming(config.topic(),
(TaskExtractor<Task>) bytes -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public class CoreFunctionalityTest {
public RandomRule randomRule = new RandomRule();

@Test(timeout = 30000)
public void testProcessConcurrent() {
public void testProcessConcurrent() throws Exception {
Random rand = randomRule.random();
ProcessorTestSuite
.builder(rule)
Expand All @@ -65,7 +65,7 @@ public void testProcessConcurrent() {
}

@Test(timeout = 30000)
public void testProcessConcurrent_PartitionScopeProcessor() {
public void testProcessConcurrent_PartitionScopeProcessor() throws Exception {
Random rand = randomRule.random();
ProcessorTestSuite
.builder(rule)
Expand All @@ -80,7 +80,7 @@ public void testProcessConcurrent_PartitionScopeProcessor() {
}

@Test(timeout = 30000)
public void testProcessConcurrent_ThreadScopeProcessor() {
public void testProcessConcurrent_ThreadScopeProcessor() throws Exception {
Random rand = randomRule.random();
ProcessorTestSuite
.builder(rule)
Expand All @@ -95,7 +95,7 @@ public void testProcessConcurrent_ThreadScopeProcessor() {
}

@Test(timeout = 30000)
public void testAsyncTaskCompletion() {
public void testAsyncTaskCompletion() throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(16);
Random rand = randomRule.random();
ProcessorTestSuite
Expand Down Expand Up @@ -126,7 +126,7 @@ public void testAsyncTaskCompletion() {
* heap pressure by holding entire ProcessingContext instance.
*/
@Test(timeout = 30000)
public void testGetCompletionInstanceLater() {
public void testGetCompletionInstanceLater() throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(16);
Random rand = randomRule.random();
ProcessorTestSuite
Expand All @@ -149,7 +149,7 @@ public void testGetCompletionInstanceLater() {
}

@Test(timeout = 60000)
public void testSingleThreadProcessing() {
public void testSingleThreadProcessing() throws Exception {
// Note that this processing semantics is not to be considered as a Decaton specification which users can rely on.
// Rather, this is just an expected behavior based on the current implementation when we set concurrency to 1.
ProcessingGuarantee noDuplicates = new ProcessingGuarantee() {
Expand Down Expand Up @@ -189,7 +189,7 @@ public void doAssert() {
}

@Test(timeout = 30000)
public void testAsyncCompletionWithLeakAndTimeout() {
public void testAsyncCompletionWithLeakAndTimeout() throws Exception {
Random rand = randomRule.random();
ProcessorTestSuite
.builder(rule)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public void testPropertyDynamicSwitch() throws Exception {
.consuming(topicName,
new ProtocolBuffersDeserializer<>(HelloTask.parser()))
.thenProcess(processor))
.properties(StaticPropertySupplier.of(rateProp)));
.addProperties(StaticPropertySupplier.of(rateProp)));
DecatonClient<HelloTask> client = TestUtils.client(topicName, rule.bootstrapServers())) {

int count = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ public void testRetryQueuingExtractingWithDefaultMeta() throws Exception {
}

@Test(timeout = 60000)
public void testRetryQueueingFromCompletionTimeoutCallback() {
public void testRetryQueueingFromCompletionTimeoutCallback() throws Exception {
ProcessorTestSuite
.builder(rule)
.numTasks(100)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class SubscriptionStateTest {
public static KafkaClusterRule rule = new KafkaClusterRule();

@Test(timeout = 30000)
public void testStateTransition() {
public void testStateTransition() throws Exception {
Map<Integer, List<State>> subscriptionStates = new HashMap<>();
ProcessorTestSuite
.builder(rule)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@
import com.linecorp.decaton.client.kafka.ProtocolBuffersKafkaSerializer;
import com.linecorp.decaton.processor.DecatonProcessor;
import com.linecorp.decaton.processor.runtime.ProcessorProperties;
import com.linecorp.decaton.processor.runtime.ProcessorSubscription;
import com.linecorp.decaton.processor.runtime.ProcessorsBuilder;
import com.linecorp.decaton.processor.runtime.Property;
import com.linecorp.decaton.processor.runtime.StaticPropertySupplier;
import com.linecorp.decaton.processor.runtime.ProcessorSubscription;
import com.linecorp.decaton.protobuf.ProtocolBuffersDeserializer;
import com.linecorp.decaton.protocol.Decaton.DecatonTaskRequest;
import com.linecorp.decaton.protocol.Sample.HelloTask;
Expand Down Expand Up @@ -111,7 +111,7 @@ public void testDetectStuckPartitions() throws Exception {
.consuming(topicName,
new ProtocolBuffersDeserializer<>(HelloTask.parser()))
.thenProcess(processor))
.properties(StaticPropertySupplier.of(
.addProperties(StaticPropertySupplier.of(
Property.ofStatic(ProcessorProperties.CONFIG_PARTITION_CONCURRENCY, 1),
Property.ofStatic(ProcessorProperties.CONFIG_MAX_PENDING_RECORDS, 1))));
Producer<String, DecatonTaskRequest> producer =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

import org.apache.kafka.clients.consumer.Consumer;

import com.linecorp.decaton.processor.metrics.internal.AvailableTags;

import io.micrometer.core.instrument.Counter;
Expand All @@ -33,6 +35,7 @@
import io.micrometer.core.instrument.Meter.Id;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics;
import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
import io.micrometer.core.instrument.config.MeterFilter;

Expand Down Expand Up @@ -99,6 +102,20 @@ public void close() {
}

public class SubscriptionMetrics extends AbstractMetrics {
volatile KafkaClientMetrics kafkaClientMetrics;

/**
 * Binds Micrometer's {@link KafkaClientMetrics} for the given consumer into the registry,
 * tagged with this subscription's scope tags.
 * If metrics were already bound for a previous consumer, they are closed first so their
 * meters do not leak in the registry.
 *
 * @param consumer the Kafka consumer whose client metrics should be exposed
 */
public void bindClientMetrics(Consumer<String, byte[]> consumer) {
    KafkaClientMetrics previous = kafkaClientMetrics;
    if (previous != null) {
        // Release meters registered for a previously bound consumer to avoid leaking them.
        previous.close();
    }
    kafkaClientMetrics = new KafkaClientMetrics(consumer, availableTags.subscriptionScope());
    kafkaClientMetrics.bindTo(registry);
}

/**
 * Closes and unregisters the {@link KafkaClientMetrics} bound via
 * {@code bindClientMetrics}, if any. Safe to call when nothing is bound.
 */
private void closeClientMetrics() {
    // Capture the volatile field once so a concurrent reset cannot cause an NPE
    // between the null check and the close() call.
    KafkaClientMetrics metrics = kafkaClientMetrics;
    if (metrics != null) {
        kafkaClientMetrics = null;
        metrics.close();
    }
}

private Timer processDuration(String section) {
return meter(() -> Timer.builder("subscription.process.durations")
.description(String.format(
Expand All @@ -118,6 +135,12 @@ private Timer processDuration(String section) {
public final Timer handlePausesTime = processDuration("pause");

public final Timer commitOffsetTime = processDuration("commit");

/**
 * Closes this metrics scope: unregisters the subscription-level meters via
 * {@code super.close()}, then closes any bound Kafka client metrics.
 */
@Override
public void close() {
super.close();
closeClientMetrics();
}
}

public class TaskMetrics extends AbstractMetrics {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,16 @@ public class ProcessorProperties extends AbstractDecatonProperties {
public static final PropertyDefinition<Boolean> CONFIG_LOGGING_MDC_ENABLED =
PropertyDefinition.define("decaton.logging.mdc.enabled", Boolean.class, true,
v -> v instanceof Boolean);
/**
 * Controls whether to enable or disable binding Micrometer's KafkaClientMetrics to decaton consumers.
 * This is disabled by default for backwards compatibility, but enabling it is recommended if you
 * rely on Micrometer, since JMX metrics are deprecated. The downside is a possible increase in
 * metrics count.
 *
 * Reloadable: no
 */
public static final PropertyDefinition<Boolean> CONFIG_BIND_CLIENT_METRICS =
PropertyDefinition.define("decaton.client.metrics.micrometer.bound", Boolean.class, false,
v -> v instanceof Boolean);
/**
* Control time to "timeout" a deferred completion.
* Decaton allows {@link DecatonProcessor}s to defer completion of a task by calling
Expand Down Expand Up @@ -174,6 +184,7 @@ public class ProcessorProperties extends AbstractDecatonProperties {
CONFIG_GROUP_REBALANCE_TIMEOUT_MS,
CONFIG_SHUTDOWN_TIMEOUT_MS,
CONFIG_LOGGING_MDC_ENABLED,
CONFIG_BIND_CLIENT_METRICS,
CONFIG_DEFERRED_COMPLETE_TIMEOUT_MS));

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.linecorp.decaton.processor.runtime;

import static com.linecorp.decaton.processor.runtime.ProcessorProperties.CONFIG_BIND_CLIENT_METRICS;

import java.time.Duration;
import java.util.Collection;
import java.util.Optional;
Expand Down Expand Up @@ -140,6 +142,9 @@ public void receive(ConsumerRecord<String, byte[]> record) {
metrics = Metrics.withTags("subscription", scope.subscriptionId()).new SubscriptionMetrics();

Consumer<String, byte[]> consumer = consumerSupplier.get();
if (props.get(CONFIG_BIND_CLIENT_METRICS).value()) {
metrics.bindClientMetrics(consumer);
}
consumeManager = new ConsumeManager(consumer, contexts, new Handler(), metrics);
commitManager = new CommitManager(
consumer, props.get(ProcessorProperties.CONFIG_COMMIT_INTERVAL_MS), contexts);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,15 +110,22 @@ public static SubscriptionBuilder newBuilder(String subscriptionId) {
* @param suppliers {@link PropertySupplier} instances
* @return updated instance of {@link SubscriptionBuilder}.
*/
public SubscriptionBuilder properties(PropertySupplier... suppliers) {
ProcessorProperties.Builder<ProcessorProperties> builder = ProcessorProperties.builder();
public SubscriptionBuilder addProperties(PropertySupplier... suppliers) {
for (PropertySupplier supplier : suppliers) {
builder.setBySupplier(supplier);
propertiesBuilder.setBySupplier(supplier);
}
propertiesBuilder = builder;
return this;
}

/**
 * Discards all properties supplied by previous calls to {@link #properties(PropertySupplier...)}
 * or {@link #addProperties(PropertySupplier...)}, then registers the given suppliers.
 *
 * @param suppliers {@link PropertySupplier} instances to apply after the reset
 * @return updated instance of {@link SubscriptionBuilder}.
 */
public SubscriptionBuilder properties(PropertySupplier... suppliers) {
propertiesBuilder = ProcessorProperties.builder();
return addProperties(suppliers);
}

/**
* Configure subscription to enable retry processing when {@link DecatonProcessor} requests.
*
Expand Down
38 changes: 22 additions & 16 deletions testing/src/main/java/com/linecorp/decaton/testing/TestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,25 @@

package com.linecorp.decaton.testing;

import static com.linecorp.decaton.processor.runtime.ProcessorProperties.CONFIG_BIND_CLIENT_METRICS;

import java.time.Duration;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.Supplier;

import com.linecorp.decaton.processor.runtime.ProcessorProperties;
import com.linecorp.decaton.processor.runtime.Property;
import com.linecorp.decaton.processor.runtime.PropertySupplier;
import com.linecorp.decaton.processor.runtime.StaticPropertySupplier;
import com.linecorp.decaton.processor.runtime.SubscriptionStateListener;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
Expand Down Expand Up @@ -62,6 +72,7 @@ private static Properties defaultProducerProps(String bootstrapServers) {
}

public static final String DEFAULT_GROUP_ID = "test-group";
public static final Duration DEFINITELY_TOO_SLOW = Duration.ofSeconds(20);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

adds 20s timeouts in some places in the tests that were blocking forever, for easier debugging

Curious why the backtrace provided when the test itself times out (@Test(timeout = xxx)) isn't sufficient to know where it got stuck?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because it was not exactly stuck — the Producer and Consumer were running but doing nothing.


/**
* A helper to instantiate {@link DecatonClient} for producing protobuf tasks with preset configurations
Expand Down Expand Up @@ -116,7 +127,8 @@ public static Producer<String, DecatonTaskRequest> producer(String bootstrapServ
* @return {@link ProcessorSubscription} instance which is already running with unique subscription id assigned
*/
public static ProcessorSubscription subscription(String bootstrapServers,
Consumer<SubscriptionBuilder> builderConfigurer) {
Consumer<SubscriptionBuilder> builderConfigurer)
throws InterruptedException, TimeoutException {
return subscription("subscription-" + sequence(),
bootstrapServers,
builderConfigurer);
Expand All @@ -133,7 +145,8 @@ public static ProcessorSubscription subscription(String bootstrapServers,
*/
public static ProcessorSubscription subscription(String subscriptionId,
String bootstrapServers,
Consumer<SubscriptionBuilder> builderConfigurer) {
Consumer<SubscriptionBuilder> builderConfigurer)
throws InterruptedException, TimeoutException {
AtomicReference<SubscriptionStateListener> stateListenerRef = new AtomicReference<>();
CountDownLatch initializationLatch = new CountDownLatch(1);
SubscriptionStateListener outerStateListener = state -> {
Expand All @@ -160,14 +173,12 @@ public SubscriptionBuilder stateListener(SubscriptionStateListener stateListener

builderConfigurer.accept(builder);
builder.consumerConfig(props)
.addProperties(StaticPropertySupplier.of(Property.ofStatic(CONFIG_BIND_CLIENT_METRICS, true)))
.stateListener(outerStateListener);
ProcessorSubscription subscription = builder.buildAndStart();

try {
initializationLatch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
if (!initializationLatch.await(DEFINITELY_TOO_SLOW.toMillis(), TimeUnit.MILLISECONDS)) {
throw new TimeoutException("Initialization did not complete within " + DEFINITELY_TOO_SLOW);
}
return subscription;
}
Expand All @@ -178,7 +189,7 @@ public SubscriptionBuilder stateListener(SubscriptionStateListener stateListener
* @param condition expected condition to be met
*/
public static void awaitCondition(String message,
Supplier<Boolean> condition) {
BooleanSupplier condition) {
awaitCondition(message, condition, Long.MAX_VALUE);
}

Expand All @@ -189,20 +200,15 @@ public static void awaitCondition(String message,
* @param timeoutMillis max duration to wait
*/
public static void awaitCondition(String message,
Supplier<Boolean> condition,
BooleanSupplier condition,
long timeoutMillis) {
long start = System.nanoTime();
while (!condition.get()) {
while (!condition.getAsBoolean()) {
long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
if (elapsedMillis >= timeoutMillis) {
throw new AssertionError(message);
}
try {
Thread.sleep(100L);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(100));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the intention of this change? Seems like waiting just 100ms here isn't an actual problem?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to avoid having to deal with InterruptedException. No strong preference anyway.

}
}
}
Loading