Refactor cache_filter to expect caches to post cb (envoyproxy#36184)
Refactor cache_filter to expect caches to post cb
Additional Description: This is a bit of an unintuitive change in that
it moves some work from the common class to the plugin, meaning that
work will be duplicated. However, there's a good reason for this - if
the cache class needs to use a dispatcher to get back onto its own
thread, having cache_filter also post means that actions end up being
queued in the dispatcher twice.

A different possible solution would be to have the cache_filter
callbacks only post if the callback comes in on the wrong thread, but
there's a wrinkle in that model too - if the callback executes
immediately, on the same thread, as was the case with the
simple_http_cache, it executes too soon, trying to resume a connection
that hasn't yet stopped, which is an error. That, too, could be covered
with *another* workaround, either intercepting when that happens and
posting the resume, or intercepting when that happens and replacing the
resume with returning `Continue` instead, but both of those options make
the cache filter itself more complicated (and therefore error prone).
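The "executes too soon" problem can be illustrated with a toy model. This is only a sketch: `ToyFilter`, `InlineCache` and `DeferringCache` are hypothetical names, not Envoy classes, and the flag merely mirrors the idea behind the `callback_called_directly_` toggle in the diff below. A callback invoked inline runs while the filter is still inside its initial call; a deferred one does not.

```cpp
#include <cassert>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical filter: records whether a callback fired while the
// initial lookup call was still on the stack.
struct ToyFilter {
  bool in_initial_call = false;
  bool called_too_soon = false;

  template <typename Cache> void lookup(Cache& cache) {
    in_initial_call = true;
    cache.getHeaders([this]() {
      if (in_initial_call) {
        called_too_soon = true; // callback ran before lookup() returned
      }
    });
    in_initial_call = false;
  }
};

// Hypothetical cache that calls the callback immediately (the problematic case).
struct InlineCache {
  void getHeaders(std::function<void()> cb) { cb(); }
};

// Hypothetical cache that queues the callback to run later.
struct DeferringCache {
  std::vector<std::function<void()>> pending;
  void getHeaders(std::function<void()> cb) { pending.push_back(std::move(cb)); }
  void drain() {
    for (auto& cb : pending) {
      cb();
    }
    pending.clear();
  }
};

bool inlineCallbackIsDetected() {
  ToyFilter filter;
  InlineCache cache;
  filter.lookup(cache);
  return filter.called_too_soon;
}

bool deferredCallbackIsNotFlagged() {
  ToyFilter filter;
  DeferringCache cache;
  filter.lookup(cache);
  cache.drain(); // runs after lookup() has returned
  return !filter.called_too_soon;
}
```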

Having just one consistent path, where the cache implementation always
posts the callback (and never calls it if cancelled), so that the
callback always runs outside of the initial call's context and on the
filter's thread, is the least complex option. It also avoids the
performance impact of posting twice, at the cost of a bit more verbosity
in the simple cache implementation.
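A minimal sketch of the "post, and skip if cancelled" pattern, assuming a toy single-threaded dispatcher. `ToyDispatcher` and `ToyLookupContext` are hypothetical stand-ins rather than Envoy types, and `std::function` is used here only to keep the example standard-library-only. The lookup context hands the posted task a `weak_ptr` to a liveness token, so the callback is dropped if the context is destroyed before the post resolves.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <memory>
#include <utility>

// Hypothetical dispatcher: queues tasks and runs them later, in order.
class ToyDispatcher {
public:
  void post(std::function<void()> task) { tasks_.push_back(std::move(task)); }
  void run() {
    while (!tasks_.empty()) {
      auto task = std::move(tasks_.front());
      tasks_.pop_front();
      task();
    }
  }

private:
  std::deque<std::function<void()>> tasks_;
};

// Hypothetical lookup context: always posts the callback, never calls it inline.
class ToyLookupContext {
public:
  explicit ToyLookupContext(ToyDispatcher& dispatcher) : dispatcher_(dispatcher) {}

  void getHeaders(std::function<void(int)> cb) {
    // The posted task only holds a weak reference to the liveness token,
    // so destroying the context cancels the pending callback.
    std::weak_ptr<bool> alive = alive_;
    dispatcher_.post([alive, cb = std::move(cb)]() {
      if (alive.lock()) {
        cb(200); // made-up result value for illustration
      }
    });
  }

private:
  ToyDispatcher& dispatcher_;
  std::shared_ptr<bool> alive_ = std::make_shared<bool>(true);
};

int runPostedCallbackDemo() {
  ToyDispatcher dispatcher;
  int calls = 0;

  auto live = std::make_unique<ToyLookupContext>(dispatcher);
  live->getHeaders([&calls](int) { calls += 1; });
  assert(calls == 0); // nothing runs inside the initial call

  auto doomed = std::make_unique<ToyLookupContext>(dispatcher);
  doomed->getHeaders([&calls](int) { calls += 10; });
  doomed.reset(); // context destroyed before the post resolves

  dispatcher.run();
  return calls;
}
```

Only the callback belonging to the surviving context runs, and it runs from the dispatcher loop rather than from within `getHeaders`.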

This PR also wraps the UpdateHeadersCallback into a declared type, and
makes it an `AnyInvocable` instead of a `std::function`, which makes the
callbacks move-only: they are moved rather than copied, signalling that
each is meant to be called only once and avoiding accidental performance
drains.
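To show what "moved, not copied" buys, here is a small standard-library-only sketch. `MoveOnlyCallback` is a hypothetical, heavily simplified stand-in written for this example; it is not `absl::AnyInvocable` and does not reflect its implementation. It can hold a lambda that owns a `std::unique_ptr` (which a copyable `std::function` cannot), and invoking it requires an rvalue, which consumes the wrapper.

```cpp
#include <cassert>
#include <memory>
#include <type_traits>
#include <utility>

// Hypothetical move-only callable wrapper for callbacks taking a bool.
class MoveOnlyCallback {
  struct Concept {
    virtual ~Concept() = default;
    virtual void call(bool ok) = 0;
  };
  template <typename F> struct Model : Concept {
    explicit Model(F f) : f_(std::move(f)) {}
    void call(bool ok) override { f_(ok); }
    F f_;
  };

public:
  template <typename F>
  explicit MoveOnlyCallback(F f) : impl_(std::make_unique<Model<F>>(std::move(f))) {}

  MoveOnlyCallback(MoveOnlyCallback&&) = default;
  MoveOnlyCallback& operator=(MoveOnlyCallback&&) = default;
  MoveOnlyCallback(const MoveOnlyCallback&) = delete;
  MoveOnlyCallback& operator=(const MoveOnlyCallback&) = delete;

  // Rvalue-qualified: the caller must write std::move(cb)(...), and the
  // wrapper is left empty afterwards.
  void operator()(bool ok) && {
    auto impl = std::move(impl_);
    impl->call(ok);
  }

  explicit operator bool() const { return impl_ != nullptr; }

private:
  std::unique_ptr<Concept> impl_;
};

static_assert(!std::is_copy_constructible<MoveOnlyCallback>::value,
              "the wrapper cannot be copied by accident");

int runMoveOnlyDemo() {
  auto payload = std::make_unique<int>(41);
  int seen = 0;
  // The lambda owns a unique_ptr, so it is itself move-only.
  MoveOnlyCallback cb(
      [p = std::move(payload), &seen](bool ok) { seen = *p + (ok ? 1 : 0); });
  MoveOnlyCallback moved = std::move(cb);
  std::move(moved)(true);
  assert(!moved); // consumed by the call
  return seen;
}
```

This is the same calling shape the diff uses, for example `std::move(cb)(cache_success, end_stream, sz)`.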

Risk Level: Low; WIP filter, existing tests still pass.
Testing: Existing tests should be covering all cases. Added tests to
enforce that all cache implementations' `LookupContext` correctly posts
callback actions, and correctly cancels calling the callback if the
context is deleted before the post resolves.
Docs Changes: Code-comments only.
Release Notes: Maybe?
Platform Specific Features: n/a

---------

Signed-off-by: Raven Black <[email protected]>
ravenblackx authored Sep 20, 2024
1 parent f575d26 commit decbb66
Showing 16 changed files with 382 additions and 232 deletions.
69 changes: 18 additions & 51 deletions source/extensions/filters/http/cache/cache_filter.cc
@@ -303,38 +303,19 @@ CacheFilter::resolveLookupStatus(absl::optional<CacheEntryStatus> cache_entry_st

void CacheFilter::getHeaders(Http::RequestHeaderMap& request_headers) {
ASSERT(lookup_, "CacheFilter is trying to call getHeaders with no LookupContext");

// If the cache posts a callback to the dispatcher then the CacheFilter is destroyed for any
// reason (e.g client disconnected and HTTP stream terminated), then there is no guarantee that
// the posted callback will run before the filter is deleted. Hence, a weak_ptr to the CacheFilter
// is captured and used to make sure the CacheFilter is still alive before accessing it in the
// posted callback.
// TODO(yosrym93): Look into other options for handling this (also in getBody and getTrailers) as
// they arise, e.g. cancellable posts, guaranteed ordering of posted callbacks and deletions, etc.
CacheFilterWeakPtr self = weak_from_this();

// The dispatcher needs to be captured because there's no guarantee that
// decoder_callbacks_->dispatcher() is thread-safe.
lookup_->getHeaders([self, &request_headers, &dispatcher = decoder_callbacks_->dispatcher()](
callback_called_directly_ = true;
lookup_->getHeaders([this, &request_headers, &dispatcher = decoder_callbacks_->dispatcher()](
LookupResult&& result, bool end_stream) {
// The callback is posted to the dispatcher to make sure it is called on the worker thread.
dispatcher.post([self, &request_headers, result = std::move(result), end_stream]() mutable {
if (CacheFilterSharedPtr cache_filter = self.lock()) {
cache_filter->onHeaders(std::move(result), request_headers, end_stream);
}
});
ASSERT(!callback_called_directly_ && dispatcher.isThreadSafe(),
"caches must post the callback to the filter's dispatcher");
onHeaders(std::move(result), request_headers, end_stream);
});
callback_called_directly_ = false;
}

void CacheFilter::getBody() {
ASSERT(lookup_, "CacheFilter is trying to call getBody with no LookupContext");
ASSERT(!remaining_ranges_.empty(), "No reason to call getBody when there's no body to get.");
// If the cache posts a callback to the dispatcher then the CacheFilter is destroyed for any
// reason (e.g client disconnected and HTTP stream terminated), then there is no guarantee that
// the posted callback will run before the filter is deleted. Hence, a weak_ptr to the CacheFilter
// is captured and used to make sure the CacheFilter is still alive before accessing it in the
// posted callback.
CacheFilterWeakPtr self = weak_from_this();

// We don't want to request more than a buffer-size at a time from the cache.
uint64_t fetch_size_limit = encoder_callbacks_->encoderBufferLimit();
@@ -347,41 +328,27 @@ void CacheFilter::getBody() {
? (remaining_ranges_[0].begin() + fetch_size_limit)
: remaining_ranges_[0].end()};

// The dispatcher needs to be captured because there's no guarantee that
// decoder_callbacks_->dispatcher() is thread-safe.
lookup_->getBody(fetch_range, [self, &dispatcher = decoder_callbacks_->dispatcher()](
callback_called_directly_ = true;
lookup_->getBody(fetch_range, [this, &dispatcher = decoder_callbacks_->dispatcher()](
Buffer::InstancePtr&& body, bool end_stream) {
// The callback is posted to the dispatcher to make sure it is called on the worker thread.
dispatcher.post([self, body = std::move(body), end_stream]() mutable {
if (CacheFilterSharedPtr cache_filter = self.lock()) {
cache_filter->onBody(std::move(body), end_stream);
}
});
ASSERT(!callback_called_directly_ && dispatcher.isThreadSafe(),
"caches must post the callback to the filter's dispatcher");
onBody(std::move(body), end_stream);
});
callback_called_directly_ = false;
}

void CacheFilter::getTrailers() {
ASSERT(lookup_, "CacheFilter is trying to call getTrailers with no LookupContext");

// If the cache posts a callback to the dispatcher then the CacheFilter is destroyed for any
// reason (e.g client disconnected and HTTP stream terminated), then there is no guarantee that
// the posted callback will run before the filter is deleted. Hence, a weak_ptr to the CacheFilter
// is captured and used to make sure the CacheFilter is still alive before accessing it in the
// posted callback.
CacheFilterWeakPtr self = weak_from_this();

// The dispatcher needs to be captured because there's no guarantee that
// decoder_callbacks_->dispatcher() is thread-safe.
lookup_->getTrailers([self, &dispatcher = decoder_callbacks_->dispatcher()](
callback_called_directly_ = true;
lookup_->getTrailers([this, &dispatcher = decoder_callbacks_->dispatcher()](
Http::ResponseTrailerMapPtr&& trailers) {
// The callback is posted to the dispatcher to make sure it is called on the worker thread.
// The lambda must be mutable as it captures trailers as a unique_ptr.
dispatcher.post([self, trailers = std::move(trailers)]() mutable {
if (CacheFilterSharedPtr cache_filter = self.lock()) {
cache_filter->onTrailers(std::move(trailers));
}
});
ASSERT(!callback_called_directly_ && dispatcher.isThreadSafe(),
"caches must post the callback to the filter's dispatcher");
onTrailers(std::move(trailers));
});
callback_called_directly_ = false;
}

void CacheFilter::onHeaders(LookupResult&& result, Http::RequestHeaderMap& request_headers,
2 changes: 2 additions & 0 deletions source/extensions/filters/http/cache/cache_filter.h
@@ -164,6 +164,8 @@ class CacheFilter : public Http::PassThroughFilter,
FilterState filter_state_ = FilterState::Initial;

bool is_head_request_ = false;
// This toggle is used to detect callbacks being called directly and not posted.
bool callback_called_directly_ = false;
// The status of the insert operation or header update, or decision not to insert or update.
// If it's too early to determine the final status, this is empty.
absl::optional<InsertStatus> insert_status_;
114 changes: 55 additions & 59 deletions source/extensions/filters/http/cache/cache_insert_queue.cc
@@ -14,7 +14,7 @@ class CacheInsertFragment {
// on_complete is called when the cache completes the operation.
virtual void
send(InsertContext& context,
std::function<void(bool cache_success, bool end_stream, size_t sz)> on_complete) PURE;
absl::AnyInvocable<void(bool cache_success, bool end_stream, size_t sz)> on_complete) PURE;

virtual ~CacheInsertFragment() = default;
};
@@ -27,14 +27,14 @@ class CacheInsertFragmentBody : public CacheInsertFragment {
CacheInsertFragmentBody(const Buffer::Instance& buffer, bool end_stream)
: buffer_(buffer), end_stream_(end_stream) {}

void
send(InsertContext& context,
std::function<void(bool cache_success, bool end_stream, size_t sz)> on_complete) override {
void send(InsertContext& context,
absl::AnyInvocable<void(bool cache_success, bool end_stream, size_t sz)> on_complete)
override {
size_t sz = buffer_.length();
context.insertBody(
std::move(buffer_),
[on_complete, end_stream = end_stream_, sz](bool cache_success) {
on_complete(cache_success, end_stream, sz);
[cb = std::move(on_complete), end_stream = end_stream_, sz](bool cache_success) mutable {
std::move(cb)(cache_success, end_stream, sz);
},
end_stream_);
}
@@ -52,14 +52,15 @@ class CacheInsertFragmentTrailers : public CacheInsertFragment {
Http::ResponseTrailerMapImpl::copyFrom(*trailers_, trailers);
}

void
send(InsertContext& context,
std::function<void(bool cache_success, bool end_stream, size_t sz)> on_complete) override {
void send(InsertContext& context,
absl::AnyInvocable<void(bool cache_success, bool end_stream, size_t sz)> on_complete)
override {
// While zero isn't technically true for the size of trailers, it doesn't
// matter at this point because watermarks after the stream is complete
// aren't useful.
context.insertTrailers(
*trailers_, [on_complete](bool cache_success) { on_complete(cache_success, true, 0); });
context.insertTrailers(*trailers_, [cb = std::move(on_complete)](bool cache_success) mutable {
std::move(cb)(cache_success, true, 0);
});
}

private:
@@ -72,7 +73,7 @@ CacheInsertQueue::CacheInsertQueue(std::shared_ptr<HttpCache> cache,
: dispatcher_(encoder_callbacks.dispatcher()), insert_context_(std::move(insert_context)),
low_watermark_bytes_(encoder_callbacks.encoderBufferLimit() / 2),
high_watermark_bytes_(encoder_callbacks.encoderBufferLimit()),
encoder_callbacks_(encoder_callbacks), abort_callback_(abort), cache_(cache) {}
encoder_callbacks_(encoder_callbacks), abort_callback_(std::move(abort)), cache_(cache) {}

void CacheInsertQueue::insertHeaders(const Http::ResponseHeaderMap& response_headers,
const ResponseMetadata& metadata, bool end_stream) {
@@ -123,59 +124,54 @@ void CacheInsertQueue::insertTrailers(const Http::ResponseTrailerMap& trailers)
}

void CacheInsertQueue::onFragmentComplete(bool cache_success, bool end_stream, size_t sz) {
// If the cache implementation is asynchronous, this may be called from whatever
// thread that cache implementation runs on. Therefore, we post it to the
// dispatcher to be certain any callbacks and updates are called on the filter's
// thread (and therefore we don't have to mutex-guard anything).
dispatcher_.post([this, cache_success, end_stream, sz]() {
fragment_in_flight_ = false;
if (aborting_) {
// Parent filter was destroyed, so we can quit this operation.
fragments_.clear();
self_ownership_.reset();
return;
ASSERT(dispatcher_.isThreadSafe());
fragment_in_flight_ = false;
if (aborting_) {
// Parent filter was destroyed, so we can quit this operation.
fragments_.clear();
self_ownership_.reset();
return;
}
ASSERT(queue_size_bytes_ >= sz, "queue can't be emptied by more than its size");
queue_size_bytes_ -= sz;
if (watermarked_ && queue_size_bytes_ <= low_watermark_bytes_) {
if (encoder_callbacks_.has_value()) {
encoder_callbacks_.value().get().onEncoderFilterBelowWriteBufferLowWatermark();
}
ASSERT(queue_size_bytes_ >= sz, "queue can't be emptied by more than its size");
queue_size_bytes_ -= sz;
if (watermarked_ && queue_size_bytes_ <= low_watermark_bytes_) {
watermarked_ = false;
}
if (!cache_success) {
// canceled by cache; unwatermark if necessary, inform the filter if
// it's still around, and delete the queue.
if (watermarked_) {
if (encoder_callbacks_.has_value()) {
encoder_callbacks_.value().get().onEncoderFilterBelowWriteBufferLowWatermark();
}
watermarked_ = false;
}
if (!cache_success) {
// canceled by cache; unwatermark if necessary, inform the filter if
// it's still around, and delete the queue.
if (watermarked_) {
if (encoder_callbacks_.has_value()) {
encoder_callbacks_.value().get().onEncoderFilterBelowWriteBufferLowWatermark();
}
watermarked_ = false;
}
fragments_.clear();
// Clearing self-ownership might provoke the destructor, so take a copy of the
// abort callback to avoid reading from 'this' after it may be deleted.
auto abort_callback = abort_callback_;
self_ownership_.reset();
abort_callback();
return;
}
if (end_stream) {
ASSERT(fragments_.empty(), "ending a stream with the queue not empty is a bug");
ASSERT(!watermarked_, "being over the high watermark when the queue is empty makes no sense");
self_ownership_.reset();
return;
}
if (!fragments_.empty()) {
// If there's more in the queue, push the next fragment to the cache.
auto fragment = std::move(fragments_.front());
fragments_.pop_front();
fragment_in_flight_ = true;
fragment->send(*insert_context_, [this](bool cache_success, bool end_stream, size_t sz) {
onFragmentComplete(cache_success, end_stream, sz);
});
}
});
fragments_.clear();
// Clearing self-ownership might provoke the destructor, so take a copy of the
// abort callback to avoid reading from 'this' after it may be deleted.
auto abort_callback = std::move(abort_callback_);
self_ownership_.reset();
std::move(abort_callback)();
return;
}
if (end_stream) {
ASSERT(fragments_.empty(), "ending a stream with the queue not empty is a bug");
ASSERT(!watermarked_, "being over the high watermark when the queue is empty makes no sense");
self_ownership_.reset();
return;
}
if (!fragments_.empty()) {
// If there's more in the queue, push the next fragment to the cache.
auto fragment = std::move(fragments_.front());
fragments_.pop_front();
fragment_in_flight_ = true;
fragment->send(*insert_context_, [this](bool cache_success, bool end_stream, size_t sz) {
onFragmentComplete(cache_success, end_stream, sz);
});
}
}

void CacheInsertQueue::setSelfOwned(std::unique_ptr<CacheInsertQueue> self) {
2 changes: 1 addition & 1 deletion source/extensions/filters/http/cache/cache_insert_queue.h
@@ -12,7 +12,7 @@ namespace Cache {

using OverHighWatermarkCallback = std::function<void()>;
using UnderLowWatermarkCallback = std::function<void()>;
using AbortInsertCallback = std::function<void()>;
using AbortInsertCallback = absl::AnyInvocable<void()>;
class CacheInsertFragment;

// This queue acts as an intermediary between CacheFilter and the cache
40 changes: 25 additions & 15 deletions source/extensions/filters/http/cache/http_cache.h
@@ -122,19 +122,20 @@ struct CacheInfo {
bool supports_range_requests_ = false;
};

using LookupBodyCallback = std::function<void(Buffer::InstancePtr&&, bool end_stream)>;
using LookupHeadersCallback = std::function<void(LookupResult&&, bool end_stream)>;
using LookupTrailersCallback = std::function<void(Http::ResponseTrailerMapPtr&&)>;
using InsertCallback = std::function<void(bool success_ready_for_more)>;
using LookupBodyCallback = absl::AnyInvocable<void(Buffer::InstancePtr&&, bool end_stream)>;
using LookupHeadersCallback = absl::AnyInvocable<void(LookupResult&&, bool end_stream)>;
using LookupTrailersCallback = absl::AnyInvocable<void(Http::ResponseTrailerMapPtr&&)>;
using InsertCallback = absl::AnyInvocable<void(bool success_ready_for_more)>;
using UpdateHeadersCallback = absl::AnyInvocable<void(bool)>;

// Manages the lifetime of an insertion.
class InsertContext {
public:
// Accepts response_headers for caching. Only called once.
//
// Implementations MUST call insert_complete(true) on success, or
// insert_complete(false) to attempt to abort the insertion. This
// call may be made asynchronously, but any async operation that can
// Implementations MUST post to the filter's dispatcher insert_complete(true)
// on success, or insert_complete(false) to attempt to abort the insertion.
// This call may be made asynchronously, but any async operation that can
// potentially silently fail must include a timeout, to avoid memory leaks.
virtual void insertHeaders(const Http::ResponseHeaderMap& response_headers,
const ResponseMetadata& metadata, InsertCallback insert_complete,
@@ -149,17 +150,17 @@ class InsertContext {
// InsertContextPtr. A cache can abort the insertion by passing 'false' into
// ready_for_next_fragment.
//
// The cache implementation MUST call ready_for_next_fragment. This call may be
// made asynchronously, but any async operation that can potentially silently
// fail must include a timeout, to avoid memory leaks.
// The cache implementation MUST post ready_for_next_fragment to the filter's
// dispatcher. This post may be made asynchronously, but any async operation
// that can potentially silently fail must include a timeout, to avoid memory leaks.
virtual void insertBody(const Buffer::Instance& fragment, InsertCallback ready_for_next_fragment,
bool end_stream) PURE;

// Inserts trailers into the cache.
//
// The cache implementation MUST call insert_complete. This call may be
// made asynchronously, but any async operation that can potentially silently
// fail must include a timeout, to avoid memory leaks.
// The cache implementation MUST post insert_complete to the filter's dispatcher.
// This call may be made asynchronously, but any async operation that can
// potentially silently fail must include a timeout, to avoid memory leaks.
virtual void insertTrailers(const Http::ResponseTrailerMap& trailers,
InsertCallback insert_complete) PURE;

@@ -199,6 +200,9 @@ class LookupContext {
// implementation should wait until that is known before calling the callback,
// and must pass a LookupResult with range_details_->satisfiable_ = false
// if the request is invalid.
//
// A cache that posts the callback must wrap it such that if the LookupContext is
// destroyed before the callback is executed, the callback is not executed.
virtual void getHeaders(LookupHeadersCallback&& cb) PURE;

// Reads the next fragment from the cache, calling cb when the fragment is ready.
@@ -228,11 +232,17 @@ class LookupContext {
// getBody requests bytes 0-23 .......... callback with bytes 0-9
// getBody requests bytes 10-23 .......... callback with bytes 10-19
// getBody requests bytes 20-23 .......... callback with bytes 20-23
//
// A cache that posts the callback must wrap it such that if the LookupContext is
// destroyed before the callback is executed, the callback is not executed.
virtual void getBody(const AdjustedByteRange& range, LookupBodyCallback&& cb) PURE;

// Get the trailers from the cache. Only called if the request reached the end of
// the body and LookupBodyCallback did not pass true for end_stream. The
// Http::ResponseTrailerMapPtr passed to cb must not be null.
//
// A cache that posts the callback must wrap it such that if the LookupContext is
// destroyed before the callback is executed, the callback is not executed.
virtual void getTrailers(LookupTrailersCallback&& cb) PURE;

// This routine is called prior to a LookupContext being destroyed. LookupContext is responsible
@@ -248,7 +258,7 @@ class LookupContext {
// 5. [Other thread] RPC completes and calls RPCLookupContext::onRPCDone.
// --> RPCLookupContext's destructor and onRpcDone cause a data race in RPCLookupContext.
// onDestroy() should cancel any outstanding async operations and, if necessary,
// it should block on that cancellation to avoid data races. InsertContext must not invoke any
// it should block on that cancellation to avoid data races. LookupContext must not invoke any
// callbacks to the CacheFilter after having onDestroy() invoked.
virtual void onDestroy() PURE;

@@ -289,7 +299,7 @@ class HttpCache {
virtual void updateHeaders(const LookupContext& lookup_context,
const Http::ResponseHeaderMap& response_headers,
const ResponseMetadata& metadata,
std::function<void(bool)> on_complete) PURE;
UpdateHeadersCallback on_complete) PURE;

// Returns statically known information about a cache.
virtual CacheInfo cacheInfo() const PURE;