diff --git a/build-libcudf.xml b/build-libcudf.xml
index 1be5d49c84..81ffb21c0e 100644
--- a/build-libcudf.xml
+++ b/build-libcudf.xml
@@ -42,6 +42,7 @@
+
diff --git a/build/build-in-docker b/build/build-in-docker
index a3b007cd90..462dfdad3b 100755
--- a/build/build-in-docker
+++ b/build/build-in-docker
@@ -31,7 +31,7 @@ LOCAL_MAVEN_REPO=${LOCAL_MAVEN_REPO:-"$HOME/.m2/repository"}
PER_THREAD_DEFAULT_STREAM=${PER_THREAD_DEFAULT_STREAM:-ON}
USE_GDS=${USE_GDS:-ON}
-IMAGE_NAME="cudf-build:${CUDA_VERSION}-devel-centos7"
+SPARK_IMAGE_NAME="spark-rapids-jni-build:${CUDA_VERSION}-devel-centos7"
WORKSPACE_DIR=/rapids
WORKSPACE_REPODIR="$WORKSPACE_DIR/spark-rapids-jni"
WORKSPACE_MAVEN_REPODIR="$WORKSPACE_DIR/.m2/repository"
@@ -41,10 +41,10 @@ if (( $# == 0 )); then
exit 1
fi
-$DOCKER_CMD build -f $REPODIR/thirdparty/cudf/java/ci/Dockerfile.centos7 \
+$DOCKER_CMD build -f $REPODIR/ci/Dockerfile \
--build-arg CUDA_VERSION=$CUDA_VERSION \
- -t $IMAGE_NAME \
- $REPODIR/thirdparty/cudf/java/ci
+ -t $SPARK_IMAGE_NAME \
+ $REPODIR/build
$DOCKER_CMD run -it -u $(id -u):$(id -g) --rm \
-v "/etc/group:/etc/group:ro" \
@@ -58,7 +58,7 @@ $DOCKER_CMD run -it -u $(id -u):$(id -g) --rm \
-e CUDA_VISIBLE_DEVICES \
-e PARALLEL_LEVEL \
-e VERBOSE \
- $IMAGE_NAME \
+ $SPARK_IMAGE_NAME \
scl enable devtoolset-9 "mvn \
-Dmaven.repo.local=$WORKSPACE_MAVEN_REPODIR \
-DPER_THREAD_DEFAULT_STREAM=$PER_THREAD_DEFAULT_STREAM \
diff --git a/ci/Dockerfile b/ci/Dockerfile
index 546c15ca94..4814ac4b27 100755
--- a/ci/Dockerfile
+++ b/ci/Dockerfile
@@ -39,3 +39,13 @@ RUN cd /usr/local/ && wget --quiet https://github.com/Kitware/CMake/releases/dow
tar zxf cmake-${CMAKE_VERSION}-linux-x86_64.tar.gz && \
rm cmake-${CMAKE_VERSION}-linux-x86_64.tar.gz
ENV PATH /usr/local/cmake-${CMAKE_VERSION}-linux-x86_64/bin:$PATH
+
+## install a version of boost that is needed for arrow/parquet to work
+RUN cd /usr/local && wget https://boostorg.jfrog.io/artifactory/main/release/1.79.0/source/boost_1_79_0.tar.gz && \
+ tar -xzf boost_1_79_0.tar.gz && \
+ rm boost_1_79_0.tar.gz && \
+ cd boost_1_79_0 && \
+ ./bootstrap.sh --prefix=/usr/local && \
+ ./b2 install --prefix=/usr/local --with-filesystem --with-system && \
+ cd /usr/local && \
+ rm -rf boost_1_79_0
diff --git a/src/main/cpp/CMakeLists.txt b/src/main/cpp/CMakeLists.txt
index c6d88aefcd..36f30968d9 100644
--- a/src/main/cpp/CMakeLists.txt
+++ b/src/main/cpp/CMakeLists.txt
@@ -102,9 +102,27 @@ find_library(CUDFJNI_LIB "libcudfjni.a" REQUIRED NO_DEFAULT_PATH
HINTS "${PROJECT_BINARY_DIR}/../libcudfjni"
)
+# parquet
+find_library(PARQUET_LIB "libparquet.a" REQUIRED NO_DEFAULT_PATH
+ HINTS "${PROJECT_BINARY_DIR}/../libcudf-install/lib64"
+)
+
+# Internal parquet headers
+set (GENERATED_PARQUET_INCLUDE
+ "${CUDF_DIR}/cpp/build/_deps/arrow-src/cpp/src/"
+ CACHE STRING "generated parquet thrift headers"
+)
+
+# thrift
+find_library(THRIFT_LIB "libthrift.a" REQUIRED NO_DEFAULT_PATH
+ HINTS "${CUDF_DIR}/cpp/build/_deps/arrow-build/thrift_ep-install/lib/"
+)
+
set(CUDFJNI_INCLUDE_DIRS
"${CUDF_DIR}/java/src/main/native/include"
"${CUDF_DIR}/java/src/main/native/src"
+ "${GENERATED_PARQUET_INCLUDE}"
+ "${CUDF_DIR}/cpp/build/_deps/arrow-build/thrift_ep-install/include/"
)
# ##################################################################################################
@@ -113,6 +131,7 @@ set(CUDFJNI_INCLUDE_DIRS
add_library(
spark_rapids_jni SHARED
src/RowConversionJni.cpp
+ src/NativeParquetJni.cpp
src/row_conversion.cu
)
@@ -159,6 +178,8 @@ target_link_libraries(
-Wl,--whole-archive
${CUDFJNI_LIB}
cudf::cudf
+ ${PARQUET_LIB}
+ ${THRIFT_LIB}
-Wl,--no-whole-archive
cudf::cudf
)
diff --git a/src/main/cpp/src/NativeParquetJni.cpp b/src/main/cpp/src/NativeParquetJni.cpp
new file mode 100644
index 0000000000..83d9e1b525
--- /dev/null
+++ b/src/main/cpp/src/NativeParquetJni.cpp
@@ -0,0 +1,625 @@
+/*
+ * Copyright (c) 2022, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <cwchar>
+#include <cwctype>
+#include <limits>
+#include <sstream>
+#include <vector>
+
+// TCompactProtocol requires some #defines to work right.
+// This came from the parquet code itself...
+#define SIGNED_RIGHT_SHIFT_IS 1
+#define ARITHMETIC_RIGHT_SHIFT 1
+#include <thrift/TApplicationException.h>
+#include <thrift/protocol/TCompactProtocol.h>
+#include <thrift/transport/TBufferTransports.h>
+
+// NOTE(review): include targets were lost in this patch; these are reconstructed —
+// the generated parquet thrift types and the cudf NVTX range macro. Confirm paths.
+#include <generated/parquet_types.h>
+#include <cudf/detail/nvtx/ranges.hpp>
+
+#include "cudf_jni_apis.hpp"
+#include "jni_utils.hpp"
+
+namespace rapids {
+namespace jni {
+
+/**
+ * Convert a string to lower case. It uses std::towlower per wide character, which has
+ * limitations and may not produce the exact same result as the JVM does. This is probably
+ * good enough for now.
+ *
+ * @param input a multi-byte (current C locale) encoded string
+ * @return the lower-cased string in the same multi-byte encoding
+ * @throws std::invalid_argument on an invalid/unsupported character sequence
+ * @throws std::runtime_error if a conversion unexpectedly produces the wrong length
+ */
+std::string unicode_to_lower(std::string const& input) {
+  // Get the size of the wide character result. mbstowcs signals failure by returning
+  // (size_t)-1; std::size_t is unsigned, so a `< 0` check could never fire.
+  std::size_t wide_size = std::mbstowcs(nullptr, input.data(), 0);
+  if (wide_size == static_cast<std::size_t>(-1)) {
+    throw std::invalid_argument("invalid character sequence");
+  }
+
+  std::vector<wchar_t> wide(wide_size + 1);
+  // Set a null so we can get a proper output size from wcstombs. This is because
+  // we pass in a max length of 0, so it will only stop when it sees the null character.
+  wide.back() = 0;
+  if (std::mbstowcs(wide.data(), input.data(), wide_size) != wide_size) {
+    throw std::runtime_error("error during wide char conversion");
+  }
+  for (auto wit = wide.begin(); wit != wide.end(); ++wit) {
+    *wit = std::towlower(*wit);
+  }
+  // Get the multi-byte result size (again (size_t)-1 on failure, not a negative value)
+  std::size_t mb_size = std::wcstombs(nullptr, wide.data(), 0);
+  if (mb_size == static_cast<std::size_t>(-1)) {
+    throw std::invalid_argument("unsupported wide character sequence");
+  }
+  // We are allocating a fixed size string so we can put the data directly into it
+  // instead of going through a NUL terminated char* first. The NUL fill char is
+  // just because we need to pass in a fill char. The value does not matter
+  // because it will be overwritten. std::string itself will insert a NUL
+  // terminator on the buffer it allocates internally. We don't need to worry about it.
+  std::string ret(mb_size, '\0');
+  if (std::wcstombs(ret.data(), wide.data(), mb_size) != mb_size) {
+    throw std::runtime_error("error during multibyte char conversion");
+  }
+  return ret;
+}
+
+/**
+ * Holds a set of "maps" that are used to rewrite various parts of the parquet metadata.
+ * Generally each "map" is a gather map that pulls data from an input vector to be placed in
+ * an output vector.
+ */
+struct column_pruning_maps {
+  // gather map for pulling out items from the schema
+  std::vector<int> schema_map;
+  // Each SchemaElement also includes the number of children in it. This allows the vector
+  // to be interpreted as a tree flattened depth first. These are the new values for num
+  // children after the schema is gathered.
+  std::vector<int> schema_num_children;
+  // There are several places where a struct is stored only for a leaf column (like a column chunk)
+  // This holds the gather map for those cases.
+  std::vector<int> chunk_map;
+};
+
+/**
+ * This class will handle processing column pruning for a schema. It is written as a class because
+ * of JNI we are sending the names of the columns as a depth first list, like parquet does
+ * internally.
+ */
+class column_pruner {
+public:
+  /**
+   * Create pruning filter from a depth first flattened tree of names and num_children.
+   * The root entry is not included in names or in num_children, but parent_num_children
+   * should hold how many entries there are in it.
+   */
+  column_pruner(const std::vector<std::string> & names,
+          const std::vector<int> & num_children,
+          int parent_num_children): children(), s_id(0), c_id(-1) {
+    add_depth_first(names, num_children, parent_num_children);
+  }
+
+  column_pruner(int s_id, int c_id): children(), s_id(s_id), c_id(c_id) {
+  }
+
+  column_pruner(): children(), s_id(0), c_id(-1) {
+  }
+
+  /**
+   * Given a schema from a parquet file create a set of pruning maps to prune columns from the
+   * rest of the footer
+   */
+  column_pruning_maps filter_schema(std::vector<parquet::format::SchemaElement> & schema,
+                                    bool ignore_case) {
+    // The following are all covered by follow on work in https://github.com/NVIDIA/spark-rapids-jni/issues/210
+    // TODO the java code will fail if there is ambiguity in the names and ignore_case is true
+    // so we need to figure that out too.
+    // TODO there are a number of different way to represent a list or a map. We want to support all of them
+    // so we need a way to detect that schema is a list and group the parts we don't care about together.
+    // TODO the java code verifies that the schema matches when it is looking at the columns or it throws
+    // an exception. Sort of, It really just checks that it is a GroupType where it expects to find them
+    //
+    // With all of this in mind I think what we want to do is to pass down a full-ish schema, not just the names,
+    // and the number of children. We need to know if it is a Map, an Array, a Struct or primitive.
+    //
+    // Then when we are walking the tree we need to keep track of if we are looking for a Map, an array or
+    // a struct and match up the SchemaElement entries accordingly as we go.
+    // If we see something that is off we need to throw an exception.
+    //
+    // To be able to handle the duplicates, I think we need to have some state in the column_pruner class
+    // to say if we have matched a leaf node or not.
+    //
+    // From the Parquet spec
+    // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
+    //
+    // A repeated field that is neither contained by a LIST- or MAP-annotated group nor annotated by LIST
+    // or MAP should be interpreted as a required list of required elements where the element type is the
+    // type of the field.
+    //
+    // LIST must always annotate a 3-level structure:
+    // group (LIST) {
+    //   repeated group list {
+    //     element;
+    //   }
+    // }
+    // ...
+    // However, these names may not be used in existing data and should not be enforced as errors when reading.
+    // ...
+    // Some existing data does not include the inner element layer. For backward-compatibility, the type of
+    // elements in LIST-annotated structures should always be determined by the following rules:
+    //
+    // 1. If the repeated field is not a group, then its type is the element type and elements are required.
+    // 2. If the repeated field is a group with multiple fields, then its type is the element type and
+    //    elements are required.
+    // 3. If the repeated field is a group with one field and is named either array or uses the
+    //    LIST-annotated group's name with _tuple appended then the repeated type is the element
+    //    type and elements are required.
+    // 4. Otherwise, the repeated field's type is the element type with the repeated field's repetition.
+
+    // MAP is used to annotate types that should be interpreted as a map from keys to values. MAP must
+    // annotate a 3-level structure:
+    //
+    // * The outer-most level must be a group annotated with MAP that contains a single field named
+    //   key_value. The repetition of this level must be either optional or required and determines
+    //   whether the list is nullable.
+    // * The middle level, named key_value, must be a repeated group with a key field for map keys
+    //   and, optionally, a value field for map values.
+    // * The key field encodes the map's key type. This field must have repetition required and must
+    //   always be present.
+    // * The value field encodes the map's value type and repetition. This field can be required,
+    //   optional, or omitted.
+    //
+    // It is required that the repeated group of key-value pairs is named key_value and that its
+    // fields are named key and value. However, these names may not be used in existing data and
+    // should not be enforced as errors when reading.
+    //
+    // Some existing data incorrectly used MAP_KEY_VALUE in place of MAP. For backward-compatibility,
+    // a group annotated with MAP_KEY_VALUE that is not contained by a MAP-annotated group should be
+    // handled as a MAP-annotated group.
+
+    // Parquet says that the map's value is optional, but Spark looks like it would throw an exception
+    // if it ever actually saw that in practice, so we should too.
+    CUDF_FUNC_RANGE();
+    // The maps are sorted (keyed by the output position) so we can compress the tree...
+    // These are the outputs of the computation
+    std::map<int, int> chunk_map;
+    std::map<int, int> schema_map;
+    std::map<int, int> num_children_map;
+    // Start off with 0 children in the root, will add more as we go
+    schema_map[0] = 0;
+    num_children_map[0] = 0;
+
+    // num_children_stack and tree_stack hold the current state as we walk though schema
+    std::vector<int> num_children_stack;
+    std::vector<column_pruner *> tree_stack;
+    tree_stack.push_back(this);
+    if (schema.size() == 0) {
+      throw std::invalid_argument("a root schema element must exist");
+    }
+    num_children_stack.push_back(schema[0].num_children);
+
+    uint64_t chunk_index = 0;
+    // We are skipping over the first entry in the schema because it is always the root entry, and
+    // we already processed it
+    for (uint64_t schema_index = 1; schema_index < schema.size(); ++schema_index) {
+      // reference, not a copy: SchemaElement is a non-trivial thrift struct
+      auto const& schema_item = schema[schema_index];
+      // num_children is optional, but is supposed to be set for non-leaf nodes. That said leaf nodes
+      // will have 0 children so we can just default to that.
+      int num_children = 0;
+      if (schema_item.__isset.num_children) {
+        num_children = schema_item.num_children;
+      }
+      std::string name;
+      if (ignore_case) {
+        name = unicode_to_lower(schema_item.name);
+      } else {
+        name = schema_item.name;
+      }
+      column_pruner * found = nullptr;
+      if (tree_stack.back() != nullptr) {
+        // tree_stack can have a nullptr in it if the schema we are looking through
+        // has an entry that does not match the tree
+        auto found_it = tree_stack.back()->children.find(name);
+        if (found_it != tree_stack.back()->children.end()) {
+          found = &(found_it->second);
+          int parent_mapped_schema_index = tree_stack.back()->s_id;
+          ++num_children_map[parent_mapped_schema_index];
+
+          int mapped_schema_index = found->s_id;
+          schema_map[mapped_schema_index] = schema_index;
+          num_children_map[mapped_schema_index] = 0;
+        }
+      }
+
+      if (schema_item.__isset.type) {
+        // this is a leaf node, it has a primitive type.
+        if (found != nullptr) {
+          int mapped_chunk_index = found->c_id;
+          chunk_map[mapped_chunk_index] = chunk_index;
+        }
+        ++chunk_index;
+      }
+      // else it is a non-leaf node it is group typed
+      // chunks are only for leaf nodes
+
+      // num_children and if the type is set or not should correspond to each other.
+      // By convention in parquet they should, but to be on the safe side I keep them
+      // separate.
+      if (num_children > 0) {
+        tree_stack.push_back(found);
+        num_children_stack.push_back(num_children);
+      } else {
+        // go back up the stack/tree removing children until we hit one with more children
+        bool done = false;
+        while (!done) {
+          int parent_children_left = num_children_stack.back() - 1;
+          if (parent_children_left > 0) {
+            num_children_stack.back() = parent_children_left;
+            done = true;
+          } else {
+            tree_stack.pop_back();
+            num_children_stack.pop_back();
+          }
+
+          if (tree_stack.size() == 0) {
+            done = true;
+          }
+        }
+      }
+    }
+
+    // If there is a column that is missing from this file we need to compress the gather maps
+    // so there are no gaps
+    std::vector<int> final_schema_map;
+    final_schema_map.reserve(schema_map.size());
+    for (auto it = schema_map.begin(); it != schema_map.end(); ++it) {
+      final_schema_map.push_back(it->second);
+    }
+
+    std::vector<int> final_num_children_map;
+    final_num_children_map.reserve(num_children_map.size());
+    for (auto it = num_children_map.begin(); it != num_children_map.end(); ++it) {
+      final_num_children_map.push_back(it->second);
+    }
+
+    std::vector<int> final_chunk_map;
+    final_chunk_map.reserve(chunk_map.size());
+    for (auto it = chunk_map.begin(); it != chunk_map.end(); ++it) {
+      final_chunk_map.push_back(it->second);
+    }
+
+    return column_pruning_maps{std::move(final_schema_map),
+                               std::move(final_num_children_map),
+                               std::move(final_chunk_map)};
+  }
+
+private:
+
+  /**
+   * Build the lookup tree (children / s_id / c_id) from the depth-first flattened
+   * names and num_children arrays sent over JNI.
+   */
+  void add_depth_first(std::vector<std::string> const& names,
+                       std::vector<int> const& num_children,
+                       int parent_num_children) {
+    CUDF_FUNC_RANGE();
+    if (parent_num_children == 0) {
+      // There is no point in doing more the tree is empty, and it lets us avoid some corner cases
+      // in the code below
+      return;
+    }
+    int local_s_id = 0; // There is always a root on the schema
+    int local_c_id = -1; // for columns it is just the leaf nodes
+    auto num = names.size();
+    std::vector<column_pruner *> tree_stack;
+    std::vector<int> num_children_stack;
+    tree_stack.push_back(this);
+    num_children_stack.push_back(parent_num_children);
+    for(uint64_t i = 0; i < num; ++i) {
+      auto const& name = names[i];
+      auto num_c = num_children[i];
+      ++local_s_id;
+      int tmp_c_id = -1;
+      if (num_c == 0) {
+        // leaf node...
+        ++local_c_id;
+        tmp_c_id = local_c_id;
+      }
+      tree_stack.back()->children.try_emplace(name, local_s_id, tmp_c_id);
+      if (num_c > 0) {
+        tree_stack.push_back(&tree_stack.back()->children[name]);
+        num_children_stack.push_back(num_c);
+      } else {
+        // go back up the stack/tree removing children until we hit one with more children
+        bool done = false;
+        while (!done) {
+          int parent_children_left = num_children_stack.back() - 1;
+          if (parent_children_left > 0) {
+            num_children_stack.back() = parent_children_left;
+            done = true;
+          } else {
+            tree_stack.pop_back();
+            num_children_stack.pop_back();
+          }
+
+          if (tree_stack.size() == 0) {
+            done = true;
+          }
+        }
+      }
+    }
+    if (tree_stack.size() != 0 || num_children_stack.size() != 0) {
+      throw std::invalid_argument("DIDN'T CONSUME EVERYTHING...");
+    }
+  }
+
+  std::map<std::string, column_pruner> children;
+  // The following IDs are the position that they should be in when output in a filtered footer, except
+  // that if there are any missing columns in the actual data the gaps need to be removed.
+  // schema ID
+  int s_id;
+  // Column chunk and Column order ID
+  int c_id;
+};
+
+// Returns true when a row group's file_offset is inconsistent with the previous
+// row group's position (see PARQUET-2078: only the first block's offset is reliable).
+static bool invalid_file_offset(long start_index, long pre_start_index, long pre_compressed_size) {
+  // the first row group must start right after the 4-byte "PAR1" magic
+  if (pre_start_index == 0 && start_index != 4) {
+    return true;
+  }
+  // any other block must begin at or after the end of the previous one; it can start
+  // later than that because of padding, so this is only a lower bound
+  int64_t min_start_index = pre_start_index + pre_compressed_size;
+  return start_index < min_start_index;
+}
+
+/**
+ * Return the file offset where a column chunk's data starts: the dictionary page
+ * when present (it precedes the data pages), otherwise the first data page.
+ */
+static int64_t get_offset(parquet::format::ColumnChunk const& column_chunk) {
+  // reference, not a copy: ColumnMetaData is a large thrift struct
+  auto const& md = column_chunk.meta_data;
+  int64_t offset = md.data_page_offset;
+  if (md.__isset.dictionary_page_offset && offset > md.dictionary_page_offset) {
+    offset = md.dictionary_page_offset;
+  }
+  return offset;
+}
+
+/**
+ * Return only the row groups whose midpoint falls within [part_offset, part_offset + part_length).
+ * This mirrors the java parquet-mr logic for assigning row groups to a split.
+ */
+static std::vector<parquet::format::RowGroup> filter_groups(parquet::format::FileMetaData const& meta,
+        int64_t part_offset, int64_t part_length) {
+  CUDF_FUNC_RANGE();
+  // This is based off of the java parquet_mr code to find the groups in a range...
+  auto num_row_groups = meta.row_groups.size();
+  int64_t pre_start_index = 0;
+  int64_t pre_compressed_size = 0;
+  bool first_column_with_metadata = true;
+  if (num_row_groups > 0) {
+    first_column_with_metadata = meta.row_groups[0].columns[0].__isset.meta_data;
+  }
+
+  std::vector<parquet::format::RowGroup> filtered_groups;
+  for (uint64_t rg_i = 0; rg_i < num_row_groups; ++rg_i) {
+    parquet::format::RowGroup const& row_group = meta.row_groups[rg_i];
+    int64_t total_size = 0;
+    int64_t start_index;
+    // reference, not a copy: ColumnChunk is a large thrift struct
+    auto const& column_chunk = row_group.columns[0];
+    if (first_column_with_metadata) {
+      start_index = get_offset(column_chunk);
+    } else {
+      //the file_offset of first block always holds the truth, while other blocks don't :
+      //see PARQUET-2078 for details
+      start_index = row_group.file_offset;
+      if (invalid_file_offset(start_index, pre_start_index, pre_compressed_size)) {
+        //first row group's offset is always 4
+        if (pre_start_index == 0) {
+          start_index = 4;
+        } else {
+          // use minStartIndex(imprecise in case of padding, but good enough for filtering)
+          start_index = pre_start_index + pre_compressed_size;
+        }
+      }
+      pre_start_index = start_index;
+      pre_compressed_size = row_group.total_compressed_size;
+    }
+    if (row_group.__isset.total_compressed_size) {
+      total_size = row_group.total_compressed_size;
+    } else {
+      // fall back to summing up the compressed size of every column chunk
+      auto num_columns = row_group.columns.size();
+      for (uint64_t cc_i = 0; cc_i < num_columns; ++cc_i) {
+        parquet::format::ColumnChunk const& col = row_group.columns[cc_i];
+        total_size += col.meta_data.total_compressed_size;
+      }
+    }
+
+    int64_t mid_point = start_index + total_size / 2;
+    if (mid_point >= part_offset && mid_point < (part_offset + part_length)) {
+      filtered_groups.push_back(row_group);
+    }
+  }
+  return filtered_groups;
+}
+
+/**
+ * Deserialize a thrift-compact-encoded parquet FileMetaData from `buffer` into `meta`.
+ *
+ * @param buffer the raw thrift footer bytes (no magic, no length)
+ * @param len number of valid bytes in buffer
+ * @param meta output FileMetaData to populate
+ * @throws std::runtime_error wrapping any thrift deserialization failure
+ */
+void deserialize_parquet_footer(uint8_t * buffer, uint32_t len, parquet::format::FileMetaData * meta) {
+  using ThriftBuffer = apache::thrift::transport::TMemoryBuffer;
+
+  CUDF_FUNC_RANGE();
+  // A lot of this came from the parquet source code...
+  // Deserialize msg bytes into c++ thrift msg using memory transport.
+  #if PARQUET_THRIFT_VERSION_MAJOR > 0 || PARQUET_THRIFT_VERSION_MINOR >= 14
+  // thrift >= 0.14 enforces a default max message size that is too small for big footers
+  auto conf = std::make_shared<apache::thrift::TConfiguration>();
+  conf->setMaxMessageSize(std::numeric_limits<int>::max());
+  auto tmem_transport = std::make_shared<ThriftBuffer>(buffer, len, ThriftBuffer::OBSERVE, conf);
+  #else
+  auto tmem_transport = std::make_shared<ThriftBuffer>(buffer, len);
+  #endif
+
+  apache::thrift::protocol::TCompactProtocolFactoryT<ThriftBuffer> tproto_factory;
+  // Protect against CPU and memory bombs
+  tproto_factory.setStringSizeLimit(100 * 1000 * 1000);
+  // Structs in the thrift definition are relatively large (at least 300 bytes).
+  // This limits total memory to the same order of magnitude as stringSize.
+  tproto_factory.setContainerSizeLimit(1000 * 1000);
+  std::shared_ptr<apache::thrift::protocol::TProtocol> tproto =
+      tproto_factory.getProtocol(tmem_transport);
+  try {
+    meta->read(tproto.get());
+  } catch (std::exception& e) {
+    std::stringstream ss;
+    ss << "Couldn't deserialize thrift: " << e.what() << "\n";
+    throw std::runtime_error(ss.str());
+  }
+}
+
+/**
+ * Gather the column chunks listed in `chunk_filter` (by original index) out of every
+ * row group, replacing each row group's columns with only the kept chunks, in order.
+ */
+void filter_columns(std::vector<parquet::format::RowGroup> & groups, std::vector<int> & chunk_filter) {
+  CUDF_FUNC_RANGE();
+  for (auto group_it = groups.begin(); group_it != groups.end(); ++group_it) {
+    std::vector<parquet::format::ColumnChunk> new_chunks;
+    // one allocation instead of growth reallocations
+    new_chunks.reserve(chunk_filter.size());
+    for (auto it = chunk_filter.begin(); it != chunk_filter.end(); ++it) {
+      new_chunks.push_back(group_it->columns[*it]);
+    }
+    group_it->columns = std::move(new_chunks);
+  }
+}
+
+}
+}
+
+extern "C" {
+
+// Parse a thrift footer from a raw host buffer, prune its schema/chunks to the requested
+// columns, optionally drop row groups outside [part_offset, part_offset + part_length),
+// and return an owned FileMetaData* as a jlong handle (0 on error).
+// Note: jlong (not plain long) so the return type is 64-bit on every platform.
+JNIEXPORT jlong JNICALL Java_com_nvidia_spark_rapids_jni_ParquetFooter_readAndFilter(JNIEnv * env, jclass,
+                                                                                     jlong buffer,
+                                                                                     jlong buffer_length,
+                                                                                     jlong part_offset,
+                                                                                     jlong part_length,
+                                                                                     jobjectArray filter_col_names,
+                                                                                     jintArray num_children,
+                                                                                     jint parent_num_children,
+                                                                                     jboolean ignore_case) {
+  CUDF_FUNC_RANGE();
+  try {
+    auto meta = std::make_unique<parquet::format::FileMetaData>();
+    uint32_t len = static_cast<uint32_t>(buffer_length);
+    // We don't support encrypted parquet...
+    rapids::jni::deserialize_parquet_footer(reinterpret_cast<uint8_t *>(buffer), len, meta.get());
+
+    // Get the filter for the columns first...
+    cudf::jni::native_jstringArray n_filter_col_names(env, filter_col_names);
+    cudf::jni::native_jintArray n_num_children(env, num_children);
+
+    rapids::jni::column_pruner pruner(n_filter_col_names.as_cpp_vector(),
+                                      std::vector<int>(n_num_children.begin(), n_num_children.end()),
+                                      parent_num_children);
+    auto filter = pruner.filter_schema(meta->schema, ignore_case);
+
+    // start by filtering the schema and the chunks
+    std::size_t new_schema_size = filter.schema_map.size();
+    std::vector<parquet::format::SchemaElement> new_schema(new_schema_size);
+    for (std::size_t i = 0; i < new_schema_size; ++i) {
+      int orig_index = filter.schema_map[i];
+      int new_num_children = filter.schema_num_children[i];
+      new_schema[i] = meta->schema[orig_index];
+      new_schema[i].num_children = new_num_children;
+    }
+    meta->schema = std::move(new_schema);
+    if (meta->__isset.column_orders) {
+      // column_orders parallels the leaf chunks, so gather it with the same map
+      std::vector<parquet::format::ColumnOrder> new_order;
+      new_order.reserve(filter.chunk_map.size());
+      for (auto it = filter.chunk_map.begin(); it != filter.chunk_map.end(); ++it) {
+        new_order.push_back(meta->column_orders[*it]);
+      }
+      meta->column_orders = std::move(new_order);
+    }
+    // Now we want to filter the columns out of each row group that we care about as we go.
+    if (part_length >= 0) {
+      // filter_groups returns by value; no std::move needed around the temporary
+      meta->row_groups = rapids::jni::filter_groups(*meta, part_offset, part_length);
+    }
+    rapids::jni::filter_columns(meta->row_groups, filter.chunk_map);
+
+    return cudf::jni::release_as_jlong(meta);
+  }
+  CATCH_STD(env, 0);
+}
+
+// Free the native FileMetaData owned by the Java ParquetFooter (handle from readAndFilter).
+JNIEXPORT void JNICALL Java_com_nvidia_spark_rapids_jni_ParquetFooter_close(JNIEnv * env, jclass,
+                                                                            jlong handle) {
+  try {
+    parquet::format::FileMetaData * ptr = reinterpret_cast<parquet::format::FileMetaData *>(handle);
+    delete ptr;
+  }
+  CATCH_STD(env, );
+}
+
+JNIEXPORT jlong JNICALL Java_com_nvidia_spark_rapids_jni_ParquetFooter_getNumRows(JNIEnv * env, jclass,
+ jlong handle) {
+ try {
+ parquet::format::FileMetaData * ptr = reinterpret_cast(handle);
+ long ret = 0;
+ for(auto it = ptr->row_groups.begin(); it != ptr->row_groups.end(); ++it) {
+ ret = ret + it->num_rows;
+ }
+ return ret;
+ }
+ CATCH_STD(env, -1);
+}
+
+JNIEXPORT jlong JNICALL Java_com_nvidia_spark_rapids_jni_ParquetFooter_getNumColumns(JNIEnv * env, jclass,
+ jlong handle) {
+ try {
+ parquet::format::FileMetaData * ptr = reinterpret_cast(handle);
+ int ret = 0;
+ if (ptr->schema.size() > 0) {
+ if (ptr->schema[0].__isset.num_children) {
+ ret = ptr->schema[0].num_children;
+ }
+ }
+ return ret;
+ }
+ CATCH_STD(env, -1);
+}
+
+// Serialize the filtered footer back into parquet file-footer layout:
+//   "PAR1" + thrift-compact footer + 4-byte little-endian footer length + "PAR1"
+// Returns a HostMemoryBuffer jobject, or nullptr on error.
+JNIEXPORT jobject JNICALL Java_com_nvidia_spark_rapids_jni_ParquetFooter_serializeThriftFile(JNIEnv * env, jclass,
+                                                                                             jlong handle) {
+  CUDF_FUNC_RANGE();
+  try {
+    parquet::format::FileMetaData * meta = reinterpret_cast<parquet::format::FileMetaData *>(handle);
+    // make_shared instead of a naked new: one allocation and exception safe
+    auto transportOut = std::make_shared<apache::thrift::transport::TMemoryBuffer>();
+    apache::thrift::protocol::TCompactProtocolFactoryT<apache::thrift::transport::TMemoryBuffer> factory;
+    auto protocolOut = factory.getProtocol(transportOut);
+    meta->write(protocolOut.get());
+    uint8_t * buf_ptr;
+    uint32_t buf_size;
+    transportOut->getBuffer(&buf_ptr, &buf_size);
+
+    // 12 extra is for the MAGIC thrift_footer length MAGIC
+    jobject ret = cudf::jni::allocate_host_buffer(env, buf_size + 12, false);
+    uint8_t* ret_addr = reinterpret_cast<uint8_t*>(cudf::jni::get_host_buffer_address(env, ret));
+    ret_addr[0] = 'P';
+    ret_addr[1] = 'A';
+    ret_addr[2] = 'R';
+    ret_addr[3] = '1';
+    std::memcpy(ret_addr + 4, buf_ptr, buf_size);
+    uint8_t * after = ret_addr + buf_size + 4;
+    // footer length is stored little-endian just before the trailing magic
+    after[0] = static_cast<uint8_t>(0xFF & buf_size);
+    after[1] = static_cast<uint8_t>(0xFF & (buf_size >> 8));
+    after[2] = static_cast<uint8_t>(0xFF & (buf_size >> 16));
+    after[3] = static_cast<uint8_t>(0xFF & (buf_size >> 24));
+    after[4] = 'P';
+    after[5] = 'A';
+    after[6] = 'R';
+    after[7] = '1';
+    return ret;
+  }
+  CATCH_STD(env, nullptr);
+}
+
+}
diff --git a/src/main/java/com/nvidia/spark/rapids/jni/ParquetFooter.java b/src/main/java/com/nvidia/spark/rapids/jni/ParquetFooter.java
new file mode 100644
index 0000000000..292caca88e
--- /dev/null
+++ b/src/main/java/com/nvidia/spark/rapids/jni/ParquetFooter.java
@@ -0,0 +1,114 @@
+/*
+ * Copyright (c) 2022, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.nvidia.spark.rapids.jni;
+
+import ai.rapids.cudf.*;
+
+/**
+ * Represents a footer for a parquet file that can be parsed using native code.
+ * Owns a native FileMetaData handle that must be released via {@link #close()}.
+ */
+public class ParquetFooter implements AutoCloseable {
+  static {
+    NativeDepsLoader.loadNativeDeps();
+  }
+
+  // Opaque pointer to the native parquet::format::FileMetaData; 0 once closed.
+  private long nativeHandle;
+
+  private ParquetFooter(long handle) {
+    nativeHandle = handle;
+  }
+
+  /**
+   * Write the filtered footer back out in a format that is compatible with a parquet
+   * footer file. This will include the MAGIC PAR1 at the beginning and end and also the
+   * length of the footer just before the PAR1 at the end.
+   */
+  public HostMemoryBuffer serializeThriftFile() {
+    return serializeThriftFile(nativeHandle);
+  }
+
+  /**
+   * Get the number of rows in the footer after filtering.
+   */
+  public long getNumRows() {
+    return getNumRows(nativeHandle);
+  }
+
+  /**
+   * Get the number of top level columns in the footer after filtering.
+   */
+  public int getNumColumns() {
+    return getNumColumns(nativeHandle);
+  }
+
+  @Override
+  public void close() throws Exception {
+    // idempotent: zero the handle so a double-close never frees twice
+    if (nativeHandle != 0) {
+      close(nativeHandle);
+      nativeHandle = 0;
+    }
+  }
+
+  /**
+   * Read a parquet thrift footer from a buffer and filter it like the java code would. The buffer
+   * should only include the thrift footer itself. This includes filtering out row groups that do
+   * not fall within the partition and pruning columns that are not needed.
+   * @param buffer the buffer to parse the footer out from.
+   * @param partOffset for a split the start of the split
+   * @param partLength the length of the split
+   * @param names the names of the nodes in the tree to keep, flattened in a depth first way. The
+   *              root node should be skipped and the names of maps and lists needs to match what
+   *              parquet writes in.
+   * @param numChildren the number of children for each item in name.
+   * @param parentNumChildren the number of children in the root nodes
+   * @param ignoreCase should case be ignored when matching column names. If this is true then
+   *                   names should be converted to lower case before being passed to this.
+   * @return a reference to the parsed footer.
+   */
+  public static ParquetFooter readAndFilter(HostMemoryBuffer buffer,
+      long partOffset, long partLength,
+      String[] names,
+      int[] numChildren,
+      int parentNumChildren,
+      boolean ignoreCase) {
+    return new ParquetFooter(
+        readAndFilter
+            (buffer.getAddress(), buffer.getLength(),
+                partOffset, partLength,
+                names, numChildren,
+                parentNumChildren,
+                ignoreCase));
+  }
+
+  // Native APIS
+  private static native long readAndFilter(long address, long length,
+      long partOffset, long partLength,
+      String[] names,
+      int[] numChildren,
+      int parentNumChildren,
+      boolean ignoreCase) throws CudfException;
+
+  private static native void close(long nativeHandle);
+
+  private static native long getNumRows(long nativeHandle);
+
+  private static native int getNumColumns(long nativeHandle);
+
+  // NOTE: a serializeCustom native declaration was removed: the native library added in
+  // this change defines no matching JNI symbol, so calling it could only throw
+  // UnsatisfiedLinkError. It was private and never referenced.
+  private static native HostMemoryBuffer serializeThriftFile(long nativeHandle);
+}