From f4b01378171302e89390f967232ccab6c8193204 Mon Sep 17 00:00:00 2001 From: Tim Ryan Date: Mon, 27 Mar 2023 12:06:17 -0400 Subject: [PATCH] RM-48 factor to_pandas_dtype into dictionary RM-48 check if is callable RM-48 call only when complicated RM-48 factor to_pandas_dtype into dictionary --- metrics/flake8_high_water_mark | 2 +- .../records/schema/field/__init__.py | 170 +++++++++--------- 2 files changed, 90 insertions(+), 82 deletions(-) diff --git a/metrics/flake8_high_water_mark b/metrics/flake8_high_water_mark index 7ed6ff82d..b8626c4cf 100644 --- a/metrics/flake8_high_water_mark +++ b/metrics/flake8_high_water_mark @@ -1 +1 @@ -5 +4 diff --git a/records_mover/records/schema/field/__init__.py b/records_mover/records/schema/field/__init__.py index 2202ce601..5defa2e9c 100644 --- a/records_mover/records/schema/field/__init__.py +++ b/records_mover/records/schema/field/__init__.py @@ -199,96 +199,104 @@ def components_to_time_str(df: pd.DataFrame) -> datetime.time: target_type) return series.astype(target_type) - def to_pandas_dtype(self) -> 'Dtype': + def process_integer_field_type_to_pd_dtype(self) -> 'Dtype': import numpy as np import pandas as pd - from .pandas import supports_nullable_ints, integer_type_for_range + from .pandas import integer_type_for_range, supports_nullable_ints has_extension_types = supports_nullable_ints() + int_constraints =\ + cast(Optional[RecordsSchemaFieldIntegerConstraints], self.constraints) + min_: Optional[int] = None + max_: Optional[int] = None + required = False + if int_constraints: + min_ = int_constraints.min_ + max_ = int_constraints.max_ + required = int_constraints.required + + if not required and not has_extension_types: + logger.warning(f"Dataframe field {self.name} is nullable, but using pandas " + f"{pd.__version__} which does not support nullable integer type") + + if min_ is not None and max_ is not None: + dtype = integer_type_for_range(min_, max_, has_extension_types) + if dtype: + return dtype + else: + logger.warning("Asked for a type larger than int64 in dataframe " + f"field '{self.name}' - providing float128, but " + "loss of precision will occur! " + f"Requested min/max values: {min_}/{max_}") + return np.float128 + else: + logger.warning(f"No integer constraints provided for field '{self.name}'; " + "using int64") + if has_extension_types: + return pd.Int64Dtype() + else: + return np.int64 - if self.field_type == 'integer': - int_constraints =\ - cast(Optional[RecordsSchemaFieldIntegerConstraints], self.constraints) - min_: Optional[int] = None - max_: Optional[int] = None - required = False - if int_constraints: - min_ = int_constraints.min_ - max_ = int_constraints.max_ - required = int_constraints.required - - if not required and not has_extension_types: - logger.warning(f"Dataframe field {self.name} is nullable, but using pandas " - f"{pd.__version__} which does not support nullable integer type") - - if min_ is not None and max_ is not None: - dtype = integer_type_for_range(min_, max_, has_extension_types) - if dtype: - return dtype - else: - logger.warning("Asked for a type larger than int64 in dataframe " - f"field '{self.name}' - providing float128, but " - "loss of precision will occur! " - f"Requested min/max values: {min_}/{max_}") + def process_decimal_field_type_to_pd_dtype(self) -> 'Dtype': + import numpy as np + + decimal_constraints =\ + cast(Optional[RecordsSchemaFieldDecimalConstraints], self.constraints) + if decimal_constraints: + if (decimal_constraints.fixed_precision is not None and + decimal_constraints.fixed_scale is not None): + logger.warning("Pandas doesn't support a fixed precision type - " + "using np.float64") + return np.float64 + elif (decimal_constraints.fp_total_bits is not None and + decimal_constraints.fp_significand_bits is not None): + if (decimal_constraints.fp_total_bits <= 16 and + decimal_constraints.fp_significand_bits <= FLOAT16_SIGNIFICAND_BITS): + return np.float16 + elif (decimal_constraints.fp_total_bits <= 32 and + decimal_constraints.fp_significand_bits <= FLOAT32_SIGNIFICAND_BITS): + return np.float32 + elif (decimal_constraints.fp_total_bits <= 64 and + decimal_constraints.fp_significand_bits <= FLOAT64_SIGNIFICAND_BITS): + return np.float64 + elif (decimal_constraints.fp_total_bits <= 80 and + decimal_constraints.fp_significand_bits <= FLOAT80_SIGNIFICAND_BITS): return np.float128 - else: - logger.warning(f"No integer constraints provided for field '{self.name}'; " - "using int64") - if has_extension_types: - return pd.Int64Dtype() else: - return np.int64 - elif self.field_type == 'decimal': - decimal_constraints =\ - cast(Optional[RecordsSchemaFieldDecimalConstraints], self.constraints) - if decimal_constraints: - if (decimal_constraints.fixed_precision is not None and - decimal_constraints.fixed_scale is not None): - logger.warning("Pandas doesn't support a fixed precision type - " - "using np.float64") - return np.float64 - elif (decimal_constraints.fp_total_bits is not None and - decimal_constraints.fp_significand_bits is not None): - if (decimal_constraints.fp_total_bits <= 16 and - decimal_constraints.fp_significand_bits <= FLOAT16_SIGNIFICAND_BITS): - return np.float16 - elif (decimal_constraints.fp_total_bits <= 32 and - decimal_constraints.fp_significand_bits <= FLOAT32_SIGNIFICAND_BITS): - return np.float32 - elif (decimal_constraints.fp_total_bits <= 64 and - decimal_constraints.fp_significand_bits <= FLOAT64_SIGNIFICAND_BITS): - return np.float64 - elif (decimal_constraints.fp_total_bits <= 80 and - decimal_constraints.fp_significand_bits <= FLOAT80_SIGNIFICAND_BITS): - return np.float128 - else: - logger.warning("Downgrading float type to np.float128. " - "Requested total bits: " - f"{decimal_constraints.fp_total_bits}. " - "Requested significand bits: " - f"{decimal_constraints.fp_significand_bits}") - return np.float128 - - logger.warning(f"No decimal constraints provided for field '{self.name}'; " - "using float64") - return np.float64 - elif self.field_type == 'boolean': - return np.bool_ - elif self.field_type == 'string': - return np.object_ - elif self.field_type == 'date': - return np.object_ - elif self.field_type == 'datetime': - return 'datetime64[ns]' - elif self.field_type == 'datetimetz': - return 'datetime64[ns, UTC]' - elif self.field_type == 'time': - return np.object_ - elif self.field_type == 'timetz': - return np.object_ - else: + logger.warning("Downgrading float type to np.float128. " + "Requested total bits: " + f"{decimal_constraints.fp_total_bits}. " + "Requested significand bits: " + f"{decimal_constraints.fp_significand_bits}") + return np.float128 + + logger.warning(f"No decimal constraints provided for field '{self.name}'; " + "using float64") + return np.float64 + + def to_pandas_dtype(self) -> 'Dtype': + import numpy as np + + field_type_to_pd_dtype_map = { + 'integer': self.process_integer_field_type_to_pd_dtype, + 'decimal': self.process_decimal_field_type_to_pd_dtype, + 'boolean': np.bool_, + 'string': np.object_, + 'date': np.object_, + 'datetime': 'datetime64[ns]', + 'datetimetz': 'datetime64[ns, UTC]', + 'time': np.object_, + 'timetz': np.object_, + } + + pd_dtype = field_type_to_pd_dtype_map.get(self.field_type) + if not pd_dtype: raise NotImplementedError("Teach me how to handle records schema " f"type {self.field_type}") + if self.field_type in ('integer', 'decimal') and callable(pd_dtype): + pd_dtype = pd_dtype() + + return pd_dtype def to_data(self) -> 'FieldDict': out: 'FieldDict' = {'type': self.field_type}