Skip to content

Commit

Permalink
comments
Browse files Browse the repository at this point in the history
Signed-off-by: Adi Suissa-Peleg <[email protected]>
  • Loading branch information
adisuissa committed May 28, 2024
1 parent e7aa3bc commit 5c1508e
Showing 1 changed file with 57 additions and 46 deletions.
103 changes: 57 additions & 46 deletions source/extensions/config_subscription/grpc/grpc_mux_failover.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,42 +89,30 @@ class GrpcMuxFailover : public Logger::Loggable<Logger::Id::config> {
// First check if Envoy ever connected to the primary/failover, and if so
// persist attempts to that source.
if (ever_connected_to_ == ConnectedState::Primary) {
ASSERT(!connecting_to_failover_);
ENVOY_LOG_MISC(trace, "Attempting to reconnect to the primary gRPC source, as a connection "
"to it was previously established.");
if (connected_to_ != ConnectedState::Primary) {
ASSERT(connected_to_ == ConnectedState::None);
connecting_to_primary_ = true;
primary_grpc_stream_->establishNewStream();
}
establishStreamToPrimaryIfNotConnected();
return;
} else if (ever_connected_to_ == ConnectedState::Failover) {
ASSERT(!connecting_to_primary_);
ENVOY_LOG_MISC(trace, "Attempting to reconnect to the failover gRPC source, as a connection "
"to it was previously established.");
if (connected_to_ != ConnectedState::Failover) {
ASSERT(connected_to_ == ConnectedState::None);
failover_grpc_stream_->establishNewStream();
}
establishStreamToFailoverIfNotConnected();
return;
}
// No prior connection was established, prefer the primary over the failover.
if (connecting_to_failover_ || (connected_to_ == ConnectedState::Failover)) {
if (connected_to_ != ConnectedState::Failover) {
ASSERT(connected_to_ == ConnectedState::None);
failover_grpc_stream_->establishNewStream();
}
if (connecting_to_failover_) {
establishStreamToFailoverIfNotConnected();
} else {
// Either connecting to primary or connected to it, or neither.
if (connected_to_ != ConnectedState::Primary) {
ASSERT(connected_to_ == ConnectedState::None);
connecting_to_primary_ = true;
primary_grpc_stream_->establishNewStream();
}
establishStreamToPrimaryIfNotConnected();
}
}

// Returns the availability of the underlying stream.
bool grpcStreamAvailable() const {
if (connecting_to_failover_ || (connected_to_ == ConnectedState::Failover)) {
if (connectingToOrConnectedToFailover()) {
return failover_grpc_stream_->grpcStreamAvailable();
}
// Either connecting/connected to the primary, or no connection was attempted.
Expand All @@ -133,7 +121,7 @@ class GrpcMuxFailover : public Logger::Loggable<Logger::Id::config> {

// Sends a message using the underlying stream.
void sendMessage(const RequestType& request) {
if (connecting_to_failover_ || (connected_to_ == ConnectedState::Failover)) {
if (connectingToOrConnectedToFailover()) {
failover_grpc_stream_->sendMessage(request);
return;
}
Expand All @@ -143,7 +131,7 @@ class GrpcMuxFailover : public Logger::Loggable<Logger::Id::config> {

// Updates the queue size of the underlying stream.
void maybeUpdateQueueSizeStat(uint64_t size) {
if (connecting_to_failover_ || (connected_to_ == ConnectedState::Failover)) {
if (connectingToOrConnectedToFailover()) {
failover_grpc_stream_->maybeUpdateQueueSizeStat(size);
return;
}
Expand All @@ -153,7 +141,7 @@ class GrpcMuxFailover : public Logger::Loggable<Logger::Id::config> {

// Returns true if the rate-limit allows draining.
bool checkRateLimitAllowsDrain() {
if (connecting_to_failover_ || (connected_to_ == ConnectedState::Failover)) {
if (connectingToOrConnectedToFailover()) {
return failover_grpc_stream_->checkRateLimitAllowsDrain();
}
// Either connecting/connected to the primary, or no connection was attempted.
Expand All @@ -162,19 +150,19 @@ class GrpcMuxFailover : public Logger::Loggable<Logger::Id::config> {

// Returns the close status for testing purposes only.
absl::optional<Grpc::Status::GrpcStatus> getCloseStatusForTest() {
if (connecting_to_failover_ || (connected_to_ == ConnectedState::Failover)) {
if (connectingToOrConnectedToFailover()) {
return failover_grpc_stream_->getCloseStatusForTest();
}
ASSERT(connecting_to_primary_ || (connected_to_ == ConnectedState::Primary));
ASSERT(connectingToOrConnectedToPrimary());
return primary_grpc_stream_->getCloseStatusForTest();
}

// Returns the current stream for testing purposes only.
GrpcStreamInterface<RequestType, ResponseType>& currentStreamForTest() {
if (connecting_to_failover_ || (connected_to_ == ConnectedState::Failover)) {
if (connectingToOrConnectedToFailover()) {
return *failover_grpc_stream_;
}
ASSERT(connecting_to_primary_ || (connected_to_ == ConnectedState::Primary));
ASSERT(connectingToOrConnectedToPrimary());
return *primary_grpc_stream_;
};

Expand Down Expand Up @@ -206,12 +194,10 @@ class GrpcMuxFailover : public Logger::Loggable<Logger::Id::config> {
}

void onEstablishmentFailure() override {
ASSERT(parent_.connected_to_ != ConnectedState::Failover);
// This will be called when the primary stream fails to establish a connection, or after the
// connection was closed.
ASSERT((parent_.connecting_to_primary_ || parent_.connected_to_ == ConnectedState::Primary) &&
!parent_.connecting_to_failover_ &&
(parent_.connected_to_ != ConnectedState::Failover));
ASSERT(parent_.connectingToOrConnectedToPrimary() &&
!parent_.connectingToOrConnectedToFailover());
// If there's no failover supported, this will just be a pass-through
// callback.
if (parent_.failover_grpc_stream_ != nullptr) {
Expand Down Expand Up @@ -247,9 +233,8 @@ class GrpcMuxFailover : public Logger::Loggable<Logger::Id::config> {

void onDiscoveryResponse(ResponseProtoPtr<ResponseType>&& message,
ControlPlaneStats& control_plane_stats) override {
ASSERT(
(parent_.connecting_to_primary_ || (parent_.connected_to_ == ConnectedState::Primary)) &&
!parent_.connecting_to_failover_ && (parent_.connected_to_ != ConnectedState::Failover));
ASSERT((parent_.connectingToOrConnectedToPrimary()) &&
!parent_.connectingToOrConnectedToFailover());
// Received a response from the primary. The primary is now considered available (no failover
// will be attempted).
parent_.ever_connected_to_ = ConnectedState::Primary;
Expand All @@ -260,7 +245,7 @@ class GrpcMuxFailover : public Logger::Loggable<Logger::Id::config> {
}

void onWriteable() override {
if (parent_.connecting_to_primary_ || (parent_.connected_to_ == ConnectedState::Primary)) {
if (parent_.connectingToOrConnectedToPrimary()) {
parent_.grpc_mux_callbacks_.onWriteable();
}
}
Expand All @@ -286,12 +271,10 @@ class GrpcMuxFailover : public Logger::Loggable<Logger::Id::config> {
}

void onEstablishmentFailure() override {
ASSERT(parent_.connected_to_ != ConnectedState::Primary);
// This will be called when the failover stream fails to establish a connection, or after the
// connection was closed.
ASSERT(
(parent_.connecting_to_failover_ || parent_.connected_to_ == ConnectedState::Failover) &&
!parent_.connecting_to_primary_ && (parent_.connected_to_ != ConnectedState::Primary));
ASSERT(parent_.connectingToOrConnectedToFailover() &&
!parent_.connectingToOrConnectedToPrimary());
if (parent_.ever_connected_to_ != ConnectedState::Failover) {
ASSERT(parent_.connecting_to_failover_);
// If Envoy never established a connecting the failover, it will try to connect to the
Expand Down Expand Up @@ -322,9 +305,8 @@ class GrpcMuxFailover : public Logger::Loggable<Logger::Id::config> {

void onDiscoveryResponse(ResponseProtoPtr<ResponseType>&& message,
ControlPlaneStats& control_plane_stats) override {
ASSERT((parent_.connecting_to_failover_ ||
(parent_.connected_to_ == ConnectedState::Failover)) &&
!parent_.connecting_to_primary_ && (parent_.connected_to_ != ConnectedState::Primary));
ASSERT(parent_.connectingToOrConnectedToFailover() &&
!parent_.connectingToOrConnectedToPrimary());
// Received a response from the failover. The failover is now considered available (no going
// back to the primary will be attempted).
// TODO(adisuissa): This will be modified in the future, when allowing the primary to always
Expand All @@ -336,7 +318,7 @@ class GrpcMuxFailover : public Logger::Loggable<Logger::Id::config> {
}

void onWriteable() override {
if (parent_.connecting_to_failover_ || (parent_.connected_to_ == ConnectedState::Failover)) {
if (parent_.connectingToOrConnectedToFailover()) {
parent_.grpc_mux_callbacks_.onWriteable();
}
}
Expand All @@ -345,6 +327,34 @@ class GrpcMuxFailover : public Logger::Loggable<Logger::Id::config> {
GrpcMuxFailover& parent_;
};

// Returns true iff the state is connecting to primary or connected to it.
bool connectingToOrConnectedToPrimary() const {
return connecting_to_primary_ || (connected_to_ == ConnectedState::Primary);
}

// Returns true iff the state is connecting to failover or connected to it.
bool connectingToOrConnectedToFailover() const {
return connecting_to_failover_ || (connected_to_ == ConnectedState::Failover);
}

// Establishes a new stream to the primary source if not connected to it.
void establishStreamToPrimaryIfNotConnected() {
if (connected_to_ != ConnectedState::Primary) {
ASSERT(connected_to_ == ConnectedState::None);
connecting_to_primary_ = true;
primary_grpc_stream_->establishNewStream();
}
}

// Establishes a new stream to the failover source if not connected to it.
void establishStreamToFailoverIfNotConnected() {
if (connected_to_ != ConnectedState::Failover) {
ASSERT(connected_to_ == ConnectedState::None);
connecting_to_failover_ = true;
failover_grpc_stream_->establishNewStream();
}
}

// The stream callbacks that will be invoked on the GrpcMux object, to notify
// about the state of the underlying primary/failover stream.
GrpcStreamCallbacks<ResponseType>& grpc_mux_callbacks_;
Expand Down Expand Up @@ -379,12 +389,13 @@ class GrpcMuxFailover : public Logger::Loggable<Logger::Id::config> {
// determined similar to the primary variants.
// Note that while Envoy can only be connected to a single source (mutually
// exclusive), it can attempt connecting to more than one source at a time.
bool connecting_to_primary_ : 1;
bool connecting_to_failover_ : 1;
bool connecting_to_primary_{false};
bool connecting_to_failover_{false};
ConnectedState connected_to_;

// A flag that keeps track of whether Envoy successfully connected to either the
// primary or failover source.
// primary or failover source. Envoy successfully connected to a source once
// it receives a response from it.
ConnectedState ever_connected_to_;
};

Expand Down

0 comments on commit 5c1508e

Please sign in to comment.