Use map_partitions to compute LIMIT / OFFSET #517

Merged: 2 commits, May 13, 2022
Changes from 1 commit
49 changes: 18 additions & 31 deletions dask_sql/physical/rel/logical/limit.py
@@ -1,11 +1,11 @@
from typing import TYPE_CHECKING

import dask.dataframe as dd
from dask.blockwise import BlockIndex

from dask_sql.datacontainer import DataContainer
from dask_sql.physical.rel.base import BaseRelPlugin
from dask_sql.physical.rex import RexConverter
from dask_sql.physical.utils.map import map_on_partition_index

if TYPE_CHECKING:
import dask_sql
@@ -38,25 +38,18 @@ def convert(
if offset:
end += offset

df = self._apply_offset(df, offset, end)
df = self._apply_limit(df, offset, end)

cc = self.fix_column_to_row_type(cc, rel.getRowType())
# No column type has changed, so no need to cast again
return DataContainer(df, cc)

def _apply_offset(self, df: dd.DataFrame, offset: int, end: int) -> dd.DataFrame:
def _apply_limit(self, df: dd.DataFrame, offset: int, end: int) -> dd.DataFrame:
"""
Limit the dataframe to the window [offset, end].
That is unfortunately, not so simple as we do not know how many
items we have in each partition. We have therefore no other way than to
calculate (!!!) the sizes of each partition.

After that, we can create a new dataframe from the old
dataframe by calculating for each partition if and how much
it should be used.
We do this via generating our own dask computation graph as
we need to pass the partition number to the selection
function, which is not possible with normal "map_partitions".

Unfortunately, Dask does not currently support row selection through `iloc`, so this must be done using a custom partition function.
However, it is sometimes possible to compute this window using `head` when an `offset` is not specified.
"""
if not offset:
# We do a (hopefully) very quick check: if the first partition
@@ -65,23 +58,17 @@ def _apply_offset(self, df: dd.DataFrame, offset: int, end: int) -> dd.DataFrame
if first_partition_length >= end:
return df.head(end, compute=False)

# First, we need to find out which partitions we want to use.
# Therefore we count the total number of entries
# compute the size of each partition
# TODO: compute `cumsum` here when dask#9067 is resolved
partition_borders = df.map_partitions(lambda x: len(x))

# Now we let each of the partitions figure out, how much it needs to return
# using these partition borders
# For this, we generate out own dask computation graph (as it does not really
# fit well with one of the already present methods).

# (a) we define a method to be calculated on each partition
# This method returns the part of the partition, which falls between [offset, fetch]
# Please note that the dask object "partition_borders", will be turned into
# its pandas representation at this point and we can calculate the cumsum
# (which is not possible on the dask object). Recalculating it should not cost
# us much, as we assume the number of partitions is rather small.
def select_from_to(df, partition_index, partition_borders):
def limit_partition_func(df, partition_index, partition_borders):
"""Limit the partition to values contained within the specified window, returning an empty dataframe if there are none"""

# TODO: remove the `cumsum` call here when dask#9067 is resolved
partition_borders = partition_borders.cumsum().to_dict()
partition_index = partition_index[0]

this_partition_border_left = (
partition_borders[partition_index - 1] if partition_index > 0 else 0
)
@@ -101,8 +88,8 @@ def select_from_to(df, partition_index, partition_borders):

return df.iloc[from_index:to_index]

# (b) Now we just need to apply the function on every partition
# We do this via the delayed interface, which seems the easiest one.
return map_on_partition_index(
df, select_from_to, partition_borders, meta=df._meta
return df.map_partitions(
limit_partition_func,
BlockIndex(numblocks=(df.npartitions,)),
partition_borders=partition_borders,
)
Review thread on the BlockIndex argument:

Collaborator:
iiuc, the partition_index attribute associates a partition with a number/value. Is there a reason why BlockIndex is preferred in this situation?

Collaborator (author):

> iiuc, the partition_index attribute associates a partition with a number/value

Correct, partition_index is expected to be the index of the partition within the overall Dask dataframe - the first partition should have partition_index=0, the second partition_index=1, and so on.

> Is there a reason why BlockIndex is preferred in this situation?

My understanding here is the "blocks" of a Dask dataframe are its partitions, making BlockIndex roughly synonymous with partition index here.

Chatting with @rjzamora, I now know that another approach to getting the partition index would be to add a partition_info kwarg to the function, which dask would then populate with the partition index, among some other info. Perhaps that approach might be a little clearer to future developers?
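For context on the BlockIndex behavior discussed above: when passed positionally, the BlockIndex appears to be replaced, for each partition, by that partition's index tuple, e.g. (0,) or (1,) for a one-dimensional dataframe, which is why the diff unpacks it with partition_index[0]. A minimal sketch; names such as tag_partition and ddf are illustrative and not part of this PR:

    import dask.dataframe as dd
    import pandas as pd
    from dask.blockwise import BlockIndex

    ddf = dd.from_pandas(pd.DataFrame({"a": range(6)}), npartitions=2)

    def tag_partition(part, partition_index):
        # the positional BlockIndex arrives here as this partition's index tuple
        return part.assign(part_id=partition_index[0])

    # meta is passed explicitly so dask does not need to call the function to infer it
    ddf.map_partitions(
        tag_partition,
        BlockIndex(numblocks=(ddf.npartitions,)),
        meta=ddf._meta.assign(part_id=0),
    ).compute()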

Collaborator:

I don't have a strong preference here, and since BlockIndex gets the same info from dask, it should be fine for the sake of this PR.
For me personally, explicitly naming the argument when calling map_partitions would be enough to give an idea of what this is, something like df.map_partitions(..., partition_index=BlockIndex(numblocks=(df.npartitions,))).

More generally, my confusion was primarily because I've seen "block" terminology used in the context of IO in dask and "partition" terminology used in the context of dataframes.

Collaborator (author):

> something like df.map_partitions(..., partition_index=BlockIndex(numblocks=(df.npartitions,))).

Unfortunately, map_partitions handles args and kwargs differently, which means we must supply the BlockIndex as a positional argument in order for it to be computed for the partition functions.

I'll try out partition_info, since it makes it somewhat clearer what is being done and seems to be the more documented way of achieving what we want (getting partition info within the function).
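As a rough illustration of the partition_info alternative mentioned here (again with illustrative names, not code from this PR): dask populates a partition_info keyword with a dict like {"number": 0, "division": ...} for each partition call, and it can be None when dask calls the function to infer metadata.

    import dask.dataframe as dd
    import pandas as pd

    ddf = dd.from_pandas(pd.DataFrame({"a": range(6)}), npartitions=2)

    def tag_partition(part, partition_info=None):
        # partition_info is filled in by dask for each real partition call;
        # it may be None during metadata inference, so guard for that case
        number = partition_info["number"] if partition_info is not None else 0
        return part.assign(part_id=number)

    # no extra positional argument is needed in this variant
    ddf.map_partitions(tag_partition, meta=ddf._meta.assign(part_id=0)).compute()

Either way the function sees the same partition number; partition_info just avoids the extra positional argument at the call site.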

Contributor @rjzamora (May 13, 2022):

Using partition_info is fine. Under the hood, it will just add on a BlockwiseDep argument (one that is a bit heavier weight than BlockIndex). I think you should feel comfortable going that route here.

If it turns out that you need to use BlockIndex directly in the future, you can probably make the code a bit more intuitive by naming the argument more clearly:

partition_indices = BlockIndex(numblocks=(df.npartitions,))
df.map_partitions(..., partition_indices)

17 changes: 0 additions & 17 deletions dask_sql/physical/utils/map.py

This file was deleted.