diff --git a/build.sbt b/build.sbt index c77edd8..03d63e4 100644 --- a/build.sbt +++ b/build.sbt @@ -17,7 +17,11 @@ resolvers in ThisBuild += "maven2" at "https://repo1.maven.org/maven2/" useGpg := false -credentials += Credentials(Path.userHome / ".sbt" / "sonatype_credential") +ThisBuild / credentials += Credentials(Path.userHome / ".sbt" / "sonatype_credential") +ThisBuild / publishConfiguration := publishConfiguration.value.withOverwrite(true) +ThisBuild / publishLocalConfiguration := publishLocalConfiguration.value.withOverwrite(true) + +val scalaTestVersion = "3.2.1" lazy val dependencies = new { @@ -30,11 +34,13 @@ lazy val dependencies = val scalaLogging = "com.typesafe.scala-logging" %% "scala-logging" % "3.9.2" // test val logbackClassic = "ch.qos.logback" % "logback-classic" % "1.2.3" % Test - val scalaTest = "org.scalatest" %% "scalatest" % "3.0.8" % Test + val scalaTest = "org.scalatest" %% "scalatest" % scalaTestVersion % Test val pegdown = "org.pegdown" % "pegdown" % "1.6.0" % Test val logstashLogbackEncoder = "net.logstash.logback" % "logstash-logback-encoder" % "5.3" % Test val logbackContrib = "ch.qos.logback.contrib" % "logback-json-classic" % "0.1.5" % Test val logbackJackson = "ch.qos.logback.contrib" % "logback-jackson" % "0.1.5" % Test + val flexmark = "com.vladsch.flexmark" % "flexmark-all" % "0.36.8" % Test + // utils val sourcecode = "com.lihaoyi" %% "sourcecode" % "0.2.1" } @@ -56,11 +62,13 @@ lazy val global = project .in(file(".")) .aggregate( core, + clusterApi, + cluster, + clusterNode, protobuf, interopCats, interopMonix, testUtils, - intgTests, benchmark) lazy val core = project @@ -74,11 +82,47 @@ lazy val core = project libraryDependencies += "org.zeromq" % "jeromq" % "0.5.1" ).dependsOn(protobuf) +lazy val cluster = project + .enablePlugins(JavaAppPackaging, UniversalDeployPlugin) + .settings( + name := "cluster", + libraryDependencies ++= Seq("com.github.scopt" %% "scopt" % "4.0.1", + "org.slf4j" % "slf4j-log4j12" % "1.7.25"), + maintainer in Universal := "parapet.io", + packageName in Universal := "parapet-cluster-" + version.value, + mappings in Universal += { + val src = (sourceDirectory in Compile).value + src / "resources" / "log4j.xml" -> "etc/log4j.xml" + }, + mappings in Universal += { + val src = (sourceDirectory in Compile).value + src / "resources" / "etc" / "node.properties.template" -> "etc/node.properties.template" + }, + bashScriptExtraDefines += """addJava "-Dlog4j.configuration=file:${app_home}/../etc/log4j.xml"""" + + ).dependsOn(core, interopCats, clusterApi) + +lazy val clusterNode = project + .in(file("cluster-node")) + .settings( + name := "cluster-node" + ).dependsOn(core, clusterApi) + +lazy val clusterApi = project + .in(file("cluster-api")) + .settings( + name := "cluster-api", + libraryDependencies += "org.scalatest" %% "scalatest" % scalaTestVersion % Test, + libraryDependencies += dependencies.flexmark + ).dependsOn(core) + + lazy val testUtils = project .in(file("test-utils")) .settings( name := "test-utils", - libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.8" + libraryDependencies += "org.scalatest" %% "scalatest" % scalaTestVersion, + libraryDependencies += dependencies.flexmark ).dependsOn(core, interopCats, interopMonix) lazy val interopCats = project @@ -110,7 +154,8 @@ lazy val intgTests = project publishLocal := {}, publish := {}, libraryDependencies ++= Seq( - "org.scalatest" %% "scalatest" % "3.0.8", + "org.scalatest" %% "scalatest" % scalaTestVersion, + dependencies.flexmark, "org.pegdown" % 
"pegdown" % "1.6.0", "ch.qos.logback" % "logback-classic" % "1.2.3", "net.logstash.logback" % "logstash-logback-encoder" % "5.3" @@ -148,6 +193,7 @@ lazy val commonDependencies = Seq( dependencies.logbackContrib, dependencies.logbackJackson, dependencies.scalaTest, + dependencies.flexmark, dependencies.sourcecode ) diff --git a/cluster-api/src/main/scala/io/parapet/cluster/api/ClusterApi.scala b/cluster-api/src/main/scala/io/parapet/cluster/api/ClusterApi.scala new file mode 100644 index 0000000..ee256ca --- /dev/null +++ b/cluster-api/src/main/scala/io/parapet/cluster/api/ClusterApi.scala @@ -0,0 +1,150 @@ +package io.parapet.cluster.api + +import io.parapet.core.{Encoder, Event} + +import java.nio.ByteBuffer + +object ClusterApi { + + + // @formatter:off + sealed trait API extends Event + + // --------------------- JOIN ------------------------------------------ // + object JoinResultCodes { + val OK = 0 + val ERROR = 1 // must to be clarified + } + + // FORMAT + val TAG_SIZE = 4 + val NODE_ID_SIZE = 4 + val ADDR_SIZE = 4 + val GROUP_SIZE = 4 + val RESULT_CODE_SIZE = 4 + + case class Join(nodeId: String, address: String, group: String) extends API + case class JoinResult(nodeId: String, code: Int) extends API + + // ---------------------- Result ---------------------------------------- // + case class Result(code: Int, msg: String) extends API + + object ResultCodes { + val OK = 0 + val ERROR = 1 + } + // FORMAT + val CODE_SIZE = 4 + val MSG_SIZE = 4 + + // ---------------------- NodeInfo ---------------------------------------- // + case class GetNodeInfo(senderId: String, id: String) extends API + case class NodeInfo(address: String, code: Int) extends API + object NodeInfoCodes { + val OK = 0 + val NODE_NOT_FOUND = 1 + val ERROR = 2 + } + + // TAGS + val JOIN_TAG = 20 + val JOIN_RESULT_TAG = 21 + val RESULT_TAG = 22 + val GET_NODE_INFO_TAG = 23 + val NODE_INFO = 24 + + // @formatter:on + + val encoder: Encoder = new Encoder { + override def write(e: Event): Array[Byte] = { + e match { + case Join(nodeId, address, group) => + val nodeIdBytes = nodeId.getBytes() + val addressBytes = address.getBytes() + val groupBytes = group.getBytes() + val buf = ByteBuffer.allocate(TAG_SIZE + + (NODE_ID_SIZE + nodeIdBytes.length) + + (GROUP_SIZE + groupBytes.length) + + (ADDR_SIZE + addressBytes.length)) + // write + buf.putInt(JOIN_TAG) + putWithSize(buf, nodeIdBytes) + putWithSize(buf, addressBytes) + putWithSize(buf, groupBytes) + buf.rewind() + buf.array() + case JoinResult(nodeId, code) => + val nodeIdBytes = nodeId.getBytes() + val buf = ByteBuffer.allocate(TAG_SIZE + (NODE_ID_SIZE + nodeIdBytes.length) + RESULT_CODE_SIZE) + buf.putInt(JOIN_RESULT_TAG) + putWithSize(buf, nodeIdBytes) + buf.putInt(code) + buf.rewind() + buf.array() + case Result(code, msg) => + val msgBytes = msg.getBytes() + val buf = ByteBuffer.allocate(TAG_SIZE + CODE_SIZE + (MSG_SIZE + msgBytes.length)) + buf.putInt(RESULT_TAG) + buf.putInt(code) + putWithSize(buf, msgBytes) + buf.rewind() + buf.array() + case GetNodeInfo(senderId, id) => + val senderIdBytes = senderId.getBytes() + val idBytes = id.getBytes() + val buf = ByteBuffer.allocate(4 + 4 + senderIdBytes.length + 4 + idBytes.length) + buf.putInt(GET_NODE_INFO_TAG) + putWithSize(buf, senderIdBytes) + putWithSize(buf, idBytes) + buf.rewind() + buf.array() + case NodeInfo(address, code) => + val addressBytes = address.getBytes + val buf = ByteBuffer.allocate(TAG_SIZE + 4 + CODE_SIZE + addressBytes.length) + .putInt(NODE_INFO) + .putInt(addressBytes.length) + 
.put(addressBytes) + .putInt(code) + buf.rewind() + buf.array() + case _ => throw new UnsupportedOperationException() + } + } + + override def read(data: Array[Byte]): Event = { + val buf = ByteBuffer.wrap(data) + val tag = buf.getInt + tag match { + case JOIN_TAG => + val nodeId = getString(buf) + val address = getString(buf) + val group = getString(buf) + Join(nodeId = nodeId, address = address, group = group) + case JOIN_RESULT_TAG => + val nodeId = getString(buf) + val code = buf.getInt + JoinResult(nodeId, code) + case RESULT_TAG => + val code = buf.getInt + val msg = getString(buf) + Result(code, msg) + case GET_NODE_INFO_TAG => GetNodeInfo(getString(buf), getString(buf)) + case NODE_INFO => NodeInfo(getString(buf), buf.getInt) + case _ => throw new UnsupportedOperationException() + } + } + } + + private def putWithSize(buf: ByteBuffer, data: Array[Byte]): Unit = { + buf.putInt(data.length) + buf.put(data) + } + + private def getString(buf: ByteBuffer): String = { + val len = buf.getInt() + val data = new Array[Byte](len) + buf.get(data) + new String(data) + } + +} diff --git a/cluster-api/src/test/scala/io/parapet/cluster/api/EncoderSpec.scala b/cluster-api/src/test/scala/io/parapet/cluster/api/EncoderSpec.scala new file mode 100644 index 0000000..5b8f2b3 --- /dev/null +++ b/cluster-api/src/test/scala/io/parapet/cluster/api/EncoderSpec.scala @@ -0,0 +1,15 @@ +package io.parapet.cluster.api + +import io.parapet.cluster.api.ClusterApi._ +import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.matchers.should.Matchers._ + +class EncoderSpec extends AnyFunSuite { + + test("join") { + val join = Join("1", "localhost:8080", "2") + val data = encoder.write(join) + encoder.read(data) shouldBe join + } + +} diff --git a/cluster-node/src/main/scala/io/parapet/cluster/node/Interface.scala b/cluster-node/src/main/scala/io/parapet/cluster/node/Interface.scala new file mode 100644 index 0000000..5408be0 --- /dev/null +++ b/cluster-node/src/main/scala/io/parapet/cluster/node/Interface.scala @@ -0,0 +1,71 @@ +package io.parapet.cluster.node + +import scala.util.Try + +trait Interface { + + /** Connects to the leader. + */ + def connect(): Unit + + /** Joins a cluster. + * @param group the node group + * @return result + */ + def join(group: String): Try[Unit] + + /** Leaves the given node group. + * @param group the node group + * @return result + */ + def leave(group: String): Try[Unit] + + /** Sends a request. + * + * @param req the request + * @return result + */ + def send(req: Req): Try[Unit] + + /** + * Send a request and waits for response. + * Use for a strict REQ-REP dialog. + * + * @param req request + * @param handler message handler + * @return result + */ + def send(req: Req, handler: Array[Byte] => Unit): Try[Unit] + + /** + * Sends a reply. + * Use for a strict REQ-REP dialog. + * + * @param rep the reply + * @return result + */ + def send(rep: Rep): Try[Unit] + + /** Sends a message to all nodes in the group. + * + * @param group the node group + * @param data the data to send + * @return result + */ + def broadcast(group: String, data: Array[Byte]): Try[Unit] + + /** Gets current leader. + * + * @return leader + */ + def leader: Option[String] + + /** Gets all registered nodes. + * @return registered nodes + */ + def getNodes: Try[Seq[String]] + + /** Disconnects from the cluster and closes open network connections. 
+ */ + def close(): Unit +} diff --git a/cluster-node/src/main/scala/io/parapet/cluster/node/MessageHandler.scala b/cluster-node/src/main/scala/io/parapet/cluster/node/MessageHandler.scala new file mode 100644 index 0000000..f2b9360 --- /dev/null +++ b/cluster-node/src/main/scala/io/parapet/cluster/node/MessageHandler.scala @@ -0,0 +1,7 @@ +package io.parapet.cluster.node + +trait MessageHandler { + + def handle(req: Req): Unit + +} diff --git a/cluster-node/src/main/scala/io/parapet/cluster/node/Node.scala b/cluster-node/src/main/scala/io/parapet/cluster/node/Node.scala new file mode 100644 index 0000000..b42aabe --- /dev/null +++ b/cluster-node/src/main/scala/io/parapet/cluster/node/Node.scala @@ -0,0 +1,294 @@ +package io.parapet.cluster.node +import com.typesafe.scalalogging.Logger +import io.parapet.cluster.api.ClusterApi._ +import io.parapet.core.processes.RouletteLeaderElection +import io.parapet.core.processes.RouletteLeaderElection.{REQ_TAG, WHO_REP_TAG, WHO_TAG, WhoRep} +import org.zeromq.ZMQ.Socket +import org.zeromq.{SocketType, ZContext, ZFrame, ZMQ, ZMsg} +import zmq.ZError + +import java.nio.ByteBuffer +import java.nio.channels.Selector +import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit} +import scala.annotation.tailrec +import scala.collection.mutable +import scala.util.Try +import org.zeromq.ZMQ.Poller + +import scala.util.control.Breaks.{break, breakable} + +// TODO leader heartbeat +class Node(id: String, host: String, port: Int, servers: Array[String], msgHandler: MessageHandler) extends Interface { + + private val logger = Logger[Node] + private val zmqCtx = new ZContext() + private val addr = s"$host:$port" + + // this node server + + // connections to all servers in the cluster + private val _servers = mutable.Map.empty[String, Socket] + private val _nodes = new ConcurrentHashMap[String, Socket]() + + private var _leader: Option[String] = Option.empty + private val server = new Server(port) + + override def connect(): Unit = { + _servers ++= servers.map { address => + val socket = zmqCtx.createSocket(SocketType.DEALER) + socket.setIdentity(id.getBytes()) + socket.connect(s"tcp://$address") + (address, socket) + }.toMap + _leader = Option(getLeader) + logger.info(s"node[id: $id] connected to the cluster. 
leader: ${_leader}") + server.start() + } + + override def join(group: String): Try[Unit] = + Try { + _leader match { + case Some(leaderAddress) => + val join = Join(nodeId = id, address = addr, group = group) + val data = encoder.write(join) + val msg = ByteBuffer.allocate(4 + data.length) + msg.putInt(REQ_TAG) + msg.put(data) + msg.rewind() + _servers(leaderAddress).send(msg.array()) + + breakable { + while (true) { + // receive a response + val responseMsg = ZMsg.recvMsg(_servers(leaderAddress)) + // responseMsg.popString() // identity + val buf = ByteBuffer.wrap(responseMsg.pop().getData) + val code = buf.getInt(0) + if (code == WHO_REP_TAG) { + logger.debug("ignore late WHO messages") + } else { + encoder.read(buf.array()) match { + case res: Result => logger.debug(s"client received a response from leader: $res") + break + } + } + } + } + + case None => throw new RuntimeException("no leader") + } + } + + override def leave(group: String): Try[Unit] = + Try(throw new UnsupportedOperationException("leave is not supported yet")) + + + override def send(req: Req): Try[Unit] = { + send(req, Option.empty) + } + + override def send(req: Req, handler: Array[Byte] => Unit): Try[Unit] = { + send(req, Option(handler)) + } + + private def send(req: Req, handlerOpt: Option[Array[Byte] => Unit]): Try[Unit] = { + Try { + val socket = _nodes.computeIfAbsent(req.nodeId, _ => { + logger.debug(s"node[id=${req.nodeId}] is not registered. requesting node info") + val leader = _leader.getOrElse(throw new IllegalStateException("no leader")) + val getNodeInfo = GetNodeInfo(id, req.nodeId) + val data = encoder.write(getNodeInfo) + val buf = ByteBuffer.allocate(4 + data.length) + buf.putInt(REQ_TAG) + buf.put(data) + _servers(leader).send(buf.array()) + val responseMsg = ZMsg.recvMsg(_servers(leader)) + encoder.read(responseMsg.pop().getData) match { + case ni@NodeInfo(address, code) => + logger.debug(s"received node info. 
node info=$ni") + // todo check code + val socket = zmqCtx.createSocket(SocketType.DEALER) + socket.setIdentity(id.getBytes()) + socket.connect(s"tcp://$address") + logger.debug(s"connection with ${ni.address} has been established") + socket + } + }) + socket.send(req.data) + logger.debug(s"req ${new String(req.data)} to ${req.nodeId} has been sent") + handlerOpt match { + case Some(handler) => + logger.debug(s"wait for response from recipient id=${req.nodeId}") + handler(socket.recv()) + case _ => () + } + } + } + + override def send(rep: Rep): Try[Unit] = server.send(rep).map(_ => ()) + + override def broadcast(group: String, data: Array[Byte]): Try[Unit] = + Try(throw new UnsupportedOperationException("broadcast is not supported yet")) + + override def leader: Option[String] = _leader + + override def getNodes: Try[Seq[String]] = + Try(throw new UnsupportedOperationException("getNodes is not supported yet")) + + override def close(): Unit = + if (!zmqCtx.isClosed) { + _servers.values.foreach(socket => + try socket.close() + catch { + case err: Exception => logger.error("failed to close the socket", err) + }, + ) + + try zmqCtx.close() + catch { + case err: Exception => logger.error("error occurred while shutting down ZMQ context", err) + } + } + + // attempts to acquire a leader until it's available + private def getLeader: String = { + + val pollItems = _servers.values.map(socket => new ZMQ.PollItem(socket, ZMQ.Poller.POLLIN)).toArray + + val msg = new Array[Byte](4) + val buf = ByteBuffer.allocate(msg.length) + buf.putInt(WHO_TAG) + buf.rewind() + buf.get(msg) + + @tailrec + def step(attempts: Int): String = { + _servers.values.foreach(socket => socket.send(msg, ZMQ.NOBLOCK)) + val selector = Selector.open() + ZMQ.poll(selector, pollItems, 5000L) + selector.close() + val events = pollItems.filter(_.isReadable()) + if (events.length > 0) { + events.map(item => { + val data = item.getSocket.recv() + val buf = ByteBuffer.allocate(4 + data.length) + buf.putInt(0) + buf.put(data) + val rep = RouletteLeaderElection.encoder.read(buf.array()).asInstanceOf[WhoRep] + logger.debug(s"node ${rep.address} is leader: ${rep.leader}") + rep + }).find(_.leader) match { + case Some(leader) => leader.address + case None => + println(s"no leader available. attempts made: $attempts") + Thread.sleep(5000L) + step(attempts + 1) + } + } else { + println(s"no nodes responded within a timeout. 
attempts made: $attempts") + step(attempts + 1) + } + } + step(1) + + } + + class Server(port: Int) { + private val logger = Logger[Server] + private val zmqCtx = new ZContext() + private val appControl = new ThreadLocal[Socket]() { + override def initialValue(): Socket = { + val socket = zmqCtx.createSocket(SocketType.PAIR) + socket.bind(CONTROL_ADDRESS) + socket + } + } + + // Socket to talk to application + private val threadPool = Executors.newSingleThreadExecutor() + + private val CONTROL_ADDRESS = "inproc://control" + private val CONTROL_POLLIN = 0 + private val ROUTER_POLLIN = 1 + + // commands + private val CONTROL_SEND = 0 + private val CONTROL_SEND_BYTES = Array(0x0.toByte, 0x0.toByte, 0x0.toByte, 0x0.toByte) + private val CONTROL_TERM = 1 + private val CONTROL_TERM_BYTES = Array(0x0.toByte, 0x0.toByte, 0x0.toByte, 0x1.toByte) + + def start(): Unit = { + threadPool.submit(new Loop()) + } + + def send(rep: Rep): Try[Boolean] = + Try { + val msg = new ZMsg + msg.add(CONTROL_SEND_BYTES) + msg.addString(rep.nodeId) + msg.add(rep.data) + msg.send(appControl.get()) + } + + def stop(): Unit = + try { + val msg = new ZMsg + msg.add(CONTROL_TERM_BYTES) + msg.send(appControl.get()) + threadPool.shutdown() // wait for the loop task to complete + threadPool.awaitTermination(5, TimeUnit.MINUTES) + zmqCtx.close() + } catch { + case err: Exception => logger.error("error occurred while stopping the server", err) + } + + class Loop extends Runnable { + private val control = zmqCtx.createSocket(SocketType.PAIR) + private val server = zmqCtx.createSocket(SocketType.ROUTER) + private val poller = zmqCtx.createPoller(2) + + private def init(): Unit = { + server.bind(s"tcp://*:$port") + control.connect(CONTROL_ADDRESS) + poller.register(control, Poller.POLLIN) + poller.register(server, Poller.POLLIN) + } + + override def run(): Unit = { + init() + breakable { + while (!Thread.currentThread().isInterrupted && !zmqCtx.isClosed) + try { + poller.poll() // waiting for messages + if (poller.pollin(CONTROL_POLLIN)) { + val msg = ZMsg.recvMsg(control) + val cmd = ByteBuffer.wrap(msg.pop().getData).getInt + cmd match { + case CONTROL_SEND => + logger.debug("forwarding outgoing message to the router socket") + msg.send(server) + case CONTROL_TERM => break + case unknown => logger.warn(s"unknown control message=$unknown") + } + } + if (poller.pollin(ROUTER_POLLIN)) { + val reqMsg = ZMsg.recvMsg(server) + val clientId = reqMsg.popString() // identity + val data = reqMsg.pop().getData + msgHandler.handle(Req(clientId, data)) + } + + } catch { + case err: org.zeromq.ZMQException if err.getErrorCode == ZError.ETERM => + logger.info("zmq context has been terminated") + case err: Exception => logger.error("error occurred while processing message", err) + } + } + } + + + } + + } + +} diff --git a/cluster-node/src/main/scala/io/parapet/cluster/node/Rep.scala b/cluster-node/src/main/scala/io/parapet/cluster/node/Rep.scala new file mode 100644 index 0000000..be34741 --- /dev/null +++ b/cluster-node/src/main/scala/io/parapet/cluster/node/Rep.scala @@ -0,0 +1,5 @@ +package io.parapet.cluster.node + +import io.parapet.core.Event + +case class Rep(nodeId: String, data: Array[Byte]) extends Event diff --git a/cluster-node/src/main/scala/io/parapet/cluster/node/Req.scala b/cluster-node/src/main/scala/io/parapet/cluster/node/Req.scala new file mode 100644 index 0000000..97c956f --- /dev/null +++ b/cluster-node/src/main/scala/io/parapet/cluster/node/Req.scala @@ -0,0 +1,5 @@ +package io.parapet.cluster.node + +import io.parapet.core.Event + +case class 
Req(nodeId: String, data: Array[Byte]) extends Event diff --git a/cluster/src/main/resources/etc/node.properties.template b/cluster/src/main/resources/etc/node.properties.template new file mode 100644 index 0000000..b7c04eb --- /dev/null +++ b/cluster/src/main/resources/etc/node.properties.template @@ -0,0 +1,8 @@ +node.id= +node.address= +node.peers=[] +node.election-delay=10 +node.heartbeat-delay=5 +node.monitor-delay=10 +node.peer-timeout=10 +node.leader-election-threshold=0.7 \ No newline at end of file diff --git a/cluster/src/main/resources/log4j.xml b/cluster/src/main/resources/log4j.xml new file mode 100644 index 0000000..fbfbda0 --- /dev/null +++ b/cluster/src/main/resources/log4j.xml @@ -0,0 +1,18 @@ + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/cluster/src/main/scala/io/parapet/cluster/ClusterApp.scala b/cluster/src/main/scala/io/parapet/cluster/ClusterApp.scala new file mode 100644 index 0000000..4bc7e01 --- /dev/null +++ b/cluster/src/main/scala/io/parapet/cluster/ClusterApp.scala @@ -0,0 +1,93 @@ +package io.parapet.cluster + +import cats.effect.IO +import io.parapet.cluster.Config.PeerInfo +import io.parapet.core.processes.RouletteLeaderElection +import io.parapet.core.processes.RouletteLeaderElection.Peers +import io.parapet.core.processes.net.{AsyncClient, AsyncServer} +import io.parapet.core.{Parapet, Process, ProcessRef} +import io.parapet.{CatsApp, core} +import scopt.OParser + +import java.nio.file.Paths + +object ClusterApp extends CatsApp { + + override def processes(args: Array[String]): IO[Seq[core.Process[IO]]] = + for { + appArgs <- parseArgs(args) + config <- loadConfig(appArgs) + peerNetClients <- createPeerNetClients(config.id, config.peers) + peers <- IO(Peers(peerNetClients.map{ + case (peerInfo, netClient) => + Peers.builder + .id(peerInfo.id) + .address(peerInfo.address) + .netClient(netClient.ref) + .timeoutMs(config.peerTimeout) + .build + }.toVector)) + leRef <- IO.pure(ProcessRef(config.id)) + srv <- IO { + val port = config.address.split(":")(1).trim.toInt + AsyncServer[IO]( + ref = ProcessRef("net-server"), + address = s"tcp://*:$port", + sink = leRef, + encoder = RouletteLeaderElection.encoder, + ) + } + leState <- IO.pure( + new RouletteLeaderElection.State( + ref = leRef, + addr = config.address, + netServer = srv.ref, + peers = peers, + threshold = config.leaderElectionThreshold, + ), + ) + cluster <- IO(new ClusterProcess(leState.ref)) + le <- IO(new RouletteLeaderElection[IO](leState, cluster.ref)) + seq <- IO(Seq(cluster, le, srv) ++ peerNetClients.map(_._2)) + } yield seq + + def loadConfig(appArgs: AppArgs): IO[Config] = IO(Config.load(appArgs.config)) + + def createPeerNetClients(clientId: String, peers: Array[PeerInfo]): IO[Array[(PeerInfo, Process[IO])]] = + IO( + peers.zipWithIndex + .map(p => + p._1 -> AsyncClient[IO]( + ref = ProcessRef("peer-client-" + p._2), + clientId = clientId, + address = "tcp://" + p._1.address, + encoder = RouletteLeaderElection.encoder, + ), + ) + ) + + private val builder = OParser.builder[AppArgs] + private val parser = { + import builder._ + OParser.sequence( + programName("parapet-cluster"), + head("cluster", Parapet.Version), + opt[String]('c', "config") + .action((x, c) => c.copy(config = x)) + .text("path to config file"), + ) + } + + case class AppArgs(config: String = "etc/node.properties") + + private def parseArgs(args: Array[String]): IO[AppArgs] = { + IO { + println(System.getProperty("user.dir")) + println(Paths.get("").toAbsolutePath.toString) + 
OParser.parse(parser, args, AppArgs()) match { + case Some(appArgs) => appArgs + case _ => throw new IllegalArgumentException("bad program args") + } + } + } +} diff --git a/cluster/src/main/scala/io/parapet/cluster/ClusterProcess.scala b/cluster/src/main/scala/io/parapet/cluster/ClusterProcess.scala new file mode 100644 index 0000000..a9e12ec --- /dev/null +++ b/cluster/src/main/scala/io/parapet/cluster/ClusterProcess.scala @@ -0,0 +1,173 @@ +package io.parapet.cluster + +import cats.effect.{Concurrent, IO} +import com.typesafe.scalalogging.Logger +import io.parapet.cluster.ClusterProcess.{Cluster, Joined} +import io.parapet.cluster.api.ClusterApi._ +import io.parapet.core.Dsl.DslF +import io.parapet.core.Event.Start +import io.parapet.core.processes.RouletteLeaderElection.{encoder => _, _} +import io.parapet.core.{Channel, Process, ProcessRef} +import org.zeromq.ZMQ.Socket +import org.zeromq.{SocketType, ZContext} +import io.parapet.core.processes.net.AsyncServer.{Send => ServerSend} + +import java.util + +class ClusterProcess(leaderElection: ProcessRef)(implicit concurrent: Concurrent[IO]) extends Process[IO] { + + import dsl._ + + private val ch = new Channel[IO](ref) + + private val zmqCtx = new ZContext() + private val logger = Logger[ClusterProcess] + private val cluster = new Cluster(zmqCtx) + + override def handle: Receive = { + case Start => register(ref, ch) + case Req(clientId, data) => + encoder.read(data) match { + case join: Join => + ifLeader { + cluster.getNode(join.nodeId) match { + case Some(node) if node.address == join.address => + eval(logger.debug(s"node $node already joined")) ++ + ServerSend(clientId, encoder.write(Result(ResultCodes.OK, + "node has been added to the group"))) ~> leaderElection + case Some(node) => + eval { + logger.debug(s"node $node address has changed, rejoining. 
old=${node.address}, new=${join.address}") + cluster.remove(node.nodeId) + } ++ broadcast(clientId, join) + case None => eval(logger.debug("send broadcast to leader election process")) ++ broadcast(clientId, join) + } + } { + eval(logger.debug("process join and send response to the leader")) ++ + processJoin(clientId, join) ++ + Rep(clientId, encoder.write(JoinResult(join.nodeId, JoinResultCodes.OK))) ~> leaderElection + } + case joinRes: JoinResult => eval(logger.debug(s"cluster received $joinRes")) + case getNodeInfo: GetNodeInfo => + eval(logger.debug(s"received $getNodeInfo")) ++ + (cluster.getNode(getNodeInfo.senderId) match { + case Some(senderNode) => + if (senderNode.clientId != clientId) { + eval { + logger.error(s"expected node clientId=${senderNode.clientId} but received $clientId") + // todo send a response + } + } else { + cluster.getNode(getNodeInfo.id) match { + case Some(node) => + ServerSend(clientId, encoder.write(NodeInfo(node.address, NodeInfoCodes.OK))) ~> leaderElection + case None => + ServerSend(clientId, encoder.write(NodeInfo("", NodeInfoCodes.NODE_NOT_FOUND))) ~> leaderElection + } + } + case None => eval { + logger.error(s"node id=${getNodeInfo.senderId} doesn't exist") + // todo send a response + } + }) + + } + } + + // sends join to all nodes in the cluster and waits for responses + private def broadcast(clientId: String, join: Join): DslF[IO, Unit] = { + // Note: clientId != join.nodeId when Join sent by a leader election process + val data = encoder.write(join) + blocking { + ch.send(Broadcast(data), leaderElection, { + case scala.util.Success(BroadcastResult(_)) => + // todo use reliable atomic broadcast + // todo wait for acks from the majority of nodes + // https://github.com/parapet-io/parapet/issues/47 + processJoin(clientId, join) ++ + eval(logger.debug(s"send Join result to $clientId")) ++ + ServerSend(clientId, encoder.write(Result(ResultCodes.OK, + "node has been added to the group"))) ~> leaderElection + case scala.util.Failure(err) => + eval { + logger.error("broadcast has failed", err) + } ++ ServerSend(clientId, encoder.write(Result(ResultCodes.ERROR, + Option(err.getMessage).getOrElse("")))) ~> leaderElection + }) + } + } + + private def ifLeader(isLeader: => DslF[IO, Unit])(isNotLeader: => DslF[IO, Unit]): DslF[IO, Unit] = { + ch.send(IsLeader, leaderElection, { + case scala.util.Success(IsLeaderRep(leader)) => + if (leader) eval(logger.debug("I'm a leader")) ++ isLeader + else isNotLeader + }) + } + + private def processJoin(clientId: String, join: Join): DslF[IO, Unit] = { + eval { + logger.debug(s"joining node id = ${join.nodeId}") + val node = cluster.join(clientId, join) + logger.debug(s"node $node created") + } + } + +} + +object ClusterProcess { + + class Cluster(zmqCtx: ZContext) { + private val logger = Logger[Cluster] + private val nodes = new util.HashMap[String, Node]() + private val nodesClientId = new util.HashMap[String, Node]() + + def join(clientId: String, join: Join): Node = { + if(nodes.containsKey(join.nodeId)) { + val node = nodes.remove(join.nodeId) + node.socket.close() + logger.debug(s"remove node id=${join.nodeId}") + } + logger.debug(s"add node id=${join.nodeId}") + val socket = zmqCtx.createSocket(SocketType.DEALER) + socket.connect("tcp://" + join.address) + nodes.put(join.nodeId, + new Node(clientId, join.nodeId, join.address, join.group, socket, Joined)) + } + + def node(id: String): Node = nodes.get(id) + + def getNode(id: String): Option[Node] = { + Option(nodes.get(id)) + } + + def remove(id: String): 
Unit = { + val node = nodes.remove(id) + node.socket.close() + } + + } + + class Node(val clientId: String, + val nodeId: String, + val address: String, + val group: String, + val socket: Socket, + private var _state: NodeState) { + + def state(state: NodeState): Unit = _state = state + + def state: NodeState = _state + + def send(data: Array[Byte]): Unit = { + socket.send(data) + } + + override def toString: String = s"clientId=$clientId, nodeId=$nodeId, address=$address, group=$group" + } + + sealed trait NodeState + case object Joined extends NodeState + case object Failed extends NodeState + +} diff --git a/cluster/src/main/scala/io/parapet/cluster/Config.scala b/cluster/src/main/scala/io/parapet/cluster/Config.scala new file mode 100644 index 0000000..a0e3f03 --- /dev/null +++ b/cluster/src/main/scala/io/parapet/cluster/Config.scala @@ -0,0 +1,51 @@ +package io.parapet.cluster + +import io.parapet.cluster.Config.PeerInfo + +import java.io.FileInputStream +import java.util.Properties +import scala.concurrent.duration.{FiniteDuration, _} +import scala.util.Using + +case class Config( + id: String, + address: String, + peers: Array[PeerInfo], + leaderElectionThreshold: Double, + electionDelay: FiniteDuration, + heartbeatDelay: FiniteDuration, + monitorDelay: FiniteDuration, + peerTimeout: FiniteDuration, +) + +object Config { + + def load(path: String): Config = + Using.resource(new FileInputStream(path)) { is => + val prop = new Properties() + prop.load(is) + Config( + id = prop.getProperty("node.id"), + address = prop.getProperty("node.address"), + peers = parsePeers(prop.getProperty("node.peers", "")), + electionDelay = prop.getProperty("node.election-delay").toInt.seconds, + heartbeatDelay = prop.getProperty("node.heartbeat-delay").toInt.seconds, + monitorDelay = prop.getProperty("node.monitor-delay").toInt.seconds, + peerTimeout = prop.getProperty("node.peer-timeout").toInt.seconds, + leaderElectionThreshold = prop.getProperty("node.leader-election-threshold").toDouble, + ) + } + + def parsePeers(str: String): Array[PeerInfo] = + str + .split(",") + .map(p => + p.split(":", 2) match { + case Array(id, address) => PeerInfo(id, address) + case _ => throw new RuntimeException(s"invalid format=$p") + }, + ) + + case class PeerInfo(id: String, address: String) + +} diff --git a/core/src/main/scala/io/parapet/ParApp.scala b/core/src/main/scala/io/parapet/ParApp.scala index 08f6020..f0cd67f 100644 --- a/core/src/main/scala/io/parapet/ParApp.scala +++ b/core/src/main/scala/io/parapet/ParApp.scala @@ -8,7 +8,7 @@ import io.parapet.core.Dsl.{DslF, WithDsl} import io.parapet.core.DslInterpreter.Interpreter import io.parapet.core.Parapet.ParConfig import io.parapet.core.processes.DeadLetterProcess -import io.parapet.core.{Context, EventStore, Parallel, Process, ProcessRef, Scheduler} +import io.parapet.core.{Context, DslInterpreter, EventStore, Parallel, Process, ProcessRef, Scheduler} import io.parapet.syntax.FlowSyntax import org.slf4j.LoggerFactory @@ -51,7 +51,9 @@ trait ParApp[F[_]] extends WithDsl[F] with FlowSyntax[F] { ct.raiseError[Unit](new RuntimeException("Initialization error: at least one process must be provided")) } else ct.unit context <- Context(config, eventLog) - scheduler <- Scheduler.apply[F](config.schedulerConfig, context, interpreter(context)) + interpreter <- ct.pure(interpreter(context)) + _ <- ct.delay(DslInterpreter.instance(interpreter)) + scheduler <- Scheduler.apply[F](config.schedulerConfig, context, interpreter) _ <- context.start(scheduler) dlProcess <- 
deadLetter _ <- context.registerAll(ps.toList :+ dlProcess) diff --git a/core/src/main/scala/io/parapet/core/Channel.scala b/core/src/main/scala/io/parapet/core/Channel.scala index 6cdd05d..59b0c6d 100644 --- a/core/src/main/scala/io/parapet/core/Channel.scala +++ b/core/src/main/scala/io/parapet/core/Channel.scala @@ -1,11 +1,10 @@ package io.parapet.core import cats.effect.Concurrent +import cats.effect.ExitCase import cats.effect.concurrent.Deferred -import io.parapet.core.Channel.Request import io.parapet.core.Dsl.DslF import io.parapet.core.Event.{Failure, Stop} - import scala.util.Try /** Channel is a process that implements strictly synchronous request-reply dialog. @@ -15,10 +14,14 @@ import scala.util.Try * * @tparam F an effect type */ -class Channel[F[_]: Concurrent] extends Process[F] { +class Channel[F[_]: Concurrent](clientRef:ProcessRef = null) extends Process[F] { + + import io.parapet.core.Channel._ import dsl._ + private val ct: Concurrent[F] = implicitly[Concurrent[F]] + private var callback: Deferred[F, Try[Event]] = _ private def waitForRequest: Receive = { case req: Request[F] => @@ -37,19 +40,17 @@ class Channel[F[_]: Concurrent] extends Process[F] { } } case req: Request[F] => - suspend(req.cb.complete(scala.util.Failure(new IllegalStateException("current request is not completed yet")))) + suspend(req.cb.complete(scala.util.Failure(new IllegalStateException("the current request is not completed yet")))) case Failure(_, err) => - suspend(callback.complete(scala.util.Failure(err))) ++ - eval{ - callback = null - } ++ - switch(waitForRequest) + suspend(callback.complete(scala.util.Failure(err))) ++ resetAndWaitForRequest case e => - suspend(callback.complete(scala.util.Success(e))) ++ - eval { - callback = null - } ++ - switch(waitForRequest) + suspend(callback.complete(scala.util.Success(e))) ++ resetAndWaitForRequest + } + + private def resetAndWaitForRequest: DslF[F, Unit] = { + eval { + callback = null + } ++ switch(waitForRequest) } def handle: Receive = waitForRequest @@ -65,7 +66,16 @@ class Channel[F[_]: Concurrent] extends Process[F] { for { d <- suspend(Deferred[F, Try[Event]]) _ <- Request(event, d, receiver) ~> ref - res <- suspend(d.get) + res <- + suspend( + ct.guaranteeCase(d.get){ + case ExitCase.Canceled => + (resetAndWaitForRequest ++ cb(scala.util.Failure(ChannelInterruptedException))) + .foldMap[F](DslInterpreter.instance.interpret(clientRef, clientRef)) + case ExitCase.Error(err) => + (resetAndWaitForRequest ++ cb(scala.util.Failure(new ChannelException(err)))) + .foldMap[F](DslInterpreter.instance.interpret(clientRef, clientRef)) + case ExitCase.Completed => ct.delay(println("completed"))}) _ <- cb(res) } yield () @@ -73,6 +83,9 @@ class Channel[F[_]: Concurrent] extends Process[F] { object Channel { + sealed class ChannelException(cause:Throwable) extends RuntimeException(cause) + case object ChannelInterruptedException extends ChannelException(null) + def apply[F[_]: Concurrent]: Channel[F] = new Channel() private case class Request[F[_]](e: Event, cb: Deferred[F, Try[Event]], receiver: ProcessRef) extends Event diff --git a/core/src/main/scala/io/parapet/core/Context.scala b/core/src/main/scala/io/parapet/core/Context.scala index 011a919..9041534 100644 --- a/core/src/main/scala/io/parapet/core/Context.scala +++ b/core/src/main/scala/io/parapet/core/Context.scala @@ -9,7 +9,7 @@ import io.parapet.core.Event.{Envelope, Start} import io.parapet.core.Queue.ChannelType import io.parapet.core.Scheduler.{Deliver, SubmissionResult, Task, TaskQueue} 
import io.parapet.core.exceptions.UnknownProcessException -import io.parapet.core.processes.SystemProcess +import io.parapet.core.processes.{BlackHole, SystemProcess} import java.util.UUID import scala.jdk.CollectionConverters._ @@ -31,14 +31,17 @@ class Context[F[_]: Concurrent: ContextShift](config: Parapet.ParConfig, val eve private var _scheduler: Scheduler[F] = _ def start(scheduler: Scheduler[F]): F[Unit] = - ct.delay{ + ct.delay { _scheduler = scheduler - } >> - ct.delay(new SystemProcess[F]()).flatMap { sysProcess => - ProcessState(sysProcess, config).flatMap { s => - ct.delay(processes.put(sysProcess.ref, s)) >> sendStartEvent(sysProcess.ref).void - } - } + } >> createSysProcesses >> sendStartEvent(ProcessRef.SystemRef).void + + private[core] def createSysProcesses: F[Unit] = { + for { + sysProcesses <- ct.delay(List(new SystemProcess[F](), new BlackHole[F])) + states <- sysProcesses.map(p => ProcessState(p, config)).sequence + _ <- ct.delay(states.foreach(s => processes.put(s.process.ref, s))) + } yield () + } def schedule(task: Task[F]): F[SubmissionResult] = _scheduler.submit(task) diff --git a/core/src/main/scala/io/parapet/core/Dsl.scala b/core/src/main/scala/io/parapet/core/Dsl.scala index 1e99073..eafd043 100644 --- a/core/src/main/scala/io/parapet/core/Dsl.scala +++ b/core/src/main/scala/io/parapet/core/Dsl.scala @@ -39,6 +39,8 @@ object Dsl { case class Blocking[F[_], C[_], A](body: () => Free[C, A]) extends FlowOp[F, Unit] + case class HandelError[F[_], C[_], A, AA >: A](body: () => Free[C, A], handle: Throwable => Free[C, AA]) extends FlowOp[F, AA] + /** Smart constructors for FlowOp[F, _]. * * @param I an injection from type constructor `F` into type constructor `C` @@ -292,6 +294,19 @@ object Dsl { */ def blocking[A](thunk: => Free[C, A]): Free[C, Unit] = Free.inject[FlowOp[F, *], C](Blocking(() => thunk)) + + /** + * Handle any error, potentially recovering from it, by mapping it to an Free[C, AA] value. 
+ * + * @param thunk a flow to execute + * @param handle error handler + * @tparam A result + * @tparam AA contravariant to A + * @return result of thunk or handle in case of any error + */ + def handleError[A, AA >: A](thunk: => Free[C, A], handle: Throwable => Free[C, AA]): Free[C, AA] = { + Free.inject[FlowOp[F, *], C](HandelError(() => thunk, handle)) + } } object FlowOps { diff --git a/core/src/main/scala/io/parapet/core/DslInterpreter.scala b/core/src/main/scala/io/parapet/core/DslInterpreter.scala index 2e189c2..45c6004 100644 --- a/core/src/main/scala/io/parapet/core/DslInterpreter.scala +++ b/core/src/main/scala/io/parapet/core/DslInterpreter.scala @@ -5,29 +5,22 @@ import cats.effect.{Concurrent, Timer} import cats.implicits._ import cats.~> import io.parapet.core.Context.ProcessState -import io.parapet.core.Dsl.{ - Blocking, - Delay, - Dsl, - Eval, - FlowOp, - Fork, - Forward, - Par, - Race, - Register, - Send, - Suspend, - SuspendF, - UnitFlow, - WithSender, -} +import io.parapet.core.Dsl.{Blocking, Delay, Dsl, Eval, FlowOp, Fork, Forward, HandelError, Par, Race, Register, Send, Suspend, SuspendF, UnitFlow, WithSender} import io.parapet.core.Event.Envelope import io.parapet.core.Scheduler.{Deliver, ProcessQueueIsFull} object DslInterpreter { + // ugly but necessary for Channel + private var _instance: Any = _ + private[parapet] def instance[F[_]](i : Interpreter[F]):Unit = _instance = i + private[parapet] def instance[F[_]]: Interpreter[F] = { + _instance.asInstanceOf[Interpreter[F]] + } + trait Interpreter[F[_]] { + def interpret(sender: ProcessRef, target: ProcessRef): FlowOp[F, *] ~> F + def interpret(sender: ProcessRef, target: ProcessRef, execTrace: ExecutionTrace): FlowOp[F, *] ~> F def interpret(sender: ProcessRef, ps: ProcessState[F], execTrace: ExecutionTrace): FlowOp[F, *] ~> F } @@ -37,6 +30,14 @@ object DslInterpreter { private val ct = implicitly[Concurrent[F]] private val timer = implicitly[Timer[F]] + def interpret(sender: ProcessRef, target: ProcessRef): FlowOp[F, *] ~> F = { + interpret(sender, target, context.createTrace) + } + + def interpret(sender: ProcessRef, target: ProcessRef, execTrace: ExecutionTrace): FlowOp[F, *] ~> F = { + interpret(sender, context.getProcessState(target).get, execTrace) + } + def interpret(sender: ProcessRef, ps: ProcessState[F], execTrace: ExecutionTrace): FlowOp[F, *] ~> F = new (FlowOp[F, *] ~> F) { override def apply[A](fa: FlowOp[F, A]): F[A] = @@ -93,6 +94,10 @@ object DslInterpreter { case Register(parent, process: Process[F]) => context.registerAndStart(parent, process).void //-------------------------------------------------------------- + case he:HandelError[F, Dsl[F, *], A, A]=> + ct.handleErrorWith(he.body().foldMap[F](interpret(sender, ps, execTrace))){ + err => he.handle(err).foldMap[F](interpret(sender, ps, execTrace)) + } } } diff --git a/core/src/main/scala/io/parapet/core/Event.scala b/core/src/main/scala/io/parapet/core/Event.scala index b007e22..ff5dcbc 100644 --- a/core/src/main/scala/io/parapet/core/Event.scala +++ b/core/src/main/scala/io/parapet/core/Event.scala @@ -5,6 +5,16 @@ trait Event object Event { + // common events + + case class ByteEvent(data: Array[Byte]) extends Event { + override def toString: String = new String(data) + } + + case class StringEvent(value:String) extends Event { + override def toString: String = value + } + // Lifecycle events case object Start extends Event diff --git a/core/src/main/scala/io/parapet/core/EventLog.scala b/core/src/main/scala/io/parapet/core/EventLog.scala 
index 5a74735..fe4760d 100644 --- a/core/src/main/scala/io/parapet/core/EventLog.scala +++ b/core/src/main/scala/io/parapet/core/EventLog.scala @@ -36,7 +36,7 @@ class EventLog { var l = v if (l == null) { l = new ListBuffer[Node]() - l += PNode(ref.ref, ref.ref, start = true) + l += PNode(ref.value, ref.value, start = true) } l += n }, @@ -70,7 +70,7 @@ object EventLog { t.graph.asScala.foreach { case (p, nodes) => var x = 100 - val parent = p.ref + "-parent" + val parent = p.value + "-parent" data += Json.obj("data" -> Json.obj("id" -> parent)) data ++= nodes.map { n => diff --git a/core/src/main/scala/io/parapet/core/Parapet.scala b/core/src/main/scala/io/parapet/core/Parapet.scala index 820a561..a5be3b5 100644 --- a/core/src/main/scala/io/parapet/core/Parapet.scala +++ b/core/src/main/scala/io/parapet/core/Parapet.scala @@ -9,6 +9,7 @@ import shapeless.{Lens, lens} object Parapet extends StrictLogging { val ParapetPrefix = "parapet" + val Version = "0.0.1-RC5" case class ParConfig( processBufferSize: Int, diff --git a/core/src/main/scala/io/parapet/core/ProcessRef.scala b/core/src/main/scala/io/parapet/core/ProcessRef.scala index 2427bd2..2e70da3 100644 --- a/core/src/main/scala/io/parapet/core/ProcessRef.scala +++ b/core/src/main/scala/io/parapet/core/ProcessRef.scala @@ -4,9 +4,8 @@ import java.util.UUID import io.parapet.core.Parapet.ParapetPrefix -case class ProcessRef(private[core] val ref: String) { - override def toString: String = ref - +case class ProcessRef(private[core] val value: String) { + override def toString: String = value } object ProcessRef { @@ -14,6 +13,9 @@ object ProcessRef { val DeadLetterRef: ProcessRef = ProcessRef(ParapetPrefix + "-deadletter") val SchedulerRef: ProcessRef = ProcessRef(ParapetPrefix + "-scheduler") val UndefinedRef: ProcessRef = ProcessRef(ParapetPrefix + "-undefined") + val BlackHoleRef: ProcessRef = ProcessRef(ParapetPrefix + "-blackhole") + + def apply(): ProcessRef = jdkUUIDRef def jdkUUIDRef: ProcessRef = new ProcessRef(UUID.randomUUID().toString) } diff --git a/core/src/main/scala/io/parapet/core/Scheduler.scala b/core/src/main/scala/io/parapet/core/Scheduler.scala index c6b5eae..fcf73c1 100644 --- a/core/src/main/scala/io/parapet/core/Scheduler.scala +++ b/core/src/main/scala/io/parapet/core/Scheduler.scala @@ -49,33 +49,28 @@ object Scheduler { val default: SchedulerConfig = SchedulerConfig(numberOfWorkers = Runtime.getRuntime.availableProcessors()) } - // todo temporary solution for debugging - case class LoggerWrapper[F[_]: Concurrent](logger: Logger, devMode: Boolean) { + case class LoggerWrapper[F[_] : Concurrent](logger: Logger, devMode: Boolean) { private val ct = Concurrent[F] - private val stdio = devMode def debug(msg: => String): F[Unit] = - if (stdio) ct.delay(println(msg)) - else ct.delay(logger.debug(msg)) + if (devMode) ct.delay(logger.debug(msg)) + else ct.unit def error(msg: => String): F[Unit] = - if (stdio) ct.delay(println(msg)) - else ct.delay(logger.error(msg)) + if (devMode) ct.delay(logger.error(msg)) + else ct.unit def error(msg: => String, cause: Throwable): F[Unit] = - if (stdio) ct.delay { - println(msg) - cause.printStackTrace() - } - else ct.delay(logger.error(msg, cause)) + if (devMode) ct.delay(logger.error(msg, cause)) + else ct.unit def info(msg: => String): F[Unit] = - if (stdio) ct.delay(println(msg)) - else ct.delay(logger.info(msg)) + if (devMode) ct.delay(logger.info(msg)) + else ct.unit def warn(msg: => String): F[Unit] = - if (stdio) ct.delay(println(msg)) - else ct.delay(logger.warn(msg)) + if 
(devMode) ct.delay(logger.warn(msg)) + else ct.unit } // internal events diff --git a/core/src/main/scala/io/parapet/core/processes/BlackHole.scala b/core/src/main/scala/io/parapet/core/processes/BlackHole.scala new file mode 100644 index 0000000..f0bec45 --- /dev/null +++ b/core/src/main/scala/io/parapet/core/processes/BlackHole.scala @@ -0,0 +1,9 @@ +package io.parapet.core.processes + +import io.parapet.core.Process + +class BlackHole[F[_]] extends Process[F] { + override def handle: Receive = { case _ => + dsl.unit + } +} diff --git a/core/src/main/scala/io/parapet/core/processes/DeadLetterProcess.scala b/core/src/main/scala/io/parapet/core/processes/DeadLetterProcess.scala index 10d07fd..408eb49 100644 --- a/core/src/main/scala/io/parapet/core/processes/DeadLetterProcess.scala +++ b/core/src/main/scala/io/parapet/core/processes/DeadLetterProcess.scala @@ -9,7 +9,7 @@ import io.parapet.core.{Process, ProcessRef} import org.slf4j.LoggerFactory trait DeadLetterProcess[F[_]] extends Process[F] { - override val name: String = DeadLetterRef.ref + override val name: String = DeadLetterRef.value override final val ref: ProcessRef = DeadLetterRef } @@ -18,7 +18,7 @@ object DeadLetterProcess { class DeadLetterLoggingProcess[F[_]] extends DeadLetterProcess[F] { import dsl._ private val logger = Logger(LoggerFactory.getLogger(getClass.getCanonicalName)) - override val name: String = DeadLetterRef.ref + "-logging" + override val name: String = DeadLetterRef.value + "-logging" override val handle: Receive = { case DeadLetter(Envelope(sender, event, receiver), error) => val mdcFields: MDCFields = Map( "processRef" -> ref, diff --git a/core/src/main/scala/io/parapet/core/processes/RouletteLeaderElection.scala b/core/src/main/scala/io/parapet/core/processes/RouletteLeaderElection.scala index bf42a98..7cbbbfe 100644 --- a/core/src/main/scala/io/parapet/core/processes/RouletteLeaderElection.scala +++ b/core/src/main/scala/io/parapet/core/processes/RouletteLeaderElection.scala @@ -1,12 +1,13 @@ package io.parapet.core.processes import java.nio.ByteBuffer - import com.typesafe.scalalogging.Logger import io.parapet.core.Dsl.DslF import io.parapet.core.Event.Start import io.parapet.core.processes.RouletteLeaderElection.ResponseCodes.AckCode import io.parapet.core.processes.RouletteLeaderElection._ +import io.parapet.core.processes.net.AsyncServer.{Send => ServerSend} +import io.parapet.core.processes.net.AsyncClient.{Send => ClientSend} import io.parapet.core.utils.CorrelationId import io.parapet.core.{Clock, Encoder, Event, ProcessRef} import org.slf4j.MDC @@ -16,8 +17,8 @@ import scala.collection.mutable import scala.concurrent.duration._ import scala.util.Random -// Implementation of modified leader election algorithm https://arxiv.org/ftp/arxiv/papers/1703/1703.02247.pdf -class RouletteLeaderElection[F[_]](state: State) extends ProcessWithState[F, State](state) { +class RouletteLeaderElection[F[_]](state: State, sink: ProcessRef = ProcessRef.BlackHoleRef) + extends ProcessWithState[F, State](state) { import dsl._ @@ -56,18 +57,18 @@ class RouletteLeaderElection[F[_]](state: State) extends ProcessWithState[F, Sta val peer = state.peers.get(sender) val action = if (state.coordinator) { - Ack(state.addr, state.num, AckCode.COORDINATOR) ~> peer.netClient + Ack(state.addr, state.num, AckCode.COORDINATOR).toClient ~> peer.netClient } else if (state.voted) { - Ack(state.addr, state.num, AckCode.VOTED) ~> peer.netClient + Ack(state.addr, state.num, AckCode.VOTED).toClient ~> peer.netClient } else if 
(state.hasLeader) { - Ack(state.addr, state.num, AckCode.ELECTED) ~> peer.netClient + Ack(state.addr, state.num, AckCode.ELECTED).toClient ~> peer.netClient } else if (num > state.roundNum) { eval { state.voted = true state.roundNum = num - } ++ Ack(state.addr, state.num, AckCode.OK) ~> peer.netClient ++ waitForHeartbeat + } ++ Ack(state.addr, state.num, AckCode.OK).toClient ~> peer.netClient ++ waitForHeartbeat } else { - Ack(state.addr, state.num, AckCode.HIGH) ~> peer.netClient + Ack(state.addr, state.num, AckCode.HIGH).toClient ~> peer.netClient } log(s"received Propose($sender, $num)") ++ action @@ -85,9 +86,9 @@ class RouletteLeaderElection[F[_]](state: State) extends ProcessWithState[F, Sta (if (!state.coordinator && receivedMajorityOfVotes) { for { _ <- eval(state.coordinator = true) - leaderAddr <- eval(state.roulette(state.peers.alive.map(_.addr))) + leaderAddr <- eval(state.roulette(state.peers.alive.map(_.address))) _ <- log("became coordinator") - _ <- Announce(state.addr) ~> state.peers.get(leaderAddr).netClient + _ <- Announce(state.addr).toClient ~> state.peers.get(leaderAddr).netClient _ <- log(s"send announce to $leaderAddr") _ <- waitForHeartbeat } yield () @@ -167,6 +168,38 @@ class RouletteLeaderElection[F[_]](state: State) extends ProcessWithState[F, Sta } } } + + // -----------------------WHO------------------------------- // + case Who(clientId) => + withSender(sender => ServerSend(clientId, encoder.write(WhoRep(state.addr, state.leader.contains(state.addr)))) ~> sender) + + case IsLeader => + withSender(sender => IsLeaderRep(state.leader.contains(state.addr)) ~> sender) + + // -------------------- BROADCAST ----------------------------// + case Broadcast(data) => + implicit val correlationId: CorrelationId = CorrelationId() + val msg = ClientSend(prependTag(REQ_TAG, data)) + log("received broadcast") ++ + state.peers.netClients.foldLeft(unit)((acc, client) => acc ++ msg ~> client) ++ + withSender(sender => BroadcastResult(state.peers.size / 2) ~> sender) + + case BroadcastResult(res) => + implicit val correlationId: CorrelationId = CorrelationId() + log(s"received broadcast result: $res") + + // -----------------------REQ------------------------------- // + case req: Req => req ~> sink + + // ----------------------- REP -------------------------------// + // Rep is sent by sink process in event of Req + case Rep(clientId, data) => + implicit val correlationId: CorrelationId = CorrelationId() + log(s"received Rep from clientId: $clientId") ++ + ClientSend(prependTag(REQ_TAG, data)) ~> state.peers.getById(clientId).netClient + + // -------------------- SERVER SEND -------------------------// + case send: ServerSend => send ~> state.netServer } // -----------------------HELPERS------------------------------- // @@ -175,7 +208,7 @@ class RouletteLeaderElection[F[_]](state: State) extends ProcessWithState[F, Sta implicit val correlationId: CorrelationId = CorrelationId() state.peers.all .map { case (addr, peer) => - log(s"send propose to '$addr'") ++ Propose(state.addr, state.num) ~> peer.netClient + log(s"send propose to '$addr'") ++ Propose(state.addr, state.num).toClient ~> peer.netClient } .fold(unit)(_ ++ _) } @@ -194,7 +227,7 @@ class RouletteLeaderElection[F[_]](state: State) extends ProcessWithState[F, Sta implicit val correlationId: CorrelationId = CorrelationId() state.peers.all .map { case (addr, peer) => - log(s"send heartbeat to $addr") ++ Heartbeat(state.addr, state.leader) ~> peer.netClient + log(s"send heartbeat to $addr") ++ Heartbeat(state.addr, 
state.leader).toClient ~> peer.netClient } .fold(unit)(_ ++ _) } @@ -221,8 +254,9 @@ class RouletteLeaderElection[F[_]](state: State) extends ProcessWithState[F, Sta private[core] def monitorClusterLoopAsync: DslF[F, Unit] = fork(monitorClusterLoop) private[core] def monitorClusterLoop: DslF[F, Unit] = { + val step0 = handleError(monitorCluster, err => eval(logger.error("cluster monitor has failed", err))) def step: DslF[F, Unit] = flow { - monitorCluster ++ delay(state.delays.monitor) ++ step + step0 ++ delay(state.delays.monitor) ++ step } step @@ -320,16 +354,17 @@ object RouletteLeaderElection { ) class State( - val ref: ProcessRef, - val addr: Addr, - val peers: Peers, - val random: RandomNum = RandomNumGen, - val roulette: Vector[Addr] => Addr = Roulette, - val timeouts: Timeouts = Timeouts(), - val delays: Delays = Delays(), - val rndNumMinRounds: Int = GenNumAttempts, - val threshold: Double = GenNumThreshold, - ) { + val ref: ProcessRef, + val addr: Addr, + val netServer: ProcessRef, + val peers: Peers, + val random: RandomNum = RandomNumGen, + val roulette: Vector[Addr] => Addr = Roulette, + val timeouts: Timeouts = Timeouts(), + val delays: Delays = Delays(), + val rndNumMinRounds: Int = GenNumAttempts, + val threshold: Double = GenNumThreshold, + ) { var num: Double = 0.0 var roundNum: Double = 0.0 var votes = 0 @@ -423,6 +458,26 @@ object RouletteLeaderElection { case class Announce(addr: Addr) extends API case class Heartbeat(addr: Addr, leader: Option[Addr]) extends API case class Timeout(phase: Phase) extends API + case class Who(clientId: String) extends API + case class WhoRep(address: String, leader: Boolean) extends API + // REQ payload format + // 4 bytes = client_id length + // client_id bytes + // 4 bytes - command + // remaining bytes - body + case class Req(clientId: String, data: Array[Byte]) extends API + case class Rep(clientId: String, data: Array[Byte]) extends API + // sends data to all service in the cluster + case class Broadcast(data: Array[Byte]) extends API + case class BroadcastResult(majorityCount: Int) extends API + + // internal API + case object IsLeader extends API + case class IsLeaderRep(leader: Boolean) extends API + + implicit class ApiOps(e:API) { + def toClient: Event = ClientSend(encoder.write(e)) + } object ResponseCodes { @@ -445,87 +500,121 @@ object RouletteLeaderElection { } } } - // @formatter:on + // Protocol format [tag: int32, body: byte[]] // Tags - val PROPOSE_TAG = 1 - val ACK_TAG = 2 - val ANNOUNCE_TAG = 3 - val HEARTBEAT_TAG = 4 + val PROPOSE_TAG = 11 + val ACK_TAG = 12 + val ANNOUNCE_TAG = 13 + val HEARTBEAT_TAG = 14 + val WHO_TAG = 15 + val REQ_TAG = 16 + val BROADCAST_RESULT_TAG = 17 + val WHO_REP_TAG = 18 + + // @formatter:on val encoder: Encoder = new Encoder { override def write(e: Event): Array[Byte] = e match { case Propose(addr, num) => - val refData = addr.getBytes() + val addrBytes = addr.getBytes() ByteBuffer - .allocate(4 + 4 + refData.length + 8) + .allocate(4 + 4 + addrBytes.length + 8) .putInt(PROPOSE_TAG) - .putInt(refData.length) - .put(refData) + .putInt(addrBytes.length) + .put(addrBytes) .putDouble(num) .array() case Ack(addr, num, code) => - val refData = addr.getBytes() + val addrBytes = addr.getBytes() ByteBuffer - .allocate(4 + 4 + refData.length + 8 + 4) + .allocate(4 + 4 + addrBytes.length + 8 + 4) .putInt(ACK_TAG) - .putInt(refData.length) - .put(refData) + .putInt(addrBytes.length) + .put(addrBytes) .putDouble(num) .putInt(code.value) .array() case Announce(addr) => - val refData = addr.getBytes() + 
val addrBytes = addr.getBytes() ByteBuffer - .allocate(4 + 4 + refData.length) + .allocate(4 + 4 + addrBytes.length) .putInt(ANNOUNCE_TAG) - .putInt(refData.length) - .put(refData) + .putInt(addrBytes.length) + .put(addrBytes) .array() case Heartbeat(addr, leader) => - val addrData = addr.getBytes() - val leaderAddrData = leader.getOrElse("").getBytes() + val addrBytes = addr.getBytes() + val leaderAddrBytes = leader.getOrElse("").getBytes() ByteBuffer - .allocate(4 + (4 + addrData.length) + (4 + leaderAddrData.length)) + .allocate(4 + (4 + addrBytes.length) + (4 + leaderAddrBytes.length)) .putInt(HEARTBEAT_TAG) - .putInt(addrData.length) - .put(addrData) - .putInt(leaderAddrData.length) - .put(leaderAddrData) + .putInt(addrBytes.length) + .put(addrBytes) + .putInt(leaderAddrBytes.length) + .put(leaderAddrBytes) .array() + case BroadcastResult(resCode) => + ByteBuffer + .allocate(8) + .putInt(BROADCAST_RESULT_TAG) + .putInt(resCode) + .array() + case WhoRep(address, leader) => + val addressBytes = address.getBytes() + val buf = ByteBuffer.allocate(4 + 4 + addressBytes.length + 2) + buf.putInt(WHO_REP_TAG) + buf.putInt(addressBytes.length) + buf.put(addressBytes) + buf.putShort(if (leader) 1 else 0) + buf.array() } override def read(data: Array[Byte]): Event = { val buf = ByteBuffer.wrap(data) + val clientId = getString(buf) val tag = buf.getInt() tag match { case PROPOSE_TAG => - val refData = new Array[Byte](buf.getInt()) - buf.get(refData) + val addrBytes = new Array[Byte](buf.getInt()) + buf.get(addrBytes) val num = buf.getDouble - Propose(new String(refData), num) - + Propose(new String(addrBytes), num) case ACK_TAG => - val refData = new Array[Byte](buf.getInt()) - buf.get(refData) + val addrBytes = new Array[Byte](buf.getInt()) + buf.get(addrBytes) val num = buf.getDouble val code = buf.getInt - Ack(new String(refData), num, AckCode(code)) + Ack(new String(addrBytes), num, AckCode(code)) case ANNOUNCE_TAG => - val refData = new Array[Byte](buf.getInt()) - buf.get(refData) - Announce(new String(refData)) + val addrBytes = new Array[Byte](buf.getInt()) + buf.get(addrBytes) + Announce(new String(addrBytes)) case HEARTBEAT_TAG => - val addrData = new Array[Byte](buf.getInt()) - buf.get(addrData) - val leaderAddrData = new Array[Byte](buf.getInt()) - buf.get(leaderAddrData) - Heartbeat(new String(addrData), Option(new String(leaderAddrData)).filter(_.nonEmpty)) - + val addrBytes = new Array[Byte](buf.getInt()) + buf.get(addrBytes) + val leaderAddrBytes = new Array[Byte](buf.getInt()) + buf.get(leaderAddrBytes) + Heartbeat(new String(addrBytes), Option(new String(leaderAddrBytes)).filter(_.nonEmpty)) + case WHO_TAG => Who(clientId) + case WHO_REP_TAG => WhoRep(getString(buf), shortToBool(buf.getShort())) + case REQ_TAG => + val data = new Array[Byte](buf.remaining()) + buf.get(data) + Req(clientId, data) + case BROADCAST_RESULT_TAG => + BroadcastResult(buf.getInt()) } } + + private def getString(buf: ByteBuffer): String = { + val len = buf.getInt() + val data = new Array[Byte](len) + buf.get(data) + new String(data) + } } def boolToShort(b: Boolean): Short = @@ -534,11 +623,19 @@ object RouletteLeaderElection { def shortToBool(s: Short): Boolean = s == 1 + private def prependTag(tag: Int, data: Array[Byte]): Array[Byte] = { + ByteBuffer.allocate(4 + data.length) + .putInt(tag) + .put(data) + .array() + } + class Peer( - val addr: Addr, - val netClient: NetClient, - val timeoutMs: Long, - val clock: Clock, + val id: String, + val address: Addr, + val netClient: NetClient, + val timeoutMs: 
Long, + val clock: Clock, ) { private var lastPingAt: Long = 0 @@ -554,15 +651,22 @@ object RouletteLeaderElection { lastPingAt >= cur - timeoutMs override def toString: String = - s"addr=$addr, netClientRef=$netClient" + s"addr=$address, netClientRef=$netClient" } case class Peers(peers: Vector[Peer]) { - private val map: Map[Addr, Peer] = peers.map(p => p.addr -> p).toMap + private val map: Map[Addr, Peer] = peers.map(p => p.address -> p).toMap + private val idMap: Map[Addr, Peer] = peers.map(p => p.id -> p).toMap val netClients: Seq[NetClient] = peers.map(_.netClient) def all: Map[Addr, Peer] = map + @throws[IllegalStateException] + def getById(id: Addr): Peer = idMap.get(id) match { + case Some(value) => value + case None => throw new IllegalStateException(s"peer with id=$id doesn't exist") + } + @throws[IllegalStateException] def get(addr: Addr): Peer = map.get(addr) match { case Some(value) => value @@ -583,16 +687,61 @@ object RouletteLeaderElection { def info: String = map .map { case (_, v) => val status = if (v.isAlive) "alive" else "unavailable" - s"{peer=${v.addr}, status=$status}" + s"{peer=${v.address}, status=$status}" } .mkString("; ") } object Peers { - def apply(peers: Map[Addr, NetClient], timeoutMs: Long = 10000, clock: Clock = Clock()): Peers = - new Peers(peers.map { case (peerAddr, netClient) => - new Peer(peerAddr, netClient, timeoutMs, clock) - }.toVector) + + def builder: Builder = new Builder() + + class Builder { + private var _id: String = _ + private var _address: String = _ + private var _netClient: NetClient = _ + private var _timeoutMs = 10000L + private var _clock: Clock = Clock() + + def id(id: String): Builder = { + _id = id + this + } + + def address(address: String): Builder = { + _address = address + this + } + + def netClient(netClient: NetClient): Builder = { + _netClient = netClient + this + } + + def timeoutMs(timeoutMs: FiniteDuration): Builder = { + _timeoutMs = timeoutMs.toMillis + this + } + + def clock(clock: Clock): Builder = { + _clock = clock + this + } + + def build: Peer = { + require(Option(_id).exists(_.nonEmpty), "id cannot be null or empty") + require(Option(_address).exists(_.nonEmpty), "address cannot be null or empty") + require(Option(_netClient).isDefined, "netClient cannot be null") + require(Option(_timeoutMs).exists(_ > 0), "timeout must be > 0") + require(Option(_clock).isDefined, "clock cannot be null") + new Peer(id = _id, + address = _address, + netClient = _netClient, + timeoutMs = _timeoutMs, + clock = _clock) + } + } + } } diff --git a/core/src/main/scala/io/parapet/core/processes/SystemProcess.scala b/core/src/main/scala/io/parapet/core/processes/SystemProcess.scala index 5b9b33f..bbf9f18 100644 --- a/core/src/main/scala/io/parapet/core/processes/SystemProcess.scala +++ b/core/src/main/scala/io/parapet/core/processes/SystemProcess.scala @@ -6,7 +6,7 @@ import io.parapet.core.{Process, ProcessRef} class SystemProcess[F[_]] extends Process[F] { - override val name: String = SystemRef.ref + override val name: String = SystemRef.value override val ref: ProcessRef = SystemRef override val handle: Receive = { case f: Failure => dsl.send(DeadLetter(f), DeadLetterRef) diff --git a/core/src/main/scala/io/parapet/core/processes/net/AsyncClient.scala b/core/src/main/scala/io/parapet/core/processes/net/AsyncClient.scala index c8ed7ad..ad1aca4 100644 --- a/core/src/main/scala/io/parapet/core/processes/net/AsyncClient.scala +++ b/core/src/main/scala/io/parapet/core/processes/net/AsyncClient.scala @@ -1,27 +1,29 @@ package 
io.parapet.core.processes.net -import java.util.UUID - +import cats.implicits.toFunctorOps import io.parapet.core.Event.{Start, Stop} -import io.parapet.core.{Encoder, ProcessRef} +import io.parapet.core.processes.net.AsyncClient.Send +import io.parapet.core.{Encoder, Event, ProcessRef} +import org.slf4j.LoggerFactory import org.zeromq.{SocketType, ZContext, ZMQ} -class AsyncClient[F[_]](override val ref: ProcessRef, address: String, encoder: Encoder) +class AsyncClient[F[_]](override val ref: ProcessRef, clientId: String, address: String) extends io.parapet.core.Process[F] { import dsl._ - private val clientId = UUID.randomUUID().toString - private lazy val zmqContext = new ZContext(1) private lazy val client = zmqContext.createSocket(SocketType.DEALER) + private val logger = LoggerFactory.getLogger(ref.value) + + private val info: String = s"client[ref=$ref, id=$clientId, address=$address]" override def handle: Receive = { case Start => eval { client.setIdentity(clientId.getBytes(ZMQ.CHARSET)) client.connect(address) - println(s"client[ref=$ref, id=$clientId] connected") + logger.debug(s"client[id=$clientId] connected to $address") } case Stop => @@ -30,15 +32,17 @@ class AsyncClient[F[_]](override val ref: ProcessRef, address: String, encoder: zmqContext.close() } - case e => - eval { - val data = encoder.write(e) - client.send(data, 0) - } + case Send(data) => + eval(client.send(data, 0)).void + .handleError(err => eval(logger.error(s"$info failed to send message", err))) } } object AsyncClient { - def apply[F[_]](ref: ProcessRef, address: String, encoder: Encoder): AsyncClient[F] = - new AsyncClient(ref, address, encoder) + + sealed trait API extends Event + case class Send(data: Array[Byte]) extends API + + def apply[F[_]](ref: ProcessRef, clientId: String, address: String, encoder: Encoder): AsyncClient[F] = + new AsyncClient(ref, clientId, address) } diff --git a/core/src/main/scala/io/parapet/core/processes/net/AsyncServer.scala b/core/src/main/scala/io/parapet/core/processes/net/AsyncServer.scala index d143303..f1a012a 100644 --- a/core/src/main/scala/io/parapet/core/processes/net/AsyncServer.scala +++ b/core/src/main/scala/io/parapet/core/processes/net/AsyncServer.scala @@ -2,25 +2,51 @@ package io.parapet.core.processes.net import io.parapet.core.Dsl.DslF import io.parapet.core.Event.{Start, Stop} -import io.parapet.core.{Encoder, ProcessRef} -import org.zeromq.{SocketType, ZContext, ZMQException} +import io.parapet.core.{Encoder, Event, ProcessRef} +import org.slf4j.LoggerFactory +import org.zeromq.{SocketType, ZContext, ZMQException, ZMsg} +import zmq.ZError + +import java.nio.ByteBuffer class AsyncServer[F[_]](override val ref: ProcessRef, address: String, sink: ProcessRef, encoder: Encoder) extends io.parapet.core.Process[F] { + import AsyncServer._ + import dsl._ private lazy val zmqContext = new ZContext(1) private lazy val server = zmqContext.createSocket(SocketType.ROUTER) + private val logger = LoggerFactory.getLogger(ref.value) + + private val step0 = eval { + val clientId = server.recvStr() + val clientIdBytes = clientId.getBytes() + val msgBytes = server.recv() + val size = 4 + clientIdBytes.length + msgBytes.length + val buf = ByteBuffer.allocate(size) + buf.putInt(clientIdBytes.length) + buf.put(clientIdBytes) + buf.put(msgBytes) + val data = new Array[Byte](size) + buf.rewind() + buf.get(data) + val msg = encoder.read(data) + logger.debug(s"received message = $msg from client: $clientId") + msg + }.flatMap(e => e ~> sink) + + def step: DslF[F, Unit] = { + 
step0.handleError { + case err: org.zeromq.ZMQException if err.getErrorCode == ZError.ETERM => + eval(logger.error("zmq context has been terminated", err)) ++ eval(throw err) + case err => eval(logger.error("net server failed to process msg", err)) + } + } private def loop: DslF[F, Unit] = flow { - eval { - val clientId = server.recvStr() - val data = server.recv() - val msg = encoder.read(data) - println(s"AsyncServer($ref): received message = $msg from client: $clientId") - msg - }.flatMap(e => e ~> sink) ++ loop + step ++ loop } override def handle: Receive = { @@ -28,15 +54,26 @@ class AsyncServer[F[_]](override val ref: ProcessRef, address: String, sink: Pro eval { try { server.bind(address) - println(s"$ref server started on $address") + logger.debug(s"$ref server started on $address") } catch { - case e: ZMQException => - if (e.getErrorCode == 48) { - println(s"address: '$address' in use") + case e: Exception => + e match { + case zmqError: ZMQException if zmqError.getErrorCode == 48 => logger.error(s"address: '$address' in use") + case _ => () } + throw e } - } ++ loop + } ++ fork(loop) + + case Send(clientId, data) => + eval { + logger.debug(s"send message to $clientId") + val msg = new ZMsg() + msg.add(clientId) + msg.add(data) + msg.send(server) + } case Stop => eval { @@ -48,6 +85,11 @@ class AsyncServer[F[_]](override val ref: ProcessRef, address: String, sink: Pro } object AsyncServer { + + // API + sealed trait Api extends Event + case class Send(clientId: String, data: Array[Byte]) extends Api + def apply[F[_]](ref: ProcessRef, address: String, sink: ProcessRef, encoder: Encoder): AsyncServer[F] = new AsyncServer(ref, address, sink, encoder) } diff --git a/core/src/main/scala/io/parapet/syntax/FlowSyntax.scala b/core/src/main/scala/io/parapet/syntax/FlowSyntax.scala index 3830181..c4e8bff 100644 --- a/core/src/main/scala/io/parapet/syntax/FlowSyntax.scala +++ b/core/src/main/scala/io/parapet/syntax/FlowSyntax.scala @@ -1,12 +1,15 @@ package io.parapet.syntax -import io.parapet.core.Dsl.DslF +import cats.free.Free +import io.parapet.core.Dsl.{DslF, FlowOp, WithDsl} -trait FlowSyntax[F[_]] extends EventSyntax[F] { +trait FlowSyntax[F[_]] extends EventSyntax[F] with WithDsl[F] { implicit class FreeOps[A](fa: DslF[F, A]) { // alias for Free flatMap def ++[B](fb: => DslF[F, B]): DslF[F, B] = fa.flatMap(_ => fb) + + def handleError[AA >: A](f: Throwable => Free[FlowOp[F, *], AA]): Free[FlowOp[F, *], AA] = dsl.handleError(fa, f) } } diff --git a/core/src/test/scala/io/parapet/core/LockSpec.scala b/core/src/test/scala/io/parapet/core/LockSpec.scala index ec611ca..24871aa 100644 --- a/core/src/test/scala/io/parapet/core/LockSpec.scala +++ b/core/src/test/scala/io/parapet/core/LockSpec.scala @@ -1,12 +1,12 @@ package io.parapet.core import cats.effect.{ContextShift, IO, Timer} -import org.scalatest.FlatSpec -import org.scalatest.Matchers._ +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers._ import scala.concurrent.ExecutionContext.Implicits.global -class LockSpec extends FlatSpec { +class LockSpec extends AnyFlatSpec { private implicit val ctx: ContextShift[IO] = IO.contextShift(global) diff --git a/core/src/test/scala/io/parapet/core/TestUtils.scala b/core/src/test/scala/io/parapet/core/TestUtils.scala index 9714746..f822807 100644 --- a/core/src/test/scala/io/parapet/core/TestUtils.scala +++ b/core/src/test/scala/io/parapet/core/TestUtils.scala @@ -24,20 +24,20 @@ object TestUtils { } } - class IdInterpreter(val execution: Execution) extends 
(FlowOp[Id, *] ~> Id) { + class IdInterpreter(val execution: Execution, mapper: Event => Event) extends (FlowOp[Id, *] ~> Id) { override def apply[A](fa: FlowOp[Id, A]): Id[A] = { fa match { case _: UnitFlow[Id]@unchecked => () - case eval: Eval[Id, Dsl[Id, ?], A]@unchecked => + case eval: Eval[Id, Dsl[Id, *], A]@unchecked => implicitly[Monad[Id]].pure(eval.thunk()) case send: Send[Id]@unchecked => - val event = send.e() + val event = mapper(send.e()) execution.trace.append(Message(event, send.receiver)) send.receivers.foreach(p => { execution.trace.append(Message(event, p)) }) - case fork: Fork[Id, Dsl[Id, *]] => fork.flow.foldMap(new IdInterpreter(execution)) + case fork: Fork[Id, Dsl[Id, *]] => fork.flow.foldMap(new IdInterpreter(execution, mapper)) case _: Delay[Id] => () case _: SuspendF[Id, Dsl[Id, *], A] => ().asInstanceOf[A] // s.thunk().foldMap(new IdInterpreter(execution)) } @@ -45,7 +45,8 @@ object TestUtils { } object IdInterpreter { - def apply(execution: Execution): IdInterpreter = new IdInterpreter(execution) + def apply(execution: Execution, mapper: Event => Event = e => e): IdInterpreter = + new IdInterpreter(execution, mapper) } } diff --git a/core/src/test/scala/io/parapet/core/processes/RouletteLeaderElectionProtocolSpec.scala b/core/src/test/scala/io/parapet/core/processes/RouletteLeaderElectionProtocolSpec.scala index bb09a70..5a66490 100644 --- a/core/src/test/scala/io/parapet/core/processes/RouletteLeaderElectionProtocolSpec.scala +++ b/core/src/test/scala/io/parapet/core/processes/RouletteLeaderElectionProtocolSpec.scala @@ -2,28 +2,38 @@ package io.parapet.core.processes import io.parapet.core.processes.RouletteLeaderElection.ResponseCodes.AckCode import io.parapet.core.processes.RouletteLeaderElection.{Ack, Announce, Heartbeat, Propose} -import org.scalatest.FunSuite -import org.scalatest.Matchers._ +import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.matchers.should.Matchers._ -class RouletteLeaderElectionProtocolSpec extends FunSuite { +import java.nio.ByteBuffer + +class RouletteLeaderElectionProtocolSpec extends AnyFunSuite { test("propose") { - val data = RouletteLeaderElection.encoder.write(Propose("Propose", 0.85)) + val data = addClientId(RouletteLeaderElection.encoder.write(Propose("Propose", 0.85))) RouletteLeaderElection.encoder.read(data) shouldBe Propose("Propose", 0.85) } test("ack") { - val data = RouletteLeaderElection.encoder.write(Ack("Ack", 0.86, AckCode.OK)) + val data = addClientId(RouletteLeaderElection.encoder.write(Ack("Ack", 0.86, AckCode.OK))) RouletteLeaderElection.encoder.read(data) shouldBe Ack("Ack", 0.86, AckCode.OK) } test("announce") { - val data = RouletteLeaderElection.encoder.write(Announce("Announce")) + val data = addClientId(RouletteLeaderElection.encoder.write(Announce("Announce"))) RouletteLeaderElection.encoder.read(data) shouldBe Announce("Announce") } test("heartbeat") { - val data = RouletteLeaderElection.encoder.write(Heartbeat("Heartbeat", Option.empty)) + val data = addClientId(RouletteLeaderElection.encoder.write(Heartbeat("Heartbeat", Option.empty))) RouletteLeaderElection.encoder.read(data) shouldBe Heartbeat("Heartbeat", Option.empty) } + + private def addClientId(source: Array[Byte]): Array[Byte] = { + val buf = ByteBuffer.allocate(4 + source.length) + buf.putInt(0) + buf.put(source) + buf.rewind() + buf.array() + } } diff --git a/core/src/test/scala/io/parapet/core/processes/RouletteLeaderElectionSpec.scala b/core/src/test/scala/io/parapet/core/processes/RouletteLeaderElectionSpec.scala index 
c0d7037..fdfccbf 100644 --- a/core/src/test/scala/io/parapet/core/processes/RouletteLeaderElectionSpec.scala +++ b/core/src/test/scala/io/parapet/core/processes/RouletteLeaderElectionSpec.scala @@ -2,18 +2,21 @@ package io.parapet.core.processes import cats.Id import io.parapet.core -import io.parapet.core.ProcessRef +import io.parapet.core.{Clock, Event, ProcessRef} import io.parapet.core.TestUtils._ import io.parapet.core.processes.RouletteLeaderElection.ResponseCodes.AckCode import io.parapet.core.processes.RouletteLeaderElection.ResponseCodes.AckCode.ELECTED import io.parapet.core.processes.RouletteLeaderElection._ import io.parapet.core.processes.RouletteLeaderElectionSpec._ -import org.scalatest.Matchers._ -import org.scalatest.{FunSuite, Tag} +import io.parapet.core.processes.net.AsyncClient +import org.scalatest.Tag +import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.matchers.should.Matchers._ +import java.nio.ByteBuffer import scala.concurrent.duration._ -class RouletteLeaderElectionSpec extends FunSuite { +class RouletteLeaderElectionSpec extends AnyFunSuite { test("a node satisfying the launch condition", Lemma1) { @@ -22,14 +25,15 @@ class RouletteLeaderElectionSpec extends FunSuite { val p2Addr = "p1:6666" val p1 = ProcessRef("p1") val p2 = ProcessRef("p2") + val server = ProcessRef("server") - val state = new State(p1, p1Addr, Peers(Map(p2Addr -> p2)), _ => VoteNum(0.86, 0.86), threshold = 0.85) + val state = new State(p1, p1Addr, server, createPeers(Map(p2Addr -> p2)), _ => VoteNum(0.86, 0.86), threshold = 0.85) val execution = new Execution() val le = new RouletteLeaderElection[Id](state) updatePeers(state) // when - le(Begin).foldMap(IdInterpreter(execution)) + le(Begin).foldMap(IdInterpreter(execution, eventMapper)) // then execution.print() @@ -46,15 +50,16 @@ class RouletteLeaderElectionSpec extends FunSuite { val p2Addr = "p1:6666" val p1 = ProcessRef("p1") val p2 = ProcessRef("p2") + val server = ProcessRef("server") - val state = new State(p1, p1Addr, Peers(Map(p2Addr -> p2))) + val state = new State(p1, p1Addr, server, createPeers(Map(p2Addr -> p2))) state.num = 0.6 updatePeers(state) val execution = new Execution() val le = new RouletteLeaderElection[Id](state) // when - le(Propose(p2Addr, 0.86)).foldMap(IdInterpreter(execution)) + le(Propose(p2Addr, 0.86)).foldMap(IdInterpreter(execution, eventMapper)) // then execution.print() @@ -75,8 +80,9 @@ class RouletteLeaderElectionSpec extends FunSuite { val p2Addr = "p2:6666" val p1 = ProcessRef("p1") val p2 = ProcessRef("p2") + val server = ProcessRef("server") - val state = new State(p1, p1Addr, Peers(Map(p2Addr -> p2))) + val state = new State(p1, p1Addr, server, createPeers(Map(p2Addr -> p2))) state.num = 0.86 state.roundNum = 0.86 updatePeers(state) @@ -84,7 +90,7 @@ class RouletteLeaderElectionSpec extends FunSuite { val execution = new Execution() // when - le(Propose(p2Addr, 0.6)).foldMap(IdInterpreter(execution)) + le(Propose(p2Addr, 0.6)).foldMap(IdInterpreter(execution, eventMapper)) // then execution.print() @@ -103,16 +109,18 @@ class RouletteLeaderElectionSpec extends FunSuite { val p1 = ProcessRef("p1") val p2 = ProcessRef("p2") val p3 = ProcessRef("p3") + val server = ProcessRef("server") val execution = new Execution() - val state = new State(p1, p1Addr, Peers(Map(p2Addr -> p2, p3Addr -> p3)), _ => VoteNum(0.86, 0.86), threshold = 0.85, roulette = _ => p3Addr) + val state = new State(p1, p1Addr, server, createPeers(Map(p2Addr -> p2, p3Addr -> p3)), + _ => VoteNum(0.86, 0.86), threshold = 
0.85, roulette = _ => p3Addr) updatePeers(state) val le = new RouletteLeaderElection[Id](state) // when - le(Begin).foldMap(IdInterpreter(execution)) - le(Ack(p2Addr, 0.5, AckCode.OK)).foldMap(IdInterpreter(execution)) - le(Ack(p3Addr, 0.3, AckCode.OK)).foldMap(IdInterpreter(execution)) + le(Begin).foldMap(IdInterpreter(execution, eventMapper)) + le(Ack(p2Addr, 0.5, AckCode.OK)).foldMap(IdInterpreter(execution, eventMapper)) + le(Ack(p3Addr, 0.3, AckCode.OK)).foldMap(IdInterpreter(execution, eventMapper)) // then execution.print() @@ -136,7 +144,8 @@ class RouletteLeaderElectionSpec extends FunSuite { val p2Addr = "p2:6666" val p1 = ProcessRef("p1") val p2 = ProcessRef("p2") - val state = new State(p1, p1Addr, Peers(Map(p2Addr -> p2))) + val server = ProcessRef("server") + val state = new State(p1, p1Addr, server, createPeers(Map(p2Addr -> p2))) state.num = 0.86 state.roundNum = 0.86 state.coordinator = true @@ -145,7 +154,7 @@ class RouletteLeaderElectionSpec extends FunSuite { val execution = new Execution() // when - le(Propose(p2Addr, 0.87)).foldMap(IdInterpreter(execution)) + le(Propose(p2Addr, 0.87)).foldMap(IdInterpreter(execution, eventMapper)) // then execution.print() @@ -163,16 +172,17 @@ class RouletteLeaderElectionSpec extends FunSuite { val p1 = ProcessRef("p1") val p2 = ProcessRef("p2") val p3 = ProcessRef("p3") + val server = ProcessRef("server") - val state = new State(p1, p1Addr, Peers(Map(p2Addr -> p2, p3Addr -> p3))) + val state = new State(p1, p1Addr, server, createPeers(Map(p2Addr -> p2, p3Addr -> p3))) state.num = 0.1 val le = new RouletteLeaderElection[Id](state) val execution = new Execution() // when - le(Propose(p2Addr, 0.87)).foldMap(IdInterpreter(execution)) - le(Propose(p3Addr, 0.88)).foldMap(IdInterpreter(execution)) + le(Propose(p2Addr, 0.87)).foldMap(IdInterpreter(execution, eventMapper)) + le(Propose(p3Addr, 0.88)).foldMap(IdInterpreter(execution, eventMapper)) // then execution.print() @@ -193,16 +203,17 @@ class RouletteLeaderElectionSpec extends FunSuite { val p2Addr = "p2:6666" val p1 = ProcessRef("p1") val p2 = ProcessRef("p2") + val server = ProcessRef("server") - val state = new State(p1, p1Addr, Peers(Map(p2Addr -> p2))) + val state = new State(p1, p1Addr, server, createPeers(Map(p2Addr -> p2))) state.num = 0.6 val le = new RouletteLeaderElection[Id](state) val execution = new Execution() // when - le(Propose(p2Addr, 0.87)).foldMap(IdInterpreter(execution)) - le(Timeout(Leader)).foldMap(IdInterpreter(execution)) + le(Propose(p2Addr, 0.87)).foldMap(IdInterpreter(execution, eventMapper)) + le(Timeout(Leader)).foldMap(IdInterpreter(execution, eventMapper)) // then execution.print() @@ -223,16 +234,18 @@ class RouletteLeaderElectionSpec extends FunSuite { val p2Addr = "p2:6666" val p1 = ProcessRef("p1") val p2 = ProcessRef("p2") + val server = ProcessRef("server") - val state = new State(p1, p1Addr, Peers(Map(p2Addr -> p2)), random = _ => VoteNum(0.86, 0.86), threshold = 0.85) + val state = new State(p1, p1Addr, server, createPeers(Map(p2Addr -> p2)), + random = _ => VoteNum(0.86, 0.86), threshold = 0.85) updatePeers(state) val le = new RouletteLeaderElection[Id](state) val execution = new Execution() // when - le(Begin).foldMap(IdInterpreter(execution)) - le(Timeout(Coordinator)).foldMap(IdInterpreter(execution)) + le(Begin).foldMap(IdInterpreter(execution, eventMapper)) + le(Timeout(Coordinator)).foldMap(IdInterpreter(execution, eventMapper)) // then execution.print() @@ -254,16 +267,17 @@ class RouletteLeaderElectionSpec extends FunSuite { val p1 = 
ProcessRef("p1") val p2 = ProcessRef("p2") + val server = ProcessRef("server") - val state = new State(p1, p1Addr, Peers(Map(p2Addr -> p2))) + val state = new State(p1, p1Addr, server, createPeers(Map(p2Addr -> p2))) updatePeers(state) val le = new RouletteLeaderElection[Id](state) val execution = new Execution() // when - le(Announce(p2Addr)).foldMap(IdInterpreter(execution)) - le.sendHeartbeat.foldMap(IdInterpreter(execution)) + le(Announce(p2Addr)).foldMap(IdInterpreter(execution, eventMapper)) + le.sendHeartbeat.foldMap(IdInterpreter(execution, eventMapper)) // then execution.print() @@ -280,30 +294,31 @@ class RouletteLeaderElectionSpec extends FunSuite { val p1 = ProcessRef("p1") val p2 = ProcessRef("p2") val p3 = ProcessRef("p3") + val server = ProcessRef("server") - val peerHeartbeatTimeout = 1.second.toMillis + val peerHeartbeatTimeout = 1.second val clock = new core.Clock.Mock(1.second) - val state = new State(p1, p1Addr, Peers(Map(p2Addr -> p2, p3Addr -> p3), peerHeartbeatTimeout, clock)) + val state = new State(p1, p1Addr, server, createPeers(Map(p2Addr -> p2, p3Addr -> p3), peerHeartbeatTimeout, clock)) // todo separate test case - state.peers.alive.map(p => p.addr -> p.netClient).toMap shouldBe Map(p2Addr -> p2, p3Addr -> p3) + state.peers.alive.map(p => p.address -> p.netClient).toMap shouldBe Map(p2Addr -> p2, p3Addr -> p3) val le = new RouletteLeaderElection[Id](state) val execution = new Execution() // when - le(Heartbeat(p2Addr, Option(p2Addr))).foldMap(IdInterpreter(execution)) + le(Heartbeat(p2Addr, Option(p2Addr))).foldMap(IdInterpreter(execution, eventMapper)) state.leader shouldBe Some(p2Addr) // todo separate test case state.hasLeader shouldBe true // todo separate test case clock.tick(2.second) state.leader shouldBe Some(p2Addr) state.hasLeader shouldBe false state.peers.get(p3Addr).update(clock.currentTimeMillis) - le.monitorCluster.foldMap(IdInterpreter(execution)) + le.monitorCluster.foldMap(IdInterpreter(execution, eventMapper)) // then execution.print() - state.peers.alive.map(p => p.addr -> p.netClient).toMap shouldBe Map(p3Addr -> p3) + state.peers.alive.map(p => p.address -> p.netClient).toMap shouldBe Map(p3Addr -> p3) state.leader shouldBe Option.empty execution.trace shouldBe Seq( Message(Begin, p1) @@ -318,25 +333,26 @@ class RouletteLeaderElectionSpec extends FunSuite { val p1 = ProcessRef("p1") val p2 = ProcessRef("p2") val p3 = ProcessRef("p3") + val server = ProcessRef("server") - val peerHeartbeatTimeout = 1.second.toMillis + val peerHeartbeatTimeout = 1.second val clock = new core.Clock.Mock(1.second) - val state = new State(p1, p1Addr, Peers(Map(p2Addr -> p2, p3Addr -> p3), peerHeartbeatTimeout, clock)) + val state = new State(p1, p1Addr, server, createPeers(Map(p2Addr -> p2, p3Addr -> p3), peerHeartbeatTimeout, clock)) // todo separate test case - state.peers.alive.map(p => p.addr -> p.netClient).toMap shouldBe Map(p2Addr -> p2, p3Addr -> p3) + state.peers.alive.map(p => p.address -> p.netClient).toMap shouldBe Map(p2Addr -> p2, p3Addr -> p3) val le = new RouletteLeaderElection[Id](state) val execution = new Execution() // when - le(Heartbeat(p2Addr, Option(p2Addr))).foldMap(IdInterpreter(execution)) + le(Heartbeat(p2Addr, Option(p2Addr))).foldMap(IdInterpreter(execution, eventMapper)) state.leader shouldBe Some(p2Addr) // todo separate test case state.hasLeader shouldBe true // todo separate test case clock.tick(2.second) state.leader shouldBe Some(p2Addr) state.hasLeader shouldBe false - le.monitorCluster.foldMap(IdInterpreter(execution)) + 
le.monitorCluster.foldMap(IdInterpreter(execution, eventMapper)) // then execution.print() @@ -353,16 +369,17 @@ class RouletteLeaderElectionSpec extends FunSuite { val p1 = ProcessRef("p1") val p2 = ProcessRef("p2") val p3 = ProcessRef("p3") - val peerHeartbeatTimeout = 1.second.toMillis + val server = ProcessRef("server") + val peerHeartbeatTimeout = 1.second val clock = new core.Clock.Mock(1.second) - val state = new State(p1, p1Addr, Peers(Map(p2Addr -> p2, p3Addr -> p3), peerHeartbeatTimeout, clock)) + val state = new State(p1, p1Addr, server, createPeers(Map(p2Addr -> p2, p3Addr -> p3), peerHeartbeatTimeout, clock)) state.leader = Option(p2Addr) val le = new RouletteLeaderElection[Id](state) val execution = new Execution() // when - le(Propose(p3Addr, 0.9)).foldMap(IdInterpreter(execution)) + le(Propose(p3Addr, 0.9)).foldMap(IdInterpreter(execution, eventMapper)) // then execution.print() execution.trace shouldBe Seq( @@ -378,19 +395,21 @@ class RouletteLeaderElectionSpec extends FunSuite { val p1 = ProcessRef("p1") val p2 = ProcessRef("p2") val p3 = ProcessRef("p3") - val peerHeartbeatTimeout = 1.second.toMillis + val server = ProcessRef("server") + + val peerHeartbeatTimeout = 1.second val clock = new core.Clock.Mock(1.second) - val state = new State(p1, p1Addr, Peers(Map(p2Addr -> p2, p3Addr -> p3), peerHeartbeatTimeout, clock)) + val state = new State(p1, p1Addr, server, createPeers(Map(p2Addr -> p2, p3Addr -> p3), peerHeartbeatTimeout, clock)) val le = new RouletteLeaderElection[Id](state) val execution = new Execution() // when - le(Heartbeat(p3Addr, Some(p3Addr))).foldMap(IdInterpreter(execution)) + le(Heartbeat(p3Addr, Some(p3Addr))).foldMap(IdInterpreter(execution, eventMapper)) // then execution.print() - state.peers.alive.map(p => p.addr -> p.netClient).toMap shouldBe Map(p2Addr -> p2, p3Addr -> p3) + state.peers.alive.map(p => p.address -> p.netClient).toMap shouldBe Map(p2Addr -> p2, p3Addr -> p3) state.leader shouldBe Some(p3Addr) execution.trace shouldBe Seq.empty } @@ -405,20 +424,21 @@ class RouletteLeaderElectionSpec extends FunSuite { val p2 = ProcessRef("p2") val p3 = ProcessRef("p3") val p4 = ProcessRef("p4") - val peerHeartbeatTimeout = 1.second.toMillis + val server = ProcessRef("server") + val peerHeartbeatTimeout = 1.second val clock = new core.Clock.Mock(1.second) - val state = new State(p1, p1Addr, Peers(Map(p2Addr -> p2, p3Addr -> p3, p4Addr -> p4), peerHeartbeatTimeout, clock)) + val state = new State(p1, p1Addr, server, createPeers(Map(p2Addr -> p2, p3Addr -> p3, p4Addr -> p4), peerHeartbeatTimeout, clock)) val le = new RouletteLeaderElection[Id](state) val execution = new Execution() // when clock.tick(1.second) - le(Heartbeat(p3Addr, Some(p3Addr))).foldMap(IdInterpreter(execution)) + le(Heartbeat(p3Addr, Some(p3Addr))).foldMap(IdInterpreter(execution, eventMapper)) // then execution.print() - state.peers.alive.map(p => p.addr -> p.netClient).toMap shouldBe Map(p3Addr -> p3) + state.peers.alive.map(p => p.address -> p.netClient).toMap shouldBe Map(p3Addr -> p3) state.leader shouldBe Option.empty execution.trace shouldBe Seq.empty } @@ -433,9 +453,10 @@ class RouletteLeaderElectionSpec extends FunSuite { val p2 = ProcessRef("p2") val p3 = ProcessRef("p3") val p4 = ProcessRef("p4") - val peerHeartbeatTimeout = 1.second.toMillis + val server = ProcessRef("server") + val peerHeartbeatTimeout = 1.second val clock = new core.Clock.Mock(1.second) - val state = new State(p1, p1Addr, Peers(Map(p2Addr -> p2, p3Addr -> p3, p4Addr -> p4), peerHeartbeatTimeout, 
clock)) + val state = new State(p1, p1Addr, server, createPeers(Map(p2Addr -> p2, p3Addr -> p3, p4Addr -> p4), peerHeartbeatTimeout, clock)) val le = new RouletteLeaderElection[Id](state) val execution = new Execution() @@ -443,11 +464,11 @@ class RouletteLeaderElectionSpec extends FunSuite { // when clock.tick(1.second) state.peers.all.values.foreach(_.update(clock.currentTimeMillis)) - le(Heartbeat(p3Addr, Some(p2Addr))).foldMap(IdInterpreter(execution)) + le(Heartbeat(p3Addr, Some(p2Addr))).foldMap(IdInterpreter(execution, eventMapper)) // then execution.print() - state.peers.alive.map(p => p.addr -> p.netClient).toMap shouldBe Map(p2Addr -> p2, p3Addr -> p3, p4Addr -> p4) + state.peers.alive.map(p => p.address -> p.netClient).toMap shouldBe Map(p2Addr -> p2, p3Addr -> p3, p4Addr -> p4) state.leader shouldBe Option.empty execution.trace shouldBe Seq.empty } @@ -460,15 +481,15 @@ class RouletteLeaderElectionSpec extends FunSuite { val p1 = ProcessRef("p1") val p2 = ProcessRef("p2") val p3 = ProcessRef("p3") - - val state = new State(p1, p1Addr, Peers(Map(p2Addr -> p2, p3Addr -> p3))) + val server = ProcessRef("server") + val state = new State(p1, p1Addr, server, createPeers(Map(p2Addr -> p2, p3Addr -> p3))) state.leader = Some(p2Addr) val le = new RouletteLeaderElection[Id](state) val execution = new Execution() // when assertThrows[IllegalStateException] { - le(Heartbeat(p3Addr, Some(p3Addr))).foldMap(IdInterpreter(execution)) + le(Heartbeat(p3Addr, Some(p3Addr))).foldMap(IdInterpreter(execution, eventMapper)) } } @@ -479,8 +500,8 @@ class RouletteLeaderElectionSpec extends FunSuite { val p2Addr = "p2:6666" val p1 = ProcessRef("p1") val p2 = ProcessRef("p2") - - val state = new State(p1, p1Addr, Peers(Map(p2Addr -> p2))) + val server = ProcessRef("server") + val state = new State(p1, p1Addr, server, createPeers(Map(p2Addr -> p2))) state.voted = true state.leader = Some(p2Addr) @@ -488,7 +509,7 @@ class RouletteLeaderElectionSpec extends FunSuite { val le = new RouletteLeaderElection[Id](state) // when - le(Heartbeat(p2Addr, Option.empty)).foldMap(IdInterpreter(execution)) + le(Heartbeat(p2Addr, Option.empty)).foldMap(IdInterpreter(execution, eventMapper)) // then @@ -516,8 +537,33 @@ object RouletteLeaderElectionSpec { object Lemma12 extends Tag(Lemmas.Lemma12.description) // @formatter:on + val eventMapper: Event => Event = e => e match { + case AsyncClient.Send(bytes) => + // in order to decode the message we need to prepend client id + val dummyId = "1".getBytes() + val size = 4 + dummyId.length + bytes.length + val buf = ByteBuffer.allocate(size) + .putInt(dummyId.length) + .put(dummyId) + .put(bytes) + buf.rewind() + val data = new Array[Byte](size) + buf.get(data) + RouletteLeaderElection.encoder.read(data) + case e => e + } + def updatePeers(s: State, ts: Long = System.currentTimeMillis()): Unit = { s.peers.peers.foreach(_.update(ts)) } + def createPeers(peers: Map[String, ProcessRef], timeout: FiniteDuration = 10000.millis, clock: Clock = Clock()): Peers = + Peers(peers.map(p => + Peers.builder.id(p._2.value) + .address(p._1) + .netClient(p._2) + .timeoutMs(timeout) + .clock(clock) + .build).toVector) + } \ No newline at end of file diff --git a/interop-cats/src/main/scala/io/parapet/CatsApp.scala b/interop-cats/src/main/scala/io/parapet/CatsApp.scala index cddf549..2f3a259 100644 --- a/interop-cats/src/main/scala/io/parapet/CatsApp.scala +++ b/interop-cats/src/main/scala/io/parapet/CatsApp.scala @@ -30,6 +30,7 @@ trait CatsApp extends ParApp[IO] { sys.addShutdownHook { // 
Should block the thread until all finalizers are executed fiber.cancel.unsafeRunSync() + // todo fromExecutorService(executor) + executionContext.shutdownNow() } } } diff --git a/intg-tests/src/main/scala/io/parapet/tests/intg/BlockingSpec.scala b/intg-tests/src/main/scala/io/parapet/tests/intg/BlockingSpec.scala index 9b1cd32..ab8a1b7 100644 --- a/intg-tests/src/main/scala/io/parapet/tests/intg/BlockingSpec.scala +++ b/intg-tests/src/main/scala/io/parapet/tests/intg/BlockingSpec.scala @@ -5,15 +5,15 @@ import io.parapet.core.Parapet.ParConfig import io.parapet.core.Scheduler.SchedulerConfig import io.parapet.core.{Event, Process, ProcessRef} import io.parapet.testutils.{EventStore, IntegrationSpec} -import org.scalatest.FunSuite -import org.scalatest.Matchers._ import org.scalatest.OptionValues._ +import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.matchers.should.Matchers._ import java.util.concurrent.atomic.AtomicInteger import scala.concurrent.duration._ import scala.util.Random -abstract class BlockingSpec[F[_]] extends FunSuite with IntegrationSpec[F] { +abstract class BlockingSpec[F[_]] extends AnyFunSuite with IntegrationSpec[F] { import BlockingSpec._ import dsl._ diff --git a/intg-tests/src/main/scala/io/parapet/tests/intg/ChannelSpec.scala b/intg-tests/src/main/scala/io/parapet/tests/intg/ChannelSpec.scala index 7207508..945f9ee 100644 --- a/intg-tests/src/main/scala/io/parapet/tests/intg/ChannelSpec.scala +++ b/intg-tests/src/main/scala/io/parapet/tests/intg/ChannelSpec.scala @@ -7,12 +7,12 @@ import io.parapet.core.Scheduler.SchedulerConfig import io.parapet.core.{Channel, Event, Process, ProcessRef} import io.parapet.tests.intg.ChannelSpec._ import io.parapet.testutils.{EventStore, IntegrationSpec} -import org.scalatest.FunSuite -import org.scalatest.Matchers._ +import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.matchers.should.Matchers._ import scala.util.{Success, Failure => SFailure} -abstract class ChannelSpec[F[_]] extends FunSuite with IntegrationSpec[F] { +abstract class ChannelSpec[F[_]] extends AnyFunSuite with IntegrationSpec[F] { import dsl._ diff --git a/intg-tests/src/main/scala/io/parapet/tests/intg/DslSpec.scala b/intg-tests/src/main/scala/io/parapet/tests/intg/DslSpec.scala index af652d6..4198a31 100644 --- a/intg-tests/src/main/scala/io/parapet/tests/intg/DslSpec.scala +++ b/intg-tests/src/main/scala/io/parapet/tests/intg/DslSpec.scala @@ -5,13 +5,13 @@ import io.parapet.core.Event.Start import io.parapet.core.{Event, Process, ProcessRef} import io.parapet.tests.intg.DslSpec._ import io.parapet.testutils.{EventStore, IntegrationSpec} -import org.scalatest.Matchers._ import org.scalatest.OptionValues._ -import org.scalatest.WordSpec +import org.scalatest.wordspec.AnyWordSpec +import org.scalatest.matchers.should.Matchers._ import scala.concurrent.duration._ -abstract class DslSpec[F[_]] extends WordSpec with IntegrationSpec[F] { +abstract class DslSpec[F[_]] extends AnyWordSpec with IntegrationSpec[F] { import dsl._ @@ -272,6 +272,53 @@ abstract class DslSpec[F[_]] extends WordSpec with IntegrationSpec[F] { } } + "A flow with error handler" when { + "fails" should { + "recover" in { + val eventStore = new EventStore[F, Response] + + val failure = eval(throw new RuntimeException("server failed")) + + val process: Process[F] = Process[F](ref => { + case Start => + handleError(failure, err => eval(eventStore.add(ref, Response(err)))) + }) + unsafeRun(eventStore.await(1, createApp(ct.pure(Seq(process))).run)) + + 
eventStore.get(process.ref).headOption.value should matchPattern { + case Response(err: RuntimeException) if err.getMessage == "server failed" => + } + } + } + } + + "A flow with async loop" when { + "unrecoverable error occurred" should { + "fail" in { + val ref = ProcessRef() + val eventStore = new EventStore[F, Response] + + def step: DslF[F, Unit] = + eval(throw new RuntimeException("failure")) + .handleError(err => eval { + eventStore.add(ref, Response(err)) + throw err + }) + + def loop: DslF[F, Unit] = flow { + step ++ loop + } + + val process: Process[F] = Process.builder[F](_ => { + case Start => fork(loop) + }).ref(ref).build + + unsafeRun(eventStore.await(1, createApp(ct.pure(Seq(process))).run)) + + } + } + } + } object DslSpec { diff --git a/intg-tests/src/main/scala/io/parapet/tests/intg/DynamicProcessCreationSpec.scala b/intg-tests/src/main/scala/io/parapet/tests/intg/DynamicProcessCreationSpec.scala index 9329a6f..ec22ba5 100644 --- a/intg-tests/src/main/scala/io/parapet/tests/intg/DynamicProcessCreationSpec.scala +++ b/intg-tests/src/main/scala/io/parapet/tests/intg/DynamicProcessCreationSpec.scala @@ -5,10 +5,10 @@ import io.parapet.core.Event.Start import io.parapet.core.{Event, Process, ProcessRef} import io.parapet.tests.intg.DynamicProcessCreationSpec._ import io.parapet.testutils.{EventStore, IntegrationSpec} -import org.scalatest.FunSuite -import org.scalatest.Matchers._ +import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.matchers.should.Matchers._ -abstract class DynamicProcessCreationSpec[F[_]] extends FunSuite with IntegrationSpec[F] { +abstract class DynamicProcessCreationSpec[F[_]] extends AnyFunSuite with IntegrationSpec[F] { test("create child process") { diff --git a/intg-tests/src/main/scala/io/parapet/tests/intg/ErrorHandlingSpec.scala b/intg-tests/src/main/scala/io/parapet/tests/intg/ErrorHandlingSpec.scala index 2131e28..5d95c8b 100644 --- a/intg-tests/src/main/scala/io/parapet/tests/intg/ErrorHandlingSpec.scala +++ b/intg-tests/src/main/scala/io/parapet/tests/intg/ErrorHandlingSpec.scala @@ -6,11 +6,11 @@ import io.parapet.core.processes.DeadLetterProcess import io.parapet.core.{Event, Process} import io.parapet.tests.intg.ErrorHandlingSpec._ import io.parapet.testutils.{EventStore, IntegrationSpec} -import org.scalatest.Matchers._ import org.scalatest.OptionValues._ -import org.scalatest.WordSpec +import org.scalatest.wordspec.AnyWordSpec +import org.scalatest.matchers.should.Matchers._ -abstract class ErrorHandlingSpec[F[_]] extends WordSpec with IntegrationSpec[F] { +abstract class ErrorHandlingSpec[F[_]] extends AnyWordSpec with IntegrationSpec[F] { import dsl._ diff --git a/intg-tests/src/main/scala/io/parapet/tests/intg/EventDeliverySpec.scala b/intg-tests/src/main/scala/io/parapet/tests/intg/EventDeliverySpec.scala index e6af32a..77b1edd 100644 --- a/intg-tests/src/main/scala/io/parapet/tests/intg/EventDeliverySpec.scala +++ b/intg-tests/src/main/scala/io/parapet/tests/intg/EventDeliverySpec.scala @@ -6,12 +6,11 @@ import io.parapet.core.processes.DeadLetterProcess import io.parapet.core.{Event, Process, ProcessRef} import io.parapet.tests.intg.EventDeliverySpec._ import io.parapet.testutils.{EventStore, IntegrationSpec} -import org.scalatest.FlatSpec -import org.scalatest.Matchers._ import org.scalatest.OptionValues._ +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers._ -abstract class EventDeliverySpec[F[_]] - extends FlatSpec with IntegrationSpec[F] { +abstract class EventDeliverySpec[F[_]] 
extends AnyFlatSpec with IntegrationSpec[F] { import dsl._ diff --git a/intg-tests/src/main/scala/io/parapet/tests/intg/ProcessBehaviourSpec.scala b/intg-tests/src/main/scala/io/parapet/tests/intg/ProcessBehaviourSpec.scala index 9a9336e..db765a5 100644 --- a/intg-tests/src/main/scala/io/parapet/tests/intg/ProcessBehaviourSpec.scala +++ b/intg-tests/src/main/scala/io/parapet/tests/intg/ProcessBehaviourSpec.scala @@ -3,11 +3,10 @@ package io.parapet.tests.intg import io.parapet.core.{Event, Process} import io.parapet.tests.intg.ProcessBehaviourSpec._ import io.parapet.testutils.{EventStore, IntegrationSpec} -import org.scalatest.FunSuite -import org.scalatest.Matchers._ +import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.matchers.should.Matchers._ -abstract class ProcessBehaviourSpec[F[_]] - extends FunSuite with IntegrationSpec[F] { +abstract class ProcessBehaviourSpec[F[_]] extends AnyFunSuite with IntegrationSpec[F] { import dsl._ diff --git a/intg-tests/src/main/scala/io/parapet/tests/intg/ProcessLifecycleSpec.scala b/intg-tests/src/main/scala/io/parapet/tests/intg/ProcessLifecycleSpec.scala index 9307523..82091de 100644 --- a/intg-tests/src/main/scala/io/parapet/tests/intg/ProcessLifecycleSpec.scala +++ b/intg-tests/src/main/scala/io/parapet/tests/intg/ProcessLifecycleSpec.scala @@ -9,10 +9,10 @@ import io.parapet.core.Parapet.ParConfig import io.parapet.core.{Event, Process, ProcessRef} import io.parapet.tests.intg.ProcessLifecycleSpec._ import io.parapet.testutils.{EventStore, IntegrationSpec} -import org.scalatest.FlatSpec -import org.scalatest.Matchers._ +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers._ -abstract class ProcessLifecycleSpec[F[_]] extends FlatSpec with IntegrationSpec[F] { +abstract class ProcessLifecycleSpec[F[_]] extends AnyFlatSpec with IntegrationSpec[F] { import dsl._ diff --git a/intg-tests/src/main/scala/io/parapet/tests/intg/ProcessSpec.scala b/intg-tests/src/main/scala/io/parapet/tests/intg/ProcessSpec.scala index 49e32bf..60998d6 100644 --- a/intg-tests/src/main/scala/io/parapet/tests/intg/ProcessSpec.scala +++ b/intg-tests/src/main/scala/io/parapet/tests/intg/ProcessSpec.scala @@ -6,11 +6,11 @@ import io.parapet.core.processes.DeadLetterProcess import io.parapet.core.{Event, Process, ProcessRef} import io.parapet.tests.intg.ProcessSpec._ import io.parapet.testutils.{EventStore, IntegrationSpec} -import org.scalatest.Matchers._ import org.scalatest.OptionValues._ -import org.scalatest.WordSpec +import org.scalatest.wordspec.AnyWordSpec +import org.scalatest.matchers.should.Matchers._ -abstract class ProcessSpec[F[_]] extends WordSpec with IntegrationSpec[F] { +abstract class ProcessSpec[F[_]] extends AnyWordSpec with IntegrationSpec[F] { import dsl._ diff --git a/intg-tests/src/main/scala/io/parapet/tests/intg/ReplySpec.scala b/intg-tests/src/main/scala/io/parapet/tests/intg/ReplySpec.scala index 73e1372..08c5104 100644 --- a/intg-tests/src/main/scala/io/parapet/tests/intg/ReplySpec.scala +++ b/intg-tests/src/main/scala/io/parapet/tests/intg/ReplySpec.scala @@ -4,11 +4,11 @@ import io.parapet.core.Event._ import io.parapet.core.{Event, Process} import io.parapet.tests.intg.ReplySpec._ import io.parapet.testutils.{EventStore, IntegrationSpec} -import org.scalatest.FlatSpec -import org.scalatest.Matchers._ +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers._ import org.scalatest.OptionValues._ -abstract class ReplySpec[F[_]] extends FlatSpec with IntegrationSpec[F] { 
+abstract class ReplySpec[F[_]] extends AnyFlatSpec with IntegrationSpec[F] { import dsl._ diff --git a/intg-tests/src/main/scala/io/parapet/tests/intg/SchedulerCorrectnessSpec.scala b/intg-tests/src/main/scala/io/parapet/tests/intg/SchedulerCorrectnessSpec.scala index b4ff3db..85c533f 100644 --- a/intg-tests/src/main/scala/io/parapet/tests/intg/SchedulerCorrectnessSpec.scala +++ b/intg-tests/src/main/scala/io/parapet/tests/intg/SchedulerCorrectnessSpec.scala @@ -10,8 +10,8 @@ import io.parapet.syntax.logger.MDCFields import io.parapet.tests.intg.SchedulerCorrectnessSpec.TaskProcessingTime._ import io.parapet.tests.intg.SchedulerCorrectnessSpec._ import io.parapet.testutils.{EventStore, IntegrationSpec} -import org.scalatest.FunSuite -import org.scalatest.Matchers._ +import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.matchers.should.Matchers._ import java.util.concurrent.TimeUnit import scala.annotation.tailrec @@ -19,7 +19,7 @@ import scala.concurrent.duration.{FiniteDuration, _} import scala.util.Random -abstract class SchedulerCorrectnessSpec[F[_]] extends FunSuite with IntegrationSpec[F] { +abstract class SchedulerCorrectnessSpec[F[_]] extends AnyFunSuite with IntegrationSpec[F] { test("scheduler correctness under normal conditions") { val specs = Seq( diff --git a/intg-tests/src/main/scala/io/parapet/tests/intg/SchedulerSpec.scala b/intg-tests/src/main/scala/io/parapet/tests/intg/SchedulerSpec.scala index 7369347..89bd1f7 100644 --- a/intg-tests/src/main/scala/io/parapet/tests/intg/SchedulerSpec.scala +++ b/intg-tests/src/main/scala/io/parapet/tests/intg/SchedulerSpec.scala @@ -7,13 +7,13 @@ import io.parapet.core.processes.DeadLetterProcess import io.parapet.core.{Event, Process} import io.parapet.tests.intg.SchedulerSpec._ import io.parapet.testutils.{EventStore, IntegrationSpec} -import org.scalatest.Matchers._ import org.scalatest.OptionValues._ -import org.scalatest.WordSpec +import org.scalatest.wordspec.AnyWordSpec +import org.scalatest.matchers.should.Matchers._ import scala.concurrent.duration._ -abstract class SchedulerSpec[F[_]] extends WordSpec with IntegrationSpec[F] { +abstract class SchedulerSpec[F[_]] extends AnyWordSpec with IntegrationSpec[F] { import dsl._ diff --git a/intg-tests/src/main/scala/io/parapet/tests/intg/SelfSendSpec.scala b/intg-tests/src/main/scala/io/parapet/tests/intg/SelfSendSpec.scala index ede5e55..e95c9f0 100644 --- a/intg-tests/src/main/scala/io/parapet/tests/intg/SelfSendSpec.scala +++ b/intg-tests/src/main/scala/io/parapet/tests/intg/SelfSendSpec.scala @@ -4,10 +4,10 @@ import io.parapet.core.Event.Start import io.parapet.core.{Event, Process} import io.parapet.tests.intg.SelfSendSpec._ import io.parapet.testutils.{EventStore, IntegrationSpec} -import org.scalatest.FlatSpec -import org.scalatest.Matchers._ +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers._ -abstract class SelfSendSpec[F[_]] extends FlatSpec with IntegrationSpec[F] { +abstract class SelfSendSpec[F[_]] extends AnyFlatSpec with IntegrationSpec[F] { import dsl._ diff --git a/intg-tests/src/main/scala/io/parapet/tests/intg/SwitchBehaviorSpec.scala b/intg-tests/src/main/scala/io/parapet/tests/intg/SwitchBehaviorSpec.scala index c4b0fe1..3b058d3 100644 --- a/intg-tests/src/main/scala/io/parapet/tests/intg/SwitchBehaviorSpec.scala +++ b/intg-tests/src/main/scala/io/parapet/tests/intg/SwitchBehaviorSpec.scala @@ -4,10 +4,10 @@ import io.parapet.core.Dsl.DslF import io.parapet.core.{Channel, Event, Process, ProcessRef} import 
io.parapet.tests.intg.SwitchBehaviorSpec._ import io.parapet.testutils.{EventStore, IntegrationSpec} -import org.scalatest.FunSuite -import org.scalatest.Matchers._ +import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.matchers.should.Matchers._ -abstract class SwitchBehaviorSpec[F[_]] extends FunSuite with IntegrationSpec[F] { +abstract class SwitchBehaviorSpec[F[_]] extends AnyFunSuite with IntegrationSpec[F] { self => import dsl._ diff --git a/intg-tests/src/test/scala/io/parapet/tests/intg/catsio/BlockingChannelWithTimeout.scala b/intg-tests/src/test/scala/io/parapet/tests/intg/catsio/BlockingChannelWithTimeout.scala new file mode 100644 index 0000000..2abdbba --- /dev/null +++ b/intg-tests/src/test/scala/io/parapet/tests/intg/catsio/BlockingChannelWithTimeout.scala @@ -0,0 +1,53 @@ +package io.parapet.tests.intg.catsio + +import cats.effect.IO +import io.parapet.core.Event.{ByteEvent, Start, StringEvent} +import io.parapet.core.{Channel, Event, Process, ProcessRef} +import io.parapet.testutils.{BasicCatsIOSpec, EventStore} +import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.matchers.should.Matchers._ + +import scala.concurrent.duration._ + +class BlockingChannelWithTimeout extends AnyFunSuite with BasicCatsIOSpec { + + import dsl._ + + test("blockingChannelTimeout") { + + val eventStore = new EventStore[IO, Event] + + val server = Process.builder[IO](_ => { + case e:ByteEvent => eval(println(s"server received: $e")) ++ delay(10.seconds) ++ + withSender(sender => ByteEvent("res".getBytes) ~> sender) + }).ref(ProcessRef("server")).build + + val failover = Process.builder[IO](ref => { + case e: ByteEvent => withSender(sender => eval(println(s"failover received: $e from $sender"))) ++ + eval(eventStore.add(ref, StringEvent("success"))) + }).ref(ProcessRef("failover")).build + + val clientRef = ProcessRef("client") + val ch = new Channel[IO](clientRef) + println(s"cannel ref: ${ch.ref}") + val client = Process.builder[IO](ref => { + case Start => register(ref, ch) ++ + blocking { + race( + ch.send(ByteEvent("request".getBytes()), server.ref, { + case scala.util.Failure(Channel.ChannelInterruptedException) => + eval(println("channel was interrupted")) ++ ByteEvent("help".getBytes()) ~> failover + case res => eval(println(s"client received: $res")) + }), + delay(3.seconds) ++ eval(println("server doesn't respond"))) ++ + ch.send(ByteEvent("request".getBytes()), failover.ref, _ => unit) + } + } + ).ref(clientRef).build + + unsafeRun(eventStore.await(2, createApp(ct.pure(Seq(client, server, failover))).run)) + eventStore.get(failover.ref) shouldBe Seq(StringEvent("success"), StringEvent("success")) + + } + +} diff --git a/intg-tests/src/test/scala/io/parapet/tests/intg/catsio/SchedulerCorrectnessSpec.scala b/intg-tests/src/test/scala/io/parapet/tests/intg/catsio/SchedulerCorrectnessSpec.scala index 193bf7a..999f9f2 100644 --- a/intg-tests/src/test/scala/io/parapet/tests/intg/catsio/SchedulerCorrectnessSpec.scala +++ b/intg-tests/src/test/scala/io/parapet/tests/intg/catsio/SchedulerCorrectnessSpec.scala @@ -5,10 +5,11 @@ import io.parapet.core.{Context, DslInterpreter} import io.parapet.testutils.BasicCatsIOSpec import io.parapet.testutils.tags.CatsTest import org.scalatest.tags.Slow -import org.scalatest.{Outcome, Retries} +import org.scalatest.{Ignore, Outcome, Retries} @CatsTest @Slow +@Ignore class SchedulerCorrectnessSpec extends io.parapet.tests.intg.SchedulerCorrectnessSpec[IO] with BasicCatsIOSpec with Retries { val retries = 1 diff --git a/project/plugins.sbt 
b/project/plugins.sbt index 8734367..2f85993 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,5 +1,5 @@ addSbtPlugin("com.jsuereth" % "sbt-pgp" % "2.0.0-M2") -addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.4.1") +addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.7.6") addSbtPlugin("com.github.gseitz" % "sbt-protobuf" % "0.6.5") addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.2") \ No newline at end of file diff --git a/site/index.html b/site/index.html index b4eec04..240bbee 100644 --- a/site/index.html +++ b/site/index.html @@ -95,6 +95,7 @@

[site/index.html hunk: adds a "Network" entry to the Parapet sidebar navigation, alongside the existing "blocking" and "Process" entries]
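For reference, the read side of the protocol above assumes every inbound frame carries a length-prefixed client id ahead of the tagged payload: AsyncServer's step0 builds exactly that frame before calling encoder.read, and the addClientId helper in RouletteLeaderElectionProtocolSpec emulates it. A minimal sketch of the framing, with frame as a hypothetical helper name:

import java.nio.ByteBuffer

// Prepend a length-prefixed client id to an already-encoded event
// (encoder.write output, which itself starts with an int32 tag).
def frame(clientId: String, payload: Array[Byte]): Array[Byte] = {
  val idBytes = clientId.getBytes()
  ByteBuffer
    .allocate(4 + idBytes.length + payload.length)
    .putInt(idBytes.length)
    .put(idBytes)
    .put(payload)
    .array()
}

// Round trip, mirroring the protocol spec:
// val data = frame("client-1", RouletteLeaderElection.encoder.write(Propose("p1:5555", 0.85)))
// RouletteLeaderElection.encoder.read(data) == Propose("p1:5555", 0.85)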
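The Peers companion now exposes a builder instead of the old map-based apply; the createPeers helper in RouletteLeaderElectionSpec shows the intended usage. A minimal sketch under the same assumptions (the peer's net client is addressed by a ProcessRef; the names are illustrative):

import io.parapet.core.processes.RouletteLeaderElection.{Peer, Peers}
import io.parapet.core.{Clock, ProcessRef}
import scala.concurrent.duration._

object PeersExample {
  val p2: Peer = Peers.builder
    .id("p2")
    .address("p2:6666")
    .netClient(ProcessRef("p2-client")) // net client process for this peer
    .timeoutMs(10.seconds)              // heartbeat timeout, converted to millis internally
    .clock(Clock())
    .build

  val peers: Peers = Peers(Vector(p2))
}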
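With encoding moved out of AsyncClient (it now only ships raw bytes via Send) and AsyncServer forwarding decoded events to a sink, a node wires the pieces roughly as below. Endpoints and process names are illustrative assumptions, not taken from this change set:

import cats.effect.IO
import io.parapet.core.ProcessRef
import io.parapet.core.processes.RouletteLeaderElection
import io.parapet.core.processes.net.{AsyncClient, AsyncServer}

object WiringSketch {
  val leRef     = ProcessRef("leader-election")
  val serverRef = ProcessRef("net-server")

  // ROUTER socket: prepends the sender's identity, decodes the event via the
  // encoder, and forwards it to the sink (here the leader-election process).
  val server = AsyncServer[IO](serverRef, "tcp://*:5555", leRef, RouletteLeaderElection.encoder)

  // DEALER socket: identity is the explicit clientId; bytes are pushed with AsyncClient.Send.
  val p2Client = AsyncClient[IO](ProcessRef("p2-client"), "node-1", "tcp://localhost:6666", RouletteLeaderElection.encoder)
}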