Add low memory JSON reader for cudf.pandas #16204

Merged · 15 commits · Jul 12, 2024
3 changes: 2 additions & 1 deletion cpp/src/io/json/read_json.cu
@@ -193,7 +193,8 @@ datasource::owning_buffer<rmm::device_uvector<char>> get_record_range_raw_input(
size_t chunk_size = reader_opts.get_byte_range_size();

CUDF_EXPECTS(total_source_size ? chunk_offset < total_source_size : !chunk_offset,
"Invalid offsetting");
"Invalid offsetting",
std::invalid_argument);
auto should_load_all_sources = !chunk_size || chunk_size >= total_source_size - chunk_offset;
chunk_size = should_load_all_sources ? total_source_size - chunk_offset : chunk_size;

2 changes: 1 addition & 1 deletion cpp/src/io/utilities/datasource.cpp
@@ -216,7 +216,7 @@ class memory_mapped_source : public file_source {

void map(int fd, size_t offset, size_t size)
{
-  CUDF_EXPECTS(offset < _file.size(), "Offset is past end of file");
+  CUDF_EXPECTS(offset < _file.size(), "Offset is past end of file", std::overflow_error);

// Offset for `mmap()` must be page aligned
_map_offset = offset & ~(sysconf(_SC_PAGESIZE) - 1);
148 changes: 117 additions & 31 deletions python/cudf/cudf/_lib/json.pyx
@@ -27,9 +27,14 @@ from cudf._lib.pylibcudf.libcudf.io.types cimport (
)
from cudf._lib.pylibcudf.libcudf.types cimport data_type, size_type
from cudf._lib.types cimport dtype_to_data_type
-from cudf._lib.utils cimport data_from_unique_ptr
+from cudf._lib.utils cimport (
+    columns_from_unique_ptr,
+    data_from_pylibcudf_table,
+    data_from_unique_ptr,
+)

import cudf._lib.pylibcudf as plc
+from cudf._lib.concat import concat_columns


cdef json_recovery_mode_t _get_json_recovery_mode(object on_bad_lines):
@@ -40,25 +45,17 @@ cdef json_recovery_mode_t _get_json_recovery_mode(object on_bad_lines):
else:
raise TypeError(f"Invalid parameter for {on_bad_lines=}")


cpdef read_json(object filepaths_or_buffers,
object dtype,
bool lines,
object compression,
object byte_range,
bool keep_quotes,
bool mixed_types_as_string,
bool prune_columns,
object on_bad_lines):
"""
Cython function to call into libcudf API, see `read_json`.

See Also
--------
cudf.io.json.read_json
cudf.io.json.to_json
"""

cdef json_reader_options _setup_json_reader_options(
object filepaths_or_buffers,
object dtype,
object compression,
bool keep_quotes,
bool mixed_types_as_string,
bool prune_columns,
object on_bad_lines,
bool lines,
size_type byte_range_offset,
size_type byte_range_size):
# If input data is a JSON string (or StringIO), hold a reference to
# the encoded memoryview externally to ensure the encoded buffer
# isn't destroyed before calling libcudf `read_json()`
@@ -74,14 +71,6 @@ cpdef read_json(object filepaths_or_buffers,
cdef vector[data_type] c_dtypes_list
cdef map[string, schema_element] c_dtypes_schema_map
cdef cudf_io_types.compression_type c_compression
-    # Determine byte read offsets if applicable
-    cdef size_type c_range_offset = (
-        byte_range[0] if byte_range is not None else 0
-    )
-    cdef size_type c_range_size = (
-        byte_range[1] if byte_range is not None else 0
-    )
-    cdef bool c_lines = lines

if compression is not None:
if compression == 'gzip':
@@ -115,9 +104,9 @@ cpdef read_json(object filepaths_or_buffers,
cdef json_reader_options opts = move(
json_reader_options.builder(make_source_info(filepaths_or_buffers))
.compression(c_compression)
-    .lines(c_lines)
-    .byte_range_offset(c_range_offset)
-    .byte_range_size(c_range_size)
+    .lines(lines)
+    .byte_range_offset(byte_range_offset)
+    .byte_range_size(byte_range_size)
.recovery_mode(_get_json_recovery_mode(on_bad_lines))
.build()
)
@@ -130,6 +119,38 @@ cpdef read_json(object filepaths_or_buffers,
opts.enable_mixed_types_as_string(mixed_types_as_string)
opts.enable_prune_columns(prune_columns)

return opts

cpdef read_json(object filepaths_or_buffers,
object dtype,
bool lines,
object compression,
object byte_range,
bool keep_quotes,
bool mixed_types_as_string,
bool prune_columns,
object on_bad_lines):
"""
Cython function to call into libcudf API, see `read_json`.

See Also
--------
cudf.io.json.read_json
cudf.io.json.to_json
"""
# Determine byte read offsets if applicable
cdef size_type c_range_offset = (
byte_range[0] if byte_range is not None else 0
)
cdef size_type c_range_size = (
byte_range[1] if byte_range is not None else 0
)
cdef json_reader_options opts = _setup_json_reader_options(
filepaths_or_buffers, dtype, compression, keep_quotes,
mixed_types_as_string, prune_columns, on_bad_lines,
lines, c_range_offset, c_range_size
)

# Read JSON
cdef cudf_io_types.table_with_metadata c_result

@@ -146,6 +167,71 @@ cpdef read_json(object filepaths_or_buffers,

return df

cpdef chunked_read_json(object filepaths_or_buffers,
object dtype,
object compression,
bool keep_quotes,
bool mixed_types_as_string,
bool prune_columns,
object on_bad_lines,
int chunk_size=100_000_000):
"""
Cython function to call into libcudf API, see `read_json`.

See Also
--------
cudf.io.json.read_json
cudf.io.json.to_json
"""
cdef size_type c_range_size = (
chunk_size if chunk_size is not None else 0
)
cdef json_reader_options opts = _setup_json_reader_options(
filepaths_or_buffers, dtype, compression, keep_quotes,
mixed_types_as_string, prune_columns, on_bad_lines,
True, 0, c_range_size
)

# Read JSON
cdef cudf_io_types.table_with_metadata c_result
final_columns = []
meta_names = None
i = 0
while True:
opts.set_byte_range_offset(c_range_size * i)
opts.set_byte_range_size(c_range_size)

try:
with nogil:
c_result = move(libcudf_read_json(opts))
except (ValueError, OverflowError):
Member:
Just curious if we could use some logic such as _has_next() in the PQ chunked reader to break this loop, instead of this exception?

Contributor:
I don't like that we're catching the exception from a datasource here. The memory mapping is very much an implementation detail.
How expensive would it be to get the total source(s) size? Then we can loop until all of it is read.

Contributor Author:
It is expensive; we basically have to seek to the end of the file to get the size of all data sources. For remote data sources it gets complicated to properly perform a seek, too.

Contributor Author:
> Just curious if we could use some logic such as _has_next() in the PQ chunked reader to break this loop, instead of this exception?

We basically call into the libcudf layer for that; is there any such provision for the JSON reader in libcudf?

Contributor:
FWIW, we already need the file size(s) to read JSON input(s). With the current implementation of the low memory JSON reader, we get the size of each input file for each byte range, so getting the sizes one more time to have a cleaner loop would not add much.
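For reference, a minimal sketch of the size-based termination discussed above (not the implementation in this PR); `read_byte_range` and `get_total_source_size` are hypothetical callables standing in for the libcudf byte-range read and the source-size query:

```python
def chunked_read(read_byte_range, get_total_source_size, chunk_size=100_000_000):
    """Read a JSON Lines source one byte range at a time.

    Terminates when the accumulated offset reaches the total source size,
    rather than by catching the out-of-range exception from the datasource.
    """
    total_size = get_total_source_size()  # one extra pass over the source sizes
    offset = 0
    chunks = []
    while offset < total_size:
        # Read the half-open byte range [offset, offset + chunk_size).
        chunks.append(read_byte_range(offset, chunk_size))
        offset += chunk_size
    return chunks
```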

break
if meta_names is None:
meta_names = [info.name.decode() for info in c_result.metadata.schema_info]
new_chunk = columns_from_unique_ptr(move(c_result.tbl))
if len(final_columns) == 0:
final_columns = new_chunk
else:
for col_idx in range(len(meta_names)):
Contributor:
Just confirming that the concatenation technique here is generally the same as done in the parquet reader?

Contributor Author:
yup

final_columns[col_idx] = concat_columns(
[final_columns[col_idx], new_chunk[col_idx]]
)
# Must drop any residual GPU columns to save memory
new_chunk[col_idx] = None
i += 1
df = cudf.DataFrame._from_data(
*data_from_pylibcudf_table(
plc.Table(
[col.to_pylibcudf(mode="read") for col in final_columns]
),
column_names=meta_names,
index_names=None
)
)
update_struct_field_names(df, c_result.metadata.schema_info)

return df


@acquire_spill_lock()
def write_json(
33 changes: 22 additions & 11 deletions python/cudf/cudf/io/json.py
@@ -94,17 +94,28 @@ def read_json(
else:
filepaths_or_buffers.append(tmp_source)

-df = libjson.read_json(
-    filepaths_or_buffers=filepaths_or_buffers,
-    dtype=dtype,
-    lines=lines,
-    compression=compression,
-    byte_range=byte_range,
-    keep_quotes=keep_quotes,
-    mixed_types_as_string=mixed_types_as_string,
-    prune_columns=prune_columns,
-    on_bad_lines=on_bad_lines,
-)
+if cudf.get_option("mode.pandas_compatible") and lines:
+    df = libjson.chunked_read_json(
+        filepaths_or_buffers=filepaths_or_buffers,
+        dtype=dtype,
+        compression=compression,
+        keep_quotes=keep_quotes,
+        mixed_types_as_string=mixed_types_as_string,
+        prune_columns=prune_columns,
+        on_bad_lines=on_bad_lines,
+    )
+else:
+    df = libjson.read_json(
+        filepaths_or_buffers=filepaths_or_buffers,
+        dtype=dtype,
+        lines=lines,
+        compression=compression,
+        byte_range=byte_range,
+        keep_quotes=keep_quotes,
+        mixed_types_as_string=mixed_types_as_string,
+        prune_columns=prune_columns,
+        on_bad_lines=on_bad_lines,
+    )
else:
warnings.warn(
"Using CPU via Pandas to read JSON dataset, this may "
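A short usage sketch of the behavior introduced above (the file name `data.jsonl` is illustrative): with pandas-compatible mode enabled and `lines=True`, `cudf.read_json` goes through the chunked low-memory reader; otherwise it uses the existing reader.

```python
import cudf

# Assumes a local JSON Lines file; any lines-oriented JSON source works.
with cudf.option_context("mode.pandas_compatible", True):
    df = cudf.read_json("data.jsonl", lines=True)  # chunked_read_json path

df = cudf.read_json("data.jsonl", lines=True)  # default path: libjson.read_json
```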
2 changes: 1 addition & 1 deletion python/cudf/cudf/tests/test_csv.py
@@ -1191,7 +1191,7 @@ def test_csv_reader_byte_range_type_corner_case(tmpdir):
).to_csv(fname, chunksize=100000)

byte_range = (2_147_483_648, 0)
-    with pytest.raises(RuntimeError, match="Offset is past end of file"):
+    with pytest.raises(OverflowError, match="Offset is past end of file"):
cudf.read_csv(fname, byte_range=byte_range, header=None)


16 changes: 16 additions & 0 deletions python/cudf/cudf/tests/test_json.py
@@ -1423,3 +1423,19 @@ def test_json_reader_on_bad_lines(on_bad_lines):
orient="records",
on_bad_lines=on_bad_lines,
)


def test_chunked_json_reader():
df = cudf.DataFrame(
{
"a": ["aaaa"] * 9_00_00_00,
"b": list(range(0, 9_00_00_00)),
}
)
buf = BytesIO()
df.to_json(buf, lines=True, orient="records", engine="cudf")
buf.seek(0)
df = df.to_pandas()
with cudf.option_context("mode.pandas_compatible", True):
gdf = cudf.read_json(buf, lines=True)
assert_eq(df, gdf)