From 7ec03f9cd2afec9df8b5be405851ff860d4b32bd Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Mon, 9 Dec 2024 10:15:16 +0100 Subject: [PATCH] Use content_length_limit for ES bulk limit (#5573) --- .../quickwit-serve/src/elasticsearch_api/bulk.rs | 7 +++++-- .../src/elasticsearch_api/bulk_v2.rs | 16 +++++++++------- .../src/elasticsearch_api/filter.rs | 7 ++++--- .../quickwit-serve/src/elasticsearch_api/mod.rs | 8 +++++++- 4 files changed, 25 insertions(+), 13 deletions(-) diff --git a/quickwit/quickwit-serve/src/elasticsearch_api/bulk.rs b/quickwit/quickwit-serve/src/elasticsearch_api/bulk.rs index c6723d8521e..eaffcda87a3 100644 --- a/quickwit/quickwit-serve/src/elasticsearch_api/bulk.rs +++ b/quickwit/quickwit-serve/src/elasticsearch_api/bulk.rs @@ -20,6 +20,7 @@ use std::collections::HashMap; use std::time::Instant; +use bytesize::ByteSize; use hyper::StatusCode; use quickwit_config::{disable_ingest_v1, enable_ingest_v2}; use quickwit_ingest::{ @@ -42,8 +43,9 @@ use crate::{with_arg, Body}; pub fn es_compat_bulk_handler( ingest_service: IngestServiceClient, ingest_router: IngestRouterServiceClient, + content_length_limit: ByteSize, ) -> impl Filter + Clone { - elastic_bulk_filter() + elastic_bulk_filter(content_length_limit) .and(with_arg(ingest_service)) .and(with_arg(ingest_router)) .then(|body, bulk_options, ingest_service, ingest_router| { @@ -58,8 +60,9 @@ pub fn es_compat_bulk_handler( pub fn es_compat_index_bulk_handler( ingest_service: IngestServiceClient, ingest_router: IngestRouterServiceClient, + content_length_limit: ByteSize, ) -> impl Filter + Clone { - elastic_index_bulk_filter() + elastic_index_bulk_filter(content_length_limit) .and(with_arg(ingest_service)) .and(with_arg(ingest_router)) .then( diff --git a/quickwit/quickwit-serve/src/elasticsearch_api/bulk_v2.rs b/quickwit/quickwit-serve/src/elasticsearch_api/bulk_v2.rs index 3a142b82b2e..8fb114838d8 100644 --- a/quickwit/quickwit-serve/src/elasticsearch_api/bulk_v2.rs +++ b/quickwit/quickwit-serve/src/elasticsearch_api/bulk_v2.rs @@ -351,6 +351,7 @@ fn remove_doc_handles( #[cfg(test)] mod tests { + use bytesize::ByteSize; use quickwit_proto::ingest::router::{ IngestFailure, IngestFailureReason, IngestResponseV2, IngestSuccess, MockIngestRouterService, @@ -399,8 +400,9 @@ mod tests { fn es_compat_bulk_handler_v2( ingest_router: IngestRouterServiceClient, + content_length_limit: ByteSize, ) -> impl Filter + Clone { - elastic_bulk_filter() + elastic_bulk_filter(content_length_limit) .and(with_arg(ingest_router)) .then(|body, bulk_options, ingest_router| { elastic_bulk_ingest_v2(None, body, bulk_options, ingest_router) @@ -459,7 +461,7 @@ mod tests { }) }); let ingest_router = IngestRouterServiceClient::from_mock(mock_ingest_router); - let handler = es_compat_bulk_handler_v2(ingest_router); + let handler = es_compat_bulk_handler_v2(ingest_router, ByteSize::mb(10)); let payload = r#" {"create": {"_index": "my-index-1", "_id" : "1"}} @@ -511,7 +513,7 @@ mod tests { #[tokio::test] async fn test_bulk_api_accepts_empty_requests() { let ingest_router = IngestRouterServiceClient::mocked(); - let handler = es_compat_bulk_handler_v2(ingest_router); + let handler = es_compat_bulk_handler_v2(ingest_router, ByteSize::mb(10)); let response = warp::test::request() .path("/_elastic/_bulk") @@ -556,7 +558,7 @@ mod tests { }) }); let ingest_router = IngestRouterServiceClient::from_mock(mock_ingest_router); - let handler = es_compat_bulk_handler_v2(ingest_router); + let handler = es_compat_bulk_handler_v2(ingest_router, ByteSize::mb(10)); let payload = r#" @@ -579,7 +581,7 @@ mod tests { #[tokio::test] async fn test_bulk_api_handles_malformed_requests() { let ingest_router = IngestRouterServiceClient::mocked(); - let handler = es_compat_bulk_handler_v2(ingest_router); + let handler = es_compat_bulk_handler_v2(ingest_router, ByteSize::mb(10)); let payload = r#" {"create": {"_index": "my-index-1", "_id" : "1"},} @@ -680,7 +682,7 @@ mod tests { }) }); let ingest_router = IngestRouterServiceClient::from_mock(mock_ingest_router); - let handler = es_compat_bulk_handler_v2(ingest_router); + let handler = es_compat_bulk_handler_v2(ingest_router, ByteSize::mb(10)); let payload = r#" {"index": {"_index": "my-index-1", "_id" : "1"}} @@ -822,7 +824,7 @@ mod tests { }) }); let ingest_router = IngestRouterServiceClient::from_mock(mock_ingest_router); - let handler = es_compat_bulk_handler_v2(ingest_router); + let handler = es_compat_bulk_handler_v2(ingest_router, ByteSize::mb(10)); let payload = r#" {"create": {"_index": "my-index-1", "_id" : "1"}} diff --git a/quickwit/quickwit-serve/src/elasticsearch_api/filter.rs b/quickwit/quickwit-serve/src/elasticsearch_api/filter.rs index 968046ebcaf..bfffe3c9ff0 100644 --- a/quickwit/quickwit-serve/src/elasticsearch_api/filter.rs +++ b/quickwit/quickwit-serve/src/elasticsearch_api/filter.rs @@ -35,7 +35,6 @@ use crate::search_api::{extract_index_id_patterns, extract_index_id_patterns_def use crate::Body; const BODY_LENGTH_LIMIT: ByteSize = ByteSize::mib(1); -const CONTENT_LENGTH_LIMIT: ByteSize = ByteSize::mib(10); // TODO: Make all elastic endpoint models `utoipa` compatible // and register them here. @@ -72,11 +71,12 @@ pub(crate) fn elasticsearch_filter( ) )] pub(crate) fn elastic_bulk_filter( + content_length_limit: ByteSize, ) -> impl Filter + Clone { warp::path!("_elastic" / "_bulk") .and(warp::post().or(warp::put()).unify()) .and(warp::body::content_length_limit( - CONTENT_LENGTH_LIMIT.as_u64(), + content_length_limit.as_u64(), )) .and(get_body_bytes()) .and(serde_qs::warp::query(serde_qs::Config::default())) @@ -95,11 +95,12 @@ pub(crate) fn elastic_bulk_filter( ) )] pub(crate) fn elastic_index_bulk_filter( + content_length_limit: ByteSize, ) -> impl Filter + Clone { warp::path!("_elastic" / String / "_bulk") .and(warp::post().or(warp::put()).unify()) .and(warp::body::content_length_limit( - CONTENT_LENGTH_LIMIT.as_u64(), + content_length_limit.as_u64(), )) .and(get_body_bytes()) .and(serde_qs::warp::query::( diff --git a/quickwit/quickwit-serve/src/elasticsearch_api/mod.rs b/quickwit/quickwit-serve/src/elasticsearch_api/mod.rs index 479e48687f4..77b686db537 100644 --- a/quickwit/quickwit-serve/src/elasticsearch_api/mod.rs +++ b/quickwit/quickwit-serve/src/elasticsearch_api/mod.rs @@ -61,14 +61,20 @@ pub fn elastic_api_handlers( metastore: MetastoreServiceClient, index_service: IndexService, ) -> impl Filter + Clone { + let ingest_content_length_limit = node_config.ingest_api_config.content_length_limit; es_compat_cluster_info_handler(node_config, BuildInfo::get()) .or(es_compat_search_handler(search_service.clone())) .or(es_compat_bulk_handler( ingest_service.clone(), ingest_router.clone(), + ingest_content_length_limit, )) .boxed() - .or(es_compat_index_bulk_handler(ingest_service, ingest_router)) + .or(es_compat_index_bulk_handler( + ingest_service, + ingest_router, + ingest_content_length_limit, + )) .or(es_compat_index_search_handler(search_service.clone())) .or(es_compat_index_count_handler(search_service.clone())) .or(es_compat_scroll_handler(search_service.clone()))