Provide a way of specifying a partition in producing a task #159

Closed
ta7uw opened this issue Jun 16, 2022 · 2 comments · Fixed by #182
Labels
good first issue Good for newcomers

Comments

@ta7uw
Member

ta7uw commented Jun 16, 2022

We can inject a custom org.apache.kafka.clients.producer.Partitioner by implementing it and passing it through DecatonClientBuilder#producerConfig.

However, a Decaton client wraps the task supplied by the application in a DecatonTaskRequest.
So, to determine a partition based on the task inside a Partitioner, we have to unwrap the DecatonTaskRequest, deserialize our own task type, and only then run the partitioning logic, as in the following.

public class CustomPartitioner extends DefaultPartitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        if (value instanceof DecatonTaskRequest) {
            DecatonTaskRequest request = (DecatonTaskRequest) value;
            MyTask task = new ProtocolBuffersDeserializer<>(MyTask.parser())
                                           .deserialize(request.getSerializedTask().toByteArray());
            // determine the partition from the deserialized task
            ....
            return result;
        }
        return super.partition(topic, key, keyBytes, value, valueBytes, cluster);
    }
}

The above seems redundant and inefficient, because the partitioner has to deserialize a task that the client has just serialized.

Could we provide a way of specifying a partition in producing a task like the following API?

public class DecatonClientImpl<T> implements DecatonClient<T> {
    ....
    public CompletableFuture<PutTaskResult> put(String key, T task, int partition) {
        ....
    }
}

The partition passed as the argument would be set in TaskMetadataProto and used by an internal Partitioner that determines the partition based on it.
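To make the idea concrete, here is a minimal, self-contained sketch of the selection logic such an internal partitioner might apply. It is not Decaton or Kafka code: `ExplicitPartitionSketch`, `Envelope`, `partitionHint`, and `choosePartition` are hypothetical names, and the fallback hash is a stand-in rather than the murmur2-based hashing that Kafka's default partitioner uses.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.OptionalInt;

/** Hypothetical sketch: honor an explicit partition if present, otherwise hash the key. */
final class ExplicitPartitionSketch {

    /** Stand-in for the task envelope; the proposal would carry the hint in the task metadata. */
    public static final class Envelope {
        final String key;
        final OptionalInt partitionHint; // empty means "let the partitioner decide"

        public Envelope(String key, OptionalInt partitionHint) {
            this.key = key;
            this.partitionHint = partitionHint;
        }
    }

    public static int choosePartition(Envelope envelope, int numPartitions) {
        if (envelope.partitionHint.isPresent()) {
            int partition = envelope.partitionHint.getAsInt();
            // Reject hints that do not name an existing partition.
            if (partition < 0 || partition >= numPartitions) {
                throw new IllegalArgumentException(
                        "partition " + partition + " is out of range [0, " + numPartitions + ")");
            }
            // No task deserialization is needed on this path.
            return partition;
        }
        // Assumption: a simple deterministic, non-negative hash of the key bytes.
        byte[] keyBytes = envelope.key.getBytes(StandardCharsets.UTF_8);
        return Math.floorMod(Arrays.hashCode(keyBytes), numPartitions);
    }
}
```

The point of the sketch is that the partition decision only reads a small integer field, so the serialized task never has to be decoded again. As an alternative design, Kafka's `ProducerRecord` also has constructors that take an explicit partition, which would avoid a custom partitioner altogether.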

What do you think about this feature?

@ocadaruma
Contributor

Yeah, if we want to choose the partition from the task value in a custom partitioner, it requires that redundant work.

Providing an overload that specifies the partition explicitly makes sense.

@ta7uw
Member Author

ta7uw commented Jun 27, 2022

Thanks for your reply.
I'll work on this feature.
