Skip to content

Commit

Permalink
pw_bluetooth_sapphire: Queue ISO frames in stream as needed
Browse files Browse the repository at this point in the history
Establish a robust path for sending frames from an IsoStream to
its client.

The IsoStream will buffer frames and provide them upon request
(calls to ReadNextQueuedIncomingPacket()). If at any time a call
is made and no frames are available, nullptr is returned and
a notification will be sent to the client (via the
IncomingDataHandler) when the next frame is received. This
interface maps easily to a 'hanging get' pattern where an
unsatisfiable read will be followed by a notification when data is
ready.

Bug: http://b/issues/311639690
Test: pw presubmit --step gn_chre_googletest_nanopb_sapphire_build
Change-Id: Iad999735cf15f70cf3b6fa079745c2d25784dddb
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/251332
Lint: Lint 🤖 <[email protected]>
Presubmit-Verified: CQ Bot Account <[email protected]>
Commit-Queue: Josh Conner <[email protected]>
Docs-Not-Needed: Josh Conner <[email protected]>
Reviewed-by: Jason Graffius <[email protected]>
  • Loading branch information
josh-conner authored and CQ Bot Account committed Nov 27, 2024
1 parent 683ebcf commit e431fab
Showing 5 changed files with 165 additions and 6 deletions.
38 changes: 34 additions & 4 deletions pw_bluetooth_sapphire/host/iso/iso_stream.cc
Original file line number Diff line number Diff line change
@@ -42,6 +42,7 @@ class IsoStreamImpl final : public IsoStream {
return cis_hci_handle_;
}
void Close() override;
std::unique_ptr<IsoDataPacket> ReadNextQueuedIncomingPacket() override;
IsoStream::WeakPtr GetWeakPtr() override { return weak_self_.GetWeakPtr(); }

// IsoDataChannel::ConnectionInterface override
@@ -71,6 +72,12 @@ class IsoStreamImpl final : public IsoStream {

IncomingDataHandler on_incoming_data_available_cb_;

// When true, we will send a notification to the client when the next packet
// arrives. Otherwise, we will just queue it up.
bool inbound_client_is_waiting_ = false;

std::queue<std::unique_ptr<std::vector<std::byte>>> incoming_data_queue_;

// Called when stream is closed
pw::Callback<void()> on_closed_cb_;

@@ -339,12 +346,35 @@ void IsoStreamImpl::HandleCompletePacket(
return;
}

if (on_incoming_data_available_cb_(packet)) {
// Packet was processed successfully - we're done here
return;
if (inbound_client_is_waiting_) {
inbound_client_is_waiting_ = false;
if (on_incoming_data_available_cb_(packet)) {
// Packet was processed successfully - we're done here
return;
}
// This is not a hard error, but it is a bit unusual and probably worth
// noting.
bt_log(INFO,
"iso",
"ISO incoming packet client previously requested packets, now not "
"accepting new ones");
}

// Client not ready to handle packet, queue it up until they ask for it
incoming_data_queue_.push(
std::make_unique<IsoDataPacket>(packet.begin(), packet.end()));
}

std::unique_ptr<IsoDataPacket> IsoStreamImpl::ReadNextQueuedIncomingPacket() {
if (incoming_data_queue_.empty()) {
inbound_client_is_waiting_ = true;
return nullptr;
}

// TODO(fxbug.dev/311639690): queue the packet
std::unique_ptr<IsoDataPacket> packet =
std::move(incoming_data_queue_.front());
incoming_data_queue_.pop();
return packet;
}

void IsoStreamImpl::Close() { on_closed_cb_(); }
114 changes: 112 additions & 2 deletions pw_bluetooth_sapphire/host/iso/iso_stream_test.cc
Original file line number Diff line number Diff line change
@@ -24,6 +24,10 @@ constexpr hci_spec::CigIdentifier kCigId = 0x22;
constexpr hci_spec::CisIdentifier kCisId = 0x42;

constexpr hci_spec::ConnectionHandle kCisHandleId = 0x59e;

constexpr size_t kMaxControllerPacketSize = 100;
constexpr size_t kMaxControllerPacketCount = 5;

using MockControllerTestBase =
bt::testing::FakeDispatcherControllerTest<bt::testing::MockController>;

@@ -35,8 +39,8 @@ class IsoStreamTest : public MockControllerTestBase {
void SetUp() override {
MockControllerTestBase::SetUp(
pw::bluetooth::Controller::FeaturesBits::kHciIso);
hci::DataBufferInfo iso_buffer_info(/*max_data_length=*/100,
/*max_num_packets=*/5);
hci::DataBufferInfo iso_buffer_info(kMaxControllerPacketSize,
kMaxControllerPacketCount);
transport()->InitializeIsoDataChannel(iso_buffer_info);
iso_stream_ = IsoStream::Create(
kCigId,
@@ -79,6 +83,10 @@ class IsoStreamTest : public MockControllerTestBase {

IsoStream* iso_stream() { return iso_stream_.get(); }

std::queue<std::vector<std::byte>>* complete_incoming_sdus() {
return &complete_incoming_sdus_;
}

std::optional<pw::bluetooth::emboss::StatusCode> establishment_status() {
return establishment_status_;
}
@@ -89,6 +97,9 @@ class IsoStreamTest : public MockControllerTestBase {

bool closed() { return closed_; }

protected:
bool accept_incoming_sdus_ = true;

private:
std::unique_ptr<IsoStream> iso_stream_;
std::optional<pw::bluetooth::emboss::StatusCode> establishment_status_;
@@ -186,6 +197,9 @@ void IsoStreamTest::SetupDataPath(

bool IsoStreamTest::HandleCompleteIncomingSDU(
const pw::span<const std::byte>& sdu) {
if (!accept_incoming_sdus_) {
return false;
}
std::vector<std::byte> new_sdu(sdu.size());
std::copy(sdu.begin(), sdu.end(), new_sdu.begin());
complete_incoming_sdus_.emplace(std::move(new_sdu));
@@ -300,4 +314,100 @@ TEST_F(IsoStreamTest, SetupDataPathControllerError) {
iso::IsoStream::SetupDataPathError::kStreamRejectedByController);
}

// If the client asks for frames before any are ready it will receive
// a notification when the next packet arrives.
TEST_F(IsoStreamTest, PendingRead) {
EstablishCis(pw::bluetooth::emboss::StatusCode::SUCCESS);
SetupDataPath(
pw::bluetooth::emboss::DataPathDirection::OUTPUT,
/*codec_configuration=*/std::nullopt,
/*cmd_complete_status=*/pw::bluetooth::emboss::StatusCode::SUCCESS,
iso::IsoStream::SetupDataPathError::kSuccess);
DynamicByteBuffer packet0 =
testing::IsoDataPacket(kMaxControllerPacketSize,
iso_stream()->cis_handle(),
/*packet_sequence_number=*/0);
pw::span<const std::byte> packet0_as_span = packet0.subspan();
ASSERT_EQ(iso_stream()->ReadNextQueuedIncomingPacket(), nullptr);
iso_stream()->ReceiveInboundPacket(packet0_as_span);
ASSERT_EQ(complete_incoming_sdus()->size(), 1u);
std::vector<std::byte>& received_frame = complete_incoming_sdus()->front();
ASSERT_EQ(packet0_as_span.size(), received_frame.size());
EXPECT_TRUE(std::equal(
packet0_as_span.begin(), packet0_as_span.end(), received_frame.begin()));
}

// If the client does not ask for frames it will not receive any notifications
// and the IsoStream will just queue them up.
TEST_F(IsoStreamTest, UnreadData) {
EstablishCis(pw::bluetooth::emboss::StatusCode::SUCCESS);
SetupDataPath(
pw::bluetooth::emboss::DataPathDirection::OUTPUT,
/*codec_configuration=*/std::nullopt,
/*cmd_complete_status=*/pw::bluetooth::emboss::StatusCode::SUCCESS,
iso::IsoStream::SetupDataPathError::kSuccess);
const size_t kTotalFrameCount = 5;
DynamicByteBuffer packets[kTotalFrameCount];
pw::span<const std::byte> packets_as_span[kTotalFrameCount];
for (size_t i = 0; i < kTotalFrameCount; i++) {
packets[i] = testing::IsoDataPacket(kMaxControllerPacketSize - i,
iso_stream()->cis_handle(),
/*packet_sequence_number=*/i);
packets_as_span[i] = packets[i].subspan();
iso_stream()->ReceiveInboundPacket(packets_as_span[i]);
}
EXPECT_EQ(complete_incoming_sdus()->size(), 0u);
}

// This is the (somewhat unusual) case where the client asks for a frame but
// then rejects it when the frame is ready. The frame should stay in the queue
// and future frames should not receive notification, either.
TEST_F(IsoStreamTest, ReadRequestedAndThenRejected) {
EstablishCis(pw::bluetooth::emboss::StatusCode::SUCCESS);
SetupDataPath(
pw::bluetooth::emboss::DataPathDirection::OUTPUT,
/*codec_configuration=*/std::nullopt,
/*cmd_complete_status=*/pw::bluetooth::emboss::StatusCode::SUCCESS,
iso::IsoStream::SetupDataPathError::kSuccess);
DynamicByteBuffer packet0 =
testing::IsoDataPacket(kMaxControllerPacketSize,
iso_stream()->cis_handle(),
/*packet_sequence_number=*/0);
pw::span<const std::byte> packet0_as_span = packet0.subspan();
DynamicByteBuffer packet1 =
testing::IsoDataPacket(kMaxControllerPacketSize - 1,
iso_stream()->cis_handle(),
/*packet_sequence_number=*/1);
pw::span<const std::byte> packet1_as_span = packet1.subspan();

// Request a frame but then reject it when proffered by the stream
ASSERT_EQ(iso_stream()->ReadNextQueuedIncomingPacket(), nullptr);
accept_incoming_sdus_ = false;
iso_stream()->ReceiveInboundPacket(packet0_as_span);
EXPECT_EQ(complete_incoming_sdus()->size(), 0u);

// Accept future frames, but because no read request has been made that we
// couldn't fulfill, the stream should just queue them up.
accept_incoming_sdus_ = true;
iso_stream()->ReceiveInboundPacket(packet1_as_span);
EXPECT_EQ(complete_incoming_sdus()->size(), 0u);

// And finally, we should be able to read out the packets in the right order
std::unique_ptr<IsoDataPacket> rx_packet_0 =
iso_stream()->ReadNextQueuedIncomingPacket();
ASSERT_NE(rx_packet_0, nullptr);
ASSERT_EQ(rx_packet_0->size(), packet0_as_span.size());
ASSERT_TRUE(std::equal(
rx_packet_0->begin(), rx_packet_0->end(), packet0_as_span.begin()));
std::unique_ptr<IsoDataPacket> rx_packet_1 =
iso_stream()->ReadNextQueuedIncomingPacket();
ASSERT_NE(rx_packet_1, nullptr);
ASSERT_EQ(rx_packet_1->size(), packet1_as_span.size());
ASSERT_TRUE(std::equal(
rx_packet_1->begin(), rx_packet_1->end(), packet1_as_span.begin()));

// Stream's packet queue should be empty now
ASSERT_EQ(iso_stream()->ReadNextQueuedIncomingPacket(), nullptr);
}

} // namespace bt::iso
Original file line number Diff line number Diff line change
@@ -43,6 +43,10 @@ class FakeIsoStream : public IsoStream {

void Close() override {}

std::unique_ptr<IsoDataPacket> ReadNextQueuedIncomingPacket() override {
return nullptr;
}

IsoStream::WeakPtr GetWeakPtr() override { return weak_self_.GetWeakPtr(); }

void SetSetupDataPathReturnStatus(IsoStream::SetupDataPathError status) {
Original file line number Diff line number Diff line change
@@ -30,6 +30,8 @@ constexpr size_t kMaxIsochronousDataPacketSize =
pw::bluetooth::emboss::IsoDataFrameHeader::MaxSizeInBytes() +
hci_spec::kMaxIsochronousDataPacketPayloadSize;

using IsoDataPacket = std::vector<std::byte>;

// Possible outcomes from an AcceptCis call
enum class AcceptCisStatus {
// We're now waiting for an incoming CIS request with the specified attributes
Original file line number Diff line number Diff line change
@@ -44,6 +44,7 @@ class IsoStream : public hci::IsoDataChannel::ConnectionInterface {
using SetupDataPathCallback = pw::Callback<void(SetupDataPathError)>;
using IncomingDataHandler =
pw::Function<bool(const pw::span<const std::byte>&)>;

virtual void SetupDataPath(
pw::bluetooth::emboss::DataPathDirection direction,
const bt::StaticPacket<pw::bluetooth::emboss::CodecIdWriter>& codec_id,
@@ -65,6 +66,18 @@ class IsoStream : public hci::IsoDataChannel::ConnectionInterface {
hci::CommandChannel::WeakPtr cmd,
pw::Callback<void()> on_closed_cb);

// Used by the client to check for queued frames. If none are present the
// incoming data available callback will be called the next time a frame is
// available. This allows for a 'hanging get' style interface (request a frame
// whenever the client is ready to process one and then wait for a
// notification) or a client-buffered interface (every time the client wants
// more frames request them until it receives a nullptr, and then wait for a
// callback to indicate that the next frame(s) are available). It is important
// to note that the client cannot simply rely on notifications: until a read
// attempt is unfulfilled the stream will buffer frames waiting for a read
// from the client.
virtual std::unique_ptr<IsoDataPacket> ReadNextQueuedIncomingPacket() = 0;

using WeakPtr = WeakSelf<IsoStream>::WeakPtr;
virtual WeakPtr GetWeakPtr() = 0;
};

0 comments on commit e431fab

Please sign in to comment.