diff --git a/Cargo.lock b/Cargo.lock index dd22b07f4d..6c76eaf94e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -492,6 +492,35 @@ dependencies = [ "xz2", ] +[[package]] +name = "async-io" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c374dda1ed3e7d8f0d9ba58715f924862c63eae6849c92d3a18e7fbde9e2794" +dependencies = [ + "async-lock", + "autocfg 1.1.0", + "concurrent-queue", + "futures-lite", + "libc", + "log", + "parking", + "polling", + "slab", + "socket2", + "waker-fn", + "windows-sys 0.42.0", +] + +[[package]] +name = "async-lock" +version = "2.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa24f727524730b077666307f2734b4a1a1c57acb79193127dcc8914d5242dd7" +dependencies = [ + "event-listener", +] + [[package]] name = "async-stream" version = "0.3.3" @@ -956,6 +985,37 @@ dependencies = [ "pkg-config", ] +[[package]] +name = "camino" +version = "1.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c530edf18f37068ac2d977409ed5cd50d53d73bc653c7647b48eb78976ac9ae2" +dependencies = [ + "serde", +] + +[[package]] +name = "cargo-platform" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cbdb825da8a5df079a43676dbe042702f1707b1109f713a01420fbb4cc71fa27" +dependencies = [ + "serde", +] + +[[package]] +name = "cargo_metadata" +version = "0.14.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4acbb09d9ee8e23699b9634375c72795d095bf268439da88562cf9b501f181fa" +dependencies = [ + "camino", + "cargo-platform", + "semver 1.0.13", + "serde", + "serde_json", +] + [[package]] name = "cast" version = "0.3.0" @@ -1041,6 +1101,7 @@ dependencies = [ "log", "logger", "meta_client", + "moka", "query_engine", "router", "serde", @@ -1347,6 +1408,15 @@ dependencies = [ "toml 0.7.3", ] +[[package]] +name = "concurrent-queue" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c278839b831783b70278b14df4d45e1beb1aad306c07bb796637de9a0e323e8e" +dependencies = [ + "crossbeam-utils 0.8.11", +] + [[package]] name = "const-random" version = "0.1.13" @@ -2089,6 +2159,21 @@ dependencies = [ "libc", ] +[[package]] +name = "error-chain" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d2f06b9cac1506ece98fe3231e3cc9c4410ec3d5b1f24ae1c8946f0742cdefc" +dependencies = [ + "version_check", +] + +[[package]] +name = "event-listener" +version = "2.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" + [[package]] name = "fail" version = "0.4.0" @@ -2344,6 +2429,21 @@ version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "00f5fb52a06bdcadeb54e8d3671f8888a39697dcb0b81b23b55174030427f4eb" +[[package]] +name = "futures-lite" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7694489acd39452c77daa48516b894c153f192c3578d5a839b62c58099fcbf48" +dependencies = [ + "fastrand", + "futures-core", + "futures-io", + "memchr", + "parking", + "pin-project-lite", + "waker-fn", +] + [[package]] name = "futures-macro" version = "0.3.25" @@ -3269,6 +3369,15 @@ dependencies = [ "pkg-config", ] +[[package]] +name = "mach" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b823e83b2affd8f40a9ee8c29dbc56404c1e34cd2710921f2801e2cf29527afa" +dependencies = [ + "libc", +] + [[package]] name = "matchers" version = "0.0.1" @@ -3419,6 +3528,32 @@ dependencies = [ "windows-sys 0.36.1", ] +[[package]] +name = "moka" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b6446f16d504e3d575df79cabb11bfbe9f24b17e9562d964a815db7b28ae3ec" +dependencies = [ + "async-io", + "async-lock", + "crossbeam-channel 0.5.6", + "crossbeam-epoch 0.9.10", + "crossbeam-utils 0.8.11", + "futures-util", + "num_cpus", + "once_cell", + "parking_lot 0.12.1", + "quanta", + "rustc_version 0.4.0", + "scheduled-thread-pool", + "skeptic", + "smallvec", + "tagptr", + "thiserror", + "triomphe", + "uuid 1.3.0", +] + [[package]] name = "multimap" version = "0.8.3" @@ -3951,6 +4086,12 @@ dependencies = [ "sha1 0.10.4", ] +[[package]] +name = "parking" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "427c3892f9e783d91cc128285287e70a59e206ca452770ece88a76f7a3eddd72" + [[package]] name = "parking_lot" version = "0.11.2" @@ -4264,6 +4405,20 @@ dependencies = [ "plotters-backend", ] +[[package]] +name = "polling" +version = "2.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22122d5ec4f9fe1b3916419b76be1e80bcb93f618d071d2edf841b137b2a2bd6" +dependencies = [ + "autocfg 1.1.0", + "cfg-if 1.0.0", + "libc", + "log", + "wepoll-ffi", + "windows-sys 0.42.0", +] + [[package]] name = "pprof" version = "0.10.1" @@ -4583,6 +4738,33 @@ version = "3.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9653c3ed92974e34c5a6e0a510864dab979760481714c172e0a34e437cb98804" +[[package]] +name = "pulldown-cmark" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d9cc634bc78768157b5cbfe988ffcd1dcba95cd2b2f03a88316c08c6d00ed63" +dependencies = [ + "bitflags", + "memchr", + "unicase", +] + +[[package]] +name = "quanta" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7e31331286705f455e56cca62e0e717158474ff02b7936c1fa596d983f4ae27" +dependencies = [ + "crossbeam-utils 0.8.11", + "libc", + "mach", + "once_cell", + "raw-cpuid", + "wasi 0.10.0+wasi-snapshot-preview1", + "web-sys", + "winapi", +] + [[package]] name = "query_engine" version = "1.0.0" @@ -4855,6 +5037,15 @@ dependencies = [ "rand_core 0.3.1", ] +[[package]] +name = "raw-cpuid" +version = "10.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c297679cb867470fa8c9f67dbba74a78d78e3e98d7cf2b08d6d71540f797332" +dependencies = [ + "bitflags", +] + [[package]] name = "rayon" version = "1.5.3" @@ -5047,8 +5238,10 @@ dependencies = [ "common_util", "log", "meta_client", + "moka", "serde", "snafu 0.6.10", + "tokio", "twox-hash", ] @@ -5281,6 +5474,9 @@ name = "semver" version = "1.0.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "93f6841e709003d68bb2deee8c343572bf446003ec20a583e76f7b15cebf3711" +dependencies = [ + "serde", +] [[package]] name = "semver-parser" @@ -5528,6 +5724,21 @@ dependencies = [ "serde", ] +[[package]] +name = "skeptic" +version = "0.13.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16d23b015676c90a0f01c197bfdc786c20342c73a0afdda9025adb0bc42940a8" +dependencies = [ + "bytecount", + "cargo_metadata", + "error-chain", + "glob", + "pulldown-cmark", + "tempfile", + "walkdir", +] + [[package]] name = "skiplist" version = "1.0.0" @@ -5975,6 +6186,12 @@ dependencies = [ "snafu 0.6.10", ] +[[package]] +name = "tagptr" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417" + [[package]] name = "take_mut" version = "0.2.2" @@ -6574,6 +6791,12 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "triomphe" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1ee9bd9239c339d714d657fac840c6d2a4f9c45f4f9ec7b0975113458be78db" + [[package]] name = "try-lock" version = "0.2.3" @@ -6766,6 +6989,12 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d" +[[package]] +name = "waker-fn" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca" + [[package]] name = "wal" version = "1.0.0" @@ -6957,6 +7186,15 @@ dependencies = [ "webpki", ] +[[package]] +name = "wepoll-ffi" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d743fdedc5c64377b5fc2bc036b01c7fd642205a0d96356034ae3404d49eb7fb" +dependencies = [ + "cc", +] + [[package]] name = "which" version = "4.2.5" diff --git a/Cargo.toml b/Cargo.toml index c273f762d6..5fa9e05a3a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -137,6 +137,7 @@ interpreters = { workspace = true } log = { workspace = true } logger = { workspace = true } meta_client = { workspace = true } +moka = { version = "0.10", features = ["future"] } query_engine = { workspace = true } router = { workspace = true } serde = { workspace = true } diff --git a/router/Cargo.toml b/router/Cargo.toml index 535e6bd57a..4df5a99ca7 100644 --- a/router/Cargo.toml +++ b/router/Cargo.toml @@ -18,6 +18,8 @@ common_types = { workspace = true } common_util = { workspace = true } log = { workspace = true } meta_client = { workspace = true } +moka = { version = "0.10", features = ["future"] } serde = { workspace = true } snafu = { workspace = true } +tokio = { workspace = true } twox-hash = "1.6" diff --git a/router/src/cluster_based.rs b/router/src/cluster_based.rs index 09ad461a76..3541ce8de7 100644 --- a/router/src/cluster_based.rs +++ b/router/src/cluster_based.rs @@ -7,17 +7,52 @@ use ceresdbproto::storage::{Route, RouteRequest}; use cluster::ClusterRef; use common_util::error::BoxError; use meta_client::types::RouteTablesRequest; +use moka::future::Cache; use snafu::ResultExt; -use crate::{endpoint::Endpoint, OtherWithCause, ParseEndpoint, Result, Router}; +use crate::{endpoint::Endpoint, OtherWithCause, ParseEndpoint, Result, RouteCacheConfig, Router}; pub struct ClusterBasedRouter { cluster: ClusterRef, + cache: Option>, } impl ClusterBasedRouter { - pub fn new(cluster: ClusterRef) -> Self { - Self { cluster } + pub fn new(cluster: ClusterRef, cache_config: RouteCacheConfig) -> Self { + let cache = if cache_config.enable { + Some( + Cache::builder() + .time_to_live(cache_config.ttl.0) + .time_to_idle(cache_config.tti.0) + .max_capacity(cache_config.capacity) + .build(), + ) + } else { + None + }; + + Self { cluster, cache } + } + + /// route table from local cache, return cache routes and tables which are + /// not in cache + fn route_from_cache(&self, tables: Vec) -> (Vec, Vec) { + let mut routes = vec![]; + let mut miss = vec![]; + + if let Some(cache) = &self.cache { + for table in tables { + if let Some(route) = cache.get(&table) { + routes.push(route.clone()); + } else { + miss.push(table.clone()); + } + } + } else { + miss = tables; + } + + (routes, miss) } } @@ -35,10 +70,19 @@ fn make_route(table_name: &str, endpoint: &str) -> Result { impl Router for ClusterBasedRouter { async fn route(&self, req: RouteRequest) -> Result> { let req_ctx = req.context.unwrap(); + + // Firstly route table from local cache. + let (mut routes, miss) = self.route_from_cache(req.tables); + + if miss.is_empty() { + return Ok(routes); + } + let route_tables_req = RouteTablesRequest { schema_name: req_ctx.database, - table_names: req.tables, + table_names: miss, }; + let route_resp = self .cluster .route_tables(&route_tables_req) @@ -48,18 +92,170 @@ impl Router for ClusterBasedRouter { msg: format!("Failed to route tables by cluster, req:{route_tables_req:?}"), })?; - let mut routes = Vec::with_capacity(route_resp.entries.len()); - // Now we pick up the nodes who own the leader shard for the route response. for (table_name, route_entry) in route_resp.entries { for node_shard in route_entry.node_shards { if node_shard.shard_info.is_leader() { let route = make_route(&table_name, &node_shard.endpoint)?; + if let Some(cache) = &self.cache { + // There may be data race here, and it is acceptable currently. + cache.insert(table_name.clone(), route.clone()).await; + } routes.push(route); } } } + return Ok(routes); + } +} + +#[cfg(test)] +mod tests { + use std::{collections::HashMap, sync::Arc, thread::sleep}; + use std::time::Duration; + + use ceresdbproto::{ + meta_event::{ + CloseShardRequest, CloseTableOnShardRequest, CreateTableOnShardRequest, + DropTableOnShardRequest, OpenShardRequest, OpenTableOnShardRequest, + }, + storage::RequestContext, + }; + use cluster::{Cluster, ClusterNodesResp}; + use common_util::config::ReadableDuration; + use meta_client::types::{ + NodeShard, RouteEntry, RouteTablesResponse, ShardInfo, ShardRole::Leader, TableInfo, + TablesOfShard, + }; + + use super::*; + + struct MockClusterImpl {} + + #[async_trait] + impl Cluster for MockClusterImpl { + async fn start(&self) -> cluster::Result<()> { + todo!() + } + + async fn stop(&self) -> cluster::Result<()> { + todo!() + } + + async fn open_shard(&self, _req: &OpenShardRequest) -> cluster::Result { + todo!() + } + + async fn close_shard(&self, _req: &CloseShardRequest) -> cluster::Result { + todo!() + } + + async fn create_table_on_shard( + &self, + _req: &CreateTableOnShardRequest, + ) -> cluster::Result<()> { + todo!() + } + + async fn drop_table_on_shard(&self, _req: &DropTableOnShardRequest) -> cluster::Result<()> { + todo!() + } + + async fn open_table_on_shard(&self, _req: &OpenTableOnShardRequest) -> cluster::Result<()> { + todo!() + } + + async fn close_table_on_shard( + &self, + _req: &CloseTableOnShardRequest, + ) -> cluster::Result<()> { + todo!() + } + + async fn route_tables( + &self, + req: &RouteTablesRequest, + ) -> cluster::Result { + let mut entries = HashMap::new(); + for table in &req.table_names { + entries.insert( + table.clone(), + RouteEntry { + table: TableInfo { + id: 0, + name: table.clone(), + schema_name: String::from("public"), + schema_id: 0, + }, + node_shards: vec![NodeShard { + endpoint: String::from("127.0.0.1:8831"), + shard_info: ShardInfo { + id: 0, + role: Leader, + version: 100, + }, + }], + }, + ); + } + + Ok(RouteTablesResponse { + cluster_topology_version: 0, + entries, + }) + } + + async fn fetch_nodes(&self) -> cluster::Result { + todo!() + } + } + + #[tokio::test] + async fn test_route_cache() { + let mock_cluster = MockClusterImpl {}; + + let config = RouteCacheConfig { + enable: true, + ttl: ReadableDuration::from(Duration::from_secs(4)), + tti: ReadableDuration::from(Duration::from_secs(2)), + capacity: 2, + }; + let router = ClusterBasedRouter::new(Arc::new(mock_cluster), config); + + let table1 = "table1"; + let table2 = "table2"; + + // first case get two tables, no one miss + let tables = vec![table1.to_string(), table2.to_string()]; + let result = router + .route(RouteRequest { + context: Some(RequestContext { + database: String::from("public"), + }), + tables: tables.clone(), + }) + .await; + assert_eq!(result.unwrap().len(), 2); + + let (routes, miss) = router.route_from_cache(tables); + assert_eq!(routes.len(), 2); + assert_eq!(miss.len(), 0); + sleep(Duration::from_secs(1)); + + // try to get table1 + let tables = vec![table1.to_string()]; + let (routes, miss) = router.route_from_cache(tables); + assert_eq!(routes.len(), 1); + assert_eq!(routes[0].table, table1.to_string()); + assert_eq!(miss.len(), 0); - Ok(routes) + // sleep 1.5s, table2 will be evicted, and table1 in cache + sleep(Duration::from_millis(1500)); + let tables = vec![table1.to_string(), table2.to_string()]; + let (routes, miss) = router.route_from_cache(tables); + assert_eq!(routes.len(), 1); + assert_eq!(routes[0].table, table1.to_string()); + assert_eq!(miss.len(), 1); + assert_eq!(miss[0], table2.to_string()); } } diff --git a/router/src/lib.rs b/router/src/lib.rs index 8153dbf1df..f546fa9c1a 100644 --- a/router/src/lib.rs +++ b/router/src/lib.rs @@ -4,14 +4,14 @@ pub mod cluster_based; pub mod endpoint; pub(crate) mod hash; pub mod rule_based; - use std::sync::Arc; use async_trait::async_trait; use ceresdbproto::storage::{Route, RouteRequest}; pub use cluster_based::ClusterBasedRouter; -use common_util::define_result; +use common_util::{config::ReadableDuration, define_result}; pub use rule_based::{RuleBasedRouter, RuleList}; +use serde::{Deserialize, Serialize}; use snafu::{Backtrace, Snafu}; #[derive(Snafu, Debug)] @@ -63,3 +63,15 @@ pub type RouterRef = Arc; pub trait Router { async fn route(&self, req: RouteRequest) -> Result>; } + +#[derive(Clone, Debug, Deserialize, Serialize, Default)] +pub struct RouteCacheConfig { + // enable route cache, default false + enable: bool, + /// Time to live (TTL) in second. + ttl: ReadableDuration, + /// Time to idle (TTI) in second. + tti: ReadableDuration, + /// how many route records can store in cache. + capacity: u64, +} diff --git a/server/src/config.rs b/server/src/config.rs index a89a8cab99..ea614e2506 100644 --- a/server/src/config.rs +++ b/server/src/config.rs @@ -110,6 +110,9 @@ pub struct ServerConfig { /// Whether to create table automatically when data is first written, only /// used in gRPC pub auto_create_table: bool, + + // Config of route + pub route_cache: router::RouteCacheConfig, } impl Default for ServerConfig { @@ -125,6 +128,7 @@ impl Default for ServerConfig { resp_compress_min_length: ReadableSize::mb(4), forward: forward::Config::default(), auto_create_table: true, + route_cache: router::RouteCacheConfig::default(), } } } diff --git a/src/setup.rs b/src/setup.rs index 682bc16a42..fef55a494e 100644 --- a/src/setup.rs +++ b/src/setup.rs @@ -215,7 +215,10 @@ async fn build_with_meta( .unwrap(); Arc::new(cluster_impl) }; - let router = Arc::new(ClusterBasedRouter::new(cluster.clone())); + let router = Arc::new(ClusterBasedRouter::new( + cluster.clone(), + config.server.route_cache.clone(), + )); let opened_wals = wal_opener .open_wals(&config.analytic.wal, runtimes.clone())