doc(rs): some docs for the ClickHouse batch module (#6020)
* doc(rs): some docs for the ClickHouse batch module

* fix variable rename
onewland authored Jun 12, 2024
1 parent 7cfd567 commit dfb5d93
Showing 1 changed file with 38 additions and 2 deletions.
40 changes: 38 additions & 2 deletions rust_snuba/src/strategies/clickhouse/batch.rs
@@ -12,18 +12,27 @@ use tokio_stream::wrappers::ReceiverStream;
use crate::runtime_config::get_str_config;
use crate::types::RowData;

-const CLICKHOUSE_HTTP_CHUNK_SIZE: usize = 1_000_000;
+const CLICKHOUSE_HTTP_CHUNK_SIZE_BYTES: usize = 1_000_000;
const CHANNEL_CAPACITY: usize = 8_192;

pub struct BatchFactory {
/// HTTP client for sending requests
client: Client,
/// URL for where to send requests
url: String,
/// Prepended query body
query: String,
/// Handle for thread pool to spawn new HTTP batch listeners
handle: Handle,
}

#[allow(clippy::too_many_arguments)]
impl BatchFactory {
///
/// A `BatchFactory` stores a set of connection parameters and a concurrency
/// configuration on creation. It provides a method (`new_batch`) which returns
/// an orchestrator for physical writes to ClickHouse.
///
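/// # Example
///
/// A minimal sketch, not taken from this commit; the constructor's full argument
/// list is elided in this hunk, so `factory` is assumed to already be built.
///
/// ```ignore
/// let first = factory.new_batch();
/// let second = factory.new_batch(); // each call returns an independent HttpBatch
/// ```
///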
pub fn new(
hostname: &str,
http_port: u16,
@@ -76,6 +85,19 @@ impl BatchFactory {
}
}

///
/// new_batch creates an `HttpBatch` using its factory's parameters.
///
/// The caller writes to ClickHouse by sending data through the `sender` field of
/// the returned `HttpBatch`, then calling `result_handle.take()` to force the write
/// to occur. If the `ConcurrencyConfig` on the `BatchFactory` allows, multiple
/// writer threads may be spawned to handle inserts.
///
/// This is generally wrapped by `HttpBatch::write_rows()`, which adds data to an
/// internal buffer, followed by `HttpBatch::finish()`, which pushes that buffered
/// data across a channel to a (hopefully free) receiver thread that initiates an
/// HTTP request to ClickHouse based on the connection parameters supplied in
/// `BatchFactory::new()`.
///
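/// # Example
///
/// A minimal sketch of the intended call sequence, not taken from this commit;
/// it assumes a `BatchFactory` and some `RowData` already exist.
///
/// ```ignore
/// async fn insert(factory: &BatchFactory, rows: &RowData) -> anyhow::Result<bool> {
///     let mut batch = factory.new_batch(); // fresh batch with its own channel
///     batch.write_rows(rows)?;             // buffer rows, flushing in ~1 MB chunks
///     batch.finish().await                 // flush the remainder and run the insert
/// }
/// ```
///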
pub fn new_batch(&self) -> HttpBatch {
let (sender, receiver) = channel(CHANNEL_CAPACITY);

@@ -121,6 +143,11 @@ impl BatchFactory {
}
}

///
/// `HttpBatch` encapsulates the state of a single buffer for ClickHouse writes,
/// along with a handle for flushing data to a receiver thread and a handle for
/// initiating a POST (insert) to ClickHouse.
///
pub struct HttpBatch {
current_chunk: Vec<u8>,
num_rows: usize,
@@ -138,8 +165,13 @@ impl HttpBatch {
self.num_bytes
}

///
/// write_rows appends rows to an internal buffer. Once that buffer grows past
/// `CLICKHOUSE_HTTP_CHUNK_SIZE_BYTES`, it is flushed to the channel in `sender`
/// before the next rows are appended.
///
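/// # Example
///
/// A sketch only: `encoded_batches` is a hypothetical iterator of `RowData`
/// values standing in for however rows are produced upstream.
///
/// ```ignore
/// for rows in encoded_batches {
///     // Each call may first flush the current ~1 MB chunk to `sender`.
///     batch.write_rows(&rows)?;
/// }
/// ```
///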
pub fn write_rows(&mut self, data: &RowData) -> anyhow::Result<()> {
-if self.current_chunk.len() > CLICKHOUSE_HTTP_CHUNK_SIZE {
+if self.current_chunk.len() > CLICKHOUSE_HTTP_CHUNK_SIZE_BYTES {
self.flush_chunk()?;
}

@@ -167,6 +199,10 @@ impl HttpBatch {
Ok(())
}

///
/// finish flushes the remaining in-memory buffer and then forces an attempt to
/// write to ClickHouse on the thread created in `BatchFactory::new`.
///
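/// # Example
///
/// A sketch only; `batch` is an `HttpBatch` that has already buffered rows.
///
/// ```ignore
/// // `finish` consumes the batch; awaiting it resolves once the write attempt completes.
/// let result = batch.finish().await?;
/// ```
///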
pub async fn finish(mut self) -> Result<bool, anyhow::Error> {
self.flush_chunk()?;
// finish stream
