Merge pull request #184 from influxdata/hiltontj/last-cache-id-map-failure

feat: catalog merge and WAL mapping failure on incompatible last cache
hiltontj authored Nov 7, 2024
2 parents 8d64b31 + e728c1d commit c4dc302
Showing 2 changed files with 282 additions and 104 deletions.
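Before reading the diff, here is a minimal standalone sketch of the rule this commit enforces — `LastCacheDef` and `merge_last_caches` are hypothetical, simplified types for illustration only; the real logic lives in `influxdb3_catalog/src/catalog/pro.rs`. When merging another host's catalog, a last cache whose name already exists locally must map to an identical definition after column-ID remapping; any mismatch aborts the merge, and only genuinely new caches are carried over.

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, PartialEq)]
struct LastCacheDef {
    key_columns: Vec<u32>, // column IDs, already remapped to local IDs
    ttl: u64,
}

fn merge_last_caches(
    local: &HashMap<String, LastCacheDef>,
    other_mapped: &HashMap<String, LastCacheDef>,
) -> Result<Vec<(String, LastCacheDef)>, String> {
    let mut new_caches = vec![];
    for (name, other_def) in other_mapped {
        match local.get(name) {
            // same name and identical mapped definition: nothing to do
            Some(local_def) if local_def == other_def => {}
            // same name but a different definition: the catalogs have diverged
            Some(local_def) => {
                return Err(format!(
                    "last cache '{name}' differs:\nlocal: {local_def:?}\nother: {other_def:?}"
                ));
            }
            // unknown name: a genuinely new cache, carry it over
            None => new_caches.push((name.clone(), other_def.clone())),
        }
    }
    Ok(new_caches)
}

fn main() {
    let local = HashMap::from([(
        "test_cache".to_string(),
        LastCacheDef { key_columns: vec![1], ttl: 3600 },
    )]);
    let other = HashMap::from([(
        "test_cache".to_string(),
        LastCacheDef { key_columns: vec![], ttl: 3600 },
    )]);
    assert!(merge_last_caches(&local, &other).is_err());
}
```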
246 changes: 143 additions & 103 deletions influxdb3_catalog/src/catalog/pro.rs
@@ -172,14 +172,22 @@ impl TableDefinition {
}

// merge in any new last cache definitions
// TODO: need to validate the configuration of last caches for compatibility here
let mut new_last_caches: Vec<LastCacheDefinition> = vec![];
for (merge_name, merge_last_cache) in other.last_caches() {
if !self.last_caches.contains_key(&merge_name) {
new_last_caches.push(map_last_cache_definition_column_ids(
merge_last_cache,
&id_map,
)?);
if let Some(local_last_cache) = self.last_caches.get(&merge_name) {
let mapped_other_last_cache_def =
id_map.map_last_cache_definition_column_ids(merge_last_cache)?;
if local_last_cache != &mapped_other_last_cache_def {
return Err(
Error::Other(
anyhow!("the last cache definition from the other host does not match the local one.\n\
local: {local_last_cache:#?}\n\
other: {mapped_other_last_cache_def:#?}")
));
}
} else {
new_last_caches
.push(id_map.map_last_cache_definition_column_ids(merge_last_cache)?);
}
}

@@ -241,7 +249,7 @@ impl TableDefinition {
for (name, lc) in &other.last_caches {
last_caches.insert(
Arc::clone(name),
map_last_cache_definition_column_ids(lc, &id_map)?,
id_map.map_last_cache_definition_column_ids(lc)?,
);
}

@@ -260,55 +268,6 @@ impl TableDefinition {
}
}

/// Map all of the [`ColumnId`]s within a [`LastCacheDefinition`] from another host's catalog to
/// their respective IDs on the local catalog. This assumes that all columns have been mapped
/// already via the caller, and will throw errors if there are columns that cannot be found.
fn map_last_cache_definition_column_ids(
def: &LastCacheDefinition,
id_map: &CatalogIdMap,
) -> Result<LastCacheDefinition> {
let table_id =
id_map.tables.get(&def.table_id).copied().ok_or_else(|| {
Error::Other(anyhow!("last cache definition contained invalid table id"))
})?;
let key_columns = def
.key_columns
.iter()
.map(|id| {
id_map.map_column_id(id).ok_or_else(|| {
Error::Other(anyhow!(
"last cache definition contained invalid key column id"
))
})
})
.collect::<Result<Vec<ColumnId>>>()?;
let value_columns = match def.value_columns {
LastCacheValueColumnsDef::Explicit { ref columns } => {
let columns = columns
.iter()
.map(|id| {
id_map.map_column_id(id).ok_or_else(|| {
Error::Other(anyhow!(
"last cache definition contained invalid value column id"
))
})
})
.collect::<Result<Vec<ColumnId>>>()?;
LastCacheValueColumnsDef::Explicit { columns }
}
LastCacheValueColumnsDef::AllNonKeyColumns => LastCacheValueColumnsDef::AllNonKeyColumns,
};
Ok(LastCacheDefinition {
table_id,
table: Arc::clone(&def.table),
name: Arc::clone(&def.name),
key_columns,
value_columns,
count: def.count,
ttl: def.ttl,
})
}
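Note that the function above is not deleted outright — it moves, essentially verbatim, into `impl CatalogIdMap` as a method later in this file (the `@@ -707,15 +648,67 @@` hunk below), so callers can write `id_map.map_last_cache_definition_column_ids(def)` instead of threading `&id_map` through a free function.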

impl ColumnDefinition {
/// Check for compatibility between two column definitions and produce a [`CatalogIdMap`] that
/// contains a mapping of the [`ColumnId`] from the `other` catalog to this one.
@@ -624,56 +583,38 @@ impl CatalogIdMap {
)?,
})
}

CatalogOp::CreateLastCache(def) => CatalogOp::CreateLastCache(LastCacheDefinition {
table_id: self.map_table_or_new(
target_catalog,
database_id,
&def.table,
def.table_id,
),
table: def.table,
name: def.name,
key_columns: def
.key_columns
.into_iter()
.map(|id| {
self.columns
.get(&id)
.copied()
.expect("create last cache operation contained invalid key column id")
})
.collect(),
value_columns: match def.value_columns {
LastCacheValueColumnsDef::Explicit { columns } => {
LastCacheValueColumnsDef::Explicit {
columns: columns
.into_iter()
.map(|id| {
self.columns.get(&id).copied().expect(
"create last cache operation contained invalid value column id",
)
})
.collect(),
}
}
LastCacheValueColumnsDef::AllNonKeyColumns => {
LastCacheValueColumnsDef::AllNonKeyColumns
// The following last cache ops will throw an error if they are for a table that does
// not exist. If such an op is encountered, that would indicate that the WAL is
// corrupted on the other host, since there should always be a CreateTable op preceding
// one of these last cache ops.
CatalogOp::CreateLastCache(def) => {
let mapped_def = self.map_last_cache_definition_column_ids(&def)?;
let tbl_def = target_catalog
.db_schema_by_id(database_id)
.and_then(|db| db.table_definition_by_id(mapped_def.table_id))
// this unwrap is okay as the call to map the last cache definition would
// catch a missing table:
.unwrap();
if let Some(local_def) = tbl_def.last_caches.get(&def.name) {
if local_def != &mapped_def {
return Err(Error::Other(
anyhow!("WAL contained a CreateLastCache operation with a last cache \
name that already exists in the local catalog, but is not compatible. \
This means that the catalogs for these two hosts have diverged and the \
last cache named '{name}' needs to be removed on one of the hosts.",
name = def.name
)));
}
},
count: def.count,
ttl: def.ttl,
}),
// TODO: if the table doesn't exist locally, do we need to bother with
// deleting it?
}
CatalogOp::CreateLastCache(self.map_last_cache_definition_column_ids(&def)?)
}
CatalogOp::DeleteLastCache(def) => CatalogOp::DeleteLastCache(LastCacheDelete {
table_name: Arc::clone(&def.table_name),
table_id: self.map_table_or_new(
target_catalog,
database_id,
&def.table_name,
def.table_id,
),
table_id: self.map_table_id(&def.table_id).ok_or_else(|| {
Error::Other(anyhow!(
"attempted to delete a last cache for a table that does not exist locally"
))
})?,
name: def.name,
}),
};
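The `DeleteLastCache` arm is also tightened in the hunk above: where `map_table_or_new` would previously mint a fresh ID for an unknown table, `map_table_id` now requires the mapping to already exist, on the reasoning that a well-formed WAL always creates a table before touching its caches. A hedged sketch of that look-up-or-error pattern, using plain integer IDs rather than the real catalog types:

```rust
use std::collections::HashMap;

// Map a table ID from the other host to the local one; unlike a
// "map or create" lookup, a miss here is a hard error.
fn map_table_id(tables: &HashMap<u32, u32>, other_id: u32) -> Result<u32, String> {
    tables.get(&other_id).copied().ok_or_else(|| {
        "attempted to delete a last cache for a table that does not exist locally".to_string()
    })
}

fn main() {
    let tables = HashMap::from([(7, 1)]);
    assert_eq!(map_table_id(&tables, 7), Ok(1));
    assert!(map_table_id(&tables, 9).is_err());
}
```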
@@ -707,15 +648,67 @@ impl CatalogIdMap {
})
.collect()
}

/// Map all of the [`ColumnId`]s within a [`LastCacheDefinition`] from another host's catalog to
/// their respective IDs on the local catalog. This assumes that all columns have been mapped
/// already via the caller, and will throw errors if there are columns that cannot be found.
fn map_last_cache_definition_column_ids(
&self,
def: &LastCacheDefinition,
) -> Result<LastCacheDefinition> {
let table_id = self.tables.get(&def.table_id).copied().ok_or_else(|| {
Error::Other(anyhow!("last cache definition contained invalid table id"))
})?;
let key_columns = def
.key_columns
.iter()
.map(|id| {
self.map_column_id(id).ok_or_else(|| {
Error::Other(anyhow!(
"last cache definition contained invalid key column id"
))
})
})
.collect::<Result<Vec<ColumnId>>>()?;
let value_columns = match def.value_columns {
LastCacheValueColumnsDef::Explicit { ref columns } => {
let columns = columns
.iter()
.map(|id| {
self.map_column_id(id).ok_or_else(|| {
Error::Other(anyhow!(
"last cache definition contained invalid value column id"
))
})
})
.collect::<Result<Vec<ColumnId>>>()?;
LastCacheValueColumnsDef::Explicit { columns }
}
LastCacheValueColumnsDef::AllNonKeyColumns => {
LastCacheValueColumnsDef::AllNonKeyColumns
}
};
Ok(LastCacheDefinition {
table_id,
table: Arc::clone(&def.table),
name: Arc::clone(&def.name),
key_columns,
value_columns,
count: def.count,
ttl: def.ttl,
})
}
}
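The relocated `map_last_cache_definition_column_ids` relies on the usual fallible-iterator idiom: map each foreign column ID through the table built while merging columns, and collect into a `Result` so the first missing ID aborts the whole translation. A minimal sketch of that idiom under the same simplifying assumption (plain integer IDs instead of the `ColumnId` newtype):

```rust
use std::collections::HashMap;

// Translate every foreign column ID to its local counterpart, failing fast
// on the first ID that was never mapped.
fn map_ids(map: &HashMap<u32, u32>, ids: &[u32]) -> Result<Vec<u32>, String> {
    ids.iter()
        .map(|id| {
            map.get(id)
                .copied()
                .ok_or_else(|| format!("definition contained invalid column id {id}"))
        })
        .collect()
}

fn main() {
    let map = HashMap::from([(10, 1), (11, 2)]);
    assert_eq!(map_ids(&map, &[10, 11]), Ok(vec![1, 2]));
    assert!(map_ids(&map, &[10, 12]).is_err());
}
```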

#[cfg(test)]
mod tests {
use std::{ops::Deref, sync::Arc};

use influxdb3_id::{ColumnId, DbId, TableId};
use influxdb3_wal::{LastCacheDefinition, LastCacheSize, LastCacheValueColumnsDef};
use pretty_assertions::assert_eq;
use schema::{InfluxColumnType, InfluxFieldType};
use test_helpers::assert_contains;

use crate::catalog::{Catalog, DatabaseSchema, Error, TableDefinition};

@@ -1054,4 +1047,51 @@ mod tests {
insta::assert_json_snapshot!(db);
})
}

#[test]
fn merge_with_incompatible_last_cache() {
let a = create_catalog("a");
let b = create_catalog("b");
let cache_name = "test_cache";
// add a last cache to 'a':
{
let mut db = a.db_schema("foo").unwrap().deref().clone();
let mut tbl = db.table_definition("bar").unwrap().deref().clone();
tbl.add_last_cache(LastCacheDefinition {
table_id: tbl.table_id,
table: Arc::clone(&tbl.table_name),
name: cache_name.into(),
key_columns: vec![tbl.column_name_to_id_unchecked("t1".into())],
value_columns: LastCacheValueColumnsDef::AllNonKeyColumns,
count: LastCacheSize::new(1).unwrap(),
ttl: 3600,
});
db.insert_table(tbl.table_id, Arc::new(tbl));
a.insert_database(db);
}
// add a last cache to 'b' but with a different configuration:
{
let mut db = a.db_schema("foo").unwrap().deref().clone();
let mut tbl = db.table_definition("bar").unwrap().deref().clone();
tbl.add_last_cache(LastCacheDefinition {
table_id: tbl.table_id,
table: Arc::clone(&tbl.table_name),
name: cache_name.into(),
// this def has no key columns:
key_columns: vec![],
value_columns: LastCacheValueColumnsDef::AllNonKeyColumns,
count: LastCacheSize::new(1).unwrap(),
ttl: 3600,
});
db.insert_table(tbl.table_id, Arc::new(tbl));
b.insert_database(db);
}
let err = a
.merge(b)
.expect_err("merging incompatible last caches should fail");
assert_contains!(
err.to_string(),
"the last cache definition from the other host does not match the local one."
);
}
}