Skip to content

Commit

Permalink
add support delete index in es API
Browse files Browse the repository at this point in the history
closes #3841
  • Loading branch information
PSeitz committed Feb 19, 2024
1 parent 3ef4f76 commit e51e3ee
Show file tree
Hide file tree
Showing 6 changed files with 201 additions and 12 deletions.
107 changes: 103 additions & 4 deletions quickwit/quickwit-index-management/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,26 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::collections::{HashMap, HashSet};
use std::path::Path;
use std::time::Duration;

use futures_util::StreamExt;
use itertools::Itertools;
use quickwit_common::fs::{empty_dir, get_cache_directory_path};
use quickwit_common::PrettySample;
use quickwit_config::{validate_identifier, IndexConfig, SourceConfig};
use quickwit_indexing::check_source_connectivity;
use quickwit_metastore::{
AddSourceRequestExt, CreateIndexResponseExt, IndexMetadata, IndexMetadataResponseExt,
ListSplitsQuery, ListSplitsRequestExt, MetastoreServiceStreamSplitsExt, SplitInfo,
SplitMetadata, SplitState,
ListIndexesMetadataResponseExt, ListSplitsQuery, ListSplitsRequestExt,
MetastoreServiceStreamSplitsExt, SplitInfo, SplitMetadata, SplitState,
};
use quickwit_proto::metastore::{
serde_utils, AddSourceRequest, CreateIndexRequest, DeleteIndexRequest, EntityKind,
IndexMetadataRequest, ListSplitsRequest, MarkSplitsForDeletionRequest, MetastoreError,
MetastoreService, MetastoreServiceClient, ResetSourceCheckpointRequest,
IndexMetadataRequest, ListIndexesMetadataRequest, ListSplitsRequest,
MarkSplitsForDeletionRequest, MetastoreError, MetastoreService, MetastoreServiceClient,
ResetSourceCheckpointRequest,
};
use quickwit_proto::types::{IndexUid, SplitId};
use quickwit_proto::{ServiceError, ServiceErrorCode};
Expand Down Expand Up @@ -216,6 +221,100 @@ impl IndexService {
Ok(deleted_splits)
}

/// Deletes the indexes specified with `index_id_patterns`.
/// This is a wrapper of delete_index, and support index delete with index pattern
///
/// * `index_id_patterns` - The targeted index ID patterns.
/// * `dry_run` - Should this only return a list of affected files without performing deletion.
pub async fn delete_indexes(
&self,
index_id_patterns: Vec<String>,
dry_run: bool,
) -> Result<Vec<SplitInfo>, IndexServiceError> {
let list_indexes_metadatas_request = ListIndexesMetadataRequest {
index_id_patterns: index_id_patterns.to_owned(),
};
// disallow index_id patterns containing *
for index_id_pattern in &index_id_patterns {
if index_id_pattern.contains('*') {
return Err(IndexServiceError::Metastore(
MetastoreError::InvalidArgument {
message: format!("index_id pattern {} contains *", index_id_pattern),
},
));
}
}

let mut metastore = self.metastore.clone();
let indexes_metadata = metastore
.list_indexes_metadata(list_indexes_metadatas_request)
.await?
.deserialize_indexes_metadata()?;

if indexes_metadata.len() != index_id_patterns.len() {
let found_index_ids: HashSet<&str> = indexes_metadata
.iter()
.map(|index_metadata| index_metadata.index_id())
.collect();
let missing_index_ids: Vec<String> = index_id_patterns
.iter()
.filter(|index_id| !found_index_ids.contains(index_id.as_str()))
.map(|index_id| index_id.to_string())
.collect_vec();
return Err(IndexServiceError::Metastore(MetastoreError::NotFound(
EntityKind::Indexes {
index_ids: missing_index_ids.to_vec(),
},
)));
}
let index_ids = indexes_metadata
.iter()
.map(|index_metadata| index_metadata.index_id())
.collect_vec();
info!(index_ids = ?PrettySample::new(&index_ids, 5), "delete indexes");

// setup delete index tasks
let mut delete_index_tasks = Vec::new();
for index_id in index_ids {
let task = async move {
let result = self.clone().delete_index(index_id, dry_run).await;
(index_id, result)
};
delete_index_tasks.push(task);
}
let mut delete_responses: HashMap<String, Vec<SplitInfo>> = HashMap::new();
let mut delete_errors: HashMap<String, IndexServiceError> = HashMap::new();
let mut stream = futures::stream::iter(delete_index_tasks).buffer_unordered(100);
while let Some((index_id, delete_response)) = stream.next().await {
match delete_response {
Ok(split_infos) => {
delete_responses.insert(index_id.to_string(), split_infos);
}
Err(error) => {
delete_errors.insert(index_id.to_string(), error);
}
}
}

if delete_errors.is_empty() {
let mut concatenated_split_infos = Vec::new();
for (_, split_info_vec) in delete_responses.into_iter() {
concatenated_split_infos.extend(split_info_vec);
}
Ok(concatenated_split_infos)
} else {
Err(IndexServiceError::Metastore(MetastoreError::Internal {
message: format!(
"errors occurred when deleting indexes: {:?}",
index_id_patterns
),
cause: format!(
"errors: {:?}\ndeleted indexes: {:?}",
delete_errors, delete_responses
),
}))
}
}
/// Detect all dangling splits and associated files from the index and removes them.
///
/// * `index_id` - The target index Id.
Expand Down
8 changes: 8 additions & 0 deletions quickwit/quickwit-serve/src/elasticsearch_api/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,14 @@ pub(crate) fn elastic_index_count_filter(
.and(json_or_empty())
}

#[utoipa::path(delete, tag = "Indexes", path = "/{index}")]
pub(crate) fn elastic_delete_index_filter(
) -> impl Filter<Extract = (Vec<String>,), Error = Rejection> + Clone {
warp::path!("_elastic" / String)
.and_then(extract_index_id_patterns)
.and(warp::get())
}

// No support for any query parameters for now.
#[utoipa::path(get, tag = "Search", path = "/{index}/_stats")]
pub(crate) fn elastic_index_stats_filter(
Expand Down
34 changes: 31 additions & 3 deletions quickwit/quickwit-serve/src/elasticsearch_api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use bulk::{es_compat_bulk_handler, es_compat_index_bulk_handler};
pub use filter::ElasticCompatibleApi;
use hyper::StatusCode;
use quickwit_config::NodeConfig;
use quickwit_index_management::IndexService;
use quickwit_ingest::IngestServiceClient;
use quickwit_proto::ingest::router::IngestRouterServiceClient;
use quickwit_proto::metastore::MetastoreServiceClient;
Expand All @@ -41,9 +42,10 @@ use serde::{Deserialize, Serialize};
use warp::{Filter, Rejection};

use self::rest_handler::{
es_compat_cat_indices_handler, es_compat_index_cat_indices_handler,
es_compat_index_count_handler, es_compat_index_field_capabilities_handler,
es_compat_index_stats_handler, es_compat_stats_handler,
es_compat_cat_indices_handler, es_compat_delete_index_handler,
es_compat_index_cat_indices_handler, es_compat_index_count_handler,
es_compat_index_field_capabilities_handler, es_compat_index_stats_handler,
es_compat_stats_handler,
};
use crate::elasticsearch_api::model::ElasticsearchError;
use crate::rest_api_response::RestApiResponse;
Expand All @@ -59,6 +61,7 @@ pub fn elastic_api_handlers(
ingest_service: IngestServiceClient,
ingest_router: IngestRouterServiceClient,
metastore: MetastoreServiceClient,
index_service: IndexService,
) -> impl Filter<Extract = (impl warp::Reply,), Error = Rejection> + Clone {
es_compat_cluster_info_handler(node_config, BuildInfo::get())
.or(es_compat_search_handler(search_service.clone()))
Expand All @@ -75,6 +78,7 @@ pub fn elastic_api_handlers(
))
.or(es_compat_index_bulk_handler(ingest_service, ingest_router))
.or(es_compat_index_stats_handler(metastore.clone()))
.or(es_compat_delete_index_handler(index_service))
.or(es_compat_stats_handler(metastore.clone()))
.or(es_compat_index_cat_indices_handler(metastore.clone()))
.or(es_compat_cat_indices_handler(metastore.clone()))
Expand Down Expand Up @@ -128,10 +132,13 @@ mod tests {
use assert_json_diff::assert_json_include;
use mockall::predicate;
use quickwit_config::NodeConfig;
use quickwit_index_management::IndexService;
use quickwit_ingest::{IngestApiService, IngestServiceClient};
use quickwit_metastore::metastore_for_test;
use quickwit_proto::ingest::router::IngestRouterServiceClient;
use quickwit_proto::metastore::MetastoreServiceClient;
use quickwit_search::MockSearchService;
use quickwit_storage::StorageResolver;
use serde_json::Value as JsonValue;
use warp::Filter;

Expand Down Expand Up @@ -166,12 +173,15 @@ mod tests {
))
.returning(|_| Ok(Default::default()));
let ingest_router = IngestRouterServiceClient::from(IngestRouterServiceClient::mock());
let index_service =
IndexService::new(metastore_for_test(), StorageResolver::unconfigured());
let es_search_api_handler = super::elastic_api_handlers(
config,
Arc::new(mock_search_service),
ingest_service_client(),
ingest_router,
MetastoreServiceClient::mock().into(),
index_service,
);
let msearch_payload = r#"
{"index":"index-1"}
Expand Down Expand Up @@ -217,12 +227,15 @@ mod tests {
});

let ingest_router = IngestRouterServiceClient::from(IngestRouterServiceClient::mock());
let index_service =
IndexService::new(metastore_for_test(), StorageResolver::unconfigured());
let es_search_api_handler = super::elastic_api_handlers(
config,
Arc::new(mock_search_service),
ingest_service_client(),
ingest_router,
MetastoreServiceClient::mock().into(),
index_service,
);
let msearch_payload = r#"
{"index":"index-1"}
Expand Down Expand Up @@ -256,12 +269,15 @@ mod tests {
let mock_search_service = MockSearchService::new();

let ingest_router = IngestRouterServiceClient::from(IngestRouterServiceClient::mock());
let index_service =
IndexService::new(metastore_for_test(), StorageResolver::unconfigured());
let es_search_api_handler = super::elastic_api_handlers(
config,
Arc::new(mock_search_service),
ingest_service_client(),
ingest_router,
MetastoreServiceClient::mock().into(),
index_service,
);
let msearch_payload = r#"
{"index":"index-1"
Expand All @@ -288,12 +304,15 @@ mod tests {
let mock_search_service = MockSearchService::new();

let ingest_router = IngestRouterServiceClient::from(IngestRouterServiceClient::mock());
let index_service =
IndexService::new(metastore_for_test(), StorageResolver::unconfigured());
let es_search_api_handler = elastic_api_handlers(
config,
Arc::new(mock_search_service),
ingest_service_client(),
ingest_router,
MetastoreServiceClient::mock().into(),
index_service,
);
let msearch_payload = r#"
{"index":"index-1"}
Expand All @@ -320,12 +339,15 @@ mod tests {
let mock_search_service = MockSearchService::new();

let ingest_router = IngestRouterServiceClient::from(IngestRouterServiceClient::mock());
let index_service =
IndexService::new(metastore_for_test(), StorageResolver::unconfigured());
let es_search_api_handler = super::elastic_api_handlers(
config,
Arc::new(mock_search_service),
ingest_service_client(),
ingest_router,
MetastoreServiceClient::mock().into(),
index_service,
);
let msearch_payload = r#"
{"index":"index-1"}
Expand All @@ -351,12 +373,15 @@ mod tests {
let mock_search_service = MockSearchService::new();

let ingest_router = IngestRouterServiceClient::from(IngestRouterServiceClient::mock());
let index_service =
IndexService::new(metastore_for_test(), StorageResolver::unconfigured());
let es_search_api_handler = super::elastic_api_handlers(
config,
Arc::new(mock_search_service),
ingest_service_client(),
ingest_router,
MetastoreServiceClient::mock().into(),
index_service,
);
let msearch_payload = r#"
{}
Expand Down Expand Up @@ -394,12 +419,15 @@ mod tests {
}
});
let ingest_router = IngestRouterServiceClient::from(IngestRouterServiceClient::mock());
let index_service =
IndexService::new(metastore_for_test(), StorageResolver::unconfigured());
let es_search_api_handler = super::elastic_api_handlers(
config,
Arc::new(mock_search_service),
ingest_service_client(),
ingest_router,
MetastoreServiceClient::mock().into(),
index_service,
);
let msearch_payload = r#"
{"index": ["index-1", "index-2"]}
Expand Down
21 changes: 21 additions & 0 deletions quickwit/quickwit-serve/src/elasticsearch_api/model/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

use elasticsearch_dsl::search::ErrorCause;
use hyper::StatusCode;
use quickwit_index_management::IndexServiceError;
use quickwit_ingest::IngestServiceError;
use quickwit_proto::ingest::IngestV2Error;
use quickwit_proto::ServiceError;
Expand Down Expand Up @@ -108,3 +109,23 @@ impl From<IngestV2Error> for ElasticsearchError {
}
}
}

impl From<IndexServiceError> for ElasticsearchError {
fn from(ingest_error: IndexServiceError) -> Self {
let status = ingest_error.error_code().to_http_status_code();

let reason = ErrorCause {
reason: Some(ingest_error.to_string()),
caused_by: None,
root_cause: Vec::new(),
stack_trace: None,
suppressed: Vec::new(),
ty: None,
additional_details: Default::default(),
};
ElasticsearchError {
status,
error: reason,
}
}
}
Loading

0 comments on commit e51e3ee

Please sign in to comment.