Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

postgres: do not copy and linearize received data when it is not going to be used #13393

Merged
merged 10 commits into from
Oct 16, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ void DecoderImpl::initialize() {
};
}

bool DecoderImpl::parseMessage(Buffer::Instance& data) {
bool DecoderImpl::parseHeader(Buffer::Instance& data) {
ENVOY_LOG(trace, "postgres_proxy: parsing message, len {}", data.length());

// The minimum size of the message sufficient for parsing is 5 bytes.
Expand Down Expand Up @@ -220,10 +220,6 @@ bool DecoderImpl::parseMessage(Buffer::Instance& data) {

data.drain(startup_ ? 4 : 5); // Length plus optional 1st byte.

uint32_t bytes_to_read = message_len_ - 4;
message.assign(std::string(static_cast<char*>(data.linearize(bytes_to_read)), bytes_to_read));
setMessage(message);

ENVOY_LOG(trace, "postgres_proxy: msg parsed");
return true;
}
Expand All @@ -238,7 +234,7 @@ bool DecoderImpl::onData(Buffer::Instance& data, bool frontend) {

ENVOY_LOG(trace, "postgres_proxy: decoding {} bytes", data.length());

if (!parseMessage(data)) {
if (!parseHeader(data)) {
return false;
}

Expand All @@ -259,16 +255,25 @@ bool DecoderImpl::onData(Buffer::Instance& data, bool frontend) {
}
}

std::vector<MsgAction>& actions = std::get<2>(msg.get());
for (const auto& action : actions) {
action(this);
}

// message_len_ specifies total message length including 4 bytes long
// "length" field. The length of message body is total length minus size
// of "length" field (4 bytes).
uint32_t bytes_to_read = message_len_ - 4;

std::vector<MsgAction>& actions = std::get<2>(msg.get());
if (!actions.empty()) {
cpakulski marked this conversation as resolved.
Show resolved Hide resolved
// Linearize the message for processing.
message_.assign(std::string(static_cast<char*>(data.linearize(bytes_to_read)), bytes_to_read));

// Invoke actions associated with the type of received message.
for (const auto& action : actions) {
action(this);
}

// Drop the linearized message.
message_.erase();
}

ENVOY_LOG(debug, "({}) command = {} ({})", msg_processor.direction_, command_,
std::get<0>(msg.get()));
ENVOY_LOG(debug, "({}) length = {}", msg_processor.direction_, message_len_);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ class DecoderImpl : public Decoder, Logger::Loggable<Logger::Id::filter> {
bool onData(Buffer::Instance& data, bool frontend) override;
PostgresSession& getSession() override { return session_; }

void setMessage(std::string message) { message_ = message; }
std::string getMessage() { return message_; }

void setStartup(bool startup) { startup_ = startup; }
Expand Down Expand Up @@ -122,7 +121,7 @@ class DecoderImpl : public Decoder, Logger::Loggable<Logger::Id::filter> {
MsgAction unknown_;
};

bool parseMessage(Buffer::Instance& data);
bool parseHeader(Buffer::Instance& data);
void decode(Buffer::Instance& data);
void decodeAuthentication();
void decodeBackendStatements();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,9 @@ TEST_P(PostgresProxyFrontendDecoderTest, FrontendInc) {
EXPECT_CALL(callbacks_, incMessagesFrontend()).Times(1);
createPostgresMsg(data_, GetParam(), "SELECT 1;");
decoder_->onData(data_, true);

// Make sure that decoder releases memory used during message processing.
ASSERT_TRUE(decoder_->getMessage().empty());
}

// Run the above test for each frontend message.
Expand Down