From 2446ce99051ee8bc26fb5ec086bbab8330834a1e Mon Sep 17 00:00:00 2001 From: Predrag Gruevski <obi1kenobi82@gmail.com> Date: Thu, 9 Jan 2025 18:22:12 +0000 Subject: [PATCH 1/2] Implement zstd compression for multipart payloads on the Rust side. I've tested this with both the Python and Rust changes applied, and I've successfully sent zstd-compressed traces to LangSmith servers. This introduces breaking changes in the APIs for both `langsmith-tracing-client` and `langsmith-pyo3`, so I'm bumping the left-most non-zero version number in each of their manifests. In Rust, the left-most non-zero number is considered a "major" version -- in other words, leading zeroes are ignored for SemVer purposes. This will require us to publish a new `langsmith-pyo3` Python package version with the new changes. I'd like to trigger the publish workflow after this is merged, so I can make the corresponding Python changes to enable zstd end-to-end. Let me know if you have concerns or want me to hold off on publishing a new version. 
--- rust/Cargo.lock | 51 +++++++++++++- rust/Cargo.toml | 2 + rust/crates/langsmith-pyo3/Cargo.toml | 2 +- .../src/blocking_tracing_client.rs | 2 + .../langsmith-tracing-client/Cargo.toml | 4 +- .../src/client/blocking/processor.rs | 68 ++++++++++++++++++- .../src/client/blocking/tracing_client.rs | 1 + 7 files changed, 125 insertions(+), 5 deletions(-) diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 9f86ce696..251c40703 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -153,6 +153,8 @@ version = "1.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f34d93e62b03caf570cccc334cbc6c2fceca82f39211051345108adcba3eebdc" dependencies = [ + "jobserver", + "libc", "shlex", ] @@ -907,6 +909,15 @@ version = "0.1.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "db69f08d4fb10524cacdb074c10b296299d71274ddbc830a8ee65666867002e9" +[[package]] +name = "jobserver" +version = "0.1.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48d1dbcbbeb6a7fec7e059840aa538bd62aaccf972c7346c4d9d2059312853d0" +dependencies = [ + "libc", +] + [[package]] name = "js-sys" version = "0.3.74" @@ -919,7 +930,7 @@ dependencies = [ [[package]] name = "langsmith-pyo3" -version = "0.1.0-rc5" +version = "0.2.0-rc1" dependencies = [ "langsmith-tracing-client", "orjson", @@ -931,12 +942,13 @@ dependencies = [ [[package]] name = "langsmith-tracing-client" -version = "0.1.0" +version = "0.2.0" dependencies = [ "chrono", "criterion", "flate2", "futures", + "http", "mockito", "multer", "rayon", @@ -949,6 +961,7 @@ dependencies = [ "tokio-util", "ureq", "uuid", + "zstd", ] [[package]] @@ -1180,6 +1193,12 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "pkg-config" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"953ec861398dccce10c670dfeaf3ec4911ca479e9c02154b3a215178c5f566f2" + [[package]] name = "plotters" version = "0.3.7" @@ -2444,3 +2463,31 @@ dependencies = [ "quote", "syn", ] + +[[package]] +name = "zstd" +version = "0.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fcf2b778a664581e31e389454a7072dab1647606d44f7feea22cd5abb9c9f3f9" +dependencies = [ + "zstd-safe", +] + +[[package]] +name = "zstd-safe" +version = "7.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "54a3ab4db68cea366acc5c897c7b4d4d1b8994a9cd6e6f841f8964566a419059" +dependencies = [ + "zstd-sys", +] + +[[package]] +name = "zstd-sys" +version = "2.0.13+zstd.1.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38ff0f21cfee8f97d94cef41359e0c89aa6113028ab0291aa8ca0038995a95aa" +dependencies = [ + "cc", + "pkg-config", +] diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 546a08a34..64fbe0122 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -11,6 +11,7 @@ resolver = "2" chrono = "0.4.38" flate2 = "1.0.34" futures = "0.3.31" +http = "1.2.0" rayon = "1.10.0" serde = { version = "1.0.210", features = ["derive"] } serde_json = "1.0.128" @@ -20,6 +21,7 @@ tokio = { version = "1", features = ["full"] } tokio-util = "0.7.12" ureq = "2.10.1" uuid = { version = "1.11.0", features = ["v4"] } +zstd = { version = "0.13.2", features = ["zstdmt"] } # Use rustls instead of OpenSSL, because OpenSSL is a nightmare when compiling across platforms. 
# OpenSSL is a default feature, so we have to disable all default features, then re-add diff --git a/rust/crates/langsmith-pyo3/Cargo.toml b/rust/crates/langsmith-pyo3/Cargo.toml index 7ebf2e089..d06812adf 100644 --- a/rust/crates/langsmith-pyo3/Cargo.toml +++ b/rust/crates/langsmith-pyo3/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "langsmith-pyo3" -version = "0.1.0-rc5" +version = "0.2.0-rc1" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/rust/crates/langsmith-pyo3/src/blocking_tracing_client.rs b/rust/crates/langsmith-pyo3/src/blocking_tracing_client.rs index 6b3816f15..0c5bf5a3b 100644 --- a/rust/crates/langsmith-pyo3/src/blocking_tracing_client.rs +++ b/rust/crates/langsmith-pyo3/src/blocking_tracing_client.rs @@ -27,6 +27,7 @@ impl BlockingTracingClient { batch_size: usize, batch_timeout_millis: u64, worker_threads: usize, + compression_level: i32, ) -> PyResult<Self> { let config = langsmith_tracing_client::client::blocking::ClientConfig { endpoint, @@ -39,6 +40,7 @@ impl BlockingTracingClient { headers: None, // TODO: support custom headers num_worker_threads: worker_threads, + compression_level, }; let client = RustTracingClient::new(config) diff --git a/rust/crates/langsmith-tracing-client/Cargo.toml b/rust/crates/langsmith-tracing-client/Cargo.toml index 725948d79..2789c1da5 100644 --- a/rust/crates/langsmith-tracing-client/Cargo.toml +++ b/rust/crates/langsmith-tracing-client/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "langsmith-tracing-client" -version = "0.1.0" +version = "0.2.0" edition = "2021" [dependencies] @@ -17,6 +17,8 @@ futures = { workspace = true } rayon = { workspace = true } ureq = { workspace = true } flate2 = { workspace = true } +zstd = { workspace = true } +http = { workspace = true } [dev-dependencies] multer = "3.1.0" diff --git a/rust/crates/langsmith-tracing-client/src/client/blocking/processor.rs 
b/rust/crates/langsmith-tracing-client/src/client/blocking/processor.rs index 495c546d9..1ff71cd44 100644 --- a/rust/crates/langsmith-tracing-client/src/client/blocking/processor.rs +++ b/rust/crates/langsmith-tracing-client/src/client/blocking/processor.rs @@ -1,6 +1,8 @@ use std::collections::HashMap; +use std::io::Write; use std::sync::mpsc::{Receiver, Sender}; use std::sync::{mpsc, Arc, Mutex}; +use std::thread::available_parallelism; use std::time::{Duration, Instant}; use rayon::iter::{IntoParallelIterator, ParallelIterator}; @@ -266,14 +268,18 @@ impl RunProcessor { for (part_name, part) in json_parts.into_iter().chain(attachment_parts) { form = form.part(part_name, part); } + let content_type = format!("multipart/form-data; boundary={}", form.boundary()); + let compressed_data = compress_multipart_body(form, self.config.compression_level)?; // send the multipart POST request let start_send_batch = Instant::now(); let response = self .http_client .post(format!("{}/runs/multipart", self.config.endpoint)) - .multipart(form) .headers(self.config.headers.as_ref().cloned().unwrap_or_default()) + .header(http::header::CONTENT_TYPE, content_type) + .header(http::header::CONTENT_ENCODING, "zstd") + .body(compressed_data) .send()?; // println!("Sending batch took {:?}", start_send_batch.elapsed()); @@ -315,3 +321,63 @@ impl RunProcessor { Ok(part) } } + +fn compress_multipart_body( + form: Form, + compression_level: i32, +) -> Result<Vec<u8>, TracingClientError> { + // We want to use as many threads as available cores to compress data. + // However, we have to be mindful of special values in the zstd library: + // - A setting of `0` here means "use the current thread only." + // - A setting of `1` means "use a separate thread, but only one." + // + // `1` isn't a useful setting for us, so turn `1` into `0` while + // keeping higher numbers the same. 
+ let n_workers = match available_parallelism() { + Ok(num) => { + if num.get() == 1 { + 0 + } else { + num.get() as u32 + } + } + Err(_) => { + // We failed to query the available number of cores. + // Use only the current single thread, to be safe. + 0 + } + }; + + // `reqwest` doesn't have a public method for getting *just* the multipart form data: + // the `Form::reader()` method isn't public. Instead, we pretend to be preparing a request, + // place the multipart data, and ask for the body to be buffered and made available as `&[u8]`. + // *This does not send the request!* We merely prepare a request in memory as a workaround. + // + // Just in case, we use `example.com` as the URL here, which is explicitly reserved for + // example use in the standards and is guaranteed to not be taken. This means that + // under no circumstances will we send multipart data to any other place, + // even if `reqwest` were to suddenly change the API to send data on `.build()`. + let builder = reqwest::blocking::Client::new().get("http://example.com/").multipart(form); + let mut request = builder.build().expect("failed to construct request"); + let multipart_form_bytes = request + .body_mut() + .as_mut() + .expect("multipart request had no body, this should never happen") + .buffer() + .map_err(|e| TracingClientError::IoError(e.to_string()))?; + + let mut buffer = Vec::with_capacity(multipart_form_bytes.len()); + let mut encoder = zstd::Encoder::new(&mut buffer, compression_level) + .and_then(|mut encoder| { + encoder.multithread(n_workers)?; + Ok(encoder) + }) + .map_err(|e| TracingClientError::IoError(e.to_string()))?; + + encoder + .write_all(multipart_form_bytes) + .map_err(|e| TracingClientError::IoError(e.to_string()))?; + encoder.finish().map_err(|e| TracingClientError::IoError(e.to_string()))?; + + Ok(buffer) +} diff --git a/rust/crates/langsmith-tracing-client/src/client/blocking/tracing_client.rs 
b/rust/crates/langsmith-tracing-client/src/client/blocking/tracing_client.rs index 4861ffe3d..ca875ebd1 100644 --- a/rust/crates/langsmith-tracing-client/src/client/blocking/tracing_client.rs +++ b/rust/crates/langsmith-tracing-client/src/client/blocking/tracing_client.rs @@ -19,6 +19,7 @@ pub struct ClientConfig { pub batch_timeout: Duration, pub headers: Option<HeaderMap>, pub num_worker_threads: usize, + pub compression_level: i32, } pub struct TracingClient { From b5ef3bcaae04d5a4334c8d404e44b7fa0ba2564a Mon Sep 17 00:00:00 2001 From: Predrag Gruevski <obi1kenobi82@gmail.com> Date: Thu, 9 Jan 2025 19:09:43 +0000 Subject: [PATCH 2/2] Update benchmark config. --- .../langsmith-tracing-client/benches/tracing_client_benchmark.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/rust/crates/langsmith-tracing-client/benches/tracing_client_benchmark.rs b/rust/crates/langsmith-tracing-client/benches/tracing_client_benchmark.rs index 4cc2cc421..29cdd957c 100644 --- a/rust/crates/langsmith-tracing-client/benches/tracing_client_benchmark.rs +++ b/rust/crates/langsmith-tracing-client/benches/tracing_client_benchmark.rs @@ -30,6 +30,7 @@ fn create_mock_client_config_sync(server_url: &str, batch_size: usize) -> Blocki batch_timeout: Duration::from_secs(1), headers: Default::default(), num_worker_threads: 1, + compression_level: 1, } }