Between the messageAggregator and the flusher we currently re-shuffle the full set of messages to be produced three times: once into a single []*ProducerMessage for batching purposes, once into a map[string]map[int32][]*ProducerMessage to aid internal state transitions, and finally into the actual ProduceRequest structure. This whole process is rather silly and over-complicated.
Once #300 lands, we should be able to simplify this substantially. With the appropriate re-organization of state, the messageAggregator should be able to put messages directly into a ProduceRequest as they arrive, getting rid of both existing re-shuffling passes.
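For illustration, here is a minimal sketch (hypothetical types and names, not the current sarama API) of what "put messages directly into a ProduceRequest as they arrive" could look like: the aggregator appends each incoming message straight into the topic/partition shape the wire request needs, so neither of the intermediate re-shuffling passes is required.

```go
package main

import "fmt"

// ProducerMessage is a trimmed-down stand-in for the producer message type.
type ProducerMessage struct {
	Topic     string
	Partition int32
	Value     []byte
}

// buildingRequest is a hypothetical stand-in for a ProduceRequest that is
// assembled incrementally instead of being constructed in a separate pass.
type buildingRequest struct {
	msgSets map[string]map[int32][]*ProducerMessage
}

func newBuildingRequest() *buildingRequest {
	return &buildingRequest{msgSets: make(map[string]map[int32][]*ProducerMessage)}
}

// addMessage places the message directly where the request needs it.
func (r *buildingRequest) addMessage(msg *ProducerMessage) {
	partitions := r.msgSets[msg.Topic]
	if partitions == nil {
		partitions = make(map[int32][]*ProducerMessage)
		r.msgSets[msg.Topic] = partitions
	}
	partitions[msg.Partition] = append(partitions[msg.Partition], msg)
}

func main() {
	req := newBuildingRequest()
	req.addMessage(&ProducerMessage{Topic: "events", Partition: 0, Value: []byte("a")})
	req.addMessage(&ProducerMessage{Topic: "events", Partition: 1, Value: []byte("b")})
	fmt.Println(len(req.msgSets["events"])) // 2 partitions buffered, no re-shuffle needed
}
```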
This change will also enable one other subtle optimization. Since compressed messages are wrapped together and sent as the payload of a single "message" in the protocol, the total size of compressed messages sent is limited not just by MaxRequestSize, but also by MaxMessageBytes, which is typically much smaller.
However, while MaxRequestSize is per-request, MaxMessageBytes (as it applies to compressed message sets masquerading as single messages) is per-partition. Unfortunately, the current messageAggregator enforces this limit per request because it doesn't have the state to calculate it per partition. This has the effect of artificially limiting throughput when compression is enabled and multiple topics are being produced to the same broker (you end up with each request limited to 1MB instead of each partition of each request being limited to 1MB).
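To make the distinction concrete, here is a rough sketch (all names hypothetical) of what per-partition accounting could look like: each partition's buffered bytes are checked against a MaxMessageBytes-style cap, since that partition's messages become a single compressed "message" on the wire, while the request total is still checked against a MaxRequestSize-style cap.

```go
package main

import "fmt"

// topicPartition identifies one partition's message set within a request.
type topicPartition struct {
	topic     string
	partition int32
}

// sizeTracker is a hypothetical sketch of per-partition size accounting:
// each partition's set has its own cap (it becomes one compressed "message"),
// while the request as a whole has its own, larger cap.
type sizeTracker struct {
	maxMessageBytes int // cap on one partition's (compressed) message set
	maxRequestSize  int // cap on the whole request
	requestBytes    int
	partitionBytes  map[topicPartition]int
}

func newSizeTracker(maxMessageBytes, maxRequestSize int) *sizeTracker {
	return &sizeTracker{
		maxMessageBytes: maxMessageBytes,
		maxRequestSize:  maxRequestSize,
		partitionBytes:  make(map[topicPartition]int),
	}
}

// wouldOverflow reports whether adding size bytes for tp would exceed either
// limit. Checking partitionBytes (not just requestBytes) is what lets each
// partition use the full MaxMessageBytes budget independently.
func (t *sizeTracker) wouldOverflow(tp topicPartition, size int) bool {
	return t.partitionBytes[tp]+size > t.maxMessageBytes ||
		t.requestBytes+size > t.maxRequestSize
}

func (t *sizeTracker) add(tp topicPartition, size int) {
	t.partitionBytes[tp] += size
	t.requestBytes += size
}

func main() {
	tr := newSizeTracker(1000000, 100000000) // e.g. ~1MB per partition, ~100MB per request
	a, b := topicPartition{"topic-a", 0}, topicPartition{"topic-b", 0}
	tr.add(a, 900000)
	// With per-request accounting only, this second partition would already be
	// squeezed; per-partition accounting still gives it its own full budget.
	fmt.Println(tr.wouldOverflow(b, 900000)) // false
}
```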
Put them in a map right up front in the aggregator; it only requires tracking one extra piece of metadata (the total number of messages in the map), and it means we don't have to shuffle them into this form before constructing the request anyway (see the sketch below).
One piece of #433.
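A minimal sketch of the suggestion above (hypothetical field and method names), building on the same nested-map shape as the earlier sketch: the aggregator keeps messages in their topic/partition buckets from the moment they arrive, and carries a single counter alongside, so flush decisions that depend on how many messages are buffered stay O(1) without walking the map.

```go
package main

import "fmt"

// aggregator is a hypothetical sketch of the map-up-front approach: the only
// extra bookkeeping beyond the nested map is a running count of buffered messages.
type aggregator struct {
	buffer   map[string]map[int32][][]byte // topic -> partition -> payloads (simplified)
	buffered int                           // the one extra piece of metadata
	flushAt  int                           // hypothetical flush threshold, in messages
}

func newAggregator(flushAt int) *aggregator {
	return &aggregator{buffer: make(map[string]map[int32][][]byte), flushAt: flushAt}
}

func (a *aggregator) add(topic string, partition int32, payload []byte) {
	if a.buffer[topic] == nil {
		a.buffer[topic] = make(map[int32][][]byte)
	}
	a.buffer[topic][partition] = append(a.buffer[topic][partition], payload)
	a.buffered++
}

// readyToFlush consults only the counter, not the map.
func (a *aggregator) readyToFlush() bool {
	return a.buffered >= a.flushAt
}

// flush hands the buffer over already in the shape the request needs.
func (a *aggregator) flush() map[string]map[int32][][]byte {
	out := a.buffer
	a.buffer = make(map[string]map[int32][][]byte)
	a.buffered = 0
	return out
}

func main() {
	agg := newAggregator(2)
	agg.add("events", 0, []byte("a"))
	fmt.Println(agg.readyToFlush()) // false
	agg.add("events", 1, []byte("b"))
	fmt.Println(agg.readyToFlush()) // true
	_ = agg.flush()
}
```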
#449 found a problem in this area which #538 fixed in a rather short-term, hacky way. However this ends up being re-organized, it must solve those problems as well.