Skip to content

Commit

Permalink
Display byte keys with a given key formatter
Browse files Browse the repository at this point in the history
For LoggingContext to print record keys in bytes as human-readable
String, this commit introduces a new interface `KeyFormatter` that
translates keys from bytes into `String`.
This interface has a canonical implementation that reads the byte key as
a UTF-8 byte sequence.

For other places, this commit just prevents the byte keys from being
printed, or prints it as String if it's inside a test case.
  • Loading branch information
ajalab committed Jun 20, 2022
1 parent ae91f56 commit f991510
Show file tree
Hide file tree
Showing 13 changed files with 88 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import org.slf4j.MDC;

import com.linecorp.decaton.processor.formatter.KeyFormatter;
import com.linecorp.decaton.processor.runtime.internal.TaskRequest;

/**
Expand All @@ -40,20 +41,20 @@ public class LoggingContext implements AutoCloseable {

private final boolean enabled;

public LoggingContext(boolean enabled, String subscriptionId, TaskRequest request, TaskMetadata metadata) {
public LoggingContext(boolean enabled, String subscriptionId, TaskRequest request, TaskMetadata metadata, KeyFormatter keyFormatter) {
this.enabled = enabled;
if (enabled) {
MDC.put(METADATA_KEY, metadata.toString());
MDC.put(TASK_KEY, String.valueOf(request.key()));
MDC.put(TASK_KEY, keyFormatter.format(request.key()));
MDC.put(SUBSCRIPTION_ID_KEY, subscriptionId);
MDC.put(OFFSET_KEY, String.valueOf(request.recordOffset()));
MDC.put(TOPIC_KEY, request.topicPartition().topic());
MDC.put(PARTITION_KEY, String.valueOf(request.topicPartition().partition()));
}
}

public LoggingContext(String subscriptionId, TaskRequest request, TaskMetadata metadata) {
this(true, subscriptionId, request, metadata);
public LoggingContext(String subscriptionId, TaskRequest request, TaskMetadata metadata, KeyFormatter keyFormatter) {
this(true, subscriptionId, request, metadata, keyFormatter);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2020 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.decaton.processor.formatter;

import java.nio.charset.StandardCharsets;

@FunctionalInterface
public interface KeyFormatter {
KeyFormatter DEFAULT = key -> new String(key, StandardCharsets.UTF_8);

String format(byte[] key);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import com.linecorp.decaton.common.Deserializer;
import com.linecorp.decaton.processor.DecatonProcessor;
import com.linecorp.decaton.processor.formatter.KeyFormatter;
import com.linecorp.decaton.processor.runtime.internal.DecatonProcessorSupplierImpl;
import com.linecorp.decaton.processor.runtime.internal.DefaultTaskExtractor;
import com.linecorp.decaton.processor.runtime.internal.Processors;
Expand All @@ -42,6 +43,7 @@ public class ProcessorsBuilder<T> {
private final TaskExtractor<T> retryTaskExtractor;

private final List<DecatonProcessorSupplier<T>> suppliers;
private KeyFormatter keyFormatter = KeyFormatter.DEFAULT;

public ProcessorsBuilder(String topic, TaskExtractor<T> taskExtractor, TaskExtractor<T> retryTaskExtractor) {
this.topic = topic;
Expand Down Expand Up @@ -130,7 +132,17 @@ public ProcessorsBuilder<T> thenProcess(DecatonProcessor<T> processor) {
return thenProcess(new DecatonProcessorSupplierImpl<>(() -> processor, ProcessorScope.PROVIDED));
}

/**
* Configure a key formatter that translates record keys from bytes into String for logging purpose.
* @param keyFormatter a {@link KeyFormatter}, function that maps bytes into String
* @return updated instance of {@link ProcessorsBuilder}.
*/
public ProcessorsBuilder<T> setKeyFormatter(KeyFormatter keyFormatter) {
this.keyFormatter = keyFormatter;
return this;
}

public Processors<T> build(DecatonProcessorSupplier<byte[]> retryProcessorSupplier) {
return new Processors<>(suppliers, retryProcessorSupplier, taskExtractor, retryTaskExtractor);
return new Processors<>(suppliers, retryProcessorSupplier, taskExtractor, retryTaskExtractor, keyFormatter);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,17 @@
import java.util.Collections;
import java.util.List;

import com.linecorp.decaton.processor.DecatonProcessor;
import com.linecorp.decaton.processor.Completion;
import com.linecorp.decaton.processor.runtime.DecatonTask;
import com.linecorp.decaton.processor.DecatonProcessor;
import com.linecorp.decaton.processor.LoggingContext;
import com.linecorp.decaton.processor.ProcessingContext;
import com.linecorp.decaton.processor.runtime.ProcessorProperties;
import com.linecorp.decaton.processor.runtime.TaskExtractor;
import com.linecorp.decaton.processor.formatter.KeyFormatter;
import com.linecorp.decaton.processor.metrics.Metrics;
import com.linecorp.decaton.processor.metrics.Metrics.ProcessMetrics;
import com.linecorp.decaton.processor.metrics.Metrics.TaskMetrics;
import com.linecorp.decaton.processor.LoggingContext;
import com.linecorp.decaton.processor.runtime.DecatonTask;
import com.linecorp.decaton.processor.runtime.ProcessorProperties;
import com.linecorp.decaton.processor.runtime.TaskExtractor;
import com.linecorp.decaton.processor.runtime.internal.Utils.Timer;

import lombok.extern.slf4j.Slf4j;
Expand All @@ -41,6 +42,7 @@ public class ProcessPipeline<T> implements AutoCloseable {
private final List<DecatonProcessor<T>> processors;
private final DecatonProcessor<byte[]> retryProcessor;
private final TaskExtractor<T> taskExtractor;
private final KeyFormatter keyFormatter;
private final ExecutionScheduler scheduler;
private final TaskMetrics taskMetrics;
private final ProcessMetrics processMetrics;
Expand All @@ -51,13 +53,15 @@ public class ProcessPipeline<T> implements AutoCloseable {
List<DecatonProcessor<T>> processors,
DecatonProcessor<byte[]> retryProcessor,
TaskExtractor<T> taskExtractor,
KeyFormatter keyFormatter,
ExecutionScheduler scheduler,
Metrics metrics,
Clock clock) {
this.scope = scope;
this.processors = Collections.unmodifiableList(processors);
this.retryProcessor = retryProcessor;
this.taskExtractor = taskExtractor;
this.keyFormatter = keyFormatter;
this.scheduler = scheduler;
this.clock = clock;

Expand All @@ -69,9 +73,10 @@ public ProcessPipeline(ThreadScope scope,
List<DecatonProcessor<T>> processors,
DecatonProcessor<byte[]> retryProcessor,
TaskExtractor<T> taskExtractor,
KeyFormatter keyFormatter,
ExecutionScheduler scheduler,
Metrics metrics) {
this(scope, processors, retryProcessor, taskExtractor, scheduler, metrics, Clock.systemDefaultZone());
this(scope, processors, retryProcessor, taskExtractor, keyFormatter, scheduler, metrics, Clock.systemDefaultZone());
}

public void scheduleThenProcess(TaskRequest request) throws InterruptedException {
Expand Down Expand Up @@ -119,7 +124,7 @@ DecatonTask<T> extract(TaskRequest request) {
Completion process(TaskRequest request, DecatonTask<T> task) throws InterruptedException {
ProcessingContext<T> context =
new ProcessingContextImpl<>(scope.subscriptionId(), request, task, processors, retryProcessor,
scope.props());
keyFormatter, scope.props());

Timer timer = Utils.timer();
Completion processResult;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@

import org.apache.kafka.common.header.Headers;

import com.linecorp.decaton.processor.Completion;
import com.linecorp.decaton.processor.Completion.TimeoutChoice;
import com.linecorp.decaton.processor.DecatonProcessor;
import com.linecorp.decaton.processor.LoggingContext;
import com.linecorp.decaton.processor.ProcessingContext;
import com.linecorp.decaton.processor.TaskMetadata;
import com.linecorp.decaton.processor.Completion;
import com.linecorp.decaton.processor.Completion.TimeoutChoice;
import com.linecorp.decaton.processor.formatter.KeyFormatter;
import com.linecorp.decaton.processor.runtime.DecatonTask;
import com.linecorp.decaton.processor.runtime.ProcessorProperties;
import com.linecorp.decaton.processor.tracing.TracingProvider.ProcessorTraceHandle;
Expand All @@ -44,6 +45,7 @@ public class ProcessingContextImpl<T> implements ProcessingContext<T> {
private final DecatonTask<T> task;
private final List<DecatonProcessor<T>> downstreams;
private final DecatonProcessor<byte[]> retryQueueingProcessor;
private final KeyFormatter keyFormatter;
private final ProcessorProperties props;
private final AtomicReference<CompletionImpl> deferredCompletion;

Expand All @@ -52,12 +54,14 @@ public ProcessingContextImpl(String subscriptionId,
DecatonTask<T> task,
List<DecatonProcessor<T>> downstreams,
DecatonProcessor<byte[]> retryQueueingProcessor,
KeyFormatter keyFormatter,
ProcessorProperties props) {
this.subscriptionId = subscriptionId;
this.request = request;
this.task = task;
this.downstreams = Collections.unmodifiableList(downstreams);
this.retryQueueingProcessor = retryQueueingProcessor;
this.keyFormatter = keyFormatter;
this.props = props;
deferredCompletion = new AtomicReference<>();
}
Expand All @@ -80,7 +84,7 @@ public Headers headers() {
@Override
public LoggingContext loggingContext() {
boolean enabled = props.get(ProcessorProperties.CONFIG_LOGGING_MDC_ENABLED).value();
return new LoggingContext(enabled, subscriptionId, request, task.metadata());
return new LoggingContext(enabled, subscriptionId, request, task.metadata(), keyFormatter);
}

@Override
Expand Down Expand Up @@ -114,7 +118,7 @@ private <P> Completion pushDownStream(List<DecatonProcessor<P>> downstreams, P t
"Exception from tracing", NoopTrace.INSTANCE);
ProcessingContextImpl<P> nextContext = new ProcessingContextImpl<>(
subscriptionId, request, task, downstreams.subList(1, downstreams.size()),
retryQueueingProcessor, props);
retryQueueingProcessor, keyFormatter, props);

CompletionImpl completion;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@

import java.util.Collections;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.linecorp.decaton.processor.DecatonProcessor;
import com.linecorp.decaton.processor.formatter.KeyFormatter;
import com.linecorp.decaton.processor.runtime.TaskExtractor;
import com.linecorp.decaton.processor.metrics.Metrics;
import com.linecorp.decaton.processor.runtime.DecatonProcessorSupplier;
Expand All @@ -36,15 +38,18 @@ public class Processors<T> {
private final DecatonProcessorSupplier<byte[]> retryProcessorSupplier;
private final TaskExtractor<T> taskExtractor;
private final TaskExtractor<T> retryTaskExtractor;
private final KeyFormatter keyFormatter;

public Processors(List<DecatonProcessorSupplier<T>> suppliers,
DecatonProcessorSupplier<byte[]> retryProcessorSupplier,
TaskExtractor<T> taskExtractor,
TaskExtractor<T> retryTaskExtractor) {
TaskExtractor<T> retryTaskExtractor,
KeyFormatter keyFormatter) {
this.suppliers = Collections.unmodifiableList(suppliers);
this.retryProcessorSupplier = retryProcessorSupplier;
this.taskExtractor = taskExtractor;
this.retryTaskExtractor = retryTaskExtractor;
this.keyFormatter = keyFormatter;
}

private DecatonProcessor<byte[]> retryProcessor(ThreadScope scope) {
Expand Down Expand Up @@ -78,7 +83,7 @@ public ProcessPipeline<T> newPipeline(ThreadScope scope,
scope.threadId()))
.collect(Collectors.toList());
logger.info("Creating partition processor core: {}", scope);
return new ProcessPipeline<>(scope, processors, retryProcessor, taskExtractor, scheduler, metrics);
return new ProcessPipeline<>(scope, processors, retryProcessor, taskExtractor, keyFormatter, scheduler, metrics);
} catch (RuntimeException e) {
// If exception occurred in the middle of instantiating processors, we have to make sure
// all the previously created processors are destroyed before bubbling up the exception.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public class TaskRequest {
private final TopicPartition topicPartition;
private final long recordOffset;
private final OffsetState offsetState;
@ToString.Exclude
private final byte[] key;
@ToString.Exclude
private final Headers headers;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import com.linecorp.decaton.processor.DecatonProcessor;
import com.linecorp.decaton.processor.ProcessingContext;
import com.linecorp.decaton.processor.TaskMetadata;
import com.linecorp.decaton.processor.formatter.KeyFormatter;
import com.linecorp.decaton.processor.processors.CompactionProcessor.CompactChoice;
import com.linecorp.decaton.processor.processors.CompactionProcessor.CompactingTask;
import com.linecorp.decaton.processor.runtime.DecatonTask;
Expand Down Expand Up @@ -106,7 +107,7 @@ private TaskInput put(DecatonProcessor<HelloTask> processor,
ProcessingContext<HelloTask> context =
spy(new ProcessingContextImpl<>("subscription", request, task,
Arrays.asList(processor, downstream), null,
ProcessorProperties.builder().build()));
KeyFormatter.DEFAULT, ProcessorProperties.builder().build()));

if (beforeProcess != null) {
beforeProcess.accept(taskData, context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import com.linecorp.decaton.processor.DeferredCompletion;
import com.linecorp.decaton.processor.ProcessingContext;
import com.linecorp.decaton.processor.TaskMetadata;
import com.linecorp.decaton.processor.formatter.KeyFormatter;
import com.linecorp.decaton.processor.metrics.Metrics;
import com.linecorp.decaton.processor.runtime.DecatonTask;
import com.linecorp.decaton.processor.runtime.DynamicProperty;
Expand Down Expand Up @@ -118,7 +119,8 @@ public void setUp() {
completionTimeoutMsProp.set(100L);
doReturn(10L).when(clock).millis();
pipeline = spy(new ProcessPipeline<>(
scope, Collections.singletonList(processorMock), null, extractorMock, schedulerMock, METRICS, clock));
scope, Collections.singletonList(processorMock), null, extractorMock, KeyFormatter.DEFAULT,
schedulerMock, METRICS, clock));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import com.linecorp.decaton.processor.TaskMetadata;
import com.linecorp.decaton.processor.Completion;
import com.linecorp.decaton.processor.Completion.TimeoutChoice;
import com.linecorp.decaton.processor.formatter.KeyFormatter;
import com.linecorp.decaton.processor.runtime.DecatonTask;
import com.linecorp.decaton.processor.runtime.ProcessorProperties;
import com.linecorp.decaton.processor.tracing.TestTraceHandle;
Expand Down Expand Up @@ -116,7 +117,7 @@ private static ProcessingContextImpl<HelloTask> context(RecordTraceHandle traceH
DecatonTask<HelloTask> task = new DecatonTask<>(
TaskMetadata.fromProto(REQUEST.getMetadata()), TASK, TASK.toByteArray());
return new ProcessingContextImpl<>("subscription", request, task, Arrays.asList(processors),
null, ProcessorProperties.builder().build());
null, KeyFormatter.DEFAULT, ProcessorProperties.builder().build());
}

private static void safeAwait(CountDownLatch latch) {
Expand Down Expand Up @@ -368,7 +369,7 @@ public void process(ProcessingContext<byte[]> context, byte[] task)

ProcessingContextImpl<byte[]> context =
spy(new ProcessingContextImpl<>("subscription", request, task,
Collections.emptyList(), retryProcessor,
Collections.emptyList(), retryProcessor, KeyFormatter.DEFAULT,
ProcessorProperties.builder().build()));

Completion retryComp = context.retry();
Expand Down Expand Up @@ -421,7 +422,7 @@ public void process(ProcessingContext<byte[]> context, byte[] task)

ProcessingContextImpl<byte[]> context =
spy(new ProcessingContextImpl<>("subscription", request, task,
Arrays.asList(processor), retryProcessor,
Arrays.asList(processor), retryProcessor, KeyFormatter.DEFAULT,
ProcessorProperties.builder().build()));

Completion comp = context.push(new byte[0]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;

import com.linecorp.decaton.processor.formatter.KeyFormatter;
import com.linecorp.decaton.processor.runtime.ProcessorProperties;
import com.linecorp.decaton.processor.runtime.DecatonProcessorSupplier;
import com.linecorp.decaton.processor.tracing.internal.NoopTracingProvider;
Expand Down Expand Up @@ -69,7 +70,7 @@ public void testCleanupPartiallyInitializedProcessors() {
Processors<HelloTask> processors = new Processors<>(
suppliers, null,
new DefaultTaskExtractor<>(bytes -> HelloTask.getDefaultInstance()),
null);
null, KeyFormatter.DEFAULT);

doThrow(new RuntimeException("exception")).when(suppliers.get(2)).getProcessor(any(), any(), anyInt());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.linecorp.decaton.processor.DecatonProcessor;

import lombok.ToString;
import lombok.Value;
import lombok.experimental.Accessors;

Expand All @@ -30,6 +31,7 @@ public class ProcessedRecord {
/**
* Key of the task
*/
@ToString.Exclude
byte[] key;
/**
* Processed task
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.hamcrest.Matchers.lessThan;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
Expand Down Expand Up @@ -54,7 +55,8 @@ public void doAssert() {
ProcessedRecord prev = perKeyRecords.get(i - 1);
ProcessedRecord current = perKeyRecords.get(i);

assertThat("Process time shouldn't overlap. key: " + entry.getKey(),
String key = StandardCharsets.UTF_8.decode(entry.getKey()).toString();
assertThat("Process time shouldn't overlap. key: " + key,
prev.endTimeNanos(), lessThan(current.startTimeNanos()));
}
}
Expand Down

0 comments on commit f991510

Please sign in to comment.