Skip to content

Commit

Permalink
Merge pull request #43 from parapet-io/iss31
Browse files Browse the repository at this point in the history
cluster
  • Loading branch information
dmgcodevil authored May 14, 2021
2 parents 80ccbe5 + 9c96d38 commit 22d34b3
Show file tree
Hide file tree
Showing 53 changed files with 1,869 additions and 302 deletions.
56 changes: 51 additions & 5 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@ resolvers in ThisBuild += "maven2" at "https://repo1.maven.org/maven2/"

useGpg := false

credentials += Credentials(Path.userHome / ".sbt" / "sonatype_credential")
ThisBuild / credentials += Credentials(Path.userHome / ".sbt" / "sonatype_credential")
ThisBuild / publishConfiguration := publishConfiguration.value.withOverwrite(true)
ThisBuild / publishLocalConfiguration := publishLocalConfiguration.value.withOverwrite(true)

val scalaTestVersion = "3.2.1"

lazy val dependencies =
new {
Expand All @@ -30,11 +34,13 @@ lazy val dependencies =
val scalaLogging = "com.typesafe.scala-logging" %% "scala-logging" % "3.9.2"
// test
val logbackClassic = "ch.qos.logback" % "logback-classic" % "1.2.3" % Test
val scalaTest = "org.scalatest" %% "scalatest" % "3.0.8" % Test
val scalaTest = "org.scalatest" %% "scalatest" % scalaTestVersion % Test
val pegdown = "org.pegdown" % "pegdown" % "1.6.0" % Test
val logstashLogbackEncoder = "net.logstash.logback" % "logstash-logback-encoder" % "5.3" % Test
val logbackContrib = "ch.qos.logback.contrib" % "logback-json-classic" % "0.1.5" % Test
val logbackJackson = "ch.qos.logback.contrib" % "logback-jackson" % "0.1.5" % Test
val flexmark = "com.vladsch.flexmark" % "flexmark-all" % "0.36.8" % Test

// utils
val sourcecode = "com.lihaoyi" %% "sourcecode" % "0.2.1"
}
Expand All @@ -56,11 +62,13 @@ lazy val global = project
.in(file("."))
.aggregate(
core,
clusterApi,
cluster,
clusterNode,
protobuf,
interopCats,
interopMonix,
testUtils,
intgTests,
benchmark)

lazy val core = project
Expand All @@ -74,11 +82,47 @@ lazy val core = project
libraryDependencies += "org.zeromq" % "jeromq" % "0.5.1"
).dependsOn(protobuf)

// Deployable cluster server; packaged via sbt-native-packager.
// Uses the sbt 1.x slash syntax for consistency with the ThisBuild
// settings above (the rest of this definition previously used the
// deprecated `key in Scope` form).
lazy val cluster = project
  .enablePlugins(JavaAppPackaging, UniversalDeployPlugin)
  .settings(
    name := "cluster",
    libraryDependencies ++= Seq(
      "com.github.scopt" %% "scopt" % "4.0.1",
      "org.slf4j" % "slf4j-log4j12" % "1.7.25"
    ),
    Universal / maintainer := "parapet.io",
    Universal / packageName := "parapet-cluster-" + version.value,
    // Ship the log4j config and the node properties template under etc/
    // in the universal (zip/tgz) distribution.
    Universal / mappings ++= {
      val src = (Compile / sourceDirectory).value
      Seq(
        src / "resources" / "log4j.xml" -> "etc/log4j.xml",
        src / "resources" / "etc" / "node.properties.template" -> "etc/node.properties.template"
      )
    },
    // Point log4j at the packaged config file at runtime.
    bashScriptExtraDefines += """addJava "-Dlog4j.configuration=file:${app_home}/../etc/log4j.xml""""
  ).dependsOn(core, interopCats, clusterApi)

// Client-side node library for talking to the cluster (cluster-node/ dir).
lazy val clusterNode = project
  .in(file("cluster-node"))
  .settings(
    name := "cluster-node"
  ).dependsOn(core, clusterApi)

// Shared protocol/message definitions used by both cluster and clusterNode.
lazy val clusterApi = project
  .in(file("cluster-api"))
  .settings(
    name := "cluster-api",
    // Reuse the shared declarations from `dependencies` (both are already
    // Test-scoped there) instead of re-declaring scalatest inline — keeps
    // the version in one place (scalaTestVersion).
    libraryDependencies += dependencies.scalaTest,
    libraryDependencies += dependencies.flexmark
  ).dependsOn(core)


// Shared test helpers published for downstream test suites.
// NOTE: scalatest is intentionally NOT Test-scoped here — test-utils is
// itself a testing library, so scalatest is a compile dependency.
// (Removed a stale duplicate of the old "3.0.8" scalatest line that was a
// leftover diff artifact and made this settings block syntactically invalid.)
lazy val testUtils = project
  .in(file("test-utils"))
  .settings(
    name := "test-utils",
    libraryDependencies += "org.scalatest" %% "scalatest" % scalaTestVersion,
    libraryDependencies += dependencies.flexmark
  ).dependsOn(core, interopCats, interopMonix)

lazy val interopCats = project
Expand Down Expand Up @@ -110,7 +154,8 @@ lazy val intgTests = project
publishLocal := {},
publish := {},
libraryDependencies ++= Seq(
"org.scalatest" %% "scalatest" % "3.0.8",
"org.scalatest" %% "scalatest" % scalaTestVersion,
dependencies.flexmark,
"org.pegdown" % "pegdown" % "1.6.0",
"ch.qos.logback" % "logback-classic" % "1.2.3",
"net.logstash.logback" % "logstash-logback-encoder" % "5.3"
Expand Down Expand Up @@ -148,6 +193,7 @@ lazy val commonDependencies = Seq(
dependencies.logbackContrib,
dependencies.logbackJackson,
dependencies.scalaTest,
dependencies.flexmark,
dependencies.sourcecode
)

Expand Down
150 changes: 150 additions & 0 deletions cluster-api/src/main/scala/io/parapet/cluster/api/ClusterApi.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package io.parapet.cluster.api

import io.parapet.core.{Encoder, Event}

import java.nio.ByteBuffer

object ClusterApi {

  // The wire format must not depend on the JVM's default charset, otherwise
  // nodes running on platforms with different defaults cannot interoperate.
  // All string fields are therefore encoded/decoded as UTF-8 explicitly
  // (the original used charset-default getBytes()/new String(bytes)).
  import java.nio.charset.StandardCharsets.UTF_8

  // @formatter:off
  /** Marker trait for all cluster protocol messages. */
  sealed trait API extends Event

  // --------------------- JOIN ------------------------------------------ //
  object JoinResultCodes {
    val OK = 0
    val ERROR = 1 // semantics still to be clarified
  }

  // FORMAT: every variable-length (string) field is written as a 4-byte
  // big-endian length prefix followed by the raw UTF-8 bytes.
  val TAG_SIZE = 4
  val NODE_ID_SIZE = 4
  val ADDR_SIZE = 4
  val GROUP_SIZE = 4
  val RESULT_CODE_SIZE = 4

  /** Request to join the cluster group `group`. */
  case class Join(nodeId: String, address: String, group: String) extends API
  /** Reply to [[Join]]; `code` is one of [[JoinResultCodes]]. */
  case class JoinResult(nodeId: String, code: Int) extends API

  // ---------------------- Result ---------------------------------------- //
  /** Generic operation result; `code` is one of [[ResultCodes]]. */
  case class Result(code: Int, msg: String) extends API

  object ResultCodes {
    val OK = 0
    val ERROR = 1
  }
  // FORMAT
  val CODE_SIZE = 4
  val MSG_SIZE = 4

  // ---------------------- NodeInfo ---------------------------------------- //
  /** Request (from `senderId`) for the address of node `id`. */
  case class GetNodeInfo(senderId: String, id: String) extends API
  /** Reply to [[GetNodeInfo]]; `code` is one of [[NodeInfoCodes]]. */
  case class NodeInfo(address: String, code: Int) extends API
  object NodeInfoCodes {
    val OK = 0
    val NODE_NOT_FOUND = 1
    val ERROR = 2
  }

  // TAGS: the first 4 bytes of every encoded message identify its type.
  val JOIN_TAG = 20
  val JOIN_RESULT_TAG = 21
  val RESULT_TAG = 22
  val GET_NODE_INFO_TAG = 23
  val NODE_INFO = 24

  // @formatter:on

  /** Binary codec for [[API]] messages.
    *
    * Invariant: `read(write(m)) == m` for every message type above.
    * Unknown events / tags raise [[UnsupportedOperationException]].
    */
  val encoder: Encoder = new Encoder {
    override def write(e: Event): Array[Byte] = {
      // Note: ByteBuffer.array() returns the whole backing array regardless
      // of position, so no rewind() is needed before returning it.
      e match {
        case Join(nodeId, address, group) =>
          val nodeIdBytes = nodeId.getBytes(UTF_8)
          val addressBytes = address.getBytes(UTF_8)
          val groupBytes = group.getBytes(UTF_8)
          val buf = ByteBuffer.allocate(TAG_SIZE +
            (NODE_ID_SIZE + nodeIdBytes.length) +
            (GROUP_SIZE + groupBytes.length) +
            (ADDR_SIZE + addressBytes.length))
          buf.putInt(JOIN_TAG)
          putWithSize(buf, nodeIdBytes)
          putWithSize(buf, addressBytes)
          putWithSize(buf, groupBytes)
          buf.array()
        case JoinResult(nodeId, code) =>
          val nodeIdBytes = nodeId.getBytes(UTF_8)
          val buf = ByteBuffer.allocate(TAG_SIZE + (NODE_ID_SIZE + nodeIdBytes.length) + RESULT_CODE_SIZE)
          buf.putInt(JOIN_RESULT_TAG)
          putWithSize(buf, nodeIdBytes)
          buf.putInt(code)
          buf.array()
        case Result(code, msg) =>
          val msgBytes = msg.getBytes(UTF_8)
          val buf = ByteBuffer.allocate(TAG_SIZE + CODE_SIZE + (MSG_SIZE + msgBytes.length))
          buf.putInt(RESULT_TAG)
          buf.putInt(code)
          putWithSize(buf, msgBytes)
          buf.array()
        case GetNodeInfo(senderId, id) =>
          val senderIdBytes = senderId.getBytes(UTF_8)
          val idBytes = id.getBytes(UTF_8)
          // Was hard-coded 4s; use the named size constants for consistency
          // with the other cases.
          val buf = ByteBuffer.allocate(TAG_SIZE +
            (NODE_ID_SIZE + senderIdBytes.length) +
            (NODE_ID_SIZE + idBytes.length))
          buf.putInt(GET_NODE_INFO_TAG)
          putWithSize(buf, senderIdBytes)
          putWithSize(buf, idBytes)
          buf.array()
        case NodeInfo(address, code) =>
          val addressBytes = address.getBytes(UTF_8)
          val buf = ByteBuffer.allocate(TAG_SIZE + (ADDR_SIZE + addressBytes.length) + CODE_SIZE)
          buf.putInt(NODE_INFO)
          putWithSize(buf, addressBytes) // same layout as before: len, bytes, code
          buf.putInt(code)
          buf.array()
        case _ => throw new UnsupportedOperationException()
      }
    }

    override def read(data: Array[Byte]): Event = {
      val buf = ByteBuffer.wrap(data)
      val tag = buf.getInt
      tag match {
        case JOIN_TAG =>
          // Fields must be read in the exact order write() emits them.
          val nodeId = getString(buf)
          val address = getString(buf)
          val group = getString(buf)
          Join(nodeId = nodeId, address = address, group = group)
        case JOIN_RESULT_TAG =>
          val nodeId = getString(buf)
          val code = buf.getInt
          JoinResult(nodeId, code)
        case RESULT_TAG =>
          val code = buf.getInt
          val msg = getString(buf)
          Result(code, msg)
        case GET_NODE_INFO_TAG => GetNodeInfo(getString(buf), getString(buf))
        case NODE_INFO => NodeInfo(getString(buf), buf.getInt)
        case _ => throw new UnsupportedOperationException()
      }
    }
  }

  // Writes a 4-byte length prefix followed by the raw bytes.
  private def putWithSize(buf: ByteBuffer, data: Array[Byte]): Unit = {
    buf.putInt(data.length)
    buf.put(data)
  }

  // Reads a length-prefixed UTF-8 string written by putWithSize.
  private def getString(buf: ByteBuffer): String = {
    val len = buf.getInt()
    val data = new Array[Byte](len)
    buf.get(data)
    new String(data, UTF_8)
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.parapet.cluster.api

import io.parapet.cluster.api.ClusterApi._
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers._

/** Round-trip tests for [[ClusterApi.encoder]]: `read(write(m)) == m`
  * must hold for every message type, not just [[Join]].
  */
class EncoderSpec extends AnyFunSuite {

  test("join") {
    val join = Join("1", "localhost:8080", "2")
    val data = encoder.write(join)
    encoder.read(data) shouldBe join
  }

  test("join result") {
    val res = JoinResult("1", JoinResultCodes.OK)
    encoder.read(encoder.write(res)) shouldBe res
  }

  test("result") {
    val res = Result(ResultCodes.ERROR, "something went wrong")
    encoder.read(encoder.write(res)) shouldBe res
  }

  test("get node info") {
    val req = GetNodeInfo("sender-1", "node-2")
    encoder.read(encoder.write(req)) shouldBe req
  }

  test("node info") {
    val info = NodeInfo("localhost:8080", NodeInfoCodes.OK)
    encoder.read(encoder.write(info)) shouldBe info
  }

  test("empty string fields round-trip") {
    val join = Join("", "", "")
    encoder.read(encoder.write(join)) shouldBe join
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package io.parapet.cluster.node

import scala.util.Try

/** Client-facing operations of a cluster node.
  *
  * Operations that can fail return [[scala.util.Try]]; `Req` and `Rep` are
  * declared elsewhere in this package.
  */
trait Interface {

  /** Connects to the leader.
    */
  def connect(): Unit

  /** Joins a cluster.
    * @param group the node group to join
    * @return result — `Failure` if the join could not be completed
    */
  def join(group: String): Try[Unit]

  /** Leaves the given node group.
    * @param group the node group to leave
    * @return result — `Failure` if the leave could not be completed
    */
  def leave(group: String): Try[Unit]

  /** Sends a request without waiting for a response.
    *
    * @param req the request
    * @return result of the send attempt
    */
  def send(req: Req): Try[Unit]

  /**
    * Sends a request and waits for a response.
    * Use for a strict REQ-REP dialog.
    *
    * @param req the request
    * @param handler callback invoked with the raw response bytes
    * @return result of the send attempt
    */
  def send(req: Req, handler: Array[Byte] => Unit): Try[Unit]

  /**
    * Sends a reply.
    * Use for a strict REQ-REP dialog.
    *
    * @param rep the reply
    * @return result of the send attempt
    */
  def send(rep: Rep): Try[Unit]

  /** Sends a message to all nodes in the group.
    *
    * @param group the node group to broadcast to
    * @param data the data to send
    * @return result of the broadcast attempt
    */
  def broadcast(group: String, data: Array[Byte]): Try[Unit]

  /** Gets the current leader.
    *
    * @return the leader, or `None` if no leader is currently known
    */
  def leader: Option[String]

  /** Gets all registered nodes.
    * @return registered nodes
    */
  def getNodes: Try[Seq[String]]

  /** Disconnects from the cluster and closes open network connections.
    */
  def close(): Unit
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.parapet.cluster.node

/** Callback interface for requests received from other cluster nodes.
  * `Req` is declared elsewhere in this package.
  */
trait MessageHandler {

  /** Handles a single incoming request.
    *
    * @param req the received request
    */
  def handle(req: Req): Unit

}
Loading

0 comments on commit 22d34b3

Please sign in to comment.