subscription.on message wasn't received of messages after 10 mins or more. #10

Closed
stephenplusplus opened this issue Dec 13, 2017 · 40 comments
Assignees
Labels
api: pubsub Issues related to the googleapis/nodejs-pubsub API. release blocking Required feature/issue must be fixed prior to next release. 🚨 This issue needs some love. status: blocked Resolving the issue is dependent on other work. triage me I really want to be triaged. triaged for GA

Comments

@stephenplusplus
Contributor

From @QTGate on September 7, 2017 17:42

  • OS: Debian
  • Node.js version: 8.4.0
  • npm version: 5.4.0
  • google-cloud-node version: 0.56.0
  • @google-cloud/pubsub version: 0.14.0

Steps to reproduce

const pubsub = require('@google-cloud/pubsub')(connectOption)

const subscription = pubsub.topic(topicName).subscription(subScription)

subscription.on('message', message => {
    message.ack()
    // ... (handler body elided in the original report)
})

subscription.once('error', err => {
    // ... (elided in the original report)
})

After restarting it, I received all the messages that had not been delivered.
I checked the connectionPool object while it was not receiving messages:
connectionPool.isPaused [false]
connectionPool.isOpen [true]
connectionPool.connections [5]

Thank you.

Copied from original issue: googleapis/google-cloud-node#2598

@stephenplusplus stephenplusplus added api: pubsub status: blocked Resolving the issue is dependent on other work. labels Dec 13, 2017
@stephenplusplus
Contributor Author

From @callmehiphop on September 8, 2017 18:19

Thanks for reporting! We've had multiple reports of similar behavior and we're currently investigating this issue. In the meantime you might want to use pubsub v0.13.x as a temporary workaround.

@stephenplusplus
Contributor Author

From @QTGate on September 8, 2017 23:32

Thank you.
I wrote a temporary workaround to keep using pubsub v0.14.0, like this:

const pubsub = require('@google-cloud/pubsub')(connectOption)

const listenSub = () => {
    const subscription = pubsub.topic(topicName).subscription(subScription)
    subscription.on('message', message => {
        message.ack()
        // ... (handler body elided in the original comment)
    })

    subscription.once('error', err => {
        // ... (elided in the original comment)
    })

    // restart every 10 minutes: close the subscription, then listen again
    setTimeout(() => {
        return subscription.close().then(() => {
            listenPubSubMaster = null
            return listenSub()
        })
    }, 600000)
}

listenSub()

@stephenplusplus
Contributor Author

From @caseyduquettesc on September 18, 2017 3:20

We see a similar behavior, although it takes a few hours. Eventually the subscription stops processing new messages and we have to restart the subscription. This only started after upgrading to 0.14.

@stephenplusplus
Contributor Author

From @mscottx88 on September 22, 2017 18:52

I too am seeing the same problem. We set up a forever-running process on GKE and it seems to go unresponsive after some time. I came to the same conclusion: adding a restarter.

Let's hope this gets resolved soon!

@stephenplusplus
Contributor Author

From @callmehiphop on September 23, 2017 0:4

This is definitely a result of moving towards the new streaming API. I'm working on resolving this issue, but in the meantime I suggest downgrading to 0.13. I'll update this issue once it's been resolved.

@stephenplusplus
Contributor Author

From @pongad on September 23, 2017 10:44

@callmehiphop FYI, we resolved this in Go by adding gRPC keepalive. I'm not sure what the code should look like in Node though. https://code-review.googlesource.com/#/c/gocloud/+/17070/

@stephenplusplus
Contributor Author

From @callmehiphop on September 25, 2017 14:10

@pongad thanks for the update! I believe we just have to set the grpc.keepalive_time_ms option.
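
A minimal sketch of what setting that option could look like, assuming the Node `grpc` package's convention of passing channel arguments as a plain options object to a client constructor. The helper name `buildKeepaliveOptions` and the 5-minute interval are illustrative assumptions, not the values used in the eventual fix.

```javascript
'use strict';

// Hypothetical helper: builds gRPC channel arguments that enable keepalive pings.
// `grpc.keepalive_time_ms` is the option named above; the interval value is an
// assumption chosen for illustration only.
function buildKeepaliveOptions(intervalMs) {
  if (!Number.isInteger(intervalMs) || intervalMs <= 0) {
    throw new RangeError('intervalMs must be a positive integer');
  }
  return {
    // Interval, in milliseconds, after which a keepalive ping is sent.
    'grpc.keepalive_time_ms': intervalMs
  };
}

const channelOptions = buildKeepaliveOptions(5 * 60 * 1000);
console.log(channelOptions);

// With the `grpc` package, an object like this would typically be passed as the
// third argument to a client constructor, e.g.
//   new SomeClient(address, credentials, channelOptions)
// where `SomeClient`, `address`, and `credentials` are placeholders.
```

Keeping the options in one helper makes it easy to try different intervals while testing whether idle streams stay alive.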

@stephenplusplus
Contributor Author

From @callmehiphop on September 25, 2017 14:17

@QTGate @mscottx88 @caseyduquettesc I've opened a PR (#2627) which I believe will fix the issue you are seeing. Would any of you mind testing it out? Thanks!

@stephenplusplus
Contributor Author

From @mscottx88 on September 25, 2017 14:18

I'm willing to give it a go, for sure!


@stephenplusplus
Contributor Author

From @mscottx88 on September 25, 2017 22:46

@callmehiphop I applied the two fixed files to my deployment but now I am seeing duplication of message delivery. That is, I am seeing the same message being pulled more than one time.

Update: it also appears that the connection has been lost, as it is sitting idle again even with messages being published to it.

@stephenplusplus
Contributor Author

From @callmehiphop on September 26, 2017 17:30

@jganetsk I'm noticing that messages that are not acked before a stream is closed will be redelivered when the next stream is opened regardless of whether or not the ack deadline was hit. Is that the expected behavior? If so I'm assuming we need to filter these messages out.

@stephenplusplus
Contributor Author

From @jganetsk on September 26, 2017 17:45

@callmehiphop I don't think you should be filtering any messages out. In general, it is possible to receive duplicates, and it is possible that they come before the ack deadline was hit. If this happens consistently, that's a problem that we should look at. How much time passes between delivery and redelivery? Does it correspond to the default ack deadline?
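
Since duplicates can arrive, a subscriber application may want its own handler to be idempotent rather than rely on the client library to filter. A minimal sketch, assuming each message exposes `id` and `ack()`; the wrapper name `makeIdempotentHandler` and the in-memory `Set` are hypothetical and not part of the library.

```javascript
'use strict';

// Hypothetical wrapper: runs `handler` at most once per message id, but always
// acks, so a redelivered copy is acknowledged without being processed twice.
// The Set is unbounded and per-process; a real deployment would need expiry
// or a shared store.
function makeIdempotentHandler(handler) {
  const seen = new Set();
  return (message) => {
    if (!seen.has(message.id)) {
      // If the handler throws, the id is not recorded and no ack is sent,
      // so the message stays eligible for redelivery.
      handler(message);
      seen.add(message.id);
    }
    message.ack();
  };
}

// Usage with stand-in message objects (no Pub/Sub connection needed).
const processed = [];
let acks = 0;
const onMessage = makeIdempotentHandler((m) => processed.push(m.id));
const fakeMessage = (id) => ({ id, ack: () => { acks++; } });

onMessage(fakeMessage('a'));
onMessage(fakeMessage('a')); // duplicate delivery of the same id
onMessage(fakeMessage('b'));
console.log(processed, acks);
```

In a real subscriber the returned function would be registered with `subscription.on('message', onMessage)`.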

@stephenplusplus
Contributor Author

From @jganetsk on September 26, 2017 18:5

We will take a look at this, thanks for reporting it.

@stephenplusplus
Contributor Author

From @callmehiphop on September 26, 2017 18:7

@jganetsk as far as I can tell it only happens when closing and re-opening a stream. I'm putting together an elaborate test and I'll give you the details of my findings once I have some to share.

@stephenplusplus
Contributor Author

From @callmehiphop on September 26, 2017 21:6

There appear to be two different scenarios in which messages will be redelivered before they have been acked.

Streams being closed.

In the event of a stream being closed it appears that all un-acked messages will be redelivered to a different stream regardless of the current ack deadline.

  • Test 1: Set the initial streamAckDeadlineSeconds to 60 (1 minute) and close a stream after 30 seconds.
    Result: All messages were redelivered despite not hitting the deadline.
  • Test 2: Modify the deadline just before closing the stream.
    Result: All messages were redelivered even though the deadline was successfully modified.

Race Condition

I believe we occasionally hit a race condition when attempting to modify an existing ack deadline. Currently we calculate the ack deadline using the 99th percentile of ack times (p99). We then calculate a delay to determine when we should modify the ack deadline (Math.random() * p99 * 0.9).

If our p99 value is 10, then it is possible we will not modify the ack deadline until roughly 1 second before it hits. The redelivery cases I have observed occurred when the delay was within 1-2 seconds of the p99 value.

@lukesneeringer have you seen anything similar to what I'm describing above in the Python client? If this is indeed the reason why a redelivery occurs, is it reasonable to calculate the sleep time at a lower percent?
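
To make the arithmetic above concrete, here is a small sketch of the quoted delay formula and the worst-case margin it leaves before the deadline. The function names and the `factor` parameter (standing in for "a lower percent") are hypothetical; this is not the library's actual code.

```javascript
'use strict';

// Sketch of the delay formula quoted above: Math.random() * p99 * 0.9.
// `factor` is a hypothetical knob for trying a lower percent; `random` is
// injectable so the bounds can be checked deterministically.
function modAckDelay(p99Seconds, factor = 0.9, random = Math.random) {
  return random() * p99Seconds * factor;
}

// Smallest possible gap between extending the deadline and the deadline itself,
// reached when the random draw is close to 1.
function minMargin(p99Seconds, factor = 0.9) {
  return p99Seconds * (1 - factor);
}

console.log(minMargin(10).toFixed(1));      // margin with the 0.9 factor
console.log(minMargin(10, 0.5).toFixed(1)); // margin with a hypothetical 0.5 factor
```

With a p99 of 10 seconds the 0.9 factor can leave only about one second of slack, which matches the window in which the redeliveries were observed; a smaller factor widens that slack at the cost of more frequent deadline modifications.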

@stephenplusplus
Contributor Author

From @mscottx88 on September 30, 2017 0:22

Here is my solution to the race condition you describe. A simple activeMessages tracking mechanism, combined with async shutdown that is event-driven.

'use strict';

const config = require('@google-cloud/rcloadenv');
const cp = require('child_process');
const fs = require('fs-extra');
const logger = require('winston');
const pubsub = require('./providers/pubsub');
const uuid = require('uuid/v4');

const { name: configurationStorageName } = require('../package.json');

/**
 * Listen for and process incoming messages.
 *
 * @returns {Function}
 * An async function that can be called to stop listening for messages.
 */
const listen = () => {
  let activeMessages = 0;
  logger.info(`Subscription started at ${new Date().toISOString()}.`);

  const subscription = pubsub
    .get()
    .topic(process.env.SUBSCRIBE_TOPIC_NAME)
    .subscription(process.env.SUBSCRIBE_SUBSCRIPTION_NAME, {
      flowControl: {
        maxMessages: +process.env.MAX_CONCURRENT_MESSAGES
      }
    });

  subscription.on('error', (error) => {
    logger.error(error);
  });

  subscription.on('message', (message) => {
    const { email, pwd, uid } = message.attributes;
    logger.info(`Message received at ${new Date().toISOString()} for uuid ${uid}.`);

    const sessionId = uuid();
    const downloadPath = `${__dirname}/../downloads/${sessionId}`;
    const profilePath = `${__dirname}/../profiles/${sessionId}`;

    const args = [
      uid,
      email,
      pwd,
      sessionId,
      downloadPath,
      profilePath
    ];

    const options = {
      cwd: process.cwd(),
      env: process.env,
      execArgv: [],
      silent: true
    };

    logger.info(`Starting session ${sessionId}`);
    const downloader = cp.fork(`${__dirname}/download.js`, args, options);
    activeMessages++;

    downloader.stdout.on('data', (data) => {
      const text = Buffer.from(data).toString('utf8');
      logger.info(text);
    });

    downloader.on('exit', async (code) => {
      logger.info(`Session ${sessionId} completed with exit code ${code}.`);

      try {
        await Promise.all([fs.remove(downloadPath), fs.remove(profilePath)]);
      } catch (error) {
        logger.error(`Could not cleanup path(s) in session ${sessionId} because of ${error}.`);
      }

      try {
        message.ack();
      } finally {
        activeMessages--;
      }
      if (activeMessages === 0) {
        subscription.emit('importer-idle');
      }
    });
  });

  return () => {
    return new Promise((resolve, reject) => {
      // stop the flow of incoming messages
      subscription.removeAllListeners('message');

      const shutdown = async () => {
        try {
          await subscription.close();
          logger.info(`Subscription ended at ${new Date().toISOString()}.`);
          resolve();
        } catch (error) {
          reject(error);
        }
      };

      if (activeMessages !== 0) {
        subscription.once('importer-idle', shutdown);
      } else {
        shutdown();
      }
    });
  };
};

/**
 * Run the process.
 *
 * @returns {void}
 */
const run = async () => {
  logger.level = process.env.LOG_LEVEL;

  try {
    await config.getAndApply(configurationStorageName);
  } catch (error) {
    logger.error(error);
    throw error;
  }

  let unlisten = listen();

  setInterval(async () => {
    try {
      await unlisten();
    } catch (error) {
      logger.error(error);
    } finally {
      unlisten = listen();
    }
  }, +process.env.RESTART_INTERVAL);
};

module.exports = {
  run
};

@stephenplusplus
Contributor Author

From @QTGate on October 16, 2017 1:4

  • OS: Debian
  • Node.js version: 8.7.0
  • npm version: 5.5.1
  • @google-cloud/pubsub version: 0.14.5

Steps to reproduce

const pubsub = require('@google-cloud/pubsub')(connectOption)

const listenSub = () => {
    const subscription = pubsub.topic(topicName).subscription(subScription)
    subscription.on('message', message => {
        message.ack()
        // ... (handler body elided in the original comment)
    })

    // restart every 10 minutes: close the subscription, then listen again
    setTimeout(() => {
        return subscription.close().then(() => {
            listenPubSubMaster = null
            return listenSub()
        })
    }, 600000)
}

listenSub()

I create a new subscription every 10 minutes.
But it looks like @google-cloud/pubsub does not release memory when the subscription is shut down with subscription.close().

[image: pubsub]

@stephenplusplus
Contributor Author

From @ShahNewazKhan on October 18, 2017 19:23

  • OS: Debian
  • Node.js version: 6.11.1
  • npm version: 3.10.10
  • version: 0.14.5

I am also seeing the memory increase and not being released after implementing the subscriber refresh.

@stephenplusplus
Contributor Author

@callmehiphop will this be addressed by #2716 or another issue/PR you've already worked on?

@stephenplusplus
Contributor Author

From @QTGate on November 3, 2017 8:0

No, it is not fixed for me. Even the new v0.14.6 still has the memory problem. I plan to implement subscription.on against the gCloud APIs myself.

@stephenplusplus
Contributor Author

From @callmehiphop on November 3, 2017 18:46

will this be addressed by #2716

I believe it will!

@stephenplusplus
Contributor Author

From @ChrisBartonLC on November 5, 2017 23:12

I am using PubSub node.js client version 0.14.5 and I sometimes have CloudFunctions that when triggered are no longer able to add messages to a topic or pull messages from a subscription. The fix is to redeploy the CloudFunction which mostly works but not always. The issue is intermittent.

@stephenplusplus
Contributor Author

From @rhodgkins on November 9, 2017 10:51

Apologies for asking the obvious...!

But is this now fixed as #2716 has been released with 0.14.8?

Cheers!

@stephenplusplus
Contributor Author

From @callmehiphop on November 9, 2017 19:6

@rhodgkins which parts? I think there are a number of issues being described here.

@stephenplusplus
Contributor Author

From @rhodgkins on November 13, 2017 18:38

@callmehiphop the issue around no messages being received on a subscription after a period of time.

@stephenplusplus
Contributor Author

From @callmehiphop on November 13, 2017 18:48

@rhodgkins from what I understand the only environment in which this occurs is GKE, but I also know a fix was deployed on the GKE side of things that should have resolved it. I haven't seen or heard of any reports verifying this though.

@tinyrobot

@stephenplusplus we are currently seeing this issue on Google App Engine with the latest release of the library; downgrading to 0.13.1 fixes the issue.

@lc-chrisbarton

I get confused by the moving around of these tickets but we see issues in Cloud Functions with PubSub which we fix by pinning to version 0.14.0

@mchavarriae

mchavarriae commented Dec 29, 2017

Recently we started experiencing the problem using version 0.13.1. Does anybody else have this problem?

@lc-chrisbarton

We still see this issue unless we pin to version 0.14.0 of the node.js client lib. Even with that version there are now lots of errors due to recent changes with Cloud Functions as reported here in the private Cloud Functions Beta Testers group:

https://groups.google.com/forum/#!topic/cloud-functions-beta-testers/CwGuBwHaZo8

@mscottx88

So far, v 0.15.0 seems to be working 🤞

@stephenplusplus
Contributor Author

Can anyone else confirm 0.15.0 is working?

@tinyrobot

@stephenplusplus I'll test this when I'm back at work on Tuesday and reply here

@lc-chrisbarton

version "google-cloud/pubsub": "0.16.2" works in Cloud Functions using our integration test suite.

@stephenplusplus
Contributor Author

@callmehiphop does this sound resolved to you?

@lc-chrisbarton

lc-chrisbarton commented Jan 18, 2018

Unfortunately it is still broken in Cloud Functions. Last night we ran a few tests but this morning with many developers using the new version, messages are intermittent:

{
insertId: "000000-3d55d68d-ee61-4721-95bd-e8e7dfe65852"
labels: {…}
logName: "projects/xxxx/logs/cloudfunctions.googleapis.com%2Fcloud-functions"
receiveTimestamp: "2018-01-18T21:21:55.361917350Z"
resource: {…}
severity: "ERROR"
textPayload: "{ Error: Stream removed
at /user_code/node_modules/@google-cloud/pubsub/node_modules/grpc/src/client.js:554:15 code: 2, metadata: Metadata { _internal_repr: {} } } '{"code":2,"metadata":{"_internal_repr":{}}}'"
timestamp: "2018-01-18T21:21:46.968Z"
}

@lc-chrisbarton

We have downgraded and pinned to v0.14.0 which is more reliable in Cloud Functions

@danoscarmike

@stephenplusplus can you please disambiguate the various issues here and update the description? Or close this issue and, if there are unaddressed elements here (e.g. the most recent version's performance in Cloud Functions), open new issues for them? Thanks!

@danoscarmike danoscarmike added triaged for GA release blocking Required feature/issue must be fixed prior to next release. labels Feb 8, 2018
@stephenplusplus
Contributor Author

I think @callmehiphop can do a better job of that.

@callmehiphop
Contributor

From what I can tell there are 4 issues being described here.

  1. Workers stop receiving messages until they are restarted (original issue): duplicate of "messages sit in queue until GKE pod with subscriber gets reset" (#11)
  2. Messages are being re-delivered: duplicate of "unacked messages being redelivered" (#2)
  3. Possible memory leak: duplicate of "possible memory leak?" (#13)
  4. gRPC Stream Removed error: duplicate of "'Error: Stream removed\n at /user_code/node_modules/grpc/src/client.js:554:15'" (nodejs-datastore#35)

Because there are already threads for the various issues being described here, I'm going to go ahead and close this issue. For inquiries about the status of one specific issue, please visit the corresponding thread mentioned above. For any other issues, please feel free to open a new issue if one does not already exist.

@google-cloud-label-sync google-cloud-label-sync bot added the api: pubsub Issues related to the googleapis/nodejs-pubsub API. label Jan 31, 2020
@yoshi-automation yoshi-automation added 🚨 This issue needs some love. triage me I really want to be triaged. labels Apr 6, 2020
8 participants