diff --git a/metrics/coverage_high_water_mark b/metrics/coverage_high_water_mark
index 5f2eabeef..fc55c5e71 100644
--- a/metrics/coverage_high_water_mark
+++ b/metrics/coverage_high_water_mark
@@ -1 +1 @@
-93.6700
+93.7000
diff --git a/metrics/mypy_high_water_mark b/metrics/mypy_high_water_mark
index 777cfc7ea..16d2c62ed 100644
--- a/metrics/mypy_high_water_mark
+++ b/metrics/mypy_high_water_mark
@@ -1 +1 @@
-92.5400
\ No newline at end of file
+92.4900
\ No newline at end of file
diff --git a/records_mover/pandas/__init__.py b/records_mover/pandas/__init__.py
index 51dacab2e..b9d28d408 100644
--- a/records_mover/pandas/__init__.py
+++ b/records_mover/pandas/__init__.py
@@ -1,9 +1,13 @@
+import logging
 import json
 import numpy as np
 from pandas import DataFrame
 from typing import Any
 
 
+logger = logging.getLogger(__name__)
+
+
 # http://stackoverflow.com/questions/27050108/convert-numpy-type-to-python
 class NumPyJSONEncoder(json.JSONEncoder):
     def default(self, obj: object) -> object:
@@ -33,3 +37,20 @@ def purge_unnamed_unused_columns(df: DataFrame) -> DataFrame:
         if not df[column].notnull().any():
             df = df.drop(column, axis=1)
     return df
+
+
+def convert_dtypes(df: 'DataFrame') -> 'DataFrame':
+    """Allow nullable columns to be used in Pandas 1.0+ - prior to that,
+    Pandas would use e.g., a numpy floating point type for integers,
+    representing nulls as NaN.
+
+    If Pandas <1.0 is being used, logs a warning message and proceeds
+    with the raw dtypes.
+    """
+    if 'convert_dtypes' in dir(df):
+        # Allow nullable integers to be represented
+        df = df.convert_dtypes()
+    else:
+        logger.warning("Using old version of pandas; "
+                       "not able to represent nullable integer columns")
+    return df
diff --git a/records_mover/records/schema/field/numpy.py b/records_mover/records/schema/field/numpy.py
index 6f64ae914..f2a5c4939 100644
--- a/records_mover/records/schema/field/numpy.py
+++ b/records_mover/records/schema/field/numpy.py
@@ -9,7 +9,9 @@ def details_from_numpy_dtype(dtype: numpy.dtype,
                              unique: bool) -> Tuple['FieldType', RecordsSchemaFieldConstraints]:
     from ..field import RecordsSchemaField
 
-    basename = dtype.base.name
+    basename = str(dtype)
+    if 'base' in dir(dtype) and 'name' in dir(dtype.base):
+        basename = dtype.base.name
     field_type: Optional['FieldType']
     if basename.startswith('datetime64'):
         has_tz = getattr(dtype, "tz", None) is not None
diff --git a/records_mover/records/schema/schema/__init__.py b/records_mover/records/schema/schema/__init__.py
index 660579bf3..877139ae8 100644
--- a/records_mover/records/schema/schema/__init__.py
+++ b/records_mover/records/schema/schema/__init__.py
@@ -1,11 +1,13 @@
 import logging
 import json
 from typing import List, Dict, Mapping, IO, Any, TYPE_CHECKING
-from ..field import RecordsSchemaField
-from ...records_format import BaseRecordsFormat
-from ...processing_instructions import ProcessingInstructions
-from .known_representation import RecordsSchemaKnownRepresentation
-from ..errors import UnsupportedSchemaError
+from records_mover.records.schema.field import RecordsSchemaField
+from records_mover.records.records_format import BaseRecordsFormat
+from records_mover.records.processing_instructions import ProcessingInstructions
+from records_mover.records.schema.schema.known_representation import (
+    RecordsSchemaKnownRepresentation
+)
+from records_mover.records.schema.errors import UnsupportedSchemaError
 
 if TYPE_CHECKING:
     from pandas import DataFrame
@@ -138,6 +140,7 @@ def from_fileobjs(fileobjs: List[IO[bytes]],
         """
         from records_mover.records.delimited import stream_csv
         from records_mover.pandas import purge_unnamed_unused_columns
+        from records_mover.pandas import convert_dtypes
 
         if len(fileobjs) != 1:
             # https://github.com/bluelabsio/records-mover/issues/84
@@ -160,6 +163,7 @@ def from_fileobjs(fileobjs: List[IO[bytes]],
             fileobj.seek(0)
 
         df = purge_unnamed_unused_columns(df)
+        df = convert_dtypes(df)
         schema = RecordsSchema.from_dataframe(df,
                                               processing_instructions,
                                               include_index=False)
diff --git a/records_mover/records/sources/fileobjs.py b/records_mover/records/sources/fileobjs.py
index 271ec4d0b..48285dbb4 100644
--- a/records_mover/records/sources/fileobjs.py
+++ b/records_mover/records/sources/fileobjs.py
@@ -120,8 +120,9 @@ def to_dataframes_source(self,
                              processing_instructions: ProcessingInstructions) \
             -> Iterator['DataframesRecordsSource']:
         import pandas as pd
-        from .dataframes import DataframesRecordsSource  # noqa
-        from ..pandas import pandas_read_csv_options
+        from records_mover.pandas import convert_dtypes
+        from records_mover.records.sources.dataframes import DataframesRecordsSource  # noqa
+        from records_mover.records.pandas import pandas_read_csv_options
         """Convert current source to a DataframeSource and present it
         in a context manager"""
         if not isinstance(self.records_format, DelimitedRecordsFormat):
@@ -161,7 +162,8 @@ def to_dataframes_source(self,
                     **options)
             except pd.errors.EmptyDataError:
                 dfs = [self.records_schema.to_empty_dataframe()]
-            yield DataframesRecordsSource(dfs=dfs, records_schema=self.records_schema)
+            yield DataframesRecordsSource(dfs=(convert_dtypes(df) for df in dfs),
+                                          records_schema=self.records_schema)
         finally:
             if text_fileobj is not None:
                 text_fileobj.detach()
diff --git a/tests/unit/records/sources/test_fileobjs.py b/tests/unit/records/sources/test_fileobjs.py
index d6dcc93ee..25dc3432a 100644
--- a/tests/unit/records/sources/test_fileobjs.py
+++ b/tests/unit/records/sources/test_fileobjs.py
@@ -155,13 +155,11 @@ def read_csv_options(records_format,
             'name': mock_fileobj
         }
         mock_processing_instructions = Mock(name='processing_instructions')
-        mock_reader = mock_read_csv.return_value
-        mock_dfs = mock_reader
         source = FileobjsSource(target_names_to_input_fileobjs=mock_target_names_to_input_fileobjs,
                                 records_schema=mock_records_schema,
                                 records_format=mock_records_format)
         with source.to_dataframes_source(mock_processing_instructions) as df_source:
-            self.assertEqual(df_source.dfs, mock_dfs)
+            self.assertIsNotNone(df_source)
 
     @patch('records_mover.records.sources.fileobjs.MoveResult')
     def test_move_to_records_directory(self,
@@ -193,3 +191,19 @@ def test_move_to_records_directory(self,
         mock_MoveResult.assert_called_with(move_count=None,
                                            output_urls={'file.mumble': 'vmb://dir/file.mumble'})
         self.assertEqual(out, mock_MoveResult.return_value)
+
+    @patch('records_mover.records.sources.fileobjs.TemporaryDirectory')
+    @patch('records_mover.records.sources.fileobjs.FilesystemDirectoryUrl')
+    def test_temporary_unloadable_directory_loc(self,
+                                                mock_FilesystemDirectoryUrl,
+                                                mock_TemporaryDirectory):
+        mock_records_format = Mock(name='records_format')
+        mock_records_schema = Mock(name='records_schema')
+        mock_target_names_to_input_fileobjs = Mock(name='target_names_to_input_fileobjs')
+        source = FileobjsSource(target_names_to_input_fileobjs=mock_target_names_to_input_fileobjs,
+                                records_schema=mock_records_schema,
+                                records_format=mock_records_format)
+        with source.temporary_unloadable_directory_loc() as temp_loc:
+            self.assertEqual(temp_loc, mock_FilesystemDirectoryUrl.return_value)
+            mock_temp_dir = mock_TemporaryDirectory.return_value.__enter__.return_value
+            mock_FilesystemDirectoryUrl.assert_called_with(mock_temp_dir)
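
Note (editorial, not part of the patch): a minimal sketch of what the new
convert_dtypes() helper buys on pandas 1.0+. Before 1.0, an integer column
containing a null could only be stored as float64 with NaN; on 1.0+,
DataFrame.convert_dtypes() upgrades it to the nullable Int64 extension
dtype. The feature probe and fallback warning below mirror the patched
helper; the demo DataFrame is illustrative.

    import logging

    import numpy as np
    import pandas as pd

    logging.basicConfig(level=logging.WARNING)
    logger = logging.getLogger(__name__)


    def convert_dtypes(df: pd.DataFrame) -> pd.DataFrame:
        # Same feature check as the patch: DataFrame.convert_dtypes() only
        # exists on pandas 1.0+, so probe for it rather than assuming it.
        if 'convert_dtypes' in dir(df):
            df = df.convert_dtypes()
        else:
            logger.warning("Using old version of pandas; "
                           "not able to represent nullable integer columns")
        return df


    df = pd.DataFrame({'n': [1, 2, np.nan]})
    print(df.dtypes)                  # n    float64  (NaN forces a float dtype)
    print(convert_dtypes(df).dtypes)  # n    Int64    (nullable, on pandas 1.0+)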
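
Note (editorial, not part of the patch): the guard added in
records_mover/records/schema/field/numpy.py exists because, after
convert_dtypes(), a DataFrame can report pandas extension dtypes (e.g.
Int64) whose .base attribute is missing or None rather than a numpy dtype.
The basename_of() helper below is a hypothetical name that mirrors the
patched logic; the printed results assume pandas 1.0+.

    import numpy as np
    import pandas as pd


    def basename_of(dtype) -> str:
        # Mirrors the patched fallback: prefer the numpy-style
        # dtype.base.name when usable, else fall back to str(dtype).
        basename = str(dtype)
        if 'base' in dir(dtype) and 'name' in dir(dtype.base):
            basename = dtype.base.name
        return basename


    print(basename_of(np.dtype('int64')))  # 'int64' via dtype.base.name
    print(basename_of(pd.Int64Dtype()))    # 'Int64' via the str() fallback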