Add chunksize param to read_json when lines=True #17168

Merged Sep 28, 2017 · 62 commits

Commits
4cd506e
Add chunksize param to read_json when lines=True
louispotok Jul 24, 2017
9fe44f1
Add read_json chunksize change to whatsnew
louispotok Aug 4, 2017
6de3a27
Add versionadded to docstring
louispotok Aug 4, 2017
e235c70
add docstring for _read_json_as_lines
louispotok Aug 4, 2017
0a5a8f9
add basic read_json chunksize test
louispotok Aug 4, 2017
ce23444
validate read_json chunksize is an integer and >=1
louispotok Aug 4, 2017
a97ca0b
Add more tests to read_json chunksize
louispotok Aug 4, 2017
2861d0e
Return JsonLineReader from read_json
louispotok Aug 13, 2017
da59b4a
Raise ValueError if chunksize is not None, but not lines
louispotok Aug 13, 2017
4544f82
Add issue number to test docstring and add test.
louispotok Aug 13, 2017
dad7f11
bugfix: raise StopIteration, dont return it
louispotok Aug 13, 2017
bb8b1b6
PEP8 cleanup
louispotok Aug 13, 2017
3e81bba
Bugfixes for chunksize
louispotok Aug 14, 2017
e049d29
add chunksize test for reading from file
louispotok Aug 14, 2017
400d313
pep8 cleanup
louispotok Aug 14, 2017
b756c90
Run chunksize checks before file is opened
louispotok Aug 14, 2017
b71f65b
move strio df in test to fixture
louispotok Sep 10, 2017
d6e86af
Improve read_json chunking tests
louispotok Sep 10, 2017
4d91280
bugfix in read_json tests, remove fixture
louispotok Sep 10, 2017
2474429
JsonLineReader opens and closes filepaths
louispotok Sep 10, 2017
b18b3df
pep8 cleanup
louispotok Sep 12, 2017
4c1d6a6
update whatsnew
louispotok Sep 13, 2017
d589b0b
update docs on read_json chunksize
louispotok Sep 14, 2017
eba45a2
Always use JsonReader in read_json
louispotok Sep 14, 2017
d0ea295
make lines_json_df a fixture
louispotok Sep 14, 2017
bb3182d
remove unneeded concats in tests
louispotok Sep 14, 2017
8cc43ff
parametrize some tests
louispotok Sep 14, 2017
de03462
add __close__ method to JsonReader and use it
louispotok Sep 15, 2017
07b31c7
remove import io in docs
louispotok Sep 15, 2017
7d0642f
move read_json in whatsnew to Other Enhancements
louispotok Sep 15, 2017
398961b
move chunksize and lines validation into JsonReader
louispotok Sep 15, 2017
dfa2967
remove extraneous else
louispotok Sep 15, 2017
b0e4bb0
remove unneccessary cast to list
louispotok Sep 15, 2017
e3197c5
move combine_lines call into read
louispotok Sep 15, 2017
39f9881
remove another extraneous else
louispotok Sep 15, 2017
c2247c3
always close JsonReader
louispotok Sep 15, 2017
46d8a68
add test that read_json closes file correctly
louispotok Sep 15, 2017
066e26d
minor formatting fixups
louispotok Sep 15, 2017
08e8b6c
remove extraneous else
louispotok Sep 15, 2017
1ac6953
add benchmarks for read_json
louispotok Sep 15, 2017
0782df9
update benchmarks
louispotok Sep 15, 2017
014d493
move json_lines tests to io_bench
louispotok Sep 15, 2017
a913d8e
add peakmem for jsonlines
louispotok Sep 15, 2017
ce7aef6
smaller benchmark
louispotok Sep 15, 2017
1dc1526
refactor JsonReader
louispotok Sep 21, 2017
03b6069
add test for reading with multiple empty lines
louispotok Sep 21, 2017
aef6bbc
add support for JSON docs with multiple consecutive newlines
louispotok Sep 21, 2017
30e4043
remove raw_json init param
louispotok Sep 21, 2017
7dae78a
DRY for combining lines
louispotok Sep 21, 2017
fe95445
use floor division in asv bench
louispotok Sep 21, 2017
e41124a
add teardown to asv bench
louispotok Sep 21, 2017
9cfd012
add docs
louispotok Sep 21, 2017
035ca84
pep fixup
louispotok Sep 21, 2017
4c92287
update documentation
louispotok Sep 22, 2017
61178be
simplify JsonReader._preprocess_data
louispotok Sep 22, 2017
a284187
simplify _get_data_from_filepath
louispotok Sep 22, 2017
55170dd
Update read_json tests
louispotok Sep 22, 2017
1d7087d
JsonReader should only close if it opened
louispotok Sep 22, 2017
6a76c55
split out json readlines to sep test class
louispotok Sep 26, 2017
a72411f
add encoding to test_readlines
louispotok Sep 26, 2017
5612934
pep8 cleanup
louispotok Sep 27, 2017
28d1cbe
minor fixups
louispotok Sep 27, 2017
30 changes: 30 additions & 0 deletions asv_bench/benchmarks/io_bench.py
@@ -1,3 +1,4 @@
import os
from .pandas_vb_common import *
from pandas import concat, Timestamp, compat
try:
@@ -192,3 +193,32 @@ def time_read_nrows(self, compression, engine):
ext = ".bz2"
pd.read_csv(self.big_fname + ext, nrows=10,
compression=compression, engine=engine)


class read_json_lines(object):
goal_time = 0.2
fname = "__test__.json"

def setup(self):
self.N = 100000
self.C = 5
self.df = DataFrame(dict([('float{0}'.format(i), randn(self.N)) for i in range(self.C)]))
self.df.to_json(self.fname, orient="records", lines=True)

def teardown(self):
try:
os.remove(self.fname)
except OSError:
pass

def time_read_json_lines(self):
pd.read_json(self.fname, lines=True)

def time_read_json_lines_chunk(self):
pd.concat(pd.read_json(self.fname, lines=True, chunksize=self.N//4))

def peakmem_read_json_lines(self):
pd.read_json(self.fname, lines=True)

def peakmem_read_json_lines_chunk(self):
pd.concat(pd.read_json(self.fname, lines=True, chunksize=self.N//4))
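
A note on the benchmarks above: the chunked variants include the ``pd.concat`` call, so they time the full iterate-and-reassemble round trip rather than iteration alone; the ``peakmem`` variants are where chunking is expected to pay off. A minimal standalone sketch of the same comparison outside the asv harness (scratch file name and sizes are arbitrary):

import os
import time

import numpy as np
import pandas as pd

fname = "__scratch__.json"  # hypothetical scratch file
df = pd.DataFrame({"float{}".format(i): np.random.randn(100000)
                   for i in range(5)})
df.to_json(fname, orient="records", lines=True)

start = time.time()
pd.read_json(fname, lines=True)  # whole file in one shot
print("whole file: {:.3f}s".format(time.time() - start))

start = time.time()
# read in four chunks, then reassemble
pd.concat(pd.read_json(fname, lines=True, chunksize=25000))
print("chunked:    {:.3f}s".format(time.time() - start))

os.remove(fname)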
10 changes: 10 additions & 0 deletions doc/source/io.rst
@@ -1845,6 +1845,7 @@ is ``None``. To explicitly force ``Series`` parsing, pass ``typ=series``
seconds, milliseconds, microseconds or nanoseconds respectively.
- ``lines`` : reads file as one json object per line.
- ``encoding`` : The encoding to use to decode py3 bytes.
- ``chunksize`` : when used in combination with ``lines=True``, return a JsonReader which reads in ``chunksize`` lines per iteration.

The parser will raise one of ``ValueError/TypeError/AssertionError`` if the JSON is not parseable.

@@ -2049,6 +2050,10 @@ Line delimited json
pandas is able to read and write line-delimited json files that are common in data processing pipelines
using Hadoop or Spark.

.. versionadded:: 0.21.0

For line-delimited json files, pandas can also return an iterator which reads in ``chunksize`` lines at a time. This can be useful for large files or to read from a stream.

.. ipython:: python

jsonl = '''
@@ -2059,6 +2064,11 @@ using Hadoop or Spark.
df
df.to_json(orient='records', lines=True)

# reader is an iterator that returns `chunksize` lines each iteration
reader = pd.read_json(StringIO(jsonl), lines=True, chunksize=1)
reader
for chunk in reader:
print(chunk)
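
If a single frame is ultimately wanted, the chunks the reader yields can be reassembled with ``concat``; because ``__next__`` assigns a running index (``nrows_seen``) to each chunk, the result matches an unchunked read. A small sketch along the lines of the example above:

from pandas.compat import StringIO  # or io.StringIO on Python 3
import pandas as pd

jsonl = '{"a": 1, "b": 2}\n{"a": 3, "b": 4}\n'
reader = pd.read_json(StringIO(jsonl), lines=True, chunksize=1)
df = pd.concat(reader)  # equivalent to pd.read_json(..., lines=True)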

.. _io.table_schema:

1 change: 1 addition & 0 deletions doc/source/whatsnew/v0.21.0.txt
@@ -162,6 +162,7 @@ Other Enhancements
- :func:`MultiIndex.is_monotonic_decreasing` has been implemented. Previously returned ``False`` in all cases. (:issue:`16554`)
- :func:`Categorical.rename_categories` now accepts a dict-like argument as `new_categories` and only updates the categories found in that dict. (:issue:`17336`)
- :func:`read_excel` raises ``ImportError`` with a better message if ``xlrd`` is not installed. (:issue:`17613`)
- :func:`read_json` now accepts a ``chunksize`` parameter that can be used when ``lines=True``. If ``chunksize`` is passed, ``read_json`` returns an iterator which reads in ``chunksize`` lines with each iteration. (:issue:`17048`)
- :meth:`DataFrame.assign` will preserve the original order of ``**kwargs`` for Python 3.6+ users instead of sorting the column names


215 changes: 174 additions & 41 deletions pandas/io/json/json.py
@@ -1,4 +1,5 @@
# pylint: disable-msg=E1101,W0613,W0603
from itertools import islice
import os
import numpy as np

@@ -8,8 +9,10 @@
from pandas import compat, isna
from pandas import Series, DataFrame, to_datetime, MultiIndex
from pandas.io.common import (get_filepath_or_buffer, _get_handle,
_stringify_path)
_stringify_path, BaseIterator)
from pandas.io.parsers import _validate_integer
from pandas.core.common import AbstractMethodError
from pandas.core.reshape.concat import concat
from pandas.io.formats.printing import pprint_thing
from .normalize import _convert_to_line_delimits
from .table_schema import build_table_schema
@@ -175,7 +178,7 @@ def write(self):
def read_json(path_or_buf=None, orient=None, typ='frame', dtype=True,
convert_axes=True, convert_dates=True, keep_default_dates=True,
numpy=False, precise_float=False, date_unit=None, encoding=None,
lines=False):
lines=False, chunksize=None):
"""
Convert a JSON string to pandas object

@@ -264,6 +267,16 @@ def read_json(path_or_buf=None, orient=None, typ='frame', dtype=True,

.. versionadded:: 0.19.0

chunksize : integer, default None
Return JsonReader object for iteration.
See the `line-delimited json docs
<http://pandas.pydata.org/pandas-docs/stable/io.html#io-jsonl>`_
for more information on ``chunksize``.
This can only be passed if `lines=True`.
If this is None, the file will be read into memory all at once.

.. versionadded:: 0.21.0

Returns
-------
result : Series or DataFrame, depending on the value of `typ`.
@@ -323,47 +336,167 @@ def read_json(path_or_buf=None, orient=None, typ='frame', dtype=True,

filepath_or_buffer, _, _ = get_filepath_or_buffer(path_or_buf,
encoding=encoding)
if isinstance(filepath_or_buffer, compat.string_types):
try:
exists = os.path.exists(filepath_or_buffer)

# if the filepath is too long will raise here
# 5874
except (TypeError, ValueError):
exists = False

if exists:
fh, handles = _get_handle(filepath_or_buffer, 'r',
encoding=encoding)
json = fh.read()
fh.close()

json_reader = JsonReader(
filepath_or_buffer, orient=orient, typ=typ, dtype=dtype,
convert_axes=convert_axes, convert_dates=convert_dates,
keep_default_dates=keep_default_dates, numpy=numpy,
precise_float=precise_float, date_unit=date_unit, encoding=encoding,
lines=lines, chunksize=chunksize
)

if chunksize:
return json_reader

return json_reader.read()


class JsonReader(BaseIterator):
"""
JsonReader provides an interface for reading in a JSON file.

If initialized with ``lines=True`` and ``chunksize``, can be iterated over
``chunksize`` lines at a time. Otherwise, calling ``read`` reads in the
whole document.
"""
def __init__(self, filepath_or_buffer, orient, typ, dtype, convert_axes,
convert_dates, keep_default_dates, numpy, precise_float,
date_unit, encoding, lines, chunksize):

self.path_or_buf = filepath_or_buffer
self.orient = orient
self.typ = typ
self.dtype = dtype
self.convert_axes = convert_axes
self.convert_dates = convert_dates
self.keep_default_dates = keep_default_dates
self.numpy = numpy
self.precise_float = precise_float
self.date_unit = date_unit
self.encoding = encoding
self.lines = lines
self.chunksize = chunksize
self.nrows_seen = 0
self.should_close = False

if self.chunksize is not None:
self.chunksize = _validate_integer("chunksize", self.chunksize, 1)
if not self.lines:
raise ValueError("chunksize can only be passed if lines=True")

data = self._get_data_from_filepath(filepath_or_buffer)
self.data = self._preprocess_data(data)

def _preprocess_data(self, data):
Review comment (Contributor): you could move _preprocess_data back to __init__

Reply (Author): I prefer it split out, I think it keeps things easier to read. But I can move it back if you like.
"""
At this point, the data either has a `read` attribute (e.g. a file
object or a StringIO) or is a string that is a JSON document.

If self.chunksize, we prepare the data for the `__next__` method.
Otherwise, we read it into memory for the `read` method.
"""
if hasattr(data, 'read') and not self.chunksize:
data = data.read()
if not hasattr(data, 'read') and self.chunksize:
data = StringIO(data)

return data
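
The net effect is an invariant the rest of the class relies on: after preprocessing, ``self.data`` is a plain string whenever the document will be parsed in one shot, and a file-like object whenever it will be iterated. Roughly:

# input type      chunksize=None             chunksize=n
# file-like    -> data.read()    (str)       unchanged      (file-like)
# str          -> unchanged      (str)       StringIO(data) (file-like)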

def _get_data_from_filepath(self, filepath_or_buffer):
"""
read_json accepts three input types:
1. filepath (string-like)
2. file-like object (e.g. open file object, StringIO)
3. JSON string

This method turns (1) into (2) to simplify the rest of the processing.
It returns input types (2) and (3) unchanged.
"""

data = filepath_or_buffer

if isinstance(data, compat.string_types):
try:
exists = os.path.exists(filepath_or_buffer)

# gh-5874: if the filepath is too long will raise here
except (TypeError, ValueError):
pass

else:
if exists:
data, _ = _get_handle(filepath_or_buffer, 'r',
encoding=self.encoding)
self.should_close = True
self.open_stream = data

return data

def _combine_lines(self, lines):
"""Combines a list of JSON objects into one JSON object"""
lines = filter(None, map(lambda x: x.strip(), lines))
return '[' + ','.join(lines) + ']'
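
Because of the ``filter(None, ...)``, blank entries disappear after stripping, which is what lets documents with multiple consecutive newlines parse cleanly (see the commits above). An illustrative call, not a test from this PR:

reader._combine_lines(['{"a": 1}\n', '\n', '{"a": 2}\n'])
# returns '[{"a": 1},{"a": 2}]'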

def read(self):
"""Read the whole JSON input into a pandas object"""
if self.lines and self.chunksize:
obj = concat(self)
elif self.lines:
obj = self._get_object_parser(
self._combine_lines(self.data.split('\n'))
)
else:
json = filepath_or_buffer
elif hasattr(filepath_or_buffer, 'read'):
json = filepath_or_buffer.read()
else:
json = filepath_or_buffer
obj = self._get_object_parser(self.data)
self.close()
return obj

def _get_object_parser(self, json):
"""parses a json document into a pandas object"""
typ = self.typ
dtype = self.dtype
kwargs = {
"orient": self.orient, "dtype": self.dtype,
"convert_axes": self.convert_axes,
"convert_dates": self.convert_dates,
"keep_default_dates": self.keep_default_dates, "numpy": self.numpy,
"precise_float": self.precise_float, "date_unit": self.date_unit
}
obj = None
if typ == 'frame':
obj = FrameParser(json, **kwargs).parse()

if typ == 'series' or obj is None:
if not isinstance(dtype, bool):
dtype = dict(data=dtype)
obj = SeriesParser(json, **kwargs).parse()

return obj

def close(self):
"""
If we opened a stream earlier, in _get_data_from_filepath, we should
close it. If an open stream or file was passed, we leave it open.
"""
if self.should_close:
try:
self.open_stream.close()
except (IOError, AttributeError):
pass

if lines:
# If given a json lines file, we break the string into lines, add
# commas and put it in a json list to make a valid json object.
lines = list(StringIO(json.strip()))
json = '[' + ','.join(lines) + ']'

obj = None
if typ == 'frame':
obj = FrameParser(json, orient, dtype, convert_axes, convert_dates,
keep_default_dates, numpy, precise_float,
date_unit).parse()

if typ == 'series' or obj is None:
if not isinstance(dtype, bool):
dtype = dict(data=dtype)
obj = SeriesParser(json, orient, dtype, convert_axes, convert_dates,
keep_default_dates, numpy, precise_float,
date_unit).parse()

return obj
def __next__(self):
lines = list(islice(self.data, self.chunksize))
if lines:
lines_json = self._combine_lines(lines)
obj = self._get_object_parser(lines_json)

# Make sure that the returned objects have the right index.
obj.index = range(self.nrows_seen, self.nrows_seen + len(obj))
self.nrows_seen += len(obj)

return obj

self.close()
raise StopIteration
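
Taken together, ``JsonReader`` behaves like an ordinary Python iterator: ``__next__`` slices the next ``chunksize`` lines off the stream with ``islice``, parses and reindexes them, and closes any handle it opened once the stream is exhausted. A minimal consumption sketch (file name and callback are hypothetical):

reader = read_json("data.jsonl", lines=True, chunksize=10000)
for chunk in reader:  # each chunk is a DataFrame of at most 10000 rows
    handle(chunk)     # hypothetical per-chunk processing
# by this point the reader has closed the handle it opened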


class Parser(object):
47 changes: 0 additions & 47 deletions pandas/tests/io/json/test_pandas.py
@@ -985,53 +985,6 @@ def test_tz_range_is_utc(self):
df = DataFrame({'DT': dti})
assert dumps(df, iso_dates=True) == dfexp

def test_read_jsonl(self):
# GH9180
result = read_json('{"a": 1, "b": 2}\n{"b":2, "a" :1}\n', lines=True)
expected = DataFrame([[1, 2], [1, 2]], columns=['a', 'b'])
assert_frame_equal(result, expected)

def test_read_jsonl_unicode_chars(self):
# GH15132: non-ascii unicode characters
# \u201d == RIGHT DOUBLE QUOTATION MARK

# simulate file handle
json = '{"a": "foo”", "b": "bar"}\n{"a": "foo", "b": "bar"}\n'
json = StringIO(json)
result = read_json(json, lines=True)
expected = DataFrame([[u"foo\u201d", "bar"], ["foo", "bar"]],
columns=['a', 'b'])
assert_frame_equal(result, expected)

# simulate string
json = '{"a": "foo”", "b": "bar"}\n{"a": "foo", "b": "bar"}\n'
result = read_json(json, lines=True)
expected = DataFrame([[u"foo\u201d", "bar"], ["foo", "bar"]],
columns=['a', 'b'])
assert_frame_equal(result, expected)

def test_to_jsonl(self):
# GH9180
df = DataFrame([[1, 2], [1, 2]], columns=['a', 'b'])
result = df.to_json(orient="records", lines=True)
expected = '{"a":1,"b":2}\n{"a":1,"b":2}'
assert result == expected

df = DataFrame([["foo}", "bar"], ['foo"', "bar"]], columns=['a', 'b'])
result = df.to_json(orient="records", lines=True)
expected = '{"a":"foo}","b":"bar"}\n{"a":"foo\\"","b":"bar"}'
assert result == expected
assert_frame_equal(pd.read_json(result, lines=True), df)

# GH15096: escaped characters in columns and data
df = DataFrame([["foo\\", "bar"], ['foo"', "bar"]],
columns=["a\\", 'b'])
result = df.to_json(orient="records", lines=True)
expected = ('{"a\\\\":"foo\\\\","b":"bar"}\n'
'{"a\\\\":"foo\\"","b":"bar"}')
assert result == expected
assert_frame_equal(pd.read_json(result, lines=True), df)

def test_latin_encoding(self):
if compat.PY2:
tm.assert_raises_regex(