Skip to content

Commit

Permalink
Migrate JSON reader to pylibcudf (#15966)
Browse files Browse the repository at this point in the history
Switches the JSON reader to use pylibcudf.
xref #15162

Authors:
  - Thomas Li (https://github.com/lithomas1)

Approvers:
  - Vyas Ramasubramani (https://github.com/vyasr)

URL: #15966
  • Loading branch information
lithomas1 authored Jul 8, 2024
1 parent 6169ee1 commit 036e0ef
Show file tree
Hide file tree
Showing 18 changed files with 674 additions and 101 deletions.
4 changes: 4 additions & 0 deletions python/cudf/cudf/_lib/io/utils.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ cdef source_info make_source_info(list src) except*
cdef sink_info make_sinks_info(
list src, vector[unique_ptr[data_sink]] & data) except*
cdef sink_info make_sink_info(src, unique_ptr[data_sink] & data) except*
cdef add_df_col_struct_names(
df,
child_names_dict
)
cdef update_struct_field_names(
table,
vector[column_name_info]& schema_info)
Expand Down
27 changes: 27 additions & 0 deletions python/cudf/cudf/_lib/io/utils.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -147,10 +147,37 @@ cdef cppclass iobase_data_sink(data_sink):
return buf.tell()


cdef add_df_col_struct_names(df, child_names_dict):
for name, child_names in child_names_dict.items():
col = df._data[name]

df._data[name] = update_col_struct_field_names(col, child_names)


cdef update_col_struct_field_names(Column col, child_names):
if col.children:
children = list(col.children)
for i, (child, names) in enumerate(zip(children, child_names.values())):
children[i] = update_col_struct_field_names(
child,
names
)
col.set_base_children(tuple(children))

if isinstance(col.dtype, StructDtype):
col = col._rename_fields(
child_names.keys()
)

return col


cdef update_struct_field_names(
table,
vector[column_name_info]& schema_info
):
# Deprecated, remove in favor of add_col_struct_names
# when a reader is ported to pylibcudf
for i, (name, col) in enumerate(table._data.items()):
table._data[name] = update_column_struct_field_names(
col, schema_info[i]
Expand Down
127 changes: 53 additions & 74 deletions python/cudf/cudf/_lib/json.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -8,26 +8,16 @@ import cudf
from cudf.core.buffer import acquire_spill_lock

from libcpp cimport bool
from libcpp.map cimport map
from libcpp.string cimport string
from libcpp.utility cimport move
from libcpp.vector cimport vector

cimport cudf._lib.pylibcudf.libcudf.io.types as cudf_io_types
from cudf._lib.io.utils cimport make_source_info, update_struct_field_names
from cudf._lib.pylibcudf.libcudf.io.json cimport (
json_reader_options,
json_recovery_mode_t,
read_json as libcudf_read_json,
schema_element,
)
from cudf._lib.pylibcudf.libcudf.io.types cimport (
compression_type,
table_with_metadata,
)
from cudf._lib.pylibcudf.libcudf.types cimport data_type, size_type
from cudf._lib.io.utils cimport add_df_col_struct_names
from cudf._lib.pylibcudf.io.types cimport compression_type
from cudf._lib.pylibcudf.libcudf.io.json cimport json_recovery_mode_t
from cudf._lib.pylibcudf.libcudf.io.types cimport compression_type
from cudf._lib.pylibcudf.libcudf.types cimport data_type, type_id
from cudf._lib.pylibcudf.types cimport DataType
from cudf._lib.types cimport dtype_to_data_type
from cudf._lib.utils cimport data_from_unique_ptr
from cudf._lib.utils cimport data_from_pylibcudf_io

import cudf._lib.pylibcudf as plc

Expand Down Expand Up @@ -62,6 +52,7 @@ cpdef read_json(object filepaths_or_buffers,
# If input data is a JSON string (or StringIO), hold a reference to
# the encoded memoryview externally to ensure the encoded buffer
# isn't destroyed before calling libcudf `read_json()`

for idx in range(len(filepaths_or_buffers)):
if isinstance(filepaths_or_buffers[idx], io.StringIO):
filepaths_or_buffers[idx] = \
Expand All @@ -71,17 +62,7 @@ cpdef read_json(object filepaths_or_buffers,
filepaths_or_buffers[idx] = filepaths_or_buffers[idx].encode()

# Setup arguments
cdef vector[data_type] c_dtypes_list
cdef map[string, schema_element] c_dtypes_schema_map
cdef cudf_io_types.compression_type c_compression
# Determine byte read offsets if applicable
cdef size_type c_range_offset = (
byte_range[0] if byte_range is not None else 0
)
cdef size_type c_range_size = (
byte_range[1] if byte_range is not None else 0
)
cdef bool c_lines = lines

if compression is not None:
if compression == 'gzip':
Expand All @@ -94,56 +75,50 @@ cpdef read_json(object filepaths_or_buffers,
c_compression = cudf_io_types.compression_type.AUTO
else:
c_compression = cudf_io_types.compression_type.NONE
is_list_like_dtypes = False

processed_dtypes = None

if dtype is False:
raise ValueError("False value is unsupported for `dtype`")
elif dtype is not True:
processed_dtypes = []
if isinstance(dtype, abc.Mapping):
for k, v in dtype.items():
c_dtypes_schema_map[str(k).encode()] = \
_get_cudf_schema_element_from_dtype(v)
# Make sure keys are string
k = str(k)
lib_type, child_types = _get_cudf_schema_element_from_dtype(v)
processed_dtypes.append((k, lib_type, child_types))
elif isinstance(dtype, abc.Collection):
is_list_like_dtypes = True
c_dtypes_list.reserve(len(dtype))
for col_dtype in dtype:
c_dtypes_list.push_back(
_get_cudf_data_type_from_dtype(
col_dtype))
processed_dtypes.append(
# Ignore child columns since we cannot specify their dtypes
# when passing a list
_get_cudf_schema_element_from_dtype(col_dtype)[0]
)
else:
raise TypeError("`dtype` must be 'list like' or 'dict'")

cdef json_reader_options opts = move(
json_reader_options.builder(make_source_info(filepaths_or_buffers))
.compression(c_compression)
.lines(c_lines)
.byte_range_offset(c_range_offset)
.byte_range_size(c_range_size)
.recovery_mode(_get_json_recovery_mode(on_bad_lines))
.build()
table_w_meta = plc.io.json.read_json(
plc.io.SourceInfo(filepaths_or_buffers),
processed_dtypes,
c_compression,
lines,
byte_range_offset = byte_range[0] if byte_range is not None else 0,
byte_range_size = byte_range[1] if byte_range is not None else 0,
keep_quotes = keep_quotes,
mixed_types_as_string = mixed_types_as_string,
prune_columns = prune_columns,
recovery_mode = _get_json_recovery_mode(on_bad_lines)
)
if is_list_like_dtypes:
opts.set_dtypes(c_dtypes_list)
else:
opts.set_dtypes(c_dtypes_schema_map)

opts.enable_keep_quotes(keep_quotes)
opts.enable_mixed_types_as_string(mixed_types_as_string)
opts.enable_prune_columns(prune_columns)

# Read JSON
cdef cudf_io_types.table_with_metadata c_result

with nogil:
c_result = move(libcudf_read_json(opts))

meta_names = [info.name.decode() for info in c_result.metadata.schema_info]
df = cudf.DataFrame._from_data(*data_from_unique_ptr(
move(c_result.tbl),
column_names=meta_names
))

update_struct_field_names(df, c_result.metadata.schema_info)
df = cudf.DataFrame._from_data(
*data_from_pylibcudf_io(
table_w_meta
)
)

# Post-processing to add in struct column names
add_df_col_struct_names(df, table_w_meta.child_names)
return df


Expand Down Expand Up @@ -192,28 +167,32 @@ def write_json(
)


cdef schema_element _get_cudf_schema_element_from_dtype(object dtype) except *:
cdef schema_element s_element
cdef data_type lib_type
cdef _get_cudf_schema_element_from_dtype(object dtype) except *:
dtype = cudf.dtype(dtype)
if isinstance(dtype, cudf.CategoricalDtype):
raise NotImplementedError(
"CategoricalDtype as dtype is not yet "
"supported in JSON reader"
)
lib_type = dtype_to_data_type(dtype)
s_element.type = lib_type

lib_type = DataType.from_libcudf(dtype_to_data_type(dtype))
child_types = []

if isinstance(dtype, cudf.StructDtype):
for name, child_type in dtype.fields.items():
s_element.child_types[name.encode()] = \
child_lib_type, grandchild_types = \
_get_cudf_schema_element_from_dtype(child_type)
child_types.append((name, child_lib_type, grandchild_types))
elif isinstance(dtype, cudf.ListDtype):
s_element.child_types["offsets".encode()] = \
_get_cudf_schema_element_from_dtype(cudf.dtype("int32"))
s_element.child_types["element".encode()] = \
child_lib_type, grandchild_types = \
_get_cudf_schema_element_from_dtype(dtype.element_type)

return s_element
child_types = [
("offsets", DataType.from_libcudf(data_type(type_id.INT32)), []),
("element", child_lib_type, grandchild_types)
]

return lib_type, child_types


cdef data_type _get_cudf_data_type_from_dtype(object dtype) except *:
Expand Down
23 changes: 21 additions & 2 deletions python/cudf/cudf/_lib/pylibcudf/io/json.pxd
Original file line number Diff line number Diff line change
@@ -1,11 +1,30 @@
# Copyright (c) 2024, NVIDIA CORPORATION.

from libcpp cimport bool

from cudf._lib.pylibcudf.io.types cimport SinkInfo, TableWithMetadata
from cudf._lib.pylibcudf.io.types cimport (
SinkInfo,
SourceInfo,
TableWithMetadata,
compression_type,
)
from cudf._lib.pylibcudf.libcudf.io.json cimport json_recovery_mode_t
from cudf._lib.pylibcudf.libcudf.types cimport size_type


cpdef TableWithMetadata read_json(
SourceInfo source_info,
list dtypes = *,
compression_type compression = *,
bool lines = *,
size_type byte_range_offset = *,
size_type byte_range_size = *,
bool keep_quotes = *,
bool mixed_types_as_string = *,
bool prune_columns = *,
json_recovery_mode_t recovery_mode = *,
)


cpdef void write_json(
SinkInfo sink_info,
TableWithMetadata tbl,
Expand Down
Loading

0 comments on commit 036e0ef

Please sign in to comment.