Skip to content

Commit

Permalink
feat: impl drop view
Browse files Browse the repository at this point in the history
  • Loading branch information
killme2008 committed Jun 28, 2024
1 parent a7aa556 commit 31689c5
Show file tree
Hide file tree
Showing 12 changed files with 515 additions and 19 deletions.
1 change: 1 addition & 0 deletions src/common/meta/src/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ pub mod create_view;
pub mod drop_database;
pub mod drop_flow;
pub mod drop_table;
pub mod drop_view;
pub mod flow_meta;
mod physical_table_metadata;
pub mod table_meta;
Expand Down
231 changes: 231 additions & 0 deletions src/common/meta/src/ddl/drop_view.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use async_trait::async_trait;
use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
use common_procedure::{
Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status,
};
use common_telemetry::info;
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
use strum::AsRefStr;
use table::metadata::{TableId, TableType};
use table::table_reference::TableReference;

use super::utils::handle_retry_error;
use crate::cache_invalidator::Context;
use crate::ddl::DdlContext;
use crate::error::{self, Result};
use crate::instruction::CacheIdent;
use crate::key::table_name::TableNameKey;
use crate::lock_key::{CatalogLock, SchemaLock, TableLock};
use crate::rpc::ddl::DropViewTask;
use crate::{metrics, ClusterId};

/// The procedure for dropping a view.
pub struct DropViewProcedure {
/// The context of procedure runtime.
pub(crate) context: DdlContext,
/// The serializable data.
pub(crate) data: DropViewData,
}

impl DropViewProcedure {
pub const TYPE_NAME: &'static str = "metasrv-procedure:DropView";

pub fn new(cluster_id: ClusterId, task: DropViewTask, context: DdlContext) -> Self {
Self {
context,
data: DropViewData {
state: DropViewState::Prepare,
cluster_id,
task,
},
}
}

pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
let data: DropViewData = serde_json::from_str(json).context(FromJsonSnafu)?;

Ok(Self { context, data })
}

/// Checks whether view exists.
/// - Early returns if view not exists and `drop_if_exists` is `true`.
/// - Throws an error if view not exists and `drop_if_exists` is `false`.
pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
let table_ref = self.data.table_ref();

let exists = self
.context
.table_metadata_manager
.table_name_manager()
.exists(TableNameKey::new(
table_ref.catalog,
table_ref.schema,
table_ref.table,
))
.await?;

if !exists && self.data.task.drop_if_exists {
return Ok(Status::done());
}

ensure!(
exists,
error::ViewNotFoundSnafu {
view_name: table_ref.to_string(),
}
);

self.check_view_metadata().await?;
self.data.state = DropViewState::DeleteMetadata;
Ok(Status::executing(true))
}

async fn check_view_metadata(&mut self) -> Result<()> {
let view_id = self.data.view_id();

let table_info_value = self
.context
.table_metadata_manager
.table_info_manager()
.get(view_id)
.await?
.with_context(|| error::TableInfoNotFoundSnafu {
table: self.data.table_ref().to_string(),
})?;

// Ensure the exists one is view, we can't drop other table types
ensure!(
table_info_value.table_info.table_type == TableType::View,
error::InvalidViewInfoSnafu {
err_msg: format!("{} is not a view", self.data.table_ref()),
}
);

// Ensure [ViewInfoValue] exists
let _ = self
.context
.table_metadata_manager
.view_info_manager()
.get(view_id)
.await?
.with_context(|| error::ViewNotFoundSnafu {
view_name: self.data.table_ref().to_string(),
})?;

Ok(())
}

async fn on_delete_metadata(&mut self) -> Result<Status> {
let view_id = self.data.view_id();
self.context
.table_metadata_manager
.destroy_view_info(view_id, &self.data.table_ref().into())
.await?;

info!("Deleted view metadata for view {view_id}");

self.data.state = DropViewState::InvalidateViewCache;
Ok(Status::executing(true))
}

async fn on_broadcast(&mut self) -> Result<Status> {
let view_id = self.data.view_id();
let ctx = Context {
subject: Some("Invalidate view cache by dropping view".to_string()),
};

self.context
.cache_invalidator
.invalidate(
&ctx,
&[
CacheIdent::TableId(view_id),
CacheIdent::TableName(self.data.table_ref().into()),
],
)
.await?;

Ok(Status::done())
}
}

#[async_trait]
impl Procedure for DropViewProcedure {
fn type_name(&self) -> &str {
Self::TYPE_NAME
}

async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
let state = &self.data.state;
let _timer = metrics::METRIC_META_PROCEDURE_DROP_VIEW
.with_label_values(&[state.as_ref()])
.start_timer();

match self.data.state {
DropViewState::Prepare => self.on_prepare().await,
DropViewState::DeleteMetadata => self.on_delete_metadata().await,
DropViewState::InvalidateViewCache => self.on_broadcast().await,
}
.map_err(handle_retry_error)
}

fn dump(&self) -> ProcedureResult<String> {
serde_json::to_string(&self.data).context(ToJsonSnafu)
}

fn lock_key(&self) -> LockKey {
let table_ref = &self.data.table_ref();
let view_id = self.data.view_id();
let lock_key = vec![
CatalogLock::Read(table_ref.catalog).into(),
SchemaLock::read(table_ref.catalog, table_ref.schema).into(),
TableLock::Write(view_id).into(),
];

LockKey::new(lock_key)
}
}

/// The serializable data
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct DropViewData {
state: DropViewState,
cluster_id: ClusterId,
task: DropViewTask,
}

impl DropViewData {
fn table_ref(&self) -> TableReference {
self.task.table_ref()
}

fn view_id(&self) -> TableId {
self.task.view_id
}
}

/// The state of drop view
#[derive(Debug, Serialize, Deserialize, AsRefStr, PartialEq)]
enum DropViewState {
/// Prepares to drop the view
Prepare,
/// Deletes metadata
DeleteMetadata,
/// Invalidate view cache
InvalidateViewCache,
}
50 changes: 44 additions & 6 deletions src/common/meta/src/ddl_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use crate::ddl::create_view::CreateViewProcedure;
use crate::ddl::drop_database::DropDatabaseProcedure;
use crate::ddl::drop_flow::DropFlowProcedure;
use crate::ddl::drop_table::DropTableProcedure;
use crate::ddl::drop_view::DropViewProcedure;
use crate::ddl::truncate_table::TruncateTableProcedure;
use crate::ddl::{utils, DdlContext, ExecutorContext, ProcedureExecutor};
use crate::error::{
Expand All @@ -50,8 +51,8 @@ use crate::rpc::ddl::DdlTask::{
};
use crate::rpc::ddl::{
AlterTableTask, CreateDatabaseTask, CreateFlowTask, CreateTableTask, CreateViewTask,
DropDatabaseTask, DropFlowTask, DropTableTask, QueryContext, SubmitDdlTaskRequest,
SubmitDdlTaskResponse, TruncateTableTask,
DropDatabaseTask, DropFlowTask, DropTableTask, DropViewTask, QueryContext,
SubmitDdlTaskRequest, SubmitDdlTaskResponse, TruncateTableTask,
};
use crate::rpc::procedure;
use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse};
Expand Down Expand Up @@ -131,7 +132,8 @@ impl DdlManager {
DropFlowProcedure,
TruncateTableProcedure,
CreateDatabaseProcedure,
DropDatabaseProcedure
DropDatabaseProcedure,
DropViewProcedure
);

for (type_name, loader_factory) in loaders {
Expand Down Expand Up @@ -306,8 +308,8 @@ impl DdlManager {
self.submit_procedure(procedure_with_id).await
}

#[tracing::instrument(skip_all)]
/// Submits and executes a drop flow task.
#[tracing::instrument(skip_all)]
pub async fn submit_drop_flow_task(
&self,
cluster_id: ClusterId,
Expand All @@ -320,6 +322,20 @@ impl DdlManager {
self.submit_procedure(procedure_with_id).await
}

/// Submits and executes a drop view task.
#[tracing::instrument(skip_all)]
pub async fn submit_drop_view_task(
&self,
cluster_id: ClusterId,
drop_view: DropViewTask,
) -> Result<(ProcedureId, Option<Output>)> {
let context = self.create_context();
let procedure = DropViewProcedure::new(cluster_id, drop_view, context);
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));

self.submit_procedure(procedure_with_id).await
}

/// Submits and executes a truncate table task.
#[tracing::instrument(skip_all)]
pub async fn submit_truncate_table_task(
Expand Down Expand Up @@ -599,6 +615,28 @@ async fn handle_drop_flow_task(
})
}

async fn handle_drop_view_task(
ddl_manager: &DdlManager,
cluster_id: ClusterId,
drop_view_task: DropViewTask,
) -> Result<SubmitDdlTaskResponse> {
let (id, _) = ddl_manager
.submit_drop_view_task(cluster_id, drop_view_task.clone())
.await?;

let procedure_id = id.to_string();
info!(
"View {}({}) is dropped via procedure_id {id:?}",
drop_view_task.table_ref(),
drop_view_task.view_id,
);

Ok(SubmitDdlTaskResponse {
key: procedure_id.into(),
..Default::default()
})
}

async fn handle_create_flow_task(
ddl_manager: &DdlManager,
cluster_id: ClusterId,
Expand Down Expand Up @@ -750,8 +788,8 @@ impl ProcedureExecutor for DdlManager {
CreateView(create_view_task) => {
handle_create_view_task(self, cluster_id, create_view_task).await
}
DropView(_create_view_task) => {
todo!("implemented in the following PR");
DropView(drop_view_task) => {
handle_drop_view_task(self, cluster_id, drop_view_task).await
}
}
}
Expand Down
6 changes: 6 additions & 0 deletions src/common/meta/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ lazy_static! {
"greptime_meta_procedure_drop_flow",
"meta procedure drop flow",
&["step"]
)
.unwrap();
pub static ref METRIC_META_PROCEDURE_DROP_VIEW: HistogramVec = register_histogram_vec!(
"greptime_meta_procedure_drop_VIEW",
"meta procedure drop view",
&["step"]
)
.unwrap();
pub static ref METRIC_META_PROCEDURE_CREATE_TABLES: HistogramVec = register_histogram_vec!(
Expand Down
Loading

0 comments on commit 31689c5

Please sign in to comment.