Update Forward Protocol Specification for zstd compression support #4758

Open
daipom opened this issue Jan 6, 2025 · 30 comments
Labels
enhancement Feature request or improve operations

Comments

@daipom
Contributor

daipom commented Jan 6, 2025

Is your feature request related to a problem? Please describe.

The following PR supports zstd compression.

So, we need to update Forward Protocol Specification - CompressedPackedForward Mode.

Describe the solution you'd like

Update the following description and add zstd value to compressed option.

Would that be Forward Protocol Specification v1.6?

Describe alternatives you've considered

Having no idea.

Additional context

No response

@daipom daipom added enhancement Feature request or improve operations and removed waiting-for-triage labels Jan 6, 2025
@daipom daipom moved this to Work-In-Progress in Fluentd Kanban Jan 6, 2025
@daipom daipom self-assigned this Jan 6, 2025
@daipom
Contributor Author

daipom commented Jan 6, 2025

I propose changes to the following two sections.

CompressedPackedForward Mode

https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1.5#compressedpackedforward-mode

 ### CompressedPackedForward Mode
 
-It carries a series of events as a msgpack binary, compressed by gzip, on a single request. The supported compression algorithm is only gzip.
+It carries a series of events as a msgpack binary, compressed by gzip or zstd, on a single request.
 
-- `entries` is a gzipped binary chunk of `MessagePackEventStream`, which MAY be a concatenated binary of multiple gzip binary strings.
-- Client MUST send an option with `compressed` key with the value `gzip`.
-- Client MUST send a gzipped chunk as msgpack `bin` format.
+- `entries` is a gzip/zstd binary chunk of `MessagePackEventStream`, which MAY be a concatenated binary of multiple gzip/zstd binary strings.
+- Client MUST send an option with `compressed` key with the value `gzip` or `zstd`.
+- Client MUST send a gzip/zstd chunk as msgpack `bin` format.
 - Server MUST accept `bin` format.
 - Server MAY decompress and decode individual events on demand but MAY NOT do right after request arrival. It means it MAY costs less, compared to `Forward` mode, when decoding is not needed by any plugins.
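
As an illustration of the `entries` rule above, here is a minimal Ruby sketch of reading a binary made of several concatenated gzip members. It uses only the standard library. JSON lines stand in for the msgpack-encoded event stream, and the helper names `gzip_batch` and `gunzip_concatenated` are made up for this example. zstd is left out because it needs a third-party binding.

```ruby
require "json"
require "stringio"
require "zlib"

# Compress one batch of events into a single gzip member.
# Assumption: JSON lines stand in for the msgpack-encoded Entry stream.
def gzip_batch(events)
  Zlib.gzip(events.map { |event| JSON.generate(event) }.join("\n") + "\n")
end

# Decompress a binary that may hold several gzip members back to back.
def gunzip_concatenated(binary)
  io = StringIO.new(binary)
  out = String.new
  loop do
    gz = Zlib::GzipReader.new(io)
    out << gz.read
    unused = gz.unused # bytes read past the end of this member, or nil
    gz.finish          # release the reader without closing io
    io.pos -= unused.length unless unused.nil?
    break if io.eof?
  end
  out
end

first = gzip_batch([[1736121600, { "message" => "a" }]])
second = gzip_batch([[1736121601, { "message" => "b" }]])
entries = first + second # what a client would place in the `bin` field

text = gunzip_concatenated(entries).force_encoding(Encoding::UTF_8)
decoded = text.lines.map { |line| JSON.parse(line) }
```

The loop rewinds the underlying IO by the number of bytes the reader consumed beyond the current member, so the next `Zlib::GzipReader` starts exactly at the following gzip header.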

Option

https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1.5#option

 - Server MAY just ignore any options given.
 - `size`: Clients MAY send the `size` option to show the number of event records in an entries by an integer as a value. Server can know the number of events without unpacking entries (especially for PackedForward and CompressedPackedForward mode).
 - `chunk`: Clients MAY send the `chunk` option to confirm the server receives event records. The value is a string of Base64 representation of 128 bits `unique_id` which is an ID of a set of events.
-- `compressed`: Clients MUST send the `compressed` option with value `gzip` to tell servers that entries is `CompressedPackedForward`. Other values will be ignored.
+- `compressed`: Clients MUST send the `compressed` option with value `gzip` or `zstd` to tell servers that entries is `CompressedPackedForward`. Other values will be ignored.
 
 ```json
 {"chunk": "p8n9gmxTQVC8/nh2wlKKeQ==", "size": 4097}
 ```

@daipom
Contributor Author

daipom commented Jan 6, 2025

Does this revision require a voting process?

@Athishpranav2003
Contributor

The changes suggested here look good to me.

@daipom
Contributor Author

daipom commented Jan 7, 2025

If there are no objections, I would like to set it to v1.6 with the agreement of both Fluentd and Fluent Bit maintainers.

@cosmo0920
Could you please check #4758 (comment) ?

@ashie
Member

ashie commented Jan 7, 2025

LGTM

@cosmo0920
Contributor

This would be good:

-- entries is a gzipped binary chunk of MessagePackEventStream, which MAY be a concatenated binary of multiple gzip binary strings.
-- Client MUST send an option with compressed key with the value gzip.
-- Client MUST send a gzipped chunk as msgpack bin format.
+- entries is a gzip/zstd binary chunk of MessagePackEventStream, which MAY be a concatenated binary of multiple gzip/zstd binary strings.
+- Client MUST send an option with compressed key with the value gzip or zstd.
+- Client MUST send a gzip/zstd chunk as msgpack bin format.

However, I have a question about these sentences. Currently, we're able to decompress compressed chunks one by one. So, on paper, we could also alternate between decompressing gzip-compressed chunks and zstd-compressed chunks.
Does this behavior need to be pointed out explicitly in the revised specification?

@Athishpranav2003
Contributor

@cosmo0920 are you pointing to the case where different sources send differently compressed chunks?

In the case of the forward protocol, the message itself carries the compression type, so decompression happens according to the metadata of the compressed chunk.

@cosmo0920
Contributor

cosmo0920 commented Jan 8, 2025

> @cosmo0920 are you pointing to the case where different sources send differently compressed chunks?
>
> In the case of the forward protocol, the message itself carries the compression type, so decompression happens according to the metadata of the compressed chunk.

Yes, the case where different sources use the same in_forward. This will surely occur, because Fluentd can be used as an aggregator that collects from different Fluentd instances pointing at the same forward endpoint. So we probably need to clarify it, even if the implementation already handles it.

@Athishpranav2003
Contributor

@cosmo0920
https://github.com/fluent/fluentd/pull/4657/files#diff-71dd20388a1f90eaec72fb257002fc6d44a497d457821d0c105acc0510844be1R314
In this line the compression format is also used as an argument during decompression. I guess this should answer your question.

@cosmo0920
Contributor

cosmo0920 commented Jan 9, 2025

> @cosmo0920
> https://github.com/fluent/fluentd/pull/4657/files#diff-71dd20388a1f90eaec72fb257002fc6d44a497d457821d0c105acc0510844be1R314
> In this line the compression format is also used as an argument during decompression. I guess this should answer your question.

That doesn't answer my question. My question is: should we write down the current decompression behavior explicitly in the Forward Protocol specification document? Pointing at the actual implementation does not cover it, because the Forward Protocol is shared between Fluentd and Fluent Bit. So we need to describe the behavior in the specification document rather than in the implementations.

@Athishpranav2003
Contributor

got it @cosmo0920 .

Previously, multiple sources could already send either gzip-compressed chunks or uncompressed ones. If we count uncompressed data as a compression type too, then the previous specification was already good enough to express that two types were supported. Now we are just adding one more to that, so I'm not sure we really need to be explicit here.

@daipom
Contributor Author

daipom commented Jan 9, 2025

@cosmo0920 @Athishpranav2003
Thanks for your review!

> This would be good:
>
> -- entries is a gzipped binary chunk of MessagePackEventStream, which MAY be a concatenated binary of multiple gzip binary strings.
> -- Client MUST send an option with compressed key with the value gzip.
> -- Client MUST send a gzipped chunk as msgpack bin format.
> +- entries is a gzip/zstd binary chunk of MessagePackEventStream, which MAY be a concatenated binary of multiple gzip/zstd binary strings.
> +- Client MUST send an option with compressed key with the value gzip or zstd.
> +- Client MUST send a gzip/zstd chunk as msgpack bin format.
>
> However, I have a question about these sentences. Currently, we're able to decompress compressed chunks one by one. So, on paper, we could also alternate between decompressing gzip-compressed chunks and zstd-compressed chunks. Does this behavior need to be pointed out explicitly in the revised specification?

I see!

I didn't know about this.

> Currently, we're able to decompress compressed chunks one by one.

Fluentd only decompresses the whole of CompressedMessagePackEventStream.
I don't assume that compression types are mixed as follows.

> So, on paper, we could also alternate between decompressing gzip-compressed chunks and zstd-compressed chunks.

So, it would be better to improve the text to avoid confusion.

@cosmo0920
I'm not familiar with Fluent Bit's specifications, but is that specification OK for Fluent Bit as well?

@cosmo0920
Contributor

cosmo0920 commented Jan 9, 2025

> Fluentd only decompresses the whole of CompressedMessagePackEventStream.
> I don't assume that compression types are mixed as follows.

Ah, Fluentd also handles CompressedMessagePackEventStream one by one. That means:

  1. The first CompressedMessagePackEventStream is compressed with gzip and is decompressed with the gzip decompressor.
  2. The second CompressedMessagePackEventStream is compressed with zstd and is decompressed with the zstd decompressor.
  3. Each subsequent one can be compressed with any supported compression method and is decompressed with the corresponding method.

This could happen if multiple senders send payloads to one aggregator. I'm not sure whether you misunderstood what I meant, but am I right here?

I mean that the above situation can happen, and so can this sequence of mixed compression types. Note that I didn't mean that this mixing would happen within a single CompressedMessagePackEventStream.

@cosmo0920
Contributor

cosmo0920 commented Jan 9, 2025

> @cosmo0920
> I'm not familiar with Fluent Bit's specifications, but is that specification OK for Fluent Bit as well?

This could work, because Fluent Bit also decompresses payloads that are compressed with a given compression method.

@daipom
Contributor Author

daipom commented Jan 9, 2025

> Note that I didn't mean that this mixing would happen within a single CompressedMessagePackEventStream.

Oh! Sorry, I misunderstood! I see!

> This could work, because Fluent Bit also decompresses payloads that are compressed with a given compression method.

Thanks!

@daipom
Contributor Author

daipom commented Jan 9, 2025

As @cosmo0920 says, it is possible for multiple senders to send in different compression types.

As @Athishpranav2003 says, there is no problem because what was originally two types will only become three types.

It would be better to clarify the description to avoid misunderstandings.

@daipom
Contributor Author

daipom commented Jan 9, 2025

> This would be good:
>
> -- entries is a gzipped binary chunk of MessagePackEventStream, which MAY be a concatenated binary of multiple gzip binary strings.
> -- Client MUST send an option with compressed key with the value gzip.
> -- Client MUST send a gzipped chunk as msgpack bin format.
> +- entries is a gzip/zstd binary chunk of MessagePackEventStream, which MAY be a concatenated binary of multiple gzip/zstd binary strings.
> +- Client MUST send an option with compressed key with the value gzip or zstd.
> +- Client MUST send a gzip/zstd chunk as msgpack bin format.
>
> However, I have a question about these sentences. Currently, we're able to decompress compressed chunks one by one. So, on paper, we could also alternate between decompressing gzip-compressed chunks and zstd-compressed chunks. Does this behavior need to be pointed out explicitly in the revised specification?

How about this?

 - Server MUST accept `bin` format.
+- Server MUST decompress `entries` in the format according to the value of `compressed` key of the option for each msgpack binary.
 - Server MAY decompress and decode individual events on demand but MAY NOT do right after request arrival. It means it MAY costs less, compared to `Forward` mode, when decoding is not needed by any plugins.
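
The proposed server rule, choosing the decompressor per message from the `compressed` option, could look roughly like the following Ruby sketch. The names `DECOMPRESSORS`, `UnsupportedCompression`, and `decompress_entries` are hypothetical. Only gzip is registered because it ships with the standard library; a zstd entry would wrap whichever zstd binding the server uses. Raising on an unregistered value is a choice made for this sketch, not something the proposed text requires.

```ruby
require "zlib"

# Decompressors keyed by the value of the `compressed` option.
# Assumption: Zlib.gunzip is enough here because the demo sends a single gzip member.
DECOMPRESSORS = {
  "gzip" => ->(bin) { Zlib.gunzip(bin) },
}.freeze

class UnsupportedCompression < StandardError; end

# Return the uncompressed event stream bytes for one received message.
def decompress_entries(entries, option, decompressors = DECOMPRESSORS)
  format = option && option["compressed"]
  return entries if format.nil? # no `compressed` key: nothing to undo

  decompressor = decompressors[format]
  raise UnsupportedCompression, "cannot decompress #{format.inspect}" if decompressor.nil?

  decompressor.call(entries)
end

stream = "event-stream-bytes"
from_sender_a = decompress_entries(Zlib.gzip(stream), { "compressed" => "gzip" })
from_sender_b = decompress_entries(stream, {})
```

Because the format is looked up for every message, two senders using different formats against the same endpoint do not interfere with each other.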

@daipom
Contributor Author

daipom commented Jan 9, 2025

I noticed that the descriptions in these tables should also be changed.

https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1.5#logs-type-3
https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1.5#metrics-or-traces-type-3

 #### Logs Type
 
 name | Ruby type | msgpack format | content
 --- | --- | --- | ---
 tag | String | str | tag name
- entries | CompressedMessagePackEventStream | bin | gzipped msgpack stream of Entry
+ entries | CompressedMessagePackEventStream | bin | compressed msgpack stream of Entry
 option | Hash | map | option including key "compressed" (required)
 
 ```json
 [
   "tag.name",
   "<<CompressedMessagePackEventStream>>",
   {"compressed": "gzip"}
 ]
 ```
 
 #### Metrics or Traces Type
 
 name | Ruby type | msgpack format | content
 --- | --- | --- | ---
 tag | String | str | tag name
- entries | Msgpack stream for observabilities | bin | gzipped msgpack stream of Entry
+ entries | Msgpack stream for observabilities | bin | compressed msgpack stream of Entry
 option | Hash | map | option including key "compressed" and "fluent\_signal" (required)
 
 ```json
 [
   "tag.name",
   "<<Compressed payloads of observabilities>>",
   {"compressed": "gzip", "fluent_signal": 1|2} # 1 for metrics and 2 for traces.
 ]
 ```

@daipom
Contributor Author

daipom commented Jan 9, 2025

Based on the above, I re-propose the following change as v1.6.

CompressedPackedForward Mode

https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1.5#compressedpackedforward-mode

 ### CompressedPackedForward Mode
 
-It carries a series of events as a msgpack binary, compressed by gzip, on a single request. The supported compression algorithm is only gzip.
+It carries a series of events as a msgpack binary, compressed by gzip or zstd, on a single request.
 
-- `entries` is a gzipped binary chunk of `MessagePackEventStream`, which MAY be a concatenated binary of multiple gzip binary strings.
-- Client MUST send an option with `compressed` key with the value `gzip`.
-- Client MUST send a gzipped chunk as msgpack `bin` format.
+- `entries` is a gzip/zstd binary chunk of `MessagePackEventStream`, which MAY be a concatenated binary of multiple gzip/zstd binary strings.
+- Client MUST send an option with `compressed` key with the value `gzip` or `zstd`.
+- Client MUST send a gzip/zstd chunk as msgpack `bin` format.
 - Server MUST accept `bin` format.
+- Server MUST decompress `entries` in the format according to the value of `compressed` key of the option for each msgpack binary.
 - Server MAY decompress and decode individual events on demand but MAY NOT do right after request arrival. It means it MAY costs less, compared to `Forward` mode, when decoding is not needed by any plugins.

Logs Type

https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1.5#logs-type-3

 #### Logs Type
 
 name | Ruby type | msgpack format | content
 --- | --- | --- | ---
 tag | String | str | tag name
- entries | CompressedMessagePackEventStream | bin | gzipped msgpack stream of Entry
+ entries | CompressedMessagePackEventStream | bin | compressed msgpack stream of Entry
 option | Hash | map | option including key "compressed" (required)

Metrics or Traces Type

https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1.5#metrics-or-traces-type-3

 #### Metrics or Traces Type
 
 name | Ruby type | msgpack format | content
 --- | --- | --- | ---
 tag | String | str | tag name
- entries | Msgpack stream for observabilities | bin | gzipped msgpack stream of Entry
+ entries | Msgpack stream for observabilities | bin | compressed msgpack stream of Entry
 option | Hash | map | option including key "compressed" and "fluent\_signal" (required)

Option

https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1.5#option

 - Server MAY just ignore any options given.
 - `size`: Clients MAY send the `size` option to show the number of event records in an entries by an integer as a value. Server can know the number of events without unpacking entries (especially for PackedForward and CompressedPackedForward mode).
 - `chunk`: Clients MAY send the `chunk` option to confirm the server receives event records. The value is a string of Base64 representation of 128 bits `unique_id` which is an ID of a set of events.
-- `compressed`: Clients MUST send the `compressed` option with value `gzip` to tell servers that entries is `CompressedPackedForward`. Other values will be ignored.
+- `compressed`: Clients MUST send the `compressed` option with value `gzip` or `zstd` to tell servers that entries is `CompressedPackedForward`. Other values will be ignored.
 
 ```json
 {"chunk": "p8n9gmxTQVC8/nh2wlKKeQ==", "size": 4097}
 ```

@cosmo0920
Contributor

I propose the following:

-- Server MUST decompress `entries` in the format according to the value of `compressed` key of the option for each msgpack binary.
+- Server MUST decompress `entries` in the format according to the value of `compressed` key of the option which contains `gzip` value for each gzip compressed msgpack binary.
+- Server MAY decompress `entries` in the format according to the value of `compressed` key of the option which contains `zstd` value for each zstd compressed msgpack binary.
+- Server MUST decompress `entries` that are gzip or zstd compressed formats if a server supports both of decompression formats.

This is because zstd support in Fluent Bit is still at the PoC stage. So zstd compression and decompression need to be optional in the forward protocol.

@daipom
Contributor Author

daipom commented Jan 9, 2025

Hmm, but wouldn't that be an incomplete protocol?
If the server does not decompress zstd, what happens to the zstd compressed data sent by the client?

> This is because zstd support in Fluent Bit is still at the PoC stage. So zstd compression and decompression need to be optional in the forward protocol.

Isn't it simply a matter of not being able to use zstd compression until both the server and client support v1.6 protocol?
It seems to me that this new protocol version does not have to be immediately compliant with all applications.

If we find any problems in supporting this protocol in the future, we can revise it again at that time.

@cosmo0920
Contributor

cosmo0920 commented Jan 9, 2025

> Hmm, but wouldn't that be an incomplete protocol? If the server does not decompress zstd, what happens to the zstd compressed data sent by the client?
>
> > This is because zstd support in Fluent Bit is still at the PoC stage. So zstd compression and decompression need to be optional in the forward protocol.
>
> Isn't it simply a matter of not being able to use zstd compression until both the server and client support v1.6 protocol? It seems to me that this new protocol version does not have to be immediately compliant with all applications.

How about extending HELO (and PING/PONG) in forward protocol v1.6?
The current version of HELO just sends an option, which could easily be extended to indicate whether a fluent server really supports the gzip or zstd compression method. What do you think of this way of telling clients which compression methods are supported?

@cosmo0920
Contributor

cosmo0920 commented Jan 9, 2025

I mean, zstd support may be delayed in Fluent Bit. So we need a way to tell clients that want to send their payloads over the forward protocol. Plus, in some users' deployments, the clients may be rolled out first and the aggregators later.
In that case, only the newly deployed clients support the new compression method, while the aggregators still running do not support the new decompression method.
Thus, I wanted to suggest that when a fluent server indicates that zstd decompression is not supported yet, the client must not use zstd compression.

  1. The server indicates that it accepts gzip (and zstd) decompression.
  2. The client just shows a warning in its logs when the user specifies zstd compression in out_forward.
  3. In that case, the forward server and client just fall back to gzip compression instead of zstd compression.
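
The three steps above can be sketched in Ruby as a small selection function on the client side. This is only an illustration: `negotiate_compression` is a hypothetical name, and the list of server-supported formats is passed in as a plain array because the thread has not yet fixed how the server would advertise it. The sketch also assumes gzip is always acceptable, since it is the format already defined in v1.5.

```ruby
require "logger"
require "stringio"

# Pick the compression a client will actually use for one server.
# `configured` is what the user asked for in out_forward (nil means none).
# `server_supported` is the list of formats the server said it accepts.
def negotiate_compression(configured, server_supported, logger)
  # Assumption: no compression and gzip never need negotiation.
  return configured if configured.nil? || configured == "gzip"
  return configured if server_supported.include?(configured)

  logger.warn("server does not accept #{configured}; falling back to gzip")
  "gzip"
end

log = StringIO.new
logger = Logger.new(log)
chosen_new = negotiate_compression("zstd", %w[gzip zstd], logger)
chosen_old = negotiate_compression("zstd", %w[gzip], logger)
```

With a server that lists zstd the configured value is kept; with an older server the client logs a warning and uses gzip.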

@daipom
Contributor Author

daipom commented Jan 10, 2025

> Thus, I wanted to suggest that when a fluent server indicates that zstd decompression is not supported yet, the client must not use zstd compression.

I see!
Let's consider a draft in that direction.

@Athishpranav2003
Contributor

@daipom so essentially do we do a handshake to ensure client supports zstd and allow server to send zstd chunks only when this handshake is successful?

@cosmo0920
Contributor

cosmo0920 commented Jan 20, 2025

Yup. HELO can contain options. Currently, it contains nonce, auth, and keepalive. So we need to add a compression option there, like `compression: ["gzip", "zstd"]`. If compression is omitted, the fluent client is not to use the zstd compression method.

ref: https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1.5#helo
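
For illustration, a HELO message carrying the proposed `compression` option, and the client-side reading of it, might look like the Ruby sketch below. The message is shown as a plain Ruby array rather than msgpack, and `build_helo` and `allowed_compression` are hypothetical helpers. Treating a missing `compression` key as "gzip only" is this sketch's reading of the rule that a client must not use zstd when the option is omitted.

```ruby
# Build the HELO message a server might send.
# `nonce`, `auth`, and `keepalive` are the existing option keys;
# `compression` is the addition proposed in this thread.
def build_helo(nonce:, auth: "", keepalive: true, compression: nil)
  options = { "nonce" => nonce, "auth" => auth, "keepalive" => keepalive }
  options["compression"] = compression unless compression.nil?
  ["HELO", options]
end

# Formats a client may use after receiving a HELO.
# Assumption: an absent `compression` key means zstd must not be used.
def allowed_compression(helo)
  _type, options = helo
  options.fetch("compression", ["gzip"])
end

old_helo = build_helo(nonce: "abc")
new_helo = build_helo(nonce: "abc", compression: %w[gzip zstd])

old_formats = allowed_compression(old_helo)
new_formats = allowed_compression(new_helo)
```

A server that predates the change sends no `compression` key, so existing deployments keep working without any zstd traffic.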

@daipom
Contributor Author

daipom commented Jan 21, 2025

@Athishpranav2003
Sorry, I haven't worked on this lately.

The protocol would be as @cosmo0920 says.

We need to implement it with in_forward and out_forward.

Let's do that with a different PR than #4657.
I will merge #4657!

@daipom
Contributor Author

daipom commented Jan 21, 2025

> Let's do that with a different PR than #4657.

connect(nil, ack: ack) do |sock, ri|
  ensure_established_connection(sock, ri)
  send_data_actual(sock, tag, chunk)
end

The options of HELO would be added into ri here.
So, for example, we can decompress the chunk if the server side does not support the compression format of the chunk here.
(For efficiency, one possible approach would be to select an appropriate compression format for each server in advance, but for now, there is no need to go that far, I think.)

@Athishpranav2003
Contributor

@daipom so essentially we assume that the client to be forwarded is only known at runtime, so we perform this check in the establish-connection part, and if the check fails we decompress the data and send it instead of giving an error, right? (I presume we throw a warning log at least.)

@daipom
Contributor Author

daipom commented Jan 21, 2025

@Athishpranav2003
Yes!

> instead of giving an error, right? (I presume we throw a warning log at least.)

It would be preferable to fall back to the plain format rather than treat it as an error.
It would be a good idea to emit a warning log at that point.
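
A rough Ruby sketch of that fallback is shown below. It is not the out_forward implementation: `prepare_chunk` is a hypothetical helper, the supported-format list is passed in directly, and gzip stands in for zstd so that the example runs on the standard library alone.

```ruby
require "logger"
require "stringio"
require "zlib"

# Decide what to put on the wire for one already-compressed chunk.
# Returns [payload, option]; the option omits `compressed` when falling back.
def prepare_chunk(data, format, server_supported, decompress, logger)
  return [data, { "compressed" => format }] if server_supported.include?(format)

  logger.warn("server does not support #{format}; sending the chunk uncompressed")
  [decompress.call(data), {}]
end

log = StringIO.new
logger = Logger.new(log)
chunk = Zlib.gzip("event-stream-bytes")
gunzip = ->(bin) { Zlib.gunzip(bin) } # stand-in for a zstd decompressor

kept_payload, kept_option = prepare_chunk(chunk, "gzip", ["gzip"], gunzip, logger)
plain_payload, plain_option = prepare_chunk(chunk, "gzip", [], gunzip, logger)
```

Decompressing on the client costs some CPU per chunk, which is why choosing a suitable format per server in advance was mentioned earlier as a possible later optimization.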

Projects
Status: Work-In-Progress

4 participants