Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

postgres: validate message syntax before parsing #16575

Merged
merged 15 commits into from
Jun 18, 2021
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
287 changes: 183 additions & 104 deletions source/extensions/filters/network/postgres_proxy/postgres_decoder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,16 @@ namespace PostgresProxy {
[]() -> std::unique_ptr<Message> { return createMsgBodyReader<__VA_ARGS__>(); }
#define NO_BODY BODY_FORMAT()

constexpr absl::string_view FRONTEND = "Frontend";
constexpr absl::string_view BACKEND = "Backend";

void DecoderImpl::initialize() {
// Special handler for first message of the transaction.
first_ =
MessageProcessor{"Startup", BODY_FORMAT(Int32, Repeated<String>), {&DecoderImpl::onStartup}};

// Frontend messages.
FE_messages_.direction_ = "Frontend";
FE_messages_.direction_ = FRONTEND;

// Setup handlers for known messages.
absl::flat_hash_map<char, MessageProcessor>& FE_known_msgs = FE_messages_.messages_;
Expand Down Expand Up @@ -52,7 +55,7 @@ void DecoderImpl::initialize() {
MessageProcessor{"Other", BODY_FORMAT(ByteN), {&DecoderImpl::incMessagesUnknown}};

// Backend messages.
BE_messages_.direction_ = "Backend";
BE_messages_.direction_ = BACKEND;

// Setup handlers for known messages.
absl::flat_hash_map<char, MessageProcessor>& BE_known_msgs = BE_messages_.messages_;
Expand Down Expand Up @@ -176,134 +179,216 @@ void DecoderImpl::initialize() {
};
}

Decoder::Result DecoderImpl::parseHeader(Buffer::Instance& data) {
ENVOY_LOG(trace, "postgres_proxy: parsing message, len {}", data.length());
/* Main handler for incoming messages. Messages are dispatched based on the
current decoder's state.
*/
Decoder::Result DecoderImpl::onData(Buffer::Instance& data, bool frontend) {
switch (state_) {
case State::InitState:
return onDataInit(data, frontend);
case State::OutOfSyncState:
case State::EncryptedState:
return onDataIgnore(data, frontend);
case State::InSyncState:
return onDataInSync(data, frontend);
default:
NOT_IMPLEMENTED_GCOVR_EXCL_LINE;
}
}

/* Handler for messages when decoder is in Init State. There are very few message types which
are allowed in this state.
If the initial message has the correct syntax and indicates that session should be in
clear-text, the decoder will move to InSyncState. If the initial message has the correct syntax
and indicates that session should be encrypted, the decoder stays in InitState, because the
initial message will be received again after transport socket negotiates SSL. If the message
syntax is incorrect, the decoder will move to OutOfSyncState, in which messages are not parsed.
*/
Decoder::Result DecoderImpl::onDataInit(Buffer::Instance& data, bool) {
ASSERT(state_ == State::InitState);

// The minimum size of the message sufficient for parsing is 5 bytes.
if (data.length() < 5) {
if (data.length() < 4) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about create something like #define MIN_MESSAGE_LENGTH 4 ? And seems the comment above is not in sync what code does...

// not enough data in the buffer.
return Decoder::NeedMoreData;
return Decoder::Result::NeedMoreData;
}

// Validate the message before processing.
const MsgBodyReader& f = std::get<1>(first_);
const auto msgParser = f();
// Run the validation.
message_len_ = data.peekBEInt<uint32_t>(0);
// MAX_STARTUP_PACKET_LENGTH is defined in Postgres source code
// as maximum size of initial packet.
#define MAX_STARTUP_PACKET_LENGTH 10000
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps move it to postgres_decoder.h and add the following link to comments: https://github.com/postgres/postgres/search?q=MAX_STARTUP_PACKET_LENGTH&type=code

if (message_len_ > MAX_STARTUP_PACKET_LENGTH) {
// Message does not conform to the expected format. Move to out-of-sync state.
data.drain(data.length());
state_ = State::OutOfSyncState;
return Decoder::Result::ReadyForNext;
}

if (!startup_) {
data.copyOut(0, 1, &command_);
ENVOY_LOG(trace, "postgres_proxy: command is {}", command_);
Message::ValidationResult validationResult = msgParser->validate(data, 4, message_len_ - 4);

if (validationResult == Message::ValidationNeedMoreData) {
return Decoder::Result::NeedMoreData;
}

// The 1 byte message type and message length should be in the buffer
// Check if the entire message has been read.
std::string message;
message_len_ = data.peekBEInt<uint32_t>(startup_ ? 0 : 1);
if (data.length() < (message_len_ + (startup_ ? 0 : 1))) {
ENVOY_LOG(trace, "postgres_proxy: cannot parse message. Need {} bytes in buffer",
message_len_ + (startup_ ? 0 : 1));
// Not enough data in the buffer.
return Decoder::NeedMoreData;
if (validationResult == Message::ValidationFailed) {
// Message does not conform to the expected format. Move to out-of-sync state.
data.drain(data.length());
state_ = State::OutOfSyncState;
return Decoder::Result::ReadyForNext;
}

if (startup_) {
uint32_t code = data.peekBEInt<uint32_t>(4);
// Startup message with 1234 in the most significant 16 bits
// indicate request to encrypt.
if (code >= 0x04d20000) {
encrypted_ = true;
// Handler for SSLRequest (Int32(80877103) = 0x04d2162f)
// See details in https://www.postgresql.org/docs/current/protocol-message-formats.html.
if (code == 0x04d2162f) {
// Notify the filter that `SSLRequest` message was decoded.
// If the filter returns true, it means to pass the message upstream
// to the server. If it returns false it means, that filter will try
// to terminate SSL session and SSLRequest should not be passed to the
// server.
encrypted_ = callbacks_->onSSLRequest();
}

// Count it as recognized frontend message.
callbacks_->incMessagesFrontend();
if (encrypted_) {
ENVOY_LOG(trace, "postgres_proxy: detected encrypted traffic.");
incSessionsEncrypted();
startup_ = false;
}
data.drain(data.length());
return encrypted_ ? Decoder::ReadyForNext : Decoder::Stopped;
Decoder::Result result = Decoder::Result::ReadyForNext;
uint32_t code = data.peekBEInt<uint32_t>(4);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rename code to startup_packet just to use the Postgres vocabulary.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, but this is not really a packet itself, but just 4-bytes code which can be either a version or encryption type.

data.drain(4);
// Startup message with 1234 in the most significant 16 bits
// indicate request to encrypt.
if (code >= 0x04d20000) {
encrypted_ = true;
// Handler for SSLRequest (Int32(80877103) = 0x04d2162f)
// See details in https://www.postgresql.org/docs/current/protocol-message-formats.html.
if (code == 0x04d2162f) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both lines 249 and 253 use SSLRequest codes... for code legibility what about add some macros?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMHO macro will make it more difficult to read and will not add any value. The meaning of this value is documented pretty clearly here.

// Notify the filter that `SSLRequest` message was decoded.
// If the filter returns true, it means to pass the message upstream
// to the server. If it returns false it means, that filter will try
// to terminate SSL session and SSLRequest should not be passed to the
// server.
encrypted_ = callbacks_->onSSLRequest();
}

// Count it as recognized frontend message.
callbacks_->incMessagesFrontend();
if (encrypted_) {
ENVOY_LOG(trace, "postgres_proxy: detected encrypted traffic.");
incSessionsEncrypted();
state_ = State::EncryptedState;
startup_ = false;
cpakulski marked this conversation as resolved.
Show resolved Hide resolved
} else {
ENVOY_LOG(debug, "Detected version {}.{} of Postgres", code >> 16, code & 0x0000FFFF);
result = Decoder::Result::Stopped;
// Stay in InitState. After switch to SSL, another init packet will be sent.
}
} else {
ENVOY_LOG(debug, "Detected version {}.{} of Postgres", code >> 16, code & 0x0000FFFF);
state_ = State::InSyncState;
}

data.drain(startup_ ? 4 : 5); // Length plus optional 1st byte.

ENVOY_LOG(trace, "postgres_proxy: msg parsed");
return Decoder::ReadyForNext;
processMessageBody(data, FRONTEND, message_len_ - 4, first_, msgParser);
data.drain(message_len_);
return result;
}

Decoder::Result DecoderImpl::onData(Buffer::Instance& data, bool frontend) {
// If encrypted, just drain the traffic.
if (encrypted_) {
ENVOY_LOG(trace, "postgres_proxy: ignoring {} bytes of encrypted data", data.length());
data.drain(data.length());
return Decoder::ReadyForNext;
}
/*
Method invokes actions associated with message type and generate debug logs.
*/
void DecoderImpl::processMessageBody(Buffer::Instance& data, absl::string_view direction,
uint32_t length, MessageProcessor& msg,
const std::unique_ptr<Message>& parser) {
uint32_t bytes_to_read = length;

if (!frontend && startup_) {
data.drain(data.length());
return Decoder::ReadyForNext;
std::vector<MsgAction>& actions = std::get<2>(msg);
if (!actions.empty()) {
// Linearize the message for processing.
message_.assign(std::string(static_cast<char*>(data.linearize(bytes_to_read)), bytes_to_read));

// Invoke actions associated with the type of received message.
for (const auto& action : actions) {
action(this);
}

// Drop the linearized message.
message_.erase();
}

ENVOY_LOG(debug, "({}) command = {} ({})", direction, command_, std::get<0>(msg));
ENVOY_LOG(debug, "({}) length = {}", direction, message_len_);
ENVOY_LOG(debug, "({}) message = {}", direction, genDebugMessage(parser, data, bytes_to_read));

ENVOY_LOG(trace, "postgres_proxy: {} bytes remaining in buffer", data.length());

data.drain(length);
}

/*
onDataInSync is called when decoder is on-track with decoding messages.
All previous messages has been decoded properly and decoder is able to find
message boundaries.
*/
Decoder::Result DecoderImpl::onDataInSync(Buffer::Instance& data, bool frontend) {
ENVOY_LOG(trace, "postgres_proxy: decoding {} bytes", data.length());

const Decoder::Result result = parseHeader(data);
if (result != Decoder::ReadyForNext || encrypted_) {
return result;
ENVOY_LOG(trace, "postgres_proxy: parsing message, len {}", data.length());

// The minimum size of the message sufficient for parsing is 5 bytes.
if (data.length() < 5) {
// not enough data in the buffer.
return Decoder::Result::NeedMoreData;
}

data.copyOut(0, 1, &command_);
ENVOY_LOG(trace, "postgres_proxy: command is {}", command_);

// The 1 byte message type and message length should be in the buffer
// Find the message processor and validate the message syntax.

MsgGroup& msg_processor = std::ref(frontend ? FE_messages_ : BE_messages_);
frontend ? callbacks_->incMessagesFrontend() : callbacks_->incMessagesBackend();

// Set processing to the handler of unknown messages.
// If message is found, the processing will be updated.
std::reference_wrapper<MessageProcessor> msg = msg_processor.unknown_;

if (startup_) {
msg = std::ref(first_);
startup_ = false;
} else {
auto it = msg_processor.messages_.find(command_);
if (it != msg_processor.messages_.end()) {
msg = std::ref((*it).second);
}
auto it = msg_processor.messages_.find(command_);
if (it != msg_processor.messages_.end()) {
msg = std::ref((*it).second);
}

// message_len_ specifies total message length including 4 bytes long
// "length" field. The length of message body is total length minus size
// of "length" field (4 bytes).
uint32_t bytes_to_read = message_len_ - 4;

std::vector<MsgAction>& actions = std::get<2>(msg.get());
if (!actions.empty()) {
// Linearize the message for processing.
message_.assign(std::string(static_cast<char*>(data.linearize(bytes_to_read)), bytes_to_read));

// Invoke actions associated with the type of received message.
for (const auto& action : actions) {
action(this);
}
// Validate the message before processing.
const MsgBodyReader& f = std::get<1>(msg.get());
message_len_ = data.peekBEInt<uint32_t>(1);
const auto msgParser = f();
// Run the validation.
// Because the message validation may return NeedMoreData error, data must stay intact (no
// draining) until the remaining data arrives and validator will run again. Validator therefore
// starts at offset 5 (1 byte message type and 4 bytes of length). This is in contrast to
// processing of the message, which assumes that message has been validated and starts at the
// beginning of the message.
Message::ValidationResult validationResult = msgParser->validate(data, 5, message_len_ - 4);

if (validationResult == Message::ValidationNeedMoreData) {
ENVOY_LOG(trace, "postgres_proxy: cannot parse message. Not enough bytes in the buffer.");
return Decoder::Result::NeedMoreData;
}

// Drop the linearized message.
message_.erase();
if (validationResult == Message::ValidationFailed) {
// Message does not conform to the expected format. Move to out-of-sync state.
data.drain(data.length());
state_ = State::OutOfSyncState;
return Decoder::Result::ReadyForNext;
}

ENVOY_LOG(debug, "({}) command = {} ({})", msg_processor.direction_, command_,
std::get<0>(msg.get()));
ENVOY_LOG(debug, "({}) length = {}", msg_processor.direction_, message_len_);
ENVOY_LOG(debug, "({}) message = {}", msg_processor.direction_,
genDebugMessage(msg, data, bytes_to_read));
// Drain message code and length fields.
// Processing the message assumes that message starts at the beginning of the buffer.
data.drain(5);

data.drain(bytes_to_read);
ENVOY_LOG(trace, "postgres_proxy: {} bytes remaining in buffer", data.length());
processMessageBody(data, msg_processor.direction_, message_len_ - 4, msg, msgParser);

return Decoder::ReadyForNext;
return Decoder::Result::ReadyForNext;
}
/*
onDataIgnore method is called when the decoder does not inspect passing
messages. This happens when the decoder detected encrypted packets or
when the decoder could not validate passing messages and lost track of
messages boundaries. In order not to interpret received values as message
lengths and not to start buffering large amount of data, the decoder
enters OutOfSync state and starts ignoring passing messages. Once the
decoder enters OutOfSyncState it cannot leave that state.
*/
Decoder::Result DecoderImpl::onDataIgnore(Buffer::Instance& data, bool) {
data.drain(data.length());
return Decoder::Result::ReadyForNext;
}

// Method is called when C (CommandComplete) message has been
Expand Down Expand Up @@ -423,16 +508,10 @@ void DecoderImpl::onStartup() {
}

// Method generates displayable format of currently processed message.
const std::string DecoderImpl::genDebugMessage(const MessageProcessor& msg, Buffer::Instance& data,
uint32_t message_len) {
const MsgBodyReader& f = std::get<1>(msg);
std::string message = "Unrecognized";
if (f != nullptr) {
const auto msgParser = f();
msgParser->read(data, message_len);
message = msgParser->toString();
}
return message;
const std::string DecoderImpl::genDebugMessage(const std::unique_ptr<Message>& parser,
Buffer::Instance& data, uint32_t message_len) {
parser->read(data, message_len);
return parser->toString();
}

} // namespace PostgresProxy
Expand Down
Loading