From 9da0899c6cba34bce0d73a7b58ed6d9a5d38cd67 Mon Sep 17 00:00:00 2001 From: cashmand Date: Fri, 31 May 2024 17:00:15 -0400 Subject: [PATCH 1/4] Describe shredding scheme --- common/variant/shredding.md | 252 ++++++++++++++++++++++++++++++++++++ 1 file changed, 252 insertions(+) create mode 100644 common/variant/shredding.md diff --git a/common/variant/shredding.md b/common/variant/shredding.md new file mode 100644 index 0000000000000..6d22df875392d --- /dev/null +++ b/common/variant/shredding.md @@ -0,0 +1,252 @@ +# Shredding Overview + +The Spark Variant type is designed to store and process semi-structured data efficiently, even with heterogeneous values. Query engines encode each variant value in a self-describing format, and store it as a group containing **value** and **metadata** binary fields in Parquet. Since data is often partially homogenous, it can be beneficial to extract certain fields into separate Parquet columns to further improve performance. We refer to this process as "shredding". Each Parquet file remains fully self-describing, with no additional metadata required to read or fully reconstruct the Variant data from the file. Combining shredding with a binary residual provides the flexibility to represent complex, evolving data with an unbounded number of unique fields while limiting the size of file schemas, and retaining the performance benefits of a columnar format. + +This document focuses on the shredding semantics, Parquet representation, implications for readers and writers, as well as the Variant reconstruction. For now, it does not discuss which fields to shred, user-facing API changes, or any engine-specific considerations like how to use shredded columns. The approach builds on top of the generic Spark Variant representation, and leverages the existing Parquet specification for maximum compatibility with the open-source ecosystem. + +At a high level, we introduce a third field to the Variant Parquet group called **paths**. This represents a fixed schema suitable for merging into the main **value** and **metadata** to construct the full Variant value for each row. + +Shredding lets Spark (or any other query engine) reap the full benefits of Parquet's columnar representation, such as more compact data encoding, min/max statistics for data skipping, and I/O and CPU savings from pruning unnecessary fields not accessed by a query (including the non-shredded Variant binary data). +Without shredding, any query that accesses a Variant column must fetch all bytes of the full binary buffer. With shredding, we can get nearly equivalent performance as in a relational (scalar) data model. + +For example, `select variant_get(variant_col, ‘$.field1.inner_field2’, ‘string’) from tbl` only needs to access `inner_field2`, and the file scan could avoid fetching the rest of the Variant value if this field was shredded into a separate column in the Parquet schema. Similarly, for the query `select * from tbl where variant_get(variant_col, ‘$.id’, ‘integer’) = 123`, the scan could first decode the shredded `id` column, and only fetch/decode the full Variant value for rows that pass the filter. + +# Parquet Example + +Consider the following Parquet schema together with how Variant values might be mapped to it. Notice that we represent each shredded field in **paths** as a group of two fields, **typed_value** and **untyped_value**. We extract all homogenous data items of a certain path into **typed_value**, and set aside incompatible data items in **untyped_value**. Intuitively, incompatibilities within the same path may occur because we store the shredding schema per Parquet file, and each file can contain several row groups. Selecting a type for each field that is acceptable for all rows would be impractical because it would require buffering the contents of an entire file before writing. + +``` +optional group variant_col { + optional binary value; + optional binary metadata; + optional group paths { + optional group a { + optional int64 typed_value; + optional binary untyped_value; + } + optional group b { + optional binary typed_value (STRING); + optional binary untyped_value; + } + } +} +``` + + +| Variant Value | Top-level value/metadata | Non-null in a | Non-null in b | +|---------------|--------------------------|---------------|---------------| +| {a: 123, b: “hello”} | null | typed_value | typed_value | +| {a: 1.23, b: “123”} | null | untyped_value | typed_value | +| {a: [1,2,3], b: null} | null | untyped_value | untyped_value | +| {a: 123, c: 456} | {c: 123} | typed_value | null | +| [{a: 1, b: 2}, {a:3, b:4}] | [{a: 1, b: 2}, {a:3, b:4}] | null | null | + +# Parquet Layout + +The **paths** group may contain any arbitrary nesting of list or group types, representing Variant array and object types, respectively. The group field names correspond to the Variant object field names. Arrays must use the three-level list structure described in https://github.com/apache/parquet-format/blob/master/LogicalTypes.md. + +Each leaf in the **paths** schema can store an arbitrary Variant value. It contains an **untyped_value** binary field and a **typed_value** field. If non-null, **untyped_value** represents the value stored as a Variant binary; the metadata and value of a normal Variant are concatenated. The **typed_value** field may be any type that has a corresponding Variant type. For each value in the data, at most one of the **typed_value** and **untyped_value** may be non-null. + +| typed_value | value/metadata | Meaning | +|-------------|----------------|---------| +| null | null | Field is missing in the reconstructed Variant. | +| null | non-null | Field may be any type in the reconstructed Variant. | +| non-null | null | Field has this column’s type in the reconstructed Variant. | +| non-null | non-null | Invalid | + +The **typed_value** may be absent from the Parquet schema for any field, which is equivalent to its value being always null (in which case the shredded field is always stored as a Variant binary). By the same token, **untyped_value** may be absent, which is equivalent to their value being always null (in which case the field will always be missing or have the type of the **typed_value** column). +(Exception: **typed_value** may be a nested type, but to avoid confusion, if the type of **typed_value** is itself a nested type, the value and metadata must be present to identify it as a leaf.) + +It is possible to reconstruct the full metadata and value from **untyped_value** by treating the leading bytes as metadata, and using the header, dictionary size and final dictionary offset to determine the start of the Variant value section. (See the metadata description in the common/variant/README.md for more detail on how to interpret it.) For example, in the binary below, there is a one-element dictionary, and the final offset (`offset[1]`) indicates that the last dictionary entry ends at the second byte. Therefore the full metadata size is six bytes, and the rest is the value section of the Variant. + +``` + hdr sz offset[0] offset[1] bytes[0] bytes[1] value + -------------------------------------------------------------------- +| | | | | | | +| 0x01 | 0x01 | 0x00 | 0x02 | ‘h’ | ‘i’ | . . . . . . . . +|______|______|_________|_________|________|________|________________ +``` + +# Top-level value/metadata + +Shredded values are not redundantly stored in the top-level **value** binary. The top-level binary must only contain the values that cannot be represented in the **paths** group. For example, if **paths** contains a leaf field at the path `field1.sub_field1`, then that path should not appear in **value**. However `field1` may appear in **value** as an object with fields other than `sub_field1` (such as `sub_field2`), or as any non-object (such as an array or scalar value). + +The top-level **metadata** only needs to contain dictionary entries that are referred to in the top-level **value**. When reconstructing a full variant, the metadata will need to be extended to contain dictionary entries implied by data in **paths**. + +If all values can be represented in **paths**, the top-level value and metadata are set to null. If an array contains a mix of elements that could and couldn't be fully represented in **paths**, then elements that were fully represented are replaced will Variant NULL in the top-level value. For objects, fully represented fields are simply omitted from the object. + +# Using untyped_value vs. typed_value + +In general, it is desirable to store values in the **typed_value** field rather than the **untyped_value** whenever possible. This will typically improve encoding efficiency, and allow the use of Parquet statistics to filter at the row group or page level. In the best case, the **untyped_value** fields are all null and the engine does not need to read them (or it can omit them from the schema on write entirely). There are two main motivations for including the **untyped_value** column: + +1) In a case where there are rare type mismatches (for example, a numeric field with rare strings like “n/a”), we allow the field to be shredded, which could still be a significant performance benefit compared to fetching and decoding the full value/metadata binary. +2) Since there is a single schema per file, there would be no easy way to recover from a type mismatch encountered late in a file write. Parquet files can be large, and buffering all file data before starting to write could be expensive. Including an untyped column for every field guarantees we can adhere to the requested shredding schema. + +The **untyped_value** is stored in a single binary column, rather than storing the value and metadata separately as is done in the top-level binary format. The motivation for storing them separately at the top level is that this lets the engine encode and compress the metadata more efficiently when the fields are consistent across rows. We chose to combine them in the shredded fields: we expect the encoding/compression benefit to be lower, since in the case of uniform data, the values should be stored in the typed column. Using a single binary reduces the extra metadata required in the Parquet schema, which can be significant in some cases. + +# Data Skipping + +Shredded columns are expected to store statistics in the same format as a normal Parquet column. In general, the engine can only skip a row group or page if all rows in the **untyped_value** field are null, since it is possible for a `variant_get` expression to successfully cast a value from the **untyped_value** to the target type. For example, if **typed_value** is of type `int64`, then the string “123” might be contained in **untyped_value**, which would not be reflected in statistics, but could be retained by a filter like `where variant_get(col, “$.field”, “long”) = 123`. If **untyped_value** is all-null, then the engine can prune pages or row groups based on **typed_value**. + +# Shredding Semantics + +Variant defines a number of integer and decimal types of varying widths. When writing, it would be quite limiting to strictly enforce the mapping between Variant types and Parquet/Spark types. For example, if we chose to shred a field as `int64`, and encountered the value 123 encoded as `int32`, it seems preferable to write this to the **typed_value** column, even though it technically loses information about the type in the original Variant object, and would be reconstructed as an `int64`. + +On the other hand, storing arbitrarily casted values in the **typed_value** column could create inconsistent behavior before and after shredding, and could leak behavior from the writing engine to the reading engine. For example, double-to-string casts can produce different results in different engines. Performing such a cast while shredding (even if we somehow retained the knowledge that the original value was a `double`) could result in confusing behavior changes if shredding took place using a different tool from the query engine that wrote it. + +Our approach is a pragmatic compromise that allows the use of **typed_value** in cases where the type can be losslessly widened without resulting in a significant difference in the reconstructed Variant: + +1) All integer and decimal types in Variant are conceptually a single “number” type. The engine may shred any number into the **typed_value** of any other number, provided that no information about the value is lost. For example, the integer 123 may be shredded as Decimal<9, 2>, but 1.23 may not be shredded as any integer type. + +2) To ensure that behavior remains unchanged before and after shredding, we will aim to have all Spark expressions that operate on Variant be agnostic to the specific numeric type. For example, `cast(val as string)` should produce the string “123” if `val` is any integer or decimal type that is exactly equal to 123. Note that this is unlike the normal Spark behavior for `decimal` types, which would produce “123.00” for `Decimal<9,2>`. + +3) One exception to the above is `schema_of_variant`, which will still report the underlying physical type. This means that `schema_of_variant` may report different numeric types before and after shredding. + +4) Other than integer and decimal, we will not allow casting between types. For example, we will not write the string “123” to an integer **typed_value** column, even though `variant_get(“123”, “$”, “integer”)` would produce the integer 123. Similarly, double and float types are considered distinct from other numeric types, and we would not write them to a numeric **typed_value** column. + +# Reconstructing a Variant + +It is possible to recover a full Variant value using a recursive algorithm, where the initial call is to `ConstructVariant` with the Variant value implied by the top-level value/metadata, and the nested structure implied by the top-level **paths**. + +Note that the top-level **paths** may represent an object, array or even a scalar (that is, **paths** itself may be the leaf of the tree in the pseudocode below). + +``` +# Constructs a Variant from `value` and `paths`. +# Either may be null, in which case the result is the Variant value implied by the other one. +def ConstructVariant(value, paths): + if paths is null: return value + elif is_struct(paths): + return ConstructObject(value, paths) + elif is_array(paths): + return ConstructArray(value, paths) + else: + # Leaf in paths. If the value is also non-empty, then the result is ambiguous. + assert(value is null or value is VariantNull) + return coalesce(paths.untyped_value, cast(paths.typed_value as Variant)) + +# Construct an object from a `paths` group, and a (possibly null) Variant value +def ConstructObject(value, paths) + # If value is present and is not an Object, then the result is ambiguous. + assert(value is null or is_object(value)) + all_keys = Union(value.keys, paths.fields) + return VariantObject(all_keys.map { key -> + if paths[field] is null: (key, value[field]) + # Note: value[field] can be null here. + else: (key, ConstructVariant(value[field], paths[field])) + } + +def ConstructArray(value, paths) + # If value is present and is not an Array of the same size, then the result is ambiguous. + assert(value is null or value is VariantNull or (is_array(value) and value.size == paths.size) + newVariantArray = VariantArray() + for i in range(paths.size): + # value[i] is null iff value is null + newVariantArray.append(ConstructVariant(value[i], paths[i]) +``` + +# Nested Parquet Example + +This section describes a more deeply nested example, using a top-level array as the shredding type. + +Below is a sample of JSON that would be fully shredded in this example. It contains an array of objects, containing an “a” field shredded as an array, and a “b” field shredded as an integer. + +``` +[ + { + "a": [1, 2, 3], + "b": 100 + }, + { + "a": [4, 5, 6], + "b": 200 + } +] +``` + + +The corresponding Parquet schema with “a” and “b” as leaf types is: + +``` +optional group variant_col { + optional binary value; + optional binary metadata; + optional group paths (LIST) { + repeated group list { + optional group element { + optional group a (LIST) { + repeated group list { + optional group element { + optional int64 typed_value; + optional binary untyped_value; + } + } + } + optional group b { + optional int64 typed_value; + optional binary untyped_value; + } + } + } + } +} +``` + +In the above example schema, if “a” contains a mix of integer and non-integer values, the engine will shred individual elements appropriately into either **typed_value** or **untyped_value**. If the top-level Variant is not an array (for example, an object), the engine cannot shred the value and it will store it in the top-level value/metadata. + +If the array contains a mix of elements that can and cannot be fully shredded, then the unshredded elements are stored as a Variant NULL. Consider the following example: + +``` +[ + { + "a": [1, 2, 3], + "b": 100, + “c”: “unexpected” + }, + { + "a": [4, 5, 6], + "b": 200 + }, + “not an object” +] +``` + +The second array element can be fully shredded, but the first and third cannot be. The resulting top-level Variant would be as follows: + +``` +[ + { “c”: “unexpected” }, + NULL, + “not an object” +] +``` + +An alternative Parquet schema for the data above is the following: + +``` +optional group variant_col { + optional binary value; + optional binary metadata; + optional group paths (LIST) { + repeated group list { + optional group element { + optional group a { + optional group typed_value (LIST) { + repeated group list { + optional int64 element; + } + } + } + optional binary untyped_value; + } + optional group b { + optional int64 typed_value; + optional binary untyped_value; + } + } + } + } +} +``` + +This is similar to the previous schema, but the **typed_value** is an entire list. In this case, if the “a” field is an array, but contains values that cannot be stored as int64, then the entire array is stored as a single **untyped_value** binary. We expect the pros and cons of these different schemes to be marginal, and we may choose not to support the latter on the write path. The example mainly serves to demonstrate flexibility in deciding how to shred. + From 170797a94c0ba4a4479e4011e5d9444b629be7c9 Mon Sep 17 00:00:00 2001 From: cashmand Date: Fri, 31 May 2024 18:03:50 -0400 Subject: [PATCH 2/4] Link to shredding doc --- common/variant/README.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/common/variant/README.md b/common/variant/README.md index 0fc4d91f3f8a3..3e1b00c494755 100644 --- a/common/variant/README.md +++ b/common/variant/README.md @@ -375,3 +375,7 @@ Field names are case-sensitive. Field names are required to be unique for each o # Versions and extensions An implementation is not expected to parse a Variant value whose metadata version is higher than the version supported by the implementation. However, new types may be added to the specification without incrementing the version ID. In such a situation, an implementation should be able to read the rest of the Variant value if desired. + +# Shredding + +For columnar storage formats, a single Variant object may have poor read performance when only a small number of fields are needed. A better approach is to create separate columns for individual fields, referred to as shredding or subcolumnarization. [shredding.md](shredding.md) describes an approach to shredding Variant columns in Parquet and similar columnar formats. From 05d225c2e57ce507c190c801d1a5a09615270726 Mon Sep 17 00:00:00 2001 From: cashmand Date: Wed, 3 Jul 2024 18:31:17 -0400 Subject: [PATCH 3/4] Modify scheme to include intermediate levels --- common/variant/shredding.md | 190 +++++++++++++++++------------------- 1 file changed, 89 insertions(+), 101 deletions(-) diff --git a/common/variant/shredding.md b/common/variant/shredding.md index 6d22df875392d..79efcb6233bf4 100644 --- a/common/variant/shredding.md +++ b/common/variant/shredding.md @@ -4,7 +4,7 @@ The Spark Variant type is designed to store and process semi-structured data eff This document focuses on the shredding semantics, Parquet representation, implications for readers and writers, as well as the Variant reconstruction. For now, it does not discuss which fields to shred, user-facing API changes, or any engine-specific considerations like how to use shredded columns. The approach builds on top of the generic Spark Variant representation, and leverages the existing Parquet specification for maximum compatibility with the open-source ecosystem. -At a high level, we introduce a third field to the Variant Parquet group called **paths**. This represents a fixed schema suitable for merging into the main **value** and **metadata** to construct the full Variant value for each row. +At a high level, we replace the **value** and **metadata** of the Variant Parquet group with one or more fields called **object**, **array**, **typed_value** and **untyped_value**. These represent a fixed schema suitable for constructing the full Variant value for each row. Shredding lets Spark (or any other query engine) reap the full benefits of Parquet's columnar representation, such as more compact data encoding, min/max statistics for data skipping, and I/O and CPU savings from pruning unnecessary fields not accessed by a query (including the non-shredded Variant binary data). Without shredding, any query that accesses a Variant column must fetch all bytes of the full binary buffer. With shredding, we can get nearly equivalent performance as in a relational (scalar) data model. @@ -13,51 +13,60 @@ For example, `select variant_get(variant_col, ‘$.field1.inner_field2’, ‘st # Parquet Example -Consider the following Parquet schema together with how Variant values might be mapped to it. Notice that we represent each shredded field in **paths** as a group of two fields, **typed_value** and **untyped_value**. We extract all homogenous data items of a certain path into **typed_value**, and set aside incompatible data items in **untyped_value**. Intuitively, incompatibilities within the same path may occur because we store the shredding schema per Parquet file, and each file can contain several row groups. Selecting a type for each field that is acceptable for all rows would be impractical because it would require buffering the contents of an entire file before writing. +Consider the following Parquet schema together with how Variant values might be mapped to it. Notice that we represent each shredded field in **object** as a group of two fields, **typed_value** and **untyped_value**. We extract all homogenous data items of a certain path into **typed_value**, and set aside incompatible data items in **untyped_value**. Intuitively, incompatibilities within the same path may occur because we store the shredding schema per Parquet file, and each file can contain several row groups. Selecting a type for each field that is acceptable for all rows would be impractical because it would require buffering the contents of an entire file before writing. + +Typically, the expectation is that **untyped_value** exists at every level as an option, along with one of **object**, **array** or **typed_value**. If the actual Variant value contains a type that does not match the provided schema, it is stored in **untyped_value**. An **untyped_value** may also be populated if an object can be partially represented: any fields that are present in the schema must be written to those fields, and any missing fields are written to **untyped_valud**. ``` optional group variant_col { - optional binary value; - optional binary metadata; - optional group paths { + optional binary untyped_value; + optional group object { optional group a { - optional int64 typed_value; optional binary untyped_value; + optional int64 typed_value; } optional group b { - optional binary typed_value (STRING); optional binary untyped_value; + optional group object { + optional group c { + optional binary untyped_value; + optional binary typed_value (STRING); + } + } } } } ``` - -| Variant Value | Top-level value/metadata | Non-null in a | Non-null in b | +| Variant Value | Top-level untyped_value | b.untyped_value | Non-null in a | Non-null in b.c | |---------------|--------------------------|---------------|---------------| -| {a: 123, b: “hello”} | null | typed_value | typed_value | -| {a: 1.23, b: “123”} | null | untyped_value | typed_value | -| {a: [1,2,3], b: null} | null | untyped_value | untyped_value | -| {a: 123, c: 456} | {c: 123} | typed_value | null | -| [{a: 1, b: 2}, {a:3, b:4}] | [{a: 1, b: 2}, {a:3, b:4}] | null | null | +| {a: 123, b: {c: “hello”}} | null | null | typed_value | typed_value | +| {a: 1.23, b: {c: “123”}} | null | null | untyped_value | typed_value | +| {a: [1,2,3], b: {c: null}} | null | null | untyped_value | untyped_value | +| {a: 123, c: 456} | {c: 456} | null | typed_value | null | +| {a: 123, b: {c: "hello", d: 456}} | null | {d: 456} | typed_value | typed_value | +| [{a: 1, b: {c: 2}}, {a: 3, b: {c: 4}}] | [{a: 1, b: {c: 2}}, {a: 3, b: {c: 4}}] | null | null | null | # Parquet Layout -The **paths** group may contain any arbitrary nesting of list or group types, representing Variant array and object types, respectively. The group field names correspond to the Variant object field names. Arrays must use the three-level list structure described in https://github.com/apache/parquet-format/blob/master/LogicalTypes.md. +The **array** and **object** fields represent Variant array and object types, respectively. Arrays must use the three-level list structure described in https://github.com/apache/parquet-format/blob/master/LogicalTypes.md. + +An **object** field must be a group. Each field name of this inner group corresponds to the Variant value's object field name. Each inner field's type is a recursively shredded variant value: that is, the fields of each object field must be one or more of **object**, **array**, **typed_value** or **untyped_value**. + +Similarly the elements of an **array** must be a group containing one or more of **object**, **array**, **typed_value** or **untyped_value**. -Each leaf in the **paths** schema can store an arbitrary Variant value. It contains an **untyped_value** binary field and a **typed_value** field. If non-null, **untyped_value** represents the value stored as a Variant binary; the metadata and value of a normal Variant are concatenated. The **typed_value** field may be any type that has a corresponding Variant type. For each value in the data, at most one of the **typed_value** and **untyped_value** may be non-null. +Each leaf in the schema can store an arbitrary Variant value. It contains an **untyped_value** binary field and a **typed_value** field. If non-null, **untyped_value** represents the value stored as a Variant binary; the metadata and value of a normal Variant are concatenated. The **typed_value** field may be any type that has a corresponding Variant type. For each value in the data, at most one of the **typed_value** and **untyped_value** may be non-null. A writer may omit either field, which is equivalent to all rows being null. -| typed_value | value/metadata | Meaning | +| typed_value | untyped_value | Meaning | |-------------|----------------|---------| -| null | null | Field is missing in the reconstructed Variant. | -| null | non-null | Field may be any type in the reconstructed Variant. | -| non-null | null | Field has this column’s type in the reconstructed Variant. | +| null | null | Field is missing in the reconstructed Variant. | +| null | non-null | Field may be any type in the reconstructed Variant. | +| non-null | null | Field has this column’s type in the reconstructed Variant. | | non-null | non-null | Invalid | The **typed_value** may be absent from the Parquet schema for any field, which is equivalent to its value being always null (in which case the shredded field is always stored as a Variant binary). By the same token, **untyped_value** may be absent, which is equivalent to their value being always null (in which case the field will always be missing or have the type of the **typed_value** column). -(Exception: **typed_value** may be a nested type, but to avoid confusion, if the type of **typed_value** is itself a nested type, the value and metadata must be present to identify it as a leaf.) -It is possible to reconstruct the full metadata and value from **untyped_value** by treating the leading bytes as metadata, and using the header, dictionary size and final dictionary offset to determine the start of the Variant value section. (See the metadata description in the common/variant/README.md for more detail on how to interpret it.) For example, in the binary below, there is a one-element dictionary, and the final offset (`offset[1]`) indicates that the last dictionary entry ends at the second byte. Therefore the full metadata size is six bytes, and the rest is the value section of the Variant. +The full metadata and value can be reconstructed from **untyped_value** by treating the leading bytes as metadata, and using the header, dictionary size and final dictionary offset to determine the start of the Variant value section. (See the metadata description in the common/variant/README.md for more detail on how to interpret it.) For example, in the binary below, there is a one-element dictionary, and the final offset (`offset[1]`) indicates that the last dictionary entry ends at the second byte. Therefore the full metadata size is six bytes, and the rest is the value section of the Variant. ``` hdr sz offset[0] offset[1] bytes[0] bytes[1] value @@ -67,13 +76,15 @@ It is possible to reconstruct the full metadata and value from **untyped_value** |______|______|_________|_________|________|________|________________ ``` -# Top-level value/metadata +# Unshredded values -Shredded values are not redundantly stored in the top-level **value** binary. The top-level binary must only contain the values that cannot be represented in the **paths** group. For example, if **paths** contains a leaf field at the path `field1.sub_field1`, then that path should not appear in **value**. However `field1` may appear in **value** as an object with fields other than `sub_field1` (such as `sub_field2`), or as any non-object (such as an array or scalar value). +If all values can be represented at a given level by whichever of **object**, **array** or **typed_value** is present, **untyped_value** is set to null. -The top-level **metadata** only needs to contain dictionary entries that are referred to in the top-level **value**. When reconstructing a full variant, the metadata will need to be extended to contain dictionary entries implied by data in **paths**. +If a value cannot be represented by whichever of **object**, **array** or **typed_value** is present in the schema, then it is stored in **untyped_value**, and the other fields are set to null. In the Parquet example above, if field **a** was an object or array, or a non-integer scalar, it would be stored in **untyped_value**. -If all values can be represented in **paths**, the top-level value and metadata are set to null. If an array contains a mix of elements that could and couldn't be fully represented in **paths**, then elements that were fully represented are replaced will Variant NULL in the top-level value. For objects, fully represented fields are simply omitted from the object. +If a value is an object, and the **object** field is present but does not contain all of the fields in the value, then any remaining fields are stored in an object in **untyped_value**. In the Parquet example above, if field **b** was an object of the form **{"c": 1, "d": 2}"**, then the object **{"d": 2}** would be stored in **untyped_value**, and the **c** field would be shredded recursively under **object.c**. + +Note that an array is always fully shredded if there is an **array** field, so the above consideration for **object** is not relevant for arrays: only one of **array** or **untyped_value** may be non-null at a given level. # Using untyped_value vs. typed_value @@ -82,11 +93,11 @@ In general, it is desirable to store values in the **typed_value** field rather 1) In a case where there are rare type mismatches (for example, a numeric field with rare strings like “n/a”), we allow the field to be shredded, which could still be a significant performance benefit compared to fetching and decoding the full value/metadata binary. 2) Since there is a single schema per file, there would be no easy way to recover from a type mismatch encountered late in a file write. Parquet files can be large, and buffering all file data before starting to write could be expensive. Including an untyped column for every field guarantees we can adhere to the requested shredding schema. -The **untyped_value** is stored in a single binary column, rather than storing the value and metadata separately as is done in the top-level binary format. The motivation for storing them separately at the top level is that this lets the engine encode and compress the metadata more efficiently when the fields are consistent across rows. We chose to combine them in the shredded fields: we expect the encoding/compression benefit to be lower, since in the case of uniform data, the values should be stored in the typed column. Using a single binary reduces the extra metadata required in the Parquet schema, which can be significant in some cases. +The **untyped_value** is stored in a single binary column, rather than storing the value and metadata separately as is done in the unshredded binary format. The motivation for storing them separately for unshredded data is that this lets the engine encode and compress the metadata more efficiently when the fields are consistent across rows. We chose to combine them in the shredded fields: we expect the encoding/compression benefit to be lower, since in the case of uniform data, the values should be stored in typed columns. Using a single binary reduces the extra metadata required in the Parquet schema, which can be significant in some cases. # Data Skipping -Shredded columns are expected to store statistics in the same format as a normal Parquet column. In general, the engine can only skip a row group or page if all rows in the **untyped_value** field are null, since it is possible for a `variant_get` expression to successfully cast a value from the **untyped_value** to the target type. For example, if **typed_value** is of type `int64`, then the string “123” might be contained in **untyped_value**, which would not be reflected in statistics, but could be retained by a filter like `where variant_get(col, “$.field”, “long”) = 123`. If **untyped_value** is all-null, then the engine can prune pages or row groups based on **typed_value**. +Shredded columns are expected to store statistics in the same format as a normal Parquet column. In general, the engine can only skip a row group or page if all rows in the **untyped_value** field are null, since it is possible for a `variant_get` expression to successfully cast a value from the **untyped_value** to the target type. For example, if **typed_value** is of type `int64`, then the string “123” might be contained in **untyped_value**, which would not be reflected in statistics, but could be retained by a filter like `where variant_get(col, “$.field”, “long”) = 123`. If **untyped_value** is all-null, then the engine can prune pages or row groups based on **typed_value**. This specification is not strict about what values may be stored in **untyped_value** rather than **typed_value**, so it is not safe to skip rows based on **typed_value** unless the corresponding **untyped_value** column is all-null, or the engine has specific knowledge of the behavior of the writer that produced the shredded data. # Shredding Semantics @@ -106,42 +117,37 @@ Our approach is a pragmatic compromise that allows the use of **typed_value** in # Reconstructing a Variant -It is possible to recover a full Variant value using a recursive algorithm, where the initial call is to `ConstructVariant` with the Variant value implied by the top-level value/metadata, and the nested structure implied by the top-level **paths**. - -Note that the top-level **paths** may represent an object, array or even a scalar (that is, **paths** itself may be the leaf of the tree in the pseudocode below). +It is possible to recover a full Variant value using a recursive algorithm, where the initial call is to `ConstructVariant` with the top-level fields, which are assumed to be null if they are not present in the schema. ``` -# Constructs a Variant from `value` and `paths`. -# Either may be null, in which case the result is the Variant value implied by the other one. -def ConstructVariant(value, paths): - if paths is null: return value - elif is_struct(paths): - return ConstructObject(value, paths) - elif is_array(paths): - return ConstructArray(value, paths) +# Constructs a Variant from `untyped_value`, `object`, `array` and `typed_value`. +# Only one of object, array and typed_value may be non-null. +def ConstructVariant(untyped_value, object, array, typed_value): + if object is null and array is null and typed_value is null: return untyped_value + elif object is not null: + return ConstructObject(untyped_value, object) + elif array is not null: + return ConstructArray(array) else: - # Leaf in paths. If the value is also non-empty, then the result is ambiguous. - assert(value is null or value is VariantNull) - return coalesce(paths.untyped_value, cast(paths.typed_value as Variant)) - -# Construct an object from a `paths` group, and a (possibly null) Variant value -def ConstructObject(value, paths) - # If value is present and is not an Object, then the result is ambiguous. - assert(value is null or is_object(value)) - all_keys = Union(value.keys, paths.fields) + # Leaf in the tree. + assert(untyped_value is null or untyped_value is VariantNull) + return coalesce(untyped_value, cast(typed_value as Variant)) + +# Construct an object from an `object` group, and a (possibly null) Variant untyped_value +def ConstructObject(untyped_value, object) + # If untyped_value is present and is not an Object, then the result is ambiguous. + assert(untyped_value is null or is_object(untyped_value)) + all_keys = Union(untyped_value.keys, object.fields) return VariantObject(all_keys.map { key -> - if paths[field] is null: (key, value[field]) - # Note: value[field] can be null here. - else: (key, ConstructVariant(value[field], paths[field])) + if object[field] is null: (key, untyped_value[field]) + else: (key, ConstructVariant(null, object[field], null, null)) } -def ConstructArray(value, paths) - # If value is present and is not an Array of the same size, then the result is ambiguous. - assert(value is null or value is VariantNull or (is_array(value) and value.size == paths.size) +def ConstructArray(array) newVariantArray = VariantArray() - for i in range(paths.size): - # value[i] is null iff value is null - newVariantArray.append(ConstructVariant(value[i], paths[i]) + for i in range(array.size): + # Any of these may be missing from the schema, in which case they are null. + newVariantArray.append(ConstructVariant(array[i].untyped_value, array[i].object, array[i].array, array[i].typed_value) ``` # Nested Parquet Example @@ -168,22 +174,27 @@ The corresponding Parquet schema with “a” and “b” as leaf types is: ``` optional group variant_col { - optional binary value; - optional binary metadata; - optional group paths (LIST) { + optional binary untyped_value; + optional group array (LIST) { repeated group list { optional group element { - optional group a (LIST) { - repeated group list { - optional group element { - optional int64 typed_value; - optional binary untyped_value; + optional binary untyped_value; + optional group object { + optional group a { + optional binary untyped_value; + optional group array (LIST) { + repeated group list { + optional group element { + optional int64 typed_value; + optional binary untyped_value; + } + } } } - } - optional group b { - optional int64 typed_value; - optional binary untyped_value; + optional group b { + optional int64 typed_value; + optional binary untyped_value; + } } } } @@ -191,9 +202,11 @@ optional group variant_col { } ``` -In the above example schema, if “a” contains a mix of integer and non-integer values, the engine will shred individual elements appropriately into either **typed_value** or **untyped_value**. If the top-level Variant is not an array (for example, an object), the engine cannot shred the value and it will store it in the top-level value/metadata. +In the above example schema, if “a” is an array containing a mix of integer and non-integer values, the engine will shred individual elements appropriately into either **typed_value** or **untyped_value**. +If the top-level Variant is not an array (for example, an object), the engine cannot shred the value and it will store it in the top-level **untyped_value**. +Similarly, if "a" is not an array, it will be stored in the **untyped_value** under "a". -If the array contains a mix of elements that can and cannot be fully shredded, then the unshredded elements are stored as a Variant NULL. Consider the following example: +Consider the following example: ``` [ @@ -210,7 +223,7 @@ If the array contains a mix of elements that can and cannot be fully shredded, t ] ``` -The second array element can be fully shredded, but the first and third cannot be. The resulting top-level Variant would be as follows: +The second array element can be fully shredded, but the first and third cannot be. The contents of `variant_col.array[*].untyped_value` would be as follows: ``` [ @@ -220,33 +233,8 @@ The second array element can be fully shredded, but the first and third cannot b ] ``` -An alternative Parquet schema for the data above is the following: - -``` -optional group variant_col { - optional binary value; - optional binary metadata; - optional group paths (LIST) { - repeated group list { - optional group element { - optional group a { - optional group typed_value (LIST) { - repeated group list { - optional int64 element; - } - } - } - optional binary untyped_value; - } - optional group b { - optional int64 typed_value; - optional binary untyped_value; - } - } - } - } -} -``` - -This is similar to the previous schema, but the **typed_value** is an entire list. In this case, if the “a” field is an array, but contains values that cannot be stored as int64, then the entire array is stored as a single **untyped_value** binary. We expect the pros and cons of these different schemes to be marginal, and we may choose not to support the latter on the write path. The example mainly serves to demonstrate flexibility in deciding how to shred. +# Forward compatibility +We will follow the convention defined in https://github.com/delta-io/delta/blob/master/protocol_rfcs/variant-type.md#variant-data-in-parquet, and ignore any fields in the same group as typed_value/untyped_value that start with `_` (underscore). +This is intended to allow future backwards-compatible extensions. In particular, the field names `_metadata_key_paths` and any name starting with `_spark` are reserved, and should not be used by other implementations. +Any extra field names that do not start with an underscore should be assumed to be backwards incompatible, and readers should fail when reading such a schema. From fdfed88ea7703ac52e86278854991ed1a191a2e1 Mon Sep 17 00:00:00 2001 From: cashmand Date: Wed, 10 Jul 2024 16:17:28 -0400 Subject: [PATCH 4/4] Minor updates --- common/variant/shredding.md | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/common/variant/shredding.md b/common/variant/shredding.md index 79efcb6233bf4..ba5e3f8cdcc2b 100644 --- a/common/variant/shredding.md +++ b/common/variant/shredding.md @@ -233,8 +233,12 @@ The second array element can be fully shredded, but the first and third cannot b ] ``` -# Forward compatibility +# Backward and forward compatibility + +Shredding is an optional features of Variant, and readers must continue to be able to read a group containing only a `value` and `metadata` column. We will follow the convention defined in https://github.com/delta-io/delta/blob/master/protocol_rfcs/variant-type.md#variant-data-in-parquet, and ignore any fields in the same group as typed_value/untyped_value that start with `_` (underscore). This is intended to allow future backwards-compatible extensions. In particular, the field names `_metadata_key_paths` and any name starting with `_spark` are reserved, and should not be used by other implementations. Any extra field names that do not start with an underscore should be assumed to be backwards incompatible, and readers should fail when reading such a schema. + +Engines without shredding support are not expected to be able to read Parquet files that use shredding. Since different files may contain conflicting schemas (e.g. a `typed_value` column with incompatible types in two files), it may not be possible to infer or specify a single schema that would allow all Parquet files for a table to be read.