diff --git a/config/quickwit.yaml b/config/quickwit.yaml index bff32a8031a..cb11358f1f7 100644 --- a/config/quickwit.yaml +++ b/config/quickwit.yaml @@ -32,7 +32,14 @@ version: 0.6 # 2. pass `0.0.0.0` and let Quickwit do its best to discover the node's IP (see `advertise_address`) # # listen_address: 127.0.0.1 -# rest_listen_port: 7280 +# +# rest: +# listen_port: 7280 +# cors_allow_origins: +# - "http://localhost:3000" +# extra_headers: +# x-header-1: header-value-1 +# x-header-2: header-value-2 # # IP address advertised by the node, i.e. the IP address that peer nodes should use to connect to the node for RPCs. # The environment variable `QW_ADVERTISE_ADDRESS` can also be used to override this value. diff --git a/config/tutorials/hdfs-logs/searcher-1.yaml b/config/tutorials/hdfs-logs/searcher-1.yaml index 140102c5f8e..09886e5cab7 100644 --- a/config/tutorials/hdfs-logs/searcher-1.yaml +++ b/config/tutorials/hdfs-logs/searcher-1.yaml @@ -1,7 +1,11 @@ version: 0.6 node_id: searcher-1 listen_address: 127.0.0.1 -rest_listen_port: 7280 +rest: + listen_port: 7280 +ingest_api: + max_queue_memory_usage: 4GiB + max_queue_disk_usage: 8GiB peer_seeds: - 127.0.0.1:7290 # searcher-2 - 127.0.0.1:7300 # searcher-3 diff --git a/config/tutorials/hdfs-logs/searcher-2.yaml b/config/tutorials/hdfs-logs/searcher-2.yaml index 110475d732b..53f5cf3c573 100644 --- a/config/tutorials/hdfs-logs/searcher-2.yaml +++ b/config/tutorials/hdfs-logs/searcher-2.yaml @@ -1,7 +1,8 @@ version: 0.6 node_id: searcher-2 listen_address: 127.0.0.1 -rest_listen_port: 7290 +rest: + listen_port: 7290 peer_seeds: - 127.0.0.1:7280 # searcher-1 - 127.0.0.1:7300 # searcher-3 diff --git a/config/tutorials/hdfs-logs/searcher-3.yaml b/config/tutorials/hdfs-logs/searcher-3.yaml index ba287724e03..6a86e375d39 100644 --- a/config/tutorials/hdfs-logs/searcher-3.yaml +++ b/config/tutorials/hdfs-logs/searcher-3.yaml @@ -1,7 +1,8 @@ version: 0.6 node_id: searcher-3 listen_address: 127.0.0.1 -rest_listen_port: 7300 +rest: + 
listen_port: 7300 peer_seeds: - 127.0.0.1:7280 # searcher-1 - 127.0.0.1:7290 # searcher-2 diff --git a/docs/configuration/node-config.md b/docs/configuration/node-config.md index ce318c8e109..c78201d9d8d 100644 --- a/docs/configuration/node-config.md +++ b/docs/configuration/node-config.md @@ -25,14 +25,48 @@ A commented example is available here: [quickwit.yaml](https://github.com/quickw | `enabled_services` | Enabled services (control_plane, indexer, janitor, metastore, searcher) | `QW_ENABLED_SERVICES` | all services | | `listen_address` | The IP address or hostname that Quickwit service binds to for starting REST and GRPC server and connecting this node to other nodes. By default, Quickwit binds itself to 127.0.0.1 (localhost). This default is not valid when trying to form a cluster. | `QW_LISTEN_ADDRESS` | `127.0.0.1` | | `advertise_address` | IP address advertised by the node, i.e. the IP address that peer nodes should use to connect to the node for RPCs. | `QW_ADVERTISE_ADDRESS` | `listen_address` | -| `rest_listen_port` | The port which to listen for HTTP REST API. | `QW_REST_LISTEN_PORT` | `7280` | -| `gossip_listen_port` | The port which to listen for the Gossip cluster membership service (UDP). | `QW_GOSSIP_LISTEN_PORT` | `rest_listen_port` | -| `grpc_listen_port` | The port which to listen for the gRPC service.| `QW_GRPC_LISTEN_PORT` | `rest_listen_port + 1` | +| `gossip_listen_port` | The port which to listen for the Gossip cluster membership service (UDP). | `QW_GOSSIP_LISTEN_PORT` | `rest.listen_port` | +| `grpc_listen_port` | The port on which gRPC services listen for traffic. | `QW_GRPC_LISTEN_PORT` | `rest.listen_port + 1` | | `peer_seeds` | List of IP addresses or hostnames used to bootstrap the cluster and discover the complete set of nodes. This list may contain the current node address and does not need to be exhaustive. | `QW_PEER_SEEDS` | | | `data_dir` | Path to directory where data (tmp data, splits kept for caching purpose) is persisted. 
This is mostly used in indexing. | `QW_DATA_DIR` | `./qwdata` | | `metastore_uri` | Metastore URI. Can be a local directory or `s3://my-bucket/indexes` or `postgres://username:password@localhost:5432/metastore`. [Learn more about the metastore configuration](metastore-config.md). | `QW_METASTORE_URI` | `{data_dir}/indexes` | | `default_index_root_uri` | Default index root URI that defines the location where index data (splits) is stored. The index URI is built following the scheme: `{default_index_root_uri}/{index-id}` | `QW_DEFAULT_INDEX_ROOT_URI` | `{data_dir}/indexes` | -| `rest_cors_allow_origins` | Configure the CORS origins which are allowed to access the API. [Read more](#configuring-cors-cross-origin-resource-sharing) | | + +## REST configuration + +This section contains the REST API configuration options. + +| Property | Description | Env variable | Default value | +| --- | --- | --- | --- | +| `listen_port` | The port on which the REST API listens for HTTP traffic. | `QW_REST_LISTEN_PORT` | `7280` | +| `cors_allow_origins` | CORS origins that are allowed to access the API. [Read more](#configuring-cors-cross-origin-resource-sharing) | | | +| `extra_headers` | Extra HTTP headers, given as a map of header names to values, added to REST API responses. | | | + +### Configuring CORS (Cross-origin resource sharing) + +CORS (Cross-origin resource sharing) describes which addresses or origins can access the REST API from the browser. +By default, sharing resources cross-origin is not allowed. 
+ +A wildcard, single origin, or multiple origins can be specified as part of the `cors_allow_origins` parameter: + + +Example of a REST configuration: + +```yaml + +rest: + listen_port: 1789 + extra_headers: + x-header-1: header-value-1 + x-header-2: header-value-2 + cors_allow_origins: '*' + +# cors_allow_origins: https://my-hdfs-logs.domain.com # Optionally we can specify one domain +# cors_allow_origins: # Or allow multiple origins +# - https://my-hdfs-logs.domain.com +# - https://my-hdfs.other-domain.com + +``` ## Storage configuration @@ -203,7 +237,8 @@ version: 0.6 cluster_id: quickwit-cluster node_id: my-unique-node-id listen_address: ${QW_LISTEN_ADDRESS} -rest_listen_port: ${QW_LISTEN_PORT:-1111} +rest: + listen_port: ${QW_LISTEN_PORT:-1111} ``` Will be interpreted by Quickwit as: @@ -213,23 +248,6 @@ version: 0.6 cluster_id: quickwit-cluster node_id: my-unique-node-id listen_address: 0.0.0.0 -rest_listen_port: 1111 -``` - -## Configuring CORS (Cross-origin resource sharing) - -CORS (Cross-origin resource sharing) describes which address or origins can access the REST API from the browser. -By default, sharing resources cross-origin is not allowed. 
- -A wildcard, single origin, or multiple origins can be specified as part of the `rest_cors_allow_origins` parameter: - -```yaml -version: 0.6 -index_id: hdfs - -rest_cors_allow_origins: '*' # Allow all origins -# rest_cors_allow_origins: https://my-hdfs-logs.domain.com # Optionally we can specify one domain -# rest_cors_allow_origins: # Or allow multiple origins -# - https://my-hdfs-logs.domain.com -# - https://my-hdfs.other-domain.com +rest: + listen_port: 1111 ``` diff --git a/docs/configuration/ports-config.md b/docs/configuration/ports-config.md index 623aeb1b897..d8384540cb0 100644 --- a/docs/configuration/ports-config.md +++ b/docs/configuration/ports-config.md @@ -4,18 +4,18 @@ sidebar_position: 6 --- When starting a quickwit search server, one important parameter that can be configured is -the `rest_listen_port` (defaults to :7280). +the `rest.listen_port` (defaults to :7280). Internally, Quickwit will, in fact, use three sockets. The ports of these three sockets cannot be configured independently at the moment. -The ports used are computed relative to the `rest_listen_port` port, as follows. +The ports used are computed relative to `rest.listen_port`, as follows. | Service | Port used | Protocol | Default | |-------------------------------|---------------------------|----------|-----------| -| Http server with the rest api | `${rest_listen_port}` | TCP | 7280 | -| Cluster membership | `${rest_listen_port}` | UDP | 7280 | -| GRPC service | `${rest_listen_port} + 1` | TCP | 7281 | +| HTTP server with the REST API | `${rest.listen_port}` | TCP | 7280 | +| Cluster membership | `${rest.listen_port}` | UDP | 7280 | +| gRPC service | `${rest.listen_port} + 1` | TCP | 7281 | It is not possible for the moment to configure these ports independently. 
diff --git a/docs/get-started/tutorials/tutorial-hdfs-logs-distributed-search-aws-s3.md b/docs/get-started/tutorials/tutorial-hdfs-logs-distributed-search-aws-s3.md index 4773e6460c7..ad95ec7d539 100644 --- a/docs/get-started/tutorials/tutorial-hdfs-logs-distributed-search-aws-s3.md +++ b/docs/get-started/tutorials/tutorial-hdfs-logs-distributed-search-aws-s3.md @@ -197,8 +197,8 @@ Now that we have indexed the logs and can search from one instance, it's time to ## Start two more instances -Quickwit needs a port `rest_listen_port` for serving the HTTP rest API via TCP as well as maintaining the cluster formation via UDP. -Also, it needs `{rest_listen_port} + 1` for gRPC communication between instances. +Quickwit needs the `rest.listen_port` port to serve the HTTP REST API over TCP and to maintain cluster formation over UDP. +It also needs port `{rest.listen_port} + 1` for gRPC communication between instances. In AWS, you can create a security group to group these inbound rules. Check out the [network section](../../guides/aws-setup) of our AWS setup guide. 
diff --git a/docs/guides/cluster-with-docker-compose.md b/docs/guides/cluster-with-docker-compose.md new file mode 100644 index 00000000000..e69de29bb2d diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 35f17f1d077..a759f82b21b 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -5355,6 +5355,8 @@ dependencies = [ "chrono", "cron", "enum-iterator", + "http", + "http-serde", "humantime", "itertools 0.12.0", "json_comments", diff --git a/quickwit/quickwit-cli/tests/helpers.rs b/quickwit/quickwit-cli/tests/helpers.rs index 2e546ffa8b7..5493baee194 100644 --- a/quickwit/quickwit-cli/tests/helpers.rs +++ b/quickwit/quickwit-cli/tests/helpers.rs @@ -84,7 +84,8 @@ const DEFAULT_QUICKWIT_CONFIG: &str = r#" version: 0.6 metastore_uri: #metastore_uri data_dir: #data_dir - rest_listen_port: #rest_listen_port + rest: + listen_port: #rest_listen_port grpc_listen_port: #grpc_listen_port "#; diff --git a/quickwit/quickwit-config/Cargo.toml b/quickwit/quickwit-config/Cargo.toml index 8e6912d0ace..453a0669e61 100644 --- a/quickwit/quickwit-config/Cargo.toml +++ b/quickwit/quickwit-config/Cargo.toml @@ -16,6 +16,8 @@ bytesize = { workspace = true } chrono = { workspace = true } cron = { workspace = true } enum-iterator = { workspace = true } +http = { workspace = true } +http-serde = { workspace = true } humantime = { workspace = true } itertools = { workspace = true } json_comments = { workspace = true } diff --git a/quickwit/quickwit-config/resources/tests/config/quickwit.json b/quickwit/quickwit-config/resources/tests/config/quickwit.json index a0c7573d0a4..8b3b8d6450f 100644 --- a/quickwit/quickwit-config/resources/tests/config/quickwit.json +++ b/quickwit/quickwit-config/resources/tests/config/quickwit.json @@ -9,7 +9,6 @@ ], "listen_address": "0.0.0.0", "advertise_address": "172.0.0.12", - "rest_listen_port": 1111, "gossip_listen_port": 2222, "grpc_listen_port": 3333, "peer_seeds": [ @@ -19,6 +18,13 @@ "data_dir": "/opt/quickwit/data", "metastore_uri": 
"postgres://username:password@host:port/db", "default_index_root_uri": "s3://quickwit-indexes", + "rest": { + "listen_port": 1111, + "extra_headers": { + "x-header-1": "header-value-1", + "x-header-2": "header-value-2" + } + }, "storage": { "azure": { "account": "quickwit-dev" diff --git a/quickwit/quickwit-config/resources/tests/config/quickwit.toml b/quickwit/quickwit-config/resources/tests/config/quickwit.toml index 701be1d2c90..04f384eda17 100644 --- a/quickwit/quickwit-config/resources/tests/config/quickwit.toml +++ b/quickwit/quickwit-config/resources/tests/config/quickwit.toml @@ -5,7 +5,6 @@ node_id = "my-unique-node-id" enabled_services = [ "janitor", "metastore" ] listen_address = "0.0.0.0" advertise_address = "172.0.0.12" -rest_listen_port = 1111 gossip_listen_port = 2222 grpc_listen_port = 3333 peer_seeds = [ "quickwit-searcher-0.local", "quickwit-searcher-1.local" ] @@ -13,6 +12,13 @@ data_dir = "/opt/quickwit/data" metastore_uri = "postgres://username:password@host:port/db" default_index_root_uri = "s3://quickwit-indexes" +[rest] +listen_port = 1111 + +[rest.extra_headers] +x-header-1 = "header-value-1" +x-header-2 = "header-value-2" + [storage.azure] account = "quickwit-dev" diff --git a/quickwit/quickwit-config/resources/tests/config/quickwit.yaml b/quickwit/quickwit-config/resources/tests/config/quickwit.yaml index 5f3ab13c828..760afdcb60c 100644 --- a/quickwit/quickwit-config/resources/tests/config/quickwit.yaml +++ b/quickwit/quickwit-config/resources/tests/config/quickwit.yaml @@ -7,7 +7,6 @@ enabled_services: - metastore listen_address: 0.0.0.0 advertise_address: 172.0.0.12 -rest_listen_port: 1111 gossip_listen_port: 2222 grpc_listen_port: 3333 peer_seeds: @@ -17,6 +16,12 @@ data_dir: /opt/quickwit/data metastore_uri: postgres://username:password@host:port/db default_index_root_uri: s3://quickwit-indexes +rest: + listen_port: 1111 + extra_headers: + x-header-1: header-value-1 + x-header-2: header-value-2 + storage: azure: account: quickwit-dev 
diff --git a/quickwit/quickwit-config/src/node_config/mod.rs b/quickwit/quickwit-config/src/node_config/mod.rs index b6412ad6ba0..200cd73dfed 100644 --- a/quickwit/quickwit-config/src/node_config/mod.rs +++ b/quickwit/quickwit-config/src/node_config/mod.rs @@ -28,6 +28,7 @@ use std::time::Duration; use anyhow::{bail, ensure}; use bytesize::ByteSize; +use http::HeaderMap; use quickwit_common::net::HostAddr; use quickwit_common::uri::Uri; use quickwit_proto::indexing::CpuCapacity; @@ -41,6 +42,15 @@ use crate::{ConfigFormat, MetastoreConfigs}; pub const DEFAULT_QW_CONFIG_PATH: &str = "config/quickwit.yaml"; +#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] +#[serde(deny_unknown_fields)] +pub struct RestConfig { + pub listen_addr: SocketAddr, + pub cors_allow_origins: Vec, + #[serde(with = "http_serde::header_map")] + pub extra_headers: HeaderMap, +} + #[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] #[serde(deny_unknown_fields)] pub struct IndexerConfig { @@ -304,7 +314,6 @@ pub struct NodeConfig { pub cluster_id: String, pub node_id: String, pub enabled_services: HashSet, - pub rest_listen_addr: SocketAddr, pub gossip_listen_addr: SocketAddr, pub grpc_listen_addr: SocketAddr, pub gossip_advertise_addr: SocketAddr, @@ -313,7 +322,7 @@ pub struct NodeConfig { pub data_dir_path: PathBuf, pub metastore_uri: Uri, pub default_index_root_uri: Uri, - pub rest_cors_allow_origins: Vec, + pub rest_config: RestConfig, pub storage_configs: StorageConfigs, pub metastore_configs: MetastoreConfigs, pub indexer_config: IndexerConfig, diff --git a/quickwit/quickwit-config/src/node_config/serialize.rs b/quickwit/quickwit-config/src/node_config/serialize.rs index 4f2d259222b..9162c067958 100644 --- a/quickwit/quickwit-config/src/node_config/serialize.rs +++ b/quickwit/quickwit-config/src/node_config/serialize.rs @@ -22,12 +22,14 @@ use std::net::{IpAddr, SocketAddr}; use std::str::FromStr; use anyhow::{bail, Context}; +use http::HeaderMap; use 
quickwit_common::net::{find_private_ip, get_short_hostname, Host}; use quickwit_common::new_coolid; use quickwit_common::uri::Uri; use serde::{Deserialize, Serialize}; use tracing::{info, warn}; +use super::RestConfig; use crate::config_value::ConfigValue; use crate::qw_env_vars::*; use crate::service::QuickwitService; @@ -88,8 +90,8 @@ fn default_listen_address() -> ConfigValue { ConfigValue::with_default(Host::default().to_string()) } -fn default_rest_listen_port() -> ConfigValue { - ConfigValue::with_default(7280) +fn default_rest_listen_port() -> u16 { + 7280 } fn default_data_dir_uri() -> ConfigValue { @@ -168,8 +170,8 @@ struct NodeConfigBuilder { #[serde(default = "default_listen_address")] listen_address: ConfigValue, advertise_address: ConfigValue, - #[serde(default = "default_rest_listen_port")] - rest_listen_port: ConfigValue, + // Deprecated, use `rest.listen_port` instead. + rest_listen_port: Option, gossip_listen_port: ConfigValue, grpc_listen_port: ConfigValue, #[serde(default)] @@ -179,9 +181,9 @@ struct NodeConfigBuilder { data_dir_uri: ConfigValue, metastore_uri: ConfigValue, default_index_root_uri: ConfigValue, + #[serde(rename = "rest")] #[serde(default)] - #[serde_as(deserialize_as = "serde_with::OneOrMany<_>")] - rest_cors_allow_origins: Vec, + rest_config_builder: RestConfigBuilder, #[serde(rename = "storage")] #[serde(default)] storage_configs: StorageConfigs, @@ -219,19 +221,32 @@ impl NodeConfigBuilder { let listen_host = listen_address.parse::()?; let listen_ip = listen_host.resolve().await?; - let rest_listen_port = self.rest_listen_port.resolve(env_vars)?; - let rest_listen_addr = SocketAddr::new(listen_ip, rest_listen_port); + if let Some(rest_listen_port) = self.rest_listen_port { + if self.rest_config_builder.listen_port.is_some() { + bail!( + "conflicting configuration values: please use only `rest.listen_port`, \ + `rest_listen_port` is deprecated and should not be used alongside \ + `rest.listen_port`. 
Update your configuration to use `rest.listen_port`." + ); + } + warn!("`rest_listen_port` is deprecated, use `rest.listen_port` instead"); + self.rest_config_builder.listen_port = Some(rest_listen_port); + } + + let rest_config = self + .rest_config_builder + .build_and_validate(listen_ip, env_vars)?; let gossip_listen_port = self .gossip_listen_port .resolve_optional(env_vars)? - .unwrap_or(rest_listen_port); + .unwrap_or(rest_config.listen_addr.port()); let gossip_listen_addr = SocketAddr::new(listen_ip, gossip_listen_port); let grpc_listen_port = self .grpc_listen_port .resolve_optional(env_vars)? - .unwrap_or(rest_listen_port + 1); + .unwrap_or(rest_config.listen_addr.port() + 1); let grpc_listen_addr = SocketAddr::new(listen_ip, grpc_listen_port); let advertise_address = self.advertise_address.resolve_optional(env_vars)?; @@ -272,7 +287,6 @@ impl NodeConfigBuilder { cluster_id: self.cluster_id.resolve(env_vars)?, node_id: self.node_id.resolve(env_vars)?, enabled_services, - rest_listen_addr, gossip_listen_addr, grpc_listen_addr, gossip_advertise_addr, @@ -281,7 +295,7 @@ impl NodeConfigBuilder { data_dir_path, metastore_uri, default_index_root_uri, - rest_cors_allow_origins: self.rest_cors_allow_origins, + rest_config, metastore_configs: self.metastore_configs, storage_configs: self.storage_configs, indexer_config: self.indexer_config, @@ -319,7 +333,7 @@ impl Default for NodeConfigBuilder { node_id: default_node_id(), enabled_services: default_enabled_services(), listen_address: default_listen_address(), - rest_listen_port: default_rest_listen_port(), + rest_listen_port: None, gossip_listen_port: ConfigValue::none(), grpc_listen_port: ConfigValue::none(), advertise_address: ConfigValue::none(), @@ -327,7 +341,7 @@ impl Default for NodeConfigBuilder { data_dir_uri: default_data_dir_uri(), metastore_uri: ConfigValue::none(), default_index_root_uri: ConfigValue::none(), - rest_cors_allow_origins: Vec::new(), + rest_config_builder: RestConfigBuilder::default(), 
storage_configs: StorageConfigs::default(), metastore_configs: MetastoreConfigs::default(), indexer_config: IndexerConfig::default(), @@ -338,10 +352,44 @@ impl Default for NodeConfigBuilder { } } +#[serde_with::serde_as] +#[derive(Debug, Deserialize, PartialEq, Default)] +#[serde(deny_unknown_fields)] +struct RestConfigBuilder { + #[serde(default)] + listen_port: Option, + #[serde(default)] + #[serde_as(deserialize_as = "serde_with::OneOrMany<_>")] + pub cors_allow_origins: Vec, + #[serde(with = "http_serde::header_map")] + #[serde(default)] + pub extra_headers: HeaderMap, +} + +impl RestConfigBuilder { + fn build_and_validate( + self, + listen_ip: IpAddr, + env_vars: &HashMap, + ) -> anyhow::Result { + let listen_port_from_config_or_default = + self.listen_port.unwrap_or(default_rest_listen_port()); + let listen_port = ConfigValue::::with_default( + listen_port_from_config_or_default, + ) + .resolve(env_vars)?; + let rest_config = RestConfig { + listen_addr: SocketAddr::new(listen_ip, listen_port), + cors_allow_origins: self.cors_allow_origins, + extra_headers: self.extra_headers, + }; + Ok(rest_config) + } +} + #[cfg(any(test, feature = "testsuite"))] pub fn node_config_for_test() -> NodeConfig { let enabled_services = QuickwitService::supported_services(); - let listen_address = Host::default(); let rest_listen_port = quickwit_common::net::find_available_tcp_port() .expect("The OS should almost always find an available port."); @@ -367,21 +415,24 @@ pub fn node_config_for_test() -> NodeConfig { .to_path_buf(); let metastore_uri = default_metastore_uri(&data_dir_uri); let default_index_root_uri = default_index_root_uri(&data_dir_uri); - + let rest_config = RestConfig { + listen_addr: rest_listen_addr, + cors_allow_origins: Vec::new(), + extra_headers: HeaderMap::new(), + }; NodeConfig { cluster_id: default_cluster_id().unwrap(), node_id: default_node_id().unwrap(), enabled_services, gossip_advertise_addr: gossip_listen_addr, grpc_advertise_addr: 
grpc_listen_addr, - rest_listen_addr, gossip_listen_addr, grpc_listen_addr, peer_seeds: Vec::new(), data_dir_path, metastore_uri, default_index_root_uri, - rest_cors_allow_origins: Vec::new(), + rest_config, storage_configs: StorageConfigs::default(), metastore_configs: MetastoreConfigs::default(), indexer_config: IndexerConfig::default(), @@ -425,9 +476,17 @@ mod tests { assert!(config.is_service_enabled(QuickwitService::Metastore)); assert_eq!( - config.rest_listen_addr, + config.rest_config.listen_addr, SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 1111) ); + assert_eq!( + config.rest_config.extra_headers.get("x-header-1").unwrap(), + "header-value-1" + ); + assert_eq!( + config.rest_config.extra_headers.get("x-header-2").unwrap(), + "header-value-2" + ); assert_eq!( config.gossip_listen_addr, SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 2222) @@ -573,7 +632,7 @@ mod tests { QuickwitService::supported_services() ); assert_eq!( - config.rest_listen_addr, + config.rest_config.listen_addr, SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 7280) ); assert_eq!( @@ -649,7 +708,7 @@ mod tests { &[&QuickwitService::Indexer, &QuickwitService::Metastore] ); assert_eq!( - config.rest_listen_addr, + config.rest_config.listen_addr, SocketAddr::new(IpAddr::V4(Ipv4Addr::new(172, 0, 0, 12)), 1234) ); assert_eq!( @@ -754,7 +813,6 @@ mod tests { async fn test_peer_socket_addrs() { { let node_config = NodeConfigBuilder { - rest_listen_port: ConfigValue::for_test(1789), ..Default::default() } .build_and_validate(&HashMap::new()) @@ -764,7 +822,6 @@ mod tests { } { let node_config = NodeConfigBuilder { - rest_listen_port: ConfigValue::for_test(1789), peer_seeds: ConfigValue::for_test(List(vec!["unresolvable-host".to_string()])), ..Default::default() } @@ -775,7 +832,10 @@ mod tests { } { let node_config = NodeConfigBuilder { - rest_listen_port: ConfigValue::for_test(1789), + rest_config_builder: RestConfigBuilder { + listen_port: Some(1789), + ..Default::default() + }, 
peer_seeds: ConfigValue::for_test(List(vec![ "unresolvable-host".to_string(), "localhost".to_string(), @@ -810,40 +870,78 @@ mod tests { .build_and_validate(&HashMap::new()) .await .unwrap(); - assert_eq!(node_config.rest_listen_addr.to_string(), "127.0.0.1:7280"); + assert_eq!( + node_config.rest_config.listen_addr.to_string(), + "127.0.0.1:7280" + ); assert_eq!(node_config.gossip_listen_addr.to_string(), "127.0.0.1:7280"); assert_eq!(node_config.grpc_listen_addr.to_string(), "127.0.0.1:7281"); } { let node_config = NodeConfigBuilder { listen_address: default_listen_address(), - rest_listen_port: ConfigValue::for_test(1789), + rest_config_builder: RestConfigBuilder { + listen_port: Some(1789), + ..Default::default() + }, ..Default::default() } .build_and_validate(&HashMap::new()) .await .unwrap(); - assert_eq!(node_config.rest_listen_addr.to_string(), "127.0.0.1:1789"); + assert_eq!( + node_config.rest_config.listen_addr.to_string(), + "127.0.0.1:1789" + ); assert_eq!(node_config.gossip_listen_addr.to_string(), "127.0.0.1:1789"); assert_eq!(node_config.grpc_listen_addr.to_string(), "127.0.0.1:1790"); } { let node_config = NodeConfigBuilder { listen_address: default_listen_address(), - rest_listen_port: ConfigValue::for_test(1789), gossip_listen_port: ConfigValue::for_test(1889), grpc_listen_port: ConfigValue::for_test(1989), + rest_config_builder: RestConfigBuilder { + listen_port: Some(1789), + ..Default::default() + }, ..Default::default() } .build_and_validate(&HashMap::new()) .await .unwrap(); - assert_eq!(node_config.rest_listen_addr.to_string(), "127.0.0.1:1789"); + assert_eq!( + node_config.rest_config.listen_addr.to_string(), + "127.0.0.1:1789" + ); assert_eq!(node_config.gossip_listen_addr.to_string(), "127.0.0.1:1889"); assert_eq!(node_config.grpc_listen_addr.to_string(), "127.0.0.1:1989"); } } + #[tokio::test] + async fn test_rest_deprecated_listen_port_config() { + // This test should be removed once deprecated `rest_listen_port` field is removed. 
+ let node_config = NodeConfigBuilder { + rest_listen_port: Some(1789), + listen_address: default_listen_address(), + rest_config_builder: RestConfigBuilder { + listen_port: None, + ..Default::default() + }, + ..Default::default() + } + .build_and_validate(&HashMap::new()) + .await + .unwrap(); + assert_eq!( + node_config.rest_config.listen_addr.to_string(), + "127.0.0.1:1789" + ); + assert_eq!(node_config.gossip_listen_addr.to_string(), "127.0.0.1:1789"); + assert_eq!(node_config.grpc_listen_addr.to_string(), "127.0.0.1:1790"); + } + #[tokio::test] async fn test_load_config_with_validation_error() { let config_filepath = get_config_filepath("quickwit.yaml"); @@ -933,6 +1031,29 @@ mod tests { } } + #[tokio::test] + async fn test_config_invalid_when_both_listen_ports_params_are_configured() { + let config_yaml = r#" + version: 0.6 + rest_listen_port: 1789 + rest: + listen_port: 1789 + "#; + let config = load_node_config_with_env( + ConfigFormat::Yaml, + config_yaml.as_bytes(), + &HashMap::default(), + ) + .await + .unwrap_err(); + assert_eq!( + &config.to_string(), + "conflicting configuration values: please use only `rest.listen_port`, \ + `rest_listen_port` is deprecated and should not be used alongside \ + `rest.listen_port`. Update your configuration to use `rest.listen_port`." 
+ ); + } + #[test] fn test_jaeger_config_rejects_null_values() { let jaeger_config_yaml = r#" @@ -949,7 +1070,8 @@ mod tests { async fn test_rest_config_accepts_wildcard() { let rest_config_yaml = r#" version: 0.6 - rest_cors_allow_origins: '*' + rest: + cors_allow_origins: '*' "#; let config = load_node_config_with_env( ConfigFormat::Yaml, @@ -958,14 +1080,16 @@ mod tests { ) .await .expect("Deserialize rest config"); - assert_eq!(config.rest_cors_allow_origins, ["*"]); + assert_eq!(config.rest_config.cors_allow_origins, ["*"]); } #[tokio::test] async fn test_rest_config_accepts_single_origin() { let rest_config_yaml = r#" version: 0.6 - rest_cors_allow_origins: https://www.my-domain.com + rest: + cors_allow_origins: + - https://www.my-domain.com "#; let config = load_node_config_with_env( ConfigFormat::Yaml, @@ -975,13 +1099,14 @@ mod tests { .await .expect("Deserialize rest config"); assert_eq!( - config.rest_cors_allow_origins, + config.rest_config.cors_allow_origins, ["https://www.my-domain.com"] ); let rest_config_yaml = r#" version: 0.6 - rest_cors_allow_origins: http://192.168.0.108:7280 + rest: + cors_allow_origins: http://192.168.0.108:7280 "#; let config = load_node_config_with_env( ConfigFormat::Yaml, @@ -991,7 +1116,7 @@ mod tests { .await .expect("Deserialize rest config"); assert_eq!( - config.rest_cors_allow_origins, + config.rest_config.cors_allow_origins, ["http://192.168.0.108:7280"] ); } @@ -1000,7 +1125,8 @@ mod tests { async fn test_rest_config_accepts_multi_origin() { let rest_config_yaml = r#" version: 0.6 - rest_cors_allow_origins: + rest: + cors_allow_origins: - https://www.my-domain.com "#; let config = load_node_config_with_env( @@ -1011,13 +1137,14 @@ mod tests { .await .expect("Deserialize rest config"); assert_eq!( - config.rest_cors_allow_origins, + config.rest_config.cors_allow_origins, ["https://www.my-domain.com"] ); let rest_config_yaml = r#" version: 0.6 - rest_cors_allow_origins: + rest: + cors_allow_origins: - 
https://www.my-domain.com - https://www.my-other-domain.com "#; @@ -1029,7 +1156,7 @@ mod tests { .await .expect("Deserialize rest config"); assert_eq!( - config.rest_cors_allow_origins, + config.rest_config.cors_allow_origins, [ "https://www.my-domain.com", "https://www.my-other-domain.com" @@ -1038,7 +1165,8 @@ mod tests { let rest_config_yaml = r#" version: 0.6 - rest_cors_allow_origins: + rest: + rest_cors_allow_origins: "#; load_node_config_with_env( ConfigFormat::Yaml, @@ -1050,7 +1178,8 @@ mod tests { let rest_config_yaml = r#" version: 0.6 - rest_cors_allow_origins: + rest: + cors_allow_origins: - "#; load_node_config_with_env( diff --git a/quickwit/quickwit-integration-tests/src/test_utils/cluster_sandbox.rs b/quickwit/quickwit-integration-tests/src/test_utils/cluster_sandbox.rs index e70b29839f8..0f1cb22ef59 100644 --- a/quickwit/quickwit-integration-tests/src/test_utils/cluster_sandbox.rs +++ b/quickwit/quickwit-integration-tests/src/test_utils/cluster_sandbox.rs @@ -188,11 +188,11 @@ impl ClusterSandbox { Ok(Self { node_configs, searcher_rest_client: QuickwitClientBuilder::new(transport_url( - searcher_config.node_config.rest_listen_addr, + searcher_config.node_config.rest_config.listen_addr, )) .build(), indexer_rest_client: QuickwitClientBuilder::new(transport_url( - indexer_config.node_config.rest_listen_addr, + indexer_config.node_config.rest_config.listen_addr, )) .build(), _temp_dir: temp_dir, diff --git a/quickwit/quickwit-integration-tests/src/tests/basic_tests.rs b/quickwit/quickwit-integration-tests/src/tests/basic_tests.rs index 758f22f6b35..15cb0abc04f 100644 --- a/quickwit/quickwit-integration-tests/src/tests/basic_tests.rs +++ b/quickwit/quickwit-integration-tests/src/tests/basic_tests.rs @@ -54,9 +54,12 @@ async fn test_ui_redirect_on_get() { .pool_idle_timeout(Duration::from_secs(30)) .http2_only(true) .build_http(); - let root_uri = format!("http://{}/", node_config.node_config.rest_listen_addr) - .parse::() - .unwrap(); + let root_uri 
= format!( + "http://{}/", + node_config.node_config.rest_config.listen_addr + ) + .parse::() + .unwrap(); let response = client.get(root_uri.clone()).await.unwrap(); assert_eq!(response.status(), StatusCode::MOVED_PERMANENTLY); let post_request = Request::builder() diff --git a/quickwit/quickwit-serve/src/elastic_search_api/mod.rs b/quickwit/quickwit-serve/src/elastic_search_api/mod.rs index 44d77fa2ac0..ab5fecde5da 100644 --- a/quickwit/quickwit-serve/src/elastic_search_api/mod.rs +++ b/quickwit/quickwit-serve/src/elastic_search_api/mod.rs @@ -113,6 +113,7 @@ mod tests { use serde_json::Value as JsonValue; use warp::Filter; + use super::elastic_api_handlers; use super::model::ElasticSearchError; use crate::elastic_search_api::model::MultiSearchResponse; use crate::elastic_search_api::rest_handler::es_compat_cluster_info_handler; @@ -160,6 +161,7 @@ mod tests { .reply(&es_search_api_handler) .await; assert_eq!(resp.status(), 200); + assert!(resp.headers().get("x-elastic-product").is_none(),); let string_body = String::from_utf8(resp.body().to_vec()).unwrap(); let es_msearch_response: MultiSearchResponse = serde_json::from_str(&string_body).unwrap(); assert_eq!(es_msearch_response.responses.len(), 2); @@ -251,7 +253,7 @@ mod tests { async fn test_msearch_api_return_400_with_malformed_request_body() { let config = Arc::new(NodeConfig::for_test()); let mock_search_service = MockSearchService::new(); - let es_search_api_handler = super::elastic_api_handlers( + let es_search_api_handler = elastic_api_handlers( config, Arc::new(mock_search_service), ingest_service_client(), diff --git a/quickwit/quickwit-serve/src/elastic_search_api/rest_handler.rs b/quickwit/quickwit-serve/src/elastic_search_api/rest_handler.rs index 41751e3570c..0cd4874b02c 100644 --- a/quickwit/quickwit-serve/src/elastic_search_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/elastic_search_api/rest_handler.rs @@ -57,7 +57,7 @@ pub fn es_compat_cluster_info_handler( build_info: &'static 
BuildInfo, ) -> impl Filter + Clone { elastic_cluster_info_filter() - .and(with_arg(node_config)) + .and(with_arg(node_config.clone())) .and(with_arg(build_info)) .then( |config: Arc, build_info: &'static BuildInfo| async move { diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs index cd5c892e27e..93aa2c6ad9f 100644 --- a/quickwit/quickwit-serve/src/lib.rs +++ b/quickwit/quickwit-serve/src/lib.rs @@ -484,7 +484,7 @@ pub async fn serve_quickwit( }; let grpc_listen_addr = node_config.grpc_listen_addr; - let rest_listen_addr = node_config.rest_listen_addr; + let rest_listen_addr = node_config.rest_config.listen_addr; let quickwit_services: Arc = Arc::new(QuickwitServices { node_config: Arc::new(node_config), cluster: cluster.clone(), diff --git a/quickwit/quickwit-serve/src/rest.rs b/quickwit/quickwit-serve/src/rest.rs index 7f0a9435e90..e3792ef3c84 100644 --- a/quickwit/quickwit-serve/src/rest.rs +++ b/quickwit/quickwit-serve/src/rest.rs @@ -85,46 +85,9 @@ pub(crate) async fn start_rest_server( // `/metrics` route. let metrics_routes = warp::path("metrics").and(warp::get()).map(metrics_handler); - let ingest_router = quickwit_services.ingest_router_service.clone(); - let ingest_service = quickwit_services.ingest_service.clone(); - // `/api/v1/*` routes. 
- let api_v1_root_url = warp::path!("api" / "v1" / ..); - let api_v1_routes = cluster_handler(quickwit_services.cluster.clone()) - .or(node_info_handler( - BuildInfo::get(), - RuntimeInfo::get(), - quickwit_services.node_config.clone(), - )) - .or(indexing_get_handler( - quickwit_services.indexing_service_opt.clone(), - )) - .or(search_get_handler(quickwit_services.search_service.clone())) - .or(search_post_handler( - quickwit_services.search_service.clone(), - )) - .or(search_stream_handler( - quickwit_services.search_service.clone(), - )) - .or(ingest_api_handlers( - ingest_router, - ingest_service.clone(), - quickwit_services.node_config.ingest_api_config.clone(), - )) - .or(index_management_handlers( - quickwit_services.index_manager.clone(), - quickwit_services.node_config.clone(), - )) - .or(delete_task_api_handlers( - quickwit_services.metastore_client.clone(), - )) - .or(elastic_api_handlers( - quickwit_services.node_config.clone(), - quickwit_services.search_service.clone(), - ingest_service.clone(), - )); + let api_v1_root_route = api_v1_routes(quickwit_services.clone()); - let api_v1_root_route = api_v1_root_url.and(api_v1_routes); let redirect_root_to_ui_route = warp::path::end() .and(warp::get()) .map(|| redirect(http::Uri::from_static("/ui/search"))); @@ -143,7 +106,7 @@ pub(crate) async fn start_rest_server( let warp_service = warp::service(rest_routes); let compression_predicate = DefaultPredicate::new().and(SizeAbove::new(MINIMUM_RESPONSE_COMPRESSION_SIZE)); - let cors = build_cors(&quickwit_services.node_config.rest_cors_allow_origins); + let cors = build_cors(&quickwit_services.node_config.rest_config.cors_allow_origins); let service = ServiceBuilder::new() .layer( @@ -177,6 +140,66 @@ pub(crate) async fn start_rest_server( Ok(()) } +fn api_v1_routes( + quickwit_services: Arc, +) -> impl Filter + Clone { + if !quickwit_services + .node_config + .rest_config + .extra_headers + .is_empty() + { + info!( + "Extra headers will be added to all 
responses: {:?}", + quickwit_services.node_config.rest_config.extra_headers + ); + } + let api_v1_root_url = warp::path!("api" / "v1" / ..); + api_v1_root_url + .and( + cluster_handler(quickwit_services.cluster.clone()) + .or(node_info_handler( + BuildInfo::get(), + RuntimeInfo::get(), + quickwit_services.node_config.clone(), + )) + .or(indexing_get_handler( + quickwit_services.indexing_service_opt.clone(), + )) + .or(search_get_handler(quickwit_services.search_service.clone())) + .or(search_post_handler( + quickwit_services.search_service.clone(), + )) + .or(search_stream_handler( + quickwit_services.search_service.clone(), + )) + .or(ingest_api_handlers( + quickwit_services.ingest_router_service.clone(), + quickwit_services.ingest_service.clone(), + quickwit_services.node_config.ingest_api_config.clone(), + )) + .or(index_management_handlers( + quickwit_services.index_manager.clone(), + quickwit_services.node_config.clone(), + )) + .or(delete_task_api_handlers( + quickwit_services.metastore_client.clone(), + )) + .or(elastic_api_handlers( + quickwit_services.node_config.clone(), + quickwit_services.search_service.clone(), + quickwit_services.ingest_service.clone(), + )), + ) + .with(warp::reply::with::headers( + quickwit_services + .node_config + .rest_config + .extra_headers + .clone(), + )) +} + /// This function returns a formatted error based on the given rejection reason. /// The ordering of rejection processing is very important, we need to start /// with the most specific rejections and end with the most generic. 
If not, Quickwit @@ -309,11 +332,27 @@ mod tests { use std::pin::Pin; use std::task::{Context, Poll}; + use http::HeaderName; use hyper::{Request, Response, StatusCode}; + use quickwit_cluster::{create_cluster_for_test, ChannelTransport}; + use quickwit_config::NodeConfig; + use quickwit_index_management::IndexService; + use quickwit_ingest::{IngestApiService, IngestServiceClient}; + use quickwit_proto::control_plane::ControlPlaneServiceClient; + use quickwit_proto::ingest::router::IngestRouterServiceClient; + use quickwit_proto::metastore::MetastoreServiceClient; + use quickwit_search::MockSearchService; + use quickwit_storage::StorageResolver; use tower::Service; use super::*; + pub(crate) fn ingest_service_client() -> IngestServiceClient { + let universe = quickwit_actors::Universe::new(); + let (ingest_service_mailbox, _) = universe.create_test_mailbox::(); + IngestServiceClient::from_mailbox(ingest_service_mailbox) + } + #[tokio::test] async fn test_cors() { // No cors enabled @@ -525,4 +564,59 @@ mod tests { Box::pin(fut) } } + + #[tokio::test] + async fn test_extra_headers() { + let mut node_config = NodeConfig::for_test(); + node_config.rest_config.extra_headers.insert( + HeaderName::from_static("x-custom-header"), + HeaderValue::from_static("custom-value"), + ); + node_config.rest_config.extra_headers.insert( + HeaderName::from_static("x-custom-header-2"), + HeaderValue::from_static("custom-value-2"), + ); + let metastore_client = MetastoreServiceClient::from(MetastoreServiceClient::mock()); + let index_service = + IndexService::new(metastore_client.clone(), StorageResolver::unconfigured()); + let control_plane_service = + ControlPlaneServiceClient::from(ControlPlaneServiceClient::mock()); + let transport = ChannelTransport::default(); + let cluster = create_cluster_for_test(Vec::new(), &[], &transport, false) + .await + .unwrap(); + let quickwit_services = QuickwitServices { + _report_splits_subscription_handle_opt: None, + 
_local_shards_update_listener_handle_opt: None, + cluster, + control_plane_service, + indexing_service_opt: None, + index_manager: index_service, + ingest_service: ingest_service_client(), + + ingester_service_opt: None, + ingest_router_service: IngestRouterServiceClient::from( + IngestRouterServiceClient::mock(), + ), + janitor_service_opt: None, + metastore_client, + metastore_server_opt: None, + node_config: Arc::new(node_config.clone()), + search_service: Arc::new(MockSearchService::new()), + }; + let handler = api_v1_routes(Arc::new(quickwit_services)); + let resp = warp::test::request() + .path("/api/v1/version") + .reply(&handler) + .await; + assert_eq!(resp.status(), 200); + assert_eq!( + resp.headers().get("x-custom-header").unwrap(), + "custom-value" + ); + assert_eq!( + resp.headers().get("x-custom-header-2").unwrap(), + "custom-value-2" + ); + } }