From 0db74ac9efc484c2c40d9743eec4f32ad3b2d1be Mon Sep 17 00:00:00 2001 From: Yuta Imazu Date: Fri, 16 Dec 2022 11:23:38 +0900 Subject: [PATCH 1/3] Add retry count metric --- .../com/linecorp/decaton/processor/metrics/Metrics.java | 7 +++++++ .../internal/DecatonTaskRetryQueueingProcessor.java | 4 +++- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/processor/src/main/java/com/linecorp/decaton/processor/metrics/Metrics.java b/processor/src/main/java/com/linecorp/decaton/processor/metrics/Metrics.java index e8e09a8f..d516441d 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/metrics/Metrics.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/metrics/Metrics.java @@ -30,6 +30,7 @@ import com.linecorp.decaton.processor.metrics.internal.AvailableTags; import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.DistributionSummary; import io.micrometer.core.instrument.Gauge; import io.micrometer.core.instrument.Meter; import io.micrometer.core.instrument.Meter.Id; @@ -261,6 +262,12 @@ public class RetryMetrics extends AbstractMetrics { .description("The number of tasks failed to enqueue in retry topic") .tags(availableTags.subscriptionScope()) .register(registry)); + + public final DistributionSummary retryCount = + meter(() -> DistributionSummary.builder("retry.count") + .description("The number of times a task was retried") + .tags(availableTags.subscriptionScope()) + .register(registry)); } public static Metrics withTags(String... keyValues) { diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/DecatonTaskRetryQueueingProcessor.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/DecatonTaskRetryQueueingProcessor.java index d2e4bf73..1977860b 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/DecatonTaskRetryQueueingProcessor.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/DecatonTaskRetryQueueingProcessor.java @@ -52,10 +52,11 @@ public DecatonTaskRetryQueueingProcessor(SubscriptionScope scope, DecatonTaskPro public void process(ProcessingContext context, byte[] serializedTask) throws InterruptedException { TaskMetadata originalMeta = context.metadata(); + long nextRetryCount = originalMeta.retryCount() + 1; long nextTryTimeMillis = System.currentTimeMillis() + backoff.toMillis(); TaskMetadataProto taskMetadata = TaskMetadataProto.newBuilder(originalMeta.toProto()) - .setRetryCount(originalMeta.retryCount() + 1) + .setRetryCount(nextRetryCount) .setScheduledTimeMillis(nextTryTimeMillis) .build(); DecatonTaskRequest request = @@ -71,6 +72,7 @@ public void process(ProcessingContext context, byte[] serializedTask) } else { metrics.retryQueueingFailed.increment(); } + metrics.retryCount.record(nextRetryCount); }); context.deferCompletion().completeWith(future); } From edc1b444b4346dcd188e9fe2be9f5d095fb569bc Mon Sep 17 00:00:00 2001 From: Yuta Imazu Date: Fri, 16 Dec 2022 11:47:21 +0900 Subject: [PATCH 2/3] Rename metric --- .../java/com/linecorp/decaton/processor/metrics/Metrics.java | 4 ++-- .../runtime/internal/DecatonTaskRetryQueueingProcessor.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/processor/src/main/java/com/linecorp/decaton/processor/metrics/Metrics.java b/processor/src/main/java/com/linecorp/decaton/processor/metrics/Metrics.java index d516441d..ff2a43d9 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/metrics/Metrics.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/metrics/Metrics.java @@ -263,8 +263,8 @@ public class RetryMetrics extends AbstractMetrics { .tags(availableTags.subscriptionScope()) .register(registry)); - public final DistributionSummary retryCount = - meter(() -> DistributionSummary.builder("retry.count") + public final DistributionSummary retryTaskRetries = + meter(() -> DistributionSummary.builder("retry.task.retries") .description("The number of times a task was retried") .tags(availableTags.subscriptionScope()) .register(registry)); diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/DecatonTaskRetryQueueingProcessor.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/DecatonTaskRetryQueueingProcessor.java index 1977860b..2fc56316 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/DecatonTaskRetryQueueingProcessor.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/DecatonTaskRetryQueueingProcessor.java @@ -72,7 +72,7 @@ public void process(ProcessingContext context, byte[] serializedTask) } else { metrics.retryQueueingFailed.increment(); } - metrics.retryCount.record(nextRetryCount); + metrics.retryTaskRetries.record(nextRetryCount); }); context.deferCompletion().completeWith(future); } From 58e93c5b33e5cbf0ce585b723aaaa018576a2dfe Mon Sep 17 00:00:00 2001 From: Yuta Imazu Date: Fri, 16 Dec 2022 15:16:42 +0900 Subject: [PATCH 3/3] Update metric before sending request --- .../runtime/internal/DecatonTaskRetryQueueingProcessor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/DecatonTaskRetryQueueingProcessor.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/DecatonTaskRetryQueueingProcessor.java index 2fc56316..917a322c 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/DecatonTaskRetryQueueingProcessor.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/DecatonTaskRetryQueueingProcessor.java @@ -64,6 +64,7 @@ public void process(ProcessingContext context, byte[] serializedTask) .setMetadata(taskMetadata) .setSerializedTask(ByteString.copyFrom(serializedTask)) .build(); + metrics.retryTaskRetries.record(nextRetryCount); CompletableFuture future = producer.sendRequest(context.key(), request); future.whenComplete((r, e) -> { @@ -72,7 +73,6 @@ public void process(ProcessingContext context, byte[] serializedTask) } else { metrics.retryQueueingFailed.increment(); } - metrics.retryTaskRetries.record(nextRetryCount); }); context.deferCompletion().completeWith(future); }