Skip to content

Commit

Permalink
service: Add buffer.clear_before_clone
Browse files Browse the repository at this point in the history
Add a flag that causes the buffer to be cleared
prior to the Flush() that precedes a clone.
This is intended to be used together with transfer_on_clone,
so that if a data source takes too long to write data and
goes beyond the flush timeout, we don't mix old data and
new data on the next clone.
See go/clone-for-winscope

Test: perfetto_unittests --gtest_filter=TracingServiceImplTest.ClearBeforeClone
Bug: 293429094

Change-Id: I38fb847436c9ee3fbd6e7e6e0ee3ef00ba7de821
  • Loading branch information
primiano committed Aug 22, 2023
1 parent 8a029a5 commit f342257
Show file tree
Hide file tree
Showing 7 changed files with 142 additions and 0 deletions.
7 changes: 7 additions & 0 deletions protos/perfetto/config/perfetto_config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2881,6 +2881,13 @@ message TraceConfig {
// for bugreports, where SF can dumps O(400MB) into the bugreport trace. In
// that case we don't want to retain another in-memory copy of the buffer.
optional bool transfer_on_clone = 5;

// Used in conjuction with transfer_on_clone. When true the buffer is
// cleared before issuing the Flush(reason=kTraceClone). This is to ensure
// that if the data source took too long to write the data in a previous
// clone-related flush, we don't end up with a mixture of leftovers from
// the previous write and new data.
optional bool clear_before_clone = 6;
}
repeated BufferConfig buffers = 1;

Expand Down
7 changes: 7 additions & 0 deletions protos/perfetto/config/trace_config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,13 @@ message TraceConfig {
// for bugreports, where SF can dumps O(400MB) into the bugreport trace. In
// that case we don't want to retain another in-memory copy of the buffer.
optional bool transfer_on_clone = 5;

// Used in conjuction with transfer_on_clone. When true the buffer is
// cleared before issuing the Flush(reason=kTraceClone). This is to ensure
// that if the data source took too long to write the data in a previous
// clone-related flush, we don't end up with a mixture of leftovers from
// the previous write and new data.
optional bool clear_before_clone = 6;
}
repeated BufferConfig buffers = 1;

Expand Down
7 changes: 7 additions & 0 deletions protos/perfetto/trace/perfetto_trace.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2881,6 +2881,13 @@ message TraceConfig {
// for bugreports, where SF can dumps O(400MB) into the bugreport trace. In
// that case we don't want to retain another in-memory copy of the buffer.
optional bool transfer_on_clone = 5;

// Used in conjuction with transfer_on_clone. When true the buffer is
// cleared before issuing the Flush(reason=kTraceClone). This is to ensure
// that if the data source took too long to write the data in a previous
// clone-related flush, we don't end up with a mixture of leftovers from
// the previous write and new data.
optional bool clear_before_clone = 6;
}
repeated BufferConfig buffers = 1;

Expand Down
1 change: 1 addition & 0 deletions src/tracing/core/trace_buffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ void TraceBuffer::CopyChunkUntrusted(ProducerID producer_id_trusted,
return;
}

has_data_ = true;
#if PERFETTO_DCHECK_IS_ON()
changed_since_last_read_ = true;
#endif
Expand Down
7 changes: 7 additions & 0 deletions src/tracing/core/trace_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ class TraceBuffer {
bool chunk_complete,
const uint8_t* src,
size_t size);

// Applies a batch of |patches| to the given chunk, if the given chunk is
// still in the buffer. Does nothing if the given ChunkID is gone.
// Returns true if the chunk has been found and patched, false otherwise.
Expand Down Expand Up @@ -286,6 +287,7 @@ class TraceBuffer {
const TraceStats::BufferStats& stats() const { return stats_; }
size_t size() const { return size_; }
OverwritePolicy overwrite_policy() const { return overwrite_policy_; }
bool has_data() const { return has_data_; }

private:
friend class TraceBufferTest;
Expand Down Expand Up @@ -711,6 +713,11 @@ class TraceBuffer {
// Per-{Producer, Writer} statistics.
WriterStatsMap writer_stats_;

// Set to true upon the very first call to CopyChunkUntrusted() and never
// cleared. This is used to tell if the buffer has never been used since its
// creation (which in turn is used to optimize `clear_before_clone`).
bool has_data_ = false;

#if PERFETTO_DCHECK_IS_ON()
bool changed_since_last_read_ = false;
#endif
Expand Down
42 changes: 42 additions & 0 deletions src/tracing/core/tracing_service_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3614,6 +3614,48 @@ void TracingServiceImpl::FlushAndCloneSession(ConsumerEndpointImpl* consumer,
clone_target = FlushFlags::CloneTarget::kBugreport;
}

TracingSession* session = GetTracingSession(tsid);
if (!session) {
consumer->consumer_->OnSessionCloned(
{false, "Tracing session not found", {}});
return;
}

// If any of the buffers are marked as clear_before_clone, reset them before
// issuing the Flush(kCloneReason).
size_t buf_idx = 0;
for (BufferID src_buf_id : session->buffers_index) {
if (!session->config.buffers()[buf_idx++].clear_before_clone())
continue;
auto buf_iter = buffers_.find(src_buf_id);
PERFETTO_CHECK(buf_iter != buffers_.end());
std::unique_ptr<TraceBuffer>& buf = buf_iter->second;

// No need to reset the buffer if nothing has been written into it yet.
// This is the canonical case if producers behive nicely and don't timeout
// the handling of writes during the flush.
// This check avoids a useless re-mmap upon every Clone() if the buffer is
// already empty (when used in combination with `transfer_on_clone`).
if (!buf->has_data())
continue;

// Some leftover data was left in the buffer. Recreate it to empty it.
const auto buf_policy = buf->overwrite_policy();
const auto buf_size = buf->size();
std::unique_ptr<TraceBuffer> old_buf = std::move(buf);
buf = TraceBuffer::Create(buf_size, buf_policy);
if (!buf) {
// This is extremely rare but could happen on 32-bit. If the new buffer
// allocation failed, put back the buffer where it was and fail the clone.
// We cannot leave the original tracing session buffer-less as it would
// cause crashes when data sources commit new data.
buf = std::move(old_buf);
consumer->consumer_->OnSessionCloned(
{false, "Buffer allocation failed while attempting to clone", {}});
return;
}
}

auto weak_this = weak_ptr_factory_.GetWeakPtr();
auto weak_consumer = consumer->GetWeakPtr();
Flush(
Expand Down
71 changes: 71 additions & 0 deletions src/tracing/core/tracing_service_impl_unittest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4869,6 +4869,77 @@ TEST_F(TracingServiceImplTest, TransferOnClone) {
HasSubstr("persistent_")))));
}

TEST_F(TracingServiceImplTest, ClearBeforeClone) {
// The consumer the creates the initial tracing session.
std::unique_ptr<MockConsumer> consumer = CreateMockConsumer();
consumer->Connect(svc.get());

std::unique_ptr<MockProducer> producer = CreateMockProducer();
producer->Connect(svc.get(), "mock_producer");

producer->RegisterDataSource("ds_1");

TraceConfig trace_config;
// Unused. This buffer is created only to make the test less trivial and cover
// the case of the clear-bufferd to be the beyond the 0th entry.
trace_config.add_buffers()->set_size_kb(32);

auto* buf_cfg = trace_config.add_buffers();
buf_cfg->set_size_kb(1024);
buf_cfg->set_clear_before_clone(true);
auto* ds_cfg = trace_config.add_data_sources()->mutable_config();
ds_cfg->set_name("ds_1");
ds_cfg->set_target_buffer(1);

consumer->EnableTracing(trace_config);
producer->WaitForTracingSetup();
producer->WaitForDataSourceSetup("ds_1");
producer->WaitForDataSourceStart("ds_1");

std::unique_ptr<TraceWriter> writer = producer->CreateTraceWriter("ds_1");

// These packets, emitted before the clone, should be dropped.
for (int i = 0; i < 3; i++) {
writer->NewTracePacket()->set_for_testing()->set_str("before_clone");
}
auto flush_request = consumer->Flush();
producer->ExpectFlush(writer.get());
ASSERT_TRUE(flush_request.WaitForReply());

// The consumer the creates the initial tracing session.
std::unique_ptr<MockConsumer> clone_consumer = CreateMockConsumer();
clone_consumer->Connect(svc.get());

auto clone_done = task_runner.CreateCheckpoint("clone_done");
EXPECT_CALL(*clone_consumer, OnSessionCloned(_))
.WillOnce(InvokeWithoutArgs(clone_done));
clone_consumer->CloneSession(1);

// CloneSession() will implicitly issue a flush. Write some other packets
// in that callback. Those are the only ones that should survive in the cloned
// session.
FlushFlags flush_flags(FlushFlags::Initiator::kTraced,
FlushFlags::Reason::kTraceClone);
EXPECT_CALL(*producer, Flush(_, _, _, flush_flags))
.WillOnce(Invoke([&](FlushRequestID flush_req_id,
const DataSourceInstanceID*, size_t, FlushFlags) {
writer->NewTracePacket()->set_for_testing()->set_str("after_clone");
writer->Flush(
[&] { producer->endpoint()->NotifyFlushComplete(flush_req_id); });
}));

task_runner.RunUntilCheckpoint("clone_done");

auto packets = clone_consumer->ReadBuffers();
EXPECT_THAT(packets,
Not(Contains(Property(&protos::gen::TracePacket::for_testing,
Property(&protos::gen::TestEvent::str,
HasSubstr("before_clone"))))));
EXPECT_THAT(packets, Contains(Property(&protos::gen::TracePacket::for_testing,
Property(&protos::gen::TestEvent::str,
HasSubstr("after_clone")))));
}

TEST_F(TracingServiceImplTest, InvalidBufferSizes) {
std::unique_ptr<MockConsumer> consumer = CreateMockConsumer();
consumer->Connect(svc.get());
Expand Down

0 comments on commit f342257

Please sign in to comment.