From bb5ea8f6d752030819c219d9b11f3153453302a4 Mon Sep 17 00:00:00 2001 From: Benjamin Voiturier Date: Tue, 8 Oct 2024 15:17:14 +0200 Subject: [PATCH] chore: refactor/simplify MessagingService-related layers creation Signed-off-by: Benjamin Voiturier --- .../src/main/resources/application.conf | 2 +- .../identus/agent/server/MainApp.scala | 54 +++---------------- .../agent/server/config/AppConfig.scala | 3 +- .../OIDCCredentialIssuerServiceSpec.scala | 18 +++---- .../service/ConnectionServiceImplSpec.scala | 12 ++--- .../ConnectionServiceNotifierSpec.scala | 8 +-- .../messaging/MessagingServiceTest.scala | 7 +-- .../kafka/InMemoryMessagingServiceSpec.scala | 10 ++-- .../service/CredentialServiceSpecHelper.scala | 15 ++---- .../PresentationServiceSpecHelper.scala | 11 ++-- .../shared/messaging/MessagingService.scala | 27 +++++++++- .../messaging/MessagingServiceConfig.scala | 21 ++++++++ .../kafka/InMemoryMessagingService.scala | 33 ++---------- .../kafka/ZKafkaMessagingServiceImpl.scala | 54 ++++++------------- 14 files changed, 107 insertions(+), 168 deletions(-) diff --git a/cloud-agent/service/server/src/main/resources/application.conf b/cloud-agent/service/server/src/main/resources/application.conf index 4dad27b8d2..7b5bebb01a 100644 --- a/cloud-agent/service/server/src/main/resources/application.conf +++ b/cloud-agent/service/server/src/main/resources/application.conf @@ -248,7 +248,6 @@ agent { authApiKey = "default" authApiKey = ${?DEFAULT_WALLET_AUTH_API_KEY} } - inMemoryQueueCapacity = 1000 messagingService { connectFlow { consumerCount = 5 @@ -280,6 +279,7 @@ agent { statusListSync { consumerCount = 5 } + inMemoryQueueCapacity = 1000 kafkaEnabled = false kafkaEnabled = ${?DEFAULT_KAFKA_ENABLED} kafka { diff --git a/cloud-agent/service/server/src/main/scala/org/hyperledger/identus/agent/server/MainApp.scala b/cloud-agent/service/server/src/main/scala/org/hyperledger/identus/agent/server/MainApp.scala index 7f8caf31d6..922593389a 100644 --- a/cloud-agent/service/server/src/main/scala/org/hyperledger/identus/agent/server/MainApp.scala +++ b/cloud-agent/service/server/src/main/scala/org/hyperledger/identus/agent/server/MainApp.scala @@ -63,11 +63,7 @@ import org.hyperledger.identus.pollux.sql.repository.{ } import org.hyperledger.identus.presentproof.controller.PresentProofControllerImpl import org.hyperledger.identus.resolvers.DIDResolver -import org.hyperledger.identus.shared.messaging.kafka.{ - InMemoryMessagingService, - ZKafkaMessagingServiceImpl, - ZKafkaProducerImpl -} +import org.hyperledger.identus.shared.messaging import org.hyperledger.identus.shared.messaging.WalletIdAndRecordId import org.hyperledger.identus.shared.models.WalletId import org.hyperledger.identus.system.controller.SystemControllerImpl @@ -172,46 +168,6 @@ object MainApp extends ZIOAppDefault { ) _ <- preMigrations _ <- migrations - appConfig <- ZIO.service[AppConfig].provide(SystemModule.configLayer) - messagingServiceLayer <- - if (appConfig.agent.messagingService.kafkaEnabled) { - val kafkaConfig = appConfig.agent.messagingService.kafka.get - ZIO.succeed( - ZKafkaMessagingServiceImpl.layer( - kafkaConfig.bootstrapServers.split(',').toList, - kafkaConfig.consumers.autoCreateTopics, - kafkaConfig.consumers.maxPollRecords, - kafkaConfig.consumers.maxPollInterval, - kafkaConfig.consumers.pollTimeout, - kafkaConfig.consumers.rebalanceSafeCommits - ) - ) - } else { - ZIO.succeed( - ZLayer.succeed(appConfig.agent.inMemoryQueueCapacity) >>> InMemoryMessagingService.messagingServiceLayer - ) - } - messageProducerLayer <- - if (appConfig.agent.messagingService.kafkaEnabled) { - ZIO.succeed( - ZKafkaProducerImpl.layer[UUID, WalletIdAndRecordId] - ) - } else { - ZIO.succeed( - InMemoryMessagingService.producerLayer[UUID, WalletIdAndRecordId] - ) - } - syncDIDStateProducerLayer <- - if (appConfig.agent.messagingService.kafkaEnabled) { - ZIO.succeed( - ZKafkaProducerImpl.layer[WalletId, WalletId] - ) - } else { - ZIO.succeed( - InMemoryMessagingService.producerLayer[WalletId, WalletId] - ) - } - app <- CloudAgentApp.run .provide( DidCommX.liveLayer, @@ -296,9 +252,11 @@ object MainApp extends ZIOAppDefault { // HTTP client SystemModule.zioHttpClientLayer, Scope.default, - messagingServiceLayer, - messageProducerLayer, - syncDIDStateProducerLayer + // Messaging Service + ZLayer.fromZIO(ZIO.service[AppConfig].map(_.agent.messagingService)), + messaging.MessagingService.serviceLayer, + messaging.MessagingService.producerLayer[UUID, WalletIdAndRecordId], + messaging.MessagingService.producerLayer[WalletId, WalletId] ) } yield app diff --git a/cloud-agent/service/server/src/main/scala/org/hyperledger/identus/agent/server/config/AppConfig.scala b/cloud-agent/service/server/src/main/scala/org/hyperledger/identus/agent/server/config/AppConfig.scala index 60639e8fe9..be3cb07eef 100644 --- a/cloud-agent/service/server/src/main/scala/org/hyperledger/identus/agent/server/config/AppConfig.scala +++ b/cloud-agent/service/server/src/main/scala/org/hyperledger/identus/agent/server/config/AppConfig.scala @@ -5,8 +5,8 @@ import org.hyperledger.identus.iam.authentication.AuthenticationConfig import org.hyperledger.identus.pollux.vc.jwt.* import org.hyperledger.identus.shared.db.DbConfig import org.hyperledger.identus.shared.messaging.MessagingServiceConfig -import zio.config.magnolia.* import zio.Config +import zio.config.magnolia.* import java.net.URL import java.time.Duration @@ -165,7 +165,6 @@ final case class AgentConfig( secretStorage: SecretStorageConfig, webhookPublisher: WebhookPublisherConfig, defaultWallet: DefaultWalletConfig, - inMemoryQueueCapacity: Int, messagingService: MessagingServiceConfig ) { def validate: Either[String, Unit] = diff --git a/cloud-agent/service/server/src/test/scala/org/hyperledger/identus/oid4vci/domain/OIDCCredentialIssuerServiceSpec.scala b/cloud-agent/service/server/src/test/scala/org/hyperledger/identus/oid4vci/domain/OIDCCredentialIssuerServiceSpec.scala index f3b0d25c56..4c49398ba6 100644 --- a/cloud-agent/service/server/src/test/scala/org/hyperledger/identus/oid4vci/domain/OIDCCredentialIssuerServiceSpec.scala +++ b/cloud-agent/service/server/src/test/scala/org/hyperledger/identus/oid4vci/domain/OIDCCredentialIssuerServiceSpec.scala @@ -9,25 +9,19 @@ import org.hyperledger.identus.castor.core.service.{DIDService, MockDIDService} import org.hyperledger.identus.oid4vci.http.{ClaimDescriptor, CredentialDefinition, Localization} import org.hyperledger.identus.oid4vci.service.{OIDCCredentialIssuerService, OIDCCredentialIssuerServiceImpl} import org.hyperledger.identus.oid4vci.storage.InMemoryIssuanceSessionService -import org.hyperledger.identus.pollux.core.model.oid4vci.CredentialConfiguration import org.hyperledger.identus.pollux.core.model.CredentialFormat -import org.hyperledger.identus.pollux.core.repository.{ - CredentialRepository, - CredentialRepositoryInMemory, - CredentialStatusListRepositoryInMemory -} +import org.hyperledger.identus.pollux.core.model.oid4vci.CredentialConfiguration +import org.hyperledger.identus.pollux.core.repository.{CredentialRepositoryInMemory, CredentialStatusListRepositoryInMemory} import org.hyperledger.identus.pollux.core.service.* import org.hyperledger.identus.pollux.core.service.uriResolvers.ResourceUrlResolver import org.hyperledger.identus.pollux.vc.jwt.PrismDidResolver -import org.hyperledger.identus.shared.messaging.kafka.InMemoryMessagingService -import org.hyperledger.identus.shared.messaging.WalletIdAndRecordId +import org.hyperledger.identus.shared.messaging.{MessagingService, MessagingServiceConfig, WalletIdAndRecordId} import org.hyperledger.identus.shared.models.{WalletAccessContext, WalletId} -import zio.{Clock, Random, URLayer, ZIO, ZLayer} -import zio.json.* import zio.json.ast.Json import zio.mock.MockSpecDefault import zio.test.* import zio.test.Assertion.* +import zio.{Clock, Random, URLayer, ZIO, ZLayer} import java.net.URI import java.time.Instant @@ -56,8 +50,8 @@ object OIDCCredentialIssuerServiceSpec GenericSecretStorageInMemory.layer, LinkSecretServiceImpl.layer, CredentialServiceImpl.layer, - (ZLayer.succeed(100) >>> InMemoryMessagingService.messagingServiceLayer >>> - InMemoryMessagingService.producerLayer[UUID, WalletIdAndRecordId]).orDie, + (MessagingServiceConfig.inMemoryLayer >>> MessagingService.serviceLayer >>> + MessagingService.producerLayer[UUID, WalletIdAndRecordId]).orDie, OIDCCredentialIssuerServiceImpl.layer ) diff --git a/connect/core/src/test/scala/org/hyperledger/identus/connect/core/service/ConnectionServiceImplSpec.scala b/connect/core/src/test/scala/org/hyperledger/identus/connect/core/service/ConnectionServiceImplSpec.scala index 0df05ca537..74011ee0ce 100644 --- a/connect/core/src/test/scala/org/hyperledger/identus/connect/core/service/ConnectionServiceImplSpec.scala +++ b/connect/core/src/test/scala/org/hyperledger/identus/connect/core/service/ConnectionServiceImplSpec.scala @@ -1,22 +1,19 @@ package org.hyperledger.identus.connect.core.service import io.circe.syntax.* +import org.hyperledger.identus.connect.core.model.ConnectionRecord.* import org.hyperledger.identus.connect.core.model.error.ConnectionServiceError import org.hyperledger.identus.connect.core.model.error.ConnectionServiceError.InvalidStateForOperation -import org.hyperledger.identus.connect.core.model.ConnectionRecord -import org.hyperledger.identus.connect.core.model.ConnectionRecord.* import org.hyperledger.identus.connect.core.repository.ConnectionRepositoryInMemory import org.hyperledger.identus.mercury.model.{DidId, Message} import org.hyperledger.identus.mercury.protocol.connection.ConnectionResponse -import org.hyperledger.identus.shared.messaging.kafka.InMemoryMessagingService +import org.hyperledger.identus.shared.messaging import org.hyperledger.identus.shared.messaging.WalletIdAndRecordId -import org.hyperledger.identus.shared.models.* import org.hyperledger.identus.shared.models.{WalletAccessContext, WalletId} import zio.* import zio.test.* import zio.test.Assertion.* -import java.time.Instant import java.util.UUID object ConnectionServiceImplSpec extends ZIOSpecDefault { @@ -315,8 +312,9 @@ object ConnectionServiceImplSpec extends ZIOSpecDefault { } ).provide( connectionServiceLayer, - ZLayer.succeed(100) >>> InMemoryMessagingService.messagingServiceLayer, - InMemoryMessagingService.producerLayer[UUID, WalletIdAndRecordId], + messaging.MessagingServiceConfig.inMemoryLayer, + messaging.MessagingService.serviceLayer, + messaging.MessagingService.producerLayer[UUID, WalletIdAndRecordId], ZLayer.succeed(WalletAccessContext(WalletId.random)), ) } diff --git a/connect/core/src/test/scala/org/hyperledger/identus/connect/core/service/ConnectionServiceNotifierSpec.scala b/connect/core/src/test/scala/org/hyperledger/identus/connect/core/service/ConnectionServiceNotifierSpec.scala index 0a88ee9607..185bd95b95 100644 --- a/connect/core/src/test/scala/org/hyperledger/identus/connect/core/service/ConnectionServiceNotifierSpec.scala +++ b/connect/core/src/test/scala/org/hyperledger/identus/connect/core/service/ConnectionServiceNotifierSpec.scala @@ -7,9 +7,8 @@ import org.hyperledger.identus.event.notification.* import org.hyperledger.identus.mercury.model.DidId import org.hyperledger.identus.mercury.protocol.connection.{ConnectionRequest, ConnectionResponse} import org.hyperledger.identus.mercury.protocol.invitation.v2.Invitation -import org.hyperledger.identus.shared.messaging.kafka.{ZKafkaMessagingServiceImpl, ZKafkaProducerImpl} +import org.hyperledger.identus.shared.messaging import org.hyperledger.identus.shared.messaging.WalletIdAndRecordId -import org.hyperledger.identus.shared.models.* import org.hyperledger.identus.shared.models.{WalletAccessContext, WalletId} import zio.* import zio.mock.Expectation @@ -154,8 +153,9 @@ object ConnectionServiceNotifierSpec extends ZIOSpecDefault { inviteeExpectations.toLayer ) >>> ConnectionServiceNotifier.layer, ZLayer.succeed(WalletAccessContext(WalletId.random)), - ZKafkaMessagingServiceImpl.layer(List("localhost:29092")), - ZKafkaProducerImpl.layer[UUID, WalletIdAndRecordId] + messaging.MessagingServiceConfig.inMemoryLayer, + messaging.MessagingService.serviceLayer, + messaging.MessagingService.producerLayer[UUID, WalletIdAndRecordId] ) ) } diff --git a/event-notification/src/test/scala/org/hyperledger/identus/messaging/MessagingServiceTest.scala b/event-notification/src/test/scala/org/hyperledger/identus/messaging/MessagingServiceTest.scala index 859765b331..fb62386d2b 100644 --- a/event-notification/src/test/scala/org/hyperledger/identus/messaging/MessagingServiceTest.scala +++ b/event-notification/src/test/scala/org/hyperledger/identus/messaging/MessagingServiceTest.scala @@ -1,9 +1,9 @@ package org.hyperledger.identus.messaging +import org.hyperledger.identus.shared.messaging import org.hyperledger.identus.shared.messaging.{Message, MessagingService, Serde} -import org.hyperledger.identus.shared.messaging.kafka.ZKafkaMessagingServiceImpl -import zio.{durationInt, Random, Schedule, Scope, URIO, ZIO, ZIOAppArgs, ZIOAppDefault, ZLayer} import zio.json.{DecoderOps, DeriveJsonDecoder, DeriveJsonEncoder, EncoderOps, JsonDecoder, JsonEncoder} +import zio.{Random, Schedule, Scope, URIO, ZIO, ZIOAppArgs, ZIOAppDefault, ZLayer, durationInt} import java.nio.charset.StandardCharsets import java.util.UUID @@ -36,7 +36,8 @@ object MessagingServiceTest extends ZIOAppDefault { _ <- ZIO.never } yield () effect.provide( - ZKafkaMessagingServiceImpl.layer(List("localhost:29092")), + messaging.MessagingServiceConfig.inMemoryLayer, + messaging.MessagingService.serviceLayer, ZLayer.succeed("Sample 'R' passed to handler") ) } diff --git a/event-notification/src/test/scala/org/hyperledger/identus/messaging/kafka/InMemoryMessagingServiceSpec.scala b/event-notification/src/test/scala/org/hyperledger/identus/messaging/kafka/InMemoryMessagingServiceSpec.scala index 6d86440463..c6b068f16b 100644 --- a/event-notification/src/test/scala/org/hyperledger/identus/messaging/kafka/InMemoryMessagingServiceSpec.scala +++ b/event-notification/src/test/scala/org/hyperledger/identus/messaging/kafka/InMemoryMessagingServiceSpec.scala @@ -1,16 +1,14 @@ package org.hyperledger.identus.messaging.kafka -import org.hyperledger.identus.messaging.* -import org.hyperledger.identus.shared.messaging.{Consumer, Message, Producer} -import org.hyperledger.identus.shared.messaging.kafka.InMemoryMessagingService +import org.hyperledger.identus.shared.messaging.* import zio.* import zio.test.* import zio.test.Assertion.* object InMemoryMessagingServiceSpec extends ZIOSpecDefault { - val testLayer = ZLayer.succeed(100) >+> InMemoryMessagingService.messagingServiceLayer >+> - InMemoryMessagingService.producerLayer[String, String] >+> - InMemoryMessagingService.consumerLayer[String, String]("test-group") + val testLayer = MessagingServiceConfig.inMemoryLayer >+> MessagingService.serviceLayer >+> + MessagingService.producerLayer[String, String] >+> + MessagingService.consumerLayer[String, String]("test-group") def spec = suite("InMemoryMessagingServiceSpec")( test("should produce and consume messages") { diff --git a/pollux/core/src/test/scala/org/hyperledger/identus/pollux/core/service/CredentialServiceSpecHelper.scala b/pollux/core/src/test/scala/org/hyperledger/identus/pollux/core/service/CredentialServiceSpecHelper.scala index 0a5b024e0a..ab2ad3801d 100644 --- a/pollux/core/src/test/scala/org/hyperledger/identus/pollux/core/service/CredentialServiceSpecHelper.scala +++ b/pollux/core/src/test/scala/org/hyperledger/identus/pollux/core/service/CredentialServiceSpecHelper.scala @@ -3,24 +3,17 @@ package org.hyperledger.identus.pollux.core.service import io.circe.Json import org.hyperledger.identus.agent.walletapi.memory.GenericSecretStorageInMemory import org.hyperledger.identus.agent.walletapi.service.ManagedDIDService -import org.hyperledger.identus.agent.walletapi.storage.GenericSecretStorage import org.hyperledger.identus.castor.core.model.did.PrismDID import org.hyperledger.identus.castor.core.service.DIDService import org.hyperledger.identus.mercury.model.{AttachmentDescriptor, DidId} import org.hyperledger.identus.mercury.protocol.issuecredential.* import org.hyperledger.identus.pollux.core.model.* import org.hyperledger.identus.pollux.core.model.presentation.Options -import org.hyperledger.identus.pollux.core.repository.{ - CredentialDefinitionRepositoryInMemory, - CredentialRepositoryInMemory, - CredentialStatusListRepositoryInMemory -} +import org.hyperledger.identus.pollux.core.repository.{CredentialDefinitionRepositoryInMemory, CredentialRepositoryInMemory, CredentialStatusListRepositoryInMemory} import org.hyperledger.identus.pollux.prex.{ClaimFormat, Ldp, PresentationDefinition} import org.hyperledger.identus.pollux.vc.jwt.* import org.hyperledger.identus.shared.http.UriResolver -import org.hyperledger.identus.shared.messaging.kafka.InMemoryMessagingService -import org.hyperledger.identus.shared.messaging.WalletIdAndRecordId -import org.hyperledger.identus.shared.models.* +import org.hyperledger.identus.shared.messaging.{MessagingService, MessagingServiceConfig, WalletIdAndRecordId} import org.hyperledger.identus.shared.models.{WalletAccessContext, WalletId} import zio.* @@ -44,8 +37,8 @@ trait CredentialServiceSpecHelper { credentialDefinitionServiceLayer, GenericSecretStorageInMemory.layer, LinkSecretServiceImpl.layer, - (ZLayer.succeed(100) >>> InMemoryMessagingService.messagingServiceLayer >>> - InMemoryMessagingService.producerLayer[UUID, WalletIdAndRecordId]).orDie, + (MessagingServiceConfig.inMemoryLayer >>> MessagingService.serviceLayer >>> + MessagingService.producerLayer[UUID, WalletIdAndRecordId]).orDie, CredentialServiceImpl.layer ) diff --git a/pollux/core/src/test/scala/org/hyperledger/identus/pollux/core/service/PresentationServiceSpecHelper.scala b/pollux/core/src/test/scala/org/hyperledger/identus/pollux/core/service/PresentationServiceSpecHelper.scala index e1bcc3bbdb..53e36c2a99 100644 --- a/pollux/core/src/test/scala/org/hyperledger/identus/pollux/core/service/PresentationServiceSpecHelper.scala +++ b/pollux/core/src/test/scala/org/hyperledger/identus/pollux/core/service/PresentationServiceSpecHelper.scala @@ -1,11 +1,10 @@ package org.hyperledger.identus.pollux.core.service -import com.nimbusds.jose.jwk.* import org.hyperledger.identus.agent.walletapi.memory.GenericSecretStorageInMemory import org.hyperledger.identus.castor.core.model.did.DID -import org.hyperledger.identus.mercury.{AgentPeerService, PeerDID} import org.hyperledger.identus.mercury.model.{AttachmentDescriptor, DidId} import org.hyperledger.identus.mercury.protocol.presentproof.* +import org.hyperledger.identus.mercury.{AgentPeerService, PeerDID} import org.hyperledger.identus.pollux.core.model.* import org.hyperledger.identus.pollux.core.model.error.PresentationError import org.hyperledger.identus.pollux.core.repository.* @@ -14,9 +13,7 @@ import org.hyperledger.identus.pollux.core.service.uriResolvers.ResourceUrlResol import org.hyperledger.identus.pollux.vc.jwt.* import org.hyperledger.identus.shared.crypto.KmpSecp256k1KeyOps import org.hyperledger.identus.shared.http.UriResolver -import org.hyperledger.identus.shared.messaging.kafka.InMemoryMessagingService -import org.hyperledger.identus.shared.messaging.WalletIdAndRecordId -import org.hyperledger.identus.shared.models.* +import org.hyperledger.identus.shared.messaging.{MessagingService, MessagingServiceConfig, WalletIdAndRecordId} import org.hyperledger.identus.shared.models.{WalletAccessContext, WalletId} import zio.* @@ -46,8 +43,8 @@ trait PresentationServiceSpecHelper { linkSecretLayer, PresentationRepositoryInMemory.layer, CredentialRepositoryInMemory.layer, - (ZLayer.succeed(100) >>> InMemoryMessagingService.messagingServiceLayer >>> - InMemoryMessagingService.producerLayer[UUID, WalletIdAndRecordId]).orDie, + (MessagingServiceConfig.inMemoryLayer >>> MessagingService.serviceLayer >>> + MessagingService.producerLayer[UUID, WalletIdAndRecordId]).orDie, ) ++ defaultWalletLayer def createIssuer(did: String): Issuer = { diff --git a/shared/core/src/main/scala/org/hyperledger/identus/shared/messaging/MessagingService.scala b/shared/core/src/main/scala/org/hyperledger/identus/shared/messaging/MessagingService.scala index 9b0c1a560b..8c3a60e56e 100644 --- a/shared/core/src/main/scala/org/hyperledger/identus/shared/messaging/MessagingService.scala +++ b/shared/core/src/main/scala/org/hyperledger/identus/shared/messaging/MessagingService.scala @@ -1,6 +1,7 @@ package org.hyperledger.identus.shared.messaging -import zio.{durationInt, Cause, Duration, EnvironmentTag, RIO, Task, URIO, ZIO} +import org.hyperledger.identus.shared.messaging.kafka.{InMemoryMessagingService, ZKafkaMessagingServiceImpl} +import zio.{durationInt, Cause, Duration, EnvironmentTag, RIO, RLayer, Task, URIO, URLayer, ZIO, ZLayer} import java.time.Instant trait MessagingService { @@ -69,6 +70,30 @@ object MessagingService { )(implicit kSerde: Serde[K], vSerde: Serde[V]): RIO[HR & Producer[K, V] & MessagingService, Unit] = consumeWithRetryStrategy(groupId, handler, Seq(RetryStep(topicName, consumerCount, 0.seconds, None))) + val serviceLayer: URLayer[MessagingServiceConfig, MessagingService] = + ZLayer + .service[MessagingServiceConfig] + .flatMap(config => + if (config.get.kafkaEnabled) ZKafkaMessagingServiceImpl.layer + else InMemoryMessagingService.layer + ) + + def producerLayer[K: EnvironmentTag, V: EnvironmentTag](implicit + kSerde: Serde[K], + vSerde: Serde[V] + ): RLayer[MessagingService, Producer[K, V]] = ZLayer.fromZIO(for { + messagingService <- ZIO.service[MessagingService] + producer <- messagingService.makeProducer[K, V]() + } yield producer) + + def consumerLayer[K: EnvironmentTag, V: EnvironmentTag](groupId: String)(implicit + kSerde: Serde[K], + vSerde: Serde[V] + ): RLayer[MessagingService, Consumer[K, V]] = ZLayer.fromZIO(for { + messagingService <- ZIO.service[MessagingService] + consumer <- messagingService.makeConsumer[K, V](groupId) + } yield consumer) + } case class Message[K, V](key: K, value: V, offset: Long, timestamp: Long) diff --git a/shared/core/src/main/scala/org/hyperledger/identus/shared/messaging/MessagingServiceConfig.scala b/shared/core/src/main/scala/org/hyperledger/identus/shared/messaging/MessagingServiceConfig.scala index f42510d873..dd63c1a424 100644 --- a/shared/core/src/main/scala/org/hyperledger/identus/shared/messaging/MessagingServiceConfig.scala +++ b/shared/core/src/main/scala/org/hyperledger/identus/shared/messaging/MessagingServiceConfig.scala @@ -1,5 +1,7 @@ package org.hyperledger.identus.shared.messaging +import zio.{ULayer, ZLayer} + import java.time.Duration case class MessagingServiceConfig( @@ -8,6 +10,7 @@ case class MessagingServiceConfig( presentFlow: ConsumerJobConfig, didStateSync: ConsumerJobConfig, statusListSync: ConsumerJobConfig, + inMemoryQueueCapacity: Int, kafkaEnabled: Boolean, kafka: Option[KafkaConfig] ) @@ -35,3 +38,21 @@ final case class KafkaConsumersConfig( pollTimeout: Duration, rebalanceSafeCommits: Boolean ) + +object MessagingServiceConfig { + + val inMemoryLayer: ULayer[MessagingServiceConfig] = + ZLayer.succeed( + MessagingServiceConfig( + ConsumerJobConfig(1, None), + ConsumerJobConfig(1, None), + ConsumerJobConfig(1, None), + ConsumerJobConfig(1, None), + ConsumerJobConfig(1, None), + 100, + false, + None + ) + ) + +} diff --git a/shared/core/src/main/scala/org/hyperledger/identus/shared/messaging/kafka/InMemoryMessagingService.scala b/shared/core/src/main/scala/org/hyperledger/identus/shared/messaging/kafka/InMemoryMessagingService.scala index 1aa2b480d5..54d8c935c2 100644 --- a/shared/core/src/main/scala/org/hyperledger/identus/shared/messaging/kafka/InMemoryMessagingService.scala +++ b/shared/core/src/main/scala/org/hyperledger/identus/shared/messaging/kafka/InMemoryMessagingService.scala @@ -1,12 +1,10 @@ package org.hyperledger.identus.shared.messaging.kafka -import org.hyperledger.identus.shared.messaging.{Consumer, Message, MessagingService, Producer, Serde} +import org.hyperledger.identus.shared.messaging.* +import org.hyperledger.identus.shared.messaging.kafka.InMemoryMessagingService.* import zio.* import zio.concurrent.ConcurrentMap import zio.stream.* -import zio.Clock -import zio.Task -import InMemoryMessagingService.* import java.util.concurrent.TimeUnit @@ -109,35 +107,14 @@ object InMemoryMessagingService { def apply(value: Long): TimeStamp = value extension (ts: TimeStamp) def value: Long = ts - val messagingServiceLayer: URLayer[Int, MessagingService] = + val layer: URLayer[MessagingServiceConfig, MessagingService] = ZLayer.fromZIO { for { - queueCapacity <- ZIO.service[Int] + config <- ZIO.service[MessagingServiceConfig] queueMap <- ConcurrentMap.empty[Topic, (Queue[Message[_, _]], Ref[Offset])] processedMessagesMap <- ConcurrentMap.empty[ConsumerGroupKey, ConcurrentMap[Offset, TimeStamp]] _ <- cleanupTaskForProcessedMessages(processedMessagesMap) - } yield new InMemoryMessagingService(queueMap, queueCapacity, processedMessagesMap) - } - - def producerLayer[K: EnvironmentTag, V: EnvironmentTag](using - kSerde: Serde[K], - vSerde: Serde[V] - ): RLayer[MessagingService, Producer[K, V]] = - ZLayer.fromZIO { - for { - messagingService <- ZIO.service[MessagingService] - producer <- messagingService.makeProducer[K, V]() - } yield producer - } - - def consumerLayer[K: EnvironmentTag, V: EnvironmentTag]( - groupId: String - )(using kSerde: Serde[K], vSerde: Serde[V]): RLayer[MessagingService, Consumer[K, V]] = - ZLayer.fromZIO { - for { - messagingService <- ZIO.service[MessagingService] - consumer <- messagingService.makeConsumer[K, V](groupId) - } yield consumer + } yield new InMemoryMessagingService(queueMap, config.inMemoryQueueCapacity, processedMessagesMap) } private def cleanupTaskForProcessedMessages( diff --git a/shared/core/src/main/scala/org/hyperledger/identus/shared/messaging/kafka/ZKafkaMessagingServiceImpl.scala b/shared/core/src/main/scala/org/hyperledger/identus/shared/messaging/kafka/ZKafkaMessagingServiceImpl.scala index 1c24d5cb9c..9180fc4d62 100644 --- a/shared/core/src/main/scala/org/hyperledger/identus/shared/messaging/kafka/ZKafkaMessagingServiceImpl.scala +++ b/shared/core/src/main/scala/org/hyperledger/identus/shared/messaging/kafka/ZKafkaMessagingServiceImpl.scala @@ -3,7 +3,7 @@ package org.hyperledger.identus.shared.messaging.kafka import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.common.header.Headers import org.hyperledger.identus.shared.messaging.* -import zio.{durationInt, Duration, EnvironmentTag, RIO, RLayer, Task, ULayer, URIO, ZIO, ZLayer} +import zio.{Duration, RIO, Task, URIO, URLayer, ZIO, ZLayer} import zio.kafka.consumer.{ Consumer as ZKConsumer, ConsumerSettings as ZKConsumerSettings, @@ -40,24 +40,22 @@ class ZKafkaMessagingServiceImpl( } object ZKafkaMessagingServiceImpl { - def layer( - bootstrapServers: List[String], - autoCreateTopics: Boolean = false, - maxPollRecords: Int = 500, - maxPollInterval: Duration = 5.minutes, - pollTimeout: Duration = 50.millis, - rebalanceSafeCommits: Boolean = true - ): ULayer[MessagingService] = - ZLayer.succeed( - new ZKafkaMessagingServiceImpl( - bootstrapServers, - autoCreateTopics, - maxPollRecords, - maxPollInterval, - pollTimeout, - rebalanceSafeCommits + val layer: URLayer[MessagingServiceConfig, MessagingService] = + ZLayer.fromZIO { + for { + config <- ZIO.service[MessagingServiceConfig] + kafkaConfig <- config.kafka match + case Some(cfg) => ZIO.succeed(cfg) + case None => ZIO.dieMessage("Kafka config is undefined") + } yield new ZKafkaMessagingServiceImpl( + kafkaConfig.bootstrapServers.split(',').toList, + kafkaConfig.consumers.autoCreateTopics, + kafkaConfig.consumers.maxPollRecords, + kafkaConfig.consumers.maxPollInterval, + kafkaConfig.consumers.pollTimeout, + kafkaConfig.consumers.rebalanceSafeCommits ) - ) + } } class ZKafkaConsumerImpl[K, V]( @@ -110,16 +108,6 @@ class ZKafkaConsumerImpl[K, V]( .runDrain } -object ZKafkaConsumerImpl { - def layer[K: EnvironmentTag, V: EnvironmentTag](groupId: String)(implicit - kSerde: Serde[K], - vSerde: Serde[V] - ): RLayer[MessagingService, Consumer[K, V]] = ZLayer.fromZIO(for { - messagingService <- ZIO.service[MessagingService] - consumer <- messagingService.makeConsumer[K, V](groupId) - } yield consumer) -} - class ZKafkaProducerImpl[K, V](bootstrapServers: List[String], kSerde: Serde[K], vSerde: Serde[V]) extends Producer[K, V] { private val zkProducer = ZLayer.scoped( @@ -146,13 +134,3 @@ class ZKafkaProducerImpl[K, V](bootstrapServers: List[String], kSerde: Serde[K], .provideSome(zkProducer) } - -object ZKafkaProducerImpl { - def layer[K: EnvironmentTag, V: EnvironmentTag](implicit - kSerde: Serde[K], - vSerde: Serde[V] - ): RLayer[MessagingService, Producer[K, V]] = ZLayer.fromZIO(for { - messagingService <- ZIO.service[MessagingService] - producer <- messagingService.makeProducer[K, V]() - } yield producer) -}