Skip to content

Commit

Permalink
use get batch to impl get range
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelLeeHZ committed Jun 6, 2023
1 parent 175038e commit 55127ae
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 9 deletions.
21 changes: 12 additions & 9 deletions components/object_store/src/obkv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@ use upstream::{
};
use uuid::Uuid;

use uuid::Uuid;

use crate::{
multipart::{CloudMultiPartUpload, CloudMultiPartUploadImpl, UploadPart},
obkv::meta::{MetaManager, ObkvObjectMeta, OBJECT_STORE_META},
Expand Down Expand Up @@ -277,11 +275,11 @@ impl<T: TableKv> ObkvObjectStore<T> {
let table_name = self.pick_shard_table(location);
// TODO: Let table_kv provide a api `get_batch` to avoid extra IO operations.
let mut futures = FuturesOrdered::new();
for path in meta.parts {
for part_key in meta.parts {
let client = self.client.clone();
let table_name = table_name.to_string();
let future = async move {
match client.get(&table_name, path.as_bytes()) {
match client.get(&table_name, part_key.as_bytes()) {
Ok(res) => Ok(Bytes::from(res.unwrap())),
Err(err) => Err(StoreError::Generic {
store: OBKV,
Expand Down Expand Up @@ -461,16 +459,21 @@ impl<T: TableKv> ObjectStore for ObkvObjectStore<T> {
source,
})?;

for (index, key) in covered_parts.part_keys.iter().enumerate() {
let part_bytes = self
.client
.get(table_name, key.as_bytes())
let keys: Vec<&[u8]> = covered_parts
.part_keys
.iter()
.map(|key| key.as_bytes())
.collect();
let values =
self.client
.get_batch(table_name, keys)
.map_err(|source| StoreError::NotFound {
path: location.to_string(),
source: Box::new(source),
})?;

if let Some(bytes) = part_bytes {
for (index, key) in covered_parts.part_keys.iter().enumerate() {
if let Some(bytes) = &values[index] {
let mut begin = 0;
let mut end = bytes.len();
if index == 0 {
Expand Down
7 changes: 7 additions & 0 deletions components/table_kv/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,13 @@ pub trait TableKv: Clone + Send + Sync + fmt::Debug + 'static {
/// Get value by key from table with `table_name`.
fn get(&self, table_name: &str, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error>;

/// Get a batch of value by keys from table with `table_name`
fn get_batch(
&self,
table_name: &str,
keys: Vec<&[u8]>,
) -> Result<Vec<Option<Vec<u8>>>, Self::Error>;

/// Delete data by key from table with `table_name`.
fn delete(&self, table_name: &str, key: &[u8]) -> Result<(), Self::Error>;

Expand Down
17 changes: 17 additions & 0 deletions components/table_kv/src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,23 @@ impl TableKv for MemoryImpl {
Ok(table.get(key))
}

fn get_batch(
&self,
table_name: &str,
keys: Vec<&[u8]>,
) -> std::result::Result<Vec<Option<Vec<u8>>>, Self::Error> {
let table = self
.find_table(table_name)
.context(TableNotFound { table_name })?;

let mut result = Vec::with_capacity(keys.len());
for key in keys {
result.push(table.get(key));
}

Ok(result)
}

fn delete(&self, table_name: &str, key: &[u8]) -> std::result::Result<(), Self::Error> {
let table = self
.find_table(table_name)
Expand Down
33 changes: 33 additions & 0 deletions components/table_kv/src/obkv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,15 @@ pub enum Error {
backtrace: Backtrace,
},

#[snafu(display(
"Failed to get batch value from table, table:{table_name}, err:{source}.\nBacktrace:\n{backtrace}"
))]
GetBatchValue {
table_name: String,
source: obkv::error::Error,
backtrace: Backtrace,
},

#[snafu(display(
"Failed to delete data from table, table:{}, err:{}.\nBacktrace:\n{}",
table_name,
Expand Down Expand Up @@ -501,6 +510,30 @@ impl TableKv for ObkvImpl {
Ok(values.remove(VALUE_COLUMN_NAME).map(Value::as_bytes))
}

fn get_batch(&self, table_name: &str, keys: Vec<&[u8]>) -> Result<Vec<Option<Vec<u8>>>> {
let mut batch_ops = ObTableBatchOperation::with_ops_num_raw(keys.len());
let mut batch_res = Vec::with_capacity(keys.len());

for key in keys {
batch_ops.get(bytes_to_values(key), vec![VALUE_COLUMN_NAME.to_string()]);
}

let result = self
.client
.execute_batch(table_name, batch_ops)
.context(GetBatchValue { table_name })?;

for table_ops_result in result {
match table_ops_result {
TableOpResult::RetrieveRows(mut values) => {
batch_res.push(values.remove(VALUE_COLUMN_NAME).map(Value::as_bytes))
}
TableOpResult::AffectedRows(_) => {}
}
}
Ok(batch_res)
}

fn delete(&self, table_name: &str, key: &[u8]) -> std::result::Result<(), Self::Error> {
self.client
.delete(table_name, bytes_to_values(key))
Expand Down

0 comments on commit 55127ae

Please sign in to comment.