From ba5de10709fe78afb8ff55321bc033d381d8b617 Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Mon, 11 Jul 2022 12:36:17 -0500 Subject: [PATCH] Add new native parquet footer API and deprecate the old one (#362) Signed-off-by: Robert (Bobby) Evans --- .../spark/rapids/jni/ParquetFooter.java | 154 +++++++++++++++++- 1 file changed, 152 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/nvidia/spark/rapids/jni/ParquetFooter.java b/src/main/java/com/nvidia/spark/rapids/jni/ParquetFooter.java index 292caca88e..cc02186b68 100644 --- a/src/main/java/com/nvidia/spark/rapids/jni/ParquetFooter.java +++ b/src/main/java/com/nvidia/spark/rapids/jni/ParquetFooter.java @@ -18,6 +18,9 @@ import ai.rapids.cudf.*; +import java.util.ArrayList; +import java.util.Locale; + /** * Represents a footer for a parquet file that can be parsed using native code. */ @@ -26,6 +29,69 @@ public class ParquetFooter implements AutoCloseable { NativeDepsLoader.loadNativeDeps(); } + /** + * Base element for all types in a parquet schema. + */ + public static abstract class SchemaElement {} + + private static class ElementWithName { + final String name; + final SchemaElement element; + + public ElementWithName(String name, SchemaElement element) { + this.name = name; + this.element = element; + } + } + + public static class StructElement extends SchemaElement { + public static StructBuilder builder() { + return new StructBuilder(); + } + + private final ElementWithName[] children; + private StructElement(ElementWithName[] children) { + this.children = children; + } + } + + public static class StructBuilder { + ArrayList children = new ArrayList<>(); + + StructBuilder() { + // Empty + } + + public StructBuilder addChild(String name, SchemaElement child) { + children.add(new ElementWithName(name, child)); + return this; + } + + public StructElement build() { + return new StructElement(children.toArray(new ElementWithName[0])); + } + } + + public static class ValueElement extends SchemaElement { + public ValueElement() {} + } + + public static class ListElement extends SchemaElement { + private final SchemaElement item; + public ListElement(SchemaElement item) { + this.item = item; + } + } + + public static class MapElement extends SchemaElement { + private final SchemaElement key; + private final SchemaElement value; + public MapElement(SchemaElement key, SchemaElement value) { + this.key = key; + this.value = value; + } + } + private long nativeHandle; private ParquetFooter(long handle) { @@ -63,6 +129,90 @@ public void close() throws Exception { } } + /** + * Recursive helper function to flatten a SchemaElement, so it can more efficiently be passed + * through JNI. + */ + private static void depthFirstNamesHelper(SchemaElement se, String name, boolean makeLowerCase, + ArrayList names, ArrayList numChildren) { + if (makeLowerCase) { + name = name.toLowerCase(Locale.ROOT); + } + + if (se instanceof ValueElement) { + names.add(name); + numChildren.add(0); + } else if (se instanceof StructElement) { + StructElement st = (StructElement) se; + names.add(name); + numChildren.add(st.children.length); + for (ElementWithName child : st.children) { + depthFirstNamesHelper(child.element, child.name, makeLowerCase, names, numChildren); + } + } else if (se instanceof ListElement) { + ListElement le = (ListElement) se; + // This follows the conventions of newer parquet. This is just here as a bridge to the new + // API and code. + names.add(name); + numChildren.add(1); + names.add("list"); + numChildren.add(1); + depthFirstNamesHelper(le.item, "element", makeLowerCase, names, numChildren); + } else if (se instanceof MapElement) { + MapElement me = (MapElement) se; + // This follows the conventions of newer parquet. This is just here as a bridge to the new + // API and code. + names.add(name); + numChildren.add(1); + names.add("key_value"); + numChildren.add(2); + depthFirstNamesHelper(me.key, "key", makeLowerCase, names, numChildren); + depthFirstNamesHelper(me.value, "value", makeLowerCase, names, numChildren); + } else { + throw new UnsupportedOperationException(se + " is not a supported schema element type"); + } + } + + /** + * Flatten a SchemaElement, so it can more efficiently be passed through JNI. + */ + private static void depthFirstNames(StructElement schema, boolean makeLowerCase, + ArrayList names, ArrayList numChildren) { + // Initialize them with a quick length for non-nested values + for (ElementWithName se: schema.children) { + depthFirstNamesHelper(se.element, se.name, makeLowerCase, names, numChildren); + } + } + + /** + * Read a parquet thrift footer from a buffer and filter it like the java code would. The buffer + * should only include the thrift footer itself. This includes filtering out row groups that do + * not fall within the partition and pruning columns that are not needed. + * @param buffer the buffer to parse the footer out from. + * @param partOffset for a split the start of the split + * @param partLength the length of the split + * @param schema a stripped down schema so the code can verify that the types match what is + * expected. The java code does this too. + * @param ignoreCase should case be ignored when matching column names. If this is true then + * names should be converted to lower case before being passed to this. + * @return a reference to the parsed footer. + */ + public static ParquetFooter readAndFilter(HostMemoryBuffer buffer, + long partOffset, long partLength, StructElement schema, boolean ignoreCase) { + int parentNumChildren = schema.children.length; + ArrayList names = new ArrayList<>(); + ArrayList numChildren = new ArrayList<>(); + depthFirstNames(schema, ignoreCase, names, numChildren); + return new ParquetFooter( + readAndFilter + (buffer.getAddress(), buffer.getLength(), + partOffset, partLength, + names.toArray(new String[0]), + numChildren.stream().mapToInt(i -> i).toArray(), + parentNumChildren, + ignoreCase)); + } + /** * Read a parquet thrift footer from a buffer and filter it like the java code would. The buffer * should only include the thrift footer itself. This includes filtering out row groups that do @@ -78,7 +228,9 @@ public void close() throws Exception { * @param ignoreCase should case be ignored when matching column names. If this is true then * names should be converted to lower case before being passed to this. * @return a reference to the parsed footer. + * @deprecated Use the version that takes a StructElement instead */ + @Deprecated public static ParquetFooter readAndFilter(HostMemoryBuffer buffer, long partOffset, long partLength, String[] names, @@ -108,7 +260,5 @@ private static native long readAndFilter(long address, long length, private static native int getNumColumns(long nativeHandle); - private static native HostMemoryBuffer serializeCustom(long nativeHandle); - private static native HostMemoryBuffer serializeThriftFile(long nativeHandle); }