🐛 Source S3: timestamp parquet data (#6613)
* fix datetime parquet data

* Update airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/parquet_parser.py

Co-authored-by: George Claireaux <[email protected]>

* aggregate pyarrow types

Co-authored-by: Maksym Pavlenok <[email protected]>
Co-authored-by: George Claireaux <[email protected]>
3 people authored Oct 4, 2021
1 parent effceb6 commit b80f81e
Showing 3 changed files with 93 additions and 31 deletions.
1 change: 1 addition & 0 deletions airbyte-integrations/connectors/source-s3/setup.py
@@ -11,6 +11,7 @@
"smart-open[s3]==5.1.0",
"wcmatch==8.2",
"dill==0.3.4",
"pytz",
]

TEST_REQUIREMENTS = [
airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/parquet_parser.py
@@ -2,23 +2,26 @@
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

from typing import Any, BinaryIO, Iterator, List, Mapping, TextIO, Union
from typing import Any, BinaryIO, Iterator, List, Mapping, TextIO, Tuple, Union

import pyarrow.parquet as pq
from pyarrow.parquet import ParquetFile

from .abstract_file_parser import AbstractFileParser
from .parquet_spec import ParquetFormat

# All possible parquet data typess
# All possible parquet data types
PARQUET_TYPES = {
"BOOLEAN": "boolean",
"DOUBLE": "number",
"FLOAT": "number",
"BYTE_ARRAY": "string",
"INT32": "integer",
"INT64": "integer",
"INT96": "integer",
# logical_type: (json_type, parquet_types, convert_function)
# standard types
"string": ("string", ["BYTE_ARRAY"], None),
"boolean": ("boolean", ["BOOLEAN"], None),
"number": ("number", ["DOUBLE", "FLOAT"], None),
"integer": ("integer", ["INT32", "INT64", "INT96"], None),
# supported by PyArrow types
"timestamp": ("string", ["INT32", "INT64", "INT96"], lambda v: v.isoformat()),
"date": ("string", ["INT32", "INT64", "INT96"], lambda v: v.isoformat()),
"time": ("string", ["INT32", "INT64", "INT96"], lambda v: v.isoformat()),
}

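To make the tuple layout concrete, a minimal sketch (not part of the commit) of how one PARQUET_TYPES entry is read:

# Illustrative only: unpacking the "timestamp" entry of the mapping above.
from datetime import datetime

json_type, physical_types, convert = PARQUET_TYPES["timestamp"]
assert json_type == "string"                # JSON schema type exposed by the connector
assert "INT64" in physical_types            # physical encodings this logical type may use
assert convert(datetime(2021, 10, 4)) == "2021-10-04T00:00:00"  # converter applied to each value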

@@ -54,14 +57,45 @@ def _init_reader(self, file: BinaryIO) -> ParquetFile:
options["memory_map"] = True
return pq.ParquetFile(file, **options)

@staticmethod
def parse_field_type(needed_logical_type: str, need_physical_type: str = None) -> Tuple[str, str]:
"""Pyarrow can parse/support non-JSON types
Docs: https://github.com/apache/arrow/blob/5aa2901beddf6ad7c0a786ead45fdb7843bfcccd/python/pyarrow/_parquet.pxd#L56
"""
if needed_logical_type not in PARQUET_TYPES:
# By default the pyarrow library marks plain scalar types with the 'none' logical type.
# In that case we have to look the type up by its physical type instead.
for logical_type, (json_type, physical_types, _) in PARQUET_TYPES.items():
if need_physical_type in physical_types:
return json_type, logical_type
else:
json_type, physical_types, _ = PARQUET_TYPES[needed_logical_type]
if need_physical_type and need_physical_type not in physical_types:
raise TypeError(f"incorrect parquet physical type: {need_physical_type}; logical type: {needed_logical_type}")
return json_type, needed_logical_type

raise TypeError(f"incorrect parquet physical type: {need_physical_type}; logical type: {needed_logical_type}")

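A sketch of the lookup behaviour this method implements, assuming the mapping above and an import path taken from the commit message:

# Hypothetical usage of ParquetParser.parse_field_type.
from source_s3.source_files_abstract.formats.parquet_parser import ParquetParser

assert ParquetParser.parse_field_type("timestamp", "INT64") == ("string", "timestamp")
# pyarrow reports plain scalars with the "none" logical type, so fall back to the physical type:
assert ParquetParser.parse_field_type("none", "DOUBLE") == ("number", "number")
# A physical type that doesn't belong to the requested logical type raises TypeError:
try:
    ParquetParser.parse_field_type("integer", "BYTE_ARRAY")
except TypeError:
    pass
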
@staticmethod
def convert_field_data(logical_type: str, field_value: Any) -> Any:
"""Converts not JSON format to JSON one"""
if field_value is None:
return None
if logical_type in PARQUET_TYPES:
_, _, func = PARQUET_TYPES[logical_type]
return func(field_value) if func else field_value
raise TypeError(f"unsupported field type: {logical_type}, value: {field_value}")

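And a matching sketch for the value conversion, reusing the ParquetParser import from the sketch above (illustrative, not part of the diff):

# Hypothetical usage of ParquetParser.convert_field_data.
from datetime import datetime

assert ParquetParser.convert_field_data("timestamp", datetime(2021, 10, 4, 12, 30)) == "2021-10-04T12:30:00"
assert ParquetParser.convert_field_data("integer", 42) == 42     # no converter registered, value passes through
assert ParquetParser.convert_field_data("string", None) is None  # None short-circuits before any conversion
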
def get_inferred_schema(self, file: Union[TextIO, BinaryIO]) -> dict:
"""
https://arrow.apache.org/docs/python/parquet.html#finer-grained-reading-and-writing
The stored schema is part of the file metadata, so we can extract it without parsing the whole file
"""
reader = self._init_reader(file)
schema_dict = {field.name: PARQUET_TYPES[field.physical_type] for field in reader.schema}
schema_dict = {
field.name: self.parse_field_type(field.logical_type.type.lower(), field.physical_type)[0] for field in reader.schema
}
if not schema_dict:
# pyarrow can parse an empty parquet file, but the connector can't generate a dynamic schema from it
raise OSError("empty Parquet file")
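
For reference, a small sketch of the per-column metadata pyarrow exposes and that feeds the mapping above (the file path is hypothetical):

import pyarrow as pa
import pyarrow.parquet as pq
from datetime import datetime

table = pa.table({"id": [1], "created_at": [datetime(2021, 10, 4)]})
pq.write_table(table, "/tmp/example.parquet")  # hypothetical local path
reader = pq.ParquetFile("/tmp/example.parquet")
for field in reader.schema:
    # prints e.g. "id INT64 NONE" and "created_at INT64 TIMESTAMP"
    print(field.name, field.physical_type, field.logical_type.type)
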
@@ -75,7 +109,9 @@ def stream_records(self, file: Union[TextIO, BinaryIO]) -> Iterator[Mapping[str,

reader = self._init_reader(file)
self.logger.info(f"found {reader.num_row_groups} row groups")

logical_types = {
field.name: self.parse_field_type(field.logical_type.type.lower(), field.physical_type)[1] for field in reader.schema
}
if not reader.schema:
# pyarrow can parse an empty parquet file, but the connector can't generate a dynamic schema from it
raise OSError("empty Parquet file")
@@ -95,4 +131,7 @@ def stream_records(self, file: Union[TextIO, BinaryIO]) -> Iterator[Mapping[str,

# we zip this to get row-by-row
for record_values in zip(*columnwise_record_values):
yield {batch_columns[i]: record_values[i] for i in range(len(batch_columns))}
yield {
batch_columns[i]: self.convert_field_data(logical_types[batch_columns[i]], record_values[i])
for i in range(len(batch_columns))
}
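
Putting it together, a rough end-to-end sketch of how the parser is driven; it mirrors the unit-test setup below, and the "parquet" filetype value and the local file are assumptions:

from source_s3.source_files_abstract.formats.parquet_parser import ParquetParser

parser = ParquetParser(
    format={"filetype": "parquet"},                            # assumed filetype value
    master_schema={"id": "integer", "created_at": "string"},   # timestamps surface as JSON strings
)
with open("/tmp/example.parquet", "rb") as f:                  # hypothetical local parquet file
    print(parser.get_inferred_schema(f))                       # e.g. {"id": "integer", "created_at": "string"}
    f.seek(0)
    for record in parser.stream_records(f):                    # datetime values arrive ISO-formatted
        print(record)
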
ParquetParser unit tests
@@ -10,6 +10,7 @@
import shutil
import sys
import tempfile
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, List, Mapping

@@ -60,7 +61,7 @@ def _generate_row(cls, types: List[str]) -> List[Any]:
"""Generates random values with request types"""
row = []
for needed_type in types:
for json_type in PARQUET_TYPES.values():
for json_type in PARQUET_TYPES:
if json_type == needed_type:
row.append(cls._generate_value(needed_type))
break
@@ -86,7 +87,14 @@ def _generate_value(cls, typ: str) -> Any:
elif typ == "string":
random_length = random.randint(0, 10 * 1024) # max size of bytes is 10k
return os.urandom(random_length)

elif typ == "timestamp":
return datetime.now() + timedelta(seconds=random.randint(0, 7200))
elif typ == "date":
dt = cls._generate_value("timestamp")
return dt.date() if dt else None
elif typ == "time":
dt = cls._generate_value("timestamp")
return dt.time() if dt else None
raise Exception(f"not supported type: {typ}")

@property
@@ -123,18 +131,24 @@ def test_files(self) -> List[Mapping[str, Any]]:
"degrees": "number",
"birthday": "string",
"last_seen": "string",
"created_at": "timestamp",
"created_date_at": "date",
"created_time_at": "time",
}
# datetime => string type

master_schema = {k: ParquetParser.parse_field_type(needed_logical_type=v)[0] for k, v in schema.items()}
suite = []
# basic 'normal' test
num_records = 10
params = {"filetype": self.filetype}
suite.append(
{
"test_alias": "basic 'normal' test",
"AbstractFileParser": ParquetParser(format=params, master_schema=schema),
"AbstractFileParser": ParquetParser(format=params, master_schema=master_schema),
"filepath": self.generate_parquet_file("normal_test", schema, num_records),
"num_records": num_records,
"inferred_schema": schema,
"inferred_schema": master_schema,
"line_checks": {},
"fails": [],
}
@@ -154,9 +168,9 @@ def test_files(self) -> List[Mapping[str, Any]]:
"num_records": num_records,
"AbstractFileParser": ParquetParser(
format=params,
master_schema=schema,
master_schema=master_schema,
),
"inferred_schema": schema,
"inferred_schema": master_schema,
"line_checks": {},
"fails": [],
}
@@ -175,9 +189,9 @@ def test_files(self) -> List[Mapping[str, Any]]:
"num_records": num_records,
"AbstractFileParser": ParquetParser(
format=params,
master_schema=schema,
master_schema=master_schema,
),
"inferred_schema": schema,
"inferred_schema": master_schema,
"line_checks": {},
"fails": [],
}
@@ -193,8 +207,16 @@ def test_files(self) -> List[Mapping[str, Any]]:
"degrees": -9.2,
"birthday": self._generate_value("string"),
"last_seen": self._generate_value("string"),
"created_at": self._generate_value("timestamp"),
"created_date_at": self._generate_value("date"),
"created_time_at": self._generate_value("time"),
}

expected_record = copy.deepcopy(test_record)
expected_record["created_date_at"] = ParquetParser.convert_field_data("date", expected_record["created_date_at"])
expected_record["created_time_at"] = ParquetParser.convert_field_data("time", expected_record["created_time_at"])
expected_record["created_at"] = ParquetParser.convert_field_data("timestamp", expected_record["created_at"])

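A concrete (made-up) example of what that conversion does to the generated datetime fields:

# Illustrative values only; _generate_value is random in the real test.
sample = datetime(2021, 10, 4, 12, 0)
assert ParquetParser.convert_field_data("timestamp", sample) == "2021-10-04T12:00:00"
assert ParquetParser.convert_field_data("date", sample.date()) == "2021-10-04"
assert ParquetParser.convert_field_data("time", sample.time()) == "12:00:00"
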
suite.append(
{
"test_alias": "check one record",
@@ -204,18 +226,18 @@ def test_files(self) -> List[Mapping[str, Any]]:
"num_records": num_records,
"AbstractFileParser": ParquetParser(
format=params,
master_schema=schema,
master_schema=master_schema,
),
"inferred_schema": schema,
"line_checks": {8: test_record},
"inferred_schema": master_schema,
"line_checks": {8: expected_record},
"fails": [],
}
)

# extra columns in master schema
params = {"filetype": self.filetype}
num_records = 10
extra_schema = copy.deepcopy(schema)
extra_schema = copy.deepcopy(master_schema)
extra_schema.update(
{
"extra_id": "integer",
Expand All @@ -231,15 +253,15 @@ def test_files(self) -> List[Mapping[str, Any]]:
format=params,
master_schema=extra_schema,
),
"inferred_schema": schema,
"inferred_schema": master_schema,
"line_checks": {},
"fails": [],
}
)
# tests missing columns in master schema
params = {"filetype": self.filetype}
num_records = 10
simplified_schema = copy.deepcopy(schema)
simplified_schema = copy.deepcopy(master_schema)
simplified_schema.pop("id")
simplified_schema.pop("name")

Expand All @@ -252,7 +274,7 @@ def test_files(self) -> List[Mapping[str, Any]]:
format=params,
master_schema=simplified_schema,
),
"inferred_schema": schema,
"inferred_schema": master_schema,
"line_checks": {},
"fails": [],
}
Expand All @@ -268,7 +290,7 @@ def test_files(self) -> List[Mapping[str, Any]]:
format=params,
master_schema={},
),
"inferred_schema": schema,
"inferred_schema": master_schema,
"line_checks": {},
"fails": ["test_get_inferred_schema", "test_stream_records"],
}
@@ -287,10 +309,10 @@ def test_files(self) -> List[Mapping[str, Any]]:
"num_records": num_records,
"AbstractFileParser": ParquetParser(
format=params,
master_schema=schema,
master_schema=master_schema,
),
"inferred_schema": schema,
"line_checks": {8: test_record},
"inferred_schema": master_schema,
"line_checks": {8: expected_record},
"fails": [],
}
)
