Skip to content

Commit

Permalink
[typed store] don't report metrics on deprecated tables
Browse files Browse the repository at this point in the history
  • Loading branch information
phoenix-o committed Jul 2, 2024
1 parent 9a3ccbb commit 7882f33
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 44 deletions.
1 change: 1 addition & 0 deletions crates/sui-core/src/authority/authority_store_pruner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -798,6 +798,7 @@ mod tests {
// open the db to bypass default db options which ignores range tombstones
// so we can read the accurate number of retained versions
&ReadWriteOptions::default(),
false,
)?;
let iter = objects.unbounded_iter();
for (k, _v) in iter {
Expand Down
5 changes: 3 additions & 2 deletions crates/typed-store-derive/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -465,12 +465,13 @@ pub fn derive_dbmap_utils_general(input: TokenStream) -> TokenStream {
};
db.map(|d| (d, rwopt_cfs))
}.expect(&format!("Cannot open DB at {:?}", path));
let deprecated_tables = vec![#(stringify!(#deprecated_cfs),)*];
let (
#(
#field_names
),*
) = (#(
DBMap::#inner_types::reopen(&db, Some(stringify!(#cf_names)), rwopt_cfs.get(stringify!(#cf_names)).unwrap_or(&typed_store::rocks::ReadWriteOptions::default())).expect(&format!("Cannot open {} CF.", stringify!(#cf_names))[..])
DBMap::#inner_types::reopen(&db, Some(stringify!(#cf_names)), rwopt_cfs.get(stringify!(#cf_names)).unwrap_or(&typed_store::rocks::ReadWriteOptions::default()), remove_deprecated_tables && deprecated_tables.contains(&stringify!(#cf_names))).expect(&format!("Cannot open {} CF.", stringify!(#cf_names))[..])
),*);

if as_secondary_with_path.is_none() && remove_deprecated_tables {
Expand Down Expand Up @@ -887,7 +888,7 @@ pub fn derive_sallydb_general(input: TokenStream) -> TokenStream {
#field_names
),*
) = (#(
SallyColumn::RocksDB((DBMap::#inner_types::reopen(&db, Some(stringify!(#field_names)), rwopt_cfs.get(stringify!(#field_names)).unwrap_or(&typed_store::rocks::ReadWriteOptions::default())).expect(&format!("Cannot open {} CF.", stringify!(#field_names))[..]), typed_store::sally::SallyConfig::default()))
SallyColumn::RocksDB((DBMap::#inner_types::reopen(&db, Some(stringify!(#field_names)), rwopt_cfs.get(stringify!(#field_names)).unwrap_or(&typed_store::rocks::ReadWriteOptions::default()), false).expect(&format!("Cannot open {} CF.", stringify!(#field_names))[..]), typed_store::sally::SallyConfig::default()))
),*);

Self {
Expand Down
58 changes: 33 additions & 25 deletions crates/typed-store/src/rocks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ macro_rules! reopen {
( $db:expr, $($cf:expr;<$K:ty, $V:ty>),*) => {
(
$(
DBMap::<$K, $V>::reopen($db, Some($cf), &ReadWriteOptions::default()).expect(&format!("Cannot open {} CF.", $cf)[..])
DBMap::<$K, $V>::reopen($db, Some($cf), &ReadWriteOptions::default(), false).expect(&format!("Cannot open {} CF.", $cf)[..])
),*
)
};
Expand Down Expand Up @@ -727,32 +727,39 @@ pub struct DBMap<K, V> {
unsafe impl<K: Send, V: Send> Send for DBMap<K, V> {}

impl<K, V> DBMap<K, V> {
pub(crate) fn new(db: Arc<RocksDB>, opts: &ReadWriteOptions, opt_cf: &str) -> Self {
pub(crate) fn new(
db: Arc<RocksDB>,
opts: &ReadWriteOptions,
opt_cf: &str,
is_deprecated: bool,
) -> Self {
let db_cloned = db.clone();
let db_metrics = DBMetrics::get();
let db_metrics_cloned = db_metrics.clone();
let cf = opt_cf.to_string();
let (sender, mut recv) = tokio::sync::oneshot::channel();
tokio::task::spawn(async move {
let mut interval =
tokio::time::interval(Duration::from_millis(CF_METRICS_REPORT_PERIOD_MILLIS));
loop {
tokio::select! {
_ = interval.tick() => {
let db = db_cloned.clone();
let cf = cf.clone();
let db_metrics = db_metrics.clone();
if let Err(e) = tokio::task::spawn_blocking(move || {
Self::report_metrics(&db, &cf, &db_metrics);
}).await {
error!("Failed to log metrics with error: {}", e);
if !is_deprecated {
tokio::task::spawn(async move {
let mut interval =
tokio::time::interval(Duration::from_millis(CF_METRICS_REPORT_PERIOD_MILLIS));
loop {
tokio::select! {
_ = interval.tick() => {
let db = db_cloned.clone();
let cf = cf.clone();
let db_metrics = db_metrics.clone();
if let Err(e) = tokio::task::spawn_blocking(move || {
Self::report_metrics(&db, &cf, &db_metrics);
}).await {
error!("Failed to log metrics with error: {}", e);
}
}
_ = &mut recv => break,
}
_ = &mut recv => break,
}
}
debug!("Returning the cf metric logging task for DBMap: {}", &cf);
});
debug!("Returning the cf metric logging task for DBMap: {}", &cf);
});
}
DBMap {
rocksdb: db.clone(),
opts: opts.clone(),
Expand Down Expand Up @@ -782,7 +789,7 @@ impl<K, V> DBMap<K, V> {
let cf_key = opt_cf.unwrap_or(rocksdb::DEFAULT_COLUMN_FAMILY_NAME);
let cfs = vec![cf_key];
let rocksdb = open_cf(path, db_options, metric_conf, &cfs)?;
Ok(DBMap::new(rocksdb, rw_options, cf_key))
Ok(DBMap::new(rocksdb, rw_options, cf_key, false))
}

/// Reopens an open database as a typed map operating under a specific column family.
Expand All @@ -800,8 +807,8 @@ impl<K, V> DBMap<K, V> {
/// /// Open the DB with all needed column families first.
/// let rocks = open_cf(tempdir().unwrap(), None, MetricConf::default(), &["First_CF", "Second_CF"]).unwrap();
/// /// Attach the column families to specific maps.
/// let db_cf_1 = DBMap::<u32,u32>::reopen(&rocks, Some("First_CF"), &ReadWriteOptions::default()).expect("Failed to open storage");
/// let db_cf_2 = DBMap::<u32,u32>::reopen(&rocks, Some("Second_CF"), &ReadWriteOptions::default()).expect("Failed to open storage");
/// let db_cf_1 = DBMap::<u32,u32>::reopen(&rocks, Some("First_CF"), &ReadWriteOptions::default(), false).expect("Failed to open storage");
/// let db_cf_2 = DBMap::<u32,u32>::reopen(&rocks, Some("Second_CF"), &ReadWriteOptions::default(), false).expect("Failed to open storage");
/// Ok(())
/// }
/// ```
Expand All @@ -810,6 +817,7 @@ impl<K, V> DBMap<K, V> {
db: &Arc<RocksDB>,
opt_cf: Option<&str>,
rw_options: &ReadWriteOptions,
is_deprecated: bool,
) -> Result<Self, TypedStoreError> {
let cf_key = opt_cf
.unwrap_or(rocksdb::DEFAULT_COLUMN_FAMILY_NAME)
Expand All @@ -818,7 +826,7 @@ impl<K, V> DBMap<K, V> {
db.cf_handle(&cf_key)
.ok_or_else(|| TypedStoreError::UnregisteredColumn(cf_key.clone()))?;

Ok(DBMap::new(db.clone(), rw_options, &cf_key))
Ok(DBMap::new(db.clone(), rw_options, &cf_key, is_deprecated))
}

pub fn batch(&self) -> DBBatch {
Expand Down Expand Up @@ -1278,11 +1286,11 @@ impl<K, V> DBMap<K, V> {
/// async fn main() -> Result<(), Error> {
/// let rocks = open_cf(tempfile::tempdir().unwrap(), None, MetricConf::default(), &["First_CF", "Second_CF"]).unwrap();
///
/// let db_cf_1 = DBMap::reopen(&rocks, Some("First_CF"), &ReadWriteOptions::default())
/// let db_cf_1 = DBMap::reopen(&rocks, Some("First_CF"), &ReadWriteOptions::default(), false)
/// .expect("Failed to open storage");
/// let keys_vals_1 = (1..100).map(|i| (i, i.to_string()));
///
/// let db_cf_2 = DBMap::reopen(&rocks, Some("Second_CF"), &ReadWriteOptions::default())
/// let db_cf_2 = DBMap::reopen(&rocks, Some("Second_CF"), &ReadWriteOptions::default(), false)
/// .expect("Failed to open storage");
/// let keys_vals_2 = (1000..1100).map(|i| (i, i.to_string()));
///
Expand Down
53 changes: 36 additions & 17 deletions crates/typed-store/src/rocks/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ async fn test_reopen(#[values(true, false)] is_transactional: bool) {
.expect("Failed to insert");
db
};
let db = DBMap::<u32, String>::reopen(&arc.rocksdb, None, &ReadWriteOptions::default())
let db = DBMap::<u32, String>::reopen(&arc.rocksdb, None, &ReadWriteOptions::default(), false)
.expect("Failed to re-open storage");
assert!(db
.contains_key(&123456789)
Expand Down Expand Up @@ -174,7 +174,7 @@ async fn test_reopen_macro() {
#[tokio::test]
async fn test_wrong_reopen(#[values(true, false)] is_transactional: bool) {
let rocks = open_rocksdb(temp_dir(), &["foo", "bar", "baz"], is_transactional);
let db = DBMap::<u8, u8>::reopen(&rocks, Some("quux"), &ReadWriteOptions::default());
let db = DBMap::<u8, u8>::reopen(&rocks, Some("quux"), &ReadWriteOptions::default(), false);
assert!(db.is_err());
}

Expand Down Expand Up @@ -567,12 +567,22 @@ async fn test_insert_batch(#[values(true, false)] is_transactional: bool) {
async fn test_insert_batch_across_cf(#[values(true, false)] is_transactional: bool) {
let rocks = open_rocksdb(temp_dir(), &["First_CF", "Second_CF"], is_transactional);

let db_cf_1 = DBMap::reopen(&rocks, Some("First_CF"), &ReadWriteOptions::default())
.expect("Failed to open storage");
let db_cf_1 = DBMap::reopen(
&rocks,
Some("First_CF"),
&ReadWriteOptions::default(),
false,
)
.expect("Failed to open storage");
let keys_vals_1 = (1..100).map(|i| (i, i.to_string()));

let db_cf_2 = DBMap::reopen(&rocks, Some("Second_CF"), &ReadWriteOptions::default())
.expect("Failed to open storage");
let db_cf_2 = DBMap::reopen(
&rocks,
Some("Second_CF"),
&ReadWriteOptions::default(),
false,
)
.expect("Failed to open storage");
let keys_vals_2 = (1000..1100).map(|i| (i, i.to_string()));

let mut batch = db_cf_1.batch();
Expand Down Expand Up @@ -600,14 +610,22 @@ async fn test_insert_batch_across_different_db(#[values(true, false)] is_transac
let rocks = open_rocksdb(temp_dir(), &["First_CF", "Second_CF"], is_transactional);
let rocks2 = open_rocksdb(temp_dir(), &["First_CF", "Second_CF"], is_transactional);

let db_cf_1: DBMap<i32, String> =
DBMap::reopen(&rocks, Some("First_CF"), &ReadWriteOptions::default())
.expect("Failed to open storage");
let db_cf_1: DBMap<i32, String> = DBMap::reopen(
&rocks,
Some("First_CF"),
&ReadWriteOptions::default(),
false,
)
.expect("Failed to open storage");
let keys_vals_1 = (1..100).map(|i| (i, i.to_string()));

let db_cf_2: DBMap<i32, String> =
DBMap::reopen(&rocks2, Some("Second_CF"), &ReadWriteOptions::default())
.expect("Failed to open storage");
let db_cf_2: DBMap<i32, String> = DBMap::reopen(
&rocks2,
Some("Second_CF"),
&ReadWriteOptions::default(),
false,
)
.expect("Failed to open storage");
let keys_vals_2 = (1000..1100).map(|i| (i, i.to_string()));

assert!(db_cf_1
Expand Down Expand Up @@ -1089,7 +1107,7 @@ async fn test_transactional() {
let opt = rocksdb::Options::default();
let rocksdb =
open_cf_opts_transactional(path, None, MetricConf::default(), &[("cf", opt)]).unwrap();
let db = DBMap::<String, String>::reopen(&rocksdb, None, &ReadWriteOptions::default())
let db = DBMap::<String, String>::reopen(&rocksdb, None, &ReadWriteOptions::default(), false)
.expect("Failed to re-open storage");

// transaction is used instead
Expand All @@ -1113,7 +1131,7 @@ async fn test_transaction_snapshot() {
let opt = rocksdb::Options::default();
let rocksdb =
open_cf_opts_transactional(path, None, MetricConf::default(), &[("cf", opt)]).unwrap();
let db = DBMap::<String, String>::reopen(&rocksdb, None, &ReadWriteOptions::default())
let db = DBMap::<String, String>::reopen(&rocksdb, None, &ReadWriteOptions::default(), false)
.expect("Failed to re-open storage");

// transaction without set_snapshot succeeds when extraneous write occurs before transaction
Expand Down Expand Up @@ -1200,7 +1218,7 @@ async fn test_retry_transaction() {
let opt = rocksdb::Options::default();
let rocksdb =
open_cf_opts_transactional(path, None, MetricConf::default(), &[("cf", opt)]).unwrap();
let db = DBMap::<String, String>::reopen(&rocksdb, None, &ReadWriteOptions::default())
let db = DBMap::<String, String>::reopen(&rocksdb, None, &ReadWriteOptions::default(), false)
.expect("Failed to re-open storage");

let mut conflicts = 0;
Expand Down Expand Up @@ -1260,7 +1278,7 @@ async fn test_transaction_read_your_write() {
let opt = rocksdb::Options::default();
let rocksdb =
open_cf_opts_transactional(path, None, MetricConf::default(), &[("cf", opt)]).unwrap();
let db = DBMap::<String, String>::reopen(&rocksdb, None, &ReadWriteOptions::default())
let db = DBMap::<String, String>::reopen(&rocksdb, None, &ReadWriteOptions::default(), false)
.expect("Failed to re-open storage");
db.insert(&key1.to_string(), &"1".to_string()).unwrap();
let mut tx = db.transaction().expect("failed to initiate transaction");
Expand Down Expand Up @@ -1331,6 +1349,7 @@ async fn open_as_secondary_test() {
&secondary_store,
Some("table"),
&ReadWriteOptions::default(),
false,
)
.unwrap();

Expand Down Expand Up @@ -1482,7 +1501,7 @@ fn open_map<P: AsRef<Path>, K, V>(
MetricConf::default(),
&[(cf, default_db_options().options)],
)
.map(|db| DBMap::new(db, &ReadWriteOptions::default(), cf))
.map(|db| DBMap::new(db, &ReadWriteOptions::default(), cf, false))
.expect("failed to open rocksdb")
} else {
DBMap::<K, V>::open(
Expand Down

0 comments on commit 7882f33

Please sign in to comment.