Skip to content

Commit

Permalink
udp: support forwarding packets between workers (#13086)
Browse files Browse the repository at this point in the history
Also, a few fixes in active_quic_listener that were discovered while fixing
tests to work with this change.

Signed-off-by: Greg Greenway <[email protected]>
  • Loading branch information
ggreenway authored Sep 23, 2020
1 parent 2059228 commit f8c3acc
Show file tree
Hide file tree
Showing 42 changed files with 699 additions and 296 deletions.
2 changes: 1 addition & 1 deletion include/envoy/event/dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ class Dispatcher {
* @param cb supplies the udp listener callbacks to invoke for listener events.
* @return Network::ListenerPtr a new listener that is owned by the caller.
*/
virtual Network::UdpListenerPtr createUdpListener(Network::SocketSharedPtr&& socket,
virtual Network::UdpListenerPtr createUdpListener(Network::SocketSharedPtr socket,
Network::UdpListenerCallbacks& cb) PURE;
/**
* Allocates a timer. @see Timer for docs on how to use the timer.
Expand Down
32 changes: 28 additions & 4 deletions include/envoy/network/connection_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,13 @@ class ConnectionHandler {
*/
virtual void removeListeners(uint64_t listener_tag) PURE;

/**
* Get the ``UdpListenerCallbacks`` associated with ``listener_tag``. This will be
* absl::nullopt for non-UDP listeners and for ``listener_tag`` values that have already been
* removed.
*/
virtual UdpListenerCallbacksOptRef getUdpListenerCallbacks(uint64_t listener_tag) PURE;

/**
* Remove the filter chains and the connections in the listener. All connections owned
* by the filter chains will be closed. Once all the connections are destroyed(connections
Expand Down Expand Up @@ -126,6 +133,22 @@ class ConnectionHandler {
};

using ActiveListenerPtr = std::unique_ptr<ActiveListener>;

/**
* Used by ConnectionHandler to manage UDP listeners.
*/
class ActiveUdpListener : public virtual ActiveListener, public Network::UdpListenerCallbacks {
public:
~ActiveUdpListener() override = default;

/**
* Returns the worker index that ``data`` should be delivered to. The return value must be in
* the range [0, concurrency).
*/
virtual uint32_t destination(const Network::UdpRecvData& data) const PURE;
};

using ActiveUdpListenerPtr = std::unique_ptr<ActiveUdpListener>;
};

using ConnectionHandlerPtr = std::unique_ptr<ConnectionHandler>;
Expand All @@ -140,15 +163,16 @@ class ActiveUdpListenerFactory {
/**
* Creates an ActiveUdpListener object and a corresponding UdpListener
* according to given config.
* @param worker_index The index of the worker this listener is being created on.
* @param parent is the owner of the created ActiveListener objects.
* @param dispatcher is used to create actual UDP listener.
* @param config provides information needed to create ActiveUdpListener and
* UdpListener objects.
* @return the ActiveUdpListener created.
*/
virtual ConnectionHandler::ActiveListenerPtr
createActiveUdpListener(ConnectionHandler& parent, Event::Dispatcher& disptacher,
Network::ListenerConfig& config) PURE;
virtual ConnectionHandler::ActiveUdpListenerPtr
createActiveUdpListener(uint32_t worker_index, ConnectionHandler& parent,
Event::Dispatcher& dispatcher, Network::ListenerConfig& config) PURE;

/**
* @return true if the UDP passing through listener doesn't form stateful connections.
Expand All @@ -159,4 +183,4 @@ class ActiveUdpListenerFactory {
using ActiveUdpListenerFactoryPtr = std::unique_ptr<ActiveUdpListenerFactory>;

} // namespace Network
} // namespace Envoy
} // namespace Envoy
69 changes: 66 additions & 3 deletions include/envoy/network/listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ namespace Envoy {
namespace Network {

class ActiveUdpListenerFactory;
class UdpListenerWorkerRouter;

using UdpListenerWorkerRouterOptRef =
absl::optional<std::reference_wrapper<UdpListenerWorkerRouter>>;

/**
* ListenSocketFactory is a member of ListenConfig to provide listen socket.
Expand Down Expand Up @@ -137,11 +141,17 @@ class ListenerConfig {
virtual ActiveUdpListenerFactory* udpListenerFactory() PURE;

/**
* @return factory pointer if writing on UDP socket, otherwise return
* nullptr.
* @return factory if writing on UDP socket, otherwise return
* nullopt.
*/
virtual UdpPacketWriterFactoryOptRef udpPacketWriterFactory() PURE;

/**
* @return the ``UdpListenerWorkerRouter`` for this listener. This will
* be non-empty iff this is a UDP listener.
*/
virtual UdpListenerWorkerRouterOptRef udpListenerWorkerRouter() PURE;

/**
* @return traffic direction of the listener.
*/
Expand Down Expand Up @@ -246,7 +256,7 @@ class UdpListenerCallbacks {
*
* @param data UdpRecvData from the underlying socket.
*/
virtual void onData(UdpRecvData& data) PURE;
virtual void onData(UdpRecvData&& data) PURE;

/**
* Called when the underlying socket is ready for read, before onData() is
Expand Down Expand Up @@ -278,8 +288,26 @@ class UdpListenerCallbacks {
* UdpListenerCallback
*/
virtual UdpPacketWriter& udpPacketWriter() PURE;

/**
* Returns the index of this worker, in the range of [0, concurrency).
*/
virtual uint32_t workerIndex() const PURE;

/**
* Called whenever data is received on the underlying udp socket, on
* the destination worker for the datagram according to ``destination()``.
*/
virtual void onDataWorker(Network::UdpRecvData&& data) PURE;

/**
* Posts ``data`` to be delivered on this worker.
*/
virtual void post(Network::UdpRecvData&& data) PURE;
};

using UdpListenerCallbacksOptRef = absl::optional<std::reference_wrapper<UdpListenerCallbacks>>;

/**
* An abstract socket listener. Free the listener to stop listening on the socket.
*/
Expand Down Expand Up @@ -337,9 +365,44 @@ class UdpListener : public virtual Listener {
* @return the error code of the underlying flush api.
*/
virtual Api::IoCallUint64Result flush() PURE;

/**
* Make this listener readable at the beginning of the next event loop.
*
* @note: it may become readable during the current loop if feature
* ``envoy.reloadable_features.activate_fds_next_event_loop`` is disabled.
*/
virtual void activateRead() PURE;
};

using UdpListenerPtr = std::unique_ptr<UdpListener>;

/**
* Handles delivering datagrams to the correct worker.
*/
class UdpListenerWorkerRouter {
public:
virtual ~UdpListenerWorkerRouter() = default;

/**
* Registers a worker's callbacks for this listener. This worker must accept
* packets until it calls ``unregisterWorker``.
*/
virtual void registerWorkerForListener(UdpListenerCallbacks& listener) PURE;

/**
* Unregisters a worker's callbacks for this listener.
*/
virtual void unregisterWorkerForListener(UdpListenerCallbacks& listener) PURE;

/**
* Deliver ``data`` to the correct worker by calling ``onDataWorker()``
* or ``post()`` on one of the registered workers.
*/
virtual void deliver(uint32_t dest_worker_index, UdpRecvData&& data) PURE;
};

using UdpListenerWorkerRouterPtr = std::unique_ptr<UdpListenerWorkerRouter>;

} // namespace Network
} // namespace Envoy
3 changes: 2 additions & 1 deletion include/envoy/server/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,12 @@ class WorkerFactory {
virtual ~WorkerFactory() = default;

/**
* @param index supplies the index of the worker, in the range of [0, concurrency).
* @param overload_manager supplies the server's overload manager.
* @param worker_name supplies the name of the worker, used for per-worker stats.
* @return WorkerPtr a new worker.
*/
virtual WorkerPtr createWorker(OverloadManager& overload_manager,
virtual WorkerPtr createWorker(uint32_t index, OverloadManager& overload_manager,
const std::string& worker_name) PURE;
};

Expand Down
2 changes: 1 addition & 1 deletion source/common/event/dispatcher_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ Network::ListenerPtr DispatcherImpl::createListener(Network::SocketSharedPtr&& s
backlog_size);
}

Network::UdpListenerPtr DispatcherImpl::createUdpListener(Network::SocketSharedPtr&& socket,
Network::UdpListenerPtr DispatcherImpl::createUdpListener(Network::SocketSharedPtr socket,
Network::UdpListenerCallbacks& cb) {
ASSERT(isThreadSafe());
return std::make_unique<Network::UdpListenerImpl>(*this, std::move(socket), cb, timeSource());
Expand Down
2 changes: 1 addition & 1 deletion source/common/event/dispatcher_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class DispatcherImpl : Logger::Loggable<Logger::Id::main>,
Network::ListenerPtr createListener(Network::SocketSharedPtr&& socket,
Network::TcpListenerCallbacks& cb, bool bind_to_port,
uint32_t backlog_size) override;
Network::UdpListenerPtr createUdpListener(Network::SocketSharedPtr&& socket,
Network::UdpListenerPtr createUdpListener(Network::SocketSharedPtr socket,
Network::UdpListenerCallbacks& cb) override;
TimerPtr createTimer(TimerCb cb) override;
Event::SchedulableCallbackPtr createSchedulableCallback(std::function<void()> cb) override;
Expand Down
36 changes: 35 additions & 1 deletion source/common/network/udp_listener_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ void UdpListenerImpl::processPacket(Address::InstanceConstSharedPtr local_addres
ASSERT(local_address != nullptr);
UdpRecvData recvData{
{std::move(local_address), std::move(peer_address)}, std::move(buffer), receive_time};
cb_.onData(recvData);
cb_.onData(std::move(recvData));
}
void UdpListenerImpl::handleWriteCallback() {
Expand Down Expand Up @@ -125,5 +125,39 @@ Api::IoCallUint64Result UdpListenerImpl::flush() {
return cb_.udpPacketWriter().flush();
}
void UdpListenerImpl::activateRead() { file_event_->activate(Event::FileReadyType::Read); }
UdpListenerWorkerRouterImpl::UdpListenerWorkerRouterImpl(uint32_t concurrency)
: workers_(concurrency) {}
void UdpListenerWorkerRouterImpl::registerWorkerForListener(UdpListenerCallbacks& listener) {
absl::WriterMutexLock lock(&mutex_);
ASSERT(listener.workerIndex() < workers_.size());
ASSERT(workers_.at(listener.workerIndex()) == nullptr);
workers_.at(listener.workerIndex()) = &listener;
}
void UdpListenerWorkerRouterImpl::unregisterWorkerForListener(UdpListenerCallbacks& listener) {
absl::WriterMutexLock lock(&mutex_);
ASSERT(workers_.at(listener.workerIndex()) == &listener);
workers_.at(listener.workerIndex()) = nullptr;
}
void UdpListenerWorkerRouterImpl::deliver(uint32_t dest_worker_index, UdpRecvData&& data) {
absl::ReaderMutexLock lock(&mutex_);
ASSERT(dest_worker_index < workers_.size(),
"UdpListenerCallbacks::destination returned out-of-range value");
auto* worker = workers_[dest_worker_index];
// When a listener is being removed, packets could be processed on some workers after the
// listener is removed from other workers, which could result in a nullptr for that worker.
if (worker != nullptr) {
worker->post(std::move(data));
}
}
} // namespace Network
} // namespace Envoy
15 changes: 15 additions & 0 deletions source/common/network/udp_listener_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class UdpListenerImpl : public BaseListenerImpl,
const Address::InstanceConstSharedPtr& localAddress() const override;
Api::IoCallUint64Result send(const UdpSendData& data) override;
Api::IoCallUint64Result flush() override;
void activateRead() override;

void processPacket(Address::InstanceConstSharedPtr local_address,
Address::InstanceConstSharedPtr peer_address, Buffer::InstancePtr buffer,
Expand All @@ -61,5 +62,19 @@ class UdpListenerImpl : public BaseListenerImpl,
Event::FileEventPtr file_event_;
};

class UdpListenerWorkerRouterImpl : public UdpListenerWorkerRouter {
public:
UdpListenerWorkerRouterImpl(uint32_t concurrency);

// UdpListenerWorkerRouter
void registerWorkerForListener(UdpListenerCallbacks& listener) override;
void unregisterWorkerForListener(UdpListenerCallbacks& listener) override;
void deliver(uint32_t dest_worker_index, UdpRecvData&& data) override;

private:
absl::Mutex mutex_;
std::vector<UdpListenerCallbacks*> workers_ ABSL_GUARDED_BY(mutex_);
};

} // namespace Network
} // namespace Envoy
1 change: 1 addition & 0 deletions source/common/runtime/runtime_features.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ constexpr const char* runtime_features[] = {
"envoy.reloadable_features.http2_skip_encoding_empty_trailers",
"envoy.reloadable_features.listener_in_place_filterchain_update",
"envoy.reloadable_features.overload_manager_disable_keepalive_drain_http2",
"envoy.reloadable_features.prefer_quic_kernel_bpf_packet_routing",
"envoy.reloadable_features.preserve_query_string_in_path_redirects",
"envoy.reloadable_features.preserve_upstream_date",
"envoy.reloadable_features.stop_faking_paths",
Expand Down
5 changes: 1 addition & 4 deletions source/extensions/quic_listeners/quiche/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -409,10 +409,7 @@ envoy_cc_library(
"//bazel:linux": ["udp_gso_batch_writer.cc"],
"//conditions:default": [],
}),
hdrs = select({
"//bazel:linux": ["udp_gso_batch_writer.h"],
"//conditions:default": [],
}),
hdrs = ["udp_gso_batch_writer.h"],
external_deps = ["quiche_quic_platform"],
tags = ["nofips"],
visibility = [
Expand Down
Loading

0 comments on commit f8c3acc

Please sign in to comment.