Skip to content

Commit

Permalink
fix: ensure table route metadata is eventually rolled back on failure (
Browse files Browse the repository at this point in the history
…#5174)

* fix: ensure table route metadata is eventually rolled back on procedure failure

* fix(fuzz): enhance procedure condition checking

* chore: add logs

* feat: close downgraded leader region actively

* chore: apply suggestions from CR
  • Loading branch information
WenyXu authored Dec 19, 2024
1 parent c9ad8c7 commit 66f0581
Show file tree
Hide file tree
Showing 10 changed files with 274 additions and 71 deletions.
76 changes: 72 additions & 4 deletions src/meta-srv/src/procedure/region_migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

pub(crate) mod close_downgraded_region;
pub(crate) mod downgrade_leader_region;
pub(crate) mod manager;
pub(crate) mod migration_abort;
Expand Down Expand Up @@ -43,6 +44,7 @@ use common_procedure::error::{
Error as ProcedureError, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu,
};
use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status, StringKey};
use common_telemetry::info;
use manager::RegionMigrationProcedureGuard;
pub use manager::{
RegionMigrationManagerRef, RegionMigrationProcedureTask, RegionMigrationProcedureTracker,
Expand Down Expand Up @@ -91,7 +93,9 @@ impl PersistentContext {
let lock_key = vec![
CatalogLock::Read(&self.catalog).into(),
SchemaLock::read(&self.catalog, &self.schema).into(),
TableLock::Read(region_id.table_id()).into(),
// The optimistic updating of table route is not working very well,
// so we need to use the write lock here.
TableLock::Write(region_id.table_id()).into(),
RegionLock::Write(region_id).into(),
];

Expand Down Expand Up @@ -253,7 +257,7 @@ impl Context {
.await
.context(error::TableMetadataManagerSnafu)
.map_err(BoxedError::new)
.context(error::RetryLaterWithSourceSnafu {
.with_context(|_| error::RetryLaterWithSourceSnafu {
reason: format!("Failed to get TableRoute: {table_id}"),
})?
.context(error::TableRouteNotFoundSnafu { table_id })?;
Expand Down Expand Up @@ -317,7 +321,7 @@ impl Context {
.await
.context(error::TableMetadataManagerSnafu)
.map_err(BoxedError::new)
.context(error::RetryLaterWithSourceSnafu {
.with_context(|_| error::RetryLaterWithSourceSnafu {
reason: format!("Failed to get TableInfo: {table_id}"),
})?
.context(error::TableInfoNotFoundSnafu { table_id })?;
Expand Down Expand Up @@ -350,7 +354,7 @@ impl Context {
.await
.context(error::TableMetadataManagerSnafu)
.map_err(BoxedError::new)
.context(error::RetryLaterWithSourceSnafu {
.with_context(|_| error::RetryLaterWithSourceSnafu {
reason: format!("Failed to get DatanodeTable: ({datanode_id},{table_id})"),
})?
.context(error::DatanodeTableNotFoundSnafu {
Expand Down Expand Up @@ -468,6 +472,48 @@ impl RegionMigrationProcedure {
_guard: guard,
})
}

async fn rollback_inner(&mut self) -> Result<()> {
let _timer = METRIC_META_REGION_MIGRATION_EXECUTE
.with_label_values(&["rollback"])
.start_timer();

let table_id = self.context.region_id().table_id();
let region_id = self.context.region_id();
self.context.remove_table_route_value();
let table_metadata_manager = self.context.table_metadata_manager.clone();
let table_route = self.context.get_table_route_value().await?;

// Safety: It must be a physical table route.
let downgraded = table_route
.region_routes()
.unwrap()
.iter()
.filter(|route| route.region.id == region_id)
.any(|route| route.is_leader_downgrading());

if downgraded {
info!("Rollbacking downgraded region leader table route, region: {region_id}");
table_metadata_manager
.update_leader_region_status(table_id, table_route, |route| {
if route.region.id == region_id {
Some(None)
} else {
None
}
})
.await
.context(error::TableMetadataManagerSnafu)
.map_err(BoxedError::new)
.with_context(|_| error::RetryLaterWithSourceSnafu {
reason: format!("Failed to update the table route during the rollback downgraded leader region: {region_id}"),
})?;
}

self.context.register_failure_detectors().await;

Ok(())
}
}

#[async_trait::async_trait]
Expand All @@ -476,6 +522,16 @@ impl Procedure for RegionMigrationProcedure {
Self::TYPE_NAME
}

async fn rollback(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<()> {
self.rollback_inner()
.await
.map_err(ProcedureError::external)
}

fn rollback_supported(&self) -> bool {
true
}

async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
let state = &mut self.state;

Expand Down Expand Up @@ -701,6 +757,12 @@ mod tests {
Assertion::simple(assert_update_metadata_upgrade, assert_no_persist),
),
// UpdateMetadata::Upgrade
Step::next(
"Should be the close downgraded region",
None,
Assertion::simple(assert_close_downgraded_region, assert_no_persist),
),
// CloseDowngradedRegion
Step::next(
"Should be the region migration end",
None,
Expand Down Expand Up @@ -1071,6 +1133,12 @@ mod tests {
Assertion::simple(assert_update_metadata_upgrade, assert_no_persist),
),
// UpdateMetadata::Upgrade
Step::next(
"Should be the close downgraded region",
None,
Assertion::simple(assert_close_downgraded_region, assert_no_persist),
),
// CloseDowngradedRegion
Step::next(
"Should be the region migration end",
None,
Expand Down
138 changes: 138 additions & 0 deletions src/meta-srv/src/procedure/region_migration/close_downgraded_region.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::any::Any;
use std::time::Duration;

use api::v1::meta::MailboxMessage;
use common_meta::distributed_time_constants::MAILBOX_RTT_SECS;
use common_meta::instruction::{Instruction, InstructionReply, SimpleReply};
use common_meta::key::datanode_table::RegionInfo;
use common_meta::RegionIdent;
use common_procedure::Status;
use common_telemetry::{info, warn};
use serde::{Deserialize, Serialize};
use snafu::ResultExt;

use crate::error::{self, Result};
use crate::handler::HeartbeatMailbox;
use crate::procedure::region_migration::migration_end::RegionMigrationEnd;
use crate::procedure::region_migration::{Context, State};
use crate::service::mailbox::Channel;

const CLOSE_DOWNGRADED_REGION_TIMEOUT: Duration = Duration::from_secs(MAILBOX_RTT_SECS);

#[derive(Debug, Serialize, Deserialize)]
pub struct CloseDowngradedRegion;

#[async_trait::async_trait]
#[typetag::serde]
impl State for CloseDowngradedRegion {
async fn next(&mut self, ctx: &mut Context) -> Result<(Box<dyn State>, Status)> {
if let Err(err) = self.close_downgraded_leader_region(ctx).await {
let downgrade_leader_datanode = &ctx.persistent_ctx.from_peer;
let region_id = ctx.region_id();
warn!(err; "Failed to close downgraded leader region: {region_id} on datanode {:?}", downgrade_leader_datanode);
}

Ok((Box::new(RegionMigrationEnd), Status::done()))
}

fn as_any(&self) -> &dyn Any {
self
}
}

impl CloseDowngradedRegion {
/// Builds close region instruction.
///
/// Abort(non-retry):
/// - Datanode Table is not found.
async fn build_close_region_instruction(&self, ctx: &mut Context) -> Result<Instruction> {
let pc = &ctx.persistent_ctx;
let downgrade_leader_datanode_id = pc.from_peer.id;
let cluster_id = pc.cluster_id;
let table_id = pc.region_id.table_id();
let region_number = pc.region_id.region_number();
let datanode_table_value = ctx.get_from_peer_datanode_table_value().await?;

let RegionInfo { engine, .. } = datanode_table_value.region_info.clone();

Ok(Instruction::CloseRegion(RegionIdent {
cluster_id,
datanode_id: downgrade_leader_datanode_id,
table_id,
region_number,
engine,
}))
}

/// Closes the downgraded leader region.
async fn close_downgraded_leader_region(&self, ctx: &mut Context) -> Result<()> {
let close_instruction = self.build_close_region_instruction(ctx).await?;
let region_id = ctx.region_id();
let pc = &ctx.persistent_ctx;
let downgrade_leader_datanode = &pc.from_peer;
let msg = MailboxMessage::json_message(
&format!("Close downgraded region: {}", region_id),
&format!("Meta@{}", ctx.server_addr()),
&format!(
"Datanode-{}@{}",
downgrade_leader_datanode.id, downgrade_leader_datanode.addr
),
common_time::util::current_time_millis(),
&close_instruction,
)
.with_context(|_| error::SerializeToJsonSnafu {
input: close_instruction.to_string(),
})?;

let ch = Channel::Datanode(downgrade_leader_datanode.id);
let receiver = ctx
.mailbox
.send(&ch, msg, CLOSE_DOWNGRADED_REGION_TIMEOUT)
.await?;

match receiver.await? {
Ok(msg) => {
let reply = HeartbeatMailbox::json_reply(&msg)?;
info!(
"Received close downgraded leade region reply: {:?}, region: {}",
reply, region_id
);
let InstructionReply::CloseRegion(SimpleReply { result, error }) = reply else {
return error::UnexpectedInstructionReplySnafu {
mailbox_message: msg.to_string(),
reason: "expect close region reply",
}
.fail();
};

if result {
Ok(())
} else {
error::UnexpectedSnafu {
violated: format!(
"Failed to close downgraded leader region: {region_id} on datanode {:?}, error: {error:?}",
downgrade_leader_datanode,
),
}
.fail()
}
}

Err(e) => Err(e),
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;

use super::migration_abort::RegionMigrationAbort;
use super::migration_end::RegionMigrationEnd;
use super::open_candidate_region::OpenCandidateRegion;
use super::update_metadata::UpdateMetadata;
use crate::error::{self, Result};
use crate::procedure::region_migration::migration_abort::RegionMigrationAbort;
use crate::procedure::region_migration::migration_end::RegionMigrationEnd;
use crate::procedure::region_migration::open_candidate_region::OpenCandidateRegion;
use crate::procedure::region_migration::update_metadata::UpdateMetadata;
use crate::procedure::region_migration::{Context, State};

/// The behaviors:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ use common_telemetry::info;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};

use super::update_metadata::UpdateMetadata;
use crate::error::{self, Result};
use crate::handler::HeartbeatMailbox;
use crate::procedure::region_migration::update_metadata::UpdateMetadata;
use crate::procedure::region_migration::{Context, State};
use crate::service::mailbox::Channel;

Expand Down Expand Up @@ -145,7 +145,10 @@ impl OpenCandidateRegion {
match receiver.await? {
Ok(msg) => {
let reply = HeartbeatMailbox::json_reply(&msg)?;
info!("Received open region reply: {:?}", reply);
info!(
"Received open region reply: {:?}, region: {}",
reply, region_id
);
let InstructionReply::OpenRegion(SimpleReply { result, error }) = reply else {
return error::UnexpectedInstructionReplySnafu {
mailbox_message: msg.to_string(),
Expand Down
20 changes: 15 additions & 5 deletions src/meta-srv/src/procedure/region_migration/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,19 +44,21 @@ use store_api::storage::RegionId;
use table::metadata::RawTableInfo;
use tokio::sync::mpsc::{Receiver, Sender};

use super::manager::RegionMigrationProcedureTracker;
use super::migration_abort::RegionMigrationAbort;
use super::upgrade_candidate_region::UpgradeCandidateRegion;
use super::{Context, ContextFactory, DefaultContextFactory, State, VolatileContext};
use crate::cache_invalidator::MetasrvCacheInvalidator;
use crate::error::{self, Error, Result};
use crate::handler::{HeartbeatMailbox, Pusher, Pushers};
use crate::metasrv::MetasrvInfo;
use crate::procedure::region_migration::close_downgraded_region::CloseDowngradedRegion;
use crate::procedure::region_migration::downgrade_leader_region::DowngradeLeaderRegion;
use crate::procedure::region_migration::manager::RegionMigrationProcedureTracker;
use crate::procedure::region_migration::migration_abort::RegionMigrationAbort;
use crate::procedure::region_migration::migration_end::RegionMigrationEnd;
use crate::procedure::region_migration::open_candidate_region::OpenCandidateRegion;
use crate::procedure::region_migration::update_metadata::UpdateMetadata;
use crate::procedure::region_migration::PersistentContext;
use crate::procedure::region_migration::upgrade_candidate_region::UpgradeCandidateRegion;
use crate::procedure::region_migration::{
Context, ContextFactory, DefaultContextFactory, PersistentContext, State, VolatileContext,
};
use crate::service::mailbox::{Channel, MailboxRef};

pub type MockHeartbeatReceiver = Receiver<std::result::Result<HeartbeatResponse, tonic::Status>>;
Expand Down Expand Up @@ -569,6 +571,14 @@ pub(crate) fn assert_region_migration_end(next: &dyn State) {
let _ = next.as_any().downcast_ref::<RegionMigrationEnd>().unwrap();
}

/// Asserts the [State] should be [CloseDowngradedRegion].
pub(crate) fn assert_close_downgraded_region(next: &dyn State) {
let _ = next
.as_any()
.downcast_ref::<CloseDowngradedRegion>()
.unwrap();
}

/// Asserts the [State] should be [RegionMigrationAbort].
pub(crate) fn assert_region_migration_abort(next: &dyn State) {
let _ = next
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ use common_procedure::Status;
use common_telemetry::warn;
use serde::{Deserialize, Serialize};

use super::migration_abort::RegionMigrationAbort;
use super::migration_end::RegionMigrationEnd;
use crate::error::Result;
use crate::procedure::region_migration::close_downgraded_region::CloseDowngradedRegion;
use crate::procedure::region_migration::downgrade_leader_region::DowngradeLeaderRegion;
use crate::procedure::region_migration::migration_abort::RegionMigrationAbort;
use crate::procedure::region_migration::{Context, State};

#[derive(Debug, Serialize, Deserialize)]
Expand Down Expand Up @@ -58,7 +58,7 @@ impl State for UpdateMetadata {
if let Err(err) = ctx.invalidate_table_cache().await {
warn!("Failed to broadcast the invalidate table cache message during the upgrade candidate, error: {err:?}");
};
Ok((Box::new(RegionMigrationEnd), Status::done()))
Ok((Box::new(CloseDowngradedRegion), Status::executing(false)))
}
UpdateMetadata::Rollback => {
self.rollback_downgraded_region(ctx).await?;
Expand Down
Loading

0 comments on commit 66f0581

Please sign in to comment.