Skip to content

Commit

Permalink
feat(agent): add mediation and ability to send messages
Browse files Browse the repository at this point in the history
  • Loading branch information
goncalo-frade-iohk committed Feb 8, 2023
1 parent 7cc738f commit 352d230
Show file tree
Hide file tree
Showing 17 changed files with 414 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package io.iohk.atala.prism.domain.buildingBlocks

import io.iohk.atala.prism.domain.models.DID
import io.iohk.atala.prism.domain.models.DIDPair
import io.iohk.atala.prism.domain.models.MediatorDID
import io.iohk.atala.prism.domain.models.Mediator
import io.iohk.atala.prism.domain.models.Message
import io.iohk.atala.prism.domain.models.PeerDID
import io.iohk.atala.prism.domain.models.PrismDIDInfo
Expand Down Expand Up @@ -70,7 +70,7 @@ interface Pluto {

fun getMessage(id: String): Flow<Message?>

fun getAllMediators(): Flow<Array<MediatorDID>>
fun getAllMediators(): Flow<Array<Mediator>>

fun getAllCredentials(): Flow<Array<VerifiableCredential>>
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package io.iohk.atala.prism.domain.models

data class MediatorDID(
data class Mediator(
val id: String,
val mediatorDID: DID,
val hostDID: DID,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ data class Message(
val extraHeaders: Array<String>,
val createdTime: String,
val expiresTimePlus: String,
val attachments: Array<String>, // TODO: Change to AttachmentDescriptor
val attachments: Array<AttachmentDescriptor>,
val thid: String? = null,
val pthid: String? = null,
val ack: Array<String>,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.iohk.atala.prism.domain.models

import kotlinx.serialization.Serializable

interface AttachmentData

data class AttachmentHeader(
Expand Down Expand Up @@ -48,6 +50,7 @@ data class AttachmentJsonData(
val data: String
) : AttachmentData

@Serializable
data class AttachmentDescriptor(
val id: String,
val mediaType: String? = null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import io.iohk.atala.prism.domain.models.CredentialType
import io.iohk.atala.prism.domain.models.DID
import io.iohk.atala.prism.domain.models.DIDPair
import io.iohk.atala.prism.domain.models.JWTVerifiableCredential
import io.iohk.atala.prism.domain.models.MediatorDID
import io.iohk.atala.prism.domain.models.Mediator
import io.iohk.atala.prism.domain.models.Message
import io.iohk.atala.prism.domain.models.PeerDID
import io.iohk.atala.prism.domain.models.PrismDIDInfo
Expand Down Expand Up @@ -445,13 +445,13 @@ class PlutoImpl(connection: DbConnection) : Pluto {
}
}

override fun getAllMediators(): Flow<Array<MediatorDID>> {
override fun getAllMediators(): Flow<Array<Mediator>> {
return db.mediatorQueries.fetchAllMediators()
.asFlow()
.mapToList()
.map { list ->
list.map {
MediatorDID(
Mediator(
it.id,
DID(it.MediatorDID),
DID(it.HostDID),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package io.iohk.atala.prism.walletsdk.prismagent

import io.iohk.atala.prism.domain.buildingBlocks.Castor
import io.iohk.atala.prism.domain.buildingBlocks.Mercury
import io.iohk.atala.prism.domain.buildingBlocks.Pluto
import io.iohk.atala.prism.domain.models.DID
import io.iohk.atala.prism.domain.models.Message
import io.iohk.atala.prism.domain.models.PrismAgentError
import io.iohk.atala.prism.walletsdk.prismagent.mediation.MediatorHandler
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.map

class ConnectionManager {
private val mercury: Mercury
private val castor: Castor
private val pluto: Pluto
val mediatorHandler: MediatorHandler

constructor(
mercury: Mercury,
castor: Castor,
pluto: Pluto,
mediatorHandler: MediatorHandler
) {
this.mercury = mercury
this.castor = castor
this.pluto = pluto
this.mediatorHandler = mediatorHandler
}

@Throws()
suspend fun startMediator() {
mediatorHandler.bootRegisteredMediator()
.first()
}

@Throws()
suspend fun registerMediator(host: DID) {
mediatorHandler.achieveMediation(host)
.first()
}

@Throws()
suspend fun sendMessage(message: Message): Message? {
if (mediatorHandler.mediator == null) {
throw PrismAgentError.noMediatorAvailableError()
}
pluto.storeMessage(message)
return mercury.sendMessageParseMessage(message)
}

@Throws()
fun awaitMessages(): Flow<Array<Message>> {
return mediatorHandler.pickupUnreadMessages(NUMBER_OF_MESSAGES)
.map {
val messagesIds = it.map { it.first }.toTypedArray()
mediatorHandler.registerMessagesAsRead(messagesIds)
it.map { it.second }.toTypedArray()
}
.map {
pluto.storeMessages(it)
it
}
}

companion object {
const val NUMBER_OF_MESSAGES = 10
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,28 @@ package io.iohk.atala.prism.walletsdk.prismagent

import io.iohk.atala.prism.domain.buildingBlocks.Apollo
import io.iohk.atala.prism.domain.buildingBlocks.Castor
import io.iohk.atala.prism.domain.buildingBlocks.Mercury
import io.iohk.atala.prism.domain.buildingBlocks.Pluto
import io.iohk.atala.prism.domain.models.Curve
import io.iohk.atala.prism.domain.models.DID
import io.iohk.atala.prism.domain.models.DIDDocument
import io.iohk.atala.prism.domain.models.KeyCurve
import io.iohk.atala.prism.domain.models.Message
import io.iohk.atala.prism.domain.models.PrismAgentError
import io.iohk.atala.prism.domain.models.Seed
import io.iohk.atala.prism.domain.models.Signature
import io.iohk.atala.prism.walletsdk.prismagent.helpers.ApiImpl
import io.iohk.atala.prism.walletsdk.prismagent.helpers.HttpClient
import io.iohk.atala.prism.walletsdk.prismagent.mediation.MediatorHandler
import io.iohk.atala.prism.walletsdk.prismagent.protocols.prismOnboarding.PrismOnboardingInvitation
import io.ktor.client.plugins.contentnegotiation.ContentNegotiation
import io.ktor.http.HttpMethod
import io.ktor.http.Url
import io.ktor.serialization.kotlinx.json.json
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.firstOrNull
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.serialization.Serializable
import kotlinx.serialization.json.Json
Expand All @@ -38,18 +43,23 @@ final class PrismAgent {
private val apollo: Apollo
private val castor: Castor
private val pluto: Pluto
private val mercury: Mercury
private val api: ApiImpl
private val connectionManager: ConnectionManager

constructor(
apollo: Apollo,
castor: Castor,
pluto: Pluto,
mercury: Mercury,
seed: Seed? = null,
api: ApiImpl? = null
api: ApiImpl? = null,
mediatorHandler: MediatorHandler
) {
this.apollo = apollo
this.castor = castor
this.pluto = pluto
this.mercury = mercury
this.seed = seed ?: apollo.createRandomSeed().second
this.api = api ?: ApiImpl(
HttpClient {
Expand All @@ -64,6 +74,37 @@ final class PrismAgent {
}
}
)
this.connectionManager = ConnectionManager(mercury, castor, pluto, mediatorHandler)
}

@Throws()
suspend fun start() {
if (state != State.STOPED) { return }
state = State.STARTING
try {
connectionManager.startMediator()
} catch (error: PrismAgentError.noMediatorAvailableError) {
val hostDID = createNewPeerDID(arrayOf(DIDDocument.Service(
"#didcomm-1",
arrayOf("DIDCommMessaging"),
DIDDocument.ServiceEndpoint(connectionManager.mediatorHandler.mediatorDID.toString())
)
),
false
)
connectionManager.registerMediator(hostDID)
}
if (connectionManager.mediatorHandler.mediator != null) {
state = State.RUNNING
} else {
throw PrismAgentError.mediationRequestFailedError()
}
}

@Throws()
suspend fun stop() {
if (state != State.RUNNING) { return }
state = State.STOPING
}

suspend fun createNewPrismDID(
Expand Down Expand Up @@ -170,4 +211,8 @@ final class PrismAgent {
ownDID = did
)
}

fun sendMessage(message: Message): Flow<Message?> {
return flow { connectionManager.sendMessage(message) }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package io.iohk.atala.prism.walletsdk.prismagent.mediation

import io.iohk.atala.prism.apollo.uuid.UUID
import io.iohk.atala.prism.domain.buildingBlocks.Mercury
import io.iohk.atala.prism.domain.buildingBlocks.Pluto
import io.iohk.atala.prism.domain.models.AttachmentBase64
import io.iohk.atala.prism.domain.models.AttachmentJsonData
import io.iohk.atala.prism.domain.models.DID
import io.iohk.atala.prism.domain.models.Mediator
import io.iohk.atala.prism.domain.models.Message
import io.iohk.atala.prism.domain.models.PrismAgentError
import io.iohk.atala.prism.walletsdk.prismagent.protocols.mediation.MediationGrant
import io.iohk.atala.prism.walletsdk.prismagent.protocols.mediation.MediationKeysUpdateList
import io.iohk.atala.prism.walletsdk.prismagent.protocols.mediation.MediationRequest
import io.iohk.atala.prism.walletsdk.prismagent.protocols.pickup.PickupDelivery
import io.iohk.atala.prism.walletsdk.prismagent.protocols.pickup.PickupReceived
import io.iohk.atala.prism.walletsdk.prismagent.protocols.pickup.PickupRequest
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map

final class BasicMediationHandler(
override val mediatorDID: DID,
private val mercury: Mercury,
private val store: MediatorRepository
) : MediatorHandler {
final class PlutoMediatorRepositoryImpl(private val pluto: Pluto) : MediatorRepository {
override fun getAllMediators(): Flow<Array<Mediator>> {
return pluto.getAllMediators()
}

override fun storeMediator(mediator: Mediator) {
pluto.storeMediator(mediator.mediatorDID, mediator.hostDID, mediator.routingDID)
}
}

override var mediator: Mediator? = null
private set

init {
this.mediator = null
}

override fun bootRegisteredMediator(): Flow<Mediator> {
return mediator?.let { flow { emit(it) } }
?: store.getAllMediators().map { it.first() }
}

override fun achieveMediation(host: DID): Flow<Mediator> {
val requestMessage = MediationRequest(from = host, to = mediatorDID).makeMessage()
return flow {
emit(mercury.sendMessageParseMessage(message = requestMessage))
}.map {
if (it != null) {
val grantedMessage = it.let { MediationGrant(it) }
val routingDID = DID(grantedMessage.body.routingDid)
Mediator(
id = UUID.randomUUID4().toString(),
mediatorDID = mediatorDID,
hostDID = host,
routingDID = routingDID
)
} else {
throw PrismAgentError.mediationRequestFailedError()
}
}
}

override suspend fun updateKeyListWithDIDs(dids: Array<DID>) {
val keyListUpdateMessage = mediator?.let {
MediationKeysUpdateList(
from = it.hostDID,
to = it.mediatorDID,
recipientDids = dids
).makeMessage()
} ?: throw PrismAgentError.noMediatorAvailableError()
keyListUpdateMessage.let { message -> mercury.sendMessage(message) }
}

override fun pickupUnreadMessages(limit: Int): Flow<Array<Pair<String, Message>>> {
val requestMessage = mediator?.let {
PickupRequest(
from = it.hostDID,
to = it.mediatorDID,
body = PickupRequest.Body(null, limit.toString())
).makeMessage()
} ?: throw PrismAgentError.noMediatorAvailableError()

return flow {
emit(mercury.sendMessageParseMessage(requestMessage))
}.map {
val receivedMessage = it?.let { PickupDelivery(it) }
receivedMessage?.let {
it.attachments.mapNotNull {
val data = it.data
when(data) {
is AttachmentBase64 -> Pair(it.id, data.base64)
is AttachmentJsonData -> Pair(it.id, data.data)
else -> null
}
}.map {
Pair(it.first, mercury.unpackMessage(it.second))
}.toTypedArray()
} ?: emptyArray()
}
}

override suspend fun registerMessagesAsRead(ids: Array<String>) {
val requestMessage = mediator?.let {
PickupReceived(
from = it.hostDID,
to = it.mediatorDID,
body = PickupReceived.Body(messageIdList = ids)
).makeMessage()
} ?: throw PrismAgentError.noMediatorAvailableError()
mercury.sendMessage(requestMessage)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package io.iohk.atala.prism.walletsdk.prismagent.mediation

import io.iohk.atala.prism.domain.models.DID
import io.iohk.atala.prism.domain.models.Mediator
import io.iohk.atala.prism.domain.models.Message
import kotlinx.coroutines.flow.Flow

interface MediatorHandler {
val mediator: Mediator?
val mediatorDID: DID

@Throws()
fun bootRegisteredMediator(): Flow<Mediator?>

@Throws()
fun achieveMediation(host: DID): Flow<Mediator>

@Throws()
suspend fun updateKeyListWithDIDs(dids: Array<DID>)

@Throws()
fun pickupUnreadMessages(limit: Int): Flow<Array<Pair<String, Message>>>

@Throws()
suspend fun registerMessagesAsRead(ids: Array<String>)
}
Loading

0 comments on commit 352d230

Please sign in to comment.