Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow to bind KafkaClientMetrics for each decaton consumer #132

Merged
merged 7 commits into from
Dec 8, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public class CoreFunctionalityTest {
public RandomRule randomRule = new RandomRule();

@Test(timeout = 30000)
public void testProcessConcurrent() {
public void testProcessConcurrent() throws Exception {
Random rand = randomRule.random();
ProcessorTestSuite
.builder(rule)
Expand All @@ -65,7 +65,7 @@ public void testProcessConcurrent() {
}

@Test(timeout = 30000)
public void testProcessConcurrent_PartitionScopeProcessor() {
public void testProcessConcurrent_PartitionScopeProcessor() throws Exception {
Random rand = randomRule.random();
ProcessorTestSuite
.builder(rule)
Expand All @@ -80,7 +80,7 @@ public void testProcessConcurrent_PartitionScopeProcessor() {
}

@Test(timeout = 30000)
public void testProcessConcurrent_ThreadScopeProcessor() {
public void testProcessConcurrent_ThreadScopeProcessor() throws Exception {
Random rand = randomRule.random();
ProcessorTestSuite
.builder(rule)
Expand All @@ -95,7 +95,7 @@ public void testProcessConcurrent_ThreadScopeProcessor() {
}

@Test(timeout = 30000)
public void testAsyncTaskCompletion() {
public void testAsyncTaskCompletion() throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(16);
Random rand = randomRule.random();
ProcessorTestSuite
Expand Down Expand Up @@ -126,7 +126,7 @@ public void testAsyncTaskCompletion() {
* heap pressure by holding entire ProcessingContext instance.
*/
@Test(timeout = 30000)
public void testGetCompletionInstanceLater() {
public void testGetCompletionInstanceLater() throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(16);
Random rand = randomRule.random();
ProcessorTestSuite
Expand All @@ -149,7 +149,7 @@ public void testGetCompletionInstanceLater() {
}

@Test(timeout = 60000)
public void testSingleThreadProcessing() {
public void testSingleThreadProcessing() throws Exception {
// Note that these processing semantics are not to be considered part of the Decaton specification which users can rely on.
// Rather, this is just an expected behavior based on the current implementation when we set concurrency to 1.
ProcessingGuarantee noDuplicates = new ProcessingGuarantee() {
Expand Down Expand Up @@ -189,7 +189,7 @@ public void doAssert() {
}

@Test(timeout = 30000)
public void testAsyncCompletionWithLeakAndTimeout() {
public void testAsyncCompletionWithLeakAndTimeout() throws Exception {
Random rand = randomRule.random();
ProcessorTestSuite
.builder(rule)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ public void testRetryQueuingExtractingWithDefaultMeta() throws Exception {
}

@Test(timeout = 60000)
public void testRetryQueueingFromCompletionTimeoutCallback() {
public void testRetryQueueingFromCompletionTimeoutCallback() throws Exception {
ProcessorTestSuite
.builder(rule)
.numTasks(100)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class SubscriptionStateTest {
public static KafkaClusterRule rule = new KafkaClusterRule();

@Test(timeout = 30000)
public void testStateTransition() {
public void testStateTransition() throws Exception {
Map<Integer, List<State>> subscriptionStates = new HashMap<>();
ProcessorTestSuite
.builder(rule)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@
import com.linecorp.decaton.client.kafka.ProtocolBuffersKafkaSerializer;
import com.linecorp.decaton.processor.DecatonProcessor;
import com.linecorp.decaton.processor.runtime.ProcessorProperties;
import com.linecorp.decaton.processor.runtime.ProcessorSubscription;
import com.linecorp.decaton.processor.runtime.ProcessorsBuilder;
import com.linecorp.decaton.processor.runtime.Property;
import com.linecorp.decaton.processor.runtime.StaticPropertySupplier;
import com.linecorp.decaton.processor.runtime.ProcessorSubscription;
import com.linecorp.decaton.protobuf.ProtocolBuffersDeserializer;
import com.linecorp.decaton.protocol.Decaton.DecatonTaskRequest;
import com.linecorp.decaton.protocol.Sample.HelloTask;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

import org.apache.kafka.clients.consumer.Consumer;

import com.linecorp.decaton.processor.metrics.internal.AvailableTags;

import io.micrometer.core.instrument.Counter;
Expand All @@ -33,6 +35,7 @@
import io.micrometer.core.instrument.Meter.Id;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics;
import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
import io.micrometer.core.instrument.config.MeterFilter;

Expand Down Expand Up @@ -99,6 +102,23 @@ public void close() {
}

public class SubscriptionMetrics extends AbstractMetrics {
volatile KafkaClientMetrics kafkaClientMetrics;

public void bindClientMetrics(Consumer<String, byte[]> consumer) {
if (kafkaClientMetrics != null) {
closeClientMetrics();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In which case would this be effective?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, none. Indeed SubscriptionMetrics are only instantiated in ProcessorSubscription ctor. Removing

}
kafkaClientMetrics = new KafkaClientMetrics(consumer, availableTags.subscriptionScope());
kafkaClientMetrics.bindTo(registry);
}

/**
 * Closes the currently bound {@link KafkaClientMetrics}, if any, and clears the reference.
 * Does nothing when no client metrics are bound.
 */
private void closeClientMetrics() {
    // The field is volatile: read it once so that the null check and close() act on the same instance.
    KafkaClientMetrics bound = kafkaClientMetrics;
    if (bound != null) {
        bound.close();
        kafkaClientMetrics = null;
    }
}

private Timer processDuration(String section) {
return meter(() -> Timer.builder("subscription.process.durations")
.description(String.format(
Expand All @@ -118,6 +138,12 @@ private Timer processDuration(String section) {
public final Timer handlePausesTime = processDuration("pause");

public final Timer commitOffsetTime = processDuration("commit");

/**
 * Closes this metrics instance via {@code super.close()} and then closes any bound Kafka client metrics.
 */
@Override
public void close() {
    try {
        super.close();
    } finally {
        // Release the client metrics even if super.close() throws.
        closeClientMetrics();
    }
}
}

public class TaskMetrics extends AbstractMetrics {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,16 @@ public class ProcessorProperties extends AbstractDecatonProperties {
public static final PropertyDefinition<Boolean> CONFIG_LOGGING_MDC_ENABLED =
PropertyDefinition.define("decaton.logging.mdc.enabled", Boolean.class, true,
v -> v instanceof Boolean);
/**
* Controls whether to enable or disable binding Micrometer's KafkaClientMetrics to decaton consumers.
* This is disabled for backwards compatibility, but recommended if you rely on Micrometer
* since JMX metrics are deprecated. The downside is a possible increase in metrics count.
*
* Reloadable: no
*/
public static final PropertyDefinition<Boolean> CONFIG_BIND_CLIENT_METRICS =
PropertyDefinition.define("decaton.client.metrics.micrometer.bound", Boolean.class, false,
v -> v instanceof Boolean);
/**
* Control time to "timeout" a deferred completion.
* Decaton allows {@link DecatonProcessor}s to defer completion of a task by calling
Expand Down Expand Up @@ -174,6 +184,7 @@ public class ProcessorProperties extends AbstractDecatonProperties {
CONFIG_GROUP_REBALANCE_TIMEOUT_MS,
CONFIG_SHUTDOWN_TIMEOUT_MS,
CONFIG_LOGGING_MDC_ENABLED,
CONFIG_BIND_CLIENT_METRICS,
CONFIG_DEFERRED_COMPLETE_TIMEOUT_MS));

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.linecorp.decaton.processor.runtime;

import static com.linecorp.decaton.processor.runtime.ProcessorProperties.CONFIG_BIND_CLIENT_METRICS;

import java.time.Duration;
import java.util.Collection;
import java.util.Optional;
Expand Down Expand Up @@ -140,6 +142,9 @@ public void receive(ConsumerRecord<String, byte[]> record) {
metrics = Metrics.withTags("subscription", scope.subscriptionId()).new SubscriptionMetrics();

Consumer<String, byte[]> consumer = consumerSupplier.get();
if (props.get(CONFIG_BIND_CLIENT_METRICS).value()) {
metrics.bindClientMetrics(consumer);
}
consumeManager = new ConsumeManager(consumer, contexts, new Handler(), metrics);
commitManager = new CommitManager(
consumer, props.get(ProcessorProperties.CONFIG_COMMIT_INTERVAL_MS), contexts);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ public class SubscriptionBuilder {
presetRetryProducerConfig.put(ProducerConfig.LINGER_MS_CONFIG, "100");
}

@Setter(AccessLevel.NONE)
private ProcessorProperties.Builder<ProcessorProperties> propertiesBuilder;
private final ProcessorProperties.Builder<ProcessorProperties> propertiesBuilder =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

avoid overwriting properties in SubscriptionBuilder - I can't see any good reason to do it

The intention is to provide a way to "clear" (reset) something set previously. So basically these methods taking a collection of items replace the existing set rather than adding to it.
To keep this behavior consistent with other methods like consumerConfig, and to stay backward compatible, I think we shouldn't change this.
I'm happy to add another kind of method like addProperties() (addProperty()) which supports it and is more explicit, though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think that's what one would expect of a builder, but happy to oblige with your design constraints

ProcessorProperties.builder();

/**
* A unique identifier for this subscription. This ID is used mainly for identifying logs, metrics and
Expand Down Expand Up @@ -93,7 +93,6 @@ public class SubscriptionBuilder {

public SubscriptionBuilder(String subscriptionId) {
this.subscriptionId = Objects.requireNonNull(subscriptionId, "subscriptionId");
propertiesBuilder = ProcessorProperties.builder();
}

public static SubscriptionBuilder newBuilder(String subscriptionId) {
Expand All @@ -111,11 +110,9 @@ public static SubscriptionBuilder newBuilder(String subscriptionId) {
* @return updated instance of {@link SubscriptionBuilder}.
*/
public SubscriptionBuilder properties(PropertySupplier... suppliers) {
ProcessorProperties.Builder<ProcessorProperties> builder = ProcessorProperties.builder();
for (PropertySupplier supplier : suppliers) {
builder.setBySupplier(supplier);
propertiesBuilder.setBySupplier(supplier);
}
propertiesBuilder = builder;
return this;
}

Expand Down
38 changes: 22 additions & 16 deletions testing/src/main/java/com/linecorp/decaton/testing/TestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,25 @@

package com.linecorp.decaton.testing;

import static com.linecorp.decaton.processor.runtime.ProcessorProperties.CONFIG_BIND_CLIENT_METRICS;

import java.time.Duration;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.Supplier;

import com.linecorp.decaton.processor.runtime.ProcessorProperties;
import com.linecorp.decaton.processor.runtime.Property;
import com.linecorp.decaton.processor.runtime.PropertySupplier;
import com.linecorp.decaton.processor.runtime.StaticPropertySupplier;
import com.linecorp.decaton.processor.runtime.SubscriptionStateListener;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
Expand Down Expand Up @@ -62,6 +72,7 @@ private static Properties defaultProducerProps(String bootstrapServers) {
}

public static final String DEFAULT_GROUP_ID = "test-group";
public static final Duration DEFINITELY_TOO_SLOW = Duration.ofSeconds(20);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add timeouts of 20s in some places of the tests that were blocking forever, for easier debugging

Curious why the backtrace provided when the test itself times out (@Test(timeout = xxx)) isn't sufficient to know where it got stuck?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because it was not exactly stuck. Producer and Consumer were running and doing nothing.


/**
* A helper to instantiate {@link DecatonClient} for producing protobuf tasks with preset configurations
Expand Down Expand Up @@ -116,7 +127,8 @@ public static Producer<String, DecatonTaskRequest> producer(String bootstrapServ
* @return {@link ProcessorSubscription} instance which is already running with unique subscription id assigned
*/
public static ProcessorSubscription subscription(String bootstrapServers,
Consumer<SubscriptionBuilder> builderConfigurer) {
Consumer<SubscriptionBuilder> builderConfigurer)
throws InterruptedException, TimeoutException {
return subscription("subscription-" + sequence(),
bootstrapServers,
builderConfigurer);
Expand All @@ -133,7 +145,8 @@ public static ProcessorSubscription subscription(String bootstrapServers,
*/
public static ProcessorSubscription subscription(String subscriptionId,
String bootstrapServers,
Consumer<SubscriptionBuilder> builderConfigurer) {
Consumer<SubscriptionBuilder> builderConfigurer)
throws InterruptedException, TimeoutException {
AtomicReference<SubscriptionStateListener> stateListenerRef = new AtomicReference<>();
CountDownLatch initializationLatch = new CountDownLatch(1);
SubscriptionStateListener outerStateListener = state -> {
Expand All @@ -160,14 +173,12 @@ public SubscriptionBuilder stateListener(SubscriptionStateListener stateListener

builderConfigurer.accept(builder);
builder.consumerConfig(props)
.properties(StaticPropertySupplier.of(Property.ofStatic(CONFIG_BIND_CLIENT_METRICS, true)))
.stateListener(outerStateListener);
ProcessorSubscription subscription = builder.buildAndStart();

try {
initializationLatch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
if (!initializationLatch.await(DEFINITELY_TOO_SLOW.toMillis(), TimeUnit.MILLISECONDS)) {
throw new TimeoutException("Initialization did not complete within " + DEFINITELY_TOO_SLOW);
}
return subscription;
}
Expand All @@ -178,7 +189,7 @@ public SubscriptionBuilder stateListener(SubscriptionStateListener stateListener
* @param condition expected condition to be met
*/
public static void awaitCondition(String message,
Supplier<Boolean> condition) {
BooleanSupplier condition) {
awaitCondition(message, condition, Long.MAX_VALUE);
}

Expand All @@ -189,20 +200,15 @@ public static void awaitCondition(String message,
* @param timeoutMillis max duration to wait
*/
public static void awaitCondition(String message,
Supplier<Boolean> condition,
BooleanSupplier condition,
long timeoutMillis) {
long start = System.nanoTime();
while (!condition.get()) {
while (!condition.getAsBoolean()) {
long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
if (elapsedMillis >= timeoutMillis) {
throw new AssertionError(message);
}
try {
Thread.sleep(100L);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(100));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the intention of this change? Seems like waiting just 100ms here isn't an actual problem?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just avoid having to deal with InterruptedException. No strong preference anyways

}
}
}
Loading