PubSub : add the concept of message abandonment #20

Closed
dhoard opened this issue Oct 25, 2018 · 15 comments
Labels: api: pubsub, type: feature request

Comments

@dhoard

dhoard commented Oct 25, 2018

PubsubMessage / AckReplyConsumer needs the concept of "abandoning" a message.

Abandoning a message would mean that the message ack deadline would no longer be auto-extended.

public void receiveMessage(final PubsubMessage message, final AckReplyConsumer consumer) {
	try {

		// message processing

		consumer.ack();
	} catch (Throwable t) {
		// In an effort to prevent a tight message delivery / processing loop,
		// we want to abandon the message (stop extending the ack deadline and
		// let the message expire "naturally")
		
		consumer.abandon();
	}
}
@jcerwinske

jcerwinske commented Dec 5, 2018

I would second this request. I need to be able to abort processing without what is essentially instantaneous redelivery. If I'm writing messages to a DB and the DB is down, for example, and will be down for an extended period, there is no need or desire to instantaneously reprocess what would fail thousands and thousands of times. But I don't want to throw the message away, as service will eventually be restored.

Simply nacking messages basically ensures I'm going to blow up my stackdriver logging costs, for example. It will result in potentially millions of unnecessary instant failed retry attempts, all logged and eating at my bill.

The "abandoned" concept would allow the service to simply redeliver based on the subscription ack timeout, which at least is better than immediately, before any type of downstream service recovery has occurred.

Or I could even work with a .nack(redelivery_delay) call and manage the redelivery timing myself, per message.

Native support for built-in exponential backoff for nacked messages, configured at the subscription level, would be ideal, but I'll take anything at this point.

@dhoard dhoard changed the title from "Feature : PubSub - add the concept of message abandonment" to "PubSub : add the concept of message abandonment" Dec 8, 2018
@nhartner

Why was this feature reverted in 0.76.0? It was a great idea.

@dhoard
Author

dhoard commented Jan 16, 2019

@chingor13 Can you provide an insight as to why this was removed?

@sduskis
Contributor

sduskis commented Jan 16, 2019

The pub/sub team has to approve this feature request.

@sduskis sduskis reopened this Jan 16, 2019
@sduskis
Contributor

sduskis commented Jan 16, 2019

@jadekler and @kolea2, can you please discuss this issue with the pub/sub team?

@kolea2
Contributor

kolea2 commented Jan 23, 2019

The ack deadline is not intended as a way to delay delivery, but to signal that more time is needed to process a message. Delayed redelivery (and better control over redelivery semantics in general) is a useful feature and one we'd like to consider adding to the service, but we do not want to implement this feature just by overloading the meaning of ack deadline. This change will also need to be coordinated across all of the client library supported languages in order to maintain consistency.

@nhartner

Please give me some way for my message subscriber to tell the library that I no longer want it to keep extending the deadline. Rename the method to forget or stopExtendingDeadline and mark it Beta. But don't make this impossible to do. As long as the method name and javadoc make no mention or guarantees of redelivery delay, I don't understand the pushback.

@dpcollins-google
Contributor

dpcollins-google commented Feb 17, 2019

@nhartner, if you simply stop extending the ack deadline, the Cloud Pub/Sub service will likely resend the exact same message to you as soon as the most recent extension period expires. This means it will be sent back to you in 30 seconds (or whatever your configured ack extension period is) instead of the 0.5 seconds it takes when you nack it. If you want to implement a per-message exponential backoff against another service (a database, for example), you should consider implementing that in your own code, since Cloud Pub/Sub has no way of knowing what backoff pattern you would find acceptable.

You can implement this in your message handler by performing x requests with y delay between them on failure, acking on any success and nacking on final failure. Google's api-client-library provides a useful utility for generating backoff times here. But simply not extending the deadline is not a useful answer, as it gives the user no control over when they want to retry.
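The retry-in-handler approach described above could look roughly like the following. This is a minimal sketch, not the library's API: `BackoffRetry`, `processWithBackoff`, and its parameters are hypothetical names, and the ack and nack actions are passed in as plain `Runnable`s so the sketch does not depend on the Pub/Sub client.

```java
import java.util.concurrent.Callable;

/** Hypothetical helper: retries work with exponential backoff, then acks or nacks. */
final class BackoffRetry {

    private BackoffRetry() {}

    /**
     * Runs the work up to maxAttempts times, doubling the sleep between failed attempts.
     * Calls ack after the first success and nack once every attempt has failed.
     *
     * @return true if the work succeeded and ack was called
     */
    static boolean processWithBackoff(
            Callable<Void> work,
            int maxAttempts,
            long initialDelayMillis,
            Runnable ack,
            Runnable nack) {
        long delayMillis = initialDelayMillis;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                work.call();
            } catch (Exception e) {
                if (attempt == maxAttempts) {
                    break; // final failure: fall through to nack
                }
                try {
                    Thread.sleep(delayMillis);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    break; // stop retrying if the thread is interrupted
                }
                delayMillis *= 2; // exponential backoff
                continue;
            }
            ack.run();
            return true;
        }
        nack.run();
        return false;
    }
}
```

Inside a `receiveMessage` handler this might be called as `BackoffRetry.processWithBackoff(work, 5, 1000L, consumer::ack, consumer::nack)`. The handler blocks while it sleeps, so the message stays outstanding for the whole retry sequence.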

As an aside, if the answer to 'when to retry' is 'never', I would suggest using a dead letter queue pattern, where you publish unprocessable messages to a separate topic, ack them once publish is guaranteed, and handle them separately.
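A rough sketch of that dead letter pattern follows, under stated assumptions: `DeadLetterForwarder` and `forward` are hypothetical names, and the publisher is modelled as a function returning a `CompletableFuture` of the message ID. The real Pub/Sub `Publisher` returns a different future type, so this would need adapting.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/** Hypothetical helper: republishes an unprocessable message, acking only once the publish succeeds. */
final class DeadLetterForwarder {

    private DeadLetterForwarder() {}

    /**
     * Publishes the payload to a separate "dead letter" topic.
     * The original message is acked only after the publish completes successfully;
     * if the publish fails, the original is nacked so it is not lost.
     *
     * @return a future that completes with true if the message was forwarded and acked
     */
    static CompletableFuture<Boolean> forward(
            byte[] payload,
            Function<byte[], CompletableFuture<String>> publishToDeadLetterTopic,
            Runnable ack,
            Runnable nack) {
        return publishToDeadLetterTopic.apply(payload).handle((messageId, error) -> {
            if (error == null) {
                ack.run();  // publish is confirmed, safe to drop the original
                return true;
            }
            nack.run();     // publish failed, keep the original for redelivery
            return false;
        });
    }
}
```

Acking only inside the completion callback is what keeps a message from being lost if the dead letter publish fails.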

@dhoard
Author

dhoard commented Feb 17, 2019

@dpcollins-google I used the term abandon() but in reality it's an async delayed nack(). I understand that this can be implemented by a developer ... but it seems like such a common pattern that it should be in the API proper.

For example ... given a Subscriber with a max ack extension period of 1 hour ...

public void receiveMessage(final PubsubMessage message, final AckReplyConsumer consumer) {
	try {
		// message processing

		consumer.ack();
	} catch (Throwable t) {
		// In an effort to prevent a tight message delivery / processing loop,
		// perform a delayed nack() ... asynchronous nack after 600000 milliseconds
		consumer.nack(600000);
	}
}

@dpcollins-google
Contributor

@dhoard Whether this is a common pattern is kind of orthogonal to the question of whether this is something that should be handled by Pub/Sub. If you want to retry after 10 minutes, every 10 minutes, that's fine if that is the correct backoff strategy for whatever service might fail that you're interacting with. I'd argue that the backoff strategy should be part of the client library for the service you're communicating with, not the Cloud Pub/Sub library. If you're making requests to an external service that might fail and its client library does not implement a backoff strategy, the correct thing to do (in my opinion) would be to implement a backoff strategy appropriate for that service in that client library.

@dhoard
Author

dhoard commented Feb 17, 2019

@dpcollins-google your points are valid.

@sduskis given the discussion above, I have no problem with closing this issue.


For anyone subscribed to this issue, something like the code below (not tested) could be used in your application to perform a simple delayed nack().

import com.google.cloud.pubsub.v1.AckReplyConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * Class to perform asynchronous nack of AckReplyConsumer
 */
public class AckReplyConsumerUtils {
    
    private static final Logger logger = LoggerFactory.getLogger(AckReplyConsumerUtils.class);
    
    private static AckReplyConsumerUtils instance = new AckReplyConsumerUtils();
    
    private ScheduledExecutorService scheduledExecutorService;
    
    private AckReplyConsumerUtils() {
        this.scheduledExecutorService = Executors.newScheduledThreadPool(10);
    }

    /**
     * Method to get instance
     * 
     * @return AckReplyConsumerUtils
     */
    public static AckReplyConsumerUtils getInstance() {
        return instance;
    }

    /**
     * Method to asynchronously nack an AckReplyConsumer after a delay
     * 
     * @param ackReplyConsumer the consumer to nack
     * @param delay how long to wait before nacking
     * @param timeUnit the unit of the delay
     */
    public void nack(final AckReplyConsumer ackReplyConsumer, long delay, TimeUnit timeUnit) {
        this.scheduledExecutorService.schedule(
                new Runnable() {
                    @Override
                    public void run() {
                        try {
                            ackReplyConsumer.nack();
                        } catch (Throwable t) {
                            logger.warn("Exception nacking message", t);
                        }
                    }
                },
                delay,
                timeUnit);
    }
}

AckReplyConsumerUtils usage example ...

public void receiveMessage(final PubsubMessage message, final AckReplyConsumer consumer) {
	try {
		// message processing

		consumer.ack();
	} catch (Throwable t) {
		AckReplyConsumerUtils.getInstance().nack(consumer, 10, TimeUnit.MINUTES);
	}
}

@kamalaboulhosn
Contributor

Please note that @kolea2's response is still correct. We do agree this is a useful feature, but would want to ensure the service supports it natively rather than just using modify ack deadline for it. Delaying the nack on a message may have unintended consequences if flow control limits are set. These messages will count toward the outstanding messages and so more messages will not be delivered if enough message responses are delayed. We will provide updates when we have more to share on the possibility of supporting delayed redelivery for messages.
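The flow control concern can be illustrated with a toy model. This is not how the client library implements flow control; `OutstandingMessageLimit` is a hypothetical class that only counts outstanding messages against a fixed limit.

```java
/** Toy model of an outstanding-message limit. Not the Pub/Sub client's implementation. */
final class OutstandingMessageLimit {

    private final int maxOutstanding;
    private int outstanding;

    OutstandingMessageLimit(int maxOutstanding) {
        this.maxOutstanding = maxOutstanding;
    }

    /** Returns true and counts the message if there is room for another delivery. */
    boolean tryDeliver() {
        if (outstanding >= maxOutstanding) {
            return false; // limit reached: no new messages until one is acked or nacked
        }
        outstanding++;
        return true;
    }

    /** Called when a message is finally acked or nacked, freeing its slot. */
    void release() {
        if (outstanding > 0) {
            outstanding--;
        }
    }

    int outstanding() {
        return outstanding;
    }
}
```

In this model, a message waiting on a delayed nack keeps its slot until `release()` is called. With a limit of 2 and two messages parked for a 10-minute delayed nack, `tryDeliver()` returns false for every new message during those 10 minutes.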

@nhartner

nhartner commented Feb 17, 2019

@dhoard thanks for the suggested workaround. I'll be interested to play around with this. I suspect 100 threads is overkill since tasks are internally put in a queue by ScheduledThreadPoolExecutor and don't tie up a thread until they are ready to be processed.

@chingor13 chingor13 transferred this issue from googleapis/google-cloud-java Dec 4, 2019
@chingor13 chingor13 added the type: feature request ‘Nice-to-have’ improvement, new feature or different behavior or design. label Dec 4, 2019
@google-cloud-label-sync google-cloud-label-sync bot added the api: pubsub Issues related to the googleapis/java-pubsub API. label Jan 29, 2020
@kamalaboulhosn
Contributor

Since this request is more of a service-wide request and not something limited to the Java client library, we will track it via the issue tracker. I should also mention that there should be some news soon regarding features to support more control over retry semantics so that nacked messages won't come back to the subscriber immediately.

@davidxia

Not directly related to abandoning a message, but for people who wanted to abandon a message as a way to make Pub/Sub redeliver it with a delay, this info might be helpful. I noticed Pub/Sub supports retry policies that are GA as of 2020-06-16. From the documentation:

If the acknowledgement deadline expires or a subscriber responds with a negative acknowledgement, Pub/Sub can send the message again using exponential backoff.

If the retry policy isn't set, Pub/Sub resends the message as soon as the acknowledgement deadline expires or a subscriber responds with a negative acknowledgement.

If the maximum backoff duration is set, the default minimum backoff duration is 10 seconds. If the minimum backoff duration is set, the default maximum backoff duration is 600 seconds.

The longest backoff duration that you can specify is 600 seconds.


9 participants