Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce some object allocations that are identified as expensive by allocation profiling #104

Merged
merged 3 commits into from
May 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,22 +38,33 @@ public class LoggingContext implements AutoCloseable {
public static final String SUBSCRIPTION_ID_KEY = "dt_subscription_id";
public static final String TASK_KEY = "dt_task_key";

private final boolean enabled;

public LoggingContext(boolean enabled, String subscriptionId, TaskRequest request, TaskMetadata metadata) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's better to keep current constructor to avoid unnecessary breaking change since this class is considered as public API.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Let me follow that.

this.enabled = enabled;
if (enabled) {
MDC.put(METADATA_KEY, metadata.toString());
MDC.put(TASK_KEY, String.valueOf(request.key()));
MDC.put(SUBSCRIPTION_ID_KEY, subscriptionId);
MDC.put(OFFSET_KEY, String.valueOf(request.recordOffset()));
MDC.put(TOPIC_KEY, request.topicPartition().topic());
MDC.put(PARTITION_KEY, String.valueOf(request.topicPartition().partition()));
}
}

public LoggingContext(String subscriptionId, TaskRequest request, TaskMetadata metadata) {
MDC.put(METADATA_KEY, metadata.toString());
MDC.put(TASK_KEY, String.valueOf(request.key()));
MDC.put(SUBSCRIPTION_ID_KEY, subscriptionId);
MDC.put(OFFSET_KEY, String.valueOf(request.recordOffset()));
MDC.put(TOPIC_KEY, request.topicPartition().topic());
MDC.put(PARTITION_KEY, String.valueOf(request.topicPartition().partition()));
this(true, subscriptionId, request, metadata);
}

@Override
public void close() {
MDC.remove(OFFSET_KEY);
MDC.remove(TOPIC_KEY);
MDC.remove(TASK_KEY);
MDC.remove(PARTITION_KEY);
MDC.remove(METADATA_KEY);
MDC.remove(SUBSCRIPTION_ID_KEY);
if (enabled) {
MDC.remove(OFFSET_KEY);
MDC.remove(TOPIC_KEY);
MDC.remove(TASK_KEY);
MDC.remove(PARTITION_KEY);
MDC.remove(METADATA_KEY);
MDC.remove(SUBSCRIPTION_ID_KEY);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.util.List;
import java.util.Map;

import org.slf4j.MDC;

import com.linecorp.decaton.processor.DecatonProcessor;
import com.linecorp.decaton.processor.runtime.internal.AbstractDecatonProperties;
import com.linecorp.decaton.processor.runtime.internal.OutOfOrderCommitControl;
Expand Down Expand Up @@ -123,6 +125,18 @@ public class ProcessorProperties extends AbstractDecatonProperties {
PropertyDefinition.define("decaton.processing.shutdown.timeout.ms", Long.class, 0L,
v -> v instanceof Long && (Long) v >= 0);

/**
* Control whether to enable or disable decaton specific information store in slf4j's {@link MDC}.
* This option is enabled by default, but it is known to cause some object allocations which could become
* a problem in massive scale traffic. This option intend to provide an option for users to disable MDC
* properties where not necessary to reduce GC pressure.
*
* Reloadable: yes
*/
public static final PropertyDefinition<Boolean> CONFIG_LOGGING_MDC_ENABLED =
PropertyDefinition.define("decaton.logging.mdc.enabled", Boolean.class, true,
v -> v instanceof Boolean);

public static final List<PropertyDefinition<?>> PROPERTY_DEFINITIONS =
Collections.unmodifiableList(Arrays.asList(
CONFIG_IGNORE_KEYS,
Expand All @@ -131,7 +145,8 @@ public class ProcessorProperties extends AbstractDecatonProperties {
CONFIG_MAX_PENDING_RECORDS,
CONFIG_COMMIT_INTERVAL_MS,
CONFIG_GROUP_REBALANCE_TIMEOUT_MS,
CONFIG_SHUTDOWN_TIMEOUT_MS));
CONFIG_SHUTDOWN_TIMEOUT_MS,
CONFIG_LOGGING_MDC_ENABLED));

/**
* Find and return a {@link PropertyDefinition} from its name.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.linecorp.decaton.processor.runtime.DecatonTask;
import com.linecorp.decaton.processor.DeferredCompletion;
import com.linecorp.decaton.processor.ProcessingContext;
import com.linecorp.decaton.processor.runtime.ProcessorProperties;
import com.linecorp.decaton.processor.runtime.TaskExtractor;
import com.linecorp.decaton.processor.metrics.Metrics;
import com.linecorp.decaton.processor.metrics.Metrics.ProcessMetrics;
Expand Down Expand Up @@ -105,7 +106,8 @@ DecatonTask<T> extract(TaskRequest request) {
// visible for testing
CompletableFuture<Void> process(TaskRequest request, DecatonTask<T> task) throws InterruptedException {
ProcessingContext<T> context =
new ProcessingContextImpl<>(scope.subscriptionId(), request, task, processors, retryProcessor);
new ProcessingContextImpl<>(scope.subscriptionId(), request, task, processors, retryProcessor,
scope.props());

Timer timer = Utils.timer();
CompletableFuture<Void> processResult;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@
import org.apache.kafka.common.header.Headers;

import com.linecorp.decaton.processor.DecatonProcessor;
import com.linecorp.decaton.processor.runtime.DecatonTask;
import com.linecorp.decaton.processor.DeferredCompletion;
import com.linecorp.decaton.processor.LoggingContext;
import com.linecorp.decaton.processor.ProcessingContext;
import com.linecorp.decaton.processor.TaskMetadata;
import com.linecorp.decaton.processor.runtime.DecatonTask;
import com.linecorp.decaton.processor.runtime.ProcessorProperties;
import com.linecorp.decaton.processor.tracing.TracingProvider.ProcessorTraceHandle;
import com.linecorp.decaton.processor.tracing.TracingProvider.RecordTraceHandle;
import com.linecorp.decaton.processor.LoggingContext;
import com.linecorp.decaton.processor.tracing.internal.NoopTracingProvider.NoopTrace;

import lombok.extern.slf4j.Slf4j;
Expand All @@ -43,27 +44,31 @@ public class ProcessingContextImpl<T> implements ProcessingContext<T> {
private final DeferredCompletion completion;
private final List<DecatonProcessor<T>> downstreams;
private final DecatonProcessor<byte[]> retryQueueingProcessor;
private final ProcessorProperties props;
AtomicBoolean completionDeferred;

public ProcessingContextImpl(String subscriptionId,
TaskRequest request,
DecatonTask<T> task,
DeferredCompletion completion,
List<DecatonProcessor<T>> downstreams,
DecatonProcessor<byte[]> retryQueueingProcessor) {
DecatonProcessor<byte[]> retryQueueingProcessor,
ProcessorProperties props,
DeferredCompletion completion) {
this.subscriptionId = subscriptionId;
this.request = request;
this.task = task;
this.completion = completion;
this.downstreams = Collections.unmodifiableList(downstreams);
this.retryQueueingProcessor = retryQueueingProcessor;
this.props = props;
completionDeferred = new AtomicBoolean();
}

public ProcessingContextImpl(String subscriptionId, TaskRequest request, DecatonTask<T> task,
List<DecatonProcessor<T>> downstreams,
DecatonProcessor<byte[]> retryQueueingProcessor) {
this(subscriptionId, request, task, null, downstreams, retryQueueingProcessor);
DecatonProcessor<byte[]> retryQueueingProcessor,
ProcessorProperties props) {
this(subscriptionId, request, task, downstreams, retryQueueingProcessor, props, null);
}

@Override
Expand All @@ -83,7 +88,8 @@ public Headers headers() {

@Override
public LoggingContext loggingContext() {
return new LoggingContext(subscriptionId, request, task.metadata());
boolean enabled = props.get(ProcessorProperties.CONFIG_LOGGING_MDC_ENABLED).value();
return new LoggingContext(enabled, subscriptionId, request, task.metadata());
}

@Override
Expand Down Expand Up @@ -121,8 +127,9 @@ private <P> CompletableFuture<Void> pushDownStream(List<DecatonProcessor<P>> dow
}
};
ProcessingContextImpl<P> nextContext = new ProcessingContextImpl<>(
subscriptionId, request, task, nextCompletion,
downstreams.subList(1, downstreams.size()), retryQueueingProcessor);
subscriptionId, request, task, downstreams.subList(1, downstreams.size()),
retryQueueingProcessor, props, nextCompletion
);

try {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ public class TaskRequest {
private final long recordOffset;
private final DeferredCompletion completion;
private final String key;
private final String id;
@ToString.Exclude
private final Headers headers;
@ToString.Exclude
Expand All @@ -56,12 +55,15 @@ public TaskRequest(TopicPartition topicPartition,
this.headers = headers;
this.trace = trace;
this.rawRequestBytes = rawRequestBytes;
}

StringBuilder idBuilder = new StringBuilder();
idBuilder.append("topic=").append(topicPartition.topic());
idBuilder.append(" partition=").append(topicPartition.partition());
idBuilder.append(" offset=").append(recordOffset);
id = idBuilder.toString();
public String id() {
// TaskRequest object is held alive through associated ProcessingContext's lifetime, hence holding
// any value as its field makes memory occupation worse. Since this ID field is rarely used (typically
// when trace level logging is enabled), it is better to take short lived object allocation and cpu cost
// rather than building it once and cache as an object field.
return "topic=" + topicPartition.topic() + " partition=" + topicPartition.partition() +
" offset=" + recordOffset;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,18 @@
import org.junit.Rule;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.Spy;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;

import com.linecorp.decaton.processor.DecatonProcessor;
import com.linecorp.decaton.processor.runtime.DecatonTask;
import com.linecorp.decaton.processor.DeferredCompletion;
import com.linecorp.decaton.processor.ProcessingContext;
import com.linecorp.decaton.processor.TaskMetadata;
import com.linecorp.decaton.processor.processors.CompactionProcessor.CompactChoice;
import com.linecorp.decaton.processor.processors.CompactionProcessor.CompactingTask;
import com.linecorp.decaton.processor.runtime.DecatonTask;
import com.linecorp.decaton.processor.runtime.ProcessorProperties;
import com.linecorp.decaton.processor.runtime.internal.ProcessingContextImpl;
import com.linecorp.decaton.processor.runtime.internal.TaskRequest;
import com.linecorp.decaton.protocol.Sample.HelloTask;
Expand Down Expand Up @@ -109,9 +109,9 @@ private TaskInput put(DecatonProcessor<HelloTask> processor,
TaskRequest request = new TaskRequest(
new TopicPartition("topic", 1), 1, null, name, null, null, null);
ProcessingContext<HelloTask> context =
Mockito.spy(new ProcessingContextImpl<>("subscription", request, task, completion,
Collections.singletonList(downstream),
null));
spy(new ProcessingContextImpl<>("subscription", request, task,
Collections.singletonList(downstream), null,
ProcessorProperties.builder().build(), completion));

TaskInput input = new TaskInput(taskData, completion, context);
if (beforeProcess != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import com.linecorp.decaton.processor.DeferredCompletion;
import com.linecorp.decaton.processor.ProcessingContext;
import com.linecorp.decaton.processor.TaskMetadata;
import com.linecorp.decaton.processor.runtime.ProcessorProperties;
import com.linecorp.decaton.processor.tracing.TestTraceHandle;
import com.linecorp.decaton.processor.tracing.TestTracingProvider;
import com.linecorp.decaton.processor.tracing.TracingProvider.RecordTraceHandle;
Expand Down Expand Up @@ -117,7 +118,8 @@ private static ProcessingContextImpl<HelloTask> context(RecordTraceHandle traceH
null, traceHandle, REQUEST.toByteArray());
DecatonTask<HelloTask> task = new DecatonTask<>(
TaskMetadata.fromProto(REQUEST.getMetadata()), TASK, TASK.toByteArray());
return new ProcessingContextImpl<>("subscription", request, task, Arrays.asList(processors), null);
return new ProcessingContextImpl<>("subscription", request, task, Arrays.asList(processors),
null, ProcessorProperties.builder().build());
}

private static void safeAwait(CountDownLatch latch) {
Expand Down Expand Up @@ -345,8 +347,10 @@ public void testRetry() throws InterruptedException {
TaskMetadata.fromProto(REQUEST.getMetadata()), TASK.toByteArray(), TASK.toByteArray());

ProcessingContextImpl<byte[]> context =
spy(new ProcessingContextImpl<>("subscription", request, task, completion,
Collections.emptyList(), retryProcessor));
spy(new ProcessingContextImpl<>("subscription", request, task,
Collections.emptyList(), retryProcessor,
ProcessorProperties.builder().build(),
completion));

CompletableFuture<Void> produceFuture = context.retry();

Expand Down