Skip to content

Commit

Permalink
Also downcast constraints and statistics when downcasting field types (
Browse files Browse the repository at this point in the history
  • Loading branch information
vinceatbluelabs authored Oct 2, 2020
1 parent ba0c233 commit 34c3b03
Show file tree
Hide file tree
Showing 24 changed files with 304 additions and 125 deletions.
2 changes: 1 addition & 1 deletion metrics/bigfiles_high_water_mark
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1028
1049
2 changes: 1 addition & 1 deletion metrics/coverage_high_water_mark
Original file line number Diff line number Diff line change
@@ -1 +1 @@
93.4700
93.5400
2 changes: 1 addition & 1 deletion metrics/flake8_high_water_mark
Original file line number Diff line number Diff line change
@@ -1 +1 @@
169
168
2 changes: 1 addition & 1 deletion metrics/mypy_high_water_mark
Original file line number Diff line number Diff line change
@@ -1 +1 @@
92.3100
92.3300
6 changes: 4 additions & 2 deletions records_mover/mover_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@
# mypy way of validating we're covering all cases of an enum
#
# https://github.com/python/mypy/issues/6366#issuecomment-560369716
def _assert_never(x: NoReturn) -> NoReturn:
assert False, "Unhandled type: {}".format(type(x).__name__)
def _assert_never(x: NoReturn, errmsg: Optional[str] = None) -> NoReturn:
if errmsg is None:
errmsg = "Unhandled type: {}".format(type(x).__name__)
assert False, errmsg


# mypy-friendly way of doing a singleton object:
Expand Down
29 changes: 25 additions & 4 deletions records_mover/records/schema/field/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@
RecordsSchemaFieldIntegerConstraints,
RecordsSchemaFieldDecimalConstraints)
from .statistics import RecordsSchemaFieldStatistics
from .types import RECORDS_FIELD_TYPES
from .field_types import RECORDS_FIELD_TYPES
if TYPE_CHECKING:
from pandas import Series, Index
from sqlalchemy import Column
from sqlalchemy.types import TypeEngine
from records_mover.db import DBDriver # noqa
from .types import FieldType
from .field_types import FieldType

from mypy_extensions import TypedDict

Expand Down Expand Up @@ -64,9 +64,9 @@ def __init__(self,
def refine_from_series(self,
series: 'Series',
total_rows: int,
rows_sampled: int) -> None:
rows_sampled: int) -> 'RecordsSchemaField':
from .pandas import refine_field_from_series
refine_field_from_series(self, series, total_rows, rows_sampled)
return refine_field_from_series(self, series, total_rows, rows_sampled)

@staticmethod
def is_more_specific_type(a: 'FieldType', b: 'FieldType') -> bool:
Expand All @@ -77,6 +77,7 @@ def is_more_specific_type(a: 'FieldType', b: 'FieldType') -> bool:
@staticmethod
def python_type_to_field_type(specific_type: Type[Any]) -> Optional['FieldType']:
import numpy as np
import pandas as pd

# Note: records spec doesn't cover complex number types, so
# np.complex_, complex64 and complex128 are not supported
Expand Down Expand Up @@ -114,6 +115,10 @@ def python_type_to_field_type(specific_type: Type[Any]) -> Optional['FieldType']
datetime.date: 'date',

datetime.time: 'time',

datetime.datetime: 'datetime',

pd.Timestamp: 'datetime',
}
if specific_type not in type_mapping:
logger.warning(f"Please teach me how to map {specific_type} into records "
Expand Down Expand Up @@ -318,3 +323,19 @@ def convert_datetime_to_datetimetz(self) -> 'RecordsSchemaField':
constraints=self.constraints,
statistics=self.statistics,
representations=self.representations)

def cast(self, field_type: 'FieldType') -> 'RecordsSchemaField':
    """Return a copy of this field converted to ``field_type``.

    Constraints and statistics are downcast along with the field type
    (each stays None if it was None); name and representations are
    carried over unchanged.  The original field is not mutated.
    """
    new_constraints = (None
                       if self.constraints is None
                       else self.constraints.cast(field_type))
    new_statistics = (None
                      if self.statistics is None
                      else self.statistics.cast(field_type))
    return RecordsSchemaField(name=self.name,
                              field_type=field_type,
                              constraints=new_constraints,
                              statistics=new_statistics,
                              representations=self.representations)
45 changes: 42 additions & 3 deletions records_mover/records/schema/field/constraints/constraints.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import logging

from records_mover.mover_types import _assert_never
from typing import Optional, cast, TYPE_CHECKING
from records_mover.utils.limits import (FLOAT16_SIGNIFICAND_BITS,
FLOAT32_SIGNIFICAND_BITS,
Expand Down Expand Up @@ -31,14 +31,14 @@ class FieldIntegerConstraintsDict(FieldConstraintsDict, total=False):
min: str
max: str

from ..types import FieldType # noqa
from ..field_types import FieldType # noqa


logger = logging.getLogger(__name__)


class RecordsSchemaFieldConstraints:
def __init__(self, required: bool, unique: Optional[bool]=None):
def __init__(self, required: bool, unique: Optional[bool] = None):
"""
:param required: If True, data must always be provided for this
column in the origin representation; if False, a 'null' or
Expand Down Expand Up @@ -108,6 +108,45 @@ def from_data(data: Optional['FieldConstraintsDict'],
else:
return RecordsSchemaFieldConstraints(required=required, unique=unique)

def cast(self, field_type: 'FieldType') -> 'RecordsSchemaFieldConstraints':
    """Build a fresh constraints object appropriate for ``field_type``.

    required/unique carry over; type-specific details (integer min/max,
    string length limits) cannot survive a cast and are reset to None.
    """
    from .integer import RecordsSchemaFieldIntegerConstraints
    from .decimal import RecordsSchemaFieldDecimalConstraints
    from .string import RecordsSchemaFieldStringConstraints

    if field_type == 'integer':
        return RecordsSchemaFieldIntegerConstraints(required=self.required,
                                                    unique=self.unique,
                                                    min_=None,
                                                    max_=None)
    if field_type == 'string':
        return RecordsSchemaFieldStringConstraints(required=self.required,
                                                   unique=self.unique,
                                                   max_length_bytes=None,
                                                   max_length_chars=None)
    if field_type == 'decimal':
        return RecordsSchemaFieldDecimalConstraints(required=self.required,
                                                    unique=self.unique)
    # these types carry no extra constraint details beyond the base class
    if (field_type == 'boolean' or
            field_type == 'date' or
            field_type == 'time' or
            field_type == 'timetz' or
            field_type == 'datetime' or
            field_type == 'datetimetz'):
        return RecordsSchemaFieldConstraints(required=self.required,
                                             unique=self.unique)
    _assert_never(field_type,
                  'Teach me how to downcast constraints '
                  f'for {field_type}')

@staticmethod
def from_sqlalchemy_type(required: bool,
unique: Optional[bool],
Expand Down
16 changes: 16 additions & 0 deletions records_mover/records/schema/field/field_types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
"""Canonical set of records-schema field type names.

``FieldType`` is the statically-checkable Literal; ``RECORDS_FIELD_TYPES``
exposes the same names as a runtime list.
"""
from typing_extensions import Literal
from typing_inspect import get_args
from typing import List

FieldType = Literal['integer',
                    'decimal',
                    'string',
                    'boolean',
                    'date',
                    'time',
                    'timetz',
                    'datetime',
                    'datetimetz']

# RECORDS_FIELD_TYPES is derived from FieldType via get_args(), so a new
# field type only needs to be added to the Literal above.
RECORDS_FIELD_TYPES: List[str] = list(get_args(FieldType))  # type: ignore
2 changes: 1 addition & 1 deletion records_mover/records/schema/field/numpy.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from typing import Optional, Tuple, TYPE_CHECKING
from .constraints import RecordsSchemaFieldConstraints
if TYPE_CHECKING:
from .types import FieldType # noqa
from .field_types import FieldType # noqa


def details_from_numpy_dtype(dtype: numpy.dtype,
Expand Down
9 changes: 5 additions & 4 deletions records_mover/records/schema/field/pandas.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def field_from_series(series: Series,
def refine_field_from_series(field: 'RecordsSchemaField',
series: Series,
total_rows: int,
rows_sampled: int) -> None:
rows_sampled: int) -> 'RecordsSchemaField':
from ..field import RecordsSchemaField # noqa
#
# if the series is full of object types that aren't numpy
Expand All @@ -57,7 +57,7 @@ def refine_field_from_series(field: 'RecordsSchemaField',
field_type = field.python_type_to_field_type(unique_python_type)
if field_type is not None:
if RecordsSchemaField.is_more_specific_type(field_type, field.field_type):
field.field_type = field_type
field = field.cast(field_type)

if field.field_type == 'string':
max_column_length = series.astype('str').map(len).max()
Expand All @@ -70,7 +70,8 @@ def refine_field_from_series(field: 'RecordsSchemaField',
if field.statistics is None:
field.statistics = statistics
elif not isinstance(field.statistics, RecordsSchemaFieldStringStatistics):
raise SyntaxError("Did not expect to see existing statistics "
f"for string type: {field.statistics}")
raise ValueError("Did not expect to see existing statistics "
f"for string type: {field.statistics}")
else:
field.statistics.merge(statistics)
return field
10 changes: 7 additions & 3 deletions records_mover/records/schema/field/sqlalchemy.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from ....db import DBDriver # noqa
from ..field import RecordsSchemaField # noqa
from ..schema import RecordsSchema # noqa
from .types import FieldType # noqa
from .field_types import FieldType # noqa


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -127,6 +127,10 @@ def field_from_sqlalchemy_column(column: Column,
def field_to_sqlalchemy_type(field: 'RecordsSchemaField',
driver: 'DBDriver') -> sqlalchemy.types.TypeEngine:
if field.field_type == 'integer':
if field.constraints and\
not isinstance(field.constraints, RecordsSchemaFieldIntegerConstraints):
raise ValueError(f"Incorrect constraint type in {field.name}: {field.constraints}")

int_constraints =\
cast(Optional[RecordsSchemaFieldIntegerConstraints], field.constraints)
min_: Optional[int] = None
Expand Down Expand Up @@ -162,11 +166,11 @@ def field_to_sqlalchemy_type(field: 'RecordsSchemaField',
elif field.field_type == 'string':
if field.constraints and\
not isinstance(field.constraints, RecordsSchemaFieldStringConstraints):
raise SyntaxError(f"Incorrect constraint type: {field.constraints}")
raise ValueError(f"Incorrect constraint type in {field.name}: {field.constraints}")

if field.statistics and\
not isinstance(field.statistics, RecordsSchemaFieldStringStatistics):
raise SyntaxError(f"Incorrect statistics type: {field.statistics}")
raise ValueError(f"Incorrect statistics type in {field.name}: {field.statistics}")

string_constraints =\
cast(Optional[RecordsSchemaFieldStringConstraints], field.constraints)
Expand Down
12 changes: 11 additions & 1 deletion records_mover/records/schema/field/statistics.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
if TYPE_CHECKING:
from mypy_extensions import TypedDict

from .types import FieldType # noqa
from .field_types import FieldType # noqa

class FieldStatisticsDict(TypedDict):
rows_sampled: int
Expand Down Expand Up @@ -48,6 +48,10 @@ def from_data(data: Optional[Union['FieldStatisticsDict', 'StringFieldStatistics
return RecordsSchemaFieldStatistics(rows_sampled=rows_sampled,
total_rows=total_rows)

def cast(self, field_type: 'FieldType') -> Optional['RecordsSchemaFieldStatistics']:
    """Downcast these statistics to ``field_type``.

    Only the string subclass has statistics worth preserving across a
    cast at this point, so the base implementation always drops them.
    """
    return None

def __str__(self) -> str:
return f"{type(self)}({self.to_data()})"

Expand All @@ -74,3 +78,9 @@ def to_data(self) -> 'StringFieldStatisticsDict':

def merge(self, other: 'RecordsSchemaFieldStringStatistics') -> None:
raise NotImplementedError

def cast(self, field_type: 'FieldType') -> Optional['RecordsSchemaFieldStatistics']:
    """Downcast these string statistics to ``field_type``.

    Casting to 'string' keeps the statistics intact; any other target
    defers to the base class, which discards them.
    """
    if field_type != 'string':
        return super().cast(field_type)
    return self
25 changes: 0 additions & 25 deletions records_mover/records/schema/field/types.py

This file was deleted.

6 changes: 3 additions & 3 deletions records_mover/records/schema/schema/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,14 +162,14 @@ def from_fileobjs(fileobjs: List[IO[bytes]],
schema = RecordsSchema.from_dataframe(df, processing_instructions,
include_index=False)

schema.refine_from_dataframe(df,
processing_instructions=processing_instructions)
schema = schema.refine_from_dataframe(df,
processing_instructions=processing_instructions)
return schema

def refine_from_dataframe(self,
df: 'DataFrame',
processing_instructions:
ProcessingInstructions = ProcessingInstructions()) -> None:
ProcessingInstructions = ProcessingInstructions()) -> 'RecordsSchema':
"""
Adjust records schema based on facts found from a dataframe.
"""
Expand Down
14 changes: 10 additions & 4 deletions records_mover/records/schema/schema/pandas.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ def schema_from_dataframe(df: DataFrame,
def refine_schema_from_dataframe(records_schema: 'RecordsSchema',
df: DataFrame,
processing_instructions:
ProcessingInstructions = ProcessingInstructions()) -> None:
ProcessingInstructions = ProcessingInstructions()) ->\
'RecordsSchema':
from records_mover.records.schema import RecordsSchema

max_sample_size = processing_instructions.max_inference_rows
total_rows = len(df.index)
if max_sample_size is not None and max_sample_size < total_rows:
Expand All @@ -44,8 +47,11 @@ def refine_schema_from_dataframe(records_schema: 'RecordsSchema',
sampled_df = df
rows_sampled = len(sampled_df.index)

for field in records_schema.fields:
series = sampled_df[field.name]
field.refine_from_series(series,
fields = [
field.refine_from_series(sampled_df[field.name],
total_rows=total_rows,
rows_sampled=rows_sampled)
for field in records_schema.fields
]
return RecordsSchema(fields=fields,
known_representations=records_schema.known_representations)
2 changes: 1 addition & 1 deletion records_mover/records/sources/dataframes.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ def schema_from_df(self, df: 'DataFrame',
# Otherwise, gather information to create an efficient
# schema on the target of the move.
#
records_schema.refine_from_dataframe(df, processing_instructions)
records_schema = records_schema.refine_from_dataframe(df, processing_instructions)

return records_schema

Expand Down
3 changes: 3 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ max-complexity = 15
mypy_path = types/stubs
warn_unused_ignores = True

[mypy-alembic.*]
ignore_missing_imports = True

# https://github.com/pandas-dev/pandas/issues/26766
# https://github.com/pandas-dev/pandas/issues/26792
# https://github.com/pandas-dev/pandas/issues/28142
Expand Down
Loading

0 comments on commit 34c3b03

Please sign in to comment.