diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index bcd45e8..95eebc0 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -15,7 +15,7 @@ jobs: steps: - uses: actions/checkout@v2.3.4 - uses: olafurpg/setup-scala@v10 - - run: sbt ++2.13.1 docs/docusaurusCreateSite + - run: sbt ++2.13.5 docs/docusaurusCreateSite test: name: ${{ matrix.command }} ${{ matrix.java }} @@ -25,8 +25,7 @@ jobs: matrix: java: ['adopt@1.8', 'adopt@1.11'] command: - - "++2.12.10 ci" - - "++2.13.1 ci" + - "++2.13.5 ci" steps: - uses: actions/checkout@v2.3.4 - uses: olafurpg/setup-scala@v10 diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 8a60889..80df2b0 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -19,6 +19,6 @@ jobs: SONATYPE_PASSWORD: ${{ secrets.SONATYPE_PASSWORD }} SONATYPE_USERNAME: ${{ secrets.SONATYPE_USERNAME }} - name: Publish docs - run: sbt ++2.13.1 docs/docusaurusPublishGhpages + run: sbt ++2.13.5 docs/docusaurusPublishGhpages env: GITHUB_DEPLOY_KEY: ${{ secrets.GITHUB_DEPLOY_KEY }} diff --git a/.gitignore b/.gitignore index 3aba384..070a4e8 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,7 @@ *.class .idea .DS_Store +.bsp logs/ project/target/ target/ diff --git a/bench/src/main/scala/io/fmq/JeroMQSocketBenchmark.scala b/bench/src/main/scala/io/fmq/JeroMQSocketBenchmark.scala index 2612d00..43e9c58 100644 --- a/bench/src/main/scala/io/fmq/JeroMQSocketBenchmark.scala +++ b/bench/src/main/scala/io/fmq/JeroMQSocketBenchmark.scala @@ -3,14 +3,13 @@ package io.fmq import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong} -import cats.effect.{ContextShift, Fiber, IO} +import cats.effect.unsafe.IORuntime +import cats.effect.{Fiber, IO} import io.fmq.JeroMQSocketBenchmark.MessagesCounter import org.openjdk.jmh.annotations._ import org.zeromq.{SocketType, ZContext} import zmq.ZMQ -import scala.concurrent.ExecutionContext - //jmh:run io.fmq.JeroMQSocketBenchmark @BenchmarkMode(Array(Mode.Throughput)) @State(Scope.Benchmark) @@ -21,15 +20,15 @@ import scala.concurrent.ExecutionContext @SuppressWarnings(Array("org.wartremover.warts.All")) class JeroMQSocketBenchmark { - private implicit val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global) + private implicit val runtime: IORuntime = IORuntime.global @Param(Array("128", "256", "512", "1024")) var messageSize: Int = _ private val recording = new AtomicBoolean - private var publisher: Fiber[IO, Unit] = _ - private var consumer: Fiber[IO, Unit] = _ + private var publisher: Fiber[IO, Throwable, Unit] = _ + private var consumer: Fiber[IO, Throwable, Unit] = _ @Setup(Level.Iteration) def setup(): Unit = { @@ -53,7 +52,7 @@ class JeroMQSocketBenchmark { } publisher = IO - .delay { + .interruptible(false) { while (true) { push.send(msg.data()) } @@ -62,7 +61,7 @@ class JeroMQSocketBenchmark { .unsafeRunSync() consumer = IO - .delay { + .interruptible(false) { while (true) { pull.recv() diff --git a/bench/src/main/scala/io/fmq/SocketBenchmark.scala b/bench/src/main/scala/io/fmq/SocketBenchmark.scala index 484a797..58bde40 100644 --- a/bench/src/main/scala/io/fmq/SocketBenchmark.scala +++ b/bench/src/main/scala/io/fmq/SocketBenchmark.scala @@ -3,15 +3,13 @@ package io.fmq import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong} -import cats.effect.{Blocker, ContextShift, Fiber, IO, Resource} -import cats.syntax.flatMap._ +import cats.effect.unsafe.IORuntime +import cats.effect.{Fiber, IO, Resource} import io.fmq.SocketBenchmark.MessagesCounter import io.fmq.syntax.literals._ import org.openjdk.jmh.annotations._ import zmq.ZMQ -import scala.concurrent.ExecutionContext - //jmh:run io.fmq.SocketBenchmark @BenchmarkMode(Array(Mode.Throughput)) @State(Scope.Benchmark) @@ -22,15 +20,15 @@ import scala.concurrent.ExecutionContext @SuppressWarnings(Array("org.wartremover.warts.Null", "org.wartremover.warts.Var")) class SocketBenchmark { - private implicit val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global) + private implicit val runtime: IORuntime = IORuntime.global @Param(Array("128", "256", "512", "1024")) var messageSize: Int = _ private val recording = new AtomicBoolean - private var publisher: Fiber[IO, Nothing] = _ - private var consumer: Fiber[IO, Nothing] = _ + private var publisher: Fiber[IO, Throwable, Nothing] = _ + private var consumer: Fiber[IO, Throwable, Nothing] = _ @Setup(Level.Iteration) def setup(): Unit = { @@ -38,8 +36,7 @@ class SocketBenchmark { val ((pull, push), _) = (for { - blocker <- Blocker[IO] - context <- Context.create[IO](1, blocker) + context <- Context.create[IO](1) consumer <- Resource.suspend(context.createPull.map(_.bindToRandomPort(uri))) producer <- Resource.suspend(context.createPush.map(_.connect(consumer.uri))) } yield (consumer, producer)).allocated.unsafeRunSync() @@ -59,8 +56,8 @@ class SocketBenchmark { @TearDown(Level.Iteration) def teardown(): Unit = { - consumer.cancel.unsafeRunSync() - publisher.cancel.unsafeRunSync() + consumer.cancel.unsafeRunAndForget() + publisher.cancel.unsafeRunAndForget() } @Benchmark @@ -74,6 +71,7 @@ class SocketBenchmark { } +@SuppressWarnings(Array("org.wartremover.warts.Null", "org.wartremover.warts.Var")) object SocketBenchmark { @AuxCounters @@ -81,11 +79,13 @@ object SocketBenchmark { class MessagesCounter { @Setup(Level.Iteration) - def clean(): Unit = messagesCounter.set(0) + def clean(): Unit = { + messagesCounter = new AtomicLong + } def messagesPerSecond: Long = messagesCounter.get } - private val messagesCounter = new AtomicLong + private var messagesCounter = new AtomicLong } diff --git a/build.sbt b/build.sbt index 0bc3099..e43ee83 100644 --- a/build.sbt +++ b/build.sbt @@ -9,6 +9,7 @@ lazy val core = project .in(file("core")) .settings(commonSettings) .settings(commandSettings) + .settings(pluginAbsolutePathSettings) .settings( name := "fmq-core", libraryDependencies ++= Dependencies.core(scalaVersion.value) @@ -63,7 +64,7 @@ lazy val docs = project lazy val commonSettings = Seq( scalaVersion := Versions.scala_213, - crossScalaVersions := Seq(scalaVersion.value, Versions.scala_212), + crossScalaVersions := Seq(scalaVersion.value), Test / fork := true, Test / parallelExecution := false, Compile / compile / wartremoverErrors ++= Warts.allBut(Wart.Any, Wart.Nothing), // false positive @@ -97,6 +98,21 @@ lazy val noPublishSettings = Seq( publish / skip := true ) +// See https://github.com/sbt/sbt/issues/6027 +lazy val pluginAbsolutePathSettings = Seq(Compile, Test).map { c => + c / scalacOptions := { + val prefix = "-Xplugin:" + (c / scalacOptions).value.map { opt => + if (opt.startsWith(prefix)) { + val originalPluginFile = file(opt.drop(prefix.length)) + prefix + originalPluginFile.toPath.toAbsolutePath + } else { + opt + } + } + } +} + inThisBuild( Seq( organization := "io.github.irevive", diff --git a/core/src/main/scala/io/fmq/Context.scala b/core/src/main/scala/io/fmq/Context.scala index 40fc7e7..8012c29 100644 --- a/core/src/main/scala/io/fmq/Context.scala +++ b/core/src/main/scala/io/fmq/Context.scala @@ -1,6 +1,6 @@ package io.fmq -import cats.effect.{Blocker, Concurrent, ContextShift, Resource, Sync} +import cats.effect.kernel.{Async, Resource} import io.fmq.poll.Poller import io.fmq.proxy.Proxy import io.fmq.socket.pipeline.{Pull, Push} @@ -8,60 +8,60 @@ import io.fmq.socket.pubsub.{Publisher, Subscriber, XPublisher, XSubscriber} import io.fmq.socket.reqrep.{Dealer, Reply, Request, Router} import org.zeromq.{SocketType, ZContext, ZMQ} -final class Context[F[_]: Sync: ContextShift] private (private[fmq] val ctx: ZContext, blocker: Blocker) { +final class Context[F[_]: Async] private (private[fmq] val ctx: ZContext) { def createSubscriber(topic: Subscriber.Topic): F[Subscriber[F]] = createSocket(SocketType.SUB) { socket => val _ = socket.subscribe(topic.value) - new Subscriber[F](topic, socket, blocker) + new Subscriber[F](topic, socket) } def createPublisher: F[Publisher[F]] = - createSocket(SocketType.PUB)(socket => new Publisher[F](socket, blocker)) + createSocket(SocketType.PUB)(socket => new Publisher[F](socket)) def createXSubscriber: F[XSubscriber[F]] = - createSocket(SocketType.XSUB)(socket => new XSubscriber(socket, blocker)) + createSocket(SocketType.XSUB)(socket => new XSubscriber(socket)) def createXPublisher: F[XPublisher[F]] = - createSocket(SocketType.XPUB)(socket => new XPublisher(socket, blocker)) + createSocket(SocketType.XPUB)(socket => new XPublisher(socket)) def createPull: F[Pull[F]] = - createSocket(SocketType.PULL)(socket => new Pull(socket, blocker)) + createSocket(SocketType.PULL)(socket => new Pull(socket)) def createPush: F[Push[F]] = - createSocket(SocketType.PUSH)(socket => new Push(socket, blocker)) + createSocket(SocketType.PUSH)(socket => new Push(socket)) def createRequest: F[Request[F]] = - createSocket(SocketType.REQ)(socket => new Request(socket, blocker)) + createSocket(SocketType.REQ)(socket => new Request(socket)) def createReply: F[Reply[F]] = - createSocket(SocketType.REP)(socket => new Reply(socket, blocker)) + createSocket(SocketType.REP)(socket => new Reply(socket)) def createRouter: F[Router[F]] = - createSocket(SocketType.ROUTER)(socket => new Router(socket, blocker)) + createSocket(SocketType.ROUTER)(socket => new Router(socket)) def createDealer: F[Dealer[F]] = - createSocket(SocketType.DEALER)(socket => new Dealer(socket, blocker)) + createSocket(SocketType.DEALER)(socket => new Dealer(socket)) def createPoller: Resource[F, Poller[F]] = for { - selector <- Resource.fromAutoCloseableBlocking(blocker)(Sync[F].delay(ctx.getContext.selector())) + selector <- Resource.fromAutoCloseable(Async[F].delay(ctx.getContext.selector())) } yield Poller.fromSelector[F](selector) - def proxy(implicit ev: Concurrent[F]): Proxy[F] = new Proxy[F](this) + val proxy: Proxy[F] = new Proxy[F](this) - def isClosed: F[Boolean] = Sync[F].delay(ctx.isClosed) + def isClosed: F[Boolean] = Async[F].delay(ctx.isClosed) private def createSocket[A](tpe: SocketType)(fa: ZMQ.Socket => A): F[A] = - Sync[F].delay(fa(ctx.createSocket(tpe))) + Async[F].delay(fa(ctx.createSocket(tpe))) } object Context { - def create[F[_]: Sync: ContextShift](ioThreads: Int, blocker: Blocker): Resource[F, Context[F]] = + def create[F[_]: Async](ioThreads: Int): Resource[F, Context[F]] = for { - ctx <- Resource.fromAutoCloseableBlocking(blocker)(Sync[F].delay(new ZContext(ioThreads))) - } yield new Context[F](ctx, blocker) + ctx <- Resource.fromAutoCloseable(Async[F].delay(new ZContext(ioThreads))) + } yield new Context[F](ctx) } diff --git a/core/src/main/scala/io/fmq/poll/PollTimeout.scala b/core/src/main/scala/io/fmq/poll/PollTimeout.scala index 57b1ede..c43c151 100644 --- a/core/src/main/scala/io/fmq/poll/PollTimeout.scala +++ b/core/src/main/scala/io/fmq/poll/PollTimeout.scala @@ -1,5 +1,7 @@ package io.fmq.poll +import cats.Eq + import scala.concurrent.duration.FiniteDuration sealed abstract class PollTimeout(val value: Long) @@ -16,4 +18,6 @@ object PollTimeout { */ final case class Fixed(duration: FiniteDuration) extends PollTimeout(duration.toMillis) + implicit val pollTimeoutEq: Eq[PollTimeout] = Eq.fromUniversalEquals + } diff --git a/core/src/main/scala/io/fmq/poll/Poller.scala b/core/src/main/scala/io/fmq/poll/Poller.scala index a8d93ac..1015612 100644 --- a/core/src/main/scala/io/fmq/poll/Poller.scala +++ b/core/src/main/scala/io/fmq/poll/Poller.scala @@ -1,30 +1,137 @@ package io.fmq.poll -import java.nio.channels.Selector +import java.io.IOException +import java.nio.channels._ +import java.util.concurrent.TimeUnit +import cats.Monad import cats.data.NonEmptyList -import cats.effect.Sync +import cats.effect.kernel.Sync import cats.syntax.applicative._ +import cats.syntax.applicativeError._ +import cats.syntax.order._ import cats.syntax.flatMap._ import cats.syntax.functor._ import cats.syntax.traverse._ +import zmq.ZError import zmq.poll.{PollItem => ZPollItem} +import scala.jdk.CollectionConverters._ + final class Poller[F[_]: Sync] private (private[fmq] val selector: Selector) { /** * In the case of [[PollTimeout.Infinity]] the thread will be '''blocked''' until at least one socket - * can either receive or send a message (based on the socket type). + * can either receive or send a message (according to the type of a socket). * * @return total number of available events */ def poll(items: NonEmptyList[PollEntry[F]], timeout: PollTimeout): F[Int] = for { polling <- items.map(item => (item, toZmqPollItem(item))).toList.pure[F] - events <- Sync[F].delay(zmq.ZMQ.poll(selector, polling.toMap.values.toArray, timeout.value)) + events <- pollInternal(polling.toMap.values.toList, timeout) _ <- polling.traverse((dispatchItem _).tupled) } yield events + private def pollInternal(items: List[ZPollItem], timeout: PollTimeout): F[Int] = + (Sync[F].delay(attachItemsToSelector(selector, items)) >> doPoll(timeout)).recoverWith { + case _: ClosedSelectorException => Sync[F].pure(-1) + case _: ClosedChannelException => Sync[F].pure(-1) + case e: IOException => Sync[F].raiseError(new ZError.IOException(e)) + } + + private def doPoll(timeout: PollTimeout): F[Int] = + Monad[F].tailRecM[(Long, Long, Boolean), Int]((0L, 0L, true)) { case (now, end, firstPass) => + val waitMillis: Long = + if (firstPass) 0L + else if (timeout.value === 0L || timeout === PollTimeout.Infinity) -1L + else { + val diff = TimeUnit.NANOSECONDS.toMillis(end - now) + if (diff === 0L) 1L else diff + } + + Sync[F].interruptible(many = false)(readyKeys(waitMillis)).flatMap { + case nevents if nevents > 0 => + Sync[F].pure(Right(nevents)) + + case nevents if timeout.value === 0L => + Sync[F].pure(Right(nevents)) + + // At this point we are meant to wait for events but there are none. + // If timeout is infinite we can just loop until we get some events. + case _ if timeout === PollTimeout.Infinity => + Sync[F].pure(Left((now, end, false))) + + // The timeout is finite and there are no events. In the first pass + // we get a timestamp of when the polling have begun. (We assume that + // first pass have taken negligible time). We also compute the time + // when the polling should time out. + case nevents if firstPass => + val now = zmq.util.Clock.nowNS() + val end = now + TimeUnit.MILLISECONDS.toNanos(timeout.value) + Sync[F].pure(Either.cond(now === end, nevents, (now, end, false))) + + // Find out whether timeout have expired. + case nevents => + val now = zmq.util.Clock.nowNS() + Sync[F].pure(Either.cond(now >= end, nevents, (now, end, false))) + } + } + + private def attachItemsToSelector(selector: Selector, items: List[ZPollItem]): Unit = { + val saved = new scala.collection.mutable.HashMap[SelectableChannel, SelectionKey] + for (key <- selector.keys().asScala if key.isValid) saved.put(key.channel(), key) + + for (item <- items) { + val ch = item.getChannel // mailbox channel if ZMQ socket + saved.remove(ch) match { + case Some(key) => + if (key.interestOps() =!= item.interestOps()) { + val _ = key.interestOps(item.interestOps()) + } + key.attach(item) + + case None => + ch.register(selector, item.interestOps(), item) + } + } + + if (saved.nonEmpty) { + for (deprecated <- saved.values) + deprecated.cancel() + } + } + + private def readyKeys(waitMillis: Long): Int = { + + @SuppressWarnings(Array("org.wartremover.warts.TraversableOps", "org.wartremover.warts.AsInstanceOf")) + @scala.annotation.tailrec + def loop(keys: Set[SelectionKey], counter: Int, rc: Int): Int = + if (keys.nonEmpty) { + val key = keys.head + + val item = key.attachment().asInstanceOf[ZPollItem] + val ready = item.readyOps(key, rc) + + if (ready < 0) -1 + else if (ready === 0) loop(keys.tail, counter, rc) + else loop(keys.tail, counter + 1, rc) + } else { + counter + } + + val rc = + if (waitMillis < 0) selector.select(0) + else if (waitMillis === 0) selector.selectNow() + else selector.select(waitMillis) + + val result = loop(selector.keys().asScala.toSet, 0, rc) + if (result >= 0) { + selector.selectedKeys().clear() + } + result + } + private def dispatchItem(entity: PollEntry[F], item: ZPollItem): F[Unit] = { val availableEvents = item.readyOps() diff --git a/core/src/main/scala/io/fmq/proxy/Proxy.scala b/core/src/main/scala/io/fmq/proxy/Proxy.scala index b3c1863..9e896c0 100644 --- a/core/src/main/scala/io/fmq/proxy/Proxy.scala +++ b/core/src/main/scala/io/fmq/proxy/Proxy.scala @@ -2,16 +2,17 @@ package io.fmq package proxy import cats.data.{Kleisli, NonEmptyList} -import cats.effect.{Blocker, Concurrent, ContextShift, Resource, Sync} -import cats.effect.syntax.concurrent._ +import cats.effect.kernel.{Async, Outcome, Resource} import cats.syntax.apply._ import cats.syntax.flatMap._ import cats.syntax.functor._ import io.fmq.poll.{ConsumerHandler, PollEntry, PollTimeout, Poller} import io.fmq.socket.{BidirectionalSocket, ConsumerSocket, ProducerSocket} +import scala.concurrent.ExecutionContext + @SuppressWarnings(Array("org.wartremover.warts.Overloading")) -final class Proxy[F[_]: Concurrent: ContextShift](ctx: Context[F]) { +final class Proxy[F[_]: Async](ctx: Context[F]) { def unidirectional(frontend: ConsumerSocket[F], backend: ProducerSocket[F]): Resource[F, Proxy.Configured[F]] = unidirectional(frontend, backend, None) @@ -56,7 +57,7 @@ final class Proxy[F[_]: Concurrent: ContextShift](ctx: Context[F]) { val withCapture: (ProducerSocket[F] => F[Unit]) => F[Unit] = capture match { case Some(c) => f => f(c.socket) - case None => _ => Sync[F].unit + case None => _ => Async[F].unit } def send(message: Array[Byte], socket: ConsumerSocket[F]): F[Unit] = @@ -78,13 +79,13 @@ final class Proxy[F[_]: Concurrent: ContextShift](ctx: Context[F]) { object Proxy { - final class Configured[F[_]: Concurrent: ContextShift] private[Proxy] ( + final class Configured[F[_]: Async] private[Proxy] ( poller: Poller[F], items: NonEmptyList[PollEntry[F]] ) { - def start(blocker: Blocker): Resource[F, F[Unit]] = - blocker.blockOn(poller.poll(items, PollTimeout.Infinity).foreverM[Unit]).background + def start(ec: ExecutionContext): Resource[F, F[Outcome[F, Throwable, Unit]]] = + Async[F].backgroundOn(poller.poll(items, PollTimeout.Infinity).foreverM[Unit], ec) } diff --git a/core/src/main/scala/io/fmq/socket/ConnectedSocket.scala b/core/src/main/scala/io/fmq/socket/ConnectedSocket.scala index a99128b..3a56988 100644 --- a/core/src/main/scala/io/fmq/socket/ConnectedSocket.scala +++ b/core/src/main/scala/io/fmq/socket/ConnectedSocket.scala @@ -1,6 +1,6 @@ package io.fmq.socket -import cats.effect.Sync +import cats.effect.kernel.Sync import io.fmq.address.Uri import org.zeromq.ZMQ diff --git a/core/src/main/scala/io/fmq/socket/Connectivity.scala b/core/src/main/scala/io/fmq/socket/Connectivity.scala index 4242704..b4fbff4 100644 --- a/core/src/main/scala/io/fmq/socket/Connectivity.scala +++ b/core/src/main/scala/io/fmq/socket/Connectivity.scala @@ -1,12 +1,11 @@ package io.fmq.socket -import cats.effect.{ContextShift, Sync} +import cats.effect.kernel.Sync import io.fmq.socket.api.{BindApi, ConnectApi, SocketFactory} object Connectivity { abstract class All[F[_], Socket[_[_]]]( - protected implicit val CS: ContextShift[F], protected implicit val F: Sync[F], protected implicit val SF: SocketFactory[Socket] ) extends BindApi[F, Socket] diff --git a/core/src/main/scala/io/fmq/socket/ConsumerSocket.scala b/core/src/main/scala/io/fmq/socket/ConsumerSocket.scala index bb4161d..53b9237 100644 --- a/core/src/main/scala/io/fmq/socket/ConsumerSocket.scala +++ b/core/src/main/scala/io/fmq/socket/ConsumerSocket.scala @@ -1,7 +1,7 @@ package io.fmq.socket import cats.data.NonEmptyList -import cats.effect.Sync +import cats.effect.kernel.Sync import cats.syntax.flatMap._ import cats.syntax.functor._ import io.fmq.frame.{Frame, FrameDecoder} @@ -30,23 +30,23 @@ trait ConsumerSocket[F[_]] extends ConnectedSocket with SocketOptions[F] with Co * * The operation blocks a thread until a new message is available. * - * Use `blocker.blockOn(socket.receive[Array[Byte]])` or consume messages on a blocking context in the background: + * Use `socket.receive[Array[Byte]].evalOn(blocker)` or consume messages on a blocking context in the background: * * {{{ - * import cats.effect.syntax.concurrent._ - * import cats.effect.{Blocker, Concurrent, ContextShift, Resource} + * import cats.effect.syntax.async._ + * import cats.effect.{Async, Concurrent, Resource} + * import cats.effect.std.Queue * import fs2.Stream - * import fs2.concurrent.Queue * import io.fmq.socket.ConsumerSocket * - * def consume[F[_]: Concurrent: ContextShift](blocker: Blocker, socket: ConsumerSocket[F]): Stream[F, Array[Byte]] = { + * def consume[F[_]: Async](blocker: ExecutionContext, socket: ConsumerSocket[F]): Stream[F, Array[Byte]] = { * def process(queue: Queue[F, Array[Byte]]) = - * blocker.blockOn(Stream.repeatEval(socket.receive[Array[Byte]]).through(queue.enqueue).compile.drain) + * Stream.repeatEval(socket.receive[Array[Byte]]).evalMap(queue.offer).compile.drain * * for { * queue <- Stream.eval(Queue.unbounded[F, Array[Byte]]) - * _ <- Stream.resource(process(queue).background) - * result <- queue.dequeue + * _ <- Stream.resource(process(queue).backgroundOn(blocker)) + * result <- Stream.repeatEval(queue.take) * } yield result * } * }}} @@ -57,7 +57,7 @@ trait ConsumerSocket[F[_]] extends ConnectedSocket with SocketOptions[F] with Co * }}} */ def receive[A: FrameDecoder]: F[A] = - F.delay(FrameDecoder[A].decode(socket.recv())) + F.interruptible(many = true)(FrameDecoder[A].decode(socket.recv())) /** * Low-level API. diff --git a/core/src/main/scala/io/fmq/socket/ProducerSocket.scala b/core/src/main/scala/io/fmq/socket/ProducerSocket.scala index 7c16e79..d28a3ec 100644 --- a/core/src/main/scala/io/fmq/socket/ProducerSocket.scala +++ b/core/src/main/scala/io/fmq/socket/ProducerSocket.scala @@ -1,6 +1,6 @@ package io.fmq.socket -import cats.effect.Sync +import cats.effect.kernel.Sync import cats.syntax.flatMap._ import cats.syntax.functor._ import cats.syntax.traverse._ diff --git a/core/src/main/scala/io/fmq/socket/api/BindApi.scala b/core/src/main/scala/io/fmq/socket/api/BindApi.scala index dcd013f..f2a9db0 100644 --- a/core/src/main/scala/io/fmq/socket/api/BindApi.scala +++ b/core/src/main/scala/io/fmq/socket/api/BindApi.scala @@ -1,36 +1,34 @@ package io.fmq.socket.api -import cats.effect.{Blocker, ContextShift, Resource, Sync} +import cats.effect.kernel.{Resource, Sync} import cats.syntax.functor._ import io.fmq.address.{Address, Uri} import org.zeromq.ZMQ trait BindApi[F[_], Socket[_[_]]] { - protected implicit val CS: ContextShift[F] protected implicit val F: Sync[F] protected implicit val SF: SocketFactory[Socket] protected[fmq] def socket: ZMQ.Socket - protected def blocker: Blocker final def bind(uri: Uri.Complete): Resource[F, Socket[F]] = { val address = uri.materialize - val acquire: F[ZMQ.Socket] = blocker.delay(socket.bind(address)).as(socket) - def release(s: ZMQ.Socket): F[Unit] = blocker.delay(s.unbind(address)).void + val acquire: F[ZMQ.Socket] = F.blocking(socket.bind(address)).as(socket) + def release(s: ZMQ.Socket): F[Unit] = F.blocking(s.unbind(address)).void Resource.make(acquire)(release).as(SF.create[F](socket, uri)) } final def bindToRandomPort(uri: Uri.Incomplete.TCP): Resource[F, Socket[F]] = { - val acquire: F[Uri.Complete.TCP] = blocker.delay { + val acquire: F[Uri.Complete.TCP] = F.blocking { val port = socket.bindToRandomPort(uri.materialize) Uri.Complete.TCP(Address.Full(uri.address.host, port)) } def release(uri: Uri.Complete.TCP): F[Unit] = - blocker.delay(socket.unbind(uri.materialize)).void + F.blocking(socket.unbind(uri.materialize)).void for { completeUri <- Resource.make(acquire)(release) diff --git a/core/src/main/scala/io/fmq/socket/api/ConnectApi.scala b/core/src/main/scala/io/fmq/socket/api/ConnectApi.scala index b42925b..04f80b9 100644 --- a/core/src/main/scala/io/fmq/socket/api/ConnectApi.scala +++ b/core/src/main/scala/io/fmq/socket/api/ConnectApi.scala @@ -1,24 +1,22 @@ package io.fmq.socket.api -import cats.effect.{Blocker, ContextShift, Resource, Sync} +import cats.effect.kernel.{Resource, Sync} import cats.syntax.functor._ import io.fmq.address.Uri import org.zeromq.ZMQ trait ConnectApi[F[_], Socket[_[_]]] { - protected implicit val CS: ContextShift[F] protected implicit val F: Sync[F] protected implicit val SF: SocketFactory[Socket] protected[fmq] def socket: ZMQ.Socket - protected def blocker: Blocker final def connect(uri: Uri.Complete): Resource[F, Socket[F]] = { val address = uri.materialize - val acquire: F[ZMQ.Socket] = blocker.delay(socket.connect(address)).as(socket) - def release(s: ZMQ.Socket): F[Unit] = blocker.delay(s.disconnect(address)).void + val acquire: F[ZMQ.Socket] = F.blocking(socket.connect(address)).as(socket) + def release(s: ZMQ.Socket): F[Unit] = F.blocking(s.disconnect(address)).void Resource.make(acquire)(release).as(SF.create[F](socket, uri)) } diff --git a/core/src/main/scala/io/fmq/socket/api/SocketFactory.scala b/core/src/main/scala/io/fmq/socket/api/SocketFactory.scala index a8f0a22..4456e07 100644 --- a/core/src/main/scala/io/fmq/socket/api/SocketFactory.scala +++ b/core/src/main/scala/io/fmq/socket/api/SocketFactory.scala @@ -1,6 +1,6 @@ package io.fmq.socket.api -import cats.effect.Sync +import cats.effect.kernel.Sync import io.fmq.address.Uri import org.zeromq.ZMQ diff --git a/core/src/main/scala/io/fmq/socket/api/SocketOptions.scala b/core/src/main/scala/io/fmq/socket/api/SocketOptions.scala index c5fa450..94a60e1 100644 --- a/core/src/main/scala/io/fmq/socket/api/SocketOptions.scala +++ b/core/src/main/scala/io/fmq/socket/api/SocketOptions.scala @@ -1,6 +1,6 @@ package io.fmq.socket.api -import cats.effect.Sync +import cats.effect.kernel.Sync import org.zeromq.ZMQ private[socket] trait SocketOptions[F[_]] { diff --git a/core/src/main/scala/io/fmq/socket/pipeline/Pull.scala b/core/src/main/scala/io/fmq/socket/pipeline/Pull.scala index 3279ba3..addf46a 100644 --- a/core/src/main/scala/io/fmq/socket/pipeline/Pull.scala +++ b/core/src/main/scala/io/fmq/socket/pipeline/Pull.scala @@ -1,14 +1,13 @@ package io.fmq.socket.pipeline -import cats.effect.{Blocker, ContextShift, Sync} +import cats.effect.kernel.Sync import io.fmq.address.Uri import io.fmq.socket.api.{CommonOptions, ReceiveOptions, SocketFactory, SocketOptions} import io.fmq.socket.{Connectivity, ConsumerSocket} import org.zeromq.ZMQ -final class Pull[F[_]: Sync: ContextShift] private[fmq] ( - protected[fmq] val socket: ZMQ.Socket, - protected val blocker: Blocker +final class Pull[F[_]: Sync] private[fmq] ( + protected[fmq] val socket: ZMQ.Socket ) extends Connectivity.All[F, Pull.Socket] with SocketOptions[F] with CommonOptions.All[F] diff --git a/core/src/main/scala/io/fmq/socket/pipeline/Push.scala b/core/src/main/scala/io/fmq/socket/pipeline/Push.scala index 897fd96..e1ea19b 100644 --- a/core/src/main/scala/io/fmq/socket/pipeline/Push.scala +++ b/core/src/main/scala/io/fmq/socket/pipeline/Push.scala @@ -1,14 +1,13 @@ package io.fmq.socket.pipeline -import cats.effect.{Blocker, ContextShift, Sync} +import cats.effect.kernel.Sync import io.fmq.address.Uri import io.fmq.socket.api.{CommonOptions, SendOptions, SocketFactory, SocketOptions} import io.fmq.socket.{Connectivity, ProducerSocket} import org.zeromq.ZMQ -final class Push[F[_]: Sync: ContextShift] private[fmq] ( - protected[fmq] val socket: ZMQ.Socket, - protected val blocker: Blocker +final class Push[F[_]: Sync] private[fmq] ( + protected[fmq] val socket: ZMQ.Socket ) extends Connectivity.All[F, Push.Socket] with SocketOptions[F] with CommonOptions.All[F] diff --git a/core/src/main/scala/io/fmq/socket/pubsub/Publisher.scala b/core/src/main/scala/io/fmq/socket/pubsub/Publisher.scala index 25906d9..ee17963 100644 --- a/core/src/main/scala/io/fmq/socket/pubsub/Publisher.scala +++ b/core/src/main/scala/io/fmq/socket/pubsub/Publisher.scala @@ -1,14 +1,13 @@ package io.fmq.socket.pubsub -import cats.effect.{Blocker, ContextShift, Sync} +import cats.effect.kernel.Sync import io.fmq.address.Uri import io.fmq.socket.api.{CommonOptions, SendOptions, SocketFactory, SocketOptions} import io.fmq.socket.{Connectivity, ProducerSocket} import org.zeromq.ZMQ -final class Publisher[F[_]: Sync: ContextShift] private[fmq] ( - protected[fmq] val socket: ZMQ.Socket, - protected val blocker: Blocker +final class Publisher[F[_]: Sync] private[fmq] ( + protected[fmq] val socket: ZMQ.Socket ) extends Connectivity.All[F, Publisher.Socket] with SocketOptions[F] with CommonOptions.All[F] diff --git a/core/src/main/scala/io/fmq/socket/pubsub/Subscriber.scala b/core/src/main/scala/io/fmq/socket/pubsub/Subscriber.scala index 45d29e3..7360f8d 100644 --- a/core/src/main/scala/io/fmq/socket/pubsub/Subscriber.scala +++ b/core/src/main/scala/io/fmq/socket/pubsub/Subscriber.scala @@ -2,16 +2,15 @@ package io.fmq.socket.pubsub import java.nio.charset.StandardCharsets -import cats.effect.{Blocker, ContextShift, Sync} +import cats.effect.kernel.Sync import io.fmq.address.Uri import io.fmq.socket.api.{CommonOptions, ReceiveOptions, SocketFactory, SocketOptions} import io.fmq.socket.{Connectivity, ConsumerSocket} import org.zeromq.ZMQ -final class Subscriber[F[_]: Sync: ContextShift] private[fmq] ( +final class Subscriber[F[_]: Sync] private[fmq] ( val topic: Subscriber.Topic, - protected[fmq] val socket: ZMQ.Socket, - protected val blocker: Blocker + protected[fmq] val socket: ZMQ.Socket ) extends Connectivity.All[F, Subscriber.Socket] with SocketOptions[F] with CommonOptions.All[F] diff --git a/core/src/main/scala/io/fmq/socket/pubsub/XPublisher.scala b/core/src/main/scala/io/fmq/socket/pubsub/XPublisher.scala index dc0d949..abab4ee 100644 --- a/core/src/main/scala/io/fmq/socket/pubsub/XPublisher.scala +++ b/core/src/main/scala/io/fmq/socket/pubsub/XPublisher.scala @@ -1,14 +1,13 @@ package io.fmq.socket.pubsub -import cats.effect.{Blocker, ContextShift, Sync} +import cats.effect.kernel.Sync import io.fmq.address.Uri import io.fmq.socket.api.{CommonOptions, ReceiveOptions, SendOptions, SocketFactory, SocketOptions} import io.fmq.socket.{BidirectionalSocket, Connectivity} import org.zeromq.ZMQ -final class XPublisher[F[_]: Sync: ContextShift] private[fmq] ( - protected[fmq] val socket: ZMQ.Socket, - protected val blocker: Blocker +final class XPublisher[F[_]: Sync] private[fmq] ( + protected[fmq] val socket: ZMQ.Socket ) extends Connectivity.All[F, XPublisher.Socket] with SocketOptions[F] with CommonOptions.All[F] diff --git a/core/src/main/scala/io/fmq/socket/pubsub/XSubscriber.scala b/core/src/main/scala/io/fmq/socket/pubsub/XSubscriber.scala index e8c1f8d..d6bab33 100644 --- a/core/src/main/scala/io/fmq/socket/pubsub/XSubscriber.scala +++ b/core/src/main/scala/io/fmq/socket/pubsub/XSubscriber.scala @@ -1,14 +1,13 @@ package io.fmq.socket.pubsub -import cats.effect.{Blocker, ContextShift, Sync} +import cats.effect.kernel.Sync import io.fmq.address.Uri -import io.fmq.socket.api.{CommonOptions, ReceiveOptions, SendOptions, SocketFactory, SocketOptions} +import io.fmq.socket.api._ import io.fmq.socket.{BidirectionalSocket, Connectivity} import org.zeromq.ZMQ -final class XSubscriber[F[_]: Sync: ContextShift] private[fmq] ( - protected[fmq] val socket: ZMQ.Socket, - protected val blocker: Blocker +final class XSubscriber[F[_]: Sync] private[fmq] ( + protected[fmq] val socket: ZMQ.Socket ) extends Connectivity.All[F, XSubscriber.Socket] with SocketOptions[F] with CommonOptions.All[F] diff --git a/core/src/main/scala/io/fmq/socket/reqrep/Dealer.scala b/core/src/main/scala/io/fmq/socket/reqrep/Dealer.scala index 33dbf3f..33e67d2 100644 --- a/core/src/main/scala/io/fmq/socket/reqrep/Dealer.scala +++ b/core/src/main/scala/io/fmq/socket/reqrep/Dealer.scala @@ -1,14 +1,13 @@ package io.fmq.socket.reqrep -import cats.effect.{Blocker, ContextShift, Sync} +import cats.effect.kernel.Sync import io.fmq.address.Uri import io.fmq.socket.api.{CommonOptions, ReceiveOptions, SendOptions, SocketFactory, SocketOptions} import io.fmq.socket.{BidirectionalSocket, Connectivity} import org.zeromq.ZMQ -final class Dealer[F[_]: Sync: ContextShift] private[fmq] ( - protected[fmq] val socket: ZMQ.Socket, - protected val blocker: Blocker +final class Dealer[F[_]: Sync] private[fmq] ( + protected[fmq] val socket: ZMQ.Socket ) extends Connectivity.All[F, Dealer.Socket] with SocketOptions[F] with CommonOptions.All[F] diff --git a/core/src/main/scala/io/fmq/socket/reqrep/Reply.scala b/core/src/main/scala/io/fmq/socket/reqrep/Reply.scala index cdb006a..dd2d9a6 100644 --- a/core/src/main/scala/io/fmq/socket/reqrep/Reply.scala +++ b/core/src/main/scala/io/fmq/socket/reqrep/Reply.scala @@ -1,14 +1,13 @@ package io.fmq.socket.reqrep -import cats.effect.{Blocker, ContextShift, Sync} +import cats.effect.kernel.Sync import io.fmq.address.Uri import io.fmq.socket.api.{CommonOptions, SendOptions, SocketFactory, SocketOptions} import io.fmq.socket.{BidirectionalSocket, Connectivity} import org.zeromq.ZMQ -final class Reply[F[_]: Sync: ContextShift] private[fmq] ( - protected[fmq] val socket: ZMQ.Socket, - protected val blocker: Blocker +final class Reply[F[_]: Sync] private[fmq] ( + protected[fmq] val socket: ZMQ.Socket ) extends Connectivity.All[F, Reply.Socket] with SocketOptions[F] with CommonOptions.All[F] diff --git a/core/src/main/scala/io/fmq/socket/reqrep/Request.scala b/core/src/main/scala/io/fmq/socket/reqrep/Request.scala index d3c767d..0edfa2f 100644 --- a/core/src/main/scala/io/fmq/socket/reqrep/Request.scala +++ b/core/src/main/scala/io/fmq/socket/reqrep/Request.scala @@ -1,14 +1,13 @@ package io.fmq.socket.reqrep -import cats.effect.{Blocker, ContextShift, Sync} +import cats.effect.kernel.Sync import io.fmq.address.Uri import io.fmq.socket.api.{CommonOptions, ReceiveOptions, SocketFactory, SocketOptions} import io.fmq.socket.{BidirectionalSocket, Connectivity} import org.zeromq.ZMQ -final class Request[F[_]: Sync: ContextShift] private[fmq] ( - protected[fmq] val socket: ZMQ.Socket, - protected val blocker: Blocker +final class Request[F[_]: Sync] private[fmq] ( + protected[fmq] val socket: ZMQ.Socket ) extends Connectivity.All[F, Request.Socket] with SocketOptions[F] with CommonOptions.All[F] diff --git a/core/src/main/scala/io/fmq/socket/reqrep/Router.scala b/core/src/main/scala/io/fmq/socket/reqrep/Router.scala index 345aa3a..70733d3 100644 --- a/core/src/main/scala/io/fmq/socket/reqrep/Router.scala +++ b/core/src/main/scala/io/fmq/socket/reqrep/Router.scala @@ -1,15 +1,14 @@ package io.fmq.socket.reqrep -import cats.effect.{Blocker, ContextShift, Sync} +import cats.effect.kernel.Sync import io.fmq.address.Uri import io.fmq.options.{RouterHandover, RouterMandatory} import io.fmq.socket.api.{CommonOptions, ReceiveOptions, SendOptions, SocketFactory, SocketOptions} import io.fmq.socket.{BidirectionalSocket, Connectivity} import org.zeromq.ZMQ -final class Router[F[_]: Sync: ContextShift] private[fmq] ( - protected[fmq] val socket: ZMQ.Socket, - protected val blocker: Blocker +final class Router[F[_]: Sync] private[fmq] ( + protected[fmq] val socket: ZMQ.Socket ) extends Connectivity.All[F, Router.Socket] with SocketOptions[F] with CommonOptions.All[F] diff --git a/core/src/test/scala/io/fmq/ContextSpec.scala b/core/src/test/scala/io/fmq/ContextSpec.scala index 8f50277..6961e41 100644 --- a/core/src/test/scala/io/fmq/ContextSpec.scala +++ b/core/src/test/scala/io/fmq/ContextSpec.scala @@ -1,6 +1,6 @@ package io.fmq -import cats.effect.{Blocker, IO} +import cats.effect.IO import cats.syntax.functor._ import org.scalatest.OptionValues._ @@ -11,8 +11,8 @@ class ContextSpec extends IOSpec { "Context" should { "release allocated context" in { - val (isClosed, ctx) = Blocker[IO] - .flatMap(blocker => Context.create[IO](1, blocker)) + val (isClosed, ctx) = Context + .create[IO](1) .use(ctx => ctx.isClosed.tupleRight(ctx)) .unsafeRunTimed(3.seconds) .value diff --git a/core/src/test/scala/io/fmq/IOSpec.scala b/core/src/test/scala/io/fmq/IOSpec.scala index bed4f5a..34e2ed7 100644 --- a/core/src/test/scala/io/fmq/IOSpec.scala +++ b/core/src/test/scala/io/fmq/IOSpec.scala @@ -1,27 +1,24 @@ package io.fmq -import cats.effect.syntax.effect._ -import cats.effect.{Blocker, ContextShift, Effect, IO, Sync, Timer} +import cats.effect.IO +import cats.effect.unsafe.IORuntime import org.scalatest.OptionValues._ import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpecLike -import scala.concurrent.ExecutionContext import scala.concurrent.duration._ trait IOSpec extends AnyWordSpecLike with Matchers { - protected implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global) - protected implicit val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global) + implicit val runtime: IORuntime = IORuntime.global @SuppressWarnings(Array("org.wartremover.warts.DefaultArguments")) - protected def withContext[F[_]: Sync: ContextShift: Effect, A]( - timeout: FiniteDuration = 3.seconds - )(fa: Context[F] => F[A]): A = - Blocker[F] - .flatMap(blocker => Context.create[F](1, blocker)) + protected def withContext[A]( + timeout: FiniteDuration = 30.seconds + )(fa: Context[IO] => IO[A]): A = + Context + .create[IO](1) .use(fa) - .toIO .unsafeRunTimed(timeout) .value diff --git a/core/src/test/scala/io/fmq/poll/PollerSpec.scala b/core/src/test/scala/io/fmq/poll/PollerSpec.scala index 8cf7d40..a4d64a9 100644 --- a/core/src/test/scala/io/fmq/poll/PollerSpec.scala +++ b/core/src/test/scala/io/fmq/poll/PollerSpec.scala @@ -1,10 +1,10 @@ package io.fmq.poll import cats.data.{Kleisli, NonEmptyList} -import cats.effect.{IO, Resource, Timer} +import cats.effect.std.Queue +import cats.effect.{IO, Resource} import cats.syntax.flatMap._ import cats.syntax.apply._ -import fs2.concurrent.Queue import io.fmq.socket.pubsub.Subscriber import io.fmq.socket.{ConsumerSocket, ProducerSocket} import io.fmq.syntax.literals._ @@ -16,7 +16,7 @@ import zmq.poll.PollItem import scala.concurrent.duration._ /** - * Tests are using Timer[IO].sleep(200.millis) to fix 'slow-joiner' problem. + * Tests are using IO.sleep(200.millis) to fix 'slow-joiner' problem. * More details: http://zguide.zeromq.org/page:all#Missing-Message-Problem-Solver */ class PollerSpec extends IOSpec { @@ -49,12 +49,12 @@ class PollerSpec extends IOSpec { ) for { - _ <- Timer[IO].sleep(200.millis) + _ <- IO.sleep(200.millis) _ <- producer.send("Topic-A") events1 <- IO.delay(ZMQ.poll(poller.selector, items, -1)) - _ <- Timer[IO].sleep(100.millis) + _ <- IO.sleep(100.millis) _ <- producer.send("Topic-B") - _ <- Timer[IO].sleep(100.millis) + _ <- IO.sleep(100.millis) events2 <- IO.delay(ZMQ.poll(poller.selector, items, -1)) _ <- producer.send("Topic-A") _ <- producer.send("Topic-B") @@ -83,7 +83,7 @@ class PollerSpec extends IOSpec { } yield (publisher, consumerA, consumerB, poller) def handler(queue: Queue[IO, String]): ConsumerHandler[IO] = - Kleisli(socket => socket.receive[String] >>= queue.enqueue1) + Kleisli(socket => socket.receive[String] >>= queue.offer) def program( producer: ProducerSocket[IO], @@ -92,7 +92,7 @@ class PollerSpec extends IOSpec { poller: Poller[IO] ): IO[Assertion] = for { - _ <- Timer[IO].sleep(200.millis) + _ <- IO.sleep(200.millis) queueA <- Queue.unbounded[IO, String] queueB <- Queue.unbounded[IO, String] items = NonEmptyList.of( @@ -100,19 +100,19 @@ class PollerSpec extends IOSpec { PollEntry.Read(consumerB, handler(queueB)) ) _ <- poller.poll(items, PollTimeout.Fixed(200.millis)) - (queueA1, queueB1) <- (queueA.tryDequeue1, queueB.tryDequeue1).tupled + (queueA1, queueB1) <- (queueA.tryTake, queueB.tryTake).tupled _ <- producer.send("Topic-A") _ <- poller.poll(items, PollTimeout.Infinity) - (queueA2, queueB2) <- (queueA.tryDequeue1, queueB.tryDequeue1).tupled + (queueA2, queueB2) <- (queueA.tryTake, queueB.tryTake).tupled _ <- producer.send("Topic-B") - _ <- Timer[IO].sleep(100.millis) + _ <- IO.sleep(100.millis) _ <- poller.poll(items, PollTimeout.Infinity) - (queueA3, queueB3) <- (queueA.tryDequeue1, queueB.tryDequeue1).tupled + (queueA3, queueB3) <- (queueA.tryTake, queueB.tryTake).tupled _ <- producer.send("Topic-A") _ <- producer.send("Topic-B") - _ <- Timer[IO].sleep(100.millis) + _ <- IO.sleep(100.millis) _ <- poller.poll(items, PollTimeout.Infinity) - (queueA4, queueB4) <- (queueA.tryDequeue1, queueB.tryDequeue1).tupled + (queueA4, queueB4) <- (queueA.tryTake, queueB.tryTake).tupled } yield { queueA1 shouldBe empty queueB1 shouldBe empty @@ -144,7 +144,7 @@ class PollerSpec extends IOSpec { } yield (publisher, consumerA, consumerB, poller) def consumerHandler(queue: Queue[IO, String]): ConsumerHandler[IO] = - Kleisli(socket => socket.receive[String] >>= queue.enqueue1) + Kleisli(socket => socket.receive[String] >>= queue.offer) def producerHandler: ProducerHandler[IO] = Kleisli(socket => socket.send("Topic-A") >> socket.send("Topic-B")) @@ -157,8 +157,8 @@ class PollerSpec extends IOSpec { ): IO[Assertion] = { val setup: Resource[IO, (Queue[IO, String], Queue[IO, String])] = for { - queueA <- Resource.liftF(Queue.unbounded[IO, String]) - queueB <- Resource.liftF(Queue.unbounded[IO, String]) + queueA <- Resource.eval(Queue.unbounded[IO, String]) + queueB <- Resource.eval(Queue.unbounded[IO, String]) items = NonEmptyList.of( PollEntry.Write(producer, producerHandler), PollEntry.Read(consumerA, consumerHandler(queueA)), @@ -171,11 +171,11 @@ class PollerSpec extends IOSpec { val (queueA, queueB) = pair for { - _ <- Timer[IO].sleep(200.millis) - a1 <- queueA.dequeue1 - a2 <- queueA.dequeue1 - b1 <- queueB.dequeue1 - b2 <- queueB.dequeue1 + _ <- IO.sleep(200.millis) + a1 <- queueA.take + a2 <- queueA.take + b1 <- queueB.take + b2 <- queueB.take } yield { List(a1, a2) shouldBe List("Topic-A", "Topic-A") List(b1, b2) shouldBe List("Topic-B", "Topic-B") diff --git a/core/src/test/scala/io/fmq/proxy/ProxySpec.scala b/core/src/test/scala/io/fmq/proxy/ProxySpec.scala index 985bfa2..cb032bd 100644 --- a/core/src/test/scala/io/fmq/proxy/ProxySpec.scala +++ b/core/src/test/scala/io/fmq/proxy/ProxySpec.scala @@ -1,6 +1,8 @@ package io.fmq.proxy -import cats.effect.{Blocker, IO, Resource, Timer} +import java.util.concurrent.Executors + +import cats.effect.{IO, Resource} import cats.syntax.flatMap._ import io.fmq.address.Uri import io.fmq.frame.Frame @@ -12,14 +14,17 @@ import io.fmq.syntax.literals._ import io.fmq.{Context, IOSpec} import org.scalatest.Assertion +import scala.concurrent.ExecutionContext import scala.concurrent.duration._ /** - * Tests are using Timer[IO].sleep(200.millis) to fix 'slow-joiner' problem. + * Tests are using IO.sleep(200.millis) to fix 'slow-joiner' problem. * More details: http://zguide.zeromq.org/page:all#Missing-Message-Problem-Solver */ class ProxySpec extends IOSpec { + private val singleThreadContext = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor()) + "Proxy" should { "proxy messages in bidirectional way" in withContext() { ctx: Context[IO] => @@ -40,7 +45,7 @@ class ProxySpec extends IOSpec { def program(client: Request.Socket[IO], server: Reply.Socket[IO]): IO[Assertion] = for { - _ <- Timer[IO].sleep(500.millis) + _ <- IO.sleep(500.millis) _ <- client.send("hello") req <- server.receive[String] _ <- server.send("reply") @@ -52,9 +57,8 @@ class ProxySpec extends IOSpec { (for { (front, back) <- createProxySockets - blocker <- Blocker[IO] proxy <- ctx.proxy.bidirectional(front, back) - _ <- proxy.start(blocker) + _ <- proxy.start(singleThreadContext) (client, server) <- createReqRepSockets } yield (client, server)).use((program _).tupled) } @@ -66,7 +70,7 @@ class ProxySpec extends IOSpec { def program(publisher: Publisher.Socket[IO], subscriber: Subscriber.Socket[IO], pull: Pull.Socket[IO]): IO[Assertion] = for { - _ <- Timer[IO].sleep(500.millis) + _ <- IO.sleep(500.millis) _ <- publisher.send("hello") msg <- subscriber.receive[String] control <- pull.receive[String] @@ -78,7 +82,6 @@ class ProxySpec extends IOSpec { val topic = Subscriber.Topic.All (for { - blocker <- Blocker[IO] publisherProxy <- Resource.suspend(ctx.createPublisher.map(_.bind(frontendUri))) publisher <- Resource.suspend(ctx.createPublisher.map(_.bind(backendUri))) subscriberProxy <- Resource.suspend(ctx.createSubscriber(topic).map(_.connect(backendUri))) @@ -87,7 +90,7 @@ class ProxySpec extends IOSpec { push <- Resource.suspend(ctx.createPush.map(_.connect(controlUri))) control <- Resource.pure[IO, Control[IO]](Control.push(push)) proxy <- ctx.proxy.unidirectional(subscriberProxy, publisherProxy, Some(control)) - _ <- proxy.start(blocker) + _ <- proxy.start(singleThreadContext) } yield (publisher, subscriber, pull)).use((program _).tupled) } @@ -118,7 +121,7 @@ class ProxySpec extends IOSpec { def program(client: Request.Socket[IO], server: Reply.Socket[IO], pull: Pull.Socket[IO]): IO[Assertion] = for { - _ <- Timer[IO].sleep(500.millis) + _ <- IO.sleep(500.millis) _ <- client.send("hello") req <- server.receive[String] _ <- server.send("reply") @@ -137,10 +140,9 @@ class ProxySpec extends IOSpec { (for { (front, back) <- createProxySockets (pull, push) <- createControlSockets - blocker <- Blocker[IO] control <- Resource.pure[IO, Control[IO]](Control.push(push)) proxy <- ctx.proxy.bidirectional(front, back, Some(control), Some(control)) - _ <- proxy.start(blocker) + _ <- proxy.start(singleThreadContext) (client, server) <- createReqRepSockets } yield (client, server, pull)).use((program _).tupled) } @@ -178,7 +180,7 @@ class ProxySpec extends IOSpec { pullOut: Pull.Socket[IO] ): IO[Assertion] = for { - _ <- Timer[IO].sleep(500.millis) + _ <- IO.sleep(500.millis) _ <- client.send("hello") req <- server.receive[String] _ <- server.send("reply") @@ -198,16 +200,15 @@ class ProxySpec extends IOSpec { (front, back) <- createProxySockets (pullIn, pushIn) <- createControlSockets(controlInUri) (pullOut, pushOut) <- createControlSockets(controlOutUri) - blocker <- Blocker[IO] controlIn <- Resource.pure[IO, Control[IO]](Control.push(pushIn)) controlOut <- Resource.pure[IO, Control[IO]](Control.push(pushOut)) proxy <- ctx.proxy.bidirectional(front, back, Some(controlIn), Some(controlOut)) - _ <- proxy.start(blocker) + _ <- proxy.start(singleThreadContext) (client, server) <- createReqRepSockets } yield (client, server, pullIn, pullOut)).use((program _).tupled) } - /* "start new proxy after termination" in withContext() { ctx: Context[IO] => + "start new proxy after termination" in withContext() { ctx: Context[IO] => val frontendUri = inproc"://frontend" val backendUri = inproc"://backend" @@ -224,29 +225,27 @@ class ProxySpec extends IOSpec { } yield (request, reply) def verifyProxy: IO[Assertion] = - createReqRepSockets.use { - case (client, server) => - for { - _ <- Timer[IO].sleep(500.millis) - _ <- client.send("hello") - req <- server.receive[String] - _ <- server.send("reply") - rep <- client.receive[String] - } yield { - req shouldBe "hello" - rep shouldBe "reply" - } + createReqRepSockets.use { case (client, server) => + for { + _ <- IO.sleep(500.millis) + _ <- client.send("hello") + req <- server.receive[String] + _ <- server.send("reply") + rep <- client.receive[String] + } yield { + req shouldBe "hello" + rep shouldBe "reply" + } } - def program(proxy: Proxy.Configured[IO], blocker: Blocker): IO[Assertion] = - proxy.start(blocker).use(_ => verifyProxy) >> proxy.start(blocker).use(_ => verifyProxy) + def program(proxy: Proxy.Configured[IO]): IO[Assertion] = + proxy.start(singleThreadContext).use(_ => verifyProxy) >> proxy.start(singleThreadContext).use(_ => verifyProxy) (for { (front, back) <- createProxySockets - blocker <- Blocker[IO] proxy <- ctx.proxy.bidirectional(front, back) - } yield (proxy, blocker)).use((program _).tupled) - }*/ + } yield proxy).use(program) + } } diff --git a/core/src/test/scala/io/fmq/socket/SocketBehavior.scala b/core/src/test/scala/io/fmq/socket/SocketBehavior.scala index 9c94f7f..1d150c8 100644 --- a/core/src/test/scala/io/fmq/socket/SocketBehavior.scala +++ b/core/src/test/scala/io/fmq/socket/SocketBehavior.scala @@ -1,8 +1,7 @@ package io.fmq package socket -import cats.effect.syntax.effect._ -import cats.effect.{IO, Resource, Sync, Timer} +import cats.effect.{IO, Resource, Sync} import cats.syntax.flatMap._ import cats.syntax.traverse._ import fs2.Stream @@ -16,7 +15,7 @@ import org.scalatest.Assertion import scala.concurrent.duration._ /** - * Tests are using Timer[IO].sleep(200.millis) to fix 'slow-joiner' problem. + * Tests are using IO.sleep(200.millis) to fix 'slow-joiner' problem. * More details: http://zguide.zeromq.org/page:all#Missing-Message-Problem-Solver */ trait SocketBehavior { @@ -33,7 +32,7 @@ trait SocketBehavior { msg <- consumer.receiveFrame[String] } yield msg shouldBe Frame.Multipart("A", "We would like to see this") - Timer[IO].sleep(200.millis) >> program.toIO + IO.sleep(200.millis) >> program } "bind to specific port" in withContext() { ctx: Context[IO] => @@ -42,8 +41,8 @@ trait SocketBehavior { val resource = for { - producer <- Resource.liftF(socketResource.createProducer(ctx)) - consumer <- Resource.liftF(socketResource.createConsumer(ctx)) + producer <- Resource.eval(socketResource.createProducer(ctx)) + consumer <- Resource.eval(socketResource.createConsumer(ctx)) pair <- socketResource.bind(producer, consumer, port) } yield pair @@ -59,7 +58,7 @@ trait SocketBehavior { result <- collectMessages(consumer, messages.length.toLong) } yield result shouldBe messages - Timer[IO].sleep(200.millis) >> program.toIO + IO.sleep(200.millis) >> program } } @@ -74,7 +73,7 @@ trait SocketBehavior { result <- collectMessages(consumer, messages.length.toLong) } yield result shouldBe messages - Timer[IO].sleep(200.millis) >> program.toIO + IO.sleep(200.millis) >> program } "operate sendTimeout" in withContext() { context: Context[IO] => @@ -171,8 +170,8 @@ trait SocketBehavior { def withRandomPortPair[A](fa: SocketResource.Pair[IO] => IO[A]): A = withContext() { ctx: Context[IO] => (for { - producer <- Resource.liftF(socketResource.createProducer(ctx)) - consumer <- Resource.liftF(socketResource.createConsumer(ctx)) + producer <- Resource.eval(socketResource.createProducer(ctx)) + consumer <- Resource.eval(socketResource.createConsumer(ctx)) pair <- socketResource.bindToRandom(producer, consumer) } yield pair).use(fa) } diff --git a/core/src/test/scala/io/fmq/socket/pipeline/PushPullSpec.scala b/core/src/test/scala/io/fmq/socket/pipeline/PushPullSpec.scala index e5b7690..8f7ad9d 100644 --- a/core/src/test/scala/io/fmq/socket/pipeline/PushPullSpec.scala +++ b/core/src/test/scala/io/fmq/socket/pipeline/PushPullSpec.scala @@ -2,7 +2,7 @@ package io.fmq package socket package pipeline -import cats.effect.{IO, Resource, Sync} +import cats.effect.{IO, Resource} import io.fmq.address._ import io.fmq.socket.SocketBehavior.SocketResource @@ -22,7 +22,7 @@ class PushPullSpec extends IOSpec with SocketBehavior { } - private def tcpSocketResource[F[_]: Sync]: PushPullResource[F] = + private def tcpSocketResource[F[_]]: PushPullResource[F] = new PushPullResource[F] { override def bind(push: Push[F], pull: Pull[F], port: Int): Resource[F, Pair] = { @@ -48,7 +48,7 @@ class PushPullSpec extends IOSpec with SocketBehavior { } - private def inprocSocketResource[F[_]: Sync]: PushPullResource[F] = + private def inprocSocketResource[F[_]]: PushPullResource[F] = new PushPullResource[F] { override def bind(push: Push[F], pull: Pull[F], port: Int): Resource[F, Pair] = { diff --git a/core/src/test/scala/io/fmq/socket/pubsub/PubSubSpec.scala b/core/src/test/scala/io/fmq/socket/pubsub/PubSubSpec.scala index f72a84a..432e0f7 100644 --- a/core/src/test/scala/io/fmq/socket/pubsub/PubSubSpec.scala +++ b/core/src/test/scala/io/fmq/socket/pubsub/PubSubSpec.scala @@ -2,7 +2,7 @@ package io.fmq package socket package pubsub -import cats.effect.{IO, Resource, Sync} +import cats.effect.{IO, Resource} import io.fmq.address._ import io.fmq.socket.SocketBehavior.SocketResource import io.fmq.syntax.literals._ @@ -23,7 +23,7 @@ class PubSubSpec extends IOSpec with SocketBehavior { } - private def tcpSocketResource[F[_]: Sync]: PubSubResource[F] = + private def tcpSocketResource[F[_]]: PubSubResource[F] = new PubSubResource[F] { override def bind(producer: Publisher[F], consumer: Subscriber[F], port: Int): Resource[F, Pair] = { @@ -49,7 +49,7 @@ class PubSubSpec extends IOSpec with SocketBehavior { } - private def inprocSocketResource[F[_]: Sync]: PubSubResource[F] = + private def inprocSocketResource[F[_]]: PubSubResource[F] = new PubSubResource[F] { override def bind(producer: Publisher[F], consumer: Subscriber[F], port: Int): Resource[F, Pair] = { diff --git a/core/src/test/scala/io/fmq/socket/pubsub/SubscriberSpec.scala b/core/src/test/scala/io/fmq/socket/pubsub/SubscriberSpec.scala index 07e6d7f..6659be2 100644 --- a/core/src/test/scala/io/fmq/socket/pubsub/SubscriberSpec.scala +++ b/core/src/test/scala/io/fmq/socket/pubsub/SubscriberSpec.scala @@ -2,8 +2,7 @@ package io.fmq package socket package pubsub -import cats.effect.{IO, Resource, Timer} -import cats.syntax.flatMap._ +import cats.effect.{IO, Resource} import cats.syntax.traverse._ import io.fmq.socket.SocketBehavior.SocketResource import io.fmq.syntax.literals._ @@ -12,7 +11,7 @@ import org.scalatest.Assertion import scala.concurrent.duration._ /** - * Tests are using Timer[IO].sleep(200.millis) to fix 'slow-joiner' problem. + * Tests are using IO.sleep(200.millis) to fix 'slow-joiner' problem. * More details: http://zguide.zeromq.org/page:all#Missing-Message-Problem-Solver */ class SubscriberSpec extends IOSpec with SocketBehavior { @@ -37,7 +36,7 @@ class SubscriberSpec extends IOSpec with SocketBehavior { def program(producer: ProducerSocket[IO], consumer: ConsumerSocket[IO]): IO[Assertion] = for { - _ <- Timer[IO].sleep(200.millis) + _ <- IO.sleep(200.millis) _ <- sendA(producer) _ <- sendB(producer) msg1 <- consumer.receive[String] @@ -60,7 +59,7 @@ class SubscriberSpec extends IOSpec with SocketBehavior { val messages = List("0", "my-topic-1", "1", "my-topic2", "my-topic-3") for { - _ <- Timer[IO].sleep(200.millis) + _ <- IO.sleep(200.millis) _ <- messages.traverse(producer.send[String]) result <- collectMessages(consumer, 3L) } yield result shouldBe List("my-topic-1", "my-topic2", "my-topic-3") @@ -72,7 +71,7 @@ class SubscriberSpec extends IOSpec with SocketBehavior { val messages = List[Array[Byte]](Array(1), Array(2, 1, 3), Array(3, 1, 2), Array(3, 2, 1)) for { - _ <- Timer[IO].sleep(200.millis) + _ <- IO.sleep(200.millis) _ <- messages.traverse(producer.send[Array[Byte]]) result <- consumer.receive[Array[Byte]] } yield result shouldBe Array[Byte](3, 1, 2) @@ -84,7 +83,7 @@ class SubscriberSpec extends IOSpec with SocketBehavior { val messages = List("0", "my-topic-1", "1", "my-topic2", "my-topic-3") for { - _ <- Timer[IO].sleep(200.millis) + _ <- IO.sleep(200.millis) _ <- messages.traverse(producer.send[String]) result <- collectMessages(consumer, messages.length.toLong) } yield result shouldBe messages diff --git a/core/src/test/scala/io/fmq/socket/pubsub/XPubXSubSpec.scala b/core/src/test/scala/io/fmq/socket/pubsub/XPubXSubSpec.scala index 3827fdf..1d384d5 100644 --- a/core/src/test/scala/io/fmq/socket/pubsub/XPubXSubSpec.scala +++ b/core/src/test/scala/io/fmq/socket/pubsub/XPubXSubSpec.scala @@ -2,7 +2,7 @@ package io.fmq package socket package pubsub -import cats.effect.{IO, Resource, Timer} +import cats.effect.{IO, Resource} import cats.syntax.traverse._ import io.fmq.frame.Frame import io.fmq.syntax.literals._ @@ -11,7 +11,7 @@ import org.scalatest.Assertion import scala.concurrent.duration._ /** - * Tests are using Timer[IO].sleep(200.millis) to fix 'slow-joiner' problem. + * Tests are using IO.sleep(200.millis) to fix 'slow-joiner' problem. * More details: http://zguide.zeromq.org/page:all#Missing-Message-Problem-Solver */ class XPubXSubSpec extends IOSpec with SocketBehavior { @@ -22,9 +22,9 @@ class XPubXSubSpec extends IOSpec with SocketBehavior { val XPubXSubSpec.Pair(pub, sub) = pair for { - _ <- Timer[IO].sleep(200.millis) + _ <- IO.sleep(200.millis) _ <- sub.sendSubscribe(Subscriber.Topic.utf8String("A")) - _ <- Timer[IO].sleep(200.millis) + _ <- IO.sleep(200.millis) subMsg <- pub.receive[Array[Byte]] _ <- pub.sendMultipart(Frame.Multipart("A", "Hello")) msg <- sub.receiveFrame[String] @@ -38,7 +38,7 @@ class XPubXSubSpec extends IOSpec with SocketBehavior { val XPubXSubSpec.Pair(pub, sub) = pair for { - _ <- Timer[IO].sleep(200.millis) + _ <- IO.sleep(200.millis) _ <- sub.send("Message from subscriber") msg1 <- pub.receive[String] _ <- sub.send(Array.emptyByteArray) @@ -53,9 +53,9 @@ class XPubXSubSpec extends IOSpec with SocketBehavior { val XPubXSubSpec.Pair(pub, sub) = pair for { - _ <- Timer[IO].sleep(200.millis) + _ <- IO.sleep(200.millis) _ <- sub.sendSubscribe(Subscriber.Topic.All) - _ <- Timer[IO].sleep(200.millis) + _ <- IO.sleep(200.millis) _ <- pub.send("Hello") msg <- sub.receive[String] } yield msg shouldBe "Hello" @@ -65,7 +65,7 @@ class XPubXSubSpec extends IOSpec with SocketBehavior { val XPubXSubSpec.Pair(pub, sub) = pair for { - _ <- Timer[IO].sleep(200.millis) + _ <- IO.sleep(200.millis) _ <- pub.send("Hello") result <- sub.receiveNoWait[String] } yield result shouldBe empty @@ -78,13 +78,13 @@ class XPubXSubSpec extends IOSpec with SocketBehavior { val messages = topics.map(_ + "1") for { - _ <- Timer[IO].sleep(200.millis) + _ <- IO.sleep(200.millis) _ <- topics.traverse(topic => sub.sendSubscribe(Subscriber.Topic.utf8String(topic))) - _ <- Timer[IO].sleep(200.millis) + _ <- IO.sleep(200.millis) _ <- messages.traverse(pub.send[String]) received <- collectMessages(sub, 5) _ <- topics.traverse(topic => sub.sendUnsubscribe(Subscriber.Topic.utf8String(topic))) - _ <- Timer[IO].sleep(200.millis) + _ <- IO.sleep(200.millis) _ <- messages.traverse(pub.send[String]) result <- sub.receiveNoWait[String] } yield { @@ -103,10 +103,10 @@ class XPubXSubSpec extends IOSpec with SocketBehavior { val (pub, sub1, sub2) = input for { - _ <- Timer[IO].sleep(200.millis) + _ <- IO.sleep(200.millis) _ <- topics1.traverse(topic => sub1.sendSubscribe(Subscriber.Topic.utf8String(topic))) _ <- topics2.traverse(topic => sub2.sendSubscribe(Subscriber.Topic.utf8String(topic))) - _ <- Timer[IO].sleep(200.millis) + _ <- IO.sleep(200.millis) _ <- pub.send("AB-1") msg1 <- sub1.receive[String] msg2 <- sub2.receive[String] diff --git a/core/src/test/scala/io/fmq/socket/reqrep/ReqRepSpec.scala b/core/src/test/scala/io/fmq/socket/reqrep/ReqRepSpec.scala index bc8f212..035acb7 100644 --- a/core/src/test/scala/io/fmq/socket/reqrep/ReqRepSpec.scala +++ b/core/src/test/scala/io/fmq/socket/reqrep/ReqRepSpec.scala @@ -2,7 +2,7 @@ package io.fmq package socket package reqrep -import cats.effect.{IO, Resource, Timer} +import cats.effect.{IO, Resource} import cats.syntax.either._ import io.fmq.frame.Frame import io.fmq.socket.reqrep.ReqRepSpec.Pair @@ -11,7 +11,7 @@ import io.fmq.syntax.literals._ import scala.concurrent.duration._ /** - * Tests are using Timer[IO].sleep(200.millis) to fix 'slow-joiner' problem. + * Tests are using IO.sleep(200.millis) to fix 'slow-joiner' problem. * More details: http://zguide.zeromq.org/page:all#Missing-Message-Problem-Solver */ class ReqRepSpec extends IOSpec with SocketBehavior { @@ -22,7 +22,7 @@ class ReqRepSpec extends IOSpec with SocketBehavior { val Pair(req, rep) = pair for { - _ <- Timer[IO].sleep(200.millis) + _ <- IO.sleep(200.millis) _ <- req.send("Hi") request <- rep.receive[String] _ <- rep.send("Hi2") @@ -37,7 +37,7 @@ class ReqRepSpec extends IOSpec with SocketBehavior { val Pair(req, rep) = pair for { - _ <- Timer[IO].sleep(200.millis) + _ <- IO.sleep(200.millis) _ <- req.sendFrame(Frame.Multipart("Hello", "World")) request <- rep.receiveFrame[String] _ <- rep.sendFrame(Frame.Multipart("Hello", "Back")) @@ -55,7 +55,7 @@ class ReqRepSpec extends IOSpec with SocketBehavior { val Pair(req, _) = pair for { - _ <- Timer[IO].sleep(200.millis) + _ <- IO.sleep(200.millis) _ <- req.send("Hi") result <- req.send("Hi2").attempt } yield result.leftMap(_.getMessage) shouldBe Left("Errno 156384763") @@ -65,7 +65,7 @@ class ReqRepSpec extends IOSpec with SocketBehavior { val Pair(req, _) = pair for { - _ <- Timer[IO].sleep(200.millis) + _ <- IO.sleep(200.millis) result <- req.receive[Array[Byte]].attempt } yield result.leftMap(_.getMessage) shouldBe Left("Errno 156384763") } @@ -74,7 +74,7 @@ class ReqRepSpec extends IOSpec with SocketBehavior { val Pair(_, rep) = pair for { - _ <- Timer[IO].sleep(200.millis) + _ <- IO.sleep(200.millis) result <- rep.send("hi").attempt } yield result.leftMap(_.getMessage) shouldBe Left("Errno 156384763") } diff --git a/core/src/test/scala/io/fmq/socket/reqrep/RouterSpec.scala b/core/src/test/scala/io/fmq/socket/reqrep/RouterSpec.scala index 4c2c88f..d0738a6 100644 --- a/core/src/test/scala/io/fmq/socket/reqrep/RouterSpec.scala +++ b/core/src/test/scala/io/fmq/socket/reqrep/RouterSpec.scala @@ -1,6 +1,6 @@ package io.fmq.socket.reqrep -import cats.effect.{IO, Resource, Timer} +import cats.effect.{IO, Resource} import cats.syntax.flatMap._ import io.fmq.{Context, IOSpec} import io.fmq.frame.Frame @@ -12,7 +12,7 @@ import io.fmq.syntax.literals._ import scala.concurrent.duration._ /** - * Tests are using Timer[IO].sleep(200.millis) to fix 'slow-joiner' problem. + * Tests are using IO.sleep(200.millis) to fix 'slow-joiner' problem. * More details: http://zguide.zeromq.org/page:all#Missing-Message-Problem-Solver */ class RouterSpec extends IOSpec with SocketBehavior { @@ -23,14 +23,14 @@ class RouterSpec extends IOSpec with SocketBehavior { val Pair(router, dealer, _) = pair for { - _ <- Timer[IO].sleep(200.millis) + _ <- IO.sleep(200.millis) _ <- dealer.identity _ <- dealer.send("Hello") request <- router.receiveFrame[String] _ <- router.sendFrame(Frame.Multipart("1", "World-1")) _ <- router.sendFrame(Frame.Multipart("2", "World-2")) response1 <- dealer.receiveFrame[String] - _ <- Timer[IO].sleep(100.millis) + _ <- IO.sleep(100.millis) response2 <- dealer.receiveNoWait[String] } yield { request shouldBe Frame.Multipart("1", "Hello") @@ -54,7 +54,7 @@ class RouterSpec extends IOSpec with SocketBehavior { _ <- dealer2.sendFrame(Frame.Multipart("Hello", "World")) message2 <- router.receiveFrame[String] _ <- router.sendFrame(Frame.Multipart("ID", "Response")) - _ <- Timer[IO].sleep(200.millis) + _ <- IO.sleep(200.millis) response1 <- dealer.receiveNoWait[String] response2 <- dealer2.receiveFrame[String] } yield { @@ -67,7 +67,7 @@ class RouterSpec extends IOSpec with SocketBehavior { for { _ <- router.setHandover(RouterHandover.Handover) - _ <- Timer[IO].sleep(200.millis) + _ <- IO.sleep(200.millis) _ <- dealer.sendFrame(Frame.Multipart("Hello", "World")) identity <- router.receive[String] _ <- IO.delay(identity shouldBe "ID") @@ -82,10 +82,10 @@ class RouterSpec extends IOSpec with SocketBehavior { val uri = tcp_i"://localhost" (for { - router <- Resource.liftF(ctx.createRouter) - dealer <- Resource.liftF(ctx.createDealer) - _ <- Resource.liftF(router.setMandatory(RouterMandatory.NonMandatory)) - _ <- Resource.liftF(dealer.setIdentity(identity)) + router <- Resource.eval(ctx.createRouter) + dealer <- Resource.eval(ctx.createDealer) + _ <- Resource.eval(router.setMandatory(RouterMandatory.NonMandatory)) + _ <- Resource.eval(dealer.setIdentity(identity)) r <- router.bindToRandomPort(uri) d <- dealer.connect(r.uri) } yield Pair(r, d, ctx)).use(fa) diff --git a/docs/concepts/consuming-strategy.md b/docs/concepts/consuming-strategy.md index 714a973..957dab1 100644 --- a/docs/concepts/consuming-strategy.md +++ b/docs/concepts/consuming-strategy.md @@ -3,44 +3,46 @@ id: consuming-strategy title: Consuming strategy --- -You can use ƒMQ with any effect that has an instance of `cats.effect.Sync`: `cats.effect.IO`, `EitherT[IO, Error, *]` and so on. +You can use ƒMQ with any effect that has an instance of `cats.effect.kernel.Sync`: `cats.effect.IO`, `EitherT[IO, Error, *]` and so on. ## The problem The `socket.receive` method blocks the thread until a new message is available. -The `cats.effect.Blocker` allows to evaluate blocking operations on a separate execution context: `blocker.blockOn(socket.receive)`. +The `cats.effect.Async` allows to evaluate blocking operations on a separate execution context: `socker.receive.evalOn(blocker)`. So far so good, but if the expected throughput is high (e.g. 50k per second), you can face a performance degradation due to context switches. There are several ways to solve the problem: -### 1) Call `socket.receive` without `Blocker` +### 1) Call `socket.receive` directly `fs2.Stream.repeatEval(socket.receive).map(msg => handleMessage(msg)` The most straightforward solution. Since the message rate is high, the `socket.receive` operation returns the message almost immediately without blocking. ### 2) Evaluate the program entirely on the blocking context -`blocker.blockOn(fs2.Stream.repeatEval(socket.receive).map(msg => handleMessage(msg).compile.drain)` +`fs2.Stream.repeatEval(socket.receive).map(msg => handleMessage(msg).compile.drain).evalOn(blocker)` -The great disadvantage of this solution is evaluation of the lightweight operations on a blocking context. +The great disadvantage of this approach is evaluation of the lightweight operations on a blocking context. ### 3) Separate consuming operation from the processing `fs2.Stream.repeatEval(socket.receive)` can be evaluated on a blocking context in the background. ```scala mdoc -import cats.effect.syntax.concurrent._ -import cats.effect.{Blocker, Concurrent, ContextShift} +import cats.effect.syntax.async._ +import cats.effect.Async +import cats.effect.std.Queue import fs2.Stream -import fs2.concurrent.Queue import io.fmq.socket.ConsumerSocket -def consume[F[_]: Concurrent: ContextShift](blocker: Blocker, socket: ConsumerSocket[F]): Stream[F, String] = { +import scala.concurrent.ExecutionContext + +def consume[F[_]: Async](blocker: ExecutionContext, socket: ConsumerSocket[F]): Stream[F, String] = { def process(queue: Queue[F, String]) = - blocker.blockOn(Stream.repeatEval(socket.receive[String]).through(queue.enqueue).compile.drain) + Stream.repeatEval(socket.receive[String]).evalMap(queue.offer).compile.drain for { queue <- Stream.eval(Queue.unbounded[F, String]) - _ <- Stream.resource(process(queue).background) - result <- queue.dequeue + _ <- Stream.resource(process(queue).backgroundOn(blocker)) + result <- Stream.repeatEval(queue.take) } yield result } ``` diff --git a/docs/concepts/receiving-sending.md b/docs/concepts/receiving-sending.md index 9c87034..c845e02 100644 --- a/docs/concepts/receiving-sending.md +++ b/docs/concepts/receiving-sending.md @@ -29,7 +29,7 @@ Returns true if message is multipart. ### High-level API The `def receiveFrame[A: FrameDecoder]: F[Frame[A]]` **blocks indefinitely** until a message arrives. -Consumes multipart message automatically. The method returns `Frame.Multipart` if message is multipart, otherwise returns `Frame.Single`. +Consumes a multipart message automatically. The method returns `Frame.Multipart` if message is multipart, otherwise returns `Frame.Single`. ## Sending @@ -52,5 +52,5 @@ Queues a multi-part message to be sent. ### High-level API The `def sendFrame[A: FrameEncoder](frame: Frame[A]): F[Unit]` queues a message to be sent. -Sends multipart message automatically. +Sends a multipart message automatically. diff --git a/docs/examples/poller.md b/docs/examples/poller.md index 90f9079..ef16194 100644 --- a/docs/examples/poller.md +++ b/docs/examples/poller.md @@ -9,12 +9,12 @@ The `ConsumerHandler` is a simple `Kleisli`: `Kleisli[F, ConsumerSocket[F], Unit The best way to use the poller is enqueue messages into the queue: ```scala def handler(queue: Queue[F, String]): ConsumerHandler[F] = - Kleisli(socket => socket.receive[String] >>= queue.enqueue1) + Kleisli(socket => socket.receive[String] >>= queue.offer) ``` Then the `poller.poll` operation can be evaluated on the blocking context: ```scala -blocker.blockOn(poller.poll(pollItems, PollTimeout.Infinity).foreverM) +poller.poll(pollItems, PollTimeout.Infinity).foreverM.evalOn(blocker) ``` Thus all consuming operations is being executed on the one blocking thread, while the processing can be performed on the general context. @@ -26,8 +26,7 @@ The example shows how to use poller with three subscribers. First of all, let's introduce a `Producer` that sends messages with a specific topic: ```scala mdoc:silent -import cats.FlatMap -import cats.effect.Timer +import cats.effect.Async import cats.syntax.flatMap._ import fs2.Stream import io.fmq.frame.Frame @@ -35,10 +34,10 @@ import io.fmq.socket.pubsub.Publisher import scala.concurrent.duration._ -class Producer[F[_]: FlatMap: Timer](publisher: Publisher.Socket[F], topicA: String, topicB: String) { +class Producer[F[_]: Async](publisher: Publisher.Socket[F], topicA: String, topicB: String) { def generate: Stream[F, Unit] = - Stream.repeatEval(sendA >> sendB >> Timer[F].sleep(2000.millis)) + Stream.repeatEval(sendA >> sendB >> Async[F].sleep(2000.millis)) private def sendA: F[Unit] = publisher.sendFrame(Frame.Multipart(topicA, "We don't want to see this")) @@ -53,19 +52,22 @@ And the demo program that evaluates producer and subscribers in parallel: ```scala mdoc:silent import cats.data.{Kleisli, NonEmptyList} -import cats.effect.{Blocker, Concurrent, ContextShift, Resource, Sync, Timer} +import cats.effect.{Async, Resource} +import cats.effect.syntax.async._ +import cats.effect.std.Queue import cats.syntax.flatMap._ import cats.syntax.functor._ import fs2.Stream -import fs2.concurrent.Queue import io.fmq.Context import io.fmq.poll.{ConsumerHandler, PollEntry, PollTimeout} import io.fmq.socket.pubsub.Subscriber import io.fmq.syntax.literals._ -class Demo[F[_]: Concurrent: ContextShift: Timer](context: Context[F], blocker: Blocker) { +import scala.concurrent.ExecutionContext - private def log(message: String): F[Unit] = Sync[F].delay(println(message)) +class Demo[F[_]: Async](context: Context[F], blocker: ExecutionContext) { + + private def log(message: String): F[Unit] = Async[F].delay(println(message)) private val topicA = "my-topic-a" private val topicB = "my-topic-b" @@ -89,7 +91,7 @@ class Demo[F[_]: Concurrent: ContextShift: Timer](context: Context[F], blocker: val producer = new Producer[F](publisher, topicA, topicB) def handler(queue: Queue[F, String]): ConsumerHandler[F] = - Kleisli(socket => socket.receive[String] >>= queue.enqueue1) + Kleisli(socket => socket.receive[String] >>= queue.offer) // evaluates poll on a blocking context def poll(queueA: Queue[F, String], queueB: Queue[F, String], queueAll: Queue[F, String]): F[Unit] = { @@ -99,7 +101,7 @@ class Demo[F[_]: Concurrent: ContextShift: Timer](context: Context[F], blocker: PollEntry.Read(subscriberAll, handler(queueAll)) ) - blocker.blockOn(poller.poll(items, PollTimeout.Infinity).foreverM[Unit]) + poller.poll(items, PollTimeout.Infinity).foreverM[Unit].evalOn(blocker) } for { @@ -109,9 +111,9 @@ class Demo[F[_]: Concurrent: ContextShift: Timer](context: Context[F], blocker: _ <- Stream( producer.generate, Stream.eval(poll(queueA, queueB, queueAll)), - queueA.dequeue.evalMap(frame => log(s"ConsumerA. Received $frame")), - queueB.dequeue.evalMap(frame => log(s"ConsumerB. Received $frame")), - queueAll.dequeue.evalMap(frame => log(s"ConsumerAll. Received $frame")) + Stream.repeatEval(queueA.take).evalMap(frame => log(s"ConsumerA. Received $frame")), + Stream.repeatEval(queueB.take).evalMap(frame => log(s"ConsumerB. Received $frame")), + Stream.repeatEval(queueAll.take).evalMap(frame => log(s"ConsumerAll. Received $frame")) ).parJoinUnbounded } yield () } @@ -119,20 +121,28 @@ class Demo[F[_]: Concurrent: ContextShift: Timer](context: Context[F], blocker: } ``` -At the edge of out program we define our effect, `cats.effect.IO` in this case, and ask to evaluate the effects: +At the edge of our program we define our effect, `cats.effect.IO` in this case, and ask to evaluate the effects: ```scala mdoc:silent -import cats.effect.{Blocker, ExitCode, IO, IOApp} +import java.util.concurrent.Executors + +import cats.effect.{IO, IOApp} import cats.syntax.functor._ import io.fmq.Context -object Poller extends IOApp { +import scala.concurrent.ExecutionContext - override def run(args: List[String]): IO[ExitCode] = - Blocker[IO] - .flatMap(blocker => Context.create[IO](ioThreads = 1, blocker).tupleRight(blocker)) - .use { case (ctx, blocker) => new Demo[IO](ctx, blocker).program.compile.drain.as(ExitCode.Success) } +object Poller extends IOApp.Simple { + override def run: IO[Unit] = + blockingContext + .flatMap(blocker => Context.create[IO](ioThreads = 1).tupleRight(blocker)) + .use { case (ctx, blocker) => new Demo[IO](ctx, blocker).program.compile.drain } + + private def blockingContext: Resource[IO, ExecutionContext] = + Resource + .make(IO.delay(Executors.newCachedThreadPool()))(e => IO.delay(e.shutdown())) + .map(ExecutionContext.fromExecutor) } ``` diff --git a/docs/examples/proxy.md b/docs/examples/proxy.md index 9e857cc..f6cc97a 100644 --- a/docs/examples/proxy.md +++ b/docs/examples/proxy.md @@ -19,24 +19,26 @@ Replies shall automatically return to the client that made the original request. First of all, let's introduce a `Server` that replies to the requests: ```scala mdoc:silent -import cats.effect.{Blocker, Concurrent, ContextShift} -import cats.effect.syntax.concurrent._ +import cats.effect.Async +import cats.effect.syntax.async._ +import cats.effect.std.Queue import cats.syntax.flatMap._ import fs2.Stream -import fs2.concurrent.Queue import io.fmq.frame.Frame import io.fmq.socket.reqrep.Reply -class Server[F[_]: Concurrent: ContextShift](socket: Reply.Socket[F], blocker: Blocker) { +import scala.concurrent.ExecutionContext + +class Server[F[_]: Async](socket: Reply.Socket[F], blocker: ExecutionContext) { def serve: Stream[F, Unit] = { def process(queue: Queue[F, Frame[String]]) = - blocker.blockOn(Stream.repeatEval(socket.receiveFrame[String]).through(queue.enqueue).compile.drain) + Stream.repeatEval(socket.receiveFrame[String]).evalMap(queue.offer).compile.drain for { queue <- Stream.eval(Queue.unbounded[F, Frame[String]]) - _ <- Stream.resource(process(queue).background) - result <- queue.dequeue.evalMap(processRequest) + _ <- Stream.resource(process(queue).backgroundOn(blocker)) + result <- Stream.repeatEval(queue.take).evalMap(processRequest) } yield result } @@ -50,7 +52,7 @@ class Server[F[_]: Concurrent: ContextShift](socket: Reply.Socket[F], blocker: B } private def log(message: => String): F[Unit] = - Concurrent[F].delay(println(message)) + Async[F].delay(println(message)) } ``` @@ -58,13 +60,13 @@ class Server[F[_]: Concurrent: ContextShift](socket: Reply.Socket[F], blocker: B Secondly, we need a `Client` that sends requests: ```scala mdoc:silent -import cats.effect.{Sync, Timer} +import cats.effect.Async import fs2.Stream import io.fmq.pattern.RequestReply import scala.concurrent.duration._ -class Client[F[_]: Sync: Timer](dispatcher: RequestReply[F]) { +class Client[F[_]: Async](dispatcher: RequestReply[F]) { def start: Stream[F, Unit] = Stream @@ -77,7 +79,7 @@ class Client[F[_]: Sync: Timer](dispatcher: RequestReply[F]) { .evalMap(response => log(s"Client. Received response $response")) private def log(message: => String): F[Unit] = - Sync[F].delay(println(message)) + Async[F].delay(println(message)) } ``` @@ -85,11 +87,11 @@ class Client[F[_]: Sync: Timer](dispatcher: RequestReply[F]) { Also, we can add a `MessageObserver` that observers all proxied messages: ```scala mdoc:silent -import cats.effect.{Sync, Timer} +import cats.effect.Async import fs2.Stream import io.fmq.socket.pipeline.Pull -class MessageObserver[F[_]: Sync: Timer](pull: Pull.Socket[F]) { +class MessageObserver[F[_]: Async](pull: Pull.Socket[F]) { def start: Stream[F, Unit] = Stream @@ -98,7 +100,7 @@ class MessageObserver[F[_]: Sync: Timer](pull: Pull.Socket[F]) { .drain private def log(message: => String): F[Unit] = - Sync[F].delay(println(message)) + Async[F].delay(println(message)) } ``` @@ -106,7 +108,7 @@ class MessageObserver[F[_]: Sync: Timer](pull: Pull.Socket[F]) { And the `ProxyDemo` to put everything together: ```scala mdoc:silent -import cats.effect.{Blocker, Concurrent, ContextShift, Resource, Timer} +import cats.effect.{Async, Resource} import cats.syntax.flatMap._ import cats.syntax.functor._ import fs2.Stream @@ -118,7 +120,9 @@ import io.fmq.socket.pipeline.{Pull, Push} import io.fmq.socket.reqrep.{Dealer, Reply, Request, Router} import io.fmq.syntax.literals._ -class ProxyDemo[F[_]: Concurrent: ContextShift: Timer](context: Context[F], blocker: Blocker) { +import scala.concurrent.ExecutionContext + +class ProxyDemo[F[_]: Async](context: Context[F], blocker: ExecutionContext) { private val frontendUri = inproc"://frontend" private val backendUri = inproc"://backend" @@ -164,20 +168,28 @@ class ProxyDemo[F[_]: Concurrent: ContextShift: Timer](context: Context[F], bloc } ``` -At the edge of out program we define our effect, `cats.effect.IO` in this case, and ask to evaluate the effects: +At the edge of our program we define our effect, `cats.effect.IO` in this case, and ask to evaluate the effects: ```scala mdoc:silent -import cats.effect.{Blocker, ExitCode, IO, IOApp} +import java.util.concurrent.Executors + +import cats.effect.{IO, IOApp} import cats.syntax.functor._ import io.fmq.Context -object ProxyApp extends IOApp { +import scala.concurrent.ExecutionContext - override def run(args: List[String]): IO[ExitCode] = - Blocker[IO] - .flatMap(blocker => Context.create[IO](ioThreads = 1, blocker).tupleRight(blocker)) - .use { case (ctx, blocker) => new ProxyDemo[IO](ctx, blocker).program.compile.drain.as(ExitCode.Success) } +object ProxyApp extends IOApp.Simple { + override def run: IO[Unit] = + blockingContext + .flatMap(blocker => Context.create[IO](ioThreads = 1).tupleRight(blocker)) + .use { case (ctx, blocker) => new ProxyDemo[IO](ctx, blocker).program.compile.drain } + + private def blockingContext: Resource[IO, ExecutionContext] = + Resource + .make(IO.delay(Executors.newCachedThreadPool()))(e => IO.delay(e.shutdown())) + .map(ExecutionContext.fromExecutor) } ``` diff --git a/docs/examples/pubsub.md b/docs/examples/pubsub.md index ca281fc..0fc5ae6 100644 --- a/docs/examples/pubsub.md +++ b/docs/examples/pubsub.md @@ -8,8 +8,7 @@ The example shows how to create one publisher and three subscribers with differe First of all, let's introduce a `Producer` that sends messages with a specific topic: ```scala mdoc:silent -import cats.FlatMap -import cats.effect.Timer +import cats.effect.Async import cats.syntax.flatMap._ import cats.syntax.functor._ import fs2.Stream @@ -18,10 +17,10 @@ import io.fmq.socket.pubsub.Publisher import scala.concurrent.duration._ -class Producer[F[_]: FlatMap: Timer](publisher: Publisher.Socket[F], topicA: String, topicB: String) { +class Producer[F[_]: Async](publisher: Publisher.Socket[F], topicA: String, topicB: String) { def generate: Stream[F, Unit] = - Stream.repeatEval(sendA >> sendB >> Timer[F].sleep(2000.millis)) + Stream.repeatEval(sendA >> sendB >> Async[F].sleep(2000.millis)) private def sendA: F[Unit] = publisher.sendFrame(Frame.Multipart(topicA, "We don't want to see this")) @@ -35,23 +34,25 @@ class Producer[F[_]: FlatMap: Timer](publisher: Publisher.Socket[F], topicA: Str Then let's implement a consumer logic: ```scala mdoc:silent -import cats.effect.{Blocker, Concurrent, ContextShift} -import cats.effect.syntax.concurrent._ +import cats.effect.Async +import cats.effect.syntax.async._ +import cats.effect.std.Queue import fs2.Stream -import fs2.concurrent.Queue import io.fmq.frame.Frame import io.fmq.socket.pubsub.Subscriber -class Consumer[F[_]: Concurrent: ContextShift](socket: Subscriber.Socket[F], blocker: Blocker) { +import scala.concurrent.ExecutionContext + +class Consumer[F[_]: Async](socket: Subscriber.Socket[F], blocker: ExecutionContext) { def consume: Stream[F, Frame[String]] = { def process(queue: Queue[F, Frame[String]]) = - blocker.blockOn(Stream.repeatEval(socket.receiveFrame[String]).through(queue.enqueue).compile.drain) + Stream.repeatEval(socket.receiveFrame[String]).evalMap(queue.offer).compile.drain for { queue <- Stream.eval(Queue.unbounded[F, Frame[String]]) - _ <- Stream.resource(process(queue).background) - result <- queue.dequeue + _ <- Stream.resource(process(queue).backgroundOn(blocker)) + result <- Stream.repeatEval(queue.take) } yield result } @@ -61,13 +62,15 @@ class Consumer[F[_]: Concurrent: ContextShift](socket: Subscriber.Socket[F], blo And the demo program that evaluates producer and subscribers in parallel: ```scala mdoc:silent -import cats.effect.{Concurrent, ContextShift, Resource, Sync, Timer} +import cats.effect.{Resource, Sync} import fs2.Stream import io.fmq.Context import io.fmq.socket.pubsub.Subscriber import io.fmq.syntax.literals._ -class Demo[F[_]: Concurrent: ContextShift: Timer](context: Context[F], blocker: Blocker) { +import scala.concurrent.ExecutionContext + +class Demo[F[_]: Async](context: Context[F], blocker: ExecutionContext) { private def log(message: String): F[Unit] = Sync[F].delay(println(message)) @@ -105,20 +108,28 @@ class Demo[F[_]: Concurrent: ContextShift: Timer](context: Context[F], blocker: } ``` -At the edge of out program we define our effect, `cats.effect.IO` in this case, and ask to evaluate the effects: +At the edge of our program we define our effect, `cats.effect.IO` in this case, and ask to evaluate the effects: ```scala mdoc:silent -import cats.effect.{Blocker, ExitCode, IO, IOApp} +import java.util.concurrent.Executors + +import cats.effect.{IO, IOApp} import cats.syntax.functor._ import io.fmq.Context -object PubSub extends IOApp { +import scala.concurrent.ExecutionContext + +object PubSub extends IOApp.Simple { - override def run(args: List[String]): IO[ExitCode] = - Blocker[IO] - .flatMap(blocker => Context.create[IO](ioThreads = 1, blocker).tupleRight(blocker)) - .use { case (ctx, blocker) => new Demo[IO](ctx, blocker).program.compile.drain.as(ExitCode.Success) } + override def run: IO[Unit] = + blockingContext + .flatMap(blocker => Context.create[IO](ioThreads = 1).tupleRight(blocker)) + .use { case (ctx, blocker) => new Demo[IO](ctx, blocker).program.compile.drain } + private def blockingContext: Resource[IO, ExecutionContext] = + Resource + .make(IO.delay(Executors.newCachedThreadPool()))(e => IO.delay(e.shutdown())) + .map(ExecutionContext.fromExecutor) } ``` diff --git a/docs/sockets/publisher.md b/docs/sockets/publisher.md index 5ca0f46..f6afd64 100644 --- a/docs/sockets/publisher.md +++ b/docs/sockets/publisher.md @@ -10,19 +10,14 @@ The publisher socket can only publish messages into the queue. The publisher can be created within the `Context`. ```scala mdoc:silent -import cats.effect.{Blocker, ContextShift, Resource, IO} +import cats.effect.{Resource, IO} import io.fmq.Context import io.fmq.socket.pubsub.Publisher -import scala.concurrent.ExecutionContext - -implicit val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global) - val publisherResource: Resource[IO, Publisher[IO]] = for { - blocker <- Blocker[IO] - context <- Context.create[IO](1, blocker) - publisher <- Resource.liftF(context.createPublisher) + context <- Context.create[IO](1) + publisher <- Resource.eval(context.createPublisher) } yield publisher ``` @@ -68,7 +63,6 @@ def configurePublisher(publisher: Publisher[IO]): IO[Unit] = Only connected publisher can send messages: ```scala mdoc:silent -import cats.syntax.flatMap._ import io.fmq.frame.Frame def sendSingleMessage(publisher: Publisher.Socket[IO]): IO[Unit] = diff --git a/docs/sockets/subscriber.md b/docs/sockets/subscriber.md index e5a049e..97f232c 100644 --- a/docs/sockets/subscriber.md +++ b/docs/sockets/subscriber.md @@ -10,26 +10,20 @@ The subscriber socket can subscribe to a specific topic and can only receive mes The subscriber can be created within the `Context`. ```scala mdoc:silent -import cats.effect.{Blocker, ContextShift, Resource, IO} +import cats.effect.{Resource, IO} import io.fmq.Context import io.fmq.socket.pubsub.Subscriber -import scala.concurrent.ExecutionContext - -implicit val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global) - val topicSubscriberResource: Resource[IO, Subscriber[IO]] = for { - blocker <- Blocker[IO] - context <- Context.create[IO](1, blocker) - subscriber <- Resource.liftF(context.createSubscriber(Subscriber.Topic.utf8String("my-topic"))) + context <- Context.create[IO](1) + subscriber <- Resource.eval(context.createSubscriber(Subscriber.Topic.utf8String("my-topic"))) } yield subscriber val allSubscriberResource: Resource[IO, Subscriber[IO]] = for { - blocker <- Blocker[IO] - context <- Context.create[IO](1, blocker) - subscriber <- Resource.liftF(context.createSubscriber(Subscriber.Topic.All)) + context <- Context.create[IO](1) + subscriber <- Resource.eval(context.createSubscriber(Subscriber.Topic.All)) } yield subscriber ``` diff --git a/examples/src/main/scala/io/fmq/examples/poller/Poller.scala b/examples/src/main/scala/io/fmq/examples/poller/Poller.scala index f446e2c..8473963 100644 --- a/examples/src/main/scala/io/fmq/examples/poller/Poller.scala +++ b/examples/src/main/scala/io/fmq/examples/poller/Poller.scala @@ -1,12 +1,14 @@ package io.fmq.examples.poller -import cats.FlatMap +import java.util.concurrent.Executors + import cats.data.{Kleisli, NonEmptyList} -import cats.effect.{Blocker, Concurrent, ContextShift, ExitCode, IO, IOApp, Resource, Sync, Timer} +import cats.effect.std.Queue +import cats.effect.syntax.async._ +import cats.effect._ import cats.syntax.flatMap._ import cats.syntax.functor._ import fs2.Stream -import fs2.concurrent.Queue import io.fmq.Context import io.fmq.frame.Frame import io.fmq.poll.{ConsumerHandler, PollEntry, PollTimeout} @@ -14,20 +16,26 @@ import io.fmq.socket.ProducerSocket import io.fmq.socket.pubsub.Subscriber import io.fmq.syntax.literals._ +import scala.concurrent.ExecutionContext import scala.concurrent.duration._ -object PollerApp extends IOApp { +object PollerApp extends IOApp.Simple { + + override def run: IO[Unit] = + blockingContext + .flatMap(blocker => Context.create[IO](ioThreads = 1).tupleRight(blocker)) + .use { case (ctx, blocker) => new PollerDemo[IO](ctx, blocker).program.compile.drain } - override def run(args: List[String]): IO[ExitCode] = - Blocker[IO] - .flatMap(blocker => Context.create[IO](ioThreads = 1, blocker).tupleRight(blocker)) - .use { case (ctx, blocker) => new PollerDemo[IO](ctx, blocker).program.compile.drain.as(ExitCode.Success) } + private def blockingContext: Resource[IO, ExecutionContext] = + Resource + .make(IO.delay(Executors.newSingleThreadExecutor()))(e => IO.delay(e.shutdown())) + .map(ExecutionContext.fromExecutor) } -class PollerDemo[F[_]: Concurrent: ContextShift: Timer](context: Context[F], blocker: Blocker) { +class PollerDemo[F[_]: Async](context: Context[F], blocker: ExecutionContext) { - private def log(message: String): F[Unit] = Sync[F].delay(println(message)) + private def log(message: String): F[Unit] = Async[F].delay(println(message)) private val topicA = "my-topic-a" private val topicB = "my-topic-b" @@ -51,7 +59,7 @@ class PollerDemo[F[_]: Concurrent: ContextShift: Timer](context: Context[F], blo val producer = new Producer[F](publisher, topicA, topicB) def handler(queue: Queue[F, String]): ConsumerHandler[F] = - Kleisli(socket => socket.receive[String] >>= queue.enqueue1) + Kleisli(socket => socket.receive[String] >>= queue.offer) // evaluates poll on a blocking context def poll(queueA: Queue[F, String], queueB: Queue[F, String], queueAll: Queue[F, String]): F[Unit] = { @@ -61,7 +69,7 @@ class PollerDemo[F[_]: Concurrent: ContextShift: Timer](context: Context[F], blo PollEntry.Read(subscriberAll, handler(queueAll)) ) - blocker.blockOn(poller.poll(items, PollTimeout.Infinity).foreverM[Unit]) + poller.poll(items, PollTimeout.Infinity).foreverM[Unit].evalOn(blocker) } for { @@ -71,19 +79,19 @@ class PollerDemo[F[_]: Concurrent: ContextShift: Timer](context: Context[F], blo _ <- Stream( producer.generate, Stream.eval(poll(queueA, queueB, queueAll)), - queueA.dequeue.evalMap(frame => log(s"ConsumerA. Received $frame")), - queueB.dequeue.evalMap(frame => log(s"ConsumerB. Received $frame")), - queueAll.dequeue.evalMap(frame => log(s"ConsumerAll. Received $frame")) + Stream.repeatEval(queueA.take).evalMap(frame => log(s"ConsumerA. Received $frame")), + Stream.repeatEval(queueB.take).evalMap(frame => log(s"ConsumerB. Received $frame")), + Stream.repeatEval(queueAll.take).evalMap(frame => log(s"ConsumerAll. Received $frame")) ).parJoinUnbounded } yield () } } -class Producer[F[_]: FlatMap: Timer](publisher: ProducerSocket[F], topicA: String, topicB: String) { +class Producer[F[_]: Async](publisher: ProducerSocket[F], topicA: String, topicB: String) { def generate: Stream[F, Unit] = - Stream.repeatEval(sendA >> sendB >> Timer[F].sleep(2000.millis)) + Stream.repeatEval(sendA >> sendB >> Async[F].sleep(2000.millis)) private def sendA: F[Unit] = publisher.sendMultipart(Frame.Multipart(topicA, "We don't want to see this")) diff --git a/examples/src/main/scala/io/fmq/examples/proxy/Proxy.scala b/examples/src/main/scala/io/fmq/examples/proxy/Proxy.scala index 5fe2256..c064bdc 100644 --- a/examples/src/main/scala/io/fmq/examples/proxy/Proxy.scala +++ b/examples/src/main/scala/io/fmq/examples/proxy/Proxy.scala @@ -1,11 +1,13 @@ package io.fmq.examples.proxy -import cats.effect.{Blocker, Concurrent, ContextShift, ExitCode, IO, IOApp, Resource, Sync, Timer} -import cats.effect.syntax.concurrent._ +import java.util.concurrent.Executors + +import cats.effect.std.Queue +import cats.effect.syntax.async._ +import cats.effect._ import cats.syntax.flatMap._ import cats.syntax.functor._ import fs2.Stream -import fs2.concurrent.Queue import io.fmq.Context import io.fmq.frame.Frame import io.fmq.options.Identity @@ -15,18 +17,24 @@ import io.fmq.socket.pipeline.{Pull, Push} import io.fmq.socket.reqrep.{Dealer, Reply, Request, Router} import io.fmq.syntax.literals._ +import scala.concurrent.ExecutionContext import scala.concurrent.duration._ -object ProxyApp extends IOApp { +object ProxyApp extends IOApp.Simple { + + override def run: IO[Unit] = + blockingContext + .flatMap(blocker => Context.create[IO](ioThreads = 1).tupleRight(blocker)) + .use { case (ctx, blocker) => new ProxyDemo[IO](ctx, blocker).program.compile.drain } - override def run(args: List[String]): IO[ExitCode] = - Blocker[IO] - .flatMap(blocker => Context.create[IO](ioThreads = 1, blocker).tupleRight(blocker)) - .use { case (ctx, blocker) => new ProxyDemo[IO](ctx, blocker).program.compile.drain.as(ExitCode.Success) } + private def blockingContext: Resource[IO, ExecutionContext] = + Resource + .make(IO.delay(Executors.newCachedThreadPool()))(e => IO.delay(e.shutdown())) + .map(ExecutionContext.fromExecutor) } -class ProxyDemo[F[_]: Concurrent: ContextShift: Timer](context: Context[F], blocker: Blocker) { +class ProxyDemo[F[_]: Async](context: Context[F], blocker: ExecutionContext) { private val frontendUri = inproc"://frontend" private val backendUri = inproc"://backend" @@ -72,16 +80,16 @@ class ProxyDemo[F[_]: Concurrent: ContextShift: Timer](context: Context[F], bloc } @SuppressWarnings(Array("org.wartremover.warts.StringPlusAny")) -class Server[F[_]: Concurrent: ContextShift](socket: Reply.Socket[F], blocker: Blocker) { +class Server[F[_]: Async](socket: Reply.Socket[F], blocker: ExecutionContext) { def serve: Stream[F, Unit] = { def process(queue: Queue[F, Frame[String]]) = - blocker.blockOn(Stream.repeatEval(socket.receiveFrame[String]).through(queue.enqueue).compile.drain) + Stream.repeatEval(socket.receiveFrame[String]).evalMap(queue.offer).compile.drain for { queue <- Stream.eval(Queue.unbounded[F, Frame[String]]) - _ <- Stream.resource(process(queue).background) - result <- queue.dequeue.evalMap(processRequest) + _ <- Stream.resource(process(queue).backgroundOn(blocker)) + result <- Stream.repeatEval(queue.take).evalMap(processRequest) } yield result } @@ -95,12 +103,12 @@ class Server[F[_]: Concurrent: ContextShift](socket: Reply.Socket[F], blocker: B } private def log(message: => String): F[Unit] = - Concurrent[F].delay(println(message)) + Async[F].delay(println(message)) } @SuppressWarnings(Array("org.wartremover.warts.StringPlusAny")) -class Client[F[_]: Sync: Timer](dispatcher: RequestReply[F]) { +class Client[F[_]: Async](dispatcher: RequestReply[F]) { def start: Stream[F, Unit] = Stream @@ -113,12 +121,12 @@ class Client[F[_]: Sync: Timer](dispatcher: RequestReply[F]) { .evalMap(response => log(s"Client. Received response $response")) private def log(message: => String): F[Unit] = - Sync[F].delay(println(message)) + Async[F].delay(println(message)) } @SuppressWarnings(Array("org.wartremover.warts.StringPlusAny")) -class MessageObserver[F[_]: Sync: Timer](pull: Pull.Socket[F]) { +class MessageObserver[F[_]: Async](pull: Pull.Socket[F]) { def start: Stream[F, Unit] = Stream @@ -127,6 +135,6 @@ class MessageObserver[F[_]: Sync: Timer](pull: Pull.Socket[F]) { .drain private def log(message: => String): F[Unit] = - Sync[F].delay(println(message)) + Async[F].delay(println(message)) } diff --git a/examples/src/main/scala/io/fmq/examples/pubsub/PubSub.scala b/examples/src/main/scala/io/fmq/examples/pubsub/PubSub.scala index 4b6ce48..3e30cc1 100644 --- a/examples/src/main/scala/io/fmq/examples/pubsub/PubSub.scala +++ b/examples/src/main/scala/io/fmq/examples/pubsub/PubSub.scala @@ -1,33 +1,40 @@ package io.fmq.examples.pubsub -import cats.FlatMap -import cats.effect.syntax.concurrent._ -import cats.effect.{Blocker, Concurrent, ContextShift, ExitCode, IO, IOApp, Resource, Sync, Timer} +import java.util.concurrent.Executors + +import cats.effect.std.Queue +import cats.effect.syntax.async._ +import cats.effect._ import cats.syntax.flatMap._ import cats.syntax.functor._ import fs2.Stream -import fs2.concurrent.Queue import io.fmq.Context import io.fmq.frame.Frame import io.fmq.socket.pubsub.Subscriber import io.fmq.socket.{ConsumerSocket, ProducerSocket} import io.fmq.syntax.literals._ +import scala.concurrent.ExecutionContext import scala.concurrent.duration._ -object PubSubApp extends IOApp { +object PubSubApp extends IOApp.Simple { + + override def run: IO[Unit] = + blockingContext + .flatMap(blocker => Context.create[IO](ioThreads = 1).tupleRight(blocker)) + .use { case (ctx, blocker) => new PubSubDemo[IO](ctx, blocker).program.compile.drain } - override def run(args: List[String]): IO[ExitCode] = - Blocker[IO] - .flatMap(blocker => Context.create[IO](ioThreads = 1, blocker).tupleRight(blocker)) - .use { case (ctx, blocker) => new PubSubDemo[IO](ctx, blocker).program.compile.drain.as(ExitCode.Success) } + private def blockingContext: Resource[IO, ExecutionContext] = + Resource + .make(IO.delay(Executors.newCachedThreadPool()))(e => IO.delay(e.shutdown())) + .map(ExecutionContext.fromExecutor) } @SuppressWarnings(Array("org.wartremover.warts.StringPlusAny")) -class PubSubDemo[F[_]: Concurrent: ContextShift: Timer](context: Context[F], blocker: Blocker) { +class PubSubDemo[F[_]: Async](context: Context[F], blocker: ExecutionContext) { - private def log(message: String): F[Unit] = Sync[F].delay(println(message)) + private def log(message: String): F[Unit] = Async[F].delay(println(message)) private val topicA = "my-topic-a" private val topicB = "my-topic-b" @@ -62,10 +69,10 @@ class PubSubDemo[F[_]: Concurrent: ContextShift: Timer](context: Context[F], blo } -class Producer[F[_]: FlatMap: Timer](publisher: ProducerSocket[F], topicA: String, topicB: String) { +class Producer[F[_]: Async](publisher: ProducerSocket[F], topicA: String, topicB: String) { def generate: Stream[F, Unit] = - Stream.repeatEval(sendA >> sendB >> Timer[F].sleep(2000.millis)) + Stream.repeatEval(sendA >> sendB >> Async[F].sleep(2000.millis)) private def sendA: F[Unit] = publisher.sendMultipart(Frame.Multipart(topicA, "We don't want to see this")) @@ -75,16 +82,16 @@ class Producer[F[_]: FlatMap: Timer](publisher: ProducerSocket[F], topicA: Strin } -class Consumer[F[_]: Concurrent: ContextShift](socket: ConsumerSocket[F], blocker: Blocker) { +class Consumer[F[_]: Async](socket: ConsumerSocket[F], blocker: ExecutionContext) { def consume: Stream[F, Frame[String]] = { def process(queue: Queue[F, Frame[String]]) = - blocker.blockOn(Stream.repeatEval(socket.receiveFrame[String]).through(queue.enqueue).compile.drain) + Stream.repeatEval(socket.receiveFrame[String]).evalMap(queue.offer).compile.drain for { queue <- Stream.eval(Queue.unbounded[F, Frame[String]]) - _ <- Stream.resource(process(queue).background) - result <- queue.dequeue + _ <- Stream.resource(process(queue).backgroundOn(blocker)) + result <- Stream.repeatEval(queue.take) } yield result } diff --git a/examples/src/main/scala/io/fmq/examples/reqrep/ReqRep.scala b/examples/src/main/scala/io/fmq/examples/reqrep/ReqRep.scala index fc80b57..531844b 100644 --- a/examples/src/main/scala/io/fmq/examples/reqrep/ReqRep.scala +++ b/examples/src/main/scala/io/fmq/examples/reqrep/ReqRep.scala @@ -1,29 +1,37 @@ package io.fmq.examples.reqrep -import cats.effect.syntax.concurrent._ -import cats.effect.{Blocker, Concurrent, ContextShift, ExitCode, IO, IOApp, Resource, Sync, Timer} +import java.util.concurrent.Executors + +import cats.effect.std.Queue +import cats.effect.syntax.async._ +import cats.effect._ import cats.syntax.flatMap._ import cats.syntax.functor._ import fs2.Stream -import fs2.concurrent.Queue import io.fmq.Context import io.fmq.frame.Frame import io.fmq.pattern.RequestReply import io.fmq.socket.reqrep.Reply import io.fmq.syntax.literals._ +import scala.concurrent.ExecutionContext import scala.concurrent.duration._ -object ReqRepApp extends IOApp { +object ReqRepApp extends IOApp.Simple { + + override def run: IO[Unit] = + blockingContext + .flatMap(blocker => Context.create[IO](ioThreads = 1).tupleRight(blocker)) + .use { case (ctx, blocker) => new ReqRepDemo[IO](ctx, blocker).program.compile.drain } - override def run(args: List[String]): IO[ExitCode] = - Blocker[IO] - .flatMap(blocker => Context.create[IO](ioThreads = 1, blocker).tupleRight(blocker)) - .use { case (ctx, blocker) => new ReqRepDemo[IO](ctx, blocker).program.compile.drain.as(ExitCode.Success) } + private def blockingContext: Resource[IO, ExecutionContext] = + Resource + .make(IO.delay(Executors.newCachedThreadPool()))(e => IO.delay(e.shutdown())) + .map(ExecutionContext.fromExecutor) } -class ReqRepDemo[F[_]: Concurrent: ContextShift: Timer](context: Context[F], blocker: Blocker) { +class ReqRepDemo[F[_]: Async](context: Context[F], blocker: ExecutionContext) { private val uri = tcp_i"://localhost" @@ -48,16 +56,16 @@ class ReqRepDemo[F[_]: Concurrent: ContextShift: Timer](context: Context[F], blo } @SuppressWarnings(Array("org.wartremover.warts.StringPlusAny")) -class Server[F[_]: Concurrent: ContextShift](socket: Reply.Socket[F], blocker: Blocker) { +class Server[F[_]: Async](socket: Reply.Socket[F], blocker: ExecutionContext) { def serve: Stream[F, Unit] = { def process(queue: Queue[F, Frame[String]]) = - blocker.blockOn(Stream.repeatEval(socket.receiveFrame[String]).through(queue.enqueue).compile.drain) + Stream.repeatEval(socket.receiveFrame[String]).evalMap(queue.offer).compile.drain for { queue <- Stream.eval(Queue.unbounded[F, Frame[String]]) - _ <- Stream.resource(process(queue).background) - result <- queue.dequeue.evalMap(processRequest) + _ <- Stream.resource(process(queue).backgroundOn(blocker)) + result <- Stream.repeatEval(queue.take).evalMap(processRequest) } yield result } @@ -71,12 +79,12 @@ class Server[F[_]: Concurrent: ContextShift](socket: Reply.Socket[F], blocker: B } private def log(message: => String): F[Unit] = - Sync[F].delay(println(message)) + Async[F].delay(println(message)) } @SuppressWarnings(Array("org.wartremover.warts.StringPlusAny")) -class Client[F[_]: Sync: Timer](dispatcher: RequestReply[F]) { +class Client[F[_]: Async](dispatcher: RequestReply[F]) { def start: Stream[F, Unit] = Stream @@ -89,6 +97,6 @@ class Client[F[_]: Sync: Timer](dispatcher: RequestReply[F]) { .evalMap(response => log(s"Client. Received response $response")) private def log(message: => String): F[Unit] = - Sync[F].delay(println(message)) + Async[F].delay(println(message)) } diff --git a/extras/src/main/scala/io/fmq/pattern/BackgroundConsumer.scala b/extras/src/main/scala/io/fmq/pattern/BackgroundConsumer.scala index baa2b89..720f7c6 100644 --- a/extras/src/main/scala/io/fmq/pattern/BackgroundConsumer.scala +++ b/extras/src/main/scala/io/fmq/pattern/BackgroundConsumer.scala @@ -1,29 +1,31 @@ package io.fmq.pattern -import cats.effect.syntax.concurrent._ -import cats.effect.{Blocker, Concurrent, ContextShift} +import cats.effect.kernel.Async +import cats.effect.std.Queue +import cats.effect.syntax.async._ import fs2.Stream -import fs2.concurrent.Queue import io.fmq.frame.{Frame, FrameDecoder} import io.fmq.socket.ConsumerSocket +import scala.concurrent.ExecutionContext + object BackgroundConsumer { /** - * Consumes messages in background on a dedicated blocking thread + * Consumes messages in background on a dedicated blocking execution context */ - def consume[F[_]: Concurrent: ContextShift, A: FrameDecoder]( - blocker: Blocker, + def consume[F[_]: Async, A: FrameDecoder]( + blocker: ExecutionContext, socket: ConsumerSocket[F], queueSize: Int ): Stream[F, Frame[A]] = { def process(queue: Queue[F, Frame[A]]): F[Unit] = - blocker.blockOn(Stream.repeatEval(socket.receiveFrame[A]).through(queue.enqueue).compile.drain) + Stream.repeatEval(socket.receiveFrame[A]).evalMap(queue.offer).compile.drain for { queue <- Stream.eval(Queue.bounded[F, Frame[A]](queueSize)) - _ <- Stream.resource(process(queue).background) - result <- queue.dequeue + _ <- Stream.resource(process(queue).backgroundOn(blocker)) + result <- Stream.repeatEval(queue.take) } yield result } diff --git a/extras/src/main/scala/io/fmq/pattern/RequestReply.scala b/extras/src/main/scala/io/fmq/pattern/RequestReply.scala index 8b496d5..8695d56 100644 --- a/extras/src/main/scala/io/fmq/pattern/RequestReply.scala +++ b/extras/src/main/scala/io/fmq/pattern/RequestReply.scala @@ -1,15 +1,16 @@ package io.fmq.pattern -import cats.effect.concurrent.Deferred -import cats.effect.syntax.concurrent._ -import cats.effect.{Blocker, Concurrent, ContextShift, Resource} +import cats.effect.std.Queue +import cats.effect.syntax.async._ +import cats.effect.{Async, Deferred, Resource} import cats.syntax.flatMap._ import cats.syntax.functor._ -import fs2.concurrent.Queue import io.fmq.frame.{Frame, FrameDecoder, FrameEncoder} import io.fmq.socket.reqrep.Request -class RequestReply[F[_]: Concurrent] private ( +import scala.concurrent.ExecutionContext + +class RequestReply[F[_]: Async] private ( socket: Request.Socket[F], requestQueue: Queue[F, F[Unit]] ) { @@ -25,7 +26,7 @@ class RequestReply[F[_]: Concurrent] private ( for { promise <- Deferred[F, Frame[Rep]] - _ <- requestQueue.enqueue1(background(promise)) + _ <- requestQueue.offer(background(promise)) result <- promise.get } yield result } @@ -34,14 +35,14 @@ class RequestReply[F[_]: Concurrent] private ( object RequestReply { - def create[F[_]: Concurrent: ContextShift]( - blocker: Blocker, + def create[F[_]: Async]( + blocker: ExecutionContext, socket: Request.Socket[F], queueSize: Int ): Resource[F, RequestReply[F]] = for { - queue <- Resource.liftF(Queue.bounded[F, F[Unit]](queueSize)) - _ <- blocker.blockOn(queue.dequeue.evalMap(identity).compile.drain).background + queue <- Resource.eval(Queue.bounded[F, F[Unit]](queueSize)) + _ <- fs2.Stream.repeatEval(queue.take).evalMap(identity).compile.drain.backgroundOn(blocker) } yield new RequestReply[F](socket, queue) } diff --git a/extras/src/test/scala/io/fmq/IOSpec.scala b/extras/src/test/scala/io/fmq/IOSpec.scala index bed4f5a..34e2ed7 100644 --- a/extras/src/test/scala/io/fmq/IOSpec.scala +++ b/extras/src/test/scala/io/fmq/IOSpec.scala @@ -1,27 +1,24 @@ package io.fmq -import cats.effect.syntax.effect._ -import cats.effect.{Blocker, ContextShift, Effect, IO, Sync, Timer} +import cats.effect.IO +import cats.effect.unsafe.IORuntime import org.scalatest.OptionValues._ import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpecLike -import scala.concurrent.ExecutionContext import scala.concurrent.duration._ trait IOSpec extends AnyWordSpecLike with Matchers { - protected implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global) - protected implicit val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global) + implicit val runtime: IORuntime = IORuntime.global @SuppressWarnings(Array("org.wartremover.warts.DefaultArguments")) - protected def withContext[F[_]: Sync: ContextShift: Effect, A]( - timeout: FiniteDuration = 3.seconds - )(fa: Context[F] => F[A]): A = - Blocker[F] - .flatMap(blocker => Context.create[F](1, blocker)) + protected def withContext[A]( + timeout: FiniteDuration = 30.seconds + )(fa: Context[IO] => IO[A]): A = + Context + .create[IO](1) .use(fa) - .toIO .unsafeRunTimed(timeout) .value diff --git a/extras/src/test/scala/io/fmq/pattern/BackgroundConsumerSpec.scala b/extras/src/test/scala/io/fmq/pattern/BackgroundConsumerSpec.scala index 4913ad2..2c7b8a8 100644 --- a/extras/src/test/scala/io/fmq/pattern/BackgroundConsumerSpec.scala +++ b/extras/src/test/scala/io/fmq/pattern/BackgroundConsumerSpec.scala @@ -1,14 +1,17 @@ package io.fmq.pattern -import cats.effect.{Blocker, IO, Resource, Timer} -import io.fmq.{Context, IOSpec} +import java.util.concurrent.Executors + +import cats.effect.{IO, Resource} import io.fmq.frame.Frame -import io.fmq.socket.{ConsumerSocket, ProducerSocket} -import io.fmq.socket.pubsub.Subscriber import io.fmq.pattern.BackgroundConsumerSpec.Pair +import io.fmq.socket.pubsub.Subscriber +import io.fmq.socket.{ConsumerSocket, ProducerSocket} import io.fmq.syntax.literals._ +import io.fmq.{Context, IOSpec} import org.scalatest.Assertion +import scala.concurrent.ExecutionContext import scala.concurrent.duration._ class BackgroundConsumerSpec extends IOSpec { @@ -18,15 +21,18 @@ class BackgroundConsumerSpec extends IOSpec { "consume messages" in withSockets { pair => val Pair(publisher, subscriber) = pair - def program(blocker: Blocker): IO[Assertion] = + def program(blocker: ExecutionContext): IO[Assertion] = for { - _ <- Timer[IO].sleep(200.millis) + _ <- IO.sleep(200.millis) _ <- publisher.send("hello") _ <- publisher.send("world") messages <- BackgroundConsumer.consume[IO, String](blocker, subscriber, 128).take(2).compile.toList } yield messages shouldBe List(Frame.Single("hello"), Frame.Single("world")) - Blocker[IO].use(program) + Resource + .make(IO.delay(Executors.newCachedThreadPool()))(e => IO.delay(e.shutdown())) + .map(ExecutionContext.fromExecutor) + .use(program) } } diff --git a/extras/src/test/scala/io/fmq/pattern/RequestReplySpec.scala b/extras/src/test/scala/io/fmq/pattern/RequestReplySpec.scala index 8189b6e..521dce0 100644 --- a/extras/src/test/scala/io/fmq/pattern/RequestReplySpec.scala +++ b/extras/src/test/scala/io/fmq/pattern/RequestReplySpec.scala @@ -1,14 +1,16 @@ package io.fmq.pattern -import cats.effect.{Blocker, IO, Resource, Timer} -import cats.syntax.flatMap._ +import java.util.concurrent.Executors + +import cats.effect.{IO, Resource} import io.fmq.frame.Frame +import io.fmq.pattern.RequestReplySpec.Pair import io.fmq.socket.reqrep.{Reply, Request} import io.fmq.syntax.literals._ -import io.fmq.pattern.RequestReplySpec.Pair import io.fmq.{Context, IOSpec} import org.scalatest.Assertion +import scala.concurrent.ExecutionContext import scala.concurrent.duration._ class RequestReplySpec extends IOSpec { @@ -28,7 +30,7 @@ class RequestReplySpec extends IOSpec { def program(dispatcher: RequestReply[IO]): IO[Assertion] = for { - _ <- Timer[IO].sleep(200.millis) + _ <- IO.sleep(200.millis) response1 <- dispatcher.submit[String, String](Frame.Single("hello")) response2 <- dispatcher.submit[String, String](Frame.Multipart("hello", "world")) } yield { @@ -38,8 +40,8 @@ class RequestReplySpec extends IOSpec { (for { _ <- server.background - blocker <- Blocker[IO] - dispatcher <- RequestReply.create[IO](blocker, request, 10) + ec <- Resource.make(IO.delay(Executors.newCachedThreadPool()))(e => IO.delay(e.shutdown())) + dispatcher <- RequestReply.create[IO](ExecutionContext.fromExecutor(ec), request, 10) } yield dispatcher).use(program) } diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 8f3d355..4698b25 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -1,26 +1,26 @@ import sbt._ object Versions { - val scala_212 = "2.12.13" - val scala_213 = "2.13.5" - val catsEffect = "2.3.3" - val fs2 = "2.5.3" - val jeromq = "0.5.2" - val scalatest = "3.2.6" + val scala_213 = "2.13.5" + val catsEffect = "3.1.0" + val fs2 = "3.0.2" + val jeromq = "0.5.2" + val scalatest = "3.2.8" + val betterMonadicFor = "0.3.1" } object Dependencies { - val fs2 = "co.fs2" %% "fs2-io" % Versions.fs2 - def scalaReflect(sv: String) = "org.scala-lang" % "scala-reflect" % sv + val fs2 = "co.fs2" %% "fs2-io" % Versions.fs2 def core(scalaVersion: String): Seq[ModuleID] = Seq( - "org.typelevel" %% "cats-effect" % Versions.catsEffect, - "org.zeromq" % "jeromq" % Versions.jeromq, - "org.scala-lang" % "scala-reflect" % scalaVersion % Provided, - "org.scalatest" %% "scalatest" % Versions.scalatest % Test, - "co.fs2" %% "fs2-io" % Versions.fs2 % Test + "org.typelevel" %% "cats-effect-std" % Versions.catsEffect, + "org.zeromq" % "jeromq" % Versions.jeromq, + "org.scala-lang" % "scala-reflect" % scalaVersion % Provided, + "org.scalatest" %% "scalatest" % Versions.scalatest % Test, + "co.fs2" %% "fs2-io" % Versions.fs2 % Test, + compilerPlugin("com.olegpy" %% "better-monadic-for" % Versions.betterMonadicFor) ) val extras: Seq[ModuleID] = Seq( diff --git a/project/build.properties b/project/build.properties index dbae93b..19479ba 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=1.4.9 +sbt.version=1.5.2