diff --git a/Cargo.lock b/Cargo.lock index 26f5b0a6b3..689d925b84 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -413,6 +413,7 @@ dependencies = [ "tentacle-multiaddr", "tentacle-secio", "toml", + "ubyte", "url", ] @@ -1213,13 +1214,13 @@ version = "0.104.0-pre" dependencies = [ "bitflags", "ckb-app-config", + "ckb-async-runtime", "ckb-chain", "ckb-chain-spec", "ckb-channel", "ckb-constant", "ckb-dao", "ckb-dao-utils", - "ckb-db", "ckb-error", "ckb-launcher", "ckb-logger", @@ -1227,6 +1228,7 @@ dependencies = [ "ckb-network", "ckb-reward-calculator", "ckb-shared", + "ckb-stop-handler", "ckb-store", "ckb-test-chain-utils", "ckb-traits", @@ -1245,6 +1247,7 @@ dependencies = [ "once_cell", "rand 0.7.3", "sentry", + "sled", "tempfile", "tokio", ] @@ -2091,6 +2094,15 @@ dependencies = [ "slab", ] +[[package]] +name = "fxhash" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c31b6d751ae2c7f11320402d34e41349dd1016f8d5d45e48c4312bc8625af50c" +dependencies = [ + "byteorder", +] + [[package]] name = "generic-array" version = "0.12.4" @@ -4021,6 +4033,22 @@ version = "0.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9def91fd1e018fe007022791f865d0ccc9b3a0d5001e01aabb8b40e46000afb5" +[[package]] +name = "sled" +version = "0.34.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f96b4737c2ce5987354855aed3797279def4ebf734436c6aa4552cf8e169935" +dependencies = [ + "crc32fast", + "crossbeam-epoch", + "crossbeam-utils", + "fs2", + "fxhash", + "libc", + "log", + "parking_lot 0.11.2", +] + [[package]] name = "smallvec" version = "1.8.0" @@ -4516,6 +4544,15 @@ version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dcf81ac59edc17cc8697ff311e8f5ef2d99fcbd9817b34cec66f90b6c3dfd987" +[[package]] +name = "ubyte" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"42756bb9e708855de2f8a98195643dff31a97f0485d90d8467b39dc24be9e8fe" +dependencies = [ + "serde", +] + [[package]] name = "uname" version = "0.1.1" diff --git a/resource/ckb.toml b/resource/ckb.toml index e4c347e988..4f267901de 100644 --- a/resource/ckb.toml +++ b/resource/ckb.toml @@ -97,6 +97,9 @@ discovery_local_address = false # {{ # Ensure that itself can continue to serve as a bootnode node bootnode_mode = false +# [network.sync.header_map] +# memory_limit = "600MB" + [rpc] # By default RPC only binds to localhost, thus it only allows accessing from the same machine. # diff --git a/sync/Cargo.toml b/sync/Cargo.toml index fb219df1ca..4504632066 100644 --- a/sync/Cargo.toml +++ b/sync/Cargo.toml @@ -12,7 +12,6 @@ repository = "https://github.com/nervosnetwork/ckb" ckb-chain = { path = "../chain", version = "= 0.104.0-pre" } ckb-shared = { path = "../shared", version = "= 0.104.0-pre" } ckb-store = { path = "../store", version = "= 0.104.0-pre" } -ckb-db = { path = "../db", version = "= 0.104.0-pre" } ckb-app-config = {path = "../util/app-config", version = "= 0.104.0-pre"} ckb-types = {path = "../util/types", version = "= 0.104.0-pre"} ckb-network = { path = "../network", version = "= 0.104.0-pre" } @@ -28,6 +27,9 @@ ckb-error = {path = "../error", version = "= 0.104.0-pre"} ckb-tx-pool = { path = "../tx-pool", version = "= 0.104.0-pre" } sentry = { version = "0.23.0", optional = true } ckb-constant = { path = "../util/constant", version = "= 0.104.0-pre" } +ckb-async-runtime = { path = "../util/runtime", version = "= 0.104.0-pre" } +ckb-stop-handler = { path = "../util/stop-handler", version = "= 0.104.0-pre" } +tokio = { version = "1", features = ["sync"] } lru = "0.7.1" futures = "0.3" governor = "0.3.1" @@ -36,6 +38,7 @@ faketime = "0.2.0" bitflags = "1.0" dashmap = "4.0" keyed_priority_queue = "0.3" +sled = "0.34.7" [dev-dependencies] ckb-test-chain-utils = { path = "../util/test-chain-utils", version = "= 0.104.0-pre" } @@ -46,8 +49,6 @@ 
ckb-reward-calculator = { path = "../util/reward-calculator", version = "= 0.104 ckb-chain = { path = "../chain", version = "= 0.104.0-pre", features = ["mock"] } ckb-launcher = { path = "../util/launcher", version = "= 0.104.0-pre" } faux = "^0.1" -tokio = "1" -tempfile = "3.0" once_cell = "1.8.0" [features] diff --git a/sync/src/types/header_map/backend.rs b/sync/src/types/header_map/backend.rs index b0cc27e53e..befc198682 100644 --- a/sync/src/types/header_map/backend.rs +++ b/sync/src/types/header_map/backend.rs @@ -14,12 +14,10 @@ pub(crate) trait KeyValueBackend { self.len() == 0 } - fn is_opened(&self) -> bool; - fn open(&mut self); - fn try_close(&mut self) -> bool; - fn contains_key(&self, key: &Byte32) -> bool; fn get(&self, key: &Byte32) -> Option; - fn insert(&mut self, value: &HeaderView) -> Option; - fn remove(&mut self, key: &Byte32) -> Option; + fn insert(&self, value: &HeaderView) -> Option<()>; + fn insert_batch(&self, values: &[HeaderView]); + fn remove(&self, key: &Byte32) -> Option; + fn remove_no_return(&self, key: &Byte32); } diff --git a/sync/src/types/header_map/backend_rocksdb.rs b/sync/src/types/header_map/backend_rocksdb.rs deleted file mode 100644 index be2ce3191f..0000000000 --- a/sync/src/types/header_map/backend_rocksdb.rs +++ /dev/null @@ -1,168 +0,0 @@ -use std::path; - -use ckb_db::internal::{ - ops::{Delete as _, GetPinned as _, Open as _, Put as _}, - BlockBasedOptions, Options, DB, -}; -use ckb_logger::{debug, warn}; -use ckb_types::{packed::Byte32, prelude::*}; -use tempfile::TempDir; - -use super::KeyValueBackend; -use crate::types::HeaderView; - -pub(crate) struct RocksDBBackend { - tmpdir: Option, - resource: Option<(TempDir, DB)>, - count: usize, -} - -impl KeyValueBackend for RocksDBBackend { - fn new

(tmpdir: Option

) -> Self - where - P: AsRef, - { - Self { - tmpdir: tmpdir.map(|p| p.as_ref().to_path_buf()), - resource: None, - count: 0, - } - } - - fn len(&self) -> usize { - self.count - } - - fn is_opened(&self) -> bool { - self.resource.is_some() - } - - fn open(&mut self) { - if !self.is_opened() { - let mut builder = tempfile::Builder::new(); - builder.prefix("ckb-tmp-"); - let cache_dir_res = if let Some(ref tmpdir) = self.tmpdir { - builder.tempdir_in(tmpdir) - } else { - builder.tempdir() - }; - match cache_dir_res { - Ok(cache_dir) => { - // We minimize memory usage at all costs here. - // If we want to use more memory, we should increase the limit of KeyValueMemory. - let opts = { - let mut block_opts = BlockBasedOptions::default(); - block_opts.disable_cache(); - let mut opts = Options::default(); - opts.create_if_missing(true); - opts.set_block_based_table_factory(&block_opts); - opts.set_write_buffer_size(4 * 1024 * 1024); - opts.set_max_write_buffer_number(2); - opts.set_min_write_buffer_number_to_merge(1); - opts - }; - match DB::open(&opts, cache_dir.path()) { - Ok(db) => { - debug!( - "open a key-value database({}) to save header map into disk", - cache_dir.path().to_str().unwrap_or("") - ); - self.resource.replace((cache_dir, db)); - } - Err(e) => panic!( - "failed to open a key-value database to save header map into disk: {}", - e - ), - } - } - Err(e) => panic!( - "failed to create a tempdir to save header map into disk: {}", - e - ), - } - } - } - - fn try_close(&mut self) -> bool { - if self.is_opened() { - if self.is_empty() { - if let Some((cache_dir, db)) = self.resource.take() { - drop(db); - let _ignore = cache_dir.close(); - } - true - } else { - false - } - } else { - true - } - } - - fn contains_key(&self, key: &Byte32) -> bool { - if let Some((_, ref db)) = self.resource { - db.get_pinned(key.as_slice()) - .unwrap_or_else(|err| panic!("read header map from disk should be ok, but {}", err)) - .is_some() - } else { - false - } - } - - fn 
get(&self, key: &Byte32) -> Option { - if let Some((_, ref db)) = self.resource { - db.get_pinned(key.as_slice()) - .unwrap_or_else(|err| panic!("read header map from disk should be ok, but {}", err)) - .map(|slice| HeaderView::from_slice_should_be_ok(&slice)) - } else { - None - } - } - - fn insert(&mut self, value: &HeaderView) -> Option { - if let Some((_, ref db)) = self.resource { - let key = value.hash(); - let old_value_opt = db - .get_pinned(key.as_slice()) - .unwrap_or_else(|err| panic!("read header map from disk should be ok, but {}", err)) - .map(|slice| HeaderView::from_slice_should_be_ok(&slice)); - if db.put(key.as_slice(), &value.to_vec()).is_err() { - panic!("failed to insert item into header map"); - } - if old_value_opt.is_none() { - self.count += 1; - } - old_value_opt - } else { - None - } - } - - fn remove(&mut self, key: &Byte32) -> Option { - let mut do_count = false; - let value_opt = if let Some((_, ref db)) = self.resource { - let value_opt = db - .get_pinned(key.as_slice()) - .unwrap_or_else(|err| panic!("read header map from disk should be ok, but {}", err)) - .map(|slice| HeaderView::from_slice_should_be_ok(&slice)); - if value_opt.is_some() { - if db.delete(key.as_slice()).is_ok() { - do_count = true; - value_opt - } else { - warn!("failed to delete a value from database"); - None - } - } else { - None - } - } else { - None - }; - if do_count { - self.count -= 1; - self.try_close(); - } - value_opt - } -} diff --git a/sync/src/types/header_map/backend_sled.rs b/sync/src/types/header_map/backend_sled.rs new file mode 100644 index 0000000000..2e74cdbf61 --- /dev/null +++ b/sync/src/types/header_map/backend_sled.rs @@ -0,0 +1,104 @@ +use super::KeyValueBackend; +use crate::types::HeaderView; +use ckb_types::{packed::Byte32, prelude::*}; +use sled::Db; +use std::path; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering; +use tempfile::TempDir; + +pub(crate) struct SledBackend { + count: AtomicUsize, + db: Db, + _tmpdir: 
TempDir, +} + +impl KeyValueBackend for SledBackend { + fn new

(tmp_path: Option

) -> Self + where + P: AsRef, + { + let mut builder = tempfile::Builder::new(); + builder.prefix("ckb-tmp-"); + let tmpdir = if let Some(ref path) = tmp_path { + builder.tempdir_in(path) + } else { + builder.tempdir() + } + .expect("failed to create a tempdir to save header map into disk"); + + let db: Db = sled::open(tmpdir.path()) + .expect("failed to open a key-value database to save header map into disk"); + Self { + db, + _tmpdir: tmpdir, + count: AtomicUsize::new(0), + } + } + + fn len(&self) -> usize { + self.count.load(Ordering::SeqCst) + } + + fn contains_key(&self, key: &Byte32) -> bool { + self.db + .contains_key(key.as_slice()) + .expect("sled contains_key") + } + + fn get(&self, key: &Byte32) -> Option { + self.db + .get(key.as_slice()) + .unwrap_or_else(|err| panic!("read header map from disk should be ok, but {}", err)) + .map(|slice| HeaderView::from_slice_should_be_ok(slice.as_ref())) + } + + fn insert(&self, value: &HeaderView) -> Option<()> { + let key = value.hash(); + let last_value = self + .db + .insert(key.as_slice(), value.to_vec()) + .expect("failed to insert item to sled"); + if last_value.is_none() { + self.count.fetch_add(1, Ordering::SeqCst); + } + last_value.map(|_| ()) + } + + fn insert_batch(&self, values: &[HeaderView]) { + let mut count = 0; + for value in values { + let key = value.hash(); + let last_value = self + .db + .insert(key.as_slice(), value.to_vec()) + .expect("failed to insert item to sled"); + if last_value.is_none() { + count += 1; + } + } + self.count.fetch_add(count, Ordering::SeqCst); + } + + fn remove(&self, key: &Byte32) -> Option { + let old_value = self + .db + .remove(key.as_slice()) + .expect("failed to remove item from sled"); + + old_value.map(|slice| { + self.count.fetch_sub(1, Ordering::SeqCst); + HeaderView::from_slice_should_be_ok(&slice) + }) + } + + fn remove_no_return(&self, key: &Byte32) { + let old_value = self + .db + .remove(key.as_slice()) + .expect("failed to remove item from sled"); + if 
old_value.is_some() { + self.count.fetch_sub(1, Ordering::SeqCst); + } + } +} diff --git a/sync/src/types/header_map/kernel_lru.rs b/sync/src/types/header_map/kernel_lru.rs index a2c0b5ff1c..69eb1e8f8c 100644 --- a/sync/src/types/header_map/kernel_lru.rs +++ b/sync/src/types/header_map/kernel_lru.rs @@ -2,28 +2,30 @@ use std::path; #[cfg(feature = "stats")] use ckb_logger::trace; +#[cfg(feature = "stats")] +use ckb_util::{Mutex, MutexGuard}; + use ckb_types::packed::Byte32; -use super::{KeyValueBackend, KeyValueMemory}; +use super::{KeyValueBackend, MemoryMap}; use crate::types::HeaderView; -pub(crate) struct HeaderMapLruKernel +pub(crate) struct HeaderMapKernel where Backend: KeyValueBackend, { - primary: KeyValueMemory, - backend: Backend, + pub(crate) memory: MemoryMap, + pub(crate) backend: Backend, // Configuration - primary_limit: usize, - backend_close_threshold: usize, + memory_limit: usize, // Statistics #[cfg(feature = "stats")] - stats: HeaderMapLruKernelStats, + stats: Mutex, } #[cfg(feature = "stats")] #[derive(Default)] -struct HeaderMapLruKernelStats { +struct HeaderMapKernelStats { frequency: usize, trace_progress: usize, @@ -34,195 +36,157 @@ struct HeaderMapLruKernelStats { primary_delete: usize, backend_contain: usize, - backend_insert: usize, backend_delete: usize, } -impl HeaderMapLruKernel +impl HeaderMapKernel where Backend: KeyValueBackend, { - pub(crate) fn new

( - tmpdir: Option

, - primary_limit: usize, - backend_close_threshold: usize, - ) -> Self + pub(crate) fn new

(tmpdir: Option

, memory_limit: usize) -> Self where P: AsRef, { - let primary = Default::default(); + let memory = Default::default(); let backend = Backend::new(tmpdir); #[cfg(not(feature = "stats"))] { Self { - primary, + memory, backend, - primary_limit, - backend_close_threshold, + memory_limit, } } #[cfg(feature = "stats")] { Self { - primary, + memory, backend, - primary_limit, - backend_close_threshold, - stats: HeaderMapLruKernelStats::new(50_000), + memory_limit, + stats: Mutex::new(HeaderMapKernelStats::new(50_000)), } } } - pub(crate) fn contains_key(&mut self, hash: &Byte32) -> bool { + pub(crate) fn contains_key(&self, hash: &Byte32) -> bool { #[cfg(feature = "stats")] { - self.mut_stats().tick_primary_contain(); + self.stats().tick_primary_contain(); } - if self.primary.contains_key(hash) { + if self.memory.contains_key(hash) { return true; } - if !self.backend.is_opened() { + if self.backend.is_empty() { return false; } #[cfg(feature = "stats")] { - self.mut_stats().tick_backend_contain(); + self.stats().tick_backend_contain(); } self.backend.contains_key(hash) } - pub(crate) fn get(&mut self, hash: &Byte32) -> Option { + pub(crate) fn get(&self, hash: &Byte32) -> Option { #[cfg(feature = "stats")] { - self.mut_stats().tick_primary_select(); + self.stats().tick_primary_select(); } - if let Some(view) = self.primary.get_refresh(hash) { + if let Some(view) = self.memory.get_refresh(hash) { return Some(view); } - if !self.backend.is_opened() { + if self.backend.is_empty() { return None; } #[cfg(feature = "stats")] { - self.mut_stats().tick_backend_delete(); + self.stats().tick_backend_delete(); } if let Some(view) = self.backend.remove(hash) { - if self.primary.len() >= self.primary_limit { - #[cfg(feature = "stats")] - { - self.mut_stats().tick_primary_delete(); - self.mut_stats().tick_backend_insert(); - } - if let Some((_, view_old)) = self.primary.pop_front() { - self.backend.insert(&view_old); - } - } else if self.primary.len() < self.backend_close_threshold { - 
self.backend.try_close(); - } #[cfg(feature = "stats")] { - self.mut_stats().tick_primary_insert(); + self.stats().tick_primary_insert(); } - self.primary.insert(view.hash(), view.clone()); + self.memory.insert(view.hash(), view.clone()); Some(view) } else { None } } - pub(crate) fn insert(&mut self, view: HeaderView) -> Option { + pub(crate) fn insert(&self, view: HeaderView) -> Option<()> { #[cfg(feature = "stats")] { self.trace(); - self.mut_stats().tick_primary_insert(); + self.stats().tick_primary_insert(); } - if let Some(view) = self.primary.insert(view.hash(), view.clone()) { - return Some(view); - } - let view_opt = self.backend.remove(&view.hash()); - if self.primary.len() > self.primary_limit { - self.backend.open(); - #[cfg(feature = "stats")] - { - self.mut_stats().tick_primary_delete(); - self.mut_stats().tick_backend_insert(); - } - if let Some((_, view_old)) = self.primary.pop_front() { - self.backend.insert(&view_old); - } - } - view_opt + self.memory.insert(view.hash(), view) } - pub(crate) fn remove(&mut self, hash: &Byte32) -> Option { + pub(crate) fn remove(&self, hash: &Byte32) { #[cfg(feature = "stats")] { self.trace(); - self.mut_stats().tick_primary_delete(); + self.stats().tick_primary_delete(); } - if let Some(view) = self.primary.remove(hash) { - return Some(view); - } - if !self.backend.is_opened() { - return None; + self.memory.remove(hash); + if self.backend.is_empty() { + return; } - #[cfg(feature = "stats")] - { - self.mut_stats().tick_backend_delete(); - } - let view_opt = self.backend.remove(hash); - if self.primary.len() < self.backend_close_threshold { - self.backend.try_close(); + self.backend.remove_no_return(hash); + } + + pub(crate) fn limit_memory(&self) { + if let Some(values) = self.memory.front_n(self.memory_limit) { + tokio::task::block_in_place(|| { + self.backend.insert_batch(&values); + }); + self.memory + .remove_batch(values.iter().map(|value| value.hash())); } - view_opt } #[cfg(feature = "stats")] - fn trace(&mut 
self) { - let progress = self.stats().trace_progress(); - let frequency = self.stats().frequency(); + fn trace(&self) { + let mut stats = self.stats(); + let progress = stats.trace_progress(); + let frequency = stats.frequency(); if progress % frequency == 0 { trace!( "Header Map Statistics\ \n>\t| storage | length | limit | contain | select | insert | delete |\ \n>\t|---------+---------+---------+---------+------------+---------+---------|\ - \n>\t| primary |{:>9}|{:>9}|{:>9}|{:>12}|{:>9}|{:>9}|\ + \n>\t| memory |{:>9}|{:>9}|{:>9}|{:>12}|{:>9}|{:>9}|\ \n>\t| backend |{:>9}|{:>9}|{:>9}|{:>12}|{:>9}|{:>9}|\ ", - self.primary.len(), - self.primary_limit, - self.stats().primary_contain, - self.stats().primary_select, - self.stats().primary_insert, - self.stats().primary_delete, + self.memory.len(), + self.memory_limit, + stats.primary_contain, + stats.primary_select, + stats.primary_insert, + stats.primary_delete, self.backend.len(), - self.backend.is_opened(), - self.stats().backend_contain, '-', - self.stats().backend_insert, - self.stats().backend_delete, + stats.backend_contain, + '-', + '-', + stats.backend_delete, ); - self.mut_stats().trace_progress_reset(); + stats.trace_progress_reset(); } else { - self.mut_stats().trace_progress_tick(); + stats.trace_progress_tick(); } } #[cfg(feature = "stats")] - fn stats(&self) -> &HeaderMapLruKernelStats { - &self.stats - } - - #[cfg(feature = "stats")] - fn mut_stats(&mut self) -> &mut HeaderMapLruKernelStats { - &mut self.stats + fn stats(&self) -> MutexGuard { + self.stats.lock() } } #[cfg(feature = "stats")] -impl HeaderMapLruKernelStats { +impl HeaderMapKernelStats { fn new(frequency: usize) -> Self { Self { frequency, @@ -262,10 +226,6 @@ impl HeaderMapLruKernelStats { self.primary_insert += 1; } - fn tick_backend_insert(&mut self) { - self.backend_insert += 1; - } - fn tick_primary_delete(&mut self) { self.primary_delete += 1; } diff --git a/sync/src/types/header_map/memory.rs b/sync/src/types/header_map/memory.rs 
index 63ff5299f7..39d85506c7 100644 --- a/sync/src/types/header_map/memory.rs +++ b/sync/src/types/header_map/memory.rs @@ -1,53 +1,62 @@ -use std::{clone, cmp, default, hash}; - +use crate::types::HeaderView; +use crate::types::SHRINK_THRESHOLD; +use ckb_types::packed::Byte32; use ckb_util::shrink_to_fit; use ckb_util::LinkedHashMap; +use ckb_util::RwLock; +use std::default; -use crate::types::SHRINK_THRESHOLD; - -pub(crate) struct KeyValueMemory(LinkedHashMap) -where - K: cmp::Eq + hash::Hash; +pub(crate) struct MemoryMap(RwLock>); -impl default::Default for KeyValueMemory -where - K: cmp::Eq + hash::Hash, -{ +impl default::Default for MemoryMap { fn default() -> Self { - Self(default::Default::default()) + Self(RwLock::new(default::Default::default())) } } -impl KeyValueMemory -where - K: cmp::Eq + hash::Hash, - V: clone::Clone, -{ +impl MemoryMap { + #[cfg(feature = "stats")] pub(crate) fn len(&self) -> usize { - self.0.len() + self.0.read().len() } - pub(crate) fn contains_key(&self, key: &K) -> bool { - self.0.contains_key(key) + pub(crate) fn contains_key(&self, key: &Byte32) -> bool { + self.0.read().contains_key(key) } - pub(crate) fn get_refresh(&mut self, key: &K) -> Option { - self.0.get_refresh(key).cloned() + pub(crate) fn get_refresh(&self, key: &Byte32) -> Option { + let mut guard = self.0.write(); + guard.get_refresh(key).cloned() } - pub(crate) fn insert(&mut self, key: K, value: V) -> Option { - self.0.insert(key, value) + pub(crate) fn insert(&self, key: Byte32, value: HeaderView) -> Option<()> { + let mut guard = self.0.write(); + guard.insert(key, value).map(|_| ()) } - pub(crate) fn remove(&mut self, key: &K) -> Option { - let ret = self.0.remove(key); - shrink_to_fit!(self.0, SHRINK_THRESHOLD); + pub(crate) fn remove(&self, key: &Byte32) -> Option { + let mut guard = self.0.write(); + let ret = guard.remove(key); + shrink_to_fit!(guard, SHRINK_THRESHOLD); ret } - pub(crate) fn pop_front(&mut self) -> Option<(K, V)> { - let ret = 
self.0.pop_front(); - shrink_to_fit!(self.0, SHRINK_THRESHOLD); - ret + pub(crate) fn front_n(&self, size_limit: usize) -> Option> { + let guard = self.0.read(); + let size = guard.len(); + if size > size_limit { + let num = size - size_limit; + Some(guard.values().take(num).cloned().collect()) + } else { + None + } + } + + pub(crate) fn remove_batch(&self, keys: impl Iterator) { + let mut guard = self.0.write(); + for key in keys { + guard.remove(&key); + } + shrink_to_fit!(guard, SHRINK_THRESHOLD); } } diff --git a/sync/src/types/header_map/mod.rs b/sync/src/types/header_map/mod.rs index 64fe471e00..d6dde661f4 100644 --- a/sync/src/types/header_map/mod.rs +++ b/sync/src/types/header_map/mod.rs @@ -1,48 +1,90 @@ -use std::path; - -use ckb_types::packed::Byte32; -use ckb_util::Mutex; - use crate::types::HeaderView; +use ckb_async_runtime::Handle; +use ckb_stop_handler::{SignalSender, StopHandler}; +use ckb_types::packed::{self, Byte32}; +use std::path; +use std::sync::Arc; +use std::time::Duration; +use tokio::sync::oneshot; +use tokio::time::MissedTickBehavior; mod backend; -mod backend_rocksdb; +mod backend_sled; mod kernel_lru; mod memory; pub(crate) use self::{ - backend::KeyValueBackend, backend_rocksdb::RocksDBBackend, kernel_lru::HeaderMapLruKernel, - memory::KeyValueMemory, + backend::KeyValueBackend, backend_sled::SledBackend, kernel_lru::HeaderMapKernel, + memory::MemoryMap, }; -pub struct HeaderMapLru(Mutex>); +pub struct HeaderMap { + inner: Arc>, + stop: StopHandler<()>, +} -impl HeaderMapLru { - pub(crate) fn new

( - tmpdir: Option

, - primary_limit: usize, - backend_close_threshold: usize, - ) -> Self +impl Drop for HeaderMap { + fn drop(&mut self) { + self.stop.try_send(()); + } +} + +const INTERVAL: Duration = Duration::from_millis(500); +// key + total_difficulty + skip_hash +const ITEM_BYTES_SIZE: usize = packed::HeaderView::TOTAL_SIZE + 32 * 3; +const WARN_THRESHOLD: usize = ITEM_BYTES_SIZE * 100_000; + +impl HeaderMap { + pub(crate) fn new

(tmpdir: Option

, memory_limit: usize, async_handle: &Handle) -> Self where P: AsRef, { - let inner = HeaderMapLruKernel::new(tmpdir, primary_limit, backend_close_threshold); - Self(Mutex::new(inner)) + if memory_limit < ITEM_BYTES_SIZE { + panic!("The limit setting is too low"); + } + if memory_limit < WARN_THRESHOLD { + ckb_logger::warn!( + "The low memory limit setting {} will result in inefficient synchronization", + memory_limit + ); + } + let size_limit = memory_limit / ITEM_BYTES_SIZE; + let inner = Arc::new(HeaderMapKernel::new(tmpdir, size_limit)); + let map = Arc::clone(&inner); + let (stop, mut stop_rx) = oneshot::channel::<()>(); + + async_handle.spawn(async move { + let mut interval = tokio::time::interval(INTERVAL); + interval.set_missed_tick_behavior(MissedTickBehavior::Skip); + loop { + tokio::select! { + _ = interval.tick() => { + map.limit_memory(); + } + _ = &mut stop_rx => break, + } + } + }); + + Self { + inner, + stop: StopHandler::new(SignalSender::Tokio(stop), None, "HeaderMap".to_string()), + } } pub(crate) fn contains_key(&self, hash: &Byte32) -> bool { - self.0.lock().contains_key(hash) + self.inner.contains_key(hash) } pub(crate) fn get(&self, hash: &Byte32) -> Option { - self.0.lock().get(hash) + self.inner.get(hash) } - pub(crate) fn insert(&self, view: HeaderView) -> Option { - self.0.lock().insert(view) + pub(crate) fn insert(&self, view: HeaderView) -> Option<()> { + self.inner.insert(view) } - pub(crate) fn remove(&self, hash: &Byte32) -> Option { - self.0.lock().remove(hash) + pub(crate) fn remove(&self, hash: &Byte32) { + self.inner.remove(hash) } } diff --git a/sync/src/types/mod.rs b/sync/src/types/mod.rs index 15fa9b4a60..0f68dc7963 100644 --- a/sync/src/types/mod.rs +++ b/sync/src/types/mod.rs @@ -45,7 +45,7 @@ mod header_map; use crate::utils::send_message; use ckb_types::core::EpochNumber; -pub use header_map::HeaderMapLru as HeaderMap; +pub use header_map::HeaderMap; const FILTER_SIZE: usize = 20000; const GET_HEADERS_CACHE_SIZE: usize = 
10000; @@ -958,25 +958,19 @@ impl HeaderView { self.inner } - pub fn build_skip( - &mut self, - tip_number: BlockNumber, - mut get_header_view: F, - fast_scanner: G, - ) where + pub fn build_skip(&mut self, tip_number: BlockNumber, get_header_view: F, fast_scanner: G) + where F: FnMut(&Byte32, Option) -> Option, G: Fn(BlockNumber, &HeaderView) -> Option, { - let store_first = self.number() <= tip_number; - self.skip_hash = get_header_view(&self.parent_hash(), Some(store_first)) - .and_then(|parent| { - parent.get_ancestor( - tip_number, - get_skip_height(self.number()), - get_header_view, - fast_scanner, - ) - }) + self.skip_hash = self + .clone() + .get_ancestor( + tip_number, + get_skip_height(self.number()), + get_header_view, + fast_scanner, + ) .map(|header| header.hash()); } @@ -1150,10 +1144,14 @@ impl SyncShared { ) }; let shared_best_header = RwLock::new(HeaderView::new(header, total_difficulty)); + ckb_logger::info!( + "header_map.memory_limit {}", + sync_config.header_map.memory_limit + ); let header_map = HeaderMap::new( tmpdir, - sync_config.header_map.primary_limit, - sync_config.header_map.backend_close_threshold, + sync_config.header_map.memory_limit.as_u64() as usize, + shared.async_handle(), ); let state = SyncState { @@ -1586,10 +1584,6 @@ impl SyncState { return; } - assert!( - self.header_map.contains_key(&header.hash()), - "HeaderView must exists in header_map before set best header" - ); metrics!(gauge, "ckb.shared_best_number", header.number() as i64); *self.shared_best_header.write() = header; } diff --git a/util/app-config/Cargo.toml b/util/app-config/Cargo.toml index 0d96a677c6..036836053b 100644 --- a/util/app-config/Cargo.toml +++ b/util/app-config/Cargo.toml @@ -30,6 +30,7 @@ rand = "0.7" sentry = { version = "0.23.0", optional = true } faketime = "0.2.0" url = { version = "2.2.2", features = ["serde"] } +ubyte = { version = "0.10", features = ["serde"] } [features] with_sentry = ["sentry"] diff --git 
a/util/app-config/src/configs/network.rs b/util/app-config/src/configs/network.rs index 10324246f6..9da6cf4a3b 100644 --- a/util/app-config/src/configs/network.rs +++ b/util/app-config/src/configs/network.rs @@ -5,6 +5,7 @@ use serde::{Deserialize, Serialize}; use std::fs; use std::io::{Error, ErrorKind, Read, Write}; use std::path::PathBuf; +use ubyte::ByteUnit; // Max data size in send buffer: 24MB (a little larger than max frame length) const DEFAULT_SEND_BUFFER: usize = 24 * 1024 * 1024; @@ -111,20 +112,28 @@ pub struct SyncConfig { #[serde(deny_unknown_fields)] pub struct HeaderMapConfig { /// The maximum size of data in memory - pub primary_limit: usize, + pub primary_limit: Option, /// Disable cache if the size of data in memory less than this threshold - pub backend_close_threshold: usize, + pub backend_close_threshold: Option, + /// The maximum amount of memory the header map may use + #[serde(default = "default_memory_limit")] + pub memory_limit: ByteUnit, } impl Default for HeaderMapConfig { fn default() -> Self { Self { - primary_limit: 300_000, - backend_close_threshold: 20_000, + primary_limit: None, + backend_close_threshold: None, + memory_limit: ByteUnit::Megabyte(600), } } } +const fn default_memory_limit() -> ByteUnit { + ByteUnit::Megabyte(600) +} + #[derive(Clone, Debug, Copy, Eq, PartialEq, Serialize, Deserialize, Hash)] #[allow(missing_docs)] pub enum SupportProtocol {