Skip to content

Commit

Permalink
Merge pull request #33026: Update Java ExpansionService to use arbitr…
Browse files Browse the repository at this point in the history
…ary PipelineOptions set through an ExpansionRequest
  • Loading branch information
chamikaramj authored Nov 7, 2024
2 parents 6742499 + 3a941c0 commit 8a920c1
Show file tree
Hide file tree
Showing 3 changed files with 155 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ public class PipelineOptionsTranslation {
new ObjectMapper()
.registerModules(ObjectMapper.findModules(ReflectHelpers.findClassLoader()));

public static final String PIPELINE_OPTIONS_URN_PREFIX = "beam:option:";
public static final String PIPELINE_OPTIONS_URN_SUFFIX = ":v1";

/** Converts the provided {@link PipelineOptions} to a {@link Struct}. */
public static Struct toProto(PipelineOptions options) {
Struct.Builder builder = Struct.newBuilder();
Expand All @@ -65,9 +68,9 @@ public static Struct toProto(PipelineOptions options) {
while (optionsEntries.hasNext()) {
Map.Entry<String, JsonNode> entry = optionsEntries.next();
optionsUsingUrns.put(
"beam:option:"
PIPELINE_OPTIONS_URN_PREFIX
+ CaseFormat.LOWER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, entry.getKey())
+ ":v1",
+ PIPELINE_OPTIONS_URN_SUFFIX,
entry.getValue());
}

Expand All @@ -92,7 +95,9 @@ public static PipelineOptions fromProto(Struct protoOptions) {
mapWithoutUrns.put(
CaseFormat.LOWER_UNDERSCORE.to(
CaseFormat.LOWER_CAMEL,
optionKey.substring("beam:option:".length(), optionKey.length() - ":v1".length())),
optionKey.substring(
PIPELINE_OPTIONS_URN_PREFIX.length(),
optionKey.length() - PIPELINE_OPTIONS_URN_SUFFIX.length())),
optionValue);
}
return MAPPER.readValue(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PortablePipelineOptions;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.Field;
Expand Down Expand Up @@ -535,7 +534,7 @@ private static <ConfigT> void invokeSetter(ConfigT config, @Nullable Object valu
}

private @MonotonicNonNull Map<String, TransformProvider> registeredTransforms;
private final PipelineOptions pipelineOptions;
private final PipelineOptions commandLineOptions;
private final @Nullable String loopbackAddress;

public ExpansionService() {
Expand All @@ -551,7 +550,7 @@ public ExpansionService(PipelineOptions opts) {
}

public ExpansionService(PipelineOptions opts, @Nullable String loopbackAddress) {
this.pipelineOptions = opts;
this.commandLineOptions = opts;
this.loopbackAddress = loopbackAddress;
}

Expand Down Expand Up @@ -587,12 +586,15 @@ private Map<String, TransformProvider> loadRegisteredTransforms() {
request.getTransform().getSpec().getUrn());
LOG.debug("Full transform: {}", request.getTransform());
Set<String> existingTransformIds = request.getComponents().getTransformsMap().keySet();
Pipeline pipeline =
createPipeline(PipelineOptionsTranslation.fromProto(request.getPipelineOptions()));

PipelineOptions pipelineOptionsFromRequest =
PipelineOptionsTranslation.fromProto(request.getPipelineOptions());
Pipeline pipeline = createPipeline(pipelineOptionsFromRequest);

boolean isUseDeprecatedRead =
ExperimentalOptions.hasExperiment(pipelineOptions, "use_deprecated_read")
ExperimentalOptions.hasExperiment(commandLineOptions, "use_deprecated_read")
|| ExperimentalOptions.hasExperiment(
pipelineOptions, "beam_fn_api_use_deprecated_read");
commandLineOptions, "beam_fn_api_use_deprecated_read");
if (!isUseDeprecatedRead) {
ExperimentalOptions.addExperiment(
pipeline.getOptions().as(ExperimentalOptions.class), "beam_fn_api");
Expand Down Expand Up @@ -629,7 +631,7 @@ private Map<String, TransformProvider> loadRegisteredTransforms() {
if (transformProvider == null) {
if (getUrn(ExpansionMethods.Enum.JAVA_CLASS_LOOKUP).equals(urn)) {
AllowList allowList =
pipelineOptions.as(ExpansionServiceOptions.class).getJavaClassLookupAllowlist();
commandLineOptions.as(ExpansionServiceOptions.class).getJavaClassLookupAllowlist();
assert allowList != null;
transformProvider = new JavaClassLookupTransformProvider(allowList);
} else if (getUrn(SCHEMA_TRANSFORM).equals(urn)) {
Expand Down Expand Up @@ -671,7 +673,7 @@ private Map<String, TransformProvider> loadRegisteredTransforms() {
RunnerApi.Environment defaultEnvironment =
Environments.createOrGetDefaultEnvironment(
pipeline.getOptions().as(PortablePipelineOptions.class));
if (pipelineOptions.as(ExpansionServiceOptions.class).getAlsoStartLoopbackWorker()) {
if (commandLineOptions.as(ExpansionServiceOptions.class).getAlsoStartLoopbackWorker()) {
PortablePipelineOptions externalOptions =
PipelineOptionsFactory.create().as(PortablePipelineOptions.class);
externalOptions.setDefaultEnvironmentType(Environments.ENVIRONMENT_EXTERNAL);
Expand Down Expand Up @@ -723,35 +725,34 @@ private Map<String, TransformProvider> loadRegisteredTransforms() {
}

protected Pipeline createPipeline(PipelineOptions requestOptions) {
// TODO: [https://github.com/apache/beam/issues/21064]: implement proper validation
PipelineOptions effectiveOpts = PipelineOptionsFactory.create();
PortablePipelineOptions portableOptions = effectiveOpts.as(PortablePipelineOptions.class);
PortablePipelineOptions specifiedOptions = pipelineOptions.as(PortablePipelineOptions.class);
Optional.ofNullable(specifiedOptions.getDefaultEnvironmentType())
.ifPresent(portableOptions::setDefaultEnvironmentType);
Optional.ofNullable(specifiedOptions.getDefaultEnvironmentConfig())
.ifPresent(portableOptions::setDefaultEnvironmentConfig);
List<String> filesToStage = specifiedOptions.getFilesToStage();
// We expect the ExpansionRequest to contain a valid set of options to be used for this
// expansion.
// Additionally, we override selected options using options values set via command line or
// ExpansionService wide overrides.

PortablePipelineOptions requestPortablePipelineOptions =
requestOptions.as(PortablePipelineOptions.class);
PortablePipelineOptions commandLinePortablePipelineOptions =
commandLineOptions.as(PortablePipelineOptions.class);
Optional.ofNullable(commandLinePortablePipelineOptions.getDefaultEnvironmentType())
.ifPresent(requestPortablePipelineOptions::setDefaultEnvironmentType);
Optional.ofNullable(commandLinePortablePipelineOptions.getDefaultEnvironmentConfig())
.ifPresent(requestPortablePipelineOptions::setDefaultEnvironmentConfig);
List<String> filesToStage = commandLinePortablePipelineOptions.getFilesToStage();
if (filesToStage != null) {
effectiveOpts.as(PortablePipelineOptions.class).setFilesToStage(filesToStage);
requestPortablePipelineOptions
.as(PortablePipelineOptions.class)
.setFilesToStage(filesToStage);
}
effectiveOpts
requestPortablePipelineOptions
.as(ExperimentalOptions.class)
.setExperiments(pipelineOptions.as(ExperimentalOptions.class).getExperiments());
effectiveOpts.setRunner(NotRunnableRunner.class);
effectiveOpts
.setExperiments(commandLineOptions.as(ExperimentalOptions.class).getExperiments());
requestPortablePipelineOptions.setRunner(NotRunnableRunner.class);
requestPortablePipelineOptions
.as(ExpansionServiceOptions.class)
.setExpansionServiceConfig(
pipelineOptions.as(ExpansionServiceOptions.class).getExpansionServiceConfig());
// TODO(https://github.com/apache/beam/issues/20090): Figure out the correct subset of options
// to propagate.
if (requestOptions.as(StreamingOptions.class).getUpdateCompatibilityVersion() != null) {
effectiveOpts
.as(StreamingOptions.class)
.setUpdateCompatibilityVersion(
requestOptions.as(StreamingOptions.class).getUpdateCompatibilityVersion());
}
return Pipeline.create(effectiveOpts);
commandLineOptions.as(ExpansionServiceOptions.class).getExpansionServiceConfig());
return Pipeline.create(requestOptions);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.beam.sdk.expansion.service;

import static org.apache.beam.sdk.util.construction.PipelineOptionsTranslation.PIPELINE_OPTIONS_URN_PREFIX;
import static org.apache.beam.sdk.util.construction.PipelineOptionsTranslation.PIPELINE_OPTIONS_URN_SUFFIX;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.contains;
Expand Down Expand Up @@ -49,6 +51,8 @@
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.Field;
Expand All @@ -58,15 +62,20 @@
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Impulse;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.sdk.util.construction.PipelineTranslation;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.Struct;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.Value;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.Resources;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

/** Tests for {@link ExpansionService}. */
Expand All @@ -76,6 +85,7 @@
public class ExpansionServiceTest {

private static final String TEST_URN = "test:beam:transforms:count";
private static final String TEST_OPTIONS_URN = "test:beam:transforms:test_options";

private static final String TEST_NAME = "TestName";

Expand All @@ -98,9 +108,59 @@ public class ExpansionServiceTest {
@AutoService(ExpansionService.ExpansionServiceRegistrar.class)
public static class TestTransformRegistrar implements ExpansionService.ExpansionServiceRegistrar {

static final String EXPECTED_STRING_VALUE = "abcde";
static final Boolean EXPECTED_BOOLEAN_VALUE = true;
static final Integer EXPECTED_INTEGER_VALUE = 12345;

@Override
public Map<String, TransformProvider> knownTransforms() {
return ImmutableMap.of(TEST_URN, (spec, options) -> Count.perElement());
return ImmutableMap.of(
TEST_URN, (spec, options) -> Count.perElement(),
TEST_OPTIONS_URN,
(spec, options) ->
new TestOptionsTransform(
EXPECTED_STRING_VALUE, EXPECTED_BOOLEAN_VALUE, EXPECTED_INTEGER_VALUE));
}
}

public interface TestOptions extends PipelineOptions {
String getStringOption();

void setStringOption(String value);

Boolean getBooleanOption();

void setBooleanOption(Boolean value);

Integer getIntegerOption();

void setIntegerOption(Integer value);
}

public static class TestOptionsTransform
extends PTransform<PCollection<String>, PCollection<String>> {
String expectedStringValue;

Boolean expectedBooleanValue;

Integer expectedIntegerValue;

public TestOptionsTransform(
String expectedStringValue, Boolean expectedBooleanValue, Integer expectedIntegerValue) {
this.expectedStringValue = expectedStringValue;
this.expectedBooleanValue = expectedBooleanValue;
this.expectedIntegerValue = expectedIntegerValue;
}

@Override
public PCollection<String> expand(PCollection<String> input) {
TestOptions testOption = input.getPipeline().getOptions().as(TestOptions.class);

Assert.assertEquals(expectedStringValue, testOption.getStringOption());
Assert.assertEquals(expectedBooleanValue, testOption.getBooleanOption());
Assert.assertEquals(expectedIntegerValue, testOption.getIntegerOption());

return input;
}
}

Expand Down Expand Up @@ -146,6 +206,58 @@ public void testConstruct() {
}
}

@Test
public void testConstructWithPipelineOptions() {
PipelineOptionsFactory.register(TestOptions.class);
Pipeline p = Pipeline.create();
p.apply(Impulse.create());
RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p);
String inputPcollId =
Iterables.getOnlyElement(
Iterables.getOnlyElement(pipelineProto.getComponents().getTransformsMap().values())
.getOutputsMap()
.values());

Struct optionsStruct =
Struct.newBuilder()
.putFields(
PIPELINE_OPTIONS_URN_PREFIX + "string_option" + PIPELINE_OPTIONS_URN_SUFFIX,
Value.newBuilder()
.setStringValue(TestTransformRegistrar.EXPECTED_STRING_VALUE)
.build())
.putFields(
PIPELINE_OPTIONS_URN_PREFIX + "boolean_option" + PIPELINE_OPTIONS_URN_SUFFIX,
Value.newBuilder()
.setBoolValue(TestTransformRegistrar.EXPECTED_BOOLEAN_VALUE)
.build())
.putFields(
PIPELINE_OPTIONS_URN_PREFIX + "integer_option" + PIPELINE_OPTIONS_URN_SUFFIX,
Value.newBuilder()
.setNumberValue(TestTransformRegistrar.EXPECTED_INTEGER_VALUE)
.build())
.build();
ExpansionApi.ExpansionRequest request =
ExpansionApi.ExpansionRequest.newBuilder()
.setComponents(pipelineProto.getComponents())
.setPipelineOptions(optionsStruct)
.setTransform(
RunnerApi.PTransform.newBuilder()
.setUniqueName(TEST_NAME)
.setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(TEST_OPTIONS_URN))
.putInputs("input", inputPcollId))
.setNamespace(TEST_NAMESPACE)
.build();
ExpansionApi.ExpansionResponse response = expansionService.expand(request);
RunnerApi.PTransform expandedTransform = response.getTransform();
assertEquals(TEST_NAMESPACE + TEST_NAME, expandedTransform.getUniqueName());

// Verify it has the right input.
assertThat(expandedTransform.getInputsMap().values(), contains(inputPcollId));

// Verify it has the right output.
assertThat(expandedTransform.getOutputsMap().keySet(), contains("output"));
}

@Test
public void testConstructGenerateSequenceWithRegistration() {
ExternalTransforms.ExternalConfigurationPayload payload =
Expand Down

0 comments on commit 8a920c1

Please sign in to comment.