Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix time based average stats #442

Merged
merged 9 commits into from
May 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ impl Collector {
for stats in server_stats.values() {
if !stats.check_address_stat_average_is_updated_status() {
stats.address_stats().update_averages();
stats.address_stats().reset_current_counts();
stats.set_address_stat_average_is_updated_status(true);
}
}
Expand Down
233 changes: 142 additions & 91 deletions src/stats/address.rs
Original file line number Diff line number Diff line change
@@ -1,35 +1,26 @@
use std::sync::atomic::*;
use std::sync::Arc;

#[derive(Debug, Clone, Default)]
struct AddressStatFields {
xact_count: Arc<AtomicU64>,
query_count: Arc<AtomicU64>,
bytes_received: Arc<AtomicU64>,
bytes_sent: Arc<AtomicU64>,
xact_time: Arc<AtomicU64>,
query_time: Arc<AtomicU64>,
wait_time: Arc<AtomicU64>,
errors: Arc<AtomicU64>,
}

/// Internal address stats
#[derive(Debug, Clone, Default)]
pub struct AddressStats {
pub total_xact_count: Arc<AtomicU64>,
pub total_query_count: Arc<AtomicU64>,
pub total_received: Arc<AtomicU64>,
pub total_sent: Arc<AtomicU64>,
pub total_xact_time: Arc<AtomicU64>,
pub total_query_time: Arc<AtomicU64>,
pub total_wait_time: Arc<AtomicU64>,
pub total_errors: Arc<AtomicU64>,

pub old_total_xact_count: Arc<AtomicU64>,
pub old_total_query_count: Arc<AtomicU64>,
pub old_total_received: Arc<AtomicU64>,
pub old_total_sent: Arc<AtomicU64>,
pub old_total_xact_time: Arc<AtomicU64>,
pub old_total_query_time: Arc<AtomicU64>,
pub old_total_wait_time: Arc<AtomicU64>,
pub old_total_errors: Arc<AtomicU64>,

pub avg_query_count: Arc<AtomicU64>,
pub avg_query_time: Arc<AtomicU64>,
pub avg_recv: Arc<AtomicU64>,
pub avg_sent: Arc<AtomicU64>,
pub avg_errors: Arc<AtomicU64>,
pub avg_xact_time: Arc<AtomicU64>,
pub avg_xact_count: Arc<AtomicU64>,
pub avg_wait_time: Arc<AtomicU64>,
total: AddressStatFields,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice refactor


current: AddressStatFields,

averages: AddressStatFields,

// Determines if the averages have been updated since the last time they were reported
pub averages_updated: Arc<AtomicBool>,
Expand All @@ -43,133 +34,193 @@ impl IntoIterator for AddressStats {
vec![
(
"total_xact_count".to_string(),
self.total_xact_count.load(Ordering::Relaxed),
self.total.xact_count.load(Ordering::Relaxed),
),
(
"total_query_count".to_string(),
self.total_query_count.load(Ordering::Relaxed),
self.total.query_count.load(Ordering::Relaxed),
),
(
"total_received".to_string(),
self.total_received.load(Ordering::Relaxed),
self.total.bytes_received.load(Ordering::Relaxed),
),
(
"total_sent".to_string(),
self.total_sent.load(Ordering::Relaxed),
self.total.bytes_sent.load(Ordering::Relaxed),
),
(
"total_xact_time".to_string(),
self.total_xact_time.load(Ordering::Relaxed),
self.total.xact_time.load(Ordering::Relaxed),
),
(
"total_query_time".to_string(),
self.total_query_time.load(Ordering::Relaxed),
self.total.query_time.load(Ordering::Relaxed),
),
(
"total_wait_time".to_string(),
self.total_wait_time.load(Ordering::Relaxed),
self.total.wait_time.load(Ordering::Relaxed),
),
(
"total_errors".to_string(),
self.total_errors.load(Ordering::Relaxed),
self.total.errors.load(Ordering::Relaxed),
),
(
"avg_xact_count".to_string(),
self.avg_xact_count.load(Ordering::Relaxed),
self.averages.xact_count.load(Ordering::Relaxed),
),
(
"avg_query_count".to_string(),
self.avg_query_count.load(Ordering::Relaxed),
self.averages.query_count.load(Ordering::Relaxed),
),
(
"avg_recv".to_string(),
self.avg_recv.load(Ordering::Relaxed),
self.averages.bytes_received.load(Ordering::Relaxed),
),
(
"avg_sent".to_string(),
self.avg_sent.load(Ordering::Relaxed),
self.averages.bytes_sent.load(Ordering::Relaxed),
),
(
"avg_errors".to_string(),
self.avg_errors.load(Ordering::Relaxed),
self.averages.errors.load(Ordering::Relaxed),
),
(
"avg_xact_time".to_string(),
self.avg_xact_time.load(Ordering::Relaxed),
self.averages.xact_time.load(Ordering::Relaxed),
),
(
"avg_query_time".to_string(),
self.avg_query_time.load(Ordering::Relaxed),
self.averages.query_time.load(Ordering::Relaxed),
),
(
"avg_wait_time".to_string(),
self.avg_wait_time.load(Ordering::Relaxed),
self.averages.wait_time.load(Ordering::Relaxed),
),
]
.into_iter()
}
}

impl AddressStats {
pub fn xact_count_add(&self) {
self.total.xact_count.fetch_add(1, Ordering::Relaxed);
self.current.xact_count.fetch_add(1, Ordering::Relaxed);
}

pub fn query_count_add(&self) {
self.total.query_count.fetch_add(1, Ordering::Relaxed);
self.current.query_count.fetch_add(1, Ordering::Relaxed);
}

pub fn bytes_received_add(&self, bytes: u64) {
self.total
.bytes_received
.fetch_add(bytes, Ordering::Relaxed);
self.current
.bytes_received
.fetch_add(bytes, Ordering::Relaxed);
}

pub fn bytes_sent_add(&self, bytes: u64) {
self.total.bytes_sent.fetch_add(bytes, Ordering::Relaxed);
self.current.bytes_sent.fetch_add(bytes, Ordering::Relaxed);
}

pub fn xact_time_add(&self, time: u64) {
self.total.xact_time.fetch_add(time, Ordering::Relaxed);
self.current.xact_time.fetch_add(time, Ordering::Relaxed);
}

pub fn query_time_add(&self, time: u64) {
self.total.query_time.fetch_add(time, Ordering::Relaxed);
self.current.query_time.fetch_add(time, Ordering::Relaxed);
}

pub fn wait_time_add(&self, time: u64) {
self.total.wait_time.fetch_add(time, Ordering::Relaxed);
self.current.wait_time.fetch_add(time, Ordering::Relaxed);
}

pub fn error(&self) {
self.total_errors.fetch_add(1, Ordering::Relaxed);
self.total.errors.fetch_add(1, Ordering::Relaxed);
self.current.errors.fetch_add(1, Ordering::Relaxed);
}

pub fn update_averages(&self) {
let (totals, averages, old_totals) = self.fields_iterators();
for (total, average, old_total) in itertools::izip!(totals, averages, old_totals) {
let total_value = total.load(Ordering::Relaxed);
let old_total_value = old_total.load(Ordering::Relaxed);
average.store(
(total_value - old_total_value) / (crate::stats::STAT_PERIOD / 1_000),
Ordering::Relaxed,
); // Avg / second
old_total.store(total_value, Ordering::Relaxed);
let stat_period_per_second = crate::stats::STAT_PERIOD / 1_000;

// xact_count
let current_xact_count = self.current.xact_count.load(Ordering::Relaxed);
let current_xact_time = self.current.xact_time.load(Ordering::Relaxed);
self.averages.xact_count.store(
current_xact_count / stat_period_per_second,
Ordering::Relaxed,
);
if current_xact_count == 0 {
self.averages.xact_time.store(0, Ordering::Relaxed);
} else {
self.averages
.xact_time
.store(current_xact_time / current_xact_count, Ordering::Relaxed);
}

// query_count
let current_query_count = self.current.query_count.load(Ordering::Relaxed);
let current_query_time = self.current.query_time.load(Ordering::Relaxed);
self.averages.query_count.store(
current_query_count / stat_period_per_second,
Ordering::Relaxed,
);
if current_query_count == 0 {
self.averages.query_time.store(0, Ordering::Relaxed);
} else {
self.averages
.query_time
.store(current_query_time / current_query_count, Ordering::Relaxed);
}

// bytes_received
let current_bytes_received = self.current.bytes_received.load(Ordering::Relaxed);
self.averages.bytes_received.store(
current_bytes_received / stat_period_per_second,
Ordering::Relaxed,
);

// bytes_sent
let current_bytes_sent = self.current.bytes_sent.load(Ordering::Relaxed);
self.averages.bytes_sent.store(
current_bytes_sent / stat_period_per_second,
Ordering::Relaxed,
);

// wait_time
let current_wait_time = self.current.wait_time.load(Ordering::Relaxed);
self.averages.wait_time.store(
current_wait_time / stat_period_per_second,
Ordering::Relaxed,
);

// errors
let current_errors = self.current.errors.load(Ordering::Relaxed);
self.averages
.errors
.store(current_errors / stat_period_per_second, Ordering::Relaxed);
}

pub fn reset_current_counts(&self) {
self.current.xact_count.store(0, Ordering::Relaxed);
self.current.xact_time.store(0, Ordering::Relaxed);
self.current.query_count.store(0, Ordering::Relaxed);
self.current.query_time.store(0, Ordering::Relaxed);
self.current.bytes_received.store(0, Ordering::Relaxed);
self.current.bytes_sent.store(0, Ordering::Relaxed);
self.current.wait_time.store(0, Ordering::Relaxed);
self.current.errors.store(0, Ordering::Relaxed);
}

pub fn populate_row(&self, row: &mut Vec<String>) {
for (_key, value) in self.clone() {
row.push(value.to_string());
}
}

fn fields_iterators(
&self,
) -> (
Vec<Arc<AtomicU64>>,
Vec<Arc<AtomicU64>>,
Vec<Arc<AtomicU64>>,
) {
let mut totals: Vec<Arc<AtomicU64>> = Vec::new();
let mut averages: Vec<Arc<AtomicU64>> = Vec::new();
let mut old_totals: Vec<Arc<AtomicU64>> = Vec::new();

totals.push(self.total_xact_count.clone());
old_totals.push(self.old_total_xact_count.clone());
averages.push(self.avg_xact_count.clone());
totals.push(self.total_query_count.clone());
old_totals.push(self.old_total_query_count.clone());
averages.push(self.avg_query_count.clone());
totals.push(self.total_received.clone());
old_totals.push(self.old_total_received.clone());
averages.push(self.avg_recv.clone());
totals.push(self.total_sent.clone());
old_totals.push(self.old_total_sent.clone());
averages.push(self.avg_sent.clone());
totals.push(self.total_xact_time.clone());
old_totals.push(self.old_total_xact_time.clone());
averages.push(self.avg_xact_time.clone());
totals.push(self.total_query_time.clone());
old_totals.push(self.old_total_query_time.clone());
averages.push(self.avg_query_time.clone());
totals.push(self.total_wait_time.clone());
old_totals.push(self.old_total_wait_time.clone());
averages.push(self.avg_wait_time.clone());
totals.push(self.total_errors.clone());
old_totals.push(self.old_total_errors.clone());
averages.push(self.avg_errors.clone());

(totals, averages, old_totals)
}
}
31 changes: 7 additions & 24 deletions src/stats/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,12 +177,9 @@ impl ServerStats {
}

pub fn checkout_time(&self, microseconds: u64, application_name: String) {
// Update server stats and address aggergation stats
// Update server stats and address aggregation stats
self.set_application(application_name);
self.address
.stats
.total_wait_time
.fetch_add(microseconds, Ordering::Relaxed);
self.address.stats.wait_time_add(microseconds);
self.pool_stats
.maxwait
.fetch_max(microseconds, Ordering::Relaxed);
Expand All @@ -191,13 +188,8 @@ impl ServerStats {
/// Report a query executed by a client against a server
pub fn query(&self, milliseconds: u64, application_name: &str) {
self.set_application(application_name.to_string());
let address_stats = self.address_stats();
address_stats
.total_query_count
.fetch_add(1, Ordering::Relaxed);
address_stats
.total_query_time
.fetch_add(milliseconds, Ordering::Relaxed);
self.address.stats.query_count_add();
self.address.stats.query_time_add(milliseconds);
}

/// Report a transaction executed by a client a server
Expand All @@ -208,29 +200,20 @@ impl ServerStats {
self.set_application(application_name.to_string());

self.transaction_count.fetch_add(1, Ordering::Relaxed);
self.address
.stats
.total_xact_count
.fetch_add(1, Ordering::Relaxed);
self.address.stats.xact_count_add();
}

/// Report data sent to a server
pub fn data_sent(&self, amount_bytes: usize) {
self.bytes_sent
.fetch_add(amount_bytes as u64, Ordering::Relaxed);
self.address
.stats
.total_sent
.fetch_add(amount_bytes as u64, Ordering::Relaxed);
self.address.stats.bytes_sent_add(amount_bytes as u64);
}

/// Report data received from a server
pub fn data_received(&self, amount_bytes: usize) {
self.bytes_received
.fetch_add(amount_bytes as u64, Ordering::Relaxed);
self.address
.stats
.total_received
.fetch_add(amount_bytes as u64, Ordering::Relaxed);
self.address.stats.bytes_received_add(amount_bytes as u64);
}
}
2 changes: 1 addition & 1 deletion tests/ruby/admin_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
results = admin_conn.async_exec("SHOW STATS")[0]
admin_conn.close
expect(results["total_query_time"].to_i).to be_within(200).of(750)
expect(results["avg_query_time"].to_i).to be_within(20).of(50)
expect(results["avg_query_time"].to_i).to be_within(50).of(250)

expect(results["total_wait_time"].to_i).to_not eq(0)
expect(results["avg_wait_time"].to_i).to_not eq(0)
Expand Down