From c39ad7c5285b060cad5cc820220b744a03e1758f Mon Sep 17 00:00:00 2001
From: louispotok
Date: Thu, 28 Sep 2017 16:42:01 -0700
Subject: [PATCH] Add chunksize param to read_json when lines=True (#17168)

closes #17048
---
 asv_bench/benchmarks/io_bench.py        |  30 ++++
 doc/source/io.rst                       |  10 ++
 doc/source/whatsnew/v0.21.0.txt         |   1 +
 pandas/io/json/json.py                  | 215 ++++++++++++++++++++-----
 pandas/tests/io/json/test_pandas.py     |  47 ------
 pandas/tests/io/json/test_readlines.py  | 168 +++++++++++++++++++
 6 files changed, 383 insertions(+), 88 deletions(-)
 create mode 100644 pandas/tests/io/json/test_readlines.py

diff --git a/asv_bench/benchmarks/io_bench.py b/asv_bench/benchmarks/io_bench.py
index 52064d2cdb8a25..93273955a29b9f 100644
--- a/asv_bench/benchmarks/io_bench.py
+++ b/asv_bench/benchmarks/io_bench.py
@@ -1,3 +1,4 @@
+import os
 from .pandas_vb_common import *
 from pandas import concat, Timestamp, compat
 try:
@@ -192,3 +193,32 @@ def time_read_nrows(self, compression, engine):
         ext = ".bz2"
         pd.read_csv(self.big_fname + ext, nrows=10, compression=compression,
                     engine=engine)
+
+
+class read_json_lines(object):
+    goal_time = 0.2
+    fname = "__test__.json"
+
+    def setup(self):
+        self.N = 100000
+        self.C = 5
+        self.df = DataFrame(dict([('float{0}'.format(i), randn(self.N))
+                                  for i in range(self.C)]))
+        self.df.to_json(self.fname, orient="records", lines=True)
+
+    def teardown(self):
+        try:
+            os.remove(self.fname)
+        except OSError:
+            pass
+
+    def time_read_json_lines(self):
+        pd.read_json(self.fname, lines=True)
+
+    def time_read_json_lines_chunk(self):
+        pd.concat(pd.read_json(self.fname, lines=True, chunksize=self.N // 4))
+
+    def peakmem_read_json_lines(self):
+        pd.read_json(self.fname, lines=True)
+
+    def peakmem_read_json_lines_chunk(self):
+        pd.concat(pd.read_json(self.fname, lines=True, chunksize=self.N // 4))
diff --git a/doc/source/io.rst b/doc/source/io.rst
index d6abed6e9d1ad6..4eba9687efc58e 100644
--- a/doc/source/io.rst
+++ b/doc/source/io.rst
@@ -1845,6 +1845,7 @@ is ``None``. To explicitly force ``Series`` parsing, pass ``typ=series``
   seconds, milliseconds, microseconds or nanoseconds respectively.
 - ``lines`` : reads file as one json object per line.
 - ``encoding`` : The encoding to use to decode py3 bytes.
+- ``chunksize`` : when used in combination with ``lines=True``, returns a ``JsonReader`` which reads in ``chunksize`` lines per iteration.

 The parser will raise one of ``ValueError/TypeError/AssertionError`` if the JSON
 is not parseable.
@@ -2049,6 +2050,10 @@ Line delimited json
 pandas is able to read and write line-delimited json files that are common in data processing pipelines
 using Hadoop or Spark.

+.. versionadded:: 0.21.0
+
+For line-delimited json files, pandas can also return an iterator which reads in ``chunksize`` lines at a time. This can be useful for large files or when reading from a stream.
+
 .. ipython:: python

   jsonl = '''
       {"a":1,"b":2}
       {"a":3,"b":4}
   '''
   df = pd.read_json(jsonl, lines=True)
   df
   df.to_json(orient='records', lines=True)
@@ -2059,6 +2064,11 @@ using Hadoop or Spark.
   df
   df.to_json(orient='records', lines=True)

+  # reader is an iterator that returns `chunksize` lines each iteration
+  reader = pd.read_json(StringIO(jsonl), lines=True, chunksize=1)
+  reader
+  for chunk in reader:
+      print(chunk)

 .. _io.table_schema:

diff --git a/doc/source/whatsnew/v0.21.0.txt b/doc/source/whatsnew/v0.21.0.txt
index 50f11c38bae236..d5d508d02cb730 100644
--- a/doc/source/whatsnew/v0.21.0.txt
+++ b/doc/source/whatsnew/v0.21.0.txt
@@ -162,6 +162,7 @@ Other Enhancements
 - :func:`MultiIndex.is_monotonic_decreasing` has been implemented. Previously returned ``False`` in all cases. (:issue:`16554`)
 - :func:`Categorical.rename_categories` now accepts a dict-like argument as `new_categories` and only updates the categories found in that dict. (:issue:`17336`)
 - :func:`read_excel` raises ``ImportError`` with a better message if ``xlrd`` is not installed. (:issue:`17613`)
+- :func:`read_json` now accepts a ``chunksize`` parameter that can be used when ``lines=True``. If ``chunksize`` is passed, ``read_json`` returns an iterator which reads in ``chunksize`` lines with each iteration. (:issue:`17048`)
 - :meth:`DataFrame.assign` will preserve the original order of ``**kwargs`` for Python 3.6+ users instead of sorting the column names
diff --git a/pandas/io/json/json.py b/pandas/io/json/json.py
index 5dae6099446d0f..ab74b265b6a067 100644
--- a/pandas/io/json/json.py
+++ b/pandas/io/json/json.py
@@ -1,4 +1,5 @@
 # pylint: disable-msg=E1101,W0613,W0603
+from itertools import islice
 import os
 import numpy as np
@@ -8,8 +9,10 @@
 from pandas import compat, isna
 from pandas import Series, DataFrame, to_datetime, MultiIndex
 from pandas.io.common import (get_filepath_or_buffer, _get_handle,
-                              _stringify_path)
+                              _stringify_path, BaseIterator)
+from pandas.io.parsers import _validate_integer
 from pandas.core.common import AbstractMethodError
+from pandas.core.reshape.concat import concat
 from pandas.io.formats.printing import pprint_thing
 from .normalize import _convert_to_line_delimits
 from .table_schema import build_table_schema
@@ -175,7 +178,7 @@ def write(self):
 def read_json(path_or_buf=None, orient=None, typ='frame', dtype=True,
               convert_axes=True, convert_dates=True, keep_default_dates=True,
               numpy=False, precise_float=False, date_unit=None, encoding=None,
-              lines=False):
+              lines=False, chunksize=None):
     """
     Convert a JSON string to pandas object
@@ -264,6 +267,16 @@

     .. versionadded:: 0.19.0

+    chunksize : integer, default None
+        Return JsonReader object for iteration.
+        See the `line-delimited json docs
+        <http://pandas.pydata.org/pandas-docs/stable/io.html#io-jsonl>`_
+        for more information on ``chunksize``.
+        This can only be passed if `lines=True`.
+        If this is None, the file will be read into memory all at once.
+
+    .. versionadded:: 0.21.0
+
     Returns
     -------
     result : Series or DataFrame, depending on the value of `typ`.
@@ -323,47 +336,167 @@
     filepath_or_buffer, _, _ = get_filepath_or_buffer(path_or_buf,
                                                       encoding=encoding)
-    if isinstance(filepath_or_buffer, compat.string_types):
-        try:
-            exists = os.path.exists(filepath_or_buffer)
-
-        # if the filepath is too long will raise here
-        # 5874
-        except (TypeError, ValueError):
-            exists = False
-
-        if exists:
-            fh, handles = _get_handle(filepath_or_buffer, 'r',
-                                      encoding=encoding)
-            json = fh.read()
-            fh.close()
+
+    json_reader = JsonReader(
+        filepath_or_buffer, orient=orient, typ=typ, dtype=dtype,
+        convert_axes=convert_axes, convert_dates=convert_dates,
+        keep_default_dates=keep_default_dates, numpy=numpy,
+        precise_float=precise_float, date_unit=date_unit, encoding=encoding,
+        lines=lines, chunksize=chunksize
+    )
+
+    if chunksize:
+        return json_reader
+
+    return json_reader.read()
+
+
+class JsonReader(BaseIterator):
+    """
+    JsonReader provides an interface for reading in a JSON file.
+
+    If initialized with ``lines=True`` and ``chunksize``, can be iterated over
+    ``chunksize`` lines at a time. Otherwise, calling ``read`` reads in the
+    whole document.
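+
+    Examples
+    --------
+    A minimal sketch of chunked reading; the file name and ``process`` below
+    are placeholders for a real path and user code::
+
+        reader = pd.read_json('data.jsonl', lines=True, chunksize=1000)
+        for chunk in reader:
+            # each chunk is a DataFrame holding at most ``chunksize`` rows
+            process(chunk)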
+    """
+    def __init__(self, filepath_or_buffer, orient, typ, dtype, convert_axes,
+                 convert_dates, keep_default_dates, numpy, precise_float,
+                 date_unit, encoding, lines, chunksize):
+
+        self.path_or_buf = filepath_or_buffer
+        self.orient = orient
+        self.typ = typ
+        self.dtype = dtype
+        self.convert_axes = convert_axes
+        self.convert_dates = convert_dates
+        self.keep_default_dates = keep_default_dates
+        self.numpy = numpy
+        self.precise_float = precise_float
+        self.date_unit = date_unit
+        self.encoding = encoding
+        self.lines = lines
+        self.chunksize = chunksize
+        self.nrows_seen = 0
+        self.should_close = False
+
+        if self.chunksize is not None:
+            self.chunksize = _validate_integer("chunksize", self.chunksize, 1)
+            if not self.lines:
+                raise ValueError("chunksize can only be passed if lines=True")
+
+        data = self._get_data_from_filepath(filepath_or_buffer)
+        self.data = self._preprocess_data(data)
+
+    def _preprocess_data(self, data):
+        """
+        At this point, the data either has a `read` attribute (e.g. a file
+        object or a StringIO) or is a string that is a JSON document.
+
+        If self.chunksize, we prepare the data for the `__next__` method.
+        Otherwise, we read it into memory for the `read` method.
+        """
+        if hasattr(data, 'read') and not self.chunksize:
+            data = data.read()
+        if not hasattr(data, 'read') and self.chunksize:
+            data = StringIO(data)
+
+        return data
+
+    def _get_data_from_filepath(self, filepath_or_buffer):
+        """
+        read_json accepts three input types:
+            1. filepath (string-like)
+            2. file-like object (e.g. open file object, StringIO)
+            3. JSON string
+
+        This method turns (1) into (2) to simplify the rest of the
+        processing. It returns input types (2) and (3) unchanged.
+        """
+        data = filepath_or_buffer
+
+        if isinstance(data, compat.string_types):
+            try:
+                exists = os.path.exists(filepath_or_buffer)
+
+            # gh-5874: if the filepath is too long will raise here
+            except (TypeError, ValueError):
+                pass
+
+            else:
+                if exists:
+                    data, _ = _get_handle(filepath_or_buffer, 'r',
+                                          encoding=self.encoding)
+                    self.should_close = True
+                    self.open_stream = data
+
+        return data
+
+    def _combine_lines(self, lines):
+        """Combines a list of JSON objects into one JSON array string"""
+        lines = filter(None, map(lambda x: x.strip(), lines))
+        return '[' + ','.join(lines) + ']'
+
+    def read(self):
+        """Read the whole JSON input into a pandas object"""
+        if self.lines and self.chunksize:
+            obj = concat(self)
+        elif self.lines:
+            obj = self._get_object_parser(
+                self._combine_lines(self.data.split('\n'))
+            )
         else:
-            json = filepath_or_buffer
-    elif hasattr(filepath_or_buffer, 'read'):
-        json = filepath_or_buffer.read()
-    else:
-        json = filepath_or_buffer
+            obj = self._get_object_parser(self.data)
+        self.close()
+        return obj
+
+    def _get_object_parser(self, json):
+        """Parses a JSON document into a pandas object"""
+        typ = self.typ
+        dtype = self.dtype
+        kwargs = {
+            "orient": self.orient, "dtype": self.dtype,
+            "convert_axes": self.convert_axes,
+            "convert_dates": self.convert_dates,
+            "keep_default_dates": self.keep_default_dates, "numpy": self.numpy,
+            "precise_float": self.precise_float, "date_unit": self.date_unit
+        }
+        obj = None
+        if typ == 'frame':
+            obj = FrameParser(json, **kwargs).parse()
+
+        if typ == 'series' or obj is None:
+            if not isinstance(dtype, bool):
+                dtype = dict(data=dtype)
+            obj = SeriesParser(json, **kwargs).parse()
+
+        return obj
+
+    def close(self):
+        """
+        If we opened a stream earlier, in _get_data_from_filepath, we should
+        close it. If an open stream or file was passed, we leave it open.
+        """
+        if self.should_close:
+            try:
+                self.open_stream.close()
+            except (IOError, AttributeError):
+                pass

-    if lines:
-        # If given a json lines file, we break the string into lines, add
-        # commas and put it in a json list to make a valid json object.
-        lines = list(StringIO(json.strip()))
-        json = '[' + ','.join(lines) + ']'
-
-    obj = None
-    if typ == 'frame':
-        obj = FrameParser(json, orient, dtype, convert_axes, convert_dates,
-                          keep_default_dates, numpy, precise_float,
-                          date_unit).parse()
-
-    if typ == 'series' or obj is None:
-        if not isinstance(dtype, bool):
-            dtype = dict(data=dtype)
-        obj = SeriesParser(json, orient, dtype, convert_axes, convert_dates,
-                           keep_default_dates, numpy, precise_float,
-                           date_unit).parse()
-
-    return obj
+    def __next__(self):
+        lines = list(islice(self.data, self.chunksize))
+        if lines:
+            lines_json = self._combine_lines(lines)
+            obj = self._get_object_parser(lines_json)
+
+            # Make sure that the returned objects have the right index.
+            obj.index = range(self.nrows_seen, self.nrows_seen + len(obj))
+            self.nrows_seen += len(obj)
+
+            return obj
+
+        self.close()
+        raise StopIteration


 class Parser(object):
diff --git a/pandas/tests/io/json/test_pandas.py b/pandas/tests/io/json/test_pandas.py
index 671d4248818e40..de4afec883efdb 100644
--- a/pandas/tests/io/json/test_pandas.py
+++ b/pandas/tests/io/json/test_pandas.py
@@ -985,53 +985,6 @@ def test_tz_range_is_utc(self):
         df = DataFrame({'DT': dti})
         assert dumps(df, iso_dates=True) == dfexp

-    def test_read_jsonl(self):
-        # GH9180
-        result = read_json('{"a": 1, "b": 2}\n{"b":2, "a" :1}\n', lines=True)
-        expected = DataFrame([[1, 2], [1, 2]], columns=['a', 'b'])
-        assert_frame_equal(result, expected)
-
-    def test_read_jsonl_unicode_chars(self):
-        # GH15132: non-ascii unicode characters
-        # \u201d == RIGHT DOUBLE QUOTATION MARK
-
-        # simulate file handle
-        json = '{"a": "foo”", "b": "bar"}\n{"a": "foo", "b": "bar"}\n'
-        json = StringIO(json)
-        result = read_json(json, lines=True)
-        expected = DataFrame([[u"foo\u201d", "bar"], ["foo", "bar"]],
-                             columns=['a', 'b'])
-        assert_frame_equal(result, expected)
-
-        # simulate string
-        json = '{"a": "foo”", "b": "bar"}\n{"a": "foo", "b": "bar"}\n'
-        result = read_json(json, lines=True)
-        expected = DataFrame([[u"foo\u201d", "bar"], ["foo", "bar"]],
-                             columns=['a', 'b'])
-        assert_frame_equal(result, expected)
-
-    def test_to_jsonl(self):
-        # GH9180
-        df = DataFrame([[1, 2], [1, 2]], columns=['a', 'b'])
-        result = df.to_json(orient="records", lines=True)
-        expected = '{"a":1,"b":2}\n{"a":1,"b":2}'
-        assert result == expected
-
-        df = DataFrame([["foo}", "bar"], ['foo"', "bar"]], columns=['a', 'b'])
-        result = df.to_json(orient="records", lines=True)
-        expected = '{"a":"foo}","b":"bar"}\n{"a":"foo\\"","b":"bar"}'
-        assert result == expected
-        assert_frame_equal(pd.read_json(result, lines=True), df)
-
-        # GH15096: escaped characters in columns and data
-        df = DataFrame([["foo\\", "bar"], ['foo"', "bar"]],
-                       columns=["a\\", 'b'])
-        result = df.to_json(orient="records", lines=True)
-        expected = ('{"a\\\\":"foo\\\\","b":"bar"}\n'
-                    '{"a\\\\":"foo\\"","b":"bar"}')
-        assert result == expected
-        assert_frame_equal(pd.read_json(result, lines=True), df)
-
     def test_latin_encoding(self):
         if compat.PY2:
             tm.assert_raises_regex(
diff --git a/pandas/tests/io/json/test_readlines.py b/pandas/tests/io/json/test_readlines.py
new file mode 100644
index 00000000000000..d14355b07cf204
--- /dev/null
+++ b/pandas/tests/io/json/test_readlines.py
@@ -0,0 +1,168 @@
+# -*- coding: utf-8 -*-
+import pytest
+import pandas as pd
+from pandas import DataFrame, read_json
+from pandas.compat import StringIO
+from pandas.io.json.json import JsonReader
+import pandas.util.testing as tm
+from pandas.util.testing import (assert_frame_equal, assert_series_equal,
+                                 ensure_clean)
+
+
+@pytest.fixture
+def lines_json_df():
+    df = pd.DataFrame({'A': [1, 2, 3], 'B': [4, 5, 6]})
+    return df.to_json(lines=True, orient="records")
+
+
+def test_read_jsonl():
+    # GH9180
+    result = read_json('{"a": 1, "b": 2}\n{"b":2, "a" :1}\n', lines=True)
+    expected = DataFrame([[1, 2], [1, 2]], columns=['a', 'b'])
+    assert_frame_equal(result, expected)
+
+
+def test_read_jsonl_unicode_chars():
+    # GH15132: non-ascii unicode characters
+    # \u201d == RIGHT DOUBLE QUOTATION MARK
+
+    # simulate file handle
+    json = '{"a": "foo”", "b": "bar"}\n{"a": "foo", "b": "bar"}\n'
+    json = StringIO(json)
+    result = read_json(json, lines=True)
+    expected = DataFrame([[u"foo\u201d", "bar"], ["foo", "bar"]],
+                         columns=['a', 'b'])
+    assert_frame_equal(result, expected)
+
+    # simulate string
+    json = '{"a": "foo”", "b": "bar"}\n{"a": "foo", "b": "bar"}\n'
+    result = read_json(json, lines=True)
+    expected = DataFrame([[u"foo\u201d", "bar"], ["foo", "bar"]],
+                         columns=['a', 'b'])
+    assert_frame_equal(result, expected)
+
+
+def test_to_jsonl():
+    # GH9180
+    df = DataFrame([[1, 2], [1, 2]], columns=['a', 'b'])
+    result = df.to_json(orient="records", lines=True)
+    expected = '{"a":1,"b":2}\n{"a":1,"b":2}'
+    assert result == expected
+
+    df = DataFrame([["foo}", "bar"], ['foo"', "bar"]], columns=['a', 'b'])
+    result = df.to_json(orient="records", lines=True)
+    expected = '{"a":"foo}","b":"bar"}\n{"a":"foo\\"","b":"bar"}'
+    assert result == expected
+    assert_frame_equal(read_json(result, lines=True), df)
+
+    # GH15096: escaped characters in columns and data
+    df = DataFrame([["foo\\", "bar"], ['foo"', "bar"]],
+                   columns=["a\\", 'b'])
+    result = df.to_json(orient="records", lines=True)
+    expected = ('{"a\\\\":"foo\\\\","b":"bar"}\n'
+                '{"a\\\\":"foo\\"","b":"bar"}')
+    assert result == expected
+    assert_frame_equal(read_json(result, lines=True), df)
+
+
+@pytest.mark.parametrize("chunksize", [1, 1.0])
+def test_readjson_chunks(lines_json_df, chunksize):
+    # Basic test that reading with chunksize gives the same result as
+    # reading the whole document at once.
+    # GH17048: memory usage when lines=True
+
+    unchunked = read_json(StringIO(lines_json_df), lines=True)
+    reader = read_json(StringIO(lines_json_df), lines=True,
+                       chunksize=chunksize)
+    chunked = pd.concat(reader)
+
+    assert_frame_equal(chunked, unchunked)
+
+
+def test_readjson_chunksize_requires_lines(lines_json_df):
+    msg = "chunksize can only be passed if lines=True"
+    with tm.assert_raises_regex(ValueError, msg):
+        pd.read_json(StringIO(lines_json_df), lines=False, chunksize=2)
+
+
+def test_readjson_chunks_series():
+    # Test reading line-format JSON to Series with chunksize param
+    s = pd.Series({'A': 1, 'B': 2})
+
+    strio = StringIO(s.to_json(lines=True, orient="records"))
+    unchunked = pd.read_json(strio, lines=True, typ='Series')
+
+    strio = StringIO(s.to_json(lines=True, orient="records"))
+    chunked = pd.concat(pd.read_json(
+        strio, lines=True, typ='Series', chunksize=1
+    ))
+
+    assert_series_equal(chunked, unchunked)
+
+
+def test_readjson_each_chunk(lines_json_df):
+    # Other tests check that the final result of reading with chunksize is
+    # correct. This checks the intermediate chunks.
+    chunks = list(
+        pd.read_json(StringIO(lines_json_df), lines=True, chunksize=2)
+    )
+    assert chunks[0].shape == (2, 2)
+    assert chunks[1].shape == (1, 2)
+
+
+def test_readjson_chunks_from_file():
+    with ensure_clean('test.json') as path:
+        df = pd.DataFrame({'A': [1, 2, 3], 'B': [4, 5, 6]})
+        df.to_json(path, lines=True, orient="records")
+        chunked = pd.concat(pd.read_json(path, lines=True, chunksize=1))
+        unchunked = pd.read_json(path, lines=True)
+        assert_frame_equal(unchunked, chunked)
+
+
+@pytest.mark.parametrize("chunksize", [None, 1])
+def test_readjson_chunks_closes(chunksize):
+    with ensure_clean('test.json') as path:
+        df = pd.DataFrame({'A': [1, 2, 3], 'B': [4, 5, 6]})
+        df.to_json(path, lines=True, orient="records")
+        reader = JsonReader(
+            path, orient=None, typ="frame", dtype=True, convert_axes=True,
+            convert_dates=True, keep_default_dates=True, numpy=False,
+            precise_float=False, date_unit=None, encoding=None,
+            lines=True, chunksize=chunksize)
+        reader.read()
+        assert reader.open_stream.closed, (
+            "didn't close stream with chunksize = %s" % chunksize)
+
+
+@pytest.mark.parametrize("chunksize", [0, -1, 2.2, "foo"])
+def test_readjson_invalid_chunksize(lines_json_df, chunksize):
+    msg = r"'chunksize' must be an integer >=1"
+
+    with tm.assert_raises_regex(ValueError, msg):
+        pd.read_json(StringIO(lines_json_df), lines=True,
+                     chunksize=chunksize)
+
+
+@pytest.mark.parametrize("chunksize", [None, 1, 2])
+def test_readjson_chunks_multiple_empty_lines(chunksize):
+    j = """
+
+    {"A":1,"B":4}
+
+
+
+    {"A":2,"B":5}
+
+
+
+
+
+
+
+    {"A":3,"B":6}
+    """
+    orig = pd.DataFrame({'A': [1, 2, 3], 'B': [4, 5, 6]})
+    test = pd.read_json(j, lines=True, chunksize=chunksize)
+    if chunksize is not None:
+        test = pd.concat(test)
+    tm.assert_frame_equal(orig, test, obj="chunksize: %s" % chunksize)
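
A minimal end-to-end sketch of the API this patch adds; the file name
"events.jsonl" and the per-chunk aggregation are illustrative placeholders,
not part of the patch:

    import pandas as pd

    # With chunksize, read_json returns a JsonReader instead of a DataFrame;
    # each iteration yields a DataFrame of at most `chunksize` rows, so the
    # whole file never has to be held in memory at once.
    reader = pd.read_json("events.jsonl", lines=True, chunksize=10000)

    totals = None
    for chunk in reader:
        part = chunk.sum(numeric_only=True)   # reduce each chunk ...
        totals = part if totals is None else totals.add(part, fill_value=0)
    print(totals)                             # ... instead of the whole file

Consuming the reader lazily, as above, is the memory-saving path; calling
pd.concat(reader) reassembles the full DataFrame and gives the same result
as reading without chunksize, which is what the tests in this patch verify.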