From e4c426054f4826cf4f74acd69096f08415ef262d Mon Sep 17 00:00:00 2001 From: Benjamin Voiturier Date: Tue, 8 Oct 2024 11:55:42 +0200 Subject: [PATCH] chore: refactor messaging service config and migrate to messaging service package Signed-off-by: Benjamin Voiturier --- .../src/main/resources/application.conf | 24 ++++++------ .../identus/agent/server/MainApp.scala | 8 ++-- .../agent/server/config/AppConfig.scala | 29 +-------------- .../server/jobs/BackgroundJobsHelper.scala | 4 +- .../server/jobs/ConnectBackgroundJobs.scala | 2 +- .../jobs/DIDStateSyncBackgroundJobs.scala | 2 +- .../server/jobs/IssueBackgroundJobs.scala | 2 +- .../server/jobs/PresentBackgroundJobs.scala | 2 +- .../agent/server/jobs/StatusListJobs.scala | 2 +- .../messaging/MessagingServiceConfig.scala | 37 +++++++++++++++++++ 10 files changed, 63 insertions(+), 49 deletions(-) create mode 100644 shared/core/src/main/scala/org/hyperledger/identus/shared/messaging/MessagingServiceConfig.scala diff --git a/cloud-agent/service/server/src/main/resources/application.conf b/cloud-agent/service/server/src/main/resources/application.conf index 6d215328d3..4dad27b8d2 100644 --- a/cloud-agent/service/server/src/main/resources/application.conf +++ b/cloud-agent/service/server/src/main/resources/application.conf @@ -249,16 +249,7 @@ agent { authApiKey = ${?DEFAULT_WALLET_AUTH_API_KEY} } inMemoryQueueCapacity = 1000 - kafka { - enabled = false - enabled = ${?DEFAULT_KAFKA_ENABLED} - bootstrapServers = "kafka:9092" - consumers { - autoCreateTopics = false, - maxPollRecords = 500 - maxPollInterval = 5.minutes - pollTimeout = 50.millis - rebalanceSafeCommits = true + messagingService { connectFlow { consumerCount = 5 retryStrategy { @@ -289,6 +280,17 @@ agent { statusListSync { consumerCount = 5 } - } + kafkaEnabled = false + kafkaEnabled = ${?DEFAULT_KAFKA_ENABLED} + kafka { + bootstrapServers = "kafka:9092" + consumers { + autoCreateTopics = false, + maxPollRecords = 500 + maxPollInterval = 5.minutes + pollTimeout = 50.millis + rebalanceSafeCommits = true + } + } } } diff --git a/cloud-agent/service/server/src/main/scala/org/hyperledger/identus/agent/server/MainApp.scala b/cloud-agent/service/server/src/main/scala/org/hyperledger/identus/agent/server/MainApp.scala index c418eaee1b..7f8caf31d6 100644 --- a/cloud-agent/service/server/src/main/scala/org/hyperledger/identus/agent/server/MainApp.scala +++ b/cloud-agent/service/server/src/main/scala/org/hyperledger/identus/agent/server/MainApp.scala @@ -174,8 +174,8 @@ object MainApp extends ZIOAppDefault { _ <- migrations appConfig <- ZIO.service[AppConfig].provide(SystemModule.configLayer) messagingServiceLayer <- - if (appConfig.agent.kafka.enabled) { - val kafkaConfig = appConfig.agent.kafka + if (appConfig.agent.messagingService.kafkaEnabled) { + val kafkaConfig = appConfig.agent.messagingService.kafka.get ZIO.succeed( ZKafkaMessagingServiceImpl.layer( kafkaConfig.bootstrapServers.split(',').toList, @@ -192,7 +192,7 @@ object MainApp extends ZIOAppDefault { ) } messageProducerLayer <- - if (appConfig.agent.kafka.enabled) { + if (appConfig.agent.messagingService.kafkaEnabled) { ZIO.succeed( ZKafkaProducerImpl.layer[UUID, WalletIdAndRecordId] ) @@ -202,7 +202,7 @@ object MainApp extends ZIOAppDefault { ) } syncDIDStateProducerLayer <- - if (appConfig.agent.kafka.enabled) { + if (appConfig.agent.messagingService.kafkaEnabled) { ZIO.succeed( ZKafkaProducerImpl.layer[WalletId, WalletId] ) diff --git a/cloud-agent/service/server/src/main/scala/org/hyperledger/identus/agent/server/config/AppConfig.scala b/cloud-agent/service/server/src/main/scala/org/hyperledger/identus/agent/server/config/AppConfig.scala index 55948ae861..60639e8fe9 100644 --- a/cloud-agent/service/server/src/main/scala/org/hyperledger/identus/agent/server/config/AppConfig.scala +++ b/cloud-agent/service/server/src/main/scala/org/hyperledger/identus/agent/server/config/AppConfig.scala @@ -4,6 +4,7 @@ import org.hyperledger.identus.castor.core.model.did.VerificationRelationship import org.hyperledger.identus.iam.authentication.AuthenticationConfig import org.hyperledger.identus.pollux.vc.jwt.* import org.hyperledger.identus.shared.db.DbConfig +import org.hyperledger.identus.shared.messaging.MessagingServiceConfig import zio.config.magnolia.* import zio.Config @@ -154,32 +155,6 @@ final case class DefaultWalletConfig( authApiKey: String ) -final case class KafkaConfig(enabled: Boolean, bootstrapServers: String, consumers: KafkaConsumersConfig) - -final case class KafkaConsumersConfig( - connectFlow: KafkaConsumerJobConfig, - issueFlow: KafkaConsumerJobConfig, - presentFlow: KafkaConsumerJobConfig, - didStateSync: KafkaConsumerJobConfig, - statusListSync: KafkaConsumerJobConfig, - maxPollRecords: Int, - maxPollInterval: Duration, - pollTimeout: Duration, - rebalanceSafeCommits: Boolean, - autoCreateTopics: Boolean, -) - -final case class KafkaConsumerJobConfig( - consumerCount: Int, - retryStrategy: Option[KafkaConsumerRetryStrategy] -) - -final case class KafkaConsumerRetryStrategy( - maxRetries: Int, - initialDelay: Duration, - maxDelay: Duration, -) - final case class AgentConfig( httpEndpoint: HttpEndpointConfig, didCommEndpoint: DidCommEndpointConfig, @@ -191,7 +166,7 @@ final case class AgentConfig( webhookPublisher: WebhookPublisherConfig, defaultWallet: DefaultWalletConfig, inMemoryQueueCapacity: Int, - kafka: KafkaConfig + messagingService: MessagingServiceConfig ) { def validate: Either[String, Unit] = for { diff --git a/cloud-agent/service/server/src/main/scala/org/hyperledger/identus/agent/server/jobs/BackgroundJobsHelper.scala b/cloud-agent/service/server/src/main/scala/org/hyperledger/identus/agent/server/jobs/BackgroundJobsHelper.scala index ac2adabebe..67867bb2fb 100644 --- a/cloud-agent/service/server/src/main/scala/org/hyperledger/identus/agent/server/jobs/BackgroundJobsHelper.scala +++ b/cloud-agent/service/server/src/main/scala/org/hyperledger/identus/agent/server/jobs/BackgroundJobsHelper.scala @@ -1,6 +1,5 @@ package org.hyperledger.identus.agent.server.jobs -import org.hyperledger.identus.agent.server.config.KafkaConsumerJobConfig import org.hyperledger.identus.agent.walletapi.model.{ManagedDIDState, PublicationState} import org.hyperledger.identus.agent.walletapi.model.error.DIDSecretStorageError.{KeyNotFoundError, WalletNotFoundError} import org.hyperledger.identus.agent.walletapi.model.error.GetManagedDIDError @@ -29,6 +28,7 @@ import org.hyperledger.identus.pollux.vc.jwt.{ * } import org.hyperledger.identus.shared.crypto.* +import org.hyperledger.identus.shared.messaging.ConsumerJobConfig import org.hyperledger.identus.shared.messaging.MessagingService.RetryStep import org.hyperledger.identus.shared.models.{KeyId, WalletAccessContext} import zio.{durationInt, Duration, ZIO, ZLayer} @@ -232,7 +232,7 @@ trait BackgroundJobsHelper { } } - def retryStepsFromConfig(topicName: String, jobConfig: KafkaConsumerJobConfig): Seq[RetryStep] = { + def retryStepsFromConfig(topicName: String, jobConfig: ConsumerJobConfig): Seq[RetryStep] = { val retryTopics = jobConfig.retryStrategy match case None => Seq.empty case Some(rs) => diff --git a/cloud-agent/service/server/src/main/scala/org/hyperledger/identus/agent/server/jobs/ConnectBackgroundJobs.scala b/cloud-agent/service/server/src/main/scala/org/hyperledger/identus/agent/server/jobs/ConnectBackgroundJobs.scala index 0882cff6d5..07cfd05a22 100644 --- a/cloud-agent/service/server/src/main/scala/org/hyperledger/identus/agent/server/jobs/ConnectBackgroundJobs.scala +++ b/cloud-agent/service/server/src/main/scala/org/hyperledger/identus/agent/server/jobs/ConnectBackgroundJobs.scala @@ -28,7 +28,7 @@ object ConnectBackgroundJobs extends BackgroundJobsHelper { _ <- messaging.MessagingService.consumeWithRetryStrategy( "identus-cloud-agent", ConnectBackgroundJobs.handleMessage, - retryStepsFromConfig(TOPIC_NAME, appConfig.agent.kafka.consumers.connectFlow) + retryStepsFromConfig(TOPIC_NAME, appConfig.agent.messagingService.connectFlow) ) } yield () diff --git a/cloud-agent/service/server/src/main/scala/org/hyperledger/identus/agent/server/jobs/DIDStateSyncBackgroundJobs.scala b/cloud-agent/service/server/src/main/scala/org/hyperledger/identus/agent/server/jobs/DIDStateSyncBackgroundJobs.scala index 261c9fbf86..2ee44e91bc 100644 --- a/cloud-agent/service/server/src/main/scala/org/hyperledger/identus/agent/server/jobs/DIDStateSyncBackgroundJobs.scala +++ b/cloud-agent/service/server/src/main/scala/org/hyperledger/identus/agent/server/jobs/DIDStateSyncBackgroundJobs.scala @@ -34,7 +34,7 @@ object DIDStateSyncBackgroundJobs extends BackgroundJobsHelper { _ <- MessagingService.consumeWithRetryStrategy( "identus-cloud-agent", DIDStateSyncBackgroundJobs.handleMessage, - retryStepsFromConfig(TOPIC_NAME, appConfig.agent.kafka.consumers.didStateSync) + retryStepsFromConfig(TOPIC_NAME, appConfig.agent.messagingService.didStateSync) ) } yield () diff --git a/cloud-agent/service/server/src/main/scala/org/hyperledger/identus/agent/server/jobs/IssueBackgroundJobs.scala b/cloud-agent/service/server/src/main/scala/org/hyperledger/identus/agent/server/jobs/IssueBackgroundJobs.scala index 0b89d6cdb3..3cdde2853f 100644 --- a/cloud-agent/service/server/src/main/scala/org/hyperledger/identus/agent/server/jobs/IssueBackgroundJobs.scala +++ b/cloud-agent/service/server/src/main/scala/org/hyperledger/identus/agent/server/jobs/IssueBackgroundJobs.scala @@ -28,7 +28,7 @@ object IssueBackgroundJobs extends BackgroundJobsHelper { _ <- messaging.MessagingService.consumeWithRetryStrategy( "identus-cloud-agent", IssueBackgroundJobs.handleMessage, - retryStepsFromConfig(TOPIC_NAME, appConfig.agent.kafka.consumers.issueFlow) + retryStepsFromConfig(TOPIC_NAME, appConfig.agent.messagingService.issueFlow) ) } yield () diff --git a/cloud-agent/service/server/src/main/scala/org/hyperledger/identus/agent/server/jobs/PresentBackgroundJobs.scala b/cloud-agent/service/server/src/main/scala/org/hyperledger/identus/agent/server/jobs/PresentBackgroundJobs.scala index 2251b1fe7f..4bfb247176 100644 --- a/cloud-agent/service/server/src/main/scala/org/hyperledger/identus/agent/server/jobs/PresentBackgroundJobs.scala +++ b/cloud-agent/service/server/src/main/scala/org/hyperledger/identus/agent/server/jobs/PresentBackgroundJobs.scala @@ -63,7 +63,7 @@ object PresentBackgroundJobs extends BackgroundJobsHelper { _ <- messaging.MessagingService.consumeWithRetryStrategy( "identus-cloud-agent", PresentBackgroundJobs.handleMessage, - retryStepsFromConfig(TOPIC_NAME, appConfig.agent.kafka.consumers.presentFlow) + retryStepsFromConfig(TOPIC_NAME, appConfig.agent.messagingService.presentFlow) ) } yield () diff --git a/cloud-agent/service/server/src/main/scala/org/hyperledger/identus/agent/server/jobs/StatusListJobs.scala b/cloud-agent/service/server/src/main/scala/org/hyperledger/identus/agent/server/jobs/StatusListJobs.scala index 43e7f9b6ed..1fe5d77551 100644 --- a/cloud-agent/service/server/src/main/scala/org/hyperledger/identus/agent/server/jobs/StatusListJobs.scala +++ b/cloud-agent/service/server/src/main/scala/org/hyperledger/identus/agent/server/jobs/StatusListJobs.scala @@ -44,7 +44,7 @@ object StatusListJobs extends BackgroundJobsHelper { _ <- messaging.MessagingService.consumeWithRetryStrategy( "identus-cloud-agent", StatusListJobs.handleMessage, - retryStepsFromConfig(TOPIC_NAME, appConfig.agent.kafka.consumers.statusListSync) + retryStepsFromConfig(TOPIC_NAME, appConfig.agent.messagingService.statusListSync) ) } yield () diff --git a/shared/core/src/main/scala/org/hyperledger/identus/shared/messaging/MessagingServiceConfig.scala b/shared/core/src/main/scala/org/hyperledger/identus/shared/messaging/MessagingServiceConfig.scala new file mode 100644 index 0000000000..f42510d873 --- /dev/null +++ b/shared/core/src/main/scala/org/hyperledger/identus/shared/messaging/MessagingServiceConfig.scala @@ -0,0 +1,37 @@ +package org.hyperledger.identus.shared.messaging + +import java.time.Duration + +case class MessagingServiceConfig( + connectFlow: ConsumerJobConfig, + issueFlow: ConsumerJobConfig, + presentFlow: ConsumerJobConfig, + didStateSync: ConsumerJobConfig, + statusListSync: ConsumerJobConfig, + kafkaEnabled: Boolean, + kafka: Option[KafkaConfig] +) + +final case class ConsumerJobConfig( + consumerCount: Int, + retryStrategy: Option[ConsumerRetryStrategy] +) + +final case class ConsumerRetryStrategy( + maxRetries: Int, + initialDelay: Duration, + maxDelay: Duration +) + +final case class KafkaConfig( + bootstrapServers: String, + consumers: KafkaConsumersConfig +) + +final case class KafkaConsumersConfig( + autoCreateTopics: Boolean, + maxPollRecords: Int, + maxPollInterval: Duration, + pollTimeout: Duration, + rebalanceSafeCommits: Boolean +)