Skip to content

Commit

Permalink
[Data] Make ExecutionPlan.execute return RefBundle instead of `Bl…
Browse files Browse the repository at this point in the history
…ockList` (ray-project#45852)

This PR is part of a larger effort to remove `LazyBlockList`.

Currently, `ExecutionPlan.execute()` doesn't always perform execution.
Sometimes, it skips execution and returns a `LazyBlockList`. This
`LazyBlockList` has a code path separate from the standard streaming
execution code path to load data.

This PR updates the `execute()` implementation so that it always
actually performs execution, and returns a `RefBundle` rather than a
`BlockList`.

Signed-off-by: Balaji Veeramani <[email protected]>
Signed-off-by: Balaji Veeramani <[email protected]>
Co-authored-by: Scott Lee <[email protected]>
  • Loading branch information
bveeramani and scottjlee authored Jun 12, 2024
1 parent e4b68f4 commit a1ccd21
Show file tree
Hide file tree
Showing 22 changed files with 270 additions and 263 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,7 @@ def num_rows(self) -> Optional[int]:
actual computation.
"""
return None

def input_files(self) -> Optional[List[str]]:
"""The input files of this operator, or ``None`` if not known."""
return None
5 changes: 4 additions & 1 deletion python/ray/data/_internal/logical/operators/read_operator.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Any, Dict, Optional, Union
from typing import Any, Dict, List, Optional, Union

from ray.data._internal.logical.operators.map_operator import AbstractMap
from ray.data.datasource.datasource import Datasource, Reader
Expand Down Expand Up @@ -48,3 +48,6 @@ def schema(self):

def num_rows(self):
return self._datasource.num_rows()

def input_files(self) -> Optional[List[str]]:
return self._datasource.input_files()
116 changes: 25 additions & 91 deletions python/ray/data/_internal/plan.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import copy
import itertools
import logging
from typing import TYPE_CHECKING, Iterator, Optional, Tuple, Type, Union
from typing import TYPE_CHECKING, Iterator, List, Optional, Tuple, Type, Union

import pyarrow

Expand Down Expand Up @@ -90,6 +90,8 @@ def __init__(
self._run_by_consumer = run_by_consumer
self._dataset_name = None

self._has_started_execution = False

if data_context is None:
# Snapshot the current context, so that the config of Datasets is always
# determined by the config at the time it was created.
Expand Down Expand Up @@ -124,10 +126,7 @@ def get_plan_as_string(self, dataset_cls: Type["Dataset"]) -> str:
plan_str = ""
plan_max_depth = 0
dataset_blocks = None
if (
self._snapshot_bundle is None
or self._snapshot_operator != self._logical_plan.dag
):
if not self.has_computed_output():

def generate_logical_plan_string(
op: LogicalOperator,
Expand Down Expand Up @@ -159,21 +158,12 @@ def generate_logical_plan_string(
self._logical_plan.dag
)

# Get schema of initial blocks.
if self.needs_eager_execution():
# In the case where the plan contains only a Read/From operator,
# it is cheap to execute it.
# This allows us to get the most accurate estimates related
# to the dataset, after applying execution plan optimizer rules
# (e.g. number of blocks may change based on parallelism).
self.execute()
if self._snapshot_blocks is not None:
schema = self._get_unified_blocks_schema(
self._snapshot_blocks, fetch_if_missing=False
)
dataset_blocks = self._snapshot_blocks
else:
assert self._in_blocks is not None
schema = self._get_unified_blocks_schema(
self._in_blocks, fetch_if_missing=False
)
Expand Down Expand Up @@ -359,10 +349,7 @@ def schema(
return self._schema

schema = None
if (
self._snapshot_bundle is not None
and not self._snapshot_operator.output_dependencies
):
if self.has_computed_output():
schema = unify_block_metadata_schema(self._snapshot_bundle.metadata)
elif self._logical_plan.dag.schema() is not None:
schema = self._logical_plan.dag.schema()
Expand Down Expand Up @@ -390,6 +377,10 @@ def schema(
def cache_schema(self, schema: Union[type, "pyarrow.lib.Schema"]):
self._schema = schema

def input_files(self) -> Optional[List[str]]:
"""Get the input files of the dataset, if available."""
return self._logical_plan.dag.input_files()

def _get_unified_blocks_schema(
self, blocks: BlockList, fetch_if_missing: bool = False
) -> Union[type, "pyarrow.lib.Schema"]:
Expand All @@ -413,14 +404,6 @@ def _get_unified_blocks_schema(
return unified_schema
if not fetch_if_missing:
return None
# Synchronously fetch the schema.
# For lazy block lists, this launches read tasks and fetches block metadata
# until we find the first valid block schema. This is to minimize new
# computations when fetching the schema.
for _, m in blocks.iter_blocks_with_metadata():
if m.schema is not None and (m.num_rows is None or m.num_rows > 0):
return m.schema
return None

def meta_count(self) -> Optional[int]:
"""Get the number of rows after applying all plan optimizations, if possible.
Expand All @@ -430,10 +413,7 @@ def meta_count(self) -> Optional[int]:
Returns:
The number of records of the result Dataset, or None.
"""
if (
self._snapshot_bundle is not None
and not self._snapshot_operator.output_dependencies
):
if self.has_computed_output():
num_rows = sum(m.num_rows for m in self._snapshot_bundle.metadata)
elif self._logical_plan.dag.num_rows() is not None:
num_rows = self._logical_plan.dag.num_rows()
Expand Down Expand Up @@ -468,18 +448,14 @@ def execute_to_iterator(
Returns:
Tuple of iterator over output blocks and the executor.
"""
self._has_started_execution = True

# Always use the saved context for execution.
ctx = self._context

if self.has_computed_output():
return (
self.execute(
allow_clear_input_blocks, force_read=False
).iter_blocks_with_metadata(),
self._snapshot_stats,
None,
)
bundle = self.execute(allow_clear_input_blocks)
return iter(bundle.blocks), self._snapshot_stats, None

from ray.data._internal.execution.legacy_compat import (
execute_to_legacy_block_iterator,
Expand Down Expand Up @@ -508,20 +484,19 @@ def execute_to_iterator(
def execute(
self,
allow_clear_input_blocks: bool = True,
force_read: bool = False,
preserve_order: bool = False,
) -> BlockList:
) -> RefBundle:
"""Execute this plan.
Args:
allow_clear_input_blocks: Whether we should try to clear the input blocks
for each operator.
force_read: Whether to force the read operator to fully execute.
preserve_order: Whether to preserve order in execution.
Returns:
The blocks of the output dataset.
"""
self._has_started_execution = True

# Always use the saved context for execution.
context = self._context
Expand All @@ -540,7 +515,6 @@ def execute(
from ray.data._internal.execution.legacy_compat import (
_get_initial_stats_from_plan,
execute_to_legacy_block_list,
get_legacy_lazy_block_list_read_only,
)

if self.is_from_in_memory_only():
Expand All @@ -549,12 +523,6 @@ def execute(
# recording unnecessary metrics for an empty plan execution.
blocks = self._in_blocks
stats = _get_initial_stats_from_plan(self)
elif self.is_read_only():
# If the Dataset is read-only, get the LazyBlockList without
# executing the plan by only fetching metadata available from
# the input Datasource or Reader without executing its ReadTasks.
blocks = get_legacy_lazy_block_list_read_only(self)
stats = _get_initial_stats_from_plan(self)
else:
from ray.data._internal.execution.streaming_executor import (
StreamingExecutor,
Expand Down Expand Up @@ -618,36 +586,19 @@ def collect_stats(cur_stats):

# Set the snapshot to the output of the final operator.
self._snapshot_blocks = blocks
if not isinstance(blocks, LazyBlockList):
self._snapshot_bundle = RefBundle(
tuple(blocks.iter_blocks_with_metadata()),
owns_blocks=blocks._owned_by_consumer,
)
self._snapshot_bundle = RefBundle(
tuple(blocks.iter_blocks_with_metadata()),
owns_blocks=blocks._owned_by_consumer,
)
self._snapshot_operator = self._logical_plan.dag
self._snapshot_stats = stats
self._snapshot_stats.dataset_uuid = self._dataset_uuid
return self._snapshot_bundle

# In the case of a read-only dataset, we replace the
# input LazyBlockList with a copy that includes the
# calculated metadata from initializing the InputDataBuffer.
if self.is_read_only():
self._in_blocks = blocks
if isinstance(self._snapshot_blocks, LazyBlockList) and force_read:
executed_blocks = self._snapshot_blocks.compute_to_blocklist()
# After executing the snapshot blocks, get its updated stats.
# The snapshot blocks after execution will contain the execution stats.
self._snapshot_stats = self._snapshot_blocks.stats()
self._snapshot_blocks = executed_blocks
assert not isinstance(executed_blocks, LazyBlockList), type(executed_blocks)
self._snapshot_bundle = RefBundle(
tuple(executed_blocks.iter_blocks_with_metadata()),
owns_blocks=executed_blocks._owned_by_consumer,
)
self._snapshot_operator = self._logical_plan.dag
# When force-read is enabled, we similarly update self._in_blocks.
if self.is_read_only():
self._in_blocks = self._snapshot_blocks
return self._snapshot_blocks
@property
def has_started_execution(self) -> bool:
"""Return ``True`` if this plan has been partially or fully executed."""
return self._has_started_execution

def clear_block_refs(self) -> None:
"""Clear all cached block references of this plan, including input blocks.
Expand Down Expand Up @@ -679,22 +630,6 @@ def has_lazy_input(self) -> bool:
"""Return whether this plan has lazy input blocks."""
return isinstance(self._in_blocks, LazyBlockList)

def needs_eager_execution(self, root_op: Optional[LogicalOperator] = None) -> bool:
"""Return whether the LogicalPlan corresponding to `root_op`
should be eagerly executed. By default, the last operator of
the LogicalPlan is used.
This is often useful for input/read-only plans,
where eager execution fetches accurate metadata for the dataset
without executing the underlying read tasks."""
if root_op is None:
root_op = self._logical_plan.dag
# Since read tasks will not be scheduled until data is consumed or materialized,
# it is cheap to execute the plan (i.e. run the plan optimizer).
# In the case where the data is already in-memory (InputData,
# FromXXX operator), it is similarly also cheap to execute it.
return self.is_from_in_memory_only(root_op) or self.is_read_only(root_op)

def is_read_only(self, root_op: Optional[LogicalOperator] = None) -> bool:
"""Return whether the LogicalPlan corresponding to `root_op`
contains only a Read op. By default, the last operator of
Expand All @@ -721,8 +656,7 @@ def has_computed_output(self) -> bool:
output of this plan.
"""
return (
self._snapshot_blocks is not None
and not self._snapshot_blocks.is_cleared()
self._snapshot_bundle is not None
and self._snapshot_operator == self._logical_plan.dag
)

Expand Down
Loading

0 comments on commit a1ccd21

Please sign in to comment.