Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

switch to nanosecond precision #2016

Merged
merged 1 commit into from
May 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions columnar/src/column_values/monotonic_mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,12 +139,12 @@ impl MonotonicallyMappableToU64 for i64 {
impl MonotonicallyMappableToU64 for DateTime {
#[inline(always)]
fn to_u64(self) -> u64 {
common::i64_to_u64(self.into_timestamp_micros())
common::i64_to_u64(self.into_timestamp_nanos())
}

#[inline(always)]
fn from_u64(val: u64) -> Self {
DateTime::from_timestamp_micros(common::u64_to_i64(val))
DateTime::from_timestamp_nanos(common::u64_to_i64(val))
}
}

Expand Down
2 changes: 1 addition & 1 deletion columnar/src/column_values/u64_based/stats_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub struct StatsCollector {
// This is the same as computing the difference between the values and the first value.
//
// This way, we can compress i64-converted-to-u64 (e.g. timestamp that were supplied in
// seconds, only to be converted in microseconds).
// seconds, only to be converted in nanoseconds).
increment_gcd_opt: Option<(NonZeroU64, DividerU64)>,
first_value_opt: Option<u64>,
}
Expand Down
2 changes: 1 addition & 1 deletion columnar/src/columnar/writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ impl ColumnarWriter {
let mut column: ColumnWriter = column_opt.unwrap_or_default();
column.record(
doc,
NumericalValue::I64(datetime.into_timestamp_micros()),
NumericalValue::I64(datetime.into_timestamp_nanos()),
arena,
);
column
Expand Down
2 changes: 1 addition & 1 deletion columnar/src/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ impl Coerce for f64 {
impl Coerce for DateTime {
fn coerce(value: NumericalValue) -> Self {
let timestamp_micros = i64::coerce(value);
DateTime::from_timestamp_micros(timestamp_micros)
DateTime::from_timestamp_nanos(timestamp_micros)
}
}

Expand Down
54 changes: 34 additions & 20 deletions common/src/datetime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ pub enum DatePrecision {
Milliseconds,
/// Micro-seconds precision.
Microseconds,
/// Nano-seconds precision.
Nanoseconds,
}

/// A date/time value with microsecond precision.
/// A date/time value with nanoseconds precision.
///
/// This timestamp does not carry any explicit time zone information.
/// Users are responsible for applying the provided conversion
Expand All @@ -31,49 +33,56 @@ pub enum DatePrecision {
/// to prevent unintended usage.
#[derive(Clone, Default, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct DateTime {
// Timestamp in microseconds.
pub(crate) timestamp_micros: i64,
// Timestamp in nanoseconds.
pub(crate) timestamp_nanos: i64,
}

impl DateTime {
/// Minimum possible `DateTime` value.
pub const MIN: DateTime = DateTime {
timestamp_micros: i64::MIN,
timestamp_nanos: i64::MIN,
};

/// Maximum possible `DateTime` value.
pub const MAX: DateTime = DateTime {
timestamp_micros: i64::MAX,
timestamp_nanos: i64::MAX,
};

/// Create new from UNIX timestamp in seconds
pub const fn from_timestamp_secs(seconds: i64) -> Self {
Self {
timestamp_micros: seconds * 1_000_000,
timestamp_nanos: seconds * 1_000_000_000,
}
}

/// Create new from UNIX timestamp in milliseconds
pub const fn from_timestamp_millis(milliseconds: i64) -> Self {
Self {
timestamp_micros: milliseconds * 1_000,
timestamp_nanos: milliseconds * 1_000_000,
}
}

/// Create new from UNIX timestamp in microseconds.
pub const fn from_timestamp_micros(microseconds: i64) -> Self {
Self {
timestamp_micros: microseconds,
timestamp_nanos: microseconds * 1_000,
}
}

/// Create new from UNIX timestamp in nanoseconds.
pub const fn from_timestamp_nanos(nanoseconds: i64) -> Self {
Self {
timestamp_nanos: nanoseconds,
}
}

/// Create new from `OffsetDateTime`
///
/// The given date/time is converted to UTC and the actual
/// time zone is discarded.
pub const fn from_utc(dt: OffsetDateTime) -> Self {
let timestamp_micros = dt.unix_timestamp() * 1_000_000 + dt.microsecond() as i64;
Self { timestamp_micros }
pub fn from_utc(dt: OffsetDateTime) -> Self {
let timestamp_nanos = dt.unix_timestamp_nanos() as i64;
Self { timestamp_nanos }
}

/// Create new from `PrimitiveDateTime`
Expand All @@ -87,23 +96,27 @@ impl DateTime {

/// Convert to UNIX timestamp in seconds.
pub const fn into_timestamp_secs(self) -> i64 {
self.timestamp_micros / 1_000_000
self.timestamp_nanos / 1_000_000_000
}

/// Convert to UNIX timestamp in milliseconds.
pub const fn into_timestamp_millis(self) -> i64 {
self.timestamp_micros / 1_000
self.timestamp_nanos / 1_000_000
}

/// Convert to UNIX timestamp in microseconds.
pub const fn into_timestamp_micros(self) -> i64 {
self.timestamp_micros
self.timestamp_nanos / 1_000
}

/// Convert to UNIX timestamp in nanoseconds.
pub const fn into_timestamp_nanos(self) -> i64 {
self.timestamp_nanos
}

/// Convert to UTC `OffsetDateTime`
pub fn into_utc(self) -> OffsetDateTime {
let timestamp_nanos = self.timestamp_micros as i128 * 1000;
let utc_datetime = OffsetDateTime::from_unix_timestamp_nanos(timestamp_nanos)
let utc_datetime = OffsetDateTime::from_unix_timestamp_nanos(self.timestamp_nanos as i128)
.expect("valid UNIX timestamp");
debug_assert_eq!(UtcOffset::UTC, utc_datetime.offset());
utc_datetime
Expand All @@ -128,12 +141,13 @@ impl DateTime {
/// Truncates the microseconds value to the corresponding precision.
pub fn truncate(self, precision: DatePrecision) -> Self {
let truncated_timestamp_micros = match precision {
DatePrecision::Seconds => (self.timestamp_micros / 1_000_000) * 1_000_000,
DatePrecision::Milliseconds => (self.timestamp_micros / 1_000) * 1_000,
DatePrecision::Microseconds => self.timestamp_micros,
DatePrecision::Seconds => (self.timestamp_nanos / 1_000_000_000) * 1_000_000_000,
DatePrecision::Milliseconds => (self.timestamp_nanos / 1_000_000) * 1_000_000,
DatePrecision::Microseconds => (self.timestamp_nanos / 1_000) * 1_000,
DatePrecision::Nanoseconds => self.timestamp_nanos,
};
Self {
timestamp_micros: truncated_timestamp_micros,
timestamp_nanos: truncated_timestamp_micros,
}
}
}
Expand Down
81 changes: 44 additions & 37 deletions src/aggregation/bucket/histogram/date_histogram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub struct DateHistogramAggregationReq {
date_interval: Option<String>,
/// The field to aggregate on.
pub field: String,
/// The format to format dates.
/// The format to format dates. Unsupported currently.
pub format: Option<String>,
/// The interval to chunk your data range. Each bucket spans a value range of
/// [0..fixed_interval). Accepted values
Expand Down Expand Up @@ -77,7 +77,7 @@ pub struct DateHistogramAggregationReq {
/// hard_bounds only limits the buckets, to force a range set both extended_bounds and
/// hard_bounds to the same range.
///
/// Needs to be provided as timestamp in microseconds precision.
/// Needs to be provided as timestamp in nanosecond precision.
///
/// ## Example
/// ```json
Expand All @@ -88,7 +88,7 @@ pub struct DateHistogramAggregationReq {
/// "interval": "1d",
/// "hard_bounds": {
/// "min": 0,
/// "max": 1420502400000000
/// "max": 1420502400000000000
/// }
/// }
/// }
Expand All @@ -114,11 +114,11 @@ impl DateHistogramAggregationReq {
self.validate()?;
Ok(HistogramAggregation {
field: self.field.to_string(),
interval: parse_into_microseconds(self.fixed_interval.as_ref().unwrap())? as f64,
interval: parse_into_nanoseconds(self.fixed_interval.as_ref().unwrap())? as f64,
offset: self
.offset
.as_ref()
.map(|offset| parse_offset_into_microseconds(offset))
.map(|offset| parse_offset_into_nanosecs(offset))
.transpose()?
.map(|el| el as f64),
min_doc_count: self.min_doc_count,
Expand Down Expand Up @@ -155,7 +155,7 @@ impl DateHistogramAggregationReq {
));
}

parse_into_microseconds(self.fixed_interval.as_ref().unwrap())?;
parse_into_nanoseconds(self.fixed_interval.as_ref().unwrap())?;

Ok(())
}
Expand All @@ -176,9 +176,12 @@ pub enum DateHistogramParseError {
/// Offset invalid
#[error("passed offset is invalid {0:?}")]
InvalidOffset(String),
/// Value out of bounds
#[error("passed value is out of bounds: {0:?}")]
OutOfBounds(String),
}

fn parse_offset_into_microseconds(input: &str) -> Result<i64, AggregationError> {
fn parse_offset_into_nanosecs(input: &str) -> Result<i64, AggregationError> {
let is_sign = |byte| &[byte] == b"-" || &[byte] == b"+";
if input.is_empty() {
return Err(DateHistogramParseError::InvalidOffset(input.to_string()).into());
Expand All @@ -187,18 +190,18 @@ fn parse_offset_into_microseconds(input: &str) -> Result<i64, AggregationError>
let has_sign = is_sign(input.as_bytes()[0]);
if has_sign {
let (sign, input) = input.split_at(1);
let val = parse_into_microseconds(input)?;
let val = parse_into_nanoseconds(input)?;
if sign == "-" {
Ok(-val)
} else {
Ok(val)
}
} else {
parse_into_microseconds(input)
parse_into_nanoseconds(input)
}
}

fn parse_into_microseconds(input: &str) -> Result<i64, AggregationError> {
fn parse_into_nanoseconds(input: &str) -> Result<i64, AggregationError> {
let split_boundary = input
.as_bytes()
.iter()
Expand Down Expand Up @@ -226,7 +229,11 @@ fn parse_into_microseconds(input: &str) -> Result<i64, AggregationError> {
_ => return Err(DateHistogramParseError::UnitNotRecognized(unit.to_string()).into()),
};

Ok(number * multiplier_from_unit * 1000)
let val = (number * multiplier_from_unit)
.checked_mul(1_000_000)
.ok_or_else(|| DateHistogramParseError::OutOfBounds(input.to_string()))?;

Ok(val)
}

#[cfg(test)]
Expand All @@ -241,49 +248,49 @@ mod tests {
use crate::Index;

#[test]
fn test_parse_into_microseconds() {
assert_eq!(parse_into_microseconds("1m").unwrap(), 60_000_000);
assert_eq!(parse_into_microseconds("2m").unwrap(), 120_000_000);
fn test_parse_into_nanosecs() {
assert_eq!(parse_into_nanoseconds("1m").unwrap(), 60_000_000_000);
assert_eq!(parse_into_nanoseconds("2m").unwrap(), 120_000_000_000);
assert_eq!(
parse_into_microseconds("2y").unwrap_err(),
parse_into_nanoseconds("2y").unwrap_err(),
DateHistogramParseError::UnitNotRecognized("y".to_string()).into()
);
assert_eq!(
parse_into_microseconds("2000").unwrap_err(),
parse_into_nanoseconds("2000").unwrap_err(),
DateHistogramParseError::UnitMissing("2000".to_string()).into()
);
assert_eq!(
parse_into_microseconds("ms").unwrap_err(),
parse_into_nanoseconds("ms").unwrap_err(),
DateHistogramParseError::NumberMissing("ms".to_string()).into()
);
}

#[test]
fn test_parse_offset_into_microseconds() {
assert_eq!(parse_offset_into_microseconds("1m").unwrap(), 60_000_000);
assert_eq!(parse_offset_into_microseconds("+1m").unwrap(), 60_000_000);
assert_eq!(parse_offset_into_microseconds("-1m").unwrap(), -60_000_000);
assert_eq!(parse_offset_into_microseconds("2m").unwrap(), 120_000_000);
assert_eq!(parse_offset_into_microseconds("+2m").unwrap(), 120_000_000);
assert_eq!(parse_offset_into_microseconds("-2m").unwrap(), -120_000_000);
assert_eq!(parse_offset_into_microseconds("-2ms").unwrap(), -2_000);
fn test_parse_offset_into_nanosecs() {
assert_eq!(parse_offset_into_nanosecs("1m").unwrap(), 60_000_000_000);
assert_eq!(parse_offset_into_nanosecs("+1m").unwrap(), 60_000_000_000);
assert_eq!(parse_offset_into_nanosecs("-1m").unwrap(), -60_000_000_000);
assert_eq!(parse_offset_into_nanosecs("2m").unwrap(), 120_000_000_000);
assert_eq!(parse_offset_into_nanosecs("+2m").unwrap(), 120_000_000_000);
assert_eq!(parse_offset_into_nanosecs("-2m").unwrap(), -120_000_000_000);
assert_eq!(parse_offset_into_nanosecs("-2ms").unwrap(), -2_000_000);
assert_eq!(
parse_offset_into_microseconds("2y").unwrap_err(),
parse_offset_into_nanosecs("2y").unwrap_err(),
DateHistogramParseError::UnitNotRecognized("y".to_string()).into()
);
assert_eq!(
parse_offset_into_microseconds("2000").unwrap_err(),
parse_offset_into_nanosecs("2000").unwrap_err(),
DateHistogramParseError::UnitMissing("2000".to_string()).into()
);
assert_eq!(
parse_offset_into_microseconds("ms").unwrap_err(),
parse_offset_into_nanosecs("ms").unwrap_err(),
DateHistogramParseError::NumberMissing("ms".to_string()).into()
);
}

#[test]
fn test_parse_into_milliseconds_do_not_accept_non_ascii() {
assert!(parse_into_microseconds("1m").is_err());
assert!(parse_into_nanoseconds("1m").is_err());
}

pub fn get_test_index_from_docs(
Expand Down Expand Up @@ -361,7 +368,7 @@ mod tests {
"buckets" : [
{
"key_as_string" : "2015-01-01T00:00:00Z",
"key" : 1420070400000000.0,
"key" : 1420070400000000000.0,
"doc_count" : 4
}
]
Expand Down Expand Up @@ -397,7 +404,7 @@ mod tests {
"buckets" : [
{
"key_as_string" : "2015-01-01T00:00:00Z",
"key" : 1420070400000000.0,
"key" : 1420070400000000000.0,
"doc_count" : 4,
"texts": {
"buckets": [
Expand Down Expand Up @@ -444,32 +451,32 @@ mod tests {
"buckets": [
{
"doc_count": 2,
"key": 1420070400000000.0,
"key": 1420070400000000000.0,
"key_as_string": "2015-01-01T00:00:00Z"
},
{
"doc_count": 1,
"key": 1420156800000000.0,
"key": 1420156800000000000.0,
"key_as_string": "2015-01-02T00:00:00Z"
},
{
"doc_count": 0,
"key": 1420243200000000.0,
"key": 1420243200000000000.0,
"key_as_string": "2015-01-03T00:00:00Z"
},
{
"doc_count": 0,
"key": 1420329600000000.0,
"key": 1420329600000000000.0,
"key_as_string": "2015-01-04T00:00:00Z"
},
{
"doc_count": 0,
"key": 1420416000000000.0,
"key": 1420416000000000000.0,
"key_as_string": "2015-01-05T00:00:00Z"
},
{
"doc_count": 1,
"key": 1420502400000000.0,
"key": 1420502400000000000.0,
"key_as_string": "2015-01-06T00:00:00Z"
}
]
Expand Down
Loading