Skip to content
This repository has been archived by the owner on Nov 5, 2024. It is now read-only.

Cats effect 3 #149

Merged
merged 15 commits into from
May 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
steps:
- uses: actions/[email protected]
- uses: olafurpg/setup-scala@v10
- run: sbt ++2.13.1 docs/docusaurusCreateSite
- run: sbt ++2.13.5 docs/docusaurusCreateSite

test:
name: ${{ matrix.command }} ${{ matrix.java }}
Expand All @@ -25,8 +25,7 @@ jobs:
matrix:
java: ['[email protected]', '[email protected]']
command:
- "++2.12.10 ci"
- "++2.13.1 ci"
- "++2.13.5 ci"
steps:
- uses: actions/[email protected]
- uses: olafurpg/setup-scala@v10
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,6 @@ jobs:
SONATYPE_PASSWORD: ${{ secrets.SONATYPE_PASSWORD }}
SONATYPE_USERNAME: ${{ secrets.SONATYPE_USERNAME }}
- name: Publish docs
run: sbt ++2.13.1 docs/docusaurusPublishGhpages
run: sbt ++2.13.5 docs/docusaurusPublishGhpages
env:
GITHUB_DEPLOY_KEY: ${{ secrets.GITHUB_DEPLOY_KEY }}
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
*.class
.idea
.DS_Store
.bsp
logs/
project/target/
target/
Expand Down
15 changes: 7 additions & 8 deletions bench/src/main/scala/io/fmq/JeroMQSocketBenchmark.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,13 @@ package io.fmq
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}

import cats.effect.{ContextShift, Fiber, IO}
import cats.effect.unsafe.IORuntime
import cats.effect.{Fiber, IO}
import io.fmq.JeroMQSocketBenchmark.MessagesCounter
import org.openjdk.jmh.annotations._
import org.zeromq.{SocketType, ZContext}
import zmq.ZMQ

import scala.concurrent.ExecutionContext

//jmh:run io.fmq.JeroMQSocketBenchmark
@BenchmarkMode(Array(Mode.Throughput))
@State(Scope.Benchmark)
Expand All @@ -21,15 +20,15 @@ import scala.concurrent.ExecutionContext
@SuppressWarnings(Array("org.wartremover.warts.All"))
class JeroMQSocketBenchmark {

private implicit val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
private implicit val runtime: IORuntime = IORuntime.global

@Param(Array("128", "256", "512", "1024"))
var messageSize: Int = _

private val recording = new AtomicBoolean

private var publisher: Fiber[IO, Unit] = _
private var consumer: Fiber[IO, Unit] = _
private var publisher: Fiber[IO, Throwable, Unit] = _
private var consumer: Fiber[IO, Throwable, Unit] = _

@Setup(Level.Iteration)
def setup(): Unit = {
Expand All @@ -53,7 +52,7 @@ class JeroMQSocketBenchmark {
}

publisher = IO
.delay {
.interruptible(false) {
while (true) {
push.send(msg.data())
}
Expand All @@ -62,7 +61,7 @@ class JeroMQSocketBenchmark {
.unsafeRunSync()

consumer = IO
.delay {
.interruptible(false) {
while (true) {
pull.recv()

Expand Down
26 changes: 13 additions & 13 deletions bench/src/main/scala/io/fmq/SocketBenchmark.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,13 @@ package io.fmq
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}

import cats.effect.{Blocker, ContextShift, Fiber, IO, Resource}
import cats.syntax.flatMap._
import cats.effect.unsafe.IORuntime
import cats.effect.{Fiber, IO, Resource}
import io.fmq.SocketBenchmark.MessagesCounter
import io.fmq.syntax.literals._
import org.openjdk.jmh.annotations._
import zmq.ZMQ

import scala.concurrent.ExecutionContext

//jmh:run io.fmq.SocketBenchmark
@BenchmarkMode(Array(Mode.Throughput))
@State(Scope.Benchmark)
Expand All @@ -22,24 +20,23 @@ import scala.concurrent.ExecutionContext
@SuppressWarnings(Array("org.wartremover.warts.Null", "org.wartremover.warts.Var"))
class SocketBenchmark {

private implicit val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
private implicit val runtime: IORuntime = IORuntime.global

@Param(Array("128", "256", "512", "1024"))
var messageSize: Int = _

private val recording = new AtomicBoolean

private var publisher: Fiber[IO, Nothing] = _
private var consumer: Fiber[IO, Nothing] = _
private var publisher: Fiber[IO, Throwable, Nothing] = _
private var consumer: Fiber[IO, Throwable, Nothing] = _

@Setup(Level.Iteration)
def setup(): Unit = {
val uri = tcp_i"://localhost"

val ((pull, push), _) =
(for {
blocker <- Blocker[IO]
context <- Context.create[IO](1, blocker)
context <- Context.create[IO](1)
consumer <- Resource.suspend(context.createPull.map(_.bindToRandomPort(uri)))
producer <- Resource.suspend(context.createPush.map(_.connect(consumer.uri)))
} yield (consumer, producer)).allocated.unsafeRunSync()
Expand All @@ -59,8 +56,8 @@ class SocketBenchmark {

@TearDown(Level.Iteration)
def teardown(): Unit = {
consumer.cancel.unsafeRunSync()
publisher.cancel.unsafeRunSync()
consumer.cancel.unsafeRunAndForget()
publisher.cancel.unsafeRunAndForget()
}

@Benchmark
Expand All @@ -74,18 +71,21 @@ class SocketBenchmark {

}

@SuppressWarnings(Array("org.wartremover.warts.Null", "org.wartremover.warts.Var"))
object SocketBenchmark {

@AuxCounters
@State(Scope.Thread)
class MessagesCounter {

@Setup(Level.Iteration)
def clean(): Unit = messagesCounter.set(0)
def clean(): Unit = {
messagesCounter = new AtomicLong
}

def messagesPerSecond: Long = messagesCounter.get
}

private val messagesCounter = new AtomicLong
private var messagesCounter = new AtomicLong

}
18 changes: 17 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ lazy val core = project
.in(file("core"))
.settings(commonSettings)
.settings(commandSettings)
.settings(pluginAbsolutePathSettings)
.settings(
name := "fmq-core",
libraryDependencies ++= Dependencies.core(scalaVersion.value)
Expand Down Expand Up @@ -63,7 +64,7 @@ lazy val docs = project

lazy val commonSettings = Seq(
scalaVersion := Versions.scala_213,
crossScalaVersions := Seq(scalaVersion.value, Versions.scala_212),
crossScalaVersions := Seq(scalaVersion.value),
Test / fork := true,
Test / parallelExecution := false,
Compile / compile / wartremoverErrors ++= Warts.allBut(Wart.Any, Wart.Nothing), // false positive
Expand Down Expand Up @@ -97,6 +98,21 @@ lazy val noPublishSettings = Seq(
publish / skip := true
)

// See https://github.com/sbt/sbt/issues/6027
lazy val pluginAbsolutePathSettings = Seq(Compile, Test).map { c =>
c / scalacOptions := {
val prefix = "-Xplugin:"
(c / scalacOptions).value.map { opt =>
if (opt.startsWith(prefix)) {
val originalPluginFile = file(opt.drop(prefix.length))
prefix + originalPluginFile.toPath.toAbsolutePath
} else {
opt
}
}
}
}

inThisBuild(
Seq(
organization := "io.github.irevive",
Expand Down
38 changes: 19 additions & 19 deletions core/src/main/scala/io/fmq/Context.scala
Original file line number Diff line number Diff line change
@@ -1,67 +1,67 @@
package io.fmq

import cats.effect.{Blocker, Concurrent, ContextShift, Resource, Sync}
import cats.effect.kernel.{Async, Resource}
import io.fmq.poll.Poller
import io.fmq.proxy.Proxy
import io.fmq.socket.pipeline.{Pull, Push}
import io.fmq.socket.pubsub.{Publisher, Subscriber, XPublisher, XSubscriber}
import io.fmq.socket.reqrep.{Dealer, Reply, Request, Router}
import org.zeromq.{SocketType, ZContext, ZMQ}

final class Context[F[_]: Sync: ContextShift] private (private[fmq] val ctx: ZContext, blocker: Blocker) {
final class Context[F[_]: Async] private (private[fmq] val ctx: ZContext) {

def createSubscriber(topic: Subscriber.Topic): F[Subscriber[F]] =
createSocket(SocketType.SUB) { socket =>
val _ = socket.subscribe(topic.value)
new Subscriber[F](topic, socket, blocker)
new Subscriber[F](topic, socket)
}

def createPublisher: F[Publisher[F]] =
createSocket(SocketType.PUB)(socket => new Publisher[F](socket, blocker))
createSocket(SocketType.PUB)(socket => new Publisher[F](socket))

def createXSubscriber: F[XSubscriber[F]] =
createSocket(SocketType.XSUB)(socket => new XSubscriber(socket, blocker))
createSocket(SocketType.XSUB)(socket => new XSubscriber(socket))

def createXPublisher: F[XPublisher[F]] =
createSocket(SocketType.XPUB)(socket => new XPublisher(socket, blocker))
createSocket(SocketType.XPUB)(socket => new XPublisher(socket))

def createPull: F[Pull[F]] =
createSocket(SocketType.PULL)(socket => new Pull(socket, blocker))
createSocket(SocketType.PULL)(socket => new Pull(socket))

def createPush: F[Push[F]] =
createSocket(SocketType.PUSH)(socket => new Push(socket, blocker))
createSocket(SocketType.PUSH)(socket => new Push(socket))

def createRequest: F[Request[F]] =
createSocket(SocketType.REQ)(socket => new Request(socket, blocker))
createSocket(SocketType.REQ)(socket => new Request(socket))

def createReply: F[Reply[F]] =
createSocket(SocketType.REP)(socket => new Reply(socket, blocker))
createSocket(SocketType.REP)(socket => new Reply(socket))

def createRouter: F[Router[F]] =
createSocket(SocketType.ROUTER)(socket => new Router(socket, blocker))
createSocket(SocketType.ROUTER)(socket => new Router(socket))

def createDealer: F[Dealer[F]] =
createSocket(SocketType.DEALER)(socket => new Dealer(socket, blocker))
createSocket(SocketType.DEALER)(socket => new Dealer(socket))

def createPoller: Resource[F, Poller[F]] =
for {
selector <- Resource.fromAutoCloseableBlocking(blocker)(Sync[F].delay(ctx.getContext.selector()))
selector <- Resource.fromAutoCloseable(Async[F].delay(ctx.getContext.selector()))
} yield Poller.fromSelector[F](selector)

def proxy(implicit ev: Concurrent[F]): Proxy[F] = new Proxy[F](this)
val proxy: Proxy[F] = new Proxy[F](this)

def isClosed: F[Boolean] = Sync[F].delay(ctx.isClosed)
def isClosed: F[Boolean] = Async[F].delay(ctx.isClosed)

private def createSocket[A](tpe: SocketType)(fa: ZMQ.Socket => A): F[A] =
Sync[F].delay(fa(ctx.createSocket(tpe)))
Async[F].delay(fa(ctx.createSocket(tpe)))

}

object Context {

def create[F[_]: Sync: ContextShift](ioThreads: Int, blocker: Blocker): Resource[F, Context[F]] =
def create[F[_]: Async](ioThreads: Int): Resource[F, Context[F]] =
for {
ctx <- Resource.fromAutoCloseableBlocking(blocker)(Sync[F].delay(new ZContext(ioThreads)))
} yield new Context[F](ctx, blocker)
ctx <- Resource.fromAutoCloseable(Async[F].delay(new ZContext(ioThreads)))
} yield new Context[F](ctx)

}
4 changes: 4 additions & 0 deletions core/src/main/scala/io/fmq/poll/PollTimeout.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.fmq.poll

import cats.Eq

import scala.concurrent.duration.FiniteDuration

sealed abstract class PollTimeout(val value: Long)
Expand All @@ -16,4 +18,6 @@ object PollTimeout {
*/
final case class Fixed(duration: FiniteDuration) extends PollTimeout(duration.toMillis)

implicit val pollTimeoutEq: Eq[PollTimeout] = Eq.fromUniversalEquals

}
Loading