From f5011574fc6d99d57f2e391553f2158d74e29afd Mon Sep 17 00:00:00 2001 From: Igor Motov <igor@motovs.org> Date: Wed, 19 Apr 2023 07:10:54 -1000 Subject: [PATCH] Fix handling of commit parameters in rest client (#3197) Fixes handling of CommitType::Force and CommitType::WaitFor in the rest client. Fixes #3196 --- quickwit/quickwit-ingest/src/lib.rs | 6 +- .../quickwit-rest-client/src/rest_client.rs | 62 ++++++++++++++++++- 2 files changed, 64 insertions(+), 4 deletions(-) diff --git a/quickwit/quickwit-ingest/src/lib.rs b/quickwit/quickwit-ingest/src/lib.rs index 3cbc5a8845b..b48f55d1cd9 100644 --- a/quickwit/quickwit-ingest/src/lib.rs +++ b/quickwit/quickwit-ingest/src/lib.rs @@ -137,11 +137,11 @@ impl From<u32> for CommitType { } impl CommitType { - pub fn to_query_parameter(&self) -> Option<&'static str> { + pub fn to_query_parameter(&self) -> Option<&'static [(&'static str, &'static str)]> { match self { CommitType::Auto => None, - CommitType::WaitFor => Some("commit=wait_for"), - CommitType::Force => Some("commit=force"), + CommitType::WaitFor => Some(&[("commit", "wait_for")]), + CommitType::Force => Some(&[("commit", "force")]), } } } diff --git a/quickwit/quickwit-rest-client/src/rest_client.rs b/quickwit/quickwit-rest-client/src/rest_client.rs index 064b5352e7b..689ada37e1a 100644 --- a/quickwit/quickwit-rest-client/src/rest_client.rs +++ b/quickwit/quickwit-rest-client/src/rest_client.rs @@ -445,7 +445,9 @@ mod test { use serde_json::json; use tokio::fs::File; use tokio::io::AsyncReadExt; - use wiremock::matchers::{body_bytes, body_json, header, method, path, query_param}; + use wiremock::matchers::{ + body_bytes, body_json, header, method, path, query_param, query_param_is_missing, + }; use wiremock::{Mock, MockServer, ResponseTemplate}; use super::{QuickwitClient, Transport}; @@ -529,6 +531,7 @@ mod test { .unwrap(); Mock::given(method("POST")) .and(path("/api/v1/my-index/ingest")) + .and(query_param_is_missing("commit")) .and(body_bytes(buffer.clone())) .respond_with(ResponseTemplate::new(StatusCode::TOO_MANY_REQUESTS)) .up_to_n_times(2) @@ -537,6 +540,7 @@ mod test { .await; Mock::given(method("POST")) .and(path("/api/v1/my-index/ingest")) + .and(query_param_is_missing("commit")) .and(body_bytes(buffer)) .respond_with(ResponseTemplate::new(StatusCode::OK)) .up_to_n_times(1) @@ -549,6 +553,62 @@ mod test { .unwrap(); } + #[tokio::test] + async fn test_ingest_endpoint_with_force_commit() { + let mock_server = MockServer::start().await; + let server_url = Url::parse(&mock_server.uri()).unwrap(); + let qw_client = QuickwitClient::new(Transport::new(server_url)); + let ndjson_filepath = get_ndjson_filepath("documents_to_ingest.json"); + let mut buffer = Vec::new(); + File::open(&ndjson_filepath) + .await + .unwrap() + .read_to_end(&mut buffer) + .await + .unwrap(); + Mock::given(method("POST")) + .and(path("/api/v1/my-index/ingest")) + .and(query_param("commit", "force")) + .and(body_bytes(buffer)) + .respond_with(ResponseTemplate::new(StatusCode::OK)) + .up_to_n_times(1) + .mount(&mock_server) + .await; + let ingest_source = IngestSource::File(PathBuf::from_str(&ndjson_filepath).unwrap()); + qw_client + .ingest("my-index", ingest_source, None, CommitType::Force) + .await + .unwrap(); + } + + #[tokio::test] + async fn test_ingest_endpoint_with_wait_for_commit() { + let mock_server = MockServer::start().await; + let server_url = Url::parse(&mock_server.uri()).unwrap(); + let qw_client = QuickwitClient::new(Transport::new(server_url)); + let ndjson_filepath = get_ndjson_filepath("documents_to_ingest.json"); + let mut buffer = Vec::new(); + File::open(&ndjson_filepath) + .await + .unwrap() + .read_to_end(&mut buffer) + .await + .unwrap(); + Mock::given(method("POST")) + .and(path("/api/v1/my-index/ingest")) + .and(query_param("commit", "wait_for")) + .and(body_bytes(buffer)) + .respond_with(ResponseTemplate::new(StatusCode::OK)) + .up_to_n_times(1) + .mount(&mock_server) + .await; + let ingest_source = IngestSource::File(PathBuf::from_str(&ndjson_filepath).unwrap()); + qw_client + .ingest("my-index", ingest_source, None, CommitType::WaitFor) + .await + .unwrap(); + } + #[tokio::test] async fn test_ingest_endpoint_should_return_api_error() { let mock_server = MockServer::start().await;