Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

api listener: add shutdown method and call during server termination #9959

Merged
merged 8 commits into from
Feb 8, 2020
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions include/envoy/server/api_listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@ class ApiListener {
*/
virtual absl::string_view name() const PURE;

/**
* Shutdown the ApiListener. This is an interrupt, not a drain. In other words, calling this
* function results in termination of all active streams vs. draining were no new streams are
junr03 marked this conversation as resolved.
Show resolved Hide resolved
* allowed, but already existing streams are allowed to finish.
*/
virtual void shutdown() PURE;

/**
* @return the Type of the ApiListener.
*/
Expand Down
14 changes: 14 additions & 0 deletions source/server/api_listener_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,13 @@ ApiListenerImplBase::ApiListenerImplBase(const envoy::config::listener::v3::List
factory_context_(parent_.server_, config_, *this, *global_scope_, *listener_scope_),
read_callbacks_(SyntheticReadCallbacks(*this)) {}

void ApiListenerImplBase::SyntheticReadCallbacks::SyntheticConnection::raiseConnectionEvent(
Network::ConnectionEvent event) {
for (Network::ConnectionCallbacks* callback : callbacks_) {
callback->onEvent(event);
}
}

HttpApiListener::HttpApiListener(const envoy::config::listener::v3::Listener& config,
ListenerManagerImpl& parent, const std::string& name)
: ApiListenerImplBase(config, parent, name) {
Expand All @@ -44,5 +51,12 @@ Http::ApiListenerOptRef HttpApiListener::http() {
return Http::ApiListenerOptRef(std::ref(*http_connection_manager_));
}

void HttpApiListener::shutdown() {
// The Http::ConnectionManagerImpl is a callback target for the read_callback_.connection_. By
// raising connection closure, Http::ConnectionManagerImpl::onEvent is fired. In that case the
// Http::ConnectionManagerImpl will reset any ActiveStreams it has.
read_callbacks_.connection_.raiseConnectionEvent(Network::ConnectionEvent::RemoteClose);
}

} // namespace Server
} // namespace Envoy
10 changes: 9 additions & 1 deletion source/server/api_listener_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ class ApiListenerImplBase : public ApiListener,
: parent_(parent), stream_info_(parent_.parent_.factory_context_.timeSource()),
options_(std::make_shared<std::vector<Network::Socket::OptionConstSharedPtr>>()) {}

void raiseConnectionEvent(Network::ConnectionEvent event);

// Network::FilterManager
void addWriteFilter(Network::WriteFilterSharedPtr) override {
NOT_IMPLEMENTED_GCOVR_EXCL_LINE;
Expand All @@ -84,7 +86,9 @@ class ApiListenerImplBase : public ApiListener,
bool initializeReadFilters() override { return true; }

// Network::Connection
void addConnectionCallbacks(Network::ConnectionCallbacks&) override {}
void addConnectionCallbacks(Network::ConnectionCallbacks& cb) override {
callbacks_.push_back(&cb);
}
void addBytesSentCallback(Network::Connection::BytesSentCb) override {
NOT_IMPLEMENTED_GCOVR_EXCL_LINE;
}
Expand Down Expand Up @@ -129,6 +133,7 @@ class ApiListenerImplBase : public ApiListener,
SyntheticReadCallbacks& parent_;
StreamInfo::StreamInfoImpl stream_info_;
Network::ConnectionSocket::OptionsSharedPtr options_;
std::list<Network::ConnectionCallbacks*> callbacks_;
};

ApiListenerImplBase& parent_;
Expand Down Expand Up @@ -157,6 +162,9 @@ class HttpApiListener : public ApiListenerImplBase {
// ApiListener
ApiListener::Type type() const override { return ApiListener::Type::HttpApiListener; }
Http::ApiListenerOptRef http() override;
void shutdown() override;

Network::ReadFilterCallbacks& readCallbacksForTest() { return read_callbacks_; }

private:
// Need to store the factory due to the shared_ptrs that need to be kept alive: date provider,
Expand Down
7 changes: 7 additions & 0 deletions source/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,13 @@ void InstanceImpl::terminate() {

// Shutdown all the workers now that the main dispatch loop is done.
if (listener_manager_ != nullptr) {
// Also shutdown the listener manager's ApiListener, if there is one, which runs on the main
// thread. This needs to happen ahead of calling thread_local_.shutdown() below to prevent any
// objects in the ApiListener destructor to reference any objects in thread local storage.
if (listener_manager_->apiListener().has_value()) {
listener_manager_->apiListener()->get().shutdown();
}

listener_manager_->stopWorkers();
}

Expand Down
32 changes: 30 additions & 2 deletions test/integration/api_listener_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ class ApiListenerIntegrationTest : public BaseIntegrationTest,
bootstrap.mutable_static_resources()->add_listeners()->MergeFrom(
Server::parseListenerFromV2Yaml(api_listener_config()));
});
BaseIntegrationTest::initialize();
}

void TearDown() override {
Expand Down Expand Up @@ -84,6 +83,7 @@ INSTANTIATE_TEST_SUITE_P(IpVersions, ApiListenerIntegrationTest,
TestUtility::ipTestParamsToString);

TEST_P(ApiListenerIntegrationTest, Basic) {
BaseIntegrationTest::initialize();
absl::Notification done;
test_server_->server().dispatcher().post([this, &done]() -> void {
ASSERT_TRUE(test_server_->server().listenerManager().apiListener().has_value());
Expand Down Expand Up @@ -111,5 +111,33 @@ TEST_P(ApiListenerIntegrationTest, Basic) {
ASSERT_TRUE(done.WaitForNotificationWithTimeout(absl::Seconds(1)));
}

TEST_P(ApiListenerIntegrationTest, DestroyWithActiveStreams) {
autonomous_allow_incomplete_streams_ = true;
BaseIntegrationTest::initialize();
absl::Notification done;

test_server_->server().dispatcher().post([this, &done]() -> void {
ASSERT_TRUE(test_server_->server().listenerManager().apiListener().has_value());
ASSERT_EQ("api_listener", test_server_->server().listenerManager().apiListener()->get().name());
ASSERT_TRUE(test_server_->server().listenerManager().apiListener()->get().http().has_value());
auto& http_api_listener =
test_server_->server().listenerManager().apiListener()->get().http()->get();

ON_CALL(stream_encoder_, getStream()).WillByDefault(ReturnRef(stream_encoder_.stream_));
auto& stream_decoder = http_api_listener.newStream(stream_encoder_);

// Send a headers-only request
stream_decoder.decodeHeaders(
Http::HeaderMapPtr(new Http::TestHeaderMapImpl{
{":method", "GET"}, {":path", "/api"}, {":scheme", "http"}, {":authority", "host"}}),
false);

done.Notify();
});
ASSERT_TRUE(done.WaitForNotificationWithTimeout(absl::Seconds(1)));
// The server should shutdown the ApiListener at the right time during server termination such
// that no crashes occur if termination happens when the ApiListener still has ongoing streams.
}

} // namespace
} // namespace Envoy
} // namespace Envoy
1 change: 1 addition & 0 deletions test/server/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ envoy_cc_test(
":utility_lib",
"//source/server:api_listener_lib",
"//source/server:listener_lib",
"//test/mocks/network:network_mocks",
"//test/mocks/server:server_mocks",
"//test/test_common:utility_lib",
"@envoy_api//envoy/config/listener/v3:pkg_cc_proto",
Expand Down
46 changes: 45 additions & 1 deletion test/server/api_listener_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "server/api_listener_impl.h"
#include "server/listener_manager_impl.h"

#include "test/mocks/network/mocks.h"
#include "test/mocks/server/mocks.h"
#include "test/server/utility.h"
#include "test/test_common/utility.h"
Expand Down Expand Up @@ -87,5 +88,48 @@ name: test_api_listener
"eds_cluster_config {\n eds_config {\n path: \"eds path\"\n }\n }\n}\n");
}

TEST_F(ApiListenerTest, HttpApiListenerShutdown) {
const std::string yaml = R"EOF(
name: test_api_listener
address:
socket_address:
address: 127.0.0.1
port_value: 1234
api_listener:
api_listener:
"@type": type.googleapis.com/envoy.config.filter.network.http_connection_manager.v2.HttpConnectionManager
stat_prefix: hcm
route_config:
name: api_router
virtual_hosts:
- name: api
domains:
- "*"
routes:
- match:
prefix: "/"
route:
cluster: dynamic_forward_proxy_cluster
)EOF";

const envoy::config::listener::v3::Listener config = parseListenerFromV2Yaml(yaml);

auto http_api_listener = HttpApiListener(config, *listener_manager_, config.name());

ASSERT_EQ("test_api_listener", http_api_listener.name());
ASSERT_EQ(ApiListener::Type::HttpApiListener, http_api_listener.type());
ASSERT_TRUE(http_api_listener.http().has_value());

Network::MockConnectionCallbacks network_connection_callbacks;
// TODO(junr03): potentially figure out a way of unit testing this behavior without exposing a
// ForTest function.
http_api_listener.readCallbacksForTest().connection().addConnectionCallbacks(
network_connection_callbacks);

EXPECT_CALL(network_connection_callbacks, onEvent(Network::ConnectionEvent::RemoteClose));
// Shutting down the ApiListener should raise an event on all connection callback targets.
http_api_listener.shutdown();
}

} // namespace Server
} // namespace Envoy
} // namespace Envoy