Messages are retrieved before modifyAckDeadline time expires #413

Closed
TraxRetail opened this issue Dec 16, 2018 · 16 comments
Assignees
Labels
api: pubsub Issues related to the googleapis/nodejs-pubsub API. 🚨 This issue needs some love. status: investigating The issue is under investigation, which is determined to be non-trivial. triage me I really want to be triaged. type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns.

Comments

@TraxRetail

TraxRetail commented Dec 16, 2018

modifyAckDeadline doesn't work as expected.
We are able to retrieve messages before the lock is expired:

@google-cloud/pubsub 0.22.2 (reproducible in lower versions)
nodejs 8.12.0
npm 6.5.0
linux mint 17.3
The queue default AckDeadline is 30 seconds in our code and in the cloud.

Our mocha test (times in seconds from the start):
0 - send a message
0 - pull 1 message from the subscription with a 30-second deadline; it is received.
0 (later) - modifyAckDeadline by 30 seconds
25 - pull 1 message; it is retrieved even though it should still be locked.

It happens with smaller numbers as well.

           const cp = CloudProvider({type: GCP, env: 'INT'});
            const rand = Math.random().toString(16);
            await cp.queue.sendMessage(GCP_TOPIC_QUEUE, {test: rand});

            const messages = await cp.queue.receiveMessage(GCP_QUEUE, {maxMessages: 10, waitTimeout: 2});
            const message = messages.find(m => m.data.test === rand);

            // Retrieving the message
            assert.ok(message);

            await cp.queue.extendMessageLock(GCP_QUEUE, message, {lockTimeout: 30});
            await delay(25000);
            const messages2 = await cp.queue.receiveMessage(GCP_QUEUE, {
                maxMessages: 10,
                waitTimeout: 2
            });
            const message2 = messages2.find(m => m.data.test === rand);

            // The message should not be here, but it is retrieved. The lock usually does not hold,
            // although it sometimes does. The test fails at the assertion below.
            assert.ok(!message2);
            // wait for the lock to be done
            await delay(5000);

            const messages3 = await cp.queue.receiveMessage(GCP_QUEUE, {maxMessages: 10, waitTimeout: 2});
            const message3 = messages3.find(m => m.data.test === rand);

            // Message should be here because `lockTimeout` is over
            assert.ok(message3);

extendMessageLock function

async function extendMessageLock(queue, message, options = {}) {
    const client = new pubSubHelper.v1.SubscriberClient({});
    const subscription = client.subscriptionPath(GCP_PROJECT_ID, queue);
    // Fall back to the default lock timeout when no finite value is supplied.
    const ackDeadlineSeconds = isFinite(options.lockTimeout) ? options.lockTimeout : MESSAGE_LOCK_TIMEOUT;
    const ids = messagesToIdArray(message);

    return client.modifyAckDeadline({subscription, ackIds: ids, ackDeadlineSeconds});
}
@JustinBeckwith JustinBeckwith added the triage me I really want to be triaged. label Dec 17, 2018
@callmehiphop
Contributor

@TraxRetail does the rest of your CloudProvider class use the v1.SubscriberClient client?

@callmehiphop callmehiphop added the needs more info This issue needs more information from the customer to proceed. label Dec 18, 2018
@TraxRetail
Author

TraxRetail commented Dec 19, 2018

Yes. Our removeMessage (acknowledge) and extendMessageLock (modifyAckDeadline) use
v1.SubscriberClient,
while receiveMessage uses:
const pubSub = new PubSub({projectId: GCP_PROJECT_ID});
const subscription = pubSub.subscription(queue);
subscription.on('message', handleMessage); ....

I followed your documentation:
https://github.com/googleapis/nodejs-pubsub/blob/master/samples/subscriptions.js

Additional data:
  • Testing subscription.on('message', handleMessage) with a 600-second lock (set both as the queue's default ack deadline and manually) gives us lock durations shorter than 600 seconds (320, 167, ...), which means the message is released prematurely there as well.
  • This functionality works for our Python team (Python library).

@callmehiphop
Contributor

You might consider adjusting the maxConnections option. This controls the number of StreamingPull streams we establish with the backend. My concern is that the message might actually be redelivered before the lock is put in place.

const subscription = pubSub.subscription(queue, {maxConnections: 1});

@callmehiphop callmehiphop added type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns. status: investigating The issue is under investigation, which is determined to be non-trivial. and removed needs more info This issue needs more information from the customer to proceed. triage me I really want to be triaged. labels Dec 19, 2018
@JustinBeckwith JustinBeckwith added the triage me I really want to be triaged. label Dec 19, 2018
@TraxRetail
Author

Tried it now; it's still inconsistent: the message gets retrieved before the lock expires.
Also tried without the modifyAckDeadline call, and that didn't work either.

@callmehiphop
Contributor

So by default the handwritten client attempts to lock messages based on the 99th percentile of acknowledge times. It is very likely that your modifyAckDeadline call is getting stomped on by this feature which would make the message deadline much shorter, although this does not explain the redelivery itself, just the time at which it happens.

If you want to manage the lock yourself, I would suggest using the v1.SubscriberClient#streamingPull method in place of the handwritten client.

Alternatively we have a PR (#388) that aims to give a finer level of control for locking messages if you'd like to give that a test run.
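
To make the 99th-percentile remark above more concrete, here is a minimal sketch of how a deadline could be derived from recorded acknowledge times. It is not the library's internal code: `percentile` and `deadlineFromHistory` are hypothetical names, and the nearest-rank method is an assumption chosen for simplicity. The 10 to 600 second clamp reflects the ack deadline range Pub/Sub accepts.

```javascript
// Illustrative only: NOT the handwritten client's actual implementation.
// Nearest-rank percentile over recorded ack latencies (in seconds).
function percentile(samples, p) {
  if (samples.length === 0) throw new Error('no samples');
  const sorted = [...samples].sort((a, b) => a - b);
  const rank = Math.max(1, Math.ceil((p * sorted.length) / 100));
  return sorted[rank - 1];
}

// Pub/Sub accepts ack deadlines between 10 and 600 seconds.
const MIN_DEADLINE_SEC = 10;
const MAX_DEADLINE_SEC = 600;

// Hypothetical helper: pick a deadline that would have covered 99% of past acks.
function deadlineFromHistory(ackLatenciesSec) {
  const p99 = Math.ceil(percentile(ackLatenciesSec, 99));
  return Math.min(MAX_DEADLINE_SEC, Math.max(MIN_DEADLINE_SEC, p99));
}
```

If acks have historically been fast, a deadline derived this way lands at the 10-second floor, which would be consistent with a manually requested 30-second deadline being replaced by a much shorter one.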

@TraxRetail
Author

TraxRetail commented Dec 20, 2018

PR #388 doesn't run: 'Cannot read property '_readableState' of undefined'.
streamingPull doesn't work either.

After trying many things, I found that passing a topic when getting the subscription helps:

subscription = await pubSub.subscription(queue, {
            topic,
            maxConnections: 1
});

After specifying the topic that it listens to, it works better: for smaller values (20, 30, 80, 200) it succeeds most of the time, but for bigger values such as 300+ it might cut the lock time in half.
I feel like I'm missing something.

@JustinBeckwith JustinBeckwith added the 🚨 This issue needs some love. label Dec 21, 2018
@TraxRetail
Author

TraxRetail commented Dec 21, 2018

Are we supposed to use either the v1 subscriber client or the subscription object, but not both?

  • After debugging the subscription with an ackDeadline of 40 seconds, for example:
    leaseMessage_ is called to lease the message for 40 sec,
    setLeaseTimeout_ is called and sets a timeout of 23 seconds (it depends on Math.random()),
    after the 23 sec timeout renewLeases_ is called with 10 sec (usually) and extends the deadline by
    10 more seconds; by then isOpen is false, so my message is received again at ~33
    seconds.
    Even if I modify the ack deadline manually, I could hit a bunch of race conditions with my receive-message call (subscription.on...).
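
The arithmetic in the sequence above can be written out as a small model. This is only a sketch of the reported observation: `redeliveryTimeSec` is a hypothetical name, and the values 23 and 10 are the ones seen while debugging, not constants taken from the library.

```javascript
// Models only the sequence described above, using the observed values.
function redeliveryTimeSec({renewAfterSec, renewedDeadlineSec, stillOpen}) {
  // While the subscription stays open, the lease keeps being renewed,
  // so there is no fixed expiry to report.
  if (stillOpen) return Infinity;
  // Once the subscription is closed, that renewal is the last one: the message
  // becomes available again when the renewed deadline runs out.
  return renewAfterSec + renewedDeadlineSec;
}

const observed = redeliveryTimeSec({
  renewAfterSec: 23,      // timeout set by setLeaseTimeout_ in this run
  renewedDeadlineSec: 10, // deadline passed by renewLeases_ in this run
  stillOpen: false,       // isOpen was false by the time of the renewal
});
console.log(observed); // 33
```

Under this model the original 40-second lease never determines the redelivery time; the last renewal does.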

@callmehiphop
Contributor

callmehiphop commented Dec 21, 2018

@TraxRetail I think the handwritten client should fit most needs; however, if you want to manage locks yourself, then you definitely want to use v1.SubscriberClient. If you're looking for more control while receiving messages, there's also a v1.SubscriberClient#pull method that you can use to poll the server for messages (as opposed to having them pushed to your client).
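
A minimal sketch of the pull-based approach, assuming every step goes through a single v1.SubscriberClient instance. `pullAndLock` and `release` are hypothetical helper names; the client is passed in as a parameter so that the helpers keep no state of their own. The calls used (`pull`, `modifyAckDeadline`, `acknowledge`) are the v1 client methods already discussed in this thread.

```javascript
// `client` is expected to be a v1.SubscriberClient and `subscription` the
// full path returned by client.subscriptionPath(projectId, subscriptionName).
async function pullAndLock(client, subscription, {maxMessages = 10, lockSeconds = 30} = {}) {
  // Poll the server once instead of opening a streaming connection.
  const [response] = await client.pull({subscription, maxMessages});
  const received = response.receivedMessages || [];
  const ackIds = received.map(m => m.ackId);
  if (ackIds.length > 0) {
    // Nothing else renews the lease here, so this deadline is the one that holds.
    await client.modifyAckDeadline({subscription, ackIds, ackDeadlineSeconds: lockSeconds});
  }
  return received;
}

// Acknowledge the messages once processing is finished.
async function release(client, subscription, received) {
  const ackIds = received.map(m => m.ackId);
  if (ackIds.length > 0) {
    await client.acknowledge({subscription, ackIds});
  }
}
```

Because the handwritten Subscription object is not involved, no automatic lease renewal competes with the manual modifyAckDeadline call.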

@TraxRetail
Author

TraxRetail commented Dec 23, 2018

So after changing receiveMessage to use the v1 subscriber client (with the pull method), it works perfectly.

When using subscription.on('message', messageHandler) to receive messages,
I can see that the message is not released until I remove the subscription listener or ack the message. setLeaseTimeout_ and renewLeases_ are called all the time.
Is that the correct behavior? Isn't the message supposed to be released after the default acknowledgment deadline?

@callmehiphop
Contributor

Not with the handwritten client, no. We lock all messages until they are either ack'd or nack'd. It's possible that they will be redelivered if the backend sends the messages again or if we encounter some unexpected latency.
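
Since the handwritten client holds the lock until a message is ack'd or nack'd, a handler should settle every message explicitly. The following is a minimal sketch: `attachHandler` and `processMessage` are hypothetical names, and `subscription` is assumed to be the object returned by pubSub.subscription(queue), which emits 'message' events whose payload has ack() and nack() methods.

```javascript
// Settles every message exactly once so that no lease is left to be renewed.
function attachHandler(subscription, processMessage) {
  const onMessage = async message => {
    try {
      await processMessage(message);
      message.ack();  // done: the message is removed from the subscription
    } catch (err) {
      message.nack(); // failed: give up the lock so the message can be redelivered
    }
  };
  subscription.on('message', onMessage);
  // Return a function that detaches the listener again.
  return () => subscription.removeListener('message', onMessage);
}
```

A message that is neither ack'd nor nack'd stays locked for as long as the listener is attached, which matches the behavior reported above.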

@TraxRetail
Author

TraxRetail commented Dec 25, 2018

So I'm not sure it's a bug. You can't combine subscription.on (opening and closing the connection to read messages)
with a modifyAckDeadline lock made through the v1 client; you should state that in your documentation.

@callmehiphop
Contributor

I agree, we could do a better job of documenting it, and I believe there are efforts underway to revamp our documentation, so I'll ask that you bear with us in the meantime. I'm going to close this out, but please feel free to re-open if something else comes up!

@kir-titievsky

@callmehiphop -- what would be the appropriate change in the docs here? Perhaps we could contribute a PR. I know you've got a lot going on.

@slavoroi

slavoroi commented Jan 7, 2019 via email

@callmehiphop
Contributor

So we actually do make mention of the message lock functionality in the Subscription overview, however it might be worth an additional mention in the README.

As far as docs explaining the differences between Subscription and v1.SubscriberClient go, that sounds like it could be tricky, since all the documentation for the latter is auto-generated. @JustinBeckwith will any of the upcoming doc changes help explain the differences between the two?

@slavoroi

slavoroi commented Jan 7, 2019 via email

@google-cloud-label-sync google-cloud-label-sync bot added the api: pubsub Issues related to the googleapis/nodejs-pubsub API. label Jan 31, 2020
feywind pushed a commit to feywind/nodejs-pubsub that referenced this issue Nov 12, 2024

5 participants