From 881e66e9b6d0bbfc49cb0d8ec63583c802257a40 Mon Sep 17 00:00:00 2001 From: Fabio Pinheiro Date: Mon, 12 Jun 2023 16:36:41 +0100 Subject: [PATCH] feat: Add Storage (#8) Added Mongo db driver Added ReactiveMongo Zlayer Added Mongo repo and collection update and query Updated the docker-compose to create the collection and indexes required. Co-authored-by: Shailesh Patil --- build.sbt | 49 +--- .../src/main/resources/application.conf | 14 + .../src/main/scala/fmgp/did/Error.scala | 50 ++++ .../src/main/scala/fmgp/did/MessageDB.scala | 23 -- .../src/main/scala/fmgp/did/MongoDriver.scala | 59 +++++ .../src/main/scala/fmgp/did/MsgContex.scala | 15 -- .../mediator/ForwardMessageExecuter.scala | 22 +- .../did/comm/mediator/MediatorAgent.scala | 83 +++--- .../MediatorCoordinationExecuter.scala | 118 ++------- .../comm/mediator/MediatorStandalone.scala | 23 +- .../did/comm/mediator/PickupExecuter.scala | 45 +++- .../did/comm/protocol/ProtocolExecute.scala | 67 ++--- .../fmgp/did/db/AsyncDriverResource.scala | 19 ++ .../scala/fmgp/did/db/BsonImplicits.scala | 239 ++++++++++++++++++ .../main/scala/fmgp/did/db/DataModels.scala | 37 +++ .../scala/fmgp/did/db/DidAccountRepo.scala | 148 +++++++++++ .../scala/fmgp/did/db/MessageItemRepo.scala | 73 ++++++ .../scala/fmgp/did/db/ReactiveMongoApi.scala | 53 ++++ docker-compose.yaml | 45 ++-- initdb.js | 21 ++ 20 files changed, 932 insertions(+), 271 deletions(-) create mode 100644 did-mediator/src/main/scala/fmgp/did/Error.scala delete mode 100644 did-mediator/src/main/scala/fmgp/did/MessageDB.scala create mode 100644 did-mediator/src/main/scala/fmgp/did/MongoDriver.scala delete mode 100644 did-mediator/src/main/scala/fmgp/did/MsgContex.scala create mode 100644 did-mediator/src/main/scala/fmgp/did/db/AsyncDriverResource.scala create mode 100644 did-mediator/src/main/scala/fmgp/did/db/BsonImplicits.scala create mode 100644 did-mediator/src/main/scala/fmgp/did/db/DataModels.scala create mode 100644 did-mediator/src/main/scala/fmgp/did/db/DidAccountRepo.scala create mode 100644 did-mediator/src/main/scala/fmgp/did/db/MessageItemRepo.scala create mode 100644 did-mediator/src/main/scala/fmgp/did/db/ReactiveMongoApi.scala create mode 100644 initdb.js diff --git a/build.sbt b/build.sbt index ff7a09ba..caf32944 100644 --- a/build.sbt +++ b/build.sbt @@ -25,21 +25,8 @@ lazy val V = new { // val zioMunitTest = "0.1.1" val zioHttp = "0.0.5" val zioConfig = "4.0.0-RC16" -// val zioPrelude = "1.0.0-RC19" - -// // https://mvnrepository.com/artifact/io.github.cquiroz/scala-java-time -// val scalaJavaTime = "2.3.0" - -// val logbackClassic = "1.2.10" -// val scalaLogging = "3.9.4" - -// val laika = "0.19.1" - -// val laminar = "15.0.1" -// val waypoint = "6.0.0" -// val upickle = "3.1.0" -// // https://www.npmjs.com/package/material-components-web -// val materialComponents = "12.0.0" + val zioSl4j = "2.1.13" + val mongo = "1.1.0-RC10" } /** Dependencies */ @@ -66,28 +53,10 @@ lazy val D = new { val zioConfig = Def.setting("dev.zio" %% "zio-config" % V.zioConfig) val zioConfigMagnolia = Def.setting("dev.zio" %% "zio-config-magnolia" % V.zioConfig) // For deriveConfig val zioConfigTypesafe = Def.setting("dev.zio" %% "zio-config-typesafe" % V.zioConfig) // For HOCON -// val zioPrelude = Def.setting("dev.zio" %%% "zio-prelude" % V.zioPrelude) -// // val zioTest = Def.setting("dev.zio" %%% "zio-test" % V.zio % Test) -// // val zioTestSBT = Def.setting("dev.zio" %%% "zio-test-sbt" % V.zio % Test) -// // val zioTestMagnolia = Def.setting("dev.zio" %%% "zio-test-magnolia" % V.zio % Test) -// val zioMunitTest = Def.setting("com.github.poslegm" %%% "munit-zio" % V.zioMunitTest % Test) - -// // Needed for ZIO -// val scalaJavaT = Def.setting("io.github.cquiroz" %%% "scala-java-time" % V.scalaJavaTime) -// val scalaJavaTZ = Def.setting("io.github.cquiroz" %%% "scala-java-time-tzdb" % V.scalaJavaTime) - -// // Test DID comm -// // val didcomm = Def.setting("org.didcommx" % "didcomm" % "0.3.1") - + val zioLoggingSl4j = Def.setting("dev.zio" %% "zio-logging-slf4j" % V.zioSl4j) + val mongo = Def.setting("org.reactivemongo" %% "reactivemongo" % V.mongo) // // For munit https://scalameta.org/munit/docs/getting-started.html#scalajs-setup val munit = Def.setting("org.scalameta" %%% "munit" % V.munit % Test) - -// val laika = Def.setting("org.planet42" %%% "laika-core" % V.laika) // JVM & JS - -// // For WEBAPP -// val laminar = Def.setting("com.raquo" %%% "laminar" % V.laminar) -// val waypoint = Def.setting("com.raquo" %%% "waypoint" % V.waypoint) -// val upickle = Def.setting("com.lihaoyi" %%% "upickle" % V.upickle) } inThisBuild( @@ -171,6 +140,7 @@ lazy val buildInfoConfigure: Project => Project = _.enablePlugins(BuildInfoPlugi lazy val httpUtils = crossProject(JSPlatform, JVMPlatform) // project .in(file("http-utils")) .settings(publish / skip := true) + .settings((setupTestConfig): _*) .settings( libraryDependencies += D.scalaDID.value, ) @@ -181,11 +151,18 @@ lazy val httpUtils = crossProject(JSPlatform, JVMPlatform) // project lazy val mediator = project .in(file("did-mediator")) .settings(publish / skip := true) + .settings((setupTestConfig): _*) .settings( libraryDependencies += D.scalaDID_imp.value, libraryDependencies += D.scalaDID_peer.value, libraryDependencies += D.zioHttp.value, - libraryDependencies ++= Seq(D.zioConfig.value, D.zioConfigMagnolia.value, D.zioConfigTypesafe.value), + libraryDependencies ++= Seq( + D.zioConfig.value, + D.zioConfigMagnolia.value, + D.zioConfigTypesafe.value, + D.zioLoggingSl4j.value + ), + libraryDependencies += D.mongo.value, ) .settings( Compile / mainClass := Some("fmgp.did.demo.MediatorStandalone"), diff --git a/did-mediator/src/main/resources/application.conf b/did-mediator/src/main/resources/application.conf index b6a4c251..0e4517ef 100644 --- a/did-mediator/src/main/resources/application.conf +++ b/did-mediator/src/main/resources/application.conf @@ -21,4 +21,18 @@ mediator = { } server.http.port = 8080 # server.http.port = ${?PORT} + database = { + protocol = mongodb + protocol = ${?MONGODB_PROTOCOL} + port = 27017 + port = ${?MONGODB_PORT} + host = "localhost" + host = ${?MONGODB_HOST} + userName = "admin" + userName = ${?MONGODB_USER} + password = "admin" + password = ${?MONGODB_PASSWORD} + dbName = "mediator" + dbName = ${?MONGODB_DB_NAME} + } } diff --git a/did-mediator/src/main/scala/fmgp/did/Error.scala b/did-mediator/src/main/scala/fmgp/did/Error.scala new file mode 100644 index 00000000..6a225790 --- /dev/null +++ b/did-mediator/src/main/scala/fmgp/did/Error.scala @@ -0,0 +1,50 @@ +package fmgp.did + +import fmgp.crypto.error._ +import fmgp.did._ +import fmgp.did.comm._ +import zio.json._ + +trait MediatorError + +case class MediatorException(fail: MediatorError) extends Exception(fail.toString()) + +final case class MediatorDidError(val error: DidFail) extends MediatorError +object MediatorDidError { + def apply(error: DidFail) = new MediatorDidError(error) +} + +final case class MediatorThrowable(val error: String) extends StorageError +object MediatorThrowable { + def apply(throwable: Throwable) = new MediatorThrowable(throwable.getClass.getName() + ":" + throwable.getMessage) +} + +// Storage + +trait StorageError extends MediatorError { + def error: String +} + +final case class StorageCollection(val error: String) extends StorageError +object StorageCollection { + def apply(throwable: Throwable) = new StorageCollection(throwable.getClass.getName() + ":" + throwable.getMessage) +} + +final case class StorageThrowable(val error: String) extends StorageError +object StorageThrowable { + def apply(throwable: Throwable) = new StorageThrowable(throwable.getClass.getName() + ":" + throwable.getMessage) +} + +sealed trait ProtocolError extends MediatorError { + def piuri: PIURI +} + +// Protocol + +object ProtocolError { + given decoder: JsonDecoder[ProtocolError] = DeriveJsonDecoder.gen[ProtocolError] + given encoder: JsonEncoder[ProtocolError] = DeriveJsonEncoder.gen[ProtocolError] +} + +case class MissingProtocolError(piuri: PIURI) extends ProtocolError +case class FailToEncodeMessage(piuri: PIURI, error: String) extends ProtocolError diff --git a/did-mediator/src/main/scala/fmgp/did/MessageDB.scala b/did-mediator/src/main/scala/fmgp/did/MessageDB.scala deleted file mode 100644 index 7f91005b..00000000 --- a/did-mediator/src/main/scala/fmgp/did/MessageDB.scala +++ /dev/null @@ -1,23 +0,0 @@ -package fmgp.did - -import zio.json._ - -import fmgp.did.comm.EncryptedMessage -import scala.collection.immutable.HashMap -import fmgp.did.MsgContex - -type HashEncryptedMessage = Int - -final case class MessageDB( - db: Map[HashEncryptedMessage, EncryptedMessage] = Map.empty, - ctx: Map[HashEncryptedMessage, MsgContex] = Map.empty, -) { - def add(msg: EncryptedMessage): MessageDB = { - this.copy(db = db + (msg.hashCode -> msg)) - } -} - -object MessageDB { - given JsonDecoder[MessageDB] = DeriveJsonDecoder.gen[MessageDB] - given JsonEncoder[MessageDB] = DeriveJsonEncoder.gen[MessageDB] -} diff --git a/did-mediator/src/main/scala/fmgp/did/MongoDriver.scala b/did-mediator/src/main/scala/fmgp/did/MongoDriver.scala new file mode 100644 index 00000000..f967bd12 --- /dev/null +++ b/did-mediator/src/main/scala/fmgp/did/MongoDriver.scala @@ -0,0 +1,59 @@ +package fmgp.did + +import scala.concurrent.{ExecutionContext, Future} + +import reactivemongo.api.{Cursor, DB, MongoConnection, AsyncDriver} +import reactivemongo.api.bson.{BSONDocumentWriter, BSONDocumentReader, Macros, document} + +object MongoDriver { + // My settings (see available connection options) + // val connectionString = "mongodb://localhost:27017/mydb?authMode=scram-sha1" + val connectionString = + // "mongodb+srv://mediator:@fmgp-db.orfjsdi.mongodb.net/?retryWrites=true&w=majority" + "mongodb+srv://mediator:w419cDYIQ2lxpDKT@mediatordb.sa0rfqg.mongodb.net" + + import ExecutionContext.Implicits.global // use any appropriate context + + // Connect to the database: Must be done only once per application + val driver = AsyncDriver() + val parsedUri = MongoConnection.fromString(connectionString) + + // Database and collections: Get references + val futureConnection = parsedUri.flatMap(driver.connect(_)) + def db1: Future[DB] = futureConnection.flatMap(_.database("firstdb")) + def db2: Future[DB] = futureConnection.flatMap(_.database("anotherdb")) + def personCollection = db1.map(_.collection("person")) + + // Write Documents: insert or update + + implicit def personWriter: BSONDocumentWriter[Person] = Macros.writer[Person] + // or provide a custom one + + // use personWriter + def createPerson(person: Person): Future[Unit] = + personCollection.flatMap(_.insert.one(person).map(_ => {})) + + def updatePerson(person: Person): Future[Int] = { + val selector = document( + "firstName" -> person.firstName, + "lastName" -> person.lastName + ) + + // Update the matching person + personCollection.flatMap(_.update.one(selector, person).map(_.n)) + } + + implicit def personReader: BSONDocumentReader[Person] = Macros.reader[Person] + // or provide a custom one + + def findPersonByAge(age: Int): Future[List[Person]] = + personCollection.flatMap( + _.find(document("age" -> age)) + .cursor[Person]() + .collect[List](-1, Cursor.FailOnError[List[Person]]()) + ) + // ... deserializes the document using personReader + + // Custom persistent types + case class Person(firstName: String, lastName: String, age: Int) +} diff --git a/did-mediator/src/main/scala/fmgp/did/MsgContex.scala b/did-mediator/src/main/scala/fmgp/did/MsgContex.scala deleted file mode 100644 index a4a3cdca..00000000 --- a/did-mediator/src/main/scala/fmgp/did/MsgContex.scala +++ /dev/null @@ -1,15 +0,0 @@ -package fmgp.did - -import zio.json._ -import fmgp.did.comm.EncryptedMessage -import scala.collection.immutable.HashMap -import fmgp.did.HashEncryptedMessage - -object MsgContex { - given JsonDecoder[MsgContex] = DeriveJsonDecoder.gen[MsgContex] - given JsonEncoder[MsgContex] = DeriveJsonEncoder.gen[MsgContex] -} - -case class MsgContex( - hash: HashEncryptedMessage, -) diff --git a/did-mediator/src/main/scala/fmgp/did/comm/mediator/ForwardMessageExecuter.scala b/did-mediator/src/main/scala/fmgp/did/comm/mediator/ForwardMessageExecuter.scala index 3f523ba9..8c9815c3 100644 --- a/did-mediator/src/main/scala/fmgp/did/comm/mediator/ForwardMessageExecuter.scala +++ b/did-mediator/src/main/scala/fmgp/did/comm/mediator/ForwardMessageExecuter.scala @@ -7,14 +7,18 @@ import fmgp.did._ import fmgp.did.comm._ import fmgp.did.comm.protocol._ import fmgp.did.comm.protocol.routing2._ +import fmgp.did.db._ -object ForwardMessageExecuter extends ProtocolExecuterWithServices[ProtocolExecuter.Services & Ref[MediatorDB]] { +object ForwardMessageExecuter + extends ProtocolExecuterWithServices[ + ProtocolExecuter.Services & DidAccountRepo & MessageItemRepo + ] { override def suportedPIURI: Seq[PIURI] = Seq(ForwardMessage.piuri) - override def program[R1 <: Ref[MediatorDB]]( + override def program[R1 <: DidAccountRepo & MessageItemRepo]( plaintextMessage: PlaintextMessage - ): ZIO[R1, DidFail, Action] = { + ): ZIO[R1, MediatorError, Action] = { // the val is from the match to be definitely stable val piuriForwardMessage = ForwardMessage.piuri @@ -23,8 +27,16 @@ object ForwardMessageExecuter extends ProtocolExecuterWithServices[ProtocolExecu }).map { case m: ForwardMessage => for { _ <- ZIO.logInfo("ForwardMessage") - db <- ZIO.service[Ref[MediatorDB]] - _ <- db.update(_.store(m.next, m.msg)) + repoMessageItem <- ZIO.service[MessageItemRepo] + repoDidAccount <- ZIO.service[DidAccountRepo] + recipientsSubject = Set(m.next) // m.msg.recipientsSubject + numbreOfUpdated <- repoDidAccount.addToInboxes(recipientsSubject, m.msg) + msg <- + if (numbreOfUpdated > 0) { // Or maybe we can add all the time + repoMessageItem.insert(MessageItem(m.msg)) *> + ZIO.logInfo("Add next msg (of the ForwardMessage) to the Message Repo") // TODO change to debug level + } else + ZIO.logWarning("Note: No update on the DidAccount of the recipients") } yield None } match case Left(error) => ZIO.logError(error) *> ZIO.succeed(NoReply) diff --git a/did-mediator/src/main/scala/fmgp/did/comm/mediator/MediatorAgent.scala b/did-mediator/src/main/scala/fmgp/did/comm/mediator/MediatorAgent.scala index 7738e238..26c6fae6 100644 --- a/did-mediator/src/main/scala/fmgp/did/comm/mediator/MediatorAgent.scala +++ b/did-mediator/src/main/scala/fmgp/did/comm/mediator/MediatorAgent.scala @@ -15,31 +15,34 @@ import fmgp.crypto._ import fmgp.crypto.error._ import fmgp.did.comm._ import fmgp.did.comm.protocol._ +import fmgp.did.db._ +import scala.concurrent.ExecutionContext.Implicits.global +import scala.util.Try + +import reactivemongo.api.bson.{_, given} +import reactivemongo.api.bson.Macros.{_, given} case class MediatorAgent( override val id: DID, - override val keyStore: KeyStore, // Shound we make it lazy with ZIO + override val keyStore: KeyStore, // Should we make it lazy with ZIO didSocketManager: Ref[DIDSocketManager], - messageDB: Ref[MessageDB], ) extends Agent { override def keys: Seq[PrivateKey] = keyStore.keys.toSeq // val resolverLayer: ULayer[DynamicResolver] = // DynamicResolver.resolverLayer(didSocketManager) - type Services = Resolver & Agent & Operations & MessageDispatcher & Ref[MediatorDB] - val protocolHandlerLayer: URLayer[Ref[MediatorDB], ProtocolExecuter[Services]] = - ZLayer { - for { - ref <- ZIO.service[Ref[MediatorDB]] - } yield ProtocolExecuterCollection[Services]( + type Services = Resolver & Agent & Operations & MessageDispatcher & DidAccountRepo & MessageItemRepo + val protocolHandlerLayer: URLayer[DidAccountRepo & MessageItemRepo, ProtocolExecuter[Services]] = + ZLayer.succeed( + ProtocolExecuterCollection[Services]( BasicMessageExecuter, new TrustPingExecuter, MediatorCoordinationExecuter, ForwardMessageExecuter, PickupExecuter, ) - } + ) // private def _didSubjectAux = id // private def _keyStoreAux = keyStore.keys.toSeq @@ -48,11 +51,11 @@ case class MediatorAgent( // override def keys: Seq[PrivateKey] = _keyStoreAux // }) - val messageDispatcherLayer: ZLayer[Client, DidFail, MessageDispatcher] = - MessageDispatcherJVM.layer.mapError(ex => SomeThrowable(ex)) + val messageDispatcherLayer: ZLayer[Client, MediatorThrowable, MessageDispatcher] = + MessageDispatcherJVM.layer.mapError(ex => MediatorThrowable(ex)) // TODO move to another place & move validations and build a contex - def decrypt(msg: Message): ZIO[Agent & Resolver & Operations, DidFail, PlaintextMessage] = + def decrypt(msg: Message): ZIO[Agent & Resolver & Operations, MediatorError, PlaintextMessage] = for { ops <- ZIO.service[Operations] plaintextMessage <- msg match @@ -60,32 +63,41 @@ case class MediatorAgent( case em: EncryptedMessage => { em.`protected`.obj match - case AnonProtectedHeader(epk, apv, typ, enc, alg) => ops.anonDecrypt(em) - case AuthProtectedHeader(epk, apv, skid, apu, typ, enc, alg) => ops.authDecrypt(em) + case AnonProtectedHeader(epk, apv, typ, enc, alg) => + ops + .anonDecrypt(em) + .mapError(ex => MediatorDidError(ex)) + case AuthProtectedHeader(epk, apv, skid, apu, typ, enc, alg) => + ops + .authDecrypt(em) + .mapError(ex => MediatorDidError(ex)) }.flatMap(decrypt _) case sm: SignedMessage => - ops.verify(sm).flatMap { - case false => ZIO.fail(ValidationFailed) - case true => - sm.payload.content.fromJson[Message] match - case Left(error) => ZIO.fail(FailToParse(error)) - case Right(msg2) => decrypt(msg2) - } + ops + .verify(sm) + .mapError(ex => MediatorDidError(ex)) + .flatMap { + case false => ZIO.fail(MediatorDidError(ValidationFailed)) + case true => + sm.payload.content.fromJson[Message] match + case Left(error) => ZIO.fail(MediatorDidError(FailToParse(error))) + case Right(msg2) => decrypt(msg2) + } } yield (plaintextMessage) def receiveMessage( data: String, mSocketID: Option[SocketID], ): ZIO[ - Operations & Resolver & MessageDispatcher & MediatorAgent & Ref[MediatorDB], - DidFail, + Operations & Resolver & MessageDispatcher & MediatorAgent & MessageItemRepo & DidAccountRepo, + MediatorError, Option[EncryptedMessage] ] = for { msg <- data.fromJson[EncryptedMessage] match case Left(error) => ZIO.logError(s"Data is not a EncryptedMessage: $error") - *> ZIO.fail(FailToParse(error)) + *> ZIO.fail(MediatorDidError(FailToParse(error))) case Right(message) => ZIO.logDebug( "Message's recipients KIDs: " + message.recipientsKid.mkString(",") + @@ -98,8 +110,8 @@ case class MediatorAgent( msg: EncryptedMessage, mSocketID: Option[SocketID] ): ZIO[ - Operations & Resolver & MessageDispatcher & MediatorAgent & Ref[MediatorDB], - DidFail, + Operations & Resolver & MessageDispatcher & MediatorAgent & MessageItemRepo & DidAccountRepo, + MediatorError, Option[EncryptedMessage] ] = ZIO @@ -112,7 +124,8 @@ case class MediatorAgent( *> ZIO.none else for { - _ <- messageDB.update(db => db.add(msg)) + messageItemRepo <- ZIO.service[MessageItemRepo] + _ <- messageItemRepo.insert(MessageItem(msg)) // store all message plaintextMessage <- decrypt(msg) _ <- didSocketManager.get.flatMap { m => // TODO HACK REMOVE !!!!!!!!!!!!!!!!!!!!!!!! ZIO.foreach(m.tapSockets)(_.socketOutHub.publish(TapMessage(msg, plaintextMessage).toJson)) @@ -137,7 +150,7 @@ case class MediatorAgent( def createSocketApp( annotationMap: Seq[LogAnnotation] ): ZIO[ - MediatorAgent & Resolver & Operations & MessageDispatcher & Ref[MediatorDB], + MediatorAgent & Resolver & Operations & MessageDispatcher & MessageItemRepo & DidAccountRepo, Nothing, zio.http.Response ] = { @@ -152,7 +165,7 @@ case class MediatorAgent( DIDSocketManager .newMessage(ch, text) .flatMap { case (socketID, encryptedMessage) => receiveMessage(encryptedMessage, Some(socketID)) } - .mapError(ex => DidException(ex)) + .mapError(ex => MediatorException(ex)) } case ChannelEvent(ch, ChannelEvent.ChannelUnregistered) => ZIO.logAnnotate(LogAnnotation(SOCKET_ID, ch.id), annotationMap: _*) { @@ -198,13 +211,7 @@ object MediatorAgent { def make(id: DID, keyStore: KeyStore): ZIO[Any, Nothing, MediatorAgent] = for { sm <- DIDSocketManager.make - db <- Ref.make(MessageDB()) - } yield MediatorAgent(id, keyStore, sm, db) - - // def make(agent: AgentDIDPeer): ZIO[Any, Nothing, MediatorAgent] = for { - // sm <- DIDSocketManager.make - // db <- Ref.make(MessageDB()) - // } yield MediatorAgent(agent.id, agent.keyStore, sm, db) + } yield MediatorAgent(id, keyStore, sm) def didCommApp = { Http.collectZIO[Request] { @@ -234,7 +241,7 @@ object MediatorAgent { data <- req.body.asString maybeSyncReplyMsg <- agent .receiveMessage(data, None) - .mapError(fail => DidException(fail)) + .mapError(fail => MediatorException(fail)) ret = maybeSyncReplyMsg match case None => Response.ok case Some(value) => Response.json(value.toJson) @@ -248,7 +255,7 @@ object MediatorAgent { .copy(status = Status.BadRequest) ) }: Http[ - Operations & Resolver & MessageDispatcher & MediatorAgent & Ref[MediatorDB], + Operations & Resolver & MessageDispatcher & MediatorAgent & MessageItemRepo & DidAccountRepo, Throwable, Request, Response diff --git a/did-mediator/src/main/scala/fmgp/did/comm/mediator/MediatorCoordinationExecuter.scala b/did-mediator/src/main/scala/fmgp/did/comm/mediator/MediatorCoordinationExecuter.scala index 355fa923..36f3bae0 100644 --- a/did-mediator/src/main/scala/fmgp/did/comm/mediator/MediatorCoordinationExecuter.scala +++ b/did-mediator/src/main/scala/fmgp/did/comm/mediator/MediatorCoordinationExecuter.scala @@ -8,59 +8,9 @@ import fmgp.did.comm._ import fmgp.did.comm.Operations._ import fmgp.did.comm.protocol._ import fmgp.did.comm.protocol.mediatorcoordination2._ +import fmgp.did.db.DidAccountRepo -/** Store all forwarded message */ -case class MediatorDB(db: Map[DIDSubject, Seq[EncryptedMessage]], alias: Map[DIDSubject, DIDSubject]) { - def isServing(subject: DIDSubject) = db.get(subject).isDefined - def enroll(subject: DIDSubject): Either[String, MediatorDB] = - alias.keys.find(_.string == subject.string) match - case Some(value) => Left(s"${subject.string} is alredy used as a alias for ${value.string}") - case None => - Right( - this.copy( - db = db.updatedWith(subject) { - case Some(value) => Some(value) - case None => Some(Seq.empty) - } - ) - ) - def addAlias(ower: DIDSubject, newAlias: DIDSubject) = - db.keys.find(_ == newAlias) match - case Some(did) => Left(s"${did} is alredy enrolled for mediation ") - case None => - alias.find(_._1 == newAlias) match - case Some((a, ower)) => Left(s"$newAlias is alredy an alias of $ower") - case None => Right(this.copy(alias = alias + (newAlias -> ower))) - def removeAlias(ower: DIDSubject, newAlias: DIDSubject) = - alias.find(_._1 == newAlias) match - case None => Left(s"$newAlias is not on DB") - case Some((oldAlias, oldOwer)) if (oldOwer != ower) => Left(s"$newAlias is not owed by $ower") - case Some((oldAlias, oldOwer)) => Right(this.copy(alias = alias.view.filterKeys(_ == newAlias).toMap)) - - def store(to: DIDSubject, msg: EncryptedMessage) = - MediatorDB( - db = db.updatedWith(alias.getOrElse(to, to))(_.map(e => msg +: e)), - alias = alias - ) - - def getMessages(to: DIDSubject, from: Option[DIDSubject]): Seq[EncryptedMessage] = - val allMessageToDid = db.get(to).toSeq.flatten - from match - case None => allMessageToDid - case Some(f) => - allMessageToDid.filter { case em => - em.`protected`.obj match - case header: AuthProtectedHeader => header.skid.did == f - case _ => false - - } -} - -object MediatorDB { - def empty = MediatorDB(db = Map.empty, alias = Map.empty) -} - -object MediatorCoordinationExecuter extends ProtocolExecuterWithServices[ProtocolExecuter.Services & Ref[MediatorDB]] { +object MediatorCoordinationExecuter extends ProtocolExecuterWithServices[ProtocolExecuter.Services & DidAccountRepo] { override def suportedPIURI: Seq[PIURI] = Seq( MediateRequest.piuri, @@ -72,9 +22,9 @@ object MediatorCoordinationExecuter extends ProtocolExecuterWithServices[Protoco Keylist.piuri, ) - override def program[R1 <: Ref[MediatorDB]]( + override def program[R1 <: (DidAccountRepo)]( plaintextMessage: PlaintextMessage - ): ZIO[R1, DidFail, Action] = { + ): ZIO[R1, MediatorError, Action] = { // the val is from the match to be definitely stable val piuriMediateRequest = MediateRequest.piuri val piuriMediateGrant = MediateGrant.piuri @@ -98,53 +48,29 @@ object MediatorCoordinationExecuter extends ProtocolExecuterWithServices[Protoco case m: MediateRequest => for { _ <- ZIO.logInfo("MediateRequest") - db <- ZIO.service[Ref[MediatorDB]] - reply <- db.modify { db => - db.enroll(m.from.asDIDURL.toDID) match - case Left(fail) => (m.makeRespondMediateDeny.toPlaintextMessage, db) - case Right(newDB) => (m.makeRespondMediateGrant.toPlaintextMessage, newDB) - } + repo <- ZIO.service[DidAccountRepo] + result <- repo.newDidAccount(m.from.asDIDURL.toDID) + reply = result.n match + case 1 => m.makeRespondMediateGrant.toPlaintextMessage + case _ => m.makeRespondMediateDeny.toPlaintextMessage } yield SyncReplyOnly(reply) case m: KeylistUpdate => - case class Tmp(id: FROMTO, a: KeylistAction, r: KeylistResult) for { _ <- ZIO.logInfo("KeylistUpdate") - db <- ZIO.service[Ref[MediatorDB]] - updatesAndNewMediatorDB <- db.modify { mediatorDB => - val did2Add = m.updates.collect { case (fromto, KeylistAction.add) => fromto } - val did2Remove = m.updates.collect { case (fromto, KeylistAction.remove) => fromto } - m.updates.foldLeft((Seq.empty[Tmp], mediatorDB)) { - case ((resultList, tmpDB), (fromto, KeylistAction.add)) => - tmpDB.addAlias(ower = m.from.toDIDSubject, newAlias = fromto.toDIDSubject) match - case Left(value) => - ( - resultList :+ Tmp(fromto, KeylistAction.add, KeylistResult.server_error), - tmpDB - ) - case Right(newState) => - ( - resultList :+ Tmp(fromto, KeylistAction.add, KeylistResult.success), - newState - ) - - case ((resultList, tmpDB), (fromto, KeylistAction.remove)) => - tmpDB.removeAlias(ower = m.from.toDIDSubject, newAlias = fromto.toDIDSubject) match - case Left(value) => - ( - resultList :+ Tmp(fromto, KeylistAction.remove, KeylistResult.server_error), - tmpDB - ) - case Right(newState) => - ( - resultList :+ Tmp(fromto, KeylistAction.remove, KeylistResult.success), - newState - ) - } + repo <- ZIO.service[DidAccountRepo] + updateResponse <- ZIO.foreach(m.updates) { + case (fromto, KeylistAction.add) => + repo.addAlias(m.from.toDID, fromto.toDID).map { + case Left(value) => (fromto, KeylistAction.add, KeylistResult.server_error) + case Right(newState) => (fromto, KeylistAction.add, KeylistResult.success) + } + case (fromto, KeylistAction.remove) => + repo.removeAlias(m.from.toDID, fromto.toDID).map { + case Left(value) => (fromto, KeylistAction.remove, KeylistResult.server_error) + case Right(newState) => (fromto, KeylistAction.remove, KeylistResult.success) + } } - keylistResponse = m.makeKeylistResponse( - updatesAndNewMediatorDB.map(e => (e._1, e._2, e._3)) - ) - } yield SyncReplyOnly(keylistResponse.toPlaintextMessage) + } yield SyncReplyOnly(m.makeKeylistResponse(updateResponse).toPlaintextMessage) case m: KeylistResponse => ZIO.logWarning("KeylistResponse") *> ZIO.succeed(NoReply) case m: KeylistQuery => ZIO.logError("Not implemented KeylistQuery") *> ZIO.succeed(NoReply) // TODO case m: Keylist => ZIO.logWarning("Keylist") *> ZIO.succeed(NoReply) diff --git a/did-mediator/src/main/scala/fmgp/did/comm/mediator/MediatorStandalone.scala b/did-mediator/src/main/scala/fmgp/did/comm/mediator/MediatorStandalone.scala index 5cb33019..a223048b 100644 --- a/did-mediator/src/main/scala/fmgp/did/comm/mediator/MediatorStandalone.scala +++ b/did-mediator/src/main/scala/fmgp/did/comm/mediator/MediatorStandalone.scala @@ -23,6 +23,7 @@ import fmgp.did.comm._ import fmgp.did.comm.mediator._ import fmgp.did.comm.protocol._ import fmgp.did.method.peer._ +import fmgp.did.db._ case class MediatorConfig(endpoint: java.net.URI, keyAgreement: OKPPrivateKey, keyAuthentication: OKPPrivateKey) { val did = DIDPeer2.makeAgent( @@ -31,11 +32,23 @@ case class MediatorConfig(endpoint: java.net.URI, keyAgreement: OKPPrivateKey, k ) val agentLayer = ZLayer(MediatorAgent.make(id = did.id, keyStore = did.keyStore)) } +case class DataBaseConfig( + protocol: String, + host: String, + port: String, + userName: String, + password: String, + dbName: String +) { + val connectionString = s"$protocol://$userName:$password@$host:$port/$dbName" + val displayConnectionString = s"$protocol://$userName:******@$host:$port/$dbName" + override def toString: String = s"""DataBaseConfig($protocol, $host, $port, $userName, "******", $dbName)""" +} object MediatorStandalone extends ZIOAppDefault { val app: HttpApp[ // type HttpApp[-R, +Err] = Http[R, Err, Request, Response] - Hub[String] & Operations & MessageDispatcher & MediatorAgent & Ref[MediatorDB] & Resolver, + Hub[String] & Operations & MessageDispatcher & MediatorAgent & Resolver & MessageItemRepo & DidAccountRepo, Throwable ] = MediatorAgent.didCommApp ++ Http @@ -58,6 +71,8 @@ object MediatorStandalone extends ZIOAppDefault { _ <- ZIO.log(s"Mediator APP. See https://github.com/input-output-hk/atala-prism-mediator") _ <- ZIO.log(s"MediatorConfig: $mediatorConfig") _ <- ZIO.log(s"DID: ${mediatorConfig.did.id.string}") + mediatorDbConfig <- configs.nested("database").nested("mediator").load(deriveConfig[DataBaseConfig]) + _ <- ZIO.log(s"MediatorDb Connection String: ${mediatorDbConfig.displayConnectionString}") myHub <- Hub.sliding[String](5) _ <- ZStream.fromHub(myHub).run(ZSink.foreach((str: String) => ZIO.logInfo("HUB: " + str))).fork port <- configs @@ -87,9 +102,13 @@ object MediatorStandalone extends ZIOAppDefault { ) .provideSomeLayer(DidPeerResolver.layerDidPeerResolver) .provideSomeLayer(mediatorConfig.agentLayer) // .provideSomeLayer(AgentByHost.layer) + .provideSomeLayer( + AsyncDriverResource.layer + >>> ReactiveMongoApi.layer(mediatorDbConfig.connectionString) + >>> MessageItemRepo.layer.and(DidAccountRepo.layer) + ) .provideSomeLayer(Operations.layerDefault) .provideSomeLayer(client >>> MessageDispatcherJVM.layer) - .provideSomeLayer(ZLayer.fromZIO(Ref.make[MediatorDB](MediatorDB.empty))) // TODO move into AgentByHost .provideSomeEnvironment { (env: ZEnvironment[Server]) => env.add(myHub) } .provide(server) .debug diff --git a/did-mediator/src/main/scala/fmgp/did/comm/mediator/PickupExecuter.scala b/did-mediator/src/main/scala/fmgp/did/comm/mediator/PickupExecuter.scala index 2eaa7368..c8b11b56 100644 --- a/did-mediator/src/main/scala/fmgp/did/comm/mediator/PickupExecuter.scala +++ b/did-mediator/src/main/scala/fmgp/did/comm/mediator/PickupExecuter.scala @@ -8,8 +8,10 @@ import fmgp.did.comm._ import fmgp.did.comm.Operations._ import fmgp.did.comm.protocol._ import fmgp.did.comm.protocol.pickup3._ +import fmgp.did.db._ -object PickupExecuter extends ProtocolExecuterWithServices[ProtocolExecuter.Services & Ref[MediatorDB]] { +object PickupExecuter + extends ProtocolExecuterWithServices[ProtocolExecuter.Services & DidAccountRepo & MessageItemRepo] { override def suportedPIURI: Seq[PIURI] = Seq( StatusRequest.piuri, @@ -20,9 +22,9 @@ object PickupExecuter extends ProtocolExecuterWithServices[ProtocolExecuter.Serv LiveModeChange.piuri, ) - override def program[R1 <: Ref[MediatorDB]]( + override def program[R1 <: DidAccountRepo & MessageItemRepo]( plaintextMessage: PlaintextMessage - ): ZIO[R1, DidFail, Action] = { + ): ZIO[R1, MediatorError, Action] = { // the val is from the match to be definitely stable val piuriStatusRequest = StatusRequest.piuri val piuriStatus = Status.piuri @@ -44,19 +46,29 @@ object PickupExecuter extends ProtocolExecuterWithServices[ProtocolExecuter.Serv case m: DeliveryRequest => for { _ <- ZIO.logInfo("DeliveryRequest") - db <- ZIO.service[Ref[MediatorDB]] - mediatorDB <- db.get + repoMessageItem <- ZIO.service[MessageItemRepo] + repoDidAccount <- ZIO.service[DidAccountRepo] didRequestingMessages = m.from.asFROMTO - messages = mediatorDB.getMessages( - to = didRequestingMessages.toDID, - from = m.recipient_did.map(_.toDID) - ) + mDidAccount <- repoDidAccount.getDidAccount(didRequestingMessages.toDID) + msgHash = mDidAccount match + case None => ??? + case Some(didAccount) => didAccount.messagesRef.filter(_.state == false).map(_.hash) + allMessagesFor <- repoMessageItem.findByIds(msgHash) + messagesToReturn = + if (m.recipient_did.isEmpty) allMessagesFor + else { + allMessagesFor.filterNot( + _.msg.recipientsSubject + .map(_.did) + .forall(e => !m.recipient_did.map(_.toDID.did).contains(e)) + ) + } deliveryRequest = MessageDelivery( thid = m.id, from = m.to.asFROM, to = m.from.asTO, recipient_did = m.recipient_did, - attachments = messages.map(m => (m.hashCode.toString, m)).toMap, + attachments = messagesToReturn.map(m => (m.hashCode.toString, m.msg)).toMap, ) } yield SyncReplyOnly(deliveryRequest.toPlaintextMessage) case m: MessageDelivery => @@ -71,8 +83,17 @@ object PickupExecuter extends ProtocolExecuterWithServices[ProtocolExecuter.Serv ).toPlaintextMessage ) ) - case m: MessagesReceived => ZIO.logInfo("MessagesReceived") *> ZIO.succeed(NoReply) - case m: LiveModeChange => ZIO.logWarning("LiveModeChange not implemented") *> ZIO.succeed(NoReply) // TODO + case m: MessagesReceived => + for { + _ <- ZIO.logInfo("MessagesReceived") + repoDidAccount <- ZIO.service[DidAccountRepo] + didRequestingMessages = m.from.asFROMTO + mDidAccount <- repoDidAccount.makeAsDelivered( + didRequestingMessages.toDID, + m.message_id_list.map(e => e.toInt) // TODO have it safe 'toInt' + ) + } yield NoReply + case m: LiveModeChange => ZIO.logWarning("LiveModeChange not implemented") *> ZIO.succeed(NoReply) // TODO } match case Left(error) => ZIO.logError(error) *> ZIO.succeed(NoReply) diff --git a/did-mediator/src/main/scala/fmgp/did/comm/protocol/ProtocolExecute.scala b/did-mediator/src/main/scala/fmgp/did/comm/protocol/ProtocolExecute.scala index 7bf4af22..eb4976ac 100644 --- a/did-mediator/src/main/scala/fmgp/did/comm/protocol/ProtocolExecute.scala +++ b/did-mediator/src/main/scala/fmgp/did/comm/protocol/ProtocolExecute.scala @@ -10,6 +10,7 @@ import fmgp.did.comm.Operations._ import fmgp.did.comm.protocol._ import fmgp.did.comm.protocol.basicmessage2._ import fmgp.did.comm.protocol.trustping2._ +import fmgp.did.db._ //TODO pick a better name // maybe "Protocol" only @@ -18,10 +19,10 @@ trait ProtocolExecuter[-R] { def suportedPIURI: Seq[PIURI] /** @return can return a Sync Reply Msg */ - def execute[R1 <: R](plaintextMessage: PlaintextMessage): ZIO[R1, DidFail, Option[EncryptedMessage]] = + def execute[R1 <: R](plaintextMessage: PlaintextMessage): ZIO[R1, MediatorError, Option[EncryptedMessage]] = program(plaintextMessage) *> ZIO.none - def program[R1 <: R](plaintextMessage: PlaintextMessage): ZIO[R1, DidFail, Action] + def program[R1 <: R](plaintextMessage: PlaintextMessage): ZIO[R1, MediatorError, Action] } object ProtocolExecuter { @@ -35,14 +36,14 @@ case class ProtocolExecuterCollection[-R](executers: ProtocolExecuter[R]*) exten override def execute[R1 <: R]( plaintextMessage: PlaintextMessage, - ): ZIO[R1, DidFail, Option[EncryptedMessage]] = + ): ZIO[R1, MediatorError, Option[EncryptedMessage]] = selectExecutersFor(plaintextMessage.`type`) match case None => NullProtocolExecute.execute(plaintextMessage) case Some(px) => px.execute(plaintextMessage) override def program[R1 <: R]( plaintextMessage: PlaintextMessage, - ): ZIO[R1, DidFail, Action] = + ): ZIO[R1, MediatorError, Action] = selectExecutersFor(plaintextMessage.`type`) match case None => NullProtocolExecute.program(plaintextMessage) case Some(px) => px.program(plaintextMessage) @@ -53,7 +54,7 @@ trait ProtocolExecuterWithServices[-R <: ProtocolExecuter.Services] extends Prot override def execute[R1 <: R]( plaintextMessage: PlaintextMessage, // context: Context - ): ZIO[R1, DidFail, Option[EncryptedMessage]] = + ): ZIO[R1, MediatorError, Option[EncryptedMessage]] = program(plaintextMessage) .tap(v => ZIO.logDebug(v.toString)) // DEBUG .flatMap { @@ -61,9 +62,11 @@ trait ProtocolExecuterWithServices[-R <: ProtocolExecuter.Services] extends Prot case action: AnyReply => val reply = action.msg for { - msg <- reply.from match - case Some(value) => authEncrypt(reply) - case None => anonEncrypt(reply) + msg <- { + reply.from match + case Some(value) => authEncrypt(reply) + case None => anonEncrypt(reply) + }.mapError(fail => MediatorDidError(fail)) // TODO forward message maybeSyncReplyMsg <- reply.to.map(_.toSeq) match // TODO improve case None => ZIO.logWarning("Have a reply but the field 'to' is missing") *> ZIO.none @@ -71,10 +74,12 @@ trait ProtocolExecuterWithServices[-R <: ProtocolExecuter.Services] extends Prot case Some(send2DIDs) => ZIO .foreach(send2DIDs)(to => - val job = for { + val job: ZIO[MessageDispatcher & (Resolver & Any), MediatorError, Matchable] = for { messageDispatcher <- ZIO.service[MessageDispatcher] resolver <- ZIO.service[Resolver] - doc <- resolver.didDocument(to) + doc <- resolver + .didDocument(to) + .mapError(fail => MediatorDidError(fail)) mURL = doc.service.toSeq.flatten .filter(_.`type` match { case str: String => str == DIDService.TYPE_DIDCommMessaging @@ -84,20 +89,24 @@ trait ProtocolExecuterWithServices[-R <: ProtocolExecuter.Services] extends Prot head.getServiceEndpointAsURIs.headOption // TODO head case Seq() => None // TODO } - jobToRun <- mURL match + jobToRun = mURL match case None => ZIO.logWarning(s"No url to send message") - case Some(url) => + case Some(url) => { ZIO.log(s"Send to url: $url") *> - messageDispatcher.send( - msg, - url, // "http://localhost:8080", // FIXME REMOVE (use for local env) - None - // url match // FIXME REMOVE (use for local env) - // case http if http.startsWith("http://") => Some(url.drop(7).split(':').head.split('/').head) - // case https if https.startsWith("https://") => - // Some(url.drop(8).split(':').head.split('/').head) - // case _ => None - ) + messageDispatcher + .send( + msg, + url, // "http://localhost:8080", // FIXME REMOVE (use for local env) + None + // url match // FIXME REMOVE (use for local env) + // case http if http.startsWith("http://") => Some(url.drop(7).split(':').head.split('/').head) + // case https if https.startsWith("https://") => + // Some(url.drop(8).split(':').head.split('/').head) + // case _ => None + ) + .mapError(fail => MediatorDidError(fail)) + } + } yield (jobToRun) action match case Reply(_) => job @@ -122,14 +131,14 @@ trait ProtocolExecuterWithServices[-R <: ProtocolExecuter.Services] extends Prot override def program[R1 <: R]( plaintextMessage: PlaintextMessage, // context: Context - ): ZIO[R1, DidFail, Action] + ): ZIO[R1, MediatorError, Action] } object NullProtocolExecute extends ProtocolExecuter[Any] { override def suportedPIURI = Seq() override def program[R1 <: Any](plaintextMessage: PlaintextMessage) = - ZIO.fail(MissingProtocol(plaintextMessage.`type`)) + ZIO.fail(MissingProtocolError(plaintextMessage.`type`)) } object BasicMessageExecuter extends ProtocolExecuter[Any] { @@ -137,8 +146,8 @@ object BasicMessageExecuter extends ProtocolExecuter[Any] { override def suportedPIURI: Seq[PIURI] = Seq(BasicMessage.piuri) override def program[R1 <: Any](plaintextMessage: PlaintextMessage) = for { job <- BasicMessage.fromPlaintextMessage(plaintextMessage) match - case Left(error) => ZIO.fail(FailToParse(error)) - case Right(bm) => Console.printLine(bm.toString).mapError(ex => SomeThrowable(ex)) + case Left(error) => ZIO.fail(MediatorDidError(FailToParse(error))) + case Right(bm) => Console.printLine(bm.toString).mapError(ex => MediatorThrowable(ex)) } yield NoReply } @@ -148,7 +157,7 @@ class TrustPingExecuter extends ProtocolExecuterWithServices[ProtocolExecuter.Se override def program[R1 <: Agent]( plaintextMessage: PlaintextMessage - ): ZIO[R1, DidFail, Action] = { + ): ZIO[R1, MediatorError, Action] = { // the val is from the match to be definitely stable val piuriTrustPing = TrustPing.piuri val piuriTrustPingResponse = TrustPingResponse.piuri @@ -156,7 +165,7 @@ class TrustPingExecuter extends ProtocolExecuterWithServices[ProtocolExecuter.Se plaintextMessage.`type` match case `piuriTrustPing` => TrustPing.fromPlaintextMessage(plaintextMessage) match - case Left(error) => ZIO.fail(FailToParse(error)) + case Left(error) => ZIO.fail(MediatorDidError(FailToParse(error))) case Right(ping: TrustPingWithOutRequestedResponse) => ZIO.logInfo(ping.toString()) *> ZIO.succeed(NoReply) case Right(ping: TrustPingWithRequestedResponse) => for { @@ -167,7 +176,7 @@ class TrustPingExecuter extends ProtocolExecuterWithServices[ProtocolExecuter.Se case `piuriTrustPingResponse` => for { job <- TrustPingResponse.fromPlaintextMessage(plaintextMessage) match - case Left(error) => ZIO.fail(FailToParse(error)) + case Left(error) => ZIO.fail(MediatorDidError(FailToParse(error))) case Right(ping) => ZIO.logInfo(ping.toString()) } yield NoReply } diff --git a/did-mediator/src/main/scala/fmgp/did/db/AsyncDriverResource.scala b/did-mediator/src/main/scala/fmgp/did/db/AsyncDriverResource.scala new file mode 100644 index 00000000..4a805bda --- /dev/null +++ b/did-mediator/src/main/scala/fmgp/did/db/AsyncDriverResource.scala @@ -0,0 +1,19 @@ +package fmgp.did.db + +import zio.{TaskLayer, ZIO, ZLayer} +import scala.concurrent.duration.DurationInt +import reactivemongo.api.{AsyncDriver, DB, MongoConnection} +import zio.{Task, ZIO, ZLayer} +import scala.concurrent.ExecutionContext + +object AsyncDriverResource { + private def acquire = ZIO.attempt(AsyncDriver()) + + private def release(asyncDriver: AsyncDriver) = ZIO + .fromFuture(implicit ec => asyncDriver.close(10.seconds)) + .orDie + .unit + + val layer: TaskLayer[AsyncDriver] = ZLayer.scoped(ZIO.acquireRelease(acquire)(release)) + +} diff --git a/did-mediator/src/main/scala/fmgp/did/db/BsonImplicits.scala b/did-mediator/src/main/scala/fmgp/did/db/BsonImplicits.scala new file mode 100644 index 00000000..0be73f1a --- /dev/null +++ b/did-mediator/src/main/scala/fmgp/did/db/BsonImplicits.scala @@ -0,0 +1,239 @@ +package fmgp.did.db + +import scala.util._ +import zio.json._ +import reactivemongo.api.bson._ + +import fmgp.crypto._ +import fmgp.did._ +import fmgp.did.comm._ +import fmgp.util._ + +given BSONWriter[DIDSubject] with { + import DIDSubject._ + def writeTry(obj: DIDSubject): Try[BSONValue] = Try(BSONString(obj.string)) +} + +given BSONReader[DIDSubject] with { + def readTry(bson: BSONValue): Try[DIDSubject] = bson.asTry[String].map(v => DIDSubject(v)) +} + +given BSONWriter[DID] with { + import DID._ + def writeTry(obj: DID): Try[BSONValue] = Try(BSONString(obj.string)) +} + +given BSONReader[DID] with { + def readTry(bson: BSONValue): Try[DID] = bson.asTry[String].map(v => DIDSubject(v)) +} + +given BSONWriter[APV] with { + import APV._ + def writeTry(obj: APV): Try[BSONValue] = Try(BSONString(obj.value)) +} +given BSONReader[APV] with { + def readTry(bson: BSONValue): Try[APV] = bson.asTry[String].map(v => APV(v)) +} + +given BSONWriter[APU] with { + import APU._ + def writeTry(obj: APU): Try[BSONValue] = Try(BSONString(obj.value)) +} +given BSONReader[APU] with { + def readTry(bson: BSONValue): Try[APU] = bson.asTry[String].map(v => APU(v)) +} + +given BSONWriter[CipherText] with { + import CipherText._ + def writeTry(obj: CipherText): Try[BSONValue] = Try(BSONString(obj.value)) +} + +given BSONReader[CipherText] with { + def readTry(bson: BSONValue): Try[CipherText] = bson.asTry[String].map(v => CipherText(v)) +} + +given BSONWriter[TAG] with { + import TAG._ + def writeTry(obj: TAG): Try[BSONValue] = Try(BSONString(obj.value)) +} +given BSONReader[TAG] with { + def readTry(bson: BSONValue): Try[TAG] = bson.asTry[String].map(v => TAG(v)) +} +given BSONWriter[IV] with { + import IV._ + def writeTry(obj: IV): Try[BSONValue] = Try(BSONString(obj.value)) +} +given BSONReader[IV] with { + def readTry(bson: BSONValue): Try[IV] = bson.asTry[String].map(v => IV(v)) +} + +given BSONWriter[Base64Obj[ProtectedHeader]] with { + import Base64Obj._ + def writeTry(obj: Base64Obj[ProtectedHeader]): Try[BSONValue] = Try(BSONString(obj.base64url)) +} +given BSONReader[Base64Obj[ProtectedHeader]] with { + def readTry(bson: BSONValue): Try[Base64Obj[ProtectedHeader]] = + bson + .asTry[String] + .flatMap{v => + s""""$v"""".fromJson[Base64Obj[ProtectedHeader]] match // TODO with a new methods from ScalaDid + case Left(value) => Failure(RuntimeException(value)) + case Right(value) => Try(value) +} +} + +given BSONWriter[Base64] with { + import Base64._ + def writeTry(obj: Base64): Try[BSONValue] = Try(BSONString(obj.urlBase64)) +} + +given BSONReader[Base64] with { + import Base64._ + def readTry(bson: BSONValue): Try[Base64] = bson.asTry[String].map(v => Base64.fromBase64url(v)) +} + +given BSONDocumentWriter[Recipient] = + Macros.writer[Recipient] +given BSONDocumentReader[Recipient] = + Macros.reader[Recipient] + +given BSONDocumentWriter[VerificationMethodReferenced] = + Macros.writer[VerificationMethodReferenced] +given BSONDocumentReader[VerificationMethodReferenced] = + Macros.reader[VerificationMethodReferenced] + +given BSONDocumentWriter[RecipientHeader] = + Macros.writer[RecipientHeader] +given BSONDocumentReader[RecipientHeader] = + Macros.reader[RecipientHeader] + +given BSONWriter[KTY] with { + def writeTry(obj: KTY): Try[BSONValue] = Try(BSONString(obj.toString())) +} +given BSONReader[KTY] with { + def readTry(bson: BSONValue): Try[KTY] = bson.asTry[String].map(v => KTY.valueOf(v)) +} + +given BSONWriter[Curve] with { + def writeTry(obj: Curve): Try[BSONValue] = Try(BSONString(obj.toString())) +} +given BSONReader[Curve] with { + def readTry(bson: BSONValue): Try[Curve] = bson.asTry[String].map(v => Curve.valueOf(v)) +} + +given BSONWriter[ENCAlgorithm] with { + def writeTry(obj: ENCAlgorithm): Try[BSONValue] = Try(BSONString(obj.toString())) +} +given BSONReader[ENCAlgorithm] with { + def readTry(bson: BSONValue): Try[ENCAlgorithm] = bson.asTry[String].map(v => ENCAlgorithm.valueOf(v)) +} + +given BSONWriter[KWAlgorithm] with { + def writeTry(obj: KWAlgorithm): Try[BSONValue] = Try(BSONString(obj.toString())) +} +given BSONReader[KWAlgorithm] with { + def readTry(bson: BSONValue): Try[KWAlgorithm] = bson.asTry[String].map(v => KWAlgorithm.valueOf(v)) +} +given BSONWriter[MediaTypes] with { + def writeTry(obj: MediaTypes): Try[BSONValue] = Try(BSONString(obj.toString())) +} +given BSONReader[MediaTypes] with { + def readTry(bson: BSONValue): Try[MediaTypes] = bson.asTry[String].map(v => MediaTypes.valueOf(v)) +} + +given BSONDocumentWriter[PublicKey] with { + override def writeTry(obj: PublicKey): Try[BSONDocument] = + obj match { + case ECPublicKey(kty, crv, x, y, kid) => + Try( + BSONDocument( + "kty" -> kty, + "crv" -> crv, + "x" -> x, + "y" -> y, + "kid" -> kid, + ) + ) + case OKPPublicKey(kty, crv, x, kid) => + Try( + BSONDocument( + "kty" -> kty, + "crv" -> crv, + "x" -> x, + "kid" -> kid, + ) + ) + } +} +given BSONDocumentReader[PublicKey] with { + + override def readDocument(doc: BSONDocument): Try[PublicKey] = + doc.get("kty").get.asTry[KTY] match + case Failure(exception) => Failure(exception) + case Success(KTY.OKP) => + ( + doc.getAsTry[Curve]("crv"), + doc.getAsTry[String]("x"), + ) match + case (Success(crv), Success(x)) => + Success( + OKPPublicKey( + kty = KTY.OKP, + crv = crv, + x = x, + kid = doc.getAsOpt[String]("kid") + ) + ) + case (Failure(ex), _) => Failure(ex) + case (_, Failure(ex)) => Failure(ex) + case Success(KTY.EC) => + ( + doc.getAsTry[Curve]("crv"), + doc.getAsTry[String]("x"), + doc.getAsTry[String]("y"), + ) match + case (Success(crv), Success(x), Success(y)) => + Success( + ECPublicKey( + kty = KTY.EC, + crv = crv, + x = x, + y = y, + kid = doc.getAsOpt[String]("kid") + ) + ) + case (Failure(ex), _, _) => Failure(ex) + case (_, Failure(ex), _) => Failure(ex) + case (_, _, Failure(ex)) => Failure(ex) + +} + +given BSONDocumentWriter[AnonProtectedHeader] = + Macros.writer[AnonProtectedHeader] +given BSONDocumentReader[AnonProtectedHeader] = + Macros.reader[AnonProtectedHeader] + +given BSONDocumentWriter[AuthProtectedHeader] = + Macros.writer[AuthProtectedHeader] +given BSONDocumentReader[AuthProtectedHeader] = + Macros.reader[AuthProtectedHeader] + +given BSONDocumentWriter[ProtectedHeader] = + Macros.writer[ProtectedHeader] +given BSONDocumentReader[ProtectedHeader] = + Macros.reader[ProtectedHeader] + +given BSONDocumentWriter[EncryptedMessage] with { + val aux = Macros.writer[EncryptedMessageGeneric] + override def writeTry(obj: EncryptedMessage): Try[BSONDocument] = + obj match { + case msg: EncryptedMessageGeneric => aux.writeTry(msg) // Success(msg): Try[reactivemongo.api.bson.BSONDocument] + case _ => Failure(RuntimeException("Only support EncryptedMessageGeneric")) + } + +} +given BSONDocumentReader[EncryptedMessage] with { + val aux = Macros.reader[EncryptedMessageGeneric] + override def readDocument(doc: BSONDocument): Try[EncryptedMessage] = + aux.readDocument(doc) +} diff --git a/did-mediator/src/main/scala/fmgp/did/db/DataModels.scala b/did-mediator/src/main/scala/fmgp/did/db/DataModels.scala new file mode 100644 index 00000000..675e10fe --- /dev/null +++ b/did-mediator/src/main/scala/fmgp/did/db/DataModels.scala @@ -0,0 +1,37 @@ +package fmgp.did.db + +import fmgp.did._ +import fmgp.did.comm._ +import reactivemongo.api.bson._ +import java.time.Instant +type HASH = Int +// messages + +case class MessageItem(_id: HASH, msg: EncryptedMessage, headers: ProtectedHeader) +object MessageItem { + def apply(msg: EncryptedMessage): MessageItem = new MessageItem(msg.hashCode(), msg, msg.`protected`.obj) + given BSONDocumentWriter[MessageItem] = Macros.writer[MessageItem] + given BSONDocumentReader[MessageItem] = Macros.reader[MessageItem] +} + +case class MessageMetaData(hash: HASH, recipient: DIDSubject, state: Boolean, ts: String) +object MessageMetaData { + given BSONDocumentWriter[MessageMetaData] = Macros.writer[MessageMetaData] + given BSONDocumentReader[MessageMetaData] = Macros.reader[MessageMetaData] + def apply(hash: HASH, recipient: DIDSubject) = { + new MessageMetaData(hash = hash, recipient = recipient, state = false, ts = Instant.now().toString) + } +} + +// did_account did +case class DidAccount( + _id: BSONObjectID = BSONObjectID.generate(), + did: DIDSubject, + alias: Seq[DID], + messagesRef: Seq[MessageMetaData], +) + +object DidAccount { + given BSONDocumentWriter[DidAccount] = Macros.writer[DidAccount] + given BSONDocumentReader[DidAccount] = Macros.reader[DidAccount] +} diff --git a/did-mediator/src/main/scala/fmgp/did/db/DidAccountRepo.scala b/did-mediator/src/main/scala/fmgp/did/db/DidAccountRepo.scala new file mode 100644 index 00000000..eae25a8d --- /dev/null +++ b/did-mediator/src/main/scala/fmgp/did/db/DidAccountRepo.scala @@ -0,0 +1,148 @@ +package fmgp.did.db + +import reactivemongo.api.bson._ +import reactivemongo.api.bson.collection.BSONCollection +import reactivemongo.api.commands.WriteResult +import reactivemongo.api.Cursor +import reactivemongo.api.CursorProducer +import zio._ +import fmgp.crypto.error._ +import fmgp.did._ +import fmgp.did.comm._ +import scala.concurrent.ExecutionContext + +object DidAccountRepo { + def layer: ZLayer[ReactiveMongoApi, Throwable, DidAccountRepo] = + ZLayer { + for { + ref <- ZIO.service[ReactiveMongoApi] + } yield DidAccountRepo(ref)(using scala.concurrent.ExecutionContext.global) + } +} + +class DidAccountRepo(reactiveMongoApi: ReactiveMongoApi)(using ec: ExecutionContext) { + def collectionName: String = "user.account" + + def collection: IO[StorageCollection, BSONCollection] = reactiveMongoApi.database + .map(_.collection(collectionName)) + .mapError(ex => StorageCollection(ex)) + + def newDidAccount(did: DIDSubject /*, alias: Seq[DID] = Seq.empty*/ ): IO[StorageError, WriteResult] = { + val value = DidAccount( + did = did, + alias = Seq(did), + messagesRef = Seq.empty + ) + for { + coll <- collection + result <- ZIO + .fromFuture(implicit ec => coll.insert.one(value)) + .mapError(ex => StorageThrowable(ex)) + } yield result + } + + def getDidAccount(did: DIDSubject): IO[StorageError, Option[DidAccount]] = { + def selector: BSONDocument = BSONDocument("did" -> did) + def projection: Option[BSONDocument] = None + for { + coll <- collection + result <- ZIO + .fromFuture(implicit ec => + coll + .find(selector, projection) + .cursor[DidAccount]() + .collect[Seq](1, Cursor.FailOnError[Seq[DidAccount]]()) // Just one + ) + .mapError(ex => StorageThrowable(ex)) + } yield result.headOption + } + + def addAlias(owner: DIDSubject, newAlias: DIDSubject): ZIO[Any, StorageError, Either[String, Unit]] = { + def selector: BSONDocument = BSONDocument("did" -> owner) + def update: BSONDocument = BSONDocument( + "$push" -> BSONDocument( + "alias" -> newAlias + ) + ) + for { + coll <- collection + result <- ZIO + .fromFuture(implicit ec => + coll.update + .one(selector, update) // Just one + ) + .mapError(ex => StorageThrowable(ex)) + } yield Right(()) + + } + + def removeAlias(owner: DIDSubject, newAlias: DIDSubject): ZIO[Any, StorageError, Either[String, Unit]] = { + def selector: BSONDocument = BSONDocument("did" -> owner) + def update: BSONDocument = BSONDocument( + "$pull" -> BSONDocument( + "alias" -> newAlias + ) + ) + for { + coll <- collection + result <- ZIO + .fromFuture(implicit ec => + coll.update + .one(selector, update) // Just one + ) + .mapError(ex => StorageThrowable(ex)) + } yield Right(()) + } + + /** @return + * numbre of documents updated in DB + */ + def addToInboxes(recipients: Set[DIDSubject], msg: EncryptedMessage): ZIO[Any, StorageError, Int] = { + def selector = + BSONDocument( + "alias" -> BSONDocument("$in" -> recipients.map(_.did)), + "messagesRef" -> + BSONDocument( + "$not" -> + BSONDocument( + "$elemMatch" -> + BSONDocument( + "hash" -> msg.hashCode, + "recipient" -> BSONDocument("$in" -> recipients.map(_.did)) + ) + ) + ) + ) + + def update: BSONDocument = BSONDocument( + "$push" -> BSONDocument( + "messagesRef" -> BSONDocument( + "$each" -> + recipients.map(recipient => MessageMetaData(msg.hashCode, recipient)) + ) + ) + ) + + for { + coll <- collection + result <- ZIO + .fromFuture(implicit ec => + coll.update + .one(selector, update) // Just one + ) + .mapError(ex => StorageThrowable(ex)) + } yield result.nModified + } + + def makeAsDelivered(didAccount: DIDSubject, hashs: Seq[HASH]): ZIO[Any, StorageError, Int] = { + def selector = BSONDocument("did" -> didAccount.did, "messagesRef.hash" -> BSONDocument("$in" -> hashs)) + def update: BSONDocument = BSONDocument("$set" -> BSONDocument("messagesRef.$.state" -> true)) + + for { + coll <- collection + result <- ZIO + .fromFuture(implicit ec => coll.update.one(selector, update)) // Just one + .mapError(ex => StorageThrowable(ex)) + } yield result.nModified + } +} diff --git a/did-mediator/src/main/scala/fmgp/did/db/MessageItemRepo.scala b/did-mediator/src/main/scala/fmgp/did/db/MessageItemRepo.scala new file mode 100644 index 00000000..4d18d4cf --- /dev/null +++ b/did-mediator/src/main/scala/fmgp/did/db/MessageItemRepo.scala @@ -0,0 +1,73 @@ +package fmgp.did.db + +import zio._ +import scala.concurrent.ExecutionContext + +import reactivemongo.api.bson._ +import reactivemongo.api.bson.collection.BSONCollection +import reactivemongo.api.commands.WriteResult +import reactivemongo.api.Cursor +import reactivemongo.api.CursorProducer + +import fmgp.did._ +object MessageItemRepo { + def layer: ZLayer[ReactiveMongoApi, Throwable, MessageItemRepo] = + ZLayer { + for { + ref <- ZIO.service[ReactiveMongoApi] + } yield MessageItemRepo(ref)(using scala.concurrent.ExecutionContext.global) + } +} + +class MessageItemRepo(reactiveMongoApi: ReactiveMongoApi)(using ec: ExecutionContext) { + def collectionName: String = "messages" + + def collection: IO[StorageCollection, BSONCollection] = reactiveMongoApi.database + .map(_.collection(collectionName)) + .mapError(ex => StorageCollection(ex)) + + def insert(value: MessageItem): IO[StorageError, WriteResult] = { + for { + coll <- collection + result <- ZIO + .fromFuture(implicit ec => coll.insert.one(value)) + .mapError(ex => StorageThrowable(ex)) + } yield result + } + + def findById(id: HASH): IO[StorageError, Option[MessageItem]] = { + def selector: BSONDocument = BSONDocument("_id" -> id) + def projection: Option[BSONDocument] = None + for { + coll <- collection + result <- ZIO + .fromFuture(implicit ec => + coll + .find(selector, projection) + .cursor[MessageItem]() + .collect[Seq](1, Cursor.FailOnError[Seq[MessageItem]]()) + ) + .mapError(ex => StorageThrowable(ex)) + } yield result.headOption + } + + def findByIds(ids: Seq[HASH]): IO[StorageError, Seq[MessageItem]] = { + def selector: BSONDocument = { + println(s""" {"_id": {"$$in" -> $ids}} """) + BSONDocument("_id" -> BSONDocument("$in" -> ids)) + } + def projection: Option[BSONDocument] = None + for { + coll <- collection + result <- ZIO + .fromFuture(implicit ec => + coll + .find(selector, projection) + .cursor[MessageItem]() + .collect[Seq](-1, Cursor.FailOnError[Seq[MessageItem]]()) + ) + .mapError(ex => StorageThrowable(ex)) + } yield result + } + +} diff --git a/did-mediator/src/main/scala/fmgp/did/db/ReactiveMongoApi.scala b/did-mediator/src/main/scala/fmgp/did/db/ReactiveMongoApi.scala new file mode 100644 index 00000000..1cd9ed29 --- /dev/null +++ b/did-mediator/src/main/scala/fmgp/did/db/ReactiveMongoApi.scala @@ -0,0 +1,53 @@ +package fmgp.did.db + +import zio.{TaskLayer, ZIO, ZLayer} +import scala.concurrent.duration.DurationInt +import reactivemongo.api.{AsyncDriver, DB, MongoConnection} +import zio.{Task, ZIO, ZLayer} +import reactivemongo.api.MongoConnection.ParsedURIWithDB + +trait ReactiveMongoApi { + def driver: AsyncDriver + def connection: MongoConnection + def database: Task[DB] +} + +case class ReactiveMongoLive( + asyncDriver: AsyncDriver, + mongoParsedUri: ParsedURIWithDB, + mongoConnection: MongoConnection +) extends ReactiveMongoApi { + + lazy val driver: AsyncDriver = asyncDriver + + lazy val connection: MongoConnection = mongoConnection + + def database: Task[DB] = ZIO.fromFuture(implicit ec => connection.database(mongoParsedUri.db)) + +} + +object ReactiveMongoApi { + + private def acquire(connectionString: String) = ( + for { + asyncDriver <- ZIO.service[AsyncDriver] + mongoParsedUri <- ZIO.fromFuture(implicit ec => MongoConnection.fromStringWithDB(connectionString)) + connection <- ZIO.fromFuture(_ => + asyncDriver.connect(mongoParsedUri, Some(mongoParsedUri.db), strictMode = false) + ) + reactiveMongo = ReactiveMongoLive(asyncDriver, mongoParsedUri, connection) + } yield reactiveMongo + ).uninterruptible + + private def release(api: ReactiveMongoApi) = ZIO + .fromFuture(_ => api.connection.close()(10.seconds)) + .orDie + .unit + + def layer( + connectionString: String + ): ZLayer[AsyncDriver, Throwable, ReactiveMongoApi] = + ZLayer.scoped( + ZIO.acquireRelease(acquire(connectionString))(release(_)) + ) +} diff --git a/docker-compose.yaml b/docker-compose.yaml index 262ffbfa..6c32955b 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -1,16 +1,26 @@ version: '3.9' services: - # mongo: - # image: mongo:5.0 - # ports: - # - 27017:27017 - # #volumes: - # # - ./tmp/mongo:/data/db - # environment: - # - MONGO_INITDB_ROOT_USERNAME=admin - # - MONGO_INITDB_ROOT_PASSWORD=admin - + mongo: + image: mongo:6.0 + ports: + - 27017:27017 + #volumes: + # - ./tmp/mongo:/data/db + command: ["--auth"] + # command: ["--bind_ip_all","--keyFile", "/opt/keyfile/keyfile","--replSet", "rs0","--auth"] + environment: + - MONGO_INITDB_ROOT_USERNAME=admin + - MONGO_INITDB_ROOT_PASSWORD=admin + - MONGO_INITDB_DATABASE=mediator + # - MONGO_REPLICA_SET_NAME=rs0 + volumes: + - ./initdb.js:/docker-entrypoint-initdb.d/initdb.js + # - ./:/opt/keyfile/ + # healthcheck: + # test: test $$(echo "rs.initiate().ok || rs.status().ok" | mongo -u $${MONGO_INITDB_ROOT_USERNAME} -p $${MONGO_INITDB_ROOT_PASSWORD} --quiet) -eq 1 + # interval: 10s + # start_period: 30s atalaprism-mediator: image: ghcr.io/input-output-hk/mediator:0.1.1-SNAPSHOT ports: @@ -24,15 +34,20 @@ services: - KEY_AUTHENTICATION_X=MBjnXZxkMcoQVVL21hahWAw43RuAG-i64ipbeKKqwoA - SERVICE_ENDPOINT=https://k8s-int.atalaprism.io/mediator # Config storage - #- DB_URL=mongodb://admin:admin@mongo:27017 - #- MONGODB_USER=admin - #- MONGODB_PASSWORD=admin - # depends_on: - # - "mongo" + #- DB_URL=mongodb://admin:admin@localhost:27017 + - MONGODB_USER=admin + - MONGODB_PASSWORD=admin + - MONGODB_PROTOCOL=mongodb + - MONGODB_HOST=localhost + - MONGODB_PORT=27017 + - MONGODB_DB_NAME=mediator + depends_on: + - "mongo" # RUN # docker-compose up -d # docker-compose ps +# docker-compose up mongo # docker-compose exec mongo /bin/sh # docker exec -it mongo-1 bash \ No newline at end of file diff --git a/initdb.js b/initdb.js new file mode 100644 index 00000000..4cec0647 --- /dev/null +++ b/initdb.js @@ -0,0 +1,21 @@ +db.createUser({ + user: "admin", + pwd: "admin", + roles: [ + { role: "readWrite", db: "mediator" } + ] +}); + +const database = 'mediator'; +const collectionDidAccount = 'user.account'; +const collectionMessages = 'messages'; + +// The current database to use. +use(database); +// Create collections. +db.createCollection(collectionDidAccount); +db.createCollection(collectionMessages); +//create index +db.getCollection(collectionDidAccount).createIndex({ 'did': 1 }, { unique: true }); +db.getCollection(collectionDidAccount).createIndex({ 'alias': 1 }, { unique: true }); +db.getCollection(collectionDidAccount).createIndex({ "messagesRef.hash": 1, "messagesRef.recipient": 1 });