[#24971] Add a retry policy for JmsIO #24971 #24973

Merged
merged 9 commits into from
Feb 11, 2023

Amraneze
Contributor

@Amraneze Amraneze commented Jan 11, 2023

Add a retry policy to publish messages in case of connection closed

Fixes #24971

Co-Authored-By: Amrane Ait Zeouay [email protected]

When JmsIO tries to publish messages over a closed or failed connection, the messages are not published and are lost. The only workaround is to collect the failed messages with getFailedMessages and apply the writer to them again, repeating that step once for every retry we want.

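// First attempt: anything that fails to publish is returned by getFailedMessages().
var failedPublishedMessages = messages.apply(getWriter(sinkOptions)).getFailedMessages();
// Second attempt: apply the writer again to the failures, and so on for each extra retry.
var refailedPublishedMessages = failedPublishedMessages.apply(REPUBLISH_FAILED_MESSAGE_NAME, getWriter(sinkOptions)).getFailedMessages();
....
return PDone.in(messages.getPipeline());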

To avoid chaining multiple steps just to retry failed publications, we added a retry policy: publication is retried up to n times (n is set by the user) until a timeout (also set by the user) expires. We also added a predicate function that lets the user decide whether the connection should be re-established after a failure.
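
A minimal plain-Java sketch of the retry policy described above, for illustration only. The class name RetryPolicySketch, the Attempt interface, and the execute signature are assumptions made for this example and are not the API added by this PR; the sketch also retries immediately, without any delay between attempts.

```java
import java.time.Duration;
import java.util.function.Predicate;

/** Hypothetical retry helper; the names here are illustrative, not the PR's API. */
final class RetryPolicySketch {
  /** A publish attempt that may throw, for example when the connection is closed. */
  interface Attempt {
    void run() throws Exception;
  }

  private final int maxAttempts;
  private final Duration maxDuration;
  private final Predicate<Exception> shouldReconnect;

  RetryPolicySketch(int maxAttempts, Duration maxDuration, Predicate<Exception> shouldReconnect) {
    this.maxAttempts = maxAttempts;
    this.maxDuration = maxDuration;
    this.shouldReconnect = shouldReconnect;
  }

  /**
   * Runs the attempt until it succeeds, the attempts are used up, or the time budget is spent.
   * Returns true on success; false means the caller should treat the message as failed.
   */
  boolean execute(Attempt publish, Runnable reconnect) {
    long deadline = System.nanoTime() + maxDuration.toNanos();
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        publish.run();
        return true;
      } catch (Exception e) {
        // The predicate decides whether this failure warrants a new connection.
        if (shouldReconnect.test(e)) {
          reconnect.run();
        }
        // Stop early once the time budget is exhausted (overflow-safe comparison).
        if (System.nanoTime() - deadline >= 0) {
          return false;
        }
      }
    }
    return false;
  }
}
```

With such a helper, a single write step can absorb transient failures internally, and only messages that still fail after the last attempt need to go to the failed-messages output.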


@github-actions
Contributor

Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:

R: @robertwb for label java.
R: @johnjcasey for label io.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

@Abacn
Contributor

Abacn commented Jan 17, 2023

Just FYI, Beam has backoff utility classes for retries:
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BackOffUtils.java
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FluentBackoff.java

EDIT: never mind, I see the implementation is using FluentBackoff.
Is there a need to create a separate class for RetryPolicy? It is only used in JmsProducer, and most likely users do not need to set it. Generally, API calls should have a retry mechanism by default.
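
For readers unfamiliar with the backoff pattern mentioned here, the following plain-Java sketch computes the delays of a capped exponential backoff. It deliberately does not call FluentBackoff or BackOffUtils; BackoffSketch and its parameter names are assumptions made for this illustration, and the sketch omits the random jitter that production backoff implementations usually add.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that illustrates capped exponential backoff. */
final class BackoffSketch {
  /** Returns the smaller of the two durations. */
  private static Duration min(Duration a, Duration b) {
    return a.compareTo(b) <= 0 ? a : b;
  }

  /**
   * Returns the delay to wait before each retry: the initial delay, then twice the
   * previous delay, never exceeding maxBackoff.
   */
  static List<Duration> delays(Duration initial, Duration maxBackoff, int maxRetries) {
    List<Duration> result = new ArrayList<>();
    Duration next = min(initial, maxBackoff);
    for (int i = 0; i < maxRetries; i++) {
      result.add(next);
      next = min(next.multipliedBy(2), maxBackoff);
    }
    return result;
  }
}
```

For example, an initial delay of 1 second, a cap of 5 seconds, and 4 retries yield delays of 1, 2, 4, and 5 seconds.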

@Amraneze
Contributor Author

Amraneze commented Jan 18, 2023

> Just FYI, Beam has backoff utility classes for retries: https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BackOffUtils.java https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FluentBackoff.java
>
> EDIT: never mind, I see the implementation is using FluentBackoff. Is there a need to create a separate class for RetryPolicy? It is only used in JmsProducer, and most likely users do not need to set it. Generally, API calls should have a retry mechanism by default.

The issue is that JmsIO doesn't retry at all when publishing a message; any message that fails to publish is pushed to the output, as you can see in the JmsIO code:

@ProcessElement
public void processElement(ProcessContext ctx) {
  Destination destinationToSendTo = destination;
  try {
    Message message = spec.getValueMapper().apply(ctx.element(), session);
    if (spec.getTopicNameMapper() != null) {
      destinationToSendTo =
          session.createTopic(spec.getTopicNameMapper().apply(ctx.element()));
    }
    producer.send(destinationToSendTo, message);
  } catch (Exception ex) {
    LOG.error("Error sending message on topic {}", destinationToSendTo);
    ctx.output(failedMessageTag, ctx.element());
  }
}

Any exception is caught, and the message is pushed to the failed-message tag's output. This means we have to collect the failed messages and retry them ourselves N times, like this:

public PDone expand(PCollection<Message> messages) {
  // Retry 3 times when the session is closed
  messages
      .apply(getJmsWriter(sinkOptions))
      .getFailedMessages()
      .apply(REPUBLISH_FAILED_MESSAGE_NAME, getJmsWriter(sinkOptions))
      .getFailedMessages()
      .apply(REPUBLISH_FAILED_MESSAGE_NAME, getJmsWriter(sinkOptions))
      .getFailedMessages()
      .apply(REPUBLISH_FAILED_MESSAGE_NAME, getJmsWriter(sinkOptions));

  return PDone.in(messages.getPipeline());
}

private static JmsIO.Write<Message> getJmsWriter(SinkOptions sinkOptions) {
  return JmsIO.<Message>write()
      .withConnectionFactory(SinkOptions.getConnectionFactory())
      .withValueMapper(getValueMapper())
      .withTopicNameMapper(getTopicNameMapper());
}

When the session is closed, the first PTransform that publishes the messages only logs the error "Error sending message on topic". It does not reconnect and simply sends all messages to the second step, as you can see in the screenshot:

[Screenshot of the pipeline graphs; image not preserved]

Note: After the session is closed, the graph of the first PTransform is the same as that of the second PTransform, which means the first step doesn't publish anything and just sends all messages to the second step.
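
The behaviour described above, where a dead session is reused for every message, can be contrasted with a sender that opens a fresh channel after a failure. This is a plain-Java sketch under stated assumptions: Channel is a hypothetical stand-in for a JMS session and producer pair, and ReconnectingSenderSketch is not code from JmsIO or from this PR.

```java
import java.util.function.Supplier;

/** Hypothetical sender that recreates its channel when a send fails. */
final class ReconnectingSenderSketch {
  /** Stand-in for a JMS session/producer pair; not a real JMS type. */
  interface Channel {
    void send(String message) throws Exception;
  }

  private final Supplier<Channel> factory;
  private Channel channel;

  ReconnectingSenderSketch(Supplier<Channel> factory) {
    this.factory = factory;
    this.channel = factory.get();
  }

  /** Sends once; on failure opens a fresh channel and tries one more time. */
  boolean send(String message) {
    try {
      channel.send(message);
      return true;
    } catch (Exception first) {
      // Without this step, every later send would hit the same dead channel.
      channel = factory.get();
      try {
        channel.send(message);
        return true;
      } catch (Exception second) {
        // Still failing: report it so the caller can route the message to a failure output.
        return false;
      }
    }
  }
}
```

The point of the sketch is only that recovery requires replacing the broken channel; re-applying the same writer to the failed messages does not help if the writer keeps the closed session.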

Contributor

@Abacn Abacn left a comment

Thanks for the change! I see what has caused the issue, and I commented above that changes are still needed in order to fix it. Please let me know if anything is unclear or if you have any questions.

@github-actions
Contributor

github-actions bot commented Feb 2, 2023

Reminder, please take a look at this pr: @robertwb @johnjcasey

Contributor

@Abacn Abacn left a comment

Thanks for the change. I have a couple of comments that should be easy to resolve. Thanks for your patience.

Amraneze and others added 4 commits February 3, 2023 09:28
Add a retry policy to publish messages in case of connection closed

Fixes#24971

Co-Authored-By: Amrane Ait Zeouay <[email protected]>
@Amraneze
Contributor Author

Amraneze commented Feb 5, 2023

Hello @Abacn, sorry, I was busy with work. I redesigned the flow of JmsIOWriter. I tried to cache the messages and flush them all in finishBundle, but the exception still has to be handled in processElement; otherwise the DoFn is torn down without finishBundle being called. I added a new step that is used only for processing messages that failed to publish, and I added a condition for when the producer needs to be recreated. Let me know what you think and whether there are any tests to add.

PS: Should I add a benchmark test?

@github-actions
Contributor

github-actions bot commented Feb 8, 2023

Assigning new set of reviewers because Pr has gone too long without review. If you would like to opt out of this review, comment assign to next reviewer:

R: @apilloud for label java.
R: @Abacn for label io.

@apilloud
Member

apilloud commented Feb 9, 2023

R: @Abacn

@Abacn
Contributor

Abacn commented Feb 9, 2023

Sorry for the delay. Will take another look tomorrow.

@github-actions
Contributor

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control

Contributor

@Abacn Abacn left a comment

Thanks, it's getting close. I found a bug and have a design suggestion here.

Contributor

@Abacn Abacn left a comment

Thanks. This LGTM.

nit: the added back-off unit tests take a while to complete. The tests could use a mocked sleeper instead.
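
A small sketch of the mocked-sleeper idea, for illustration. The Sleeper interface below is a hypothetical local definition, and FakeSleeper and retry are names invented for this example; the sketch only shows how injecting the sleeper lets a test check the requested delays without actually waiting.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;

/** Hypothetical example of testing backoff without real sleeping. */
final class SleeperSketch {
  /** Abstraction over Thread.sleep so tests can substitute a fake. */
  interface Sleeper {
    void sleep(long millis) throws InterruptedException;
  }

  /** Test double: records the requested sleeps and returns immediately. */
  static final class FakeSleeper implements Sleeper {
    final List<Long> sleeps = new ArrayList<>();

    @Override
    public void sleep(long millis) {
      sleeps.add(millis);
    }
  }

  /** Retries the action up to maxAttempts times, doubling the delay between attempts. */
  static boolean retry(
      Sleeper sleeper, int maxAttempts, long initialDelayMillis, BooleanSupplier action)
      throws InterruptedException {
    long delay = initialDelayMillis;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      if (action.getAsBoolean()) {
        return true;
      }
      // Sleep only if another attempt will follow.
      if (attempt < maxAttempts) {
        sleeper.sleep(delay);
        delay *= 2;
      }
    }
    return false;
  }
}
```

In production code the sleeper would delegate to Thread.sleep; in a unit test, passing a FakeSleeper makes the test finish immediately while still allowing assertions on the recorded delays.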

@Amraneze
Contributor Author

> Thanks. This LGTM.
>
> nit: the added back-off unit tests take a while to complete. The tests could use a mocked sleeper instead.

I will improve the test with the new API calls.

@Amraneze
Contributor Author

@Abacn, could you please merge the PR? I don't have access to merge it :(

Successfully merging this pull request may close these issues.

[Bug]: Messages are not published when a connection is closed with JmsIO
3 participants