Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use DeltaReader directly to implement Dictionary::term_ord #1925

Merged
merged 2 commits into from
Mar 6, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 49 additions & 4 deletions sstable/src/dictionary.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::cmp::Ordering;
use std::io;
use std::marker::PhantomData;
use std::ops::{Bound, RangeBounds};
Expand Down Expand Up @@ -96,6 +97,14 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
Ok(TSSTable::delta_reader(data))
}

/// Builds a delta reader over the block designated by `block_addr`.
///
/// The bytes covered by `block_addr.byte_range` are read from
/// `self.sstable_slice` and handed to `TSSTable::delta_reader`.
///
/// # Errors
///
/// Propagates the error returned by `read_bytes_slice` if the byte range
/// cannot be read.
pub(crate) fn sstable_delta_reader_block(
    &self,
    block_addr: BlockAddr,
) -> io::Result<DeltaReader<'static, TSSTable::ValueReader>> {
    let block_bytes = self.sstable_slice.read_bytes_slice(block_addr.byte_range)?;
    let delta_reader = TSSTable::delta_reader(block_bytes);
    Ok(delta_reader)
}

/// This function returns a file slice covering a set of sstable blocks
/// that include the key range passed in arguments. Optionally returns
/// only the blocks for up to `limit` matching terms.
Expand Down Expand Up @@ -215,13 +224,43 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
};

let mut term_ord = block_addr.first_ordinal;
let mut sstable_reader = self.sstable_reader_block(block_addr)?;
while sstable_reader.advance()? {
if sstable_reader.key() == key_bytes {
return Ok(Some(term_ord));
let mut ok_bytes = 0;
let mut sstable_delta_reader = self.sstable_delta_reader_block(block_addr)?;
while sstable_delta_reader.advance()? {
let prefix_len = sstable_delta_reader.common_prefix_len();
let suffix = sstable_delta_reader.suffix();

match prefix_len.cmp(&ok_bytes) {
Ordering::Less => return Ok(None), // popped bytes already matched => too far
Ordering::Equal => (),
Ordering::Greater => {
// the ok prefix is less than current entry prefix => continue to next elem
term_ord += 1;
continue;
}
}

// we have `ok_bytes` bytes of common prefix, check if this key adds more
for (key_byte, suffix_byte) in key_bytes[ok_bytes..].iter().zip(suffix) {
match suffix_byte.cmp(key_byte) {
Ordering::Less => break, // byte too small
Ordering::Equal => ok_bytes += 1, // new matching byte
Ordering::Greater => return Ok(None), // too far
}
}

if ok_bytes == key_bytes.len() {
if prefix_len + suffix.len() == ok_bytes {
return Ok(Some(term_ord));
} else {
// current key is a prefix of current element, not a match
return Ok(None);
}
}

term_ord += 1;
}

Ok(None)
}

Expand Down Expand Up @@ -456,6 +495,12 @@ mod tests {
slice.restrict(0..0);
assert!(dic.get(b"~~~").unwrap().is_none());
assert!(dic.term_ord(b"~~~").unwrap().is_none());

slice.restrict(0..slice.bytes.len());
// between 1000F and 10010, test case where matched prefix > prefix kept
assert!(dic.term_ord(b"1000G").unwrap().is_none());
// shorter than 10000, tests prefix case
assert!(dic.term_ord(b"1000").unwrap().is_none());
}

#[test]
Expand Down