Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spark 3.5: Fix NotSerializableException when migrating Spark tables #11157

Merged
merged 2 commits into from
Nov 20, 2024

Conversation

manuzhang
Copy link
Collaborator

@manuzhang manuzhang commented Sep 18, 2024

@manuzhang manuzhang force-pushed the fix-migrate-partitioned-tables branch 2 times, most recently from 999c1ba to e4aeb31 Compare September 18, 2024 13:49
@manuzhang manuzhang force-pushed the fix-migrate-partitioned-tables branch from e4aeb31 to 005114b Compare October 6, 2024 16:45
@manuzhang
Copy link
Collaborator Author

@nastra could you help take a look?

@RussellSpitzer RussellSpitzer added this to the Iceberg 1.7.0 milestone Oct 23, 2024
@RussellSpitzer
Copy link
Member

RussellSpitzer commented Oct 24, 2024

@manuzhang Can you summerize the usage of ExecutorService on the Spark Executors? It looks like the current fix involves making a new Executor service per task and i'm not sure that's what we want to do. I wonder if it makes more sense to pass in a Supplier so we don't have to implement a wrapper class.

But before we do that I want to make sure we are using the ExecutorService for the right reasons in the code path.

@manuzhang
Copy link
Collaborator Author

ExecutorService is used to parallelize reading files to build manifests on the Spark executors for Spark table migration procedures (add_files, migrate, snapshot).

@RussellSpitzer
Copy link
Member

RussellSpitzer commented Oct 25, 2024

ExecutorService is used to parallelize reading files to build manifests on the Spark executors for Spark table migration procedures (add_files, migrate, snapshot).

I meant specifically, are we using this in listPartitions or in the FileIndex implementation?

The title of this issue says this is an issue for "partitioned tables" so why does it work in that case but not in this case? Is it because the listPartitions code is using the executor service or what?

@manuzhang
Copy link
Collaborator Author

manuzhang commented Oct 25, 2024

Yes, it's used in listPartitions while the title was not accurate. Migrating unpartitioned Spark tables has the same issue. I've updated the title.

@manuzhang manuzhang changed the title Spark 3.5: Fix NotSerializableException when migrating partitioned Spark tables Spark 3.5: Fix NotSerializableException when migrating Spark tables Oct 25, 2024
@github-actions github-actions bot added the API label Oct 26, 2024
@RussellSpitzer
Copy link
Member

After thinking about this for a while, I think you are probably right that we need to build a specific LazyExecutorService like you did originally. I'm sorry I lend you on a goose chase here, Let's make sure it is Spark specific and doesn't touch any of the other implementations.

@manuzhang
Copy link
Collaborator Author

@RussellSpitzer I can revert to previous commit and this is Spark specific, but can you elaborate on why LazyExecutorService is better thanSerializableSupplier? I agree with you that people can pass around a non-lazy ExecutorService in future implementations.

I submitted #11417 to add warning in the doc since this PR can't get into 1.7.0

@RussellSpitzer
Copy link
Member

RussellSpitzer commented Oct 29, 2024

Main reason was that the API that is specified in the API Module allows withExecutorService(executor service) so unless we want to break that api (which we could) we need to stick with just passing through an executor service. We could alternatively just change the API if you think that's warrrented. If we did that, I'd probably remove "executor service" all together

@manuzhang manuzhang force-pushed the fix-migrate-partitioned-tables branch from ac414d8 to 005114b Compare October 30, 2024 02:22
@github-actions github-actions bot removed the API label Oct 30, 2024
@manuzhang
Copy link
Collaborator Author

@RussellSpitzer I've reverted to lazy executor service. Please check again. Thanks.

@manuzhang
Copy link
Collaborator Author

@RussellSpitzer Can we move this PR forward?

return parallelism == 1 ? null : new ExecutorServiceFactory(parallelism);
}

private static class ExecutorServiceFactory implements ExecutorService, Serializable {
Copy link
Member

@RussellSpitzer RussellSpitzer Nov 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably needs a rename since this doesn't actually make Executor Services, probably just LazyExecutorService?

@@ -971,4 +979,109 @@ public int hashCode() {
return Objects.hashCode(values, uri, format);
}
}

@Nullable
public static ExecutorService executorService(int parallelism) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need to be public?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also this specifically makes ExecutorServices with the TableMigrationUtil.migrationService(parallelism); so we should probably indicate that in the name as well. (I think we have thread pool labeling in that method?)

sql("CALL %s.system.migrate(table => '%s', parallelism => %d)", catalogName, tableName, 2);
assertEquals("Procedure output must match", ImmutableList.of(row(2L)), result);
assertEquals(
"Should have expected rows",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Quick check do these tests fail without this patch? I just want to make sure because I'm pretty sure we are running this code in local mode and I want to make sure the serializers break without this patch.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes

@RussellSpitzer RussellSpitzer merged commit 799925a into apache:main Nov 20, 2024
49 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

procedure add_files parallelism > 1 -> NotSerializableException
3 participants