
LIVE-6506 parameter for concurrent message processing #1221

Merged

Conversation

waisingyiu
Contributor

@waisingyiu commented Apr 30, 2024

What does this change?

We are evaluating the throughput of our new implementation with Firebase individual send operation on PROD.

When we tested it with fairly large notifications (more than 40,000 subscribers), the Android sender lambda function threw an exception when it attempted to create threads. In these cases, the lambda was invoked with nearly 20 SQS messages; when it reached the 17th or 18th message, it threw the exception and stopped working, and was eventually terminated by the AWS Lambda service after exceeding the 3 minute timeout. It appears that the lambda runtime may not have sufficient resources / file descriptor space to support 20 HttpClient instances.

This PR addresses this problem by:

  1. making the number of HttpClient instances configurable so we can adjust it in our experiments
  2. making the number of messages processed concurrently configurable
  3. spreading the requests within a message evenly over the pool of HttpClient instances
  4. logging the Java runtime version and maximum heap size
  5. adding the AWS request ID to the logs that contain throughput data, making it easier for us to compare throughput
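Changes 1 to 3 could be sketched roughly as below. This is a minimal illustration assuming fs2 and cats-effect 2 (which this codebase uses); `FcmClient`, `sendToken`, and the parameter names are hypothetical stand-ins, not the actual classes in this repository:

```scala
import cats.effect.{ContextShift, IO}
import fs2.Stream
import scala.concurrent.ExecutionContext

// Hypothetical stand-ins for illustration only
final case class FcmClient(id: Int)
def sendToken(client: FcmClient, token: String): IO[Unit] = IO.unit

implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

def processBatch(
    messages: List[List[String]],  // device tokens, one list per SQS message
    clientPool: Vector[FcmClient], // pool size is configurable (change 1)
    messageConcurrency: Int        // messages processed concurrently (change 2)
): IO[Unit] =
  Stream
    .emits(messages)
    .covary[IO]
    // Change 2: process up to `messageConcurrency` messages at a time
    .parEvalMapUnordered(messageConcurrency) { tokens =>
      // Change 3: round-robin tokens over the client pool so requests
      // within a message are spread evenly across HttpClient instances
      Stream
        .emits(tokens.zipWithIndex)
        .covary[IO]
        .evalMap { case (token, i) =>
          sendToken(clientPool(i % clientPool.size), token)
        }
        .compile
        .drain
    }
    .compile
    .drain
```

The round-robin index means a message with N tokens sends roughly N / poolSize requests through each client, rather than pinning a whole message to one client.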

How to test

I ran the Android lambda locally, simulating a batch of 4 messages, each with 5 device tokens. I set the configuration to:

  • process 2 messages concurrently at a time
  • use two HttpClient instances

I was able to observe the logs written in the expected order:

| Log message | Remark |
| --- | --- |
| Sending notification 068b3d2b-dc9d-482b-a1c9-bd0f5dd8ebd7 with individual API | Start processing 1st message |
| Sending notification 068b3d2b-dc9d-482b-a1c9-bd0f5dd8ebd7 with individual API | Start processing 2nd message |
| Results (notification: 068b3d2b-dc9d-482b-a1c9-bd0f5dd8ebd7 ShardRange(0,1)) | Finish processing 1st / 2nd message |
| Results (notification: 068b3d2b-dc9d-482b-a1c9-bd0f5dd8ebd7 ShardRange(0,1)) | Finish processing 1st / 2nd message |
| Sending notification 068b3d2b-dc9d-482b-a1c9-bd0f5dd8ebd7 with individual API | Start processing 3rd message |
| Sending notification 068b3d2b-dc9d-482b-a1c9-bd0f5dd8ebd7 with individual API | Start processing 4th message |
| Results (notification: 068b3d2b-dc9d-482b-a1c9-bd0f5dd8ebd7 ShardRange(0,1)) | Finish processing 3rd / 4th message |
| Results (notification: 068b3d2b-dc9d-482b-a1c9-bd0f5dd8ebd7 ShardRange(0,1)) | Finish processing 3rd / 4th message |

On CODE, we sent a test notification and my Android emulator received it successfully.

We were also able to see the AWS request ID in some of the logs we use to measure the throughput:
[screenshots]

As an aside, we printed out the maximum JVM heap size obtained via the Runtime.getRuntime().maxMemory() call:
[screenshot]
It was around 10GB, nearly the amount we configured for the lambda. This confirmed that the Java runtime was able to use almost all of the allocated memory.
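The figures above come from standard JVM calls; a minimal stdlib-only sketch (the object name is illustrative):

```scala
// Minimal sketch of the runtime diagnostics logged above, stdlib only.
object RuntimeDiagnostics {
  def javaVersion: String = System.getProperty("java.version")
  def maxHeapBytes: Long = Runtime.getRuntime().maxMemory()

  def main(args: Array[String]): Unit = {
    println(s"Java version: $javaVersion")
    val gb = maxHeapBytes / (1024.0 * 1024 * 1024)
    println(f"Max heap size: $maxHeapBytes bytes (~$gb%.1f GB)")
  }
}
```

Note that maxMemory() reports the heap ceiling (-Xmx), not current usage, which is why it tracks the lambda's memory allocation so closely.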

How can we measure success?

We have been testing the implementation with small notifications on PROD. It is a success if:

  1. it achieves a level of throughput comparable to the multicast API
  2. no unexpected errors occur

Have we considered potential risks?

Severe performance degradation or unexpected errors. We can revert to the multicast API by changing the fcm.allowedTopicsForIndividualSend parameter in SSM.

@waisingyiu marked this pull request as ready for review April 30, 2024 11:47
@waisingyiu requested a review from a team as a code owner April 30, 2024 11:47
Contributor

@lindseydew left a comment

Excellent work 🔥


  val deliveryServiceStream: Stream[IO, Fcm[IO]] =
-   Stream.emits(fcmClients).covary[IO].flatMap(_.fold(e => Stream.raiseError[IO](e), c => Stream.eval[IO, Fcm[IO]](IO.delay(new Fcm(c)))))
+   Stream.emits(fcmClients).covary[IO].flatMap(_.fold(e => Stream.raiseError[IO](e), c => Stream.eval[IO, Fcm[IO]](IO.pure(new Fcm(c)))))
Contributor

I'm not super familiar with the IO API. Curious about the decision to change the delay function to a pure one?

Contributor Author

@waisingyiu Apr 30, 2024

If we create the IO with the delay method, new Fcm will be executed every time we evaluate the IO object.

Since we evaluate this IO object to get an Fcm for every device token, it seems better not to create a fresh Fcm instance per device. So I used pure here to create the Fcm instance once and put that instance into the IO object.

Fcm does not hold any state, so it should be safe to reuse the same instance.
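The difference can be made observable with a small cats-effect 2 snippet (a sketch; the counter exists only to show how many times construction runs, and "fcm" stands in for the real Fcm instance):

```scala
import cats.effect.IO
import java.util.concurrent.atomic.AtomicInteger

val constructed = new AtomicInteger(0)

// IO.delay suspends construction: the block re-runs on every evaluation.
val delayed: IO[String] = IO.delay { constructed.incrementAndGet(); "fcm" }

// IO.pure wraps an already-computed value: construction happens once, here.
val instance = { constructed.incrementAndGet(); "fcm" }
val pured: IO[String] = IO.pure(instance)

delayed.unsafeRunSync() // constructs a new value
delayed.unsafeRunSync() // constructs again
pured.unsafeRunSync()   // reuses the same value
pured.unsafeRunSync()   // still the same value
// constructed.get() is now 3: twice from delayed, once from building `instance`
```

So with delay, every device token would trigger a fresh `new Fcm(c)`; with pure, all evaluations share the one instance built up front.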

Contributor

Thanks for the explanation

  .broadcastTo(
-   reportBatchSuccesses(chunkedTokens, sentTime, functionStartTime, sqsMessageBatchSize),
+   reportBatchSuccesses(chunkedTokens, sentTime, functionStartTime, sqsMessageBatchSize, awsRequestId),
Contributor

Good shout to log out the request id

@@ -35,15 +35,18 @@ trait SenderRequestHandler[C <: DeliveryClient] extends Logging {
  implicit val timer: Timer[IO] = IO.timer(ec)
  implicit val logger: Logger = LoggerFactory.getLogger(this.getClass)

  def reportSuccesses[C <: DeliveryClient](chunkedTokens: ChunkedTokens, sentTime: Long, functionStartTime: Instant, sqsMessageBatchSize: Int): Pipe[IO, Either[DeliveryException, DeliverySuccess], Unit] = { input =>
+   logger.info("Java version: " + System.getProperty("java.version"))
+   logger.info(s"Max heap size: ${Runtime.getRuntime().maxMemory()}")
Contributor

Nice, good shout to log out available memory

event
}
val localTestContext: Context = new Context{
Contributor

I wonder whether a mock might be more appropriate here if we only need to have one of the variables defined?

Contributor Author

If we want to use mock objects, we may need to use specs2 and extend the class with Mockito (correct me if I'm wrong), but this NotificationWorkerLocalRun is not a test case. It is more of a command-line Java entry class that lets us run the Android sender lambda code locally.

Contributor

Oh, sorry, my mistake: I thought this was a test file. Yeah, that makes sense for a real invocation.

@waisingyiu merged commit e4f7dbc into main Apr 30, 2024
11 checks passed
@waisingyiu deleted the LIVE-6506-parameter-for-concurrent-message-processing branch April 30, 2024 15:59