From b1860627424752af6e1fb9db8c2df9704fc92c82 Mon Sep 17 00:00:00 2001 From: Amrane Ait Zeouay Date: Wed, 11 Jan 2023 15:06:58 +0100 Subject: [PATCH 1/6] [#24971] Add a retry policy for JmsIO #24971 Add a retry policy to publish messages in case of connection closed Fixes#24971 Co-Authored-By: Amrane Ait Zeouay <122456352+amranezeRenault@users.noreply.github.com> --- CHANGES.md | 1 + .../org/apache/beam/sdk/io/jms/JmsIO.java | 148 +++++----- .../apache/beam/sdk/io/jms/JmsIOProducer.java | 222 +++++++++++++++ .../sdk/io/jms/PublicationRetryPolicy.java | 56 ++++ .../beam/sdk/io/jms/FakeConnection.java | 100 +++++++ .../org/apache/beam/sdk/io/jms/JmsIOTest.java | 254 +++++++++++++++++- 6 files changed, 697 insertions(+), 84 deletions(-) create mode 100644 sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIOProducer.java create mode 100644 sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/PublicationRetryPolicy.java create mode 100644 sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/FakeConnection.java diff --git a/CHANGES.md b/CHANGES.md index b26b5f886fdf..36e20398db15 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -136,6 +136,7 @@ * S3 implementation of the Beam filesystem (Go) ([#23991](https://github.com/apache/beam/issues/23991)). * Support for SingleStoreDB source and sink added (Java) ([#22617](https://github.com/apache/beam/issues/22617)). * Added support for DefaultAzureCredential authentication in Azure Filesystem (Python) ([#24210](https://github.com/apache/beam/issues/24210)). +* Added in JmsIO a retry policy for failed publications ([#24971](https://github.com/apache/beam/issues/24971)). ## New Features / Improvements diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java index c77c70820b27..5c72bd946f62 100644 --- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java +++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java @@ -33,10 +33,8 @@ import java.util.stream.Stream; import javax.jms.Connection; import javax.jms.ConnectionFactory; -import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.beam.sdk.annotations.Experimental; @@ -49,17 +47,12 @@ import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader; import org.apache.beam.sdk.options.ExecutorOptions; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableBiFunction; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; -import org.apache.beam.sdk.values.TupleTag; -import org.apache.beam.sdk.values.TupleTagList; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; import org.joda.time.Instant; @@ -706,6 +699,10 @@ public abstract static class Write abstract @Nullable SerializableFunction getTopicNameMapper(); + abstract @Nullable PublicationRetryPolicy getRetryPublicationPolicy(); + + abstract @Nullable SerializableFunction getConnectionFailedPredicate(); + abstract Builder builder(); @AutoValue.Builder @@ -726,6 +723,12 @@ abstract Builder setValueMapper( abstract Builder setTopicNameMapper( SerializableFunction topicNameMapper); + abstract Builder setRetryPublicationPolicy( + PublicationRetryPolicy publicationRetryPolicy); + + abstract Builder setConnectionFailedPredicate( + SerializableFunction predicate); + abstract Write build(); } @@ -866,6 +869,60 @@ public Write withValueMapper( return builder().setValueMapper(valueMapper).build(); } + /** + * Specify the JMS retry policy. The {@link JmsIO.Write} acts as a publisher on the topic. + * + *

Allows a retry for failed published messages, the user should specify the maximum number + * of retries and a duration for retrying. By default, the duration used by JmsIO is 15s {@link + * PublicationRetryPolicy} + * + *

For example: + * + *

{@code
+     * PublicationRetryPolicy publicationRetryPolicy =
+     *   PublicationRetryPolicy.create(5, Duration.standardSeconds(30));
+     * }
+ * + *
{@code
+     * .apply(JmsIO.write().withPublicationRetryPolicy(publicationRetryPolicy)
+     * }
+ * + * @param publicationRetryPolicy The retry policy that should be used in case of failed + * publications. + * @return The corresponding {@link JmsIO.Write}. + */ + public Write withPublicationRetryPolicy(PublicationRetryPolicy publicationRetryPolicy) { + checkArgument(publicationRetryPolicy != null, "publicationRetryPolicy can not be null"); + return builder().setRetryPublicationPolicy(publicationRetryPolicy).build(); + } + + /** + * Specify the predicate function to check whether to retry to reconnect or not. The {@link + * JmsIO.Write} acts as a publisher on the topic. + * + *

Allows you to choose depending on the given exception if to reconnect or not. + * + *

For example: + * + *

{@code
+     * SerializableFunction reconnectOnFailedExceptionPredicate =
+     *   (exception -> exception.getClass() == JmsException.class);
+     * }
+ * + *
{@code
+     * .apply(JmsIO.write().withConnectionFailedPredicate(reconnectOnFailedExceptionPredicate)
+     * }
+ * + * @param connectionFailedPredicate The predicate function to be used to check if JmsIO should + * reconnect or not. + * @return The corresponding {@link JmsIO.Write}. + */ + public Write withConnectionFailedPredicate( + SerializableFunction connectionFailedPredicate) { + checkArgument(connectionFailedPredicate != null, "connectionFailedPredicate can not be null"); + return builder().setConnectionFailedPredicate(connectionFailedPredicate).build(); + } + @Override public WriteJmsResult expand(PCollection input) { checkArgument(getConnectionFactory() != null, "withConnectionFactory() is required"); @@ -878,15 +935,7 @@ public WriteJmsResult expand(PCollection input) { "Only one of withQueue(queue), withTopic(topic), or withTopicNameMapper(function) must be set."); checkArgument(getValueMapper() != null, "withValueMapper() is required"); - final TupleTag failedMessagesTag = new TupleTag<>(); - final TupleTag messagesTag = new TupleTag<>(); - PCollectionTuple res = - input.apply( - ParDo.of(new WriterFn<>(this, failedMessagesTag)) - .withOutputTags(messagesTag, TupleTagList.of(failedMessagesTag))); - PCollection failedMessages = res.get(failedMessagesTag).setCoder(input.getCoder()); - res.get(messagesTag).setCoder(input.getCoder()); - return WriteJmsResult.in(input.getPipeline(), failedMessagesTag, failedMessages); + return input.apply(new JmsIOProducer<>(this)); } private boolean isExclusiveTopicQueue() { @@ -897,72 +946,5 @@ private boolean isExclusiveTopicQueue() { == 1; return exclusiveTopicQueue; } - - private static class WriterFn extends DoFn { - - private Write spec; - - private Connection connection; - private Session session; - private MessageProducer producer; - private Destination destination; - private final TupleTag failedMessageTag; - - public WriterFn(Write spec, TupleTag failedMessageTag) { - this.spec = spec; - this.failedMessageTag = failedMessageTag; - } - - @Setup - public void setup() throws Exception { - if (producer == null) { - if (spec.getUsername() != null) { - this.connection = - spec.getConnectionFactory() - .createConnection(spec.getUsername(), spec.getPassword()); - } else { - this.connection = spec.getConnectionFactory().createConnection(); - } - this.connection.start(); - // false means we don't use JMS transaction. - this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - if (spec.getQueue() != null) { - this.destination = session.createQueue(spec.getQueue()); - } else if (spec.getTopic() != null) { - this.destination = session.createTopic(spec.getTopic()); - } - - this.producer = this.session.createProducer(null); - } - } - - @ProcessElement - public void processElement(ProcessContext ctx) { - Destination destinationToSendTo = destination; - try { - Message message = spec.getValueMapper().apply(ctx.element(), session); - if (spec.getTopicNameMapper() != null) { - destinationToSendTo = - session.createTopic(spec.getTopicNameMapper().apply(ctx.element())); - } - producer.send(destinationToSendTo, message); - } catch (Exception ex) { - LOG.error("Error sending message on topic {}", destinationToSendTo); - ctx.output(failedMessageTag, ctx.element()); - } - } - - @Teardown - public void teardown() throws Exception { - producer.close(); - producer = null; - session.close(); - session = null; - connection.stop(); - connection.close(); - connection = null; - } - } } } diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIOProducer.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIOProducer.java new file mode 100644 index 000000000000..4533d62ac5f5 --- /dev/null +++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIOProducer.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.jms; + +import static org.apache.beam.sdk.io.jms.PublicationRetryPolicy.DEFAULT_PUBLICATION_RETRY_DURATION; + +import java.io.IOException; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageProducer; +import javax.jms.Session; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.util.BackOff; +import org.apache.beam.sdk.util.BackOffUtils; +import org.apache.beam.sdk.util.FluentBackoff; +import org.apache.beam.sdk.util.Sleeper; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; +import org.checkerframework.checker.initialization.qual.Initialized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@SuppressWarnings({ + "nullness" // TODO(https://github.com/apache/beam/issues/20497) +}) +public class JmsIOProducer extends PTransform, WriteJmsResult> { + + public static final String CONNECTION_ERRORS_METRIC_NAME = "connectionErrors"; + public static final String PUBLICATION_RETRIES_METRIC_NAME = "publicationRetries"; + public static final String JMS_IO_PRODUCER_METRIC_NAME = JmsIOProducer.class.getCanonicalName(); + + private static final Logger LOG = LoggerFactory.getLogger(JmsIOProducer.class); + private static final String PUBLISH_TO_JMS_STEP_NAME = "Publish to JMS"; + + private final JmsIO.Write spec; + private final TupleTag messagesTag; + private final TupleTag failedMessagesTag; + + JmsIOProducer(JmsIO.Write spec) { + this.spec = spec; + this.messagesTag = new TupleTag<>(); + this.failedMessagesTag = new TupleTag<>(); + } + + @Override + public WriteJmsResult expand(PCollection input) { + PCollectionTuple failedPublishedMessages = + input.apply( + PUBLISH_TO_JMS_STEP_NAME, + ParDo.of(new JmsIOProducerFn()) + .withOutputTags(messagesTag, TupleTagList.of(failedMessagesTag))); + PCollection failedMessages = + failedPublishedMessages.get(failedMessagesTag).setCoder(input.getCoder()); + failedPublishedMessages.get(messagesTag).setCoder(input.getCoder()); + return WriteJmsResult.in(input.getPipeline(), failedMessagesTag, failedMessages); + } + + private class JmsIOProducerFn extends DoFn { + + private transient @Initialized Session session; + private transient @Initialized Connection connection; + private transient @Initialized Destination destination; + private transient @Initialized MessageProducer producer; + private transient @Initialized FluentBackoff retryPublicationBackoff; + + private final Counter connectionErrors = + Metrics.counter(JMS_IO_PRODUCER_METRIC_NAME, CONNECTION_ERRORS_METRIC_NAME); + private final Counter publicationRetries = + Metrics.counter(JMS_IO_PRODUCER_METRIC_NAME, PUBLICATION_RETRIES_METRIC_NAME); + + @Setup + public void setup() { + retryPublicationBackoff = + FluentBackoff.DEFAULT + .withMaxRetries(0) + .withInitialBackoff(DEFAULT_PUBLICATION_RETRY_DURATION); + + if (spec.getRetryPublicationPolicy() != null) { + retryPublicationBackoff = + retryPublicationBackoff + .withMaxRetries(spec.getRetryPublicationPolicy().maxPublicationAttempts()) + .withInitialBackoff(spec.getRetryPublicationPolicy().retryDuration()); + } + } + + @StartBundle + public void start() throws JMSException { + if (producer == null) { + ConnectionFactory connectionFactory = spec.getConnectionFactory(); + if (spec.getUsername() != null) { + this.connection = + connectionFactory.createConnection(spec.getUsername(), spec.getPassword()); + } else { + this.connection = connectionFactory.createConnection(); + } + this.connection.setExceptionListener( + exception -> { + if (spec.getConnectionFailedPredicate() != null + && spec.getConnectionFailedPredicate().apply(exception)) { + LOG.error("Jms connection encountered the following exception:", exception); + connectionErrors.inc(); + try { + restartJmsConnection(); + } catch (JMSException e) { + LOG.error("An error occurred while reconnecting:", e); + } + } + }); + this.connection.start(); + // false means we don't use JMS transaction. + this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + if (spec.getQueue() != null) { + this.destination = session.createQueue(spec.getQueue()); + } else if (spec.getTopic() != null) { + this.destination = session.createTopic(spec.getTopic()); + } + this.producer = this.session.createProducer(this.destination); + } + } + + @ProcessElement + public void processElement(@Element T input, ProcessContext context) { + try { + publishMessage(input, context); + } catch (IOException | InterruptedException exception) { + LOG.error("Error while publishing the message", exception); + context.output(failedMessagesTag, input); + Thread.currentThread().interrupt(); + } + } + + private void publishMessage(T input, ProcessContext context) + throws IOException, InterruptedException { + Sleeper sleeper = Sleeper.DEFAULT; + Destination destinationToSendTo = destination; + BackOff backoff = retryPublicationBackoff.backoff(); + int publicationAttempt = 0; + while (publicationAttempt >= 0) { + publicationAttempt++; + try { + Message message = spec.getValueMapper().apply(input, session); + if (spec.getTopicNameMapper() != null) { + destinationToSendTo = session.createTopic(spec.getTopicNameMapper().apply(input)); + } + producer.send(destinationToSendTo, message); + publicationAttempt = -1; + } catch (Exception exception) { + if (spec.getRetryPublicationPolicy() == null || !BackOffUtils.next(sleeper, backoff)) { + outputFailedMessage(input, context, destinationToSendTo, exception); + LOG.debug("Message published to topic {}", destinationToSendTo); + publicationAttempt = -1; + } else { + publicationRetries.inc(); + LOG.warn( + "Error sending message on topic {}, retry attempt {}", + destinationToSendTo, + publicationAttempt, + exception); + } + } + } + } + + private void outputFailedMessage( + T input, ProcessContext context, Destination destinationToSendTo, Exception exception) { + LOG.error("The message wasn't published to topic {}", destinationToSendTo, exception); + context.output(failedMessagesTag, input); + } + + private void restartJmsConnection() throws JMSException { + teardown(); + start(); + } + + @Teardown + public void teardown() throws JMSException { + if (producer != null) { + producer.close(); + producer = null; + } + if (session != null) { + session.close(); + session = null; + } + if (connection != null) { + try { + // If the connection failed, stopping the connection will throw a JMSException + connection.stop(); + } catch (JMSException exception) { + LOG.warn("The connection couldn't be closed", exception); + } + connection.close(); + connection = null; + } + } + } +} diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/PublicationRetryPolicy.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/PublicationRetryPolicy.java new file mode 100644 index 000000000000..c6e793ee7608 --- /dev/null +++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/PublicationRetryPolicy.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.jms; + +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; + +import com.google.auto.value.AutoValue; +import java.io.Serializable; +import org.checkerframework.dataflow.qual.Pure; +import org.joda.time.Duration; + +@AutoValue +public abstract class PublicationRetryPolicy implements Serializable { + public static final Duration DEFAULT_PUBLICATION_RETRY_DURATION = Duration.standardSeconds(15); + + public abstract @Pure int maxPublicationAttempts(); + + public abstract @Pure Duration retryDuration(); + + public static PublicationRetryPolicy create(int maxPublicationAttempts, Duration retryDuration) { + Duration defaultRetryDuration = + retryDuration == null ? DEFAULT_PUBLICATION_RETRY_DURATION : retryDuration; + checkArgument(maxPublicationAttempts > 0, "maxPublicationAttempts should be greater than 0"); + checkArgument( + defaultRetryDuration != null && defaultRetryDuration.isLongerThan(Duration.ZERO), + "retryDuration should be greater than 0"); + return new AutoValue_PublicationRetryPolicy.Builder() + .setMaxPublicationAttempts(maxPublicationAttempts) + .setRetryDuration(defaultRetryDuration) + .build(); + } + + @AutoValue.Builder + abstract static class Builder { + abstract PublicationRetryPolicy.Builder setMaxPublicationAttempts(int maxPublicationAttempts); + + abstract PublicationRetryPolicy.Builder setRetryDuration(Duration retryDuration); + + abstract PublicationRetryPolicy build(); + } +} diff --git a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/FakeConnection.java b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/FakeConnection.java new file mode 100644 index 000000000000..6097a07835f0 --- /dev/null +++ b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/FakeConnection.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.jms; + +import java.io.Serializable; +import javax.jms.Connection; +import javax.jms.ConnectionConsumer; +import javax.jms.ConnectionMetaData; +import javax.jms.Destination; +import javax.jms.ExceptionListener; +import javax.jms.ServerSessionPool; +import javax.jms.Session; +import javax.jms.Topic; +import org.apache.activemq.AlreadyClosedException; +import org.apache.activemq.ConnectionFailedException; +import org.mockito.Mockito; + +public class FakeConnection implements Connection, Serializable { + + private ExceptionListener listener; + private static int counter = 0; + + @Override + public Session createSession(boolean transacted, int acknowledgeMode) { + return Mockito.mock(Session.class); + } + + @Override + public String getClientID() { + return null; + } + + @Override + public void setClientID(String clientID) {} + + @Override + public ConnectionMetaData getMetaData() { + return null; + } + + @Override + public ExceptionListener getExceptionListener() { + return listener; + } + + @Override + public void setExceptionListener(ExceptionListener listener) { + this.listener = listener; + } + + @Override + public void start() { + if (counter == 0) { + counter++; + this.listener.onException(new ConnectionFailedException()); + } else { + this.listener.onException(new AlreadyClosedException()); + } + } + + @Override + public void stop() {} + + @Override + public void close() {} + + @Override + public ConnectionConsumer createConnectionConsumer( + Destination destination, + String messageSelector, + ServerSessionPool sessionPool, + int maxMessages) { + return null; + } + + @Override + public ConnectionConsumer createDurableConnectionConsumer( + Topic topic, + String subscriptionName, + String messageSelector, + ServerSessionPool sessionPool, + int maxMessages) { + return null; + } +} diff --git a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java index 1979d7b4ff60..77218f3f005e 100644 --- a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java +++ b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java @@ -18,16 +18,28 @@ package org.apache.beam.sdk.io.jms; import static org.apache.beam.sdk.io.UnboundedSource.UnboundedReader.BACKLOG_UNKNOWN; +import static org.apache.beam.sdk.io.jms.JmsIOProducer.CONNECTION_ERRORS_METRIC_NAME; +import static org.apache.beam.sdk.io.jms.JmsIOProducer.JMS_IO_PRODUCER_METRIC_NAME; +import static org.apache.beam.sdk.io.jms.JmsIOProducer.PUBLICATION_RETRIES_METRIC_NAME; +import static org.hamcrest.CoreMatchers.allOf; import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.hasProperty; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThan; +import static org.hamcrest.core.StringContains.containsString; +import static org.hamcrest.object.HasToString.hasToString; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -38,9 +50,13 @@ import java.io.Serializable; import java.lang.reflect.Proxy; import java.nio.charset.StandardCharsets; +import java.time.Instant; import java.util.ArrayList; +import java.util.Collections; import java.util.Enumeration; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.function.Function; @@ -55,6 +71,7 @@ import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ConnectionFailedException; import org.apache.activemq.broker.BrokerPlugin; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.command.ActiveMQMessage; @@ -62,8 +79,12 @@ import org.apache.activemq.security.SimpleAuthenticationPlugin; import org.apache.activemq.store.memory.MemoryPersistenceAdapter; import org.apache.activemq.util.Callback; +import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.metrics.MetricNameFilter; +import org.apache.beam.sdk.metrics.MetricQueryResults; +import org.apache.beam.sdk.metrics.MetricsFilter; import org.apache.beam.sdk.options.ExecutorOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -73,6 +94,7 @@ import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.SerializableBiFunction; +import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables; import org.joda.time.Duration; @@ -84,6 +106,9 @@ import org.junit.runners.JUnit4; import org.mockito.ArgumentCaptor; import org.mockito.Mockito; +import org.mockito.internal.invocation.InterceptedInvocation; +import org.mockito.listeners.InvocationListener; +import org.mockito.listeners.MethodInvocationReport; /** Tests of {@link JmsIO}. */ @RunWith(JUnit4.class) @@ -665,6 +690,182 @@ public void testDiscardCheckpointMark() throws Exception { assertEquals(6, count(QUEUE)); } + @Test + public void testPublisherWithRetryPolicy() { + PublicationRetryPolicy retryPolicy = + PublicationRetryPolicy.create(5, Duration.standardSeconds(15)); + JmsIO.Write publisher = + JmsIO.write() + .withConnectionFactory(connectionFactory) + .withPublicationRetryPolicy(retryPolicy) + .withQueue(QUEUE) + .withUsername(USERNAME) + .withPassword(PASSWORD); + assertEquals( + publisher.getRetryPublicationPolicy(), + PublicationRetryPolicy.create(5, Duration.standardSeconds(15))); + } + + @Test + public void testWriteMessageWithRetryPolicy() throws Exception { + int waitingSeconds = 5; + // Margin of the pipeline execution in seconds that should be taken into consideration + int pipelineDuration = 5; + Instant now = Instant.now(); + String messageText = now.toString(); + ArrayList data = new ArrayList<>(Collections.singleton(messageText)); + PublicationRetryPolicy retryPolicy = + PublicationRetryPolicy.create(3, Duration.standardSeconds(waitingSeconds)); + + WriteJmsResult output = + pipeline + .apply(Create.of(data)) + .apply( + JmsIO.write() + .withConnectionFactory(connectionFactory) + .withValueMapper(new TextMessageMapperWithErrorCounter()) + .withPublicationRetryPolicy(retryPolicy) + .withQueue(QUEUE) + .withUsername(USERNAME) + .withPassword(PASSWORD)); + + PAssert.that(output.getFailedMessages()).empty(); + pipeline.run(); + + Connection connection = connectionFactory.createConnection(USERNAME, PASSWORD); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE)); + + Message message = consumer.receive(1000); + assertNotNull(message); + long maximumTimestamp = + now.plus(java.time.Duration.ofSeconds(waitingSeconds + pipelineDuration)).toEpochMilli(); + assertThat( + message.getJMSTimestamp(), + allOf(greaterThanOrEqualTo(now.toEpochMilli()), lessThan(maximumTimestamp))); + assertNull(consumer.receiveNoWait()); + } + + @Test + public void testWriteMessagesWithOnConnectionFailedPredicate() throws JMSException { + String messageText = "message"; + ExceptionInvocationListener invocationListener = new ExceptionInvocationListener(); + + ArrayList data = new ArrayList<>(Collections.singleton(messageText)); + SerializableFunction mockedOnFailedConnectionPredicate = + Mockito.mock( + FailedConnectionPredicate.class, + Mockito.withSettings() + .serializable() + .invocationListeners(invocationListener) + .defaultAnswer(Mockito.CALLS_REAL_METHODS)); + + ConnectionFactory mockedConnectionFactory = + Mockito.mock(ConnectionFactory.class, Mockito.withSettings().serializable()); + Connection mockedConnection = + Mockito.mock( + FakeConnection.class, + Mockito.withSettings().serializable().defaultAnswer(Mockito.CALLS_REAL_METHODS)); + doReturn(mockedConnection).when(mockedConnectionFactory).createConnection(USERNAME, PASSWORD); + + pipeline + .apply(Create.of(data)) + .apply( + JmsIO.write() + .withConnectionFactory(mockedConnectionFactory) + .withConnectionFailedPredicate(mockedOnFailedConnectionPredicate) + .withValueMapper(new TextMessageMapper()) + .withQueue(QUEUE) + .withUsername(USERNAME) + .withPassword(PASSWORD)); + PipelineResult pipelineResult = pipeline.run(); + MetricQueryResults metricQueryResults = + pipelineResult + .metrics() + .queryMetrics( + MetricsFilter.builder() + .addNameFilter( + MetricNameFilter.named( + JMS_IO_PRODUCER_METRIC_NAME, CONNECTION_ERRORS_METRIC_NAME)) + .build()); + assertThat( + metricQueryResults.getCounters(), + contains( + allOf( + hasProperty("attempted", is((long) 1)), + hasProperty( + "key", + hasToString( + containsString( + String.format( + "%s:%s", + JMS_IO_PRODUCER_METRIC_NAME, CONNECTION_ERRORS_METRIC_NAME))))))); + Map invocationCounter = invocationListener.getInvocationCounters(); + invocationCounter.forEach( + (exception, isPredicated) -> { + assertEquals(exception.getClass() == ConnectionFailedException.class, isPredicated); + }); + assertEquals(2, invocationCounter.size()); + } + + @Test + public void testWriteMessageWithRetryPolicyReachesLimit() throws Exception { + String messageText = "text"; + int maxPublicationAttempts = 5; + ArrayList data = new ArrayList<>(Collections.singleton(messageText)); + PublicationRetryPolicy retryPolicy = + PublicationRetryPolicy.create(maxPublicationAttempts, Duration.standardSeconds(1)); + + WriteJmsResult output = + pipeline + .apply(Create.of(data)) + .apply( + JmsIO.write() + .withConnectionFactory(connectionFactory) + .withValueMapper( + (SerializableBiFunction) + (s, session) -> { + throw new IllegalArgumentException("Error!!"); + }) + .withPublicationRetryPolicy(retryPolicy) + .withQueue(QUEUE) + .withUsername(USERNAME) + .withPassword(PASSWORD)); + + PAssert.that(output.getFailedMessages()).containsInAnyOrder(messageText); + PipelineResult pipelineResult = pipeline.run(); + + MetricQueryResults metrics = + pipelineResult + .metrics() + .queryMetrics( + MetricsFilter.builder() + .addNameFilter( + MetricNameFilter.named( + JMS_IO_PRODUCER_METRIC_NAME, PUBLICATION_RETRIES_METRIC_NAME)) + .build()); + + assertThat( + metrics.getCounters(), + contains( + allOf( + hasProperty("attempted", is((long) maxPublicationAttempts)), + hasProperty( + "key", + hasToString( + containsString( + String.format( + "%s:%s", + JMS_IO_PRODUCER_METRIC_NAME, PUBLICATION_RETRIES_METRIC_NAME))))))); + + Connection connection = connectionFactory.createConnection(USERNAME, PASSWORD); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE)); + assertNull(consumer.receiveNoWait()); + } + private int count(String queue) throws Exception { Connection connection = connectionFactory.createConnection(USERNAME, PASSWORD); connection.start(); @@ -785,4 +986,55 @@ public Message apply(String value, Session session) { } } } + + private static class TextMessageMapperWithErrorCounter + implements SerializableBiFunction { + + private static int errorCounter; + + TextMessageMapperWithErrorCounter() { + errorCounter = 0; + } + + @Override + public Message apply(String value, Session session) { + try { + if (errorCounter == 0) { + errorCounter++; + throw new JMSException("Error!!"); + } + TextMessage msg = session.createTextMessage(); + msg.setText(value); + return msg; + } catch (JMSException e) { + throw new JmsIOException("Error creating TextMessage", e); + } + } + } + + private static class FailedConnectionPredicate + implements SerializableFunction { + FailedConnectionPredicate() {} + + @Override + public Boolean apply(Exception exception) { + return exception.getClass() == ConnectionFailedException.class; + } + } + + private static class ExceptionInvocationListener implements InvocationListener, Serializable { + + private static final Map invocationCounters = new HashMap<>(); + + public Map getInvocationCounters() { + return invocationCounters; + } + + @Override + public void reportInvocation(MethodInvocationReport methodInvocationReport) { + invocationCounters.put( + ((InterceptedInvocation) methodInvocationReport.getInvocation()).getArgument(0), + (boolean) methodInvocationReport.getReturnedValue()); + } + } } From 7f1891c6d37938e9ebccf1c50692a45f7298757b Mon Sep 17 00:00:00 2001 From: Amrane Ait Zeouay Date: Mon, 23 Jan 2023 16:17:37 +0100 Subject: [PATCH 2/6] [#24971] refactor(JmsIO): apply PR remarks #24973 Fixes #24971 Co-Authored-By: Amrane Ait Zeouay <122456352+amranezeRenault@users.noreply.github.com> --- CHANGES.md | 2 +- .../org/apache/beam/sdk/io/jms/JmsIO.java | 219 ++++++++++++++++- .../apache/beam/sdk/io/jms/JmsIOProducer.java | 222 ------------------ .../sdk/io/jms/PublicationRetryPolicy.java | 56 ----- .../beam/sdk/io/jms/RetryConfiguration.java | 71 ++++++ .../org/apache/beam/sdk/io/jms/JmsIOTest.java | 40 ++-- 6 files changed, 303 insertions(+), 307 deletions(-) delete mode 100644 sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIOProducer.java delete mode 100644 sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/PublicationRetryPolicy.java create mode 100644 sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/RetryConfiguration.java diff --git a/CHANGES.md b/CHANGES.md index 36e20398db15..9ee67af081df 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -93,6 +93,7 @@ * Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). * MongoDB IO connector added (Go) ([#24575](https://github.com/apache/beam/issues/24575)). +* Added in JmsIO a retry policy for failed publications (Java) ([#24971](https://github.com/apache/beam/issues/24971)). ## New Features / Improvements @@ -136,7 +137,6 @@ * S3 implementation of the Beam filesystem (Go) ([#23991](https://github.com/apache/beam/issues/23991)). * Support for SingleStoreDB source and sink added (Java) ([#22617](https://github.com/apache/beam/issues/22617)). * Added support for DefaultAzureCredential authentication in Azure Filesystem (Python) ([#24210](https://github.com/apache/beam/issues/24210)). -* Added in JmsIO a retry policy for failed publications ([#24971](https://github.com/apache/beam/issues/24971)). ## New Features / Improvements diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java index 5c72bd946f62..e8cc5c9cdcaf 100644 --- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java +++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.io.jms; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; import com.google.auto.value.AutoValue; @@ -33,8 +34,11 @@ import java.util.stream.Stream; import javax.jms.Connection; import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.beam.sdk.annotations.Experimental; @@ -45,14 +49,26 @@ import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark; import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.options.ExecutorOptions; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableBiFunction; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.util.BackOff; +import org.apache.beam.sdk.util.BackOffUtils; +import org.apache.beam.sdk.util.FluentBackoff; +import org.apache.beam.sdk.util.Sleeper; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; +import org.checkerframework.checker.initialization.qual.Initialized; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; import org.joda.time.Instant; @@ -699,7 +715,7 @@ public abstract static class Write abstract @Nullable SerializableFunction getTopicNameMapper(); - abstract @Nullable PublicationRetryPolicy getRetryPublicationPolicy(); + abstract @Nullable RetryConfiguration getRetryConfiguration(); abstract @Nullable SerializableFunction getConnectionFailedPredicate(); @@ -723,8 +739,7 @@ abstract Builder setValueMapper( abstract Builder setTopicNameMapper( SerializableFunction topicNameMapper); - abstract Builder setRetryPublicationPolicy( - PublicationRetryPolicy publicationRetryPolicy); + abstract Builder setRetryConfiguration(RetryConfiguration retryConfiguration); abstract Builder setConnectionFailedPredicate( SerializableFunction predicate); @@ -870,30 +885,45 @@ public Write withValueMapper( } /** - * Specify the JMS retry policy. The {@link JmsIO.Write} acts as a publisher on the topic. + * Specify the JMS retry configuration. The {@link JmsIO.Write} acts as a publisher on the + * topic. * *

Allows a retry for failed published messages, the user should specify the maximum number - * of retries and a duration for retrying. By default, the duration used by JmsIO is 15s {@link - * PublicationRetryPolicy} + * of retries, a duration for retrying and a maximum cumulative retries. By default, the + * duration for retrying used is 15s and the maximum cumulative is 1000 days {@link + * RetryConfiguration} * *

For example: * *

{@code
-     * PublicationRetryPolicy publicationRetryPolicy =
-     *   PublicationRetryPolicy.create(5, Duration.standardSeconds(30));
+     * RetryConfiguration retryConfiguration = RetryConfiguration.create(5);
+     * }
+ * + * or + * + *
{@code
+     * RetryConfiguration retryConfiguration =
+     *   RetryConfiguration.create(5, Duration.standardSeconds(30), null);
+     * }
+ * + * or + * + *
{@code
+     * RetryConfiguration retryConfiguration =
+     *   RetryConfiguration.create(5, Duration.standardSeconds(30), Duration.standardDays(15));
      * }
* *
{@code
      * .apply(JmsIO.write().withPublicationRetryPolicy(publicationRetryPolicy)
      * }
* - * @param publicationRetryPolicy The retry policy that should be used in case of failed + * @param retryConfiguration The retry configuration that should be used in case of failed * publications. * @return The corresponding {@link JmsIO.Write}. */ - public Write withPublicationRetryPolicy(PublicationRetryPolicy publicationRetryPolicy) { - checkArgument(publicationRetryPolicy != null, "publicationRetryPolicy can not be null"); - return builder().setRetryPublicationPolicy(publicationRetryPolicy).build(); + public Write withRetryConfiguration(RetryConfiguration retryConfiguration) { + checkArgument(retryConfiguration != null, "retryConfiguration can not be null"); + return builder().setRetryConfiguration(retryConfiguration).build(); } /** @@ -947,4 +977,169 @@ private boolean isExclusiveTopicQueue() { return exclusiveTopicQueue; } } + + public static class JmsIOProducer extends PTransform, WriteJmsResult> { + + public static final String CONNECTION_ERRORS_METRIC_NAME = "connectionErrors"; + public static final String PUBLICATION_RETRIES_METRIC_NAME = "publicationRetries"; + public static final String JMS_IO_PRODUCER_METRIC_NAME = JmsIOProducer.class.getCanonicalName(); + + private static final Logger LOG = LoggerFactory.getLogger(JmsIOProducer.class); + private static final String PUBLISH_TO_JMS_STEP_NAME = "Publish to JMS"; + + private final JmsIO.Write spec; + private final TupleTag messagesTag; + private final TupleTag failedMessagesTag; + + JmsIOProducer(JmsIO.Write spec) { + this.spec = spec; + this.messagesTag = new TupleTag<>(); + this.failedMessagesTag = new TupleTag<>(); + } + + @Override + public WriteJmsResult expand(PCollection input) { + PCollectionTuple failedPublishedMessages = + input.apply( + PUBLISH_TO_JMS_STEP_NAME, + ParDo.of(new JmsIOProducerFn()) + .withOutputTags(messagesTag, TupleTagList.of(failedMessagesTag))); + PCollection failedMessages = + failedPublishedMessages.get(failedMessagesTag).setCoder(input.getCoder()); + failedPublishedMessages.get(messagesTag).setCoder(input.getCoder()); + return WriteJmsResult.in(input.getPipeline(), failedMessagesTag, failedMessages); + } + + private class JmsIOProducerFn extends DoFn { + + private transient @Initialized FluentBackoff retryBackOff; + + private transient @Initialized Session session; + private transient @Initialized Connection connection; + private transient @Initialized Destination destination; + private transient @Initialized MessageProducer producer; + + private final Counter connectionErrors = + Metrics.counter(JMS_IO_PRODUCER_METRIC_NAME, CONNECTION_ERRORS_METRIC_NAME); + private final Counter publicationRetries = + Metrics.counter(JMS_IO_PRODUCER_METRIC_NAME, PUBLICATION_RETRIES_METRIC_NAME); + + @Setup + public void setup() { + RetryConfiguration retryConfiguration = checkStateNotNull(spec.getRetryConfiguration()); + + retryBackOff = + FluentBackoff.DEFAULT + .withInitialBackoff(checkStateNotNull(retryConfiguration.getInitialDuration())) + .withMaxCumulativeBackoff(checkStateNotNull(retryConfiguration.getMaxDuration())) + .withMaxRetries(retryConfiguration.getMaxAttempts()); + } + + @StartBundle + public void start() throws JMSException { + if (producer == null) { + ConnectionFactory connectionFactory = spec.getConnectionFactory(); + if (spec.getUsername() != null) { + this.connection = + connectionFactory.createConnection(spec.getUsername(), spec.getPassword()); + } else { + this.connection = connectionFactory.createConnection(); + } + this.connection.setExceptionListener( + exception -> { + if (spec.getConnectionFailedPredicate() != null + && spec.getConnectionFailedPredicate().apply(exception)) { + LOG.error("Jms connection encountered the following exception:", exception); + connectionErrors.inc(); + try { + restartJmsConnection(); + } catch (JMSException e) { + LOG.error("An error occurred while reconnecting:", e); + } + } + }); + this.connection.start(); + // false means we don't use JMS transaction. + this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + if (spec.getQueue() != null) { + this.destination = session.createQueue(spec.getQueue()); + } else if (spec.getTopic() != null) { + this.destination = session.createTopic(spec.getTopic()); + } + this.producer = this.session.createProducer(this.destination); + } + } + + @ProcessElement + public void processElement(@Element T input, ProcessContext context) { + try { + publishMessage(input, context); + } catch (IOException | InterruptedException exception) { + LOG.error("Error while publishing the message", exception); + context.output(failedMessagesTag, input); + Thread.currentThread().interrupt(); + } + } + + private void publishMessage(T input, ProcessContext context) + throws IOException, InterruptedException { + Sleeper sleeper = Sleeper.DEFAULT; + Destination destinationToSendTo = destination; + BackOff backoff = checkStateNotNull(retryBackOff).backoff(); + int publicationAttempt = 0; + while (publicationAttempt >= 0) { + publicationAttempt++; + try { + Message message = spec.getValueMapper().apply(input, session); + if (spec.getTopicNameMapper() != null) { + destinationToSendTo = session.createTopic(spec.getTopicNameMapper().apply(input)); + } + producer.send(destinationToSendTo, message); + publicationAttempt = -1; + } catch (Exception exception) { + if (!BackOffUtils.next(sleeper, backoff)) { + LOG.error("The message wasn't published to topic {}", destinationToSendTo, exception); + context.output(failedMessagesTag, input); + publicationAttempt = -1; + } else { + publicationRetries.inc(); + LOG.warn( + "Error sending message on topic {}, retry attempt {}", + destinationToSendTo, + publicationAttempt, + exception); + } + } + } + } + + private void restartJmsConnection() throws JMSException { + teardown(); + start(); + } + + @Teardown + public void teardown() throws JMSException { + if (producer != null) { + producer.close(); + producer = null; + } + if (session != null) { + session.close(); + session = null; + } + if (connection != null) { + try { + // If the connection failed, stopping the connection will throw a JMSException + connection.stop(); + } catch (JMSException exception) { + LOG.warn("The connection couldn't be closed", exception); + } + connection.close(); + connection = null; + } + } + } + } } diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIOProducer.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIOProducer.java deleted file mode 100644 index 4533d62ac5f5..000000000000 --- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIOProducer.java +++ /dev/null @@ -1,222 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.jms; - -import static org.apache.beam.sdk.io.jms.PublicationRetryPolicy.DEFAULT_PUBLICATION_RETRY_DURATION; - -import java.io.IOException; -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageProducer; -import javax.jms.Session; -import org.apache.beam.sdk.metrics.Counter; -import org.apache.beam.sdk.metrics.Metrics; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.util.BackOff; -import org.apache.beam.sdk.util.BackOffUtils; -import org.apache.beam.sdk.util.FluentBackoff; -import org.apache.beam.sdk.util.Sleeper; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; -import org.apache.beam.sdk.values.TupleTag; -import org.apache.beam.sdk.values.TupleTagList; -import org.checkerframework.checker.initialization.qual.Initialized; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -@SuppressWarnings({ - "nullness" // TODO(https://github.com/apache/beam/issues/20497) -}) -public class JmsIOProducer extends PTransform, WriteJmsResult> { - - public static final String CONNECTION_ERRORS_METRIC_NAME = "connectionErrors"; - public static final String PUBLICATION_RETRIES_METRIC_NAME = "publicationRetries"; - public static final String JMS_IO_PRODUCER_METRIC_NAME = JmsIOProducer.class.getCanonicalName(); - - private static final Logger LOG = LoggerFactory.getLogger(JmsIOProducer.class); - private static final String PUBLISH_TO_JMS_STEP_NAME = "Publish to JMS"; - - private final JmsIO.Write spec; - private final TupleTag messagesTag; - private final TupleTag failedMessagesTag; - - JmsIOProducer(JmsIO.Write spec) { - this.spec = spec; - this.messagesTag = new TupleTag<>(); - this.failedMessagesTag = new TupleTag<>(); - } - - @Override - public WriteJmsResult expand(PCollection input) { - PCollectionTuple failedPublishedMessages = - input.apply( - PUBLISH_TO_JMS_STEP_NAME, - ParDo.of(new JmsIOProducerFn()) - .withOutputTags(messagesTag, TupleTagList.of(failedMessagesTag))); - PCollection failedMessages = - failedPublishedMessages.get(failedMessagesTag).setCoder(input.getCoder()); - failedPublishedMessages.get(messagesTag).setCoder(input.getCoder()); - return WriteJmsResult.in(input.getPipeline(), failedMessagesTag, failedMessages); - } - - private class JmsIOProducerFn extends DoFn { - - private transient @Initialized Session session; - private transient @Initialized Connection connection; - private transient @Initialized Destination destination; - private transient @Initialized MessageProducer producer; - private transient @Initialized FluentBackoff retryPublicationBackoff; - - private final Counter connectionErrors = - Metrics.counter(JMS_IO_PRODUCER_METRIC_NAME, CONNECTION_ERRORS_METRIC_NAME); - private final Counter publicationRetries = - Metrics.counter(JMS_IO_PRODUCER_METRIC_NAME, PUBLICATION_RETRIES_METRIC_NAME); - - @Setup - public void setup() { - retryPublicationBackoff = - FluentBackoff.DEFAULT - .withMaxRetries(0) - .withInitialBackoff(DEFAULT_PUBLICATION_RETRY_DURATION); - - if (spec.getRetryPublicationPolicy() != null) { - retryPublicationBackoff = - retryPublicationBackoff - .withMaxRetries(spec.getRetryPublicationPolicy().maxPublicationAttempts()) - .withInitialBackoff(spec.getRetryPublicationPolicy().retryDuration()); - } - } - - @StartBundle - public void start() throws JMSException { - if (producer == null) { - ConnectionFactory connectionFactory = spec.getConnectionFactory(); - if (spec.getUsername() != null) { - this.connection = - connectionFactory.createConnection(spec.getUsername(), spec.getPassword()); - } else { - this.connection = connectionFactory.createConnection(); - } - this.connection.setExceptionListener( - exception -> { - if (spec.getConnectionFailedPredicate() != null - && spec.getConnectionFailedPredicate().apply(exception)) { - LOG.error("Jms connection encountered the following exception:", exception); - connectionErrors.inc(); - try { - restartJmsConnection(); - } catch (JMSException e) { - LOG.error("An error occurred while reconnecting:", e); - } - } - }); - this.connection.start(); - // false means we don't use JMS transaction. - this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - if (spec.getQueue() != null) { - this.destination = session.createQueue(spec.getQueue()); - } else if (spec.getTopic() != null) { - this.destination = session.createTopic(spec.getTopic()); - } - this.producer = this.session.createProducer(this.destination); - } - } - - @ProcessElement - public void processElement(@Element T input, ProcessContext context) { - try { - publishMessage(input, context); - } catch (IOException | InterruptedException exception) { - LOG.error("Error while publishing the message", exception); - context.output(failedMessagesTag, input); - Thread.currentThread().interrupt(); - } - } - - private void publishMessage(T input, ProcessContext context) - throws IOException, InterruptedException { - Sleeper sleeper = Sleeper.DEFAULT; - Destination destinationToSendTo = destination; - BackOff backoff = retryPublicationBackoff.backoff(); - int publicationAttempt = 0; - while (publicationAttempt >= 0) { - publicationAttempt++; - try { - Message message = spec.getValueMapper().apply(input, session); - if (spec.getTopicNameMapper() != null) { - destinationToSendTo = session.createTopic(spec.getTopicNameMapper().apply(input)); - } - producer.send(destinationToSendTo, message); - publicationAttempt = -1; - } catch (Exception exception) { - if (spec.getRetryPublicationPolicy() == null || !BackOffUtils.next(sleeper, backoff)) { - outputFailedMessage(input, context, destinationToSendTo, exception); - LOG.debug("Message published to topic {}", destinationToSendTo); - publicationAttempt = -1; - } else { - publicationRetries.inc(); - LOG.warn( - "Error sending message on topic {}, retry attempt {}", - destinationToSendTo, - publicationAttempt, - exception); - } - } - } - } - - private void outputFailedMessage( - T input, ProcessContext context, Destination destinationToSendTo, Exception exception) { - LOG.error("The message wasn't published to topic {}", destinationToSendTo, exception); - context.output(failedMessagesTag, input); - } - - private void restartJmsConnection() throws JMSException { - teardown(); - start(); - } - - @Teardown - public void teardown() throws JMSException { - if (producer != null) { - producer.close(); - producer = null; - } - if (session != null) { - session.close(); - session = null; - } - if (connection != null) { - try { - // If the connection failed, stopping the connection will throw a JMSException - connection.stop(); - } catch (JMSException exception) { - LOG.warn("The connection couldn't be closed", exception); - } - connection.close(); - connection = null; - } - } - } -} diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/PublicationRetryPolicy.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/PublicationRetryPolicy.java deleted file mode 100644 index c6e793ee7608..000000000000 --- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/PublicationRetryPolicy.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.jms; - -import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; - -import com.google.auto.value.AutoValue; -import java.io.Serializable; -import org.checkerframework.dataflow.qual.Pure; -import org.joda.time.Duration; - -@AutoValue -public abstract class PublicationRetryPolicy implements Serializable { - public static final Duration DEFAULT_PUBLICATION_RETRY_DURATION = Duration.standardSeconds(15); - - public abstract @Pure int maxPublicationAttempts(); - - public abstract @Pure Duration retryDuration(); - - public static PublicationRetryPolicy create(int maxPublicationAttempts, Duration retryDuration) { - Duration defaultRetryDuration = - retryDuration == null ? DEFAULT_PUBLICATION_RETRY_DURATION : retryDuration; - checkArgument(maxPublicationAttempts > 0, "maxPublicationAttempts should be greater than 0"); - checkArgument( - defaultRetryDuration != null && defaultRetryDuration.isLongerThan(Duration.ZERO), - "retryDuration should be greater than 0"); - return new AutoValue_PublicationRetryPolicy.Builder() - .setMaxPublicationAttempts(maxPublicationAttempts) - .setRetryDuration(defaultRetryDuration) - .build(); - } - - @AutoValue.Builder - abstract static class Builder { - abstract PublicationRetryPolicy.Builder setMaxPublicationAttempts(int maxPublicationAttempts); - - abstract PublicationRetryPolicy.Builder setRetryDuration(Duration retryDuration); - - abstract PublicationRetryPolicy build(); - } -} diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/RetryConfiguration.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/RetryConfiguration.java new file mode 100644 index 000000000000..b98b6e15343e --- /dev/null +++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/RetryConfiguration.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.jms; + +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; + +import com.google.auto.value.AutoValue; +import java.io.Serializable; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Duration; + +@AutoValue +public abstract class RetryConfiguration implements Serializable { + private static final Duration DEFAULT_INITIAL_BACKOFF = Duration.standardSeconds(15); + private static final Duration DEFAULT_MAX_CUMULATIVE_BACKOFF = Duration.standardDays(1000); + + abstract int getMaxAttempts(); + + abstract @Nullable Duration getMaxDuration(); + + abstract @Nullable Duration getInitialDuration(); + + public static RetryConfiguration create(int maxAttempts) { + return create(maxAttempts, null, null); + } + + public static RetryConfiguration create( + int maxAttempts, @Nullable Duration maxDuration, @Nullable Duration initialDuration) { + checkArgument(maxAttempts > 0, "maxAttempts should be greater than 0"); + + if (maxDuration == null || maxDuration.equals(Duration.ZERO)) { + maxDuration = DEFAULT_MAX_CUMULATIVE_BACKOFF; + } + + if (initialDuration == null || initialDuration.equals(Duration.ZERO)) { + initialDuration = DEFAULT_INITIAL_BACKOFF; + } + + return new AutoValue_RetryConfiguration.Builder() + .setMaxAttempts(maxAttempts) + .setInitialDuration(initialDuration) + .setMaxDuration(maxDuration) + .build(); + } + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setMaxAttempts(int maxAttempts); + + abstract Builder setMaxDuration(Duration maxDuration); + + abstract Builder setInitialDuration(Duration initialDuration); + + abstract RetryConfiguration build(); + } +} diff --git a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java index 77218f3f005e..e23c154a24e6 100644 --- a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java +++ b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java @@ -18,9 +18,9 @@ package org.apache.beam.sdk.io.jms; import static org.apache.beam.sdk.io.UnboundedSource.UnboundedReader.BACKLOG_UNKNOWN; -import static org.apache.beam.sdk.io.jms.JmsIOProducer.CONNECTION_ERRORS_METRIC_NAME; -import static org.apache.beam.sdk.io.jms.JmsIOProducer.JMS_IO_PRODUCER_METRIC_NAME; -import static org.apache.beam.sdk.io.jms.JmsIOProducer.PUBLICATION_RETRIES_METRIC_NAME; +import static org.apache.beam.sdk.io.jms.JmsIO.JmsIOProducer.CONNECTION_ERRORS_METRIC_NAME; +import static org.apache.beam.sdk.io.jms.JmsIO.JmsIOProducer.JMS_IO_PRODUCER_METRIC_NAME; +import static org.apache.beam.sdk.io.jms.JmsIO.JmsIOProducer.PUBLICATION_RETRIES_METRIC_NAME; import static org.hamcrest.CoreMatchers.allOf; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.contains; @@ -130,6 +130,9 @@ public class JmsIOTest { @Rule public final transient TestPipeline pipeline = TestPipeline.create(); + private final RetryConfiguration retryConfiguration = + RetryConfiguration.create(1, Duration.standardSeconds(1), null); + @Before public void startBroker() throws Exception { broker = new BrokerService(); @@ -278,6 +281,7 @@ public void testWriteMessage() throws Exception { JmsIO.write() .withConnectionFactory(connectionFactory) .withValueMapper(new TextMessageMapper()) + .withRetryConfiguration(retryConfiguration) .withQueue(QUEUE) .withUsername(USERNAME) .withPassword(PASSWORD)); @@ -309,6 +313,7 @@ public void testWriteMessageWithError() throws Exception { JmsIO.write() .withConnectionFactory(connectionFactory) .withValueMapper(new TextMessageMapperWithError()) + .withRetryConfiguration(retryConfiguration) .withQueue(QUEUE) .withUsername(USERNAME) .withPassword(PASSWORD)); @@ -349,6 +354,7 @@ public void testWriteDynamicMessage() throws Exception { .withConnectionFactory(connectionFactory) .withUsername(USERNAME) .withPassword(PASSWORD) + .withRetryConfiguration(retryConfiguration) .withTopicNameMapper(e -> e.getTopicName()) .withValueMapper( (e, s) -> { @@ -691,19 +697,19 @@ public void testDiscardCheckpointMark() throws Exception { } @Test - public void testPublisherWithRetryPolicy() { - PublicationRetryPolicy retryPolicy = - PublicationRetryPolicy.create(5, Duration.standardSeconds(15)); + public void testPublisherWithRetryConfiguration() { + RetryConfiguration retryPolicy = + RetryConfiguration.create(5, Duration.standardSeconds(15), null); JmsIO.Write publisher = JmsIO.write() .withConnectionFactory(connectionFactory) - .withPublicationRetryPolicy(retryPolicy) + .withRetryConfiguration(retryPolicy) .withQueue(QUEUE) .withUsername(USERNAME) .withPassword(PASSWORD); assertEquals( - publisher.getRetryPublicationPolicy(), - PublicationRetryPolicy.create(5, Duration.standardSeconds(15))); + publisher.getRetryConfiguration(), + RetryConfiguration.create(5, Duration.standardSeconds(15), null)); } @Test @@ -714,8 +720,9 @@ public void testWriteMessageWithRetryPolicy() throws Exception { Instant now = Instant.now(); String messageText = now.toString(); ArrayList data = new ArrayList<>(Collections.singleton(messageText)); - PublicationRetryPolicy retryPolicy = - PublicationRetryPolicy.create(3, Duration.standardSeconds(waitingSeconds)); + RetryConfiguration retryPolicy = + RetryConfiguration.create( + 3, Duration.standardSeconds(waitingSeconds), Duration.standardDays(10)); WriteJmsResult output = pipeline @@ -724,7 +731,7 @@ public void testWriteMessageWithRetryPolicy() throws Exception { JmsIO.write() .withConnectionFactory(connectionFactory) .withValueMapper(new TextMessageMapperWithErrorCounter()) - .withPublicationRetryPolicy(retryPolicy) + .withRetryConfiguration(retryPolicy) .withQueue(QUEUE) .withUsername(USERNAME) .withPassword(PASSWORD)); @@ -776,6 +783,7 @@ public void testWriteMessagesWithOnConnectionFailedPredicate() throws JMSExcepti .withConnectionFactory(mockedConnectionFactory) .withConnectionFailedPredicate(mockedOnFailedConnectionPredicate) .withValueMapper(new TextMessageMapper()) + .withRetryConfiguration(retryConfiguration) .withQueue(QUEUE) .withUsername(USERNAME) .withPassword(PASSWORD)); @@ -812,10 +820,10 @@ public void testWriteMessagesWithOnConnectionFailedPredicate() throws JMSExcepti @Test public void testWriteMessageWithRetryPolicyReachesLimit() throws Exception { String messageText = "text"; - int maxPublicationAttempts = 5; + int maxPublicationAttempts = 2; ArrayList data = new ArrayList<>(Collections.singleton(messageText)); - PublicationRetryPolicy retryPolicy = - PublicationRetryPolicy.create(maxPublicationAttempts, Duration.standardSeconds(1)); + RetryConfiguration retryConfiguration = + RetryConfiguration.create(maxPublicationAttempts, null, null); WriteJmsResult output = pipeline @@ -828,7 +836,7 @@ public void testWriteMessageWithRetryPolicyReachesLimit() throws Exception { (s, session) -> { throw new IllegalArgumentException("Error!!"); }) - .withPublicationRetryPolicy(retryPolicy) + .withRetryConfiguration(retryConfiguration) .withQueue(QUEUE) .withUsername(USERNAME) .withPassword(PASSWORD)); From 7e4669bb3a4684e7d9a231e4f8218b25b2f11e76 Mon Sep 17 00:00:00 2001 From: Amrane Ait Zeouay Date: Wed, 25 Jan 2023 23:20:33 +0100 Subject: [PATCH 3/6] [#24971] refactor(JmsIO): replace tearDown with finishBundle + rethrow exceptions #24973 Fixes #24971 Co-Authored-By: Amrane Ait Zeouay <122456352+amranezeRenault@users.noreply.github.com> --- CHANGES.md | 2 +- .../org/apache/beam/sdk/io/jms/JmsIO.java | 91 ++++------------ .../beam/sdk/io/jms/FakeConnection.java | 100 ------------------ .../org/apache/beam/sdk/io/jms/JmsIOTest.java | 98 ----------------- 4 files changed, 23 insertions(+), 268 deletions(-) delete mode 100644 sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/FakeConnection.java diff --git a/CHANGES.md b/CHANGES.md index 9ee67af081df..793114ec906b 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -59,6 +59,7 @@ ## I/Os * Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). +* Added in JmsIO a retry policy for failed publications (Java) ([#24971](https://github.com/apache/beam/issues/24971)). ## New Features / Improvements @@ -93,7 +94,6 @@ * Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). * MongoDB IO connector added (Go) ([#24575](https://github.com/apache/beam/issues/24575)). -* Added in JmsIO a retry policy for failed publications (Java) ([#24971](https://github.com/apache/beam/issues/24971)). ## New Features / Improvements diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java index e8cc5c9cdcaf..93aebc3080df 100644 --- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java +++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java @@ -717,8 +717,6 @@ public abstract static class Write abstract @Nullable RetryConfiguration getRetryConfiguration(); - abstract @Nullable SerializableFunction getConnectionFailedPredicate(); - abstract Builder builder(); @AutoValue.Builder @@ -741,9 +739,6 @@ abstract Builder setTopicNameMapper( abstract Builder setRetryConfiguration(RetryConfiguration retryConfiguration); - abstract Builder setConnectionFailedPredicate( - SerializableFunction predicate); - abstract Write build(); } @@ -926,33 +921,6 @@ public Write withRetryConfiguration(RetryConfiguration retryConfiguratio return builder().setRetryConfiguration(retryConfiguration).build(); } - /** - * Specify the predicate function to check whether to retry to reconnect or not. The {@link - * JmsIO.Write} acts as a publisher on the topic. - * - *

Allows you to choose depending on the given exception if to reconnect or not. - * - *

For example: - * - *

{@code
-     * SerializableFunction reconnectOnFailedExceptionPredicate =
-     *   (exception -> exception.getClass() == JmsException.class);
-     * }
- * - *
{@code
-     * .apply(JmsIO.write().withConnectionFailedPredicate(reconnectOnFailedExceptionPredicate)
-     * }
- * - * @param connectionFailedPredicate The predicate function to be used to check if JmsIO should - * reconnect or not. - * @return The corresponding {@link JmsIO.Write}. - */ - public Write withConnectionFailedPredicate( - SerializableFunction connectionFailedPredicate) { - checkArgument(connectionFailedPredicate != null, "connectionFailedPredicate can not be null"); - return builder().setConnectionFailedPredicate(connectionFailedPredicate).build(); - } - @Override public WriteJmsResult expand(PCollection input) { checkArgument(getConnectionFactory() != null, "withConnectionFactory() is required"); @@ -1037,38 +1005,28 @@ public void setup() { @StartBundle public void start() throws JMSException { - if (producer == null) { - ConnectionFactory connectionFactory = spec.getConnectionFactory(); - if (spec.getUsername() != null) { - this.connection = - connectionFactory.createConnection(spec.getUsername(), spec.getPassword()); - } else { - this.connection = connectionFactory.createConnection(); - } - this.connection.setExceptionListener( - exception -> { - if (spec.getConnectionFailedPredicate() != null - && spec.getConnectionFailedPredicate().apply(exception)) { - LOG.error("Jms connection encountered the following exception:", exception); - connectionErrors.inc(); - try { - restartJmsConnection(); - } catch (JMSException e) { - LOG.error("An error occurred while reconnecting:", e); - } - } - }); - this.connection.start(); - // false means we don't use JMS transaction. - this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - if (spec.getQueue() != null) { - this.destination = session.createQueue(spec.getQueue()); - } else if (spec.getTopic() != null) { - this.destination = session.createTopic(spec.getTopic()); - } - this.producer = this.session.createProducer(this.destination); + ConnectionFactory connectionFactory = spec.getConnectionFactory(); + if (spec.getUsername() != null) { + this.connection = + connectionFactory.createConnection(spec.getUsername(), spec.getPassword()); + } else { + this.connection = connectionFactory.createConnection(); } + this.connection.setExceptionListener( + exception -> { + connectionErrors.inc(); + throw new JmsIOException("An error occurred with JMS connection", exception); + }); + this.connection.start(); + // false means we don't use JMS transaction. + this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + if (spec.getQueue() != null) { + this.destination = session.createQueue(spec.getQueue()); + } else if (spec.getTopic() != null) { + this.destination = session.createTopic(spec.getTopic()); + } + this.producer = this.session.createProducer(this.destination); } @ProcessElement @@ -1114,12 +1072,7 @@ private void publishMessage(T input, ProcessContext context) } } - private void restartJmsConnection() throws JMSException { - teardown(); - start(); - } - - @Teardown + @FinishBundle public void teardown() throws JMSException { if (producer != null) { producer.close(); diff --git a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/FakeConnection.java b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/FakeConnection.java deleted file mode 100644 index 6097a07835f0..000000000000 --- a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/FakeConnection.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.jms; - -import java.io.Serializable; -import javax.jms.Connection; -import javax.jms.ConnectionConsumer; -import javax.jms.ConnectionMetaData; -import javax.jms.Destination; -import javax.jms.ExceptionListener; -import javax.jms.ServerSessionPool; -import javax.jms.Session; -import javax.jms.Topic; -import org.apache.activemq.AlreadyClosedException; -import org.apache.activemq.ConnectionFailedException; -import org.mockito.Mockito; - -public class FakeConnection implements Connection, Serializable { - - private ExceptionListener listener; - private static int counter = 0; - - @Override - public Session createSession(boolean transacted, int acknowledgeMode) { - return Mockito.mock(Session.class); - } - - @Override - public String getClientID() { - return null; - } - - @Override - public void setClientID(String clientID) {} - - @Override - public ConnectionMetaData getMetaData() { - return null; - } - - @Override - public ExceptionListener getExceptionListener() { - return listener; - } - - @Override - public void setExceptionListener(ExceptionListener listener) { - this.listener = listener; - } - - @Override - public void start() { - if (counter == 0) { - counter++; - this.listener.onException(new ConnectionFailedException()); - } else { - this.listener.onException(new AlreadyClosedException()); - } - } - - @Override - public void stop() {} - - @Override - public void close() {} - - @Override - public ConnectionConsumer createConnectionConsumer( - Destination destination, - String messageSelector, - ServerSessionPool sessionPool, - int maxMessages) { - return null; - } - - @Override - public ConnectionConsumer createDurableConnectionConsumer( - Topic topic, - String subscriptionName, - String messageSelector, - ServerSessionPool sessionPool, - int maxMessages) { - return null; - } -} diff --git a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java index e23c154a24e6..0a55b2bc75d8 100644 --- a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java +++ b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java @@ -18,7 +18,6 @@ package org.apache.beam.sdk.io.jms; import static org.apache.beam.sdk.io.UnboundedSource.UnboundedReader.BACKLOG_UNKNOWN; -import static org.apache.beam.sdk.io.jms.JmsIO.JmsIOProducer.CONNECTION_ERRORS_METRIC_NAME; import static org.apache.beam.sdk.io.jms.JmsIO.JmsIOProducer.JMS_IO_PRODUCER_METRIC_NAME; import static org.apache.beam.sdk.io.jms.JmsIO.JmsIOProducer.PUBLICATION_RETRIES_METRIC_NAME; import static org.hamcrest.CoreMatchers.allOf; @@ -39,7 +38,6 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -54,9 +52,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Enumeration; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.function.Function; @@ -71,7 +67,6 @@ import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.ConnectionFailedException; import org.apache.activemq.broker.BrokerPlugin; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.command.ActiveMQMessage; @@ -94,7 +89,6 @@ import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.SerializableBiFunction; -import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables; import org.joda.time.Duration; @@ -106,9 +100,6 @@ import org.junit.runners.JUnit4; import org.mockito.ArgumentCaptor; import org.mockito.Mockito; -import org.mockito.internal.invocation.InterceptedInvocation; -import org.mockito.listeners.InvocationListener; -import org.mockito.listeners.MethodInvocationReport; /** Tests of {@link JmsIO}. */ @RunWith(JUnit4.class) @@ -754,69 +745,6 @@ public void testWriteMessageWithRetryPolicy() throws Exception { assertNull(consumer.receiveNoWait()); } - @Test - public void testWriteMessagesWithOnConnectionFailedPredicate() throws JMSException { - String messageText = "message"; - ExceptionInvocationListener invocationListener = new ExceptionInvocationListener(); - - ArrayList data = new ArrayList<>(Collections.singleton(messageText)); - SerializableFunction mockedOnFailedConnectionPredicate = - Mockito.mock( - FailedConnectionPredicate.class, - Mockito.withSettings() - .serializable() - .invocationListeners(invocationListener) - .defaultAnswer(Mockito.CALLS_REAL_METHODS)); - - ConnectionFactory mockedConnectionFactory = - Mockito.mock(ConnectionFactory.class, Mockito.withSettings().serializable()); - Connection mockedConnection = - Mockito.mock( - FakeConnection.class, - Mockito.withSettings().serializable().defaultAnswer(Mockito.CALLS_REAL_METHODS)); - doReturn(mockedConnection).when(mockedConnectionFactory).createConnection(USERNAME, PASSWORD); - - pipeline - .apply(Create.of(data)) - .apply( - JmsIO.write() - .withConnectionFactory(mockedConnectionFactory) - .withConnectionFailedPredicate(mockedOnFailedConnectionPredicate) - .withValueMapper(new TextMessageMapper()) - .withRetryConfiguration(retryConfiguration) - .withQueue(QUEUE) - .withUsername(USERNAME) - .withPassword(PASSWORD)); - PipelineResult pipelineResult = pipeline.run(); - MetricQueryResults metricQueryResults = - pipelineResult - .metrics() - .queryMetrics( - MetricsFilter.builder() - .addNameFilter( - MetricNameFilter.named( - JMS_IO_PRODUCER_METRIC_NAME, CONNECTION_ERRORS_METRIC_NAME)) - .build()); - assertThat( - metricQueryResults.getCounters(), - contains( - allOf( - hasProperty("attempted", is((long) 1)), - hasProperty( - "key", - hasToString( - containsString( - String.format( - "%s:%s", - JMS_IO_PRODUCER_METRIC_NAME, CONNECTION_ERRORS_METRIC_NAME))))))); - Map invocationCounter = invocationListener.getInvocationCounters(); - invocationCounter.forEach( - (exception, isPredicated) -> { - assertEquals(exception.getClass() == ConnectionFailedException.class, isPredicated); - }); - assertEquals(2, invocationCounter.size()); - } - @Test public void testWriteMessageWithRetryPolicyReachesLimit() throws Exception { String messageText = "text"; @@ -1019,30 +947,4 @@ public Message apply(String value, Session session) { } } } - - private static class FailedConnectionPredicate - implements SerializableFunction { - FailedConnectionPredicate() {} - - @Override - public Boolean apply(Exception exception) { - return exception.getClass() == ConnectionFailedException.class; - } - } - - private static class ExceptionInvocationListener implements InvocationListener, Serializable { - - private static final Map invocationCounters = new HashMap<>(); - - public Map getInvocationCounters() { - return invocationCounters; - } - - @Override - public void reportInvocation(MethodInvocationReport methodInvocationReport) { - invocationCounters.put( - ((InterceptedInvocation) methodInvocationReport.getInvocation()).getArgument(0), - (boolean) methodInvocationReport.getReturnedValue()); - } - } } From 7ae4fe6e1ee78143f81939baa12f4ad297fa570b Mon Sep 17 00:00:00 2001 From: Amrane Ait Zeouay Date: Sun, 5 Feb 2023 22:27:59 +0100 Subject: [PATCH 4/6] [#24971] refactor(JmsIO): redesign JmsIOWriter lifecycle #24973 Fixes #24971 Co-Authored-By: Amrane Ait Zeouay <122456352+amranezeRenault@users.noreply.github.com> --- .../org/apache/beam/sdk/io/jms/JmsIO.java | 240 ++++++++++++------ .../org/apache/beam/sdk/io/jms/JmsIOTest.java | 75 +++++- 2 files changed, 231 insertions(+), 84 deletions(-) diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java index 93aebc3080df..ed4ca1b16873 100644 --- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java +++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java @@ -933,7 +933,7 @@ public WriteJmsResult expand(PCollection input) { "Only one of withQueue(queue), withTopic(topic), or withTopicNameMapper(function) must be set."); checkArgument(getValueMapper() != null, "withValueMapper() is required"); - return input.apply(new JmsIOProducer<>(this)); + return input.apply(new Writer<>(this)); } private boolean isExclusiveTopicQueue() { @@ -946,56 +946,186 @@ private boolean isExclusiveTopicQueue() { } } - public static class JmsIOProducer extends PTransform, WriteJmsResult> { + static class Writer extends PTransform, WriteJmsResult> { public static final String CONNECTION_ERRORS_METRIC_NAME = "connectionErrors"; public static final String PUBLICATION_RETRIES_METRIC_NAME = "publicationRetries"; - public static final String JMS_IO_PRODUCER_METRIC_NAME = JmsIOProducer.class.getCanonicalName(); + public static final String JMS_IO_PRODUCER_METRIC_NAME = Writer.class.getCanonicalName(); - private static final Logger LOG = LoggerFactory.getLogger(JmsIOProducer.class); + private static final Logger LOG = LoggerFactory.getLogger(Writer.class); private static final String PUBLISH_TO_JMS_STEP_NAME = "Publish to JMS"; + private static final String REPUBLISH_TO_JMS_STEP_NAME = "Republish to JMS"; private final JmsIO.Write spec; private final TupleTag messagesTag; private final TupleTag failedMessagesTag; + private final TupleTag failedRepublishedMessagesTag; - JmsIOProducer(JmsIO.Write spec) { + Writer(JmsIO.Write spec) { this.spec = spec; this.messagesTag = new TupleTag<>(); this.failedMessagesTag = new TupleTag<>(); + this.failedRepublishedMessagesTag = new TupleTag<>(); } @Override public WriteJmsResult expand(PCollection input) { - PCollectionTuple failedPublishedMessages = + PCollectionTuple failedPublishedMessagesTuple = input.apply( PUBLISH_TO_JMS_STEP_NAME, - ParDo.of(new JmsIOProducerFn()) + ParDo.of(new JmsIOProducerFn<>(spec, failedMessagesTag)) .withOutputTags(messagesTag, TupleTagList.of(failedMessagesTag))); - PCollection failedMessages = - failedPublishedMessages.get(failedMessagesTag).setCoder(input.getCoder()); - failedPublishedMessages.get(messagesTag).setCoder(input.getCoder()); - return WriteJmsResult.in(input.getPipeline(), failedMessagesTag, failedMessages); + PCollection failedPublishedMessages = + failedPublishedMessagesTuple.get(failedMessagesTag).setCoder(input.getCoder()); + failedPublishedMessagesTuple.get(messagesTag).setCoder(input.getCoder()); + + // Republishing failed messages + PCollectionTuple failedRepublishedMessagesTuple = + failedPublishedMessages.apply( + REPUBLISH_TO_JMS_STEP_NAME, + ParDo.of(new JmsIOProduceRetryFn<>(spec, failedRepublishedMessagesTag)) + .withOutputTags(messagesTag, TupleTagList.of(failedRepublishedMessagesTag))); + PCollection failedRepublishedMessages = + failedRepublishedMessagesTuple + .get(failedRepublishedMessagesTag) + .setCoder(input.getCoder()); + failedRepublishedMessagesTuple.get(messagesTag).setCoder(input.getCoder()); + + return WriteJmsResult.in( + input.getPipeline(), failedRepublishedMessagesTag, failedRepublishedMessages); } - private class JmsIOProducerFn extends DoFn { + private static class JmsConnection implements Serializable { - private transient @Initialized FluentBackoff retryBackOff; + private static final long serialVersionUID = 1L; private transient @Initialized Session session; private transient @Initialized Connection connection; private transient @Initialized Destination destination; private transient @Initialized MessageProducer producer; + private boolean isProducerNeedsToBeCreated = true; + private final JmsIO.Write spec; private final Counter connectionErrors = Metrics.counter(JMS_IO_PRODUCER_METRIC_NAME, CONNECTION_ERRORS_METRIC_NAME); + + public JmsConnection(Write spec) { + this.spec = spec; + } + + public void start() throws JMSException { + if (isProducerNeedsToBeCreated) { + ConnectionFactory connectionFactory = spec.getConnectionFactory(); + if (spec.getUsername() != null) { + this.connection = + connectionFactory.createConnection(spec.getUsername(), spec.getPassword()); + } else { + this.connection = connectionFactory.createConnection(); + } + this.connection.setExceptionListener( + exception -> { + this.isProducerNeedsToBeCreated = true; + this.connectionErrors.inc(); + }); + this.connection.start(); + // false means we don't use JMS transaction. + this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + if (spec.getQueue() != null) { + this.destination = session.createQueue(spec.getQueue()); + } else if (spec.getTopic() != null) { + this.destination = session.createTopic(spec.getTopic()); + } + this.producer = this.session.createProducer(this.destination); + } + } + + public void publishMessage(T input) throws JMSException, JmsIOException { + Destination destinationToSendTo = destination; + try { + Message message = spec.getValueMapper().apply(input, session); + if (spec.getTopicNameMapper() != null) { + destinationToSendTo = session.createTopic(spec.getTopicNameMapper().apply(input)); + } + producer.send(destinationToSendTo, message); + } catch (JMSException | JmsIOException exception) { + LOG.warn("Error sending message to the topic {}", destinationToSendTo, exception); + throw exception; + } + } + + public void close() throws JMSException { + if (producer != null) { + producer.close(); + producer = null; + } + if (session != null) { + session.close(); + session = null; + } + if (connection != null) { + try { + // If the connection failed, stopping the connection will throw a JMSException + connection.stop(); + } catch (JMSException exception) { + LOG.warn("The connection couldn't be closed", exception); + } + connection.close(); + connection = null; + } + } + } + + private static class JmsIOProducerFn extends DoFn { + + private final @Initialized JmsConnection jmsConnection; + private final TupleTag failedMessagesTag; + + JmsIOProducerFn(JmsIO.Write spec, TupleTag failedMessagesTag) { + this.failedMessagesTag = failedMessagesTag; + this.jmsConnection = new JmsConnection<>(spec); + } + + @Setup + public void setup() throws JMSException { + this.jmsConnection.start(); + } + + @ProcessElement + public void processElement(@Element T input, ProcessContext context) { + try { + this.jmsConnection.publishMessage(input); + } catch (JMSException | JmsIOException exception) { + context.output(this.failedMessagesTag, input); + } + } + + @Teardown + public void teardown() throws JMSException { + this.jmsConnection.close(); + } + } + + static class JmsIOProduceRetryFn extends DoFn { + + private transient @Initialized FluentBackoff retryBackOff; + + private final JmsIO.Write spec; + private final TupleTag failedMessagesTags; + private final @Initialized JmsConnection jmsConnection; private final Counter publicationRetries = Metrics.counter(JMS_IO_PRODUCER_METRIC_NAME, PUBLICATION_RETRIES_METRIC_NAME); + JmsIOProduceRetryFn(JmsIO.Write spec, TupleTag failedMessagesTags) { + this.spec = spec; + this.failedMessagesTags = failedMessagesTags; + this.jmsConnection = new JmsConnection<>(spec); + } + @Setup - public void setup() { + public void setup() throws JMSException { + this.jmsConnection.start(); RetryConfiguration retryConfiguration = checkStateNotNull(spec.getRetryConfiguration()); - retryBackOff = FluentBackoff.DEFAULT .withInitialBackoff(checkStateNotNull(retryConfiguration.getInitialDuration())) @@ -1003,95 +1133,47 @@ public void setup() { .withMaxRetries(retryConfiguration.getMaxAttempts()); } - @StartBundle - public void start() throws JMSException { - ConnectionFactory connectionFactory = spec.getConnectionFactory(); - if (spec.getUsername() != null) { - this.connection = - connectionFactory.createConnection(spec.getUsername(), spec.getPassword()); - } else { - this.connection = connectionFactory.createConnection(); - } - this.connection.setExceptionListener( - exception -> { - connectionErrors.inc(); - throw new JmsIOException("An error occurred with JMS connection", exception); - }); - this.connection.start(); - // false means we don't use JMS transaction. - this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - if (spec.getQueue() != null) { - this.destination = session.createQueue(spec.getQueue()); - } else if (spec.getTopic() != null) { - this.destination = session.createTopic(spec.getTopic()); - } - this.producer = this.session.createProducer(this.destination); - } - @ProcessElement public void processElement(@Element T input, ProcessContext context) { try { publishMessage(input, context); } catch (IOException | InterruptedException exception) { LOG.error("Error while publishing the message", exception); - context.output(failedMessagesTag, input); - Thread.currentThread().interrupt(); + context.output(this.failedMessagesTags, input); + if (exception instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } } } private void publishMessage(T input, ProcessContext context) throws IOException, InterruptedException { Sleeper sleeper = Sleeper.DEFAULT; - Destination destinationToSendTo = destination; BackOff backoff = checkStateNotNull(retryBackOff).backoff(); - int publicationAttempt = 0; - while (publicationAttempt >= 0) { - publicationAttempt++; + int retryAttempt = 0; + while (true) { try { - Message message = spec.getValueMapper().apply(input, session); - if (spec.getTopicNameMapper() != null) { - destinationToSendTo = session.createTopic(spec.getTopicNameMapper().apply(input)); - } - producer.send(destinationToSendTo, message); - publicationAttempt = -1; - } catch (Exception exception) { + this.jmsConnection.publishMessage(input); + break; + } catch (JMSException | JmsIOException exception) { if (!BackOffUtils.next(sleeper, backoff)) { - LOG.error("The message wasn't published to topic {}", destinationToSendTo, exception); - context.output(failedMessagesTag, input); - publicationAttempt = -1; + LOG.error("Error sending message", exception); + context.output(this.failedMessagesTags, input); + break; } else { publicationRetries.inc(); LOG.warn( - "Error sending message on topic {}, retry attempt {}", - destinationToSendTo, - publicationAttempt, + "Error sending message, retrying to publish attempt {}", + retryAttempt++, exception); } } } } - @FinishBundle + @Teardown public void teardown() throws JMSException { - if (producer != null) { - producer.close(); - producer = null; - } - if (session != null) { - session.close(); - session = null; - } - if (connection != null) { - try { - // If the connection failed, stopping the connection will throw a JMSException - connection.stop(); - } catch (JMSException exception) { - LOG.warn("The connection couldn't be closed", exception); - } - connection.close(); - connection = null; - } + this.jmsConnection.close(); } } } diff --git a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java index 0a55b2bc75d8..b55e58bfae70 100644 --- a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java +++ b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java @@ -18,8 +18,8 @@ package org.apache.beam.sdk.io.jms; import static org.apache.beam.sdk.io.UnboundedSource.UnboundedReader.BACKLOG_UNKNOWN; -import static org.apache.beam.sdk.io.jms.JmsIO.JmsIOProducer.JMS_IO_PRODUCER_METRIC_NAME; -import static org.apache.beam.sdk.io.jms.JmsIO.JmsIOProducer.PUBLICATION_RETRIES_METRIC_NAME; +import static org.apache.beam.sdk.io.jms.JmsIO.Writer.JMS_IO_PRODUCER_METRIC_NAME; +import static org.apache.beam.sdk.io.jms.JmsIO.Writer.PUBLICATION_RETRIES_METRIC_NAME; import static org.hamcrest.CoreMatchers.allOf; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.contains; @@ -50,6 +50,7 @@ import java.nio.charset.StandardCharsets; import java.time.Instant; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.Enumeration; import java.util.List; @@ -710,7 +711,7 @@ public void testWriteMessageWithRetryPolicy() throws Exception { int pipelineDuration = 5; Instant now = Instant.now(); String messageText = now.toString(); - ArrayList data = new ArrayList<>(Collections.singleton(messageText)); + List data = Collections.singletonList(messageText); RetryConfiguration retryPolicy = RetryConfiguration.create( 3, Duration.standardSeconds(waitingSeconds), Duration.standardDays(10)); @@ -749,7 +750,7 @@ public void testWriteMessageWithRetryPolicy() throws Exception { public void testWriteMessageWithRetryPolicyReachesLimit() throws Exception { String messageText = "text"; int maxPublicationAttempts = 2; - ArrayList data = new ArrayList<>(Collections.singleton(messageText)); + List data = Collections.singletonList(messageText); RetryConfiguration retryConfiguration = RetryConfiguration.create(maxPublicationAttempts, null, null); @@ -762,7 +763,7 @@ public void testWriteMessageWithRetryPolicyReachesLimit() throws Exception { .withValueMapper( (SerializableBiFunction) (s, session) -> { - throw new IllegalArgumentException("Error!!"); + throw new JmsIOException("Error!!"); }) .withRetryConfiguration(retryConfiguration) .withQueue(QUEUE) @@ -802,6 +803,45 @@ public void testWriteMessageWithRetryPolicyReachesLimit() throws Exception { assertNull(consumer.receiveNoWait()); } + @Test + public void testWriteMessagesWithErrors() throws Exception { + int maxPublicationAttempts = 2; + // Message 1 should fail for Published DoFn handled by the republished DoFn and published to the + // queue + // Message 2 should fail both DoFn + // Message 3 & 4 should pass the publish DoFn + List data = Arrays.asList("Message 1", "Message 2", "Message 3", "Message 4"); + + RetryConfiguration retryConfiguration = + RetryConfiguration.create(maxPublicationAttempts, null, null); + + WriteJmsResult output = + pipeline + .apply(Create.of(data)) + .apply( + JmsIO.write() + .withConnectionFactory(connectionFactory) + .withValueMapper(new TextMessageMapperWithErrorAndCounter()) + .withRetryConfiguration(retryConfiguration) + .withQueue(QUEUE) + .withUsername(USERNAME) + .withPassword(PASSWORD)); + + PAssert.that(output.getFailedMessages()).containsInAnyOrder("Message 2"); + pipeline.run(); + + Connection connection = connectionFactory.createConnection(USERNAME, PASSWORD); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE)); + int count = 0; + while (consumer.receive(1000) != null) { + count++; + } + assertEquals(3, count); + System.out.println(count); + } + private int count(String queue) throws Exception { Connection connection = connectionFactory.createConnection(USERNAME, PASSWORD); connection.start(); @@ -947,4 +987,29 @@ public Message apply(String value, Session session) { } } } + + private static class TextMessageMapperWithErrorAndCounter + implements SerializableBiFunction { + private static int errorCounter = 0; + + @Override + public Message apply(String value, Session session) { + try { + if (value.equals("Message 1") || value.equals("Message 2")) { + if (errorCounter != 0 && value.equals("Message 1")) { + TextMessage msg = session.createTextMessage(); + msg.setText(value); + return msg; + } + errorCounter++; + throw new JMSException("Error!!"); + } + TextMessage msg = session.createTextMessage(); + msg.setText(value); + return msg; + } catch (JMSException e) { + throw new JmsIOException("Error creating TextMessage", e); + } + } + } } From 01a8ef28d76d1a89a8c3d2daef3c32b95e7030f2 Mon Sep 17 00:00:00 2001 From: Amrane Ait Zeouay Date: Fri, 10 Feb 2023 10:39:15 +0100 Subject: [PATCH 5/6] [#24971] refactor(JmsIO): fix some issues to restart the producer #24973 Fixes #24971 Co-Authored-By: Amrane Ait Zeouay <122456352+amranezeRenault@users.noreply.github.com> --- .../org/apache/beam/sdk/io/jms/JmsIO.java | 40 +++++++++++++------ 1 file changed, 27 insertions(+), 13 deletions(-) diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java index ed4ca1b16873..90661deb1ee6 100644 --- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java +++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java @@ -1037,6 +1037,7 @@ public void start() throws JMSException { this.destination = session.createTopic(spec.getTopic()); } this.producer = this.session.createProducer(this.destination); + this.isProducerNeedsToBeCreated = false; } } @@ -1048,13 +1049,17 @@ public void publishMessage(T input) throws JMSException, JmsIOException { destinationToSendTo = session.createTopic(spec.getTopicNameMapper().apply(input)); } producer.send(destinationToSendTo, message); - } catch (JMSException | JmsIOException exception) { - LOG.warn("Error sending message to the topic {}", destinationToSendTo, exception); + } catch (JMSException | JmsIOException | NullPointerException exception) { + // Handle NPE in case of getValueMapper or getTopicNameMapper returns NPE + if (exception instanceof NullPointerException) { + throw new JmsIOException("An error occurred", exception); + } throw exception; } } public void close() throws JMSException { + isProducerNeedsToBeCreated = true; if (producer != null) { producer.close(); producer = null; @@ -1086,8 +1091,8 @@ private static class JmsIOProducerFn extends DoFn { this.jmsConnection = new JmsConnection<>(spec); } - @Setup - public void setup() throws JMSException { + @StartBundle + public void startBundle() throws JMSException { this.jmsConnection.start(); } @@ -1100,8 +1105,13 @@ public void processElement(@Element T input, ProcessContext context) { } } + @FinishBundle + public void finishBundle() throws JMSException { + this.jmsConnection.close(); + } + @Teardown - public void teardown() throws JMSException { + public void tearDown() throws JMSException { this.jmsConnection.close(); } } @@ -1123,8 +1133,7 @@ static class JmsIOProduceRetryFn extends DoFn { } @Setup - public void setup() throws JMSException { - this.jmsConnection.start(); + public void setup() { RetryConfiguration retryConfiguration = checkStateNotNull(spec.getRetryConfiguration()); retryBackOff = FluentBackoff.DEFAULT @@ -1133,6 +1142,11 @@ public void setup() throws JMSException { .withMaxRetries(retryConfiguration.getMaxAttempts()); } + @StartBundle + public void startBundle() throws JMSException { + this.jmsConnection.start(); + } + @ProcessElement public void processElement(@Element T input, ProcessContext context) { try { @@ -1150,7 +1164,6 @@ private void publishMessage(T input, ProcessContext context) throws IOException, InterruptedException { Sleeper sleeper = Sleeper.DEFAULT; BackOff backoff = checkStateNotNull(retryBackOff).backoff(); - int retryAttempt = 0; while (true) { try { this.jmsConnection.publishMessage(input); @@ -1162,17 +1175,18 @@ private void publishMessage(T input, ProcessContext context) break; } else { publicationRetries.inc(); - LOG.warn( - "Error sending message, retrying to publish attempt {}", - retryAttempt++, - exception); } } } } + @FinishBundle + public void finishBundle() throws JMSException { + this.jmsConnection.close(); + } + @Teardown - public void teardown() throws JMSException { + public void tearDown() throws JMSException { this.jmsConnection.close(); } } From 3f23058150acd3a9308bd2dea86459b9d4733b06 Mon Sep 17 00:00:00 2001 From: Amrane Ait Zeouay Date: Fri, 10 Feb 2023 22:46:49 +0100 Subject: [PATCH 6/6] [#24971] refactor(JmsIO): use only one DoFn to publish messages #24973 Fixes #24971 Co-Authored-By: Amrane Ait Zeouay <122456352+amranezeRenault@users.noreply.github.com> --- .../org/apache/beam/sdk/io/jms/JmsIO.java | 69 +++---------------- 1 file changed, 8 insertions(+), 61 deletions(-) diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java index 90661deb1ee6..db90c098e341 100644 --- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java +++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java @@ -954,18 +954,15 @@ static class Writer extends PTransform, WriteJmsResult> { private static final Logger LOG = LoggerFactory.getLogger(Writer.class); private static final String PUBLISH_TO_JMS_STEP_NAME = "Publish to JMS"; - private static final String REPUBLISH_TO_JMS_STEP_NAME = "Republish to JMS"; private final JmsIO.Write spec; private final TupleTag messagesTag; private final TupleTag failedMessagesTag; - private final TupleTag failedRepublishedMessagesTag; Writer(JmsIO.Write spec) { this.spec = spec; this.messagesTag = new TupleTag<>(); this.failedMessagesTag = new TupleTag<>(); - this.failedRepublishedMessagesTag = new TupleTag<>(); } @Override @@ -979,20 +976,7 @@ public WriteJmsResult expand(PCollection input) { failedPublishedMessagesTuple.get(failedMessagesTag).setCoder(input.getCoder()); failedPublishedMessagesTuple.get(messagesTag).setCoder(input.getCoder()); - // Republishing failed messages - PCollectionTuple failedRepublishedMessagesTuple = - failedPublishedMessages.apply( - REPUBLISH_TO_JMS_STEP_NAME, - ParDo.of(new JmsIOProduceRetryFn<>(spec, failedRepublishedMessagesTag)) - .withOutputTags(messagesTag, TupleTagList.of(failedRepublishedMessagesTag))); - PCollection failedRepublishedMessages = - failedRepublishedMessagesTuple - .get(failedRepublishedMessagesTag) - .setCoder(input.getCoder()); - failedRepublishedMessagesTuple.get(messagesTag).setCoder(input.getCoder()); - - return WriteJmsResult.in( - input.getPipeline(), failedRepublishedMessagesTag, failedRepublishedMessages); + return WriteJmsResult.in(input.getPipeline(), failedMessagesTag, failedPublishedMessages); } private static class JmsConnection implements Serializable { @@ -1081,42 +1065,7 @@ public void close() throws JMSException { } } - private static class JmsIOProducerFn extends DoFn { - - private final @Initialized JmsConnection jmsConnection; - private final TupleTag failedMessagesTag; - - JmsIOProducerFn(JmsIO.Write spec, TupleTag failedMessagesTag) { - this.failedMessagesTag = failedMessagesTag; - this.jmsConnection = new JmsConnection<>(spec); - } - - @StartBundle - public void startBundle() throws JMSException { - this.jmsConnection.start(); - } - - @ProcessElement - public void processElement(@Element T input, ProcessContext context) { - try { - this.jmsConnection.publishMessage(input); - } catch (JMSException | JmsIOException exception) { - context.output(this.failedMessagesTag, input); - } - } - - @FinishBundle - public void finishBundle() throws JMSException { - this.jmsConnection.close(); - } - - @Teardown - public void tearDown() throws JMSException { - this.jmsConnection.close(); - } - } - - static class JmsIOProduceRetryFn extends DoFn { + static class JmsIOProducerFn extends DoFn { private transient @Initialized FluentBackoff retryBackOff; @@ -1126,7 +1075,7 @@ static class JmsIOProduceRetryFn extends DoFn { private final Counter publicationRetries = Metrics.counter(JMS_IO_PRODUCER_METRIC_NAME, PUBLICATION_RETRIES_METRIC_NAME); - JmsIOProduceRetryFn(JmsIO.Write spec, TupleTag failedMessagesTags) { + JmsIOProducerFn(JmsIO.Write spec, TupleTag failedMessagesTags) { this.spec = spec; this.failedMessagesTags = failedMessagesTags; this.jmsConnection = new JmsConnection<>(spec); @@ -1150,8 +1099,8 @@ public void startBundle() throws JMSException { @ProcessElement public void processElement(@Element T input, ProcessContext context) { try { - publishMessage(input, context); - } catch (IOException | InterruptedException exception) { + publishMessage(input); + } catch (JMSException | JmsIOException | IOException | InterruptedException exception) { LOG.error("Error while publishing the message", exception); context.output(this.failedMessagesTags, input); if (exception instanceof InterruptedException) { @@ -1160,8 +1109,8 @@ public void processElement(@Element T input, ProcessContext context) { } } - private void publishMessage(T input, ProcessContext context) - throws IOException, InterruptedException { + private void publishMessage(T input) + throws JMSException, JmsIOException, IOException, InterruptedException { Sleeper sleeper = Sleeper.DEFAULT; BackOff backoff = checkStateNotNull(retryBackOff).backoff(); while (true) { @@ -1170,9 +1119,7 @@ private void publishMessage(T input, ProcessContext context) break; } catch (JMSException | JmsIOException exception) { if (!BackOffUtils.next(sleeper, backoff)) { - LOG.error("Error sending message", exception); - context.output(this.failedMessagesTags, input); - break; + throw exception; } else { publicationRetries.inc(); }