Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add support delete index in es API #4606

Merged
merged 6 commits into from
Feb 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 111 additions & 4 deletions quickwit/quickwit-index-management/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,26 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::collections::{HashMap, HashSet};
use std::path::Path;
use std::time::Duration;

use futures_util::StreamExt;
use itertools::Itertools;
use quickwit_common::fs::{empty_dir, get_cache_directory_path};
use quickwit_common::pretty::PrettySample;
use quickwit_config::{validate_identifier, IndexConfig, SourceConfig};
use quickwit_indexing::check_source_connectivity;
use quickwit_metastore::{
AddSourceRequestExt, CreateIndexResponseExt, IndexMetadata, IndexMetadataResponseExt,
ListSplitsQuery, ListSplitsRequestExt, MetastoreServiceStreamSplitsExt, SplitInfo,
SplitMetadata, SplitState,
ListIndexesMetadataResponseExt, ListSplitsQuery, ListSplitsRequestExt,
MetastoreServiceStreamSplitsExt, SplitInfo, SplitMetadata, SplitState,
};
use quickwit_proto::metastore::{
serde_utils, AddSourceRequest, CreateIndexRequest, DeleteIndexRequest, EntityKind,
IndexMetadataRequest, ListSplitsRequest, MarkSplitsForDeletionRequest, MetastoreError,
MetastoreService, MetastoreServiceClient, ResetSourceCheckpointRequest,
IndexMetadataRequest, ListIndexesMetadataRequest, ListSplitsRequest,
MarkSplitsForDeletionRequest, MetastoreError, MetastoreService, MetastoreServiceClient,
ResetSourceCheckpointRequest,
};
use quickwit_proto::types::{IndexUid, SplitId};
use quickwit_proto::{ServiceError, ServiceErrorCode};
Expand Down Expand Up @@ -216,6 +221,108 @@ impl IndexService {
Ok(deleted_splits)
}

/// Deletes the indexes specified with `index_id_patterns`.
/// This is a wrapper of delete_index, and support index delete with index pattern
///
/// * `index_id_patterns` - The targeted index ID patterns.
/// * `dry_run` - Should this only return a list of affected files without performing deletion.
pub async fn delete_indexes(
&self,
index_id_patterns: Vec<String>,
ignore_missing: bool,
dry_run: bool,
) -> Result<Vec<SplitInfo>, IndexServiceError> {
let list_indexes_metadatas_request = ListIndexesMetadataRequest {
index_id_patterns: index_id_patterns.to_owned(),
};
// disallow index_id patterns
for index_id_pattern in &index_id_patterns {
if index_id_pattern.contains('*') {
return Err(IndexServiceError::Metastore(
MetastoreError::InvalidArgument {
message: format!("index_id pattern {} contains *", index_id_pattern),
},
));
}
if index_id_pattern == "_all" {
return Err(IndexServiceError::Metastore(
MetastoreError::InvalidArgument {
message: "index_id pattern _all not supported".to_string(),
},
));
}
}

let mut metastore = self.metastore.clone();
let indexes_metadata = metastore
.list_indexes_metadata(list_indexes_metadatas_request)
.await?
.deserialize_indexes_metadata()?;

if !ignore_missing && indexes_metadata.len() != index_id_patterns.len() {
let found_index_ids: HashSet<&str> = indexes_metadata
.iter()
.map(|index_metadata| index_metadata.index_id())
.collect();
let missing_index_ids: Vec<String> = index_id_patterns
.iter()
.filter(|index_id| !found_index_ids.contains(index_id.as_str()))
.map(|index_id| index_id.to_string())
.collect_vec();
return Err(IndexServiceError::Metastore(MetastoreError::NotFound(
EntityKind::Indexes {
index_ids: missing_index_ids.to_vec(),
},
)));
}
let index_ids = indexes_metadata
.iter()
.map(|index_metadata| index_metadata.index_id())
.collect_vec();
info!(index_ids = ?PrettySample::new(&index_ids, 5), "delete indexes");

// setup delete index tasks
let mut delete_index_tasks = Vec::new();
for index_id in index_ids {
let task = async move {
let result = self.clone().delete_index(index_id, dry_run).await;
(index_id, result)
};
delete_index_tasks.push(task);
}
let mut delete_responses: HashMap<String, Vec<SplitInfo>> = HashMap::new();
let mut delete_errors: HashMap<String, IndexServiceError> = HashMap::new();
let mut stream = futures::stream::iter(delete_index_tasks).buffer_unordered(100);
while let Some((index_id, delete_response)) = stream.next().await {
match delete_response {
Ok(split_infos) => {
delete_responses.insert(index_id.to_string(), split_infos);
}
Err(error) => {
delete_errors.insert(index_id.to_string(), error);
}
}
}

if delete_errors.is_empty() {
let mut concatenated_split_infos = Vec::new();
for (_, split_info_vec) in delete_responses.into_iter() {
concatenated_split_infos.extend(split_info_vec);
}
Ok(concatenated_split_infos)
} else {
Err(IndexServiceError::Metastore(MetastoreError::Internal {
message: format!(
"errors occurred when deleting indexes: {:?}",
index_id_patterns
),
cause: format!(
"errors: {:?}\ndeleted indexes: {:?}",
delete_errors, delete_responses
),
}))
}
}
/// Detect all dangling splits and associated files from the index and removes them.
///
/// * `index_id` - The target index Id.
Expand Down
24 changes: 24 additions & 0 deletions quickwit/quickwit-serve/src/elasticsearch_api/bulk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,13 @@ mod tests {

use hyper::StatusCode;
use quickwit_config::{IngestApiConfig, NodeConfig};
use quickwit_index_management::IndexService;
use quickwit_ingest::{FetchRequest, IngestServiceClient, SuggestTruncateRequest};
use quickwit_metastore::metastore_for_test;
use quickwit_proto::ingest::router::IngestRouterServiceClient;
use quickwit_proto::metastore::MetastoreServiceClient;
use quickwit_search::MockSearchService;
use quickwit_storage::StorageResolver;

use crate::elasticsearch_api::bulk_v2::ElasticBulkResponse;
use crate::elasticsearch_api::elastic_api_handlers;
Expand All @@ -166,12 +169,15 @@ mod tests {
let (universe, _temp_dir, ingest_service, _) =
setup_ingest_service(&["my-index"], &IngestApiConfig::default()).await;
let ingest_router = IngestRouterServiceClient::from(IngestRouterServiceClient::mock());
let index_service =
IndexService::new(metastore_for_test(), StorageResolver::unconfigured());
let elastic_api_handlers = elastic_api_handlers(
config,
search_service,
ingest_service,
ingest_router,
metastore_service.into(),
index_service,
);
let payload = r#"
{ "create" : { "_index" : "my-index", "_id" : "1"} }
Expand All @@ -196,12 +202,15 @@ mod tests {
let (universe, _temp_dir, ingest_service, _) =
setup_ingest_service(&["my-index-1", "my-index-2"], &IngestApiConfig::default()).await;
let ingest_router = IngestRouterServiceClient::from(IngestRouterServiceClient::mock());
let index_service =
IndexService::new(metastore_for_test(), StorageResolver::unconfigured());
let elastic_api_handlers = elastic_api_handlers(
config,
search_service,
ingest_service,
ingest_router,
metastore_service.into(),
index_service,
);
let payload = r#"
{ "create" : { "_index" : "my-index-1", "_id" : "1"} }
Expand Down Expand Up @@ -230,12 +239,15 @@ mod tests {
let (universe, _temp_dir, ingest_service, _) =
setup_ingest_service(&["my-index-1"], &IngestApiConfig::default()).await;
let ingest_router = IngestRouterServiceClient::from(IngestRouterServiceClient::mock());
let index_service =
IndexService::new(metastore_for_test(), StorageResolver::unconfigured());
let elastic_api_handlers = elastic_api_handlers(
config,
search_service,
ingest_service,
ingest_router,
metastore_service.into(),
index_service,
);
let payload = "
{\"create\": {\"_index\": \"my-index-1\", \"_id\": \"1674834324802805760\"}}
Expand All @@ -261,12 +273,15 @@ mod tests {
let (universe, _temp_dir, ingest_service, _) =
setup_ingest_service(&["my-index-1", "my-index-2"], &IngestApiConfig::default()).await;
let ingest_router = IngestRouterServiceClient::from(IngestRouterServiceClient::mock());
let index_service =
IndexService::new(metastore_for_test(), StorageResolver::unconfigured());
let elastic_api_handlers = elastic_api_handlers(
config,
search_service,
ingest_service,
ingest_router,
metastore_service.into(),
index_service,
);
let payload = r#"
{ "create" : { "_index" : "my-index-1", "_id" : "1"} }
Expand Down Expand Up @@ -295,12 +310,15 @@ mod tests {
let (universe, _temp_dir, ingest_service, ingest_service_mailbox) =
setup_ingest_service(&["my-index-1", "my-index-2"], &IngestApiConfig::default()).await;
let ingest_router = IngestRouterServiceClient::from(IngestRouterServiceClient::mock());
let index_service =
IndexService::new(metastore_for_test(), StorageResolver::unconfigured());
let elastic_api_handlers = elastic_api_handlers(
config,
search_service,
ingest_service,
ingest_router,
metastore_service.into(),
index_service,
);
let payload = r#"
{ "create" : { "_index" : "my-index-1", "_id" : "1"} }
Expand Down Expand Up @@ -380,12 +398,15 @@ mod tests {
let (universe, _temp_dir, ingest_service, ingest_service_mailbox) =
setup_ingest_service(&["my-index-1", "my-index-2"], &IngestApiConfig::default()).await;
let ingest_router = IngestRouterServiceClient::from(IngestRouterServiceClient::mock());
let index_service =
IndexService::new(metastore_for_test(), StorageResolver::unconfigured());
let elastic_api_handlers = elastic_api_handlers(
config,
search_service,
ingest_service,
ingest_router,
metastore_service.into(),
index_service,
);
let payload = r#"
{ "create" : { "_index" : "my-index-1", "_id" : "1"} }
Expand Down Expand Up @@ -463,12 +484,15 @@ mod tests {
let metastore_service = MetastoreServiceClient::mock();
let ingest_service = IngestServiceClient::from(IngestServiceClient::mock());
let ingest_router = IngestRouterServiceClient::from(IngestRouterServiceClient::mock());
let index_service =
IndexService::new(metastore_for_test(), StorageResolver::unconfigured());
let elastic_api_handlers = elastic_api_handlers(
config,
search_service,
ingest_service,
ingest_router,
metastore_service.into(),
index_service,
);
let payload = r#"
{"create": {"_index": "my-index", "_id": "1"},}
Expand Down
11 changes: 10 additions & 1 deletion quickwit/quickwit-serve/src/elasticsearch_api/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use warp::reject::LengthRequired;
use warp::{Filter, Rejection};

use super::model::{
CatIndexQueryParams, FieldCapabilityQueryParams, FieldCapabilityRequestBody,
CatIndexQueryParams, DeleteQueryParams, FieldCapabilityQueryParams, FieldCapabilityRequestBody,
MultiSearchQueryParams, SearchQueryParamsCount,
};
use crate::decompression::get_body_bytes;
Expand Down Expand Up @@ -172,6 +172,15 @@ pub(crate) fn elastic_index_count_filter(
.and(json_or_empty())
}

#[utoipa::path(delete, tag = "Indexes", path = "/{index}")]
pub(crate) fn elastic_delete_index_filter(
) -> impl Filter<Extract = (Vec<String>, DeleteQueryParams), Error = Rejection> + Clone {
warp::path!("_elastic" / String)
.and_then(extract_index_id_patterns)
.and(warp::delete())
.and(serde_qs::warp::query(serde_qs::Config::default()))
}

// No support for any query parameters for now.
#[utoipa::path(get, tag = "Search", path = "/{index}/_stats")]
pub(crate) fn elastic_index_stats_filter(
Expand Down
Loading
Loading