Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support writing to Pubsub with ordering key; Add PubsubMessage SchemaCoder #31608

Open
wants to merge 29 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
7306870
support writing pubsub messages with ordering key
ahmedabu98 Jun 15, 2024
c64ae31
Merge branch 'master' of https://github.com/ahmedabu98/beam into pubs…
ahmedabu98 Sep 10, 2024
4bcd3b4
Add ordering key size validation to validatePubsubMessageSize
sjvanrossum Jun 18, 2024
9627cbe
Refactor writeMessagesWithOrderingKey into withOrderingKey
sjvanrossum Jun 17, 2024
ddd916f
Route to bad records if key is defined, but would be dropped silently
sjvanrossum Jun 19, 2024
4791fca
Add publishBatchWithOrderingKey to PubsubUnboundedSink
sjvanrossum Jun 19, 2024
73b07c1
Abort override if PubsubUnboundedSink set publishBatchWithOrderingKey
sjvanrossum Jun 19, 2024
986c2a5
Add support for ordering keys in PubsubBoundedWriter
sjvanrossum Jun 19, 2024
f5f8b57
Add support for ordering keys in PubsubUnboundedSink
sjvanrossum Jun 19, 2024
21e8e8e
Remove nullable ordering keys, null and empty are equivalent
sjvanrossum Jul 10, 2024
42bbb77
Construct OutgoingMessage with Beam PubsubMessage to reduce repetition
sjvanrossum Jul 10, 2024
394d135
Improve readability of PubsubUnboundedSink batch assignment
sjvanrossum Jul 10, 2024
1043961
Add size validation TODOs
sjvanrossum Jul 10, 2024
cd727c2
Replace auto-sharding sink comment with FR link, move to relevant place
sjvanrossum Jul 10, 2024
20e7bb9
Add links to Pub/Sub documentation
sjvanrossum Jul 10, 2024
5911f63
Refine comment about lack of ordering key support in Dataflow's sink
sjvanrossum Jul 10, 2024
ad397aa
Add TODO to remove ordering key check once all sinks support this
sjvanrossum Jul 10, 2024
53134d6
Add missing return statement
sjvanrossum Sep 10, 2024
402ec94
Remove duplicated statements
sjvanrossum Sep 10, 2024
a97f64c
Apply Spotless
sjvanrossum Sep 10, 2024
4513db4
Add notable changes
sjvanrossum Sep 10, 2024
dd8af6d
Merge pull request #427 from sjvanrossum/pr31608
ahmedabu98 Sep 11, 2024
a60d689
address comments
ahmedabu98 Sep 13, 2024
5bac762
allow messages with ordering keys even when the sink isn't configured…
ahmedabu98 Sep 15, 2024
53d47a7
spotless
ahmedabu98 Sep 16, 2024
cb9e7fb
spotless
ahmedabu98 Sep 16, 2024
bbe25ca
add warning log when ordering key is not configured
ahmedabu98 Sep 16, 2024
fa80a24
address comments
ahmedabu98 Sep 27, 2024
1bf4ae2
Merge branch 'master' of https://github.com/ahmedabu98/beam into pubs…
ahmedabu98 Sep 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
* Dataflow worker can install packages from Google Artifact Registry Python repositories (Python) ([#32123](https://github.com/apache/beam/issues/32123)).
* Added support for Zstd codec in SerializableAvroCodecFactory (Java) ([#32349](https://github.com/apache/beam/issues/32349))
* Added support for using vLLM in the RunInference transform (Python) ([#32528](https://github.com/apache/beam/issues/32528))
* Added support for writing to Pubsub with ordering keys (Java) ([#21162](https://github.com/apache/beam/issues/21162))
* Significantly improved performance of Kafka IO reads that enable [commitOffsetsInFinalize](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#commitOffsetsInFinalize--) by removing the data reshuffle from SDF implementation. ([#31682](https://github.com/apache/beam/pull/31682)).

## Breaking Changes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.beam.sdk.io.gcp.pubsub.PubsubIO.ENABLE_CUSTOM_PUBSUB_SINK;
import static org.apache.beam.sdk.io.gcp.pubsub.PubsubIO.ENABLE_CUSTOM_PUBSUB_SOURCE;
import static org.apache.beam.sdk.options.ExperimentalOptions.hasExperiment;
import static org.apache.beam.sdk.util.CoderUtils.encodeToByteArray;
import static org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray;
import static org.apache.beam.sdk.util.StringUtils.byteArrayToJsonString;
Expand Down Expand Up @@ -2033,6 +2034,15 @@ private static void translate(
PubsubUnboundedSink overriddenTransform,
StepTranslationContext stepContext,
PCollection input) {
if (overriddenTransform.getPublishBatchWithOrderingKey()) {
throw new UnsupportedOperationException(
String.format(
"The DataflowRunner does not currently support publishing to Pubsub with ordering keys. "
+ "%s is required to support publishing with ordering keys. "
+ "Set the pipeline option --experiments=%s to use this PTransform. "
+ "See https://issuetracker.google.com/issues/200955424 for current status.",
PubsubUnboundedSink.class.getSimpleName(), ENABLE_CUSTOM_PUBSUB_SINK));
}
stepContext.addInput(PropertyNames.FORMAT, "pubsub");
if (overriddenTransform.getTopicProvider() != null) {
if (overriddenTransform.getTopicProvider().isAccessible()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,25 @@
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PreparePubsubWriteDoFn<InputT> extends DoFn<InputT, PubsubMessage> {
private static final Logger LOG = LoggerFactory.getLogger(PreparePubsubWriteDoFn.class);
// See https://cloud.google.com/pubsub/quotas#resource_limits.
private static final int PUBSUB_MESSAGE_DATA_MAX_BYTES = 10 << 20;
private static final int PUBSUB_MESSAGE_MAX_ATTRIBUTES = 100;
private static final int PUBSUB_MESSAGE_ATTRIBUTE_MAX_KEY_BYTES = 256;
private static final int PUBSUB_MESSAGE_ATTRIBUTE_MAX_VALUE_BYTES = 1024;
private static final int ORDERING_KEY_MAX_BYTE_SIZE = 1024;
// The amount of bytes that each attribute entry adds up to the request
private static final int PUBSUB_MESSAGE_ATTRIBUTE_ENCODE_ADDITIONAL_BYTES = 6;
private final boolean usesOrderingKey;
private int maxPublishBatchSize;

private boolean logOrderingKeyUnconfigured = false;
private SerializableFunction<ValueInSingleWindow<InputT>, PubsubMessage> formatFunction;
@Nullable SerializableFunction<ValueInSingleWindow<InputT>, PubsubIO.PubsubTopic> topicFunction;
/** Last TopicPath that reported Lineage. */
Expand All @@ -66,6 +72,20 @@ static int validatePubsubMessageSize(PubsubMessage message, int maxPublishBatchS
}
int totalSize = payloadSize;

@Nullable String orderingKey = message.getOrderingKey();
if (orderingKey != null) {
int orderingKeySize = orderingKey.getBytes(StandardCharsets.UTF_8).length;
if (orderingKeySize > ORDERING_KEY_MAX_BYTE_SIZE) {
throw new SizeLimitExceededException(
"Pubsub message ordering key of length "
+ orderingKeySize
+ " exceeds maximum of "
+ ORDERING_KEY_MAX_BYTE_SIZE
+ " bytes. See https://cloud.google.com/pubsub/quotas#resource_limits");
}
totalSize += orderingKeySize;
}

@Nullable Map<String, String> attributes = message.getAttributeMap();
if (attributes != null) {
if (attributes.size() > PUBSUB_MESSAGE_MAX_ATTRIBUTES) {
Expand Down Expand Up @@ -125,12 +145,14 @@ static int validatePubsubMessageSize(PubsubMessage message, int maxPublishBatchS
SerializableFunction<ValueInSingleWindow<InputT>, PubsubMessage> formatFunction,
@Nullable
SerializableFunction<ValueInSingleWindow<InputT>, PubsubIO.PubsubTopic> topicFunction,
boolean usesOrderingKey,
int maxPublishBatchSize,
BadRecordRouter badRecordRouter,
Coder<InputT> inputCoder,
TupleTag<PubsubMessage> outputTag) {
this.formatFunction = formatFunction;
this.topicFunction = topicFunction;
this.usesOrderingKey = usesOrderingKey;
this.maxPublishBatchSize = maxPublishBatchSize;
this.badRecordRouter = badRecordRouter;
this.inputCoder = inputCoder;
Expand Down Expand Up @@ -175,6 +197,14 @@ public void process(
.add("pubsub", "topic", PubsubClient.topicPathFromPath(topic).getDataCatalogSegments());
reportedLineage = topic;
}
if (!usesOrderingKey
&& !Strings.isNullOrEmpty(message.getOrderingKey())
&& !logOrderingKeyUnconfigured) {
LOG.warn(
"Encountered Pubsub message with ordering key but this sink was not configured to "
+ "retain ordering keys, so they will be dropped. Please set #withOrderingKeys().");
logOrderingKeyUnconfigured = true;
}
try {
validatePubsubMessageSize(message, maxPublishBatchSize);
} catch (SizeLimitExceededException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

import com.google.api.client.util.Clock;
import com.google.auto.value.AutoValue;
import com.google.protobuf.ByteString;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.InvalidProtocolBufferException;
Expand Down Expand Up @@ -1337,6 +1336,8 @@ public abstract static class Write<T> extends PTransform<PCollection<T>, PDone>

abstract @Nullable String getPubsubRootUrl();

abstract boolean getPublishWithOrderingKey();

abstract BadRecordRouter getBadRecordRouter();

abstract ErrorHandler<BadRecord, ?> getBadRecordErrorHandler();
Expand All @@ -1350,6 +1351,7 @@ static <T> Builder<T> newBuilder(
builder.setFormatFn(formatFn);
builder.setBadRecordRouter(BadRecordRouter.THROWING_ROUTER);
builder.setBadRecordErrorHandler(new DefaultErrorHandler<>());
builder.setPublishWithOrderingKey(false);
return builder;
}

Expand Down Expand Up @@ -1381,6 +1383,8 @@ abstract Builder<T> setFormatFn(

abstract Builder<T> setPubsubRootUrl(String pubsubRootUrl);

abstract Builder<T> setPublishWithOrderingKey(boolean publishWithOrderingKey);

abstract Builder<T> setBadRecordRouter(BadRecordRouter badRecordRouter);

abstract Builder<T> setBadRecordErrorHandler(
Expand Down Expand Up @@ -1454,6 +1458,19 @@ public Write<T> withMaxBatchBytesSize(int maxBatchBytesSize) {
return toBuilder().setMaxBatchBytesSize(maxBatchBytesSize).build();
}

/**
* Writes to Pub/Sub with each record's ordering key. A subscription with message ordering
* enabled will receive messages published in the same region with the same ordering key in the
* order in which they were received by the service. Note that the order in which Beam publishes
* records to the service remains unspecified.
*
* @see <a href="https://cloud.google.com/pubsub/docs/ordering">Pub/Sub documentation on message
* ordering</a>
*/
public Write<T> withOrderingKey() {
return toBuilder().setPublishWithOrderingKey(true).build();
}

/**
* Writes to Pub/Sub and adds each record's timestamp to the published messages in an attribute
* with the specified name. The value of the attribute will be a number representing the number
Expand Down Expand Up @@ -1525,6 +1542,7 @@ public PDone expand(PCollection<T> input) {
new PreparePubsubWriteDoFn<>(
getFormatFn(),
topicFunction,
getPublishWithOrderingKey(),
maxMessageSize,
getBadRecordRouter(),
input.getCoder(),
Expand All @@ -1536,8 +1554,12 @@ public PDone expand(PCollection<T> input) {
pubsubMessageTuple
.get(BAD_RECORD_TAG)
.setCoder(BadRecord.getCoder(input.getPipeline())));
PCollection<PubsubMessage> pubsubMessages =
pubsubMessageTuple.get(pubsubMessageTupleTag).setCoder(PubsubMessageWithTopicCoder.of());
PCollection<PubsubMessage> pubsubMessages = pubsubMessageTuple.get(pubsubMessageTupleTag);
if (getPublishWithOrderingKey()) {
pubsubMessages.setCoder(PubsubMessageSchemaCoder.getSchemaCoder());
} else {
pubsubMessages.setCoder(PubsubMessageWithTopicCoder.of());
}
switch (input.isBounded()) {
case BOUNDED:
pubsubMessages.apply(
Expand All @@ -1557,6 +1579,7 @@ public PDone expand(PCollection<T> input) {
getTimestampAttribute(),
getIdAttribute(),
100 /* numShards */,
getPublishWithOrderingKey(),
MoreObjects.firstNonNull(
getMaxBatchSize(), PubsubUnboundedSink.DEFAULT_PUBLISH_BATCH_SIZE),
MoreObjects.firstNonNull(
Expand Down Expand Up @@ -1589,7 +1612,9 @@ private class OutgoingData {
}
}

private transient Map<PubsubTopic, OutgoingData> output;
// NOTE: A single publish request may only write to one ordering key.
// See https://cloud.google.com/pubsub/docs/publisher#using-ordering-keys for details.
private transient Map<KV<PubsubTopic, String>, OutgoingData> output;

private transient PubsubClient pubsubClient;

Expand Down Expand Up @@ -1620,51 +1645,44 @@ public void startBundle(StartBundleContext c) throws IOException {
public void processElement(@Element PubsubMessage message, @Timestamp Instant timestamp)
throws IOException, SizeLimitExceededException {
// Validate again here just as a sanity check.
// TODO(sjvanrossum): https://github.com/apache/beam/issues/31800
PreparePubsubWriteDoFn.validatePubsubMessageSize(message, maxPublishBatchByteSize);
byte[] payload = message.getPayload();
int messageSize = payload.length;
// NOTE: The record id is always null.
scwhittle marked this conversation as resolved.
Show resolved Hide resolved
final OutgoingMessage msg =
OutgoingMessage.of(message, timestamp.getMillis(), null, message.getTopic());
// TODO(sjvanrossum): https://github.com/apache/beam/issues/31800
final int messageSize = msg.getMessage().getData().size();

PubsubTopic pubsubTopic;
final PubsubTopic pubsubTopic;
if (getTopicProvider() != null) {
pubsubTopic = getTopicProvider().get();
} else {
pubsubTopic =
PubsubTopic.fromPath(Preconditions.checkArgumentNotNull(message.getTopic()));
}
// Checking before adding the message stops us from violating max batch size or bytes
OutgoingData currentTopicOutput =
output.computeIfAbsent(pubsubTopic, t -> new OutgoingData());
if (currentTopicOutput.messages.size() >= maxPublishBatchSize
|| (!currentTopicOutput.messages.isEmpty()
&& (currentTopicOutput.bytes + messageSize) >= maxPublishBatchByteSize)) {
publish(pubsubTopic, currentTopicOutput.messages);
currentTopicOutput.messages.clear();
currentTopicOutput.bytes = 0;
pubsubTopic = PubsubTopic.fromPath(Preconditions.checkArgumentNotNull(msg.topic()));
}

Map<String, String> attributes = message.getAttributeMap();
String orderingKey = message.getOrderingKey();

com.google.pubsub.v1.PubsubMessage.Builder msgBuilder =
com.google.pubsub.v1.PubsubMessage.newBuilder()
.setData(ByteString.copyFrom(payload))
.putAllAttributes(attributes);

if (orderingKey != null) {
msgBuilder.setOrderingKey(orderingKey);
// Checking before adding the message stops us from violating max batch size or bytes
String orderingKey = getPublishWithOrderingKey() ? msg.getMessage().getOrderingKey() : "";
final OutgoingData currentTopicAndOrderingKeyOutput =
output.computeIfAbsent(KV.of(pubsubTopic, orderingKey), t -> new OutgoingData());
// TODO(sjvanrossum): https://github.com/apache/beam/issues/31800
if (currentTopicAndOrderingKeyOutput.messages.size() >= maxPublishBatchSize
|| (!currentTopicAndOrderingKeyOutput.messages.isEmpty()
&& (currentTopicAndOrderingKeyOutput.bytes + messageSize)
>= maxPublishBatchByteSize)) {
publish(pubsubTopic, currentTopicAndOrderingKeyOutput.messages);
currentTopicAndOrderingKeyOutput.messages.clear();
currentTopicAndOrderingKeyOutput.bytes = 0;
}

// NOTE: The record id is always null.
currentTopicOutput.messages.add(
OutgoingMessage.of(
msgBuilder.build(), timestamp.getMillis(), null, message.getTopic()));
currentTopicOutput.bytes += messageSize;
currentTopicAndOrderingKeyOutput.messages.add(msg);
// TODO(sjvanrossum): https://github.com/apache/beam/issues/31800
currentTopicAndOrderingKeyOutput.bytes += messageSize;
}

@FinishBundle
public void finishBundle() throws IOException {
for (Map.Entry<PubsubTopic, OutgoingData> entry : output.entrySet()) {
publish(entry.getKey(), entry.getValue().messages);
for (Map.Entry<KV<PubsubTopic, String>, OutgoingData> entry : output.entrySet()) {
publish(entry.getKey().getKey(), entry.getValue().messages);
}
output = null;
pubsubClient.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,11 +230,11 @@ public List<IncomingMessage> pull(
com.google.pubsub.v1.PubsubMessage.newBuilder();
protoMessage.setData(ByteString.copyFrom(elementBytes));
protoMessage.putAllAttributes(attributes);
// PubsubMessage uses `null` to represent no ordering key where we want a default of "".
// {@link PubsubMessage} uses `null` or empty string to represent no ordering key.
// {@link com.google.pubsub.v1.PubsubMessage} does not track string field presence and uses
// empty string as a default.
if (pubsubMessage.getOrderingKey() != null) {
protoMessage.setOrderingKey(pubsubMessage.getOrderingKey());
} else {
protoMessage.setOrderingKey("");
}
incomingMessages.add(
IncomingMessage.of(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.pubsub;

import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;

/**
* Provides a {@link SchemaCoder} for {@link PubsubMessage}, including the topic and all fields of a
* PubSub message from server.
*
* <p>{@link SchemaCoder} is used so that fields can be added in the future without breaking update
* compatibility.
*/
public class PubsubMessageSchemaCoder {
private static final Schema PUBSUB_MESSAGE_SCHEMA =
Schema.builder()
.addByteArrayField("payload")
.addNullableStringField("topic")
.addNullableMapField("attributes", Schema.FieldType.STRING, Schema.FieldType.STRING)
.addNullableStringField("message_id")
.addNullableStringField("ordering_key")
.build();

private static final SerializableFunction<PubsubMessage, Row> TO_ROW =
(PubsubMessage message) -> {
Map<String, Object> fieldValues = new HashMap<>();
fieldValues.put("payload", message.getPayload());

String topic = message.getTopic();
if (topic != null) {
fieldValues.put("topic", topic);
}
Map<String, String> attributeMap = message.getAttributeMap();
if (attributeMap != null) {
fieldValues.put("attributes", attributeMap);
}
String messageId = message.getMessageId();
if (messageId != null) {
fieldValues.put("message_id", messageId);
}
String orderingKey = message.getOrderingKey();
if (orderingKey != null) {
fieldValues.put("ordering_key", orderingKey);
}
return Row.withSchema(PUBSUB_MESSAGE_SCHEMA).withFieldValues(fieldValues).build();
};

private static final SerializableFunction<Row, PubsubMessage> FROM_ROW =
(Row row) -> {
PubsubMessage message =
new PubsubMessage(
Preconditions.checkNotNull(row.getBytes("payload")),
row.getMap("attributes"),
row.getString("message_id"),
row.getString("ordering_key"));

String topic = row.getString("topic");
if (topic != null) {
message = message.withTopic(topic);
}
return message;
};

public static SchemaCoder<PubsubMessage> getSchemaCoder() {
return SchemaCoder.of(
PUBSUB_MESSAGE_SCHEMA, TypeDescriptor.of(PubsubMessage.class), TO_ROW, FROM_ROW);
}
}
Loading
Loading