From dfb5d93669cb5495ff89078b765031b76edd186a Mon Sep 17 00:00:00 2001 From: Oliver Newland Date: Wed, 12 Jun 2024 11:42:11 -0700 Subject: [PATCH] doc(rs): some docs for the ClickHouse batch module (#6020) * doc(rs): some docs for the ClickHouse batch module * fix variable rename --- rust_snuba/src/strategies/clickhouse/batch.rs | 40 ++++++++++++++++++- 1 file changed, 38 insertions(+), 2 deletions(-) diff --git a/rust_snuba/src/strategies/clickhouse/batch.rs b/rust_snuba/src/strategies/clickhouse/batch.rs index 86bda9b07c..29275af6e5 100644 --- a/rust_snuba/src/strategies/clickhouse/batch.rs +++ b/rust_snuba/src/strategies/clickhouse/batch.rs @@ -12,18 +12,27 @@ use tokio_stream::wrappers::ReceiverStream; use crate::runtime_config::get_str_config; use crate::types::RowData; -const CLICKHOUSE_HTTP_CHUNK_SIZE: usize = 1_000_000; +const CLICKHOUSE_HTTP_CHUNK_SIZE_BYTES: usize = 1_000_000; const CHANNEL_CAPACITY: usize = 8_192; pub struct BatchFactory { + /// HTTP client for sending requests client: Client, + /// URL for where to send requests url: String, + /// Prepended query body query: String, + /// Handle for thread pool to spawn new HTTP batch listeners handle: Handle, } #[allow(clippy::too_many_arguments)] impl BatchFactory { + /// + /// A BatchFactory stores a set of connection parameters on creation and concurrency configuration. + /// It provides a method (`new_batch`) which returns an orchestrator for physical + /// writes to ClickHouse. + /// pub fn new( hostname: &str, http_port: u16, @@ -76,6 +85,19 @@ impl BatchFactory { } } + /// + /// new_batch creates a `HttpBatch` with its factory's parameters. + /// + /// The caller writes to ClickHouse by sending data through the `sender` field in + /// the returned HttpBatch, then calling `result_handle.take()` to force the write to occur. If + /// the ConcurrencyConfig on the `BatchFactory` allows, then multiple writer threads may be spawned + /// to handle inserts. + /// + /// This is generally wrapped by `HttpBatch.write_rows()` to add data to an internal buffer, + /// followed by `HttpBatch.finish()` to push that internal buffer data across a channel to + /// a (hopefully free) receiver thread which will initiate a HTTP request to ClickHouse based + /// on the connection paraemters supplied in `BatchFactory::new()`. + /// pub fn new_batch(&self) -> HttpBatch { let (sender, receiver) = channel(CHANNEL_CAPACITY); @@ -121,6 +143,11 @@ impl BatchFactory { } } +/// +/// `HttpBatch` encapsulates the state of a single buffer for ClickHouse writes, +/// as well as handles for flushing data to a reader thread and a handle for initiating +/// a POST (insert) to ClickHouse +/// pub struct HttpBatch { current_chunk: Vec, num_rows: usize, @@ -138,8 +165,13 @@ impl HttpBatch { self.num_bytes } + /// + /// write_rows writes rows to an internal buffer, up until hitting the point of + /// CLICKHOUSE_HTTP_CHUNK_SIZE_BYTES bytes. When it hits that threshold, it will flush + /// the existing buffer to the channel in `sender`. + /// pub fn write_rows(&mut self, data: &RowData) -> anyhow::Result<()> { - if self.current_chunk.len() > CLICKHOUSE_HTTP_CHUNK_SIZE { + if self.current_chunk.len() > CLICKHOUSE_HTTP_CHUNK_SIZE_BYTES { self.flush_chunk()?; } @@ -167,6 +199,10 @@ impl HttpBatch { Ok(()) } + /// + /// finish flushes the existing in-memory buffer and then forces an attempt to write to + /// ClickHouse on the thread created in `BatchFactory::new` + /// pub async fn finish(mut self) -> Result { self.flush_chunk()?; // finish stream