diff --git a/quickwit/quickwit-index-management/src/index.rs b/quickwit/quickwit-index-management/src/index.rs index 901c520b8de..fdd372bb10c 100644 --- a/quickwit/quickwit-index-management/src/index.rs +++ b/quickwit/quickwit-index-management/src/index.rs @@ -17,21 +17,26 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . +use std::collections::{HashMap, HashSet}; use std::path::Path; use std::time::Duration; +use futures_util::StreamExt; +use itertools::Itertools; use quickwit_common::fs::{empty_dir, get_cache_directory_path}; +use quickwit_common::PrettySample; use quickwit_config::{validate_identifier, IndexConfig, SourceConfig}; use quickwit_indexing::check_source_connectivity; use quickwit_metastore::{ AddSourceRequestExt, CreateIndexResponseExt, IndexMetadata, IndexMetadataResponseExt, - ListSplitsQuery, ListSplitsRequestExt, MetastoreServiceStreamSplitsExt, SplitInfo, - SplitMetadata, SplitState, + ListIndexesMetadataResponseExt, ListSplitsQuery, ListSplitsRequestExt, + MetastoreServiceStreamSplitsExt, SplitInfo, SplitMetadata, SplitState, }; use quickwit_proto::metastore::{ serde_utils, AddSourceRequest, CreateIndexRequest, DeleteIndexRequest, EntityKind, - IndexMetadataRequest, ListSplitsRequest, MarkSplitsForDeletionRequest, MetastoreError, - MetastoreService, MetastoreServiceClient, ResetSourceCheckpointRequest, + IndexMetadataRequest, ListIndexesMetadataRequest, ListSplitsRequest, + MarkSplitsForDeletionRequest, MetastoreError, MetastoreService, MetastoreServiceClient, + ResetSourceCheckpointRequest, }; use quickwit_proto::types::{IndexUid, SplitId}; use quickwit_proto::{ServiceError, ServiceErrorCode}; @@ -216,6 +221,100 @@ impl IndexService { Ok(deleted_splits) } + /// Deletes the indexes specified with `index_id_patterns`. + /// This is a wrapper of delete_index, and support index delete with index pattern + /// + /// * `index_id_patterns` - The targeted index ID patterns. + /// * `dry_run` - Should this only return a list of affected files without performing deletion. + pub async fn delete_indexes( + &self, + index_id_patterns: Vec, + dry_run: bool, + ) -> Result, IndexServiceError> { + let list_indexes_metadatas_request = ListIndexesMetadataRequest { + index_id_patterns: index_id_patterns.to_owned(), + }; + // disallow index_id patterns containing * + for index_id_pattern in &index_id_patterns { + if index_id_pattern.contains('*') { + return Err(IndexServiceError::Metastore( + MetastoreError::InvalidArgument { + message: format!("index_id pattern {} contains *", index_id_pattern), + }, + )); + } + } + + let mut metastore = self.metastore.clone(); + let indexes_metadata = metastore + .list_indexes_metadata(list_indexes_metadatas_request) + .await? + .deserialize_indexes_metadata()?; + + if indexes_metadata.len() != index_id_patterns.len() { + let found_index_ids: HashSet<&str> = indexes_metadata + .iter() + .map(|index_metadata| index_metadata.index_id()) + .collect(); + let missing_index_ids: Vec = index_id_patterns + .iter() + .filter(|index_id| !found_index_ids.contains(index_id.as_str())) + .map(|index_id| index_id.to_string()) + .collect_vec(); + return Err(IndexServiceError::Metastore(MetastoreError::NotFound( + EntityKind::Indexes { + index_ids: missing_index_ids.to_vec(), + }, + ))); + } + let index_ids = indexes_metadata + .iter() + .map(|index_metadata| index_metadata.index_id()) + .collect_vec(); + info!(index_ids = ?PrettySample::new(&index_ids, 5), "delete indexes"); + + // setup delete index tasks + let mut delete_index_tasks = Vec::new(); + for index_id in index_ids { + let task = async move { + let result = self.clone().delete_index(index_id, dry_run).await; + (index_id, result) + }; + delete_index_tasks.push(task); + } + let mut delete_responses: HashMap> = HashMap::new(); + let mut delete_errors: HashMap = HashMap::new(); + let mut stream = futures::stream::iter(delete_index_tasks).buffer_unordered(100); + while let Some((index_id, delete_response)) = stream.next().await { + match delete_response { + Ok(split_infos) => { + delete_responses.insert(index_id.to_string(), split_infos); + } + Err(error) => { + delete_errors.insert(index_id.to_string(), error); + } + } + } + + if delete_errors.is_empty() { + let mut concatenated_split_infos = Vec::new(); + for (_, split_info_vec) in delete_responses.into_iter() { + concatenated_split_infos.extend(split_info_vec); + } + Ok(concatenated_split_infos) + } else { + Err(IndexServiceError::Metastore(MetastoreError::Internal { + message: format!( + "errors occurred when deleting indexes: {:?}", + index_id_patterns + ), + cause: format!( + "errors: {:?}\ndeleted indexes: {:?}", + delete_errors, delete_responses + ), + })) + } + } /// Detect all dangling splits and associated files from the index and removes them. /// /// * `index_id` - The target index Id. diff --git a/quickwit/quickwit-serve/src/elasticsearch_api/filter.rs b/quickwit/quickwit-serve/src/elasticsearch_api/filter.rs index 96ef7c3ae00..ca3f524d2fe 100644 --- a/quickwit/quickwit-serve/src/elasticsearch_api/filter.rs +++ b/quickwit/quickwit-serve/src/elasticsearch_api/filter.rs @@ -172,6 +172,14 @@ pub(crate) fn elastic_index_count_filter( .and(json_or_empty()) } +#[utoipa::path(delete, tag = "Indexes", path = "/{index}")] +pub(crate) fn elastic_delete_index_filter( +) -> impl Filter,), Error = Rejection> + Clone { + warp::path!("_elastic" / String) + .and_then(extract_index_id_patterns) + .and(warp::get()) +} + // No support for any query parameters for now. #[utoipa::path(get, tag = "Search", path = "/{index}/_stats")] pub(crate) fn elastic_index_stats_filter( diff --git a/quickwit/quickwit-serve/src/elasticsearch_api/mod.rs b/quickwit/quickwit-serve/src/elasticsearch_api/mod.rs index 02dbf03e1db..94c8bedd8fb 100644 --- a/quickwit/quickwit-serve/src/elasticsearch_api/mod.rs +++ b/quickwit/quickwit-serve/src/elasticsearch_api/mod.rs @@ -29,6 +29,7 @@ use bulk::{es_compat_bulk_handler, es_compat_index_bulk_handler}; pub use filter::ElasticCompatibleApi; use hyper::StatusCode; use quickwit_config::NodeConfig; +use quickwit_index_management::IndexService; use quickwit_ingest::IngestServiceClient; use quickwit_proto::ingest::router::IngestRouterServiceClient; use quickwit_proto::metastore::MetastoreServiceClient; @@ -41,9 +42,10 @@ use serde::{Deserialize, Serialize}; use warp::{Filter, Rejection}; use self::rest_handler::{ - es_compat_cat_indices_handler, es_compat_index_cat_indices_handler, - es_compat_index_count_handler, es_compat_index_field_capabilities_handler, - es_compat_index_stats_handler, es_compat_stats_handler, + es_compat_cat_indices_handler, es_compat_delete_index_handler, + es_compat_index_cat_indices_handler, es_compat_index_count_handler, + es_compat_index_field_capabilities_handler, es_compat_index_stats_handler, + es_compat_stats_handler, }; use crate::elasticsearch_api::model::ElasticsearchError; use crate::rest_api_response::RestApiResponse; @@ -59,6 +61,7 @@ pub fn elastic_api_handlers( ingest_service: IngestServiceClient, ingest_router: IngestRouterServiceClient, metastore: MetastoreServiceClient, + index_service: IndexService, ) -> impl Filter + Clone { es_compat_cluster_info_handler(node_config, BuildInfo::get()) .or(es_compat_search_handler(search_service.clone())) @@ -75,6 +78,7 @@ pub fn elastic_api_handlers( )) .or(es_compat_index_bulk_handler(ingest_service, ingest_router)) .or(es_compat_index_stats_handler(metastore.clone())) + .or(es_compat_delete_index_handler(index_service)) .or(es_compat_stats_handler(metastore.clone())) .or(es_compat_index_cat_indices_handler(metastore.clone())) .or(es_compat_cat_indices_handler(metastore.clone())) @@ -128,10 +132,13 @@ mod tests { use assert_json_diff::assert_json_include; use mockall::predicate; use quickwit_config::NodeConfig; + use quickwit_index_management::IndexService; use quickwit_ingest::{IngestApiService, IngestServiceClient}; + use quickwit_metastore::metastore_for_test; use quickwit_proto::ingest::router::IngestRouterServiceClient; use quickwit_proto::metastore::MetastoreServiceClient; use quickwit_search::MockSearchService; + use quickwit_storage::StorageResolver; use serde_json::Value as JsonValue; use warp::Filter; @@ -166,12 +173,15 @@ mod tests { )) .returning(|_| Ok(Default::default())); let ingest_router = IngestRouterServiceClient::from(IngestRouterServiceClient::mock()); + let index_service = + IndexService::new(metastore_for_test(), StorageResolver::unconfigured()); let es_search_api_handler = super::elastic_api_handlers( config, Arc::new(mock_search_service), ingest_service_client(), ingest_router, MetastoreServiceClient::mock().into(), + index_service, ); let msearch_payload = r#" {"index":"index-1"} @@ -217,12 +227,15 @@ mod tests { }); let ingest_router = IngestRouterServiceClient::from(IngestRouterServiceClient::mock()); + let index_service = + IndexService::new(metastore_for_test(), StorageResolver::unconfigured()); let es_search_api_handler = super::elastic_api_handlers( config, Arc::new(mock_search_service), ingest_service_client(), ingest_router, MetastoreServiceClient::mock().into(), + index_service, ); let msearch_payload = r#" {"index":"index-1"} @@ -256,12 +269,15 @@ mod tests { let mock_search_service = MockSearchService::new(); let ingest_router = IngestRouterServiceClient::from(IngestRouterServiceClient::mock()); + let index_service = + IndexService::new(metastore_for_test(), StorageResolver::unconfigured()); let es_search_api_handler = super::elastic_api_handlers( config, Arc::new(mock_search_service), ingest_service_client(), ingest_router, MetastoreServiceClient::mock().into(), + index_service, ); let msearch_payload = r#" {"index":"index-1" @@ -288,12 +304,15 @@ mod tests { let mock_search_service = MockSearchService::new(); let ingest_router = IngestRouterServiceClient::from(IngestRouterServiceClient::mock()); + let index_service = + IndexService::new(metastore_for_test(), StorageResolver::unconfigured()); let es_search_api_handler = elastic_api_handlers( config, Arc::new(mock_search_service), ingest_service_client(), ingest_router, MetastoreServiceClient::mock().into(), + index_service, ); let msearch_payload = r#" {"index":"index-1"} @@ -320,12 +339,15 @@ mod tests { let mock_search_service = MockSearchService::new(); let ingest_router = IngestRouterServiceClient::from(IngestRouterServiceClient::mock()); + let index_service = + IndexService::new(metastore_for_test(), StorageResolver::unconfigured()); let es_search_api_handler = super::elastic_api_handlers( config, Arc::new(mock_search_service), ingest_service_client(), ingest_router, MetastoreServiceClient::mock().into(), + index_service, ); let msearch_payload = r#" {"index":"index-1"} @@ -351,12 +373,15 @@ mod tests { let mock_search_service = MockSearchService::new(); let ingest_router = IngestRouterServiceClient::from(IngestRouterServiceClient::mock()); + let index_service = + IndexService::new(metastore_for_test(), StorageResolver::unconfigured()); let es_search_api_handler = super::elastic_api_handlers( config, Arc::new(mock_search_service), ingest_service_client(), ingest_router, MetastoreServiceClient::mock().into(), + index_service, ); let msearch_payload = r#" {} @@ -394,12 +419,15 @@ mod tests { } }); let ingest_router = IngestRouterServiceClient::from(IngestRouterServiceClient::mock()); + let index_service = + IndexService::new(metastore_for_test(), StorageResolver::unconfigured()); let es_search_api_handler = super::elastic_api_handlers( config, Arc::new(mock_search_service), ingest_service_client(), ingest_router, MetastoreServiceClient::mock().into(), + index_service, ); let msearch_payload = r#" {"index": ["index-1", "index-2"]} diff --git a/quickwit/quickwit-serve/src/elasticsearch_api/model/error.rs b/quickwit/quickwit-serve/src/elasticsearch_api/model/error.rs index 674800a8509..f6571e9ec70 100644 --- a/quickwit/quickwit-serve/src/elasticsearch_api/model/error.rs +++ b/quickwit/quickwit-serve/src/elasticsearch_api/model/error.rs @@ -19,6 +19,7 @@ use elasticsearch_dsl::search::ErrorCause; use hyper::StatusCode; +use quickwit_index_management::IndexServiceError; use quickwit_ingest::IngestServiceError; use quickwit_proto::ingest::IngestV2Error; use quickwit_proto::ServiceError; @@ -108,3 +109,23 @@ impl From for ElasticsearchError { } } } + +impl From for ElasticsearchError { + fn from(ingest_error: IndexServiceError) -> Self { + let status = ingest_error.error_code().to_http_status_code(); + + let reason = ErrorCause { + reason: Some(ingest_error.to_string()), + caused_by: None, + root_cause: Vec::new(), + stack_trace: None, + suppressed: Vec::new(), + ty: None, + additional_details: Default::default(), + }; + ElasticsearchError { + status, + error: reason, + } + } +} diff --git a/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs b/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs index ed9baf5050b..85d39123253 100644 --- a/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs @@ -30,6 +30,7 @@ use hyper::StatusCode; use itertools::Itertools; use quickwit_common::truncate_str; use quickwit_config::{validate_index_id_pattern, NodeConfig}; +use quickwit_index_management::IndexService; use quickwit_metastore::*; use quickwit_proto::metastore::MetastoreServiceClient; use quickwit_proto::search::{ @@ -46,11 +47,11 @@ use serde_json::json; use warp::{Filter, Rejection}; use super::filter::{ - elastic_cat_indices_filter, elastic_cluster_info_filter, elastic_field_capabilities_filter, - elastic_index_cat_indices_filter, elastic_index_count_filter, - elastic_index_field_capabilities_filter, elastic_index_search_filter, - elastic_index_stats_filter, elastic_multi_search_filter, elastic_scroll_filter, - elastic_stats_filter, elasticsearch_filter, + elastic_cat_indices_filter, elastic_cluster_info_filter, elastic_delete_index_filter, + elastic_field_capabilities_filter, elastic_index_cat_indices_filter, + elastic_index_count_filter, elastic_index_field_capabilities_filter, + elastic_index_search_filter, elastic_index_stats_filter, elastic_multi_search_filter, + elastic_scroll_filter, elastic_stats_filter, elasticsearch_filter, }; use super::model::{ build_list_field_request_for_es_api, convert_to_es_field_capabilities_response, @@ -117,6 +118,16 @@ pub fn es_compat_index_field_capabilities_handler( .map(|result| make_elastic_api_response(result, BodyFormat::default())) } +/// DELETE _elastic/{index} +pub fn es_compat_delete_index_handler( + index_service: IndexService, +) -> impl Filter + Clone { + elastic_delete_index_filter() + .and(with_arg(index_service)) + .then(es_compat_delete_index) + .map(|result| make_elastic_api_response(result, BodyFormat::default())) +} + /// GET _elastic/_stats pub fn es_compat_stats_handler( search_service: MetastoreServiceClient, @@ -126,6 +137,7 @@ pub fn es_compat_stats_handler( .then(es_compat_stats) .map(|result| make_elastic_api_response(result, BodyFormat::default())) } + /// GET _elastic/{index}/_stats pub fn es_compat_index_stats_handler( search_service: MetastoreServiceClient, @@ -410,6 +422,26 @@ async fn es_compat_index_search( Ok(search_response_rest) } +/// Returns JSON in the format: +/// +/// { +/// "acknowledged": true +/// } +#[derive(Clone, Serialize, Deserialize, Debug)] +pub struct ElasticsearchDeleteResponse { + pub acknowledged: bool, +} + +async fn es_compat_delete_index( + index_id_patterns: Vec, + index_service: IndexService, +) -> Result { + index_service + .delete_indexes(index_id_patterns, false) + .await?; + Ok(ElasticsearchDeleteResponse { acknowledged: true }) +} + async fn es_compat_stats( metastore: MetastoreServiceClient, ) -> Result { diff --git a/quickwit/quickwit-serve/src/rest.rs b/quickwit/quickwit-serve/src/rest.rs index d80c87377eb..71036164953 100644 --- a/quickwit/quickwit-serve/src/rest.rs +++ b/quickwit/quickwit-serve/src/rest.rs @@ -207,6 +207,7 @@ fn api_v1_routes( quickwit_services.ingest_service.clone(), quickwit_services.ingest_router_service.clone(), quickwit_services.metastore_client.clone(), + quickwit_services.index_manager.clone(), )) .or(index_template_api_handlers( quickwit_services.metastore_client.clone(),