Skip to content

Commit

Permalink
feat: use cache kv manager for SchemaMetadataManager (#5053)
Browse files Browse the repository at this point in the history
* feat: add cache for schema options

* fix/use-cache-kv-manager: Add cache invalidation handling to Datanode's heartbeat task

 • Implement InvalidateSchemaCacheHandler in heartbeat.rs to handle cache invalidation instructions.
 • Update HeartbeatTask constructor to accept cached_kv_backend and pass it to InvalidateSchemaCacheHandler.
 • Modify DatanodeBuilder to clone cached_kv_backend when creating schema_metadata_manager.
 • Refactor MetasrvCacheInvalidator in cache_invalidator.rs to reuse MailboxMessage for broadcasting to different channels.

* fix: only remove schema related cache entries

* chore: add more tests

* fix/use-cache-kv-manager: Moved InvalidateSchemaCacheHandler to a separate module

 • Extracted InvalidateSchemaCacheHandler and associated tests into a new file cache_invalidator.rs
 • Removed async_trait and CacheInvalidator related code from heartbeat.rs
 • Added cache_invalidator module declaration in handler.rs

* fix: unit tests

* fix/use-cache-kv-manager:
 Standardize TODO comment format in CachedKvBackend txn method

* Update src/datanode/src/heartbeat/handler/cache_invalidator.rs

* Update src/datanode/src/heartbeat/handler/cache_invalidator.rs

* Update src/datanode/src/heartbeat/handler/cache_invalidator.rs

---------

Co-authored-by: jeremyhi <[email protected]>
  • Loading branch information
v0y4g3r and fengjiachun authored Nov 26, 2024
1 parent 6130c70 commit a617e0d
Show file tree
Hide file tree
Showing 11 changed files with 252 additions and 56 deletions.
2 changes: 1 addition & 1 deletion src/catalog/src/kvbackend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

pub use client::{CachedMetaKvBackend, CachedMetaKvBackendBuilder, MetaKvBackend};
pub use client::{CachedKvBackend, CachedKvBackendBuilder, MetaKvBackend};

mod client;
mod manager;
Expand Down
44 changes: 26 additions & 18 deletions src/catalog/src/kvbackend/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use common_error::ext::BoxedError;
use common_meta::cache_invalidator::KvCacheInvalidator;
use common_meta::error::Error::CacheNotGet;
use common_meta::error::{CacheNotGetSnafu, Error, ExternalSnafu, GetKvCacheSnafu, Result};
use common_meta::kv_backend::txn::{Txn, TxnResponse};
use common_meta::kv_backend::{KvBackend, KvBackendRef, TxnService};
use common_meta::rpc::store::{
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
Expand All @@ -42,20 +43,20 @@ const DEFAULT_CACHE_MAX_CAPACITY: u64 = 10000;
const DEFAULT_CACHE_TTL: Duration = Duration::from_secs(10 * 60);
const DEFAULT_CACHE_TTI: Duration = Duration::from_secs(5 * 60);

pub struct CachedMetaKvBackendBuilder {
pub struct CachedKvBackendBuilder {
cache_max_capacity: Option<u64>,
cache_ttl: Option<Duration>,
cache_tti: Option<Duration>,
meta_client: Arc<MetaClient>,
inner: KvBackendRef,
}

impl CachedMetaKvBackendBuilder {
pub fn new(meta_client: Arc<MetaClient>) -> Self {
impl CachedKvBackendBuilder {
pub fn new(inner: KvBackendRef) -> Self {
Self {
cache_max_capacity: None,
cache_ttl: None,
cache_tti: None,
meta_client,
inner,
}
}

Expand All @@ -74,7 +75,7 @@ impl CachedMetaKvBackendBuilder {
self
}

pub fn build(self) -> CachedMetaKvBackend {
pub fn build(self) -> CachedKvBackend {
let cache_max_capacity = self
.cache_max_capacity
.unwrap_or(DEFAULT_CACHE_MAX_CAPACITY);
Expand All @@ -85,14 +86,11 @@ impl CachedMetaKvBackendBuilder {
.time_to_live(cache_ttl)
.time_to_idle(cache_tti)
.build();

let kv_backend = Arc::new(MetaKvBackend {
client: self.meta_client,
});
let kv_backend = self.inner;
let name = format!("CachedKvBackend({})", kv_backend.name());
let version = AtomicUsize::new(0);

CachedMetaKvBackend {
CachedKvBackend {
kv_backend,
cache,
name,
Expand All @@ -112,19 +110,29 @@ pub type CacheBackend = Cache<Vec<u8>, KeyValue>;
/// Therefore, it is recommended to use CachedMetaKvBackend to only read metadata related
/// information. Note: If you read other information, you may read expired data, which depends on
/// TTL and TTI for cache.
pub struct CachedMetaKvBackend {
pub struct CachedKvBackend {
kv_backend: KvBackendRef,
cache: CacheBackend,
name: String,
version: AtomicUsize,
}

impl TxnService for CachedMetaKvBackend {
#[async_trait::async_trait]
impl TxnService for CachedKvBackend {
type Error = Error;

async fn txn(&self, txn: Txn) -> std::result::Result<TxnResponse, Self::Error> {
// TODO(hl): txn of CachedKvBackend simply pass through to inner backend without invalidating caches.
self.kv_backend.txn(txn).await
}

fn max_txn_ops(&self) -> usize {
self.kv_backend.max_txn_ops()
}
}

#[async_trait::async_trait]
impl KvBackend for CachedMetaKvBackend {
impl KvBackend for CachedKvBackend {
fn name(&self) -> &str {
&self.name
}
Expand Down Expand Up @@ -305,15 +313,15 @@ impl KvBackend for CachedMetaKvBackend {
}

#[async_trait::async_trait]
impl KvCacheInvalidator for CachedMetaKvBackend {
impl KvCacheInvalidator for CachedKvBackend {
async fn invalidate_key(&self, key: &[u8]) {
self.create_new_version();
self.cache.invalidate(key).await;
debug!("invalidated cache key: {}", String::from_utf8_lossy(key));
}
}

impl CachedMetaKvBackend {
impl CachedKvBackend {
// only for test
#[cfg(test)]
fn wrap(kv_backend: KvBackendRef) -> Self {
Expand Down Expand Up @@ -466,7 +474,7 @@ mod tests {
use common_meta::rpc::KeyValue;
use dashmap::DashMap;

use super::CachedMetaKvBackend;
use super::CachedKvBackend;

#[derive(Default)]
pub struct SimpleKvBackend {
Expand Down Expand Up @@ -540,7 +548,7 @@ mod tests {
async fn test_cached_kv_backend() {
let simple_kv = Arc::new(SimpleKvBackend::default());
let get_execute_times = simple_kv.get_execute_times.clone();
let cached_kv = CachedMetaKvBackend::wrap(simple_kv);
let cached_kv = CachedKvBackend::wrap(simple_kv);

add_some_vals(&cached_kv).await;

Expand Down
8 changes: 5 additions & 3 deletions src/cmd/src/cli/repl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@ use cache::{
TABLE_ROUTE_CACHE_NAME,
};
use catalog::kvbackend::{
CachedMetaKvBackend, CachedMetaKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend,
CachedKvBackend, CachedKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend,
};
use client::{Client, Database, OutputData, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_base::Plugins;
use common_config::Mode;
use common_error::ext::ErrorExt;
use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
use common_meta::kv_backend::KvBackendRef;
use common_query::Output;
use common_recordbatch::RecordBatches;
use common_telemetry::debug;
Expand Down Expand Up @@ -258,8 +259,9 @@ async fn create_query_engine(meta_addr: &str) -> Result<DatafusionQueryEngine> {
.context(StartMetaClientSnafu)?;
let meta_client = Arc::new(meta_client);

let cached_meta_backend =
Arc::new(CachedMetaKvBackendBuilder::new(meta_client.clone()).build());
let cached_meta_backend = Arc::new(
CachedKvBackendBuilder::new(Arc::new(MetaKvBackend::new(meta_client.clone()))).build(),
);
let layered_cache_builder = LayeredCacheRegistryBuilder::default().add_cache_registry(
CacheRegistryBuilder::default()
.add_cache(cached_meta_backend.clone())
Expand Down
13 changes: 7 additions & 6 deletions src/cmd/src/flownode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use std::sync::Arc;

use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
use catalog::kvbackend::{CachedMetaKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend};
use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend};
use clap::Parser;
use client::client_manager::NodeClients;
use common_base::Plugins;
Expand Down Expand Up @@ -246,11 +246,12 @@ impl StartCommand {
let cache_tti = meta_config.metadata_cache_tti;

// TODO(discord9): add helper function to ease the creation of cache registry&such
let cached_meta_backend = CachedMetaKvBackendBuilder::new(meta_client.clone())
.cache_max_capacity(cache_max_capacity)
.cache_ttl(cache_ttl)
.cache_tti(cache_tti)
.build();
let cached_meta_backend =
CachedKvBackendBuilder::new(Arc::new(MetaKvBackend::new(meta_client.clone())))
.cache_max_capacity(cache_max_capacity)
.cache_ttl(cache_ttl)
.cache_tti(cache_tti)
.build();
let cached_meta_backend = Arc::new(cached_meta_backend);

// Builds cache registry
Expand Down
13 changes: 7 additions & 6 deletions src/cmd/src/frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::time::Duration;

use async_trait::async_trait;
use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
use catalog::kvbackend::{CachedMetaKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend};
use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend};
use clap::Parser;
use client::client_manager::NodeClients;
use common_base::Plugins;
Expand Down Expand Up @@ -293,11 +293,12 @@ impl StartCommand {
.context(MetaClientInitSnafu)?;

// TODO(discord9): add helper function to ease the creation of cache registry&such
let cached_meta_backend = CachedMetaKvBackendBuilder::new(meta_client.clone())
.cache_max_capacity(cache_max_capacity)
.cache_ttl(cache_ttl)
.cache_tti(cache_tti)
.build();
let cached_meta_backend =
CachedKvBackendBuilder::new(Arc::new(MetaKvBackend::new(meta_client.clone())))
.cache_max_capacity(cache_max_capacity)
.cache_ttl(cache_ttl)
.cache_tti(cache_tti)
.build();
let cached_meta_backend = Arc::new(cached_meta_backend);

// Builds cache registry
Expand Down
16 changes: 14 additions & 2 deletions src/datanode/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::path::Path;
use std::sync::Arc;
use std::time::Duration;

use catalog::kvbackend::CachedKvBackendBuilder;
use catalog::memory::MemoryCatalogManager;
use common_base::Plugins;
use common_error::ext::BoxedError;
Expand Down Expand Up @@ -208,7 +209,10 @@ impl DatanodeBuilder {
(Box::new(NoopRegionServerEventListener) as _, None)
};

let schema_metadata_manager = Arc::new(SchemaMetadataManager::new(kv_backend.clone()));
let cached_kv_backend = Arc::new(CachedKvBackendBuilder::new(kv_backend.clone()).build());

let schema_metadata_manager =
Arc::new(SchemaMetadataManager::new(cached_kv_backend.clone()));
let region_server = self
.new_region_server(schema_metadata_manager, region_event_listener)
.await?;
Expand Down Expand Up @@ -239,7 +243,15 @@ impl DatanodeBuilder {
}

let heartbeat_task = if let Some(meta_client) = meta_client {
Some(HeartbeatTask::try_new(&self.opts, region_server.clone(), meta_client).await?)
Some(
HeartbeatTask::try_new(
&self.opts,
region_server.clone(),
meta_client,
cached_kv_backend,
)
.await?,
)
} else {
None
};
Expand Down
4 changes: 4 additions & 0 deletions src/datanode/src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::sync::Arc;
use std::time::Duration;

use api::v1::meta::{HeartbeatRequest, NodeInfo, Peer, RegionRole, RegionStat};
use catalog::kvbackend::CachedKvBackend;
use common_meta::datanode::REGION_STATISTIC_KEY;
use common_meta::distributed_time_constants::META_KEEP_ALIVE_INTERVAL_SECS;
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
Expand All @@ -39,6 +40,7 @@ use crate::alive_keeper::RegionAliveKeeper;
use crate::config::DatanodeOptions;
use crate::error::{self, MetaClientInitSnafu, Result};
use crate::event_listener::RegionServerEventReceiver;
use crate::heartbeat::handler::cache_invalidator::InvalidateSchemaCacheHandler;
use crate::metrics::{self, HEARTBEAT_RECV_COUNT, HEARTBEAT_SENT_COUNT};
use crate::region_server::RegionServer;

Expand Down Expand Up @@ -70,6 +72,7 @@ impl HeartbeatTask {
opts: &DatanodeOptions,
region_server: RegionServer,
meta_client: MetaClientRef,
cache_kv_backend: Arc<CachedKvBackend>,
) -> Result<Self> {
let region_alive_keeper = Arc::new(RegionAliveKeeper::new(
region_server.clone(),
Expand All @@ -79,6 +82,7 @@ impl HeartbeatTask {
region_alive_keeper.clone(),
Arc::new(ParseMailboxMessageHandler),
Arc::new(RegionHeartbeatResponseHandler::new(region_server.clone())),
Arc::new(InvalidateSchemaCacheHandler::new(cache_kv_backend)),
]));

Ok(Self {
Expand Down
13 changes: 7 additions & 6 deletions src/datanode/src/heartbeat/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use futures::future::BoxFuture;
use snafu::OptionExt;
use store_api::storage::RegionId;

pub(crate) mod cache_invalidator;
mod close_region;
mod downgrade_region;
mod open_region;
Expand Down Expand Up @@ -134,7 +135,7 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler {
}
});

Ok(HandleControl::Done)
Ok(HandleControl::Continue)
}
}

Expand Down Expand Up @@ -285,7 +286,7 @@ mod tests {

let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
assert_matches!(control, HandleControl::Done);
assert_matches!(control, HandleControl::Continue);

let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();

Expand Down Expand Up @@ -340,7 +341,7 @@ mod tests {

let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
assert_matches!(control, HandleControl::Done);
assert_matches!(control, HandleControl::Continue);

let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();

Expand Down Expand Up @@ -373,7 +374,7 @@ mod tests {

let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
assert_matches!(control, HandleControl::Done);
assert_matches!(control, HandleControl::Continue);

let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();

Expand Down Expand Up @@ -420,7 +421,7 @@ mod tests {

let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
assert_matches!(control, HandleControl::Done);
assert_matches!(control, HandleControl::Continue);

let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();

Expand All @@ -442,7 +443,7 @@ mod tests {
});
let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
assert_matches!(control, HandleControl::Done);
assert_matches!(control, HandleControl::Continue);

let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();

Expand Down
Loading

0 comments on commit a617e0d

Please sign in to comment.