Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

🐛 Source S3: timestamp parquet data #6613

Merged
merged 3 commits into from
Oct 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airbyte-integrations/connectors/source-s3/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
"smart-open[s3]==5.1.0",
"wcmatch==8.2",
"dill==0.3.4",
"pytz",
]

TEST_REQUIREMENTS = [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,26 @@
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

from typing import Any, BinaryIO, Iterator, List, Mapping, TextIO, Union
from typing import Any, BinaryIO, Iterator, List, Mapping, TextIO, Tuple, Union

import pyarrow.parquet as pq
from pyarrow.parquet import ParquetFile

from .abstract_file_parser import AbstractFileParser
from .parquet_spec import ParquetFormat

# All possible parquet data typess
# All possible parquet data types
# Maps a parquet *logical* type onto everything the parser needs to know about it:
#   logical_type: (json_type, parquet_physical_types, convert_function)
# - json_type: the JSON-schema type emitted for this column
# - parquet_physical_types: the parquet physical types this logical type may be stored as
# - convert_function: optional callable turning the raw value into a JSON-serializable
#   one (None means the value is already JSON-compatible)
PARQUET_TYPES = {
    # standard types — values pass through unchanged
    "string": ("string", ["BYTE_ARRAY"], None),
    "boolean": ("boolean", ["BOOLEAN"], None),
    "number": ("number", ["DOUBLE", "FLOAT"], None),
    "integer": ("integer", ["INT32", "INT64", "INT96"], None),
    # temporal types supported by PyArrow — serialized to ISO-8601 strings
    "timestamp": ("string", ["INT32", "INT64", "INT96"], lambda v: v.isoformat()),
    "date": ("string", ["INT32", "INT64", "INT96"], lambda v: v.isoformat()),
    "time": ("string", ["INT32", "INT64", "INT96"], lambda v: v.isoformat()),
}


Expand Down Expand Up @@ -54,14 +57,45 @@ def _init_reader(self, file: BinaryIO) -> ParquetFile:
options["memory_map"] = True
return pq.ParquetFile(file, **options)

@staticmethod
def parse_field_type(needed_logical_type: str, need_physical_type: str = None) -> Tuple[str, str]:
    """Resolve a parquet logical/physical type pair to ``(json_type, logical_type)``.

    Pyarrow can parse/support non-JSON types.
    Docs: https://github.com/apache/arrow/blob/5aa2901beddf6ad7c0a786ead45fdb7843bfcccd/python/pyarrow/_parquet.pxd#L56

    :param needed_logical_type: logical type name reported by pyarrow (lower-cased)
    :param need_physical_type: optional physical type used for validation/fallback
    :raises TypeError: when the pair cannot be matched against PARQUET_TYPES
    """
    known = PARQUET_TYPES.get(needed_logical_type)
    if known is not None:
        json_type, allowed_physical, _ = known
        # a physical type, when given, must be compatible with the logical one
        if need_physical_type and need_physical_type not in allowed_physical:
            raise TypeError(f"incorrect parquet physical type: {need_physical_type}; logical type: {needed_logical_type}")
        return json_type, needed_logical_type

    # by default the pyarrow library marks scalar types as 'none' logical type,
    # so fall back to a lookup by the physical type alone
    for known_logical, (json_type, allowed_physical, _) in PARQUET_TYPES.items():
        if need_physical_type in allowed_physical:
            return json_type, known_logical

    raise TypeError(f"incorrect parquet physical type: {need_physical_type}; logical type: {needed_logical_type}")

@staticmethod
def convert_field_data(logical_type: str, field_value: Any) -> Any:
    """Convert a raw parquet value into its JSON-serializable form.

    ``None`` passes through untouched; known logical types are run through their
    converter from PARQUET_TYPES (if any).

    :raises TypeError: when the logical type is not present in PARQUET_TYPES
    """
    if field_value is None:
        return None
    if logical_type not in PARQUET_TYPES:
        raise TypeError(f"unsupported field type: {logical_type}, value: {field_value}")
    _, _, converter = PARQUET_TYPES[logical_type]
    # converter is None for types that are already JSON-compatible
    return field_value if converter is None else converter(field_value)

def get_inferred_schema(self, file: Union[TextIO, BinaryIO]) -> dict:
"""
https://arrow.apache.org/docs/python/parquet.html#finer-grained-reading-and-writing

A stored schema is a part of metadata and we can extract it without parsing of full file
"""
reader = self._init_reader(file)
schema_dict = {field.name: PARQUET_TYPES[field.physical_type] for field in reader.schema}
schema_dict = {
field.name: self.parse_field_type(field.logical_type.type.lower(), field.physical_type)[0] for field in reader.schema
}
if not schema_dict:
# pyarrow can parse empty parquet files but a connector can't generate dynamic schema
raise OSError("empty Parquet file")
Expand All @@ -75,7 +109,9 @@ def stream_records(self, file: Union[TextIO, BinaryIO]) -> Iterator[Mapping[str,

reader = self._init_reader(file)
self.logger.info(f"found {reader.num_row_groups} row groups")

logical_types = {
field.name: self.parse_field_type(field.logical_type.type.lower(), field.physical_type)[1] for field in reader.schema
}
if not reader.schema:
# pyarrow can parse empty parquet files but a connector can't generate dynamic schema
raise OSError("empty Parquet file")
Expand All @@ -95,4 +131,7 @@ def stream_records(self, file: Union[TextIO, BinaryIO]) -> Iterator[Mapping[str,

# we zip this to get row-by-row
for record_values in zip(*columnwise_record_values):
yield {batch_columns[i]: record_values[i] for i in range(len(batch_columns))}
yield {
batch_columns[i]: self.convert_field_data(logical_types[batch_columns[i]], record_values[i])
for i in range(len(batch_columns))
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import shutil
import sys
import tempfile
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, List, Mapping

Expand Down Expand Up @@ -60,7 +61,7 @@ def _generate_row(cls, types: List[str]) -> List[Any]:
"""Generates random values with request types"""
row = []
for needed_type in types:
for json_type in PARQUET_TYPES.values():
for json_type in PARQUET_TYPES:
if json_type == needed_type:
row.append(cls._generate_value(needed_type))
break
Expand All @@ -86,7 +87,14 @@ def _generate_value(cls, typ: str) -> Any:
elif typ == "string":
random_length = random.randint(0, 10 * 1024) # max size of bytes is 10k
return os.urandom(random_length)

elif typ == "timestamp":
return datetime.now() + timedelta(seconds=random.randint(0, 7200))
elif typ == "date":
dt = cls._generate_value("timestamp")
return dt.date() if dt else None
elif typ == "time":
dt = cls._generate_value("timestamp")
return dt.time() if dt else None
raise Exception(f"not supported type: {typ}")

@property
Expand Down Expand Up @@ -123,18 +131,24 @@ def test_files(self) -> List[Mapping[str, Any]]:
"degrees": "number",
"birthday": "string",
"last_seen": "string",
"created_at": "timestamp",
"created_date_at": "date",
"created_time_at": "time",
}
# datetime => string type

master_schema = {k: ParquetParser.parse_field_type(needed_logical_type=v)[0] for k, v in schema.items()}
suite = []
# basic 'normal' test
num_records = 10
params = {"filetype": self.filetype}
suite.append(
{
"test_alias": "basic 'normal' test",
"AbstractFileParser": ParquetParser(format=params, master_schema=schema),
"AbstractFileParser": ParquetParser(format=params, master_schema=master_schema),
"filepath": self.generate_parquet_file("normal_test", schema, num_records),
"num_records": num_records,
"inferred_schema": schema,
"inferred_schema": master_schema,
"line_checks": {},
"fails": [],
}
Expand All @@ -154,9 +168,9 @@ def test_files(self) -> List[Mapping[str, Any]]:
"num_records": num_records,
"AbstractFileParser": ParquetParser(
format=params,
master_schema=schema,
master_schema=master_schema,
),
"inferred_schema": schema,
"inferred_schema": master_schema,
"line_checks": {},
"fails": [],
}
Expand All @@ -175,9 +189,9 @@ def test_files(self) -> List[Mapping[str, Any]]:
"num_records": num_records,
"AbstractFileParser": ParquetParser(
format=params,
master_schema=schema,
master_schema=master_schema,
),
"inferred_schema": schema,
"inferred_schema": master_schema,
"line_checks": {},
"fails": [],
}
Expand All @@ -193,8 +207,16 @@ def test_files(self) -> List[Mapping[str, Any]]:
"degrees": -9.2,
"birthday": self._generate_value("string"),
"last_seen": self._generate_value("string"),
"created_at": self._generate_value("timestamp"),
"created_date_at": self._generate_value("date"),
"created_time_at": self._generate_value("time"),
}

expected_record = copy.deepcopy(test_record)
expected_record["created_date_at"] = ParquetParser.convert_field_data("date", expected_record["created_date_at"])
expected_record["created_time_at"] = ParquetParser.convert_field_data("time", expected_record["created_time_at"])
expected_record["created_at"] = ParquetParser.convert_field_data("timestamp", expected_record["created_at"])

suite.append(
{
"test_alias": "check one record",
Expand All @@ -204,18 +226,18 @@ def test_files(self) -> List[Mapping[str, Any]]:
"num_records": num_records,
"AbstractFileParser": ParquetParser(
format=params,
master_schema=schema,
master_schema=master_schema,
),
"inferred_schema": schema,
"line_checks": {8: test_record},
"inferred_schema": master_schema,
"line_checks": {8: expected_record},
"fails": [],
}
)

# extra columns in master schema
params = {"filetype": self.filetype}
num_records = 10
extra_schema = copy.deepcopy(schema)
extra_schema = copy.deepcopy(master_schema)
extra_schema.update(
{
"extra_id": "integer",
Expand All @@ -231,15 +253,15 @@ def test_files(self) -> List[Mapping[str, Any]]:
format=params,
master_schema=extra_schema,
),
"inferred_schema": schema,
"inferred_schema": master_schema,
"line_checks": {},
"fails": [],
}
)
# tests missing columns in master schema
params = {"filetype": self.filetype}
num_records = 10
simplified_schema = copy.deepcopy(schema)
simplified_schema = copy.deepcopy(master_schema)
simplified_schema.pop("id")
simplified_schema.pop("name")

Expand All @@ -252,7 +274,7 @@ def test_files(self) -> List[Mapping[str, Any]]:
format=params,
master_schema=simplified_schema,
),
"inferred_schema": schema,
"inferred_schema": master_schema,
"line_checks": {},
"fails": [],
}
Expand All @@ -268,7 +290,7 @@ def test_files(self) -> List[Mapping[str, Any]]:
format=params,
master_schema={},
),
"inferred_schema": schema,
"inferred_schema": master_schema,
"line_checks": {},
"fails": ["test_get_inferred_schema", "test_stream_records"],
}
Expand All @@ -287,10 +309,10 @@ def test_files(self) -> List[Mapping[str, Any]]:
"num_records": num_records,
"AbstractFileParser": ParquetParser(
format=params,
master_schema=schema,
master_schema=master_schema,
),
"inferred_schema": schema,
"line_checks": {8: test_record},
"inferred_schema": master_schema,
"line_checks": {8: expected_record},
"fails": [],
}
)
Expand Down