Skip to content

Commit

Permalink
Merge pull request #92 from carloshorn/allow_PathLike
Browse files Browse the repository at this point in the history
Allow PathLike objects
  • Loading branch information
mraspaud authored Jan 14, 2021
2 parents adfae04 + 166cbbd commit c86b67e
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 119 deletions.
3 changes: 2 additions & 1 deletion .stickler.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
linters:
flake8:
fixer: true
max-line-length: 120
python: 3
config: setup.cfg
fixers:
enable: true
1 change: 1 addition & 0 deletions pygac/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ def filename(self, filepath):
if filepath is None:
self._filename = None
else:
filepath = os.fspath(filepath)
match = self.data_set_pattern.search(filepath)
if match:
self._filename = match.group()
Expand Down
12 changes: 12 additions & 0 deletions pygac/tests/test_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import datetime
import unittest
import sys
import os
try:
import mock
except ImportError:
Expand Down Expand Up @@ -55,6 +56,17 @@ def test_filename(self):
filepath = '/path/to/' + filename + '.gz'
self.reader.filename = filepath
self.assertEqual(self.reader.filename, filename)
self.reader.filename = None
self.assertIsNone(self.reader.filename)

class TestPath(os.PathLike):
def __init__(self, path):
self.path = str(path)

def __fspath__(self):
return self.path
self.reader.filename = TestPath(filepath)
self.assertEqual(self.reader.filename, filename)

@unittest.skipIf(sys.version_info.major < 3, "Skipped in python2!")
def test__read_scanlines(self):
Expand Down
81 changes: 43 additions & 38 deletions pygac/tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,59 +23,29 @@
"""Test pygac.utils module
"""

import unittest
import io
import gzip
import sys
import numpy as np
try:
from unittest import mock
except ImportError:
import mock

from pygac.utils import (is_file_object, file_opener,
calculate_sun_earth_distance_correction)
import io
import os
import unittest
from unittest import mock

import numpy as np

def _raise_OSError(*args, **kwargs):
raise OSError
from pygac.utils import file_opener, calculate_sun_earth_distance_correction


class TestUtils(unittest.TestCase):
"""Test pygac.utils functions"""

longMessage = True

def test_is_file_object(self):
"""Test is_file_object function."""
# true test
with io.BytesIO(b'file content') as fileobj:
self.assertTrue(is_file_object(fileobj))
# false test
self.assertFalse(is_file_object("test.txt"))
# duck type test

class Duck(object):
def read(self, n):
return n*b'\00'

def seekable(self):
return True

def close(self):
pass
duck = Duck()
self.assertTrue(is_file_object(duck))

@mock.patch('pygac.utils.open', mock.mock_open(read_data='file content'))
@mock.patch('pygac.utils.gzip.open', _raise_OSError)
@mock.patch('pygac.utils.open', mock.MagicMock(return_value=io.BytesIO(b'file content')))
def test_file_opener_1(self):
"""Test if a file is redirected correctly through file_opener."""
with file_opener('path/to/file') as f:
content = f.read()
self.assertEqual(content, 'file content')
self.assertEqual(content, b'file content')

@unittest.skipIf(sys.version_info.major < 3, "Not supported in python2!")
def test_file_opener_2(self):
"""Test file_opener with file objects and compression"""
# prepare test
Expand All @@ -98,6 +68,41 @@ def test_file_opener_2(self):
message = g.read()
self.assertEqual(message, gzip_message_decoded)

@mock.patch('pygac.utils.open', mock.MagicMock(side_effect=FileNotFoundError))
def test_file_opener_3(self):
"""Test file_opener with PathLike object"""
# prepare test
class RawBytes(os.PathLike):
def __init__(self, filename, raw_bytes):
self.filename = str(filename)
self.raw_bytes = raw_bytes

def __fspath__(self):
return self.filename

def open(self):
return io.BytesIO(self.raw_bytes)

filename = '/path/to/file'
file_bytes = b'TestTestTest'
test_pathlike = RawBytes(filename, file_bytes)
with file_opener(test_pathlike) as f:
content = f.read()
self.assertEqual(content, file_bytes)

# test with lazy loading open method (open only in context)
class RawBytesLazy(RawBytes):
def open(self):
self.lazy_opener_mock = mock.MagicMock()
self.lazy_opener_mock.__enter__.return_value = io.BytesIO(self.raw_bytes)
return self.lazy_opener_mock

test_pathlike = RawBytesLazy(filename, file_bytes)
with file_opener(test_pathlike) as f:
content = f.read()
self.assertEqual(content, file_bytes)
test_pathlike.lazy_opener_mock.__exit__.assert_called_once_with(None, None, None)

def test_calculate_sun_earth_distance_correction(self):
"""Test function for the sun distance corretction."""
corr = calculate_sun_earth_distance_correction(3)
Expand Down
101 changes: 22 additions & 79 deletions pygac/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,100 +20,43 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import gzip
import io
import logging
import numpy as np
import sys
from contextlib import contextmanager

LOG = logging.getLogger(__name__)


def is_file_object(filename):
"""Check if the input is a file object.
from contextlib import contextmanager, nullcontext

Args:
filename - object to check
Note:
This method only check if the object implements the
interface of a file object to allow duck types like
gzip.GzipFile instances.
"""
has_close = hasattr(filename, 'close')
has_read = hasattr(filename, 'read')
if hasattr(filename, 'seekable'):
is_seekable = filename.seekable()
else:
is_seekable = False
return has_close and has_read and is_seekable
import numpy as np

LOG = logging.getLogger(__name__)

@contextmanager
def _file_opener(file):
"""Open a file depending on the input.

Args:
file - path to file or file object
"""
# open file if necessary
if is_file_object(file):
open_file = file
close = False
else:
open_file = open(file, mode='rb')
close = True
# check if it is a gzip file
def gzip_inspected(open_file):
"""Try to gzip decompress the file object if applicable."""
try:
file_object = gzip.open(open_file)
file_object = gzip.GzipFile(mode='rb', fileobj=open_file)
file_object.read(1)
except OSError:
file_object = open_file
finally:
file_object.seek(0)
# provide file_object with the context
try:
yield file_object
finally:
if close:
file_object.close()
return file_object


@contextmanager
def _file_opener_py2(file):
"""Open a file depending on the input.
Args:
file - path to file
"""
close = True
# check if it is a gzip file
try:
file_object = gzip.open(file)
file_object.read(1)
# Note: in python 2, this is an IOError, but we keep the
# OSError for testing.
except (OSError, IOError):
file_object = open(file, mode='rb')
except TypeError:
# In python 2 gzip.open cannot handle file objects
LOG.debug("Gzip cannot open file objects in python2!")
if is_file_object(file):
file_object = file
close = False
finally:
file_object.seek(0)
# provide file_object with the context
try:
yield file_object
finally:
if close:
file_object.close()


if sys.version_info.major < 3:
file_opener = _file_opener_py2
else:
file_opener = _file_opener
def file_opener(file):
if isinstance(file, io.IOBase) and file.seekable():
# avoid closing the file using nullcontext
open_file = nullcontext(file)
elif hasattr(file, 'open'):
try:
open_file = file.open(mode='rb')
except TypeError:
open_file = file.open()
else:
open_file = open(file, mode='rb')
# set open_file into context in case of lazy loading in __enter__ method.
with open_file as file_object:
yield gzip_inspected(file_object)


def get_absolute_azimuth_angle_diff(sat_azi, sun_azi):
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,6 @@
('gapfilled_tles', ['gapfilled_tles/TLE_noaa16.txt'])],
test_suite="pygac.tests.suite",
tests_require=[],
python_requires='>=2.7,!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*',
python_requires='>=3.6',
zip_safe=False
)

0 comments on commit c86b67e

Please sign in to comment.