fix(store): Apply clock drift correction based on received_at #580

Merged 1 commit on May 18, 2020

1 change: 1 addition & 0 deletions Cargo.lock


4 changes: 4 additions & 0 deletions relay-common/Cargo.toml
@@ -12,6 +12,7 @@ publish = false
 [dependencies]
 backoff = "0.1.5"
 cadence = "0.17.1"
+chrono = { version = "0.4.7", optional = true }
 failure = "0.1.5"
 globset = "0.4.4"
 lazy_static = "1.3.0"
@@ -22,3 +23,6 @@ parking_lot = "0.10.0"
 regex = "1.2.0"
 sentry-types = "0.14.1"
 serde = { version = "1.0.98", features = ["derive"] }
+
+[features]
+default = []
8 changes: 7 additions & 1 deletion relay-common/src/time.rs
@@ -3,11 +3,17 @@
 use std::fmt;
 use std::time::{Duration, Instant, SystemTime};

-/// Converts an Instant into a SystemTime.
+/// Converts an `Instant` into a `SystemTime`.
 pub fn instant_to_system_time(instant: Instant) -> SystemTime {
     SystemTime::now() - instant.elapsed()
 }

+/// Converts an `Instant` into a `DateTime`.
+#[cfg(feature = "chrono")]
+pub fn instant_to_date_time(instant: Instant) -> chrono::DateTime<chrono::Utc> {
+    instant_to_system_time(instant).into()
+}
+
 /// A unix timestamp (time elapsed since 1970).
 #[derive(Clone, Copy, Eq, Ord, PartialEq, PartialOrd)]
 pub struct UnixTimestamp(Duration);
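
As a rough illustration of the conversion in this hunk, the sketch below repeats `instant_to_system_time` using only the standard library and adds a hypothetical `age_of` helper (not part of the PR) to show what the result means. The monotonic clock and the wall clock are sampled at slightly different moments, so the result should be read as an approximation rather than an exact mapping.

```rust
use std::time::{Duration, Instant, SystemTime};

/// Same approach as the helper in the diff: subtract the time elapsed since
/// `instant` from the current wall-clock time.
fn instant_to_system_time(instant: Instant) -> SystemTime {
    SystemTime::now() - instant.elapsed()
}

/// Hypothetical helper for illustration only: how long ago `instant` was,
/// measured against the wall clock.
fn age_of(instant: Instant) -> Duration {
    let converted = instant_to_system_time(instant);
    // `duration_since` returns an error if the wall clock moved backwards in
    // between the two samples; fall back to a zero duration in that case.
    SystemTime::now()
        .duration_since(converted)
        .unwrap_or_default()
}
```

The chrono variant added by the PR builds on the same value and only converts the `SystemTime` into a `DateTime<Utc>`.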
1 change: 1 addition & 0 deletions relay-general/benches/benchmarks.rs
@@ -89,6 +89,7 @@ fn bench_store_processor(c: &mut Criterion) {
         remove_other: Some(true),
         user_agent: None,
         sent_at: None,
+        received_at: None,
     };

     let mut processor = StoreProcessor::new(config, None);
263 changes: 263 additions & 0 deletions relay-general/src/store/clock_drift.rs
@@ -0,0 +1,263 @@
use std::fmt;

use chrono::{DateTime, Duration as SignedDuration, Utc};

use crate::processor::{ProcessValue, ProcessingState, Processor};
use crate::protocol::Event;
use crate::types::{Error, ErrorKind, Meta, ProcessingResult, Timestamp};

/// The minimum clock drift for correction to apply.

Review comment (Member): I would add the reasoning here, or move the processor's docstring to the module level.

Reply (Member Author): This is already in the docstring of `ClockDriftProcessor`, which will be exported in a later PR (so that it can be used by non-processing Relays).

@dashed you can find the reasoning in that same docstring:

/// There is a minimum clock drift of _55 minutes_ to compensate for network latency and small clock
/// drift on the sender's machine, while still detecting timezone differences. For differences lower
/// than that, no correction is performed.

Let me know if you'd like me to clarify.

const MINIMUM_CLOCK_DRIFT_SECS: i64 = 55 * 60;

/// A signed correction that contains the sender's timestamp as well as the drift to the receiver.
#[derive(Clone, Copy, Debug)]
struct ClockCorrection {
    sent_at: DateTime<Utc>,
    drift: SignedDuration,
}

impl ClockCorrection {
    fn new(sent_at: DateTime<Utc>, received_at: DateTime<Utc>) -> Self {
        let drift = received_at - sent_at;
        Self { sent_at, drift }
    }

    fn at_least(self, lower_bound: SignedDuration) -> Option<Self> {
        if self.drift.num_seconds().abs() >= lower_bound.num_seconds().abs() {
            Some(self)
        } else {
            None
        }
    }
}

/// Prints a duration with minimum precision.
///
/// Uses days if the duration is at least 1 day, otherwise falls back to hours and then seconds.
/// Also supports negative durations.
#[derive(Clone, Copy, Debug)]
struct HumanDuration(SignedDuration);

impl fmt::Display for HumanDuration {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let days = self.0.num_days();
        if days.abs() == 1 {
            write!(f, "{} day", days)
        } else if days != 0 {
            write!(f, "{} days", days)
        } else if self.0.num_hours() != 0 {
            write!(f, "{}h", self.0.num_hours())
        } else {
            write!(f, "{}s", self.0.num_seconds())
        }
    }
}
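
The branch order of the `Display` implementation above can be tried out without chrono. The following is a minimal sketch, assuming whole seconds as input and assuming that chrono's `num_days` and `num_hours` truncate toward zero like integer division; `human_duration` is a hypothetical name, not something this PR defines.

```rust
/// Formats a signed number of seconds with the same branch order as
/// `HumanDuration` in the diff: days first, then hours, then seconds.
fn human_duration(total_secs: i64) -> String {
    // Integer division truncates toward zero, so negative inputs keep their sign.
    let days = total_secs / 86_400;
    let hours = total_secs / 3_600;

    if days.abs() == 1 {
        format!("{} day", days)
    } else if days != 0 {
        format!("{} days", days)
    } else if hours != 0 {
        format!("{}h", hours)
    } else {
        format!("{}s", total_secs)
    }
}
```

There is no minutes branch, so under these assumptions a drift exactly at the 55-minute threshold, `human_duration(3_300)`, is rendered as `3300s`, while a 36-hour drift is rendered as `1 day`.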

/// Corrects clock drift based on the sender's and receiver's timestamps.
///
/// Clock drift correction applies to all timestamps in the event protocol. This includes, in
/// particular, the event's timestamp, breadcrumbs and spans.
///
/// There is a minimum clock drift of _55 minutes_ to compensate for network latency and small clock
/// drift on the sender's machine, while still detecting timezone differences. For differences lower
/// than that, no correction is performed.
///
/// Clock drift is corrected in both directions:
///
/// - The drift is added to timestamps if the received time is after the send time. This indicates
///   that the sender's clock was lagging behind. For instance, if an event was received with
///   yesterday's timestamp, one day is added to all timestamps.
///
/// - The drift is subtracted from timestamps if the received time is before the send time. This
///   indicates that the sender's clock was running ahead. For instance, if an event was received
///   with tomorrow's timestamp, one day is subtracted from all timestamps.
#[derive(Debug)]
pub struct ClockDriftProcessor {
    received_at: DateTime<Utc>,
    correction: Option<ClockCorrection>,
}

impl ClockDriftProcessor {
    /// Creates a new `ClockDriftProcessor`.
    ///
    /// If no `sent_at` timestamp is provided, then clock drift correction is disabled. The drift is
    /// calculated from the signed difference between the receiver's and the sender's timestamp.
    pub fn new(sent_at: Option<DateTime<Utc>>, received_at: DateTime<Utc>) -> Self {
        let correction = sent_at.and_then(|sent_at| {
            ClockCorrection::new(sent_at, received_at)
                .at_least(SignedDuration::seconds(MINIMUM_CLOCK_DRIFT_SECS))
        });

        Self {
            received_at,
            correction,
        }
    }
}

impl Processor for ClockDriftProcessor {
    fn process_event(
        &mut self,
        event: &mut Event,
        _meta: &mut Meta,
        state: &ProcessingState<'_>,
    ) -> ProcessingResult {
        event.process_child_values(self, state)?;

        if let Some(correction) = self.correction {
            let timestamp_meta = event.timestamp.meta_mut();
            timestamp_meta.add_error(Error::with(ErrorKind::InvalidData, |e| {
                let reason = format!(
                    "clock drift: all timestamps adjusted by {}",

Review comment (Member): Not a huge fan of producing error messages here; we might want to refactor _meta to emit reason codes (not related to this PR in particular, just for the future).

Reply (Member Author): The `ErrorKind` you saw is such a reason code, but I didn't want to touch Sentry for this now. If you like, we can follow up with a new error kind and then teach Sentry about it.

                    HumanDuration(correction.drift)
                );

                e.insert("reason", reason);
                e.insert("sdk_time", correction.sent_at.to_string());
                e.insert("server_time", self.received_at.to_string());
            }));
        }

        Ok(())
    }

    fn process_timestamp(
        &mut self,
        timestamp: &mut Timestamp,
        _meta: &mut Meta,
        _state: &ProcessingState<'_>,
    ) -> ProcessingResult {
        if let Some(correction) = self.correction {
            // NB: We're not setting the original value here, as this could considerably increase
            // the event's size. Instead, attach an error message to the top-level event.
            *timestamp = *timestamp + correction.drift;
        }

        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    use chrono::offset::TimeZone;

    use crate::processor::process_value;
    use crate::protocol::{
        Context, ContextInner, Contexts, EventType, SpanId, TraceContext, TraceId,
    };
    use crate::types::{Annotated, Object};

    fn create_transaction(start: DateTime<Utc>, end: DateTime<Utc>) -> Annotated<Event> {
        Annotated::new(Event {
            ty: Annotated::new(EventType::Transaction),
            timestamp: Annotated::new(end),
            start_timestamp: Annotated::new(start),
            contexts: Annotated::new(Contexts({
                let mut contexts = Object::new();
                contexts.insert(
                    "trace".to_owned(),
                    Annotated::new(ContextInner(Context::Trace(Box::new(TraceContext {
                        trace_id: Annotated::new(TraceId(
                            "4c79f60c11214eb38604f4ae0781bfb2".into(),
                        )),
                        span_id: Annotated::new(SpanId("fa90fdead5f74053".into())),
                        op: Annotated::new("http.server".to_owned()),
                        ..Default::default()
                    })))),
                );
                contexts
            })),
            spans: Annotated::new(vec![]),
            ..Default::default()
        })
    }

    #[test]
    fn test_no_sent_at() {
        let start = Utc.ymd(2000, 1, 1).and_hms(0, 0, 0);
        let end = Utc.ymd(2000, 1, 2).and_hms(0, 0, 0);
        let now = end;

        // No information on delay, do not default to anything.
        let mut processor = ClockDriftProcessor::new(None, now);
        let mut event = create_transaction(start, end);
        process_value(&mut event, &mut processor, ProcessingState::root()).unwrap();

        let event = event.value().unwrap();
        assert_eq!(*event.timestamp.value().unwrap(), end);
        assert_eq!(*event.start_timestamp.value().unwrap(), start);
    }

    #[test]
    fn test_no_clock_drift() {
        let start = Utc.ymd(2000, 1, 1).and_hms(0, 0, 0);
        let end = Utc.ymd(2000, 1, 2).and_hms(0, 0, 0);

        let now = end;

        // The event was sent instantly without delay
        let mut processor = ClockDriftProcessor::new(Some(end), now);
        let mut event = create_transaction(start, end);
        process_value(&mut event, &mut processor, ProcessingState::root()).unwrap();

        let event = event.value().unwrap();
        assert_eq!(*event.timestamp.value().unwrap(), end);
        assert_eq!(*event.start_timestamp.value().unwrap(), start);
    }

    #[test]
    fn test_clock_drift_lower_bound() {
        let start = Utc.ymd(2000, 1, 1).and_hms(0, 0, 0);
        let end = Utc.ymd(2000, 1, 2).and_hms(0, 0, 0);

        let drift = SignedDuration::minutes(1);
        let now = end + drift;

        // The event was sent and received with minimal delay, which should not correct
        let mut processor = ClockDriftProcessor::new(Some(end), now);
        let mut event = create_transaction(start, end);
        process_value(&mut event, &mut processor, ProcessingState::root()).unwrap();

        let event = event.value().unwrap();
        assert_eq!(*event.timestamp.value().unwrap(), end);
        assert_eq!(*event.start_timestamp.value().unwrap(), start);
    }

    #[test]
    fn test_clock_drift_from_past() {
        let start = Utc.ymd(2000, 1, 1).and_hms(0, 0, 0);
        let end = Utc.ymd(2000, 1, 2).and_hms(0, 0, 0);

        let drift = SignedDuration::days(1);
        let now = end + drift;

        // The event was sent and received with delay
        let mut processor = ClockDriftProcessor::new(Some(end), now);
        let mut event = create_transaction(start, end);
        process_value(&mut event, &mut processor, ProcessingState::root()).unwrap();

        let event = event.value().unwrap();
        assert_eq!(*event.timestamp.value().unwrap(), now);
        assert_eq!(*event.start_timestamp.value().unwrap(), start + drift);
    }

    #[test]
    fn test_clock_drift_from_future() {
        let start = Utc.ymd(2000, 1, 1).and_hms(0, 0, 0);
        let end = Utc.ymd(2000, 1, 2).and_hms(0, 0, 0);

        let drift = -SignedDuration::days(1);
        let now = end + drift;

        // The event was sent and received with delay
        let mut processor = ClockDriftProcessor::new(Some(end), now);
        let mut event = create_transaction(start, end);
        process_value(&mut event, &mut processor, ProcessingState::root()).unwrap();

        let event = event.value().unwrap();
        assert_eq!(*event.timestamp.value().unwrap(), now);
        assert_eq!(*event.start_timestamp.value().unwrap(), start + drift);
    }
}
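
To make the rule exercised by these tests concrete, here is a minimal model of the correction on plain Unix seconds. It is a sketch under assumptions: `clock_drift` and `correct_timestamps` are hypothetical names, and `i64` seconds stand in for the `DateTime<Utc>` and chrono duration types used in the PR.

```rust
/// Minimum absolute drift in seconds before a correction applies
/// (55 minutes, the value used in the diff).
const MINIMUM_CLOCK_DRIFT_SECS: i64 = 55 * 60;

/// Returns the signed drift to apply, or `None` if the sender supplied no
/// `sent_at` or the drift stays below the threshold.
fn clock_drift(sent_at: Option<i64>, received_at: i64) -> Option<i64> {
    // Positive drift: the sender's clock lags behind the receiver.
    // Negative drift: the sender's clock runs ahead.
    let drift = received_at - sent_at?;
    if drift.abs() >= MINIMUM_CLOCK_DRIFT_SECS {
        Some(drift)
    } else {
        None
    }
}

/// Shifts every timestamp by the same drift, leaving them untouched when no
/// correction applies.
fn correct_timestamps(timestamps: &mut [i64], sent_at: Option<i64>, received_at: i64) {
    if let Some(drift) = clock_drift(sent_at, received_at) {
        for timestamp in timestamps.iter_mut() {
            *timestamp += drift;
        }
    }
}
```

Because the drift is signed, a single addition covers both directions described in the docstring: a lagging sender gets a positive shift and a sender running ahead gets a negative one.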
13 changes: 8 additions & 5 deletions relay-general/src/store/mod.rs
@@ -9,6 +9,7 @@ use crate::processor::{ProcessingState, Processor};
 use crate::protocol::{Event, IpAddr};
 use crate::types::{Meta, ProcessingResult};

+mod clock_drift;
 mod event_error;
 mod geo;
 mod legacy;
@@ -31,6 +32,8 @@ pub struct StoreConfig {
     pub protocol_version: Option<String>,
     pub grouping_config: Option<Value>,
     pub user_agent: Option<String>,
+    pub received_at: Option<DateTime<Utc>>,

Comment (Member Author): This is only optional to retain `Default` for `StoreConfig`. Effectively, it is always set in the event processor. Internally, `StoreProcessor` defaults to `Utc::now` if we ever choose to not pass a value here.

+    pub sent_at: Option<DateTime<Utc>>,

     pub max_secs_in_future: Option<i64>,
     pub max_secs_in_past: Option<i64>,
@@ -45,9 +48,6 @@ pub struct StoreConfig {

     /// When `true` it adds context information extracted from the user agent
     pub normalize_user_agent: Option<bool>,
-
-    /// When the event has been sent, according to the SDK. Passed in via envelope headers.
-    pub sent_at: Option<DateTime<Utc>>,
 }

 /// The processor that normalizes events for store.
@@ -87,11 +87,14 @@ impl<'a> Processor for StoreProcessor<'a> {
         legacy::LegacyProcessor.process_event(event, meta, state)?;

         if !is_renormalize {
+            let received_at = self.config.received_at.unwrap_or_else(Utc::now);
+            clock_drift::ClockDriftProcessor::new(self.config.sent_at, received_at)
+                .process_event(event, meta, state)?;
+
             // internally noops for non-transaction events
             // TODO: Parts of this processor should probably be a filter once Relay is store so we
             // can revert some changes to ProcessingAction
-            transactions::TransactionsProcessor::new(self.config.sent_at)
-                .process_event(event, meta, state)?;
+            transactions::TransactionsProcessor.process_event(event, meta, state)?;
         }

         if !is_renormalize {
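
The fallback described in the author's comment on `received_at` can be sketched in isolation. This is an illustration only: `TimingConfig` and `effective_received_at` are hypothetical names, and `SystemTime` stands in for the `DateTime<Utc>` used by `StoreConfig`.

```rust
use std::time::SystemTime;

/// Hypothetical stand-in for the relevant part of `StoreConfig`. Keeping the
/// field optional lets the struct derive `Default`.
#[derive(Default)]
struct TimingConfig {
    received_at: Option<SystemTime>,
}

impl TimingConfig {
    /// Resolves the receive time, falling back to the current time when the
    /// caller did not provide one.
    fn effective_received_at(&self) -> SystemTime {
        self.received_at.unwrap_or_else(SystemTime::now)
    }
}
```

Passing the function itself to `unwrap_or_else` means the clock is only read when no value was supplied, which mirrors the `unwrap_or_else(Utc::now)` call in the hunk above.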
10 changes: 5 additions & 5 deletions relay-general/src/store/normalize.rs
@@ -119,19 +119,19 @@ impl<'a> NormalizeProcessor<'a> {

     /// Validates the timestamp range and sets a default value.
     fn normalize_timestamps(&self, event: &mut Event) -> ProcessingResult {
-        let current_timestamp = Utc::now();
-        event.received = Annotated::new(current_timestamp);
+        let received_at = self.config.received_at.unwrap_or_else(Utc::now);
+        event.received = Annotated::new(received_at);

         event.timestamp.apply(|timestamp, meta| {
             if let Some(secs) = self.config.max_secs_in_future {
-                if *timestamp > current_timestamp + Duration::seconds(secs) {
+                if *timestamp > received_at + Duration::seconds(secs) {
                     meta.add_error(ErrorKind::FutureTimestamp);
                     return Err(ProcessingAction::DeleteValueSoft);
                 }
             }

             if let Some(secs) = self.config.max_secs_in_past {
-                if *timestamp < current_timestamp - Duration::seconds(secs) {
+                if *timestamp < received_at - Duration::seconds(secs) {
                     meta.add_error(ErrorKind::PastTimestamp);
                     return Err(ProcessingAction::DeleteValueSoft);
                 }
@@ -141,7 +141,7 @@ impl<'a> NormalizeProcessor<'a> {
         })?;

         if event.timestamp.value().is_none() {
-            event.timestamp.set_value(Some(current_timestamp));
+            event.timestamp.set_value(Some(received_at));
         }

         event
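
The range check in this hunk now measures against `received_at` instead of a fresh `Utc::now()`. A minimal sketch of that check on plain Unix seconds follows; `TimestampCheck` and `check_timestamp` are hypothetical names introduced for illustration, and the real code records an error on the value's metadata rather than returning an enum.

```rust
/// Hypothetical result type for illustration.
#[derive(Debug, PartialEq)]
enum TimestampCheck {
    InRange,
    TooFarInFuture,
    TooFarInPast,
}

/// Checks a timestamp against optional bounds relative to the receive time.
/// All values are Unix seconds; a bound of `None` disables that side.
fn check_timestamp(
    timestamp: i64,
    received_at: i64,
    max_secs_in_future: Option<i64>,
    max_secs_in_past: Option<i64>,
) -> TimestampCheck {
    if let Some(secs) = max_secs_in_future {
        // Strict comparison: a timestamp exactly on the bound is accepted.
        if timestamp > received_at + secs {
            return TimestampCheck::TooFarInFuture;
        }
    }

    if let Some(secs) = max_secs_in_past {
        if timestamp < received_at - secs {
            return TimestampCheck::TooFarInPast;
        }
    }

    TimestampCheck::InRange
}
```

Using one `received_at` value for both the drift correction and this check keeps the two steps consistent: a timestamp shifted by the drift is validated against the same reference time that produced the drift.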