Skip to content

Commit

Permalink
pw_multisink: Interface to read entries count
Browse files Browse the repository at this point in the history
Bug: 355104976

There is use case of pw_multisink that drain the sink in a "batched"
pattern, where it is drained only when close to full. The draining
thread is put to sleep until a close-to-full signal.
In such workload, the draining thread needs to know how much entries it
needs to pop out in each wakeup. Otherwise, if 1) entry pop speed and
push speed is close, and 2) remaining entries count in pw_multisink is
low, the draining thread will temporaly "stream". "stream" does not
fully utilize resources for batched pattern, e.g. allocated buffers.

Test: Added unit tests
Change-Id: I440eeb90af51afb04ee9bbbc5cb3b1cd90607241
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/226351
Lint: Lint 🤖 <[email protected]>
Reviewed-by: Taylor Cramer <[email protected]>
Commit-Queue: Taylor Cramer <[email protected]>
  • Loading branch information
Jason0214 authored and CQ Bot Account committed Jul 29, 2024
1 parent eb94603 commit 6b0942c
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 0 deletions.
7 changes: 7 additions & 0 deletions pw_multisink/multisink_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -539,18 +539,23 @@ TEST_F(MultiSinkTest, DetachedDrainReportsDropCount) {
const uint32_t ingress_drops = 10;
multisink_.HandleDropped(ingress_drops);
multisink_.HandleEntry(kMessage);
EXPECT_EQ(drains_[0].GetUnreadEntriesCount(), 1u);
VerifyPopEntry(drains_[0], kMessage, 0, ingress_drops);
EXPECT_EQ(drains_[0].GetUnreadEntriesCount(), 0u);

// Detaching and attaching drain should report the same drops.
multisink_.DetachDrain(drains_[0]);
multisink_.AttachDrain(drains_[0]);
EXPECT_EQ(drains_[0].GetUnreadEntriesCount(), 1u);
VerifyPopEntry(drains_[0], kMessage, 0, ingress_drops);
EXPECT_EQ(drains_[0].GetUnreadEntriesCount(), 0u);
}

TEST_F(MultiSinkTest, DrainUnreadEntriesSize) {
multisink_.AttachDrain(drains_[0]);

EXPECT_EQ(drains_[0].GetUnreadEntriesSize(), 0u);
EXPECT_EQ(drains_[0].GetUnreadEntriesCount(), 0u);
multisink_.HandleEntry(kMessage);
multisink_.HandleEntry(kMessage);
const size_t unread_entries_size = drains_[0].GetUnreadEntriesSize();
Expand All @@ -572,6 +577,8 @@ TEST_F(MultiSinkTest, DrainUnreadEntriesSize) {
/*expected_ingress_drop_count=*/0);
EXPECT_EQ(drains_[0].GetUnreadEntriesSize(), 0u);
EXPECT_EQ(drains_[1].GetUnreadEntriesSize(), unread_entries_size);
EXPECT_EQ(drains_[0].GetUnreadEntriesCount(), 0u);
EXPECT_EQ(drains_[1].GetUnreadEntriesCount(), 2u);
}

TEST(UnsafeGetUnreadEntriesSize, ReadFromListener) {
Expand Down
8 changes: 8 additions & 0 deletions pw_multisink/public/pw_multisink/multisink.h
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,14 @@ class MultiSink {
return reader_.EntriesSize();
}

// Return number of unread entries in the sink for this drain. This is a
// thread-safe version and must not be used inside a
// Listener::OnNewEntryAvailable() to avoid deadlocks.
size_t GetUnreadEntriesCount() const PW_LOCKS_EXCLUDED(multisink_->lock_) {
std::lock_guard lock(multisink_->lock_);
return reader_.EntryCount();
}

protected:
friend MultiSink;

Expand Down

0 comments on commit 6b0942c

Please sign in to comment.