Skip to content

Commit

Permalink
Better hint sniffing (#57)
Browse files Browse the repository at this point in the history
The Pandas hint inference is actually not doing that much for us. The following things aren't inferred at all by it:

'escape'
'doublequote'
'quotechar'
'encoding'
'header-row'

This is confusing, as our current code nominally pulls that info from a pandas object - but in practice, those things are either pulled from what we originally passed in to read_csv() or set to default values - https://github.com/pandas-dev/pandas/blob/e9b019b653d37146f9095bb0522525b3a8d9e386/pandas/io/parsers.py#L2253

It turns out that Python itself ships with a csv.Sniffer class that can do a lot of this. There are some limitations - it only works with some common record terminators and it only operates on decompressed files, but those are things we can work around.

The net effect is a lot better hint sniffing overall, which shows in the component test results here (sample CSVs need fewer initial hints to be able to understand the files).
  • Loading branch information
vinceatbluelabs authored May 16, 2020
1 parent 6ec62d5 commit ff786a8
Show file tree
Hide file tree
Showing 20 changed files with 493 additions and 162 deletions.
2 changes: 1 addition & 1 deletion metrics/coverage_high_water_mark
Original file line number Diff line number Diff line change
@@ -1 +1 @@
93.9600
93.9900
2 changes: 1 addition & 1 deletion metrics/mypy_high_water_mark
Original file line number Diff line number Diff line change
@@ -1 +1 @@
91.8200
92.1000
9 changes: 0 additions & 9 deletions records_mover/records/delimited/conversions.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,3 @@
# even if the only data it saw was in ASCII, let's be ready to see more
'ascii': 'UTF8',
}

hint_compression_from_pandas: Dict[Optional[str], HintCompression] = {
# https://pandas.pydata.org/pandas-docs/stable/generated/pandas.read_csv.html
# https://github.com/bluelabsio/knowledge/
# blob/master/Engineering/Architecture/JobDataExchange/output-design.md#hints
'gzip': 'GZIP',
'bz2': 'BZIP',
None: None,
}
380 changes: 262 additions & 118 deletions records_mover/records/delimited/sniff.py

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions records_mover/records/delimited/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
'encoding': HintEncoding,
'escape': HintEscape,
'compression': HintCompression,
'record-terminator': HintRecordTerminator,
},
total=False)

Expand Down
25 changes: 25 additions & 0 deletions records_mover/utils/rewound_fileobj.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from contextlib import contextmanager
from typing import IO, Iterator
import logging


logger = logging.getLogger(__name__)


@contextmanager
def rewound_fileobj(fileobj: IO[bytes]) -> Iterator[IO[bytes]]:
    """Temporarily rewind a seekable stream to its beginning.

    Yields the same fileobj positioned at offset 0; on exit (normal or via
    exception) the stream is returned to whatever position it had on entry.

    :param fileobj: a rewindable binary stream.
    :raises OSError: if the stream is already closed or is not seekable.
    """
    closed_flag = getattr(fileobj, 'closed', None)
    if closed_flag is not None and closed_flag:
        logger.warning("Stream already closed")
        raise OSError('Stream is already closed')
    if not fileobj.seekable():
        # Mirror the exception .seek() itself would raise on a
        # non-rewindable stream.
        raise OSError('Stream is not rewindable')
    saved_position = fileobj.tell()
    fileobj.seek(0)
    try:
        yield fileobj
    finally:
        fileobj.seek(saved_position)
Empty file.
38 changes: 38 additions & 0 deletions tests/unit/records/delimited/test_sniff.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from records_mover.records.delimited.sniff import (
rewound_fileobj, infer_newline_format, sniff_encoding_hint
)
from mock import Mock, patch
import unittest


class TestSniff(unittest.TestCase):
    """Unit tests for the low-level sniffing helpers in delimited.sniff."""

    def test_rewound_fileobj_already_closed(self):
        """A closed stream cannot be rewound: rewound_fileobj raises OSError."""
        mock_fileobj = Mock(name='fileobj')
        mock_fileobj.closed = True
        with self.assertRaises(OSError):
            with rewound_fileobj(mock_fileobj):
                pass

    @patch('records_mover.records.delimited.sniff.io')
    def test_infer_newline_format_cant_infer(self,
                                             mock_io):
        """If TextIOWrapper observed no newlines, inference returns None."""
        mock_fileobj = Mock(name='fileobj')
        mock_fileobj.closed = False
        mock_encoding_hint = 'UTF8'
        mock_compression = None
        mock_text_fileobj = mock_io.TextIOWrapper.return_value
        # .newlines is None when the wrapper has not seen a line ending yet.
        mock_text_fileobj.newlines = None
        out = infer_newline_format(mock_fileobj,
                                   mock_encoding_hint,
                                   mock_compression)
        # Bug fix: the original `assert_called` (no parentheses) was a bare
        # attribute access and never verified anything.
        mock_text_fileobj.readline.assert_called()
        self.assertIsNone(out)

    @patch('records_mover.records.delimited.sniff.chardet')
    def test_sniff_encoding_hint_no_result(self,
                                           mock_chardet):
        """With no usable chardet detection result, no encoding hint is produced."""
        mock_fileobj = Mock(name='fileobj')
        mock_fileobj.closed = False
        # NOTE(review): this sets an arbitrary `result` attribute on the mocked
        # chardet module -- presumably the code under test reads a detection
        # result dict; confirm against sniff.py which attribute is consumed.
        mock_chardet.result = {}
        out = sniff_encoding_hint(mock_fileobj)
        self.assertIsNone(out)
121 changes: 100 additions & 21 deletions tests/unit/records/test_hints.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
)
from mock import MagicMock, patch
import io
import gzip
import bz2
import unittest
import json
import os
Expand All @@ -11,19 +13,22 @@
class TestHints(unittest.TestCase):
maxDiff = None

def test_sniff_hints(self):
resources_dir = os.path.dirname(os.path.abspath(__file__)) + '/../resources'
hint_sniffing_dir = f"{resources_dir}/hint_sniffing"
def setUp(self):
self.resources_dir = os.path.dirname(os.path.abspath(__file__)) + '/../resources'
self.hint_sniffing_dir = f"{self.resources_dir}/hint_sniffing"

test_cases = [
def sample_file_basenames(self):
return [
os.path.splitext(os.path.basename(f))[0]
for f in os.listdir(hint_sniffing_dir)
if (os.path.isfile(os.path.join(hint_sniffing_dir, f)) and
for f in os.listdir(self.hint_sniffing_dir)
if (os.path.isfile(os.path.join(self.hint_sniffing_dir, f)) and
os.path.splitext(f)[1] == '.csv')
]
for basename in test_cases:
csv_filename = f'{hint_sniffing_dir}/{basename}.csv'
config_filename = f'{hint_sniffing_dir}/{basename}.json'

def test_sniff_hints(self):
for basename in self.sample_file_basenames():
csv_filename = f'{self.hint_sniffing_dir}/{basename}.csv'
config_filename = f'{self.hint_sniffing_dir}/{basename}.json'
with open(config_filename, 'r') as config_fileobj:
config = json.load(config_fileobj)
required_hints = config['required']
Expand All @@ -35,44 +40,117 @@ def test_sniff_hints(self):
f"Expected at least these hints while reading {basename}: "
f"{required_hints}, found these hints: {hints}")

def test_sniff_hints_gzipped_preinformed(self):
    """Sniffing gzip-compressed samples with the compression hint given up front."""
    for basename in self.sample_file_basenames():
        csv_filename = f'{self.hint_sniffing_dir}/{basename}.csv'
        config_filename = f'{self.hint_sniffing_dir}/{basename}.json'
        with open(config_filename, 'r') as config_fileobj:
            config = json.load(config_fileobj)
        required_hints = config['required']
        initial_hints = config['initial_hints']
        # Caller tells the sniffer about the compression; it must be echoed back.
        required_hints['compression'] = 'GZIP'
        initial_hints['compression'] = 'GZIP'

        with open(csv_filename, 'rb') as uncompressed_fileobj:
            gzipped_data = gzip.compress(uncompressed_fileobj.read())
        fileobj = io.BytesIO(gzipped_data)
        hints = sniff_hints(fileobj, initial_hints=initial_hints)
        self.assertTrue(set(required_hints.items()).issubset(set(hints.items())),
                        f"Expected at least these hints while reading {basename}: "
                        f"{required_hints}, found these hints: {hints}")

def test_sniff_hints_gzipped_sniffed(self):
    """Sniffing gzip-compressed samples with no compression hint supplied."""
    for basename in self.sample_file_basenames():
        csv_filename = f'{self.hint_sniffing_dir}/{basename}.csv'
        config_filename = f'{self.hint_sniffing_dir}/{basename}.json'
        with open(config_filename, 'r') as config_fileobj:
            config = json.load(config_fileobj)
        required_hints = config['required']
        initial_hints = config['initial_hints']
        # No initial compression hint here -- the sniffer must detect GZIP itself.
        required_hints['compression'] = 'GZIP'

        with open(csv_filename, 'rb') as uncompressed_fileobj:
            gzipped_data = gzip.compress(uncompressed_fileobj.read())
        fileobj = io.BytesIO(gzipped_data)
        hints = sniff_hints(fileobj, initial_hints=initial_hints)
        self.assertTrue(set(required_hints.items()).issubset(set(hints.items())),
                        f"Expected at least these hints while reading {basename}: "
                        f"{required_hints}, found these hints: {hints}")

def test_sniff_hints_bzipped_preinformed(self):
    """Sniffing bzip2-compressed samples with the compression hint given up front."""
    for basename in self.sample_file_basenames():
        csv_filename = f'{self.hint_sniffing_dir}/{basename}.csv'
        config_filename = f'{self.hint_sniffing_dir}/{basename}.json'
        with open(config_filename, 'r') as config_fileobj:
            config = json.load(config_fileobj)
        required_hints = config['required']
        initial_hints = config['initial_hints']
        # Caller tells the sniffer about the compression; it must be echoed back.
        required_hints['compression'] = 'BZIP'
        initial_hints['compression'] = 'BZIP'

        with open(csv_filename, 'rb') as uncompressed_fileobj:
            # Bug fix (naming): this data is bz2-compressed, not gzipped --
            # the original misleadingly called it `gzipped_data`.
            bzipped_data = bz2.compress(uncompressed_fileobj.read())
        fileobj = io.BytesIO(bzipped_data)
        hints = sniff_hints(fileobj, initial_hints=initial_hints)
        self.assertTrue(set(required_hints.items()).issubset(set(hints.items())),
                        f"Expected at least these hints while reading {basename}: "
                        f"{required_hints}, found these hints: {hints}")

def test_sniff_hints_bzipped_sniffed(self):
    """Sniffing bzip2-compressed samples with no compression hint supplied."""
    for basename in self.sample_file_basenames():
        csv_filename = f'{self.hint_sniffing_dir}/{basename}.csv'
        config_filename = f'{self.hint_sniffing_dir}/{basename}.json'
        with open(config_filename, 'r') as config_fileobj:
            config = json.load(config_fileobj)
        required_hints = config['required']
        initial_hints = config['initial_hints']
        # No initial compression hint here -- the sniffer must detect BZIP itself.
        required_hints['compression'] = 'BZIP'

        with open(csv_filename, 'rb') as uncompressed_fileobj:
            # Bug fix (naming): this data is bz2-compressed, not gzipped --
            # the original misleadingly called it `gzipped_data`.
            bzipped_data = bz2.compress(uncompressed_fileobj.read())
        fileobj = io.BytesIO(bzipped_data)
        hints = sniff_hints(fileobj, initial_hints=initial_hints)
        self.assertTrue(set(required_hints.items()).issubset(set(hints.items())),
                        f"Expected at least these hints while reading {basename}: "
                        f"{required_hints}, found these hints: {hints}")

@patch('records_mover.records.delimited.sniff.csv')
@patch('records_mover.records.delimited.sniff.stream_csv')
@patch('records_mover.records.delimited.sniff.io')
def test_sniff_hints_from_fileobjs(self,
mock_io,
mock_stream_csv) -> None:
mock_stream_csv,
mock_csv) -> None:
mock_fileobj = MagicMock(name='fileobj')
mock_fileobj.closed = False
mock_fileobjs = [mock_fileobj]
mock_initial_hints: BootstrappingRecordsHints = {
'field-delimiter': ','
}
mock_streaming_engine = mock_stream_csv.return_value.__enter__.return_value._engine
mock_io.TextIOWrapper.return_value.newlines = '\n'
mock_streaming_engine.compression = 'gzip'
mock_streaming_engine.encoding = 'utf-8'
out = sniff_hints_from_fileobjs(fileobjs=mock_fileobjs,
initial_hints=mock_initial_hints)
self.assertEqual(out, {
'compression': 'GZIP',
'dateformat': 'YYYY-MM-DD',
'datetimeformat': 'YYYY-MM-DD HH:MI:SS',
'datetimeformattz': 'YYYY-MM-DD HH:MI:SSOF',
'doublequote': mock_streaming_engine.doublequote,
'doublequote': mock_csv.Sniffer().sniff().doublequote,
'encoding': 'UTF8',
'escape': mock_streaming_engine.data.dialect.escapechar,
'quotechar': mock_csv.Sniffer().sniff().quotechar,
'quoting': 'minimal',
'field-delimiter': ',',
'header-row': True,
'quotechar': mock_streaming_engine.data.dialect.quotechar,
'header-row': mock_csv.Sniffer().has_header(),
'record-terminator': str(mock_io.TextIOWrapper.return_value.newlines),
'timeonlyformat': 'HH12:MI AM'
})

def test_sniff_hints_from_fileobjs_nonseekable(self):
csv = 'Liberté,égalité,fraternité\n"“a”",2,3\n'
csv_bytes = csv.encode('utf-8', errors='replace')
with io.BytesIO(csv_bytes) as fileobj:
fileobj.seekable = lambda: False
out = sniff_encoding_hint(fileobj=fileobj)
self.assertIsNone(out)
with self.assertRaises(OSError):
sniff_encoding_hint(fileobj=fileobj)

def test_sniff_hints_from_fileobjs_encodings(self):
expected_hint = {
Expand Down Expand Up @@ -107,4 +185,5 @@ def test_sniff_hints_from_fileobjs_encodings(self):
'header-row': True,
'record-terminator': '\n'
}
self.assertTrue(set(needed_settings.items()).issubset(set(out.items())))
self.assertTrue(set(needed_settings.items()).issubset(set(out.items())),
f"Needed at least {needed_settings}, got {out}")
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@
"quoting": null
},
"initial_hints": {
"header-row": false,
"escape": "\\",
"quoting": null
"escape": "\\"
},
"notes": "Nothing here is correctly guessed by our current system. Boo."
"notes": "Escaping is not currently sniffed."
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
num,numstr,str,comma,doublequote,quotecommaquote,newlinestr,date,time,timestamp,timestamptz,
123,123,foo,",","""",""",""","* SQL unload would generate multiple files (one for each slice/part)
* Filecat would produce a single data file",1/1/00,12:00 AM,1/2/00 12:34,1/2/00 12:34
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"required": {
"header-row": true,
"escape": null,
"quoting": "minimal",
"record-terminator": "\r\n"
},
"initial_hints": {
"escape": null
},
"notes": "Escaping is not currently sniffed."
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
num,numstr,str,comma,doublequote,quotecommaquote,newlinestr,date,time,timestamp,timestamptz,123,123,foo,",","""",""",""","* SQL unload would generate multiple files (one for each slice/part)* Filecat would produce a single data file",1/1/00,12:00 AM,1/2/00 12:34,1/2/00 12:34
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"required": {
"header-row": true,
"escape": null,
"quoting": "minimal",
"record-terminator": "\r"
},
"initial_hints": {
"escape": null
},
"notes": "Escaping is not currently sniffed."
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@
"required": {
"header-row": true,
"escape": null,
"quoting": "minimal"
"quoting": "minimal",
"record-terminator": "\n"
},
"initial_hints": {
"quoting": "minimal"
"escape": null
},
"notes": "Doesn't look like current code tries to sniff quoting"
"notes": "Escaping is not currently sniffed."
}
2 changes: 2 additions & 0 deletions tests/unit/resources/hint_sniffing/delimited-one-column.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
a
1
10 changes: 10 additions & 0 deletions tests/unit/resources/hint_sniffing/delimited-one-column.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"required": {
"record-terminator": "\n",
"header-row": true
},
"initial_hints": {
"header-row": true
},
"notes": "Python's sniffer doesn't do well on single-column files, meaning we don't get header-row info even if field delimiters are somewhat irrelevant"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
num numstr str comma doublequote quotecommaquote newlinestr date time timestamp timestamptz
123 123 foo , """" """,""" "* SQL unload would generate multiple files (one for each slice/part)
* Filecat would produce a single data file" 1/1/00 12:00 AM 1/2/00 12:34 1/2/00 12:34
12 changes: 12 additions & 0 deletions tests/unit/resources/hint_sniffing/delimited-tsv-with-header.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"required": {
"header-row": true,
"escape": null,
"quoting": "minimal",
"field-delimiter": "\t"
},
"initial_hints": {
"escape": null
},
"notes": "Escaping is not currently sniffed."
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,14 @@
"quoting": null,
"doublequote": false,
"escape": null,
"compression": null,
"header-row": false
},
"initial_hints": {
"escape": null,
"field-delimiter": "\u0001",
"record-terminator": "\u0002",
"header-row": false,
"quoting": null,
"doublequote": false
"doublequote": false,
"header-row": false
},
"notes": "This one is a disaster in terms of what is able to be determined"
"notes": "Escaping is not currently sniffed. We only sniff common newline types. The Python csv.Sniffer module doesn't work with non-standard newline types."
}

0 comments on commit ff786a8

Please sign in to comment.