Skip to content

Commit

Permalink
listen_socket: Support multiple socket options.
Browse files Browse the repository at this point in the history
Keep a list of options so that multiple options can be set on the same
sockets, maybe by different listener filters.

TODO: Test with multiple options & verify hashKey() is properly handled as well.

Signed-off-by: Jarno Rajahalme <[email protected]>
  • Loading branch information
jrajahalme committed Mar 14, 2018
1 parent 3109a2c commit 3495654
Show file tree
Hide file tree
Showing 17 changed files with 113 additions and 90 deletions.
23 changes: 13 additions & 10 deletions include/envoy/network/listen_socket.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#pragma once

#include <list>
#include <memory>
#include <vector>

#include "envoy/common/pure.h"
#include "envoy/network/address.h"
Expand Down Expand Up @@ -33,9 +35,9 @@ class Socket {
/**
* Visitor class for setting socket options.
*/
class Options {
class Option {
public:
virtual ~Options() {}
virtual ~Option() {}

/**
* @param socket the socket on which to apply options.
Expand All @@ -44,22 +46,23 @@ class Socket {
* that only make sense for bound sockets if set before the bind() call.
* @return true if succeeded, false otherwise.
*/
virtual bool setOptions(Socket& socket, bool pre_bind) const PURE;
virtual bool setOption(Socket& socket, bool pre_bind) const PURE;

/**
* @return bits that can be used to separate connections based on the options. Should return
* zero if connections with different options can be pooled together. This is limited
* to 32 bits to allow these bits to be efficiently combined into a larger hash key
* used in connection pool lookups.
* @param vector of bits that can be used to separate connections based on the options. Should
* return zero if connections with different options can be pooled together. This is
* limited to 32 bits to allow these bits to be efficiently combined into a larger hash
* key used in connection pool lookups.
*/
virtual uint32_t hashKey() const PURE;
virtual void hashKey(std::vector<uint8_t>& key) const PURE;
};
typedef std::shared_ptr<Options> OptionsSharedPtr;
typedef std::shared_ptr<Option> OptionSharedPtr;
typedef std::shared_ptr<std::list<OptionSharedPtr>> OptionsSharedPtr;

/**
* Set the socket options for later retrieval with options().
*/
virtual void setOptions(const OptionsSharedPtr&) PURE;
virtual void setOption(const OptionSharedPtr&) PURE;

/**
* @return the socket options stored earlier with setOptions(), if any.
Expand Down
2 changes: 1 addition & 1 deletion include/envoy/server/filter_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ class ListenerFactoryContext : public FactoryContext {
/**
* Store socket options to be set on the listen socket before listening.
*/
virtual void setListenSocketOptions(const Network::Socket::OptionsSharedPtr& options) PURE;
virtual void setListenSocketOption(const Network::Socket::OptionSharedPtr& options) PURE;
};

/**
Expand Down
16 changes: 9 additions & 7 deletions source/common/network/connection_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -540,13 +540,15 @@ ClientConnectionImpl::ClientConnectionImpl(
std::move(transport_socket), false) {
bool will_bind = (source_address != nullptr);
if (options) {
if (!options->setOptions(*socket_, will_bind)) {
// Set a special error state to ensure asynchronous close to give the owner of the
// ConnectionImpl a chance to add callbacks and detect the "disconnect".
immediate_error_event_ = ConnectionEvent::LocalClose;
// Trigger a write event to close this connection out-of-band.
file_event_->activate(Event::FileReadyType::Write);
return;
for (const auto& option : *options) {
if (!option->setOption(*socket_, will_bind)) {
// Set a special error state to ensure asynchronous close to give the owner of the
// ConnectionImpl a chance to add callbacks and detect the "disconnect".
immediate_error_event_ = ConnectionEvent::LocalClose;
// Trigger a write event to close this connection out-of-band.
file_event_->activate(Event::FileReadyType::Write);
return;
}
}
}
if (will_bind) {
Expand Down
19 changes: 13 additions & 6 deletions source/common/network/listen_socket_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,17 @@ void ListenSocketImpl::doBind() {
}
}

void ListenSocketImpl::setListenSocketOptions(const Network::Socket::OptionsSharedPtr& options,
bool pre_bind) {
if (options) {
for (const auto& option : *options) {
if (!option->setOption(*this, pre_bind)) {
throw EnvoyException("ListenSocket: Setting socket options failed");
}
}
}
}

TcpListenSocket::TcpListenSocket(const Address::InstanceConstSharedPtr& address,
const Network::Socket::OptionsSharedPtr& options,
bool bind_to_port)
Expand All @@ -39,9 +50,7 @@ TcpListenSocket::TcpListenSocket(const Address::InstanceConstSharedPtr& address,
int rc = setsockopt(fd_, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
RELEASE_ASSERT(rc != -1);

if (options && !options->setOptions(*this, bind_to_port)) {
throw EnvoyException("TcpListenSocket: Setting socket options failed");
}
setListenSocketOptions(options, bind_to_port);

if (bind_to_port) {
doBind();
Expand All @@ -51,9 +60,7 @@ TcpListenSocket::TcpListenSocket(const Address::InstanceConstSharedPtr& address,
TcpListenSocket::TcpListenSocket(int fd, const Address::InstanceConstSharedPtr& address,
const Network::Socket::OptionsSharedPtr& options)
: ListenSocketImpl(fd, address) {
if (options && !options->setOptions(*this, false)) {
throw EnvoyException("TcpListenSocket: Setting socket options failed");
}
setListenSocketOptions(options, false);
}

UdsListenSocket::UdsListenSocket(const Address::InstanceConstSharedPtr& address)
Expand Down
8 changes: 7 additions & 1 deletion source/common/network/listen_socket_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,12 @@ class SocketImpl : public virtual Socket {
fd_ = -1;
}
}
void setOptions(const OptionsSharedPtr& options) override { options_ = options; }
void setOption(const OptionSharedPtr& option) override {
if (!options_) {
options_ = std::make_shared<std::list<OptionSharedPtr>>();
}
options_->emplace_back(option);
}
const OptionsSharedPtr& options() const override { return options_; }

protected:
Expand All @@ -44,6 +49,7 @@ class ListenSocketImpl : public SocketImpl {
: SocketImpl(fd, local_address) {}

void doBind();
void setListenSocketOptions(const Network::Socket::OptionsSharedPtr& options, bool pre_bind);
};

/**
Expand Down
17 changes: 9 additions & 8 deletions source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -758,7 +758,7 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::connPool(
}

// Inherit socket options from downstream connection, if set.
absl::optional<uint32_t> hash_key;
std::vector<uint8_t> hash_key = {uint8_t(protocol), uint8_t(priority)};

// Use downstream connection socket options for computing connection pool hash key, if any.
// This allows socket options to control connection pooling so that connections with
Expand All @@ -767,19 +767,20 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::connPool(
const Network::ConnectionSocket::OptionsSharedPtr& options =
context->downstreamConnection()->socketOptions();
if (options) {
hash_key = options->hashKey();
for (const auto& option : *options) {
option->hashKey(hash_key);
}
}
}

ConnPoolsContainer& container = parent_.host_http_conn_pool_map_[host];
const auto key = container.key(priority, protocol, hash_key ? hash_key.value() : 0);
if (!container.pools_[key]) {
container.pools_[key] = parent_.parent_.factory_.allocateConnPool(
if (!container.pools_[hash_key]) {
container.pools_[hash_key] = parent_.parent_.factory_.allocateConnPool(
parent_.thread_local_dispatcher_, host, priority, protocol,
hash_key ? context->downstreamConnection()->socketOptions() : nullptr);
hash_key.size() > 2 ? context->downstreamConnection()->socketOptions() : nullptr);
}

return container.pools_[key].get();
return container.pools_[hash_key].get();
}

ClusterManagerPtr ProdClusterManagerFactory::clusterManagerFromProto(
Expand Down Expand Up @@ -819,4 +820,4 @@ CdsApiPtr ProdClusterManagerFactory::createCds(
}

} // namespace Upstream
} // namespace Envoy
} // namespace Envoy
14 changes: 2 additions & 12 deletions source/common/upstream/cluster_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
#include <cstdint>
#include <functional>
#include <list>
#include <map>
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>

#include "envoy/config/bootstrap/v2/bootstrap.pb.h"
Expand Down Expand Up @@ -197,17 +197,7 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable<Logger::Id::u
*/
struct ThreadLocalClusterManagerImpl : public ThreadLocal::ThreadLocalObject {
struct ConnPoolsContainer {
typedef std::unordered_map<uint64_t, Http::ConnectionPool::InstancePtr> ConnPools;

uint64_t key(ResourcePriority priority, Http::Protocol protocol, uint32_t hash_key) {
// One bit needed for priority
static_assert(NumResourcePriorities == 2,
"Fix shifts below to match number of bits needed for 'priority'");
// Two bits needed for protocol
static_assert(Http::NumProtocols <= 4,
"Fix shifts below to match number of bits needed for 'protocol'");
return uint64_t(hash_key) << 3 | uint64_t(protocol) << 1 | uint64_t(priority);
}
typedef std::map<std::vector<uint8_t>, Http::ConnectionPool::InstancePtr> ConnPools;

ConnPools pools_;
uint64_t drains_remaining_{};
Expand Down
18 changes: 10 additions & 8 deletions source/server/listener_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -281,14 +281,16 @@ void ListenerImpl::setSocketAndOptions(const Network::SocketSharedPtr& socket) {
// Server config validation sets nullptr sockets.
if (socket_ && listen_socket_options_) {
// 'pre_bind = false' as bind() is never done after this.
bool ok = listen_socket_options_->setOptions(*socket_, false);
const std::string message =
fmt::format("{}: Setting socket options {}", name_, ok ? "succeeded" : "failed");
if (!ok) {
ENVOY_LOG(warn, "{}", message);
throw EnvoyException(message);
} else {
ENVOY_LOG(debug, "{}", message);
for (const auto& option : *listen_socket_options_) {
bool ok = option->setOption(*socket_, false);
const std::string message =
fmt::format("{}: Setting socket options {}", name_, ok ? "succeeded" : "failed");
if (!ok) {
ENVOY_LOG(warn, "{}", message);
throw EnvoyException(message);
} else {
ENVOY_LOG(debug, "{}", message);
}
}
}
}
Expand Down
7 changes: 5 additions & 2 deletions source/server/listener_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,11 @@ class ListenerImpl : public Network::ListenerConfig,
ThreadLocal::Instance& threadLocal() override { return parent_.server_.threadLocal(); }
Admin& admin() override { return parent_.server_.admin(); }
const envoy::api::v2::core::Metadata& listenerMetadata() const override { return metadata_; };
void setListenSocketOptions(const Network::Socket::OptionsSharedPtr& options) override {
listen_socket_options_ = options;
void setListenSocketOption(const Network::Socket::OptionSharedPtr& option) override {
if (!listen_socket_options_) {
listen_socket_options_ = std::make_shared<std::list<Network::Socket::OptionSharedPtr>>();
}
listen_socket_options_->emplace_back(option);
}

// Network::DrainDecision
Expand Down
15 changes: 7 additions & 8 deletions test/common/network/connection_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,7 @@ class ConnectionImplTest : public testing::TestWithParam<Address::IpVersion> {
protected:
Event::DispatcherPtr dispatcher_;
Stats::IsolatedStoreImpl stats_store_;
std::shared_ptr<MockSocketOptions> options_{std::make_shared<NiceMock<MockSocketOptions>>()};
Network::TcpListenSocket socket_{Network::Test::getAnyAddress(GetParam()), options_, true};
Network::TcpListenSocket socket_{Network::Test::getAnyAddress(GetParam()), nullptr, true};
Network::MockListenerCallbacks listener_callbacks_;
Network::MockConnectionHandler connection_handler_;
Network::ListenerPtr listener_;
Expand Down Expand Up @@ -253,12 +252,12 @@ TEST_P(ConnectionImplTest, SocketOptions) {

read_filter_.reset(new NiceMock<MockReadFilter>());

auto options = std::make_shared<MockSocketOptions>();
auto option = std::make_shared<MockSocketOption>();

EXPECT_CALL(*options, setOptions(_, false)).WillOnce(Return(true));
EXPECT_CALL(*option, setOption(_, false)).WillOnce(Return(true));
EXPECT_CALL(listener_callbacks_, onAccept_(_, _))
.WillOnce(Invoke([&](Network::ConnectionSocketPtr& socket, bool) -> void {
socket->setOptions(options);
socket->setOption(option);
Network::ConnectionPtr new_connection = dispatcher_->createServerConnection(
std::move(socket), Network::Test::createRawBufferSocket());
listener_callbacks_.onNewConnection(std::move(new_connection));
Expand Down Expand Up @@ -301,12 +300,12 @@ TEST_P(ConnectionImplTest, SocketOptionsFailureTest) {

read_filter_.reset(new NiceMock<MockReadFilter>());

auto options = std::make_shared<MockSocketOptions>();
auto option = std::make_shared<MockSocketOption>();

EXPECT_CALL(*options, setOptions(_, false)).WillOnce(Return(false));
EXPECT_CALL(*option, setOption(_, false)).WillOnce(Return(false));
EXPECT_CALL(listener_callbacks_, onAccept_(_, _))
.WillOnce(Invoke([&](Network::ConnectionSocketPtr& socket, bool) -> void {
socket->setOptions(options);
socket->setOption(option);
Network::ConnectionPtr new_connection = dispatcher_->createServerConnection(
std::move(socket), Network::Test::createRawBufferSocket());
listener_callbacks_.onNewConnection(std::move(new_connection));
Expand Down
12 changes: 8 additions & 4 deletions test/common/network/listen_socket_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,19 @@ TEST_P(ListenSocketImplTest, BindSpecificPort) {
// bind failure (in the TcpListenSocket ctor) once isn't considered an error.
EXPECT_EQ(0, close(addr_fd.second));

auto options = std::make_shared<MockSocketOptions>();
EXPECT_CALL(*options, setOptions(_, true)).WillOnce(Return(true));
auto option = std::make_shared<MockSocketOption>();
auto options = std::make_shared<std::list<Network::Socket::OptionSharedPtr>>();
options->emplace_back(option);
EXPECT_CALL(*option, setOption(_, true)).WillOnce(Return(true));
TcpListenSocket socket1(addr, options, true);
EXPECT_EQ(0, listen(socket1.fd(), 0));
EXPECT_EQ(addr->ip()->port(), socket1.localAddress()->ip()->port());
EXPECT_EQ(addr->ip()->addressAsString(), socket1.localAddress()->ip()->addressAsString());

auto options2 = std::make_shared<MockSocketOptions>();
EXPECT_CALL(*options2, setOptions(_, true)).WillOnce(Return(true));
auto option2 = std::make_shared<MockSocketOption>();
auto options2 = std::make_shared<std::list<Network::Socket::OptionSharedPtr>>();
options2->emplace_back(option2);
EXPECT_CALL(*option2, setOption(_, true)).WillOnce(Return(true));
// The address and port are bound already, should throw exception.
EXPECT_THROW(Network::TcpListenSocket socket2(addr, options2, true), EnvoyException);

Expand Down
6 changes: 4 additions & 2 deletions test/common/network/listener_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,11 @@ TEST_P(ListenerImplTest, WildcardListenerUseActualDst) {
TEST_P(ListenerImplTest, WildcardListenerIpv4Compat) {
Stats::IsolatedStoreImpl stats_store;
Event::DispatcherImpl dispatcher;
auto options = std::make_shared<MockSocketOptions>();
auto option = std::make_shared<MockSocketOption>();
auto options = std::make_shared<std::list<Network::Socket::OptionSharedPtr>>();
options->emplace_back(option);

EXPECT_CALL(*options, setOptions(_, true)).WillOnce(Return(true));
EXPECT_CALL(*option, setOption(_, true)).WillOnce(Return(true));
Network::TcpListenSocket socket(Network::Test::getAnyAddress(version_, true), options, true);
Network::MockListenerCallbacks listener_callbacks;
Network::MockConnectionHandler connection_handler;
Expand Down
6 changes: 3 additions & 3 deletions test/mocks/network/mocks.cc
Original file line number Diff line number Diff line change
Expand Up @@ -172,11 +172,11 @@ MockListenSocket::MockListenSocket() : local_address_(new Address::Ipv4Instance(

MockListenSocket::~MockListenSocket() {}

MockSocketOptions::MockSocketOptions() {
ON_CALL(*this, setOptions(_, _)).WillByDefault(Return(true));
MockSocketOption::MockSocketOption() {
ON_CALL(*this, setOption(_, _)).WillByDefault(Return(true));
}

MockSocketOptions::~MockSocketOptions() {}
MockSocketOption::~MockSocketOption() {}

MockConnectionSocket::MockConnectionSocket() : local_address_(new Address::Ipv4Instance(80)) {
ON_CALL(*this, localAddress()).WillByDefault(ReturnRef(local_address_));
Expand Down
14 changes: 7 additions & 7 deletions test/mocks/network/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -261,20 +261,20 @@ class MockListenSocket : public Socket {
MOCK_CONST_METHOD0(localAddress, const Address::InstanceConstSharedPtr&());
MOCK_CONST_METHOD0(fd, int());
MOCK_METHOD0(close, void());
MOCK_METHOD1(setOptions, void(const Socket::OptionsSharedPtr& options));
MOCK_METHOD1(setOption, void(const Socket::OptionSharedPtr& options));
MOCK_CONST_METHOD0(options, const OptionsSharedPtr&());

Address::InstanceConstSharedPtr local_address_;
OptionsSharedPtr options_;
};

class MockSocketOptions : public Socket::Options {
class MockSocketOption : public Socket::Option {
public:
MockSocketOptions();
~MockSocketOptions();
MockSocketOption();
~MockSocketOption();

MOCK_CONST_METHOD2(setOptions, bool(Socket&, bool pre_bind));
MOCK_CONST_METHOD0(hashKey, uint32_t());
MOCK_CONST_METHOD2(setOption, bool(Socket&, bool pre_bind));
MOCK_CONST_METHOD1(hashKey, void(std::vector<uint8_t>&));
};

class MockConnectionSocket : public ConnectionSocket {
Expand All @@ -287,7 +287,7 @@ class MockConnectionSocket : public ConnectionSocket {
MOCK_CONST_METHOD0(localAddressRestored, bool());
MOCK_CONST_METHOD0(remoteAddress, const Address::InstanceConstSharedPtr&());
MOCK_METHOD1(setRemoteAddress, void(const Address::InstanceConstSharedPtr&));
MOCK_METHOD1(setOptions, void(const Network::ConnectionSocket::OptionsSharedPtr&));
MOCK_METHOD1(setOption, void(const Network::ConnectionSocket::OptionSharedPtr&));
MOCK_CONST_METHOD0(options, const Network::ConnectionSocket::OptionsSharedPtr&());
MOCK_CONST_METHOD0(fd, int());
MOCK_METHOD0(close, void());
Expand Down
Loading

0 comments on commit 3495654

Please sign in to comment.