Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Allow nullable integer columns to be used from Pandas 1.0+ #138

Closed
wants to merge 13 commits into from
15 changes: 15 additions & 0 deletions records_mover/pandas/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
import logging
import json
import numpy as np
from pandas import DataFrame
from typing import Any


logger = logging.getLogger(__name__)


# http://stackoverflow.com/questions/27050108/convert-numpy-type-to-python
class NumPyJSONEncoder(json.JSONEncoder):
def default(self, obj: object) -> object:
Expand Down Expand Up @@ -33,3 +37,14 @@ def purge_unnamed_unused_columns(df: DataFrame) -> DataFrame:
if not df[column].notnull().any():
df = df.drop(column, axis=1)
return df


def convert_integer_dtypes(df: 'DataFrame'):
# TODO: Document
vinceatbluelabs marked this conversation as resolved.
Show resolved Hide resolved
if 'convert_dtypes' in dir(df):
# Allow nullable integers to be represented
df = df.convert_dtypes(convert_integer=True)
else:
logger.warning("Using old version of pandas; "
"not able to represent nullable integer columns")
return df
4 changes: 3 additions & 1 deletion records_mover/records/schema/field/numpy.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ def details_from_numpy_dtype(dtype: numpy.dtype,
unique: bool) -> Tuple['FieldType',
RecordsSchemaFieldConstraints]:
from ..field import RecordsSchemaField
basename = dtype.base.name
basename = str(dtype)
if 'base' in dir(dtype) and 'name' in dir(dtype.base):
basename = dtype.base.name
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new Integer64 type in Pandas is not actually a type that has a base type, so there's no 'base' attribute in it.

field_type: Optional['FieldType']
if basename.startswith('datetime64'):
has_tz = getattr(dtype, "tz", None) is not None
Expand Down
14 changes: 9 additions & 5 deletions records_mover/records/schema/schema/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import logging
import json
from typing import List, Dict, Mapping, IO, Any, TYPE_CHECKING
from ..field import RecordsSchemaField
from ...records_format import BaseRecordsFormat
from ...processing_instructions import ProcessingInstructions
from .known_representation import RecordsSchemaKnownRepresentation
from ..errors import UnsupportedSchemaError
from records_mover.pandas import convert_integer_dtypes
from records_mover.records.schema.field import RecordsSchemaField
from records_mover.records.records_format import BaseRecordsFormat
from records_mover.records.processing_instructions import ProcessingInstructions
from records_mover.records.schema.schema.known_representation import (
RecordsSchemaKnownRepresentation
)
from records_mover.records.schema.errors import UnsupportedSchemaError
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated change. Generally I'd like to start moving away from relative import addressing, as it gets confusing and painful to rebaseline when I move code around.

if TYPE_CHECKING:
from pandas import DataFrame

Expand Down Expand Up @@ -160,6 +163,7 @@ def from_fileobjs(fileobjs: List[IO[bytes]],
fileobj.seek(0)

df = purge_unnamed_unused_columns(df)
df = convert_integer_dtypes(df)
schema = RecordsSchema.from_dataframe(df, processing_instructions,
include_index=False)

Expand Down
17 changes: 16 additions & 1 deletion records_mover/records/sources/fileobjs.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from typing import Mapping, IO, Optional, Iterator, List, Any, TYPE_CHECKING
if TYPE_CHECKING:
from .dataframes import DataframesRecordsSource # noqa
from pandas import Dataframe
vinceatbluelabs marked this conversation as resolved.
Show resolved Hide resolved


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -154,14 +155,28 @@ def to_dataframes_source(self,
else:
chunksize = int(entries_per_chunk / num_fields)

def fix_integer_columns(dfs: Iterator['DataFrame']) -> Iterator['DataFrame']:
    """Yield each DataFrame with integer-compatible columns converted to
    pandas' nullable integer dtype (Int64), so integer columns that
    contain NULLs are not forced to float64.

    :param dfs: Iterator of DataFrames (e.g. chunks from pd.read_csv).
    :return: Iterator of DataFrames with adjusted dtypes.  On pandas
        versions before 1.0 (no DataFrame.convert_dtypes), each
        DataFrame is yielded unchanged and a warning is logged.
    """
    for df in dfs:
        if 'convert_dtypes' in dir(df):
            # Restrict conversion to *integer* dtypes only.  The
            # convert_dtypes() defaults would also rewrite object
            # columns to 'string' and bools to 'boolean', changing
            # downstream schema inference.
            try:
                df = df.convert_dtypes(convert_string=False,
                                       convert_integer=True,
                                       convert_boolean=False,
                                       convert_floating=False)
            except TypeError:
                # pandas 1.0/1.1 don't have the convert_floating argument
                df = df.convert_dtypes(convert_string=False,
                                       convert_integer=True,
                                       convert_boolean=False)
        else:
            logger.warning("Using old version of pandas; "
                           "not able to represent nullable integer columns")
        yield df

# TODO: switch to generator expressions: https://treyhunner.com/2018/06/how-to-make-an-iterator-in-python/
vinceatbluelabs marked this conversation as resolved.
Show resolved Hide resolved
vinceatbluelabs marked this conversation as resolved.
Show resolved Hide resolved

try:
dfs = pd.read_csv(filepath_or_buffer=target_fileobj,
iterator=True,
chunksize=chunksize,
**options)
except pd.errors.EmptyDataError:
dfs = [self.records_schema.to_empty_dataframe()]
yield DataframesRecordsSource(dfs=dfs, records_schema=self.records_schema)
yield DataframesRecordsSource(dfs=fix_integer_columns(dfs),
records_schema=self.records_schema)
finally:
if text_fileobj is not None:
text_fileobj.detach()
Expand Down