From 5c4ff6cdd6f984ecff83a6502fc1260236292c15 Mon Sep 17 00:00:00 2001 From: Zain Kabani <zain.kabani@instacart.com> Date: Mon, 15 May 2023 14:12:54 -0400 Subject: [PATCH 1/9] keep track of current stats and zero them after updating averages --- src/stats/address.rs | 243 +++++++++++++++++++++++++++---------------- src/stats/server.rs | 31 ++---- 2 files changed, 161 insertions(+), 113 deletions(-) diff --git a/src/stats/address.rs b/src/stats/address.rs index 89e4ebe7..47d38f0d 100644 --- a/src/stats/address.rs +++ b/src/stats/address.rs @@ -1,35 +1,26 @@ use std::sync::atomic::*; use std::sync::Arc; +#[derive(Debug, Clone, Default)] +struct AddressStatFields { + xact_count: Arc<AtomicU64>, + query_count: Arc<AtomicU64>, + bytes_received: Arc<AtomicU64>, + bytes_sent: Arc<AtomicU64>, + xact_time: Arc<AtomicU64>, + query_time: Arc<AtomicU64>, + wait_time: Arc<AtomicU64>, + errors: Arc<AtomicU64>, +} + /// Internal address stats #[derive(Debug, Clone, Default)] pub struct AddressStats { - pub total_xact_count: Arc<AtomicU64>, - pub total_query_count: Arc<AtomicU64>, - pub total_received: Arc<AtomicU64>, - pub total_sent: Arc<AtomicU64>, - pub total_xact_time: Arc<AtomicU64>, - pub total_query_time: Arc<AtomicU64>, - pub total_wait_time: Arc<AtomicU64>, - pub total_errors: Arc<AtomicU64>, - - pub old_total_xact_count: Arc<AtomicU64>, - pub old_total_query_count: Arc<AtomicU64>, - pub old_total_received: Arc<AtomicU64>, - pub old_total_sent: Arc<AtomicU64>, - pub old_total_xact_time: Arc<AtomicU64>, - pub old_total_query_time: Arc<AtomicU64>, - pub old_total_wait_time: Arc<AtomicU64>, - pub old_total_errors: Arc<AtomicU64>, - - pub avg_query_count: Arc<AtomicU64>, - pub avg_query_time: Arc<AtomicU64>, - pub avg_recv: Arc<AtomicU64>, - pub avg_sent: Arc<AtomicU64>, - pub avg_errors: Arc<AtomicU64>, - pub avg_xact_time: Arc<AtomicU64>, - pub avg_xact_count: Arc<AtomicU64>, - pub avg_wait_time: Arc<AtomicU64>, + total: AddressStatFields, + + current: AddressStatFields, + + averages: AddressStatFields, // Determines if the averages have been updated since the last time they were reported pub averages_updated: Arc<AtomicBool>, @@ -43,88 +34,147 @@ impl IntoIterator for AddressStats { vec![ ( "total_xact_count".to_string(), - self.total_xact_count.load(Ordering::Relaxed), + self.total.xact_count.load(Ordering::Relaxed), ), ( "total_query_count".to_string(), - self.total_query_count.load(Ordering::Relaxed), + self.total.query_count.load(Ordering::Relaxed), ), ( "total_received".to_string(), - self.total_received.load(Ordering::Relaxed), + self.total.bytes_received.load(Ordering::Relaxed), ), ( "total_sent".to_string(), - self.total_sent.load(Ordering::Relaxed), + self.total.bytes_sent.load(Ordering::Relaxed), ), ( "total_xact_time".to_string(), - self.total_xact_time.load(Ordering::Relaxed), + self.total.xact_time.load(Ordering::Relaxed), ), ( "total_query_time".to_string(), - self.total_query_time.load(Ordering::Relaxed), + self.total.query_time.load(Ordering::Relaxed), ), ( "total_wait_time".to_string(), - self.total_wait_time.load(Ordering::Relaxed), + self.total.wait_time.load(Ordering::Relaxed), ), ( "total_errors".to_string(), - self.total_errors.load(Ordering::Relaxed), + self.total.errors.load(Ordering::Relaxed), ), ( "avg_xact_count".to_string(), - self.avg_xact_count.load(Ordering::Relaxed), + self.averages.xact_count.load(Ordering::Relaxed), ), ( "avg_query_count".to_string(), - self.avg_query_count.load(Ordering::Relaxed), + self.averages.query_count.load(Ordering::Relaxed), ), ( "avg_recv".to_string(), - self.avg_recv.load(Ordering::Relaxed), + self.averages.bytes_received.load(Ordering::Relaxed), ), ( "avg_sent".to_string(), - self.avg_sent.load(Ordering::Relaxed), + self.averages.bytes_sent.load(Ordering::Relaxed), ), ( "avg_errors".to_string(), - self.avg_errors.load(Ordering::Relaxed), + self.averages.errors.load(Ordering::Relaxed), ), ( "avg_xact_time".to_string(), - self.avg_xact_time.load(Ordering::Relaxed), + self.averages.xact_time.load(Ordering::Relaxed), ), ( "avg_query_time".to_string(), - self.avg_query_time.load(Ordering::Relaxed), + self.averages.query_time.load(Ordering::Relaxed), ), ( "avg_wait_time".to_string(), - self.avg_wait_time.load(Ordering::Relaxed), + self.averages.wait_time.load(Ordering::Relaxed), ), ] .into_iter() } } +struct CountStatField { + current: Arc<AtomicU64>, + corresponding_stat: Option<Arc<AtomicU64>>, + average: Arc<AtomicU64>, +} + impl AddressStats { + pub fn xact_count_add(&self) { + self.total.xact_count.fetch_add(1, Ordering::Relaxed); + self.current.xact_count.fetch_add(1, Ordering::Relaxed); + } + + pub fn query_count_add(&self) { + self.total.query_count.fetch_add(1, Ordering::Relaxed); + self.current.query_count.fetch_add(1, Ordering::Relaxed); + } + + pub fn bytes_received_add(&self, bytes: u64) { + self.totals + .bytes_received + .fetch_add(bytes, Ordering::Relaxed); + self.current + .bytes_received + .fetch_add(bytes, Ordering::Relaxed); + } + + pub fn bytes_sent_add(&self, bytes: u64) { + self.total.bytes_sent.fetch_add(bytes, Ordering::Relaxed); + self.current.bytes_sent.fetch_add(bytes, Ordering::Relaxed); + } + + pub fn xact_time_add(&self, time: u64) { + self.total.xact_time.fetch_add(time, Ordering::Relaxed); + self.current.xact_time.fetch_add(time, Ordering::Relaxed); + } + + pub fn query_time_add(&self, time: u64) { + self.total.query_time.fetch_add(time, Ordering::Relaxed); + self.current.query_time.fetch_add(time, Ordering::Relaxed); + } + + pub fn wait_time_add(&self, time: u64) { + self.total.wait_time.fetch_add(time, Ordering::Relaxed); + self.current.wait_time.fetch_add(time, Ordering::Relaxed); + } + pub fn error(&self) { - self.total_errors.fetch_add(1, Ordering::Relaxed); + self.total.errors.fetch_add(1, Ordering::Relaxed); + self.current.errors.fetch_add(1, Ordering::Relaxed); } pub fn update_averages(&self) { - let (totals, averages, old_totals) = self.fields_iterators(); - for (total, average, old_total) in itertools::izip!(totals, averages, old_totals) { - let total_value = total.load(Ordering::Relaxed); - let old_total_value = old_total.load(Ordering::Relaxed); - average.store( - (total_value - old_total_value) / (crate::stats::STAT_PERIOD / 1_000), - Ordering::Relaxed, - ); // Avg / second - old_total.store(total_value, Ordering::Relaxed); + for count_field in self.count_fields_iterator() { + let current_value = count_field.current.load(Ordering::Relaxed); + + match count_field.corresponding_stat { + // This means that averaging by time makes sense here + None => count_field.average.store( + current_value / (crate::stats::STAT_PERIOD / 1_000), + Ordering::Relaxed, + ), + // This means we should average by some corresponding field, ie. number of queries + Some(corresponding_stat) => { + let corresponding_stat_value = corresponding_stat.load(Ordering::Relaxed); + count_field + .average + .store(current_value / corresponding_stat_value, Ordering::Relaxed); + } + }; + } + + // Reset current counts to 0 + for count_field in self.count_fields_iterator() { + count_field.current.store(0, Ordering::Relaxed); } } @@ -134,42 +184,57 @@ impl AddressStats { } } - fn fields_iterators( - &self, - ) -> ( - Vec<Arc<AtomicU64>>, - Vec<Arc<AtomicU64>>, - Vec<Arc<AtomicU64>>, - ) { - let mut totals: Vec<Arc<AtomicU64>> = Vec::new(); - let mut averages: Vec<Arc<AtomicU64>> = Vec::new(); - let mut old_totals: Vec<Arc<AtomicU64>> = Vec::new(); - - totals.push(self.total_xact_count.clone()); - old_totals.push(self.old_total_xact_count.clone()); - averages.push(self.avg_xact_count.clone()); - totals.push(self.total_query_count.clone()); - old_totals.push(self.old_total_query_count.clone()); - averages.push(self.avg_query_count.clone()); - totals.push(self.total_received.clone()); - old_totals.push(self.old_total_received.clone()); - averages.push(self.avg_recv.clone()); - totals.push(self.total_sent.clone()); - old_totals.push(self.old_total_sent.clone()); - averages.push(self.avg_sent.clone()); - totals.push(self.total_xact_time.clone()); - old_totals.push(self.old_total_xact_time.clone()); - averages.push(self.avg_xact_time.clone()); - totals.push(self.total_query_time.clone()); - old_totals.push(self.old_total_query_time.clone()); - averages.push(self.avg_query_time.clone()); - totals.push(self.total_wait_time.clone()); - old_totals.push(self.old_total_wait_time.clone()); - averages.push(self.avg_wait_time.clone()); - totals.push(self.total_errors.clone()); - old_totals.push(self.old_total_errors.clone()); - averages.push(self.avg_errors.clone()); - - (totals, averages, old_totals) + fn count_fields_iterator(&self) -> Vec<CountStatField> { + let mut count_fields: Vec<CountStatField> = Vec::new(); + + count_fields.push(CountStatField { + current: self.current.xact_count.clone(), + corresponding_stat: None, + average: self.averages.xact_count.clone(), + }); + + count_fields.push(CountStatField { + current: self.current.query_count.clone(), + corresponding_stat: None, + average: self.averages.query_count.clone(), + }); + + count_fields.push(CountStatField { + current: self.current.bytes_received.clone(), + corresponding_stat: None, + average: self.averages.bytes_received.clone(), + }); + + count_fields.push(CountStatField { + current: self.current.bytes_sent.clone(), + corresponding_stat: None, + average: self.averages.bytes_sent.clone(), + }); + + count_fields.push(CountStatField { + current: self.current.xact_time.clone(), + corresponding_stat: Some(self.total.xact_count.clone()), + average: self.averages.xact_time.clone(), + }); + + count_fields.push(CountStatField { + current: self.current.query_time.clone(), + corresponding_stat: Some(self.total.query_count.clone()), + average: self.averages.query_time.clone(), + }); + + count_fields.push(CountStatField { + current: self.current.wait_time.clone(), + corresponding_stat: None, + average: self.averages.wait_time.clone(), + }); + + count_fields.push(CountStatField { + current: self.current.errors.clone(), + corresponding_stat: None, + average: self.averages.errors.clone(), + }); + + count_fields } } diff --git a/src/stats/server.rs b/src/stats/server.rs index 399e585f..a327fa34 100644 --- a/src/stats/server.rs +++ b/src/stats/server.rs @@ -177,12 +177,9 @@ impl ServerStats { } pub fn checkout_time(&self, microseconds: u64, application_name: String) { - // Update server stats and address aggergation stats + // Update server stats and address aggregation stats self.set_application(application_name); - self.address - .stats - .total_wait_time - .fetch_add(microseconds, Ordering::Relaxed); + self.address.stats.wait_time_add(microseconds); self.pool_stats .maxwait .fetch_max(microseconds, Ordering::Relaxed); @@ -191,13 +188,8 @@ impl ServerStats { /// Report a query executed by a client against a server pub fn query(&self, milliseconds: u64, application_name: &str) { self.set_application(application_name.to_string()); - let address_stats = self.address_stats(); - address_stats - .total_query_count - .fetch_add(1, Ordering::Relaxed); - address_stats - .total_query_time - .fetch_add(milliseconds, Ordering::Relaxed); + self.address.stats.query_count_add(); + self.address.stats.query_time_add(milliseconds); } /// Report a transaction executed by a client a server @@ -208,29 +200,20 @@ impl ServerStats { self.set_application(application_name.to_string()); self.transaction_count.fetch_add(1, Ordering::Relaxed); - self.address - .stats - .total_xact_count - .fetch_add(1, Ordering::Relaxed); + self.address.stats.xact_count_add(); } /// Report data sent to a server pub fn data_sent(&self, amount_bytes: usize) { self.bytes_sent .fetch_add(amount_bytes as u64, Ordering::Relaxed); - self.address - .stats - .total_sent - .fetch_add(amount_bytes as u64, Ordering::Relaxed); + self.address.stats.bytes_sent_add(amount_bytes as u64); } /// Report data received from a server pub fn data_received(&self, amount_bytes: usize) { self.bytes_received .fetch_add(amount_bytes as u64, Ordering::Relaxed); - self.address - .stats - .total_received - .fetch_add(amount_bytes as u64, Ordering::Relaxed); + self.address.stats.bytes_received_add(amount_bytes as u64); } } From f36742254a2b6b8a4042d7b3130372dbaed65f76 Mon Sep 17 00:00:00 2001 From: Zain Kabani <zain.kabani@instacart.com> Date: Mon, 15 May 2023 14:13:01 -0400 Subject: [PATCH 2/9] Try tests --- .circleci/run_tests.sh | 105 +++++++++++++++++++++-------------------- 1 file changed, 53 insertions(+), 52 deletions(-) diff --git a/.circleci/run_tests.sh b/.circleci/run_tests.sh index 4ba497c3..534f5c09 100644 --- a/.circleci/run_tests.sh +++ b/.circleci/run_tests.sh @@ -32,81 +32,82 @@ sleep 1 # Create a database at port 5433, forward it to Postgres toxiproxy-cli create -l 127.0.0.1:5433 -u 127.0.0.1:5432 postgres_replica -start_pgcat "info" +# start_pgcat "info" -# Check that prometheus is running -curl --fail localhost:9930/metrics +# # Check that prometheus is running +# curl --fail localhost:9930/metrics -export PGPASSWORD=sharding_user -export PGDATABASE=sharded_db +# export PGPASSWORD=sharding_user +# export PGDATABASE=sharded_db -# pgbench test -pgbench -U sharding_user -i -h 127.0.0.1 -p 6432 -pgbench -U sharding_user -h 127.0.0.1 -p 6432 -t 500 -c 2 --protocol simple -f tests/pgbench/simple.sql -pgbench -U sharding_user -h 127.0.0.1 -p 6432 -t 500 -c 2 --protocol extended +# # pgbench test +# pgbench -U sharding_user -i -h 127.0.0.1 -p 6432 +# pgbench -U sharding_user -h 127.0.0.1 -p 6432 -t 500 -c 2 --protocol simple -f tests/pgbench/simple.sql +# pgbench -U sharding_user -h 127.0.0.1 -p 6432 -t 500 -c 2 --protocol extended -# COPY TO STDOUT test -psql -U sharding_user -h 127.0.0.1 -p 6432 -c 'COPY (SELECT * FROM pgbench_accounts LIMIT 15) TO STDOUT;' > /dev/null +# # COPY TO STDOUT test +# psql -U sharding_user -h 127.0.0.1 -p 6432 -c 'COPY (SELECT * FROM pgbench_accounts LIMIT 15) TO STDOUT;' > /dev/null -# Query cancellation test -(psql -U sharding_user -h 127.0.0.1 -p 6432 -c 'SELECT pg_sleep(50)' || true) & -sleep 1 -killall psql -s SIGINT - -# Pause/resume test. -# Running benches before, during, and after pause/resume. -pgbench -U sharding_user -t 500 -c 2 -h 127.0.0.1 -p 6432 --protocol extended & -BENCH_ONE=$! -PGPASSWORD=admin_pass psql -U admin_user -h 127.0.0.1 -p 6432 -d pgbouncer -c 'PAUSE sharded_db,sharding_user' -pgbench -U sharding_user -h 127.0.0.1 -p 6432 -t 500 -c 2 --protocol extended & -BENCH_TWO=$! -PGPASSWORD=admin_pass psql -U admin_user -h 127.0.0.1 -p 6432 -d pgbouncer -c 'RESUME sharded_db,sharding_user' -wait ${BENCH_ONE} -wait ${BENCH_TWO} - -# Reload pool (closing unused server connections) -PGPASSWORD=admin_pass psql -U admin_user -h 127.0.0.1 -p 6432 -d pgbouncer -c 'RELOAD' - -(psql -U sharding_user -h 127.0.0.1 -p 6432 -c 'SELECT pg_sleep(50)' || true) & -sleep 1 -killall psql -s SIGINT +# # Query cancellation test +# (psql -U sharding_user -h 127.0.0.1 -p 6432 -c 'SELECT pg_sleep(50)' || true) & +# sleep 1 +# killall psql -s SIGINT + +# # Pause/resume test. +# # Running benches before, during, and after pause/resume. +# pgbench -U sharding_user -t 500 -c 2 -h 127.0.0.1 -p 6432 --protocol extended & +# BENCH_ONE=$! +# PGPASSWORD=admin_pass psql -U admin_user -h 127.0.0.1 -p 6432 -d pgbouncer -c 'PAUSE sharded_db,sharding_user' +# pgbench -U sharding_user -h 127.0.0.1 -p 6432 -t 500 -c 2 --protocol extended & +# BENCH_TWO=$! +# PGPASSWORD=admin_pass psql -U admin_user -h 127.0.0.1 -p 6432 -d pgbouncer -c 'RESUME sharded_db,sharding_user' +# wait ${BENCH_ONE} +# wait ${BENCH_TWO} + +# # Reload pool (closing unused server connections) +# PGPASSWORD=admin_pass psql -U admin_user -h 127.0.0.1 -p 6432 -d pgbouncer -c 'RELOAD' + +# (psql -U sharding_user -h 127.0.0.1 -p 6432 -c 'SELECT pg_sleep(50)' || true) & +# sleep 1 +# killall psql -s SIGINT -# Sharding insert -psql -U sharding_user -e -h 127.0.0.1 -p 6432 -f tests/sharding/query_routing_test_insert.sql +# # Sharding insert +# psql -U sharding_user -e -h 127.0.0.1 -p 6432 -f tests/sharding/query_routing_test_insert.sql -# Sharding select -psql -U sharding_user -e -h 127.0.0.1 -p 6432 -f tests/sharding/query_routing_test_select.sql > /dev/null +# # Sharding select +# psql -U sharding_user -e -h 127.0.0.1 -p 6432 -f tests/sharding/query_routing_test_select.sql > /dev/null -# Replica/primary selection & more sharding tests -psql -U sharding_user -e -h 127.0.0.1 -p 6432 -f tests/sharding/query_routing_test_primary_replica.sql > /dev/null +# # Replica/primary selection & more sharding tests +# psql -U sharding_user -e -h 127.0.0.1 -p 6432 -f tests/sharding/query_routing_test_primary_replica.sql > /dev/null -# Statement timeout tests -sed -i 's/statement_timeout = 0/statement_timeout = 100/' .circleci/pgcat.toml -kill -SIGHUP $(pgrep pgcat) # Reload config -sleep 0.2 +# # Statement timeout tests +# sed -i 's/statement_timeout = 0/statement_timeout = 100/' .circleci/pgcat.toml +# kill -SIGHUP $(pgrep pgcat) # Reload config +# sleep 0.2 -# This should timeout -(! psql -U sharding_user -e -h 127.0.0.1 -p 6432 -c 'select pg_sleep(0.5)') +# # This should timeout +# (! psql -U sharding_user -e -h 127.0.0.1 -p 6432 -c 'select pg_sleep(0.5)') -# Disable statement timeout -sed -i 's/statement_timeout = 100/statement_timeout = 0/' .circleci/pgcat.toml -kill -SIGHUP $(pgrep pgcat) # Reload config again +# # Disable statement timeout +# sed -i 's/statement_timeout = 100/statement_timeout = 0/' .circleci/pgcat.toml +# kill -SIGHUP $(pgrep pgcat) # Reload config again # # Integration tests and ActiveRecord tests # cd tests/ruby sudo bundle install -bundle exec ruby tests.rb --format documentation || exit 1 -bundle exec rspec *_spec.rb --format documentation || exit 1 +bundle exec rspec admin_spec.rb --format documentation || exit 1 +# bundle exec ruby tests.rb --format documentation || exit 1 +# bundle exec rspec *_spec.rb --format documentation || exit 1 cd ../.. # # Python tests # These tests will start and stop the pgcat server so it will need to be restarted after the tests # -pip3 install -r tests/python/requirements.txt -python3 tests/python/tests.py || exit 1 +# pip3 install -r tests/python/requirements.txt +# python3 tests/python/tests.py || exit 1 start_pgcat "info" From 74e27c9438281690ee78f1d406753e4578b2cd90 Mon Sep 17 00:00:00 2001 From: Zain Kabani <zain.kabani@instacart.com> Date: Mon, 15 May 2023 14:20:40 -0400 Subject: [PATCH 3/9] typo --- src/stats/address.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stats/address.rs b/src/stats/address.rs index 47d38f0d..818e8708 100644 --- a/src/stats/address.rs +++ b/src/stats/address.rs @@ -119,7 +119,7 @@ impl AddressStats { } pub fn bytes_received_add(&self, bytes: u64) { - self.totals + self.total .bytes_received .fetch_add(bytes, Ordering::Relaxed); self.current From 0f648aa78acf8bce019911598d2ad0a58155f115 Mon Sep 17 00:00:00 2001 From: Zain Kabani <zain.kabani@instacart.com> Date: Mon, 15 May 2023 14:27:08 -0400 Subject: [PATCH 4/9] remove commented test stuff --- .circleci/run_tests.sh | 105 ++++++++++++++++++++--------------------- 1 file changed, 52 insertions(+), 53 deletions(-) diff --git a/.circleci/run_tests.sh b/.circleci/run_tests.sh index 534f5c09..4ba497c3 100644 --- a/.circleci/run_tests.sh +++ b/.circleci/run_tests.sh @@ -32,82 +32,81 @@ sleep 1 # Create a database at port 5433, forward it to Postgres toxiproxy-cli create -l 127.0.0.1:5433 -u 127.0.0.1:5432 postgres_replica -# start_pgcat "info" - -# # Check that prometheus is running -# curl --fail localhost:9930/metrics - -# export PGPASSWORD=sharding_user -# export PGDATABASE=sharded_db - -# # pgbench test -# pgbench -U sharding_user -i -h 127.0.0.1 -p 6432 -# pgbench -U sharding_user -h 127.0.0.1 -p 6432 -t 500 -c 2 --protocol simple -f tests/pgbench/simple.sql -# pgbench -U sharding_user -h 127.0.0.1 -p 6432 -t 500 -c 2 --protocol extended +start_pgcat "info" -# # COPY TO STDOUT test -# psql -U sharding_user -h 127.0.0.1 -p 6432 -c 'COPY (SELECT * FROM pgbench_accounts LIMIT 15) TO STDOUT;' > /dev/null +# Check that prometheus is running +curl --fail localhost:9930/metrics -# # Query cancellation test -# (psql -U sharding_user -h 127.0.0.1 -p 6432 -c 'SELECT pg_sleep(50)' || true) & -# sleep 1 -# killall psql -s SIGINT +export PGPASSWORD=sharding_user +export PGDATABASE=sharded_db -# # Pause/resume test. -# # Running benches before, during, and after pause/resume. -# pgbench -U sharding_user -t 500 -c 2 -h 127.0.0.1 -p 6432 --protocol extended & -# BENCH_ONE=$! -# PGPASSWORD=admin_pass psql -U admin_user -h 127.0.0.1 -p 6432 -d pgbouncer -c 'PAUSE sharded_db,sharding_user' -# pgbench -U sharding_user -h 127.0.0.1 -p 6432 -t 500 -c 2 --protocol extended & -# BENCH_TWO=$! -# PGPASSWORD=admin_pass psql -U admin_user -h 127.0.0.1 -p 6432 -d pgbouncer -c 'RESUME sharded_db,sharding_user' -# wait ${BENCH_ONE} -# wait ${BENCH_TWO} +# pgbench test +pgbench -U sharding_user -i -h 127.0.0.1 -p 6432 +pgbench -U sharding_user -h 127.0.0.1 -p 6432 -t 500 -c 2 --protocol simple -f tests/pgbench/simple.sql +pgbench -U sharding_user -h 127.0.0.1 -p 6432 -t 500 -c 2 --protocol extended -# # Reload pool (closing unused server connections) -# PGPASSWORD=admin_pass psql -U admin_user -h 127.0.0.1 -p 6432 -d pgbouncer -c 'RELOAD' +# COPY TO STDOUT test +psql -U sharding_user -h 127.0.0.1 -p 6432 -c 'COPY (SELECT * FROM pgbench_accounts LIMIT 15) TO STDOUT;' > /dev/null -# (psql -U sharding_user -h 127.0.0.1 -p 6432 -c 'SELECT pg_sleep(50)' || true) & -# sleep 1 -# killall psql -s SIGINT +# Query cancellation test +(psql -U sharding_user -h 127.0.0.1 -p 6432 -c 'SELECT pg_sleep(50)' || true) & +sleep 1 +killall psql -s SIGINT + +# Pause/resume test. +# Running benches before, during, and after pause/resume. +pgbench -U sharding_user -t 500 -c 2 -h 127.0.0.1 -p 6432 --protocol extended & +BENCH_ONE=$! +PGPASSWORD=admin_pass psql -U admin_user -h 127.0.0.1 -p 6432 -d pgbouncer -c 'PAUSE sharded_db,sharding_user' +pgbench -U sharding_user -h 127.0.0.1 -p 6432 -t 500 -c 2 --protocol extended & +BENCH_TWO=$! +PGPASSWORD=admin_pass psql -U admin_user -h 127.0.0.1 -p 6432 -d pgbouncer -c 'RESUME sharded_db,sharding_user' +wait ${BENCH_ONE} +wait ${BENCH_TWO} + +# Reload pool (closing unused server connections) +PGPASSWORD=admin_pass psql -U admin_user -h 127.0.0.1 -p 6432 -d pgbouncer -c 'RELOAD' + +(psql -U sharding_user -h 127.0.0.1 -p 6432 -c 'SELECT pg_sleep(50)' || true) & +sleep 1 +killall psql -s SIGINT -# # Sharding insert -# psql -U sharding_user -e -h 127.0.0.1 -p 6432 -f tests/sharding/query_routing_test_insert.sql +# Sharding insert +psql -U sharding_user -e -h 127.0.0.1 -p 6432 -f tests/sharding/query_routing_test_insert.sql -# # Sharding select -# psql -U sharding_user -e -h 127.0.0.1 -p 6432 -f tests/sharding/query_routing_test_select.sql > /dev/null +# Sharding select +psql -U sharding_user -e -h 127.0.0.1 -p 6432 -f tests/sharding/query_routing_test_select.sql > /dev/null -# # Replica/primary selection & more sharding tests -# psql -U sharding_user -e -h 127.0.0.1 -p 6432 -f tests/sharding/query_routing_test_primary_replica.sql > /dev/null +# Replica/primary selection & more sharding tests +psql -U sharding_user -e -h 127.0.0.1 -p 6432 -f tests/sharding/query_routing_test_primary_replica.sql > /dev/null -# # Statement timeout tests -# sed -i 's/statement_timeout = 0/statement_timeout = 100/' .circleci/pgcat.toml -# kill -SIGHUP $(pgrep pgcat) # Reload config -# sleep 0.2 +# Statement timeout tests +sed -i 's/statement_timeout = 0/statement_timeout = 100/' .circleci/pgcat.toml +kill -SIGHUP $(pgrep pgcat) # Reload config +sleep 0.2 -# # This should timeout -# (! psql -U sharding_user -e -h 127.0.0.1 -p 6432 -c 'select pg_sleep(0.5)') +# This should timeout +(! psql -U sharding_user -e -h 127.0.0.1 -p 6432 -c 'select pg_sleep(0.5)') -# # Disable statement timeout -# sed -i 's/statement_timeout = 100/statement_timeout = 0/' .circleci/pgcat.toml -# kill -SIGHUP $(pgrep pgcat) # Reload config again +# Disable statement timeout +sed -i 's/statement_timeout = 100/statement_timeout = 0/' .circleci/pgcat.toml +kill -SIGHUP $(pgrep pgcat) # Reload config again # # Integration tests and ActiveRecord tests # cd tests/ruby sudo bundle install -bundle exec rspec admin_spec.rb --format documentation || exit 1 -# bundle exec ruby tests.rb --format documentation || exit 1 -# bundle exec rspec *_spec.rb --format documentation || exit 1 +bundle exec ruby tests.rb --format documentation || exit 1 +bundle exec rspec *_spec.rb --format documentation || exit 1 cd ../.. # # Python tests # These tests will start and stop the pgcat server so it will need to be restarted after the tests # -# pip3 install -r tests/python/requirements.txt -# python3 tests/python/tests.py || exit 1 +pip3 install -r tests/python/requirements.txt +python3 tests/python/tests.py || exit 1 start_pgcat "info" From eb81c4ec75d655d9bb23ff9809a218e37fd3723c Mon Sep 17 00:00:00 2001 From: Zain Kabani <zain.kabani@instacart.com> Date: Mon, 15 May 2023 14:30:18 -0400 Subject: [PATCH 5/9] Avoid dividing by zero --- src/stats/address.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/stats/address.rs b/src/stats/address.rs index 818e8708..09bb0583 100644 --- a/src/stats/address.rs +++ b/src/stats/address.rs @@ -165,9 +165,13 @@ impl AddressStats { // This means we should average by some corresponding field, ie. number of queries Some(corresponding_stat) => { let corresponding_stat_value = corresponding_stat.load(Ordering::Relaxed); - count_field - .average - .store(current_value / corresponding_stat_value, Ordering::Relaxed); + if corresponding_stat_value == 0 { + count_field.average.store(0, Ordering::Relaxed); + } else { + count_field + .average + .store(current_value / corresponding_stat_value, Ordering::Relaxed); + } } }; } From 8570c0e2fac075bee494cf66bddccad29971fce6 Mon Sep 17 00:00:00 2001 From: Zain Kabani <zain.kabani@instacart.com> Date: Mon, 15 May 2023 14:46:02 -0400 Subject: [PATCH 6/9] Fix test --- tests/ruby/admin_spec.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/ruby/admin_spec.rb b/tests/ruby/admin_spec.rb index e054b45e..f93b1a6c 100644 --- a/tests/ruby/admin_spec.rb +++ b/tests/ruby/admin_spec.rb @@ -27,7 +27,7 @@ results = admin_conn.async_exec("SHOW STATS")[0] admin_conn.close expect(results["total_query_time"].to_i).to be_within(200).of(750) - expect(results["avg_query_time"].to_i).to be_within(20).of(50) + expect(results["avg_query_time"].to_i).to be_within(50).of(250) expect(results["total_wait_time"].to_i).to_not eq(0) expect(results["avg_wait_time"].to_i).to_not eq(0) From ecc2d8bf1ff9eacb10669019b4bc9af8ebb68610 Mon Sep 17 00:00:00 2001 From: Zain Kabani <zain.kabani@instacart.com> Date: Mon, 15 May 2023 15:58:15 -0400 Subject: [PATCH 7/9] refactor, get rid of iterator. do it manually --- src/stats.rs | 1 + src/stats/address.rs | 146 ++++++++++++++++++------------------------- 2 files changed, 61 insertions(+), 86 deletions(-) diff --git a/src/stats.rs b/src/stats.rs index 6de784f5..ce076d2d 100644 --- a/src/stats.rs +++ b/src/stats.rs @@ -113,6 +113,7 @@ impl Collector { for stats in server_stats.values() { if !stats.check_address_stat_average_is_updated_status() { stats.address_stats().update_averages(); + stats.address_stats().reset_current_counts(); stats.set_address_stat_average_is_updated_status(true); } } diff --git a/src/stats/address.rs b/src/stats/address.rs index 09bb0583..9447e1ac 100644 --- a/src/stats/address.rs +++ b/src/stats/address.rs @@ -101,12 +101,6 @@ impl IntoIterator for AddressStats { } } -struct CountStatField { - current: Arc<AtomicU64>, - corresponding_stat: Option<Arc<AtomicU64>>, - average: Arc<AtomicU64>, -} - impl AddressStats { pub fn xact_count_add(&self) { self.total.xact_count.fetch_add(1, Ordering::Relaxed); @@ -153,33 +147,67 @@ impl AddressStats { } pub fn update_averages(&self) { - for count_field in self.count_fields_iterator() { - let current_value = count_field.current.load(Ordering::Relaxed); - - match count_field.corresponding_stat { - // This means that averaging by time makes sense here - None => count_field.average.store( - current_value / (crate::stats::STAT_PERIOD / 1_000), - Ordering::Relaxed, - ), - // This means we should average by some corresponding field, ie. number of queries - Some(corresponding_stat) => { - let corresponding_stat_value = corresponding_stat.load(Ordering::Relaxed); - if corresponding_stat_value == 0 { - count_field.average.store(0, Ordering::Relaxed); - } else { - count_field - .average - .store(current_value / corresponding_stat_value, Ordering::Relaxed); - } - } - }; - } + let stat_period_per_second = crate::stats::STAT_PERIOD / 1_000; + + // xact_count + let current_xact_count = self.current.xact_count.load(Ordering::Relaxed); + let current_xact_time = self.current.xact_time.load(Ordering::Relaxed); + self.averages.xact_count.store( + current_xact_count / stat_period_per_second, + Ordering::Relaxed, + ); + self.averages + .xact_time + .store(current_xact_time / current_xact_count, Ordering::Relaxed); + + // query_count + let current_query_count = self.current.query_count.load(Ordering::Relaxed); + let current_query_time = self.current.query_time.load(Ordering::Relaxed); + self.averages.query_count.store( + current_query_count / stat_period_per_second, + Ordering::Relaxed, + ); + self.averages + .query_time + .store(current_query_time / current_query_count, Ordering::Relaxed); + + // bytes_received + let current_bytes_received = self.current.bytes_received.load(Ordering::Relaxed); + self.averages.bytes_received.store( + current_bytes_received / stat_period_per_second, + Ordering::Relaxed, + ); + + // bytes_sent + let current_bytes_sent = self.current.bytes_sent.load(Ordering::Relaxed); + self.averages.bytes_sent.store( + current_bytes_sent / stat_period_per_second, + Ordering::Relaxed, + ); + + // wait_time + let current_wait_time = self.current.wait_time.load(Ordering::Relaxed); + self.averages.wait_time.store( + current_wait_time / stat_period_per_second, + Ordering::Relaxed, + ); + + // errors + let current_errors = self.current.errors.load(Ordering::Relaxed); + self.averages + .errors + .store(current_errors / stat_period_per_second, Ordering::Relaxed); + } - // Reset current counts to 0 - for count_field in self.count_fields_iterator() { - count_field.current.store(0, Ordering::Relaxed); - } + pub fn reset_current_counts(&self) { + self.current.xact_count.store(0, Ordering::Relaxed); + self.current.xact_time.store(0, Ordering::Relaxed); + self.current.query_count.store(0, Ordering::Relaxed); + self.current.query_time.store(0, Ordering::Relaxed); + self.current.bytes_received.store(0, Ordering::Relaxed); + self.current.bytes_sent.store(0, Ordering::Relaxed); + self.current.wait_time.store(0, Ordering::Relaxed); + self.current.errors.store(0, Ordering::Relaxed); } pub fn populate_row(&self, row: &mut Vec<String>) { @@ -187,58 +215,4 @@ impl AddressStats { row.push(value.to_string()); } } - - fn count_fields_iterator(&self) -> Vec<CountStatField> { - let mut count_fields: Vec<CountStatField> = Vec::new(); - - count_fields.push(CountStatField { - current: self.current.xact_count.clone(), - corresponding_stat: None, - average: self.averages.xact_count.clone(), - }); - - count_fields.push(CountStatField { - current: self.current.query_count.clone(), - corresponding_stat: None, - average: self.averages.query_count.clone(), - }); - - count_fields.push(CountStatField { - current: self.current.bytes_received.clone(), - corresponding_stat: None, - average: self.averages.bytes_received.clone(), - }); - - count_fields.push(CountStatField { - current: self.current.bytes_sent.clone(), - corresponding_stat: None, - average: self.averages.bytes_sent.clone(), - }); - - count_fields.push(CountStatField { - current: self.current.xact_time.clone(), - corresponding_stat: Some(self.total.xact_count.clone()), - average: self.averages.xact_time.clone(), - }); - - count_fields.push(CountStatField { - current: self.current.query_time.clone(), - corresponding_stat: Some(self.total.query_count.clone()), - average: self.averages.query_time.clone(), - }); - - count_fields.push(CountStatField { - current: self.current.wait_time.clone(), - corresponding_stat: None, - average: self.averages.wait_time.clone(), - }); - - count_fields.push(CountStatField { - current: self.current.errors.clone(), - corresponding_stat: None, - average: self.averages.errors.clone(), - }); - - count_fields - } } From fc99e032b3de0b0c779cba197e5a566d247a09b5 Mon Sep 17 00:00:00 2001 From: Zain Kabani <zain.kabani@instacart.com> Date: Mon, 15 May 2023 17:32:36 -0400 Subject: [PATCH 8/9] trigger build From 6522a2630b200ba021e4b9a25ed218e6fffffae8 Mon Sep 17 00:00:00 2001 From: Zain Kabani <zain.kabani@instacart.com> Date: Mon, 15 May 2023 17:36:40 -0400 Subject: [PATCH 9/9] Fix --- src/stats/address.rs | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/src/stats/address.rs b/src/stats/address.rs index 9447e1ac..a0486445 100644 --- a/src/stats/address.rs +++ b/src/stats/address.rs @@ -156,9 +156,13 @@ impl AddressStats { current_xact_count / stat_period_per_second, Ordering::Relaxed, ); - self.averages - .xact_time - .store(current_xact_time / current_xact_count, Ordering::Relaxed); + if current_xact_count == 0 { + self.averages.xact_time.store(0, Ordering::Relaxed); + } else { + self.averages + .xact_time + .store(current_xact_time / current_xact_count, Ordering::Relaxed); + } // query_count let current_query_count = self.current.query_count.load(Ordering::Relaxed); @@ -167,9 +171,13 @@ impl AddressStats { current_query_count / stat_period_per_second, Ordering::Relaxed, ); - self.averages - .query_time - .store(current_query_time / current_query_count, Ordering::Relaxed); + if current_query_count == 0 { + self.averages.query_time.store(0, Ordering::Relaxed); + } else { + self.averages + .query_time + .store(current_query_time / current_query_count, Ordering::Relaxed); + } // bytes_received let current_bytes_received = self.current.bytes_received.load(Ordering::Relaxed);