restart indexing pipeline on index update #5265

Merged 4 commits on Sep 3, 2024
Changes from 3 commits
1 change: 1 addition & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default.

13 changes: 13 additions & 0 deletions quickwit/quickwit-cluster/src/cluster.rs
@@ -515,6 +515,7 @@ fn indexing_task_to_chitchat_kv(indexing_task: &IndexingTask) -> (String, String
source_id,
shard_ids,
pipeline_uid: _,
params_fingerprint: _,
} = indexing_task;
let index_uid = indexing_task.index_uid();
let key = format!("{INDEXING_TASK_PREFIX}{}", indexing_task.pipeline_uid());
@@ -543,6 +544,7 @@ fn chitchat_kv_to_indexing_task(key: &str, value: &str) -> Option<IndexingTask>
source_id: source_id.to_string(),
pipeline_uid: Some(pipeline_uid),
shard_ids,
params_fingerprint: 0,
})
}

@@ -945,12 +947,14 @@ mod tests {
index_uid: Some(index_uid.clone()),
source_id: "source-1".to_string(),
shard_ids: Vec::new(),
params_fingerprint: 0,
};
let indexing_task2 = IndexingTask {
pipeline_uid: Some(PipelineUid::for_test(2u128)),
index_uid: Some(index_uid.clone()),
source_id: "source-1".to_string(),
shard_ids: Vec::new(),
params_fingerprint: 0,
};
cluster2
.set_self_key_value(GRPC_ADVERTISE_ADDR_KEY, "127.0.0.1:1001")
@@ -1032,6 +1036,7 @@ mod tests {
),
source_id: format!("source-{source_id}"),
shard_ids: Vec::new(),
params_fingerprint: 0,
}
})
.collect_vec();
@@ -1259,6 +1264,7 @@ mod tests {
index_uid: Some(index_uid.clone()),
source_id: "my-source1".to_string(),
shard_ids: vec![ShardId::from(1), ShardId::from(2)],
params_fingerprint: 0,
}],
&mut node_state,
);
@@ -1269,6 +1275,7 @@
index_uid: Some(index_uid.clone()),
source_id: "my-source1".to_string(),
shard_ids: vec![ShardId::from(1), ShardId::from(2), ShardId::from(3)],
params_fingerprint: 0,
}],
&mut node_state,
);
@@ -1279,12 +1286,14 @@
index_uid: Some(index_uid.clone()),
source_id: "my-source1".to_string(),
shard_ids: vec![ShardId::from(1), ShardId::from(2)],
params_fingerprint: 0,
},
IndexingTask {
pipeline_uid: Some(PipelineUid::for_test(2u128)),
index_uid: Some(index_uid.clone()),
source_id: "my-source1".to_string(),
shard_ids: vec![ShardId::from(3), ShardId::from(4)],
params_fingerprint: 0,
},
],
&mut node_state,
@@ -1297,12 +1306,14 @@
index_uid: Some(index_uid.clone()),
source_id: "my-source1".to_string(),
shard_ids: vec![ShardId::from(1), ShardId::from(2)],
params_fingerprint: 0,
},
IndexingTask {
pipeline_uid: Some(PipelineUid::for_test(2u128)),
index_uid: Some(IndexUid::for_test("test-index2", 0)),
source_id: "my-source1".to_string(),
shard_ids: vec![ShardId::from(3), ShardId::from(4)],
params_fingerprint: 0,
},
],
&mut node_state,
@@ -1315,12 +1326,14 @@
index_uid: Some(index_uid.clone()),
source_id: "my-source1".to_string(),
shard_ids: vec![ShardId::from(1), ShardId::from(2)],
params_fingerprint: 0,
},
IndexingTask {
pipeline_uid: Some(PipelineUid::for_test(2u128)),
index_uid: Some(index_uid.clone()),
source_id: "my-source2".to_string(),
shard_ids: vec![ShardId::from(3), ShardId::from(4)],
params_fingerprint: 0,
},
],
&mut node_state,
1 change: 1 addition & 0 deletions quickwit/quickwit-config/Cargo.toml
@@ -29,6 +29,7 @@ serde = { workspace = true }
serde_json = { workspace = true }
serde_with = { workspace = true }
serde_yaml = { workspace = true }
siphasher = { workspace = true }
toml = { workspace = true }
tracing = { workspace = true }
utoipa = { workspace = true }
18 changes: 17 additions & 1 deletion quickwit/quickwit-config/src/index_config/mod.rs
@@ -19,6 +19,7 @@

pub(crate) mod serialize;

use std::hash::{Hash, Hasher};
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
@@ -33,6 +34,7 @@ use quickwit_doc_mapper::{DefaultDocMapperBuilder, DocMapper, DocMapping};
use quickwit_proto::types::IndexId;
use serde::{Deserialize, Serialize};
pub use serialize::{load_index_config_from_user_config, load_index_config_update};
use siphasher::sip::SipHasher;
use tracing::warn;

use crate::index_config::serialize::VersionedIndexConfig;
@@ -57,6 +59,12 @@ impl PartialEq for IndexingResources {
}
}

impl Hash for IndexingResources {
fn hash<H: Hasher>(&self, state: &mut H) {
self.heap_size.hash(state);
}
}

impl IndexingResources {
fn default_heap_size() -> ByteSize {
ByteSize::gb(2)
@@ -90,7 +98,7 @@ impl Default for IndexingResources {
}
}

#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Hash, utoipa::ToSchema)]
#[serde(deny_unknown_fields)]
pub struct IndexingSettings {
#[schema(default = 60)]
@@ -253,6 +261,14 @@ pub struct IndexConfig {
}

impl IndexConfig {
/// Return a fingerprint of parameters relevant for indexers
pub fn indexing_params_fingerprint(&self) -> u64 {
let mut hasher = SipHasher::new();
self.doc_mapping.doc_mapping_uid.hash(&mut hasher);
self.indexing_settings.hash(&mut hasher);
hasher.finish()
}

#[cfg(any(test, feature = "testsuite"))]
pub fn for_test(index_id: &str, index_uri: &str) -> Self {
let index_uri = Uri::from_str(index_uri).unwrap();
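
For context, here is a minimal standalone sketch of the fingerprinting technique introduced above: hash only the indexer-relevant parts of the configuration with SipHasher and compare the resulting u64 values. The Settings type and its fields below are illustrative stand-ins, not Quickwit's actual types.

use std::hash::{Hash, Hasher};

use siphasher::sip::SipHasher;

// Illustrative stand-in for the indexer-relevant settings (hypothetical type).
#[derive(Hash)]
struct Settings {
    commit_timeout_secs: u64,
    merge_enabled: bool,
}

// Hash only the fields that matter to indexers, so unrelated configuration
// changes (e.g. search or retention settings) leave the fingerprint untouched.
fn params_fingerprint(doc_mapping_uid: u128, settings: &Settings) -> u64 {
    let mut hasher = SipHasher::new();
    doc_mapping_uid.hash(&mut hasher);
    settings.hash(&mut hasher);
    hasher.finish()
}
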
6 changes: 3 additions & 3 deletions quickwit/quickwit-config/src/merge_policy_config.rs
@@ -21,7 +21,7 @@ use std::time::Duration;

use serde::{de, Deserialize, Deserializer, Serialize, Serializer};

#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, utoipa::ToSchema)]
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Hash, utoipa::ToSchema)]
#[serde(deny_unknown_fields)]
pub struct ConstWriteAmplificationMergePolicyConfig {
/// Number of splits to merge together in a single merge operation.
@@ -55,7 +55,7 @@ impl Default for ConstWriteAmplificationMergePolicyConfig {
}
}

#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, utoipa::ToSchema)]
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Hash, utoipa::ToSchema)]
#[serde(deny_unknown_fields)]
pub struct StableLogMergePolicyConfig {
/// Number of docs below which all splits are considered as belonging to the same level.
@@ -126,7 +126,7 @@
where S: Serializer {
s.serialize_str(&value_str)
}

#[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq, utoipa::ToSchema)]
#[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq, Hash, utoipa::ToSchema)]
#[serde(tag = "type")]
#[serde(deny_unknown_fields)]
pub enum MergePolicyConfig {
9 changes: 6 additions & 3 deletions quickwit/quickwit-control-plane/src/control_plane.rs
@@ -592,9 +592,12 @@ impl Handler<UpdateIndexRequest> for ControlPlane {
return Err(ActorExitStatus::from(anyhow::anyhow!(serde_error)));
}
};
self.model
.update_index_config(&index_uid, index_metadata.index_config)?;
// TODO: Handle doc mapping and/or indexing settings update here.
if self
.model
.update_index_config(&index_uid, index_metadata.index_config)?
{
let _rebuild_plan_notifier = self.rebuild_plan_debounced(ctx);
}
info!(%index_uid, "updated index");
Ok(Ok(response))
}
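
A rough sketch of the handler logic changed above, with illustrative names rather than the actual ControlPlane API: the boolean returned by update_index_config is assumed to mean "the stored config actually changed", and only in that case is the (debounced) plan rebuild requested.

// Hypothetical simplification of the flow, not the actual handler signature.
fn handle_index_update(config_changed: bool, rebuild_plan_debounced: impl FnOnce()) {
    if config_changed {
        // Several updates arriving in a short window should collapse into a
        // single physical indexing plan rebuild.
        rebuild_plan_debounced();
    }
}
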
22 changes: 22 additions & 0 deletions quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs
@@ -168,6 +168,10 @@ fn get_sources_to_schedule(model: &ControlPlaneModel) -> Vec<SourceToSchedule> {
if !source_config.enabled {
continue;
}
let params_fingerprint = model
.index_metadata(&source_uid.index_uid)
.map(|index_meta| index_meta.index_config.indexing_params_fingerprint())
.unwrap_or_default();
match source_config.source_params {
SourceParams::File(FileSourceParams::Filepath(_))
| SourceParams::IngestCli
@@ -181,6 +185,7 @@ fn get_sources_to_schedule(model: &ControlPlaneModel) -> Vec<SourceToSchedule> {
sources.push(SourceToSchedule {
source_uid,
source_type: SourceToScheduleType::IngestV1,
params_fingerprint,
});
}
SourceParams::Ingest => {
@@ -206,6 +211,7 @@ fn get_sources_to_schedule(model: &ControlPlaneModel) -> Vec<SourceToSchedule> {
shard_ids,
load_per_shard,
},
params_fingerprint,
});
}
SourceParams::Kafka(_)
@@ -221,6 +227,7 @@ fn get_sources_to_schedule(model: &ControlPlaneModel) -> Vec<SourceToSchedule> {
load_per_pipeline: NonZeroU32::new(PIPELINE_FULL_CAPACITY.cpu_millis())
.unwrap(),
},
params_fingerprint,
});
}
}
@@ -680,18 +687,21 @@ mod tests {
index_uid: Some(index_uid.clone()),
source_id: "source-1".to_string(),
shard_ids: Vec::new(),
params_fingerprint: 0,
};
let task_1b = IndexingTask {
pipeline_uid: Some(PipelineUid::for_test(11u128)),
index_uid: Some(index_uid.clone()),
source_id: "source-1".to_string(),
shard_ids: Vec::new(),
params_fingerprint: 0,
};
let task_2 = IndexingTask {
pipeline_uid: Some(PipelineUid::for_test(20u128)),
index_uid: Some(index_uid.clone()),
source_id: "source-2".to_string(),
shard_ids: Vec::new(),
params_fingerprint: 0,
};
running_plan.insert(
"indexer-1".to_string(),
@@ -712,12 +722,14 @@ mod tests {
index_uid: Some(index_uid.clone()),
source_id: "source-1".to_string(),
shard_ids: Vec::new(),
params_fingerprint: 0,
};
let task_2 = IndexingTask {
pipeline_uid: Some(PipelineUid::for_test(2u128)),
index_uid: Some(index_uid.clone()),
source_id: "source-2".to_string(),
shard_ids: Vec::new(),
params_fingerprint: 0,
};
running_plan.insert("indexer-1".to_string(), vec![task_1.clone()]);
desired_plan.insert("indexer-1".to_string(), vec![task_2.clone()]);
@@ -744,12 +756,14 @@ mod tests {
index_uid: Some(index_uid.clone()),
source_id: "source-1".to_string(),
shard_ids: Vec::new(),
params_fingerprint: 0,
};
let task_2 = IndexingTask {
pipeline_uid: Some(PipelineUid::for_test(2u128)),
index_uid: Some(index_uid2.clone()),
source_id: "source-2".to_string(),
shard_ids: Vec::new(),
params_fingerprint: 0,
};
running_plan.insert("indexer-2".to_string(), vec![task_2.clone()]);
desired_plan.insert("indexer-1".to_string(), vec![task_1.clone()]);
@@ -784,18 +798,21 @@ mod tests {
index_uid: Some(index_uid.clone()),
source_id: "source-1".to_string(),
shard_ids: Vec::new(),
params_fingerprint: 0,
};
let task_1b = IndexingTask {
pipeline_uid: Some(PipelineUid::for_test(11u128)),
index_uid: Some(index_uid.clone()),
source_id: "source-1".to_string(),
shard_ids: Vec::new(),
params_fingerprint: 0,
};
let task_1c = IndexingTask {
pipeline_uid: Some(PipelineUid::for_test(12u128)),
index_uid: Some(index_uid.clone()),
source_id: "source-1".to_string(),
shard_ids: Vec::new(),
params_fingerprint: 0,
};
running_plan.insert("indexer-1".to_string(), vec![task_1a.clone()]);
desired_plan.insert(
@@ -938,13 +955,15 @@ mod tests {
num_pipelines: 3,
load_per_pipeline: NonZeroU32::new(1_000).unwrap(),
},
params_fingerprint: 0,
},
SourceToSchedule {
source_uid: source_2.clone(),
source_type: SourceToScheduleType::NonSharded {
num_pipelines: 2,
load_per_pipeline: NonZeroU32::new(1_000).unwrap(),
},
params_fingerprint: 0,
},
];
let mut indexer_max_loads = FnvHashMap::default();
@@ -968,18 +987,21 @@ mod tests {
source_id: "my-source".to_string(),
pipeline_uid: Some(PipelineUid::random()),
shard_ids: vec!["shard1".into()],
params_fingerprint: 0,
};
let task2 = IndexingTask {
index_uid: Some(IndexUid::for_test("index2", 123)),
source_id: "my-source".to_string(),
pipeline_uid: Some(PipelineUid::random()),
shard_ids: vec!["shard2".into(), "shard3".into()],
params_fingerprint: 0,
};
let task3 = IndexingTask {
index_uid: Some(IndexUid::for_test("index3", 123)),
source_id: "my-source".to_string(),
pipeline_uid: Some(PipelineUid::random()),
shard_ids: vec!["shard6".into()],
params_fingerprint: 0,
};
// order made to map with the debug for readability
map.insert("indexer5", vec![&task2]);
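
To tie the pieces together, a hypothetical sketch (not code from this PR) of how the fingerprint carried by each task can drive the restart decision: when the newly planned task targets the same pipeline but carries a different params_fingerprint than the running pipeline, that pipeline must be respawned with the updated parameters.

// Hypothetical stand-in types; only the fingerprint comparison matters here.
struct RunningPipeline {
    pipeline_uid: u128,
    params_fingerprint: u64,
}

struct PlannedTask {
    pipeline_uid: u128,
    params_fingerprint: u64,
}

// Same pipeline, different indexing parameters: restart required.
fn needs_restart(running: &RunningPipeline, planned: &PlannedTask) -> bool {
    running.pipeline_uid == planned.pipeline_uid
        && running.params_fingerprint != planned.params_fingerprint
}
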