Skip to content

Commit

Permalink
Update JNI JSON reader column compatibility for Spark (#13477)
Browse files Browse the repository at this point in the history
This moves the logic that updates the columns returned from the JSON reader into Java. It also updates the code to handle requested columns that are not present in the data. It is not perfect because it will not work if the input file had no columns at all in it, e.g.:

```
{}
{}
```

But it fixes issues for a file that has valid columns in it, but none of them are the columns that we requested.

This is a work around for #13473, but is not perfect.

Authors:
  - Robert (Bobby) Evans (https://github.com/revans2)

Approvers:
  - Jason Lowe (https://github.com/jlowe)
  - MithunR (https://github.com/mythrocks)

URL: #13477
  • Loading branch information
revans2 authored Jun 1, 2023
1 parent 8c6c087 commit c32e097
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 59 deletions.
13 changes: 12 additions & 1 deletion java/src/main/java/ai/rapids/cudf/Schema.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
*
* Copyright (c) 2019-2020, NVIDIA CORPORATION.
* Copyright (c) 2019-2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -75,6 +75,17 @@ int[] getTypeScales() {
return ret;
}

DType[] getTypes() {
if (types == null) {
return null;
}
DType[] ret = new DType[types.size()];
for (int i = 0; i < types.size(); i++) {
ret[i] = types.get(i);
}
return ret;
}

public static class Builder {
private final List<String> names = new ArrayList<>();
private final List<DType> types = new ArrayList<>();
Expand Down
62 changes: 53 additions & 9 deletions java/src/main/java/ai/rapids/cudf/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,10 @@ private static native long[] readCSV(String[] columnNames,
byte comment, String[] nullValues,
String[] trueValues, String[] falseValues) throws CudfException;

private static native long[] readJSON(String[] columnNames,
/**
* read JSON data and return a pointer to a TableWithMeta object.
*/
private static native long readJSON(String[] columnNames,
int[] dTypeIds, int[] dTypeScales,
String filePath, long address, long length,
boolean dayFirst, boolean lines) throws CudfException;
Expand Down Expand Up @@ -968,6 +971,42 @@ public static Table readJSON(Schema schema, JSONOptions opts, byte[] buffer) {
return readJSON(schema, opts, buffer, 0, buffer.length);
}

  /**
   * Reorder (and, where necessary, synthesize) the columns read by the native JSON
   * reader so they match the column names/types requested in {@code schema}.
   * Columns requested but not found in the data are filled with nulls of the
   * requested type. When the schema requests no specific columns, the table is
   * returned as-is.
   *
   * @param schema the requested column names and types; may name columns absent
   *               from the data.
   * @param twm    the native read result; its table is released (ownership taken)
   *               by this method.
   * @return a new Table whose columns follow the order of the schema's names.
   */
  private static Table gatherJSONColumns(Schema schema, TableWithMeta twm) {
    String[] neededColumns = schema.getColumnNames();
    if (neededColumns == null || neededColumns.length == 0) {
      // No specific columns requested: hand back whatever was read, unchanged.
      return twm.releaseTable();
    } else {
      // Map each column name found in the data to its position in the read table.
      String[] foundNames = twm.getColumnNames();
      HashMap<String, Integer> indices = new HashMap<>();
      for (int i = 0; i < foundNames.length; i++) {
        indices.put(foundNames[i], i);
      }
      // We might need to rearrange the columns to match what we want.
      DType[] types = schema.getTypes();
      ColumnVector[] columns = new ColumnVector[neededColumns.length];
      try (Table tbl = twm.releaseTable()) {
        for (int i = 0; i < columns.length; i++) {
          String neededColumnName = neededColumns[i];
          Integer index = indices.get(neededColumnName);
          if (index != null) {
            // Column exists in the data: take our own reference to it so it
            // survives tbl being closed below.
            columns[i] = tbl.getColumn(index).incRefCount();
          } else {
            // Requested column missing from the data: materialize an all-null
            // column of the requested type with the same row count.
            try (Scalar s = Scalar.fromNull(types[i])) {
              columns[i] = ColumnVector.fromScalar(s, (int)tbl.getRowCount());
            }
          }
        }
        // NOTE(review): assumes the Table constructor takes its own reference to
        // each column, so the finally-block close below does not invalidate the
        // returned table — confirm against Table(ColumnVector[]) semantics.
        return new Table(columns);
      } finally {
        // Always drop this method's references; on the success path the returned
        // Table holds its own, and on failure this prevents leaks.
        for (ColumnVector c: columns) {
          if (c != null) {
            c.close();
          }
        }
      }
    }
  }

/**
* Read a JSON file.
* @param schema the schema of the file. You may use Schema.INFERRED to infer the schema.
Expand All @@ -976,11 +1015,14 @@ public static Table readJSON(Schema schema, JSONOptions opts, byte[] buffer) {
* @return the file parsed as a table on the GPU.
*/
public static Table readJSON(Schema schema, JSONOptions opts, File path) {
return new Table(
readJSON(schema.getColumnNames(), schema.getTypeIds(), schema.getTypeScales(),
path.getAbsolutePath(),
0, 0,
opts.isDayFirst(), opts.isLines()));
try (TableWithMeta twm = new TableWithMeta(
readJSON(schema.getColumnNames(), schema.getTypeIds(), schema.getTypeScales(),
path.getAbsolutePath(),
0, 0,
opts.isDayFirst(), opts.isLines()))) {

return gatherJSONColumns(schema, twm);
}
}

/**
Expand Down Expand Up @@ -1043,9 +1085,11 @@ public static Table readJSON(Schema schema, JSONOptions opts, HostMemoryBuffer b
assert len > 0;
assert len <= buffer.length - offset;
assert offset >= 0 && offset < buffer.length;
return new Table(readJSON(schema.getColumnNames(), schema.getTypeIds(), schema.getTypeScales(),
null, buffer.getAddress() + offset, len,
opts.isDayFirst(), opts.isLines()));
try (TableWithMeta twm = new TableWithMeta(readJSON(schema.getColumnNames(),
schema.getTypeIds(), schema.getTypeScales(), null,
buffer.getAddress() + offset, len, opts.isDayFirst(), opts.isLines()))) {
return gatherJSONColumns(schema, twm);
}
}

/**
Expand Down
61 changes: 12 additions & 49 deletions java/src/main/native/src/TableJni.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1410,20 +1410,19 @@ JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_TableWithMeta_releaseTable(JNIE
CATCH_STD(env, nullptr);
}

JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_Table_readJSON(
JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_Table_readJSON(
JNIEnv *env, jclass, jobjectArray col_names, jintArray j_types, jintArray j_scales,
jstring inputfilepath, jlong buffer, jlong buffer_length, jboolean day_first, jboolean lines) {

bool read_buffer = true;
if (buffer == 0) {
JNI_NULL_CHECK(env, inputfilepath, "input file or buffer must be supplied", NULL);
JNI_NULL_CHECK(env, inputfilepath, "input file or buffer must be supplied", 0);
read_buffer = false;
} else if (inputfilepath != NULL) {
JNI_THROW_NEW(env, "java/lang/IllegalArgumentException",
"cannot pass in both a buffer and an inputfilepath", NULL);
"cannot pass in both a buffer and an inputfilepath", 0);
} else if (buffer_length <= 0) {
JNI_THROW_NEW(env, "java/lang/IllegalArgumentException", "An empty buffer is not supported",
NULL);
JNI_THROW_NEW(env, "java/lang/IllegalArgumentException", "An empty buffer is not supported", 0);
}

try {
Expand All @@ -1433,13 +1432,13 @@ JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_Table_readJSON(
cudf::jni::native_jintArray n_scales(env, j_scales);
if (n_types.is_null() != n_scales.is_null()) {
JNI_THROW_NEW(env, "java/lang/IllegalArgumentException", "types and scales must match null",
NULL);
0);
}
std::vector<cudf::data_type> data_types;
if (!n_types.is_null()) {
if (n_types.size() != n_scales.size()) {
JNI_THROW_NEW(env, "java/lang/IllegalArgumentException", "types and scales must match size",
NULL);
0);
}
data_types.reserve(n_types.size());
std::transform(n_types.begin(), n_types.end(), n_scales.begin(),
Expand All @@ -1450,8 +1449,7 @@ JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_Table_readJSON(

cudf::jni::native_jstring filename(env, inputfilepath);
if (!read_buffer && filename.is_empty()) {
JNI_THROW_NEW(env, "java/lang/IllegalArgumentException", "inputfilepath can't be empty",
NULL);
JNI_THROW_NEW(env, "java/lang/IllegalArgumentException", "inputfilepath can't be empty", 0);
}

auto source = read_buffer ? cudf::io::source_info{reinterpret_cast<char *>(buffer),
Expand All @@ -1465,7 +1463,7 @@ JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_Table_readJSON(
if (!n_col_names.is_null() && data_types.size() > 0) {
if (n_col_names.size() != n_types.size()) {
JNI_THROW_NEW(env, "java/lang/IllegalArgumentException",
"types and column names must match size", NULL);
"types and column names must match size", 0);
}

std::map<std::string, cudf::data_type> map;
Expand All @@ -1481,47 +1479,12 @@ JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_Table_readJSON(
// should infer the types
}

cudf::io::table_with_metadata result = cudf::io::read_json(opts.build());

// there is no need to re-order columns when inferring schema
if (result.metadata.schema_info.empty() || n_col_names.size() <= 0) {
return convert_table_for_return(env, result.tbl);
} else {
// json reader will not return the correct column order,
// so we need to re-order the column of table according to table meta.

// turn name and its index in table into map<name, index>
std::map<std::string, cudf::size_type> m;
std::transform(result.metadata.schema_info.cbegin(), result.metadata.schema_info.cend(),
thrust::make_counting_iterator(0), std::inserter(m, m.end()),
[](auto const &column_info, auto const &index) {
return std::make_pair(column_info.name, index);
});

auto col_names_vec = n_col_names.as_cpp_vector();
std::vector<cudf::size_type> indices;

bool match = true;
for (size_t i = 0; i < col_names_vec.size(); i++) {
if (m.find(col_names_vec[i]) == m.end()) {
match = false;
break;
} else {
indices.push_back(m.at(col_names_vec[i]));
}
}
auto result =
std::make_unique<cudf::io::table_with_metadata>(cudf::io::read_json(opts.build()));

if (!match) {
// can't find some input column names in table meta, return what json reader reads.
return convert_table_for_return(env, result.tbl);
} else {
auto tbv = result.tbl->view().select(std::move(indices));
auto table = std::make_unique<cudf::table>(tbv);
return convert_table_for_return(env, table);
}
}
return reinterpret_cast<jlong>(result.release());
}
CATCH_STD(env, NULL);
CATCH_STD(env, 0);
}

JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_Table_readParquet(
Expand Down

0 comments on commit c32e097

Please sign in to comment.