Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: use cache kv manager for SchemaMetadataManager #5053

Merged
merged 10 commits into from
Nov 26, 2024
2 changes: 1 addition & 1 deletion src/catalog/src/kvbackend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

pub use client::{CachedMetaKvBackend, CachedMetaKvBackendBuilder, MetaKvBackend};
pub use client::{CachedKvBackend, CachedKvBackendBuilder, MetaKvBackend};

mod client;
mod manager;
Expand Down
44 changes: 26 additions & 18 deletions src/catalog/src/kvbackend/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use common_error::ext::BoxedError;
use common_meta::cache_invalidator::KvCacheInvalidator;
use common_meta::error::Error::CacheNotGet;
use common_meta::error::{CacheNotGetSnafu, Error, ExternalSnafu, GetKvCacheSnafu, Result};
use common_meta::kv_backend::txn::{Txn, TxnResponse};
use common_meta::kv_backend::{KvBackend, KvBackendRef, TxnService};
use common_meta::rpc::store::{
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
Expand All @@ -42,20 +43,20 @@ const DEFAULT_CACHE_MAX_CAPACITY: u64 = 10000;
const DEFAULT_CACHE_TTL: Duration = Duration::from_secs(10 * 60);
const DEFAULT_CACHE_TTI: Duration = Duration::from_secs(5 * 60);

pub struct CachedMetaKvBackendBuilder {
pub struct CachedKvBackendBuilder {
cache_max_capacity: Option<u64>,
cache_ttl: Option<Duration>,
cache_tti: Option<Duration>,
meta_client: Arc<MetaClient>,
inner: KvBackendRef,
}

impl CachedMetaKvBackendBuilder {
pub fn new(meta_client: Arc<MetaClient>) -> Self {
impl CachedKvBackendBuilder {
pub fn new(inner: KvBackendRef) -> Self {
Self {
cache_max_capacity: None,
cache_ttl: None,
cache_tti: None,
meta_client,
inner,
}
}

Expand All @@ -74,7 +75,7 @@ impl CachedMetaKvBackendBuilder {
self
}

pub fn build(self) -> CachedMetaKvBackend {
pub fn build(self) -> CachedKvBackend {
let cache_max_capacity = self
.cache_max_capacity
.unwrap_or(DEFAULT_CACHE_MAX_CAPACITY);
Expand All @@ -85,14 +86,11 @@ impl CachedMetaKvBackendBuilder {
.time_to_live(cache_ttl)
.time_to_idle(cache_tti)
.build();

let kv_backend = Arc::new(MetaKvBackend {
client: self.meta_client,
});
let kv_backend = self.inner;
let name = format!("CachedKvBackend({})", kv_backend.name());
let version = AtomicUsize::new(0);

CachedMetaKvBackend {
CachedKvBackend {
kv_backend,
cache,
name,
Expand All @@ -112,19 +110,29 @@ pub type CacheBackend = Cache<Vec<u8>, KeyValue>;
/// Therefore, it is recommended to use CachedMetaKvBackend to only read metadata related
/// information. Note: If you read other information, you may read expired data, which depends on
/// TTL and TTI for cache.
pub struct CachedMetaKvBackend {
pub struct CachedKvBackend {
kv_backend: KvBackendRef,
cache: CacheBackend,
name: String,
version: AtomicUsize,
}

impl TxnService for CachedMetaKvBackend {
#[async_trait::async_trait]
impl TxnService for CachedKvBackend {
type Error = Error;

async fn txn(&self, txn: Txn) -> std::result::Result<TxnResponse, Self::Error> {
// TODO(hl): txn of CachedKvBackend simply pass through to inner backend without invalidating caches.
self.kv_backend.txn(txn).await
}

fn max_txn_ops(&self) -> usize {
self.kv_backend.max_txn_ops()
}
}

#[async_trait::async_trait]
impl KvBackend for CachedMetaKvBackend {
impl KvBackend for CachedKvBackend {
fn name(&self) -> &str {
&self.name
}
Expand Down Expand Up @@ -305,15 +313,15 @@ impl KvBackend for CachedMetaKvBackend {
}

#[async_trait::async_trait]
impl KvCacheInvalidator for CachedMetaKvBackend {
impl KvCacheInvalidator for CachedKvBackend {
async fn invalidate_key(&self, key: &[u8]) {
self.create_new_version();
self.cache.invalidate(key).await;
debug!("invalidated cache key: {}", String::from_utf8_lossy(key));
}
}

impl CachedMetaKvBackend {
impl CachedKvBackend {
// only for test
#[cfg(test)]
fn wrap(kv_backend: KvBackendRef) -> Self {
Expand Down Expand Up @@ -466,7 +474,7 @@ mod tests {
use common_meta::rpc::KeyValue;
use dashmap::DashMap;

use super::CachedMetaKvBackend;
use super::CachedKvBackend;

#[derive(Default)]
pub struct SimpleKvBackend {
Expand Down Expand Up @@ -540,7 +548,7 @@ mod tests {
async fn test_cached_kv_backend() {
let simple_kv = Arc::new(SimpleKvBackend::default());
let get_execute_times = simple_kv.get_execute_times.clone();
let cached_kv = CachedMetaKvBackend::wrap(simple_kv);
let cached_kv = CachedKvBackend::wrap(simple_kv);

add_some_vals(&cached_kv).await;

Expand Down
8 changes: 5 additions & 3 deletions src/cmd/src/cli/repl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@ use cache::{
TABLE_ROUTE_CACHE_NAME,
};
use catalog::kvbackend::{
CachedMetaKvBackend, CachedMetaKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend,
CachedKvBackend, CachedKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend,
};
use client::{Client, Database, OutputData, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_base::Plugins;
use common_config::Mode;
use common_error::ext::ErrorExt;
use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
use common_meta::kv_backend::KvBackendRef;
use common_query::Output;
use common_recordbatch::RecordBatches;
use common_telemetry::debug;
Expand Down Expand Up @@ -258,8 +259,9 @@ async fn create_query_engine(meta_addr: &str) -> Result<DatafusionQueryEngine> {
.context(StartMetaClientSnafu)?;
let meta_client = Arc::new(meta_client);

let cached_meta_backend =
Arc::new(CachedMetaKvBackendBuilder::new(meta_client.clone()).build());
let cached_meta_backend = Arc::new(
CachedKvBackendBuilder::new(Arc::new(MetaKvBackend::new(meta_client.clone()))).build(),
);
let layered_cache_builder = LayeredCacheRegistryBuilder::default().add_cache_registry(
CacheRegistryBuilder::default()
.add_cache(cached_meta_backend.clone())
Expand Down
13 changes: 7 additions & 6 deletions src/cmd/src/flownode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use std::sync::Arc;

use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
use catalog::kvbackend::{CachedMetaKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend};
use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend};
use clap::Parser;
use client::client_manager::NodeClients;
use common_base::Plugins;
Expand Down Expand Up @@ -246,11 +246,12 @@ impl StartCommand {
let cache_tti = meta_config.metadata_cache_tti;

// TODO(discord9): add helper function to ease the creation of cache registry&such
let cached_meta_backend = CachedMetaKvBackendBuilder::new(meta_client.clone())
.cache_max_capacity(cache_max_capacity)
.cache_ttl(cache_ttl)
.cache_tti(cache_tti)
.build();
let cached_meta_backend =
CachedKvBackendBuilder::new(Arc::new(MetaKvBackend::new(meta_client.clone())))
.cache_max_capacity(cache_max_capacity)
.cache_ttl(cache_ttl)
.cache_tti(cache_tti)
.build();
let cached_meta_backend = Arc::new(cached_meta_backend);

// Builds cache registry
Expand Down
13 changes: 7 additions & 6 deletions src/cmd/src/frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::time::Duration;

use async_trait::async_trait;
use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
use catalog::kvbackend::{CachedMetaKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend};
use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend};
use clap::Parser;
use client::client_manager::NodeClients;
use common_base::Plugins;
Expand Down Expand Up @@ -293,11 +293,12 @@ impl StartCommand {
.context(MetaClientInitSnafu)?;

// TODO(discord9): add helper function to ease the creation of cache registry&such
let cached_meta_backend = CachedMetaKvBackendBuilder::new(meta_client.clone())
.cache_max_capacity(cache_max_capacity)
.cache_ttl(cache_ttl)
.cache_tti(cache_tti)
.build();
let cached_meta_backend =
CachedKvBackendBuilder::new(Arc::new(MetaKvBackend::new(meta_client.clone())))
.cache_max_capacity(cache_max_capacity)
.cache_ttl(cache_ttl)
.cache_tti(cache_tti)
.build();
let cached_meta_backend = Arc::new(cached_meta_backend);

// Builds cache registry
Expand Down
16 changes: 14 additions & 2 deletions src/datanode/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::path::Path;
use std::sync::Arc;
use std::time::Duration;

use catalog::kvbackend::CachedKvBackendBuilder;
use catalog::memory::MemoryCatalogManager;
use common_base::Plugins;
use common_error::ext::BoxedError;
Expand Down Expand Up @@ -208,7 +209,10 @@ impl DatanodeBuilder {
(Box::new(NoopRegionServerEventListener) as _, None)
};

let schema_metadata_manager = Arc::new(SchemaMetadataManager::new(kv_backend.clone()));
let cached_kv_backend = Arc::new(CachedKvBackendBuilder::new(kv_backend.clone()).build());

let schema_metadata_manager =
Arc::new(SchemaMetadataManager::new(cached_kv_backend.clone()));
let region_server = self
.new_region_server(schema_metadata_manager, region_event_listener)
.await?;
Expand Down Expand Up @@ -239,7 +243,15 @@ impl DatanodeBuilder {
}

let heartbeat_task = if let Some(meta_client) = meta_client {
Some(HeartbeatTask::try_new(&self.opts, region_server.clone(), meta_client).await?)
Some(
HeartbeatTask::try_new(
&self.opts,
region_server.clone(),
meta_client,
cached_kv_backend,
)
.await?,
)
} else {
None
};
Expand Down
4 changes: 4 additions & 0 deletions src/datanode/src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::sync::Arc;
use std::time::Duration;

use api::v1::meta::{HeartbeatRequest, NodeInfo, Peer, RegionRole, RegionStat};
use catalog::kvbackend::CachedKvBackend;
use common_meta::datanode::REGION_STATISTIC_KEY;
use common_meta::distributed_time_constants::META_KEEP_ALIVE_INTERVAL_SECS;
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
Expand All @@ -39,6 +40,7 @@ use crate::alive_keeper::RegionAliveKeeper;
use crate::config::DatanodeOptions;
use crate::error::{self, MetaClientInitSnafu, Result};
use crate::event_listener::RegionServerEventReceiver;
use crate::heartbeat::handler::cache_invalidator::InvalidateSchemaCacheHandler;
use crate::metrics::{self, HEARTBEAT_RECV_COUNT, HEARTBEAT_SENT_COUNT};
use crate::region_server::RegionServer;

Expand Down Expand Up @@ -70,6 +72,7 @@ impl HeartbeatTask {
opts: &DatanodeOptions,
region_server: RegionServer,
meta_client: MetaClientRef,
cache_kv_backend: Arc<CachedKvBackend>,
) -> Result<Self> {
let region_alive_keeper = Arc::new(RegionAliveKeeper::new(
region_server.clone(),
Expand All @@ -79,6 +82,7 @@ impl HeartbeatTask {
region_alive_keeper.clone(),
Arc::new(ParseMailboxMessageHandler),
Arc::new(RegionHeartbeatResponseHandler::new(region_server.clone())),
Arc::new(InvalidateSchemaCacheHandler::new(cache_kv_backend)),
]));

Ok(Self {
Expand Down
13 changes: 7 additions & 6 deletions src/datanode/src/heartbeat/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use futures::future::BoxFuture;
use snafu::OptionExt;
use store_api::storage::RegionId;

pub(crate) mod cache_invalidator;
mod close_region;
mod downgrade_region;
mod open_region;
Expand Down Expand Up @@ -134,7 +135,7 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler {
}
});

Ok(HandleControl::Done)
Ok(HandleControl::Continue)
}
}

Expand Down Expand Up @@ -285,7 +286,7 @@ mod tests {

let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
assert_matches!(control, HandleControl::Done);
assert_matches!(control, HandleControl::Continue);

let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();

Expand Down Expand Up @@ -340,7 +341,7 @@ mod tests {

let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
assert_matches!(control, HandleControl::Done);
assert_matches!(control, HandleControl::Continue);

let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();

Expand Down Expand Up @@ -373,7 +374,7 @@ mod tests {

let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
assert_matches!(control, HandleControl::Done);
assert_matches!(control, HandleControl::Continue);

let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();

Expand Down Expand Up @@ -420,7 +421,7 @@ mod tests {

let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
assert_matches!(control, HandleControl::Done);
assert_matches!(control, HandleControl::Continue);

let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();

Expand All @@ -442,7 +443,7 @@ mod tests {
});
let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
assert_matches!(control, HandleControl::Done);
assert_matches!(control, HandleControl::Continue);

let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();

Expand Down
Loading
Loading