From 416ef2110b72205def450976a9c6c6dae3d4791f Mon Sep 17 00:00:00 2001 From: Tianyu <72890320+tyxia@users.noreply.github.com> Date: Wed, 7 Aug 2024 20:41:00 -0400 Subject: [PATCH] ext_proc: Switch to use address of stream as the key (#34959) Switch to use address of stream as the key to avoid any potential key duplication --------- Signed-off-by: tyxia Signed-off-by: Martin Duke --- .../filters/http/ext_proc/ext_proc.cc | 23 ++++++---------- .../filters/http/ext_proc/ext_proc.h | 26 +++++++++---------- 2 files changed, 21 insertions(+), 28 deletions(-) diff --git a/source/extensions/filters/http/ext_proc/ext_proc.cc b/source/extensions/filters/http/ext_proc/ext_proc.cc index a039235197544..5130208e1b5d3 100644 --- a/source/extensions/filters/http/ext_proc/ext_proc.cc +++ b/source/extensions/filters/http/ext_proc/ext_proc.cc @@ -358,9 +358,7 @@ Filter::StreamOpenState Filter::openStream() { } stats_.streams_started_.inc(); - // TODO(tyxia) Switch to address of stream - stream_ = config_->threadLocalStreamManager().store(decoder_callbacks_->streamId(), - std::move(stream_object), config_->stats(), + stream_ = config_->threadLocalStreamManager().store(std::move(stream_object), config_->stats(), config_->deferredCloseTimeout()); // For custom access logging purposes. Applicable only for Envoy gRPC as Google gRPC does not // have a proper implementation of streamInfo. @@ -377,8 +375,8 @@ void Filter::closeStream() { if (stream_->close()) { stats_.streams_closed_.inc(); } + config_->threadLocalStreamManager().erase(stream_); stream_ = nullptr; - config_->threadLocalStreamManager().erase(decoder_callbacks_->streamId()); } else { ENVOY_LOG(debug, "Stream already closed"); } @@ -386,8 +384,7 @@ void Filter::closeStream() { void Filter::deferredCloseStream() { ENVOY_LOG(debug, "Calling deferred close on stream"); - config_->threadLocalStreamManager().deferredErase(decoder_callbacks_->streamId(), - filter_callbacks_->dispatcher()); + config_->threadLocalStreamManager().deferredErase(stream_, filter_callbacks_->dispatcher()); } void Filter::onDestroy() { @@ -1360,28 +1357,24 @@ void Filter::mergePerRouteConfig() { } } -void DeferredDeletableStream::closeStreamOnTimer(uint64_t stream_id) { +void DeferredDeletableStream::closeStreamOnTimer() { // Close the stream. if (stream_) { ENVOY_LOG(debug, "Closing the stream"); if (stream_->close()) { stats.streams_closed_.inc(); } - stream_.reset(); + // Erase this entry from the map; this will also reset the stream_ pointer. + parent.erase(stream_.get()); } else { ENVOY_LOG(debug, "Stream already closed"); } - - // Erase this entry from the map. - parent.erase(stream_id); } // In the deferred closure mode, stream closure is deferred upon filter destruction, with a timer // to prevent unbounded resource usage growth. -void DeferredDeletableStream::deferredClose(Envoy::Event::Dispatcher& dispatcher, - uint64_t stream_id) { - derferred_close_timer = - dispatcher.createTimer([this, stream_id] { closeStreamOnTimer(stream_id); }); +void DeferredDeletableStream::deferredClose(Envoy::Event::Dispatcher& dispatcher) { + derferred_close_timer = dispatcher.createTimer([this] { closeStreamOnTimer(); }); derferred_close_timer->enableTimer(std::chrono::milliseconds(deferred_close_timeout)); } diff --git a/source/extensions/filters/http/ext_proc/ext_proc.h b/source/extensions/filters/http/ext_proc/ext_proc.h index b798a29b5f821..070b60b58d8e1 100644 --- a/source/extensions/filters/http/ext_proc/ext_proc.h +++ b/source/extensions/filters/http/ext_proc/ext_proc.h @@ -158,9 +158,9 @@ struct DeferredDeletableStream : public Logger::Loggable { : stream_(std::move(stream)), parent(stream_manager), stats(stat), deferred_close_timeout(timeout) {} - void deferredClose(Envoy::Event::Dispatcher& dispatcher, uint64_t stream_id); + void deferredClose(Envoy::Event::Dispatcher& dispatcher); + void closeStreamOnTimer(); - void closeStreamOnTimer(uint64_t stream_id); ExternalProcessorStreamPtr stream_; ThreadLocalStreamManager& parent; ExtProcFilterStats stats; @@ -174,28 +174,28 @@ class ThreadLocalStreamManager : public Envoy::ThreadLocal::ThreadLocalObject { public: // Store the ExternalProcessorStreamPtr (as a wrapper object) in the map and return the raw // pointer of ExternalProcessorStream. - ExternalProcessorStream* store(uint64_t stream_id, ExternalProcessorStreamPtr stream, - const ExtProcFilterStats& stat, + ExternalProcessorStream* store(ExternalProcessorStreamPtr stream, const ExtProcFilterStats& stat, const std::chrono::milliseconds& timeout) { - stream_manager_[stream_id] = + auto deferred_stream = std::make_unique(std::move(stream), *this, stat, timeout); - return stream_manager_[stream_id]->stream_.get(); + ExternalProcessorStream* raw_stream = deferred_stream->stream_.get(); + stream_manager_[raw_stream] = std::move(deferred_stream); + return stream_manager_[raw_stream]->stream_.get(); } - void erase(uint64_t stream_id) { stream_manager_.erase(stream_id); } - - void deferredErase(uint64_t stream_id, Envoy::Event::Dispatcher& dispatcher) { - auto it = stream_manager_.find(stream_id); + void erase(ExternalProcessorStream* stream) { stream_manager_.erase(stream); } + void deferredErase(ExternalProcessorStream* stream, Envoy::Event::Dispatcher& dispatcher) { + auto it = stream_manager_.find(stream); if (it == stream_manager_.end()) { return; } - it->second->deferredClose(dispatcher, stream_id); + it->second->deferredClose(dispatcher); } private: - // Map of DeferredDeletableStreamPtrs with stream id as key. - absl::flat_hash_map stream_manager_; + // Map of DeferredDeletableStreamPtrs with ExternalProcessorStream pointer as key. + absl::flat_hash_map stream_manager_; }; class FilterConfig {