Start and stop per-timeline recovery task.
Slightly refactors init: load_tenant_timelines is now also async so it can properly
init each timeline; to keep the global map lock synchronous, we simply reacquire it
for each timeline (see the sketch below).

The recovery task itself is just a stub here.

Part of #4875.
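
Below is an illustrative Rust sketch of the locking pattern described above, not code
from this commit: the names (Timeline, load_timelines, init) are hypothetical stand-ins.
The point is only that the global map stays behind a synchronous Mutex whose guard is
dropped before each .await.

use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use tokio::time::{sleep, Duration};

type TimelineId = u32;

#[derive(Clone)]
struct Timeline {
    id: TimelineId,
}

impl Timeline {
    // Stand-in for the async per-timeline init work (status notification etc.).
    async fn init(&self) {
        sleep(Duration::from_millis(1)).await;
    }
}

// The loader is async, but it never holds the map guard across an .await:
// it reacquires the sync lock briefly for each timeline.
async fn load_timelines(
    state: Arc<Mutex<HashMap<TimelineId, Timeline>>>,
    discovered: Vec<Timeline>,
) {
    for tli in discovered {
        {
            // Short synchronous critical section: insert, then release.
            let mut map = state.lock().unwrap();
            map.insert(tli.id, tli.clone());
        } // guard dropped here, before awaiting
        tli.init().await;
    }
}

#[tokio::main]
async fn main() {
    let state = Arc::new(Mutex::new(HashMap::new()));
    load_timelines(state.clone(), vec![Timeline { id: 1 }, Timeline { id: 2 }]).await;
    println!("loaded {} timelines", state.lock().unwrap().len());
}
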
arssher committed Aug 23, 2023
1 parent 704519d commit 9f080ca
Showing 6 changed files with 134 additions and 55 deletions.
37 changes: 20 additions & 17 deletions safekeeper/src/bin/safekeeper.rs
@@ -341,21 +341,35 @@ async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {

let (wal_backup_launcher_tx, wal_backup_launcher_rx) = mpsc::channel(100);

// Load all timelines from disk to memory.
GlobalTimelines::init(conf.clone(), wal_backup_launcher_tx)?;

// Keep handles to main tasks to die if any of them disappears.
let mut tasks_handles: FuturesUnordered<BoxFuture<(String, JoinTaskRes)>> =
FuturesUnordered::new();

// Start wal backup launcher before loading timelines, as we'll notify it
// through the channel about timelines which need offloading; not draining
// the channel would cause a deadlock.
let current_thread_rt = conf
.current_thread_runtime
.then(|| Handle::try_current().expect("no runtime in main"));
let conf_ = conf.clone();
let wal_backup_handle = current_thread_rt
.as_ref()
.unwrap_or_else(|| WAL_BACKUP_RUNTIME.handle())
.spawn(wal_backup::wal_backup_launcher_task_main(
conf_,
wal_backup_launcher_rx,
))
.map(|res| ("WAL backup launcher".to_owned(), res));
tasks_handles.push(Box::pin(wal_backup_handle));

// Load all timelines from disk to memory.
GlobalTimelines::init(conf.clone(), wal_backup_launcher_tx).await?;

let conf_ = conf.clone();
// Run everything in current thread rt, if asked.
if conf.current_thread_runtime {
info!("running in current thread runtime");
}
let current_thread_rt = conf
.current_thread_runtime
.then(|| Handle::try_current().expect("no runtime in main"));

let wal_service_handle = current_thread_rt
.as_ref()
@@ -408,17 +422,6 @@ async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
.map(|res| ("WAL remover".to_owned(), res));
tasks_handles.push(Box::pin(wal_remover_handle));

let conf_ = conf.clone();
let wal_backup_handle = current_thread_rt
.as_ref()
.unwrap_or_else(|| WAL_BACKUP_RUNTIME.handle())
.spawn(wal_backup::wal_backup_launcher_task_main(
conf_,
wal_backup_launcher_rx,
))
.map(|res| ("WAL backup launcher".to_owned(), res));
tasks_handles.push(Box::pin(wal_backup_handle));

set_build_info_metric(GIT_VERSION);

// TODO: update tokio-stream, convert to real async Stream with
1 change: 1 addition & 0 deletions safekeeper/src/lib.rs
@@ -19,6 +19,7 @@ pub mod json_ctrl;
pub mod metrics;
pub mod pull_timeline;
pub mod receive_wal;
pub mod recovery;
pub mod remove_wal;
pub mod safekeeper;
pub mod send_wal;
4 changes: 3 additions & 1 deletion safekeeper/src/pull_timeline.rs
@@ -227,7 +227,9 @@ async fn pull_timeline(status: TimelineStatus, host: String) -> Result<Response>
tokio::fs::create_dir_all(conf.tenant_dir(&ttid.tenant_id)).await?;
tokio::fs::rename(tli_dir_path, &timeline_path).await?;

let tli = GlobalTimelines::load_timeline(ttid).context("Failed to load timeline after copy")?;
let tli = GlobalTimelines::load_timeline(ttid)
.await
.context("Failed to load timeline after copy")?;

info!(
"Loaded timeline {}, flush_lsn={}",
44 changes: 44 additions & 0 deletions safekeeper/src/recovery.rs
@@ -0,0 +1,44 @@
//! This module implements pulling WAL from peer safekeepers if compute can't
//! provide it, i.e. safekeeper lags too much.
use std::sync::Arc;

use tokio::{select, time::sleep, time::Duration};
use tracing::{info, instrument};


use crate::{
timeline::{Timeline},
SafeKeeperConf,
};

/// Entrypoint for the per-timeline task which always runs, checking whether
/// recovery for this safekeeper is needed and starting it if so.
#[instrument(name = "recovery task", skip_all, fields(ttid = %tli.ttid))]
pub async fn recovery_main(tli: Arc<Timeline>, _conf: SafeKeeperConf) {
info!("started");
let mut cancellation_rx = match tli.get_cancellation_rx() {
Ok(rx) => rx,
Err(_) => {
info!("timeline canceled during task start");
return;
}
};

select! {
_ = recovery_main_loop(tli) => { unreachable!() }
_ = cancellation_rx.changed() => {
info!("stopped");
}
}
}

const CHECK_INTERVAL_MS: u64 = 2000;

/// Check regularly whether we need to start recovery.
async fn recovery_main_loop(_tli: Arc<Timeline>) {
let check_duration = Duration::from_millis(CHECK_INTERVAL_MS);
loop {
sleep(check_duration).await;
}
}
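
As a standalone illustration (not part of the commit), the sketch below shows the same
cancellation pattern recovery_main uses: a long-running loop raced against a tokio watch
channel inside select!, so the task exits once the channel flips. The names here
(looping_task, cancel_tx) are hypothetical.

use tokio::select;
use tokio::sync::watch;
use tokio::time::{sleep, Duration};

// Loops forever until the watch channel reports cancellation, mirroring
// the select! in recovery_main above.
async fn looping_task(mut cancel_rx: watch::Receiver<bool>) {
    select! {
        _ = async {
            loop {
                // Periodic work; the real task checks whether recovery is needed.
                sleep(Duration::from_millis(100)).await;
            }
        } => unreachable!(),
        _ = cancel_rx.changed() => println!("task stopped"),
    }
}

#[tokio::main]
async fn main() {
    let (cancel_tx, cancel_rx) = watch::channel(false);
    let handle = tokio::spawn(looping_task(cancel_rx));
    sleep(Duration::from_millis(300)).await;
    cancel_tx.send(true).unwrap(); // analogous to cancelling the timeline
    handle.await.unwrap();
}
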
37 changes: 28 additions & 9 deletions safekeeper/src/timeline.rs
@@ -27,6 +27,7 @@ use storage_broker::proto::SafekeeperTimelineInfo;
use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;

use crate::receive_wal::WalReceivers;
use crate::recovery::recovery_main;
use crate::safekeeper::{
AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState,
SafekeeperMemState, ServerInfo, Term,
@@ -327,7 +328,7 @@
impl Timeline {
/// Load existing timeline from disk.
pub fn load_timeline(
conf: SafeKeeperConf,
conf: &SafeKeeperConf,
ttid: TenantTimelineId,
wal_backup_launcher_tx: Sender<TenantTimelineId>,
) -> Result<Timeline> {
@@ -355,7 +356,7 @@ impl Timeline {

/// Create a new timeline, which is not yet persisted to disk.
pub fn create_empty(
conf: SafeKeeperConf,
conf: &SafeKeeperConf,
ttid: TenantTimelineId,
wal_backup_launcher_tx: Sender<TenantTimelineId>,
server_info: ServerInfo,
@@ -380,12 +381,16 @@
})
}

/// Initialize fresh timeline on disk and start background tasks. If bootstrap
/// Initialize fresh timeline on disk and start background tasks. If init
/// fails, timeline is cancelled and cannot be used anymore.
///
/// Bootstrap is transactional, so if it fails, created files will be deleted,
/// Init is transactional, so if it fails, created files will be deleted,
/// and state on disk should remain unchanged.
pub async fn bootstrap(&self, shared_state: &mut MutexGuard<'_, SharedState>) -> Result<()> {
pub async fn init_new(
self: &Arc<Timeline>,
shared_state: &mut MutexGuard<'_, SharedState>,
conf: &SafeKeeperConf,
) -> Result<()> {
match fs::metadata(&self.timeline_dir).await {
Ok(_) => {
// Timeline directory exists on disk, we should leave state unchanged
@@ -401,7 +406,7 @@
// Create timeline directory.
fs::create_dir_all(&self.timeline_dir).await?;

// Write timeline to disk and TODO: start background tasks.
// Write timeline to disk and start background tasks.
if let Err(e) = shared_state.sk.persist().await {
// Bootstrap failed, cancel timeline and remove timeline directory.
self.cancel(shared_state);
@@ -415,12 +420,16 @@

return Err(e);
}

// TODO: add more initialization steps here
self.update_status(shared_state);
self.bootstrap(conf);
Ok(())
}

/// Bootstrap a new or existing timeline, starting background tasks.
pub fn bootstrap(self: &Arc<Timeline>, conf: &SafeKeeperConf) {
// Start recovery task which always runs on the timeline.
tokio::spawn(recovery_main(self.clone(), conf.clone()));
}

/// Delete timeline from disk completely, by removing timeline directory. Background
/// timeline activities will stop eventually.
pub async fn delete_from_disk(
@@ -454,6 +463,16 @@
*self.cancellation_rx.borrow()
}

/// Returns a watch channel which gets a value when the timeline is cancelled. The
/// receiver is guaranteed not to have observed a cancelled value yet (errors otherwise).
pub fn get_cancellation_rx(&self) -> Result<watch::Receiver<bool>> {
let rx = self.cancellation_rx.clone();
if *rx.borrow() {
bail!(TimelineError::Cancelled(self.ttid));
}
Ok(rx)
}

/// Take a writing mutual exclusive lock on timeline shared_state.
pub async fn write_shared_state(&self) -> MutexGuard<SharedState> {
self.mutex.lock().await
66 changes: 38 additions & 28 deletions safekeeper/src/timelines_global_map.rs
@@ -11,7 +11,7 @@ use serde::Serialize;
use std::collections::HashMap;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::{Arc, Mutex, MutexGuard};
use std::sync::{Arc, Mutex};
use tokio::sync::mpsc::Sender;
use tracing::*;
use utils::id::{TenantId, TenantTimelineId, TimelineId};
@@ -71,7 +71,7 @@ pub struct GlobalTimelines;

impl GlobalTimelines {
/// Inject dependencies needed for the timeline constructors and load all timelines to memory.
pub fn init(
pub async fn init(
conf: SafeKeeperConf,
wal_backup_launcher_tx: Sender<TenantTimelineId>,
) -> Result<()> {
@@ -84,6 +84,7 @@ impl GlobalTimelines {
// named as a valid tenant_id.
let mut tenant_count = 0;
let tenants_dir = state.get_conf().workdir.clone();
drop(state);
for tenants_dir_entry in std::fs::read_dir(&tenants_dir)
.with_context(|| format!("failed to list tenants dir {}", tenants_dir.display()))?
{
@@ -93,7 +94,7 @@
TenantId::from_str(tenants_dir_entry.file_name().to_str().unwrap_or(""))
{
tenant_count += 1;
GlobalTimelines::load_tenant_timelines(&mut state, tenant_id)?;
GlobalTimelines::load_tenant_timelines(tenant_id).await?;
}
}
Err(e) => error!(
@@ -108,25 +109,26 @@
info!(
"found {} tenants directories, successfully loaded {} timelines",
tenant_count,
state.timelines.len()
TIMELINES_STATE.lock().unwrap().timelines.len()
);
Ok(())
}

/// Loads all timelines for the given tenant to memory. Returns fs::read_dir
/// errors if any.
///
/// Note: This function (and all reading/loading below) is sync because
/// timelines are loaded while holding GlobalTimelinesState lock. Which is
/// fine as this is called only from single threaded main runtime on boot,
/// but clippy complains anyway, and suppressing that isn't trivial as async
/// is the keyword, ha. That only other user is pull_timeline.rs for which
/// being blocked is not that bad, and we can do spawn_blocking.
fn load_tenant_timelines(
state: &mut MutexGuard<'_, GlobalTimelinesState>,
tenant_id: TenantId,
) -> Result<()> {
let timelines_dir = state.get_conf().tenant_dir(&tenant_id);
/// It is async for update_status_notify sake. Since TIMELINES_STATE lock is
/// sync and there is no important reason to make it async (it is always
/// held for a short while) we just lock and unlock it for each timeline --
/// this function is called during init when nothing else is running, so
/// this is fine.
async fn load_tenant_timelines(tenant_id: TenantId) -> Result<()> {
let state = TIMELINES_STATE.lock().unwrap();
let conf = state.get_conf().clone();
let wal_backup_launcher_tx = state.wal_backup_launcher_tx.as_ref().unwrap().clone();
drop(state);

let timelines_dir = conf.tenant_dir(&tenant_id);
for timelines_dir_entry in std::fs::read_dir(&timelines_dir)
.with_context(|| format!("failed to list timelines dir {}", timelines_dir.display()))?
{
@@ -136,13 +138,16 @@
TimelineId::from_str(timeline_dir_entry.file_name().to_str().unwrap_or(""))
{
let ttid = TenantTimelineId::new(tenant_id, timeline_id);
match Timeline::load_timeline(
state.get_conf().clone(),
ttid,
state.wal_backup_launcher_tx.as_ref().unwrap().clone(),
) {
match Timeline::load_timeline(&conf, ttid, wal_backup_launcher_tx.clone()) {
Ok(timeline) => {
state.timelines.insert(ttid, Arc::new(timeline));
let tli = Arc::new(timeline);
TIMELINES_STATE
.lock()
.unwrap()
.timelines
.insert(ttid, tli.clone());
tli.bootstrap(&conf);
tli.update_status_notify().await.unwrap();
}
// If we can't load a timeline, it's most likely because of a corrupted
// directory. We will log an error and won't allow to delete/recreate
@@ -168,18 +173,22 @@
}

/// Load timeline from disk to the memory.
pub fn load_timeline(ttid: TenantTimelineId) -> Result<Arc<Timeline>> {
pub async fn load_timeline(ttid: TenantTimelineId) -> Result<Arc<Timeline>> {
let (conf, wal_backup_launcher_tx) = TIMELINES_STATE.lock().unwrap().get_dependencies();

match Timeline::load_timeline(conf, ttid, wal_backup_launcher_tx) {
match Timeline::load_timeline(&conf, ttid, wal_backup_launcher_tx) {
Ok(timeline) => {
let tli = Arc::new(timeline);

// TODO: prevent concurrent timeline creation/loading
TIMELINES_STATE
.lock()
.unwrap()
.timelines
.insert(ttid, tli.clone());

tli.bootstrap(&conf);

Ok(tli)
}
// If we can't load a timeline, it's bad. Caller will figure it out.
@@ -217,7 +226,7 @@
info!("creating new timeline {}", ttid);

let timeline = Arc::new(Timeline::create_empty(
conf,
&conf,
ttid,
wal_backup_launcher_tx,
server_info,
@@ -240,23 +249,24 @@
// Write the new timeline to the disk and start background workers.
// Bootstrap is transactional, so if it fails, the timeline will be deleted,
// and the state on disk should remain unchanged.
if let Err(e) = timeline.bootstrap(&mut shared_state).await {
// Note: the most likely reason for bootstrap failure is that the timeline
if let Err(e) = timeline.init_new(&mut shared_state, &conf).await {
// Note: the most likely reason for init failure is that the timeline
// directory already exists on disk. This happens when timeline is corrupted
// and wasn't loaded from disk on startup because of that. We want to preserve
// the timeline directory in this case, for further inspection.

// TODO: this is an unusual error, perhaps we should send it to sentry
// TODO: compute will try to create timeline every second, we should add backoff
error!("failed to bootstrap timeline {}: {}", ttid, e);
error!("failed to init new timeline {}: {}", ttid, e);

// Timeline failed to bootstrap, it cannot be used. Remove it from the map.
// Timeline failed to init, it cannot be used. Remove it from the map.
TIMELINES_STATE.lock().unwrap().timelines.remove(&ttid);
return Err(e);
}
// We are done with bootstrap, release the lock, return the timeline.
// {} block forces release before .await
}
timeline.update_status_notify().await?;
timeline.wal_backup_launcher_tx.send(timeline.ttid).await?;
Ok(timeline)
}