Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(metric-engine): introduce index options from metric engine #5374

Merged
merged 14 commits into from
Jan 20, 2025
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/api/src/v1/column_def.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ mod tests {
case_sensitive: false,
})
.unwrap();
schema.with_inverted_index(true);
schema.set_inverted_index(true);
let options = options_from_column_schema(&schema).unwrap();
assert_eq!(
options.options.get(FULLTEXT_GRPC_KEY).unwrap(),
Expand Down
15 changes: 14 additions & 1 deletion src/datatypes/src/schema/column_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,11 @@ impl ColumnSchema {
self
}

pub fn with_inverted_index(&mut self, value: bool) {
/// Set the inverted index for the column.
/// Similar to [with_inverted_index], but does not take ownership of `self`.
///
/// [with_inverted_index]: Self::with_inverted_index
pub fn set_inverted_index(&mut self, value: bool) {
match value {
true => {
self.metadata
Expand All @@ -170,6 +174,15 @@ impl ColumnSchema {
}
}

/// Sets the inverted index for the column.
/// Similar to [set_inverted_index], but takes ownership of `self` and returns the owned value.
///
/// [set_inverted_index]: Self::set_inverted_index
pub fn with_inverted_index(mut self, value: bool) -> Self {
self.set_inverted_index(value);
self
}

// Put a placeholder to invalidate schemas.all(!has_inverted_index_key).
pub fn insert_inverted_index_placeholder(&mut self) {
self.metadata
Expand Down
1 change: 1 addition & 0 deletions src/metric-engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ mito2.workspace = true
mur3 = "0.1"
object-store.workspace = true
prometheus.workspace = true
serde.workspace = true
serde_json.workspace = true
snafu.workspace = true
store-api.workspace = true
Expand Down
35 changes: 31 additions & 4 deletions src/metric-engine/src/data_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use api::v1::SemanticType;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_telemetry::{debug, info, warn};
use datatypes::schema::{SkippingIndexOptions, SkippingIndexType};
use mito2::engine::MitoEngine;
use snafu::ResultExt;
use store_api::metadata::ColumnMetadata;
Expand All @@ -26,9 +27,10 @@ use store_api::region_request::{
use store_api::storage::consts::ReservedColumnId;
use store_api::storage::{ConcreteDataType, RegionId};

use crate::engine::IndexOptions;
use crate::error::{
ColumnTypeMismatchSnafu, ForbiddenPhysicalAlterSnafu, MitoReadOperationSnafu,
MitoWriteOperationSnafu, Result,
MitoWriteOperationSnafu, Result, SetSkippingIndexOptionSnafu,
};
use crate::metrics::{FORBIDDEN_OPERATION_COUNT, MITO_DDL_DURATION};
use crate::utils;
Expand Down Expand Up @@ -64,13 +66,16 @@ impl DataRegion {
&self,
region_id: RegionId,
columns: &mut [ColumnMetadata],
index_options: IndexOptions,
) -> Result<()> {
let region_id = utils::to_data_region_id(region_id);

let mut retries = 0;
// submit alter request
while retries < MAX_RETRIES {
let request = self.assemble_alter_request(region_id, columns).await?;
let request = self
.assemble_alter_request(region_id, columns, index_options)
.await?;

let _timer = MITO_DDL_DURATION.start_timer();

Expand All @@ -97,6 +102,7 @@ impl DataRegion {
&self,
region_id: RegionId,
columns: &mut [ColumnMetadata],
index_options: IndexOptions,
) -> Result<RegionRequest> {
// retrieve underlying version
let region_metadata = self
Expand Down Expand Up @@ -142,6 +148,19 @@ impl DataRegion {

c.column_id = new_column_id_start + delta as u32;
c.column_schema.set_nullable();
match index_options {
IndexOptions::Inverted => {
c.column_schema.set_inverted_index(true);
}
IndexOptions::Skipping { granularity } => {
c.column_schema
.set_skipping_options(&SkippingIndexOptions {
granularity,
index_type: SkippingIndexType::BloomFilter,
})
.context(SetSkippingIndexOptionSnafu)?;
}
}

Ok(AddColumn {
column_metadata: c.clone(),
Expand Down Expand Up @@ -256,7 +275,11 @@ mod test {
},
];
env.data_region()
.add_columns(env.default_physical_region_id(), &mut new_columns)
.add_columns(
env.default_physical_region_id(),
&mut new_columns,
IndexOptions::Inverted,
)
.await
.unwrap();

Expand Down Expand Up @@ -295,7 +318,11 @@ mod test {
}];
let result = env
.data_region()
.add_columns(env.default_physical_region_id(), &mut new_columns)
.add_columns(
env.default_physical_region_id(),
&mut new_columns,
IndexOptions::Inverted,
)
.await;
assert!(result.is_err());
}
Expand Down
1 change: 1 addition & 0 deletions src/metric-engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use async_trait::async_trait;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use mito2::engine::MitoEngine;
pub(crate) use options::IndexOptions;
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
Expand Down
20 changes: 17 additions & 3 deletions src/metric-engine/src/engine/alter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ use store_api::region_request::{AffectedRows, AlterKind, RegionAlterRequest};
use store_api::storage::RegionId;

use crate::engine::MetricEngineInner;
use crate::error::{LogicalRegionNotFoundSnafu, Result, SerializeColumnMetadataSnafu};
use crate::error::{
LogicalRegionNotFoundSnafu, PhysicalRegionNotFoundSnafu, Result, SerializeColumnMetadataSnafu,
};
use crate::utils::{to_data_region_id, to_metadata_region_id};

impl MetricEngineInner {
Expand Down Expand Up @@ -64,16 +66,27 @@ impl MetricEngineInner {
logical_region_id: RegionId,
request: RegionAlterRequest,
) -> Result<RegionId> {
let physical_region_id = {
let (physical_region_id, index_options) = {
let state = &self.state.read().unwrap();
state
let physical_region_id = state
.get_physical_region_id(logical_region_id)
.with_context(|| {
error!("Trying to alter an nonexistent region {logical_region_id}");
LogicalRegionNotFoundSnafu {
region_id: logical_region_id,
}
})?;

let index_options = state
.physical_region_states()
.get(&physical_region_id)
.with_context(|| PhysicalRegionNotFoundSnafu {
region_id: physical_region_id,
})?
.options()
.index;

(physical_region_id, index_options)
};

// only handle adding column
Expand Down Expand Up @@ -122,6 +135,7 @@ impl MetricEngineInner {
data_region_id,
logical_region_id,
&mut columns_to_add,
index_options,
)
.await?;

Expand Down
22 changes: 18 additions & 4 deletions src/metric-engine/src/engine/catchup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@
// limitations under the License.

use common_telemetry::debug;
use snafu::ResultExt;
use snafu::{OptionExt, ResultExt};
use store_api::region_engine::RegionEngine;
use store_api::region_request::{AffectedRows, RegionCatchupRequest, RegionRequest};
use store_api::storage::RegionId;

use crate::engine::MetricEngineInner;
use crate::error::{MitoCatchupOperationSnafu, Result, UnsupportedRegionRequestSnafu};
use crate::error::{
MitoCatchupOperationSnafu, PhysicalRegionNotFoundSnafu, Result, UnsupportedRegionRequestSnafu,
};
use crate::utils;

impl MetricEngineInner {
Expand All @@ -34,6 +36,18 @@ impl MetricEngineInner {
}
.fail();
}
let data_region_id = utils::to_data_region_id(region_id);
let physical_region_options = *self
.state
.read()
.unwrap()
.physical_region_states()
.get(&data_region_id)
.context(PhysicalRegionNotFoundSnafu {
region_id: data_region_id,
})?
.options();

let metadata_region_id = utils::to_metadata_region_id(region_id);
// TODO(weny): improve the catchup, we can read the wal entries only once.
debug!("Catchup metadata region {metadata_region_id}");
Expand All @@ -49,7 +63,6 @@ impl MetricEngineInner {
.await
.context(MitoCatchupOperationSnafu)?;

let data_region_id = utils::to_data_region_id(region_id);
debug!("Catchup data region {data_region_id}");
self.mito
.handle_request(
Expand All @@ -64,7 +77,8 @@ impl MetricEngineInner {
.context(MitoCatchupOperationSnafu)
.map(|response| response.affected_rows)?;

self.recover_states(region_id).await?;
self.recover_states(region_id, physical_region_options)
.await?;
Ok(0)
}
}
3 changes: 1 addition & 2 deletions src/metric-engine/src/engine/close.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ impl MetricEngineInner {
.state
.read()
.unwrap()
.physical_regions()
.contains_key(&data_region_id)
.exist_physical_region(data_region_id)
{
self.close_physical_region(data_region_id).await?;
self.state
Expand Down
Loading
Loading