
Support Large Message Size (PIP-37 / "chunking") #456

Closed
flowchartsman opened this issue Jan 28, 2021 · 3 comments · May be fixed by #717
flowchartsman (Contributor) commented Jan 28, 2021

Currently the Go client does not seem to support PIP-37, which allows messages larger than the maximum message size to be sent by breaking them up on the producer side and reassembling them on the consumer side. This would be a handy feature to have for parity with the Java client.

Gleiphir2769 (Contributor) commented Jun 29, 2022

Hello, I am very interested in this feature; the following is my plan.

Motivation

Make the Pulsar Go client support chunking so it can produce and consume big messages.

Modifications

Publish Chunked Messages

The maxMessageSize limit currently blocks publishing big messages:

if len(payload) > int(p._getConn().GetMaxMessageSize()) {
	p.publishSemaphore.Release()
	request.callback(nil, request.msg, errMessageTooLarge)
	p.log.WithError(errMessageTooLarge).
		WithField("size", len(payload)).
		WithField("properties", msg.Properties).
		Errorf("MaxMessageSize %d", int(p._getConn().GetMaxMessageSize()))
	p.metrics.PublishErrorsMsgTooLarge.Inc()
	return
}

If the size of the message payload is bigger than maxMessageSize, the message is simply discarded. Instead, the payload should be split into chunks, each no larger than maxMessageSize, and the chunks sent to the broker separately. I think the chunking logic can be added in internalSendAsync:
func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *ProducerMessage,
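The split itself could look roughly like the sketch below; `splitPayloadIntoChunks` is an assumption for illustration, not the current client API:

```go
// splitPayloadIntoChunks splits a payload into pieces of at most maxChunkSize
// bytes. Each piece would then be sent through a non-batched send path with
// the PIP-37 chunk metadata (uuid, chunk id, total number of chunks) attached.
func splitPayloadIntoChunks(payload []byte, maxChunkSize int) [][]byte {
	if maxChunkSize <= 0 || len(payload) <= maxChunkSize {
		return [][]byte{payload}
	}
	numChunks := (len(payload) + maxChunkSize - 1) / maxChunkSize // ceil division
	chunks := make([][]byte, 0, numChunks)
	for start := 0; start < len(payload); start += maxChunkSize {
		end := start + maxChunkSize
		if end > len(payload) {
			end = len(payload)
		}
		chunks = append(chunks, payload[start:end])
	}
	return chunks
}
```

Every chunk of one message carries the same UUID plus its chunk index and the total number of chunks, which is what the consumer later uses to reassemble the payload.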

Receive Chunked Messages

Pulsar allows multiple producers to produce messages to the same topic at the same time, which means that the chunks of several big messages may be interleaved in the topic. The chunks of one big message do not necessarily arrive consecutively, but they do arrive in order (this is guaranteed by the broker).
So the Go client needs a ChunkedMessageCtx to track and buffer the chunked message. ChunkedMessageCtx keeps track of which chunks have been received so far and accumulates their payloads. Once all chunks have been received, it returns the accumulated payload to the user, i.e. the full message as it was before chunking.
All ChunkedMessageCtx instances need to be maintained in a cache. Because of memory pressure, the number of ChunkedMessageCtx entries has to be limited (the default upper limit in the Java client is 100). This cache is essentially a concurrent map with an eviction policy (LRU). It can be implemented simply as a map + mutex + pending queue, or with something more elaborate (https://github.com/Gleiphir2769/s-cache); a rough sketch follows the MessageReceived signature below.
I think it should be modified here:

func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, headersAndPayload internal.Buffer) error {
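A rough sketch of what the context and its bounded cache could look like; all type, field, and method names below are illustrative assumptions, not the client's actual internals:

```go
package chunking

import "sync"

// chunkedMsgCtx accumulates the chunks of one large message.
type chunkedMsgCtx struct {
	totalChunks   int
	lastChunkID   int    // last chunk id appended; -1 before any chunk arrives
	chunkedMsgBuf []byte // payload accumulated from the chunks received so far
}

func newChunkedMsgCtx(totalChunks, totalSize int) *chunkedMsgCtx {
	return &chunkedMsgCtx{
		totalChunks:   totalChunks,
		lastChunkID:   -1,
		chunkedMsgBuf: make([]byte, 0, totalSize),
	}
}

// appendChunk adds one chunk and returns the reassembled payload together
// with true once the final chunk has been appended.
func (c *chunkedMsgCtx) appendChunk(chunkID int, payload []byte) ([]byte, bool) {
	if chunkID != c.lastChunkID+1 {
		// Unexpected chunk id: the caller should discard this context,
		// since the broker guarantees in-order delivery of the chunks.
		return nil, false
	}
	c.lastChunkID = chunkID
	c.chunkedMsgBuf = append(c.chunkedMsgBuf, payload...)
	if chunkID == c.totalChunks-1 {
		return c.chunkedMsgBuf, true
	}
	return nil, false
}

// chunkedMsgCtxMap is the bounded cache, keyed by the producer-assigned UUID.
// When maxPendingCtxs is exceeded, the oldest pending context is evicted.
type chunkedMsgCtxMap struct {
	mu             sync.Mutex
	ctxs           map[string]*chunkedMsgCtx
	pending        []string // insertion order, used as a simple eviction queue
	maxPendingCtxs int      // e.g. 100, matching the Java client's default
}

func (m *chunkedMsgCtxMap) getOrCreate(uuid string, totalChunks, totalSize int) *chunkedMsgCtx {
	m.mu.Lock()
	defer m.mu.Unlock()
	if ctx, ok := m.ctxs[uuid]; ok {
		return ctx
	}
	if len(m.ctxs) >= m.maxPendingCtxs && len(m.pending) > 0 {
		oldest := m.pending[0] // evict the oldest pending message
		m.pending = m.pending[1:]
		delete(m.ctxs, oldest)
	}
	ctx := newChunkedMsgCtx(totalChunks, totalSize)
	m.ctxs[uuid] = ctx
	m.pending = append(m.pending, uuid)
	return ctx
}
```

In MessageReceived, the consumer would look up (or create) the context for the message's UUID, append the chunk's payload, and only hand a message to the user once appendChunk reports that the last chunk has arrived.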

Some Details

Batching

Currently the Pulsar Go client relies on BatchBuilder to send all messages, even when batching is disabled (in that case every message triggers a flush of the batch):

if !sendAsBatch {
	p.internalFlushCurrentBatch()
}

In the Java client, the batching logic skips the processing of chunked messages, so we need a single-message send path that is independent of BatchBuilder.
Considering the consumer available-permits calculation problem with shared subscriptions (issue #10417), batching and chunking cannot be enabled at the same time.
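As a guard, producer creation could reject configurations that enable both. A minimal sketch, assuming a boolean chunking option is added alongside the existing DisableBatching option (the flag and function names here are hypothetical):

```go
import "errors"

// validateChunkingConfig enforces that chunking and batching are mutually
// exclusive, mirroring the constraint described above.
func validateChunkingConfig(enableChunking, disableBatching bool) error {
	if enableChunking && !disableBatching {
		return errors.New("chunking cannot be enabled together with batching")
	}
	return nil
}
```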

Chunked Message ID

This is related to PIP-107. It would be good to adopt the solution from the newer Java client, which implements a ChunkMessageIdImpl exposing getFirstChunkMessageId. This changes the Seek implementation so that it seeks to the first chunk's message ID.

func (pc *partitionConsumer) internalSeek(seek *seekRequest) {
	defer close(seek.doneCh)
	seek.err = pc.requestSeek(seek.msgID.messageID)
}

id := &pb.MessageIdData{}
err := proto.Unmarshal(msgID.Serialize(), id)
if err != nil {
	pc.log.WithError(err).Errorf("deserialize message id error: %s", err.Error())
	return err
}
requestID := pc.client.rpcClient.NewRequestID()
cmdSeek := &pb.CommandSeek{
	ConsumerId: proto.Uint64(pc.consumerID),
	RequestId:  proto.Uint64(requestID),
	MessageId:  id,
}
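A sketch of what a Go counterpart of the Java ChunkMessageIdImpl could look like; the type and its fields below are assumptions, reusing the pb.MessageIdData type from the snippet above:

```go
// chunkMessageID represents the ID of a message that was received in chunks.
// The last chunk's ID is what the consumer normally reports as the message ID,
// while Seek should target the first chunk's ID so the broker replays every
// chunk of the message.
type chunkMessageID struct {
	lastChunkID  *pb.MessageIdData // ID of the last chunk (the "visible" message ID)
	firstChunkID *pb.MessageIdData // ID of the first chunk, used by Seek
}

// getFirstChunkMessageID mirrors the Java client's getFirstChunkMessageId.
func (id *chunkMessageID) getFirstChunkMessageID() *pb.MessageIdData {
	return id.firstChunkID
}
```

In requestSeek, if the ID being sought is a chunkMessageID, the serialized firstChunkID would be placed in CommandSeek instead of the last chunk's ID.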

Size Calculation

This is related to issue #16196. The message metadata should be updated before computing the chunk size, and the total size should account not only for the metadata and payload but also for all other bytes on the wire, e.g. the 4-byte checksum field.
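A small sketch of that calculation; the function names and the exact overhead accounted for are assumptions based on the description above:

```go
// maxChunkPayloadSize computes how much payload fits into one chunk once the
// (already updated) metadata size and the 4-byte checksum are subtracted from
// the broker's maxMessageSize.
func maxChunkPayloadSize(maxMessageSize, metadataSize int) int {
	const checksumSize = 4
	return maxMessageSize - metadataSize - checksumSize
}

// totalChunks returns how many chunks a payload of the given size needs.
func totalChunks(payloadSize, chunkPayloadSize int) int {
	if chunkPayloadSize <= 0 {
		return 0
	}
	return (payloadSize + chunkPayloadSize - 1) / chunkPayloadSize // ceil division
}
```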

Shared Subscription

There are some problems with chunking under shared subscriptions. Issue #16202 added support for chunking with the Shared subscription type, so the Go client may not need to restrict chunking with Shared subscriptions in ConsumerImpl.

unAckedChunkedMessageIdSequenceMap

The Go client doesn't support ackTimeout yet, so there is no unAckedMessageTracker; this doesn't seem to be a problem for this feature.

RobertIndie pushed a commit that referenced this issue Oct 25, 2022
Master Issue: [#456](#456)

### Motivation

Make the Pulsar Go client support chunking to produce/consume big messages. The earlier implementation ([#717](#717)) didn't take many details into account, so I decided to reimplement it.

### Modifications

- Add `internalSingleSend` to send messages without batching, because chunked messages are not sent through the batch path.
- Moved the `BlockIfQueueFull` check from `internalSendAsync` to `internalSend` (`canAddQueue`) to ensure blocking works as expected with chunking.
- Make the producer send big messages by chunking.
- Add `chunkedMsgCtxMap` to store chunked message metadata and data.
- Make the consumer able to obtain the chunks and consume the big message.
geniusjoe (Contributor) commented Jul 11, 2024

@RobertIndie Since the chunking feature is supported in v0.10 (#805), can we close this issue as completed?

RobertIndie (Member) commented:

> @RobertIndie Since the chunking feature is supported in v0.10 (#805), can we close this issue as completed?

Yes. Sure. Thanks for your reminder.
