refactor: Move Catalog into influxdb3_catalog crate (#25210)
* refactor: Move Catalog into influxdb3_catalog crate

This moves the catalog and its serialization logic into its own crate. This is a precursor to recording more catalog modifications into the WAL.

Fixes #25204

* fix: cargo update

* fix: add version = 2 to deny.toml

* fix: update deny.toml

* fix: add CC0 to deny.toml
pauldix authored Aug 2, 2024
1 parent 3265960 commit 2b8fc7b
Showing 22 changed files with 291 additions and 205 deletions.
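The practical effect for downstream code is a new import path: the catalog types now live in `influxdb3_catalog::catalog` (per the new `lib.rs` below), and several methods lose their `pub(crate)` restriction. A minimal sketch of post-move usage — the `ensure_db` helper is hypothetical, but `db_or_create` and its `Result<Arc<DatabaseSchema>>` return type are taken from the diff:

```rust
use std::sync::Arc;

use influxdb3_catalog::catalog::{Catalog, DatabaseSchema};

// Hypothetical helper: `db_or_create` is now `pub`, so callers outside the
// crate (e.g. influxdb3_write) can reach it directly.
fn ensure_db(catalog: &Catalog, name: &str) -> Arc<DatabaseSchema> {
    catalog
        .db_or_create(name)
        .expect("catalog accepted the database")
}
```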
213 changes: 132 additions & 81 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -2,6 +2,7 @@
# In alphabetical order
members = [
"influxdb3",
"influxdb3_catalog",
"influxdb3_client",
"influxdb3_load_generator",
"influxdb3_process",
18 changes: 11 additions & 7 deletions deny.toml
@@ -2,23 +2,27 @@
#  https://embarkstudios.github.io/cargo-deny/index.html

[advisories]
vulnerability = "deny"
yanked = "deny"
unmaintained = "warn"
notice = "warn"
ignore = [
]
git-fetch-with-cli = true

[licenses]
allow-osi-fsf-free = "either"
copyleft = "deny"
unlicensed = "deny"
default = "deny"
allow = [
"Apache-2.0",
"BSD-2-Clause",
"BSD-3-Clause",
"BSD-4-Clause",
"CC0-1.0",
"ISC",
"MIT",
]

exceptions = [
# We should probably NOT bundle CA certs but use the OS ones.
{ name = "webpki-roots", allow = ["MPL-2.0"] },
{ allow = ["Unicode-DFS-2016"], crate = "unicode-ident" },
{ allow = ["OpenSSL"], crate = "ring" },
]

[[licenses.clarify]]
31 changes: 31 additions & 0 deletions influxdb3_catalog/Cargo.toml
@@ -0,0 +1,31 @@
[package]
name = "influxdb3_catalog"
version.workspace = true
authors.workspace = true
edition.workspace = true
license.workspace = true

[dependencies]
# Core Crates
influxdb-line-protocol.workspace = true
observability_deps.workspace = true
schema.workspace = true

# Local deps
influxdb3_wal = { path = "../influxdb3_wal" }

# crates.io dependencies
arrow.workspace = true
parking_lot.workspace = true
serde.workspace = true
serde_json.workspace = true
serde_with.workspace = true
thiserror.workspace = true

[dev-dependencies]
insta.workspace = true
pretty_assertions.workspace = true
test_helpers.workspace = true

[lints]
workspace = true
92 changes: 28 additions & 64 deletions influxdb3_write/src/catalog.rs → influxdb3_catalog/src/catalog.rs
@@ -1,6 +1,5 @@
//! Implementation of the Catalog that sits entirely in memory.
use crate::last_cache::LastCache;
use influxdb3_wal::{CatalogBatch, CatalogOp};
use influxdb_line_protocol::FieldValue;
use observability_deps::tracing::info;
@@ -11,9 +10,7 @@ use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
use thiserror::Error;

mod serialize;

#[derive(Debug, Error)]
#[derive(Debug, Error, Clone, Copy)]
pub enum Error {
#[error("catalog updated elsewhere")]
CatalogUpdatedElsewhere,
@@ -110,7 +107,7 @@ impl Catalog {
self.inner.write().apply_catalog_batch(catalog_batch)
}

pub(crate) fn db_or_create(&self, db_name: &str) -> Result<Arc<DatabaseSchema>> {
pub fn db_or_create(&self, db_name: &str) -> Result<Arc<DatabaseSchema>> {
let db = self.inner.read().databases.get(db_name).cloned();

let db = match db {
@@ -124,7 +121,9 @@

info!("return new db {}", db_name);
let db = Arc::new(DatabaseSchema::new(db_name.into()));
inner.databases.insert(db.name.clone(), Arc::clone(&db));
inner
.databases
.insert(Arc::clone(&db.name), Arc::clone(&db));
inner.sequence = inner.sequence.next();
inner.updated = true;
db
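The body above follows the classic read-then-write lock pattern: check under a cheap read lock, and only take the write lock when the database is missing. A self-contained sketch of the pattern with simplified types (`Db` and the map layout are stand-ins, not the real `DatabaseSchema`):

```rust
use std::{collections::BTreeMap, sync::Arc};

use parking_lot::RwLock;

// Stand-in for DatabaseSchema, for illustration only.
#[derive(Debug)]
struct Db {
    name: Arc<str>,
}

struct Catalog {
    inner: RwLock<BTreeMap<Arc<str>, Arc<Db>>>,
}

impl Catalog {
    fn db_or_create(&self, db_name: &str) -> Arc<Db> {
        // Fast path: a read lock is enough when the database exists.
        if let Some(db) = self.inner.read().get(db_name).cloned() {
            return db;
        }
        // Slow path: take the write lock and re-check, since another
        // writer may have created the database between the two locks.
        let mut inner = self.inner.write();
        if let Some(db) = inner.get(db_name) {
            return Arc::clone(db);
        }
        let db = Arc::new(Db {
            name: db_name.into(),
        });
        inner.insert(Arc::clone(&db.name), Arc::clone(&db));
        db
    }
}
```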
@@ -190,12 +189,11 @@
self.inner.read().db_exists(db_name)
}

#[cfg(test)]
pub(crate) fn insert_database(&mut self, db: DatabaseSchema) {
self.inner
.write()
.databases
.insert(Arc::clone(&db.name), Arc::new(db));
pub fn insert_database(&mut self, db: DatabaseSchema) {
let mut inner = self.inner.write();
inner.databases.insert(Arc::clone(&db.name), Arc::new(db));
inner.sequence = inner.sequence.next();
inner.updated = true;
}
}
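Note the behavioral tweak: `insert_database` is no longer test-only, and it now advances the catalog sequence and sets the `updated` flag just like the batch path, so direct inserts are visible to change detection. A sketch of the sequence-bump idea (this `SequenceNumber` shape is illustrative, not the crate's exact type):

```rust
// Illustrative sequence type: each mutation advances a monotonically
// increasing counter so persistence code can tell whether the catalog
// has changed since it was last written out.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct SequenceNumber(u64);

impl SequenceNumber {
    fn next(&self) -> Self {
        SequenceNumber(self.0 + 1)
    }
}

fn main() {
    let s = SequenceNumber(41);
    assert!(s.next() > s);
}
```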

@@ -266,12 +264,10 @@ impl InnerCatalog {
Ok(())
}

#[cfg(test)]
pub(crate) fn databases(&self) -> impl Iterator<Item = &Arc<DatabaseSchema>> {
pub fn databases(&self) -> impl Iterator<Item = &Arc<DatabaseSchema>> {
self.databases.values()
}

#[cfg(test)]
pub fn db_exists(&self, db_name: &str) -> bool {
self.databases.contains_key(db_name)
}
@@ -283,7 +279,7 @@ pub struct DatabaseSchema {
pub name: Arc<str>,
/// The database is a map of tables
#[serde_as(as = "serde_with::MapPreventDuplicates<_, _>")]
pub(crate) tables: BTreeMap<Arc<str>, TableDefinition>,
pub tables: BTreeMap<Arc<str>, TableDefinition>,
}

impl DatabaseSchema {
@@ -334,7 +330,7 @@ impl DatabaseSchema {
columns,
<Option<Vec<String>>>::None,
)?;
tables.insert(field_additions.table_name.clone(), table);
tables.insert(Arc::clone(&field_additions.table_name), table);
}
}
}
@@ -366,8 +362,7 @@ impl DatabaseSchema {
self.tables.contains_key(table_name)
}

#[cfg(test)]
pub(crate) fn tables(&self) -> impl Iterator<Item = &TableDefinition> {
pub fn tables(&self) -> impl Iterator<Item = &TableDefinition> {
self.tables.values()
}
}
@@ -383,7 +378,7 @@ impl TableDefinition {
/// Create a new [`TableDefinition`]
///
/// Ensures the provided columns will be ordered before constructing the schema.
pub(crate) fn new<CN: AsRef<str>>(
pub fn new<CN: AsRef<str>>(
name: Arc<str>,
columns: impl AsRef<[(CN, InfluxColumnType)]>,
series_key: Option<impl IntoIterator<Item: AsRef<str>>>,
@@ -416,7 +411,7 @@
}

/// Create a new table definition from a catalog op
pub(crate) fn new_from_op(table_definition: &influxdb3_wal::TableDefinition) -> Self {
pub fn new_from_op(table_definition: &influxdb3_wal::TableDefinition) -> Self {
let mut columns = Vec::new();
for field_def in &table_definition.field_definitions {
columns.push((field_def.name.as_ref(), field_def.data_type.into()));
@@ -430,14 +425,14 @@
}

/// Check if the column exists in the [`TableDefinition`]s schema
pub(crate) fn column_exists(&self, column: &str) -> bool {
pub fn column_exists(&self, column: &str) -> bool {
self.schema.find_index_of(column).is_some()
}

/// Add the columns to this [`TableDefinition`]
///
/// This ensures that the resulting schema has its columns ordered
pub(crate) fn add_columns(&mut self, columns: Vec<(String, InfluxColumnType)>) -> Result<()> {
pub fn add_columns(&mut self, columns: Vec<(String, InfluxColumnType)>) -> Result<()> {
// Use BTree to insert existing and new columns, and use that to generate the
// resulting schema, to ensure column order is consistent:
let mut cols = BTreeMap::new();
@@ -466,7 +461,7 @@
Ok(())
}

pub(crate) fn index_columns(&self) -> Vec<&str> {
pub fn index_columns(&self) -> Vec<&str> {
self.schema
.iter()
.filter_map(|(col_type, field)| match col_type {
Expand All @@ -476,36 +471,34 @@ impl TableDefinition {
.collect()
}

pub(crate) fn schema(&self) -> &Schema {
pub fn schema(&self) -> &Schema {
&self.schema
}

#[cfg(test)]
pub(crate) fn num_columns(&self) -> usize {
pub fn num_columns(&self) -> usize {
self.schema.len()
}

pub(crate) fn field_type_by_name(&self, name: &str) -> Option<InfluxColumnType> {
pub fn field_type_by_name(&self, name: &str) -> Option<InfluxColumnType> {
self.schema.field_type_by_name(name)
}

pub(crate) fn is_v3(&self) -> bool {
pub fn is_v3(&self) -> bool {
self.schema.series_key().is_some()
}

/// Add a new last cache to this table definition
pub(crate) fn add_last_cache(&mut self, last_cache: LastCacheDefinition) {
pub fn add_last_cache(&mut self, last_cache: LastCacheDefinition) {
self.last_caches
.insert(last_cache.name.to_string(), last_cache);
}

/// Remove a last cache from the table definition
pub(crate) fn remove_last_cache(&mut self, name: &str) {
pub fn remove_last_cache(&mut self, name: &str) {
self.last_caches.remove(name);
}

#[cfg(test)]
pub(crate) fn last_caches(&self) -> impl Iterator<Item = (&String, &LastCacheDefinition)> {
pub fn last_caches(&self) -> impl Iterator<Item = (&String, &LastCacheDefinition)> {
self.last_caches.iter()
}
}
@@ -529,8 +522,7 @@ pub struct LastCacheDefinition {

impl LastCacheDefinition {
/// Create a new [`LastCacheDefinition`] with explicit value columns
#[cfg(test)]
pub(crate) fn new_with_explicit_value_columns(
pub fn new_with_explicit_value_columns(
table: impl Into<String>,
name: impl Into<String>,
key_columns: impl IntoIterator<Item: Into<String>>,
@@ -551,8 +543,7 @@
}

/// Create a new [`LastCacheDefinition`] with explicit value columns
#[cfg(test)]
pub(crate) fn new_all_non_key_value_columns(
pub fn new_all_non_key_value_columns(
table: impl Into<String>,
name: impl Into<String>,
key_columns: impl IntoIterator<Item: Into<String>>,
Expand All @@ -568,33 +559,6 @@ impl LastCacheDefinition {
ttl,
})
}

pub(crate) fn from_cache(
table: impl Into<String>,
name: impl Into<String>,
cache: &LastCache,
) -> Self {
Self {
table: table.into(),
name: name.into(),
key_columns: cache.key_columns.iter().cloned().collect(),
value_columns: if cache.accept_new_fields {
LastCacheValueColumnsDef::AllNonKeyColumns
} else {
LastCacheValueColumnsDef::Explicit {
columns: cache
.schema
.fields()
.iter()
.filter(|f| !cache.key_columns.contains(f.name()))
.map(|f| f.name().to_owned())
.collect(),
}
},
count: cache.count,
ttl: cache.ttl.as_secs(),
}
}
}

/// A last cache will either store values for an explicit set of columns, or will accept all
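Elsewhere in this file, `add_columns` funnels existing and new columns through a `BTreeMap` so the rebuilt schema always has a deterministic, name-sorted column order. A minimal sketch of that ordering technique, simplified to column names only (the real code carries `InfluxColumnType` values alongside the names):

```rust
use std::collections::BTreeMap;

// Merging both column sets through a BTreeMap makes the resulting order
// independent of insertion order: keys come back sorted by name.
fn merged_column_order(existing: &[&str], new: &[&str]) -> Vec<String> {
    let mut cols = BTreeMap::new();
    for name in existing.iter().chain(new) {
        cols.insert(name.to_string(), ());
    }
    cols.into_keys().collect()
}

fn main() {
    let order = merged_column_order(&["time", "host"], &["usage", "cpu"]);
    assert_eq!(order, ["cpu", "host", "time", "usage"]);
}
```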
2 changes: 2 additions & 0 deletions influxdb3_catalog/src/lib.rs
@@ -0,0 +1,2 @@
pub mod catalog;
pub(crate) mod serialize;
influxdb3_write/src/catalog/serialize.rs → influxdb3_catalog/src/serialize.rs
@@ -3,7 +3,7 @@ use schema::{InfluxColumnType, SchemaBuilder};
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;

use super::{LastCacheDefinition, LastCacheValueColumnsDef, TableDefinition};
use crate::catalog::{LastCacheDefinition, LastCacheValueColumnsDef, TableDefinition};

impl Serialize for TableDefinition {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
@@ -172,34 +172,34 @@ impl<'a> From<&'a ArrowDataType> for DataType<'a> {
ArrowDataType::Float32 => Self::F32,
ArrowDataType::Float64 => Self::F64,
ArrowDataType::Timestamp(unit, tz) => Self::Time((*unit).into(), tz.as_deref()),
ArrowDataType::Date32 => todo!(),
ArrowDataType::Date64 => todo!(),
ArrowDataType::Time32(_) => todo!(),
ArrowDataType::Time64(_) => todo!(),
ArrowDataType::Duration(_) => todo!(),
ArrowDataType::Interval(_) => todo!(),
ArrowDataType::Date32 => unimplemented!(),
ArrowDataType::Date64 => unimplemented!(),
ArrowDataType::Time32(_) => unimplemented!(),
ArrowDataType::Time64(_) => unimplemented!(),
ArrowDataType::Duration(_) => unimplemented!(),
ArrowDataType::Interval(_) => unimplemented!(),
ArrowDataType::Binary => Self::Bin,
ArrowDataType::FixedSizeBinary(_) => todo!(),
ArrowDataType::FixedSizeBinary(_) => unimplemented!(),
ArrowDataType::LargeBinary => Self::BigBin,
ArrowDataType::BinaryView => Self::BinView,
ArrowDataType::Utf8 => Self::Str,
ArrowDataType::LargeUtf8 => Self::BigStr,
ArrowDataType::Utf8View => Self::StrView,
ArrowDataType::List(_) => todo!(),
ArrowDataType::ListView(_) => todo!(),
ArrowDataType::FixedSizeList(_, _) => todo!(),
ArrowDataType::LargeList(_) => todo!(),
ArrowDataType::LargeListView(_) => todo!(),
ArrowDataType::Struct(_) => todo!(),
ArrowDataType::Union(_, _) => todo!(),
ArrowDataType::List(_) => unimplemented!(),
ArrowDataType::ListView(_) => unimplemented!(),
ArrowDataType::FixedSizeList(_, _) => unimplemented!(),
ArrowDataType::LargeList(_) => unimplemented!(),
ArrowDataType::LargeListView(_) => unimplemented!(),
ArrowDataType::Struct(_) => unimplemented!(),
ArrowDataType::Union(_, _) => unimplemented!(),
ArrowDataType::Dictionary(key_type, val_type) => Self::Dict(
Box::new(key_type.as_ref().into()),
Box::new(val_type.as_ref().into()),
),
ArrowDataType::Decimal128(_, _) => todo!(),
ArrowDataType::Decimal256(_, _) => todo!(),
ArrowDataType::Map(_, _) => todo!(),
ArrowDataType::RunEndEncoded(_, _) => todo!(),
ArrowDataType::Decimal128(_, _) => unimplemented!(),
ArrowDataType::Decimal256(_, _) => unimplemented!(),
ArrowDataType::Map(_, _) => unimplemented!(),
ArrowDataType::RunEndEncoded(_, _) => unimplemented!(),
}
}
}
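The serializer change swaps `todo!()` for `unimplemented!()` on the Arrow types the catalog never produces. Both macros panic, but `todo!()` advertises code that is still planned, while `unimplemented!()` marks variants the conversion deliberately does not support. A tiny illustration (the enum is made up):

```rust
// Made-up enum purely to illustrate the macro choice.
enum Unit {
    Seconds,
    Fortnights,
}

fn millis_per(unit: Unit) -> u64 {
    match unit {
        Unit::Seconds => 1_000,
        // Deliberately unsupported, not merely "not written yet".
        Unit::Fortnights => unimplemented!("fortnights are not supported"),
    }
}

fn main() {
    assert_eq!(millis_per(Unit::Seconds), 1_000);
}
```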
@@ -1,5 +1,6 @@
---
source: influxdb3_write/src/catalog.rs
source: influxdb3_catalog/src/catalog.rs
assertion_line: 710
expression: catalog
---
{
@@ -1,5 +1,6 @@
---
source: influxdb3_write/src/catalog.rs
source: influxdb3_catalog/src/catalog.rs
assertion_line: 903
expression: catalog
---
{