Skip to content
This repository has been archived by the owner on Feb 13, 2024. It is now read-only.

Solving race condition between flushes. #316

Conversation

felipe-najson-ntf
Copy link
Contributor

@felipe-najson-ntf felipe-najson-ntf commented Dec 7, 2021

Enqueue's flush now awaits before continuing.

There are cases where race conditions can be given like the examples presented in issue #309. Implementing it asynchronously we make sure that the in-flight flushes ended before continuing.

This problem is also related to the strange behavior that the user has in issue #303

@felipe-najson-ntf
Copy link
Contributor Author

@pooyaj Please check issue #309 and confirm if this is an error or an expected behavior. Greetings

Copy link
Contributor

@pooyaj pooyaj left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks @felipe-najson-ntf for the PR. Looks like there is a CI failure, can you please look into it? Also do we need to do a major version bump for this change?

@yo1dog
Copy link

yo1dog commented Feb 2, 2022

FYI, this pull does not solve #309 unless the expectation is that the consumer must now await every identify, track, etc. call. This means the user can no longer "fire and forget". Instead, the user must now be concerned with the implantation details and potential unrelated side affects of a flush being triggered as a result of an identify/track/etc call. This also requires every calling context to be async and possibly blocking.

Namely, every call to track now requires the user to wait for completion of and contended with delays caused by a possible flush.

I believe this violates users' expectations. Users expect to be able to:

  1. Queue messages and allow the library to transparently manage the queuing and flushing ("fire and forget").
  2. Be notified when events occurred. Such as when a specific message was sent or when all messages are fully flushed.
  3. Manually trigger a flush.

As such, I believe making these methods async is not the right solution. Instead, the library should track in-progress flushes and provide a method that awaits all in-progress flushes. This could be as simple as maintaining an array of all axios promises and then await Promise.allSettled(...).

Further, making these methods async would cause existing code to violate ESLint's no-floating-promises rule (which is enabled by default via eslint:recommended) and would be a breaking change for many users. They would have to find every identify/track/etc call and prepend with void to resolve.


I hope you don't take this the wrong way. Async queues can be a hard problem. Luckily they are a solved problem so there is no need to reinvent the wheel. Instead of implementing this yourself, I would use an existing solution. For example, you could use async's cargo queue like so:

class Analytics {
  constructor() {
    const maxConcurrentRequests = 10;
    const maxMessagesPerRequest = 100;
    
    this.cargoQueue = async.cargoQueue((messages, cb) => {
      sendMessagesToServer(messages, err => {
        // ...
      });
    }, maxConcurrentRequests, maxMessagesPerRequest);
  }
  
  enqueue(message, cb) {
    this.cargoQueue.push(message, cb);
  }
  
  flush(cb) {
    this.cargoQueue.drain(cb);
  }
}

Close in functionality to your current implementation. If you wanted finer control you could manually .pause() and .resume() the queue. Maybe. I haven't tested. Or use a more robust async queuing/batching library.

@pooyaj pooyaj closed this Mar 23, 2022
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
4 participants