Support window functions with PARTITION BY clause #299

Closed
Dandandan opened this issue May 9, 2021 · 7 comments · Fixed by #558
Labels
enhancement New feature or request

Comments

@Dandandan (Contributor)

Is your feature request related to a problem or challenge? Please describe what you are trying to do.
Window functions have a PARTITION BY clause that splits the data into partitions and computes the window function over each partition individually.

Describe the solution you'd like
We can use `Repartition::Hash` to parallelize the execution.

Describe alternatives you've considered
n/a

Additional context
Efficient Processing of Window Functions in Analytical SQL Queries (Leis et al., VLDB 2015): http://www.vldb.org/pvldb/vol8/p1058-leis.pdf

@jimexist (Member) commented May 19, 2021

@jimexist (Member) commented Jun 11, 2021

> We can use `Repartition::Hash` to parallelize the execution.

@Dandandan I don't think it's that straightforward.

When repartitioning, we can hash M partitions into N partitions as long as the aggregate's partial states can be merged: e.g. for the avg accumulator it is easy to combine two partial results into one ((sum_a + sum_b) / (count_a + count_b)). This is not the case for window functions, especially when the function relies on relative positions and sliding windows: you can't simply split a partition in two and combine the results back into one.

Given this, we can parallelize across partitions if we make sure all rows within a partition are processed together with order preserved, but that also means splitting and deciding on the number of partitions requires knowledge of the actual data (i.e. dependencies on `&RecordBatch`). Intra-partition parallelism is even harder, because a window frame, when present, can imply a serial relationship between rows, making the computation inherently sequential.
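To make that distinction concrete, here is a minimal Rust sketch (illustrative only, not DataFusion's actual accumulator API): avg carries a mergeable partial state, while a positional function like rank() needs the whole sorted partition at once.

```rust
/// Partial state for avg: a running sum and count.
#[derive(Clone, Copy)]
struct AvgState {
    sum: f64,
    count: u64,
}

impl AvgState {
    /// Two partial avg states merge losslessly:
    /// (sum_a + sum_b) / (count_a + count_b).
    fn merge(self, other: AvgState) -> AvgState {
        AvgState {
            sum: self.sum + other.sum,
            count: self.count + other.count,
        }
    }

    fn finalize(self) -> f64 {
        self.sum / self.count as f64
    }
}

/// rank() has no mergeable partial state: a row's rank depends on its
/// position among *all* rows of the partition in sort order, so the
/// whole partition must be processed together.
fn rank(sorted_partition: &[i64]) -> Vec<u64> {
    let mut ranks = Vec::with_capacity(sorted_partition.len());
    let mut rank = 1u64;
    for (i, v) in sorted_partition.iter().enumerate() {
        if i > 0 && *v != sorted_partition[i - 1] {
            rank = i as u64 + 1; // RANK() leaves gaps after ties
        }
        ranks.push(rank);
    }
    ranks
}
```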

@Dandandan (Contributor, Author)

@jimexist

No, but AFAIK you can pre-partition based on the partition expression, as we do for hash joins, for example.

You still have to evaluate the partitioning inside the window function implementation, but after a hash repartition each partition holds all rows with equal partition values.

So `HashPartition(partition_by_expr) -> Window(partition_by_expr, order_by)` (per partition) should produce the same result as `Window(partition_by_expr, order_by)` (on 1 partition).
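A minimal sketch of that pre-partitioning step, with toy types rather than DataFusion's actual `Repartition::Hash` operator: rows are routed by a hash of the PARTITION BY key, so no group ever spans two output partitions, and a window operator can then run on each output partition independently.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// A toy row: (PARTITION BY value, payload).
type Row = (String, i64);

/// Route each row to one of `n` output partitions by hashing only the
/// PARTITION BY expression. Rows with equal keys always land together.
fn hash_repartition(rows: Vec<Row>, n: usize) -> Vec<Vec<Row>> {
    let mut partitions = vec![Vec::new(); n];
    for row in rows {
        let mut hasher = DefaultHasher::new();
        row.0.hash(&mut hasher);
        let target = (hasher.finish() as usize) % n;
        partitions[target].push(row);
    }
    partitions
}
```

Because every group is complete within a single output partition, running the window operator per partition and concatenating the results matches the single-partition plan.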

@jimexist (Member) commented Jun 11, 2021

> @jimexist
>
> No, but AFAIK you can pre-partition based on the partition expression, as we do for hash joins, for example.
>
> You still have to evaluate the partitioning inside the window function implementation, but after a hash repartition each partition holds all rows with equal partition values.
>
> So `HashPartition(partition_by_expr) -> Window(partition_by_expr, order_by)` (per partition) should produce the same result as `Window(partition_by_expr, order_by)` (on 1 partition).

I wonder why Postgres decided to use a sort instead of a hash partition regardless:

```
# explain select max(c2) over (partition by c3) from test;
                            QUERY PLAN
-------------------------------------------------------------------
 WindowAgg  (cost=44.96..55.81 rows=620 width=6)
   ->  Sort  (cost=44.96..46.51 rows=620 width=6)
         Sort Key: c3
         ->  Seq Scan on test  (cost=0.00..16.20 rows=620 width=6)
(4 rows)
```

Most likely due to code reuse?

@Dandandan (Contributor, Author) commented Jun 11, 2021


> I wonder why Postgres decided to use a sort instead of a hash partition regardless:
>
> ```
> # explain select max(c2) over (partition by c3) from test;
>                             QUERY PLAN
> -------------------------------------------------------------------
>  WindowAgg  (cost=44.96..55.81 rows=620 width=6)
>    ->  Sort  (cost=44.96..46.51 rows=620 width=6)
>          Sort Key: c3
>          ->  Seq Scan on test  (cost=0.00..16.20 rows=620 width=6)
> (4 rows)
> ```
>
> Most likely due to code reuse?

PostgreSQL uses a minimal amount of multithreading, as it is designed mostly for transactional processing (OLTP) on smaller datasets. When executing on one thread, doing the extra work would slow it down a bit, so it is better not to do it at all. We do the same for hash joins: the partitioning is only applied when concurrency > 1.
Only for really big tables / costly queries will PostgreSQL opt to use multiple workers (which is visible in the query plan), and I'm not sure whether it will even use hash repartitioning in that case.

I believe e.g. Spark always partitions based on PARTITION BY, which makes execution much faster and more scalable in the presence of a PARTITION BY clause, as each worker/thread can process its part individually.
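As a rough illustration of that style of execution (toy types and hypothetical helper names, not Spark's or DataFusion's API), each thread below evaluates `max(v) OVER (PARTITION BY key)` on its own hash partition, e.g. one produced by the repartitioning sketch earlier. This is safe because no group spans two partitions:

```rust
use std::collections::HashMap;
use std::thread;

/// A toy row: (PARTITION BY value, argument to max()).
type Row = (String, i64);

/// max(v) OVER (PARTITION BY key) within one hash partition.
fn window_max_per_group(rows: Vec<Row>) -> Vec<(Row, i64)> {
    let mut maxes: HashMap<String, i64> = HashMap::new();
    for (key, v) in &rows {
        let entry = maxes.entry(key.clone()).or_insert(i64::MIN);
        *entry = (*entry).max(*v);
    }
    rows.iter().map(|row| (row.clone(), maxes[&row.0])).collect()
}

/// One worker thread per hash partition; the per-partition results
/// never need merging because every group is fully contained in one
/// partition.
fn parallel_window(partitions: Vec<Vec<Row>>) -> Vec<Vec<(Row, i64)>> {
    thread::scope(|s| {
        let handles: Vec<_> = partitions
            .into_iter()
            .map(|p| s.spawn(move || window_max_per_group(p)))
            .collect();
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    })
}
```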

@jimexist (Member) commented Jun 11, 2021


> PostgreSQL uses a minimal amount of multithreading, as it is designed mostly for transactional processing (OLTP) on smaller datasets. When executing on one thread, doing the extra work would slow it down a bit, so it is better not to do it at all. We do the same for hash joins: the partitioning is only applied when concurrency > 1.
>
> Only for really big tables / costly queries will PostgreSQL opt to use multiple workers (which is visible in the query plan), and I'm not sure whether it will even use hash repartitioning in that case.
>
> I believe e.g. Spark always partitions based on PARTITION BY, which makes execution much faster and more scalable in the presence of a PARTITION BY clause, as each worker/thread can process its part individually.

That's a valid point. I guess my plan here is / continues to be:

  1. implement a correct version using a global sort that covers PARTITION BY, ORDER BY, and the window frame (a sketch of this follows the list)
  2. set up integration tests that compare results
  3. come up with a more realistic benchmark dataset that's much larger than 100 rows
  4. migrate to repartitioning for inter-partition parallelism
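A minimal sketch of what step 1 could look like (illustrative names, not DataFusion's actual operators): a single global sort covers both PARTITION BY and ORDER BY, and one pass resets the accumulator at each partition boundary, mirroring the WindowAgg-over-Sort plan PostgreSQL shows above.

```rust
/// A toy row: (PARTITION BY value, ORDER BY value, argument to max()).
type Row = (String, i64, i64);

/// max(v) OVER (PARTITION BY key ORDER BY ord
///              ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
fn window_max_sorted(mut rows: Vec<Row>) -> Vec<(Row, i64)> {
    // One global sort handles both PARTITION BY and ORDER BY.
    rows.sort_by(|a, b| a.0.cmp(&b.0).then(a.1.cmp(&b.1)));

    let mut out = Vec::with_capacity(rows.len());
    let mut running_max = i64::MIN;
    for i in 0..rows.len() {
        if i == 0 || rows[i].0 != rows[i - 1].0 {
            running_max = i64::MIN; // partition boundary: reset state
        }
        running_max = running_max.max(rows[i].2);
        out.push((rows[i].clone(), running_max));
    }
    out
}
```

This is correct but inherently single-threaded, which is what step 4 would later address by sorting within each hash partition instead.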

@Dandandan (Contributor, Author) commented Jun 12, 2021

That sounds like a great plan, @jimexist.

I agree, let's tackle correctness / completeness first, then try to improve performance based on some benchmarks 👍
