
Commit

Add chunksize param to read_json when lines=True (pandas-dev#17168)
louispotok authored and No-Stream committed Nov 28, 2017

1 parent b6265d3 commit c39ad7c
Showing 6 changed files with 383 additions and 88 deletions.
30 changes: 30 additions & 0 deletions asv_bench/benchmarks/io_bench.py
@@ -1,3 +1,4 @@
import os
from .pandas_vb_common import *
from pandas import concat, Timestamp, compat
try:
@@ -192,3 +193,32 @@ def time_read_nrows(self, compression, engine):
ext = ".bz2"
pd.read_csv(self.big_fname + ext, nrows=10,
compression=compression, engine=engine)


class read_json_lines(object):
    goal_time = 0.2
    fname = "__test__.json"

    def setup(self):
        self.N = 100000
        self.C = 5
        self.df = DataFrame(dict([('float{0}'.format(i), randn(self.N))
                                  for i in range(self.C)]))
        self.df.to_json(self.fname, orient="records", lines=True)

    def teardown(self):
        try:
            os.remove(self.fname)
        except OSError:
            pass

    def time_read_json_lines(self):
        pd.read_json(self.fname, lines=True)

    def time_read_json_lines_chunk(self):
        pd.concat(pd.read_json(self.fname, lines=True, chunksize=self.N // 4))

    def peakmem_read_json_lines(self):
        pd.read_json(self.fname, lines=True)

    def peakmem_read_json_lines_chunk(self):
        pd.concat(pd.read_json(self.fname, lines=True, chunksize=self.N // 4))
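A minimal sketch of the behavior these benchmarks exercise, assuming a pandas build that includes this patch; the file name, data size, and chunk size below are illustrative, not part of the commit:

    import numpy as np
    import pandas as pd

    # Write a small line-delimited JSON file.
    df = pd.DataFrame({"x": np.random.randn(1000)})
    df.to_json("sample.json", orient="records", lines=True)

    # Concatenating the chunks of a chunked read reproduces a full read;
    # the chunked path trades some speed for lower peak memory.
    full = pd.read_json("sample.json", lines=True)
    chunked = pd.concat(pd.read_json("sample.json", lines=True, chunksize=250))
    assert full.equals(chunked)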
10 changes: 10 additions & 0 deletions doc/source/io.rst
@@ -1845,6 +1845,7 @@ is ``None``. To explicitly force ``Series`` parsing, pass ``typ=series``
seconds, milliseconds, microseconds or nanoseconds respectively.
- ``lines`` : reads file as one json object per line.
- ``encoding`` : The encoding to use to decode py3 bytes.
- ``chunksize`` : when used in combination with ``lines=True``, return a ``JsonReader`` which reads in ``chunksize`` lines per iteration.

The parser will raise one of ``ValueError/TypeError/AssertionError`` if the JSON is not parseable.

@@ -2049,6 +2050,10 @@ Line delimited json
pandas is able to read and write line-delimited json files that are common in data processing pipelines
using Hadoop or Spark.

.. versionadded:: 0.21.0

For line-delimited json files, pandas can also return an iterator which reads in ``chunksize`` lines at a time. This can be useful for large files or to read from a stream.

.. ipython:: python

    jsonl = '''
@@ -2059,6 +2064,11 @@ using Hadoop or Spark.
    df
    df.to_json(orient='records', lines=True)

    # reader is an iterator that returns ``chunksize`` lines each iteration
    reader = pd.read_json(StringIO(jsonl), lines=True, chunksize=1)
    reader
    for chunk in reader:
        print(chunk)

.. _io.table_schema:

1 change: 1 addition & 0 deletions doc/source/whatsnew/v0.21.0.txt
@@ -162,6 +162,7 @@ Other Enhancements
- :func:`MultiIndex.is_monotonic_decreasing` has been implemented. Previously returned ``False`` in all cases. (:issue:`16554`)
- :func:`Categorical.rename_categories` now accepts a dict-like argument as `new_categories` and only updates the categories found in that dict. (:issue:`17336`)
- :func:`read_excel` raises ``ImportError`` with a better message if ``xlrd`` is not installed. (:issue:`17613`)
- :func:`read_json` now accepts a ``chunksize`` parameter that can be used when ``lines=True``. If ``chunksize`` is passed, ``read_json`` returns an iterator which reads in ``chunksize`` lines with each iteration. (:issue:`17048`)
- :meth:`DataFrame.assign` will preserve the original order of ``**kwargs`` for Python 3.6+ users instead of sorting the column names


215 changes: 174 additions & 41 deletions pandas/io/json/json.py
@@ -1,4 +1,5 @@
# pylint: disable-msg=E1101,W0613,W0603
from itertools import islice
import os
import numpy as np

@@ -8,8 +9,10 @@
from pandas import compat, isna
from pandas import Series, DataFrame, to_datetime, MultiIndex
from pandas.io.common import (get_filepath_or_buffer, _get_handle,
                              _stringify_path, BaseIterator)
from pandas.io.parsers import _validate_integer
from pandas.core.common import AbstractMethodError
from pandas.core.reshape.concat import concat
from pandas.io.formats.printing import pprint_thing
from .normalize import _convert_to_line_delimits
from .table_schema import build_table_schema
@@ -175,7 +178,7 @@ def write(self):
def read_json(path_or_buf=None, orient=None, typ='frame', dtype=True,
              convert_axes=True, convert_dates=True, keep_default_dates=True,
              numpy=False, precise_float=False, date_unit=None, encoding=None,
              lines=False, chunksize=None):
    """
    Convert a JSON string to pandas object
@@ -264,6 +267,16 @@ def read_json(path_or_buf=None, orient=None, typ='frame', dtype=True,
        .. versionadded:: 0.19.0

    chunksize : integer, default None
        Return JsonReader object for iteration.
        See the `line-delimited json docs
        <http://pandas.pydata.org/pandas-docs/stable/io.html#io-jsonl>`_
        for more information on ``chunksize``.
        This can only be passed if `lines=True`.
        If this is None, the file will be read into memory all at once.

        .. versionadded:: 0.21.0

    Returns
    -------
    result : Series or DataFrame, depending on the value of `typ`.
@@ -323,47 +336,167 @@ def read_json(path_or_buf=None, orient=None, typ='frame', dtype=True,

    filepath_or_buffer, _, _ = get_filepath_or_buffer(path_or_buf,
                                                      encoding=encoding)

    json_reader = JsonReader(
        filepath_or_buffer, orient=orient, typ=typ, dtype=dtype,
        convert_axes=convert_axes, convert_dates=convert_dates,
        keep_default_dates=keep_default_dates, numpy=numpy,
        precise_float=precise_float, date_unit=date_unit,
        encoding=encoding, lines=lines, chunksize=chunksize
    )

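    # With a chunksize, hand the reader back for lazy, chunkwise iteration;
    # otherwise parse the whole input eagerly.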
    if chunksize:
        return json_reader

    return json_reader.read()


class JsonReader(BaseIterator):
    """
    JsonReader provides an interface for reading in a JSON file.

    If initialized with ``lines=True`` and ``chunksize``, can be iterated over
    ``chunksize`` lines at a time. Otherwise, calling ``read`` reads in the
    whole document.
    """
    def __init__(self, filepath_or_buffer, orient, typ, dtype, convert_axes,
                 convert_dates, keep_default_dates, numpy, precise_float,
                 date_unit, encoding, lines, chunksize):

        self.path_or_buf = filepath_or_buffer
        self.orient = orient
        self.typ = typ
        self.dtype = dtype
        self.convert_axes = convert_axes
        self.convert_dates = convert_dates
        self.keep_default_dates = keep_default_dates
        self.numpy = numpy
        self.precise_float = precise_float
        self.date_unit = date_unit
        self.encoding = encoding
        self.lines = lines
        self.chunksize = chunksize
        self.nrows_seen = 0
        self.should_close = False

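        # Validate chunksize eagerly so bad values fail at construction time.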
        if self.chunksize is not None:
            self.chunksize = _validate_integer("chunksize", self.chunksize, 1)
            if not self.lines:
                raise ValueError("chunksize can only be passed if lines=True")

        data = self._get_data_from_filepath(filepath_or_buffer)
        self.data = self._preprocess_data(data)

    def _preprocess_data(self, data):
        """
        At this point, the data either has a ``read`` attribute (e.g. a file
        object or a StringIO) or is a string that is a JSON document.

        If self.chunksize, we prepare the data for the ``__next__`` method.
        Otherwise, we read it into memory for the ``read`` method.
        """
        if hasattr(data, 'read') and not self.chunksize:
            data = data.read()
        if not hasattr(data, 'read') and self.chunksize:
            data = StringIO(data)

        return data

    def _get_data_from_filepath(self, filepath_or_buffer):
        """
        read_json accepts three input types:
            1. filepath (string-like)
            2. file-like object (e.g. open file object, StringIO)
            3. JSON string

        This method turns (1) into (2) to simplify the rest of the processing.
        It returns input types (2) and (3) unchanged.
        """
        data = filepath_or_buffer

        if isinstance(data, compat.string_types):
            try:
                exists = os.path.exists(filepath_or_buffer)

            # gh-5874: if the filepath is too long will raise here
            except (TypeError, ValueError):
                pass

            else:
                if exists:
                    data, _ = _get_handle(filepath_or_buffer, 'r',
                                          encoding=self.encoding)
                    self.should_close = True
                    self.open_stream = data

        return data

    def _combine_lines(self, lines):
        """Combines a list of JSON objects into one JSON array string"""
        lines = filter(None, map(lambda x: x.strip(), lines))
        return '[' + ','.join(lines) + ']'

    def read(self):
        """Read the whole JSON input into a pandas object"""
        if self.lines and self.chunksize:
            obj = concat(self)
        elif self.lines:
            obj = self._get_object_parser(
                self._combine_lines(self.data.split('\n'))
            )
        else:
            obj = self._get_object_parser(self.data)
        self.close()
        return obj

    def _get_object_parser(self, json):
        """Parses a json document into a pandas object"""
        typ = self.typ
        dtype = self.dtype
        kwargs = {
            "orient": self.orient, "dtype": self.dtype,
            "convert_axes": self.convert_axes,
            "convert_dates": self.convert_dates,
            "keep_default_dates": self.keep_default_dates,
            "numpy": self.numpy,
            "precise_float": self.precise_float, "date_unit": self.date_unit
        }
        obj = None
        if typ == 'frame':
            obj = FrameParser(json, **kwargs).parse()

        if typ == 'series' or obj is None:
            if not isinstance(dtype, bool):
                dtype = dict(data=dtype)
            obj = SeriesParser(json, **kwargs).parse()

        return obj

    def close(self):
        """
        If we opened a stream earlier, in _get_data_from_filepath, we should
        close it. If an open stream or file was passed, we leave it open.
        """
        if self.should_close:
            try:
                self.open_stream.close()
            except (IOError, AttributeError):
                pass

    def __next__(self):
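        # Pull the next ``chunksize`` lines from the underlying iterator.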
        lines = list(islice(self.data, self.chunksize))
        if lines:
            lines_json = self._combine_lines(lines)
            obj = self._get_object_parser(lines_json)

            # Make sure that the returned objects have the right index.
            obj.index = range(self.nrows_seen, self.nrows_seen + len(obj))
            self.nrows_seen += len(obj)

            return obj

        self.close()
        raise StopIteration


class Parser(object):
47 changes: 0 additions & 47 deletions pandas/tests/io/json/test_pandas.py
@@ -985,53 +985,6 @@ def test_tz_range_is_utc(self):
        df = DataFrame({'DT': dti})
        assert dumps(df, iso_dates=True) == dfexp

    def test_read_jsonl(self):
        # GH9180
        result = read_json('{"a": 1, "b": 2}\n{"b":2, "a" :1}\n', lines=True)
        expected = DataFrame([[1, 2], [1, 2]], columns=['a', 'b'])
        assert_frame_equal(result, expected)

    def test_read_jsonl_unicode_chars(self):
        # GH15132: non-ascii unicode characters
        # \u201d == RIGHT DOUBLE QUOTATION MARK

        # simulate file handle
        json = '{"a": "foo”", "b": "bar"}\n{"a": "foo", "b": "bar"}\n'
        json = StringIO(json)
        result = read_json(json, lines=True)
        expected = DataFrame([[u"foo\u201d", "bar"], ["foo", "bar"]],
                             columns=['a', 'b'])
        assert_frame_equal(result, expected)

        # simulate string
        json = '{"a": "foo”", "b": "bar"}\n{"a": "foo", "b": "bar"}\n'
        result = read_json(json, lines=True)
        expected = DataFrame([[u"foo\u201d", "bar"], ["foo", "bar"]],
                             columns=['a', 'b'])
        assert_frame_equal(result, expected)

    def test_to_jsonl(self):
        # GH9180
        df = DataFrame([[1, 2], [1, 2]], columns=['a', 'b'])
        result = df.to_json(orient="records", lines=True)
        expected = '{"a":1,"b":2}\n{"a":1,"b":2}'
        assert result == expected

        df = DataFrame([["foo}", "bar"], ['foo"', "bar"]], columns=['a', 'b'])
        result = df.to_json(orient="records", lines=True)
        expected = '{"a":"foo}","b":"bar"}\n{"a":"foo\\"","b":"bar"}'
        assert result == expected
        assert_frame_equal(pd.read_json(result, lines=True), df)

        # GH15096: escaped characters in columns and data
        df = DataFrame([["foo\\", "bar"], ['foo"', "bar"]],
                       columns=["a\\", 'b'])
        result = df.to_json(orient="records", lines=True)
        expected = ('{"a\\\\":"foo\\\\","b":"bar"}\n'
                    '{"a\\\\":"foo\\"","b":"bar"}')
        assert result == expected
        assert_frame_equal(pd.read_json(result, lines=True), df)

    def test_latin_encoding(self):
        if compat.PY2:
            tm.assert_raises_regex(
168 changes: 168 additions & 0 deletions pandas/tests/io/json/test_readlines.py
@@ -0,0 +1,168 @@
# -*- coding: utf-8 -*-
import pytest
import pandas as pd
from pandas import DataFrame, read_json
from pandas.compat import StringIO
from pandas.io.json.json import JsonReader
import pandas.util.testing as tm
from pandas.util.testing import (assert_frame_equal, assert_series_equal,
                                 ensure_clean)


@pytest.fixture
def lines_json_df():
    df = pd.DataFrame({'A': [1, 2, 3], 'B': [4, 5, 6]})
    return df.to_json(lines=True, orient="records")


def test_read_jsonl():
    # GH9180
    result = read_json('{"a": 1, "b": 2}\n{"b":2, "a" :1}\n', lines=True)
    expected = DataFrame([[1, 2], [1, 2]], columns=['a', 'b'])
    assert_frame_equal(result, expected)


def test_read_jsonl_unicode_chars():
    # GH15132: non-ascii unicode characters
    # \u201d == RIGHT DOUBLE QUOTATION MARK

    # simulate file handle
    json = '{"a": "foo”", "b": "bar"}\n{"a": "foo", "b": "bar"}\n'
    json = StringIO(json)
    result = read_json(json, lines=True)
    expected = DataFrame([[u"foo\u201d", "bar"], ["foo", "bar"]],
                         columns=['a', 'b'])
    assert_frame_equal(result, expected)

    # simulate string
    json = '{"a": "foo”", "b": "bar"}\n{"a": "foo", "b": "bar"}\n'
    result = read_json(json, lines=True)
    expected = DataFrame([[u"foo\u201d", "bar"], ["foo", "bar"]],
                         columns=['a', 'b'])
    assert_frame_equal(result, expected)


def test_to_jsonl():
    # GH9180
    df = DataFrame([[1, 2], [1, 2]], columns=['a', 'b'])
    result = df.to_json(orient="records", lines=True)
    expected = '{"a":1,"b":2}\n{"a":1,"b":2}'
    assert result == expected

    df = DataFrame([["foo}", "bar"], ['foo"', "bar"]], columns=['a', 'b'])
    result = df.to_json(orient="records", lines=True)
    expected = '{"a":"foo}","b":"bar"}\n{"a":"foo\\"","b":"bar"}'
    assert result == expected
    assert_frame_equal(read_json(result, lines=True), df)

    # GH15096: escaped characters in columns and data
    df = DataFrame([["foo\\", "bar"], ['foo"', "bar"]],
                   columns=["a\\", 'b'])
    result = df.to_json(orient="records", lines=True)
    expected = ('{"a\\\\":"foo\\\\","b":"bar"}\n'
                '{"a\\\\":"foo\\"","b":"bar"}')
    assert result == expected
    assert_frame_equal(read_json(result, lines=True), df)


@pytest.mark.parametrize("chunksize", [1, 1.0])
def test_readjson_chunks(lines_json_df, chunksize):
# Basic test that read_json(chunks=True) gives the same result as
# read_json(chunks=False)
# GH17048: memory usage when lines=True

unchunked = read_json(StringIO(lines_json_df), lines=True)
reader = read_json(StringIO(lines_json_df), lines=True,
chunksize=chunksize)
chunked = pd.concat(reader)

assert_frame_equal(chunked, unchunked)


def test_readjson_chunksize_requires_lines(lines_json_df):
    msg = "chunksize can only be passed if lines=True"
    with tm.assert_raises_regex(ValueError, msg):
        pd.read_json(StringIO(lines_json_df), lines=False, chunksize=2)


def test_readjson_chunks_series():
    # Test reading line-format JSON to Series with chunksize param
    s = pd.Series({'A': 1, 'B': 2})

    strio = StringIO(s.to_json(lines=True, orient="records"))
    unchunked = pd.read_json(strio, lines=True, typ='Series')

    strio = StringIO(s.to_json(lines=True, orient="records"))
    chunked = pd.concat(pd.read_json(
        strio, lines=True, typ='Series', chunksize=1
    ))

    assert_series_equal(chunked, unchunked)


def test_readjson_each_chunk(lines_json_df):
    # Other tests check that the final result of reading with a chunksize
    # is correct. This checks the intermediate chunks.
    chunks = list(
        pd.read_json(StringIO(lines_json_df), lines=True, chunksize=2)
    )
    assert chunks[0].shape == (2, 2)
    assert chunks[1].shape == (1, 2)


def test_readjson_chunks_from_file():
    with ensure_clean('test.json') as path:
        df = pd.DataFrame({'A': [1, 2, 3], 'B': [4, 5, 6]})
        df.to_json(path, lines=True, orient="records")
        chunked = pd.concat(pd.read_json(path, lines=True, chunksize=1))
        unchunked = pd.read_json(path, lines=True)
        assert_frame_equal(unchunked, chunked)


@pytest.mark.parametrize("chunksize", [None, 1])
def test_readjson_chunks_closes(chunksize):
with ensure_clean('test.json') as path:
df = pd.DataFrame({'A': [1, 2, 3], 'B': [4, 5, 6]})
df.to_json(path, lines=True, orient="records")
reader = JsonReader(
path, orient=None, typ="frame", dtype=True, convert_axes=True,
convert_dates=True, keep_default_dates=True, numpy=False,
precise_float=False, date_unit=None, encoding=None,
lines=True, chunksize=chunksize)
reader.read()
assert reader.open_stream.closed, "didn't close stream with \
chunksize = %s" % chunksize


@pytest.mark.parametrize("chunksize", [0, -1, 2.2, "foo"])
def test_readjson_invalid_chunksize(lines_json_df, chunksize):
msg = r"'chunksize' must be an integer >=1"

with tm.assert_raises_regex(ValueError, msg):
pd.read_json(StringIO(lines_json_df), lines=True,
chunksize=chunksize)


@pytest.mark.parametrize("chunksize", [None, 1, 2])
def test_readjson_chunks_multiple_empty_lines(chunksize):
j = """
{"A":1,"B":4}
{"A":2,"B":5}
{"A":3,"B":6}
"""
orig = pd.DataFrame({'A': [1, 2, 3], 'B': [4, 5, 6]})
test = pd.read_json(j, lines=True, chunksize=chunksize)
if chunksize is not None:
test = pd.concat(test)
tm.assert_frame_equal(orig, test, obj="chunksize: %s" % chunksize)
