Skip to content

Commit

Permalink
refactor: change table info cache to table schema cache
Browse files Browse the repository at this point in the history
  • Loading branch information
v0y4g3r committed Dec 3, 2024
1 parent 4f2f2f4 commit 5b08d74
Show file tree
Hide file tree
Showing 8 changed files with 125 additions and 41 deletions.
26 changes: 15 additions & 11 deletions src/cache/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ use std::time::Duration;
use catalog::kvbackend::new_table_cache;
use common_meta::cache::{
new_schema_cache, new_table_flownode_set_cache, new_table_info_cache, new_table_name_cache,
new_table_route_cache, new_view_info_cache, CacheRegistry, CacheRegistryBuilder,
LayeredCacheRegistryBuilder,
new_table_route_cache, new_table_schema_cache, new_view_info_cache, CacheRegistry,
CacheRegistryBuilder, LayeredCacheRegistryBuilder,
};
use common_meta::kv_backend::KvBackendRef;
use moka::future::CacheBuilder;
Expand All @@ -38,20 +38,18 @@ pub const VIEW_INFO_CACHE_NAME: &str = "view_info_cache";
pub const TABLE_NAME_CACHE_NAME: &str = "table_name_cache";
pub const TABLE_CACHE_NAME: &str = "table_cache";
pub const SCHEMA_CACHE_NAME: &str = "schema_cache";
pub const TABLE_SCHEMA_NAME_CACHE_NAME: &str = "table_schema_name_cache";
pub const TABLE_FLOWNODE_SET_CACHE_NAME: &str = "table_flownode_set_cache";
pub const TABLE_ROUTE_CACHE_NAME: &str = "table_route_cache";

/// Builds cache registry for datanode, including:
/// - Table info cache
/// - Schema cache.
/// - Table id to schema name cache.
pub fn build_datanode_cache_registry(kv_backend: KvBackendRef) -> CacheRegistry {
// Builds table info cache
let cache = CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY)
.time_to_live(DEFAULT_CACHE_TTL)
.time_to_idle(DEFAULT_CACHE_TTI)
.build();
let table_info_cache = Arc::new(new_table_info_cache(
TABLE_INFO_CACHE_NAME.to_string(),
// Builds table id schema name cache that never expires.
let cache = CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY).build();
let table_id_schema_cache = Arc::new(new_table_schema_cache(
TABLE_SCHEMA_NAME_CACHE_NAME.to_string(),
cache,
kv_backend.clone(),
));
Expand All @@ -68,7 +66,7 @@ pub fn build_datanode_cache_registry(kv_backend: KvBackendRef) -> CacheRegistry
));

CacheRegistryBuilder::default()
.add_cache(table_info_cache)
.add_cache(table_id_schema_cache)
.add_cache(schema_cache)
.build()
}
Expand Down Expand Up @@ -146,13 +144,19 @@ pub fn build_fundamental_cache_registry(kv_backend: KvBackendRef) -> CacheRegist
kv_backend.clone(),
));

let table_schema_name_cache = Arc::new(new_table_schema_cache(
TABLE_SCHEMA_NAME_CACHE_NAME.to_string(),
CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY).build(),
kv_backend,
));
CacheRegistryBuilder::default()
.add_cache(table_info_cache)
.add_cache(table_name_cache)
.add_cache(table_route_cache)
.add_cache(view_info_cache)
.add_cache(table_flownode_set_cache)
.add_cache(schema_cache)
.add_cache(table_schema_name_cache)
.build()
}

Expand Down
6 changes: 3 additions & 3 deletions src/common/meta/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub use registry::{
};
pub use table::{
new_schema_cache, new_table_info_cache, new_table_name_cache, new_table_route_cache,
new_view_info_cache, SchemaCache, SchemaCacheRef, TableInfoCache, TableInfoCacheRef,
TableNameCache, TableNameCacheRef, TableRoute, TableRouteCache, TableRouteCacheRef,
ViewInfoCache, ViewInfoCacheRef,
new_table_schema_cache, new_view_info_cache, SchemaCache, SchemaCacheRef, TableInfoCache,
TableInfoCacheRef, TableNameCache, TableNameCacheRef, TableRoute, TableRouteCache,
TableRouteCacheRef, TableSchemaCache, TableSchemaCacheRef, ViewInfoCache, ViewInfoCacheRef,
};
2 changes: 2 additions & 0 deletions src/common/meta/src/cache/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ mod schema;
mod table_info;
mod table_name;
mod table_route;
mod table_schema;
mod view_info;

pub use schema::{new_schema_cache, SchemaCache, SchemaCacheRef};
pub use table_info::{new_table_info_cache, TableInfoCache, TableInfoCacheRef};
pub use table_name::{new_table_name_cache, TableNameCache, TableNameCacheRef};
pub use table_route::{new_table_route_cache, TableRoute, TableRouteCache, TableRouteCacheRef};
pub use table_schema::{new_table_schema_cache, TableSchemaCache, TableSchemaCacheRef};
pub use view_info::{new_view_info_cache, ViewInfoCache, ViewInfoCacheRef};
76 changes: 76 additions & 0 deletions src/common/meta/src/cache/table/table_schema.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Cache for table id to schema name mapping.
use std::sync::Arc;

use futures_util::future::BoxFuture;
use moka::future::Cache;
use snafu::OptionExt;
use store_api::storage::TableId;

use crate::cache::{CacheContainer, Initializer};
use crate::error;
use crate::instruction::CacheIdent;
use crate::key::schema_name::SchemaName;
use crate::key::table_info::TableInfoManager;
use crate::kv_backend::KvBackendRef;

pub type TableSchemaCache = CacheContainer<TableId, Arc<SchemaName>, CacheIdent>;
pub type TableSchemaCacheRef = Arc<TableSchemaCache>;

/// Constructs a [TableSchemaCache].
pub fn new_table_schema_cache(
name: String,
cache: Cache<TableId, Arc<SchemaName>>,
kv_backend: KvBackendRef,
) -> TableSchemaCache {
let table_info_manager = TableInfoManager::new(kv_backend);
let init = init_factory(table_info_manager);

CacheContainer::new(name, cache, Box::new(invalidator), init, Box::new(filter))
}

fn init_factory(table_info_manager: TableInfoManager) -> Initializer<TableId, Arc<SchemaName>> {
Arc::new(move |table_id| {
let table_info_manager = table_info_manager.clone();
Box::pin(async move {
let raw_table_info = table_info_manager
.get(*table_id)
.await?
.context(error::ValueNotExistSnafu)?
.into_inner()
.table_info;

Ok(Some(Arc::new(SchemaName {
catalog_name: raw_table_info.catalog_name,
schema_name: raw_table_info.schema_name,
})))
})
})
}

/// Never invalidates table id schema cache.
fn invalidator<'a>(
_cache: &'a Cache<TableId, Arc<SchemaName>>,
_ident: &'a CacheIdent,
) -> BoxFuture<'a, error::Result<()>> {
Box::pin(std::future::ready(Ok(())))
}

/// Never invalidates table id schema cache.
fn filter(_ident: &CacheIdent) -> bool {
false
}
7 changes: 7 additions & 0 deletions src/common/meta/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -679,6 +679,12 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Failed to connect to Postgres"))]
FetchTableSchemaCache {
#[snafu(source)]
error: Arc<Error>,
},
}

pub type Result<T> = std::result::Result<T, Error>;
Expand Down Expand Up @@ -782,6 +788,7 @@ impl ErrorExt for Error {
#[cfg(feature = "pg_kvbackend")]
ConnectPostgres { .. } => StatusCode::Internal,
Error::DatanodeTableInfoNotFound { .. } => StatusCode::Internal,
FetchTableSchemaCache { .. } => StatusCode::Internal,
}
}

Expand Down
34 changes: 14 additions & 20 deletions src/common/meta/src/key/schema_metadata_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,14 @@ use std::sync::Arc;
use snafu::OptionExt;
use store_api::storage::TableId;

use crate::cache::{SchemaCacheRef, TableInfoCacheRef};
use crate::cache::{SchemaCacheRef, TableSchemaCacheRef};
use crate::error::TableInfoNotFoundSnafu;
use crate::key::schema_name::SchemaName;
use crate::{error, SchemaOptions};

pub type SchemaMetadataManagerRef = Arc<SchemaMetadataManager>;

pub struct SchemaMetadataManager {
table_cache: TableInfoCacheRef,
table_id_schema_cache: TableSchemaCacheRef,
schema_cache: SchemaCacheRef,
#[cfg(any(test, feature = "testing"))]
kv_backend: crate::kv_backend::KvBackendRef,
Expand All @@ -36,9 +35,9 @@ pub struct SchemaMetadataManager {
impl SchemaMetadataManager {
/// Creates a new database meta
#[cfg(not(any(test, feature = "testing")))]
pub fn new(table_cache: TableInfoCacheRef, schema_cache: SchemaCacheRef) -> Self {
pub fn new(table_schema_cache: TableSchemaCacheRef, schema_cache: SchemaCacheRef) -> Self {
Self {
table_cache,
table_id_schema_cache,
schema_cache,
}
}
Expand All @@ -47,11 +46,11 @@ impl SchemaMetadataManager {
#[cfg(any(test, feature = "testing"))]
pub fn new(
kv_backend: crate::kv_backend::KvBackendRef,
table_cache: TableInfoCacheRef,
table_id_schema_cache: TableSchemaCacheRef,
schema_cache: SchemaCacheRef,
) -> Self {
Self {
table_cache,
table_id_schema_cache,
schema_cache,
kv_backend,
}
Expand All @@ -62,20 +61,15 @@ impl SchemaMetadataManager {
&self,
table_id: TableId,
) -> error::Result<Option<Arc<SchemaOptions>>> {
let table_info =
self.table_cache
.get(table_id)
.await?
.with_context(|| TableInfoNotFoundSnafu {
table: format!("table id: {}", table_id),
})?;
let schema_name = self
.table_id_schema_cache
.get(table_id)
.await?
.with_context(|| TableInfoNotFoundSnafu {
table: format!("table id: {}", table_id),
})?;

self.schema_cache
.get_by_ref(&SchemaName {
catalog_name: table_info.catalog_name.clone(),
schema_name: table_info.schema_name.clone(),
})
.await
self.schema_cache.get_by_ref(&schema_name).await
}

#[cfg(any(test, feature = "testing"))]
Expand Down
7 changes: 4 additions & 3 deletions src/datanode/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use catalog::memory::MemoryCatalogManager;
use common_base::Plugins;
use common_error::ext::BoxedError;
use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
use common_meta::cache::{LayeredCacheRegistry, SchemaCacheRef, TableInfoCacheRef};
use common_meta::cache::{LayeredCacheRegistry, SchemaCacheRef, TableSchemaCacheRef};
use common_meta::key::datanode_table::{DatanodeTableManager, DatanodeTableValue};
use common_meta::key::{SchemaMetadataManager, SchemaMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
Expand Down Expand Up @@ -219,12 +219,13 @@ impl DatanodeBuilder {
};

let cache_registry = self.cache_registry.take().context(MissingCacheSnafu)?;
let table_cache: TableInfoCacheRef = cache_registry.get().context(MissingCacheSnafu)?;
let schema_cache: SchemaCacheRef = cache_registry.get().context(MissingCacheSnafu)?;
let table_id_schema_cache: TableSchemaCacheRef =
cache_registry.get().context(MissingCacheSnafu)?;

let schema_metadata_manager = Arc::new(SchemaMetadataManager::new(
kv_backend.clone(),
table_cache,
table_id_schema_cache,
schema_cache,
));
let region_server = self
Expand Down
8 changes: 4 additions & 4 deletions src/mito2/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use api::v1::{OpType, Row, Rows, SemanticType};
use common_base::readable_size::ReadableSize;
use common_base::Plugins;
use common_datasource::compression::CompressionType;
use common_meta::cache::{new_schema_cache, new_table_info_cache};
use common_meta::cache::{new_schema_cache, new_table_info_cache, new_table_schema_cache};
use common_meta::key::schema_name::{SchemaName, SchemaNameValue};
use common_meta::key::{SchemaMetadataManager, SchemaMetadataManagerRef};
use common_meta::kv_backend::memory::MemoryKvBackend;
Expand Down Expand Up @@ -1154,8 +1154,8 @@ pub async fn reopen_region(

pub(crate) fn mock_schema_metadata_manager() -> Arc<SchemaMetadataManager> {
let kv_backend = Arc::new(MemoryKvBackend::new());
let table_cache = Arc::new(new_table_info_cache(
"table_info_cache".to_string(),
let table_schema_cache = Arc::new(new_table_schema_cache(
"table_schema_name_cache".to_string(),
CacheBuilder::default().build(),
kv_backend.clone(),
));
Expand All @@ -1166,7 +1166,7 @@ pub(crate) fn mock_schema_metadata_manager() -> Arc<SchemaMetadataManager> {
));
Arc::new(SchemaMetadataManager::new(
kv_backend as KvBackendRef,
table_cache,
table_schema_cache,
schema_cache,
))
}

0 comments on commit 5b08d74

Please sign in to comment.