diff --git a/sdks/java/io/solace/build.gradle b/sdks/java/io/solace/build.gradle index b33b8fb18027..c317b5666189 100644 --- a/sdks/java/io/solace/build.gradle +++ b/sdks/java/io/solace/build.gradle @@ -36,6 +36,7 @@ dependencies { implementation library.java.joda_time implementation library.java.solace implementation project(":sdks:java:extensions:avro") + implementation library.java.vendored_grpc_1_60_1 implementation library.java.avro permitUnusedDeclared library.java.avro } diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/Solace.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/Solace.java index 076a16b96ceb..79057445a4e4 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/Solace.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/Solace.java @@ -17,26 +17,23 @@ */ package org.apache.beam.sdk.io.solace.data; +import com.google.auto.value.AutoValue; +import com.solacesystems.jcsmp.BytesXMLMessage; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** - * A record to be written to a Solace topic. - * - *
You need to transform to {@link Solace.Record} to be able to write to Solace. For that, you - * can use the {@link Solace.Record.Builder} provided with this class. - * - *
For instance, to create a record, use the following code: - * - *
{@code - * Solace.Record record = Solace.Record.builder() - * .setMessageId(messageId) - * .setSenderTimestamp(timestampMillis) - * .setPayload(payload) - * .build(); - * }- * - * Setting the message id and the timestamp is mandatory. + * Provides core data models and utilities for working with Solace messages in the context of Apache + * Beam pipelines. This class includes representations for Solace topics, queues, destinations, and + * message records, as well as a utility for converting Solace messages into Beam-compatible + * records. */ public class Solace { - + /** Represents a Solace queue. */ public static class Queue { private final String name; @@ -52,7 +49,7 @@ public String getName() { return name; } } - + /** Represents a Solace topic. */ public static class Topic { private final String name; @@ -68,4 +65,290 @@ public String getName() { return name; } } + /** Represents a Solace destination type. */ + public enum DestinationType { + TOPIC, + QUEUE, + UNKNOWN + } + + /** Represents a Solace message destination (either a Topic or a Queue). */ + @AutoValue + public abstract static class Destination { + /** + * Gets the name of the destination. + * + * @return The destination name. + */ + public abstract String getName(); + + /** + * Gets the type of the destination (TOPIC, QUEUE or UNKNOWN). + * + * @return The destination type. + */ + public abstract DestinationType getType(); + + static Builder builder() { + return new AutoValue_Solace_Destination.Builder(); + } + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setName(String name); + + abstract Builder setType(DestinationType type); + + abstract Destination build(); + } + } + + /** Represents a Solace message record with its associated metadata. */ + @AutoValue + public abstract static class Record { + /** + * Gets the unique identifier of the message, a string for an application-specific message + * identifier. + * + *
Mapped from {@link BytesXMLMessage#getApplicationMessageId()} + * + * @return The message ID, or null if not available. + */ + public abstract @Nullable String getMessageId(); + + /** + * Gets the payload of the message as a ByteString. + * + *
Mapped from {@link BytesXMLMessage#getBytes()} + * + * @return The message payload. + */ + public abstract ByteString getPayload(); + /** + * Gets the destination (topic or queue) to which the message was sent. + * + *
Mapped from {@link BytesXMLMessage#getDestination()} + * + * @return The destination, or null if not available. + */ + public abstract @Nullable Destination getDestination(); + + /** + * Gets the message expiration time in milliseconds since the Unix epoch. + * + *
A value of 0 indicates the message does not expire. + * + *
Mapped from {@link BytesXMLMessage#getExpiration()} + * + * @return The expiration timestamp. + */ + public abstract long getExpiration(); + + /** + * Gets the priority level of the message (0-255, higher is more important). -1 if not set. + * + *
Mapped from {@link BytesXMLMessage#getPriority()} + * + * @return The message priority. + */ + public abstract int getPriority(); + + /** + * Indicates whether the message has been redelivered due to a prior delivery failure. + * + *
Mapped from {@link BytesXMLMessage#getRedelivered()} + * + * @return True if redelivered, false otherwise. + */ + public abstract boolean getRedelivered(); + + /** + * Gets the destination to which replies to this message should be sent. + * + *
Mapped from {@link BytesXMLMessage#getReplyTo()} + * + * @return The reply-to destination, or null if not specified. + */ + public abstract @Nullable Destination getReplyTo(); + + /** + * Gets the timestamp (in milliseconds since the Unix epoch) when the message was received by + * the Solace broker. + * + *
Mapped from {@link BytesXMLMessage#getReceiveTimestamp()} + * + * @return The timestamp. + */ + public abstract long getReceiveTimestamp(); + + /** + * Gets the timestamp (in milliseconds since the Unix epoch) when the message was sent by the + * sender. Can be null if not provided. + * + * @return The sender timestamp, or null if not available. + */ + public abstract @Nullable Long getSenderTimestamp(); + + /** + * Gets the sequence number of the message (if applicable). + * + *
Mapped from {@link BytesXMLMessage#getSequenceNumber()} + * + * @return The sequence number, or null if not available. + */ + public abstract @Nullable Long getSequenceNumber(); + + /** + * The number of milliseconds before the message is discarded or moved to Dead Message Queue. A + * value of 0 means the message will never expire. The default value is 0. + * + *
Mapped from {@link BytesXMLMessage#getTimeToLive()} + * + * @return The time-to-live value. + */ + public abstract long getTimeToLive(); + + /** + * Gets the ID for the message within its replication group (if applicable). + * + *
Mapped from {@link BytesXMLMessage#getReplicationGroupMessageId()} + * + *
The ID for a particular message is only guaranteed to be the same for a particular copy of + * a message on a particular queue or topic endpoint within a replication group. The same + * message on different queues or topic endpoints within the same replication group may or may + * not have the same replication group message ID. See more at https://docs.solace.com/API/API-Developer-Guide/Detecting-Duplicate-Mess.htm + * + * @return The replication group message ID, or null if not present. + */ + public abstract @Nullable String getReplicationGroupMessageId(); + /** + * Gets the attachment data of the message as a ByteString, if any. This might represent files + * or other binary content associated with the message. + * + *
Mapped from {@link BytesXMLMessage#getAttachmentByteBuffer()} + * + * @return The attachment data, or an empty ByteString if no attachment is present. + */ + public abstract ByteString getAttachmentBytes(); + + static Builder builder() { + return new AutoValue_Solace_Record.Builder(); + } + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setMessageId(@Nullable String messageId); + + abstract Builder setPayload(ByteString payload); + + abstract Builder setDestination(@Nullable Destination destination); + + abstract Builder setExpiration(long expiration); + + abstract Builder setPriority(int priority); + + abstract Builder setRedelivered(boolean redelivered); + + abstract Builder setReplyTo(@Nullable Destination replyTo); + + abstract Builder setReceiveTimestamp(long receiveTimestamp); + + abstract Builder setSenderTimestamp(@Nullable Long senderTimestamp); + + abstract Builder setSequenceNumber(@Nullable Long sequenceNumber); + + abstract Builder setTimeToLive(long timeToLive); + + abstract Builder setReplicationGroupMessageId(@Nullable String replicationGroupMessageId); + + abstract Builder setAttachmentBytes(ByteString attachmentBytes); + + abstract Record build(); + } + } + /** + * A utility class for mapping {@link BytesXMLMessage} instances to {@link Solace.Record} objects. + * This simplifies the process of converting raw Solace messages into a format suitable for use + * within Apache Beam pipelines. + */ + public static class SolaceRecordMapper { + private static final Logger LOG = LoggerFactory.getLogger(SolaceRecordMapper.class); + /** + * Maps a {@link BytesXMLMessage} (if not null) to a {@link Solace.Record}. + * + *
Extracts relevant information from the message, including payload, metadata, and + * destination details. + * + * @param msg The Solace message to map. + * @return A Solace Record representing the message, or null if the input message was null. + */ + public static @Nullable Record map(@Nullable BytesXMLMessage msg) { + if (msg == null) { + return null; + } + + ByteArrayOutputStream payloadBytesStream = new ByteArrayOutputStream(); + if (msg.getContentLength() != 0) { + try { + payloadBytesStream.write(msg.getBytes()); + } catch (IOException e) { + LOG.error("Could not write bytes from the BytesXMLMessage to the Solace.record.", e); + } + } + + ByteArrayOutputStream attachmentBytesStream = new ByteArrayOutputStream(); + if (msg.getAttachmentContentLength() != 0) { + try { + attachmentBytesStream.write(msg.getAttachmentByteBuffer().array()); + } catch (IOException e) { + LOG.error( + "Could not AttachmentByteBuffer from the BytesXMLMessage to the Solace.record.", e); + } + } + + Destination replyTo = getDestination(msg.getCorrelationId(), msg.getReplyTo()); + Destination destination = getDestination(msg.getCorrelationId(), msg.getDestination()); + + return Record.builder() + .setMessageId(msg.getApplicationMessageId()) + .setPayload(ByteString.copyFrom(payloadBytesStream.toByteArray())) + .setDestination(destination) + .setExpiration(msg.getExpiration()) + .setPriority(msg.getPriority()) + .setRedelivered(msg.getRedelivered()) + .setReplyTo(replyTo) + .setReceiveTimestamp(msg.getReceiveTimestamp()) + .setSenderTimestamp(msg.getSenderTimestamp()) + .setSequenceNumber(msg.getSequenceNumber()) + .setTimeToLive(msg.getTimeToLive()) + .setReplicationGroupMessageId( + msg.getReplicationGroupMessageId() != null + ? msg.getReplicationGroupMessageId().toString() + : null) + .setAttachmentBytes(ByteString.copyFrom(attachmentBytesStream.toByteArray())) + .build(); + } + + private static @Nullable Destination getDestination( + String msgId, com.solacesystems.jcsmp.Destination originalDestinationField) { + if (originalDestinationField == null) { + return null; + } + Destination.Builder destinationBuilder = + Destination.builder().setName(originalDestinationField.getName()); + if (originalDestinationField instanceof com.solacesystems.jcsmp.Topic) { + destinationBuilder.setType(DestinationType.TOPIC); + } else if (originalDestinationField instanceof com.solacesystems.jcsmp.Queue) { + destinationBuilder.setType(DestinationType.QUEUE); + } else { + LOG.error( + "SolaceIO: Unknown destination type type for message {}, setting to {}", + msgId, + DestinationType.UNKNOWN.name()); + destinationBuilder.setType(DestinationType.UNKNOWN); + } + return destinationBuilder.build(); + } + } }