From 8a90ecec790044c6f9713ce62b35c87e7101566f Mon Sep 17 00:00:00 2001
From: "Jorge C. Leitao"
Date: Fri, 1 Oct 2021 11:40:09 +0000
Subject: [PATCH] Added support for timestamps with timezone to CSV.

---
 Cargo.toml                      |   2 +-
 src/io/csv/read/deserialize.rs  |  54 +++++++++----
 src/io/csv/read/infer_schema.rs | 134 +++++++++++++++++++++++---------
 src/io/csv/read/mod.rs          |   2 +-
 src/io/csv/read/reader.rs       |  37 ---------
 tests/it/io/csv/read.rs         |  42 +++++++++-
 6 files changed, 180 insertions(+), 91 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index c996600fb14..f4b9c43de64 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -102,7 +102,7 @@ full = [
 ]
 merge_sort = ["itertools"]
 io_csv = ["io_csv_read", "io_csv_write"]
-io_csv_read = ["csv", "lazy_static", "regex", "lexical-core"]
+io_csv_read = ["csv", "lexical-core"]
 io_csv_write = ["csv", "streaming-iterator", "lexical-core"]
 io_json = ["serde", "serde_json", "indexmap"]
 io_ipc = ["flatbuffers"]
diff --git a/src/io/csv/read/deserialize.rs b/src/io/csv/read/deserialize.rs
index 9f142205f7a..14172e09a6b 100644
--- a/src/io/csv/read/deserialize.rs
+++ b/src/io/csv/read/deserialize.rs
@@ -8,10 +8,12 @@ use crate::{
     datatypes::*,
     error::{ArrowError, Result},
     record_batch::RecordBatch,
-    temporal_conversions::EPOCH_DAYS_FROM_CE,
+    temporal_conversions,
     types::{NativeType, NaturalDataType},
 };
 
+use super::infer_schema::RFC3339;
+
 fn deserialize_primitive<T, F>(
     rows: &[ByteRecord],
     column: usize,
@@ -63,6 +65,22 @@ fn deserialize_binary<O: Offset>(rows: &[ByteRecord], column: usize) -> Arc<dyn
     Arc::new(BinaryArray::<O>::from_trusted_len_iter(iter))
 }
 
+#[inline]
+fn deserialize_datetime<T: chrono::TimeZone>(string: &str, tz: &T) -> Option<i64> {
+    let mut parsed = chrono::format::Parsed::new();
+    let fmt = chrono::format::StrftimeItems::new(RFC3339);
+    if chrono::format::parse(&mut parsed, string, fmt).is_ok() {
+        parsed
+            .to_datetime()
+            .map(|x| x.naive_utc())
+            .map(|x| tz.from_utc_datetime(&x))
+            .map(|x| x.timestamp_nanos())
+            .ok()
+    } else {
+        None
+    }
+}
+
 /// Deserializes `column` of `rows` into an [`Array`] of [`DataType`] `datatype`.
 pub fn deserialize_column(
     rows: &[ByteRecord],
@@ -115,7 +133,7 @@ pub fn deserialize_column(
             simdutf8::basic::from_utf8(bytes)
                 .ok()
                 .and_then(|x| x.parse::<chrono::NaiveDate>().ok())
-                .map(|x| x.num_days_from_ce() - EPOCH_DAYS_FROM_CE)
+                .map(|x| x.num_days_from_ce() - temporal_conversions::EPOCH_DAYS_FROM_CE)
         }),
         Date64 => deserialize_primitive(rows, column, datatype, |bytes| {
             simdutf8::basic::from_utf8(bytes)
@@ -139,20 +157,30 @@ pub fn deserialize_column(
                     .map(|x| x.timestamp_nanos() / 1000)
             })
         }
-        Timestamp(TimeUnit::Millisecond, None) => {
-            deserialize_primitive(rows, column, datatype, |bytes| {
-                simdutf8::basic::from_utf8(bytes)
-                    .ok()
-                    .and_then(|x| x.parse::<chrono::NaiveDateTime>().ok())
-                    .map(|x| x.timestamp_nanos() / 1_000_000)
-            })
-        }
-        Timestamp(TimeUnit::Second, None) => {
+        Timestamp(time_unit, None) => deserialize_primitive(rows, column, datatype, |bytes| {
+            simdutf8::basic::from_utf8(bytes)
+                .ok()
+                .and_then(|x| x.parse::<chrono::NaiveDateTime>().ok())
+                .map(|x| x.timestamp_nanos())
+                .map(|x| match time_unit {
+                    TimeUnit::Second => x / 1_000_000_000,
+                    TimeUnit::Millisecond => x / 1_000_000,
+                    TimeUnit::Microsecond => x / 1_000,
+                    TimeUnit::Nanosecond => x,
+                })
+        }),
+        Timestamp(time_unit, Some(ref tz)) => {
+            let tz = temporal_conversions::parse_offset(tz)?;
             deserialize_primitive(rows, column, datatype, |bytes| {
                 simdutf8::basic::from_utf8(bytes)
                     .ok()
-                    .and_then(|x| x.parse::<chrono::NaiveDateTime>().ok())
-                    .map(|x| x.timestamp_nanos() / 1_000_000_000)
+                    .and_then(|x| deserialize_datetime(x, &tz))
+                    .map(|x| match time_unit {
+                        TimeUnit::Second => x / 1_000_000_000,
+                        TimeUnit::Millisecond => x / 1_000_000,
+                        TimeUnit::Microsecond => x / 1_000,
+                        TimeUnit::Nanosecond => x,
+                    })
             })
         }
         Utf8 => deserialize_utf8::<i32>(rows, column),
diff --git a/src/io/csv/read/infer_schema.rs b/src/io/csv/read/infer_schema.rs
index e382d8a1bf9..7618ab7cb27 100644
--- a/src/io/csv/read/infer_schema.rs
+++ b/src/io/csv/read/infer_schema.rs
@@ -3,20 +3,82 @@ use std::{
     io::{Read, Seek},
 };
 
-use super::Reader;
-use csv::StringRecord;
+use super::{ByteRecord, Reader};
 
-use crate::datatypes::DataType;
+use crate::datatypes::{DataType, TimeUnit};
 use crate::datatypes::{Field, Schema};
 use crate::error::Result;
 
-/// Infer the schema of a CSV file by reading through the first n records of the file,
-/// with `max_rows` controlling the maximum number of records to read.
-///
-/// If `max_rows` is not set, the whole file is read to infer its schema.
+pub(super) const RFC3339: &str = "%Y-%m-%dT%H:%M:%S%.f%:z";
+
+fn is_boolean(bytes: &[u8]) -> bool {
+    bytes.eq_ignore_ascii_case(b"true") | bytes.eq_ignore_ascii_case(b"false")
+}
+
+fn is_float(bytes: &[u8]) -> bool {
+    lexical_core::parse::<f64>(bytes).is_ok()
+}
+
+fn is_integer(bytes: &[u8]) -> bool {
+    lexical_core::parse::<i64>(bytes).is_ok()
+}
+
+fn is_date(string: &str) -> bool {
+    string.parse::<chrono::NaiveDate>().is_ok()
+}
+
+fn is_time(string: &str) -> bool {
+    string.parse::<chrono::NaiveTime>().is_ok()
+}
+
+fn is_naive_datetime(string: &str) -> bool {
+    string.parse::<chrono::NaiveDateTime>().is_ok()
+}
+
+fn is_datetime(string: &str) -> Option<String> {
+    let mut parsed = chrono::format::Parsed::new();
+    let fmt = chrono::format::StrftimeItems::new(RFC3339);
+    if chrono::format::parse(&mut parsed, string, fmt).is_ok() {
+        parsed.offset.map(|x| {
+            let hours = x / 60 / 60;
+            let minutes = x / 60 - hours * 60;
+            format!("{:03}:{:02}", hours, minutes)
+        })
+    } else {
+        None
+    }
+}
+
+/// Infers [`DataType`] from `bytes`
+pub fn infer(bytes: &[u8]) -> DataType {
+    if is_boolean(bytes) {
+        DataType::Boolean
+    } else if is_integer(bytes) {
+        DataType::Int64
+    } else if is_float(bytes) {
+        DataType::Float64
+    } else if let Ok(string) = simdutf8::basic::from_utf8(bytes) {
+        if is_date(string) {
+            DataType::Date32
+        } else if is_time(string) {
+            DataType::Time32(TimeUnit::Millisecond)
+        } else if is_naive_datetime(string) {
+            DataType::Timestamp(TimeUnit::Millisecond, None)
+        } else if let Some(offset) = is_datetime(string) {
+            DataType::Timestamp(TimeUnit::Millisecond, Some(offset))
+        } else {
+            DataType::Utf8
+        }
+    } else {
+        // invalid utf8
+        DataType::Binary
+    }
+}
+
+/// Infer the schema of a CSV file by reading through the first n records up to `max_rows`.
 ///
 /// Return inferred schema and number of records used for inference.
-pub fn infer_schema<R: Read + Seek, F: Fn(&str) -> DataType>(
+pub fn infer_schema<R: Read + Seek, F: Fn(&[u8]) -> DataType>(
     reader: &mut Reader<R>,
     max_rows: Option<usize>,
     has_header: bool,
@@ -25,8 +87,7 @@ pub fn infer_schema<R: Read + Seek, F: Fn(&str) -> DataType>(
     // get or create header names
     // when has_header is false, creates default column names with column_ prefix
     let headers: Vec<String> = if has_header {
-        let headers = &reader.headers()?.clone();
-        headers.iter().map(|s| s.to_string()).collect()
+        reader.headers()?.iter().map(|s| s.to_string()).collect()
     } else {
         let first_record_count = &reader.headers()?.len();
         (0..*first_record_count)
@@ -42,12 +103,11 @@ pub fn infer_schema<R: Read + Seek, F: Fn(&str) -> DataType>(
 
     let mut column_types: Vec<HashSet<DataType>> = vec![HashSet::new(); header_length];
     let mut records_count = 0;
-    let mut fields = vec![];
 
-    let mut record = StringRecord::new();
+    let mut record = ByteRecord::new();
     let max_records = max_rows.unwrap_or(usize::MAX);
     while records_count < max_records {
-        if !reader.read_record(&mut record)? {
+        if !reader.read_byte_record(&mut record)? {
             break;
         }
         records_count += 1;
@@ -60,32 +120,30 @@ pub fn infer_schema<R: Read + Seek, F: Fn(&str) -> DataType>(
     }
 
     // build schema from inference results
-    for i in 0..header_length {
-        let possibilities = &column_types[i];
-        let field_name = &headers[i];
-
-        // determine data type based on possible types
-        // if there are incompatible types, use DataType::Utf8
-        match possibilities.len() {
-            1 => {
-                for dtype in possibilities.iter() {
-                    fields.push(Field::new(field_name, dtype.clone(), true));
-                }
-            }
-            2 => {
-                if possibilities.contains(&DataType::Int64)
-                    && possibilities.contains(&DataType::Float64)
-                {
-                    // we have an integer and double, fall down to double
-                    fields.push(Field::new(field_name, DataType::Float64, true));
-                } else {
-                    // default to Utf8 for conflicting datatypes (e.g bool and int)
-                    fields.push(Field::new(field_name, DataType::Utf8, true));
+    let fields = headers
+        .iter()
+        .zip(column_types.into_iter())
+        .map(|(field_name, mut possibilities)| {
+            // determine data type based on possible types
+            // if there are incompatible types, use DataType::Utf8
+            let data_type = match possibilities.len() {
+                1 => possibilities.drain().next().unwrap(),
+                2 => {
+                    if possibilities.contains(&DataType::Int64)
+                        && possibilities.contains(&DataType::Float64)
+                    {
+                        // we have an integer and double, fall down to double
+                        DataType::Float64
+                    } else {
+                        // default to Utf8 for conflicting datatypes (e.g bool and int)
+                        DataType::Utf8
+                    }
                 }
-            }
-            _ => fields.push(Field::new(field_name, DataType::Utf8, true)),
-        }
-    }
+                _ => DataType::Utf8,
+            };
+            Field::new(field_name, data_type, true)
+        })
+        .collect();
 
     // return the reader seek back to the start
     reader.seek(position)?;
diff --git a/src/io/csv/read/mod.rs b/src/io/csv/read/mod.rs
index 348ddfd5a53..48881040abc 100644
--- a/src/io/csv/read/mod.rs
+++ b/src/io/csv/read/mod.rs
@@ -8,5 +8,5 @@ pub use csv::{ByteRecord, Reader, ReaderBuilder};
 mod infer_schema;
 
 pub use deserialize::{deserialize_batch, deserialize_column};
-pub use infer_schema::infer_schema;
+pub use infer_schema::{infer, infer_schema};
 pub use reader::*;
diff --git a/src/io/csv/read/reader.rs b/src/io/csv/read/reader.rs
index b83dc726108..14ddd8d07a1 100644
--- a/src/io/csv/read/reader.rs
+++ b/src/io/csv/read/reader.rs
@@ -1,8 +1,5 @@
 use std::io::Read;
 
-use lazy_static::lazy_static;
-use regex::{Regex, RegexBuilder};
-
 use super::{ByteRecord, Reader};
 
 use crate::{
@@ -52,37 +49,3 @@ pub fn read_rows(
     }
     Ok(row_number)
 }
-
-lazy_static! {
-    static ref DECIMAL_RE: Regex = Regex::new(r"^-?(\d+\.\d+)$").unwrap();
-    static ref INTEGER_RE: Regex = Regex::new(r"^-?(\d+)$").unwrap();
-    static ref BOOLEAN_RE: Regex = RegexBuilder::new(r"^(true)$|^(false)$")
-        .case_insensitive(true)
-        .build()
-        .unwrap();
-    static ref DATE_RE: Regex = Regex::new(r"^\d{4}-\d\d-\d\d$").unwrap();
-    static ref DATETIME_RE: Regex = Regex::new(r"^\d{4}-\d\d-\d\dT\d\d:\d\d:\d\d$").unwrap();
-}
-
-/// Infer the data type of a record
-pub fn infer(string: &str) -> DataType {
-    // when quoting is enabled in the reader, these quotes aren't escaped, we default to
-    // Utf8 for them
-    if string.starts_with('"') {
-        return DataType::Utf8;
-    }
-    // match regex in a particular order
-    if BOOLEAN_RE.is_match(string) {
-        DataType::Boolean
-    } else if DECIMAL_RE.is_match(string) {
-        DataType::Float64
-    } else if INTEGER_RE.is_match(string) {
-        DataType::Int64
-    } else if DATETIME_RE.is_match(string) {
-        DataType::Date64
-    } else if DATE_RE.is_match(string) {
-        DataType::Date32
-    } else {
-        DataType::Utf8
-    }
-}
diff --git a/tests/it/io/csv/read.rs b/tests/it/io/csv/read.rs
index e0c13a28f50..db4c534a34d 100644
--- a/tests/it/io/csv/read.rs
+++ b/tests/it/io/csv/read.rs
@@ -1,3 +1,5 @@
+use proptest::prelude::*;
+
 use std::io::Cursor;
 use std::sync::Arc;
 
@@ -175,7 +177,7 @@ fn float32() -> Result<()> {
 }
 
 #[test]
-fn binary() -> Result<()> {
+fn deserialize_binary() -> Result<()> {
     let input = vec!["aa", "bb"];
     let input = input.join("\n");
 
@@ -185,3 +187,41 @@ fn binary() -> Result<()> {
     assert_eq!(expected, result.as_ref());
     Ok(())
 }
+
+#[test]
+fn deserialize_timestamp() -> Result<()> {
+    let input = vec!["1996-12-19T16:34:57-02:00", "1996-12-19T16:34:58-02:00"];
+    let input = input.join("\n");
+
+    let data_type = DataType::Timestamp(TimeUnit::Millisecond, Some("-01:00".to_string()));
+
+    let expected = Int64Array::from([Some(851020497000), Some(851020498000)]).to(data_type.clone());
+
+    let result = test_deserialize(&input, data_type)?;
+    assert_eq!(expected, result.as_ref());
+    Ok(())
+}
+
+proptest! {
+    #[test]
+    #[cfg_attr(miri, ignore)] // miri and proptest do not work well :(
+    fn i64(v in any::<i64>()) {
+        assert_eq!(infer(v.to_string().as_bytes()), DataType::Int64);
+    }
+}
+
+proptest! {
+    #[test]
+    #[cfg_attr(miri, ignore)] // miri and proptest do not work well :(
+    fn utf8(v in "a.*") {
+        assert_eq!(infer(v.as_bytes()), DataType::Utf8);
+    }
+}
+
+proptest! {
+    #[test]
+    #[cfg_attr(miri, ignore)] // miri and proptest do not work well :(
+    fn dates(v in "1996-12-19T16:3[0-9]:57-02:00") {
+        assert_eq!(infer(v.as_bytes()), DataType::Timestamp(TimeUnit::Millisecond, Some("-02:00".to_string())));
+    }
+}
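Note (not part of the patch): a minimal standalone sketch of the arithmetic the new Timestamp(_, Some(tz)) arm performs for the test input above, assuming only the `chrono` crate and a hypothetical `main`. The stored i64 is the UTC-based instant scaled to the column's time unit, so the "-01:00" column timezone only changes the type metadata, not the expected values.

// Sketch only: mirrors the result of parse -> convert to the column offset ->
// timestamp_nanos -> scale to the unit, here TimeUnit::Millisecond.
fn main() {
    let millis = chrono::DateTime::parse_from_rfc3339("1996-12-19T16:34:57-02:00")
        .unwrap()
        .timestamp_nanos()
        / 1_000_000;
    // matches the first expected value in `deserialize_timestamp`
    assert_eq!(millis, 851020497000);
}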