Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

udp: support forwarding packets between workers #13086

Merged
merged 45 commits into from
Sep 23, 2020
Merged
Show file tree
Hide file tree
Changes from 38 commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
7fe3b1d
udp: support forwarding packets between workers
ggreenway Sep 14, 2020
f7f0899
Merge remote-tracking branch 'upstream/master' into udp-worker-routing
ggreenway Sep 14, 2020
da80953
clang-tidy
ggreenway Sep 14, 2020
4c85ce4
rename
ggreenway Sep 14, 2020
e4a1d6b
Re-use the file event on the socket instead of creating a new event on
ggreenway Sep 14, 2020
decfdd0
add comment about performance
ggreenway Sep 14, 2020
5ef79ae
opt ref
ggreenway Sep 15, 2020
3c1485a
non-release assert
ggreenway Sep 15, 2020
b1f030e
fix build
ggreenway Sep 15, 2020
b3588c3
spelling
ggreenway Sep 15, 2020
3641fd2
opt-ref all the things
ggreenway Sep 15, 2020
8ad53b6
Merge remote-tracking branch 'upstream/master' into udp-worker-routing
ggreenway Sep 15, 2020
21a86f0
id -> index
ggreenway Sep 15, 2020
d863caf
only schedule if there is a buffered chlo
ggreenway Sep 15, 2020
2380204
*ForTests
ggreenway Sep 15, 2020
e410196
_for_test_
ggreenway Sep 16, 2020
04e72b3
Merge remote-tracking branch 'upstream/master' into udp-worker-routing
ggreenway Sep 16, 2020
1b7aa1d
simpler enabled() checking
ggreenway Sep 16, 2020
1ef9c69
clarify warning
ggreenway Sep 16, 2020
659897e
revert unneeded #include
ggreenway Sep 16, 2020
d812e3c
Merge remote-tracking branch 'upstream/master' into udp-worker-routing
ggreenway Sep 16, 2020
9c3f054
fix build
ggreenway Sep 16, 2020
61ff426
Merge remote-tracking branch 'upstream/master' into udp-worker-routing
ggreenway Sep 17, 2020
970dd33
move listen_socket_ to base
ggreenway Sep 17, 2020
dfb32b4
rework interface so no locking when packet stays on same worker
ggreenway Sep 17, 2020
a0ec03e
make destination return non-optional
ggreenway Sep 17, 2020
f83f8d5
rename register/unregister
ggreenway Sep 17, 2020
f62d7b8
Merge remote-tracking branch 'upstream/master' into udp-worker-routing
ggreenway Sep 17, 2020
419db25
Merge remote-tracking branch 'upstream/master' into udp-worker-routing
ggreenway Sep 21, 2020
cc1630a
update comment
ggreenway Sep 21, 2020
8037892
clarify comment
ggreenway Sep 21, 2020
f22abc2
revert erroneous newline
ggreenway Sep 21, 2020
adffbb1
improve coverage in DisabledAndEnabled test
ggreenway Sep 21, 2020
e5d6ce0
add TODO for stat
ggreenway Sep 21, 2020
c6d2d99
clang-tidy
ggreenway Sep 21, 2020
e7e1f14
Merge remote-tracking branch 'upstream/master' into udp-worker-routing
ggreenway Sep 21, 2020
c012882
rename to clarify this is only a compile-time check
ggreenway Sep 22, 2020
1d57d02
Merge remote-tracking branch 'upstream/master' into udp-worker-routing
ggreenway Sep 22, 2020
e2437b4
/s/worker_id/worker_index/ everywhere
ggreenway Sep 22, 2020
0faba2e
doc comment on function
ggreenway Sep 22, 2020
f8d27af
remove no-op static_cast
ggreenway Sep 22, 2020
efc73e0
promote an interface so that the factory can return it and avoid
ggreenway Sep 22, 2020
155459d
clang_tidy
ggreenway Sep 22, 2020
fc4ba9d
Merge remote-tracking branch 'upstream/master' into udp-worker-routing
ggreenway Sep 22, 2020
5bf2394
lower coverage limit
ggreenway Sep 23, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion include/envoy/event/dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ class Dispatcher {
* @param cb supplies the udp listener callbacks to invoke for listener events.
* @return Network::ListenerPtr a new listener that is owned by the caller.
*/
virtual Network::UdpListenerPtr createUdpListener(Network::SocketSharedPtr&& socket,
virtual Network::UdpListenerPtr createUdpListener(Network::SocketSharedPtr socket,
mattklein123 marked this conversation as resolved.
Show resolved Hide resolved
Network::UdpListenerCallbacks& cb) PURE;
/**
* Allocates a timer. @see Timer for docs on how to use the timer.
Expand Down
13 changes: 10 additions & 3 deletions include/envoy/network/connection_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,13 @@ class ConnectionHandler {
*/
virtual void removeListeners(uint64_t listener_tag) PURE;

/**
* Get the ``UdpListenerCallbacks`` associated with ``listener_tag``. This will be
* absl::nullopt for non-UDP listeners and for ``listener_tag`` values that have already been
* removed.
*/
virtual UdpListenerCallbacksOptRef getUdpListenerCallbacks(uint64_t listener_tag) PURE;

/**
* Remove the filter chains and the connections in the listener. All connections owned
* by the filter chains will be closed. Once all the connections are destroyed(connections
Expand Down Expand Up @@ -147,8 +154,8 @@ class ActiveUdpListenerFactory {
* @return the ActiveUdpListener created.
*/
virtual ConnectionHandler::ActiveListenerPtr
createActiveUdpListener(ConnectionHandler& parent, Event::Dispatcher& disptacher,
Network::ListenerConfig& config) PURE;
createActiveUdpListener(uint32_t worker_id, ConnectionHandler& parent,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: worker_index, here and anywhere else we still have id. Also needs doc comment update.

Event::Dispatcher& dispatcher, Network::ListenerConfig& config) PURE;

/**
* @return true if the UDP passing through listener doesn't form stateful connections.
Expand All @@ -159,4 +166,4 @@ class ActiveUdpListenerFactory {
using ActiveUdpListenerFactoryPtr = std::unique_ptr<ActiveUdpListenerFactory>;

} // namespace Network
} // namespace Envoy
} // namespace Envoy
69 changes: 66 additions & 3 deletions include/envoy/network/listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ namespace Envoy {
namespace Network {

class ActiveUdpListenerFactory;
class UdpListenerWorkerRouter;

using UdpListenerWorkerRouterOptRef =
absl::optional<std::reference_wrapper<UdpListenerWorkerRouter>>;

/**
* ListenSocketFactory is a member of ListenConfig to provide listen socket.
Expand Down Expand Up @@ -137,11 +141,17 @@ class ListenerConfig {
virtual ActiveUdpListenerFactory* udpListenerFactory() PURE;

/**
* @return factory pointer if writing on UDP socket, otherwise return
* nullptr.
* @return factory if writing on UDP socket, otherwise return
* nullopt.
*/
virtual UdpPacketWriterFactoryOptRef udpPacketWriterFactory() PURE;

/**
* @return the ``UdpListenerWorkerRouter`` for this listener. This will
* be non-empty iff this is a UDP listener.
*/
virtual UdpListenerWorkerRouterOptRef udpListenerWorkerRouter() PURE;

/**
* @return traffic direction of the listener.
*/
Expand Down Expand Up @@ -246,7 +256,7 @@ class UdpListenerCallbacks {
*
* @param data UdpRecvData from the underlying socket.
*/
virtual void onData(UdpRecvData& data) PURE;
virtual void onData(UdpRecvData&& data) PURE;

/**
* Called when the underlying socket is ready for read, before onData() is
Expand Down Expand Up @@ -278,8 +288,26 @@ class UdpListenerCallbacks {
* UdpListenerCallback
*/
virtual UdpPacketWriter& udpPacketWriter() PURE;

/**
* Returns the index of this worker, in the range of [0, concurrency).
*/
virtual uint32_t workerIndex() const PURE;

/**
* Called whenever data is received on the underlying udp socket, on
* the destination worker for the datagram according to ``destination()``.
*/
virtual void onDataWorker(Network::UdpRecvData&& data) PURE;

/**
* Posts ``data`` to be delivered on this worker.
*/
virtual void post(Network::UdpRecvData&& data) PURE;
};

using UdpListenerCallbacksOptRef = absl::optional<std::reference_wrapper<UdpListenerCallbacks>>;

/**
* An abstract socket listener. Free the listener to stop listening on the socket.
*/
Expand Down Expand Up @@ -337,9 +365,44 @@ class UdpListener : public virtual Listener {
* @return the error code of the underlying flush api.
*/
virtual Api::IoCallUint64Result flush() PURE;

/**
* Make this listener readable at the beginning of the next event loop.
*
* @note: it may become readable during the current loop if feature
* ``envoy.reloadable_features.activate_fds_next_event_loop`` is disabled.
*/
virtual void activateRead() PURE;
};

using UdpListenerPtr = std::unique_ptr<UdpListener>;

/**
* Handles delivering datagrams to the correct worker.
*/
class UdpListenerWorkerRouter {
public:
virtual ~UdpListenerWorkerRouter() = default;

/**
* Registers a worker's callbacks for this listener. This worker must accept
* packets until it calls ``unregisterWorker``.
*/
virtual void registerWorkerForListener(UdpListenerCallbacks& listener) PURE;

/**
* Unregisters a worker's callbacks for this listener.
*/
virtual void unregisterWorkerForListener(UdpListenerCallbacks& listener) PURE;

/**
* Deliver ``data`` to the correct worker by calling ``onDataWorker()``
* or ``post()`` on one of the registered workers.
*/
virtual void deliver(uint32_t dest_worker_index, UdpRecvData&& data) PURE;
};

using UdpListenerWorkerRouterPtr = std::unique_ptr<UdpListenerWorkerRouter>;

} // namespace Network
} // namespace Envoy
3 changes: 2 additions & 1 deletion include/envoy/server/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,12 @@ class WorkerFactory {
virtual ~WorkerFactory() = default;

/**
* @param index supplies the index of the worker, in the range of [0, concurrency).
* @param overload_manager supplies the server's overload manager.
* @param worker_name supplies the name of the worker, used for per-worker stats.
* @return WorkerPtr a new worker.
*/
virtual WorkerPtr createWorker(OverloadManager& overload_manager,
virtual WorkerPtr createWorker(uint32_t index, OverloadManager& overload_manager,
const std::string& worker_name) PURE;
};

Expand Down
2 changes: 1 addition & 1 deletion source/common/event/dispatcher_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ Network::ListenerPtr DispatcherImpl::createListener(Network::SocketSharedPtr&& s
backlog_size);
}

Network::UdpListenerPtr DispatcherImpl::createUdpListener(Network::SocketSharedPtr&& socket,
Network::UdpListenerPtr DispatcherImpl::createUdpListener(Network::SocketSharedPtr socket,
Network::UdpListenerCallbacks& cb) {
ASSERT(isThreadSafe());
return std::make_unique<Network::UdpListenerImpl>(*this, std::move(socket), cb, timeSource());
Expand Down
2 changes: 1 addition & 1 deletion source/common/event/dispatcher_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class DispatcherImpl : Logger::Loggable<Logger::Id::main>,
Network::ListenerPtr createListener(Network::SocketSharedPtr&& socket,
Network::TcpListenerCallbacks& cb, bool bind_to_port,
uint32_t backlog_size) override;
Network::UdpListenerPtr createUdpListener(Network::SocketSharedPtr&& socket,
Network::UdpListenerPtr createUdpListener(Network::SocketSharedPtr socket,
Network::UdpListenerCallbacks& cb) override;
TimerPtr createTimer(TimerCb cb) override;
Event::SchedulableCallbackPtr createSchedulableCallback(std::function<void()> cb) override;
Expand Down
36 changes: 35 additions & 1 deletion source/common/network/udp_listener_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ void UdpListenerImpl::processPacket(Address::InstanceConstSharedPtr local_addres
ASSERT(local_address != nullptr);
UdpRecvData recvData{
{std::move(local_address), std::move(peer_address)}, std::move(buffer), receive_time};
cb_.onData(recvData);
cb_.onData(std::move(recvData));
}

void UdpListenerImpl::handleWriteCallback() {
Expand Down Expand Up @@ -125,5 +125,39 @@ Api::IoCallUint64Result UdpListenerImpl::flush() {
return cb_.udpPacketWriter().flush();
}

void UdpListenerImpl::activateRead() { file_event_->activate(Event::FileReadyType::Read); }

UdpListenerWorkerRouterImpl::UdpListenerWorkerRouterImpl(uint32_t concurrency)
: workers_(concurrency) {}

void UdpListenerWorkerRouterImpl::registerWorkerForListener(UdpListenerCallbacks& listener) {
absl::WriterMutexLock lock(&mutex_);

ASSERT(listener.workerIndex() < workers_.size());
ASSERT(workers_.at(listener.workerIndex()) == nullptr);
workers_.at(listener.workerIndex()) = &listener;
}

void UdpListenerWorkerRouterImpl::unregisterWorkerForListener(UdpListenerCallbacks& listener) {
absl::WriterMutexLock lock(&mutex_);

ASSERT(workers_.at(listener.workerIndex()) == &listener);
workers_.at(listener.workerIndex()) = nullptr;
}

void UdpListenerWorkerRouterImpl::deliver(uint32_t dest_worker_index, UdpRecvData&& data) {
absl::ReaderMutexLock lock(&mutex_);

ggreenway marked this conversation as resolved.
Show resolved Hide resolved
ASSERT(dest_worker_index < workers_.size(),
"UdpListenerCallbacks::destination returned out-of-range value");
auto* worker = workers_[dest_worker_index];

// When a listener is being removed, packets could be processed on some workers after the
// listener is removed from other workers, which could result in a nullptr for that worker.
if (worker != nullptr) {
worker->post(std::move(data));
}
}

} // namespace Network
} // namespace Envoy
14 changes: 14 additions & 0 deletions source/common/network/udp_listener_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class UdpListenerImpl : public BaseListenerImpl,
const Address::InstanceConstSharedPtr& localAddress() const override;
Api::IoCallUint64Result send(const UdpSendData& data) override;
Api::IoCallUint64Result flush() override;
void activateRead() override;

void processPacket(Address::InstanceConstSharedPtr local_address,
Address::InstanceConstSharedPtr peer_address, Buffer::InstancePtr buffer,
Expand All @@ -61,5 +62,18 @@ class UdpListenerImpl : public BaseListenerImpl,
Event::FileEventPtr file_event_;
};

class UdpListenerWorkerRouterImpl : public UdpListenerWorkerRouter {
public:
UdpListenerWorkerRouterImpl(uint32_t concurrency);

void registerWorkerForListener(UdpListenerCallbacks& listener) override;
void unregisterWorkerForListener(UdpListenerCallbacks& listener) override;
void deliver(uint32_t dest_worker_id, UdpRecvData&& data) override;

private:
absl::Mutex mutex_;
std::vector<UdpListenerCallbacks*> workers_ ABSL_GUARDED_BY(mutex_);
mattklein123 marked this conversation as resolved.
Show resolved Hide resolved
};

} // namespace Network
} // namespace Envoy
1 change: 1 addition & 0 deletions source/common/runtime/runtime_features.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ constexpr const char* runtime_features[] = {
"envoy.reloadable_features.http2_skip_encoding_empty_trailers",
"envoy.reloadable_features.listener_in_place_filterchain_update",
"envoy.reloadable_features.overload_manager_disable_keepalive_drain_http2",
"envoy.reloadable_features.prefer_quic_kernel_bpf_packet_routing",
ggreenway marked this conversation as resolved.
Show resolved Hide resolved
"envoy.reloadable_features.preserve_query_string_in_path_redirects",
"envoy.reloadable_features.preserve_upstream_date",
"envoy.reloadable_features.stop_faking_paths",
Expand Down
5 changes: 1 addition & 4 deletions source/extensions/quic_listeners/quiche/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -409,10 +409,7 @@ envoy_cc_library(
"//bazel:linux": ["udp_gso_batch_writer.cc"],
ggreenway marked this conversation as resolved.
Show resolved Hide resolved
"//conditions:default": [],
}),
hdrs = select({
"//bazel:linux": ["udp_gso_batch_writer.h"],
"//conditions:default": [],
}),
hdrs = ["udp_gso_batch_writer.h"],
external_deps = ["quiche_quic_platform"],
tags = ["nofips"],
visibility = [
Expand Down
Loading