-
Notifications
You must be signed in to change notification settings - Fork 28
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Staging in parallel #157
Staging in parallel #157
Conversation
Signed-off-by: Hongxin Liang <[email protected]>
@@ -73,7 +74,8 @@ public Integer call() { | |||
|
|||
try (FlyteAdminClient adminClient = | |||
FlyteAdminClient.create(config.platformUrl(), config.platformInsecure(), tokenSource)) { | |||
Supplier<ArtifactStager> stagerSupplier = () -> ArtifactStager.create(config, modules); | |||
Supplier<ArtifactStager> stagerSupplier = | |||
() -> ArtifactStager.create(config, modules, new ForkJoinPool()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems good enough to use default configuration of ForkJoinPool
.
@@ -116,6 +147,7 @@ void stageArtifact(Artifact artifact, ByteSource content) { | |||
throw new UncheckedIOException(e); | |||
} | |||
} else { | |||
LOG.info("[{}] already staged to [{}]", artifact.name(), artifact.location()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To avoid confusion that users might think jflyte always stages files no matter they exist at the destination or not.
for (int i = 0; i < futures.size(); ++i) { | ||
try { | ||
artifacts.add(futures.get(i).get()); | ||
} catch (InterruptedException | ExecutionException e) { | ||
if (e instanceof InterruptedException) { | ||
Thread.currentThread().interrupt(); | ||
} | ||
for (int j = i; j < futures.size(); ++j) { | ||
futures.get(j).cancel(true); | ||
} | ||
|
||
throw new RuntimeException(e); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for (int i = 0; i < futures.size(); ++i) { | |
try { | |
artifacts.add(futures.get(i).get()); | |
} catch (InterruptedException | ExecutionException e) { | |
if (e instanceof InterruptedException) { | |
Thread.currentThread().interrupt(); | |
} | |
for (int j = i; j < futures.size(); ++j) { | |
futures.get(j).cancel(true); | |
} | |
throw new RuntimeException(e); | |
} | |
List<Artifact> artifacts = getAll(futures); |
Can we hide all the Future get and exception handling inside and auxiliary method? Maybe CompletableFutures.getAll
to mimic the Spotify library.
Is that we are mingling all that threading low level code inside the method that deals with files and artifacts
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried that getAll
thing but then stageFiles
became almost empty then I thought it didn't increase readability.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We cannot use CompletableFutures because we want to do cancellation.
Signed-off-by: Hongxin Liang <[email protected]>
Signed-off-by: Hongxin Liang <[email protected]>
Signed-off-by: Hongxin Liang <[email protected]>
* Staging in parallel Signed-off-by: Hongxin Liang <[email protected]> Signed-off-by: Andres Gomez Ferrer <[email protected]>
TL;DR
Staging artifacts in parallel.
Type
Are all requirements met?
The happy path is covered by integration tests.
Complete description
Currently artifacts are staged sequentially, which is very slow due to IO blocking. By staging them in multiple threads, the throughput can be largely improved.
Tracking Issue
Closes flyteorg/flyte#3146
Follow-up issue
NA