Skip to content

Commit

Permalink
Merge pull request #273 from influxdata/hiltontj/replica-catalog-retry
Browse files Browse the repository at this point in the history
fix: load replica catalog in a loop for initial startup
  • Loading branch information
hiltontj authored Dec 11, 2024
2 parents 6871235 + 6cfce31 commit 505c590
Showing 1 changed file with 43 additions and 10 deletions.
53 changes: 43 additions & 10 deletions influxdb3_pro/buffer/src/replica.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use iox_query::{
use iox_time::TimeProvider;
use metric::{Attributes, DurationHistogram, Registry};
use object_store::{path::Path, ObjectStore};
use observability_deps::tracing::{debug, error, info};
use observability_deps::tracing::{debug, error, info, warn};
use parking_lot::RwLock;
use schema::Schema;
use tokio::sync::Mutex;
Expand Down Expand Up @@ -320,6 +320,8 @@ struct ReplicatedBufferMetrics {
replica_ttbr: DurationHistogram,
}

/// Number of seconds to sleep before re-attempting to load a replica's catalog when the
/// previous attempt found no catalog.
const REPLICA_CATALOG_RETRY_INTERVAL_SECONDS: u64 = 1;

pub(crate) struct CreateReplicatedBufferArgs {
replica_order: i64,
object_store: Arc<dyn ObjectStore>,
Expand Down Expand Up @@ -355,15 +357,46 @@ impl ReplicatedBuffer {
.load_snapshots(N_SNAPSHOTS_TO_LOAD_ON_START)
.await
.context("failed to load snapshots for replicated host")?;
let catalog = persister.load_catalog()
.await
.with_context(|| format!("unable to load a catalog for host '{host_identifier_prefix}' from object store"))?
.map(|persisted| Arc::new(Catalog::from_inner(persisted)))
.with_context(|| format!("there was no catalog for host '{host_identifier_prefix}'"))?;
let persisted_files = Arc::new(PersistedFiles::new_from_persisted_snapshots(
persisted_snapshots,
));
(catalog, persisted_files)
// Attempt to load the catalog for the replica in a retry loop. This handles the scenario
// where two hosts are started at the same time and the other host has not yet persisted
// its catalog when this code runs. The retry only happens when no catalog is found for
// the replica; an error from loading the catalog is logged and returned without retrying.
loop {
let catalog = match persister.load_catalog().await {
Ok(Some(persisted)) => Ok(Arc::new(Catalog::from_inner(persisted))),
Ok(None) => {
warn!(
from_host = host_identifier_prefix,
"there was no catalog for replicated host, this may mean that it has \
not been persisted to object store yet, or that the host name of the \
replica was not specified correctly, will retry in {} second(s)",
REPLICA_CATALOG_RETRY_INTERVAL_SECONDS
);
tokio::time::sleep(Duration::from_secs(
REPLICA_CATALOG_RETRY_INTERVAL_SECONDS,
))
.await;
continue;
}
Err(error) => {
error!(
from_host = host_identifier_prefix,
%error,
"error when attempting to load catalog from replicated host from \
object store, will retry"
);
Err(error)
}
}
.context(
"received error from object store when accessing catalog for replica, \
please see the logs",
)?;
let persisted_files = Arc::new(PersistedFiles::new_from_persisted_snapshots(
persisted_snapshots,
));
break (catalog, persisted_files);
}
};
let host: Cow<'static, str> = Cow::from(host_identifier_prefix.clone());
let attributes = Attributes::from([("from_host", host)]);
Expand Down

0 comments on commit 505c590

Please sign in to comment.