fix(buffers): Batch delete #7615

Merged
ktff merged 5 commits into master from ktff/disk_batch_delete on May 30, 2021
Conversation

@ktff (Contributor) commented on May 27, 2021:

Ref. #7425

Reduces the performance impact of deletes. In the worst-case scenario, when the sink manages to stay up to date, every event results in one delete operation, which negatively impacts write/read performance. This PR batches deletes to reduce that cost.

ktff added 2 commits May 27, 2021 16:35
Signed-off-by: ktf <[email protected]>
Signed-off-by: ktf <[email protected]>
@ktff ktff added the domain: buffers Anything related to Vector's memory/disk buffers label May 27, 2021
@ktff ktff self-assigned this May 27, 2021
@ktff ktff requested a review from blt May 27, 2021 14:40
Comment on lines 102 to 109:

```rust
let mut buffer = std::mem::replace(&mut self.buffer, VecDeque::default());
tokio::task::block_in_place(|| {
    buffer.extend(
        self.db
            .value_iter(ReadOptions::new())
            .from(&Key(self.read_offset))
            .to(&Key(self.read_offset + 100)),
    );
});
self.buffer = buffer;
```

@ktff (Contributor, Author) commented on May 27, 2021:
This isn't fully related to batching, but it is a performance win which is kinda related.

@ktff ktff mentioned this pull request May 27, 2021
Signed-off-by: ktf <[email protected]>
Comment on:

```rust
let mut buffer = std::mem::take(&mut self.buffer);
```

A contributor commented:
I'm confused about why we're doing mem::take here. What's going on is we're allocating a fresh, zero-length VecDeque, extending the taken buffer by the contents of the value_iter and then placing the taken buffer back into self.buffer, trashing the freshly allocated VecDeque. Is this to work around a lifetime issue with sending the buffer down into block_in_place?

Could we not move the call to extend to outside the block_in_place?

@ktff (Contributor, Author) replied:
> Is this to work around a lifetime issue with sending the buffer down into block_in_place?

Exactly. The compiler has an issue with a mutable and an immutable borrow from self at the same time in that function, while creating a fresh VecDeque is just copying bytes around; there is even a possibility that the compiler optimizes it away.

> Could we not move the call to extend to outside the block_in_place?

By having extend inside block_in_place we also cover the possibility that the iterator is lazy and actually accesses disk while iterating. I haven't checked for this behavior, but either way it seems like good insurance.

Contributor replied:

Gotcha. At this point there are a couple of suppositions I'd like to get empirical data for:

  • the compiler is able to optimize away the temporary VecDeque (I find this believable)
  • the iterator is lazy, and doing extend inside block_in_place is a performance boon

This function is going to be called often enough that I'd like to be sure. We have benchmarks in place for disk buffering specifically, so measuring the approach with and without the temporary would be sufficient to my mind.

@ktff (Contributor, Author) replied:
Ah, it seems VecDeque::new allocates... yeah, I'll see what can be changed. At worst I'll extract this change into a separate PR for benchmarking.

Contributor replied:

Cool, here are benchmarks for the disk buffer on my system. I had to rebase with master, but otherwise the code here is unchanged.

[Screenshot: buffer-disk summary, Criterion.rs]

Contributor commented:

Here's the benchmark with the following diff applied:

```diff
diff --git a/lib/vector-core/buffers/src/disk/leveldb_buffer/reader.rs b/lib/vector-core/buffers/src/disk/leveldb_buffer/reader.rs
index 840cef35c..0f24033e8 100644
--- a/lib/vector-core/buffers/src/disk/leveldb_buffer/reader.rs
+++ b/lib/vector-core/buffers/src/disk/leveldb_buffer/reader.rs
@@ -106,16 +106,14 @@ where
             // This will usually complete instantly, but in the case of a large
             // queue (or a fresh launch of the app), this will have to go to
             // disk.
-            let mut buffer = std::mem::take(&mut self.buffer);
-            tokio::task::block_in_place(|| {
-                buffer.extend(
-                    self.db
-                        .value_iter(ReadOptions::new())
-                        .from(&Key(self.read_offset))
-                        .to(&Key(self.read_offset + 100)),
-                );
+            let vals: VecDeque<Vec<u8>> = tokio::task::block_in_place(|| {
+                self.db
+                    .value_iter(ReadOptions::new())
+                    .from(&Key(self.read_offset))
+                    .to(&Key(self.read_offset + 100))
+                    .collect()
             });
-            self.buffer = buffer;
+            self.buffer.extend(vals);
         }

         if let Some(value) = self.buffer.pop_front() {
```

[Screenshot: buffer-disk summary, Criterion.rs (1)]

Contributor commented:

From these measurements it looks like the median runtimes don't shift all that much for both write-then-read and write-and-read but the distribution is more favorable to the diff version I posted, especially in write-and-read where we'd be hitting the empty condition frequently. You can even see that the modified version I made did fewer overall iterations per benchmark because the results were more stable, which tracks. Removing an allocation from a hot path will give that path more consistent runtime, generally speaking.

@ktff (Contributor, Author) replied:

> The compiler is having issue with mutable and immutable borrow at the same time from self

I dug in for a bit and found it's caused by Pin. That's fixed now, so this works as expected, without intermediates.

lib/vector-core/buffers/src/disk/leveldb_buffer/reader.rs (outdated comment, resolved)
@ktff ktff requested a review from blt May 28, 2021 16:16
Signed-off-by: ktf <[email protected]>
@ktff ktff requested a review from blt May 29, 2021 09:21
@blt (Contributor) left a review comment:

Nice work here.

@ktff ktff merged commit 9b4b77c into master May 30, 2021
@ktff ktff deleted the ktff/disk_batch_delete branch May 30, 2021 08:16