Skip to content

Commit

Permalink
Add custom pubsub source and sink experiment support for runner v2.
Browse files Browse the repository at this point in the history
  • Loading branch information
tomstepp committed Nov 15, 2023
1 parent d49c268 commit 3a7c5f2
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.beam.runners.core.construction.resources.PipelineResources.detectClassPathResourcesToStage;
import static org.apache.beam.sdk.io.gcp.pubsub.PubsubIO.ENABLE_CUSTOM_PUBSUB_SINK;
import static org.apache.beam.sdk.io.gcp.pubsub.PubsubIO.ENABLE_CUSTOM_PUBSUB_SOURCE;
import static org.apache.beam.sdk.util.CoderUtils.encodeToByteArray;
import static org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray;
import static org.apache.beam.sdk.util.StringUtils.byteArrayToJsonString;
Expand Down Expand Up @@ -549,13 +551,13 @@ private List<PTransformOverride> getOverrides(boolean streaming) {
new SplittableParDoOverrides.SplittableParDoOverrideFactory()));

if (streaming) {
if (!hasExperiment(options, "enable_custom_pubsub_source")) {
if (!hasExperiment(options, ENABLE_CUSTOM_PUBSUB_SOURCE)) {
overridesBuilder.add(
PTransformOverride.of(
PTransformMatchers.classEqualTo(PubsubUnboundedSource.class),
new StreamingPubsubIOReadOverrideFactory()));
}
if (!hasExperiment(options, "enable_custom_pubsub_sink")) {
if (!hasExperiment(options, ENABLE_CUSTOM_PUBSUB_SINK)) {
overridesBuilder.add(
PTransformOverride.of(
PTransformMatchers.classEqualTo(PubsubUnboundedSink.class),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
*/
package org.apache.beam.sdk.io.gcp.pubsub;

import static org.apache.beam.sdk.io.gcp.pubsub.PubsubIO.ENABLE_CUSTOM_PUBSUB_SINK;
import static org.apache.beam.sdk.io.gcp.pubsub.PubsubIO.ENABLE_CUSTOM_PUBSUB_SOURCE;

import com.google.auto.service.AutoService;
import java.util.Collections;
import java.util.Map;
Expand All @@ -33,6 +36,7 @@
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource.PubsubSource;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.runners.AppliedPTransform;
Expand All @@ -59,6 +63,10 @@ public String getUrn() {
@Override
public RunnerApi.FunctionSpec translate(
AppliedPTransform<?, ?, Unbounded<?>> transform, SdkComponents components) {
if (ExperimentalOptions.hasExperiment(
transform.getPipeline().getOptions(), ENABLE_CUSTOM_PUBSUB_SOURCE)) {
return null;
}
if (!(transform.getTransform().getSource() instanceof PubsubUnboundedSource.PubsubSource)) {
return null;
}
Expand Down Expand Up @@ -111,6 +119,10 @@ public String getUrn() {
public RunnerApi.FunctionSpec translate(
AppliedPTransform<?, ?, PubsubUnboundedSink.PubsubSink> transform,
SdkComponents components) {
if (ExperimentalOptions.hasExperiment(
transform.getPipeline().getOptions(), ENABLE_CUSTOM_PUBSUB_SINK)) {
return null;
}
PubSubWritePayload.Builder payloadBuilder = PubSubWritePayload.newBuilder();
ValueProvider<TopicPath> topicProvider =
Preconditions.checkStateNotNull(transform.getTransform().outer.getTopicProvider());
Expand Down Expand Up @@ -145,6 +157,10 @@ public String getUrn() {
public RunnerApi.FunctionSpec translate(
AppliedPTransform<?, ?, PubsubUnboundedSink.PubsubDynamicSink> transform,
SdkComponents components) {
if (ExperimentalOptions.hasExperiment(
transform.getPipeline().getOptions(), ENABLE_CUSTOM_PUBSUB_SINK)) {
return null;
}
PubSubWritePayload.Builder payloadBuilder = PubSubWritePayload.newBuilder();
if (transform.getTransform().outer.getTimestampAttribute() != null) {
payloadBuilder.setTimestampAttribute(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,9 @@ public class PubsubIO {
private static final String SUBSCRIPTION_STARTING_SIGNAL = "_starting_signal/";
private static final String TOPIC_DEV_NULL_TEST_NAME = "/topics/dev/null";

public static final String ENABLE_CUSTOM_PUBSUB_SINK = "enable_custom_pubsub_sink";
public static final String ENABLE_CUSTOM_PUBSUB_SOURCE = "enable_custom_pubsub_source";

private static void validateProjectName(String project) {
Matcher match = PROJECT_ID_REGEXP.matcher(project);
if (!match.matches()) {
Expand Down

0 comments on commit 3a7c5f2

Please sign in to comment.