Skip to content

Commit

Permalink
feat(agent): add mediation and ability to send messages
Browse files Browse the repository at this point in the history
  • Loading branch information
goncalo-frade-iohk committed Feb 6, 2023
1 parent 7cc738f commit e1944ac
Show file tree
Hide file tree
Showing 16 changed files with 417 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package io.iohk.atala.prism.domain.buildingBlocks

import io.iohk.atala.prism.domain.models.DID
import io.iohk.atala.prism.domain.models.DIDPair
import io.iohk.atala.prism.domain.models.MediatorDID
import io.iohk.atala.prism.domain.models.Mediator
import io.iohk.atala.prism.domain.models.Message
import io.iohk.atala.prism.domain.models.PeerDID
import io.iohk.atala.prism.domain.models.PrismDIDInfo
Expand Down Expand Up @@ -70,7 +70,7 @@ interface Pluto {

fun getMessage(id: String): Flow<Message?>

fun getAllMediators(): Flow<Array<MediatorDID>>
fun getAllMediators(): Flow<Array<Mediator>>

fun getAllCredentials(): Flow<Array<VerifiableCredential>>
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package io.iohk.atala.prism.domain.models

data class MediatorDID(
data class Mediator(
val id: String,
val mediatorDID: DID,
val hostDID: DID,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ data class Message(
val extraHeaders: Array<String>,
val createdTime: String,
val expiresTimePlus: String,
val attachments: Array<String>, // TODO: Change to AttachmentDescriptor
val attachments: Array<AttachmentDescriptor>,
val thid: String? = null,
val pthid: String? = null,
val ack: Array<String>,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.iohk.atala.prism.domain.models

import kotlinx.serialization.Serializable

interface AttachmentData

data class AttachmentHeader(
Expand Down Expand Up @@ -48,6 +50,7 @@ data class AttachmentJsonData(
val data: String
) : AttachmentData

@Serializable
data class AttachmentDescriptor(
val id: String,
val mediaType: String? = null,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package io.iohk.atala.prism.walletsdk.prismagent

import io.iohk.atala.prism.domain.buildingBlocks.Castor
import io.iohk.atala.prism.domain.buildingBlocks.Mercury
import io.iohk.atala.prism.domain.buildingBlocks.Pluto
import io.iohk.atala.prism.domain.models.DID
import io.iohk.atala.prism.domain.models.Message
import io.iohk.atala.prism.domain.models.PrismAgentError
import io.iohk.atala.prism.walletsdk.prismagent.mediation.MediatorHandler
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.map

class ConnectionManager {
private val mercury: Mercury
private val castor: Castor
private val pluto: Pluto
val mediator: MediatorHandler

constructor(
mercury: Mercury,
castor: Castor,
pluto: Pluto,
mediatorHandler: MediatorHandler
) {
this.mercury = mercury
this.castor = castor
this.pluto = pluto
this.mediator = mediatorHandler
}

@Throws()
suspend fun startMediator() {
mediator.bootRegisteredMediator()
.first()
}

@Throws()
suspend fun registerMediator(host: DID) {
mediator.achieveMediation(host)
.first()
}

@Throws()
suspend fun sendMessage(message: Message): Message? {
if(mediator.mediator == null) {
throw PrismAgentError.noMediatorAvailableError()
}
pluto.storeMessage(message)
return mercury.sendMessageParseMessage(message)
}

@Throws()
fun awaitMessages(): Flow<Array<Message>> {
return mediator.pickupUnreadMessages(10)
.map {
val messagesIds = it.map { it.first }.toTypedArray()
mediator.registerMessagesAsRead(messagesIds)
it.map { it.second }.toTypedArray()
}
.map {
pluto.storeMessages(it)
it
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,28 @@ package io.iohk.atala.prism.walletsdk.prismagent

import io.iohk.atala.prism.domain.buildingBlocks.Apollo
import io.iohk.atala.prism.domain.buildingBlocks.Castor
import io.iohk.atala.prism.domain.buildingBlocks.Mercury
import io.iohk.atala.prism.domain.buildingBlocks.Pluto
import io.iohk.atala.prism.domain.models.Curve
import io.iohk.atala.prism.domain.models.DID
import io.iohk.atala.prism.domain.models.DIDDocument
import io.iohk.atala.prism.domain.models.KeyCurve
import io.iohk.atala.prism.domain.models.Message
import io.iohk.atala.prism.domain.models.PrismAgentError
import io.iohk.atala.prism.domain.models.Seed
import io.iohk.atala.prism.domain.models.Signature
import io.iohk.atala.prism.walletsdk.prismagent.helpers.ApiImpl
import io.iohk.atala.prism.walletsdk.prismagent.helpers.HttpClient
import io.iohk.atala.prism.walletsdk.prismagent.mediation.MediatorHandler
import io.iohk.atala.prism.walletsdk.prismagent.protocols.prismOnboarding.PrismOnboardingInvitation
import io.ktor.client.plugins.contentnegotiation.ContentNegotiation
import io.ktor.http.HttpMethod
import io.ktor.http.Url
import io.ktor.serialization.kotlinx.json.json
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.firstOrNull
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.serialization.Serializable
import kotlinx.serialization.json.Json
Expand All @@ -38,18 +43,23 @@ final class PrismAgent {
private val apollo: Apollo
private val castor: Castor
private val pluto: Pluto
private val mercury: Mercury
private val api: ApiImpl
private val connectionManager: ConnectionManager

constructor(
apollo: Apollo,
castor: Castor,
pluto: Pluto,
mercury: Mercury,
seed: Seed? = null,
api: ApiImpl? = null
api: ApiImpl? = null,
mediatorHandler: MediatorHandler
) {
this.apollo = apollo
this.castor = castor
this.pluto = pluto
this.mercury = mercury
this.seed = seed ?: apollo.createRandomSeed().second
this.api = api ?: ApiImpl(
HttpClient {
Expand All @@ -64,6 +74,35 @@ final class PrismAgent {
}
}
)
this.connectionManager = ConnectionManager(mercury, castor, pluto, mediatorHandler)
}

@Throws()
suspend fun start() {
if(state != State.STOPED) { return }
state = State.STARTING
try {
connectionManager.startMediator()
} catch (error: PrismAgentError.noMediatorAvailableError) {
val hostDID = createNewPeerDID(arrayOf(
DIDDocument.Service(
"#didcomm-1",
arrayOf("DIDCommMessaging"),
DIDDocument.ServiceEndpoint(connectionManager.mediator.mediatorDID.toString())
)), false)
connectionManager.registerMediator(hostDID)
}
if(connectionManager.mediator.mediator != null) {
state = State.RUNNING
} else {
throw PrismAgentError.mediationRequestFailedError()
}
}

@Throws()
suspend fun stop() {
if(state != State.RUNNING) { return }
state = State.STOPING
}

suspend fun createNewPrismDID(
Expand Down Expand Up @@ -170,4 +209,8 @@ final class PrismAgent {
ownDID = did
)
}

fun sendMessage(message: Message): Flow<Message?> {
return flow { connectionManager.sendMessage(message) }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package io.iohk.atala.prism.walletsdk.prismagent.mediation

import io.iohk.atala.prism.apollo.uuid.UUID
import io.iohk.atala.prism.domain.buildingBlocks.Mercury
import io.iohk.atala.prism.domain.buildingBlocks.Pluto
import io.iohk.atala.prism.domain.models.AttachmentBase64
import io.iohk.atala.prism.domain.models.AttachmentJsonData
import io.iohk.atala.prism.domain.models.DID
import io.iohk.atala.prism.domain.models.Mediator
import io.iohk.atala.prism.domain.models.Message
import io.iohk.atala.prism.domain.models.PrismAgentError
import io.iohk.atala.prism.walletsdk.prismagent.protocols.mediation.MediationGrant
import io.iohk.atala.prism.walletsdk.prismagent.protocols.mediation.MediationKeysUpdateList
import io.iohk.atala.prism.walletsdk.prismagent.protocols.mediation.MediationRequest
import io.iohk.atala.prism.walletsdk.prismagent.protocols.pickup.PickupDelivery
import io.iohk.atala.prism.walletsdk.prismagent.protocols.pickup.PickupReceived
import io.iohk.atala.prism.walletsdk.prismagent.protocols.pickup.PickupRequest
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map

final class BasicMediationHandler: MediatorHandler {
final class PlutoMediatorStoreImpl: MediatorStore {
private val pluto: Pluto

constructor(pluto: Pluto) {
this.pluto = pluto
}

override fun getAllMediators(): Flow<Array<Mediator>> {
return pluto.getAllMediators()
}

override fun storeMediator(mediator: Mediator) {
pluto.storeMediator(mediator.mediatorDID, mediator.hostDID, mediator.routingDID)
}
}

override val mediatorDID: DID
override var mediator: Mediator?
private set

private val mercury: Mercury
private val store: MediatorStore

constructor(mediatorDID: DID, mercury: Mercury, store: MediatorStore) {
this.mediatorDID = mediatorDID
this.mediator = null
this.mercury = mercury
this.store = store
}

override fun bootRegisteredMediator(): Flow<Mediator> {
return mediator?.let {flow { emit(it) } }
?: store.getAllMediators().map { it.first() }
}

override fun achieveMediation(host: DID): Flow<Mediator> {
val requestMessage = MediationRequest(from = host, to = mediatorDID).makeMessage()
return flow {
emit(mercury.sendMessageParseMessage(message = requestMessage))
}.map {
if(it != null) {
val grantedMessage = it.let { MediationGrant(it) }
val routingDID = DID(grantedMessage.body.routingDid)
Mediator(
id = UUID.randomUUID4().toString(),
mediatorDID = mediatorDID,
hostDID = host,
routingDID = routingDID
)
} else {
throw PrismAgentError.mediationRequestFailedError()
}
}
}

override suspend fun updateKeyListWithDIDs(dids: Array<DID>) {
val keyListUpdateMessage = mediator?.let{
MediationKeysUpdateList(
from = it.hostDID,
to = it.mediatorDID,
recipientDids = dids
).makeMessage()
} ?: run {
throw PrismAgentError.noMediatorAvailableError()
}
keyListUpdateMessage.let { message -> mercury.sendMessage(message) }
}

override fun pickupUnreadMessages(limit: Int): Flow<Array<Pair<String, Message>>> {
val requestMessage = mediator?.let{
PickupRequest(
from = it.hostDID,
to = it.mediatorDID,
body = PickupRequest.Body(null, limit.toString())
).makeMessage()
} ?: throw PrismAgentError.noMediatorAvailableError()

return flow {
emit(mercury.sendMessageParseMessage(requestMessage))
}.map {
val receivedMessage = it?.let { PickupDelivery(it) }
receivedMessage?.let {
it.attachments.map {
val data = it.data
when(data) {
is AttachmentBase64 -> Pair(it.id, data.base64)
is AttachmentJsonData ->Pair(it.id, data.data)
else -> Pair("", "") // TODO: This needs to be changed
}
}.map {
Pair(it.first, mercury.unpackMessage(it.second))
}.toTypedArray()
} ?: emptyArray()
}
}

override suspend fun registerMessagesAsRead(ids: Array<String>) {
val requestMessage = mediator?.let{
PickupReceived(
from = it.hostDID,
to = it.mediatorDID,
body = PickupReceived.Body(messageIdList = ids)
).makeMessage()
} ?: throw PrismAgentError.noMediatorAvailableError()
mercury.sendMessage(requestMessage)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package io.iohk.atala.prism.walletsdk.prismagent.mediation

import io.iohk.atala.prism.domain.models.DID
import io.iohk.atala.prism.domain.models.Mediator
import io.iohk.atala.prism.domain.models.Message
import kotlinx.coroutines.flow.Flow

interface MediatorHandler {
val mediator: Mediator?
val mediatorDID: DID

@Throws()
fun bootRegisteredMediator(): Flow<Mediator?>

@Throws()
fun achieveMediation(host: DID): Flow<Mediator>

@Throws()
suspend fun updateKeyListWithDIDs(dids: Array<DID>)

@Throws()
fun pickupUnreadMessages(limit: Int): Flow<Array<Pair<String, Message>>>

@Throws()
suspend fun registerMessagesAsRead(ids: Array<String>)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package io.iohk.atala.prism.walletsdk.prismagent.mediation

import io.iohk.atala.prism.domain.models.Mediator
import kotlinx.coroutines.flow.Flow

interface MediatorStore {
fun storeMediator(mediator: Mediator)
fun getAllMediators(): Flow<Array<Mediator>>
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package io.iohk.atala.prism.walletsdk.prismagent.protocols
package io.iohk.atala.prism.walletsdk.prismagent.protocols.mediation

import io.iohk.atala.prism.apollo.uuid.UUID
import io.iohk.atala.prism.domain.models.Message
import io.iohk.atala.prism.walletsdk.prismagent.protocols.ProtocolType
import kotlinx.serialization.Serializable
import kotlinx.serialization.decodeFromString
import kotlinx.serialization.json.Json
Expand Down
Loading

0 comments on commit e1944ac

Please sign in to comment.