
FutureProducer's send is slow when data is skewed #695

Open
immno opened this issue Jul 5, 2024 · 3 comments

immno commented Jul 5, 2024

I have recently been using rdkafka and found that send throughput is very low when the data is skewed, because I need to ensure that messages with the same key land in the same partition, in order.
To guarantee that, I used actix's wait to send data, but when the data is skewed, sending via wait is very time-consuming. After I changed to spawn, sending is very fast, but it is no longer sequential. What should I do to get higher IO throughput while keeping the order of messages with the same key unchanged?
Or could you provide an API like send(Vec<Data>)?

impl Handler<PipelineAggDataMsg> for PipelineAggActor {
    type Result = ();

    fn handle(&mut self, msg: PipelineAggDataMsg, ctx: &mut Context<Self>) {
        let queue = self.queue.clone();
        // ctx.wait(async move { // ordered, but blocks the actor until delivery
        ctx.spawn(async move {   // concurrent and fast, but ordering is lost
            let data = PipelineAggregationResultProto {
                id: msg.source_id,
                source_id: msg.source_id,
                source_type: msg.source_type,
                time: msg.ts,
                data: Some(DataValue::from_json(msg.v).into()),
            };
            queue.send(QueueData::PipelineAggregation(data)).await;
        }.into_actor(self));
    }
}

queue.send:

    #[inline]
    pub async fn send(&self, event: QueueData) {
        let key = event.key().to_be_bytes();
        let key = key.as_slice();
        let (b, t) = self.payload_and_topic(&event);
        self.sender.send(t, key, b, event).await;
    }

    #[inline]
    pub async fn send(&self, topic: &str, key: &[u8], payload: Vec<u8>, event: S) {
        let record = FutureRecord::to(topic)
            .key(key)
            .timestamp(chrono::Utc::now().timestamp_millis())
            .payload(payload.as_slice());
        // Awaiting the send future waits for the delivery report, so
        // back-to-back awaits on the same task serialize deliveries.
        if let Err((e, _)) = self.producer.send(record, Timeout::Never).await {
            tracing::warn!("KafkaSender send fail, {}, {:?}", e, event);
        }
    }
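One way to keep per-key ordering while still sending concurrently is to shard messages by key across a fixed set of workers, so that all messages with the same key pass through the same worker in order, while different keys proceed in parallel. A minimal std-only sketch of the idea (the worker setup and names are illustrative, not rdkafka or actix API; plain threads and channels stand in for async send tasks):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::sync::mpsc;
use std::thread;

// Pick a worker for a key: the same key always hashes to the same shard,
// so its messages keep their relative order on that worker's channel.
fn shard_for(key: &str, shards: usize) -> usize {
    let mut h = DefaultHasher::new();
    key.hash(&mut h);
    (h.finish() as usize) % shards
}

fn main() {
    const SHARDS: usize = 4;
    let mut senders = Vec::new();
    let mut handles = Vec::new();
    for id in 0..SHARDS {
        let (tx, rx) = mpsc::channel::<(String, u32)>();
        senders.push(tx);
        handles.push(thread::spawn(move || {
            // Stand-in for a per-worker loop that awaits `producer.send`
            // for each message, one at a time, preserving per-key order.
            for (key, seq) in rx {
                println!("worker {id}: {key} #{seq}");
            }
        }));
    }
    // Messages for "a", "b", "c" are dispatched round by round; each key's
    // sequence numbers arrive at its worker in increasing order.
    for seq in 0..10u32 {
        for key in ["a", "b", "c"] {
            let s = shard_for(key, SHARDS);
            senders[s].send((key.to_string(), seq)).unwrap();
        }
    }
    drop(senders); // close the channels so the workers exit
    for h in handles {
        h.join().unwrap();
    }
}
```

The trade-off is that throughput scales with the number of distinct keys per shard, not with total message volume, so a heavily skewed key is still limited to one worker's rate.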
manifest (Contributor) commented Jul 9, 2024

We found that the performance of FutureProducer is low in general compared to BaseProducer.

If this issue is solved (here is the PR), we may open source our (async) producer, which performs on par with BaseProducer.


vatj commented Aug 5, 2024

Hej!
I would be keen to understand how your implementation of the async producer can perform on par with the base producer, and where the bottleneck is. Any chance you can share the code now so I can try it on a dev branch?
Thanks for submitting a PR to fix it, and cool that you are pushing to get the ball rolling with the maintainers.

manifest (Contributor) commented Aug 7, 2024

Hey @vatj, sorry for the late response.

For the publisher implementation, the basic idea that yields better performance is to send a batch of messages right away, collect the futures for that batch, and then wait on them all later to confirm delivery.

I don't think it would be productive to release fragments of the code; I'd rather organize it and publish it all at once.
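The batch-then-await idea described above can be sketched as a two-phase loop. In this std-only sketch, thread handles stand in for rdkafka delivery futures, and `send_batch` is a hypothetical name, not code from the PR:

```rust
use std::thread;

// Phase 1: fire off the whole batch without waiting on any single message,
// collecting one handle ("future") per message. Phase 2: wait on them all
// afterwards to confirm delivery of the batch.
fn send_batch(msgs: Vec<String>) -> Vec<String> {
    // Phase 1: enqueue every message immediately.
    let in_flight: Vec<_> = msgs
        .into_iter()
        .map(|msg| thread::spawn(move || msg)) // stand-in for an async send
        .collect();
    // Phase 2: collect all delivery confirmations at once.
    in_flight.into_iter().map(|h| h.join().unwrap()).collect()
}

fn main() {
    let batch: Vec<String> = (0..8).map(|i| format!("msg-{i}")).collect();
    let delivered = send_batch(batch);
    assert_eq!(delivered.len(), 8);
    println!("delivered {} messages", delivered.len());
}
```

The point of the pattern is that the producer's internal queue stays full while the batch is in flight, instead of draining to one message at a time as happens when each send is awaited before the next is enqueued.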
