WIP: Allow nullable integer columns to be used from Pandas 1.0+ #138

Closed
wants to merge 13 commits
2 changes: 1 addition & 1 deletion metrics/coverage_high_water_mark
@@ -1 +1 @@
93.6700
93.7000
2 changes: 1 addition & 1 deletion metrics/mypy_high_water_mark
@@ -1 +1 @@
92.5400
92.4900
21 changes: 21 additions & 0 deletions records_mover/pandas/__init__.py
@@ -1,9 +1,13 @@
import logging
import json
import numpy as np
from pandas import DataFrame
from typing import Any


logger = logging.getLogger(__name__)


# http://stackoverflow.com/questions/27050108/convert-numpy-type-to-python
class NumPyJSONEncoder(json.JSONEncoder):
def default(self, obj: object) -> object:
@@ -33,3 +37,20 @@ def purge_unnamed_unused_columns(df: DataFrame) -> DataFrame:
if not df[column].notnull().any():
df = df.drop(column, axis=1)
return df


def convert_dtypes(df: 'DataFrame') -> 'DataFrame':
"""Allow nullable columns to be used in Pandas 1.0+ - prior to that,
Pandas would use e.g., a numpy floating point type for integers,
representing nulls as NaN.

If Pandas <1.0 is being used, logs a warning message and proceeds
with the raw dtypes.
"""
if 'convert_dtypes' in dir(df):
# Allow nullable integers to be represented
df = df.convert_dtypes()
else:
logger.warning("Using old version of pandas; "
"not able to represent nullable integer columns")
return df
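
As a side note for reviewers, the pandas behavior this helper depends on can be shown in isolation. This is only a sketch with made-up data, not part of the change:

import numpy as np
import pandas as pd

# An integer column containing a missing value: classic pandas stores it
# as float64, so the nulls become NaN and the remaining ints become floats.
df = pd.DataFrame({"n": [1, 2, np.nan]})
print(df["n"].dtype)  # float64

if hasattr(df, "convert_dtypes"):
    # pandas 1.0+ re-infers the column as the nullable extension dtype
    # Int64, keeping the missing value as pd.NA instead of NaN.
    print(df.convert_dtypes()["n"].dtype)  # Int64
else:
    # Mirrors the helper above: on older pandas we can only warn and
    # keep the float64 representation.
    print("old pandas; nullable integer columns not representable")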
4 changes: 3 additions & 1 deletion records_mover/records/schema/field/numpy.py
@@ -9,7 +9,9 @@ def details_from_numpy_dtype(dtype: numpy.dtype,
unique: bool) -> Tuple['FieldType',
RecordsSchemaFieldConstraints]:
from ..field import RecordsSchemaField
basename = dtype.base.name
basename = str(dtype)
if 'base' in dir(dtype) and 'name' in dir(dtype.base):
basename = dtype.base.name
Contributor Author

The new nullable Int64 type in Pandas is not actually a type that has a base type, so there's no 'base' attribute on it.

field_type: Optional['FieldType']
if basename.startswith('datetime64'):
has_tz = getattr(dtype, "tz", None) is not None
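
To make the guard concrete: numpy dtypes expose a .base that has a .name, while pandas' nullable Int64 extension dtype does not (per the author's comment above), so the fallback to str(dtype) kicks in. A rough sketch, with an illustrative helper name:

import numpy as np
import pandas as pd

def basename_for(dtype) -> str:
    # Same guarded lookup as the diff above: prefer the numpy-style
    # dtype.base.name, otherwise fall back to str(dtype).
    basename = str(dtype)
    if 'base' in dir(dtype) and 'name' in dir(dtype.base):
        basename = dtype.base.name
    return basename

print(basename_for(np.dtype('int64')))           # 'int64'
print(basename_for(np.dtype('datetime64[ns]')))  # 'datetime64[ns]'
print(basename_for(pd.Int64Dtype()))             # falls back to 'Int64'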
14 changes: 9 additions & 5 deletions records_mover/records/schema/schema/__init__.py
@@ -1,11 +1,13 @@
import logging
import json
from typing import List, Dict, Mapping, IO, Any, TYPE_CHECKING
from ..field import RecordsSchemaField
from ...records_format import BaseRecordsFormat
from ...processing_instructions import ProcessingInstructions
from .known_representation import RecordsSchemaKnownRepresentation
from ..errors import UnsupportedSchemaError
from records_mover.records.schema.field import RecordsSchemaField
from records_mover.records.records_format import BaseRecordsFormat
from records_mover.records.processing_instructions import ProcessingInstructions
from records_mover.records.schema.schema.known_representation import (
RecordsSchemaKnownRepresentation
)
from records_mover.records.schema.errors import UnsupportedSchemaError
Contributor Author

Unrelated change. Generally I'd like to start moving away from relative import addressing, as it gets confusing and painful to rebaseline when I move code around.

if TYPE_CHECKING:
from pandas import DataFrame

@@ -138,6 +140,7 @@ def from_fileobjs(fileobjs: List[IO[bytes]],
"""
from records_mover.records.delimited import stream_csv
from records_mover.pandas import purge_unnamed_unused_columns
from records_mover.pandas import convert_dtypes

if len(fileobjs) != 1:
# https://github.com/bluelabsio/records-mover/issues/84
@@ -160,6 +163,7 @@ def from_fileobjs(fileobjs: List[IO[bytes]],
fileobj.seek(0)

df = purge_unnamed_unused_columns(df)
df = convert_dtypes(df)
schema = RecordsSchema.from_dataframe(df, processing_instructions,
include_index=False)

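
For context on why the conversion happens before from_dataframe: reading delimited data with a blank cell in an integer column yields float64, and converting first lets schema inference see an integer. A minimal sketch with an invented two-row CSV:

import io
import pandas as pd

# "age" has a blank cell, so plain read_csv infers float64 for it.
csv = io.StringIO("name,age\nalice,34\nbob,\n")
df = pd.read_csv(csv)
print(df.dtypes["age"])  # float64

# Converting before schema inference (as this hunk now does) yields a
# nullable Int64 column, so the inferred field can be an integer type.
df = df.convert_dtypes()
print(df.dtypes["age"])  # Int64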
8 changes: 5 additions & 3 deletions records_mover/records/sources/fileobjs.py
@@ -120,8 +120,9 @@ def to_dataframes_source(self,
processing_instructions: ProcessingInstructions) \
-> Iterator['DataframesRecordsSource']:
import pandas as pd
from .dataframes import DataframesRecordsSource # noqa
from ..pandas import pandas_read_csv_options
from records_mover.pandas import convert_dtypes
from records_mover.records.sources.dataframes import DataframesRecordsSource # noqa
from records_mover.records.pandas import pandas_read_csv_options

"""Convert current source to a DataframeSource and present it in a context manager"""
if not isinstance(self.records_format, DelimitedRecordsFormat):
@@ -161,7 +162,8 @@ def to_dataframes_source(self,
**options)
except pd.errors.EmptyDataError:
dfs = [self.records_schema.to_empty_dataframe()]
yield DataframesRecordsSource(dfs=dfs, records_schema=self.records_schema)
yield DataframesRecordsSource(dfs=(convert_dtypes(df) for df in dfs),
records_schema=self.records_schema)
finally:
if text_fileobj is not None:
text_fileobj.detach()
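
Because dfs can be a lazy iterator of chunks (when read_csv is used with a chunksize), the generator expression above converts each chunk only as it is consumed. A standalone sketch of that pattern, with invented data, using DataFrame.convert_dtypes directly in place of the records_mover helper:

import io
import pandas as pd

csv = io.StringIO("a,b\n1,2\n3,\n5,6\n7,8\n")
# chunksize makes read_csv yield DataFrames two rows at a time instead of
# loading everything up front.
chunks = pd.read_csv(csv, chunksize=2)

# Wrapping the iterator in a generator expression keeps it lazy while still
# normalizing every chunk (the first chunk's "b" is float64 because of the
# missing value; the second chunk's "b" is int64; both become Int64).
converted = (chunk.convert_dtypes() for chunk in chunks)
for chunk in converted:
    print(chunk.dtypes.to_dict())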
21 changes: 18 additions & 3 deletions tests/unit/records/sources/test_fileobjs.py
@@ -155,13 +155,11 @@ def read_csv_options(records_format,
'name': mock_fileobj
}
mock_processing_instructions = Mock(name='processing_instructions')
mock_reader = mock_read_csv.return_value
mock_dfs = mock_reader
source = FileobjsSource(target_names_to_input_fileobjs=mock_target_names_to_input_fileobjs,
records_schema=mock_records_schema,
records_format=mock_records_format)
with source.to_dataframes_source(mock_processing_instructions) as df_source:
self.assertEqual(df_source.dfs, mock_dfs)
self.assertIsNotNone(df_source)

@patch('records_mover.records.sources.fileobjs.MoveResult')
def test_move_to_records_directory(self,
@@ -193,3 +191,20 @@ def test_move_to_records_directory(self,
mock_MoveResult.assert_called_with(move_count=None,
output_urls={'file.mumble': 'vmb://dir/file.mumble'})
self.assertEqual(out, mock_MoveResult.return_value)

@patch('records_mover.records.sources.fileobjs.TemporaryDirectory')
@patch('records_mover.records.sources.fileobjs.FilesystemDirectoryUrl')
def test_temporary_unloadable_directory_loc(self,
mock_FilesystemDirectoryUrl,
mock_TemporaryDirectory):
mock_records_format = Mock(name='records_format')
mock_records_schema = Mock(name='records_schema')
mock_target_names_to_input_fileobjs = Mock(name='target_names_to_input_fileobjs')
source = FileobjsSource(target_names_to_input_fileobjs=mock_target_names_to_input_fileobjs,
records_schema=mock_records_schema,
records_format=mock_records_format)
with source.temporary_unloadable_directory_loc() as temp_loc:
self.assertEqual(temp_loc, mock_FilesystemDirectoryUrl.return_value)
mock_temp_dir = mock_TemporaryDirectory.return_value.__enter__.return_value
mock_FilesystemDirectoryUrl.assert_called_with(mock_temp_dir)