Skip to content

Commit

Permalink
add partitions to range scan
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewgazelka committed Nov 20, 2024
1 parent 2b89ce1 commit 4f1210c
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 14 deletions.
29 changes: 20 additions & 9 deletions daft/io/_range.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,30 @@
from daft.table.table import Table


def _range_generators(start: int, end: int, step: int) -> Iterator[Callable[[], Iterator[Table]]]:
def generator_for_value(value: int) -> Callable[[], Iterator[Table]]:
def generator() -> Iterator[Table]:
yield Table.from_pydict({"id": [value]})
def _range_generators(start: int, end: int, step: int, partitions: int) -> Iterator[Callable[[], Iterator[Table]]]:
# TODO: Partitioning with range scan is currently untested and unused.
# There may be issues with balanced partitions and step size.

return generator
# Calculate partition bounds upfront
partition_size = (end - start) // partitions
partition_bounds = [
(start + (i * partition_size), start + ((i + 1) * partition_size) if i < partitions - 1 else end)
for i in range(partitions)
]

for value in range(start, end, step):
yield generator_for_value(value)
def generator(partition_idx: int) -> Iterator[Table]:
partition_start, partition_end = partition_bounds[partition_idx]
values = list(range(partition_start, partition_end, step))
yield Table.from_pydict({"id": values})

from functools import partial

for partition_idx in range(partitions):
yield partial(generator, partition_idx)


class RangeScanOperator(GeneratorScanOperator):
    """Scan operator producing a single int64 ``id`` column from an integer range.

    Args:
        start: First value of the range.
        end: End of the range, forwarded to ``_range_generators``.
        step: Distance between consecutive values. Defaults to 1.
        partitions: Number of generators requested from ``_range_generators``.
            Defaults to 1.
    """

    def __init__(self, start: int, end: int, step: int = 1, partitions: int = 1) -> None:
        schema = Schema._from_field_name_and_types([("id", DataType.int64())])

        super().__init__(schema=schema, generators=_range_generators(start, end, step, partitions))
8 changes: 4 additions & 4 deletions src/daft-connect/src/translation/logical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ fn range(range: Range) -> eyre::Result<LogicalPlanBuilder> {
num_partitions,
} = range;

if let Some(partitions) = num_partitions {
warn!("{partitions} ignored");
}
let partitions = num_partitions.unwrap_or(1);

ensure!(partitions > 0, "num_partitions must be greater than 0");

let start = start.unwrap_or(0);

Expand All @@ -51,7 +51,7 @@ fn range(range: Range) -> eyre::Result<LogicalPlanBuilder> {
.wrap_err("Failed to get range function")?;

let range = range
.call1((start, end, step))
.call1((start, end, step, partitions))
.wrap_err("Failed to create range scan operator")?
.to_object(py);

Expand Down
1 change: 0 additions & 1 deletion tests/connect/test_range_simple.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from __future__ import annotations

# import time
import pytest
from pyspark.sql import SparkSession

Expand Down
Empty file added xyz
Empty file.

0 comments on commit 4f1210c

Please sign in to comment.