fix(buffers): Batch delete #7615

Merged · 5 commits · May 30, 2021
Changes from 4 commits
3 changes: 2 additions & 1 deletion lib/vector-core/buffers/src/disk/leveldb_buffer/mod.rs
@@ -126,13 +126,14 @@ where
blocked_write_tasks,
read_offset: head,
compacted_offset: 0,
acked: 0,
delete_offset: head,
current_size,
ack_counter,
max_uncompacted_size,
uncompacted_size: 0,
unacked_sizes: VecDeque::new(),
buffer: Vec::new(),
buffer: VecDeque::new(),
last_compaction: Instant::now(),
phantom: PhantomData,
};
75 changes: 49 additions & 26 deletions lib/vector-core/buffers/src/disk/leveldb_buffer/reader.rs
@@ -30,8 +30,8 @@ const MIN_TIME_UNCOMPACTED: Duration = Duration::from_secs(60);
///
/// So the disk buffer (indices/keys) is separated into following regions.
/// |--Compacted--|--Deleted--|--Read--|--Unread
/// ^ ^ ^ ^
/// | | | |
/// ^ ^ ^ ^ ^
/// | | |-acked-| |
/// 0 `compacted_offset` | |
/// `delete_offset` |
/// `read_offset`
@@ -48,6 +48,9 @@ where
pub(crate) compacted_offset: usize,
/// First not deleted key
pub(crate) delete_offset: usize,
/// Number of acked events that haven't been deleted from
/// database. Used for batching deletes.
pub(crate) acked: usize,
/// Reader is notified by Writers through this Waker.
/// Shared with Writers.
pub(crate) write_notifier: Arc<AtomicWaker>,
@@ -65,7 +68,7 @@ where
/// Sizes in bytes of read, not acked/deleted, events.
pub(crate) unacked_sizes: VecDeque<usize>,
/// Buffer for internal use.
pub(crate) buffer: Vec<Vec<u8>>,
pub(crate) buffer: VecDeque<Vec<u8>>,
/// Limit on uncompacted_size after which we trigger compaction.
pub(crate) max_uncompacted_size: usize,
/// Last time that compaction was triggered.
@@ -90,24 +93,29 @@ where
// write.
self.write_notifier.register(cx.waker());

self.delete_acked();
let unread_size = self.delete_acked();

if self.acked >= 100 {
self.flush(unread_size);
}

if self.buffer.is_empty() {
// This will usually complete instantly, but in the case of a large
// queue (or a fresh launch of the app), this will have to go to
// disk.
let new_data = tokio::task::block_in_place(|| {
self.db
.value_iter(ReadOptions::new())
.from(&Key(self.read_offset))
.to(&Key(self.read_offset + 100))
.collect()
let mut buffer = std::mem::take(&mut self.buffer);
Contributor:

I'm confused about why we're doing mem::take here. What's going on is we're allocating a fresh, zero-length VecDeque, extending the taken buffer by the contents of the value_iter and then placing the taken buffer back into self.buffer, trashing the freshly allocated VecDeque. Is this to work around a lifetime issue with sending the buffer down into block_in_place?

Could we not move the call to extend to outside the block_in_place?

Contributor (Author):

> Is this to work around a lifetime issue with sending the buffer down into block_in_place?

Exactly. The compiler has an issue with a mutable and an immutable borrow of self at the same time in that function, while creating the fresh VecDeque is just copying a few bytes around; there is even a possibility that the compiler optimizes it away.

> Could we not move the call to extend to outside the block_in_place?

By keeping the extend inside block_in_place we also cover the possibility that the iterator is lazy and actually accesses the disk while iterating. I haven't checked for this behavior, but either way it seems like good insurance.
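
A standalone sketch of the workaround being discussed; `Db`, `values`, and `run_blocking` are invented stand-ins for the leveldb handle, its `value_iter`, and `tokio::task::block_in_place`, and the borrow conflict that motivates the dance comes from the `Pin<&mut Self>` receiver of `poll_next`, as established further down the thread:

```rust
use std::collections::VecDeque;

// Stand-ins: `Db` plays the role of the leveldb handle, `values` of its
// `value_iter`, and `run_blocking` of `tokio::task::block_in_place`.
struct Db;

impl Db {
    fn values(&self) -> impl Iterator<Item = Vec<u8>> + '_ {
        std::iter::empty()
    }
}

fn run_blocking<T>(f: impl FnOnce() -> T) -> T {
    f()
}

struct Reader {
    db: Db,
    buffer: VecDeque<Vec<u8>>,
}

impl Reader {
    fn fill(&mut self) {
        // Move the buffer out of `self`, leaving an empty VecDeque behind.
        let mut buffer = std::mem::take(&mut self.buffer);
        // Inside the closure only `self.db` is borrowed (shared) and the
        // local `buffer` is mutated, so no conflicting borrows of `self`.
        run_blocking(|| buffer.extend(self.db.values()));
        // Put the filled buffer back; the placeholder is dropped.
        self.buffer = buffer;
    }
}

fn main() {
    let mut reader = Reader { db: Db, buffer: VecDeque::new() };
    reader.fill();
    assert!(reader.buffer.is_empty()); // the stand-in source yields nothing
}
```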

Contributor:

Gotcha. At this point it seems like there are a couple of suppositions I'd like to get empirical data for:

  • the compiler is able to optimize away the temporary VecDeque (I find this believable)
  • the iterator is lazy, and doing the extend inside block_in_place is a performance boon

This function is going to be called enough that I'd like to be sure. We have benchmarks in place for disk buffering specifically, so measuring the approach with and without the temporary would be sufficient to my mind.
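
For reference, the kind of A/B measurement being asked for could also be sketched as a small Criterion benchmark comparing the two fill strategies on an in-memory source. This is only an illustration, not the repository's actual buffer benchmark; every name, data shape, and batch size below is invented, and it assumes a `benches/` target with the `criterion` crate available:

```rust
use std::collections::VecDeque;

use criterion::{black_box, criterion_group, criterion_main, Criterion};

// Stand-in data source: 100 values of 1 KiB each, roughly the shape of
// one read batch in the buffer code.
fn batch() -> Vec<Vec<u8>> {
    (0..100).map(|_| vec![0u8; 1024]).collect()
}

// Strategy A: collect the source into a fresh VecDeque, then extend.
fn collect_then_extend(buffer: &mut VecDeque<Vec<u8>>, src: &[Vec<u8>]) {
    let vals: VecDeque<Vec<u8>> = src.iter().cloned().collect();
    buffer.extend(vals);
}

// Strategy B: take the buffer out, extend it in place, and put it back.
fn take_and_extend(slot: &mut VecDeque<Vec<u8>>, src: &[Vec<u8>]) {
    let mut buffer = std::mem::take(slot);
    buffer.extend(src.iter().cloned());
    *slot = buffer;
}

fn bench_fill(c: &mut Criterion) {
    let src = batch();
    c.bench_function("collect_then_extend", |b| {
        b.iter(|| {
            let mut buf = VecDeque::new();
            collect_then_extend(&mut buf, black_box(&src));
            black_box(buf.len())
        })
    });
    c.bench_function("take_and_extend", |b| {
        b.iter(|| {
            let mut slot = VecDeque::new();
            take_and_extend(&mut slot, black_box(&src));
            black_box(slot.len())
        })
    });
}

criterion_group!(benches, bench_fill);
criterion_main!(benches);
```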

Contributor (Author):

Ah, it seems VecDeque::new is allocating... yeah, I'll see what can be changed. At worst I'll extract this change into a separate PR for benchmarking.
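
Whether `VecDeque::new` allocates is a standard-library detail that has changed over time: around the time of this PR it reserved a small power-of-two capacity up front, while newer toolchains construct it with capacity 0. A quick check on whatever toolchain is at hand:

```rust
use std::collections::VecDeque;

fn main() {
    let q: VecDeque<Vec<u8>> = VecDeque::new();
    // A non-zero capacity straight out of the constructor means the queue
    // grabbed heap space before anything was pushed on this toolchain.
    println!("capacity right after VecDeque::new(): {}", q.capacity());
}
```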

Contributor:

Cool, here are benchmarks for the disk buffer on my system. I had to rebase onto master, but otherwise the code here is unchanged.

[Screenshot: Criterion.rs summary for the buffer-disk benchmark]

Contributor:

Here's the benchmark with the following diff applied:

diff --git a/lib/vector-core/buffers/src/disk/leveldb_buffer/reader.rs b/lib/vector-core/buffers/src/disk/leveldb_buffer/reader.rs
index 840cef35c..0f24033e8 100644
--- a/lib/vector-core/buffers/src/disk/leveldb_buffer/reader.rs
+++ b/lib/vector-core/buffers/src/disk/leveldb_buffer/reader.rs
@@ -106,16 +106,14 @@ where
             // This will usually complete instantly, but in the case of a large
             // queue (or a fresh launch of the app), this will have to go to
             // disk.
-            let mut buffer = std::mem::take(&mut self.buffer);
-            tokio::task::block_in_place(|| {
-                buffer.extend(
-                    self.db
-                        .value_iter(ReadOptions::new())
-                        .from(&Key(self.read_offset))
-                        .to(&Key(self.read_offset + 100)),
-                );
+            let vals: VecDeque<Vec<u8>> = tokio::task::block_in_place(|| {
+                self.db
+                    .value_iter(ReadOptions::new())
+                    .from(&Key(self.read_offset))
+                    .to(&Key(self.read_offset + 100))
+                    .collect()
             });
-            self.buffer = buffer;
+            self.buffer.extend(vals);
         }

         if let Some(value) = self.buffer.pop_front() {

[Screenshot: Criterion.rs summary for the buffer-disk benchmark, with the above diff applied]

Contributor:

From these measurements it looks like the median runtimes don't shift all that much for either write-then-read or write-and-read, but the distribution is more favorable to the diff version I posted, especially in write-and-read, where we'd be hitting the empty condition frequently. You can even see that the modified version did fewer overall iterations per benchmark because the results were more stable, which tracks: removing an allocation from a hot path will, generally speaking, give that path a more consistent runtime.

Contributor (Author):

> The compiler has an issue with a mutable and an immutable borrow of self at the same time

I've dug into this for a bit and found it's caused by Pin. That's fixed now, so this works as expected, without intermediates.

tokio::task::block_in_place(|| {
buffer.extend(
self.db
.value_iter(ReadOptions::new())
.from(&Key(self.read_offset))
.to(&Key(self.read_offset + 100)),
);
});
self.buffer = new_data;
self.buffer.reverse(); // so we can pop
self.buffer = buffer;
}

if let Some(value) = self.buffer.pop() {
if let Some(value) = self.buffer.pop_front() {
self.unacked_sizes.push_back(value.len());
self.read_offset += 1;

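To make the Pin point concrete: with a `Pin<&mut Self>` receiver, both field accesses route through the pin's `DerefMut`/`Deref`, so the mutable and shared borrows of `self` overlap. For an `Unpin` type one way out is to recover a plain `&mut Self` first. A minimal sketch with invented stand-in types, not necessarily the exact fix that landed in the final commit:

```rust
use std::collections::VecDeque;
use std::pin::Pin;

// Invented stand-ins; the real types are the leveldb handle and the Reader.
struct Db;

impl Db {
    fn values(&self) -> impl Iterator<Item = Vec<u8>> + '_ {
        std::iter::empty()
    }
}

struct Reader {
    db: Db,
    buffer: VecDeque<Vec<u8>>,
}

impl Reader {
    fn fill(self: Pin<&mut Self>) {
        // Rejected by the borrow checker: both field accesses go through the
        // pin's DerefMut/Deref, so `self` is borrowed mutably and immutably
        // at the same time.
        //
        // self.buffer.extend(self.db.values());

        // `Reader` is Unpin, so an ordinary `&mut Reader` can be recovered,
        // and the borrow checker then splits it into disjoint field borrows.
        let this = self.get_mut();
        this.buffer.extend(this.db.values());
    }
}

fn main() {
    let mut reader = Reader { db: Db, buffer: VecDeque::new() };
    Pin::new(&mut reader).fill();
    assert!(reader.buffer.is_empty()); // the stand-in source yields nothing
}
```
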
@@ -134,19 +142,42 @@ where
T: Send + Sync + Unpin,
{
fn drop(&mut self) {
self.delete_acked();
let unread_size = self.delete_acked();
self.flush(unread_size);
ktff marked this conversation as resolved.
}
}

impl<T> Reader<T>
where
T: Send + Sync + Unpin,
{
fn delete_acked(&mut self) {
/// Returns number of bytes to be read.
fn delete_acked(&mut self) -> usize {
let num_to_delete = self.ack_counter.swap(0, Ordering::Relaxed);

if num_to_delete > 0 {
let new_offset = self.delete_offset + num_to_delete;
let unread_size = if num_to_delete > 0 {
let size_deleted = self.unacked_sizes.drain(..num_to_delete).sum();
let unread_size =
self.current_size.fetch_sub(size_deleted, Ordering::Release) - size_deleted;

self.uncompacted_size += size_deleted;
self.acked += num_to_delete;

unread_size
} else {
self.current_size.load(Ordering::Acquire)
};

for task in self.blocked_write_tasks.lock().unwrap().drain(..) {
task.wake();
}

unread_size
}

fn flush(&mut self, unread_size: usize) {
if self.acked > 0 {
let new_offset = self.delete_offset + self.acked;
assert!(
new_offset <= self.read_offset,
"Tried to ack beyond read offset"
@@ -161,11 +192,7 @@ where
self.db.write(WriteOptions::new(), &delete_batch).unwrap();

self.delete_offset = new_offset;

let size_deleted = self.unacked_sizes.drain(..num_to_delete).sum();
let unread_size = self.current_size.fetch_sub(size_deleted, Ordering::Release);

self.uncompacted_size += size_deleted;
self.acked = 0;

// Compaction can be triggered in two ways:
// 1. When size of uncompacted is a percentage of total allowed size.
@@ -186,10 +213,6 @@ where
self.compact();
}
}

for task in self.blocked_write_tasks.lock().unwrap().drain(..) {
task.wake();
}
}

pub(crate) fn compact(&mut self) {
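
Pulling the pieces of the diff together: acking now only updates in-memory bookkeeping, and the actual delete is issued as one batch once at least 100 events have been acked, or unconditionally when the reader shuts down. A condensed sketch of that flow, with the leveldb write batch, the atomic ack counter, compaction, and the writer wake-ups stripped out and an invented `KeyValueStore` trait standing in for the database:

```rust
use std::collections::VecDeque;

// Stand-in for the ranged delete that the real code performs with a
// leveldb write batch.
trait KeyValueStore {
    fn delete_range(&mut self, from: usize, to: usize);
}

struct Reader<S: KeyValueStore> {
    db: S,
    /// First key that has not been deleted from the store yet.
    delete_offset: usize,
    /// Acked events not yet deleted; deletes are batched through this counter.
    acked: usize,
    /// Sizes of read-but-not-acked events, oldest first.
    unacked_sizes: VecDeque<usize>,
}

impl<S: KeyValueStore> Reader<S> {
    /// Account for newly acked events without touching the store.
    fn delete_acked(&mut self, num_to_delete: usize) {
        let _size_deleted: usize = self.unacked_sizes.drain(..num_to_delete).sum();
        self.acked += num_to_delete;
    }

    /// Issue one ranged delete covering everything acked since the last flush.
    fn flush(&mut self) {
        if self.acked > 0 {
            let new_offset = self.delete_offset + self.acked;
            self.db.delete_range(self.delete_offset, new_offset);
            self.delete_offset = new_offset;
            self.acked = 0;
        }
    }

    /// Poll-time policy from the diff: only hit the store once enough
    /// events have accumulated.
    fn maybe_flush(&mut self) {
        if self.acked >= 100 {
            self.flush();
        }
    }
}

impl<S: KeyValueStore> Drop for Reader<S> {
    fn drop(&mut self) {
        // Never leave acked events undeleted across a shutdown.
        self.flush();
    }
}
```

The 100-event threshold mirrors the `self.acked >= 100` check in `poll_next`, and flushing from `Drop` preserves the previous guarantee that everything acked has been deleted by the time the reader goes away.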