Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Datasets] Streaming executor fixes #5 #32951

Merged
merged 8 commits into from
Mar 5, 2023

Conversation

jianoaix
Copy link
Contributor

@jianoaix jianoaix commented Mar 1, 2023

Why are these changes needed?

Related issue number

#32132

Checks

  • I've signed off every commit(by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

cached_metadata = [
self._cached_metadata[i * k + min(i, m) : (i + 1) * k + min(i + 1, m)]
for i in range(len(self._cached_metadata))
]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm it seems like this cached_metadata should always be homogeneous, i.e. it should always contain a BlockMetadata for each element. Do you know how this heterogeneity is happening?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We set only the first block when figuring out the schema, so e.g. with this:

import ray

inputs = ["example://iris.csv"] * 100
ds = ray.data.read_csv(inputs, parallelism=10)
print("before:", ds._plan._in_blocks._cached_metadata)
ds.schema()
print("after:", ds._plan._in_blocks._cached_metadata)
ds._plan._in_blocks.split(2)

It's producing:

before: [None, None, None, None, None, None, None, None, None, None]
after: [[BlockMetadata(num_rows=1500, size_bytes=66500, schema=sepal.length: double
sepal.width: double
petal.length: double
petal.width: double
variety: string, input_files=array(['/home/ubuntu/ray/python/ray/data/examples/data/iris.csv',
       '/home/ubuntu/ray/python/ray/data/examples/data/iris.csv',
       '/home/ubuntu/ray/python/ray/data/examples/data/iris.csv',
       '/home/ubuntu/ray/python/ray/data/examples/data/iris.csv',
       '/home/ubuntu/ray/python/ray/data/examples/data/iris.csv',
       '/home/ubuntu/ray/python/ray/data/examples/data/iris.csv',
       '/home/ubuntu/ray/python/ray/data/examples/data/iris.csv',
       '/home/ubuntu/ray/python/ray/data/examples/data/iris.csv',
       '/home/ubuntu/ray/python/ray/data/examples/data/iris.csv',
       '/home/ubuntu/ray/python/ray/data/examples/data/iris.csv'],
      dtype='<U55'), exec_stats={'wall_time_s': 0.36291642487049103, 'cpu_time_s': 0.3410033880000001, 'node_id': 'f3e389087180baf4bcde82efe3873d1139be957718e69465786af17d'})], None, None, None, None, None, None, None, None, None]

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we pull this out into an array split util and use it for the above splits too?

Otherwise it's not clear this is doing the same thing as array split.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sg, done.

@jianoaix jianoaix changed the title [Datasets] Split the list mannually for heterogeneous array [Datasets] Streaming executor fixes #5 Mar 2, 2023
@@ -4579,7 +4579,7 @@ def test_warning_execute_with_no_cpu(ray_start_cluster):
try:
ds = ray.data.range(10)
ds = ds.map_batches(lambda x: x)
ds.take()
ds._plan.execute()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The purpose of this test is testing ExecutionPlan.execute() so we run it directly. Running ds.take() may invoke the new execution backend when the flag is on.

@jianoaix jianoaix force-pushed the streamingexecfix5 branch from 11c5af8 to 90954c5 Compare March 2, 2023 18:06
@ericl ericl added the @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. label Mar 2, 2023
@jianoaix jianoaix force-pushed the streamingexecfix5 branch from 90954c5 to 6b7130b Compare March 3, 2023 00:45
@jianoaix jianoaix removed the @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. label Mar 3, 2023
@jianoaix
Copy link
Contributor Author

jianoaix commented Mar 3, 2023

Tests passing (the failures are relevant to this pr).

@ericl ericl merged commit 303ac3b into ray-project:master Mar 5, 2023
ProjectsByJackHe pushed a commit to ProjectsByJackHe/ray that referenced this pull request Mar 21, 2023
edoakes pushed a commit to edoakes/ray that referenced this pull request Mar 22, 2023
peytondmurray pushed a commit to peytondmurray/ray that referenced this pull request Mar 22, 2023
elliottower pushed a commit to elliottower/ray that referenced this pull request Apr 22, 2023
ProjectsByJackHe pushed a commit to ProjectsByJackHe/ray that referenced this pull request May 4, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Datasets] LazyBlocklist split fails to split heteroeneous list
3 participants