Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update JNI JSON reader column compatibility for Spark #13477

Merged
merged 5 commits into from
Jun 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion java/src/main/java/ai/rapids/cudf/Schema.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
*
* Copyright (c) 2019-2020, NVIDIA CORPORATION.
* Copyright (c) 2019-2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -75,6 +75,17 @@ int[] getTypeScales() {
return ret;
}

DType[] getTypes() {
if (types == null) {
return null;
}
DType[] ret = new DType[types.size()];
for (int i = 0; i < types.size(); i++) {
ret[i] = types.get(i);
}
return ret;
Comment on lines +82 to +86
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
DType[] ret = new DType[types.size()];
for (int i = 0; i < types.size(); i++) {
ret[i] = types.get(i);
}
return ret;
return types.toArray(new DType[types.size()]);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Neat. That would pack nicely to:

return types == null ? null : types.toArray(new DType[types.size()]); 

}

public static class Builder {
private final List<String> names = new ArrayList<>();
private final List<DType> types = new ArrayList<>();
Expand Down
62 changes: 53 additions & 9 deletions java/src/main/java/ai/rapids/cudf/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,10 @@ private static native long[] readCSV(String[] columnNames,
byte comment, String[] nullValues,
String[] trueValues, String[] falseValues) throws CudfException;

private static native long[] readJSON(String[] columnNames,
/**
* read JSON data and return a pointer to a TableWithMeta object.
*/
private static native long readJSON(String[] columnNames,
int[] dTypeIds, int[] dTypeScales,
String filePath, long address, long length,
boolean dayFirst, boolean lines) throws CudfException;
Comment on lines 240 to 242
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: These parameters are now mis-indented

Expand Down Expand Up @@ -968,6 +971,42 @@ public static Table readJSON(Schema schema, JSONOptions opts, byte[] buffer) {
return readJSON(schema, opts, buffer, 0, buffer.length);
}

private static Table gatherJSONColumns(Schema schema, TableWithMeta twm) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be nice to have a comment on this method explaining it will release and close the underlying table of twm as a side-effect.

String[] neededColumns = schema.getColumnNames();
if (neededColumns == null || neededColumns.length == 0) {
return twm.releaseTable();
} else {
String[] foundNames = twm.getColumnNames();
HashMap<String, Integer> indices = new HashMap<>();
for (int i = 0; i < foundNames.length; i++) {
indices.put(foundNames[i], i);
}
mythrocks marked this conversation as resolved.
Show resolved Hide resolved
// We might need to rearrange the columns to match what we want.
DType[] types = schema.getTypes();
ColumnVector[] columns = new ColumnVector[neededColumns.length];
try (Table tbl = twm.releaseTable()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Seems like this should be the first thing in the method so no matter what happens (NPE, whatever) we're closing the table.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TableWithMetadata will close the table if we don't pull it out first.

for (int i = 0; i < columns.length; i++) {
String neededColumnName = neededColumns[i];
Integer index = indices.get(neededColumnName);
if (index != null) {
columns[i] = tbl.getColumn(index).incRefCount();
} else {
try (Scalar s = Scalar.fromNull(types[i])) {
columns[i] = ColumnVector.fromScalar(s, (int)tbl.getRowCount());
}
}
}
return new Table(columns);
} finally {
for (ColumnVector c: columns) {
if (c != null) {
c.close();
}
}
}
}
}

/**
* Read a JSON file.
* @param schema the schema of the file. You may use Schema.INFERRED to infer the schema.
Expand All @@ -976,11 +1015,14 @@ public static Table readJSON(Schema schema, JSONOptions opts, byte[] buffer) {
* @return the file parsed as a table on the GPU.
*/
public static Table readJSON(Schema schema, JSONOptions opts, File path) {
return new Table(
readJSON(schema.getColumnNames(), schema.getTypeIds(), schema.getTypeScales(),
path.getAbsolutePath(),
0, 0,
opts.isDayFirst(), opts.isLines()));
try (TableWithMeta twm = new TableWithMeta(
readJSON(schema.getColumnNames(), schema.getTypeIds(), schema.getTypeScales(),
path.getAbsolutePath(),
0, 0,
opts.isDayFirst(), opts.isLines()))) {

return gatherJSONColumns(schema, twm);
}
}

/**
Expand Down Expand Up @@ -1043,9 +1085,11 @@ public static Table readJSON(Schema schema, JSONOptions opts, HostMemoryBuffer b
assert len > 0;
assert len <= buffer.length - offset;
assert offset >= 0 && offset < buffer.length;
return new Table(readJSON(schema.getColumnNames(), schema.getTypeIds(), schema.getTypeScales(),
null, buffer.getAddress() + offset, len,
opts.isDayFirst(), opts.isLines()));
try (TableWithMeta twm = new TableWithMeta(readJSON(schema.getColumnNames(),
schema.getTypeIds(), schema.getTypeScales(), null,
buffer.getAddress() + offset, len, opts.isDayFirst(), opts.isLines()))) {
return gatherJSONColumns(schema, twm);
}
}

/**
Expand Down
61 changes: 12 additions & 49 deletions java/src/main/native/src/TableJni.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1410,20 +1410,19 @@ JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_TableWithMeta_releaseTable(JNIE
CATCH_STD(env, nullptr);
}

JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_Table_readJSON(
JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_Table_readJSON(
JNIEnv *env, jclass, jobjectArray col_names, jintArray j_types, jintArray j_scales,
jstring inputfilepath, jlong buffer, jlong buffer_length, jboolean day_first, jboolean lines) {

bool read_buffer = true;
if (buffer == 0) {
JNI_NULL_CHECK(env, inputfilepath, "input file or buffer must be supplied", NULL);
JNI_NULL_CHECK(env, inputfilepath, "input file or buffer must be supplied", 0);
read_buffer = false;
} else if (inputfilepath != NULL) {
JNI_THROW_NEW(env, "java/lang/IllegalArgumentException",
"cannot pass in both a buffer and an inputfilepath", NULL);
"cannot pass in both a buffer and an inputfilepath", 0);
} else if (buffer_length <= 0) {
JNI_THROW_NEW(env, "java/lang/IllegalArgumentException", "An empty buffer is not supported",
NULL);
JNI_THROW_NEW(env, "java/lang/IllegalArgumentException", "An empty buffer is not supported", 0);
}

try {
Expand All @@ -1433,13 +1432,13 @@ JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_Table_readJSON(
cudf::jni::native_jintArray n_scales(env, j_scales);
if (n_types.is_null() != n_scales.is_null()) {
JNI_THROW_NEW(env, "java/lang/IllegalArgumentException", "types and scales must match null",
NULL);
0);
}
std::vector<cudf::data_type> data_types;
if (!n_types.is_null()) {
if (n_types.size() != n_scales.size()) {
JNI_THROW_NEW(env, "java/lang/IllegalArgumentException", "types and scales must match size",
NULL);
0);
}
data_types.reserve(n_types.size());
std::transform(n_types.begin(), n_types.end(), n_scales.begin(),
Expand All @@ -1450,8 +1449,7 @@ JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_Table_readJSON(

cudf::jni::native_jstring filename(env, inputfilepath);
if (!read_buffer && filename.is_empty()) {
JNI_THROW_NEW(env, "java/lang/IllegalArgumentException", "inputfilepath can't be empty",
NULL);
JNI_THROW_NEW(env, "java/lang/IllegalArgumentException", "inputfilepath can't be empty", 0);
}

auto source = read_buffer ? cudf::io::source_info{reinterpret_cast<char *>(buffer),
Expand All @@ -1465,7 +1463,7 @@ JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_Table_readJSON(
if (!n_col_names.is_null() && data_types.size() > 0) {
if (n_col_names.size() != n_types.size()) {
JNI_THROW_NEW(env, "java/lang/IllegalArgumentException",
"types and column names must match size", NULL);
"types and column names must match size", 0);
}

std::map<std::string, cudf::data_type> map;
Expand All @@ -1481,47 +1479,12 @@ JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_Table_readJSON(
// should infer the types
}

cudf::io::table_with_metadata result = cudf::io::read_json(opts.build());

// there is no need to re-order columns when inferring schema
if (result.metadata.schema_info.empty() || n_col_names.size() <= 0) {
return convert_table_for_return(env, result.tbl);
} else {
// json reader will not return the correct column order,
// so we need to re-order the column of table according to table meta.

// turn name and its index in table into map<name, index>
std::map<std::string, cudf::size_type> m;
std::transform(result.metadata.schema_info.cbegin(), result.metadata.schema_info.cend(),
thrust::make_counting_iterator(0), std::inserter(m, m.end()),
[](auto const &column_info, auto const &index) {
return std::make_pair(column_info.name, index);
});

auto col_names_vec = n_col_names.as_cpp_vector();
std::vector<cudf::size_type> indices;

bool match = true;
for (size_t i = 0; i < col_names_vec.size(); i++) {
if (m.find(col_names_vec[i]) == m.end()) {
match = false;
break;
} else {
indices.push_back(m.at(col_names_vec[i]));
}
}
auto result =
std::make_unique<cudf::io::table_with_metadata>(cudf::io::read_json(opts.build()));

if (!match) {
// can't find some input column names in table meta, return what json reader reads.
return convert_table_for_return(env, result.tbl);
} else {
auto tbv = result.tbl->view().select(std::move(indices));
auto table = std::make_unique<cudf::table>(tbv);
return convert_table_for_return(env, table);
}
}
return reinterpret_cast<jlong>(result.release());
}
CATCH_STD(env, NULL);
CATCH_STD(env, 0);
}

JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_Table_readParquet(
Expand Down