refactor: migrate region failover implementation to region migration #4172

Merged: 19 commits, Jun 25, 2024
Changes from 1 commit
chore: apply suggestions from CR
WenyXu committed Jun 25, 2024

commit c04ecd20aea0d2d35e18f0e35cd5b52a7f9a92ab
2 changes: 1 addition & 1 deletion src/common/meta/src/key.rs
@@ -139,7 +139,7 @@ use crate::rpc::store::BatchDeleteRequest;
 use crate::DatanodeId;
 
 pub const NAME_PATTERN: &str = r"[a-zA-Z_:-][a-zA-Z0-9_:\-\.]*";
-pub const MAINTENANCE_KEY: &str = "maintenance";
+pub const MAINTENANCE_KEY: &str = "__maintenance";
 
 const DATANODE_TABLE_KEY_PREFIX: &str = "__dn_table";
 pub const TABLE_INFO_KEY_PREFIX: &str = "__table_info";
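
Note on this change: the new name lines the maintenance key up with the other double-underscore system keys in this file (`__dn_table`, `__table_info`). As the supervisor diff further down shows, the key is used purely as an existence flag: `is_maintenance_mode` just calls `kv_backend.exists(MAINTENANCE_KEY.as_bytes())`. A minimal stand-alone sketch of that pattern, using a hypothetical in-memory store in place of GreptimeDB's real `KvBackend`:

```rust
use std::collections::HashSet;

const MAINTENANCE_KEY: &str = "__maintenance";

/// Hypothetical in-memory stand-in for a kv backend; only the existence
/// check matters for the maintenance flag.
#[derive(Default)]
struct MockKvBackend {
    keys: HashSet<Vec<u8>>,
}

impl MockKvBackend {
    fn put(&mut self, key: &[u8]) {
        self.keys.insert(key.to_vec());
    }
    fn exists(&self, key: &[u8]) -> bool {
        self.keys.contains(key)
    }
}

fn main() {
    let mut kv = MockKvBackend::default();
    // No key present: maintenance mode is off, failover may proceed.
    assert!(!kv.exists(MAINTENANCE_KEY.as_bytes()));
    // Writing the bare key turns maintenance mode on; failover is skipped.
    kv.put(MAINTENANCE_KEY.as_bytes());
    assert!(kv.exists(MAINTENANCE_KEY.as_bytes()));
}
```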
2 changes: 1 addition & 1 deletion src/meta-srv/src/handler/failure_handler.rs
@@ -29,7 +29,7 @@ impl RegionFailureHandler {
     pub(crate) fn new(mut region_supervisor: RegionSupervisor) -> Self {
         let heartbeat_acceptor = region_supervisor.heartbeat_acceptor();
         info!("Starting region supervisor");
-        tokio::spawn(async move { region_supervisor.run().await });
+        common_runtime::spawn_bg(async move { region_supervisor.run().await });
         Self { heartbeat_acceptor }
     }
 }
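
Note: swapping `tokio::spawn` for `common_runtime::spawn_bg` keeps the supervisor's long-running loop off the caller's runtime, presumably placing it on the project's shared background runtime instead. A rough stand-alone sketch of that pattern, with a plain tokio multi-thread runtime standing in for `common_runtime` and a toy event type replacing the supervisor's real events:

```rust
use std::time::Duration;
use tokio::sync::mpsc;

// Toy stand-in for the supervisor's run loop: drain events until the channel closes.
async fn run(mut rx: mpsc::Receiver<u64>) {
    while let Some(event) = rx.recv().await {
        println!("handling event {event}");
    }
}

fn main() {
    // Assumption: a separate multi-thread runtime plays the role of the
    // shared background runtime that common_runtime::spawn_bg targets.
    let bg = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(1)
        .enable_all()
        .build()
        .unwrap();

    let (tx, rx) = mpsc::channel(8);
    bg.spawn(run(rx));

    tx.blocking_send(1).unwrap();
    tx.blocking_send(2).unwrap();
    drop(tx); // closing the channel lets the loop exit

    // Give the background runtime up to a second to drain before shutting down.
    bg.shutdown_timeout(Duration::from_secs(1));
}
```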
10 changes: 0 additions & 10 deletions src/meta-srv/src/procedure/region_migration/manager.rs
@@ -67,16 +67,6 @@ impl RegionMigrationProcedureTracker {
         }
     }
 
-    /// Returns the [RegionMigrationProcedureTask] of the specific region(`region_id`).
-    pub(crate) fn get(&self, region_id: RegionId) -> Option<RegionMigrationProcedureTask> {
-        self.running_procedures
-            .read()
-            .unwrap()
-            .get(&region_id)
-            .cloned()
-    }
-
     #[cfg(test)]
     /// Returns true if it contains the specific region(`region_id`).
     pub(crate) fn contains(&self, region_id: RegionId) -> bool {
         self.running_procedures
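
Note: `get` loses its last caller because the reworked `do_failover` (see the supervisor diff below) no longer looks up the running task before submitting; duplicate submissions now surface as a `MigrationRunning` error and are ignored. A simplified sketch of the tracker shape this diff implies, with hypothetical `RegionId` and `Task` stand-ins for the real types:

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

// Hypothetical stand-ins for RegionId and RegionMigrationProcedureTask.
type RegionId = u64;

#[derive(Clone, Debug)]
struct Task {
    to_peer: u64,
}

/// Simplified sketch of the tracker: a shared map of running procedures.
#[derive(Default, Clone)]
struct Tracker {
    running_procedures: Arc<RwLock<HashMap<RegionId, Task>>>,
}

impl Tracker {
    /// The existence check kept by this commit.
    fn contains(&self, region_id: RegionId) -> bool {
        self.running_procedures.read().unwrap().contains_key(&region_id)
    }

    /// The accessor removed by this commit: clone out the running task.
    fn get(&self, region_id: RegionId) -> Option<Task> {
        self.running_procedures.read().unwrap().get(&region_id).cloned()
    }
}

fn main() {
    let tracker = Tracker::default();
    tracker
        .running_procedures
        .write()
        .unwrap()
        .insert(42, Task { to_peer: 7 });
    assert!(tracker.contains(42));
    assert_eq!(tracker.get(42).map(|t| t.to_peer), Some(7));
    assert!(!tracker.contains(43));
}
```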
167 changes: 49 additions & 118 deletions src/meta-srv/src/region/supervisor.rs
@@ -12,21 +12,19 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::HashSet;
 use std::fmt::Debug;
 use std::sync::{Arc, Mutex};
 use std::time::Duration;
 
-use common_meta::distributed_time_constants::HEARTBEAT_INTERVAL_MILLIS;
-use common_meta::key::{TableMetadataManager, TableMetadataManagerRef, MAINTENANCE_KEY};
+use common_meta::key::MAINTENANCE_KEY;
 use common_meta::kv_backend::KvBackendRef;
 use common_meta::peer::Peer;
-use common_meta::rpc::router::RegionRoute;
 use common_meta::{ClusterId, DatanodeId};
 use common_runtime::JoinHandle;
 use common_telemetry::{error, info, warn};
 use common_time::util::current_time_millis;
-use snafu::{ensure, OptionExt, ResultExt};
+use error::Error::{MigrationRunning, TableRouteNotFound};
+use snafu::ResultExt;
 use store_api::storage::RegionId;
 use tokio::sync::mpsc::{Receiver, Sender};
 use tokio::time::{interval, MissedTickBehavior};
@@ -140,7 +138,7 @@ impl Drop for RegionSupervisorTicker {
 pub type RegionSupervisorRef = Arc<RegionSupervisor>;
 
 /// The default tick interval.
-pub const DEFAULT_TICK_INTERVAL: Duration = Duration::from_millis(HEARTBEAT_INTERVAL_MILLIS);
+pub const DEFAULT_TICK_INTERVAL: Duration = Duration::from_secs(1);
 
 /// The [`RegionSupervisor`] is used to detect Region failures
 /// and initiate Region failover upon detection, ensuring uninterrupted region service.
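
Note: the tick interval becomes a fixed 1 second rather than tracking `HEARTBEAT_INTERVAL_MILLIS`. For orientation, here is a minimal sketch of the kind of ticker loop the `interval`/`MissedTickBehavior` imports above suggest; the event type, channel wiring, and the `Skip` behavior are illustrative assumptions, not taken from this diff:

```rust
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::time::{interval, MissedTickBehavior};

const DEFAULT_TICK_INTERVAL: Duration = Duration::from_secs(1);

#[derive(Debug)]
enum Event {
    Tick,
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(8);

    // Ticker task: push a tick event onto the supervisor's channel every second.
    tokio::spawn(async move {
        let mut ticker = interval(DEFAULT_TICK_INTERVAL);
        ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
        loop {
            ticker.tick().await;
            if tx.send(Event::Tick).await.is_err() {
                break; // receiver dropped, stop ticking
            }
        }
    });

    // Supervisor side: drain a few events, then stop.
    for _ in 0..3 {
        let event = rx.recv().await.unwrap();
        println!("received {event:?}");
    }
}
```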
@@ -161,7 +159,6 @@ pub struct RegionSupervisor {
     region_migration_manager: RegionMigrationManagerRef,
     // TODO(weny): find a better way
     kv_backend: KvBackendRef,
-    table_metadata_manager: TableMetadataManagerRef,
 }
 
 /// [`HeartbeatAcceptor`] forwards heartbeats to [`RegionSupervisor`].
@@ -204,7 +201,6 @@ impl RegionSupervisor {
             selector_context,
             selector,
             region_migration_manager,
-            table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())),
             kv_backend,
         }
     }
@@ -244,7 +240,7 @@ impl RegionSupervisor {
info!("RegionSupervisor is stopped!");
}

async fn handle_region_failures(&self, regions: Vec<(ClusterId, DatanodeId, RegionId)>) {
async fn handle_region_failures(&self, mut regions: Vec<(ClusterId, DatanodeId, RegionId)>) {
if regions.is_empty() {
return;
}
@@ -260,26 +256,16 @@ impl RegionSupervisor {
             }
         }
 
-        let table_ids = regions
-            .iter()
-            .map(|(_, _, region_id)| region_id.table_id())
-            .collect::<HashSet<_>>()
-            .into_iter()
+        let migrating_regions = regions
+            .extract_if(|(_, _, region_id)| {
+                self.region_migration_manager.tracker().contains(*region_id)
+            })
             .collect::<Vec<_>>();
 
-        let table_routes = match self
-            .table_metadata_manager
-            .table_route_manager()
-            .batch_get_physical_table_routes(&table_ids)
-            .await
-            .context(error::TableMetadataManagerSnafu)
-        {
-            Ok(table_routes) => table_routes,
-            Err(err) => {
-                error!(err; "Failed to retrieves table routes: {table_ids:?}");
-                return;
-            }
-        };
+        for (cluster_id, datanode_id, region_id) in migrating_regions {
+            self.failure_detector
+                .remove(&(cluster_id, datanode_id, region_id));
+        }
 
         warn!(
             "Detects region failures: {:?}",
@@ -289,71 +275,17 @@ impl RegionSupervisor {
.collect::<Vec<_>>()
);
for (cluster_id, datanode_id, region_id) in regions {
match table_routes.get(&region_id.table_id()) {
Some(route) => {
match self
.handle_region_failure(
cluster_id,
datanode_id,
region_id,
&route.region_routes,
)
.await
{
Ok(_) => {
self.failure_detector
.remove(&(cluster_id, datanode_id, region_id))
}
Err(err) => {
error!(err; "Failed to execute region failover for region: {region_id}, datanode: {datanode_id}");
}
}
}
None => {
info!(
"Skipping to execute region failover for region: {}, target table: {} is not exists",
region_id,
region_id.table_id()
);
self.failure_detector
.remove(&(cluster_id, datanode_id, region_id));
match self.do_failover(cluster_id, datanode_id, region_id).await {
Ok(_) => self
.failure_detector
.remove(&(cluster_id, datanode_id, region_id)),
Err(err) => {
error!(err; "Failed to execute region failover for region: {region_id}, datanode: {datanode_id}");
}
}
}
}

-    async fn handle_region_failure(
-        &self,
-        cluster_id: ClusterId,
-        datanode_id: DatanodeId,
-        region_id: RegionId,
-        region_routes: &[RegionRoute],
-    ) -> Result<()> {
-        let region_leader_peer = region_routes
-            .iter()
-            .find_map(|region| {
-                if region.region.id == region_id {
-                    region.leader_peer.clone()
-                } else {
-                    None
-                }
-            })
-            .context(error::RegionLeaderNotFoundSnafu { region_id })?;
-        ensure!(
-            region_leader_peer.id == datanode_id,
-            error::UnexpectedSnafu {
-                violated: format!(
-                    "Region leader peer is changed, expected: Datanode {}, actual: Datanode {}",
-                    datanode_id, region_leader_peer.id
-                )
-            }
-        );
-        self.do_failover(cluster_id, region_leader_peer, region_id)
-            .await?;
-
-        Ok(())
-    }
-
     pub(crate) async fn is_maintenance_mode(&self) -> Result<bool> {
         self.kv_backend
             .exists(MAINTENANCE_KEY.as_bytes())
@@ -364,39 +296,38 @@ impl RegionSupervisor {
     async fn do_failover(
         &self,
         cluster_id: ClusterId,
-        from_peer: Peer,
+        datanode_id: DatanodeId,
         region_id: RegionId,
     ) -> Result<()> {
-        let task = self.region_migration_manager.tracker().get(region_id);
-        match task {
-            Some(task) => {
-                info!(
-                    "Region is migrating to Datanode({}), skipping the region failover",
-                    task.to_peer.id
-                );
-            }
-            None => {
-                let mut peers = self
-                    .selector
-                    .select(
-                        cluster_id,
-                        &self.selector_context,
-                        SelectorOptions {
-                            min_required_items: 1,
-                            allow_duplication: false,
-                        },
-                    )
-                    .await?;
-                let to_peer = peers.remove(0);
-                let task = RegionMigrationProcedureTask {
-                    cluster_id,
-                    region_id,
-                    from_peer,
-                    to_peer,
-                    replay_timeout: Duration::from_secs(60),
-                };
-                self.region_migration_manager.submit_procedure(task).await?;
-            }
+        let mut peers = self
+            .selector
+            .select(
+                cluster_id,
+                &self.selector_context,
+                SelectorOptions {
+                    min_required_items: 1,
+                    allow_duplication: false,
+                },
+            )
+            .await?;
+        let to_peer = peers.remove(0);
+        let task = RegionMigrationProcedureTask {
+            cluster_id,
+            region_id,
+            from_peer: Peer {
+                id: datanode_id,
+                addr: String::new(),
+            },
+            to_peer,
+            replay_timeout: Duration::from_secs(60),
+        };
+
+        if let Err(err) = self.region_migration_manager.submit_procedure(task).await {
+            return match err {
+                // Returns Ok if it's running or table is dropped.
+                MigrationRunning { .. } | TableRouteNotFound { .. } => Ok(()),
+                err => Err(err),
+            };
+        };
 
         Ok(())
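
Note on the `extract_if` call in `handle_region_failures` above: it drains every element matching the predicate (regions that are already being migrated) out of `regions` while leaving the rest in place, which is why the parameter became `mut regions`. `Vec::extract_if` was a nightly-only API when this PR landed, so the sketch below reproduces its observable behavior with stable code; the numeric region ids are made up for illustration:

```rust
/// Stable-Rust equivalent of `regions.extract_if(pred).collect::<Vec<_>>()`:
/// matching elements are moved into the returned Vec, the rest stay put.
fn split_off_matching<T>(items: &mut Vec<T>, mut pred: impl FnMut(&T) -> bool) -> Vec<T> {
    let mut extracted = Vec::new();
    let mut i = 0;
    while i < items.len() {
        if pred(&items[i]) {
            extracted.push(items.remove(i));
        } else {
            i += 1;
        }
    }
    extracted
}

fn main() {
    // Pretend region ids 2 and 4 already have a migration procedure running.
    let migrating = [2u64, 4];
    let mut regions = vec![1u64, 2, 3, 4, 5];

    let skipped = split_off_matching(&mut regions, |r| migrating.contains(r));

    assert_eq!(skipped, vec![2, 4]); // dropped from the failure detector
    assert_eq!(regions, vec![1, 3, 5]); // still candidates for failover
}
```

In the diff above, the drained (already-migrating) regions are simply removed from the failure detector, and only the remaining regions go through `do_failover`.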