From e4f7757a7624577f98f228c039a5ff415b48ae97 Mon Sep 17 00:00:00 2001 From: TuziBen Date: Mon, 27 Nov 2023 10:03:21 +0800 Subject: [PATCH 1/7] append compatible elastic header --- .../quickwit-config/src/node_config/mod.rs | 8 +++- .../src/node_config/serialize.rs | 11 +++++- quickwit/quickwit-config/src/qw_env_vars.rs | 3 +- .../src/elastic_search_api/bulk.rs | 11 ++++++ .../src/elastic_search_api/mod.rs | 38 +++++++++++++++---- .../src/elastic_search_api/rest_handler.rs | 22 ++++++++++- 6 files changed, 81 insertions(+), 12 deletions(-) diff --git a/quickwit/quickwit-config/src/node_config/mod.rs b/quickwit/quickwit-config/src/node_config/mod.rs index b6412ad6ba0..a880cf16c79 100644 --- a/quickwit/quickwit-config/src/node_config/mod.rs +++ b/quickwit/quickwit-config/src/node_config/mod.rs @@ -303,6 +303,7 @@ impl Default for JaegerConfig { pub struct NodeConfig { pub cluster_id: String, pub node_id: String, + pub enable_elastic_header: bool, pub enabled_services: HashSet, pub rest_listen_addr: SocketAddr, pub gossip_listen_addr: SocketAddr, @@ -375,7 +376,12 @@ impl NodeConfig { #[cfg(any(test, feature = "testsuite"))] pub fn for_test() -> Self { - serialize::node_config_for_test() + serialize::node_config_for_test(false) + } + + #[cfg(any(test, feature = "testsuite"))] + pub fn for_test_elastic_header() -> Self { + serialize::node_config_for_test(true) } } diff --git a/quickwit/quickwit-config/src/node_config/serialize.rs b/quickwit/quickwit-config/src/node_config/serialize.rs index 4f2d259222b..3cc34c0a902 100644 --- a/quickwit/quickwit-config/src/node_config/serialize.rs +++ b/quickwit/quickwit-config/src/node_config/serialize.rs @@ -59,6 +59,10 @@ fn default_node_id() -> ConfigValue { ConfigValue::with_default(node_id) } +fn default_enable_elastic_header() -> ConfigValue { + ConfigValue::with_default(false) +} + #[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)] struct List(Vec); @@ -163,6 +167,8 @@ struct NodeConfigBuilder { cluster_id: ConfigValue, #[serde(default = "default_node_id")] node_id: ConfigValue, + #[serde(default = "default_enable_elastic_header")] + enable_elastic_header: ConfigValue, #[serde(default = "default_enabled_services")] enabled_services: ConfigValue, #[serde(default = "default_listen_address")] @@ -271,6 +277,7 @@ impl NodeConfigBuilder { let node_config = NodeConfig { cluster_id: self.cluster_id.resolve(env_vars)?, node_id: self.node_id.resolve(env_vars)?, + enable_elastic_header: self.enable_elastic_header.resolve(env_vars)?, enabled_services, rest_listen_addr, gossip_listen_addr, @@ -318,6 +325,7 @@ impl Default for NodeConfigBuilder { cluster_id: default_cluster_id(), node_id: default_node_id(), enabled_services: default_enabled_services(), + enable_elastic_header: default_enable_elastic_header(), listen_address: default_listen_address(), rest_listen_port: default_rest_listen_port(), gossip_listen_port: ConfigValue::none(), @@ -339,7 +347,7 @@ impl Default for NodeConfigBuilder { } #[cfg(any(test, feature = "testsuite"))] -pub fn node_config_for_test() -> NodeConfig { +pub fn node_config_for_test(enable_es_header: bool) -> NodeConfig { let enabled_services = QuickwitService::supported_services(); let listen_address = Host::default(); @@ -371,6 +379,7 @@ pub fn node_config_for_test() -> NodeConfig { NodeConfig { cluster_id: default_cluster_id().unwrap(), node_id: default_node_id().unwrap(), + enable_elastic_header: enable_es_header, enabled_services, gossip_advertise_addr: gossip_listen_addr, grpc_advertise_addr: grpc_listen_addr, diff --git a/quickwit/quickwit-config/src/qw_env_vars.rs b/quickwit/quickwit-config/src/qw_env_vars.rs index 9a9b293507e..17ba65c5a2c 100644 --- a/quickwit/quickwit-config/src/qw_env_vars.rs +++ b/quickwit/quickwit-config/src/qw_env_vars.rs @@ -59,7 +59,8 @@ qw_env_vars!( QW_PEER_SEEDS, QW_DATA_DIR, QW_METASTORE_URI, - QW_DEFAULT_INDEX_ROOT_URI + QW_DEFAULT_INDEX_ROOT_URI, + QW_ENABLE_ELSTIC_HEADER ); #[cfg(test)] diff --git a/quickwit/quickwit-serve/src/elastic_search_api/bulk.rs b/quickwit/quickwit-serve/src/elastic_search_api/bulk.rs index 75f2f8e588d..742e9bee26f 100644 --- a/quickwit/quickwit-serve/src/elastic_search_api/bulk.rs +++ b/quickwit/quickwit-serve/src/elastic_search_api/bulk.rs @@ -18,12 +18,14 @@ // along with this program. If not, see . use std::collections::HashMap; +use std::sync::Arc; use bytes::Bytes; use hyper::StatusCode; use quickwit_ingest::{ CommitType, DocBatchBuilder, IngestRequest, IngestResponse, IngestService, IngestServiceClient, }; +use quickwit_config::{NodeConfig}; use warp::{Filter, Rejection}; use crate::elastic_search_api::filter::{elastic_bulk_filter, elastic_index_bulk_filter}; @@ -32,9 +34,11 @@ use crate::elastic_search_api::model::{BulkAction, ElasticIngestOptions, Elastic use crate::format::extract_format_from_qs; use crate::ingest_api::lines; use crate::with_arg; +use super::append_elastic_header; /// POST `_elastic/_bulk` pub fn es_compat_bulk_handler( + node_config: Arc, ingest_service: IngestServiceClient, ) -> impl Filter + Clone { elastic_bulk_filter() @@ -44,10 +48,14 @@ pub fn es_compat_bulk_handler( }) .and(extract_format_from_qs()) .map(make_elastic_api_response) + .map(move |response| { + append_elastic_header(node_config.enable_elastic_header, response) + }) } /// POST `_elastic//_bulk` pub fn es_compat_index_bulk_handler( + node_config: Arc, ingest_service: IngestServiceClient, ) -> impl Filter + Clone { elastic_index_bulk_filter() @@ -57,6 +65,9 @@ pub fn es_compat_index_bulk_handler( }) .and(extract_format_from_qs()) .map(make_elastic_api_response) + .map(move |response| { + append_elastic_header(node_config.enable_elastic_header, response) + }) } async fn elastic_ingest_bulk( diff --git a/quickwit/quickwit-serve/src/elastic_search_api/mod.rs b/quickwit/quickwit-serve/src/elastic_search_api/mod.rs index 44d77fa2ac0..9d0838600fd 100644 --- a/quickwit/quickwit-serve/src/elastic_search_api/mod.rs +++ b/quickwit/quickwit-serve/src/elastic_search_api/mod.rs @@ -50,13 +50,13 @@ pub fn elastic_api_handlers( search_service: Arc, ingest_service: IngestServiceClient, ) -> impl Filter + Clone { - es_compat_cluster_info_handler(node_config, BuildInfo::get()) - .or(es_compat_search_handler(search_service.clone())) - .or(es_compat_index_search_handler(search_service.clone())) - .or(es_compat_scroll_handler(search_service.clone())) - .or(es_compat_index_multi_search_handler(search_service)) - .or(es_compat_bulk_handler(ingest_service.clone())) - .or(es_compat_index_bulk_handler(ingest_service)) + es_compat_cluster_info_handler(node_config.clone(), BuildInfo::get()) + .or(es_compat_search_handler(node_config.clone(), search_service.clone())) + .or(es_compat_index_search_handler(node_config.clone(), search_service.clone())) + .or(es_compat_scroll_handler(node_config.clone(), search_service.clone())) + .or(es_compat_index_multi_search_handler(node_config.clone(), search_service)) + .or(es_compat_bulk_handler(node_config.clone(), ingest_service.clone())) + .or(es_compat_index_bulk_handler(node_config, ingest_service)) // Register newly created handlers here. } @@ -101,6 +101,15 @@ fn make_elastic_api_response( JsonApiResponse::new(&elasticsearch_result, status_code, &format) } +/// Elasticsearch clients will check the response header +/// whether the heads contain `X-Elastic-Product` and the value is `Elasticsearch` +fn append_elastic_header(enable_elastic_header: bool, response: T) -> warp::reply::WithHeader { + match enable_elastic_header { + true => { warp::reply::with_header(response, "X-Elastic-Product", "Elasticsearch") } + false => { warp::reply::with_header(response, "", "") } + } +} + #[cfg(test)] mod tests { use std::sync::Arc; @@ -402,4 +411,19 @@ mod tests { .await; assert_eq!(resp.status(), 200); } + + #[tokio::test] + async fn test_elastic_header() { + let build_info = BuildInfo::get(); + let config = Arc::new(NodeConfig::for_test_elastic_header()); + let handler = + es_compat_cluster_info_handler(config.clone(), build_info).recover(recover_fn); + let resp = warp::test::request() + .path("/_elastic") + .method("HEAD") + .reply(&handler) + .await; + assert_eq!(resp.headers().get("x-elastic-product").unwrap(), "Elasticsearch"); + assert_eq!(resp.status(), 200); + } } diff --git a/quickwit/quickwit-serve/src/elastic_search_api/rest_handler.rs b/quickwit/quickwit-serve/src/elastic_search_api/rest_handler.rs index 41751e3570c..c65fbfe961b 100644 --- a/quickwit/quickwit-serve/src/elastic_search_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/elastic_search_api/rest_handler.rs @@ -46,7 +46,7 @@ use super::model::{ ElasticSearchError, MultiSearchHeader, MultiSearchQueryParams, MultiSearchResponse, MultiSearchSingleResponse, ScrollQueryParams, SearchBody, SearchQueryParams, }; -use super::{make_elastic_api_response, TrackTotalHits}; +use super::{make_elastic_api_response, append_elastic_header, TrackTotalHits}; use crate::format::BodyFormat; use crate::json_api_response::{make_json_api_response, ApiError, JsonApiResponse}; use crate::{with_arg, BuildInfo}; @@ -57,7 +57,7 @@ pub fn es_compat_cluster_info_handler( build_info: &'static BuildInfo, ) -> impl Filter + Clone { elastic_cluster_info_filter() - .and(with_arg(node_config)) + .and(with_arg(node_config.clone())) .and(with_arg(build_info)) .then( |config: Arc, build_info: &'static BuildInfo| async move { @@ -73,10 +73,14 @@ pub fn es_compat_cluster_info_handler( })) }, ) + .map(move |response| { + append_elastic_header(node_config.enable_elastic_header, response) + }) } /// GET or POST _elastic/_search pub fn es_compat_search_handler( + node_config: Arc, _search_service: Arc, ) -> impl Filter + Clone { elastic_search_filter().then(|_params: SearchQueryParams| async move { @@ -88,31 +92,42 @@ pub fn es_compat_search_handler( .to_string(), }; make_json_api_response::<(), _>(Err(api_error), BodyFormat::default()) + }).map(move |response| { + append_elastic_header(node_config.enable_elastic_header, response) }) } /// GET or POST _elastic/{index}/_search pub fn es_compat_index_search_handler( + node_config: Arc, search_service: Arc, ) -> impl Filter + Clone { elastic_index_search_filter() .and(with_arg(search_service)) .then(es_compat_index_search) .map(|result| make_elastic_api_response(result, BodyFormat::default())) + .map(move |response| { + append_elastic_header(node_config.enable_elastic_header, response) + }) } /// GET or POST _elastic/_search/scroll pub fn es_compat_scroll_handler( + node_config: Arc, search_service: Arc, ) -> impl Filter + Clone { elastic_scroll_filter() .and(with_arg(search_service)) .then(es_scroll) .map(|result| make_elastic_api_response(result, BodyFormat::default())) + .map(move |response| { + append_elastic_header(node_config.enable_elastic_header, response) + }) } /// POST _elastic/_search pub fn es_compat_index_multi_search_handler( + node_config: Arc, search_service: Arc, ) -> impl Filter + Clone { elastic_multi_search_filter() @@ -125,6 +140,9 @@ pub fn es_compat_index_multi_search_handler( }; JsonApiResponse::new(&result, status_code, &BodyFormat::default()) }) + .map(move |response| { + append_elastic_header(node_config.enable_elastic_header, response) + }) } fn build_request_for_es_api( From 96fb87c3c5b21c62249b301a7289771c6220cf4a Mon Sep 17 00:00:00 2001 From: TuziBen Date: Mon, 27 Nov 2023 10:55:26 +0800 Subject: [PATCH 2/7] code format --- .../src/elastic_search_api/bulk.rs | 12 ++---- .../src/elastic_search_api/mod.rs | 39 +++++++++++++---- .../src/elastic_search_api/rest_handler.rs | 42 ++++++++----------- 3 files changed, 51 insertions(+), 42 deletions(-) diff --git a/quickwit/quickwit-serve/src/elastic_search_api/bulk.rs b/quickwit/quickwit-serve/src/elastic_search_api/bulk.rs index 742e9bee26f..472937ce314 100644 --- a/quickwit/quickwit-serve/src/elastic_search_api/bulk.rs +++ b/quickwit/quickwit-serve/src/elastic_search_api/bulk.rs @@ -22,19 +22,19 @@ use std::sync::Arc; use bytes::Bytes; use hyper::StatusCode; +use quickwit_config::NodeConfig; use quickwit_ingest::{ CommitType, DocBatchBuilder, IngestRequest, IngestResponse, IngestService, IngestServiceClient, }; -use quickwit_config::{NodeConfig}; use warp::{Filter, Rejection}; +use super::append_elastic_header; use crate::elastic_search_api::filter::{elastic_bulk_filter, elastic_index_bulk_filter}; use crate::elastic_search_api::make_elastic_api_response; use crate::elastic_search_api::model::{BulkAction, ElasticIngestOptions, ElasticSearchError}; use crate::format::extract_format_from_qs; use crate::ingest_api::lines; use crate::with_arg; -use super::append_elastic_header; /// POST `_elastic/_bulk` pub fn es_compat_bulk_handler( @@ -48,9 +48,7 @@ pub fn es_compat_bulk_handler( }) .and(extract_format_from_qs()) .map(make_elastic_api_response) - .map(move |response| { - append_elastic_header(node_config.enable_elastic_header, response) - }) + .map(move |response| append_elastic_header(node_config.enable_elastic_header, response)) } /// POST `_elastic//_bulk` @@ -65,9 +63,7 @@ pub fn es_compat_index_bulk_handler( }) .and(extract_format_from_qs()) .map(make_elastic_api_response) - .map(move |response| { - append_elastic_header(node_config.enable_elastic_header, response) - }) + .map(move |response| append_elastic_header(node_config.enable_elastic_header, response)) } async fn elastic_ingest_bulk( diff --git a/quickwit/quickwit-serve/src/elastic_search_api/mod.rs b/quickwit/quickwit-serve/src/elastic_search_api/mod.rs index 9d0838600fd..954476f7173 100644 --- a/quickwit/quickwit-serve/src/elastic_search_api/mod.rs +++ b/quickwit/quickwit-serve/src/elastic_search_api/mod.rs @@ -51,11 +51,26 @@ pub fn elastic_api_handlers( ingest_service: IngestServiceClient, ) -> impl Filter + Clone { es_compat_cluster_info_handler(node_config.clone(), BuildInfo::get()) - .or(es_compat_search_handler(node_config.clone(), search_service.clone())) - .or(es_compat_index_search_handler(node_config.clone(), search_service.clone())) - .or(es_compat_scroll_handler(node_config.clone(), search_service.clone())) - .or(es_compat_index_multi_search_handler(node_config.clone(), search_service)) - .or(es_compat_bulk_handler(node_config.clone(), ingest_service.clone())) + .or(es_compat_search_handler( + node_config.clone(), + search_service.clone(), + )) + .or(es_compat_index_search_handler( + node_config.clone(), + search_service.clone(), + )) + .or(es_compat_scroll_handler( + node_config.clone(), + search_service.clone(), + )) + .or(es_compat_index_multi_search_handler( + node_config.clone(), + search_service, + )) + .or(es_compat_bulk_handler( + node_config.clone(), + ingest_service.clone(), + )) .or(es_compat_index_bulk_handler(node_config, ingest_service)) // Register newly created handlers here. } @@ -103,10 +118,13 @@ fn make_elastic_api_response( /// Elasticsearch clients will check the response header /// whether the heads contain `X-Elastic-Product` and the value is `Elasticsearch` -fn append_elastic_header(enable_elastic_header: bool, response: T) -> warp::reply::WithHeader { +fn append_elastic_header( + enable_elastic_header: bool, + response: T, +) -> warp::reply::WithHeader { match enable_elastic_header { - true => { warp::reply::with_header(response, "X-Elastic-Product", "Elasticsearch") } - false => { warp::reply::with_header(response, "", "") } + true => warp::reply::with_header(response, "X-Elastic-Product", "Elasticsearch"), + false => warp::reply::with_header(response, "", ""), } } @@ -423,7 +441,10 @@ mod tests { .method("HEAD") .reply(&handler) .await; - assert_eq!(resp.headers().get("x-elastic-product").unwrap(), "Elasticsearch"); + assert_eq!( + resp.headers().get("x-elastic-product").unwrap(), + "Elasticsearch" + ); assert_eq!(resp.status(), 200); } } diff --git a/quickwit/quickwit-serve/src/elastic_search_api/rest_handler.rs b/quickwit/quickwit-serve/src/elastic_search_api/rest_handler.rs index c65fbfe961b..eae33270910 100644 --- a/quickwit/quickwit-serve/src/elastic_search_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/elastic_search_api/rest_handler.rs @@ -46,7 +46,7 @@ use super::model::{ ElasticSearchError, MultiSearchHeader, MultiSearchQueryParams, MultiSearchResponse, MultiSearchSingleResponse, ScrollQueryParams, SearchBody, SearchQueryParams, }; -use super::{make_elastic_api_response, append_elastic_header, TrackTotalHits}; +use super::{append_elastic_header, make_elastic_api_response, TrackTotalHits}; use crate::format::BodyFormat; use crate::json_api_response::{make_json_api_response, ApiError, JsonApiResponse}; use crate::{with_arg, BuildInfo}; @@ -73,9 +73,7 @@ pub fn es_compat_cluster_info_handler( })) }, ) - .map(move |response| { - append_elastic_header(node_config.enable_elastic_header, response) - }) + .map(move |response| append_elastic_header(node_config.enable_elastic_header, response)) } /// GET or POST _elastic/_search @@ -83,18 +81,18 @@ pub fn es_compat_search_handler( node_config: Arc, _search_service: Arc, ) -> impl Filter + Clone { - elastic_search_filter().then(|_params: SearchQueryParams| async move { - // TODO - let api_error = ApiError { - service_code: ServiceErrorCode::NotSupportedYet, - message: "_elastic/_search is not supported yet. Please try the index search endpoint \ - (_elastic/{index}/search)" - .to_string(), - }; - make_json_api_response::<(), _>(Err(api_error), BodyFormat::default()) - }).map(move |response| { - append_elastic_header(node_config.enable_elastic_header, response) - }) + elastic_search_filter() + .then(|_params: SearchQueryParams| async move { + // TODO + let api_error = ApiError { + service_code: ServiceErrorCode::NotSupportedYet, + message: "_elastic/_search is not supported yet. Please try the index search \ + endpoint (_elastic/{index}/search)" + .to_string(), + }; + make_json_api_response::<(), _>(Err(api_error), BodyFormat::default()) + }) + .map(move |response| append_elastic_header(node_config.enable_elastic_header, response)) } /// GET or POST _elastic/{index}/_search @@ -106,9 +104,7 @@ pub fn es_compat_index_search_handler( .and(with_arg(search_service)) .then(es_compat_index_search) .map(|result| make_elastic_api_response(result, BodyFormat::default())) - .map(move |response| { - append_elastic_header(node_config.enable_elastic_header, response) - }) + .map(move |response| append_elastic_header(node_config.enable_elastic_header, response)) } /// GET or POST _elastic/_search/scroll @@ -120,9 +116,7 @@ pub fn es_compat_scroll_handler( .and(with_arg(search_service)) .then(es_scroll) .map(|result| make_elastic_api_response(result, BodyFormat::default())) - .map(move |response| { - append_elastic_header(node_config.enable_elastic_header, response) - }) + .map(move |response| append_elastic_header(node_config.enable_elastic_header, response)) } /// POST _elastic/_search @@ -140,9 +134,7 @@ pub fn es_compat_index_multi_search_handler( }; JsonApiResponse::new(&result, status_code, &BodyFormat::default()) }) - .map(move |response| { - append_elastic_header(node_config.enable_elastic_header, response) - }) + .map(move |response| append_elastic_header(node_config.enable_elastic_header, response)) } fn build_request_for_es_api( From ce674196f10a1cc0e57bddce86f8c57a483e2fc3 Mon Sep 17 00:00:00 2001 From: fmassot Date: Tue, 28 Nov 2023 11:43:02 +0100 Subject: [PATCH 3/7] Review PR on es header --- .../quickwit-config/src/node_config/mod.rs | 9 +- .../src/node_config/serialize.rs | 17 +-- quickwit/quickwit-config/src/qw_env_vars.rs | 3 +- .../src/elastic_search_api/bulk.rs | 7 -- .../src/elastic_search_api/mod.rs | 104 +++++++++--------- .../src/elastic_search_api/rest_handler.rs | 32 ++---- 6 files changed, 69 insertions(+), 103 deletions(-) diff --git a/quickwit/quickwit-config/src/node_config/mod.rs b/quickwit/quickwit-config/src/node_config/mod.rs index a880cf16c79..767afefdba7 100644 --- a/quickwit/quickwit-config/src/node_config/mod.rs +++ b/quickwit/quickwit-config/src/node_config/mod.rs @@ -303,7 +303,7 @@ impl Default for JaegerConfig { pub struct NodeConfig { pub cluster_id: String, pub node_id: String, - pub enable_elastic_header: bool, + pub add_elastic_header: bool, pub enabled_services: HashSet, pub rest_listen_addr: SocketAddr, pub gossip_listen_addr: SocketAddr, @@ -376,12 +376,7 @@ impl NodeConfig { #[cfg(any(test, feature = "testsuite"))] pub fn for_test() -> Self { - serialize::node_config_for_test(false) - } - - #[cfg(any(test, feature = "testsuite"))] - pub fn for_test_elastic_header() -> Self { - serialize::node_config_for_test(true) + serialize::node_config_for_test() } } diff --git a/quickwit/quickwit-config/src/node_config/serialize.rs b/quickwit/quickwit-config/src/node_config/serialize.rs index 3cc34c0a902..c0e03b46979 100644 --- a/quickwit/quickwit-config/src/node_config/serialize.rs +++ b/quickwit/quickwit-config/src/node_config/serialize.rs @@ -59,10 +59,6 @@ fn default_node_id() -> ConfigValue { ConfigValue::with_default(node_id) } -fn default_enable_elastic_header() -> ConfigValue { - ConfigValue::with_default(false) -} - #[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)] struct List(Vec); @@ -167,8 +163,6 @@ struct NodeConfigBuilder { cluster_id: ConfigValue, #[serde(default = "default_node_id")] node_id: ConfigValue, - #[serde(default = "default_enable_elastic_header")] - enable_elastic_header: ConfigValue, #[serde(default = "default_enabled_services")] enabled_services: ConfigValue, #[serde(default = "default_listen_address")] @@ -186,6 +180,8 @@ struct NodeConfigBuilder { metastore_uri: ConfigValue, default_index_root_uri: ConfigValue, #[serde(default)] + add_elastic_header: bool, + #[serde(default)] #[serde_as(deserialize_as = "serde_with::OneOrMany<_>")] rest_cors_allow_origins: Vec, #[serde(rename = "storage")] @@ -277,7 +273,7 @@ impl NodeConfigBuilder { let node_config = NodeConfig { cluster_id: self.cluster_id.resolve(env_vars)?, node_id: self.node_id.resolve(env_vars)?, - enable_elastic_header: self.enable_elastic_header.resolve(env_vars)?, + add_elastic_header: self.add_elastic_header, enabled_services, rest_listen_addr, gossip_listen_addr, @@ -325,7 +321,7 @@ impl Default for NodeConfigBuilder { cluster_id: default_cluster_id(), node_id: default_node_id(), enabled_services: default_enabled_services(), - enable_elastic_header: default_enable_elastic_header(), + add_elastic_header: false, listen_address: default_listen_address(), rest_listen_port: default_rest_listen_port(), gossip_listen_port: ConfigValue::none(), @@ -347,9 +343,8 @@ impl Default for NodeConfigBuilder { } #[cfg(any(test, feature = "testsuite"))] -pub fn node_config_for_test(enable_es_header: bool) -> NodeConfig { +pub fn node_config_for_test() -> NodeConfig { let enabled_services = QuickwitService::supported_services(); - let listen_address = Host::default(); let rest_listen_port = quickwit_common::net::find_available_tcp_port() .expect("The OS should almost always find an available port."); @@ -379,7 +374,7 @@ pub fn node_config_for_test(enable_es_header: bool) -> NodeConfig { NodeConfig { cluster_id: default_cluster_id().unwrap(), node_id: default_node_id().unwrap(), - enable_elastic_header: enable_es_header, + add_elastic_header: false, enabled_services, gossip_advertise_addr: gossip_listen_addr, grpc_advertise_addr: grpc_listen_addr, diff --git a/quickwit/quickwit-config/src/qw_env_vars.rs b/quickwit/quickwit-config/src/qw_env_vars.rs index 17ba65c5a2c..9a9b293507e 100644 --- a/quickwit/quickwit-config/src/qw_env_vars.rs +++ b/quickwit/quickwit-config/src/qw_env_vars.rs @@ -59,8 +59,7 @@ qw_env_vars!( QW_PEER_SEEDS, QW_DATA_DIR, QW_METASTORE_URI, - QW_DEFAULT_INDEX_ROOT_URI, - QW_ENABLE_ELSTIC_HEADER + QW_DEFAULT_INDEX_ROOT_URI ); #[cfg(test)] diff --git a/quickwit/quickwit-serve/src/elastic_search_api/bulk.rs b/quickwit/quickwit-serve/src/elastic_search_api/bulk.rs index 472937ce314..75f2f8e588d 100644 --- a/quickwit/quickwit-serve/src/elastic_search_api/bulk.rs +++ b/quickwit/quickwit-serve/src/elastic_search_api/bulk.rs @@ -18,17 +18,14 @@ // along with this program. If not, see . use std::collections::HashMap; -use std::sync::Arc; use bytes::Bytes; use hyper::StatusCode; -use quickwit_config::NodeConfig; use quickwit_ingest::{ CommitType, DocBatchBuilder, IngestRequest, IngestResponse, IngestService, IngestServiceClient, }; use warp::{Filter, Rejection}; -use super::append_elastic_header; use crate::elastic_search_api::filter::{elastic_bulk_filter, elastic_index_bulk_filter}; use crate::elastic_search_api::make_elastic_api_response; use crate::elastic_search_api::model::{BulkAction, ElasticIngestOptions, ElasticSearchError}; @@ -38,7 +35,6 @@ use crate::with_arg; /// POST `_elastic/_bulk` pub fn es_compat_bulk_handler( - node_config: Arc, ingest_service: IngestServiceClient, ) -> impl Filter + Clone { elastic_bulk_filter() @@ -48,12 +44,10 @@ pub fn es_compat_bulk_handler( }) .and(extract_format_from_qs()) .map(make_elastic_api_response) - .map(move |response| append_elastic_header(node_config.enable_elastic_header, response)) } /// POST `_elastic//_bulk` pub fn es_compat_index_bulk_handler( - node_config: Arc, ingest_service: IngestServiceClient, ) -> impl Filter + Clone { elastic_index_bulk_filter() @@ -63,7 +57,6 @@ pub fn es_compat_index_bulk_handler( }) .and(extract_format_from_qs()) .map(make_elastic_api_response) - .map(move |response| append_elastic_header(node_config.enable_elastic_header, response)) } async fn elastic_ingest_bulk( diff --git a/quickwit/quickwit-serve/src/elastic_search_api/mod.rs b/quickwit/quickwit-serve/src/elastic_search_api/mod.rs index 954476f7173..d34b725913a 100644 --- a/quickwit/quickwit-serve/src/elastic_search_api/mod.rs +++ b/quickwit/quickwit-serve/src/elastic_search_api/mod.rs @@ -26,6 +26,7 @@ use std::sync::Arc; use bulk::{es_compat_bulk_handler, es_compat_index_bulk_handler}; pub use filter::ElasticCompatibleApi; +use hyper::header::HeaderValue; use hyper::StatusCode; use quickwit_config::NodeConfig; use quickwit_ingest::IngestServiceClient; @@ -50,28 +51,15 @@ pub fn elastic_api_handlers( search_service: Arc, ingest_service: IngestServiceClient, ) -> impl Filter + Clone { - es_compat_cluster_info_handler(node_config.clone(), BuildInfo::get()) - .or(es_compat_search_handler( - node_config.clone(), - search_service.clone(), - )) - .or(es_compat_index_search_handler( - node_config.clone(), - search_service.clone(), - )) - .or(es_compat_scroll_handler( - node_config.clone(), - search_service.clone(), - )) - .or(es_compat_index_multi_search_handler( - node_config.clone(), - search_service, - )) - .or(es_compat_bulk_handler( - node_config.clone(), - ingest_service.clone(), - )) - .or(es_compat_index_bulk_handler(node_config, ingest_service)) + let add_elastic_header = node_config.add_elastic_header; + es_compat_cluster_info_handler(node_config, BuildInfo::get()) + .or(es_compat_search_handler(search_service.clone())) + .or(es_compat_index_search_handler(search_service.clone())) + .or(es_compat_scroll_handler(search_service.clone())) + .or(es_compat_index_multi_search_handler(search_service)) + .or(es_compat_bulk_handler(ingest_service.clone())) + .or(es_compat_index_bulk_handler(ingest_service)) + .map(move |response| add_elastic_header_to_response(add_elastic_header, response)) // Register newly created handlers here. } @@ -116,16 +104,20 @@ fn make_elastic_api_response( JsonApiResponse::new(&elasticsearch_result, status_code, &format) } -/// Elasticsearch clients will check the response header -/// whether the heads contain `X-Elastic-Product` and the value is `Elasticsearch` -fn append_elastic_header( - enable_elastic_header: bool, +/// Elasticsearch clients will check whether the response header +/// contains `X-Elastic-Product` key and `Elasticsearch` value. +fn add_elastic_header_to_response( + add_elastic_header: bool, response: T, -) -> warp::reply::WithHeader { - match enable_elastic_header { - true => warp::reply::with_header(response, "X-Elastic-Product", "Elasticsearch"), - false => warp::reply::with_header(response, "", ""), +) -> warp::reply::Response { + let mut response = response.into_response(); + if add_elastic_header { + response.headers_mut().insert( + "X-Elastic-Product", + HeaderValue::from_static("Elasticsearch"), + ); } + response } #[cfg(test)] @@ -140,9 +132,9 @@ mod tests { use serde_json::Value as JsonValue; use warp::Filter; + use super::elastic_api_handlers; use super::model::ElasticSearchError; use crate::elastic_search_api::model::MultiSearchResponse; - use crate::elastic_search_api::rest_handler::es_compat_cluster_info_handler; use crate::rest::recover_fn; use crate::BuildInfo; @@ -187,6 +179,7 @@ mod tests { .reply(&es_search_api_handler) .await; assert_eq!(resp.status(), 200); + assert!(resp.headers().get("x-elastic-product").is_none(),); let string_body = String::from_utf8(resp.body().to_vec()).unwrap(); let es_msearch_response: MultiSearchResponse = serde_json::from_str(&string_body).unwrap(); assert_eq!(es_msearch_response.responses.len(), 2); @@ -278,7 +271,7 @@ mod tests { async fn test_msearch_api_return_400_with_malformed_request_body() { let config = Arc::new(NodeConfig::for_test()); let mock_search_service = MockSearchService::new(); - let es_search_api_handler = super::elastic_api_handlers( + let es_search_api_handler = elastic_api_handlers( config, Arc::new(mock_search_service), ingest_service_client(), @@ -394,18 +387,28 @@ mod tests { #[tokio::test] async fn test_es_compat_cluster_info_handler() { let build_info = BuildInfo::get(); - let config = Arc::new(NodeConfig::for_test()); - let handler = - es_compat_cluster_info_handler(config.clone(), build_info).recover(recover_fn); + let mut node_config = NodeConfig::for_test(); + node_config.add_elastic_header = true; + let mock_search_service = Arc::new(MockSearchService::new()); + let handler = elastic_api_handlers( + Arc::new(node_config.clone()), + mock_search_service, + ingest_service_client(), + ) + .recover(recover_fn); let resp = warp::test::request() .path("/_elastic") .reply(&handler) .await; assert_eq!(resp.status(), 200); + assert_eq!( + resp.headers().get("x-elastic-product").unwrap(), + "Elasticsearch" + ); let resp_json: JsonValue = serde_json::from_slice(resp.body()).unwrap(); let expected_response_json = serde_json::json!({ - "name" : config.node_id, - "cluster_name" : config.cluster_id, + "name" : node_config.node_id, + "cluster_name" : node_config.cluster_id, "version" : { "distribution" : "quickwit", "number" : build_info.version, @@ -418,33 +421,24 @@ mod tests { #[tokio::test] async fn test_head_request_on_root_endpoint() { - let build_info = BuildInfo::get(); - let config = Arc::new(NodeConfig::for_test()); - let handler = - es_compat_cluster_info_handler(config.clone(), build_info).recover(recover_fn); + let mut node_config = NodeConfig::for_test(); + node_config.add_elastic_header = true; + let mock_search_service = Arc::new(MockSearchService::new()); + let handler = elastic_api_handlers( + Arc::new(node_config.clone()), + mock_search_service, + ingest_service_client(), + ) + .recover(recover_fn); let resp = warp::test::request() .path("/_elastic") .method("HEAD") .reply(&handler) .await; assert_eq!(resp.status(), 200); - } - - #[tokio::test] - async fn test_elastic_header() { - let build_info = BuildInfo::get(); - let config = Arc::new(NodeConfig::for_test_elastic_header()); - let handler = - es_compat_cluster_info_handler(config.clone(), build_info).recover(recover_fn); - let resp = warp::test::request() - .path("/_elastic") - .method("HEAD") - .reply(&handler) - .await; assert_eq!( resp.headers().get("x-elastic-product").unwrap(), "Elasticsearch" ); - assert_eq!(resp.status(), 200); } } diff --git a/quickwit/quickwit-serve/src/elastic_search_api/rest_handler.rs b/quickwit/quickwit-serve/src/elastic_search_api/rest_handler.rs index eae33270910..0cd4874b02c 100644 --- a/quickwit/quickwit-serve/src/elastic_search_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/elastic_search_api/rest_handler.rs @@ -46,7 +46,7 @@ use super::model::{ ElasticSearchError, MultiSearchHeader, MultiSearchQueryParams, MultiSearchResponse, MultiSearchSingleResponse, ScrollQueryParams, SearchBody, SearchQueryParams, }; -use super::{append_elastic_header, make_elastic_api_response, TrackTotalHits}; +use super::{make_elastic_api_response, TrackTotalHits}; use crate::format::BodyFormat; use crate::json_api_response::{make_json_api_response, ApiError, JsonApiResponse}; use crate::{with_arg, BuildInfo}; @@ -73,55 +73,46 @@ pub fn es_compat_cluster_info_handler( })) }, ) - .map(move |response| append_elastic_header(node_config.enable_elastic_header, response)) } /// GET or POST _elastic/_search pub fn es_compat_search_handler( - node_config: Arc, _search_service: Arc, ) -> impl Filter + Clone { - elastic_search_filter() - .then(|_params: SearchQueryParams| async move { - // TODO - let api_error = ApiError { - service_code: ServiceErrorCode::NotSupportedYet, - message: "_elastic/_search is not supported yet. Please try the index search \ - endpoint (_elastic/{index}/search)" - .to_string(), - }; - make_json_api_response::<(), _>(Err(api_error), BodyFormat::default()) - }) - .map(move |response| append_elastic_header(node_config.enable_elastic_header, response)) + elastic_search_filter().then(|_params: SearchQueryParams| async move { + // TODO + let api_error = ApiError { + service_code: ServiceErrorCode::NotSupportedYet, + message: "_elastic/_search is not supported yet. Please try the index search endpoint \ + (_elastic/{index}/search)" + .to_string(), + }; + make_json_api_response::<(), _>(Err(api_error), BodyFormat::default()) + }) } /// GET or POST _elastic/{index}/_search pub fn es_compat_index_search_handler( - node_config: Arc, search_service: Arc, ) -> impl Filter + Clone { elastic_index_search_filter() .and(with_arg(search_service)) .then(es_compat_index_search) .map(|result| make_elastic_api_response(result, BodyFormat::default())) - .map(move |response| append_elastic_header(node_config.enable_elastic_header, response)) } /// GET or POST _elastic/_search/scroll pub fn es_compat_scroll_handler( - node_config: Arc, search_service: Arc, ) -> impl Filter + Clone { elastic_scroll_filter() .and(with_arg(search_service)) .then(es_scroll) .map(|result| make_elastic_api_response(result, BodyFormat::default())) - .map(move |response| append_elastic_header(node_config.enable_elastic_header, response)) } /// POST _elastic/_search pub fn es_compat_index_multi_search_handler( - node_config: Arc, search_service: Arc, ) -> impl Filter + Clone { elastic_multi_search_filter() @@ -134,7 +125,6 @@ pub fn es_compat_index_multi_search_handler( }; JsonApiResponse::new(&result, status_code, &BodyFormat::default()) }) - .map(move |response| append_elastic_header(node_config.enable_elastic_header, response)) } fn build_request_for_es_api( From 63d2e33fd073cf342a04beaea15735822d4700ca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Massot?= Date: Fri, 1 Dec 2023 22:38:30 +0100 Subject: [PATCH 4/7] Update config with additional headers. (#4221) * Update config with additional headers. * Move rest_listen_port into rest config. * Update docs/configuration/node-config.md Co-authored-by: Adrien Guillo * Update docs/configuration/node-config.md Co-authored-by: Adrien Guillo * Update docs/configuration/node-config.md Co-authored-by: Adrien Guillo * Clean. --------- Co-authored-by: Adrien Guillo --- config/quickwit.yaml | 4 +- config/tutorials/hdfs-logs/searcher-1.yaml | 6 +- config/tutorials/hdfs-logs/searcher-2.yaml | 3 +- config/tutorials/hdfs-logs/searcher-3.yaml | 3 +- docs/configuration/node-config.md | 66 ++++-- docs/configuration/ports-config.md | 10 +- ...ial-hdfs-logs-distributed-search-aws-s3.md | 4 +- docs/guides/cluster-with-docker-compose.md | 0 quickwit/Cargo.lock | 2 + quickwit/quickwit-cli/tests/helpers.rs | 3 +- quickwit/quickwit-config/Cargo.toml | 2 + .../resources/tests/config/quickwit.json | 8 +- .../resources/tests/config/quickwit.toml | 8 +- .../resources/tests/config/quickwit.yaml | 7 +- .../quickwit-config/src/node_config/mod.rs | 14 +- .../src/node_config/serialize.rs | 213 ++++++++++++++---- .../src/test_utils/cluster_sandbox.rs | 4 +- .../src/tests/basic_tests.rs | 9 +- .../src/elastic_search_api/mod.rs | 82 ------- quickwit/quickwit-serve/src/lib.rs | 2 +- quickwit/quickwit-serve/src/rest.rs | 159 +++++++++---- 21 files changed, 395 insertions(+), 214 deletions(-) create mode 100644 docs/guides/cluster-with-docker-compose.md diff --git a/config/quickwit.yaml b/config/quickwit.yaml index bff32a8031a..05e01eea373 100644 --- a/config/quickwit.yaml +++ b/config/quickwit.yaml @@ -32,7 +32,9 @@ version: 0.6 # 2. pass `0.0.0.0` and let Quickwit do its best to discover the node's IP (see `advertise_address`) # # listen_address: 127.0.0.1 -# rest_listen_port: 7280 +# +# rest: +# listen_port: 7280 # # IP address advertised by the node, i.e. the IP address that peer nodes should use to connect to the node for RPCs. # The environment variable `QW_ADVERTISE_ADDRESS` can also be used to override this value. diff --git a/config/tutorials/hdfs-logs/searcher-1.yaml b/config/tutorials/hdfs-logs/searcher-1.yaml index 140102c5f8e..09886e5cab7 100644 --- a/config/tutorials/hdfs-logs/searcher-1.yaml +++ b/config/tutorials/hdfs-logs/searcher-1.yaml @@ -1,7 +1,11 @@ version: 0.6 node_id: searcher-1 listen_address: 127.0.0.1 -rest_listen_port: 7280 +rest: + listen_port: 7280 +ingest_api: + max_queue_memory_usage: 4GiB + max_queue_disk_usage: 8GiB peer_seeds: - 127.0.0.1:7290 # searcher-2 - 127.0.0.1:7300 # searcher-3 diff --git a/config/tutorials/hdfs-logs/searcher-2.yaml b/config/tutorials/hdfs-logs/searcher-2.yaml index 110475d732b..53f5cf3c573 100644 --- a/config/tutorials/hdfs-logs/searcher-2.yaml +++ b/config/tutorials/hdfs-logs/searcher-2.yaml @@ -1,7 +1,8 @@ version: 0.6 node_id: searcher-2 listen_address: 127.0.0.1 -rest_listen_port: 7290 +rest: + listen_port: 7290 peer_seeds: - 127.0.0.1:7280 # searcher-1 - 127.0.0.1:7300 # searcher-3 diff --git a/config/tutorials/hdfs-logs/searcher-3.yaml b/config/tutorials/hdfs-logs/searcher-3.yaml index ba287724e03..6a86e375d39 100644 --- a/config/tutorials/hdfs-logs/searcher-3.yaml +++ b/config/tutorials/hdfs-logs/searcher-3.yaml @@ -1,7 +1,8 @@ version: 0.6 node_id: searcher-3 listen_address: 127.0.0.1 -rest_listen_port: 7300 +rest: + listen_port: 7300 peer_seeds: - 127.0.0.1:7280 # searcher-1 - 127.0.0.1:7290 # searcher-2 diff --git a/docs/configuration/node-config.md b/docs/configuration/node-config.md index ce318c8e109..b890270514f 100644 --- a/docs/configuration/node-config.md +++ b/docs/configuration/node-config.md @@ -25,14 +25,48 @@ A commented example is available here: [quickwit.yaml](https://github.com/quickw | `enabled_services` | Enabled services (control_plane, indexer, janitor, metastore, searcher) | `QW_ENABLED_SERVICES` | all services | | `listen_address` | The IP address or hostname that Quickwit service binds to for starting REST and GRPC server and connecting this node to other nodes. By default, Quickwit binds itself to 127.0.0.1 (localhost). This default is not valid when trying to form a cluster. | `QW_LISTEN_ADDRESS` | `127.0.0.1` | | `advertise_address` | IP address advertised by the node, i.e. the IP address that peer nodes should use to connect to the node for RPCs. | `QW_ADVERTISE_ADDRESS` | `listen_address` | -| `rest_listen_port` | The port which to listen for HTTP REST API. | `QW_REST_LISTEN_PORT` | `7280` | -| `gossip_listen_port` | The port which to listen for the Gossip cluster membership service (UDP). | `QW_GOSSIP_LISTEN_PORT` | `rest_listen_port` | -| `grpc_listen_port` | The port which to listen for the gRPC service.| `QW_GRPC_LISTEN_PORT` | `rest_listen_port + 1` | +| `gossip_listen_port` | The port which to listen for the Gossip cluster membership service (UDP). | `QW_GOSSIP_LISTEN_PORT` | `rest.listen_port` | +| `grpc_listen_port` | The port which to listen for the gRPC service.| `QW_GRPC_LISTEN_PORT` | `rest.listen_port + 1` | | `peer_seeds` | List of IP addresses or hostnames used to bootstrap the cluster and discover the complete set of nodes. This list may contain the current node address and does not need to be exhaustive. | `QW_PEER_SEEDS` | | | `data_dir` | Path to directory where data (tmp data, splits kept for caching purpose) is persisted. This is mostly used in indexing. | `QW_DATA_DIR` | `./qwdata` | | `metastore_uri` | Metastore URI. Can be a local directory or `s3://my-bucket/indexes` or `postgres://username:password@localhost:5432/metastore`. [Learn more about the metastore configuration](metastore-config.md). | `QW_METASTORE_URI` | `{data_dir}/indexes` | | `default_index_root_uri` | Default index root URI that defines the location where index data (splits) is stored. The index URI is built following the scheme: `{default_index_root_uri}/{index-id}` | `QW_DEFAULT_INDEX_ROOT_URI` | `{data_dir}/indexes` | -| `rest_cors_allow_origins` | Configure the CORS origins which are allowed to access the API. [Read more](#configuring-cors-cross-origin-resource-sharing) | | + +## REST configuration + +This section contains the REST API configuration options. + +| Property | Description | Env variable | Default value | +| --- | --- | --- | --- | +| `listen_port` | The port on which the REST API listens for HTTP traffic. | `QW_REST_LISTEN_PORT` | `7280` | +| `cors_allow_origins` | Configure the CORS origins which are allowed to access the API. [Read more](#configuring-cors-cross-origin-resource-sharing) | | +| `extra_headers` | List of header names and values | | | + +### Configuring CORS (Cross-origin resource sharing) + +CORS (Cross-origin resource sharing) describes which address or origins can access the REST API from the browser. +By default, sharing resources cross-origin is not allowed. + +A wildcard, single origin, or multiple origins can be specified as part of the `cors_allow_origins` parameter: + + +Example of a REST configuration: + +```yaml + +rest: + listen_port: 1789 + extra_headers: + x-header-1: header-value-1 + x-header-2: header-value-2 + cors_allow_origins: '*' + +# cors_allow_origins: https://my-hdfs-logs.domain.com # Optionally we can specify one domain +# cors_allow_origins: # Or allow multiple origins +# - https://my-hdfs-logs.domain.com +# - https://my-hdfs.other-domain.com + +``` ## Storage configuration @@ -203,7 +237,8 @@ version: 0.6 cluster_id: quickwit-cluster node_id: my-unique-node-id listen_address: ${QW_LISTEN_ADDRESS} -rest_listen_port: ${QW_LISTEN_PORT:-1111} +rest: + listen_port: ${QW_LISTEN_PORT:-1111} ``` Will be interpreted by Quickwit as: @@ -213,23 +248,6 @@ version: 0.6 cluster_id: quickwit-cluster node_id: my-unique-node-id listen_address: 0.0.0.0 -rest_listen_port: 1111 -``` - -## Configuring CORS (Cross-origin resource sharing) - -CORS (Cross-origin resource sharing) describes which address or origins can access the REST API from the browser. -By default, sharing resources cross-origin is not allowed. - -A wildcard, single origin, or multiple origins can be specified as part of the `rest_cors_allow_origins` parameter: - -```yaml -version: 0.6 -index_id: hdfs - -rest_cors_allow_origins: '*' # Allow all origins -# rest_cors_allow_origins: https://my-hdfs-logs.domain.com # Optionally we can specify one domain -# rest_cors_allow_origins: # Or allow multiple origins -# - https://my-hdfs-logs.domain.com -# - https://my-hdfs.other-domain.com +rest: + listen_port: 1111 ``` diff --git a/docs/configuration/ports-config.md b/docs/configuration/ports-config.md index 623aeb1b897..d8384540cb0 100644 --- a/docs/configuration/ports-config.md +++ b/docs/configuration/ports-config.md @@ -4,18 +4,18 @@ sidebar_position: 6 --- When starting a quickwit search server, one important parameter that can be configured is -the `rest_listen_port` (defaults to :7280). +the `rest.listen_port` (defaults to :7280). Internally, Quickwit will, in fact, use three sockets. The ports of these three sockets cannot be configured independently at the moment. -The ports used are computed relative to the `rest_listen_port` port, as follows. +The ports used are computed relative to the `rest.listen_port` port, as follows. | Service | Port used | Protocol | Default | |-------------------------------|---------------------------|----------|-----------| -| Http server with the rest api | `${rest_listen_port}` | TCP | 7280 | -| Cluster membership | `${rest_listen_port}` | UDP | 7280 | -| GRPC service | `${rest_listen_port} + 1` | TCP | 7281 | +| Http server with the rest api | `${rest.listen_port}` | TCP | 7280 | +| Cluster membership | `${rest.listen_port}` | UDP | 7280 | +| GRPC service | `${rest.listen_port} + 1` | TCP | 7281 | It is not possible for the moment to configure these ports independently. diff --git a/docs/get-started/tutorials/tutorial-hdfs-logs-distributed-search-aws-s3.md b/docs/get-started/tutorials/tutorial-hdfs-logs-distributed-search-aws-s3.md index 4773e6460c7..ad95ec7d539 100644 --- a/docs/get-started/tutorials/tutorial-hdfs-logs-distributed-search-aws-s3.md +++ b/docs/get-started/tutorials/tutorial-hdfs-logs-distributed-search-aws-s3.md @@ -197,8 +197,8 @@ Now that we have indexed the logs and can search from one instance, it's time to ## Start two more instances -Quickwit needs a port `rest_listen_port` for serving the HTTP rest API via TCP as well as maintaining the cluster formation via UDP. -Also, it needs `{rest_listen_port} + 1` for gRPC communication between instances. +Quickwit needs a port `rest.listen_port` for serving the HTTP rest API via TCP as well as maintaining the cluster formation via UDP. +Also, it needs `{rest.listen_port} + 1` for gRPC communication between instances. In AWS, you can create a security group to group these inbound rules. Check out the [network section](../../guides/aws-setup) of our AWS setup guide. diff --git a/docs/guides/cluster-with-docker-compose.md b/docs/guides/cluster-with-docker-compose.md new file mode 100644 index 00000000000..e69de29bb2d diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 35f17f1d077..a759f82b21b 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -5355,6 +5355,8 @@ dependencies = [ "chrono", "cron", "enum-iterator", + "http", + "http-serde", "humantime", "itertools 0.12.0", "json_comments", diff --git a/quickwit/quickwit-cli/tests/helpers.rs b/quickwit/quickwit-cli/tests/helpers.rs index 2e546ffa8b7..5493baee194 100644 --- a/quickwit/quickwit-cli/tests/helpers.rs +++ b/quickwit/quickwit-cli/tests/helpers.rs @@ -84,7 +84,8 @@ const DEFAULT_QUICKWIT_CONFIG: &str = r#" version: 0.6 metastore_uri: #metastore_uri data_dir: #data_dir - rest_listen_port: #rest_listen_port + rest: + listen_port: #rest_listen_port grpc_listen_port: #grpc_listen_port "#; diff --git a/quickwit/quickwit-config/Cargo.toml b/quickwit/quickwit-config/Cargo.toml index 8e6912d0ace..453a0669e61 100644 --- a/quickwit/quickwit-config/Cargo.toml +++ b/quickwit/quickwit-config/Cargo.toml @@ -16,6 +16,8 @@ bytesize = { workspace = true } chrono = { workspace = true } cron = { workspace = true } enum-iterator = { workspace = true } +http = { workspace = true } +http-serde = { workspace = true } humantime = { workspace = true } itertools = { workspace = true } json_comments = { workspace = true } diff --git a/quickwit/quickwit-config/resources/tests/config/quickwit.json b/quickwit/quickwit-config/resources/tests/config/quickwit.json index a0c7573d0a4..8b3b8d6450f 100644 --- a/quickwit/quickwit-config/resources/tests/config/quickwit.json +++ b/quickwit/quickwit-config/resources/tests/config/quickwit.json @@ -9,7 +9,6 @@ ], "listen_address": "0.0.0.0", "advertise_address": "172.0.0.12", - "rest_listen_port": 1111, "gossip_listen_port": 2222, "grpc_listen_port": 3333, "peer_seeds": [ @@ -19,6 +18,13 @@ "data_dir": "/opt/quickwit/data", "metastore_uri": "postgres://username:password@host:port/db", "default_index_root_uri": "s3://quickwit-indexes", + "rest": { + "listen_port": 1111, + "extra_headers": { + "x-header-1": "header-value-1", + "x-header-2": "header-value-2" + } + }, "storage": { "azure": { "account": "quickwit-dev" diff --git a/quickwit/quickwit-config/resources/tests/config/quickwit.toml b/quickwit/quickwit-config/resources/tests/config/quickwit.toml index 701be1d2c90..04f384eda17 100644 --- a/quickwit/quickwit-config/resources/tests/config/quickwit.toml +++ b/quickwit/quickwit-config/resources/tests/config/quickwit.toml @@ -5,7 +5,6 @@ node_id = "my-unique-node-id" enabled_services = [ "janitor", "metastore" ] listen_address = "0.0.0.0" advertise_address = "172.0.0.12" -rest_listen_port = 1111 gossip_listen_port = 2222 grpc_listen_port = 3333 peer_seeds = [ "quickwit-searcher-0.local", "quickwit-searcher-1.local" ] @@ -13,6 +12,13 @@ data_dir = "/opt/quickwit/data" metastore_uri = "postgres://username:password@host:port/db" default_index_root_uri = "s3://quickwit-indexes" +[rest] +listen_port = 1111 + +[rest.extra_headers] +x-header-1 = "header-value-1" +x-header-2 = "header-value-2" + [storage.azure] account = "quickwit-dev" diff --git a/quickwit/quickwit-config/resources/tests/config/quickwit.yaml b/quickwit/quickwit-config/resources/tests/config/quickwit.yaml index 5f3ab13c828..760afdcb60c 100644 --- a/quickwit/quickwit-config/resources/tests/config/quickwit.yaml +++ b/quickwit/quickwit-config/resources/tests/config/quickwit.yaml @@ -7,7 +7,6 @@ enabled_services: - metastore listen_address: 0.0.0.0 advertise_address: 172.0.0.12 -rest_listen_port: 1111 gossip_listen_port: 2222 grpc_listen_port: 3333 peer_seeds: @@ -17,6 +16,12 @@ data_dir: /opt/quickwit/data metastore_uri: postgres://username:password@host:port/db default_index_root_uri: s3://quickwit-indexes +rest: + listen_port: 1111 + extra_headers: + x-header-1: header-value-1 + x-header-2: header-value-2 + storage: azure: account: quickwit-dev diff --git a/quickwit/quickwit-config/src/node_config/mod.rs b/quickwit/quickwit-config/src/node_config/mod.rs index 767afefdba7..200cd73dfed 100644 --- a/quickwit/quickwit-config/src/node_config/mod.rs +++ b/quickwit/quickwit-config/src/node_config/mod.rs @@ -28,6 +28,7 @@ use std::time::Duration; use anyhow::{bail, ensure}; use bytesize::ByteSize; +use http::HeaderMap; use quickwit_common::net::HostAddr; use quickwit_common::uri::Uri; use quickwit_proto::indexing::CpuCapacity; @@ -41,6 +42,15 @@ use crate::{ConfigFormat, MetastoreConfigs}; pub const DEFAULT_QW_CONFIG_PATH: &str = "config/quickwit.yaml"; +#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] +#[serde(deny_unknown_fields)] +pub struct RestConfig { + pub listen_addr: SocketAddr, + pub cors_allow_origins: Vec, + #[serde(with = "http_serde::header_map")] + pub extra_headers: HeaderMap, +} + #[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] #[serde(deny_unknown_fields)] pub struct IndexerConfig { @@ -303,9 +313,7 @@ impl Default for JaegerConfig { pub struct NodeConfig { pub cluster_id: String, pub node_id: String, - pub add_elastic_header: bool, pub enabled_services: HashSet, - pub rest_listen_addr: SocketAddr, pub gossip_listen_addr: SocketAddr, pub grpc_listen_addr: SocketAddr, pub gossip_advertise_addr: SocketAddr, @@ -314,7 +322,7 @@ pub struct NodeConfig { pub data_dir_path: PathBuf, pub metastore_uri: Uri, pub default_index_root_uri: Uri, - pub rest_cors_allow_origins: Vec, + pub rest_config: RestConfig, pub storage_configs: StorageConfigs, pub metastore_configs: MetastoreConfigs, pub indexer_config: IndexerConfig, diff --git a/quickwit/quickwit-config/src/node_config/serialize.rs b/quickwit/quickwit-config/src/node_config/serialize.rs index c0e03b46979..09ccc41f881 100644 --- a/quickwit/quickwit-config/src/node_config/serialize.rs +++ b/quickwit/quickwit-config/src/node_config/serialize.rs @@ -22,12 +22,14 @@ use std::net::{IpAddr, SocketAddr}; use std::str::FromStr; use anyhow::{bail, Context}; +use http::HeaderMap; use quickwit_common::net::{find_private_ip, get_short_hostname, Host}; use quickwit_common::new_coolid; use quickwit_common::uri::Uri; use serde::{Deserialize, Serialize}; use tracing::{info, warn}; +use super::RestConfig; use crate::config_value::ConfigValue; use crate::qw_env_vars::*; use crate::service::QuickwitService; @@ -88,8 +90,8 @@ fn default_listen_address() -> ConfigValue { ConfigValue::with_default(Host::default().to_string()) } -fn default_rest_listen_port() -> ConfigValue { - ConfigValue::with_default(7280) +fn default_rest_listen_port() -> u16 { + 7280 } fn default_data_dir_uri() -> ConfigValue { @@ -168,8 +170,8 @@ struct NodeConfigBuilder { #[serde(default = "default_listen_address")] listen_address: ConfigValue, advertise_address: ConfigValue, - #[serde(default = "default_rest_listen_port")] - rest_listen_port: ConfigValue, + // Deprecated, use `rest.listen_port` instead. + rest_listen_port: Option, gossip_listen_port: ConfigValue, grpc_listen_port: ConfigValue, #[serde(default)] @@ -179,11 +181,9 @@ struct NodeConfigBuilder { data_dir_uri: ConfigValue, metastore_uri: ConfigValue, default_index_root_uri: ConfigValue, + #[serde(rename = "rest")] #[serde(default)] - add_elastic_header: bool, - #[serde(default)] - #[serde_as(deserialize_as = "serde_with::OneOrMany<_>")] - rest_cors_allow_origins: Vec, + rest_config_builder: RestConfigBuilder, #[serde(rename = "storage")] #[serde(default)] storage_configs: StorageConfigs, @@ -221,19 +221,32 @@ impl NodeConfigBuilder { let listen_host = listen_address.parse::()?; let listen_ip = listen_host.resolve().await?; - let rest_listen_port = self.rest_listen_port.resolve(env_vars)?; - let rest_listen_addr = SocketAddr::new(listen_ip, rest_listen_port); + if let Some(rest_listen_port) = self.rest_listen_port { + if self.rest_config_builder.listen_port.is_some() { + bail!( + "conflicting configuration values: please use only `rest.listen_port`, \ + `rest_listen_port` is deprecated and should not be used alongside \ + `rest.listen_port`. Update your configuration to use `rest.listen_port`." + ); + } + warn!("`rest_listen_port` is deprecated, use `rest.listen_port` instead"); + self.rest_config_builder.listen_port = Some(rest_listen_port); + } + + let rest_config = self + .rest_config_builder + .build_and_validate(listen_ip, env_vars)?; let gossip_listen_port = self .gossip_listen_port .resolve_optional(env_vars)? - .unwrap_or(rest_listen_port); + .unwrap_or(rest_config.listen_addr.port()); let gossip_listen_addr = SocketAddr::new(listen_ip, gossip_listen_port); let grpc_listen_port = self .grpc_listen_port .resolve_optional(env_vars)? - .unwrap_or(rest_listen_port + 1); + .unwrap_or(rest_config.listen_addr.port() + 1); let grpc_listen_addr = SocketAddr::new(listen_ip, grpc_listen_port); let advertise_address = self.advertise_address.resolve_optional(env_vars)?; @@ -273,9 +286,7 @@ impl NodeConfigBuilder { let node_config = NodeConfig { cluster_id: self.cluster_id.resolve(env_vars)?, node_id: self.node_id.resolve(env_vars)?, - add_elastic_header: self.add_elastic_header, enabled_services, - rest_listen_addr, gossip_listen_addr, grpc_listen_addr, gossip_advertise_addr, @@ -284,7 +295,7 @@ impl NodeConfigBuilder { data_dir_path, metastore_uri, default_index_root_uri, - rest_cors_allow_origins: self.rest_cors_allow_origins, + rest_config, metastore_configs: self.metastore_configs, storage_configs: self.storage_configs, indexer_config: self.indexer_config, @@ -321,9 +332,8 @@ impl Default for NodeConfigBuilder { cluster_id: default_cluster_id(), node_id: default_node_id(), enabled_services: default_enabled_services(), - add_elastic_header: false, listen_address: default_listen_address(), - rest_listen_port: default_rest_listen_port(), + rest_listen_port: None, gossip_listen_port: ConfigValue::none(), grpc_listen_port: ConfigValue::none(), advertise_address: ConfigValue::none(), @@ -331,7 +341,7 @@ impl Default for NodeConfigBuilder { data_dir_uri: default_data_dir_uri(), metastore_uri: ConfigValue::none(), default_index_root_uri: ConfigValue::none(), - rest_cors_allow_origins: Vec::new(), + rest_config_builder: RestConfigBuilder::default(), storage_configs: StorageConfigs::default(), metastore_configs: MetastoreConfigs::default(), indexer_config: IndexerConfig::default(), @@ -342,6 +352,41 @@ impl Default for NodeConfigBuilder { } } +#[serde_with::serde_as] +#[derive(Debug, Deserialize, PartialEq, Default)] +#[serde(deny_unknown_fields)] +struct RestConfigBuilder { + #[serde(default)] + listen_port: Option, + #[serde(default)] + #[serde_as(deserialize_as = "serde_with::OneOrMany<_>")] + pub cors_allow_origins: Vec, + #[serde(with = "http_serde::header_map")] + #[serde(default)] + pub extra_headers: HeaderMap, +} + +impl RestConfigBuilder { + fn build_and_validate( + self, + listen_ip: IpAddr, + env_vars: &HashMap, + ) -> anyhow::Result { + let listen_port_from_config_or_default = + self.listen_port.unwrap_or(default_rest_listen_port()); + let listen_port = ConfigValue::::with_default( + listen_port_from_config_or_default, + ) + .resolve(env_vars)?; + let rest_config = RestConfig { + listen_addr: SocketAddr::new(listen_ip, listen_port), + cors_allow_origins: self.cors_allow_origins, + extra_headers: self.extra_headers, + }; + Ok(rest_config) + } +} + #[cfg(any(test, feature = "testsuite"))] pub fn node_config_for_test() -> NodeConfig { let enabled_services = QuickwitService::supported_services(); @@ -370,22 +415,24 @@ pub fn node_config_for_test() -> NodeConfig { .to_path_buf(); let metastore_uri = default_metastore_uri(&data_dir_uri); let default_index_root_uri = default_index_root_uri(&data_dir_uri); - + let rest_config = RestConfig { + listen_addr: rest_listen_addr, + cors_allow_origins: Vec::new(), + extra_headers: HeaderMap::new(), + }; NodeConfig { cluster_id: default_cluster_id().unwrap(), node_id: default_node_id().unwrap(), - add_elastic_header: false, enabled_services, gossip_advertise_addr: gossip_listen_addr, grpc_advertise_addr: grpc_listen_addr, - rest_listen_addr, gossip_listen_addr, grpc_listen_addr, peer_seeds: Vec::new(), data_dir_path, metastore_uri, default_index_root_uri, - rest_cors_allow_origins: Vec::new(), + rest_config, storage_configs: StorageConfigs::default(), metastore_configs: MetastoreConfigs::default(), indexer_config: IndexerConfig::default(), @@ -429,9 +476,17 @@ mod tests { assert!(config.is_service_enabled(QuickwitService::Metastore)); assert_eq!( - config.rest_listen_addr, + config.rest_config.listen_addr, SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 1111) ); + assert_eq!( + config.rest_config.extra_headers.get("x-header-1").unwrap(), + "header-value-1" + ); + assert_eq!( + config.rest_config.extra_headers.get("x-header-2").unwrap(), + "header-value-2" + ); assert_eq!( config.gossip_listen_addr, SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 2222) @@ -577,7 +632,7 @@ mod tests { QuickwitService::supported_services() ); assert_eq!( - config.rest_listen_addr, + config.rest_config.listen_addr, SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 7280) ); assert_eq!( @@ -653,7 +708,7 @@ mod tests { &[&QuickwitService::Indexer, &QuickwitService::Metastore] ); assert_eq!( - config.rest_listen_addr, + config.rest_config.listen_addr, SocketAddr::new(IpAddr::V4(Ipv4Addr::new(172, 0, 0, 12)), 1234) ); assert_eq!( @@ -758,7 +813,6 @@ mod tests { async fn test_peer_socket_addrs() { { let node_config = NodeConfigBuilder { - rest_listen_port: ConfigValue::for_test(1789), ..Default::default() } .build_and_validate(&HashMap::new()) @@ -768,7 +822,6 @@ mod tests { } { let node_config = NodeConfigBuilder { - rest_listen_port: ConfigValue::for_test(1789), peer_seeds: ConfigValue::for_test(List(vec!["unresolvable-host".to_string()])), ..Default::default() } @@ -779,7 +832,10 @@ mod tests { } { let node_config = NodeConfigBuilder { - rest_listen_port: ConfigValue::for_test(1789), + rest_config_builder: RestConfigBuilder { + listen_port: Some(1789), + ..Default::default() + }, peer_seeds: ConfigValue::for_test(List(vec![ "unresolvable-host".to_string(), "localhost".to_string(), @@ -814,38 +870,74 @@ mod tests { .build_and_validate(&HashMap::new()) .await .unwrap(); - assert_eq!(node_config.rest_listen_addr.to_string(), "127.0.0.1:7280"); + assert_eq!( + node_config.rest_config.listen_addr.to_string(), + "127.0.0.1:7280" + ); assert_eq!(node_config.gossip_listen_addr.to_string(), "127.0.0.1:7280"); assert_eq!(node_config.grpc_listen_addr.to_string(), "127.0.0.1:7281"); } { let node_config = NodeConfigBuilder { listen_address: default_listen_address(), - rest_listen_port: ConfigValue::for_test(1789), + rest_config_builder: RestConfigBuilder { + listen_port: Some(1789), + ..Default::default() + }, ..Default::default() } .build_and_validate(&HashMap::new()) .await .unwrap(); - assert_eq!(node_config.rest_listen_addr.to_string(), "127.0.0.1:1789"); + assert_eq!( + node_config.rest_config.listen_addr.to_string(), + "127.0.0.1:1789" + ); assert_eq!(node_config.gossip_listen_addr.to_string(), "127.0.0.1:1789"); assert_eq!(node_config.grpc_listen_addr.to_string(), "127.0.0.1:1790"); } { let node_config = NodeConfigBuilder { listen_address: default_listen_address(), - rest_listen_port: ConfigValue::for_test(1789), gossip_listen_port: ConfigValue::for_test(1889), grpc_listen_port: ConfigValue::for_test(1989), + rest_config_builder: RestConfigBuilder { + listen_port: Some(1789), + ..Default::default() + }, ..Default::default() } .build_and_validate(&HashMap::new()) .await .unwrap(); - assert_eq!(node_config.rest_listen_addr.to_string(), "127.0.0.1:1789"); + assert_eq!( + node_config.rest_config.listen_addr.to_string(), + "127.0.0.1:1789" + ); assert_eq!(node_config.gossip_listen_addr.to_string(), "127.0.0.1:1889"); assert_eq!(node_config.grpc_listen_addr.to_string(), "127.0.0.1:1989"); } + { + // Check that `rest_listen_port` can still be used for backward compatibility. + let node_config = NodeConfigBuilder { + rest_listen_port: Some(1789), + listen_address: default_listen_address(), + rest_config_builder: RestConfigBuilder { + listen_port: None, + ..Default::default() + }, + ..Default::default() + } + .build_and_validate(&HashMap::new()) + .await + .unwrap(); + assert_eq!( + node_config.rest_config.listen_addr.to_string(), + "127.0.0.1:1789" + ); + assert_eq!(node_config.gossip_listen_addr.to_string(), "127.0.0.1:1789"); + assert_eq!(node_config.grpc_listen_addr.to_string(), "127.0.0.1:1790"); + } } #[tokio::test] @@ -937,6 +1029,29 @@ mod tests { } } + #[tokio::test] + async fn test_config_invalid_when_both_listen_ports_params_are_configured() { + let config_yaml = r#" + version: 0.6 + rest_listen_port: 1789 + rest: + listen_port: 1789 + "#; + let config = load_node_config_with_env( + ConfigFormat::Yaml, + config_yaml.as_bytes(), + &HashMap::default(), + ) + .await + .unwrap_err(); + assert_eq!( + &config.to_string(), + "conflicting configuration values: please use only `rest.listen_port`, \ + `rest_listen_port` is deprecated and should not be used alongside \ + `rest.listen_port`. Update your configuration to use `rest.listen_port`." + ); + } + #[test] fn test_jaeger_config_rejects_null_values() { let jaeger_config_yaml = r#" @@ -953,7 +1068,8 @@ mod tests { async fn test_rest_config_accepts_wildcard() { let rest_config_yaml = r#" version: 0.6 - rest_cors_allow_origins: '*' + rest: + cors_allow_origins: '*' "#; let config = load_node_config_with_env( ConfigFormat::Yaml, @@ -962,14 +1078,16 @@ mod tests { ) .await .expect("Deserialize rest config"); - assert_eq!(config.rest_cors_allow_origins, ["*"]); + assert_eq!(config.rest_config.cors_allow_origins, ["*"]); } #[tokio::test] async fn test_rest_config_accepts_single_origin() { let rest_config_yaml = r#" version: 0.6 - rest_cors_allow_origins: https://www.my-domain.com + rest: + cors_allow_origins: + - https://www.my-domain.com "#; let config = load_node_config_with_env( ConfigFormat::Yaml, @@ -979,13 +1097,14 @@ mod tests { .await .expect("Deserialize rest config"); assert_eq!( - config.rest_cors_allow_origins, + config.rest_config.cors_allow_origins, ["https://www.my-domain.com"] ); let rest_config_yaml = r#" version: 0.6 - rest_cors_allow_origins: http://192.168.0.108:7280 + rest: + cors_allow_origins: http://192.168.0.108:7280 "#; let config = load_node_config_with_env( ConfigFormat::Yaml, @@ -995,7 +1114,7 @@ mod tests { .await .expect("Deserialize rest config"); assert_eq!( - config.rest_cors_allow_origins, + config.rest_config.cors_allow_origins, ["http://192.168.0.108:7280"] ); } @@ -1004,7 +1123,8 @@ mod tests { async fn test_rest_config_accepts_multi_origin() { let rest_config_yaml = r#" version: 0.6 - rest_cors_allow_origins: + rest: + cors_allow_origins: - https://www.my-domain.com "#; let config = load_node_config_with_env( @@ -1015,13 +1135,14 @@ mod tests { .await .expect("Deserialize rest config"); assert_eq!( - config.rest_cors_allow_origins, + config.rest_config.cors_allow_origins, ["https://www.my-domain.com"] ); let rest_config_yaml = r#" version: 0.6 - rest_cors_allow_origins: + rest: + cors_allow_origins: - https://www.my-domain.com - https://www.my-other-domain.com "#; @@ -1033,7 +1154,7 @@ mod tests { .await .expect("Deserialize rest config"); assert_eq!( - config.rest_cors_allow_origins, + config.rest_config.cors_allow_origins, [ "https://www.my-domain.com", "https://www.my-other-domain.com" @@ -1042,7 +1163,8 @@ mod tests { let rest_config_yaml = r#" version: 0.6 - rest_cors_allow_origins: + rest: + rest_cors_allow_origins: "#; load_node_config_with_env( ConfigFormat::Yaml, @@ -1054,7 +1176,8 @@ mod tests { let rest_config_yaml = r#" version: 0.6 - rest_cors_allow_origins: + rest: + cors_allow_origins: - "#; load_node_config_with_env( diff --git a/quickwit/quickwit-integration-tests/src/test_utils/cluster_sandbox.rs b/quickwit/quickwit-integration-tests/src/test_utils/cluster_sandbox.rs index e70b29839f8..0f1cb22ef59 100644 --- a/quickwit/quickwit-integration-tests/src/test_utils/cluster_sandbox.rs +++ b/quickwit/quickwit-integration-tests/src/test_utils/cluster_sandbox.rs @@ -188,11 +188,11 @@ impl ClusterSandbox { Ok(Self { node_configs, searcher_rest_client: QuickwitClientBuilder::new(transport_url( - searcher_config.node_config.rest_listen_addr, + searcher_config.node_config.rest_config.listen_addr, )) .build(), indexer_rest_client: QuickwitClientBuilder::new(transport_url( - indexer_config.node_config.rest_listen_addr, + indexer_config.node_config.rest_config.listen_addr, )) .build(), _temp_dir: temp_dir, diff --git a/quickwit/quickwit-integration-tests/src/tests/basic_tests.rs b/quickwit/quickwit-integration-tests/src/tests/basic_tests.rs index 758f22f6b35..15cb0abc04f 100644 --- a/quickwit/quickwit-integration-tests/src/tests/basic_tests.rs +++ b/quickwit/quickwit-integration-tests/src/tests/basic_tests.rs @@ -54,9 +54,12 @@ async fn test_ui_redirect_on_get() { .pool_idle_timeout(Duration::from_secs(30)) .http2_only(true) .build_http(); - let root_uri = format!("http://{}/", node_config.node_config.rest_listen_addr) - .parse::() - .unwrap(); + let root_uri = format!( + "http://{}/", + node_config.node_config.rest_config.listen_addr + ) + .parse::() + .unwrap(); let response = client.get(root_uri.clone()).await.unwrap(); assert_eq!(response.status(), StatusCode::MOVED_PERMANENTLY); let post_request = Request::builder() diff --git a/quickwit/quickwit-serve/src/elastic_search_api/mod.rs b/quickwit/quickwit-serve/src/elastic_search_api/mod.rs index d34b725913a..a957d59281b 100644 --- a/quickwit/quickwit-serve/src/elastic_search_api/mod.rs +++ b/quickwit/quickwit-serve/src/elastic_search_api/mod.rs @@ -26,7 +26,6 @@ use std::sync::Arc; use bulk::{es_compat_bulk_handler, es_compat_index_bulk_handler}; pub use filter::ElasticCompatibleApi; -use hyper::header::HeaderValue; use hyper::StatusCode; use quickwit_config::NodeConfig; use quickwit_ingest::IngestServiceClient; @@ -51,7 +50,6 @@ pub fn elastic_api_handlers( search_service: Arc, ingest_service: IngestServiceClient, ) -> impl Filter + Clone { - let add_elastic_header = node_config.add_elastic_header; es_compat_cluster_info_handler(node_config, BuildInfo::get()) .or(es_compat_search_handler(search_service.clone())) .or(es_compat_index_search_handler(search_service.clone())) @@ -59,7 +57,6 @@ pub fn elastic_api_handlers( .or(es_compat_index_multi_search_handler(search_service)) .or(es_compat_bulk_handler(ingest_service.clone())) .or(es_compat_index_bulk_handler(ingest_service)) - .map(move |response| add_elastic_header_to_response(add_elastic_header, response)) // Register newly created handlers here. } @@ -104,39 +101,18 @@ fn make_elastic_api_response( JsonApiResponse::new(&elasticsearch_result, status_code, &format) } -/// Elasticsearch clients will check whether the response header -/// contains `X-Elastic-Product` key and `Elasticsearch` value. -fn add_elastic_header_to_response( - add_elastic_header: bool, - response: T, -) -> warp::reply::Response { - let mut response = response.into_response(); - if add_elastic_header { - response.headers_mut().insert( - "X-Elastic-Product", - HeaderValue::from_static("Elasticsearch"), - ); - } - response -} - #[cfg(test)] mod tests { use std::sync::Arc; - use assert_json_diff::assert_json_include; use mockall::predicate; use quickwit_config::NodeConfig; use quickwit_ingest::{IngestApiService, IngestServiceClient}; use quickwit_search::MockSearchService; - use serde_json::Value as JsonValue; - use warp::Filter; use super::elastic_api_handlers; use super::model::ElasticSearchError; use crate::elastic_search_api::model::MultiSearchResponse; - use crate::rest::recover_fn; - use crate::BuildInfo; fn ingest_service_client() -> IngestServiceClient { let universe = quickwit_actors::Universe::new(); @@ -383,62 +359,4 @@ mod tests { .await; assert_eq!(resp.status(), 200); } - - #[tokio::test] - async fn test_es_compat_cluster_info_handler() { - let build_info = BuildInfo::get(); - let mut node_config = NodeConfig::for_test(); - node_config.add_elastic_header = true; - let mock_search_service = Arc::new(MockSearchService::new()); - let handler = elastic_api_handlers( - Arc::new(node_config.clone()), - mock_search_service, - ingest_service_client(), - ) - .recover(recover_fn); - let resp = warp::test::request() - .path("/_elastic") - .reply(&handler) - .await; - assert_eq!(resp.status(), 200); - assert_eq!( - resp.headers().get("x-elastic-product").unwrap(), - "Elasticsearch" - ); - let resp_json: JsonValue = serde_json::from_slice(resp.body()).unwrap(); - let expected_response_json = serde_json::json!({ - "name" : node_config.node_id, - "cluster_name" : node_config.cluster_id, - "version" : { - "distribution" : "quickwit", - "number" : build_info.version, - "build_hash" : build_info.commit_hash, - "build_date" : build_info.build_date, - } - }); - assert_json_include!(actual: resp_json, expected: expected_response_json); - } - - #[tokio::test] - async fn test_head_request_on_root_endpoint() { - let mut node_config = NodeConfig::for_test(); - node_config.add_elastic_header = true; - let mock_search_service = Arc::new(MockSearchService::new()); - let handler = elastic_api_handlers( - Arc::new(node_config.clone()), - mock_search_service, - ingest_service_client(), - ) - .recover(recover_fn); - let resp = warp::test::request() - .path("/_elastic") - .method("HEAD") - .reply(&handler) - .await; - assert_eq!(resp.status(), 200); - assert_eq!( - resp.headers().get("x-elastic-product").unwrap(), - "Elasticsearch" - ); - } } diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs index cd5c892e27e..93aa2c6ad9f 100644 --- a/quickwit/quickwit-serve/src/lib.rs +++ b/quickwit/quickwit-serve/src/lib.rs @@ -484,7 +484,7 @@ pub async fn serve_quickwit( }; let grpc_listen_addr = node_config.grpc_listen_addr; - let rest_listen_addr = node_config.rest_listen_addr; + let rest_listen_addr = node_config.rest_config.listen_addr; let quickwit_services: Arc = Arc::new(QuickwitServices { node_config: Arc::new(node_config), cluster: cluster.clone(), diff --git a/quickwit/quickwit-serve/src/rest.rs b/quickwit/quickwit-serve/src/rest.rs index 7f0a9435e90..abd8a13ba02 100644 --- a/quickwit/quickwit-serve/src/rest.rs +++ b/quickwit/quickwit-serve/src/rest.rs @@ -85,46 +85,9 @@ pub(crate) async fn start_rest_server( // `/metrics` route. let metrics_routes = warp::path("metrics").and(warp::get()).map(metrics_handler); - let ingest_router = quickwit_services.ingest_router_service.clone(); - let ingest_service = quickwit_services.ingest_service.clone(); - // `/api/v1/*` routes. - let api_v1_root_url = warp::path!("api" / "v1" / ..); - let api_v1_routes = cluster_handler(quickwit_services.cluster.clone()) - .or(node_info_handler( - BuildInfo::get(), - RuntimeInfo::get(), - quickwit_services.node_config.clone(), - )) - .or(indexing_get_handler( - quickwit_services.indexing_service_opt.clone(), - )) - .or(search_get_handler(quickwit_services.search_service.clone())) - .or(search_post_handler( - quickwit_services.search_service.clone(), - )) - .or(search_stream_handler( - quickwit_services.search_service.clone(), - )) - .or(ingest_api_handlers( - ingest_router, - ingest_service.clone(), - quickwit_services.node_config.ingest_api_config.clone(), - )) - .or(index_management_handlers( - quickwit_services.index_manager.clone(), - quickwit_services.node_config.clone(), - )) - .or(delete_task_api_handlers( - quickwit_services.metastore_client.clone(), - )) - .or(elastic_api_handlers( - quickwit_services.node_config.clone(), - quickwit_services.search_service.clone(), - ingest_service.clone(), - )); + let api_v1_root_route = api_v1_routes(quickwit_services.clone()); - let api_v1_root_route = api_v1_root_url.and(api_v1_routes); let redirect_root_to_ui_route = warp::path::end() .and(warp::get()) .map(|| redirect(http::Uri::from_static("/ui/search"))); @@ -143,7 +106,7 @@ pub(crate) async fn start_rest_server( let warp_service = warp::service(rest_routes); let compression_predicate = DefaultPredicate::new().and(SizeAbove::new(MINIMUM_RESPONSE_COMPRESSION_SIZE)); - let cors = build_cors(&quickwit_services.node_config.rest_cors_allow_origins); + let cors = build_cors(&quickwit_services.node_config.rest_config.cors_allow_origins); let service = ServiceBuilder::new() .layer( @@ -177,6 +140,55 @@ pub(crate) async fn start_rest_server( Ok(()) } +fn api_v1_routes( + quickwit_services: Arc, +) -> impl Filter + Clone { + let api_v1_root_url = warp::path!("api" / "v1" / ..); + api_v1_root_url + .and( + cluster_handler(quickwit_services.cluster.clone()) + .or(node_info_handler( + BuildInfo::get(), + RuntimeInfo::get(), + quickwit_services.node_config.clone(), + )) + .or(indexing_get_handler( + quickwit_services.indexing_service_opt.clone(), + )) + .or(search_get_handler(quickwit_services.search_service.clone())) + .or(search_post_handler( + quickwit_services.search_service.clone(), + )) + .or(search_stream_handler( + quickwit_services.search_service.clone(), + )) + .or(ingest_api_handlers( + quickwit_services.ingest_router_service.clone(), + quickwit_services.ingest_service.clone(), + quickwit_services.node_config.ingest_api_config.clone(), + )) + .or(index_management_handlers( + quickwit_services.index_manager.clone(), + quickwit_services.node_config.clone(), + )) + .or(delete_task_api_handlers( + quickwit_services.metastore_client.clone(), + )) + .or(elastic_api_handlers( + quickwit_services.node_config.clone(), + quickwit_services.search_service.clone(), + quickwit_services.ingest_service.clone(), + )), + ) + .with(warp::reply::with::headers( + quickwit_services + .node_config + .rest_config + .extra_headers + .clone(), + )) +} + /// This function returns a formatted error based on the given rejection reason. /// The ordering of rejection processing is very important, we need to start /// with the most specific rejections and end with the most generic. If not, Quickwit @@ -309,11 +321,27 @@ mod tests { use std::pin::Pin; use std::task::{Context, Poll}; + use http::HeaderName; use hyper::{Request, Response, StatusCode}; + use quickwit_cluster::{create_cluster_for_test, ChannelTransport}; + use quickwit_config::NodeConfig; + use quickwit_index_management::IndexService; + use quickwit_ingest::{IngestApiService, IngestServiceClient}; + use quickwit_proto::control_plane::ControlPlaneServiceClient; + use quickwit_proto::ingest::router::IngestRouterServiceClient; + use quickwit_proto::metastore::MetastoreServiceClient; + use quickwit_search::MockSearchService; + use quickwit_storage::StorageResolver; use tower::Service; use super::*; + pub(crate) fn ingest_service_client() -> IngestServiceClient { + let universe = quickwit_actors::Universe::new(); + let (ingest_service_mailbox, _) = universe.create_test_mailbox::(); + IngestServiceClient::from_mailbox(ingest_service_mailbox) + } + #[tokio::test] async fn test_cors() { // No cors enabled @@ -525,4 +553,57 @@ mod tests { Box::pin(fut) } } + + #[tokio::test] + async fn test_additional_headers() { + let mut node_config = NodeConfig::for_test(); + node_config.rest_config.extra_headers.insert( + HeaderName::from_static("x-custom-header"), + HeaderValue::from_static("custom-value"), + ); + node_config.rest_config.extra_headers.insert( + HeaderName::from_static("x-custom-header-2"), + HeaderValue::from_static("custom-value-2"), + ); + let metastore_client = MetastoreServiceClient::from(MetastoreServiceClient::mock()); + let index_service = + IndexService::new(metastore_client.clone(), StorageResolver::unconfigured()); + let control_plane_service = + ControlPlaneServiceClient::from(ControlPlaneServiceClient::mock()); + let transport = ChannelTransport::default(); + let cluster = create_cluster_for_test(Vec::new(), &[], &transport, false) + .await + .unwrap(); + let quickwit_services = QuickwitServices { + cluster, + _report_splits_subscription_handle_opt: None, + control_plane_service, + ingester_service_opt: None, + metastore_server_opt: None, + index_manager: index_service, + indexing_service_opt: None, + janitor_service_opt: None, + search_service: Arc::new(MockSearchService::new()), + ingest_router_service: IngestRouterServiceClient::from( + IngestRouterServiceClient::mock(), + ), + ingest_service: ingest_service_client(), + metastore_client, + node_config: Arc::new(node_config.clone()), + }; + let handler = api_v1_routes(Arc::new(quickwit_services)); + let resp = warp::test::request() + .path("/api/v1/version") + .reply(&handler) + .await; + assert_eq!(resp.status(), 200); + assert_eq!( + resp.headers().get("x-custom-header").unwrap(), + "custom-value" + ); + assert_eq!( + resp.headers().get("x-custom-header-2").unwrap(), + "custom-value-2" + ); + } } From 551e27799e97643e3dad7554ecee0813d8929cd0 Mon Sep 17 00:00:00 2001 From: fmassot Date: Mon, 4 Dec 2023 09:54:39 +0100 Subject: [PATCH 5/7] Please clippy. --- quickwit/quickwit-serve/src/rest.rs | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/quickwit/quickwit-serve/src/rest.rs b/quickwit/quickwit-serve/src/rest.rs index abd8a13ba02..5901dc418c0 100644 --- a/quickwit/quickwit-serve/src/rest.rs +++ b/quickwit/quickwit-serve/src/rest.rs @@ -575,21 +575,23 @@ mod tests { .await .unwrap(); let quickwit_services = QuickwitServices { - cluster, _report_splits_subscription_handle_opt: None, + _local_shards_update_listener_handle_opt: None, + cluster, control_plane_service, - ingester_service_opt: None, - metastore_server_opt: None, - index_manager: index_service, indexing_service_opt: None, - janitor_service_opt: None, - search_service: Arc::new(MockSearchService::new()), + index_manager: index_service, + ingest_service: ingest_service_client(), + + ingester_service_opt: None, ingest_router_service: IngestRouterServiceClient::from( IngestRouterServiceClient::mock(), ), - ingest_service: ingest_service_client(), + janitor_service_opt: None, metastore_client, + metastore_server_opt: None, node_config: Arc::new(node_config.clone()), + search_service: Arc::new(MockSearchService::new()), }; let handler = api_v1_routes(Arc::new(quickwit_services)); let resp = warp::test::request() From fe07d01b85d50a7fc61a5120a1ca4c5470509af9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Massot?= Date: Mon, 4 Dec 2023 15:27:02 +0100 Subject: [PATCH 6/7] Apply suggestions from code review Co-authored-by: Adrien Guillo --- docs/configuration/node-config.md | 2 +- quickwit/quickwit-serve/src/rest.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/configuration/node-config.md b/docs/configuration/node-config.md index b890270514f..c78201d9d8d 100644 --- a/docs/configuration/node-config.md +++ b/docs/configuration/node-config.md @@ -26,7 +26,7 @@ A commented example is available here: [quickwit.yaml](https://github.com/quickw | `listen_address` | The IP address or hostname that Quickwit service binds to for starting REST and GRPC server and connecting this node to other nodes. By default, Quickwit binds itself to 127.0.0.1 (localhost). This default is not valid when trying to form a cluster. | `QW_LISTEN_ADDRESS` | `127.0.0.1` | | `advertise_address` | IP address advertised by the node, i.e. the IP address that peer nodes should use to connect to the node for RPCs. | `QW_ADVERTISE_ADDRESS` | `listen_address` | | `gossip_listen_port` | The port which to listen for the Gossip cluster membership service (UDP). | `QW_GOSSIP_LISTEN_PORT` | `rest.listen_port` | -| `grpc_listen_port` | The port which to listen for the gRPC service.| `QW_GRPC_LISTEN_PORT` | `rest.listen_port + 1` | +| `grpc_listen_port` | The port on which gRPC services listen for traffic. | `QW_GRPC_LISTEN_PORT` | `rest.listen_port + 1` | | `peer_seeds` | List of IP addresses or hostnames used to bootstrap the cluster and discover the complete set of nodes. This list may contain the current node address and does not need to be exhaustive. | `QW_PEER_SEEDS` | | | `data_dir` | Path to directory where data (tmp data, splits kept for caching purpose) is persisted. This is mostly used in indexing. | `QW_DATA_DIR` | `./qwdata` | | `metastore_uri` | Metastore URI. Can be a local directory or `s3://my-bucket/indexes` or `postgres://username:password@localhost:5432/metastore`. [Learn more about the metastore configuration](metastore-config.md). | `QW_METASTORE_URI` | `{data_dir}/indexes` | diff --git a/quickwit/quickwit-serve/src/rest.rs b/quickwit/quickwit-serve/src/rest.rs index 5901dc418c0..66a5022f905 100644 --- a/quickwit/quickwit-serve/src/rest.rs +++ b/quickwit/quickwit-serve/src/rest.rs @@ -555,7 +555,7 @@ mod tests { } #[tokio::test] - async fn test_additional_headers() { + async fn test_extra_headers() { let mut node_config = NodeConfig::for_test(); node_config.rest_config.extra_headers.insert( HeaderName::from_static("x-custom-header"), From 6849a51339eca555f70340e5f8217136a8e5bf5a Mon Sep 17 00:00:00 2001 From: fmassot Date: Mon, 4 Dec 2023 15:43:14 +0100 Subject: [PATCH 7/7] Log extra headers if any and take review's comments into account. --- config/quickwit.yaml | 5 +++ .../src/node_config/serialize.rs | 40 +++++++++-------- .../src/elastic_search_api/mod.rs | 45 +++++++++++++++++++ quickwit/quickwit-serve/src/rest.rs | 11 +++++ 4 files changed, 82 insertions(+), 19 deletions(-) diff --git a/config/quickwit.yaml b/config/quickwit.yaml index 05e01eea373..cb11358f1f7 100644 --- a/config/quickwit.yaml +++ b/config/quickwit.yaml @@ -35,6 +35,11 @@ version: 0.6 # # rest: # listen_port: 7280 +# cors_allow_origins: +# - "http://localhost:3000" +# extra_headers: +# x-header-1: header-value-1 +# x-header-2: header-value-2 # # IP address advertised by the node, i.e. the IP address that peer nodes should use to connect to the node for RPCs. # The environment variable `QW_ADVERTISE_ADDRESS` can also be used to override this value. diff --git a/quickwit/quickwit-config/src/node_config/serialize.rs b/quickwit/quickwit-config/src/node_config/serialize.rs index 09ccc41f881..9162c067958 100644 --- a/quickwit/quickwit-config/src/node_config/serialize.rs +++ b/quickwit/quickwit-config/src/node_config/serialize.rs @@ -917,27 +917,29 @@ mod tests { assert_eq!(node_config.gossip_listen_addr.to_string(), "127.0.0.1:1889"); assert_eq!(node_config.grpc_listen_addr.to_string(), "127.0.0.1:1989"); } - { - // Check that `rest_listen_port` can still be used for backward compatibility. - let node_config = NodeConfigBuilder { - rest_listen_port: Some(1789), - listen_address: default_listen_address(), - rest_config_builder: RestConfigBuilder { - listen_port: None, - ..Default::default() - }, + } + + #[tokio::test] + async fn test_rest_deprecated_listen_port_config() { + // This test should be removed once deprecated `rest_listen_port` field is removed. + let node_config = NodeConfigBuilder { + rest_listen_port: Some(1789), + listen_address: default_listen_address(), + rest_config_builder: RestConfigBuilder { + listen_port: None, ..Default::default() - } - .build_and_validate(&HashMap::new()) - .await - .unwrap(); - assert_eq!( - node_config.rest_config.listen_addr.to_string(), - "127.0.0.1:1789" - ); - assert_eq!(node_config.gossip_listen_addr.to_string(), "127.0.0.1:1789"); - assert_eq!(node_config.grpc_listen_addr.to_string(), "127.0.0.1:1790"); + }, + ..Default::default() } + .build_and_validate(&HashMap::new()) + .await + .unwrap(); + assert_eq!( + node_config.rest_config.listen_addr.to_string(), + "127.0.0.1:1789" + ); + assert_eq!(node_config.gossip_listen_addr.to_string(), "127.0.0.1:1789"); + assert_eq!(node_config.grpc_listen_addr.to_string(), "127.0.0.1:1790"); } #[tokio::test] diff --git a/quickwit/quickwit-serve/src/elastic_search_api/mod.rs b/quickwit/quickwit-serve/src/elastic_search_api/mod.rs index a957d59281b..ab5fecde5da 100644 --- a/quickwit/quickwit-serve/src/elastic_search_api/mod.rs +++ b/quickwit/quickwit-serve/src/elastic_search_api/mod.rs @@ -105,14 +105,20 @@ fn make_elastic_api_response( mod tests { use std::sync::Arc; + use assert_json_diff::assert_json_include; use mockall::predicate; use quickwit_config::NodeConfig; use quickwit_ingest::{IngestApiService, IngestServiceClient}; use quickwit_search::MockSearchService; + use serde_json::Value as JsonValue; + use warp::Filter; use super::elastic_api_handlers; use super::model::ElasticSearchError; use crate::elastic_search_api::model::MultiSearchResponse; + use crate::elastic_search_api::rest_handler::es_compat_cluster_info_handler; + use crate::rest::recover_fn; + use crate::BuildInfo; fn ingest_service_client() -> IngestServiceClient { let universe = quickwit_actors::Universe::new(); @@ -359,4 +365,43 @@ mod tests { .await; assert_eq!(resp.status(), 200); } + + #[tokio::test] + async fn test_es_compat_cluster_info_handler() { + let build_info = BuildInfo::get(); + let config = Arc::new(NodeConfig::for_test()); + let handler = + es_compat_cluster_info_handler(config.clone(), build_info).recover(recover_fn); + let resp = warp::test::request() + .path("/_elastic") + .reply(&handler) + .await; + assert_eq!(resp.status(), 200); + let resp_json: JsonValue = serde_json::from_slice(resp.body()).unwrap(); + let expected_response_json = serde_json::json!({ + "name" : config.node_id, + "cluster_name" : config.cluster_id, + "version" : { + "distribution" : "quickwit", + "number" : build_info.version, + "build_hash" : build_info.commit_hash, + "build_date" : build_info.build_date, + } + }); + assert_json_include!(actual: resp_json, expected: expected_response_json); + } + + #[tokio::test] + async fn test_head_request_on_root_endpoint() { + let build_info = BuildInfo::get(); + let config = Arc::new(NodeConfig::for_test()); + let handler = + es_compat_cluster_info_handler(config.clone(), build_info).recover(recover_fn); + let resp = warp::test::request() + .path("/_elastic") + .method("HEAD") + .reply(&handler) + .await; + assert_eq!(resp.status(), 200); + } } diff --git a/quickwit/quickwit-serve/src/rest.rs b/quickwit/quickwit-serve/src/rest.rs index 66a5022f905..e3792ef3c84 100644 --- a/quickwit/quickwit-serve/src/rest.rs +++ b/quickwit/quickwit-serve/src/rest.rs @@ -143,6 +143,17 @@ pub(crate) async fn start_rest_server( fn api_v1_routes( quickwit_services: Arc, ) -> impl Filter + Clone { + if !quickwit_services + .node_config + .rest_config + .extra_headers + .is_empty() + { + info!( + "Extra headers will be added to all responses: {:?}", + quickwit_services.node_config.rest_config.extra_headers + ); + } let api_v1_root_url = warp::path!("api" / "v1" / ..); api_v1_root_url .and(