connection: propagate I/O errors to network filters. #13941

Closed
2 changes: 2 additions & 0 deletions include/envoy/api/io_error.h
@@ -31,6 +31,8 @@ class IoError {
Interrupt,
// Requested a nonexistent interface or a non-local source address.
AddressNotAvailable,
// Connection reset by peer.
ConnectionResetByPeer,
// Bad file descriptor.
BadFd,
// Other error codes cannot be mapped to any one above in getErrorCode().
2 changes: 2 additions & 0 deletions include/envoy/common/platform.h
@@ -147,6 +147,7 @@ struct msghdr {
#define SOCKET_ERROR_ADDR_NOT_AVAIL WSAEADDRNOTAVAIL
#define SOCKET_ERROR_INVAL WSAEINVAL
#define SOCKET_ERROR_ADDR_IN_USE WSAEADDRINUSE
#define SOCKET_ERROR_ECONNRESET WSAECONNRESET

#define HANDLE_ERROR_PERM ERROR_ACCESS_DENIED
#define HANDLE_ERROR_INVALID ERROR_INVALID_HANDLE
@@ -240,6 +241,7 @@ typedef int filesystem_os_id_t; // NOLINT(modernize-use-using)
#define SOCKET_ERROR_ADDR_NOT_AVAIL EADDRNOTAVAIL
#define SOCKET_ERROR_INVAL EINVAL
#define SOCKET_ERROR_ADDR_IN_USE EADDRINUSE
#define SOCKET_ERROR_ECONNRESET ECONNRESET

// Mapping POSIX file errors to common error names
#define HANDLE_ERROR_PERM EACCES
5 changes: 5 additions & 0 deletions include/envoy/network/transport_socket.h
@@ -34,6 +34,11 @@ struct IoResult {
* can only be true for read operations.
*/
bool end_stream_read_;

/**
* I/O error.
*/
absl::optional<Api::IoError::IoErrorCode> io_error_;
};

/**
30 changes: 30 additions & 0 deletions source/common/network/connection_impl.cc
@@ -25,6 +25,10 @@ namespace {

constexpr absl::string_view kTransportSocketConnectTimeoutTerminationDetails =
"transport socket timeout was reached";
constexpr absl::string_view kDownstreamConnectionTerminationDetails =
"downstream connection was terminated";
constexpr absl::string_view kUpstreamConnectionTerminationDetails =
"upstream connection was terminated";

}

@@ -588,6 +592,20 @@ void ConnectionImpl::onReadReady() {
result.action_ = PostIoAction::Close;
}

if (result.io_error_.has_value()) {
ASSERT(result.action_ == PostIoAction::Close);
if (dynamic_cast<ServerConnectionImpl*>(this)) {
stream_info_.setConnectionTerminationDetails(kDownstreamConnectionTerminationDetails);
stream_info_.setResponseFlag(StreamInfo::ResponseFlag::DownstreamConnectionTermination);
} else {
stream_info_.setUpstreamTransportFailureReason(kUpstreamConnectionTerminationDetails);
stream_info_.setConnectionTerminationDetails(kUpstreamConnectionTerminationDetails);
stream_info_.setResponseFlag(StreamInfo::ResponseFlag::UpstreamConnectionTermination);
Contributor

nit: delegate the stats update to a virtual method so you can avoid the dynamic cast.
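
One way to read the nit above, as a minimal sketch rather than Envoy's actual code: the base connection calls a virtual hook, and the server-side subclass overrides it, so the read and write paths no longer need a `dynamic_cast`. `ConnectionSketch`, `ServerConnectionSketch`, `StreamInfoSketch`, `handleIoError`, and `recordIoError` are hypothetical names, `std::optional` stands in for `absl::optional`, and the response-flag calls from the diff are left out for brevity.

```cpp
#include <optional>
#include <string>

// Simplified stand-in for the stream info fields touched by the diff (hypothetical).
struct StreamInfoSketch {
  std::string termination_details;
  std::string upstream_failure_reason;
};

enum class IoErrorCode { ConnectionResetByPeer, UnknownError };

class ConnectionSketch {
public:
  virtual ~ConnectionSketch() = default;

  // Shared by the read and write paths; the subclass decides what to record.
  void handleIoError(const std::optional<IoErrorCode>& error) {
    if (error.has_value()) {
      recordIoError(info_);
    }
  }

  const StreamInfoSketch& info() const { return info_; }

protected:
  // Default: treat this as an upstream (client) connection.
  virtual void recordIoError(StreamInfoSketch& info) {
    info.upstream_failure_reason = "upstream connection was terminated";
    info.termination_details = "upstream connection was terminated";
  }

private:
  StreamInfoSketch info_;
};

class ServerConnectionSketch : public ConnectionSketch {
protected:
  // Downstream connections record only the termination details.
  void recordIoError(StreamInfoSketch& info) override {
    info.termination_details = "downstream connection was terminated";
  }
};
```

With this shape, both `onReadReady()` and `onWriteReady()` could call a single helper instead of repeating the same `if`/`else` block.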

}
Contributor Author

Those values are set on the upstream connection's stream info, and they are never propagated to the downstream connection's stream info or access logs. I'm not sure what the best way to solve that is.

Contributor

Could the high-level request object query the connection as part of the termination process to determine whether termination was graceful or abrupt?

// Force "end_stream" so that filters can process this error.
result.end_stream_read_ = true;
Contributor

I usually associate end_stream_read_ with graceful termination. Is it appropriate to set this? Do we need alternate signaling to notify filters about abrupt terminations, via either additional arguments to the push pipeline or a new virtual method that is called on abrupt termination?

Contributor

I agree; I think of end_stream as always a graceful operation. I think a different signal for error is more appropriate.
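
The "different signal" discussed above could look roughly like the following. This is only an illustration of the idea, not an existing Envoy API: `ReadFilterSketch`, `onIoError`, `ReadOutcome`, and `dispatchRead` are all hypothetical, and `std::optional` stands in for `absl::optional`. An abrupt termination reaches the filter through its own callback, and `end_stream` stays reserved for graceful half-close.

```cpp
#include <optional>
#include <string>

enum class IoErrorCode { ConnectionResetByPeer, UnknownError };

// Hypothetical filter interface with a dedicated error callback.
class ReadFilterSketch {
public:
  virtual ~ReadFilterSketch() = default;
  virtual void onData(const std::string& data, bool end_stream) = 0;
  // Called on abrupt termination; the default is a no-op for filters that do not care.
  virtual void onIoError(IoErrorCode /*code*/) {}
};

// Simplified result of one read pass (hypothetical).
struct ReadOutcome {
  std::string data;
  bool end_stream;
  std::optional<IoErrorCode> io_error;
};

// Delivers any data that arrived, then reports an error separately from end_stream.
inline void dispatchRead(ReadFilterSketch& filter, const ReadOutcome& outcome) {
  if (outcome.io_error.has_value()) {
    if (!outcome.data.empty()) {
      filter.onData(outcome.data, false);
    }
    filter.onIoError(*outcome.io_error);
    return;
  }
  filter.onData(outcome.data, outcome.end_stream);
}

// Example filter that records which signals it received.
class RecordingFilter : public ReadFilterSketch {
public:
  void onData(const std::string&, bool end_stream) override {
    saw_end_stream = saw_end_stream || end_stream;
  }
  void onIoError(IoErrorCode) override { saw_error = true; }

  bool saw_end_stream = false;
  bool saw_error = false;
};
```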

Contributor Author

@PiotrSikora Nov 13, 2020

The issue is that there is no different signal right now, and if we don't set this, then network filters won't be executed at all.

Note that this PR is meant to be a temporary fix until we have a proper solution for #13940, but that's going to require a lot more changes and time.

Contributor

This seems like a massive hack. How much effort would it be to implement the real fix?

Contributor Author

That depends on what you consider a real fix. I'm not too familiar with that part of the codebase, but if we want to propagate upstream connection events to network filters, then I imagine it requires quite a lot of changes, so I think it would be better if one of the maintainers could work on this.

Contributor

@ggreenway

I don't know what to suggest. It would be good to figure out how to do a catch-all cleanup of WASM resources on abnormal connection termination. Could WASM detect connection termination based on an L4 filter that does the relevant signaling on destruction?
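
The destruction-based signaling asked about above can be sketched as a filter-like object that runs a cleanup callback in its destructor, so cleanup happens however the connection ended. `CleanupOnDestroyFilter` is a hypothetical name; a real version would also implement Envoy's network filter interface, which is omitted here.

```cpp
#include <functional>
#include <utility>

// Runs the supplied cleanup exactly once, when the filter is destroyed.
class CleanupOnDestroyFilter {
public:
  explicit CleanupOnDestroyFilter(std::function<void()> cleanup)
      : cleanup_(std::move(cleanup)) {}

  ~CleanupOnDestroyFilter() {
    if (cleanup_) {
      cleanup_();
    }
  }

  // Non-copyable so the cleanup cannot run twice.
  CleanupOnDestroyFilter(const CleanupOnDestroyFilter&) = delete;
  CleanupOnDestroyFilter& operator=(const CleanupOnDestroyFilter&) = delete;

private:
  std::function<void()> cleanup_;
};
```

The trade-off is that the destructor cannot tell a graceful close from a reset, so this covers resource cleanup but not error reporting.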

Contributor Author

FWIW, we have a workaround in #13836 that works for Wasm.

}

read_end_stream_ |= result.end_stream_read_;
if (result.bytes_processed_ != 0 || result.end_stream_read_ ||
(latched_dispatch_buffered_data && read_buffer_.length() > 0)) {
@@ -651,6 +669,18 @@ void ConnectionImpl::onWriteReady() {
uint64_t new_buffer_size = write_buffer_->length();
updateWriteBufferStats(result.bytes_processed_, new_buffer_size);

if (result.io_error_.has_value()) {
ASSERT(result.action_ == PostIoAction::Close);
if (dynamic_cast<ServerConnectionImpl*>(this)) {
stream_info_.setConnectionTerminationDetails(kDownstreamConnectionTerminationDetails);
stream_info_.setResponseFlag(StreamInfo::ResponseFlag::DownstreamConnectionTermination);
} else {
stream_info_.setUpstreamTransportFailureReason(kUpstreamConnectionTerminationDetails);
stream_info_.setConnectionTerminationDetails(kUpstreamConnectionTerminationDetails);
stream_info_.setResponseFlag(StreamInfo::ResponseFlag::UpstreamConnectionTermination);
}
}

// NOTE: If the delayed_close_timer_ is set, it must only trigger after a delayed_close_timeout_
// period of inactivity from the last write event. Therefore, the timer must be reset to its
// original timeout value unless the socket is going to be closed as a result of the doWrite().
2 changes: 2 additions & 0 deletions source/common/network/io_socket_error_impl.cc
@@ -26,6 +26,8 @@ Api::IoError::IoErrorCode IoSocketError::getErrorCode() const {
return IoErrorCode::Interrupt;
case SOCKET_ERROR_ADDR_NOT_AVAIL:
return IoErrorCode::AddressNotAvailable;
case SOCKET_ERROR_ECONNRESET:
return IoErrorCode::ConnectionResetByPeer;
default:
ENVOY_LOG_MISC(debug, "Unknown error code {} details {}", errno_, getErrorDetails());
return IoErrorCode::UnknownError;
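
The mapping in this hunk can be tried in isolation with a small POSIX-only sketch. `toIoErrorCode` is a hypothetical free function, the enum is a trimmed copy of the one in the diff, and the real code switches on the `SOCKET_ERROR_*` macros so that the same logic also covers Windows.

```cpp
#include <cerrno>

// Trimmed copy of the error codes that appear in the diff.
enum class IoErrorCode {
  Again,
  Interrupt,
  AddressNotAvailable,
  ConnectionResetByPeer,
  UnknownError,
};

// Maps a raw errno value to a portable error code; anything unmapped is UnknownError.
inline IoErrorCode toIoErrorCode(int sys_errno) {
  switch (sys_errno) {
  case EAGAIN:
    return IoErrorCode::Again;
  case EINTR:
    return IoErrorCode::Interrupt;
  case EADDRNOTAVAIL:
    return IoErrorCode::AddressNotAvailable;
  case ECONNRESET:
    return IoErrorCode::ConnectionResetByPeer;
  default:
    return IoErrorCode::UnknownError;
  }
}
```

Before this change, `ECONNRESET` would have fallen through to the `default` branch and been reported as `UnknownError`.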
8 changes: 6 additions & 2 deletions source/common/network/raw_buffer_socket.cc
@@ -15,6 +15,7 @@ void RawBufferSocket::setTransportSocketCallbacks(TransportSocketCallbacks& call

IoResult RawBufferSocket::doRead(Buffer::Instance& buffer) {
PostIoAction action = PostIoAction::KeepOpen;
absl::optional<Api::IoError::IoErrorCode> error;
uint64_t bytes_read = 0;
bool end_stream = false;
do {
@@ -38,17 +39,19 @@ IoResult RawBufferSocket::doRead(Buffer::Instance& buffer) {
ENVOY_CONN_LOG(trace, "read error: {}", callbacks_->connection(),
result.err_->getErrorDetails());
if (result.err_->getErrorCode() != Api::IoError::IoErrorCode::Again) {
error = result.err_->getErrorCode();
action = PostIoAction::Close;
}
break;
}
} while (true);

return {action, bytes_read, end_stream};
return {action, bytes_read, end_stream, error};
}

IoResult RawBufferSocket::doWrite(Buffer::Instance& buffer, bool end_stream) {
PostIoAction action;
absl::optional<Api::IoError::IoErrorCode> error;
uint64_t bytes_written = 0;
ASSERT(!shutdown_ || buffer.length() == 0);
do {
@@ -73,13 +76,14 @@ IoResult RawBufferSocket::doWrite(Buffer::Instance& buffer, bool end_stream) {
if (result.err_->getErrorCode() == Api::IoError::IoErrorCode::Again) {
action = PostIoAction::KeepOpen;
} else {
error = result.err_->getErrorCode();
action = PostIoAction::Close;
}
break;
}
} while (true);

return {action, bytes_written, false};
return {action, bytes_written, false, error};
}

std::string RawBufferSocket::protocol() const { return EMPTY_STRING; }
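
The read-side change above boils down to carrying the error code out of the loop alongside the close action. The sketch below reproduces that control flow with the socket replaced by an injected callback so it can run standalone. `IoResultSketch`, `ReadAttempt`, and `readAll` are hypothetical names, `std::optional` stands in for `absl::optional`, and treating a zero-byte read as end of stream is an assumption about the part of `doRead()` that the hunk does not show.

```cpp
#include <cstdint>
#include <functional>
#include <optional>

enum class IoErrorCode { Again, ConnectionResetByPeer, UnknownError };
enum class PostIoAction { KeepOpen, Close };

// Mirrors the four fields of IoResult after this PR.
struct IoResultSketch {
  PostIoAction action;
  uint64_t bytes_processed;
  bool end_stream_read;
  std::optional<IoErrorCode> io_error;
};

// Outcome of one low-level read: a byte count, or an error code.
struct ReadAttempt {
  uint64_t bytes;
  std::optional<IoErrorCode> error;
};

inline IoResultSketch readAll(const std::function<ReadAttempt()>& read_once) {
  PostIoAction action = PostIoAction::KeepOpen;
  std::optional<IoErrorCode> error;
  uint64_t bytes_read = 0;
  bool end_stream = false;
  while (true) {
    const ReadAttempt attempt = read_once();
    if (attempt.error.has_value()) {
      // "Again" only means no more data for now; anything else closes the connection.
      if (*attempt.error != IoErrorCode::Again) {
        error = attempt.error;
        action = PostIoAction::Close;
      }
      break;
    }
    if (attempt.bytes == 0) {
      end_stream = true; // Assumed: zero bytes without an error is a remote half-close.
      break;
    }
    bytes_read += attempt.bytes;
  }
  return {action, bytes_read, end_stream, error};
}
```

Note that `end_stream_read` stays `false` on a reset here; in the PR it is `ConnectionImpl::onReadReady()` that forces it to `true` afterwards.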
9 changes: 5 additions & 4 deletions source/extensions/transport_sockets/alts/tsi_socket.cc
@@ -161,7 +161,7 @@ Network::PostIoAction TsiSocket::doHandshakeNextDone(NextResultPtr&& next_result
}

Network::IoResult TsiSocket::doRead(Buffer::Instance& buffer) {
Network::IoResult result = {Network::PostIoAction::KeepOpen, 0, false};
Network::IoResult result = {Network::PostIoAction::KeepOpen, 0, false, absl::nullopt};
if (!end_stream_read_ && !read_error_) {
result = raw_buffer_socket_->doRead(raw_read_buffer_);
ENVOY_CONN_LOG(debug, "TSI: raw read result action {} bytes {} end_stream {}",
@@ -172,7 +172,8 @@ Network::IoResult TsiSocket::doRead(Buffer::Instance& buffer) {
}

if (!handshake_complete_ && result.end_stream_read_ && result.bytes_processed_ == 0) {
return {Network::PostIoAction::Close, result.bytes_processed_, result.end_stream_read_};
return {Network::PostIoAction::Close, result.bytes_processed_, result.end_stream_read_,
absl::nullopt};
}

end_stream_read_ = result.end_stream_read_;
@@ -182,7 +183,7 @@ Network::IoResult TsiSocket::doRead(Buffer::Instance& buffer) {
if (!handshake_complete_) {
Network::PostIoAction action = doHandshake();
if (action == Network::PostIoAction::Close || !handshake_complete_) {
return {action, 0, false};
return {action, 0, false, absl::nullopt};
}
}

@@ -225,7 +226,7 @@ Network::IoResult TsiSocket::doWrite(Buffer::Instance& buffer, bool end_stream)
raw_write_buffer_.length(), end_stream);
return raw_buffer_socket_->doWrite(raw_write_buffer_, end_stream && (buffer.length() == 0));
}
return {Network::PostIoAction::KeepOpen, 0, false};
return {Network::PostIoAction::KeepOpen, 0, false, absl::nullopt};
}

void TsiSocket::closeSocket(Network::ConnectionEvent) {
@@ -34,7 +34,8 @@ Network::IoResult UpstreamProxyProtocolSocket::doWrite(Buffer::Instance& buffer,
auto header_res = writeHeader();
if (header_buffer_.length() == 0 && header_res.action_ == Network::PostIoAction::KeepOpen) {
auto inner_res = transport_socket_->doWrite(buffer, end_stream);
return {inner_res.action_, header_res.bytes_processed_ + inner_res.bytes_processed_, false};
return {inner_res.action_, header_res.bytes_processed_ + inner_res.bytes_processed_, false,
inner_res.io_error_};
}
return header_res;
} else {
@@ -97,7 +98,7 @@ Network::IoResult UpstreamProxyProtocolSocket::writeHeader() {
}
} while (true);

return {action, bytes_written, false};
return {action, bytes_written, false, absl::nullopt};
}

void UpstreamProxyProtocolSocket::onConnected() {
16 changes: 9 additions & 7 deletions source/extensions/transport_sockets/tls/ssl_socket.cc
@@ -36,9 +36,11 @@ class NotReadySslSocket : public Network::TransportSocket {
absl::string_view failureReason() const override { return NotReadyReason; }
bool canFlushClose() override { return true; }
void closeSocket(Network::ConnectionEvent) override {}
Network::IoResult doRead(Buffer::Instance&) override { return {PostIoAction::Close, 0, false}; }
Network::IoResult doRead(Buffer::Instance&) override {
return {PostIoAction::Close, 0, false, absl::nullopt};
}
Network::IoResult doWrite(Buffer::Instance&, bool) override {
return {PostIoAction::Close, 0, false};
return {PostIoAction::Close, 0, false, absl::nullopt};
}
void onConnected() override {}
Ssl::ConnectionInfoConstSharedPtr ssl() const override { return nullptr; }
@@ -113,7 +115,7 @@ Network::IoResult SslSocket::doRead(Buffer::Instance& read_buffer) {
if (action == PostIoAction::Close || info_->state() != Ssl::SocketState::HandshakeComplete) {
// end_stream is false because either a hard error occurred (action == Close) or
// the handshake isn't complete, so a half-close cannot occur yet.
return {action, 0, false};
return {action, 0, false, absl::nullopt};
}
}

@@ -173,7 +175,7 @@ Network::IoResult SslSocket::doRead(Buffer::Instance& read_buffer) {

ENVOY_CONN_LOG(trace, "ssl read {} bytes", callbacks_->connection(), bytes_read);

return {action, bytes_read, end_stream};
return {action, bytes_read, end_stream, absl::nullopt};
}

void SslSocket::onPrivateKeyMethodComplete() {
@@ -232,7 +234,7 @@ Network::IoResult SslSocket::doWrite(Buffer::Instance& write_buffer, bool end_st
info_->state() != Ssl::SocketState::ShutdownSent) {
PostIoAction action = doHandshake();
if (action == PostIoAction::Close || info_->state() != Ssl::SocketState::HandshakeComplete) {
return {action, 0, false};
return {action, 0, false, absl::nullopt};
}
}

@@ -270,7 +272,7 @@ Network::IoResult SslSocket::doWrite(Buffer::Instance& write_buffer, bool end_st
// Renegotiation has started. We don't handle renegotiation so just fall through.
default:
drainErrorQueue();
return {PostIoAction::Close, total_bytes_written, false};
return {PostIoAction::Close, total_bytes_written, false, absl::nullopt};
}

break;
@@ -281,7 +283,7 @@ Network::IoResult SslSocket::doWrite(Buffer::Instance& write_buffer, bool end_st
shutdownSsl();
}

return {PostIoAction::KeepOpen, total_bytes_written, false};
return {PostIoAction::KeepOpen, total_bytes_written, false, absl::nullopt};
}

void SslSocket::onConnected() { ASSERT(info_->state() == Ssl::SocketState::PreHandshake); }