Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-48495][SQL][DOCS] Describe shredding scheme for Variant #46831

Closed
wants to merge 4 commits into from

Conversation

cashmand
Copy link
Contributor

@cashmand cashmand commented May 31, 2024

What changes were proposed in this pull request?

For the Variant data type, we plan to add support for columnar storage formats (e.g. Parquet) to write the data shredded across multiple physical columns, and read only the data required for a given query. This PR merges a document describing the approach we plan to take. We can continue to update it as the implementation progresses.

Why are the changes needed?

When implemented, can allow much better performance when reading from columnar storage. More detail is given in the document.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

It is internal documentation, no testing should be needed.

Was this patch authored or co-authored using generative AI tooling?

No.

@HyukjinKwon
Copy link
Member

Merged to master.

@HyukjinKwon
Copy link
Member

Alright, I had an offline discussion with the PR author. While it looks fine to me, the author would like to have some more feedback and discussion. Respecting that, I will revert and reopen this.

@HyukjinKwon HyukjinKwon reopened this Jun 3, 2024
@HyukjinKwon HyukjinKwon changed the title [SPARK-45891] Describe shredding scheme for Variant [SPARK-48495][SQL] Describe shredding scheme for Variant Jun 3, 2024
@HyukjinKwon HyukjinKwon changed the title [SPARK-48495][SQL] Describe shredding scheme for Variant [SPARK-48495][SQL][DOCS] Describe shredding scheme for Variant Jun 3, 2024
@shaeqahmed
Copy link

I read through the proposal and some thoughts:

It would be really useful to add to this PR a list of ways nested structs (struct-of-structs) and array-of-structs can be represented because this is not clarified. From my understanding, a nested object path value can be represented as either a fully "typed_value", or a variant within a variant (containing a required value/metadata, and optional paths), or by directly nesting into the paths (a.b) without introducing an intermediate definition level.

Looks like the current proposal for columnarization works well if the data in a file mostly has one global structure. However, for heterogenous data sources or data sources with fields that alternate between more than one type of value occasionally, it seems there are limitations such as potentially needing to store the value in the top level value field bag if there is a single type conflict for some deeply nested path.


I would like to propose an alternate way to encode the data that allows for more flexibility in representing nested structures and also allows for more efficient encoding of the data.

  • I propose that simplify the design we require every path part to be immediately be followed by a definition level ($typed_value_*/$untyped_value_variant) that indicates the type of the value at that path, allowing for a fully recursive definition of the variant type as union of the types observed at each path.
  • I also propose that we allow storing the key paths in an untyped value variant separately as a native parquet list to enable field membership checks without having to scan the metadata. In my proposal, the metadata fields are also made optional, which if not present, means that the metadata is encoded in the value.

Simplest variant example representations, according to my proposal:

optional group message { // message: variant (untyped)
    optional group $untyped_value_variant {
        optional binary value;
    }
}
optional group message { // message: string (typed)
    optional binary $typed_value_string;
}

Nested struct example (w/ subcolumnarized paths, nested type conflicts) representation, according to my proposal:

optional group a {
    optional group $typed_value_object {
        optional group b {
            optional group $typed_value_object {
                optional group c {
                    optional group $typed_value_object {
                        optional group d { // d: string | untyped (value+metadata)
                                optional binary $typed_value_string;
                                optional group $untyped_value_variant {
                                     optional binary value;
                                     optional binary metadata; // make metadata optional, if not present, it is included in the value
                                     optional group metadata_key_paths (LIST) { // also allow to optionally store the list of flattened paths in the value as parquet array to enable  dictionary encoding / bloom filters for fast lookup without having to scan the metadata.
                                        repeated group list {
                                            optional binary element;
                                        }
                                    }
                                }
                        }

                        optional group e { // e: untyped (value) | object (subcolumnarized paths e.x, e.y)
                            optional group $untyped_value_variant {
                                optional binary value;
                            }
                            optional group $typed_value_object {
                                optional group x {
                                    optional binary $typed_value_string;
                                }
                                optional group y {
                                    optional int64 $typed_value_int64;
                                }
                            }
                        }

                        optional group f { // f: int64
                            optional int64 $typed_value_int64;
                        }
                    }
                }
            }        
        }
    }
}

NOTE: To reduce the nesting in cases where a field is only present as a single type, a short form could be introduced that allows concatenating the definition level into the path name, making the simplest example representation even compacter:

optional string message.$typed_value_string;

@Samrose-Ahmed
Copy link

Samrose-Ahmed commented Jun 6, 2024

I suggest moving the implementation of this Variant format to a separate repo outside of the Spark project. I see the usage of "Open Variant" instead of "Spark Variant" in recent announcements. Other projects and table formats like Delta Lake and Apache Iceberg have adopted or are considering adopting this format, respectively.

Can you clarify if this is not a Spark specific internal Variant, because it so it would be useful in terms of specification and implementation, and interoperability for this to not be embedded inside of the Apache Spark project.

riyaverm-db pushed a commit to riyaverm-db/spark that referenced this pull request Jun 7, 2024
### What changes were proposed in this pull request?

For the Variant data type, we plan to add support for columnar storage formats (e.g. Parquet) to write the data shredded across multiple physical columns, and read only the data required for a given query. This PR merges a document describing the approach we plan to take. We can continue to update it as the implementation progresses.

### Why are the changes needed?

When implemented, can allow much better performance when reading from columnar storage. More detail is given in the document.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

It is internal documentation, no testing should be needed.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#46831 from cashmand/SPARK-45891.

Authored-by: cashmand <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
@cashmand
Copy link
Contributor Author

cashmand commented Jun 7, 2024

Hi @shaeqahmed, thanks for your detailed response. Your suggestions add a lot of flexibility to the shredding scheme! At the same time, we are wary of adding complexity that could be a burden on implementations to support. Our expectation is that the primary candidate for shredding is data with a fairly uniform sub-structure. In particular, this assumption simplifies the shredding decision and behavior. With a more flexible shredding scheme it also becomes more difficult to decide what to shred since there are significantly more viable options with nuanced tradeoffs. Simplicity in implementation and user-observed behavior is very important to us.

Since the benefits of shredding are both data and workload dependent, could you help us understand concrete query examples for your suggested features? Do you have particular use cases where you expect to write a non-uniform shredding scheme, and get a significant performance benefit?

A few specific points:

In my proposal, the metadata fields are also made optional, which if not present, means that the metadata is encoded in the value.

Do you have specific typed/untyped combinations in mind that are common based on your experience? Adding options to the spec increases the implementation complexity (readers need to support both versions to function correctly), and we’d like to explore the impact of these choices more concretely.
Our motivation for combining the metadata and value is to reduce the size of the Parquet schema. Large schemas can be quite a performance burden due to how Parquet stores its footer, especially for selective queries.

metadata_key_paths

Do you have a concrete query in mind for this feature? My understanding is that it is redundant, and readers could safely ignore it. We’ve purposely designed shredding without redundancy to avoid unexpected increases in storage. In the RFC for Delta here, it mentions struct fields which start with _ (underscore) can be safely ignored. I think we could add that to the spec here, and perhaps reserve _metadata_key_paths as a keyword for a future addition to the spec. As long as readers ignore fields with underscore, it shouldn’t cause any backwards compatibility issues.

union of the types observed at each path

I’d be interested in understanding the expected use cases. Distinguishing different types for scalar fields does not seem to add much value compared to storing mismatched types in an untyped_value column, and adds complexity to the spec and implementation. Could you highlight an example or query pattern where having different typed-values would provide significant benefits over a single typed-value? Our assumption is that one of the types will be most common and shredding should focus on that one.

@cashmand
Copy link
Contributor Author

cashmand commented Jun 7, 2024

Hi @Samrose-Ahmed, our intent is for this to be an open format that other engines can adopt. We're aiming to put common code in a Java library under common/variant, so that other engines can reuse the core operations. At this point, I think it makes more sense to leave it in the Spark repo while we iterate on the implementation. We can choose to pull it out into a separate repo later.

@shaeqahmed
Copy link

Thanks for the response and feedback @cashmand!

Can you please include an example on the document which clarifies how the current proposal deals with nested structs and the general way deeply nested data is expected to be converted from JSON -> the shredded variant form? It is not clear how a nested key path with an object value should be encoded as there are 2-3 implicit ways this can be done IIUC, each with their own tradeoffs: either by adding it as a nested variant (assuming this is supported), or adding the struct directly as a nested key path within the existing path/paths.*, or maybe adding it as a struct typed_value (assuming this is not supported, but it is not actually stated whether or not it is allowed to have a typed value as a struct, and if typed values can be nested within typed values?). Elaborating on this would be useful for readers to understand how this proposal deals with the different cases of structs of structs with structured and unstructured parts.


The real world use case I have in mind is semi structured log analytics, particularly on data that comes from upstream sources that contain heterogenous loosely typed data. A good example of this is AWS Cloudtrail Logs (https://www.databricks.com/blog/2022/06/03/building-etl-pipelines-for-the-cybersecurity-lakehouse-with-delta-live-tables.html), which has variant fields like requestParameters and responseElements whose schema shape is largely relational but directly determined by the AWS service (of which there are a few hundred, the cardinality of the eventProvider field) that a given log row belongs to. Fields like requestParameters and responseElements also contain arbitrary user input that is completely unstructured and such key paths' data should ideally end up stored in an untyped blob field, while all other key paths should be subcolumnarized for performance in analytical queries. The current proposal makes it difficult to encode this data in a subcolumnarized way as there is no single global schema that can be inferred by reading the first N rows from a large batch (e.g a file).

I agree that having more than one typed field for a given key path per smaller batch of rows (e.g. ~10,000) is not necessary,
but the reason for adding this flexibility to the variant representation is that the current proposal does not allow for taking a series of row batches representing different locally discovered schemas and unioning them together to form a file containing a large batch of rows (256MB-10GB) efficiently and without type conflicts. The idea is that the writer should group the rows in smaller batches and sort in a way that is designed to place similarly shaped data closer together in the file.

My proposal is inspired by some of the state of the art research done for the Umbra database in the JSON Tiles paper (https://db.in.tum.de/~durner/papers/json-tiles-sigmod21.pdf), which describes a columnar format and approach designed to exploit implicit schemas in semi structured data and popular real world implementations of Variant such as that in Apache Doris Lakehouse (apache/doris#26225).

In our case for open tables, Parquet v1/v2 has some limitations that must be kept in mind like extra overhead associated with wide tables / too many definition levels (> tens of thousands of columns) and the inability to have a separate subschema per row group which can result in sparse null columns. However, it is still possible to take advantage of subcolumnarization on heterogenous data if the data is laid out correctly so as to maximize RLE on null arrays and using a compact representation that doesn't require an extra definition level (e.g. x.typed_value in the current proposal) for value paths that have no conflicts in a file.

@cashmand
Copy link
Contributor Author

Hi @shaeqahmed, sorry for the delay, and for not replying earlier about how nested structs are handled. I’ll try to update the doc with an example, but in the meantime, the plan is to support two of the cases you described:

  • Adding the struct directly as a nested key path within the existing paths structure is meant to be the primary approach. The example at the end of the doc shows an array-of-struct with this form, but a struct-of-struct would look the same. At any nesting level, if a given key doesn’t exist in the parquet schema, it would be stored in the top-level value binary. A request for any non-leaf field would require checking the top-level value, and merging the result with the shredded values (as described in the pseudo-code in the PR).
  • Adding a nested key path as a nested Variant is supported. This is indicated by just including untyped_value, with no corresponding typed_value. But in this case, it wouldn’t be possible to recursively shred the nested value.

Please let me know if the above is clear, or if I’m misunderstanding the question.

Thanks for describing your use case and the papers you’re referenced. The CloudTrail use case makes a lot of sense, and is definitely one that we should consider carefully. For the current approach, I think it would make sense to shred a field like requestParameters as a Variant binary. This would provide a lot of the benefit, since queries on requestParameters would not need to fetch the top-level binary or any other columns.

I can see that the more flexible schema you’ve proposed could provide better performance for some query patterns, though. At the same time, we’d like to aim to minimize the complexity in the spec, the Parquet footer, and implementation.

I’d like to spend a bit more time looking at the papers you’ve linked to, and considering the trade-offs between the proposals. Can you give us a better idea of what type of queries you expect to see on the read path, and how your scheme would benefit? E.g. would you expect to typically see a mix of queries that need all of requestParameters, and others that only need a field or two? What type of query is likely to benefit significantly from shredding different types (e.g. integer and string) vs. just shredding the most common type, and fetching the rest from the binary? We would like to better understand how the shredding scheme will improve read performance for your workload. Thanks!

@shaeqahmed
Copy link

shaeqahmed commented Jun 17, 2024

Thanks @cashmand and yes that is clear. The type of queries one would expect to see on AWS CloudTrail requestParameters and responseElements would both be (needle in haystack) queries that would filter on just a few nested field paths corresponding to the AWS service, returning all columns (benefits from late materialization) AND analytical dashboards that aggregate/summarize on a few fields such as queries for service level dashboards (e.g. AWS S3, etc.) that perform aggregations on some of the sub fields pertaining to each service within requestParameters and responseElements.

An example of the first type is the following AWS repository containing a variety of search oriented security detection rules: https://github.com/sbasu7241/AWS-Threat-Simulation-and-Detection.

Both of these types of queries (highly selective search & analytical) would benefit massively from shredding/subcolumnarization over being stored as a binary variant due to the compression, statistics, and reduction in data scanned. Note that these fields can contain massive user input strings blobs coming from user input (e.g. a stringified SQL query string as a request parameter https://docs.aws.amazon.com/athena/latest/APIReference/API_StartQueryExecution.html#athena-StartQueryExecution-request-QueryString) alongside compact low cardinality or numerical fields which are useful in a query or a dashboard (e.g. viewing distinct requestParameters.policyDocument.statements[*].action or a search like requestParameters.ipPermissions.items[0].ipRanges.items[0].cidrIp == "127.0.0.1"), which is why shredding is important for performance on this type of semi structured field.

@cashmand
Copy link
Contributor Author

cashmand commented Jul 3, 2024

Hi @shaeqahmed, I updated the scheme based on the discussion above, while still trying to keep the scheme relatively simple. At a high level, I added the option to define one or more of object, array, typed_value or untyped_value at each path segment (including at the top level, rather than having the one-off value/metadata). This provides the flexibility to union multiple schemas, and avoids the problem of having to fetch the top-level value to determine if an intermediate path was only partially shredded.

We decided to allow only one typed_value at each level, rather than providing one per type. The storage overhead of storing alternative scalar values in untyped_value should be fairly low after encoding/compression, and it should still be possible to define custom stats/metadata schemes later if that turns out to be useful for filtering applications.

Please take a look, and let me know if you have more feedback.

Copy link
Contributor

@gene-db gene-db left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for augmenting the scheme and updating the doc!

Copy link

@shaeqahmed shaeqahmed left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @cashmand thanks for updating the scheme. This looks good to me overall just a few thoughts:

  • In the current scheme the parquet files containg a field written as a shredded variant are no longer guaranteed to be readable as a dataset/dataframe using Hive unless the variant resolution logic is implemented in the engine because the schema's cannot be unioned together. The main source of a potential schema conflict is currently the 'paths' key which can either be an array or struct and the 'typed_value' which can represent a different primitive type across different files. I'm wondering if it is desirable it to include the type name in the paths key (e.g. paths.object vs paths.array) and the primitive types (e.g. typed_value_int64 vs typed_value_binary) to preserve the ability to read parquet files written using the shredded variant format in older engines that haven't implemented this type or don't use a table format. If not, we should call this out as a non-goal as part of the design.
  • Would it be useful to include a shorter hand form as part of the design. For a use case like a leaf field called coordinates that represents lat/lon coordinates as a consistent physical type of a primitive array that is always a [int64, int64] tuple, in the current scheme this always requires two extra definition levels, one for .array and one for .typed_value. If we introduce a shorter form that can 'compress' the representation of a typing definition level directly into a parent, one or both of these could be removed which would be good for performance and size. Example:
// current scheme required to represent coordinates: [int64, int64] field
optional group coordinates {
    optional group array (LIST) {
        repeated group list {
            optional group element {
                optional int64 typed_value;
            }
        }
    }
}

// if we allowed collapsing one definition level
optional group coordinates.$array (LIST) {
    repeated group list {
        optional group element {
            optional int64 typed_value;
        }
    }
}

// or even collapsing both for primitive arrays 
optional group coordinates.$array<$int64> (LIST) {
    repeated group list {
        optional int64 element;
    }
}

@cashmand
Copy link
Contributor Author

Hi @shaeqahmed, I think the ability to read with older engines is not a goal, and I don't think it's worth adding extra complexity to the scheme to allow it. I can update the document to make that clear.

I also don't think there's much benefit to collapsing of the levels in the schema. It adds extra complexity to parse and handle that case, and ensure that the meaning of a name can't be ambiguous. I don't think it really saves much, if anything - the number of column chunks won't change, and in the current scheme, marking the intermediate groups as required instead of optional should result in the same column chunk size as if they had been collapsed.

@shaeqahmed
Copy link

@cashmand Ah that makes sense, since marking those intermediate columns as required means the writer does not have to write an extra definition level. Thanks for updating the doc, this looks good to me!

@HyukjinKwon
Copy link
Member

Merged to master.


Consider the following Parquet schema together with how Variant values might be mapped to it. Notice that we represent each shredded field in **object** as a group of two fields, **typed_value** and **untyped_value**. We extract all homogenous data items of a certain path into **typed_value**, and set aside incompatible data items in **untyped_value**. Intuitively, incompatibilities within the same path may occur because we store the shredding schema per Parquet file, and each file can contain several row groups. Selecting a type for each field that is acceptable for all rows would be impractical because it would require buffering the contents of an entire file before writing.

Typically, the expectation is that **untyped_value** exists at every level as an option, along with one of **object**, **array** or **typed_value**. If the actual Variant value contains a type that does not match the provided schema, it is stored in **untyped_value**. An **untyped_value** may also be populated if an object can be partially represented: any fields that are present in the schema must be written to those fields, and any missing fields are written to **untyped_valud**.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo untyped_valud

```

| Variant Value | Top-level untyped_value | b.untyped_value | Non-null in a | Non-null in b.c |
|---------------|--------------------------|---------------|---------------|
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unbalanced table - #47407


At a high level, we replace the **value** and **metadata** of the Variant Parquet group with one or more fields called **object**, **array**, **typed_value** and **untyped_value**. These represent a fixed schema suitable for constructing the full Variant value for each row.

Shredding lets Spark (or any other query engine) reap the full benefits of Parquet's columnar representation, such as more compact data encoding, min/max statistics for data skipping, and I/O and CPU savings from pruning unnecessary fields not accessed by a query (including the non-shredded Variant binary data).
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: and IO and CPU savings

If this was in the comma separated section earlier that would fit better

| null | null | Field is missing in the reconstructed Variant. |
| null | non-null | Field may be any type in the reconstructed Variant. |
| non-null | null | Field has this column’s type in the reconstructed Variant. |
| non-null | non-null | Invalid |
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What should implementations do when they encounter this case?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Failing seems like the right choice.

An alternative might be to prefer untyped_value over the others, which could allow an engine to write some redundant data into the typed column if it knew how to interpret it later. E.g. storing "123" in a string column in addition to keeping it as an integer in the untyped_value column. This seems like added confusion for marginal benefit, though.


The **typed_value** may be absent from the Parquet schema for any field, which is equivalent to its value being always null (in which case the shredded field is always stored as a Variant binary). By the same token, **untyped_value** may be absent, which is equivalent to their value being always null (in which case the field will always be missing or have the type of the **typed_value** column).

The full metadata and value can be reconstructed from **untyped_value** by treating the leading bytes as metadata, and using the header, dictionary size and final dictionary offset to determine the start of the Variant value section. (See the metadata description in the common/variant/README.md for more detail on how to interpret it.) For example, in the binary below, there is a one-element dictionary, and the final offset (`offset[1]`) indicates that the last dictionary entry ends at the second byte. Therefore the full metadata size is six bytes, and the rest is the value section of the Variant.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is confusing to me. I thought that each variant value (top-level) had to share metadata and it was not allowed to have embedded metadata. Can individual variant values within a variant have a new metadata?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The metadata is mainly the dictionary of all object keys. Once we shred, the keys that were associated with shredded fields are no longer needed (since they're stored as field names in the Parquet schema). The keys that are still needed (e.g. for an object that wasn't fully shredded) are stored as needed in the metadata of objects that use those keys. When reconstructing a full Variant value, the metadata will need to be rebuilt as the full value is built up from the shredded components.

I'm open to other ideas, but the main motivation for not leaving a single metadata at the top level is that it would need to be fetched any time we fetch an untyped_value column. In a situation where some portion of the original value has very irregular keys from row to row, this could be a large penalty.

The downsides I can see are:

  1. Needing to inspect and possibly rebuild each untyped_value when reconstructing the full Variant value. I think this is the biggest problem with this approach.
  2. Storing a small metadata next to every untyped_value adds overhead. Experimentally, this seems to be pretty small (~1%) after compression (I tested with snappy and zstd) if the metadata is regular.
  3. If the same key is used in different parts of the schema, they'll be duplicated. This doesn't seem likely to be a major issue, since the point of the metadata is to capture duplication from row to row, or in multiple array elements.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to add complexity to the format and cost. When writing, this must produce a filtered dictionary for each value. Those dictionaries may be overlapping and this could increase storage cost. And when reading the dictionaries would need to be merged. I think I prefer the simpler approach of keeping a single metadata/dictionary at the top level and reusing it to avoid all of the logic.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the downsides are relatively small compared to the benefit on the read side can be significant in cases where only some fields are filtered or projected. It's not uncommon to have irregular metadata, making the metadata column quite expensive to fetch. If we need to fetch and decode the entire metadata to decode any untyped_value, it would create a very large penalty to read any scalar field that does not perfectly shred to its target type.


1) All integer and decimal types in Variant are conceptually a single “number” type. The engine may shred any number into the **typed_value** of any other number, provided that no information about the value is lost. For example, the integer 123 may be shredded as Decimal<9, 2>, but 1.23 may not be shredded as any integer type.

2) To ensure that behavior remains unchanged before and after shredding, we will aim to have all Spark expressions that operate on Variant be agnostic to the specific numeric type. For example, `cast(val as string)` should produce the string “123” if `val` is any integer or decimal type that is exactly equal to 123. Note that this is unlike the normal Spark behavior for `decimal` types, which would produce “123.00” for `Decimal<9,2>`.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think that this is something you can guarantee across engines. I don't think the shredding can have an effect on how engines implement casting.

jingz-db pushed a commit to jingz-db/spark that referenced this pull request Jul 22, 2024
### What changes were proposed in this pull request?

For the Variant data type, we plan to add support for columnar storage formats (e.g. Parquet) to write the data shredded across multiple physical columns, and read only the data required for a given query. This PR merges a document describing the approach we plan to take. We can continue to update it as the implementation progresses.

### Why are the changes needed?

When implemented, can allow much better performance when reading from columnar storage. More detail is given in the document.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

It is internal documentation, no testing should be needed.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#46831 from cashmand/SPARK-45891.

Authored-by: cashmand <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>

On the other hand, storing arbitrarily casted values in the **typed_value** column could create inconsistent behavior before and after shredding, and could leak behavior from the writing engine to the reading engine. For example, double-to-string casts can produce different results in different engines. Performing such a cast while shredding (even if we somehow retained the knowledge that the original value was a `double`) could result in confusing behavior changes if shredding took place using a different tool from the query engine that wrote it.

Our approach is a pragmatic compromise that allows the use of **typed_value** in cases where the type can be losslessly widened without resulting in a significant difference in the reconstructed Variant:
Copy link
Contributor

@emkornfield emkornfield Jul 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems an alternative here could be to have multiple typed values "typed_value_int32" "typed_value_int64", if necessary. This carries additional overhead, was that overhead deemed unacceptable or too complex? From an implementation standpoint, this seems the simplest conceptually with a lower potential of having details of the variant be lost (e.g. not being careful with a variant cast in one place, or requiring deducing which fields can be placed in the numeric column)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A case we were concerned about is ingesting JSON data, which is expected to be a very common use of Variant. In that situation, there can easily be a variety of decimal precisions and integer sizes for a given field. Adding a separate column for each type has two problems that I can see:

  1. It may add many sparse columns to the parquet schema, which can have performance implications.
  2. Any flow that tries to infer an appropriate parquet schema, without exhaustively sampling the entire file contents could easily miss a particular type, which would then need to be written to the untyped_value, making stats unusable for that column.

I think multiple typed values could be reasonable in some scenarios, but it would be good to clarify the use cases, and be confident that the benefit justifies the extra complexity.

Copy link
Contributor

@emkornfield emkornfield Jul 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The main thing I was considering was preserving numeric types unambiguously but still have them shredded. After sleeping on it a better solution could be having an "original type column" which stores the type ID of the original numeric type that was shredded. This could be optional if all numeric types were of the same type as parquet schema

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, I think this is my preferred solution to resolving the concerns that you and @rdblue raised.

@Zouxxyy
Copy link
Contributor

Zouxxyy commented Sep 18, 2024

@cashmand Shredding will be a great improvement for variant, looking forward to its implementation! How is its progress so far~ And after reading this document, I have some questions and hope to get help:

  1. Will shredded variant be a new type? Because I see that it is currently a nested and changing Struct type, it is a bit difficult to imagine how to describe it.
  2. For the write side, how is the shredding schema generated adaptively? From the description in the document, it looks dynamic, is it at the table level / file level / or even rowGroup level? And, I see that many layers of nesting are currently designed, does this have an impact on the write overhead.
  3. For the read side, if it is a file-level schema, how should spark integrate it when reading. For example, if we want to obtain a certain path, but if the schemas of different files are different, how should we determine the physical plan.

@cashmand
Copy link
Contributor Author

@Zouxxyy, thanks, these are great questions, which we don't have clear answers for yet, but I'll give you my high-level thoughts.

  1. Will shredded variant be a new type? Because I see that it is currently a nested and changing Struct type, it is a bit difficult to imagine how to describe it.

The intent is for it to not be an entirely new type. For the purposes of describing the desired read/write schema to a data source, I think we might want to do something like extend the current VariantType to specify a shredding schema, but I don't think most of Spark should need to consider it to be a distinct type.

  1. For the write side, how is the shredding schema generated adaptively? From the description in the document, it looks dynamic, is it at the table level / file level / or even rowGroup level? And, I see that many layers of nesting are currently designed, does this have an impact on the write overhead.

The intent is to allow it to vary at the file level. (I don't think row group level is an option for Parquet, since Parquet metadata has a single schema per-file.) The exact mechanism is still up in the air. We could start as simply as having a single user-specified schema per write job, but ultimately I think we'd like to either see Spark determine a shredding schema dynamically, or provide the flexibility in the data source API to allow connectors to determine a shredding schema.

  1. For the read side, if it is a file-level schema, how should spark integrate it when reading. For example, if we want to obtain a certain path, but if the schemas of different files are different, how should we determine the physical plan.

Also a tough question. I think we'll either need the Parquet reader to handle the per-file manipulation, or provide an API to allow data sources to inject per-file expressions to produce the data needed by the query. (This could be useful in other scenarios like type widening, which might have data source specific requirements.) We're still looking into what the best approach is here.

@Zouxxyy
Copy link
Contributor

Zouxxyy commented Sep 20, 2024

@cashmand Got it, thanks for the answer. The reading and writing of shredding scheme is implicit in the variant type, and the user is unaware of it, which is user-friendly. looking forward to the final implementation !

1) In a case where there are rare type mismatches (for example, a numeric field with rare strings like “n/a”), we allow the field to be shredded, which could still be a significant performance benefit compared to fetching and decoding the full value/metadata binary.
2) Since there is a single schema per file, there would be no easy way to recover from a type mismatch encountered late in a file write. Parquet files can be large, and buffering all file data before starting to write could be expensive. Including an untyped column for every field guarantees we can adhere to the requested shredding schema.

The **untyped_value** is stored in a single binary column, rather than storing the value and metadata separately as is done in the unshredded binary format. The motivation for storing them separately for unshredded data is that this lets the engine encode and compress the metadata more efficiently when the fields are consistent across rows. We chose to combine them in the shredded fields: we expect the encoding/compression benefit to be lower, since in the case of uniform data, the values should be stored in typed columns. Using a single binary reduces the extra metadata required in the Parquet schema, which can be significant in some cases.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The motivation for storing them separately for unshredded data is that this lets the engine encode and compress the metadata more efficiently when the fields are consistent across rows.

Hi @cashmand Is there any performance comparison for this part? We are considering introducing variants into Apache Paimon as a new type. If variant is a single binary column, it will be easier to integrate. If it is a group containing value and metadata, the integration cost will be higher, such as: dedicated Unsafe Row reader writer, dedicated ColumnVector for reader/writer, etc.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @Zouxxyy, sorry for the delayed response. I don't have specific numbers to share, but we did measure moderate file size benefit from keeping them separate. I think the two main benefits are:

  1. For data with a uniform schema, the metadata can be dictionary-encoded.
  2. Even if the metadata is less uniform, compression tends to be more effective when the value and metadata binaries are separate, since they don't tend to have much commonality.

The results are likely to be very data-dependent, though. It might also be sensitive to the compression codec used, which we didn't look at.

attilapiros pushed a commit to attilapiros/spark that referenced this pull request Oct 4, 2024
### What changes were proposed in this pull request?

For the Variant data type, we plan to add support for columnar storage formats (e.g. Parquet) to write the data shredded across multiple physical columns, and read only the data required for a given query. This PR merges a document describing the approach we plan to take. We can continue to update it as the implementation progresses.

### Why are the changes needed?

When implemented, can allow much better performance when reading from columnar storage. More detail is given in the document.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

It is internal documentation, no testing should be needed.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#46831 from cashmand/SPARK-45891.

Authored-by: cashmand <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
himadripal pushed a commit to himadripal/spark that referenced this pull request Oct 19, 2024
### What changes were proposed in this pull request?

For the Variant data type, we plan to add support for columnar storage formats (e.g. Parquet) to write the data shredded across multiple physical columns, and read only the data required for a given query. This PR merges a document describing the approach we plan to take. We can continue to update it as the implementation progresses.

### Why are the changes needed?

When implemented, can allow much better performance when reading from columnar storage. More detail is given in the document.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

It is internal documentation, no testing should be needed.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#46831 from cashmand/SPARK-45891.

Authored-by: cashmand <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

10 participants