Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ext_proc: Switch to use address of stream as the key #34959

Merged
merged 4 commits into from
Aug 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 8 additions & 15 deletions source/extensions/filters/http/ext_proc/ext_proc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -358,9 +358,7 @@ Filter::StreamOpenState Filter::openStream() {
}
stats_.streams_started_.inc();

// TODO(tyxia) Switch to address of stream
stream_ = config_->threadLocalStreamManager().store(decoder_callbacks_->streamId(),
std::move(stream_object), config_->stats(),
stream_ = config_->threadLocalStreamManager().store(std::move(stream_object), config_->stats(),
config_->deferredCloseTimeout());
// For custom access logging purposes. Applicable only for Envoy gRPC as Google gRPC does not
// have a proper implementation of streamInfo.
Expand All @@ -377,17 +375,16 @@ void Filter::closeStream() {
if (stream_->close()) {
stats_.streams_closed_.inc();
}
config_->threadLocalStreamManager().erase(stream_);
stream_ = nullptr;
config_->threadLocalStreamManager().erase(decoder_callbacks_->streamId());
} else {
ENVOY_LOG(debug, "Stream already closed");
}
}

void Filter::deferredCloseStream() {
ENVOY_LOG(debug, "Calling deferred close on stream");
config_->threadLocalStreamManager().deferredErase(decoder_callbacks_->streamId(),
filter_callbacks_->dispatcher());
config_->threadLocalStreamManager().deferredErase(stream_, filter_callbacks_->dispatcher());
}

void Filter::onDestroy() {
Expand Down Expand Up @@ -1360,28 +1357,24 @@ void Filter::mergePerRouteConfig() {
}
}

void DeferredDeletableStream::closeStreamOnTimer(uint64_t stream_id) {
void DeferredDeletableStream::closeStreamOnTimer() {
// Close the stream.
if (stream_) {
ENVOY_LOG(debug, "Closing the stream");
if (stream_->close()) {
stats.streams_closed_.inc();
}
stream_.reset();
// Erase this entry from the map; this will also reset the stream_ pointer.
parent.erase(stream_.get());
} else {
ENVOY_LOG(debug, "Stream already closed");
}

// Erase this entry from the map.
parent.erase(stream_id);
}

// In the deferred closure mode, stream closure is deferred upon filter destruction, with a timer
// to prevent unbounded resource usage growth.
void DeferredDeletableStream::deferredClose(Envoy::Event::Dispatcher& dispatcher,
uint64_t stream_id) {
derferred_close_timer =
dispatcher.createTimer([this, stream_id] { closeStreamOnTimer(stream_id); });
void DeferredDeletableStream::deferredClose(Envoy::Event::Dispatcher& dispatcher) {
derferred_close_timer = dispatcher.createTimer([this] { closeStreamOnTimer(); });
derferred_close_timer->enableTimer(std::chrono::milliseconds(deferred_close_timeout));
}

Expand Down
26 changes: 13 additions & 13 deletions source/extensions/filters/http/ext_proc/ext_proc.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,9 @@ struct DeferredDeletableStream : public Logger::Loggable<Logger::Id::ext_proc> {
: stream_(std::move(stream)), parent(stream_manager), stats(stat),
deferred_close_timeout(timeout) {}

void deferredClose(Envoy::Event::Dispatcher& dispatcher, uint64_t stream_id);
void deferredClose(Envoy::Event::Dispatcher& dispatcher);
void closeStreamOnTimer();

void closeStreamOnTimer(uint64_t stream_id);
ExternalProcessorStreamPtr stream_;
ThreadLocalStreamManager& parent;
ExtProcFilterStats stats;
Expand All @@ -174,28 +174,28 @@ class ThreadLocalStreamManager : public Envoy::ThreadLocal::ThreadLocalObject {
public:
// Store the ExternalProcessorStreamPtr (as a wrapper object) in the map and return the raw
// pointer of ExternalProcessorStream.
ExternalProcessorStream* store(uint64_t stream_id, ExternalProcessorStreamPtr stream,
const ExtProcFilterStats& stat,
ExternalProcessorStream* store(ExternalProcessorStreamPtr stream, const ExtProcFilterStats& stat,
const std::chrono::milliseconds& timeout) {
stream_manager_[stream_id] =
auto deferred_stream =
std::make_unique<DeferredDeletableStream>(std::move(stream), *this, stat, timeout);
return stream_manager_[stream_id]->stream_.get();
ExternalProcessorStream* raw_stream = deferred_stream->stream_.get();
stream_manager_[raw_stream] = std::move(deferred_stream);
return stream_manager_[raw_stream]->stream_.get();
}

void erase(uint64_t stream_id) { stream_manager_.erase(stream_id); }

void deferredErase(uint64_t stream_id, Envoy::Event::Dispatcher& dispatcher) {
auto it = stream_manager_.find(stream_id);
void erase(ExternalProcessorStream* stream) { stream_manager_.erase(stream); }
void deferredErase(ExternalProcessorStream* stream, Envoy::Event::Dispatcher& dispatcher) {
auto it = stream_manager_.find(stream);
if (it == stream_manager_.end()) {
return;
}

it->second->deferredClose(dispatcher, stream_id);
it->second->deferredClose(dispatcher);
}

private:
// Map of DeferredDeletableStreamPtrs with stream id as key.
absl::flat_hash_map<uint64_t, DeferredDeletableStreamPtr> stream_manager_;
// Map of DeferredDeletableStreamPtrs with ExternalProcessorStream pointer as key.
absl::flat_hash_map<ExternalProcessorStream*, DeferredDeletableStreamPtr> stream_manager_;
};

class FilterConfig {
Expand Down