
fix(buffers): Batch delete #7615

Merged: 5 commits merged on May 30, 2021

Changes from 2 commits
3 changes: 2 additions & 1 deletion lib/vector-core/buffers/src/disk/leveldb_buffer/mod.rs
@@ -126,13 +126,14 @@ where
blocked_write_tasks,
read_offset: head,
compacted_offset: 0,
+ acked: 0,
delete_offset: head,
current_size,
ack_counter,
max_uncompacted_size,
uncompacted_size: 0,
unacked_sizes: VecDeque::new(),
- buffer: Vec::new(),
+ buffer: VecDeque::new(),
last_compaction: Instant::now(),
phantom: PhantomData,
};
59 changes: 37 additions & 22 deletions lib/vector-core/buffers/src/disk/leveldb_buffer/reader.rs
@@ -30,8 +30,8 @@ const MIN_TIME_UNCOMPACTED: Duration = Duration::from_secs(60);
///
/// So the disk buffer (indices/keys) is separated into following regions.
  /// |--Compacted--|--Deleted--|--Read--|--Unread
- /// ^             ^           ^        ^
- /// |             |           |        |
+ /// ^             ^           ^       ^^
+ /// |             |           |-acked-||
  /// 0  `compacted_offset`     |        |
  ///              `delete_offset`       |
  ///                            `read_offset`
@@ -48,6 +48,9 @@ where
pub(crate) compacted_offset: usize,
/// First not deleted key
pub(crate) delete_offset: usize,
+ /// Number of acked events that haven't been deleted from
+ /// database. Used for batching deletes.
+ pub(crate) acked: usize,
/// Reader is notified by Writers through this Waker.
/// Shared with Writers.
pub(crate) write_notifier: Arc<AtomicWaker>,
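A standalone sketch of the ordering implied by the region diagram and the new `acked` counter above. The `Offsets` struct and `check` method below are illustrative only (they are not part of the diff or of the real `Reader`); only the relationships between the fields mirror the doc comments:

```rust
// Illustrative model of the offsets the disk-buffer reader tracks.
struct Offsets {
    compacted_offset: usize, // keys below this have been compacted away
    delete_offset: usize,    // first key not yet deleted from the database
    acked: usize,            // acked-but-not-yet-deleted events awaiting a batched delete
    read_offset: usize,      // first key not yet read
}

impl Offsets {
    fn check(&self) {
        // Regions appear in order: Compacted | Deleted | Read | Unread.
        assert!(self.compacted_offset <= self.delete_offset);
        // The acked span sits between `delete_offset` and `read_offset`,
        // which is what the "Tried to ack beyond read offset" assert enforces.
        assert!(self.delete_offset + self.acked <= self.read_offset);
    }
}

fn main() {
    let offsets = Offsets { compacted_offset: 10, delete_offset: 25, acked: 5, read_offset: 40 };
    offsets.check();
}
```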
@@ -65,7 +68,7 @@ where
/// Sizes in bytes of read, not acked/deleted, events.
pub(crate) unacked_sizes: VecDeque<usize>,
/// Buffer for internal use.
- pub(crate) buffer: Vec<Vec<u8>>,
+ pub(crate) buffer: VecDeque<Vec<u8>>,
/// Limit on uncompacted_size after which we trigger compaction.
pub(crate) max_uncompacted_size: usize,
/// Last time that compaction was triggered.
@@ -96,18 +99,19 @@ where
// This will usually complete instantly, but in the case of a large
// queue (or a fresh launch of the app), this will have to go to
// disk.
- let new_data = tokio::task::block_in_place(|| {
- self.db
- .value_iter(ReadOptions::new())
- .from(&Key(self.read_offset))
- .to(&Key(self.read_offset + 100))
- .collect()
+ let mut buffer = std::mem::replace(&mut self.buffer, VecDeque::default());
+ tokio::task::block_in_place(|| {
+ buffer.extend(
+ self.db
+ .value_iter(ReadOptions::new())
+ .from(&Key(self.read_offset))
+ .to(&Key(self.read_offset + 100)),
+ )
@ktff (Contributor, Author) commented on May 27, 2021:

This isn't fully related to batching, but it is a performance win which is kinda related.
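A standalone sketch of the allocation-reuse pattern this comment refers to. The `Reader` struct and method names below are illustrative, not the real buffer code: instead of collecting each batch into a fresh `Vec` and reversing it so values can be popped off the back, the existing `VecDeque` is moved out of `self`, refilled in place with `extend`, and drained with `pop_front`.

```rust
use std::collections::VecDeque;

struct Reader {
    buffer: VecDeque<Vec<u8>>,
}

impl Reader {
    fn refill<I: IntoIterator<Item = Vec<u8>>>(&mut self, batch: I) {
        // Move the deque out of `self` (same idea as the diff's
        // `std::mem::replace(&mut self.buffer, VecDeque::default())`) so the
        // closure in the real code doesn't hold a borrow of `self.buffer`.
        let mut buffer = std::mem::take(&mut self.buffer);
        buffer.extend(batch); // reuses the deque's existing allocation
        self.buffer = buffer;
    }

    fn next(&mut self) -> Option<Vec<u8>> {
        // Oldest value first; no `reverse()` pass needed before popping.
        self.buffer.pop_front()
    }
}

fn main() {
    let mut reader = Reader { buffer: VecDeque::new() };
    reader.refill(vec![b"a".to_vec(), b"b".to_vec()]);
    assert_eq!(reader.next().as_deref(), Some(&b"a"[..]));
    assert_eq!(reader.next().as_deref(), Some(&b"b"[..]));
    assert_eq!(reader.next(), None);
}
```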

});
- self.buffer = new_data;
- self.buffer.reverse(); // so we can pop
+ self.buffer = buffer;
}

- if let Some(value) = self.buffer.pop() {
+ if let Some(value) = self.buffer.pop_front() {
self.unacked_sizes.push_back(value.len());
self.read_offset += 1;

@@ -135,6 +139,7 @@ where
{
fn drop(&mut self) {
self.delete_acked();
+ self.flush(self.current_size.load(Ordering::Acquire));
}
}

@@ -146,7 +151,25 @@ where
let num_to_delete = self.ack_counter.swap(0, Ordering::Relaxed);

if num_to_delete > 0 {
- let new_offset = self.delete_offset + num_to_delete;
+ let size_deleted = self.unacked_sizes.drain(..num_to_delete).sum();
+ let unread_size = self.current_size.fetch_sub(size_deleted, Ordering::Release);
+
+ self.uncompacted_size += size_deleted;
+ self.acked += num_to_delete;
+
+ if self.acked >= 100 {
+ self.flush(unread_size);
+ }
+ }
+
+ for task in self.blocked_write_tasks.lock().unwrap().drain(..) {
+ task.wake();
+ }
+ }
+
+ fn flush(&mut self, unread_size: usize) {
+ if self.acked > 0 {
+ let new_offset = self.delete_offset + self.acked;
assert!(
new_offset <= self.read_offset,
"Tried to ack beyond read offset"
@@ -161,11 +184,7 @@
self.db.write(WriteOptions::new(), &delete_batch).unwrap();

self.delete_offset = new_offset;

- let size_deleted = self.unacked_sizes.drain(..num_to_delete).sum();
- let unread_size = self.current_size.fetch_sub(size_deleted, Ordering::Release);
-
- self.uncompacted_size += size_deleted;
+ self.acked = 0;

// Compaction can be triggered in two ways:
// 1. When size of uncompacted is a percentage of total allowed size.
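Only the first of the two compaction triggers is visible in this hunk. As a rough, illustrative sketch only: the surrounding context (`MIN_TIME_UNCOMPACTED`, `uncompacted_size`, `max_uncompacted_size`, `last_compaction`) suggests a size-based trigger guarded by a minimum interval; the threshold and exact condition below are assumptions for the example, not the buffer's real logic.

```rust
use std::time::{Duration, Instant};

const MIN_TIME_UNCOMPACTED: Duration = Duration::from_secs(60);

// Illustrative only: one plausible shape for a size-based compaction trigger
// that is rate-limited by a minimum interval. The 10% threshold is an
// assumption for this sketch.
fn should_compact(
    uncompacted_size: usize,
    max_uncompacted_size: usize,
    last_compaction: Instant,
) -> bool {
    let size_trigger = uncompacted_size >= max_uncompacted_size / 10;
    let waited_long_enough = last_compaction.elapsed() >= MIN_TIME_UNCOMPACTED;
    size_trigger && waited_long_enough
}

fn main() {
    let fired = should_compact(200, 1_000, Instant::now() - Duration::from_secs(120));
    println!("compact now? {fired}");
}
```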
@@ -186,10 +205,6 @@
self.compact();
}
}

- for task in self.blocked_write_tasks.lock().unwrap().drain(..) {
- task.wake();
- }
}

pub(crate) fn compact(&mut self) {
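Taken together, the reader.rs changes make acknowledgement cheap and defer the actual LevelDB delete: acks only drain sizes and bump `acked`, and the batched flush (once at least 100 events are acked, or when the reader is dropped) is what issues the delete and advances `delete_offset`. A condensed, standalone sketch of that control flow under simplified types; the `Db` trait and `NoopDb` below are stand-ins, not the real LevelDB calls, and compaction is omitted:

```rust
use std::collections::VecDeque;

// Stand-in for the LevelDB handle; the real code builds a Writebatch of
// per-key deletes and writes it in one `db.write` call.
trait Db {
    fn delete_range(&mut self, from: usize, to: usize);
}

struct Reader<D: Db> {
    db: D,
    delete_offset: usize,           // first key not yet deleted
    read_offset: usize,             // first key not yet read
    acked: usize,                   // acked events awaiting a batched delete
    unacked_sizes: VecDeque<usize>, // sizes of read, not yet deleted events
}

impl<D: Db> Reader<D> {
    // Called when `num_to_delete` events have been acked: bookkeeping only.
    fn delete_acked(&mut self, num_to_delete: usize) {
        if num_to_delete > 0 {
            let _size_deleted: usize = self.unacked_sizes.drain(..num_to_delete).sum();
            self.acked += num_to_delete;

            // Only hit the database once enough acks have accumulated.
            if self.acked >= 100 {
                self.flush();
            }
        }
    }

    // Issues one batched delete for everything acked so far.
    fn flush(&mut self) {
        if self.acked > 0 {
            let new_offset = self.delete_offset + self.acked;
            assert!(new_offset <= self.read_offset, "Tried to ack beyond read offset");
            self.db.delete_range(self.delete_offset, new_offset);
            self.delete_offset = new_offset;
            self.acked = 0;
        }
    }
}

impl<D: Db> Drop for Reader<D> {
    fn drop(&mut self) {
        // Mirror the diff: never leave acked-but-undeleted events behind.
        self.flush();
    }
}

struct NoopDb;
impl Db for NoopDb {
    fn delete_range(&mut self, from: usize, to: usize) {
        println!("batched delete of keys {from}..{to}");
    }
}

fn main() {
    let mut reader = Reader {
        db: NoopDb,
        delete_offset: 0,
        read_offset: 250,
        acked: 0,
        unacked_sizes: (0..250).map(|_| 64).collect(),
    };
    reader.delete_acked(60); // bookkeeping only, no database write
    reader.delete_acked(60); // 120 acked in total -> one batched delete
}
```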