Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use bytes for record keys instead of String #157

Merged
merged 27 commits into from
Jun 27, 2022
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
39ef2af
Add ArbitraryTopicTypeTest
ajalab Jun 7, 2022
b3048ae
Use bytes for record keys instead of String
ajalab Jun 7, 2022
33d026e
Use String for DecatonTask key in DecatonClient
ajalab Jun 13, 2022
198033e
Remove TODO comment about eliminating array copy
ajalab Jun 17, 2022
1dfb7a1
Add and use TestUtils.producer(String, Serializer, Serializer)
ajalab Jun 17, 2022
d79abd2
Replace redundant ArbitraryTopicTypeTest test case
ajalab Jun 17, 2022
ae91f56
Clarify only String keys are supported by CONFIG_IGNORE_KEYS
ajalab Jun 17, 2022
f991510
Display byte keys with a given key formatter
ajalab Jun 20, 2022
ec75b24
Add final to KafkaClusterRule field in ArbitraryTopicTypeTest
ajalab Jun 20, 2022
8c90fe5
Introduce TaskKey; a wrapper for keys in bytes
ajalab Jun 20, 2022
fc63284
Use TaskKey for BlacklistedKeysFilter
ajalab Jun 20, 2022
4f26f2e
Revert "Use TaskKey for BlacklistedKeysFilter"
ajalab Jun 21, 2022
3994ebc
Revert "Introduce TaskKey; a wrapper for keys in bytes"
ajalab Jun 21, 2022
f034907
Revert "Display byte keys with a given key formatter"
ajalab Jun 21, 2022
601ef68
Print bytes key as UTF-8 in LoggingContext
ajalab Jun 21, 2022
f69c303
Add HashableKey to replace ByteBuffer usage for Map/Set
ajalab Jun 21, 2022
cd07084
Exclude byte[] key from toString target
ajalab Jun 21, 2022
6d3cc6c
Use HashableKey for BlacklistedKeysFilter
ajalab Jun 21, 2022
d2894c4
Support null key in HashableKey
ajalab Jun 21, 2022
b9eac22
Print "null" instead of empty string if key is null
ajalab Jun 22, 2022
bf3d42c
Make TestUtils.defaultProducerProps private again
ajalab Jun 22, 2022
9d24b49
Redefine HashableKey as HashableByteArray
ajalab Jun 22, 2022
0c6ee0a
Compute hashCode of bytes preemptively only once
ajalab Jun 22, 2022
9d9db83
Move HashableByteArray to decaton.processor.internal
ajalab Jun 22, 2022
a288fc6
Introduce ByteArrays.toString for reuse
ajalab Jun 22, 2022
6f69376
Do not surround decoded array with quotes for consistency
ajalab Jun 23, 2022
46ce23c
LINELINT-911 Remove unused HashableByteArray#getArray
ajalab Jun 27, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.ByteArraySerializer;

import com.linecorp.decaton.client.internal.DecatonClientImpl;
import com.linecorp.decaton.client.kafka.PrintableAsciiStringSerializer;
import com.linecorp.decaton.client.kafka.ProtocolBuffersKafkaSerializer;
import com.linecorp.decaton.common.Serializer;
import com.linecorp.decaton.protocol.Decaton.DecatonTaskRequest;
Expand Down Expand Up @@ -54,9 +54,9 @@ public class DecatonClientBuilder<T> {

public static class DefaultKafkaProducerSupplier implements KafkaProducerSupplier {
@Override
public Producer<String, DecatonTaskRequest> getProducer(Properties config) {
public Producer<byte[], DecatonTaskRequest> getProducer(Properties config) {
return new KafkaProducer<>(config,
new PrintableAsciiStringSerializer(),
new ByteArraySerializer(),
new ProtocolBuffersKafkaSerializer<>());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,5 @@ public interface KafkaProducerSupplier {
* @return an Kafka producer instance which implements {@link Producer}. The returned instance will be
* closed along with {@link DecatonClient#close} being called.
*/
Producer<String, DecatonTaskRequest> getProducer(Properties config);
Producer<byte[], DecatonTaskRequest> getProducer(Properties config);
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,14 @@
import com.linecorp.decaton.client.DecatonClient;
import com.linecorp.decaton.client.KafkaProducerSupplier;
import com.linecorp.decaton.client.PutTaskResult;
import com.linecorp.decaton.client.kafka.PrintableAsciiStringSerializer;
import com.linecorp.decaton.common.Serializer;
import com.linecorp.decaton.protocol.Decaton.DecatonTaskRequest;
import com.linecorp.decaton.protocol.Decaton.TaskMetadataProto;

public class DecatonClientImpl<T> implements DecatonClient<T> {
private static final org.apache.kafka.common.serialization.Serializer<String> keySerializer = new PrintableAsciiStringSerializer();
private final String topic;
private final Serializer<T> serializer;
private final DecatonTaskProducer producer;
private final String applicationId;
Expand All @@ -44,6 +47,7 @@ public class DecatonClientImpl<T> implements DecatonClient<T> {
Properties producerConfig,
KafkaProducerSupplier producerSupplier,
Supplier<Long> timestampSupplier) {
this.topic = topic;
this.serializer = serializer;
this.applicationId = applicationId;
this.instanceId = instanceId;
Expand Down Expand Up @@ -93,6 +97,7 @@ public void close() throws Exception {
}

private CompletableFuture<PutTaskResult> put(String key, T task, TaskMetadataProto taskMetadataProto) {
byte[] serializedKey = keySerializer.serialize(topic, key);
byte[] serializedTask = serializer.serialize(task);

DecatonTaskRequest request =
Expand All @@ -101,7 +106,7 @@ private CompletableFuture<PutTaskResult> put(String key, T task, TaskMetadataPro
.setSerializedTask(ByteString.copyFrom(serializedTask))
.build();

return producer.sendRequest(key, request);
return producer.sendRequest(serializedKey, request);
}

private TaskMetadataProto convertToTaskMetadataProto(TaskMetadata overrideTaskMetadata) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class DecatonTaskProducer implements AutoCloseable {
presetProducerConfig.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
}

private final Producer<String, DecatonTaskRequest> producer;
private final Producer<byte[], DecatonTaskRequest> producer;
private final String topic;

private static Properties completeProducerConfig(Properties producerConfig) {
Expand All @@ -61,8 +61,8 @@ public DecatonTaskProducer(String topic, Properties producerConfig,
this.topic = topic;
}

public CompletableFuture<PutTaskResult> sendRequest(String key, DecatonTaskRequest request) {
ProducerRecord<String, DecatonTaskRequest> record = new ProducerRecord<>(topic, key, request);
public CompletableFuture<PutTaskResult> sendRequest(byte[] key, DecatonTaskRequest request) {
ProducerRecord<byte[], DecatonTaskRequest> record = new ProducerRecord<>(topic, key, request);

CompletableFuture<PutTaskResult> result = new CompletableFuture<>();
producer.send(record, (metadata, exception) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,12 @@ public class DecatonClientBuilderTest {
public MockitoRule rule = MockitoJUnit.rule();

@Mock
private Producer<String, DecatonTaskRequest> producer;
private Producer<byte[], DecatonTaskRequest> producer;

@Captor
private ArgumentCaptor<ProducerRecord<String, DecatonTaskRequest>> recordCaptor;
private ArgumentCaptor<ProducerRecord<byte[], DecatonTaskRequest>> recordCaptor;

private ProducerRecord<String, DecatonTaskRequest> doProduce(DecatonClient<HelloTask> dclient) {
private ProducerRecord<byte[], DecatonTaskRequest> doProduce(DecatonClient<HelloTask> dclient) {
dclient.put(null, HelloTask.getDefaultInstance());
verify(producer, times(1)).send(recordCaptor.capture(), any(Callback.class));
return recordCaptor.getValue();
Expand All @@ -69,7 +69,7 @@ public void testBuild() {
.producerSupplier(config -> producer)
.build();

ProducerRecord<String, DecatonTaskRequest> record = doProduce(dclient);
ProducerRecord<byte[], DecatonTaskRequest> record = doProduce(dclient);
assertEquals(topic, record.topic());

TaskMetadataProto metadata = record.value().getMetadata();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,15 @@ public class DecatonClientImplTest {
public MockitoRule rule = MockitoJUnit.rule();

@Mock
private Producer<String, DecatonTaskRequest> producer;
private Producer<byte[], DecatonTaskRequest> producer;

@Mock
private Supplier<Long> timestampSupplier;

private DecatonClientImpl<HelloTask> client;

@Captor
private ArgumentCaptor<ProducerRecord<String, DecatonTaskRequest>> captor;
private ArgumentCaptor<ProducerRecord<byte[], DecatonTaskRequest>> captor;

@Before
public void setUp() {
Expand All @@ -78,7 +78,7 @@ public void testTimestampFieldSetInternally() {
client.put("key", HelloTask.getDefaultInstance());

verify(producer, times(1)).send(captor.capture(), any(Callback.class));
ProducerRecord<String, DecatonTaskRequest> record = captor.getValue();
ProducerRecord<byte[], DecatonTaskRequest> record = captor.getValue();
assertNull(record.timestamp());
assertEquals(1234, record.value().getMetadata().getTimestampMillis());
}
Expand All @@ -90,7 +90,7 @@ public void testTimestampFieldSetInternallyWithCallback() {
client.put("key", HelloTask.getDefaultInstance(), ignored -> {});

verify(producer, times(1)).send(captor.capture(), any(Callback.class));
ProducerRecord<String, DecatonTaskRequest> record = captor.getValue();
ProducerRecord<byte[], DecatonTaskRequest> record = captor.getValue();
assertNull(record.timestamp());
assertEquals(1234, record.value().getMetadata().getTimestampMillis());
}
Expand All @@ -102,7 +102,7 @@ public void testTimestampFieldSetExternally() {
client.put("key", HelloTask.getDefaultInstance(), 5678);

verify(producer, times(1)).send(captor.capture(), any(Callback.class));
ProducerRecord<String, DecatonTaskRequest> record = captor.getValue();
ProducerRecord<byte[], DecatonTaskRequest> record = captor.getValue();
assertNull(record.timestamp());
assertEquals(5678, record.value().getMetadata().getTimestampMillis());
}
Expand All @@ -114,7 +114,7 @@ public void testTimestampFieldSetExternallyWithCallback() {
client.put("key", HelloTask.getDefaultInstance(), 5678, ignored -> {});

verify(producer, times(1)).send(captor.capture(), any(Callback.class));
ProducerRecord<String, DecatonTaskRequest> record = captor.getValue();
ProducerRecord<byte[], DecatonTaskRequest> record = captor.getValue();
assertNull(record.timestamp());
assertEquals(5678, record.value().getMetadata().getTimestampMillis());
}
Expand Down Expand Up @@ -149,15 +149,15 @@ public void testWithEmptyTaskMetaDataSetExternally() {
client.put("key", HelloTask.getDefaultInstance(), TaskMetadata.builder().build());

verify(producer, times(1)).send(captor.capture(), any(Callback.class));
ProducerRecord<String, DecatonTaskRequest> record = captor.getValue();
ProducerRecord<byte[], DecatonTaskRequest> record = captor.getValue();
assertTrue(record.value().getMetadata().getTimestampMillis() > 0);
assertNotNull(record.value().getMetadata().getSourceApplicationId());
assertNotNull(record.value().getMetadata().getSourceInstanceId());
}

private void verifyAndAssertTaskMetadata(long timestamp, long scheduledTime) {
verify(producer, times(1)).send(captor.capture(), any(Callback.class));
ProducerRecord<String, DecatonTaskRequest> record = captor.getValue();
ProducerRecord<byte[], DecatonTaskRequest> record = captor.getValue();
assertNull(record.timestamp());
assertEquals(timestamp, record.value().getMetadata().getTimestampMillis());
assertEquals(scheduledTime, record.value().getMetadata().getScheduledTimeMillis());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Copyright 2022 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.decaton.processor;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;

import com.linecorp.decaton.processor.runtime.DecatonTask;
import com.linecorp.decaton.processor.runtime.ProcessorSubscription;
import com.linecorp.decaton.processor.runtime.ProcessorsBuilder;
import com.linecorp.decaton.processor.runtime.RetryConfig;
import com.linecorp.decaton.processor.runtime.SubscriptionBuilder;
import com.linecorp.decaton.processor.runtime.TaskExtractor;
import com.linecorp.decaton.testing.KafkaClusterRule;
import com.linecorp.decaton.testing.TestUtils;

/**
* This test class verifies that {@link ProcessorSubscription} is capable of subscribing topics with
* - any key types other than String (our default)
* - any record types other than {@link DecatonTask}
*/
public class ArbitraryTopicTypeTest {
@ClassRule
public static final KafkaClusterRule rule = new KafkaClusterRule();

private String topic;
private String retryTopic;

@Before
public void setUp() {
topic = rule.admin().createRandomTopic(3, 3);
retryTopic = rule.admin().createRandomTopic(3, 3);
}

@After
public void tearDown() {
rule.admin().deleteTopics(true, topic, retryTopic);
}

private static final class TestTaskExtractor<T> implements TaskExtractor<T> {
private final String topic;
private final Deserializer<T> deserializer;

private TestTaskExtractor(String topic, Deserializer<T> deserializer) {
this.topic = topic;
this.deserializer = deserializer;
}

@Override
public DecatonTask<T> extract(byte[] bytes) {
final T value = deserializer.deserialize(topic, bytes);
final TaskMetadata metadata = TaskMetadata.builder().build();
return new DecatonTask<>(metadata, value, bytes);
}
}

private <K, V> void testRetryWithKeyValue(
Serializer<K> keySerializer,
K key,
Serializer<V> valueSerializer,
Deserializer<V> valueDeserializer,
V value
) throws Exception {
final CountDownLatch processLatch = new CountDownLatch(1);
final RetryConfig retryConfig = RetryConfig.builder().retryTopic(retryTopic).backoff(Duration.ofMillis(10)).build();
final Consumer<SubscriptionBuilder> builderConfigurer = builder -> builder.processorsBuilder(
ProcessorsBuilder.consuming(topic, new TestTaskExtractor<>(topic, valueDeserializer)).thenProcess((context, task) -> {
if (context.metadata().retryCount() == 0) {
context.retry();
} else {
processLatch.countDown();
}
})).enableRetry(retryConfig);

try (ProcessorSubscription subscription = TestUtils.subscription(rule.bootstrapServers(), builderConfigurer);
Producer<K, V> producer = TestUtils.producer(rule.bootstrapServers(), keySerializer, valueSerializer)) {
producer.send(new ProducerRecord<>(topic, key, value));
processLatch.await();
}
}

@Test(timeout = 30000)
public void testBytesKeyValue() throws Exception {
testRetryWithKeyValue(
new ByteArraySerializer(),
"key".getBytes(StandardCharsets.UTF_8),
new ByteArraySerializer(),
new ByteArrayDeserializer(),
"value".getBytes(StandardCharsets.UTF_8)
);
}

@Test(timeout = 30000)
public void testLongKeyValue() throws Exception {
testRetryWithKeyValue(
new LongSerializer(),
123L,
new LongSerializer(),
new LongDeserializer(),
100L
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.linecorp.decaton.processor.runtime.ProcessorScope;
import com.linecorp.decaton.processor.runtime.Property;
import com.linecorp.decaton.processor.runtime.StaticPropertySupplier;
import com.linecorp.decaton.processor.runtime.internal.HashableByteArray;
import com.linecorp.decaton.testing.KafkaClusterRule;
import com.linecorp.decaton.testing.RandomRule;
import com.linecorp.decaton.testing.processor.ProcessedRecord;
Expand Down Expand Up @@ -153,17 +154,17 @@ public void testSingleThreadProcessing() throws Exception {
// Note that this processing semantics is not be considered as Decaton specification which users can rely on.
// Rather, this is just a expected behavior based on current implementation when we set concurrency to 1.
ProcessingGuarantee noDuplicates = new ProcessingGuarantee() {
private final Map<String, List<TestTask>> produced = new HashMap<>();
private final Map<String, List<TestTask>> processed = new HashMap<>();
private final Map<HashableByteArray, List<TestTask>> produced = new HashMap<>();
private final Map<HashableByteArray, List<TestTask>> processed = new HashMap<>();

@Override
public synchronized void onProduce(ProducedRecord record) {
produced.computeIfAbsent(record.key(), key -> new ArrayList<>()).add(record.task());
produced.computeIfAbsent(new HashableByteArray(record.key()), key -> new ArrayList<>()).add(record.task());
}

@Override
public synchronized void onProcess(TaskMetadata metadata, ProcessedRecord record) {
processed.computeIfAbsent(record.key(), key -> new ArrayList<>()).add(record.task());
processed.computeIfAbsent(new HashableByteArray(record.key()), key -> new ArrayList<>()).add(record.task());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.linecorp.decaton.processor.runtime.ProcessorSubscription;
import com.linecorp.decaton.processor.runtime.ProcessorsBuilder;
import com.linecorp.decaton.processor.runtime.StaticPropertySupplier;
import com.linecorp.decaton.processor.runtime.internal.HashableByteArray;
import com.linecorp.decaton.protobuf.ProtocolBuffersDeserializer;
import com.linecorp.decaton.protocol.Sample.HelloTask;
import com.linecorp.decaton.testing.KafkaClusterRule;
Expand Down Expand Up @@ -62,11 +63,11 @@ public void testPropertyDynamicSwitch() throws Exception {
for (int i = 0; i < 10000; i++) {
keys.add("key" + i);
}
Set<String> processedKeys = Collections.synchronizedSet(new HashSet<>());
Set<HashableByteArray> processedKeys = Collections.synchronizedSet(new HashSet<>());
CountDownLatch processLatch = new CountDownLatch(keys.size());

DecatonProcessor<HelloTask> processor = (context, task) -> {
processedKeys.add(context.key());
processedKeys.add(new HashableByteArray(context.key()));
processLatch.countDown();
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.linecorp.decaton.processor;

import java.nio.charset.StandardCharsets;

import org.slf4j.MDC;

import com.linecorp.decaton.processor.runtime.internal.TaskRequest;
Expand Down Expand Up @@ -43,8 +45,10 @@ public class LoggingContext implements AutoCloseable {
public LoggingContext(boolean enabled, String subscriptionId, TaskRequest request, TaskMetadata metadata) {
this.enabled = enabled;
if (enabled) {
final String taskKey = request.key() == null ? "null" : new String(request.key(), StandardCharsets.UTF_8);

MDC.put(METADATA_KEY, metadata.toString());
MDC.put(TASK_KEY, String.valueOf(request.key()));
MDC.put(TASK_KEY, taskKey);
MDC.put(SUBSCRIPTION_ID_KEY, subscriptionId);
MDC.put(OFFSET_KEY, String.valueOf(request.recordOffset()));
MDC.put(TOPIC_KEY, request.topicPartition().topic());
Expand Down
Loading