Skip to content

Commit

Permalink
Reset instead of discard all (#549)
Browse files Browse the repository at this point in the history
* Use reset all instead of discard all

* Move 'X' handling to before admin handle

* fix tests
  • Loading branch information
zainkabani authored Aug 16, 2023
1 parent 4f0f45b commit bb27586
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 26 deletions.
16 changes: 8 additions & 8 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -821,6 +821,14 @@ where
message_result = read_message(&mut self.read) => message_result?
};

if message[0] as char == 'X' {
debug!("Client disconnecting");

self.stats.disconnect();

return Ok(());
}

// Handle admin database queries.
if self.admin {
debug!("Handling admin command");
Expand Down Expand Up @@ -940,14 +948,6 @@ where
continue;
}

'X' => {
debug!("Client disconnecting");

self.stats.disconnect();

return Ok(());
}

// Close (F)
'C' => {
if prepared_statements_enabled {
Expand Down
25 changes: 17 additions & 8 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,10 @@ impl StreamInner {

#[derive(Copy, Clone)]
struct CleanupState {
/// If server connection requires DISCARD ALL before checkin because of set statement
/// If server connection requires RESET ALL before checkin because of set statement
needs_cleanup_set: bool,

/// If server connection requires DISCARD ALL before checkin because of prepare statement
/// If server connection requires DEALLOCATE ALL before checkin because of prepare statement
needs_cleanup_prepare: bool,
}

Expand Down Expand Up @@ -296,7 +296,7 @@ pub struct Server {
/// Is the server broken? We'll remote it from the pool if so.
bad: bool,

/// If server connection requires DISCARD ALL before checkin
/// If server connection requires reset statements before checkin
cleanup_state: CleanupState,

/// Mapping of clients and servers used for query cancellation.
Expand Down Expand Up @@ -982,7 +982,7 @@ impl Server {
// We don't detect set statements in transactions
// No great way to differentiate between set and set local
// As a result, we will miss cases when set statements are used in transactions
// This will reduce amount of discard statements sent
// This will reduce amount of reset statements sent
if !self.in_transaction {
debug!("Server connection marked for clean up");
self.cleanup_state.needs_cleanup_set = true;
Expand Down Expand Up @@ -1304,12 +1304,21 @@ impl Server {
// Client disconnected but it performed session-altering operations such as
// SET statement_timeout to 1 or create a prepared statement. We clear that
// to avoid leaking state between clients. For performance reasons we only
// send `DISCARD ALL` if we think the session is altered instead of just sending
// send `RESET ALL` if we think the session is altered instead of just sending
// it before each checkin.
if self.cleanup_state.needs_cleanup() && self.cleanup_connections {
info!(target: "pgcat::server::cleanup", "Server returned with session state altered, discarding state ({}) for application {}", self.cleanup_state, self.application_name);
self.query("DISCARD ALL").await?;
self.query("RESET ROLE").await?;
let mut reset_string = String::from("RESET ROLE;");

if self.cleanup_state.needs_cleanup_set {
reset_string.push_str("RESET ALL;");
};

if self.cleanup_state.needs_cleanup_prepare {
reset_string.push_str("DEALLOCATE ALL;");
};

self.query(&reset_string).await?;
self.cleanup_state.reset();
}

Expand All @@ -1336,7 +1345,7 @@ impl Server {
self.last_activity
}

// Marks a connection as needing DISCARD ALL at checkin
// Marks a connection as needing cleanup at checkin
pub fn mark_dirty(&mut self) {
self.cleanup_state.set_true();
}
Expand Down
20 changes: 10 additions & 10 deletions tests/ruby/misc_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -221,15 +221,15 @@
conn.close
end

it "Does not send DISCARD ALL unless necessary" do
it "Does not send RESET ALL unless necessary" do
10.times do
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
conn.async_exec("SET SERVER ROLE to 'primary'")
conn.async_exec("SELECT 1")
conn.close
end

expect(processes.primary.count_query("DISCARD ALL")).to eq(0)
expect(processes.primary.count_query("RESET ALL")).to eq(0)

10.times do
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
Expand All @@ -239,7 +239,7 @@
conn.close
end

expect(processes.primary.count_query("DISCARD ALL")).to eq(10)
expect(processes.primary.count_query("RESET ALL")).to eq(10)
end

it "Resets server roles correctly" do
Expand Down Expand Up @@ -273,7 +273,7 @@
end
end

it "Does not send DISCARD ALL unless necessary" do
it "Does not send RESET ALL unless necessary" do
10.times do
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
conn.async_exec("SET SERVER ROLE to 'primary'")
Expand All @@ -282,7 +282,7 @@
conn.close
end

expect(processes.primary.count_query("DISCARD ALL")).to eq(0)
expect(processes.primary.count_query("RESET ALL")).to eq(0)

10.times do
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
Expand All @@ -292,7 +292,7 @@
conn.close
end

expect(processes.primary.count_query("DISCARD ALL")).to eq(10)
expect(processes.primary.count_query("RESET ALL")).to eq(10)
end

it "Respects tracked parameters on startup" do
Expand Down Expand Up @@ -331,7 +331,7 @@
conn.async_exec("COMMIT")
conn.close
end
expect(processes.primary.count_query("DISCARD ALL")).to eq(0)
expect(processes.primary.count_query("RESET ALL")).to eq(0)

10.times do
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
Expand All @@ -341,7 +341,7 @@
conn.async_exec("COMMIT")
conn.close
end
expect(processes.primary.count_query("DISCARD ALL")).to eq(0)
expect(processes.primary.count_query("RESET ALL")).to eq(0)
end
end

Expand All @@ -355,7 +355,7 @@
conn.close

puts processes.pgcat.logs
expect(processes.primary.count_query("DISCARD ALL")).to eq(0)
expect(processes.primary.count_query("RESET ALL")).to eq(0)
end

it "will not clean up prepared statements" do
Expand All @@ -366,7 +366,7 @@
conn.close

puts processes.pgcat.logs
expect(processes.primary.count_query("DISCARD ALL")).to eq(0)
expect(processes.primary.count_query("RESET ALL")).to eq(0)
end
end
end
Expand Down

0 comments on commit bb27586

Please sign in to comment.