Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement zstd compression for multipart payloads on the Rust side. #1401

Closed
wants to merge 2 commits into from

Conversation

obi1kenobi
Copy link
Collaborator

I've tested this with both the Python and Rust changes applied, and I've successfully sent zstd-compressed traces to LangSmith servers.

This introduces breaking changes in the APIs for both langsmith-tracing-client and langsmith-pyo3, so I'm bumping the left-most non-zero version number in each of their manifests. In Rust, the left-most non-zero number is considered a "major" version -- in other words, leading zeroes are ignored for SemVer purposes.

This will require us to publish a new langsmith-pyo3 Python package version with the new changes. I'd like to trigger the publish workflow after this is merged, so I can make the corresponding Python changes to enable zstd end-to-end. Lmk if you have concerns or want me to hold off on publishing a new version.

obi1kenobi added a commit that referenced this pull request Jan 9, 2025
Requires #1401 to merge first, and will need a re-lock of poetry dependencies when that happens.

Tested end-to-end by sending zstd-compressed traces to LangSmith servers.
I've tested this with both the Python and Rust changes applied, and I've successfully sent zstd-compressed traces to LangSmith servers.

This introduces breaking changes in the APIs for both `langsmith-tracing-client` and `langsmith-pyo3`, so I'm bumping the left-most non-zero version number in each of their manifests. In Rust, the left-most non-zero number is considered a "major" version -- in other words, leading zeroes are ignored for SemVer purposes.

This will require us to publish a new `langsmith-pyo3` Python package version with the new changes. I'd like to trigger the publish workflow after this is merged, so I can make the corresponding Python changes to enable zstd end-to-end. Lmk if you have concerns or want me to hold off on publishing a new version.
@obi1kenobi obi1kenobi force-pushed the pg/zstd-compression-in-tracing branch from c007aed to 2446ce9 Compare January 9, 2025 19:04
@obi1kenobi
Copy link
Collaborator Author

Something to consider in the code review: from the server's perspective, this approach is equivalent to the current pure-Python zstd implementation, but has an internal caveat that may prove to be a problem down the line.

Due to how the reqwest APIs work, it's forced to fully materialize the entire multipart payload in memory before compressing it. This can lead to unnecessarily high memory use.

In an ideal world, we'd do the compression in a streaming fashion. This would either require some changes to reqwest APIs, or would require us to DIY the logic for how multipart requests are constructed instead of using reqwest here. We can do that in subsequent PRs if it proves important.

Comment on lines +351 to +380
// `reqwest` doesn't have a public method for getting *just* the multipart form data:
// the `Form::reader()` method isn't public. Instead, we pretend to be preparing a request,
// place the multipart data, and ask for the body to be buffered and made available as `&[u8]`.
// *This does not send the request!* We merely prepare a request in memory as a workaround.
//
// Just in case, we use `example.com` as the URL here, which is explicitly reserved for
// example use in the standards and is guaranteed to not be taken. This means that
// under no circumstances will we send multipart data to any other place,
// even if `reqwest` were to suddenly change the API to send data on `.build()`.
let builder = reqwest::blocking::Client::new().get("http://example.com/").multipart(form);
let mut request = builder.build().expect("failed to construct request");
let multipart_form_bytes = request
.body_mut()
.as_mut()
.expect("multipart request had no body, this should never happen")
.buffer()
.map_err(|e| TracingClientError::IoError(e.to_string()))?;

let mut buffer = Vec::with_capacity(multipart_form_bytes.len());
let mut encoder = zstd::Encoder::new(&mut buffer, compression_level)
.and_then(|mut encoder| {
encoder.multithread(n_workers)?;
Ok(encoder)
})
.map_err(|e| TracingClientError::IoError(e.to_string()))?;

encoder
.write_all(multipart_form_bytes)
.map_err(|e| TracingClientError::IoError(e.to_string()))?;
encoder.finish().map_err(|e| TracingClientError::IoError(e.to_string()))?;
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This workaround will not be required anymore once the next reqwest version is released. This code will be replaced in #1403 which makes use of new reqwest functionality to do this properly.

@obi1kenobi obi1kenobi closed this Jan 15, 2025
@obi1kenobi obi1kenobi deleted the pg/zstd-compression-in-tracing branch January 15, 2025 16:20
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant