Skip to content

Commit

Permalink
ext_proc: Switch to use address of stream as the key (envoyproxy#34959)
Browse files Browse the repository at this point in the history
Switch to use address of stream as the key to avoid any potential key duplication

---------

Signed-off-by: tyxia <[email protected]>
Signed-off-by: Martin Duke <[email protected]>
  • Loading branch information
tyxia authored and martinduke committed Aug 8, 2024
1 parent c869ff0 commit 416ef21
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 28 deletions.
23 changes: 8 additions & 15 deletions source/extensions/filters/http/ext_proc/ext_proc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -358,9 +358,7 @@ Filter::StreamOpenState Filter::openStream() {
}
stats_.streams_started_.inc();

// TODO(tyxia) Switch to address of stream
stream_ = config_->threadLocalStreamManager().store(decoder_callbacks_->streamId(),
std::move(stream_object), config_->stats(),
stream_ = config_->threadLocalStreamManager().store(std::move(stream_object), config_->stats(),
config_->deferredCloseTimeout());
// For custom access logging purposes. Applicable only for Envoy gRPC as Google gRPC does not
// have a proper implementation of streamInfo.
Expand All @@ -377,17 +375,16 @@ void Filter::closeStream() {
if (stream_->close()) {
stats_.streams_closed_.inc();
}
config_->threadLocalStreamManager().erase(stream_);
stream_ = nullptr;
config_->threadLocalStreamManager().erase(decoder_callbacks_->streamId());
} else {
ENVOY_LOG(debug, "Stream already closed");
}
}

void Filter::deferredCloseStream() {
ENVOY_LOG(debug, "Calling deferred close on stream");
config_->threadLocalStreamManager().deferredErase(decoder_callbacks_->streamId(),
filter_callbacks_->dispatcher());
config_->threadLocalStreamManager().deferredErase(stream_, filter_callbacks_->dispatcher());
}

void Filter::onDestroy() {
Expand Down Expand Up @@ -1360,28 +1357,24 @@ void Filter::mergePerRouteConfig() {
}
}

void DeferredDeletableStream::closeStreamOnTimer(uint64_t stream_id) {
void DeferredDeletableStream::closeStreamOnTimer() {
// Close the stream.
if (stream_) {
ENVOY_LOG(debug, "Closing the stream");
if (stream_->close()) {
stats.streams_closed_.inc();
}
stream_.reset();
// Erase this entry from the map; this will also reset the stream_ pointer.
parent.erase(stream_.get());
} else {
ENVOY_LOG(debug, "Stream already closed");
}

// Erase this entry from the map.
parent.erase(stream_id);
}

// In the deferred closure mode, stream closure is deferred upon filter destruction, with a timer
// to prevent unbounded resource usage growth.
void DeferredDeletableStream::deferredClose(Envoy::Event::Dispatcher& dispatcher,
uint64_t stream_id) {
derferred_close_timer =
dispatcher.createTimer([this, stream_id] { closeStreamOnTimer(stream_id); });
void DeferredDeletableStream::deferredClose(Envoy::Event::Dispatcher& dispatcher) {
derferred_close_timer = dispatcher.createTimer([this] { closeStreamOnTimer(); });
derferred_close_timer->enableTimer(std::chrono::milliseconds(deferred_close_timeout));
}

Expand Down
26 changes: 13 additions & 13 deletions source/extensions/filters/http/ext_proc/ext_proc.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,9 @@ struct DeferredDeletableStream : public Logger::Loggable<Logger::Id::ext_proc> {
: stream_(std::move(stream)), parent(stream_manager), stats(stat),
deferred_close_timeout(timeout) {}

void deferredClose(Envoy::Event::Dispatcher& dispatcher, uint64_t stream_id);
void deferredClose(Envoy::Event::Dispatcher& dispatcher);
void closeStreamOnTimer();

void closeStreamOnTimer(uint64_t stream_id);
ExternalProcessorStreamPtr stream_;
ThreadLocalStreamManager& parent;
ExtProcFilterStats stats;
Expand All @@ -174,28 +174,28 @@ class ThreadLocalStreamManager : public Envoy::ThreadLocal::ThreadLocalObject {
public:
// Store the ExternalProcessorStreamPtr (as a wrapper object) in the map and return the raw
// pointer of ExternalProcessorStream.
ExternalProcessorStream* store(uint64_t stream_id, ExternalProcessorStreamPtr stream,
const ExtProcFilterStats& stat,
ExternalProcessorStream* store(ExternalProcessorStreamPtr stream, const ExtProcFilterStats& stat,
const std::chrono::milliseconds& timeout) {
stream_manager_[stream_id] =
auto deferred_stream =
std::make_unique<DeferredDeletableStream>(std::move(stream), *this, stat, timeout);
return stream_manager_[stream_id]->stream_.get();
ExternalProcessorStream* raw_stream = deferred_stream->stream_.get();
stream_manager_[raw_stream] = std::move(deferred_stream);
return stream_manager_[raw_stream]->stream_.get();
}

void erase(uint64_t stream_id) { stream_manager_.erase(stream_id); }

void deferredErase(uint64_t stream_id, Envoy::Event::Dispatcher& dispatcher) {
auto it = stream_manager_.find(stream_id);
void erase(ExternalProcessorStream* stream) { stream_manager_.erase(stream); }
void deferredErase(ExternalProcessorStream* stream, Envoy::Event::Dispatcher& dispatcher) {
auto it = stream_manager_.find(stream);
if (it == stream_manager_.end()) {
return;
}

it->second->deferredClose(dispatcher, stream_id);
it->second->deferredClose(dispatcher);
}

private:
// Map of DeferredDeletableStreamPtrs with stream id as key.
absl::flat_hash_map<uint64_t, DeferredDeletableStreamPtr> stream_manager_;
// Map of DeferredDeletableStreamPtrs with ExternalProcessorStream pointer as key.
absl::flat_hash_map<ExternalProcessorStream*, DeferredDeletableStreamPtr> stream_manager_;
};

class FilterConfig {
Expand Down

0 comments on commit 416ef21

Please sign in to comment.