Skip to content

Commit

Permalink
fetch at fixed intervals to avoid aggressive CPU usage when no messag…
Browse files Browse the repository at this point in the history
…e are available
  • Loading branch information
lolboxen committed Mar 20, 2022
1 parent 30b673c commit 3bd40bb
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 7 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
organization := "com.lolboxen"
name := "akka-stream-nats"
version := "0.1.5"
version := "0.1.6"
ThisBuild / versionScheme := Some("semver-spec")

scalaVersion := "2.13.8"
Expand Down
18 changes: 12 additions & 6 deletions src/main/scala/com/lolboxen/nats/SubscriptionSource.scala
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
package com.lolboxen.nats

import akka.Done
import akka.stream.stage.{AsyncCallback, GraphStageLogic, GraphStageWithMaterializedValue, OutHandler}
import akka.stream.stage.{AsyncCallback, GraphStageLogic, GraphStageWithMaterializedValue, OutHandler, TimerGraphStageLogic}
import akka.stream.{Attributes, Outlet, SourceShape}
import io.nats.client.Message

import scala.collection.mutable
import scala.concurrent.duration.DurationInt
import scala.concurrent.{ExecutionContextExecutor, Future, Promise}
import scala.util.{Failure, Success, Try}

class SubscriptionSource(adapter: SubscriptionAdapter) extends GraphStageWithMaterializedValue[SourceShape[Message], Control] {
case object TimerKey

val out: Outlet[Message] = Outlet("SubscriberSource.out")
override def shape: SourceShape[Message] = SourceShape(out)

override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Control) = {
val stageLogic: GraphStageLogic with Control = new GraphStageLogic(shape) with Control with AdapterListener {
val stageLogic: GraphStageLogic with Control = new TimerGraphStageLogic(shape) with Control with AdapterListener {

val shutdownPromise: Promise[Done] = Promise[Done]()
val fetchCallback: AsyncCallback[Try[Seq[Message]]] = getAsyncCallback(fetchComplete)
Expand All @@ -37,12 +40,15 @@ class SubscriptionSource(adapter: SubscriptionAdapter) extends GraphStageWithMat

override def resumeOperations(): Unit = {
isConnected = true
ensureQueueHasItems()
scheduleAtFixedRate(TimerKey, 0.seconds, 250.milliseconds)
pushIfNeeded()
completeIfNeeded()
}

override def suspendOperations(): Unit = isConnected = false
override def suspendOperations(): Unit = {
cancelTimer(TimerKey)
isConnected = false
}

override def suspendOperationsIndefinitely(cause: Throwable): Unit = failStage(cause)

Expand All @@ -57,9 +63,10 @@ class SubscriptionSource(adapter: SubscriptionAdapter) extends GraphStageWithMat

override def whenShutdown: Future[Done] = shutdownPromise.future

override def onTimer(timerKey: Any): Unit = ensureQueueHasItems()

setHandler(out, new OutHandler {
override def onPull(): Unit = {
ensureQueueHasItems()
pushIfNeeded()
completeIfNeeded()
}
Expand Down Expand Up @@ -90,7 +97,6 @@ class SubscriptionSource(adapter: SubscriptionAdapter) extends GraphStageWithMat
result match {
case Success(messages) =>
messages.foreach(queue.enqueue)
ensureQueueHasItems()
pushIfNeeded()
completeIfNeeded()
case Failure(cause) => failStage(cause)
Expand Down

0 comments on commit 3bd40bb

Please sign in to comment.