Optimization: Avoid sort for already sorted Parquet files that do not overlap values on condition #6672

Open
Tracked by #10313
simonvandel opened this issue Jun 14, 2023 · 8 comments
Labels
question Further information is requested

Comments

@simonvandel
Contributor

Describe the bug

I'm testing performance of querying a number of Parquet files, where I can make some assumptions about the Parquet files.

  • Each Parquet file is already sorted on the column "timestamp".
  • Each Parquet file does not overlap values on the column "timestamp". For instance, file A has values for timestamps for 2022, and file B has values for timestamps 2023.

The schema of the files is:

  • "timestamp": TimestampMillisecond
  • "value": Float64

Consider the following query and its query plan:

SELECT timestamp, value 
FROM samples 
ORDER BY timestamp ASC

Plan with Metrics:

SortPreservingMergeExec: [timestamp@0 ASC], metrics=[output_rows=1000000, elapsed_compute=572.526968ms]
  ParquetExec: file_groups={20 groups: [[0.parquet], [1.parquet], [2.parquet], [3.parquet], [4.parquet], ...]}, projection=[timestamp, value], output_ordering=[timestamp@0 ASC], metrics=[output_rows=1000000, elapsed_compute=20ns, num_predicate_creation_errors=0, predicate_evaluation_errors=0, bytes_scanned=57972, page_index_rows_filtered=0, row_groups_pruned=0, pushdown_rows_filtered=0, time_elapsed_processing=51.918935ms, page_index_eval_time=40ns, time_elapsed_scanning_total=48.94925ms, time_elapsed_opening=2.996325ms, time_elapsed_scanning_until_data=48.311008ms, pushdown_eval_time=40ns]

The 572 milliseconds spent in the SortPreservingMergeExec seem to be the bottleneck in the query, so I would like to optimize it.

Given the assumptions I can make about the Parquet files, I think that the SortPreservingMergeExec can be replaced by what is essentially a concatenation of each of the Parquet files.
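
To illustrate the claim, here is a minimal sketch in plain Python (not DataFusion code). The two lists are hypothetical stand-ins for sorted, non-overlapping Parquet files, and `merged` and `concatenated` are illustrative names. Under those assumptions, a k-way merge and a simple concatenation in order of each file's first timestamp produce the same rows:

```python
import heapq
from itertools import chain

# Hypothetical stand-ins for two Parquet files: each is a list of
# (timestamp, value) rows, sorted on timestamp, with non-overlapping ranges.
file_a = [(1, 0.5), (2, 0.7), (3, 0.1)]
file_b = [(10, 1.5), (11, 1.7)]


def merged(files):
    """K-way merge on timestamp, comparing rows across all inputs."""
    return list(heapq.merge(*files, key=lambda row: row[0]))


def concatenated(files):
    """Order the files by their first timestamp, then read them back to back.

    Only correct when every file is sorted and no two files overlap.
    """
    non_empty = [f for f in files if f]
    ordered = sorted(non_empty, key=lambda f: f[0][0])
    return list(chain.from_iterable(ordered))


result = concatenated([file_b, file_a])
```

The concatenation never compares rows from different files, which is where the saving would come from.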

What would be the best approach to remove the SortPreservingMergeExec?
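My ideas:

  • I have an idea of implementing a custom PhysicalOptimizerRule that looks for the SortPreservingMergeExec ParquetExec pattern, and replaces it with a concatenation instead.
  • Manually re-partition the Parquet files into a single Parquet file using this new API: https://docs.rs/parquet/latest/parquet/file/writer/struct.SerializedRowGroupWriter.html#method.append_column

But I would like to hear if there are any better ways.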

@simonvandel simonvandel added the bug Something isn't working label Jun 14, 2023
@alamb
Contributor

alamb commented Jun 15, 2023

> Given the assumptions I can make about the Parquet files, I think that the SortPreservingMergeExec can be replaced by what is essentially a concatenation of each of the Parquet files.

I agree

> I have an idea of implementing a custom PhysicalOptimizerRule that looks for the SortPreservingMergeExec ParquetExec pattern, and replaces it with a concatenation instead.

Yes, I think this would work. We do some similar things in IOx (interestingly also for the time series use case with non-overlapping time ranges).

It was implemented by @crepererum, and you can see it at https://github.com/influxdata/influxdb_iox/tree/main/iox_query/src/physical_optimizer

> Manually re-partition the Parquet files into a single Parquet file using this new API: https://docs.rs/parquet/latest/parquet/file/writer/struct.SerializedRowGroupWriter.html#method.append_column

I think this is likely the solution that would be the fastest for querying, because time predicates could then be used to prune out entire row groups and you would have lower file opening overhead.

The downside, of course, is that you would need to rewrite the Parquet files.

@alamb alamb added question Further information is requested and removed bug Something isn't working labels Jun 15, 2023
@alamb
Contributor

alamb commented Jun 15, 2023

I am marking this as a question as I am not sure it is really a bug -- though please let me know if you disagree

@simonvandel
Contributor Author

> I think this is likely the solution that would be the fastest for querying because then time predicates could be used to prune out entire row groups and you would have lower file opening overhead

Thanks, I'll try this.

> I am marking this as a question as I am not sure it is really a bug -- though please let me know if you disagree

My bad, it was a question.

Although one could argue it is also a feature request for an inbuilt optimization that removes sorts if it can detect non-overlaps using either hints or directly looking at min/max statistics on inputs.
Do you think that is reasonable, or is it too specific for just my use case?
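
To make the min/max idea concrete, here is a minimal sketch in plain Python. It is not DataFusion's API: `FileStats` and `non_overlapping_order` are hypothetical names, and the min/max values are assumed to come from per-file statistics. The check returns a read order when the files are pairwise non-overlapping, and `None` otherwise:

```python
from typing import List, NamedTuple, Optional


class FileStats(NamedTuple):
    """Hypothetical per-file statistics for the sort column."""
    name: str
    min_ts: int
    max_ts: int


def non_overlapping_order(files: List[FileStats]) -> Optional[List[str]]:
    """Return file names in an order that can be concatenated without a
    merge, or None if any two files overlap.

    Touching boundaries (prev.max_ts == next.min_ts) are accepted here,
    since ties still leave the concatenated output sorted.
    """
    ordered = sorted(files, key=lambda f: (f.min_ts, f.max_ts))
    # After sorting by min_ts, checking adjacent pairs is sufficient.
    for prev, nxt in zip(ordered, ordered[1:]):
        if nxt.min_ts < prev.max_ts:
            return None
    return [f.name for f in ordered]


disjoint = [FileStats("2023.parquet", 20, 29), FileStats("2022.parquet", 10, 19)]
overlapping = [FileStats("a.parquet", 10, 25), FileStats("b.parquet", 20, 29)]
```

With these inputs, `non_overlapping_order(disjoint)` yields the 2022 file before the 2023 file, while `non_overlapping_order(overlapping)` returns `None`, meaning the merge has to stay.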

@alamb
Contributor

alamb commented Jun 16, 2023

> Although one could argue it is also a feature request for an inbuilt optimization that removes sorts if it can detect non-overlaps using either hints or directly looking at min/max statistics on inputs.
>
> Do you think that is reasonable, or is it too specific for just my use case?

I think it is a reasonable request as having data sorted by date is so common, though the trick would be making the API reasonable and general purpose 🤔

@suremarc
Contributor

suremarc commented Jun 26, 2023

I have had a somewhat overlapping (no pun intended) issue where DataFusion abandons the SortPreservingMergeStream and does a global sort if there are multiple files in any file group. It should be possible for DataFusion to realize that, if the files are non-overlapping, the file groups can be re-ordered to satisfy the required output ordering. We would be partitioning a poset of files into a series of chains, where A < B if they are non-overlapping and every row in A goes before every row in B. Then each chain becomes one file group in the physical plan, which would be read sequentially. Using statistics and partition columns, it should be possible to generate a reasonable execution plan without reading any rows.
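
One way to sketch the chain partitioning is greedy interval partitioning over per-file min/max statistics. The following plain-Python example is an illustration under stated assumptions, not DataFusion code: `partition_into_chains` and the `(name, min_ts, max_ts)` tuples are hypothetical, and a file may follow another when its minimum is at least the previous file's maximum.

```python
import heapq
from typing import List, Tuple

# Hypothetical per-file statistics: (name, min_ts, max_ts).
File = Tuple[str, int, int]


def partition_into_chains(files: List[File]) -> List[List[str]]:
    """Greedily split files into chains of mutually non-overlapping files.

    Each chain lists files in read order; reading a chain sequentially
    yields sorted output if every file is itself sorted.
    """
    chains: List[List[str]] = []
    # Min-heap of (max_ts of the chain's last file, chain index).
    heap: List[Tuple[int, int]] = []
    for name, min_ts, max_ts in sorted(files, key=lambda f: (f[1], f[2])):
        if heap and heap[0][0] <= min_ts:
            # The chain that ends earliest can be extended with this file.
            _, idx = heapq.heappop(heap)
            chains[idx].append(name)
        else:
            # Every existing chain overlaps this file: start a new chain.
            idx = len(chains)
            chains.append([name])
        heapq.heappush(heap, (max_ts, idx))
    return chains


files = [("a", 0, 9), ("b", 5, 14), ("c", 10, 19), ("d", 15, 24)]
chains = partition_into_chains(files)
```

For the sample input, files `a` and `c` form one chain and `b` and `d` another, so two file groups would be enough for a sort-preserving merge. A real planner would also have to respect the configured number of target partitions, which this sketch ignores.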

@alamb
Contributor

alamb commented Jun 26, 2023

> I have had a somewhat overlapping (no pun intended) issue where DataFusion abandons the SortPreservingMergeStream and does a global sort if there are multiple files in any file group. It should be possible for DataFusion to realize that, if the files are non-overlapping, the file groups can be re-ordered to satisfy the required output ordering.

Yes, that is correct -- the files in each file group are read back to back by the Parquet reader, so if a group contains multiple files, the resulting stream is not necessarily ordered even if every input file is.

> We would be partitioning a poset of files into a series of chains, where A < B if they are non-overlapping, and every row in A goes before every row in B.

Indeed, as long as the files in each output group are ordered and non-overlapping in time, the Parquet reader would not need to be changed at all.

@alamb
Contributor

alamb commented Nov 27, 2023

FWIW @NGA-TRAN is working on something similar downstream in InfluxDB IOx

@alamb
Contributor

alamb commented Apr 30, 2024

> FWIW @NGA-TRAN is working on something similar downstream in InfluxDB IOx

Follow up: #10316
