Skip to content

Commit

Permalink
Add new native parquet footer API and deprecate the old one (#362)
Browse files Browse the repository at this point in the history
Signed-off-by: Robert (Bobby) Evans <[email protected]>
  • Loading branch information
revans2 authored Jul 11, 2022
1 parent 6ede0c9 commit ba5de10
Showing 1 changed file with 152 additions and 2 deletions.
154 changes: 152 additions & 2 deletions src/main/java/com/nvidia/spark/rapids/jni/ParquetFooter.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

import ai.rapids.cudf.*;

import java.util.ArrayList;
import java.util.Locale;

/**
* Represents a footer for a parquet file that can be parsed using native code.
*/
Expand All @@ -26,6 +29,69 @@ public class ParquetFooter implements AutoCloseable {
NativeDepsLoader.loadNativeDeps();
}

/**
* Base element for all types in a parquet schema.
*/
public static abstract class SchemaElement {}

private static class ElementWithName {
final String name;
final SchemaElement element;

public ElementWithName(String name, SchemaElement element) {
this.name = name;
this.element = element;
}
}

public static class StructElement extends SchemaElement {
public static StructBuilder builder() {
return new StructBuilder();
}

private final ElementWithName[] children;
private StructElement(ElementWithName[] children) {
this.children = children;
}
}

public static class StructBuilder {
ArrayList<ElementWithName> children = new ArrayList<>();

StructBuilder() {
// Empty
}

public StructBuilder addChild(String name, SchemaElement child) {
children.add(new ElementWithName(name, child));
return this;
}

public StructElement build() {
return new StructElement(children.toArray(new ElementWithName[0]));
}
}

public static class ValueElement extends SchemaElement {
public ValueElement() {}
}

public static class ListElement extends SchemaElement {
private final SchemaElement item;
public ListElement(SchemaElement item) {
this.item = item;
}
}

public static class MapElement extends SchemaElement {
private final SchemaElement key;
private final SchemaElement value;
public MapElement(SchemaElement key, SchemaElement value) {
this.key = key;
this.value = value;
}
}

private long nativeHandle;

private ParquetFooter(long handle) {
Expand Down Expand Up @@ -63,6 +129,90 @@ public void close() throws Exception {
}
}

/**
* Recursive helper function to flatten a SchemaElement, so it can more efficiently be passed
* through JNI.
*/
private static void depthFirstNamesHelper(SchemaElement se, String name, boolean makeLowerCase,
ArrayList<String> names, ArrayList<Integer> numChildren) {
if (makeLowerCase) {
name = name.toLowerCase(Locale.ROOT);
}

if (se instanceof ValueElement) {
names.add(name);
numChildren.add(0);
} else if (se instanceof StructElement) {
StructElement st = (StructElement) se;
names.add(name);
numChildren.add(st.children.length);
for (ElementWithName child : st.children) {
depthFirstNamesHelper(child.element, child.name, makeLowerCase, names, numChildren);
}
} else if (se instanceof ListElement) {
ListElement le = (ListElement) se;
// This follows the conventions of newer parquet. This is just here as a bridge to the new
// API and code.
names.add(name);
numChildren.add(1);
names.add("list");
numChildren.add(1);
depthFirstNamesHelper(le.item, "element", makeLowerCase, names, numChildren);
} else if (se instanceof MapElement) {
MapElement me = (MapElement) se;
// This follows the conventions of newer parquet. This is just here as a bridge to the new
// API and code.
names.add(name);
numChildren.add(1);
names.add("key_value");
numChildren.add(2);
depthFirstNamesHelper(me.key, "key", makeLowerCase, names, numChildren);
depthFirstNamesHelper(me.value, "value", makeLowerCase, names, numChildren);
} else {
throw new UnsupportedOperationException(se + " is not a supported schema element type");
}
}

/**
* Flatten a SchemaElement, so it can more efficiently be passed through JNI.
*/
private static void depthFirstNames(StructElement schema, boolean makeLowerCase,
ArrayList<String> names, ArrayList<Integer> numChildren) {
// Initialize them with a quick length for non-nested values
for (ElementWithName se: schema.children) {
depthFirstNamesHelper(se.element, se.name, makeLowerCase, names, numChildren);
}
}

/**
* Read a parquet thrift footer from a buffer and filter it like the java code would. The buffer
* should only include the thrift footer itself. This includes filtering out row groups that do
* not fall within the partition and pruning columns that are not needed.
* @param buffer the buffer to parse the footer out from.
* @param partOffset for a split the start of the split
* @param partLength the length of the split
* @param schema a stripped down schema so the code can verify that the types match what is
* expected. The java code does this too.
* @param ignoreCase should case be ignored when matching column names. If this is true then
* names should be converted to lower case before being passed to this.
* @return a reference to the parsed footer.
*/
public static ParquetFooter readAndFilter(HostMemoryBuffer buffer,
long partOffset, long partLength, StructElement schema, boolean ignoreCase) {
int parentNumChildren = schema.children.length;
ArrayList<String> names = new ArrayList<>();
ArrayList<Integer> numChildren = new ArrayList<>();
depthFirstNames(schema, ignoreCase, names, numChildren);
return new ParquetFooter(
readAndFilter
(buffer.getAddress(), buffer.getLength(),
partOffset, partLength,
names.toArray(new String[0]),
numChildren.stream().mapToInt(i -> i).toArray(),
parentNumChildren,
ignoreCase));
}

/**
* Read a parquet thrift footer from a buffer and filter it like the java code would. The buffer
* should only include the thrift footer itself. This includes filtering out row groups that do
Expand All @@ -78,7 +228,9 @@ public void close() throws Exception {
* @param ignoreCase should case be ignored when matching column names. If this is true then
* names should be converted to lower case before being passed to this.
* @return a reference to the parsed footer.
* @deprecated Use the version that takes a StructElement instead
*/
@Deprecated
public static ParquetFooter readAndFilter(HostMemoryBuffer buffer,
long partOffset, long partLength,
String[] names,
Expand Down Expand Up @@ -108,7 +260,5 @@ private static native long readAndFilter(long address, long length,

private static native int getNumColumns(long nativeHandle);

private static native HostMemoryBuffer serializeCustom(long nativeHandle);

private static native HostMemoryBuffer serializeThriftFile(long nativeHandle);
}

0 comments on commit ba5de10

Please sign in to comment.