diff --git a/libs/utils/src/generation.rs b/libs/utils/src/generation.rs new file mode 100644 index 000000000000..87c636125593 --- /dev/null +++ b/libs/utils/src/generation.rs @@ -0,0 +1,113 @@ +use std::fmt::Debug; + +use serde::{Deserialize, Serialize}; + +/// Tenant generations are used to provide split-brain safety and allow +/// multiple pageservers to attach the same tenant concurrently. +/// +/// See docs/rfcs/025-generation-numbers.md for details on how generation +/// numbers are used. +#[derive(Copy, Clone, Eq, PartialEq, PartialOrd, Ord)] +pub enum Generation { + // Generations with this magic value will not add a suffix to S3 keys, and will not + // be included in persisted index_part.json. This value is only to be used + // during migration from pre-generation metadata to generation-aware metadata, + // and should eventually go away. + // + // A special Generation is used rather than always wrapping Generation in an Option, + // so that code handling generations doesn't have to be aware of the legacy + // case everywhere it touches a generation. + None, + // Generations with this magic value may never be used to construct S3 keys: + // we will panic if someone tries to. This is for Tenants in the "Broken" state, + // so that we can satisfy their constructor with a Generation without risking + // a code bug using it in an S3 write (broken tenants should never write). + Broken, + Valid(u32), +} + +/// The Generation type represents a number associated with a Tenant, which +/// increments every time the tenant is attached to a new pageserver, or +/// an attached pageserver restarts. +/// +/// It is included as a suffix in S3 keys, as a protection against split-brain +/// scenarios where pageservers might otherwise issue conflicting writes to +/// remote storage. +impl Generation { + /// Create a new Generation that represents a legacy key format with + /// no generation suffix + pub fn none() -> Self { + Self::None + } + + /// Create a new generation that will panic if you try to use get_suffix + pub fn broken() -> Self { + Self::Broken + } + + pub fn new(v: u32) -> Self { + Self::Valid(v) + } + + pub fn is_none(&self) -> bool { + matches!(self, Self::None) + } + + pub fn get_suffix(&self) -> String { + match self { + Self::Valid(v) => { + format!("-{:08x}", v) + } + Self::None => "".into(), + Self::Broken => { + panic!("Tried to use a broken generation"); + } + } + } +} + +impl Serialize for Generation { + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> + where + S: serde::Serializer, + { + if let Self::Valid(v) = self { + v.serialize(serializer) + } else { + // We should never be asked to serialize a None or Broken. Structures + // that include an optional generation should convert None to an + // Option::None + Err(serde::ser::Error::custom(format!( + "Tried to serialize invalid generation ({self:?})" + ))) + } + } +} + +impl<'de> Deserialize<'de> for Generation { + fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> + where + D: serde::Deserializer<'de>, + { + Ok(Self::Valid(u32::deserialize(deserializer)?)) + } +} + +// We intentionally do not implement Display for Generation, to reduce the +// risk of a bug where the generation is used in a format!() string directly +// instead of using get_suffix().
+impl Debug for Generation { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Valid(v) => { + write!(f, "{:08x}", v) + } + Self::None => { + write!(f, "<none>") + } + Self::Broken => { + write!(f, "<broken>") + } + } + } +} diff --git a/libs/utils/src/lib.rs b/libs/utils/src/lib.rs index 6cf829a67c2a..160e08283311 100644 --- a/libs/utils/src/lib.rs +++ b/libs/utils/src/lib.rs @@ -27,6 +27,9 @@ pub mod id; // http endpoint utils pub mod http; +// definition of the Generation type for pageserver attachment APIs +pub mod generation; + // common log initialisation routine pub mod logging; diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index f2aa2f365eb4..5394f17398e7 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -643,23 +643,6 @@ impl PageServerConf { .join(METADATA_FILE_NAME) } - /// Files on the remote storage are stored with paths, relative to the workdir. - /// That path includes in itself both tenant and timeline ids, allowing to have a unique remote storage path. - /// - /// Errors if the path provided does not start from pageserver's workdir. - pub fn remote_path(&self, local_path: &Path) -> anyhow::Result<RemotePath> { - local_path - .strip_prefix(&self.workdir) - .context("Failed to strip workdir prefix") - .and_then(RemotePath::new) - .with_context(|| { - format!( - "Failed to resolve remote part of path {:?} for base {:?}", - local_path, self.workdir - ) - }) - } - /// Turns storage remote path of a file into its local path. pub fn local_path(&self, remote_path: &RemotePath) -> PathBuf { remote_path.with_base(&self.workdir) diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index a204f8a22b5d..2168db57de25 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -85,6 +85,7 @@ pub use pageserver_api::models::TenantState; use toml_edit; use utils::{ crashsafe, + generation::Generation, id::{TenantId, TimelineId}, lsn::{Lsn, RecordLsn}, }; @@ -178,6 +179,11 @@ pub struct Tenant { tenant_conf: Arc<RwLock<TenantConfOpt>>, tenant_id: TenantId, + + /// The remote storage generation, used to protect S3 objects from split-brain. + /// Does not change over the lifetime of the [`Tenant`] object. + generation: Generation, + timelines: Mutex<HashMap<TimelineId, Arc<Timeline>>>, // This mutex prevents creation of new timelines during GC.
// Adding yet another mutex (in addition to `timelines`) is needed because holding @@ -522,6 +528,7 @@ impl Tenant { pub(crate) fn spawn_attach( conf: &'static PageServerConf, tenant_id: TenantId, + generation: Generation, broker_client: storage_broker::BrokerClientChannel, tenants: &'static tokio::sync::RwLock<TenantsMap>, remote_storage: GenericRemoteStorage, @@ -538,6 +545,7 @@ impl Tenant { tenant_conf, wal_redo_manager, tenant_id, + generation, Some(remote_storage.clone()), )); @@ -648,12 +656,8 @@ impl Tenant { .as_ref() .ok_or_else(|| anyhow::anyhow!("cannot attach without remote storage"))?; - let remote_timeline_ids = remote_timeline_client::list_remote_timelines( - remote_storage, - self.conf, - self.tenant_id, - ) - .await?; + let remote_timeline_ids = + remote_timeline_client::list_remote_timelines(remote_storage, self.tenant_id).await?; info!("found {} timelines", remote_timeline_ids.len()); @@ -665,6 +669,7 @@ impl Tenant { self.conf, self.tenant_id, timeline_id, + self.generation, ); part_downloads.spawn( async move { @@ -851,6 +856,7 @@ impl Tenant { TenantConfOpt::default(), wal_redo_manager, tenant_id, + Generation::broken(), None, )) } @@ -868,6 +874,7 @@ impl Tenant { pub(crate) fn spawn_load( conf: &'static PageServerConf, tenant_id: TenantId, + generation: Generation, resources: TenantSharedResources, init_order: Option<InitializationOrder>, tenants: &'static tokio::sync::RwLock<TenantsMap>, @@ -893,6 +900,7 @@ impl Tenant { tenant_conf, wal_redo_manager, tenant_id, + generation, remote_storage.clone(), ); let tenant = Arc::new(tenant); @@ -2274,6 +2282,7 @@ impl Tenant { ancestor, new_timeline_id, self.tenant_id, + self.generation, Arc::clone(&self.walredo_mgr), resources, pg_version, @@ -2291,6 +2300,7 @@ impl Tenant { tenant_conf: TenantConfOpt, walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>, tenant_id: TenantId, + generation: Generation, remote_storage: Option<GenericRemoteStorage>, ) -> Tenant { let (state, mut rx) = watch::channel(state); @@ -2349,6 +2359,7 @@ impl Tenant { Tenant { tenant_id, + generation, conf, // using now here is good enough approximation to catch tenants with really long // activation times.
@@ -2931,6 +2942,7 @@ impl Tenant { self.conf, self.tenant_id, timeline_id, + self.generation, ); Some(remote_client) } else { @@ -3454,6 +3466,7 @@ pub mod harness { pub conf: &'static PageServerConf, pub tenant_conf: TenantConf, pub tenant_id: TenantId, + pub generation: Generation, } static LOG_HANDLE: OnceCell<()> = OnceCell::new(); @@ -3495,6 +3508,7 @@ pub mod harness { conf, tenant_conf, tenant_id, + generation: Generation::new(0xdeadbeef), }) } @@ -3521,6 +3535,7 @@ pub mod harness { TenantConfOpt::from(self.tenant_conf), walredo_mgr, self.tenant_id, + self.generation, remote_storage, )); tenant diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index a558c7d0badb..87617b544cf4 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -25,6 +25,7 @@ use crate::tenant::{create_tenant_files, CreateTenantFilesMode, Tenant, TenantSt use crate::{InitializationOrder, IGNORED_TENANT_FILE_NAME}; use utils::fs_ext::PathExt; +use utils::generation::Generation; use utils::id::{TenantId, TimelineId}; use super::delete::DeleteTenantError; @@ -202,6 +203,7 @@ pub(crate) fn schedule_local_tenant_processing( match Tenant::spawn_attach( conf, tenant_id, + Generation::none(), resources.broker_client, tenants, remote_storage, @@ -224,7 +226,15 @@ pub(crate) fn schedule_local_tenant_processing( } else { info!("tenant {tenant_id} is assumed to be loadable, starting load operation"); // Start loading the tenant into memory. It will initially be in Loading state. - Tenant::spawn_load(conf, tenant_id, resources, init_order, tenants, ctx) + Tenant::spawn_load( + conf, + tenant_id, + Generation::none(), + resources, + init_order, + tenants, + ctx, + ) }; Ok(tenant) } diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index e46205810a00..50bb8b43dee9 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -216,7 +216,7 @@ use utils::backoff::{ }; use std::collections::{HashMap, VecDeque}; -use std::path::Path; +use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::{Arc, Mutex}; @@ -235,6 +235,7 @@ use crate::task_mgr::shutdown_token; use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id; use crate::tenant::remote_timeline_client::index::LayerFileMetadata; use crate::tenant::upload_queue::Delete; +use crate::tenant::TIMELINES_SEGMENT_NAME; use crate::{ config::PageServerConf, task_mgr, @@ -252,6 +253,7 @@ use self::index::IndexPart; use super::storage_layer::LayerFileName; use super::upload_queue::SetDeletedFlagProgress; +use super::Generation; // Occasional network issues and such can cause remote operations to fail, and // that's expected. If a download fails, we log it at info-level, and retry. 
@@ -315,6 +317,7 @@ pub struct RemoteTimelineClient { tenant_id: TenantId, timeline_id: TimelineId, + generation: Generation, upload_queue: Mutex<UploadQueue>, @@ -335,12 +338,14 @@ impl RemoteTimelineClient { conf: &'static PageServerConf, tenant_id: TenantId, timeline_id: TimelineId, + generation: Generation, ) -> RemoteTimelineClient { RemoteTimelineClient { conf, runtime: BACKGROUND_RUNTIME.handle().to_owned(), tenant_id, timeline_id, + generation, storage_impl: remote_storage, upload_queue: Mutex::new(UploadQueue::Uninitialized), metrics: Arc::new(RemoteTimelineClientMetrics::new(&tenant_id, &timeline_id)), @@ -449,10 +454,10 @@ impl RemoteTimelineClient { ); let index_part = download::download_index_part( - self.conf, &self.storage_impl, &self.tenant_id, &self.timeline_id, + self.generation, ) .measure_remote_op( self.tenant_id, @@ -650,22 +655,41 @@ impl RemoteTimelineClient { // from latest_files, but not yet scheduled for deletion. Use a closure // to syntactically forbid ? or bail! calls here. let no_bail_here = || { - for name in names { - if upload_queue.latest_files.remove(name).is_some() { - upload_queue.latest_files_changes_since_metadata_upload_scheduled += 1; - } - } + // Decorate our list of names with each name's generation, dropping + // names that are unexpectedly missing from our metadata. + let with_generations: Vec<_> = names + .iter() + .filter_map(|name| { + // Remove from latest_files, learning the file's remote generation in the process + let meta = upload_queue.latest_files.remove(name); + + if let Some(meta) = meta { + upload_queue.latest_files_changes_since_metadata_upload_scheduled += 1; + Some((name, meta.generation)) + } else { + // This can only happen if we forgot to schedule the file upload + // before scheduling the delete. Log it because it is a rare/strange + // situation, and in case something is misbehaving, we'd like to know which + // layers experienced this. + info!( + "Deleting layer {name} not found in latest_files list, never uploaded?" + ); + None + } + }) + .collect(); if upload_queue.latest_files_changes_since_metadata_upload_scheduled > 0 { self.schedule_index_upload(upload_queue, metadata); } // schedule the actual deletions - for name in names { + for (name, generation) in with_generations { let op = UploadOp::Delete(Delete { file_kind: RemoteOpFileKind::Layer, layer_file_name: name.clone(), scheduled_from_timeline_delete: false, + generation, }); self.calls_unfinished_metric_begin(&op); upload_queue.queued_operations.push_back(op); @@ -761,10 +785,10 @@ impl RemoteTimelineClient { backoff::retry( || { upload::upload_index_part( - self.conf, &self.storage_impl, &self.tenant_id, &self.timeline_id, + self.generation, &index_part_with_deleted_at, ) }, @@ -822,12 +846,14 @@ impl RemoteTimelineClient { .reserve(stopped.upload_queue_for_deletion.latest_files.len()); // schedule the actual deletions - for name in stopped.upload_queue_for_deletion.latest_files.keys() { + for (name, meta) in &stopped.upload_queue_for_deletion.latest_files { let op = UploadOp::Delete(Delete { file_kind: RemoteOpFileKind::Layer, layer_file_name: name.clone(), scheduled_from_timeline_delete: true, + generation: meta.generation, }); + self.calls_unfinished_metric_begin(&op); stopped .upload_queue_for_deletion @@ -850,8 +876,7 @@ impl RemoteTimelineClient { // Do not delete index part yet, it is needed for possible retry.
If we remove it first // and retry will arrive to different pageserver there wont be any traces of it on remote storage - let timeline_path = self.conf.timeline_path(&self.tenant_id, &self.timeline_id); - let timeline_storage_path = self.conf.remote_path(&timeline_path)?; + let timeline_storage_path = remote_timeline_path(&self.tenant_id, &self.timeline_id); let remaining = backoff::retry( || async { @@ -1055,15 +1080,17 @@ impl RemoteTimelineClient { let upload_result: anyhow::Result<()> = match &task.op { UploadOp::UploadLayer(ref layer_file_name, ref layer_metadata) => { - let path = &self + let path = self .conf .timeline_path(&self.tenant_id, &self.timeline_id) .join(layer_file_name.file_name()); + upload::upload_timeline_layer( self.conf, &self.storage_impl, - path, + &path, layer_metadata, + self.generation, ) .measure_remote_op( self.tenant_id, @@ -1085,10 +1112,10 @@ }; let res = upload::upload_index_part( - self.conf, &self.storage_impl, &self.tenant_id, &self.timeline_id, + self.generation, index_part, ) .measure_remote_op( @@ -1113,7 +1140,7 @@ .conf .timeline_path(&self.tenant_id, &self.timeline_id) .join(delete.layer_file_name.file_name()); - delete::delete_layer(self.conf, &self.storage_impl, path) + delete::delete_layer(self.conf, &self.storage_impl, path, delete.generation) .measure_remote_op( self.tenant_id, self.timeline_id, @@ -1360,6 +1387,71 @@ impl RemoteTimelineClient { } } +pub fn remote_timelines_path(tenant_id: &TenantId) -> RemotePath { + let path = format!("tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}"); + RemotePath::from_string(&path).expect("Failed to construct path") +} + +pub fn remote_timeline_path(tenant_id: &TenantId, timeline_id: &TimelineId) -> RemotePath { + remote_timelines_path(tenant_id).join(&PathBuf::from(timeline_id.to_string())) +} + +pub fn remote_layer_path( + tenant_id: &TenantId, + timeline_id: &TimelineId, + layer_file_name: &LayerFileName, + layer_meta: &LayerFileMetadata, +) -> RemotePath { + // Generation-aware key format + let path = format!( + "tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{0}{1}", + layer_file_name.file_name(), + layer_meta.generation.get_suffix() + ); + + RemotePath::from_string(&path).expect("Failed to construct path") +} + +pub fn remote_index_path( + tenant_id: &TenantId, + timeline_id: &TimelineId, + generation: Generation, +) -> RemotePath { + RemotePath::from_string(&format!( + "tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{0}{1}", + IndexPart::FILE_NAME, + generation.get_suffix() + )) + .expect("Failed to construct path") +} + +/// Files on the remote storage are stored with paths, relative to the workdir. +/// That path includes both the tenant and timeline ids, giving each file a unique remote storage path. +/// +/// Errors if the path provided does not start from pageserver's workdir.
+pub fn remote_path( + conf: &PageServerConf, + local_path: &Path, + generation: Generation, +) -> anyhow::Result<RemotePath> { + let stripped = local_path + .strip_prefix(&conf.workdir) + .context("Failed to strip workdir prefix")?; + + let suffixed = format!( + "{0}{1}", + stripped.to_string_lossy(), + generation.get_suffix() + ); + + RemotePath::new(&PathBuf::from(suffixed)).with_context(|| { + format!( + "resolve remote part of path {:?} for base {:?}", + local_path, conf.workdir + ) + }) +} + #[cfg(test)] mod tests { use super::*; @@ -1367,7 +1459,7 @@ mod tests { context::RequestContext, tenant::{ harness::{TenantHarness, TIMELINE_ID}, - Tenant, Timeline, + Generation, Tenant, Timeline, }, DEFAULT_PG_VERSION, }; @@ -1409,8 +1501,11 @@ mod tests { assert_eq!(avec, bvec); } - fn assert_remote_files(expected: &[&str], remote_path: &Path) { - let mut expected: Vec<String> = expected.iter().map(|x| String::from(*x)).collect(); + fn assert_remote_files(expected: &[&str], remote_path: &Path, generation: Generation) { + let mut expected: Vec<String> = expected + .iter() + .map(|x| format!("{}{}", x, generation.get_suffix())) + .collect(); expected.sort(); let mut found: Vec<String> = Vec::new(); @@ -1461,6 +1556,8 @@ mod tests { storage: RemoteStorageKind::LocalFs(remote_fs_dir.clone()), }; + let generation = Generation::new(0xdeadbeef); + let storage = GenericRemoteStorage::from_config(&storage_config).unwrap(); let client = Arc::new(RemoteTimelineClient { conf: harness.conf, runtime: tokio::runtime::Handle::current(), tenant_id: harness.tenant_id, timeline_id: TIMELINE_ID, + generation, storage_impl: storage, upload_queue: Mutex::new(UploadQueue::Uninitialized), metrics: Arc::new(RemoteTimelineClientMetrics::new( @@ -1526,6 +1624,8 @@ .init_upload_queue_for_empty_remote(&metadata) .unwrap(); + let generation = Generation::new(0xdeadbeef); + // Create a couple of dummy files, schedule upload for them let layer_file_name_1: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(); let layer_file_name_2: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D9-00000000016B5A52".parse().unwrap(); @@ -1545,13 +1645,13 @@ client .schedule_layer_file_upload( &layer_file_name_1, - &LayerFileMetadata::new(content_1.len() as u64), + &LayerFileMetadata::new(content_1.len() as u64, generation), ) .unwrap(); client .schedule_layer_file_upload( &layer_file_name_2, - &LayerFileMetadata::new(content_2.len() as u64), + &LayerFileMetadata::new(content_2.len() as u64, generation), ) .unwrap(); @@ -1615,7 +1715,7 @@ client .schedule_layer_file_upload( &layer_file_name_3, - &LayerFileMetadata::new(content_3.len() as u64), + &LayerFileMetadata::new(content_3.len() as u64, generation), ) .unwrap(); client @@ -1639,6 +1739,7 @@ "index_part.json", ], &remote_timeline_dir, + generation, ); // Finish them @@ -1651,6 +1752,7 @@ "index_part.json", ], &remote_timeline_dir, + generation, ); } @@ -1703,12 +1805,14 @@ // Test + let generation = Generation::new(0xdeadbeef); + let init = get_bytes_started_stopped(); client .schedule_layer_file_upload( &layer_file_name_1, - &LayerFileMetadata::new(content_1.len() as u64), + &LayerFileMetadata::new(content_1.len() as u64, generation), ) .unwrap(); diff --git a/pageserver/src/tenant/remote_timeline_client/delete.rs b/pageserver/src/tenant/remote_timeline_client/delete.rs index
3f505d45ab41..7324559223d6 100644 --- a/pageserver/src/tenant/remote_timeline_client/delete.rs +++ b/pageserver/src/tenant/remote_timeline_client/delete.rs @@ -5,25 +5,30 @@ use tracing::debug; use remote_storage::GenericRemoteStorage; -use crate::config::PageServerConf; +use crate::{ + config::PageServerConf, + tenant::{remote_timeline_client::remote_path, Generation}, +}; pub(super) async fn delete_layer<'a>( conf: &'static PageServerConf, storage: &'a GenericRemoteStorage, local_layer_path: &'a Path, + generation: Generation, ) -> anyhow::Result<()> { fail::fail_point!("before-delete-layer", |_| { anyhow::bail!("failpoint before-delete-layer") }); debug!("Deleting layer from remote storage: {local_layer_path:?}",); - let path_to_delete = conf.remote_path(local_layer_path)?; + let path_to_delete = remote_path(conf, local_layer_path, generation)?; // We don't want to print an error if the delete failed if the file has // already been deleted. Thankfully, in this situation S3 already // does not yield an error. While OS-provided local file system APIs do yield // errors, we avoid them in the `LocalFs` wrapper. - storage.delete(&path_to_delete).await.with_context(|| { - format!("Failed to delete remote layer from storage at {path_to_delete:?}") - }) + storage + .delete(&path_to_delete) + .await + .with_context(|| format!("delete remote layer from storage at {path_to_delete:?}")) } diff --git a/pageserver/src/tenant/remote_timeline_client/download.rs b/pageserver/src/tenant/remote_timeline_client/download.rs index 2cb33f07c9a4..dc8d87b9e186 100644 --- a/pageserver/src/tenant/remote_timeline_client/download.rs +++ b/pageserver/src/tenant/remote_timeline_client/download.rs @@ -15,14 +15,16 @@ use tokio_util::sync::CancellationToken; use utils::{backoff, crashsafe}; use crate::config::PageServerConf; +use crate::tenant::remote_timeline_client::{remote_layer_path, remote_timelines_path}; use crate::tenant::storage_layer::LayerFileName; use crate::tenant::timeline::span::debug_assert_current_span_has_tenant_and_timeline_id; +use crate::tenant::Generation; use remote_storage::{DownloadError, GenericRemoteStorage}; use utils::crashsafe::path_with_suffix_extension; use utils::id::{TenantId, TimelineId}; use super::index::{IndexPart, LayerFileMetadata}; -use super::{FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES}; +use super::{remote_index_path, FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES}; static MAX_DOWNLOAD_DURATION: Duration = Duration::from_secs(120); @@ -41,13 +43,11 @@ pub async fn download_layer_file<'a>( ) -> Result<u64, DownloadError> { debug_assert_current_span_has_tenant_and_timeline_id(); - let timeline_path = conf.timeline_path(&tenant_id, &timeline_id); + let local_path = conf + .timeline_path(&tenant_id, &timeline_id) + .join(layer_file_name.file_name()); - let local_path = timeline_path.join(layer_file_name.file_name()); - - let remote_path = conf - .remote_path(&local_path) - .map_err(DownloadError::Other)?; + let remote_path = remote_layer_path(&tenant_id, &timeline_id, layer_file_name, layer_metadata); // Perform a rename inspired by durable_rename from file_utils.c. // The sequence: @@ -64,33 +64,43 @@ let (mut destination_file, bytes_amount) = download_retry( || async { // TODO: this doesn't use the cached fd for some reason?
- let mut destination_file = fs::File::create(&temp_file_path).await.with_context(|| { - format!( - "create a destination file for layer '{}'", - temp_file_path.display() + let mut destination_file = fs::File::create(&temp_file_path) + .await + .with_context(|| { + format!( + "create a destination file for layer '{}'", + temp_file_path.display() + ) + }) + .map_err(DownloadError::Other)?; + let mut download = storage + .download(&remote_path) + .await + .with_context(|| { + format!( + "open a download stream for layer with remote storage path '{remote_path:?}'" ) - }) - .map_err(DownloadError::Other)?; - let mut download = storage.download(&remote_path).await.with_context(|| { + }) + .map_err(DownloadError::Other)?; + + let bytes_amount = tokio::time::timeout( + MAX_DOWNLOAD_DURATION, + tokio::io::copy(&mut download.download_stream, &mut destination_file), + ) + .await + .map_err(|e| DownloadError::Other(anyhow::anyhow!("Timed out {:?}", e)))? + .with_context(|| { format!( - "open a download stream for layer with remote storage path '{remote_path:?}'" + "download layer at remote path '{remote_path:?}' into file {temp_file_path:?}" ) }) .map_err(DownloadError::Other)?; - let bytes_amount = tokio::time::timeout(MAX_DOWNLOAD_DURATION, tokio::io::copy(&mut download.download_stream, &mut destination_file)) - .await - .map_err(|e| DownloadError::Other(anyhow::anyhow!("Timed out {:?}", e)))? - .with_context(|| { - format!("Failed to download layer with remote storage path '{remote_path:?}' into file {temp_file_path:?}") - }) - .map_err(DownloadError::Other)?; - Ok((destination_file, bytes_amount)) - }, &format!("download {remote_path:?}"), - ).await?; + ) + .await?; // Tokio doc here: https://docs.rs/tokio/1.17.0/tokio/fs/struct.File.html states that: // A file will not be closed immediately when it goes out of scope if there are any IO operations @@ -103,12 +113,7 @@ destination_file .flush() .await - .with_context(|| { - format!( - "failed to flush source file at {}", - temp_file_path.display() - ) - }) + .with_context(|| format!("flush source file at {}", temp_file_path.display())) .map_err(DownloadError::Other)?; let expected = layer_metadata.file_size(); @@ -139,17 +144,12 @@ fs::rename(&temp_file_path, &local_path) .await - .with_context(|| { - format!( - "Could not rename download layer file to {}", - local_path.display(), - ) - }) + .with_context(|| format!("rename download layer file to {}", local_path.display(),)) .map_err(DownloadError::Other)?; crashsafe::fsync_async(&local_path) .await - .with_context(|| format!("Could not fsync layer file {}", local_path.display(),)) + .with_context(|| format!("fsync layer file {}", local_path.display(),)) .map_err(DownloadError::Other)?; tracing::debug!("download complete: {}", local_path.display()); @@ -173,21 +173,19 @@ pub fn is_temp_download_file(path: &Path) -> bool { } /// List timelines of given tenant in remote storage -pub async fn list_remote_timelines<'a>( - storage: &'a GenericRemoteStorage, - conf: &'static PageServerConf, +pub async fn list_remote_timelines( + storage: &GenericRemoteStorage, tenant_id: TenantId, ) -> anyhow::Result<Vec<TimelineId>> { - let tenant_path = conf.timelines_path(&tenant_id); - let tenant_storage_path = conf.remote_path(&tenant_path)?; + let remote_path = remote_timelines_path(&tenant_id); fail::fail_point!("storage-sync-list-remote-timelines", |_| { anyhow::bail!("storage-sync-list-remote-timelines"); }); let timelines = download_retry( - ||
storage.list_prefixes(Some(&tenant_storage_path)), - &format!("list prefixes for {tenant_path:?}"), + || storage.list_prefixes(Some(&remote_path)), + &format!("list prefixes for {tenant_id}"), ) .await?; @@ -202,9 +200,9 @@ anyhow::anyhow!("failed to get timeline id for remote tenant {tenant_id}") })?; - let timeline_id: TimelineId = object_name.parse().with_context(|| { - format!("failed to parse object name into timeline id '{object_name}'") - })?; + let timeline_id: TimelineId = object_name + .parse() + .with_context(|| format!("parse object name into timeline id '{object_name}'"))?; // list_prefixes is assumed to return unique names. Ensure this here. // NB: it's safer to bail out than warn-log this because the pageserver @@ -222,21 +220,16 @@ } pub(super) async fn download_index_part( - conf: &'static PageServerConf, storage: &GenericRemoteStorage, tenant_id: &TenantId, timeline_id: &TimelineId, + generation: Generation, ) -> Result<IndexPart, DownloadError> { - let index_part_path = conf - .metadata_path(tenant_id, timeline_id) - .with_file_name(IndexPart::FILE_NAME); - let part_storage_path = conf - .remote_path(&index_part_path) - .map_err(DownloadError::BadInput)?; + let remote_path = remote_index_path(tenant_id, timeline_id, generation); let index_part_bytes = download_retry( || async { - let mut index_part_download = storage.download(&part_storage_path).await?; + let mut index_part_download = storage.download(&remote_path).await?; let mut index_part_bytes = Vec::new(); tokio::io::copy( @@ -244,20 +237,16 @@ pub(super) async fn download_index_part( &mut index_part_bytes, ) .await - .with_context(|| { - format!("Failed to download an index part into file {index_part_path:?}") - }) + .with_context(|| format!("download index part at {remote_path:?}")) .map_err(DownloadError::Other)?; Ok(index_part_bytes) }, - &format!("download {part_storage_path:?}"), + &format!("download {remote_path:?}"), ) .await?; let index_part: IndexPart = serde_json::from_slice(&index_part_bytes) - .with_context(|| { - format!("Failed to deserialize index part file into file {index_part_path:?}") - }) + .with_context(|| format!("deserialize index part file at {remote_path:?}")) .map_err(DownloadError::Other)?; Ok(index_part) diff --git a/pageserver/src/tenant/remote_timeline_client/index.rs b/pageserver/src/tenant/remote_timeline_client/index.rs index bcde6589c5f8..37ed0e2c3ff5 100644 --- a/pageserver/src/tenant/remote_timeline_client/index.rs +++ b/pageserver/src/tenant/remote_timeline_client/index.rs @@ -12,6 +12,7 @@ use utils::bin_ser::SerializeError; use crate::tenant::metadata::TimelineMetadata; use crate::tenant::storage_layer::LayerFileName; use crate::tenant::upload_queue::UploadQueueInitialized; +use crate::tenant::Generation; use utils::lsn::Lsn; @@ -20,22 +21,28 @@ use utils::lsn::Lsn; /// Fields have to be `Option`s because remote [`IndexPart`]'s can be from different version, which /// might have less or more metadata depending if upgrading or rolling back an upgrade.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] -#[cfg_attr(test, derive(Default))] pub struct LayerFileMetadata { file_size: u64, + + pub(crate) generation: Generation, } impl From<&'_ IndexLayerMetadata> for LayerFileMetadata { fn from(other: &IndexLayerMetadata) -> Self { LayerFileMetadata { file_size: other.file_size, + generation: other.generation, } } } impl LayerFileMetadata { - pub fn new(file_size: u64) -> Self { - LayerFileMetadata { file_size } + pub fn new(file_size: u64, generation: Generation) -> Self { + LayerFileMetadata { + file_size, + generation, + } } pub fn file_size(&self) -> u64 { @@ -128,15 +135,20 @@ impl TryFrom<&UploadQueueInitialized> for IndexPart { } /// Serialized form of [`LayerFileMetadata`]. -#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, Default)] +#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] pub struct IndexLayerMetadata { pub(super) file_size: u64, + + #[serde(default = "Generation::none")] + #[serde(skip_serializing_if = "Generation::is_none")] + pub(super) generation: Generation, } impl From<LayerFileMetadata> for IndexLayerMetadata { fn from(other: LayerFileMetadata) -> Self { IndexLayerMetadata { file_size: other.file_size, + generation: other.generation, } } } @@ -164,11 +176,13 @@ mod tests { layer_metadata: HashMap::from([ ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), IndexLayerMetadata { file_size: 25600000, + generation: Generation::none() }), ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), IndexLayerMetadata { // serde_json should always parse this but this might be a double with jq for // example. file_size: 9007199254741001, + generation: Generation::none() }) ]), disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(), @@ -200,11 +214,13 @@ mod tests { layer_metadata: HashMap::from([ ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), IndexLayerMetadata { file_size: 25600000, + generation: Generation::none() }), ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), IndexLayerMetadata { // serde_json should always parse this but this might be a double with jq for // example. file_size: 9007199254741001, + generation: Generation::none() }) ]), disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(), @@ -237,11 +253,13 @@ mod tests { layer_metadata: HashMap::from([ ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), IndexLayerMetadata { file_size: 25600000, + generation: Generation::none() }), ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), IndexLayerMetadata { // serde_json should always parse this but this might be a double with jq for // example.
file_size: 9007199254741001, + generation: Generation::none() }) ]), disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(), diff --git a/pageserver/src/tenant/remote_timeline_client/upload.rs b/pageserver/src/tenant/remote_timeline_client/upload.rs index a805e9bd60e8..c442c4f4456e 100644 --- a/pageserver/src/tenant/remote_timeline_client/upload.rs +++ b/pageserver/src/tenant/remote_timeline_client/upload.rs @@ -5,7 +5,11 @@ use fail::fail_point; use std::{io::ErrorKind, path::Path}; use tokio::fs; -use crate::{config::PageServerConf, tenant::remote_timeline_client::index::IndexPart}; +use super::Generation; +use crate::{ + config::PageServerConf, + tenant::remote_timeline_client::{index::IndexPart, remote_index_path, remote_path}, +}; use remote_storage::GenericRemoteStorage; use utils::id::{TenantId, TimelineId}; @@ -15,10 +19,10 @@ use tracing::info; /// Serializes and uploads the given index part data to the remote storage. pub(super) async fn upload_index_part<'a>( - conf: &'static PageServerConf, storage: &'a GenericRemoteStorage, tenant_id: &TenantId, timeline_id: &TimelineId, + generation: Generation, index_part: &'a IndexPart, ) -> anyhow::Result<()> { tracing::trace!("uploading new index part"); @@ -27,20 +31,16 @@ bail!("failpoint before-upload-index") }); - let index_part_bytes = serde_json::to_vec(&index_part) .context("Failed to serialize index part file into bytes")?; + let index_part_bytes = + serde_json::to_vec(&index_part).context("serialize index part file into bytes")?; let index_part_size = index_part_bytes.len(); let index_part_bytes = tokio::io::BufReader::new(std::io::Cursor::new(index_part_bytes)); - let index_part_path = conf - .metadata_path(tenant_id, timeline_id) - .with_file_name(IndexPart::FILE_NAME); - let storage_path = conf.remote_path(&index_part_path)?; - + let remote_path = remote_index_path(tenant_id, timeline_id, generation); storage - .upload_storage_object(Box::new(index_part_bytes), index_part_size, &storage_path) + .upload_storage_object(Box::new(index_part_bytes), index_part_size, &remote_path) .await - .with_context(|| format!("Failed to upload index part for '{tenant_id} / {timeline_id}'")) + .with_context(|| format!("upload index part for '{tenant_id} / {timeline_id}'")) } /// Attempts to upload given layer files. @@ -52,12 +52,13 @@ storage: &'a GenericRemoteStorage, source_path: &'a Path, known_metadata: &'a LayerFileMetadata, + generation: Generation, ) -> anyhow::Result<()> { fail_point!("before-upload-layer", |_| { bail!("failpoint before-upload-layer") }); - let storage_path = conf.remote_path(source_path)?; + let storage_path = remote_path(conf, source_path, generation)?; let source_file_res = fs::File::open(&source_path).await; let source_file = match source_file_res { Ok(source_file) => source_file, @@ -70,16 +71,15 @@ info!(path = %source_path.display(), "File to upload doesn't exist. Likely the file has been deleted and an upload is not required any more."); return Ok(()); } - Err(e) => Err(e) - .with_context(|| format!("Failed to open a source file for layer {source_path:?}"))?, + Err(e) => { + Err(e).with_context(|| format!("open a source file for layer {source_path:?}"))? + } }; let fs_size = source_file .metadata() .await - .with_context(|| { - format!("Failed to get the source file metadata for layer {source_path:?}") - })?
+ .with_context(|| format!("get the source file metadata for layer {source_path:?}"))? .len(); let metadata_size = known_metadata.file_size(); @@ -87,19 +87,13 @@ pub(super) async fn upload_timeline_layer<'a>( bail!("File {source_path:?} has its current FS size {fs_size} diferent from initially determined {metadata_size}"); } - let fs_size = usize::try_from(fs_size).with_context(|| { - format!("File {source_path:?} size {fs_size} could not be converted to usize") - })?; + let fs_size = usize::try_from(fs_size) + .with_context(|| format!("convert {source_path:?} size {fs_size} usize"))?; storage .upload(source_file, fs_size, &storage_path, None) .await - .with_context(|| { - format!( - "Failed to upload a layer from local path '{}'", - source_path.display() - ) - })?; + .with_context(|| format!("upload layer from local path '{}'", source_path.display()))?; Ok(()) } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 04da85a24136..f0ae38580665 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -67,6 +67,7 @@ use postgres_connection::PgConnectionConfig; use postgres_ffi::to_pg_timestamp; use utils::{ completion, + generation::Generation, id::{TenantId, TimelineId}, lsn::{AtomicLsn, Lsn, RecordLsn}, seqwait::SeqWait, @@ -152,6 +153,10 @@ pub struct Timeline { pub tenant_id: TenantId, pub timeline_id: TimelineId, + /// The generation of the tenant that instantiated us: this is used for safety when writing remote objects. + /// Never changes for the lifetime of this [`Timeline`] object. + generation: Generation, + pub pg_version: u32, /// The tuple has two elements. @@ -1199,7 +1204,7 @@ impl Timeline { Ok(delta) => Some(delta), }; - let layer_metadata = LayerFileMetadata::new(layer_file_size); + let layer_metadata = LayerFileMetadata::new(layer_file_size, self.generation); let new_remote_layer = Arc::new(match local_layer.filename() { LayerFileName::Image(image_name) => RemoteLayer::new_img( @@ -1377,6 +1382,7 @@ impl Timeline { ancestor: Option>, timeline_id: TimelineId, tenant_id: TenantId, + generation: Generation, walredo_mgr: Arc, resources: TimelineResources, pg_version: u32, @@ -1406,6 +1412,7 @@ impl Timeline { myself: myself.clone(), timeline_id, tenant_id, + generation, pg_version, layers: Arc::new(tokio::sync::RwLock::new(LayerManager::create())), wanted_image_layers: Mutex::new(None), @@ -1615,6 +1622,9 @@ impl Timeline { let (conf, tenant_id, timeline_id) = (self.conf, self.tenant_id, self.timeline_id); let span = tracing::Span::current(); + // Copy to move into the task we're about to spawn + let generation = self.generation; + let (loaded_layers, to_sync, total_physical_size) = tokio::task::spawn_blocking({ move || { let _g = span.entered(); @@ -1656,8 +1666,12 @@ impl Timeline { ); } - let decided = - init::reconcile(discovered_layers, index_part.as_ref(), disk_consistent_lsn); + let decided = init::reconcile( + discovered_layers, + index_part.as_ref(), + disk_consistent_lsn, + generation, + ); let mut loaded_layers = Vec::new(); let mut needs_upload = Vec::new(); @@ -2669,7 +2683,7 @@ impl Timeline { ( HashMap::from([( layer.filename(), - LayerFileMetadata::new(layer.layer_desc().file_size), + LayerFileMetadata::new(layer.layer_desc().file_size, self.generation), )]), Some(layer), ) @@ -3065,7 +3079,10 @@ impl Timeline { .metadata() .with_context(|| format!("reading metadata of layer file {}", path.file_name()))?; - layer_paths_to_upload.insert(path, LayerFileMetadata::new(metadata.len())); + 
layer_paths_to_upload.insert( + path, + LayerFileMetadata::new(metadata.len(), self.generation), + ); self.metrics .resident_physical_size_gauge @@ -3740,7 +3757,7 @@ impl Timeline { if let Some(remote_client) = &self.remote_client { remote_client.schedule_layer_file_upload( &l.filename(), - &LayerFileMetadata::new(metadata.len()), + &LayerFileMetadata::new(metadata.len(), self.generation), )?; } @@ -3749,7 +3766,10 @@ .resident_physical_size_gauge .add(metadata.len()); - new_layer_paths.insert(new_delta_path, LayerFileMetadata::new(metadata.len())); + new_layer_paths.insert( + new_delta_path, + LayerFileMetadata::new(metadata.len(), self.generation), + ); l.access_stats().record_residence_event( LayerResidenceStatus::Resident, LayerResidenceEventReason::LayerCreate, diff --git a/pageserver/src/tenant/timeline/init.rs b/pageserver/src/tenant/timeline/init.rs index a270d96677af..33effb431878 100644 --- a/pageserver/src/tenant/timeline/init.rs +++ b/pageserver/src/tenant/timeline/init.rs @@ -7,6 +7,7 @@ use crate::{ index::{IndexPart, LayerFileMetadata}, }, storage_layer::LayerFileName, + Generation, }, METADATA_FILE_NAME, }; @@ -104,6 +105,7 @@ pub(super) fn reconcile( discovered: Vec<(LayerFileName, u64)>, index_part: Option<&IndexPart>, disk_consistent_lsn: Lsn, + generation: Generation, ) -> Vec<(LayerFileName, Result<Decision, FutureLayer>)> { use Decision::*; let mut discovered = discovered .into_iter() - .map(|(name, file_size)| (name, (Some(LayerFileMetadata::new(file_size)), None))) + .map(|(name, file_size)| { + ( + name, + // The generation here will be corrected to match IndexPart in the merge below, unless + // it is not in IndexPart, in which case using our current generation makes sense + // because it will be uploaded in this generation.
+ (Some(LayerFileMetadata::new(file_size, generation)), None), + ) + }) .collect::<HashMap<_, _>>(); // merge any index_part information, when available @@ -137,7 +147,11 @@ Err(FutureLayer { local }) } else { Ok(match (local, remote) { - (Some(local), Some(remote)) if local != remote => UseRemote { local, remote }, + (Some(local), Some(remote)) if local != remote => { + assert_eq!(local.generation, remote.generation); + + UseRemote { local, remote } + } (Some(x), Some(_)) => UseLocal(x), (None, Some(x)) => Evicted(x), (Some(x), None) => NeedsUpload(x), diff --git a/pageserver/src/tenant/upload_queue.rs b/pageserver/src/tenant/upload_queue.rs index 6026825b0d03..28822335b098 100644 --- a/pageserver/src/tenant/upload_queue.rs +++ b/pageserver/src/tenant/upload_queue.rs @@ -1,6 +1,7 @@ use crate::metrics::RemoteOpFileKind; use super::storage_layer::LayerFileName; +use super::Generation; use crate::tenant::metadata::TimelineMetadata; use crate::tenant::remote_timeline_client::index::IndexPart; use crate::tenant::remote_timeline_client::index::LayerFileMetadata; @@ -205,6 +206,7 @@ pub(crate) struct Delete { pub(crate) file_kind: RemoteOpFileKind, pub(crate) layer_file_name: LayerFileName, pub(crate) scheduled_from_timeline_delete: bool, + pub(crate) generation: Generation, } #[derive(Debug)] @@ -228,17 +230,21 @@ impl std::fmt::Display for UploadOp { UploadOp::UploadLayer(path, metadata) => { write!( f, - "UploadLayer({}, size={:?})", + "UploadLayer({}, size={:?}, gen={:?})", path.file_name(), - metadata.file_size() + metadata.file_size(), + metadata.generation, ) } - UploadOp::UploadMetadata(_, lsn) => write!(f, "UploadMetadata(lsn: {})", lsn), + UploadOp::UploadMetadata(_, lsn) => { + write!(f, "UploadMetadata(lsn: {})", lsn) + } UploadOp::Delete(delete) => write!( f, - "Delete(path: {}, scheduled_from_timeline_delete: {})", + "Delete(path: {}, scheduled_from_timeline_delete: {}, gen: {:?})", delete.layer_file_name.file_name(), - delete.scheduled_from_timeline_delete + delete.scheduled_from_timeline_delete, + delete.generation ), UploadOp::Barrier(_) => write!(f, "Barrier"), }
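To make the new key scheme concrete, here is a minimal, self-contained Rust sketch (not part of the patch) of how a generation shapes the resulting S3 keys. The "-{:08x}" suffix and the "tenants/{tenant_id}/timelines/{timeline_id}/index_part.json" layout are taken from the diff above; the stand-in enum, the sample IDs, and the main() driver are illustrative only.

// Sketch of the generation suffix scheme; mirrors get_suffix() and
// remote_index_path() from the patch, with made-up tenant/timeline IDs.
#[derive(Copy, Clone)]
enum Generation {
    None,       // legacy, pre-generation metadata: no suffix at all
    Valid(u32), // generation-aware metadata: fixed-width hex suffix
}

impl Generation {
    fn get_suffix(&self) -> String {
        match self {
            Generation::None => String::new(),
            Generation::Valid(v) => format!("-{:08x}", v),
        }
    }
}

fn remote_index_path(tenant_id: &str, timeline_id: &str, generation: Generation) -> String {
    format!(
        "tenants/{tenant_id}/timelines/{timeline_id}/index_part.json{}",
        generation.get_suffix()
    )
}

fn main() {
    let tenant = "3aa8fcc61f6d357410b7de754b1d9001"; // illustrative IDs only
    let timeline = "de200bd42b49cc1814412c7e592dd6e9";

    // A legacy attachment reads and writes the bare key...
    assert_eq!(
        remote_index_path(tenant, timeline, Generation::None),
        format!("tenants/{tenant}/timelines/{timeline}/index_part.json")
    );
    // ...while two racing attachments in generations 7 and 8 write disjoint
    // keys, which is the split-brain protection this change is after.
    assert_eq!(
        remote_index_path(tenant, timeline, Generation::Valid(7)),
        format!("tenants/{tenant}/timelines/{timeline}/index_part.json-00000007")
    );
    assert_eq!(
        remote_index_path(tenant, timeline, Generation::Valid(8)),
        format!("tenants/{tenant}/timelines/{timeline}/index_part.json-00000008")
    );
}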
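The serde attributes on IndexLayerMetadata are what keep pre-generation index_part.json files readable. The sketch below (again not part of the patch, and assuming the serde and serde_json crates with the derive feature) reproduces that behaviour with Option<u32> standing in for Generation; the wire format should match because the patch's custom Serialize impl writes a Valid generation as a bare u32, and the skip_serializing_if/default pair omits or fills in the field for legacy metadata.

use serde::{Deserialize, Serialize};

// Stand-in for the patched IndexLayerMetadata: Option<u32> plays the role
// of Generation (None <-> Generation::None, Some(v) <-> Generation::Valid(v)).
#[derive(Debug, PartialEq, Serialize, Deserialize)]
struct IndexLayerMetadata {
    file_size: u64,
    #[serde(default)] // field missing in old files => legacy "none" generation
    #[serde(skip_serializing_if = "Option::is_none")] // legacy => field omitted
    generation: Option<u32>,
}

fn main() -> serde_json::Result<()> {
    // A pre-generation index_part.json entry still parses; the generation
    // comes back as "none".
    let legacy: IndexLayerMetadata = serde_json::from_str(r#"{"file_size":25600000}"#)?;
    assert_eq!(legacy.generation, None);
    // And it round-trips without growing a generation field.
    assert_eq!(serde_json::to_string(&legacy)?, r#"{"file_size":25600000}"#);

    // A generation-aware entry serializes the generation as a bare integer.
    let current = IndexLayerMetadata {
        file_size: 25600000,
        generation: Some(8),
    };
    assert_eq!(
        serde_json::to_string(&current)?,
        r#"{"file_size":25600000,"generation":8}"#
    );
    Ok(())
}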