Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use more pylibcudf.io.types enums in cudf._libs #17237

Merged
merged 3 commits into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions python/cudf/cudf/_lib/csv.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ from pylibcudf.libcudf.io.csv cimport (
write_csv as cpp_write_csv,
)
from pylibcudf.libcudf.io.data_sink cimport data_sink
from pylibcudf.libcudf.io.types cimport compression_type, sink_info
from pylibcudf.libcudf.io.types cimport sink_info
from pylibcudf.libcudf.table.table_view cimport table_view

from cudf._lib.io.utils cimport make_sink_info
Expand Down Expand Up @@ -148,13 +148,13 @@ def read_csv(
byte_range = (0, 0)

if compression is None:
c_compression = compression_type.NONE
c_compression = plc.io.types.CompressionType.NONE
else:
compression_map = {
"infer": compression_type.AUTO,
"gzip": compression_type.GZIP,
"bz2": compression_type.BZIP2,
"zip": compression_type.ZIP,
"infer": plc.io.types.CompressionType.AUTO,
"gzip": plc.io.types.CompressionType.GZIP,
"bz2": plc.io.types.CompressionType.BZIP2,
"zip": plc.io.types.CompressionType.ZIP,
}
c_compression = compression_map[compression]

Expand Down
38 changes: 15 additions & 23 deletions python/cudf/cudf/_lib/json.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,6 @@ from cudf.core.buffer import acquire_spill_lock

from libcpp cimport bool

cimport pylibcudf.libcudf.io.types as cudf_io_types
from pylibcudf.io.types cimport compression_type
from pylibcudf.libcudf.io.json cimport json_recovery_mode_t
from pylibcudf.libcudf.io.types cimport compression_type
from pylibcudf.libcudf.types cimport data_type, type_id
from pylibcudf.types cimport DataType

Expand All @@ -24,15 +20,6 @@ from cudf._lib.utils cimport _data_from_columns, data_from_pylibcudf_io
import pylibcudf as plc


cdef json_recovery_mode_t _get_json_recovery_mode(object on_bad_lines):
    """
    Translate the ``on_bad_lines`` option into a ``json_recovery_mode_t``.

    The comparison is case-insensitive: "error" selects ``FAIL`` and
    "recover" selects ``RECOVER_WITH_NULL``. Any other value raises
    ``TypeError``.
    """
    # Lower-case once so both comparisons look at the same normalized text.
    mode = on_bad_lines.lower()
    if mode == "error":
        return json_recovery_mode_t.FAIL
    if mode == "recover":
        return json_recovery_mode_t.RECOVER_WITH_NULL
    raise TypeError(f"Invalid parameter for {on_bad_lines=}")


cpdef read_json(object filepaths_or_buffers,
object dtype,
bool lines,
Expand All @@ -41,7 +28,7 @@ cpdef read_json(object filepaths_or_buffers,
bool keep_quotes,
bool mixed_types_as_string,
bool prune_columns,
object on_bad_lines):
str on_bad_lines):
"""
Cython function to call into libcudf API, see `read_json`.

Expand All @@ -64,19 +51,24 @@ cpdef read_json(object filepaths_or_buffers,
filepaths_or_buffers[idx] = filepaths_or_buffers[idx].encode()

# Setup arguments
cdef cudf_io_types.compression_type c_compression

if compression is not None:
if compression == 'gzip':
c_compression = cudf_io_types.compression_type.GZIP
c_compression = plc.io.types.CompressionType.GZIP
elif compression == 'bz2':
c_compression = cudf_io_types.compression_type.BZIP2
c_compression = plc.io.types.CompressionType.BZIP2
elif compression == 'zip':
c_compression = cudf_io_types.compression_type.ZIP
c_compression = plc.io.types.CompressionType.ZIP
else:
c_compression = cudf_io_types.compression_type.AUTO
c_compression = plc.io.types.CompressionType.AUTO
else:
c_compression = plc.io.types.CompressionType.NONE

if on_bad_lines.lower() == "error":
c_on_bad_lines = plc.io.types.JSONRecoveryMode.FAIL
elif on_bad_lines.lower() == "recover":
c_on_bad_lines = plc.io.types.JSONRecoveryMode.RECOVER_WITH_NULL
else:
c_compression = cudf_io_types.compression_type.NONE
raise TypeError(f"Invalid parameter for {on_bad_lines=}")

processed_dtypes = None

Expand Down Expand Up @@ -108,7 +100,7 @@ cpdef read_json(object filepaths_or_buffers,
keep_quotes = keep_quotes,
mixed_types_as_string = mixed_types_as_string,
prune_columns = prune_columns,
recovery_mode = _get_json_recovery_mode(on_bad_lines)
recovery_mode = c_on_bad_lines
)
df = cudf.DataFrame._from_data(
*_data_from_columns(
Expand All @@ -130,7 +122,7 @@ cpdef read_json(object filepaths_or_buffers,
keep_quotes = keep_quotes,
mixed_types_as_string = mixed_types_as_string,
prune_columns = prune_columns,
recovery_mode = _get_json_recovery_mode(on_bad_lines)
recovery_mode = c_on_bad_lines
)

df = cudf.DataFrame._from_data(
Expand Down
43 changes: 21 additions & 22 deletions python/cudf/cudf/_lib/orc.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ try:
except ImportError:
import json

cimport pylibcudf.libcudf.io.types as cudf_io_types
cimport pylibcudf.libcudf.lists.lists_column_view as cpp_lists_column_view
from pylibcudf.libcudf.io.data_sink cimport data_sink
from pylibcudf.libcudf.io.orc cimport (
Expand All @@ -26,7 +25,6 @@ from pylibcudf.libcudf.io.orc cimport (
)
from pylibcudf.libcudf.io.types cimport (
column_in_metadata,
compression_type,
sink_info,
table_input_metadata,
)
Expand Down Expand Up @@ -137,22 +135,23 @@ cpdef read_orc(object filepaths_or_buffers,
return data, index


cdef compression_type _get_comp_type(object compression):
def _get_comp_type(object compression):
if compression is None or compression is False:
return compression_type.NONE
return plc.io.types.CompressionType.NONE

compression = str(compression).upper()
if compression == "SNAPPY":
return compression_type.SNAPPY
return plc.io.types.CompressionType.SNAPPY
elif compression == "ZLIB":
return compression_type.ZLIB
return plc.io.types.CompressionType.ZLIB
elif compression == "ZSTD":
return compression_type.ZSTD
return plc.io.types.CompressionType.ZSTD
elif compression == "LZ4":
return compression_type.LZ4
return plc.io.types.CompressionType.LZ4
else:
raise ValueError(f"Unsupported `compression` type {compression}")


cdef tuple _get_index_from_metadata(
vector[map[string, string]] user_data,
object names,
Expand Down Expand Up @@ -210,19 +209,20 @@ cdef tuple _get_index_from_metadata(
range_idx
)

cdef cudf_io_types.statistics_freq _get_orc_stat_freq(object statistics):

def _get_orc_stat_freq(str statistics):
"""
Convert ORC statistics terms to CUDF convention:
- ORC "STRIPE" == CUDF "ROWGROUP"
- ORC "ROWGROUP" == CUDF "PAGE"
"""
statistics = str(statistics).upper()
if statistics == "NONE":
return cudf_io_types.statistics_freq.STATISTICS_NONE
return plc.io.types.StatisticsFreq.STATISTICS_NONE
elif statistics == "STRIPE":
return cudf_io_types.statistics_freq.STATISTICS_ROWGROUP
return plc.io.types.StatisticsFreq.STATISTICS_ROWGROUP
elif statistics == "ROWGROUP":
return cudf_io_types.statistics_freq.STATISTICS_PAGE
return plc.io.types.StatisticsFreq.STATISTICS_PAGE
else:
raise ValueError(f"Unsupported `statistics_freq` type {statistics}")

Expand All @@ -232,7 +232,7 @@ def write_orc(
table,
object path_or_buf,
object compression="snappy",
object statistics="ROWGROUP",
str statistics="ROWGROUP",
object stripe_size_bytes=None,
object stripe_size_rows=None,
object row_index_stride=None,
Expand All @@ -246,7 +246,6 @@ def write_orc(
--------
cudf.read_orc
"""
cdef compression_type compression_ = _get_comp_type(compression)
cdef unique_ptr[data_sink] data_sink_c
cdef sink_info sink_info_c = make_sink_info(path_or_buf, data_sink_c)
cdef table_input_metadata tbl_meta
Expand Down Expand Up @@ -289,7 +288,7 @@ def write_orc(
sink_info_c, tv
).metadata(tbl_meta)
.key_value_metadata(move(user_data))
.compression(compression_)
.compression(_get_comp_type(compression))
.enable_statistics(_get_orc_stat_freq(statistics))
.build()
)
Expand Down Expand Up @@ -330,8 +329,8 @@ cdef class ORCWriter:
cdef unique_ptr[orc_chunked_writer] writer
cdef sink_info sink
cdef unique_ptr[data_sink] _data_sink
cdef cudf_io_types.statistics_freq stat_freq
cdef compression_type comp_type
cdef str statistics
cdef object compression
cdef object index
cdef table_input_metadata tbl_meta
cdef object cols_as_map_type
Expand All @@ -343,15 +342,15 @@ cdef class ORCWriter:
object path,
object index=None,
object compression="snappy",
object statistics="ROWGROUP",
str statistics="ROWGROUP",
object cols_as_map_type=None,
object stripe_size_bytes=None,
object stripe_size_rows=None,
object row_index_stride=None):

self.sink = make_sink_info(path, self._data_sink)
self.stat_freq = _get_orc_stat_freq(statistics)
self.comp_type = _get_comp_type(compression)
self.statistics = statistics
self.compression = compression
self.index = index
self.cols_as_map_type = cols_as_map_type \
if cols_as_map_type is None else set(cols_as_map_type)
Expand Down Expand Up @@ -429,8 +428,8 @@ cdef class ORCWriter:
chunked_orc_writer_options.builder(self.sink)
.metadata(self.tbl_meta)
.key_value_metadata(move(user_data))
.compression(self.comp_type)
.enable_statistics(self.stat_freq)
.compression(_get_comp_type(self.compression))
.enable_statistics(_get_orc_stat_freq(self.statistics))
.build()
)
if self.stripe_size_bytes is not None:
Expand Down
Loading
Loading