From 8a029a51591441b25214a95cdab08feb09c19f22 Mon Sep 17 00:00:00 2001 From: Primiano Tucci Date: Tue, 22 Aug 2023 16:58:36 +0200 Subject: [PATCH 1/2] service: Add buffer.transfer_on_clone Add a per-buffer flag which causes the buffer to be moved into the cloned trace (Rather than copied) and cleared in the main tracing session. This is to deal with use-cases when a data source writes hundreds of MB of data, which is needed only in the cloned buffer (for bugreport) and that we don't want to stick around to minimize memory impact of long traces. See go/clone-for-winscope for more. Bug: 293429094 Test: perfetto_unittests --gtest_filter=TracingServiceImplTest.TransferOnClone Change-Id: I61db4fc49281c0656defadf18878653c81acac54 --- protos/perfetto/config/perfetto_config.proto | 8 ++ protos/perfetto/config/trace_config.proto | 8 ++ protos/perfetto/trace/perfetto_trace.proto | 8 ++ src/tracing/core/trace_buffer.h | 2 + src/tracing/core/tracing_service_impl.cc | 31 ++++- .../core/tracing_service_impl_unittest.cc | 107 ++++++++++++++++++ 6 files changed, 160 insertions(+), 4 deletions(-) diff --git a/protos/perfetto/config/perfetto_config.proto b/protos/perfetto/config/perfetto_config.proto index f457976684..0b51d30ce4 100644 --- a/protos/perfetto/config/perfetto_config.proto +++ b/protos/perfetto/config/perfetto_config.proto @@ -2873,6 +2873,14 @@ message TraceConfig { DISCARD = 2; } optional FillPolicy fill_policy = 4; + + // When true the buffer is moved (rather than copied) onto the cloned + // session, and an empty buffer of the same size is allocated in the source + // tracing session. This feature will likely get deprecated in the future. + // It been introduced mainly to support the surfaceflinger snapshot dump + // for bugreports, where SF can dumps O(400MB) into the bugreport trace. In + // that case we don't want to retain another in-memory copy of the buffer. + optional bool transfer_on_clone = 5; } repeated BufferConfig buffers = 1; diff --git a/protos/perfetto/config/trace_config.proto b/protos/perfetto/config/trace_config.proto index d62b0c794b..b17f8141dc 100644 --- a/protos/perfetto/config/trace_config.proto +++ b/protos/perfetto/config/trace_config.proto @@ -52,6 +52,14 @@ message TraceConfig { DISCARD = 2; } optional FillPolicy fill_policy = 4; + + // When true the buffer is moved (rather than copied) onto the cloned + // session, and an empty buffer of the same size is allocated in the source + // tracing session. This feature will likely get deprecated in the future. + // It been introduced mainly to support the surfaceflinger snapshot dump + // for bugreports, where SF can dumps O(400MB) into the bugreport trace. In + // that case we don't want to retain another in-memory copy of the buffer. + optional bool transfer_on_clone = 5; } repeated BufferConfig buffers = 1; diff --git a/protos/perfetto/trace/perfetto_trace.proto b/protos/perfetto/trace/perfetto_trace.proto index 554413fabd..dc887ed26c 100644 --- a/protos/perfetto/trace/perfetto_trace.proto +++ b/protos/perfetto/trace/perfetto_trace.proto @@ -2873,6 +2873,14 @@ message TraceConfig { DISCARD = 2; } optional FillPolicy fill_policy = 4; + + // When true the buffer is moved (rather than copied) onto the cloned + // session, and an empty buffer of the same size is allocated in the source + // tracing session. This feature will likely get deprecated in the future. + // It been introduced mainly to support the surfaceflinger snapshot dump + // for bugreports, where SF can dumps O(400MB) into the bugreport trace. In + // that case we don't want to retain another in-memory copy of the buffer. + optional bool transfer_on_clone = 5; } repeated BufferConfig buffers = 1; diff --git a/src/tracing/core/trace_buffer.h b/src/tracing/core/trace_buffer.h index f6994bab81..c1873134e0 100644 --- a/src/tracing/core/trace_buffer.h +++ b/src/tracing/core/trace_buffer.h @@ -281,9 +281,11 @@ class TraceBuffer { // TraceBuffer will CHECK(). std::unique_ptr CloneReadOnly() const; + void set_read_only() { read_only_ = true; } const WriterStatsMap& writer_stats() const { return writer_stats_; } const TraceStats::BufferStats& stats() const { return stats_; } size_t size() const { return size_; } + OverwritePolicy overwrite_policy() const { return overwrite_policy_; } private: friend class TraceBufferTest; diff --git a/src/tracing/core/tracing_service_impl.cc b/src/tracing/core/tracing_service_impl.cc index 7b421c32a7..cc0430634b 100644 --- a/src/tracing/core/tracing_service_impl.cc +++ b/src/tracing/core/tracing_service_impl.cc @@ -3663,13 +3663,31 @@ base::Status TracingServiceImpl::DoCloneSession(ConsumerEndpointImpl* consumer, // happens bail out early before creating any session. std::vector>> buf_snaps; buf_snaps.reserve(src->num_buffers()); + PERFETTO_DCHECK(src->num_buffers() == src->config.buffers().size()); bool buf_clone_failed = false; + size_t buf_idx = 0; for (BufferID src_buf_id : src->buffers_index) { - TraceBuffer* src_buf = GetBufferByID(src_buf_id); - std::unique_ptr buf_snap = src_buf->CloneReadOnly(); + auto buf_iter = buffers_.find(src_buf_id); + PERFETTO_CHECK(buf_iter != buffers_.end()); + std::unique_ptr& src_buf = buf_iter->second; + std::unique_ptr new_buf; + if (src->config.buffers()[buf_idx].transfer_on_clone()) { + const auto buf_policy = src_buf->overwrite_policy(); + const auto buf_size = src_buf->size(); + new_buf = std::move(src_buf); + src_buf = TraceBuffer::Create(buf_size, buf_policy); + if (!src_buf) { + // If the allocation fails put the buffer back and let the code below + // handle the failure gracefully. + src_buf = std::move(new_buf); + } + } else { + new_buf = src_buf->CloneReadOnly(); + } BufferID buf_global_id = buffer_ids_.Allocate(); - buf_clone_failed |= !buf_snap.get() || !buf_global_id; - buf_snaps.emplace_back(buf_global_id, std::move(buf_snap)); + buf_clone_failed |= !new_buf.get() || !buf_global_id; + buf_snaps.emplace_back(buf_global_id, std::move(new_buf)); + ++buf_idx; } // Free up allocated IDs in case of failure. No need to free the TraceBuffers, @@ -3703,6 +3721,11 @@ base::Status TracingServiceImpl::DoCloneSession(ConsumerEndpointImpl* consumer, for (auto& kv : buf_snaps) { BufferID buf_global_id = kv.first; std::unique_ptr& buf = kv.second; + // This is only needed for transfer_on_clone. Other buffers are already + // marked as read-only by CloneReadOnly(). We cannot do this early because + // in case of an allocation failure we will put std::move() the original + // buffer back in its place and in that case should not be made read-only. + buf->set_read_only(); buffers_.emplace(buf_global_id, std::move(buf)); cloned_session->buffers_index.emplace_back(buf_global_id); } diff --git a/src/tracing/core/tracing_service_impl_unittest.cc b/src/tracing/core/tracing_service_impl_unittest.cc index 3aa050c6e0..04f71c3703 100644 --- a/src/tracing/core/tracing_service_impl_unittest.cc +++ b/src/tracing/core/tracing_service_impl_unittest.cc @@ -4762,6 +4762,113 @@ TEST_F(TracingServiceImplTest, CloneSessionAcrossUidForBugreport) { task_runner.RunUntilCheckpoint("clone_done"); } +TEST_F(TracingServiceImplTest, TransferOnClone) { + // The consumer the creates the initial tracing session. + std::unique_ptr consumer = CreateMockConsumer(); + consumer->Connect(svc.get()); + + std::unique_ptr producer = CreateMockProducer(); + producer->Connect(svc.get(), "mock_producer"); + + // Create two data sources, as we'll write on two distinct buffers. + producer->RegisterDataSource("ds_1"); + producer->RegisterDataSource("ds_2"); + + TraceConfig trace_config; + trace_config.add_buffers()->set_size_kb(1024); // Buf 0. + auto* buf1_cfg = trace_config.add_buffers(); // Buf 1 (transfer_on_clone). + buf1_cfg->set_size_kb(1024); + buf1_cfg->set_transfer_on_clone(true); + auto* ds_cfg = trace_config.add_data_sources()->mutable_config(); + ds_cfg->set_name("ds_1"); + ds_cfg->set_target_buffer(0); + ds_cfg = trace_config.add_data_sources()->mutable_config(); + ds_cfg->set_name("ds_2"); + ds_cfg->set_target_buffer(1); + + consumer->EnableTracing(trace_config); + producer->WaitForTracingSetup(); + + producer->WaitForDataSourceSetup("ds_1"); + producer->WaitForDataSourceSetup("ds_2"); + + producer->WaitForDataSourceStart("ds_1"); + producer->WaitForDataSourceStart("ds_2"); + + std::unique_ptr writers[] = { + producer->CreateTraceWriter("ds_1"), + producer->CreateTraceWriter("ds_2"), + }; + + // Write once in the first buffer. This is expected persist across clones. + static constexpr int kNumTestPackets = 10; + for (int n = 0; n < kNumTestPackets; n++) { + auto tp = writers[0]->NewTracePacket(); + base::StackString<64> payload("persistent_%d", n); + tp->set_for_testing()->set_str(payload.c_str(), payload.len()); + } + + const int kLastIteration = 3; + for (int iteration = 1; iteration <= kLastIteration; iteration++) { + // The consumer the creates the initial tracing session. + std::unique_ptr clone_consumer = CreateMockConsumer(); + clone_consumer->Connect(svc.get()); + + // Add some new data to the 2nd buffer, which is transferred. + // Omit the writing the last iteration to test we get an empty buffer. + for (int n = 0; n < kNumTestPackets && iteration != kLastIteration; n++) { + auto tp = writers[1]->NewTracePacket(); + base::StackString<64> payload("transferred_%d_%d", iteration, n); + tp->set_for_testing()->set_str(payload.c_str(), payload.len()); + } + + std::string clone_checkpoint_name = "clone_" + std::to_string(iteration); + auto clone_done = task_runner.CreateCheckpoint(clone_checkpoint_name); + base::Uuid clone_uuid; + EXPECT_CALL(*clone_consumer, OnSessionCloned(_)) + .WillOnce(InvokeWithoutArgs(clone_done)); + clone_consumer->CloneSession(1); + + // CloneSession() will implicitly issue a flush. Linearize with that. + producer->ExpectFlush({writers[0].get(), writers[1].get()}); + task_runner.RunUntilCheckpoint(clone_checkpoint_name); + + auto packets = clone_consumer->ReadBuffers(); + std::vector actual_payloads; + for (const auto& packet : packets) { + if (packet.has_for_testing()) + actual_payloads.emplace_back(packet.for_testing().str()); + } + std::vector expected_payloads; + for (int n = 0; n < kNumTestPackets; n++) { + base::StackString<64> expected_payload("persistent_%d", n); + expected_payloads.emplace_back(expected_payload.ToStdString()); + } + for (int n = 0; n < kNumTestPackets && iteration != kLastIteration; n++) { + base::StackString<64> expected_payload("transferred_%d_%d", iteration, n); + expected_payloads.emplace_back(expected_payload.ToStdString()); + } + ASSERT_THAT(actual_payloads, ElementsAreArray(expected_payloads)); + } // for (iteration) + + consumer->DisableTracing(); + producer->WaitForDataSourceStop("ds_1"); + producer->WaitForDataSourceStop("ds_2"); + consumer->WaitForTracingDisabled(); + + // Read the data from the primary (non-cloned) tracing session. Check that + // it doesn't have any "transferred_xxx" payload but only the "persistent_xxx" + // coming from the standard non-transferred buffer. + auto packets = consumer->ReadBuffers(); + EXPECT_THAT(packets, + Not(Contains(Property(&protos::gen::TracePacket::for_testing, + Property(&protos::gen::TestEvent::str, + HasSubstr("transferred_")))))); + EXPECT_THAT(packets, Contains(Property(&protos::gen::TracePacket::for_testing, + Property(&protos::gen::TestEvent::str, + HasSubstr("persistent_"))))); +} + TEST_F(TracingServiceImplTest, InvalidBufferSizes) { std::unique_ptr consumer = CreateMockConsumer(); consumer->Connect(svc.get()); From f342257e38b318456d75aeec6f063ed61a5ff415 Mon Sep 17 00:00:00 2001 From: Primiano Tucci Date: Tue, 22 Aug 2023 17:40:13 +0200 Subject: [PATCH 2/2] service: Add buffer.clear_before_clone Add a flag that causes the buffer to be cleared prior to the Flush() that precedes a clone. This is intended to be used together with transfer_on_clone, so that if a data source takes too long to write data and goes beyond the flush timeout, we don't mix old data and new data on the next clone. See go/clone-for-winscope Test: perfetto_unittests --gtest_filter=TracingServiceImplTest.ClearBeforeClone Bug: 293429094 Change-Id: I38fb847436c9ee3fbd6e7e6e0ee3ef00ba7de821 --- protos/perfetto/config/perfetto_config.proto | 7 ++ protos/perfetto/config/trace_config.proto | 7 ++ protos/perfetto/trace/perfetto_trace.proto | 7 ++ src/tracing/core/trace_buffer.cc | 1 + src/tracing/core/trace_buffer.h | 7 ++ src/tracing/core/tracing_service_impl.cc | 42 +++++++++++ .../core/tracing_service_impl_unittest.cc | 71 +++++++++++++++++++ 7 files changed, 142 insertions(+) diff --git a/protos/perfetto/config/perfetto_config.proto b/protos/perfetto/config/perfetto_config.proto index 0b51d30ce4..a259573189 100644 --- a/protos/perfetto/config/perfetto_config.proto +++ b/protos/perfetto/config/perfetto_config.proto @@ -2881,6 +2881,13 @@ message TraceConfig { // for bugreports, where SF can dumps O(400MB) into the bugreport trace. In // that case we don't want to retain another in-memory copy of the buffer. optional bool transfer_on_clone = 5; + + // Used in conjuction with transfer_on_clone. When true the buffer is + // cleared before issuing the Flush(reason=kTraceClone). This is to ensure + // that if the data source took too long to write the data in a previous + // clone-related flush, we don't end up with a mixture of leftovers from + // the previous write and new data. + optional bool clear_before_clone = 6; } repeated BufferConfig buffers = 1; diff --git a/protos/perfetto/config/trace_config.proto b/protos/perfetto/config/trace_config.proto index b17f8141dc..d19e73a1d3 100644 --- a/protos/perfetto/config/trace_config.proto +++ b/protos/perfetto/config/trace_config.proto @@ -60,6 +60,13 @@ message TraceConfig { // for bugreports, where SF can dumps O(400MB) into the bugreport trace. In // that case we don't want to retain another in-memory copy of the buffer. optional bool transfer_on_clone = 5; + + // Used in conjuction with transfer_on_clone. When true the buffer is + // cleared before issuing the Flush(reason=kTraceClone). This is to ensure + // that if the data source took too long to write the data in a previous + // clone-related flush, we don't end up with a mixture of leftovers from + // the previous write and new data. + optional bool clear_before_clone = 6; } repeated BufferConfig buffers = 1; diff --git a/protos/perfetto/trace/perfetto_trace.proto b/protos/perfetto/trace/perfetto_trace.proto index dc887ed26c..9e88d34476 100644 --- a/protos/perfetto/trace/perfetto_trace.proto +++ b/protos/perfetto/trace/perfetto_trace.proto @@ -2881,6 +2881,13 @@ message TraceConfig { // for bugreports, where SF can dumps O(400MB) into the bugreport trace. In // that case we don't want to retain another in-memory copy of the buffer. optional bool transfer_on_clone = 5; + + // Used in conjuction with transfer_on_clone. When true the buffer is + // cleared before issuing the Flush(reason=kTraceClone). This is to ensure + // that if the data source took too long to write the data in a previous + // clone-related flush, we don't end up with a mixture of leftovers from + // the previous write and new data. + optional bool clear_before_clone = 6; } repeated BufferConfig buffers = 1; diff --git a/src/tracing/core/trace_buffer.cc b/src/tracing/core/trace_buffer.cc index baa6a3486d..f0e9d31757 100644 --- a/src/tracing/core/trace_buffer.cc +++ b/src/tracing/core/trace_buffer.cc @@ -110,6 +110,7 @@ void TraceBuffer::CopyChunkUntrusted(ProducerID producer_id_trusted, return; } + has_data_ = true; #if PERFETTO_DCHECK_IS_ON() changed_since_last_read_ = true; #endif diff --git a/src/tracing/core/trace_buffer.h b/src/tracing/core/trace_buffer.h index c1873134e0..9dea557dc3 100644 --- a/src/tracing/core/trace_buffer.h +++ b/src/tracing/core/trace_buffer.h @@ -213,6 +213,7 @@ class TraceBuffer { bool chunk_complete, const uint8_t* src, size_t size); + // Applies a batch of |patches| to the given chunk, if the given chunk is // still in the buffer. Does nothing if the given ChunkID is gone. // Returns true if the chunk has been found and patched, false otherwise. @@ -286,6 +287,7 @@ class TraceBuffer { const TraceStats::BufferStats& stats() const { return stats_; } size_t size() const { return size_; } OverwritePolicy overwrite_policy() const { return overwrite_policy_; } + bool has_data() const { return has_data_; } private: friend class TraceBufferTest; @@ -711,6 +713,11 @@ class TraceBuffer { // Per-{Producer, Writer} statistics. WriterStatsMap writer_stats_; + // Set to true upon the very first call to CopyChunkUntrusted() and never + // cleared. This is used to tell if the buffer has never been used since its + // creation (which in turn is used to optimize `clear_before_clone`). + bool has_data_ = false; + #if PERFETTO_DCHECK_IS_ON() bool changed_since_last_read_ = false; #endif diff --git a/src/tracing/core/tracing_service_impl.cc b/src/tracing/core/tracing_service_impl.cc index cc0430634b..9040ded67d 100644 --- a/src/tracing/core/tracing_service_impl.cc +++ b/src/tracing/core/tracing_service_impl.cc @@ -3614,6 +3614,48 @@ void TracingServiceImpl::FlushAndCloneSession(ConsumerEndpointImpl* consumer, clone_target = FlushFlags::CloneTarget::kBugreport; } + TracingSession* session = GetTracingSession(tsid); + if (!session) { + consumer->consumer_->OnSessionCloned( + {false, "Tracing session not found", {}}); + return; + } + + // If any of the buffers are marked as clear_before_clone, reset them before + // issuing the Flush(kCloneReason). + size_t buf_idx = 0; + for (BufferID src_buf_id : session->buffers_index) { + if (!session->config.buffers()[buf_idx++].clear_before_clone()) + continue; + auto buf_iter = buffers_.find(src_buf_id); + PERFETTO_CHECK(buf_iter != buffers_.end()); + std::unique_ptr& buf = buf_iter->second; + + // No need to reset the buffer if nothing has been written into it yet. + // This is the canonical case if producers behive nicely and don't timeout + // the handling of writes during the flush. + // This check avoids a useless re-mmap upon every Clone() if the buffer is + // already empty (when used in combination with `transfer_on_clone`). + if (!buf->has_data()) + continue; + + // Some leftover data was left in the buffer. Recreate it to empty it. + const auto buf_policy = buf->overwrite_policy(); + const auto buf_size = buf->size(); + std::unique_ptr old_buf = std::move(buf); + buf = TraceBuffer::Create(buf_size, buf_policy); + if (!buf) { + // This is extremely rare but could happen on 32-bit. If the new buffer + // allocation failed, put back the buffer where it was and fail the clone. + // We cannot leave the original tracing session buffer-less as it would + // cause crashes when data sources commit new data. + buf = std::move(old_buf); + consumer->consumer_->OnSessionCloned( + {false, "Buffer allocation failed while attempting to clone", {}}); + return; + } + } + auto weak_this = weak_ptr_factory_.GetWeakPtr(); auto weak_consumer = consumer->GetWeakPtr(); Flush( diff --git a/src/tracing/core/tracing_service_impl_unittest.cc b/src/tracing/core/tracing_service_impl_unittest.cc index 04f71c3703..f5a8591d71 100644 --- a/src/tracing/core/tracing_service_impl_unittest.cc +++ b/src/tracing/core/tracing_service_impl_unittest.cc @@ -4869,6 +4869,77 @@ TEST_F(TracingServiceImplTest, TransferOnClone) { HasSubstr("persistent_"))))); } +TEST_F(TracingServiceImplTest, ClearBeforeClone) { + // The consumer the creates the initial tracing session. + std::unique_ptr consumer = CreateMockConsumer(); + consumer->Connect(svc.get()); + + std::unique_ptr producer = CreateMockProducer(); + producer->Connect(svc.get(), "mock_producer"); + + producer->RegisterDataSource("ds_1"); + + TraceConfig trace_config; + // Unused. This buffer is created only to make the test less trivial and cover + // the case of the clear-bufferd to be the beyond the 0th entry. + trace_config.add_buffers()->set_size_kb(32); + + auto* buf_cfg = trace_config.add_buffers(); + buf_cfg->set_size_kb(1024); + buf_cfg->set_clear_before_clone(true); + auto* ds_cfg = trace_config.add_data_sources()->mutable_config(); + ds_cfg->set_name("ds_1"); + ds_cfg->set_target_buffer(1); + + consumer->EnableTracing(trace_config); + producer->WaitForTracingSetup(); + producer->WaitForDataSourceSetup("ds_1"); + producer->WaitForDataSourceStart("ds_1"); + + std::unique_ptr writer = producer->CreateTraceWriter("ds_1"); + + // These packets, emitted before the clone, should be dropped. + for (int i = 0; i < 3; i++) { + writer->NewTracePacket()->set_for_testing()->set_str("before_clone"); + } + auto flush_request = consumer->Flush(); + producer->ExpectFlush(writer.get()); + ASSERT_TRUE(flush_request.WaitForReply()); + + // The consumer the creates the initial tracing session. + std::unique_ptr clone_consumer = CreateMockConsumer(); + clone_consumer->Connect(svc.get()); + + auto clone_done = task_runner.CreateCheckpoint("clone_done"); + EXPECT_CALL(*clone_consumer, OnSessionCloned(_)) + .WillOnce(InvokeWithoutArgs(clone_done)); + clone_consumer->CloneSession(1); + + // CloneSession() will implicitly issue a flush. Write some other packets + // in that callback. Those are the only ones that should survive in the cloned + // session. + FlushFlags flush_flags(FlushFlags::Initiator::kTraced, + FlushFlags::Reason::kTraceClone); + EXPECT_CALL(*producer, Flush(_, _, _, flush_flags)) + .WillOnce(Invoke([&](FlushRequestID flush_req_id, + const DataSourceInstanceID*, size_t, FlushFlags) { + writer->NewTracePacket()->set_for_testing()->set_str("after_clone"); + writer->Flush( + [&] { producer->endpoint()->NotifyFlushComplete(flush_req_id); }); + })); + + task_runner.RunUntilCheckpoint("clone_done"); + + auto packets = clone_consumer->ReadBuffers(); + EXPECT_THAT(packets, + Not(Contains(Property(&protos::gen::TracePacket::for_testing, + Property(&protos::gen::TestEvent::str, + HasSubstr("before_clone")))))); + EXPECT_THAT(packets, Contains(Property(&protos::gen::TracePacket::for_testing, + Property(&protos::gen::TestEvent::str, + HasSubstr("after_clone"))))); +} + TEST_F(TracingServiceImplTest, InvalidBufferSizes) { std::unique_ptr consumer = CreateMockConsumer(); consumer->Connect(svc.get());