Use map_partitions to compute LIMIT / OFFSET #517

Merged: 2 commits, May 13, 2022
49 changes: 18 additions & 31 deletions dask_sql/physical/rel/logical/limit.py
@@ -5,7 +5,6 @@
 from dask_sql.datacontainer import DataContainer
 from dask_sql.physical.rel.base import BaseRelPlugin
 from dask_sql.physical.rex import RexConverter
-from dask_sql.physical.utils.map import map_on_partition_index

 if TYPE_CHECKING:
     import dask_sql
@@ -38,25 +37,18 @@ def convert(
         if offset:
             end += offset

-        df = self._apply_offset(df, offset, end)
+        df = self._apply_limit(df, offset, end)

         cc = self.fix_column_to_row_type(cc, rel.getRowType())
         # No column type has changed, so no need to cast again
         return DataContainer(df, cc)

-    def _apply_offset(self, df: dd.DataFrame, offset: int, end: int) -> dd.DataFrame:
+    def _apply_limit(self, df: dd.DataFrame, offset: int, end: int) -> dd.DataFrame:
         """
         Limit the dataframe to the window [offset, end].
-        That is unfortunately, not so simple as we do not know how many
-        items we have in each partition. We have therefore no other way than to
-        calculate (!!!) the sizes of each partition.

-        After that, we can create a new dataframe from the old
-        dataframe by calculating for each partition if and how much
-        it should be used.
-        We do this via generating our own dask computation graph as
-        we need to pass the partition number to the selection
-        function, which is not possible with normal "map_partitions".
+        Unfortunately, Dask does not currently support row selection through `iloc`, so this must be done using a custom partition function.
+        However, it is sometimes possible to compute this window using `head` when an `offset` is not specified.
         """
         if not offset:
             # We do a (hopefully) very quick check: if the first partition
@@ -65,23 +57,19 @@ def _apply_offset(self, df: dd.DataFrame, offset: int, end: int) -> dd.DataFrame
             if first_partition_length >= end:
                 return df.head(end, compute=False)

-        # First, we need to find out which partitions we want to use.
-        # Therefore we count the total number of entries
+        # compute the size of each partition
+        # TODO: compute `cumsum` here when dask#9067 is resolved
         partition_borders = df.map_partitions(lambda x: len(x))

-        # Now we let each of the partitions figure out, how much it needs to return
-        # using these partition borders
-        # For this, we generate out own dask computation graph (as it does not really
-        # fit well with one of the already present methods).

-        # (a) we define a method to be calculated on each partition
-        # This method returns the part of the partition, which falls between [offset, fetch]
-        # Please note that the dask object "partition_borders", will be turned into
-        # its pandas representation at this point and we can calculate the cumsum
-        # (which is not possible on the dask object). Recalculating it should not cost
-        # us much, as we assume the number of partitions is rather small.
-        def select_from_to(df, partition_index, partition_borders):
+        def limit_partition_func(df, partition_borders, partition_info=None):
+            """Limit the partition to values contained within the specified window, returning an empty dataframe if there are none"""
+
+            # TODO: remove the `cumsum` call here when dask#9067 is resolved
             partition_borders = partition_borders.cumsum().to_dict()
+            partition_index = (
+                partition_info["number"] if partition_info is not None else 0
+            )

             this_partition_border_left = (
                 partition_borders[partition_index - 1] if partition_index > 0 else 0
             )
@@ -101,8 +89,7 @@ def select_from_to(df, partition_index, partition_borders):

             return df.iloc[from_index:to_index]

-        # (b) Now we just need to apply the function on every partition
-        # We do this via the delayed interface, which seems the easiest one.
-        return map_on_partition_index(
-            df, select_from_to, partition_borders, meta=df._meta
+        return df.map_partitions(
+            limit_partition_func,
+            partition_borders=partition_borders,
         )
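
For readers who want to see the technique outside of the diff, here is a minimal, self-contained sketch of the same idea: compute each partition's length, then let every partition slice out its share of the global [offset, end) window via `map_partitions` and `partition_info`. The dataframe, window values, and function names below are illustrative, not taken from the PR.

    import dask.dataframe as dd
    import pandas as pd

    df = dd.from_pandas(pd.DataFrame({"x": range(10)}), npartitions=3)
    offset, end = 3, 8  # the window [3, 8), i.e. LIMIT 5 OFFSET 3

    # One length per partition; its cumsum gives each partition's global end border.
    partition_borders = df.map_partitions(lambda x: len(x))

    def limit_partition(part, partition_borders, partition_info=None):
        # Passed as a keyword argument, the borders series arrives as a single
        # computed pandas Series (not partition-aligned), so every partition
        # sees all lengths and can recompute the cheap cumsum locally.
        borders = partition_borders.cumsum().to_dict()
        i = partition_info["number"] if partition_info is not None else 0
        left = borders[i - 1] if i > 0 else 0  # global index of this partition's first row
        right = borders[i]                     # one past its last row
        if end <= left or offset >= right:
            return part.iloc[0:0]  # the window does not touch this partition
        return part.iloc[max(offset - left, 0) : min(end, right) - left]

    result = df.map_partitions(limit_partition, partition_borders=partition_borders)
    print(result.compute())  # rows 3 through 7

Note the trade-off the new docstring mentions: `partition_borders` has to be materialized, so every partition's length is computed before the window can be applied.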
17 changes: 0 additions & 17 deletions dask_sql/physical/utils/map.py

This file was deleted.
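
To exercise this code path end to end through dask-sql itself, something like the following sketch should work (the table name and data are hypothetical; this plugin handles the plan whenever a LIMIT/OFFSET clause appears):

    import dask.dataframe as dd
    import pandas as pd
    from dask_sql import Context

    c = Context()
    c.create_table("t", dd.from_pandas(pd.DataFrame({"x": range(100)}), npartitions=4))

    # An OFFSET forces the map_partitions path; a bare LIMIT that fits inside
    # the first partition takes the head() shortcut instead.
    result = c.sql("SELECT x FROM t LIMIT 5 OFFSET 20")
    print(result.compute())  # x = 20 .. 24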