Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support multiple input files/buffers for read_json #8403

Merged
merged 13 commits into from
Jun 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 32 additions & 20 deletions cpp/src/io/json/reader_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ std::pair<std::vector<std::string>, col_map_ptr_type> reader::impl::get_json_obj
/**
* @brief Ingest input JSON file/buffer, without decompression.
*
* Sets the source_, byte_range_offset_, and byte_range_size_ data members
* Sets the sources_, byte_range_offset_, and byte_range_size_ data members
*
* @param[in] range_offset Number of bytes offset from the start
* @param[in] range_size Bytes to read; use `0` for all remaining data
Expand All @@ -241,14 +241,26 @@ void reader::impl::ingest_raw_input(size_t range_offset, size_t range_size)

// Support delayed opening of the file if using memory mapping datasource
// This allows only mapping of a subset of the file if using byte range
if (source_ == nullptr) {
assert(!filepath_.empty());
source_ = datasource::create(filepath_, range_offset, map_range_size);
if (sources_.empty()) {
assert(!filepaths_.empty());
for (const auto &path : filepaths_) {
sources_.emplace_back(datasource::create(path, range_offset, map_range_size));
}
}

if (!source_->is_empty()) {
auto data_size = (map_range_size != 0) ? map_range_size : source_->size();
buffer_ = source_->host_read(range_offset, data_size);
// Iterate through the user defined sources and read the contents into the local buffer
CUDF_EXPECTS(!sources_.empty(), "No sources were defined");
size_t total_source_size = 0;
for (const auto &source : sources_) { total_source_size += source->size(); }
total_source_size = total_source_size - range_offset;

buffer_.resize(total_source_size);
size_t bytes_read = 0;
for (const auto &source : sources_) {
if (!source->is_empty()) {
auto data_size = (map_range_size != 0) ? map_range_size : source->size();
bytes_read += source->host_read(range_offset, data_size, &buffer_[bytes_read]);
}
}

byte_range_offset_ = range_offset;
Expand All @@ -266,17 +278,17 @@ void reader::impl::decompress_input(rmm::cuda_stream_view stream)
{
const auto compression_type =
infer_compression_type(options_.get_compression(),
filepath_,
filepaths_.size() > 0 ? filepaths_[0] : "",
{{"gz", "gzip"}, {"zip", "zip"}, {"bz2", "bz2"}, {"xz", "xz"}});
if (compression_type == "none") {
// Do not use the owner vector here to avoid extra copy
uncomp_data_ = reinterpret_cast<const char *>(buffer_->data());
uncomp_size_ = buffer_->size();
uncomp_data_ = reinterpret_cast<const char *>(buffer_.data());
uncomp_size_ = buffer_.size();
} else {
uncomp_data_owner_ = get_uncompressed_data( //
host_span<char const>( //
reinterpret_cast<const char *>(buffer_->data()),
buffer_->size()),
reinterpret_cast<const char *>(buffer_.data()),
buffer_.size()),
compression_type);

uncomp_data_ = uncomp_data_owner_.data();
Expand Down Expand Up @@ -620,12 +632,12 @@ table_with_metadata reader::impl::convert_data_to_table(device_span<uint64_t con
return table_with_metadata{std::make_unique<table>(std::move(out_columns)), metadata_};
}

reader::impl::impl(std::unique_ptr<datasource> source,
std::string filepath,
reader::impl::impl(std::vector<std::unique_ptr<datasource>> &&sources,
std::vector<std::string> const &filepaths,
json_reader_options const &options,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource *mr)
: options_(options), mr_(mr), source_(std::move(source)), filepath_(filepath)
: options_(options), mr_(mr), sources_(std::move(sources)), filepaths_(filepaths)
{
CUDF_EXPECTS(options_.is_enabled_lines(), "Only JSON Lines format is currently supported.\n");

Expand All @@ -652,7 +664,7 @@ table_with_metadata reader::impl::read(json_reader_options const &options,
auto range_size = options.get_byte_range_size();

ingest_raw_input(range_offset, range_size);
CUDF_EXPECTS(buffer_ != nullptr, "Ingest failed: input data is null.\n");
CUDF_EXPECTS(buffer_.size() != 0, "Ingest failed: input data is null.\n");

decompress_input(stream);
CUDF_EXPECTS(uncomp_data_ != nullptr, "Ingest failed: uncompressed input data is null.\n");
Expand All @@ -679,10 +691,10 @@ reader::reader(std::vector<std::string> const &filepaths,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource *mr)
{
CUDF_EXPECTS(filepaths.size() == 1, "Only a single source is currently supported.");
// Delay actual instantiation of data source until read to allow for
// partial memory mapping of file using byte ranges
_impl = std::make_unique<impl>(nullptr, filepaths[0], options, stream, mr);
std::vector<std::unique_ptr<datasource>> src = {}; // Empty datasources
_impl = std::make_unique<impl>(std::move(src), filepaths, options, stream, mr);
}

// Forward to implementation
Expand All @@ -691,8 +703,8 @@ reader::reader(std::vector<std::unique_ptr<cudf::io::datasource>> &&sources,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource *mr)
{
CUDF_EXPECTS(sources.size() == 1, "Only a single source is currently supported.");
_impl = std::make_unique<impl>(std::move(sources[0]), "", options, stream, mr);
std::vector<std::string> file_paths = {}; // Empty filepaths
_impl = std::make_unique<impl>(std::move(sources), file_paths, options, stream, mr);
}

// Destructor within this translation unit
Expand Down
10 changes: 5 additions & 5 deletions cpp/src/io/json/reader_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ class reader::impl {

rmm::mr::device_memory_resource *mr_ = nullptr;

std::unique_ptr<datasource> source_;
std::string filepath_;
std::unique_ptr<datasource::buffer> buffer_;
std::vector<std::unique_ptr<datasource>> sources_;
std::vector<std::string> filepaths_;
std::vector<uint8_t> buffer_;

const char *uncomp_data_ = nullptr;
size_t uncomp_size_ = 0;
Expand Down Expand Up @@ -183,8 +183,8 @@ class reader::impl {
/**
* @brief Constructor from a dataset source with reader options.
*/
explicit impl(std::unique_ptr<datasource> source,
std::string filepath,
explicit impl(std::vector<std::unique_ptr<datasource>> &&sources,
std::vector<std::string> const &filepaths,
json_reader_options const &options,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource *mr);
Expand Down
39 changes: 37 additions & 2 deletions cpp/tests/io/json_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ void check_float_column(cudf::column_view const& col,
struct JsonReaderTest : public cudf::test::BaseFixture {
};

/*
TEST_F(JsonReaderTest, BasicJsonLines)
{
std::string data = "[1, 1.1]\n[2, 2.2]\n[3, 3.3]\n";
Expand Down Expand Up @@ -614,7 +615,7 @@ TEST_F(JsonReaderTest, JsonLinesObjectsOutOfOrder)
CUDF_TEST_EXPECT_COLUMNS_EQUAL(result.tbl->get_column(2),
cudf::test::strings_column_wrapper({"aaa", "bbb"}));
}

*/
/*
// currently, the json reader is strict about having non-empty input.
TEST_F(JsonReaderTest, EmptyFile) {
Expand Down Expand Up @@ -648,7 +649,7 @@ TEST_F(JsonReaderTest, NoDataFile) {
EXPECT_EQ(0, view.num_columns());
}
*/

/*
TEST_F(JsonReaderTest, ArrowFileSource)
ttnghia marked this conversation as resolved.
Show resolved Hide resolved
{
const std::string fname = temp_env->get_temp_dir() + "ArrowFileSource.csv";
Expand Down Expand Up @@ -860,5 +861,39 @@ TEST_F(JsonReaderTest, ParseOutOfRangeIntegers)
CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(input_less_int64_min_append, view.column(8));
CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(input_mixed_range_append, view.column(9));
}
*/
// Reads two JSON Lines files through a single read_json call and checks that
// the resulting table holds the rows of both files, first file first.
TEST_F(JsonReaderTest, JsonLinesMultipleFileInputs)
{
  const std::string first_path = temp_env->get_temp_dir() + "JsonLinesFileTest1.json";
  {
    // Leaving the scope closes the stream before the reader opens the file.
    std::ofstream first_out(first_path, std::ofstream::out);
    first_out << "[11, 1.1]\n[22, 2.2]\n";
  }

  const std::string second_path = temp_env->get_temp_dir() + "JsonLinesFileTest2.json";
  {
    // The last row of the second file intentionally has no trailing newline.
    std::ofstream second_out(second_path, std::ofstream::out);
    second_out << "[33, 3.3]\n[44, 4.4]";
  }

  cudf_io::json_reader_options in_options =
    cudf_io::json_reader_options::builder(cudf_io::source_info{{first_path, second_path}})
      .lines(true);

  cudf_io::table_with_metadata result = cudf_io::read_json(in_options);
  const auto &parsed                  = *result.tbl;

  // Shape: two columns, two rows contributed by each file.
  EXPECT_EQ(parsed.num_columns(), 2);
  EXPECT_EQ(parsed.num_rows(), 4);

  // Inferred column types.
  EXPECT_EQ(parsed.get_column(0).type().id(), cudf::type_id::INT64);
  EXPECT_EQ(parsed.get_column(1).type().id(), cudf::type_id::FLOAT64);

  // Columns of array-style rows are named by their position.
  EXPECT_EQ(std::string(result.metadata.column_names[0]), "0");
  EXPECT_EQ(std::string(result.metadata.column_names[1]), "1");

  // Every element is expected to be valid (non-null).
  auto all_valid = cudf::detail::make_counting_transform_iterator(0, [](auto) { return true; });

  CUDF_TEST_EXPECT_COLUMNS_EQUAL(parsed.get_column(0),
                                 int64_wrapper{{11, 22, 33, 44}, all_valid});
  CUDF_TEST_EXPECT_COLUMNS_EQUAL(parsed.get_column(1),
                                 float64_wrapper{{1.1, 2.2, 3.3, 4.4}, all_valid});
}

CUDF_TEST_PROGRAM_MAIN()
18 changes: 9 additions & 9 deletions python/cudf/cudf/_lib/json.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ from cudf._lib.table cimport Table
cimport cudf._lib.cpp.io.types as cudf_io_types


cpdef read_json(object filepath_or_buffer,
cpdef read_json(object filepaths_or_buffers,
object dtype,
bool lines,
object compression,
Expand All @@ -37,16 +37,16 @@ cpdef read_json(object filepath_or_buffer,
cudf.io.json.to_json
"""

# Determine read source
path_or_data = filepath_or_buffer

# If input data is a JSON string (or StringIO), hold a reference to
# the encoded memoryview externally to ensure the encoded buffer
# isn't destroyed before calling libcudf++ `read_json()`
if isinstance(path_or_data, io.StringIO):
path_or_data = path_or_data.read().encode()
elif isinstance(path_or_data, str) and not os.path.isfile(path_or_data):
path_or_data = path_or_data.encode()
for idx in range(len(filepaths_or_buffers)):
if isinstance(filepaths_or_buffers[idx], io.StringIO):
filepaths_or_buffers[idx] = \
filepaths_or_buffers[idx].read().encode()
elif isinstance(filepaths_or_buffers[idx], str) and \
not os.path.isfile(filepaths_or_buffers[idx]):
filepaths_or_buffers[idx] = filepaths_or_buffers[idx].encode()

# Setup arguments
cdef vector[string] c_dtypes
Expand Down Expand Up @@ -95,7 +95,7 @@ cpdef read_json(object filepath_or_buffer,
c_dtypes.push_back(str(col_dtype).encode())

cdef json_reader_options opts = move(
json_reader_options.builder(make_source_info([path_or_data]))
json_reader_options.builder(make_source_info(filepaths_or_buffers))
.dtypes(c_dtypes)
.compression(c_compression)
.lines(c_lines)
Expand Down
64 changes: 49 additions & 15 deletions python/cudf/cudf/io/json.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,20 @@
from io import BytesIO, StringIO

import pandas as pd
from fsspec.core import get_fs_token_paths

import cudf
from cudf._lib import json as libjson
from cudf.utils import ioutils
from cudf.utils.dtypes import is_list_like


def _ensure_filesystem(passed_filesystem, path):
if passed_filesystem is None:
return get_fs_token_paths(path[0] if isinstance(path, list) else path)[
0
]
return passed_filesystem


@ioutils.doc_read_json()
jdye64 marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -26,32 +36,56 @@ def read_json(
raise ValueError("cudf engine only supports JSON Lines format")
if engine == "auto":
engine = "cudf" if lines else "pandas"
if engine == "cudf":
# Multiple sources are passed as a list. If a single source is passed,
# wrap it in a list for unified processing downstream.
if not is_list_like(path_or_buf):
path_or_buf = [path_or_buf]

is_single_filepath_or_buffer = ioutils.ensure_single_filepath_or_buffer(
path_or_data=path_or_buf, **kwargs,
)
if not is_single_filepath_or_buffer:
raise NotImplementedError(
"`read_json` does not yet support reading multiple files"
)
filepaths_or_buffers = []
for source in path_or_buf:
if ioutils.is_directory(source, **kwargs):
fs = _ensure_filesystem(passed_filesystem=None, path=source)
source = ioutils.stringify_pathlike(source)
source = fs.sep.join([source, "*.json"])

tmp_source, compression = ioutils.get_filepath_or_buffer(
path_or_data=source,
compression=compression,
iotypes=(BytesIO, StringIO),
**kwargs,
)
if isinstance(tmp_source, list):
filepaths_or_buffers.extend(tmp_source)
else:
filepaths_or_buffers.append(tmp_source)

path_or_buf, compression = ioutils.get_filepath_or_buffer(
path_or_data=path_or_buf,
compression=compression,
iotypes=(BytesIO, StringIO),
**kwargs,
)
jdye64 marked this conversation as resolved.
Show resolved Hide resolved
if engine == "cudf":
return cudf.DataFrame._from_table(
libjson.read_json(
path_or_buf, dtype, lines, compression, byte_range
filepaths_or_buffers, dtype, lines, compression, byte_range
)
)
else:
warnings.warn(
"Using CPU via Pandas to read JSON dataset, this may "
"be GPU accelerated in the future"
)

if not ioutils.ensure_single_filepath_or_buffer(
path_or_data=path_or_buf, **kwargs,
):
raise NotImplementedError(
"`read_json` does not yet support reading "
"multiple files via pandas"
)

path_or_buf, compression = ioutils.get_filepath_or_buffer(
path_or_data=path_or_buf,
compression=compression,
iotypes=(BytesIO, StringIO),
**kwargs,
)

if kwargs.get("orient") == "table":
pd_value = pd.read_json(
path_or_buf,
Expand Down
50 changes: 50 additions & 0 deletions python/cudf/cudf/tests/test_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,56 @@ def test_json_lines_basic(json_input, engine):
np.testing.assert_array_equal(pd_df[pd_col], cu_df[cu_col].to_array())


@pytest.mark.filterwarnings("ignore:Using CPU")
@pytest.mark.parametrize("engine", ["auto", "cudf"])
def test_json_lines_multiple(tmpdir, json_input, engine):
    """Reading a list of two JSON Lines files yields the rows of both files.

    The same frame is written to two files, so the expected result is that
    frame concatenated with itself. ``json_input`` is provided from outside
    this function (presumably a fixture defined elsewhere in this module).
    """
    tmp_file1 = tmpdir.join("MultiInputs1.json")
    tmp_file2 = tmpdir.join("MultiInputs2.json")

    pdf = pd.read_json(json_input, lines=True)
    pdf.to_json(tmp_file1, compression="infer", lines=True, orient="records")
    pdf.to_json(tmp_file2, compression="infer", lines=True, orient="records")

    cu_df = cudf.read_json([tmp_file1, tmp_file2], engine=engine, lines=True)
    pd_df = pd.concat([pdf, pdf])

    assert all(cu_df.dtypes == ["int64", "int64", "int64"])
    for cu_col, pd_col in zip(cu_df.columns, pd_df.columns):
        assert str(cu_col) == str(pd_col)
        np.testing.assert_array_equal(pd_df[pd_col], cu_df[cu_col].to_array())


@pytest.mark.parametrize("engine", ["auto", "cudf"])
def test_json_read_directory(tmpdir, json_input, engine):
    """Reading a directory yields the rows of every ``.json`` file inside it.

    The same frame is written to three files in ``tmpdir``, so the expected
    result is that frame concatenated three times. ``json_input`` is provided
    from outside this function (presumably a fixture defined elsewhere in
    this module).
    """
    pdf = pd.read_json(json_input, lines=True)

    file_names = ("MultiInputs1.json", "MultiInputs2.json", "MultiInputs3.json")
    for file_name in file_names:
        pdf.to_json(
            tmpdir.join(file_name),
            compression="infer",
            lines=True,
            orient="records",
        )

    cu_df = cudf.read_json(tmpdir, engine=engine, lines=True)
    # One copy of the frame per file written above.
    pd_df = pd.concat([pdf] * len(file_names))

    assert all(cu_df.dtypes == ["int64", "int64", "int64"])
    for cu_col, pd_col in zip(cu_df.columns, pd_df.columns):
        assert str(cu_col) == str(pd_col)
        np.testing.assert_array_equal(pd_df[pd_col], cu_df[cu_col].to_array())


def test_json_lines_byte_range(json_input):
# include the first row and half of the second row
# should parse the first two rows
Expand Down
Loading