[typed store] iterator cleanup: remove values and few other deprecated methods (#21349)

## Description 

Removes the `values` iterator and a few other deprecated methods (`get_raw_bytes`, `multi_get_raw_bytes`, `chunked_multi_get`, `delete_file_in_range`); internal callers now use `safe_iter()` instead.
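
For downstream code that still calls the removed `values()`, the replacement pattern used throughout this commit is `safe_iter()` with the keys dropped. A minimal sketch (the key and value types are illustrative):

```rust
use typed_store::{rocks::DBMap, Map, TypedStoreError};

// Collect every value in a table without the removed `values()` iterator:
// walk key-value pairs with `safe_iter()` and keep only the values.
fn all_values(table: &DBMap<u64, String>) -> Result<Vec<String>, TypedStoreError> {
    table
        .safe_iter()
        .map(|item| item.map(|(_key, value)| value)) // propagate iterator errors
        .collect()
}
```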

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] gRPC:
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
phoenix-o authored Feb 25, 2025
1 parent 0269b42 commit 8d3abb7
Showing 5 changed files with 9 additions and 326 deletions.
19 changes: 8 additions & 11 deletions crates/sui-core/src/authority/authority_per_epoch_store.rs
@@ -86,7 +86,6 @@ use typed_store::Map;
 use typed_store::{
     rocks::{default_db_options, DBBatch, DBMap, DBOptions, MetricConf},
     traits::{TableSummary, TypedStoreDebug},
-    TypedStoreError,
 };

 use super::authority_store_tables::ENV_VAR_LOCKS_BLOCK_CACHE_SIZE;
@@ -2421,24 +2420,22 @@ impl AuthorityPerEpochStore {

     pub fn get_capabilities_v1(&self) -> SuiResult<Vec<AuthorityCapabilitiesV1>> {
         assert!(!self.protocol_config.authority_capabilities_v2());
-        let result: Result<Vec<AuthorityCapabilitiesV1>, TypedStoreError> = self
+        Ok(self
             .tables()?
             .authority_capabilities
-            .values()
-            .map_into()
-            .collect();
-        Ok(result?)
+            .safe_iter()
+            .map(|item| item.map(|(_, v)| v))
+            .collect::<Result<Vec<_>, _>>()?)
     }

     pub fn get_capabilities_v2(&self) -> SuiResult<Vec<AuthorityCapabilitiesV2>> {
         assert!(self.protocol_config.authority_capabilities_v2());
-        let result: Result<Vec<AuthorityCapabilitiesV2>, TypedStoreError> = self
+        Ok(self
             .tables()?
             .authority_capabilities_v2
-            .values()
-            .map_into()
-            .collect();
-        Ok(result?)
+            .safe_iter()
+            .map(|item| item.map(|(_, v)| v))
+            .collect::<Result<Vec<_>, _>>()?)
     }

     fn record_jwk_vote(
140 changes: 1 addition & 139 deletions crates/typed-store/src/rocks/mod.rs
@@ -3,9 +3,8 @@
 pub mod errors;
 pub(crate) mod iter;
 pub(crate) mod safe_iter;
-pub(crate) mod values;

-use self::{iter::Iter, values::Values};
+use self::iter::Iter;
 use crate::rocks::errors::typed_store_err_from_bcs_err;
 use crate::rocks::errors::typed_store_err_from_bincode_err;
 use crate::rocks::errors::typed_store_err_from_rocks_err;
@@ -17,7 +16,6 @@ use crate::{
 };
 use bincode::Options;
 use collectable::TryExtend;
-use itertools::Itertools;
 use prometheus::{Histogram, HistogramTimer};
 use rocksdb::properties::num_files_at_level;
 use rocksdb::{
@@ -337,15 +335,6 @@ impl RocksDB {
         delegate_call!(self.drop_cf(name))
     }

-    pub fn delete_file_in_range<K: AsRef<[u8]>>(
-        &self,
-        cf: &impl AsColumnFamilyRef,
-        from: K,
-        to: K,
-    ) -> Result<(), rocksdb::Error> {
-        delegate_call!(self.delete_file_in_range_cf(cf, from, to))
-    }
-
     pub fn delete_cf<K: AsRef<[u8]>>(
         &self,
         cf: &impl AsColumnFamilyRef,
@@ -1729,19 +1718,6 @@ impl<'a> DBTransaction<'a> {
         )
     }

-    pub fn values<K: DeserializeOwned, V: DeserializeOwned>(
-        &'a self,
-        db: &DBMap<K, V>,
-    ) -> Values<'a, V> {
-        let mut db_iter = RocksDBRawIter::OptimisticTransaction(
-            self.transaction
-                .raw_iterator_cf_opt(&db.cf(), db.opts.readopts()),
-        );
-        db_iter.seek_to_first();
-
-        Values::new(db_iter)
-    }
-
     pub fn commit(self) -> Result<(), TypedStoreError> {
         fail_point!("transaction-commit");
         self.transaction.commit().map_err(|e| match e.kind() {
@@ -1835,7 +1811,6 @@ where
     type Error = TypedStoreError;
     type Iterator = Iter<'a, K, V>;
     type SafeIterator = SafeIter<'a, K, V>;
-    type Values = Values<'a, V>;

     #[instrument(level = "trace", skip_all, err)]
     fn contains_key(&self, key: &K) -> Result<bool, TypedStoreError> {
@@ -1901,40 +1876,6 @@ where
         }
     }

-    #[instrument(level = "trace", skip_all, err)]
-    fn get_raw_bytes(&self, key: &K) -> Result<Option<Vec<u8>>, TypedStoreError> {
-        let _timer = self
-            .db_metrics
-            .op_metrics
-            .rocksdb_get_latency_seconds
-            .with_label_values(&[&self.cf])
-            .start_timer();
-        let perf_ctx = if self.get_sample_interval.sample() {
-            Some(RocksDBPerfContext)
-        } else {
-            None
-        };
-        let key_buf = be_fix_int_ser(key)?;
-        let res = self
-            .rocksdb
-            .get_pinned_cf_opt(&self.cf(), &key_buf, &self.opts.readopts())
-            .map_err(typed_store_err_from_rocks_err)?;
-        self.db_metrics
-            .op_metrics
-            .rocksdb_get_bytes
-            .with_label_values(&[&self.cf])
-            .observe(res.as_ref().map_or(0.0, |v| v.len() as f64));
-        if perf_ctx.is_some() {
-            self.db_metrics
-                .read_perf_ctx_metrics
-                .report_metrics(&self.cf);
-        }
-        match res {
-            Some(data) => Ok(Some(data.to_vec())),
-            None => Ok(None),
-        }
-    }
-
     #[instrument(level = "trace", skip_all, err)]
     fn insert(&self, key: &K, value: &V) -> Result<(), TypedStoreError> {
         let timer = self
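
With `get_raw_bytes` removed, callers that need the stored bytes can decode the typed value and re-encode it, since `DBMap` values are BCS-encoded on disk (the deleted `test_get_raw` in `tests.rs` below asserts exactly this equivalence). A minimal sketch with illustrative types:

```rust
use typed_store::{rocks::DBMap, Map, TypedStoreError};

// Recover the raw on-disk bytes for a key: `get` decodes the BCS value,
// and `bcs::to_bytes` re-encodes it to the same byte string.
fn raw_bytes(db: &DBMap<u64, String>, key: &u64) -> Result<Option<Vec<u8>>, TypedStoreError> {
    Ok(db
        .get(key)?
        .map(|v| bcs::to_bytes(&v).expect("re-encoding a freshly decoded value")))
}
```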
Expand Down Expand Up @@ -2012,23 +1953,6 @@ where
Ok(())
}

/// Deletes a range of keys between `from` (inclusive) and `to` (non-inclusive)
/// by immediately deleting any sst files whose key range overlaps with the range.
/// Files whose range only partially overlaps with the range are not deleted.
/// This can be useful for quickly removing a large amount of data without having
/// to delete individual keys. Only files at level 1 or higher are considered (
/// Level 0 files are skipped). It doesn't guarantee that all keys in the range are
/// deleted, as there might be keys in files that weren't entirely within the range.
#[instrument(level = "trace", skip_all, err)]
fn delete_file_in_range(&self, from: &K, to: &K) -> Result<(), TypedStoreError> {
let from_buf = be_fix_int_ser(from.borrow())?;
let to_buf = be_fix_int_ser(to.borrow())?;
self.rocksdb
.delete_file_in_range(&self.cf(), from_buf, to_buf)
.map_err(typed_store_err_from_rocks_err)?;
Ok(())
}

/// This method first drops the existing column family and then creates a new one
/// with the same name. The two operations are not atomic and hence it is possible
/// to get into a race condition where the column family has been dropped but new
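
As its doc comment notes, the removed `delete_file_in_range` was best-effort: it only dropped whole SST files at level 1 and above, so keys in partially overlapping files survived. Callers that instead need exact range deletion can fall back to per-key removal; below is a minimal, untuned sketch (a full scan with `safe_iter` plus `remove`, both assumed from the surviving `Map` API; types are illustrative):

```rust
use typed_store::{rocks::DBMap, Map, TypedStoreError};

// Exact but slower range deletion: remove every key in [from, to) one by one.
// Unlike the removed `delete_file_in_range`, this guarantees the range is
// empty afterwards, at the cost of a scan and per-key tombstones.
fn delete_range_exact(db: &DBMap<u64, String>, from: u64, to: u64) -> Result<(), TypedStoreError> {
    for item in db.safe_iter() {
        let (key, _value) = item?;
        if (from..to).contains(&key) {
            db.remove(&key)?;
        }
    }
    Ok(())
}
```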
@@ -2177,32 +2101,6 @@ where
         )
     }

-    fn values(&'a self) -> Self::Values {
-        let mut db_iter = self
-            .rocksdb
-            .raw_iterator_cf(&self.cf(), self.opts.readopts());
-        db_iter.seek_to_first();
-
-        Values::new(db_iter)
-    }
-
-    /// Returns a vector of raw values corresponding to the keys provided.
-    #[instrument(level = "trace", skip_all, err)]
-    fn multi_get_raw_bytes<J>(
-        &self,
-        keys: impl IntoIterator<Item = J>,
-    ) -> Result<Vec<Option<Vec<u8>>>, TypedStoreError>
-    where
-        J: Borrow<K>,
-    {
-        let results = self
-            .multi_get_pinned(keys)?
-            .into_iter()
-            .map(|val| val.map(|v| v.to_vec()))
-            .collect();
-        Ok(results)
-    }
-
     /// Returns a vector of values corresponding to the keys provided.
     #[instrument(level = "trace", skip_all, err)]
     fn multi_get<J>(
@@ -2226,42 +2124,6 @@ where
         values_parsed
     }

-    /// Returns a vector of values corresponding to the keys provided.
-    #[instrument(level = "trace", skip_all, err)]
-    fn chunked_multi_get<J>(
-        &self,
-        keys: impl IntoIterator<Item = J>,
-        chunk_size: usize,
-    ) -> Result<Vec<Option<V>>, TypedStoreError>
-    where
-        J: Borrow<K>,
-    {
-        let cf = self.cf();
-        let keys_bytes = keys
-            .into_iter()
-            .map(|k| (&cf, be_fix_int_ser(k.borrow()).unwrap()));
-        let chunked_keys = keys_bytes.into_iter().chunks(chunk_size);
-        let snapshot = self.snapshot()?;
-        let mut results = vec![];
-        for chunk in chunked_keys.into_iter() {
-            let chunk_result = snapshot.multi_get_cf(chunk);
-            let values_parsed: Result<Vec<_>, TypedStoreError> = chunk_result
-                .into_iter()
-                .map(|value_byte| {
-                    let value_byte = value_byte.map_err(typed_store_err_from_rocks_err)?;
-                    match value_byte {
-                        Some(data) => Ok(Some(
-                            bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
-                        )),
-                        None => Ok(None),
-                    }
-                })
-                .collect();
-            results.extend(values_parsed?);
-        }
-        Ok(results)
-    }
-
     /// Convenience method for batch insertion
     #[instrument(level = "trace", skip_all, err)]
     fn multi_insert<J, U>(
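
`chunked_multi_get` is reproducible on the caller side with the surviving `multi_get`. One behavioral difference: the removed method read every chunk from a single snapshot (see `self.snapshot()` above), while the sketch below does not, so concurrent writes can land between chunks. Types and chunking via `itertools` are illustrative:

```rust
use itertools::Itertools;
use typed_store::{rocks::DBMap, Map, TypedStoreError};

// Look keys up in fixed-size chunks using `multi_get`.
// NOTE: chunks are not read from a shared snapshot, unlike the removed method.
fn chunked_get(
    db: &DBMap<u64, String>,
    keys: &[u64],
    chunk_size: usize,
) -> Result<Vec<Option<String>>, TypedStoreError> {
    let mut results = Vec::with_capacity(keys.len());
    for chunk in &keys.iter().chunks(chunk_size) {
        results.extend(db.multi_get(chunk)?);
    }
    Ok(results)
}
```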
56 changes: 0 additions & 56 deletions crates/typed-store/src/rocks/tests.rs
@@ -205,27 +205,6 @@ async fn test_get(#[values(true, false)] is_transactional: bool) {
     assert_eq!(None, db.get(&000000000).expect("Failed to get"));
 }

-#[rstest]
-#[tokio::test]
-async fn test_get_raw(#[values(true, false)] is_transactional: bool) {
-    let db = open_map(temp_dir(), None, is_transactional);
-
-    db.insert(&123456789, &"123456789".to_string())
-        .expect("Failed to insert");
-
-    let val_bytes = db
-        .get_raw_bytes(&123456789)
-        .expect("Failed to get_raw_bytes")
-        .unwrap();
-
-    assert_eq!(bcs::to_bytes(&"123456789".to_string()).unwrap(), val_bytes);
-    assert_eq!(
-        None,
-        db.get_raw_bytes(&000000000)
-            .expect("Failed to get_raw_bytes")
-    );
-}
-
 #[rstest]
 #[tokio::test]
 async fn test_multi_get(#[values(true, false)] is_transactional: bool) {
@@ -244,26 +223,6 @@ async fn test_multi_get(#[values(true, false)] is_transactional: bool) {
     assert_eq!(result[2], None);
 }

-#[rstest]
-#[tokio::test]
-async fn test_chunked_multi_get(#[values(true, false)] is_transactional: bool) {
-    let db = open_map(temp_dir(), None, is_transactional);
-
-    db.insert(&123, &"123".to_string())
-        .expect("Failed to insert");
-    db.insert(&456, &"456".to_string())
-        .expect("Failed to insert");
-
-    let result = db
-        .chunked_multi_get([123, 456, 789], 1)
-        .expect("Failed to chunk multi get");
-
-    assert_eq!(result.len(), 3);
-    assert_eq!(result[0], Some("123".to_string()));
-    assert_eq!(result[1], Some("456".to_string()));
-    assert_eq!(result[2], None);
-}
-
 #[rstest]
 #[tokio::test]
 async fn test_skip(
@@ -385,19 +344,6 @@ async fn test_iter_reverse(
     assert_eq!(Some((2, "2".to_string())), iter.next());
 }

-#[rstest]
-#[tokio::test]
-async fn test_values(#[values(true, false)] is_transactional: bool) {
-    let db = open_map(temp_dir(), None, is_transactional);
-
-    db.insert(&123456789, &"123456789".to_string())
-        .expect("Failed to insert");
-
-    let mut values = db.values();
-    assert_eq!(Some(Ok("123456789".to_string())), values.next());
-    assert_eq!(None, values.next());
-}
-
 #[rstest]
 #[tokio::test]
 async fn test_try_extend(#[values(true, false)] is_transactional: bool) {
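
For reference, the deleted `test_values` assertions can be expressed against `safe_iter()` as below; this sketch reuses the test module's own `open_map` and `temp_dir` helpers:

```rust
#[rstest]
#[tokio::test]
async fn test_values_via_safe_iter(#[values(true, false)] is_transactional: bool) {
    let db = open_map(temp_dir(), None, is_transactional);

    db.insert(&123456789, &"123456789".to_string())
        .expect("Failed to insert");

    // Same assertions as the removed test, with keys dropped from safe_iter().
    let mut values = db.safe_iter().map(|item| item.map(|(_, v)| v));
    assert_eq!(Some(Ok("123456789".to_string())), values.next());
    assert_eq!(None, values.next());
}
```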
@@ -1035,8 +981,6 @@ async fn test_transaction_read_your_write() {
         .unwrap(),
         vec![Some("11".to_string()), None]
     );
-    let values: Vec<_> = tx.values(&db).collect();
-    assert_eq!(values, vec![Ok("11".to_string())]);
     assert!(tx.commit().is_ok());
 }
44 changes: 0 additions & 44 deletions crates/typed-store/src/rocks/values.rs

This file was deleted.
