Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TaskMetadata as header #238

Merged
merged 7 commits into from
Aug 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 0 additions & 7 deletions .idea/copyright/LINE_OSS.xml

This file was deleted.

7 changes: 7 additions & 0 deletions .idea/copyright/LY_OSS.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 6 additions & 2 deletions .idea/copyright/profiles_settings.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,11 @@ public String get(String key) {
.subPartitionRuntime(subPartitionRuntime)
.processorsBuilder(
ProcessorsBuilder.consuming(config.topic(),
(TaskExtractor<Task>) bytes -> {
(TaskExtractor<Task>) record -> {
Task task = config.taskDeserializer()
.deserialize(config.topic(), bytes);
.deserialize(config.topic(), record.value());
return new DecatonTask<>(
TaskMetadata.builder().build(), task, bytes);
TaskMetadata.builder().build(), task, record.value());
})
.thenProcess(
(ctx, task) -> recording.process(task)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@
import org.apache.kafka.common.serialization.ByteArraySerializer;

import com.linecorp.decaton.client.internal.DecatonClientImpl;
import com.linecorp.decaton.client.kafka.ProtocolBuffersKafkaSerializer;
import com.linecorp.decaton.common.Serializer;
import com.linecorp.decaton.protocol.Decaton.DecatonTaskRequest;

import lombok.AccessLevel;
import lombok.Setter;
Expand All @@ -54,10 +52,10 @@ public class DecatonClientBuilder<T> {

public static class DefaultKafkaProducerSupplier implements KafkaProducerSupplier {
@Override
public Producer<byte[], DecatonTaskRequest> getProducer(Properties config) {
public Producer<byte[], byte[]> getProducer(Properties config) {
return new KafkaProducer<>(config,
new ByteArraySerializer(),
new ProtocolBuffersKafkaSerializer<>());
new ByteArraySerializer());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.kafka.clients.producer.Producer;

import com.linecorp.decaton.client.internal.DecatonClientImpl;
import com.linecorp.decaton.protocol.Decaton.DecatonTaskRequest;

/**
* An interface to specify a custom instantiation function for {@link Producer}.
Expand All @@ -39,5 +38,5 @@ public interface KafkaProducerSupplier {
 * @return a Kafka producer instance which implements {@link Producer}. The returned instance will be
* closed along with {@link DecatonClient#close} being called.
*/
Producer<byte[], DecatonTaskRequest> getProducer(Properties config);
Producer<byte[], byte[]> getProducer(Properties config);
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,13 @@
import java.util.function.Consumer;
import java.util.function.Supplier;

import com.google.protobuf.ByteString;
import org.apache.kafka.clients.producer.ProducerRecord;

import com.linecorp.decaton.client.DecatonClient;
import com.linecorp.decaton.client.KafkaProducerSupplier;
import com.linecorp.decaton.client.PutTaskResult;
import com.linecorp.decaton.client.kafka.PrintableAsciiStringSerializer;
import com.linecorp.decaton.common.Serializer;
import com.linecorp.decaton.protocol.Decaton.DecatonTaskRequest;
import com.linecorp.decaton.protocol.Decaton.TaskMetadataProto;

public class DecatonClientImpl<T> implements DecatonClient<T> {
Expand All @@ -52,7 +51,7 @@ public class DecatonClientImpl<T> implements DecatonClient<T> {
this.serializer = serializer;
this.applicationId = applicationId;
this.instanceId = instanceId;
producer = new DecatonTaskProducer(topic, producerConfig, producerSupplier);
producer = new DecatonTaskProducer(producerConfig, producerSupplier);
this.timestampSupplier = timestampSupplier;
}

Expand Down Expand Up @@ -109,13 +108,11 @@ private CompletableFuture<PutTaskResult> put(String key, T task, TaskMetadataPro
byte[] serializedKey = keySerializer.serialize(topic, key);
byte[] serializedTask = serializer.serialize(task);

DecatonTaskRequest request =
DecatonTaskRequest.newBuilder()
.setMetadata(taskMetadataProto)
.setSerializedTask(ByteString.copyFrom(serializedTask))
.build();
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
topic, partition, serializedKey, serializedTask);
TaskMetadataUtil.writeAsHeader(taskMetadataProto, record.headers());

return producer.sendRequest(serializedKey, request, partition);
return producer.sendRequest(record);
}

private TaskMetadataProto convertToTaskMetadataProto(TaskMetadata overrideTaskMetadata) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,9 @@
import com.linecorp.decaton.client.DecatonClient;
import com.linecorp.decaton.client.KafkaProducerSupplier;
import com.linecorp.decaton.client.PutTaskResult;
import com.linecorp.decaton.protocol.Decaton.DecatonTaskRequest;

/**
* A raw interface to put a built {@link DecatonTaskRequest} directly.
 * A raw interface to put Decaton tasks.
* This interface isn't expected to be used by applications unless it's really necessary.
* Use {@link DecatonClient} to put task into a Decaton topic instead.
*/
Expand All @@ -44,8 +43,7 @@ public class DecatonTaskProducer implements AutoCloseable {
presetProducerConfig.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
}

private final Producer<byte[], DecatonTaskRequest> producer;
private final String topic;
private final Producer<byte[], byte[]> producer;

private static Properties completeProducerConfig(Properties producerConfig) {
final Properties result = new Properties();
Expand All @@ -54,20 +52,13 @@ private static Properties completeProducerConfig(Properties producerConfig) {
return result;
}

public DecatonTaskProducer(String topic, Properties producerConfig,
public DecatonTaskProducer(Properties producerConfig,
KafkaProducerSupplier producerSupplier) {
Properties completeProducerConfig = completeProducerConfig(producerConfig);
producer = producerSupplier.getProducer(completeProducerConfig);
this.topic = topic;
}

public CompletableFuture<PutTaskResult> sendRequest(byte[] key, DecatonTaskRequest request,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So is this class now essentially a slightly different KafkaProducer implementation which at least gives CF as the returning value of sendRequest? Is it worth keeping the class itself then?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It also fills preset configs (e.g. acks=all).
So I think it's worth keeping it

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

okay

Integer partition) {
ProducerRecord<byte[], DecatonTaskRequest> record = new ProducerRecord<>(topic, partition, key, request);
return sendRequest(record);
}

private CompletableFuture<PutTaskResult> sendRequest(ProducerRecord<byte[], DecatonTaskRequest> record) {
public CompletableFuture<PutTaskResult> sendRequest(ProducerRecord<byte[], byte[]> record) {
CompletableFuture<PutTaskResult> result = new CompletableFuture<>();
producer.send(record, (metadata, exception) -> {
if (exception == null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright 2024 LY Corporation
*
* LY Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.decaton.client.internal;

import java.io.UncheckedIOException;

import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;

import com.google.protobuf.InvalidProtocolBufferException;

import com.linecorp.decaton.protocol.Decaton.TaskMetadataProto;

/**
 * Utility for serializing a {@link TaskMetadataProto} into a Kafka record header and
 * reading it back, using the reserved header key {@code dt_meta}.
 */
public class TaskMetadataUtil {
    private static final String METADATA_HEADER_KEY = "dt_meta";

    // Utility class with only static members; prevent instantiation.
    private TaskMetadataUtil() {}

    /**
     * Write metadata to {@link Headers}.
     * Any existing {@code dt_meta} header is removed first so at most one
     * metadata header is ever present on the record.
     *
     * @param metadata task metadata to be written
     * @param headers record headers to write to
     */
    public static void writeAsHeader(TaskMetadataProto metadata, Headers headers) {
        headers.remove(METADATA_HEADER_KEY);
        headers.add(METADATA_HEADER_KEY, metadata.toByteArray());
    }

    /**
     * Read metadata from the given {@link Headers}.
     * If multiple {@code dt_meta} headers are somehow present, the last one wins
     * (matching {@link Headers#lastHeader}).
     *
     * @param headers record headers to read from
     * @return parsed {@link TaskMetadataProto}, or {@code null} if the metadata header is absent
     * @throws UncheckedIOException if the header bytes are not a valid {@link TaskMetadataProto}
     */
    public static TaskMetadataProto readFromHeader(Headers headers) {
        Header header = headers.lastHeader(METADATA_HEADER_KEY);
        if (header == null) {
            return null;
        }
        try {
            return TaskMetadataProto.parseFrom(header.value());
        } catch (InvalidProtocolBufferException e) {
            // Invalid metadata indicates a corrupted or incompatible record; surface as unchecked.
            throw new UncheckedIOException(e);
        }
    }
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,20 @@
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import com.linecorp.decaton.client.internal.TaskMetadataUtil;
import com.linecorp.decaton.protobuf.ProtocolBuffersSerializer;
import com.linecorp.decaton.protocol.Decaton.DecatonTaskRequest;
import com.linecorp.decaton.protocol.Decaton.TaskMetadataProto;
import com.linecorp.decaton.protocol.Sample.HelloTask;

@ExtendWith(MockitoExtension.class)
public class DecatonClientBuilderTest {
@Mock
private Producer<byte[], DecatonTaskRequest> producer;
private Producer<byte[], byte[]> producer;

@Captor
private ArgumentCaptor<ProducerRecord<byte[], DecatonTaskRequest>> recordCaptor;
private ArgumentCaptor<ProducerRecord<byte[], byte[]>> recordCaptor;

private ProducerRecord<byte[], DecatonTaskRequest> doProduce(DecatonClient<HelloTask> dclient) {
private ProducerRecord<byte[], byte[]> doProduce(DecatonClient<HelloTask> dclient) {
dclient.put(null, HelloTask.getDefaultInstance());
verify(producer, times(1)).send(recordCaptor.capture(), any(Callback.class));
return recordCaptor.getValue();
Expand All @@ -66,10 +66,10 @@ public void testBuild() {
.producerSupplier(config -> producer)
.build();

ProducerRecord<byte[], DecatonTaskRequest> record = doProduce(dclient);
ProducerRecord<byte[], byte[]> record = doProduce(dclient);
assertEquals(topic, record.topic());

TaskMetadataProto metadata = record.value().getMetadata();
TaskMetadataProto metadata = TaskMetadataUtil.readFromHeader(record.headers());
assertEquals(applicationId, metadata.getSourceApplicationId());
assertEquals(instanceId, metadata.getSourceInstanceId());
}
Expand Down
Loading
Loading