This repository has been archived by the owner on Feb 13, 2024. It is now read-only.

flush does not guarantee that all inflight messages are sent #309

Open
yo1dog opened this issue Nov 12, 2021 · 43 comments · Fixed by #352 or #353

Comments

@yo1dog

yo1dog commented Nov 12, 2021

flush does not guarantee that all inflight messages are sent before calling the given callback. Instead, flush simply sends a batch of queued messages and waits only for that batch's response before invoking the callback.

This is contrary to common expectations that a "flush" function completely empties the buffer to the destination. In fact, there are several open issues caused by this assumption.

Further, this means there is no way to know when all queued and all inflight messages have been fully sent (fully flushed). For example, if you were to call flush while another flush was already in progress, the callback could be called before the previous flush was complete.

This can be a very common occurrence in short-lived environments like cloud functions.

Example:

const analytics = new Analytics(writeKey, {flushAt: 2});
analytics.track(...); // This will trigger a flush because line 212: flushed === false
analytics.track(...);
analytics.track(...); // This will trigger a flush because line 218: queue.length >= flushAt
analytics.flush().then(() => {
  // This callback will be called immediately because line 249: queue.length === 0
  // The process would exit before any messages are sent.
  console.log('done');
  process.exit(0);
});

Similarly:

const analytics = new Analytics(writeKey, {flushAt: 2});
analytics.track(...); // This will trigger a flush because line 212: flushed === false
analytics.track(...);
// The following will trigger a flush because there is a message in the queue.
analytics.flush().then(() => {
  // This callback may be called before the other flush completes.
  // The process may exit while the message from the other flush is still inflight.
  console.log('done');
  process.exit(0);
});
// There are now 2 concurrent flushes running in parallel.
// Waiting on the final flush creates a race condition as it may finish before the previous flush.

I suggest adding a new function that flush()es all queued messages and does not call back until all concurrent flushes are complete. I also suggest updating the documentation to make it clear that flush may defy assumptions about awaiting all queued and inflight messages.
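The suggestion above could be sketched roughly as follows. This is not analytics-node code: TrackedQueue, enqueue, flushAll and the send function are hypothetical names, and the sketch assumes send returns a promise for one batch. The idea is simply to remember every in-flight batch so that a "flush everything" call can wait for all of them, not only the batch it started.

```javascript
// Hypothetical sketch: none of these names come from analytics-node.
class TrackedQueue {
  constructor(send, flushAt = 2) {
    this.send = send;          // assumed: async function that delivers one batch
    this.flushAt = flushAt;    // queue length that triggers an automatic flush
    this.queue = [];
    this.inflight = new Set(); // promises for batches that are still being sent
  }

  enqueue(message) {
    this.queue.push(message);
    if (this.queue.length >= this.flushAt) this.flush();
  }

  // Sends the currently queued messages as one batch.
  flush() {
    if (this.queue.length === 0) return Promise.resolve();
    const batch = this.queue.splice(0, this.queue.length);
    // Resolve with the error instead of rejecting, so errors can be collected later.
    const promise = this.send(batch).then(() => undefined, err => err);
    this.inflight.add(promise);
    promise.then(() => this.inflight.delete(promise));
    return promise;
  }

  // Flushes the queue, then waits until no batch is in flight.
  async flushAll() {
    this.flush();
    while (this.inflight.size > 0) {
      const results = await Promise.all([...this.inflight]);
      const err = results.find(e => e);
      if (err) throw err;
    }
  }
}
```

With a queue like this, awaiting flushAll() before process.exit(0) in the two examples above would also cover the batch started by the earlier automatic flush.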

@yo1dog yo1dog changed the title flush does not guarantee that all queued messages are sent flush does not guarantee that all in-flight messages are sent Nov 12, 2021
@yo1dog yo1dog changed the title flush does not guarantee that all in-flight messages are sent flush does not guarantee that all inflight messages are sent Nov 12, 2021
@felipe-najson-ntf
Contributor

Hello @yo1dog, thanks for your contribution to the library! I just tested the cases mentioned above and clearly flush creates a race condition in which 2 simultaneous flushes are executed in parallel.

I'll be working on a solution to solve this next week.

Thanks, Segment team.

@benjaminhoffman
Contributor

benjaminhoffman commented Feb 8, 2022

I'm also experiencing issues with queueing and flush. Would love to see this fixed to help eliminate issues in my debugging.

edit: we are using serverless functions.

@yo1dog
Author

yo1dog commented Feb 8, 2022

@benjaminhoffman My quick workaround for Lambda functions was to manually track the callbacks for every Segment call. I used a small class to help. This works OK for cloud function executions as they have a predictably short and linear lifecycle. This solution would not work in a long-lived environment as the promises array would grow infinitely. Example:

class SegmentBatch {
  constructor() {
    this.promises = [];
  }
  
  createCB() {
    let resolve;
    this.promises.push(new Promise(_resolve => {resolve = _resolve;}));
    return err => resolve(err);
  }
  
  async finish() {
    const results = await Promise.all(this.promises);
    const err = results.find(err => err);
    if (err) throw err;
  }
}

All the class does is maintain a list of promises which are resolved when the callback is called. Then we can await all the promises to ensure that all callbacks have been called and thus all events have been fully sent. I use it like so:

const Analytics = require('analytics-node');

const client = new Analytics(writeKey);
client.flushed = true; // Prevent flushing after first event.

const batch = new SegmentBatch();
client.track({...}, batch.createCB());
client.track({...}, batch.createCB());
...
client.track({...}, batch.createCB());

client.flush();
await batch.finish();

You could modify this for use in long-lived environments by adding logic that removes promises from the array once they have been resolved.

@benjaminhoffman
Contributor

That worked great for me! Thank you 🙏 . I rewrote this in TypeScript and it's worth noting the flushed prop is not part of the type, so you'll have to `@ts-ignore` it.

@yo1dog should I be concerned about latency or any delay from the Segment API? Even at 200ms, I hate to add unnecessary delay. Or what if the Segment API times out?

@yo1dog
Author

yo1dog commented Feb 11, 2022

Yes. The only way to guarantee events are fully sent to Segment is to wait for the underlying HTTP request to complete. Network unreliability can cause this to take a significant amount of time: 100s to 10,000s of milliseconds depending on your setup and timeout settings.

Some cloud functions provide execution environments independent of individual executions (outside of the function's handler). You could move the management and flushing of events to this space so long as you can guarantee the flush can be awaited before the environment is stopped or shut down. This is not possible with AWS Lambda, for example, as the execution environment is destroyed without warning. In this case, the only alternative to blocking invocation ("add unnecessary delay") would be to not emit events to Segment directly but instead to send the events to an external queue to be processed later. Presumably that queue could be "nearby" (peer process, or in same data center, etc.), resulting in much lower latency and much higher reliability.
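On the timeout question, one way to put an upper bound on the added delay is to race the wait against a timer. The helper below is a minimal sketch; withTimeout is a hypothetical name and not part of analytics-node. If the timer wins, the underlying request may still be in flight, so this only caps how long you wait and does not guarantee delivery.

```javascript
// Hypothetical helper: settles like `promise`, or rejects after `ms` milliseconds.
function withTimeout(promise, ms) {
  let timer;
  const timeout = new Promise((_, reject) => {
    timer = setTimeout(() => reject(new Error(`Timed out after ${ms} ms`)), ms);
  });
  // Whichever settles first wins; always clear the timer so it cannot
  // keep the process alive after the race is decided.
  return Promise.race([promise, timeout]).finally(() => clearTimeout(timer));
}
```

With the SegmentBatch class above, this could be used as await withTimeout(batch.finish(), 2000), where 2000 is an arbitrary example value.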

@yo1dog
Author

yo1dog commented Feb 11, 2022

@benjaminhoffman I just realized you stated you are "not using serverless functions." The solution I posted above was geared specifically for short-lived environments such as a cloud function invocation.

In a traditional long-lived server process, rather than flushing events and blocking, you should manage the events outside of request handlers (as in, you should send events to Segment API "in the background"). This is what the Segment library aims to provide (I assume. I am not affiliated with Segment or this library in any way): You can simply call client.track(...) and know that events will be queued and eventually sent to Segment. The problem is when that process experiences an interruption (for example, a server restart) queued events can be lost because the Segment library does not provide any way to know when all queued messages have been fully sent to Segment. Again, this assumes you have the ability to know of and delay a process shutdown request.

My solution here was to use a wrapper around the Segment library which was similar to the batch class above. Something like this (untested):

class ManagedSegmentClient {
  private readonly client: Analytics;
  private readonly promises: Promise<Error | undefined>[];
  
  public constructor(client: Analytics) {
    this.client = client;
    this.promises = [];
  }
  
  public track(...args: Parameters<Analytics['track']>) {
    this.client.track(args[0], this.createCB(args[1]));
  }
  public identify(...args: Parameters<Analytics['identify']>) {
    this.client.identify(args[0], this.createCB(args[1]));
  }
  
  public async finish() {
    const results = await Promise.all(this.promises);
    const err = results.find(err => err);
    if (err) throw err;
  }
  
  private createCB(cb?: (err: Error) => void) {
    let resolve: (err?: Error) => void;
    const promise = new Promise<Error | undefined>(_resolve => {resolve = _resolve;});
    this.promises.push(promise);
    promise.finally(() => {
      const index = this.promises.indexOf(promise);
      if (index > -1) this.promises.splice(index, 1);
    });
    
    return (err?: Error) => {
      resolve(err);
      if (cb) cb(err);
    };
  }
}

And use it like so:

const managedSegmentClient = new ManagedSegmentClient(new Analytics(writeKey));

expressApp.get('/fubar', (req, res) => {
  // ...
  const utmSource = req.query.utm_source;
  managedSegmentClient.track({event: 'blah', properties: {utmSource}});
  return res.status(200).json({ok: true});
});

process.once('SIGINT', () => exitGracefully());
process.once('SIGTERM', () => exitGracefully());

function exitGracefully() {
  // ...
  
  // Flush Segment messages before exiting.
  // finish() takes no callback; it returns a promise that rejects with the first error, if any.
  managedSegmentClient.finish()
    .then(() => console.log('Segment messages flushed.'))
    .catch(() => console.error('Error flushing Segment messages.'))
    // Now exit.
    .finally(() => process.exit(exitCode));
}

Whenever a track or identify event is called, it adds a promise to an array which is resolved when the callback is called (which is after the message has been fully sent). Whenever the promise is resolved, it is removed from the array. Then the finish method awaits all promises in the array to ensure all messages are sent.

@benjaminhoffman
Contributor

super helpful @yo1dog and i think others reading this will also gain from your sample code... turns out we are using serverless functions (specifically, nextJS API routes deployed on Vercel) and your first solution worked great. I spoke with our CTO and he said the latency concerns at this point would be a premature optimization and to keep an eye on things. If we see trouble from the SegmentBatch solution, then I'm going to build my own queue, as you've suggested above. Thanks so much for your help!

@seekayel

We are hitting this issue as well. Seems like a pretty fundamental error in the Segment sdk that can cause dropped events which can corrupt data/monitoring in downstream systems. Surprised it has been open for so long.

Here is the example I provided Segment support to reproduce. Set a process.env.SEGMENT_WRITE_KEY and set fail to true or false.

import Analytics from 'analytics-node'

const analytics = new Analytics(process.env.SEGMENT_WRITE_KEY);

const fail = true

analytics.track({
  event: 'event name',
  userId: 'user id'
})

analytics.flush(function(err, batch){
  console.log('Flushed, and now this program can exit!');
  if (fail) {
    process.exit(0)
  }
});

await new Promise(r => setTimeout(r, 15000));

@emont01

emont01 commented Mar 1, 2022

@yo1dog I think you rather want to use Promise.allSettled instead of Promise.all, the former waits for all promises to be completed while the latter rejects immediately upon any of the input promises rejecting or non-promises throwing an error, and will reject with this first rejection message / error.

@yo1dog
Author

yo1dog commented Mar 1, 2022

@emont01 I could see that. However, in this case it doesn’t matter, as it is impossible for the promises to reject by design. The promises are resolved with an optional error instead of being rejected, which you could argue is strange. But I was specifically avoiding throwing and rejecting, as the intent was to collect errors to process later. I thought ‘allSettled’ could give the impression that a rejected promise was a possibility that could or should be handled.
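The point about promises that never reject can be illustrated with a small sketch. settleAfter and collectErrors are hypothetical names used only for this illustration; the timers stand in for Segment callbacks.

```javascript
// Each promise resolves with an optional Error instead of rejecting,
// mirroring the callback-to-promise pattern used in the workaround above.
function settleAfter(ms, err) {
  return new Promise(resolve => setTimeout(() => resolve(err), ms));
}

async function collectErrors(promises) {
  // Promise.all cannot short-circuit here because none of the inputs ever rejects,
  // so it waits for every promise, just as Promise.allSettled would.
  const results = await Promise.all(promises);
  return results.filter(result => result instanceof Error);
}
```

For example, collectErrors([settleAfter(10), settleAfter(5, new Error('send failed')), settleAfter(20)]) waits for all three timers and then yields an array holding the single error.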

@emont01

emont01 commented Mar 2, 2022

@seekayel and @yo1dog: you guys probably are aware of this, but your examples seem to me like an illustration of a misuse of process.exit. Consider the following scenario:

  1. Two long-running processes are started and then
  2. A short-lived process calls process.exit

Here, the expected behavior would be for the long-running processes to be terminated without enough time to complete.

Now if we apply this to the original ticket: "if you were to call flush while another flush was already in progress" in fact the second flush callback should be called first.

See the following example:

// Simple promise that resolves after a given time
const resolveAfter = t => new Promise((resolve, reject) => {
  setTimeout(() => resolve(`Completed in ${t}`), t)
})

Promise.allSettled([
  resolveAfter(20000),
  resolveAfter(20000)
]);

Promise.allSettled([
  resolveAfter(10000),
  resolveAfter(10000)
]);

Promise.resolve(0).then(() => {
  // The process would exit before any promise is processed.
  console.log('done');
  process.exit(0);
});

I agree with @yo1dog that you probably want to handle closing events (SIGINT, SIGTERM...) to call flush before the app is terminated to avoid losing queued events.

process.once('SIGINT', () => exitGracefully());
process.once('SIGTERM', () => exitGracefully());

@seekayel

seekayel commented Mar 3, 2022

@emont01: I structured the example code using the flush example syntax given in the segment docs:

(at the end of this section) https://segment.com/docs/connections/sources/catalog/libraries/server/node/#batching

You can also flush on demand. For example, at the end of your program, you need to flush to make sure that nothing is left in the queue. To do that, call the flush method:

analytics.flush(function(err, batch){
  console.log('Flushed, and now this program can exit!');
});

This code implies that all Segment API calls will have completed by the time the callback passed to flush is called.

So I think we are in agreement?

@emont01

emont01 commented Mar 3, 2022

@seekayel I agree that you must call flush, but you shouldn't call exit directly. Instead, make sure there is no additional work pending in the event loop. If you need to change the exit code, set it using process.exitCode = <NUMBER>; and allow the process to exit naturally.
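A minimal sketch of that approach, assuming some function that awaits your pending messages is available. shutdown and flushAll are hypothetical names; process.exitCode is the standard Node.js property mentioned above.

```javascript
// Hypothetical shutdown helper; `flushAll` stands in for whatever awaits your pending messages.
async function shutdown(flushAll, code = 0) {
  try {
    await flushAll();
  } catch (err) {
    console.error('Flush failed:', err.message);
    code = 1;
  }
  // Record the exit code but do not force termination. Node exits with this
  // code once nothing else is keeping the event loop alive.
  process.exitCode = code;
  return code;
}
```

This only works when nothing else (open servers, timers, connection pools) keeps the event loop busy.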

@seekayel

seekayel commented Mar 3, 2022

@emont01 I'm not trying to call exit 😆 ... just pointing out that flush doesn't actually behave as described in the docs.

My solution is to just call the api directly with axios and then track the promises myself instead of trying to manage through the sdk.

@yo1dog
Author

yo1dog commented Mar 4, 2022

@emont01 Yes, I believe it is clear to everyone here that calling process.exit() immediately ends the Node process. Hence the need to await a full flush before calling process.exit().

As for your workaround of setting the exit code and simply waiting for the event loop to end (which means waiting for the JavaScript event queue to empty), this is not always practical or possible. Database connection pools, HTTP servers, TCP connections, message queues (AMQP), scheduled tasks, etc. could all keep the event queue populated for an indeterminate amount of time. Ideally these would be shut down gracefully as well, but not all libraries handle this properly.

Further, some environment lifecycles are not dictated by the event loop at all. Cloud functions such as AWS Lambda may execute multiple functions in the same Node.js worker (thus sharing the same event loop across multiple function executions). It may also abruptly end the worker after your function has returned (possibly before the event queue is emptied).

In all these cases, the ability to explicitly await a full flushing of Segment messages is necessary.

Regardless, the library does not operate as described. Thus this ticket.

@pooyaj
Contributor

pooyaj commented Mar 23, 2022

Hey folks, reading through the thread, looks like we have a solution here: #309 (comment) ... closing this issue. Hopefully in the next version of our library, we will support a Promise based API and add the ability to wait for all the inflight messages to be flushed.

@pooyaj pooyaj closed this as completed Mar 23, 2022
@yo1dog
Author

yo1dog commented Mar 24, 2022

"@yo1dog Sorry, I thought that your solution makes flushing deterministic."

It does. It's a workaround for part of the issue (deterministic flushing), but it is not a "solution" in that it does not solve the underlying problem.

"We are working on a new version of this library, and hoping to have this addressed, for the time being I'll re-open the issue, and hopefully in a month or two we will have a new version of our Node library."

Looking forward to the new version.

@nd4p90x nd4p90x linked a pull request Mar 29, 2022 that will close this issue
@MerrickClark

MerrickClark commented May 31, 2022

"@yo1dog Sorry, I thought that your solution makes flushing deterministic ( although I agree, not super clean to pass callbacks to all track calls etc. ). We are working on a new version of this library, and hoping to have this addressed, for the time being I'll re-open the issue, and hopefully in a month or two we will have a new version of our Node library."

Anywhere I can stay tuned to this updated library's progress? I was surprised to find my analytics bugs were coming from the library and not my code. I don't have a ton of time for analytics and I need it to all work. Adding analytics to make sure my analytics is working seems like madness.

@tspoke

tspoke commented Jun 9, 2022

@pooyaj What's the status of this issue and of the new SDK since March 23?

@longlostnick

Running into this problem too. Really scary to find out some of my events are being dropped. Considering dropping Segment altogether and just building my own pipeline, but I really don't want to have to do that.

@cipriancaba

This is a bit embarrassing. Lambdas are a very good candidate for event processing and make sense for sending data to segment. This library is unusable at the moment

@SantiSuarezMS

Hey @nd4p90x @pooyaj , what's the status on this issue? Is a fix planned on an upcoming release? Thanks!

@yo1dog
Author

yo1dog commented Aug 18, 2022

Unfortunately, I think "embarrassing" is the right word. I recommend that no one use this library. At least not directly. Just grab your favorite async batching library instead.

Segment has failed to fix a critical bug 9 months after it was pointed out. Nor has the documentation been updated.

In fact, I found that they have copied my suggestions and workarounds, sometimes verbatim, into their documentation. Normally I wouldn't mind this at all, but they are being used incorrectly: near opposite in meaning. The documentation is now even more misleading.

Me: "The problem is when that process experiences an interruption (for example, a server restart) queued events can be lost because the Segment library does not provide any way to know when all queued messages have been fully sent to Segment."
Docs: "To prevent losing messages, be sure to capture any interruption (for example, a server restart) and call flush to know of and delay the process shutdown."

Me: "My quick workaround for Lambda functions was to manually track the callbacks for every Segment call. I used a small class to help. This works OK for cloud function executions as they have a predictably short and linear lifecycle."
Docs: "Short-lived functions have a predictably short and linear lifecycle, so use a queue big enough to hold all messages and then await flush to complete its work."

https://segment.com/docs/connections/sources/catalog/libraries/server/node/#long-running-process

I feel I have explained in minute, step-by-step detail exactly what the issue is and how to fix it. For this issue and several others. Segment even emailed asking me to open a PR... to just do it for them... for free. They did not respond when I asked if they had a bug bounty program.

@rdsedmundo

Any updates here? I don't understand why this has not been prioritized ASAP. This library serves one purpose, and it's not being fulfilled due to this 10-month-old bug.

@benjaminhoffman
Contributor

@yo1dog they did you dirty. sorry man. they should at least get the bounty program running.

fwiw your solution helped me so much that without it, we wouldn’t have been able to use this at all within our app. much appreciated 🙏🏽

@Jackman3005

We too are in this situation. I have opened a support ticket citing this issue. I will report back with their response.

@Jackman3005

This is the response I have received from customer support. [Screenshot: Screen Shot 2022-09-14 at 1 49 23 pm]

@seekayel

What is so pernicious about this bug is that it manifests only as intermittent drops of events. The only way to catch it is if you have an out-of-band measurement system.

The reputation risk to an analytics company dropping events seems huge. I guess they don’t see it that way.

@yo1dog
Author

yo1dog commented Sep 14, 2022

Yup. 10 months in and not an ETA in sight. I don't blame the devs. Segment is clearly not devoting any resources to this. A complete re-write should take a single dev 1 month tops. And if you don't reinvent the async-batching wheel, 2 weeks tops.

Segment, I do consulting work if you still want me to do it. lol

@seekayel

I have started a very early npm module to do just that. Ran into a wall trying to get esm and cjs both to work. Might just fall back to cjs. Would not say it is production ready yet.

Open to PRs, suggestions or guidance. Still finding my way as a side project.

npm: https://www.npmjs.com/package/analytics-async

code: https://github.com/cyclic-software/analytics-async

@Jackman3005

I prodded a bit further and got a response from the engineering team via support.
[Screenshot: Screen Shot 2022-09-16 at 12 52 36 pm]

@yo1dog
Author

yo1dog commented Sep 18, 2022

FYI, that is what they said 6 months ago as well:

Mar 23: We are working on a new version of this library, and hoping to have this addressed

#309 (comment)

@jessecurry

Would it be possible to update the docs to reference that flush does not work as described? We just lost a few days trying to figure out what was wrong with our backfill scripts.

@druwynings

@pooyaj Any updates?

@benjaminhoffman
Contributor

@yo1dog does their recent PR #353 solve this problem for sure? said another way, do you think it's safe to unwind your suggested solution from our codebase?

@yo1dog
Author

yo1dog commented Nov 17, 2022

@benjaminhoffman
Lol. Of course not.

See my comment: #353 (comment)

They are just throwing spaghetti at the wall and not even bothering to test if the solution fixes the example scenarios listed in the bug. Much less solves the class of problem. Given this is the 3rd (4th?) failed attempt to fix this, it's obvious they either don't know what they are doing and/or are putting forth 0 effort.

Either way, I must again stress that no one use this library. Extremely unlikely it will be fixed anytime soon.

@Jackman3005

I mean, just to be clear, this library is built and maintained by a commercial company with the sole purpose to integrate node apps with their services, correct?

I'm struggling to understand why this library is not deserving of attention & budget from Segment? I mean, they're a multi-billion dollar company... We're paying for this.

@chrisradek

Hey everyone, wanted to share an update on where we are with analytics-node.

We've created a new package - @segment/analytics-node that's currently in early beta. One of the requests we frequently saw after rewriting analytics.js was bringing the new plugin support to node.js. We decided it would be easier to build from a core library shared with analytics.js to keep plugins shareable between the two libraries than shoehorn that functionality into the existing library.

As part of that work we also focused on the flushing behavior. We introduced a closeAndFlush() method to the new library that returns only after all pending events have been sent to Segment.

The README for the package has some usage instructions - we tried to keep most of the public API the same to ease migration - and we're working on more thorough documentation to add to the Segment documentation website.

We'd love to get any early feedback on the library - here's a link to the repo, and the beta has been published to npm:
https://github.com/segmentio/analytics-next/tree/master/packages/node

@chrisradek chrisradek reopened this Nov 18, 2022
@yo1dog
Author

yo1dog commented Nov 18, 2022

New code base looks fantastic! Great work. Love to see queue classes and event emitters. Code quality looks very high as well. Compliments to the chef.

Until the new library is released, please, at least update the incorrect and misleading documentation to reflect the current (broken) behavior of flush: https://segment.com/docs/connections/sources/catalog/libraries/server/node/#batching

Still sucks that the current library has been left broken for so long.
