Skip to content

Commit

Permalink
Further increase transient expansion service timeout (#25176)
Browse files Browse the repository at this point in the history
* Increase transient expansion service timeout
  • Loading branch information
Abacn authored Jan 26, 2023
1 parent 48712b2 commit d39e9c5
Showing 1 changed file with 3 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,11 @@
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Wrapper for invoking external Python transforms. */
public class PythonExternalTransform<InputT extends PInput, OutputT extends POutput>
extends PTransform<InputT, OutputT> {

private static final Logger LOG = LoggerFactory.getLogger(PythonExternalTransform.class);

private static final SchemaRegistry SCHEMA_REGISTRY = SchemaRegistry.createDefault();
private String fullyQualifiedName;

Expand Down Expand Up @@ -448,7 +444,7 @@ public OutputT expand(InputT input) {
PythonService.waitForPort(
Iterables.get(Splitter.on(':').split(expansionService), 0),
Integer.parseInt(Iterables.get(Splitter.on(':').split(expansionService), 1)),
60000);
15000);
return apply(input, expansionService, payload);
} else {
int port = PythonService.findAvailablePort();
Expand All @@ -472,13 +468,8 @@ public OutputT expand(InputT input) {
"apache_beam.runners.portability.expansion_service_main", args.build())
.withExtraPackages(extraPackages);
try (AutoCloseable p = service.start()) {
// allow more time for service with extra packages to response.
int timeoutSeconds = extraPackages.isEmpty() ? 15 : 30;
LOG.info(
"Expanding Python external transform {} using default transient expansion service with timeout {}s.",
fullyQualifiedName,
timeoutSeconds);
PythonService.waitForPort("localhost", port, timeoutSeconds * 1000);
// allow more time waiting for the port ready for transient expansion service setup.
PythonService.waitForPort("localhost", port, 60000);
return apply(input, String.format("localhost:%s", port), payload);
}
}
Expand Down

0 comments on commit d39e9c5

Please sign in to comment.