Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[typed store] don't report metrics on deprecated tables #18492

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/sui-core/src/authority/authority_store_pruner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -798,6 +798,7 @@ mod tests {
// open the db to bypass default db options which ignores range tombstones
// so we can read the accurate number of retained versions
&ReadWriteOptions::default(),
false,
)?;
let iter = objects.unbounded_iter();
for (k, _v) in iter {
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-open-rpc/spec/openrpc.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
"name": "Apache-2.0",
"url": "https://raw.githubusercontent.com/MystenLabs/sui/main/LICENSE"
},
"version": "1.28.0"
"version": "1.28.2"
},
"methods": [
{
Expand Down
5 changes: 3 additions & 2 deletions crates/typed-store-derive/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -465,12 +465,13 @@ pub fn derive_dbmap_utils_general(input: TokenStream) -> TokenStream {
};
db.map(|d| (d, rwopt_cfs))
}.expect(&format!("Cannot open DB at {:?}", path));
let deprecated_tables = vec![#(stringify!(#deprecated_cfs),)*];
let (
#(
#field_names
),*
) = (#(
DBMap::#inner_types::reopen(&db, Some(stringify!(#cf_names)), rwopt_cfs.get(stringify!(#cf_names)).unwrap_or(&typed_store::rocks::ReadWriteOptions::default())).expect(&format!("Cannot open {} CF.", stringify!(#cf_names))[..])
DBMap::#inner_types::reopen(&db, Some(stringify!(#cf_names)), rwopt_cfs.get(stringify!(#cf_names)).unwrap_or(&typed_store::rocks::ReadWriteOptions::default()), remove_deprecated_tables && deprecated_tables.contains(&stringify!(#cf_names))).expect(&format!("Cannot open {} CF.", stringify!(#cf_names))[..])
),*);

if as_secondary_with_path.is_none() && remove_deprecated_tables {
Expand Down Expand Up @@ -887,7 +888,7 @@ pub fn derive_sallydb_general(input: TokenStream) -> TokenStream {
#field_names
),*
) = (#(
SallyColumn::RocksDB((DBMap::#inner_types::reopen(&db, Some(stringify!(#field_names)), rwopt_cfs.get(stringify!(#field_names)).unwrap_or(&typed_store::rocks::ReadWriteOptions::default())).expect(&format!("Cannot open {} CF.", stringify!(#field_names))[..]), typed_store::sally::SallyConfig::default()))
SallyColumn::RocksDB((DBMap::#inner_types::reopen(&db, Some(stringify!(#field_names)), rwopt_cfs.get(stringify!(#field_names)).unwrap_or(&typed_store::rocks::ReadWriteOptions::default()), false).expect(&format!("Cannot open {} CF.", stringify!(#field_names))[..]), typed_store::sally::SallyConfig::default()))
),*);

Self {
Expand Down
58 changes: 33 additions & 25 deletions crates/typed-store/src/rocks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ macro_rules! reopen {
( $db:expr, $($cf:expr;<$K:ty, $V:ty>),*) => {
(
$(
DBMap::<$K, $V>::reopen($db, Some($cf), &ReadWriteOptions::default()).expect(&format!("Cannot open {} CF.", $cf)[..])
DBMap::<$K, $V>::reopen($db, Some($cf), &ReadWriteOptions::default(), false).expect(&format!("Cannot open {} CF.", $cf)[..])
),*
)
};
Expand Down Expand Up @@ -727,32 +727,39 @@ pub struct DBMap<K, V> {
unsafe impl<K: Send, V: Send> Send for DBMap<K, V> {}

impl<K, V> DBMap<K, V> {
pub(crate) fn new(db: Arc<RocksDB>, opts: &ReadWriteOptions, opt_cf: &str) -> Self {
pub(crate) fn new(
db: Arc<RocksDB>,
opts: &ReadWriteOptions,
opt_cf: &str,
is_deprecated: bool,
) -> Self {
let db_cloned = db.clone();
let db_metrics = DBMetrics::get();
let db_metrics_cloned = db_metrics.clone();
let cf = opt_cf.to_string();
let (sender, mut recv) = tokio::sync::oneshot::channel();
tokio::task::spawn(async move {
let mut interval =
tokio::time::interval(Duration::from_millis(CF_METRICS_REPORT_PERIOD_MILLIS));
loop {
tokio::select! {
_ = interval.tick() => {
let db = db_cloned.clone();
let cf = cf.clone();
let db_metrics = db_metrics.clone();
if let Err(e) = tokio::task::spawn_blocking(move || {
Self::report_metrics(&db, &cf, &db_metrics);
}).await {
error!("Failed to log metrics with error: {}", e);
if !is_deprecated {
tokio::task::spawn(async move {
let mut interval =
tokio::time::interval(Duration::from_millis(CF_METRICS_REPORT_PERIOD_MILLIS));
loop {
tokio::select! {
_ = interval.tick() => {
let db = db_cloned.clone();
let cf = cf.clone();
let db_metrics = db_metrics.clone();
if let Err(e) = tokio::task::spawn_blocking(move || {
Self::report_metrics(&db, &cf, &db_metrics);
}).await {
error!("Failed to log metrics with error: {}", e);
}
}
_ = &mut recv => break,
}
_ = &mut recv => break,
}
}
debug!("Returning the cf metric logging task for DBMap: {}", &cf);
});
debug!("Returning the cf metric logging task for DBMap: {}", &cf);
});
}
DBMap {
rocksdb: db.clone(),
opts: opts.clone(),
Expand Down Expand Up @@ -782,7 +789,7 @@ impl<K, V> DBMap<K, V> {
let cf_key = opt_cf.unwrap_or(rocksdb::DEFAULT_COLUMN_FAMILY_NAME);
let cfs = vec![cf_key];
let rocksdb = open_cf(path, db_options, metric_conf, &cfs)?;
Ok(DBMap::new(rocksdb, rw_options, cf_key))
Ok(DBMap::new(rocksdb, rw_options, cf_key, false))
}

/// Reopens an open database as a typed map operating under a specific column family.
Expand All @@ -800,8 +807,8 @@ impl<K, V> DBMap<K, V> {
/// /// Open the DB with all needed column families first.
/// let rocks = open_cf(tempdir().unwrap(), None, MetricConf::default(), &["First_CF", "Second_CF"]).unwrap();
/// /// Attach the column families to specific maps.
/// let db_cf_1 = DBMap::<u32,u32>::reopen(&rocks, Some("First_CF"), &ReadWriteOptions::default()).expect("Failed to open storage");
/// let db_cf_2 = DBMap::<u32,u32>::reopen(&rocks, Some("Second_CF"), &ReadWriteOptions::default()).expect("Failed to open storage");
/// let db_cf_1 = DBMap::<u32,u32>::reopen(&rocks, Some("First_CF"), &ReadWriteOptions::default(), false).expect("Failed to open storage");
/// let db_cf_2 = DBMap::<u32,u32>::reopen(&rocks, Some("Second_CF"), &ReadWriteOptions::default(), false).expect("Failed to open storage");
/// Ok(())
/// }
/// ```
Expand All @@ -810,6 +817,7 @@ impl<K, V> DBMap<K, V> {
db: &Arc<RocksDB>,
opt_cf: Option<&str>,
rw_options: &ReadWriteOptions,
is_deprecated: bool,
) -> Result<Self, TypedStoreError> {
let cf_key = opt_cf
.unwrap_or(rocksdb::DEFAULT_COLUMN_FAMILY_NAME)
Expand All @@ -818,7 +826,7 @@ impl<K, V> DBMap<K, V> {
db.cf_handle(&cf_key)
.ok_or_else(|| TypedStoreError::UnregisteredColumn(cf_key.clone()))?;

Ok(DBMap::new(db.clone(), rw_options, &cf_key))
Ok(DBMap::new(db.clone(), rw_options, &cf_key, is_deprecated))
}

pub fn batch(&self) -> DBBatch {
Expand Down Expand Up @@ -1278,11 +1286,11 @@ impl<K, V> DBMap<K, V> {
/// async fn main() -> Result<(), Error> {
/// let rocks = open_cf(tempfile::tempdir().unwrap(), None, MetricConf::default(), &["First_CF", "Second_CF"]).unwrap();
///
/// let db_cf_1 = DBMap::reopen(&rocks, Some("First_CF"), &ReadWriteOptions::default())
/// let db_cf_1 = DBMap::reopen(&rocks, Some("First_CF"), &ReadWriteOptions::default(), false)
/// .expect("Failed to open storage");
/// let keys_vals_1 = (1..100).map(|i| (i, i.to_string()));
///
/// let db_cf_2 = DBMap::reopen(&rocks, Some("Second_CF"), &ReadWriteOptions::default())
/// let db_cf_2 = DBMap::reopen(&rocks, Some("Second_CF"), &ReadWriteOptions::default(), false)
/// .expect("Failed to open storage");
/// let keys_vals_2 = (1000..1100).map(|i| (i, i.to_string()));
///
Expand Down
53 changes: 36 additions & 17 deletions crates/typed-store/src/rocks/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ async fn test_reopen(#[values(true, false)] is_transactional: bool) {
.expect("Failed to insert");
db
};
let db = DBMap::<u32, String>::reopen(&arc.rocksdb, None, &ReadWriteOptions::default())
let db = DBMap::<u32, String>::reopen(&arc.rocksdb, None, &ReadWriteOptions::default(), false)
.expect("Failed to re-open storage");
assert!(db
.contains_key(&123456789)
Expand Down Expand Up @@ -174,7 +174,7 @@ async fn test_reopen_macro() {
#[tokio::test]
async fn test_wrong_reopen(#[values(true, false)] is_transactional: bool) {
let rocks = open_rocksdb(temp_dir(), &["foo", "bar", "baz"], is_transactional);
let db = DBMap::<u8, u8>::reopen(&rocks, Some("quux"), &ReadWriteOptions::default());
let db = DBMap::<u8, u8>::reopen(&rocks, Some("quux"), &ReadWriteOptions::default(), false);
assert!(db.is_err());
}

Expand Down Expand Up @@ -567,12 +567,22 @@ async fn test_insert_batch(#[values(true, false)] is_transactional: bool) {
async fn test_insert_batch_across_cf(#[values(true, false)] is_transactional: bool) {
let rocks = open_rocksdb(temp_dir(), &["First_CF", "Second_CF"], is_transactional);

let db_cf_1 = DBMap::reopen(&rocks, Some("First_CF"), &ReadWriteOptions::default())
.expect("Failed to open storage");
let db_cf_1 = DBMap::reopen(
&rocks,
Some("First_CF"),
&ReadWriteOptions::default(),
false,
)
.expect("Failed to open storage");
let keys_vals_1 = (1..100).map(|i| (i, i.to_string()));

let db_cf_2 = DBMap::reopen(&rocks, Some("Second_CF"), &ReadWriteOptions::default())
.expect("Failed to open storage");
let db_cf_2 = DBMap::reopen(
&rocks,
Some("Second_CF"),
&ReadWriteOptions::default(),
false,
)
.expect("Failed to open storage");
let keys_vals_2 = (1000..1100).map(|i| (i, i.to_string()));

let mut batch = db_cf_1.batch();
Expand Down Expand Up @@ -600,14 +610,22 @@ async fn test_insert_batch_across_different_db(#[values(true, false)] is_transac
let rocks = open_rocksdb(temp_dir(), &["First_CF", "Second_CF"], is_transactional);
let rocks2 = open_rocksdb(temp_dir(), &["First_CF", "Second_CF"], is_transactional);

let db_cf_1: DBMap<i32, String> =
DBMap::reopen(&rocks, Some("First_CF"), &ReadWriteOptions::default())
.expect("Failed to open storage");
let db_cf_1: DBMap<i32, String> = DBMap::reopen(
&rocks,
Some("First_CF"),
&ReadWriteOptions::default(),
false,
)
.expect("Failed to open storage");
let keys_vals_1 = (1..100).map(|i| (i, i.to_string()));

let db_cf_2: DBMap<i32, String> =
DBMap::reopen(&rocks2, Some("Second_CF"), &ReadWriteOptions::default())
.expect("Failed to open storage");
let db_cf_2: DBMap<i32, String> = DBMap::reopen(
&rocks2,
Some("Second_CF"),
&ReadWriteOptions::default(),
false,
)
.expect("Failed to open storage");
let keys_vals_2 = (1000..1100).map(|i| (i, i.to_string()));

assert!(db_cf_1
Expand Down Expand Up @@ -1089,7 +1107,7 @@ async fn test_transactional() {
let opt = rocksdb::Options::default();
let rocksdb =
open_cf_opts_transactional(path, None, MetricConf::default(), &[("cf", opt)]).unwrap();
let db = DBMap::<String, String>::reopen(&rocksdb, None, &ReadWriteOptions::default())
let db = DBMap::<String, String>::reopen(&rocksdb, None, &ReadWriteOptions::default(), false)
.expect("Failed to re-open storage");

// transaction is used instead
Expand All @@ -1113,7 +1131,7 @@ async fn test_transaction_snapshot() {
let opt = rocksdb::Options::default();
let rocksdb =
open_cf_opts_transactional(path, None, MetricConf::default(), &[("cf", opt)]).unwrap();
let db = DBMap::<String, String>::reopen(&rocksdb, None, &ReadWriteOptions::default())
let db = DBMap::<String, String>::reopen(&rocksdb, None, &ReadWriteOptions::default(), false)
.expect("Failed to re-open storage");

// transaction without set_snapshot succeeds when extraneous write occurs before transaction
Expand Down Expand Up @@ -1200,7 +1218,7 @@ async fn test_retry_transaction() {
let opt = rocksdb::Options::default();
let rocksdb =
open_cf_opts_transactional(path, None, MetricConf::default(), &[("cf", opt)]).unwrap();
let db = DBMap::<String, String>::reopen(&rocksdb, None, &ReadWriteOptions::default())
let db = DBMap::<String, String>::reopen(&rocksdb, None, &ReadWriteOptions::default(), false)
.expect("Failed to re-open storage");

let mut conflicts = 0;
Expand Down Expand Up @@ -1260,7 +1278,7 @@ async fn test_transaction_read_your_write() {
let opt = rocksdb::Options::default();
let rocksdb =
open_cf_opts_transactional(path, None, MetricConf::default(), &[("cf", opt)]).unwrap();
let db = DBMap::<String, String>::reopen(&rocksdb, None, &ReadWriteOptions::default())
let db = DBMap::<String, String>::reopen(&rocksdb, None, &ReadWriteOptions::default(), false)
.expect("Failed to re-open storage");
db.insert(&key1.to_string(), &"1".to_string()).unwrap();
let mut tx = db.transaction().expect("failed to initiate transaction");
Expand Down Expand Up @@ -1331,6 +1349,7 @@ async fn open_as_secondary_test() {
&secondary_store,
Some("table"),
&ReadWriteOptions::default(),
false,
)
.unwrap();

Expand Down Expand Up @@ -1482,7 +1501,7 @@ fn open_map<P: AsRef<Path>, K, V>(
MetricConf::default(),
&[(cf, default_db_options().options)],
)
.map(|db| DBMap::new(db, &ReadWriteOptions::default(), cf))
.map(|db| DBMap::new(db, &ReadWriteOptions::default(), cf, false))
.expect("failed to open rocksdb")
} else {
DBMap::<K, V>::open(
Expand Down
Loading