From 12104c70ffd4b6d16c95faedf2e240524165ae61 Mon Sep 17 00:00:00 2001 From: Robert Kenny Date: Wed, 18 Dec 2024 17:18:01 +0000 Subject: [PATCH 1/4] define strict config layer (keep typesafe at the edge) --- .../relation_embedder/BatchProcessor.scala | 28 +++---- .../pipeline/relation_embedder/CLIMain.scala | 5 +- .../relation_embedder/Downstream.scala | 34 -------- .../relation_embedder/LambdaMain.scala | 2 +- .../relation_embedder/lib/Downstream.scala | 38 +++++++++ .../lib/ElasticBuilder.scala | 77 +++++++++++++++++++ .../lib/LambdaConfiguration.scala | 17 +++- .../lib/RelationEmbedderConfig.scala | 35 +++++++++ .../BatchProcessorTest.scala | 6 +- .../RelationEmbedderWorkerServiceTest.scala | 2 +- 10 files changed, 179 insertions(+), 65 deletions(-) delete mode 100644 pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/Downstream.scala create mode 100644 pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/lib/Downstream.scala create mode 100644 pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/lib/ElasticBuilder.scala create mode 100644 pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/lib/RelationEmbedderConfig.scala diff --git a/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/BatchProcessor.scala b/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/BatchProcessor.scala index 2d60765616..abad859478 100644 --- a/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/BatchProcessor.scala +++ b/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/BatchProcessor.scala @@ -1,7 +1,6 @@ package weco.pipeline.relation_embedder import com.sksamuel.elastic4s.Index -import com.typesafe.config.Config import grizzled.slf4j.Logging import 
org.apache.pekko.NotUsed import org.apache.pekko.actor.ActorSystem @@ -9,18 +8,14 @@ import org.apache.pekko.stream.Materializer import org.apache.pekko.stream.scaladsl.{Sink, Source} import weco.catalogue.internal_model.work.Work import weco.catalogue.internal_model.work.WorkState.Denormalised -import weco.elasticsearch.typesafe.ElasticBuilder -import weco.pipeline.relation_embedder.models.{ - ArchiveRelationsCache, - Batch, - RelationWork -} +import lib.ElasticBuilder +import weco.pipeline.relation_embedder.models.{ArchiveRelationsCache, Batch, RelationWork} import weco.pipeline_storage.elastic.ElasticIndexer import scala.concurrent.{ExecutionContext, Future} import scala.util.{Failure, Success} import weco.catalogue.internal_model.Implicits._ -import weco.typesafe.config.builders.EnrichConfig._ +import weco.pipeline.relation_embedder.lib.{Downstream, RelationEmbedderConfig} class BatchProcessor( relationsService: RelationsService, @@ -103,7 +98,7 @@ class BatchProcessor( object BatchProcessor { def apply( - config: Config + config: RelationEmbedderConfig )( implicit actorSystem: ActorSystem, ec: ExecutionContext, @@ -111,31 +106,30 @@ object BatchProcessor { ): BatchProcessor = { val identifiedIndex = - Index(config.requireString("es.merged-works.index")) + Index(config.mergedWorkIndex) - val esClient = ElasticBuilder.buildElasticClient(config) + val esClient = ElasticBuilder.buildElasticClient(config.elasticConfig) val workIndexer = new ElasticIndexer[Work[Denormalised]]( client = esClient, - index = Index(config.requireString(s"es.denormalised-works.index")) + index = Index(config.denormalisedWorkIndex) ) val batchWriter = new BulkIndexWriter( workIndexer = workIndexer, - maxBatchWeight = config.requireInt("es.works.batch_size") + maxBatchWeight = config.maxBatchWeight ) new BatchProcessor( relationsService = new PathQueryRelationsService( esClient, identifiedIndex, - completeTreeScroll = config.requireInt("es.works.scroll.complete_tree"), - 
affectedWorksScroll = - config.requireInt("es.works.scroll.affected_works") + completeTreeScroll = config.completeTreeScroll, + affectedWorksScroll = config.affectedWorksScroll ), bulkWriter = batchWriter, - downstream = Downstream(Some(config)) + downstream = Downstream(config.downstreamTarget) ) } } diff --git a/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/CLIMain.scala b/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/CLIMain.scala index 3dae51b067..64f01dbc94 100644 --- a/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/CLIMain.scala +++ b/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/CLIMain.scala @@ -2,16 +2,13 @@ package weco.pipeline.relation_embedder import scala.concurrent.{Await, ExecutionContext, Future} import scala.concurrent.duration.DurationInt - import org.apache.pekko.actor.ActorSystem - import com.typesafe.config.ConfigFactory import com.sksamuel.elastic4s.Index - import weco.typesafe.config.builders.EnrichConfig._ import weco.elasticsearch.typesafe.ElasticBuilder import weco.json.JsonUtil._ -import weco.pipeline.relation_embedder.lib.StdInBatches +import weco.pipeline.relation_embedder.lib.{STDIODownstream, StdInBatches} /** A main function providing a local CLI for the relation embedder. To invoke, * provide a list of Batch objects in NDJSON on StdIn. 
diff --git a/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/Downstream.scala b/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/Downstream.scala deleted file mode 100644 index c2d79dd9fa..0000000000 --- a/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/Downstream.scala +++ /dev/null @@ -1,34 +0,0 @@ -package weco.pipeline.relation_embedder - -import com.typesafe.config.Config -import weco.messaging.typesafe.SNSBuilder - -import scala.util.Try - -trait Downstream { - def notify(workId: String): Try[Unit] -} - -class SNSDownstream(config: Config) extends Downstream { - private val msgSender = SNSBuilder - .buildSNSMessageSender(config, subject = "Sent from relation_embedder") - - override def notify(workId: String): Try[Unit] = Try(msgSender.send(workId)) -} - -object STDIODownstream extends Downstream { - override def notify(workId: String): Try[Unit] = Try(println(workId)) -} - -object Downstream { - def apply(maybeConfig: Option[Config]): Downstream = { - maybeConfig match { - case Some(config) => - config.getString("relation_embedder.use_downstream") match { - case "sns" => new SNSDownstream(config) - case "stdio" => STDIODownstream - } - case None => STDIODownstream - } - } -} diff --git a/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/LambdaMain.scala b/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/LambdaMain.scala index ebd4c4565d..3cc50a0a52 100644 --- a/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/LambdaMain.scala +++ b/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/LambdaMain.scala @@ -12,7 +12,7 @@ import scala.concurrent.{Await, ExecutionContext, Future} object LambdaMain extends RequestHandler[SQSEvent, String] with Logging - with 
LambdaConfiguration { + with RelationEmbedderConfigurable { import SQSEventOps._ diff --git a/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/lib/Downstream.scala b/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/lib/Downstream.scala new file mode 100644 index 0000000000..564681237a --- /dev/null +++ b/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/lib/Downstream.scala @@ -0,0 +1,38 @@ +package weco.pipeline.relation_embedder.lib + +import software.amazon.awssdk.services.sns.SnsClient +import weco.messaging.sns.{SNSConfig, SNSMessageSender} + +import scala.util.Try + +trait Downstream { + def notify(workId: String): Try[Unit] +} + +class SNSDownstream(snsConfig: SNSConfig) extends Downstream { + private val msgSender = new SNSMessageSender( + snsClient = SnsClient.builder().build(), + snsConfig = snsConfig, + subject = "Sent from relation_embedder" + ) + + override def notify(workId: String): Try[Unit] = Try(msgSender.send(workId)) +} + +object STDIODownstream extends Downstream { + override def notify(workId: String): Try[Unit] = Try(println(workId)) +} + +sealed trait DownstreamTarget +case class SNS(config: SNSConfig) extends DownstreamTarget +case object StdOut extends DownstreamTarget + +object Downstream { + def apply(downstreamTarget: DownstreamTarget): Downstream = { + downstreamTarget match { + case SNS(config) => new SNSDownstream(config) + case StdOut => STDIODownstream + } + } + def apply(): Downstream = STDIODownstream +} diff --git a/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/lib/ElasticBuilder.scala b/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/lib/ElasticBuilder.scala new file mode 100644 index 0000000000..21b23c932b --- /dev/null +++ 
b/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/lib/ElasticBuilder.scala @@ -0,0 +1,77 @@ +package weco.pipeline.relation_embedder.lib + +import com.sksamuel.elastic4s.ElasticClient +import com.typesafe.config.Config +import weco.elasticsearch.ElasticClientBuilder + +sealed trait ElasticConfig { + val host: String + val port: Int + val protocol: String +} + +case class ElasticConfigUsernamePassword( + host: String, + port: Int, + protocol: String, + username: String, + password:String, + ) extends ElasticConfig + +case class ElasticConfigApiKey( + host: String, + port: Int, + protocol: String, + apiKey: String, + ) extends ElasticConfig + +// This should be moved up to scala-libs, it's copied from there! + +object ElasticBuilder { + import weco.typesafe.config.builders.EnrichConfig._ + + def buildElasticClientConfig(config: Config, + namespace: String = ""): ElasticConfig = { + val hostname = config.requireString(s"es.$namespace.host") + val port = config + .getIntOption(s"es.$namespace.port") + .getOrElse(9200) + val protocol = config + .getStringOption(s"es.$namespace.protocol") + .getOrElse("http") + + ( + config.getStringOption(s"es.$namespace.username"), + config.getStringOption(s"es.$namespace.password"), + config.getStringOption(s"es.$namespace.apikey") + ) match { + case (Some(username), Some(password), None) => + ElasticConfigUsernamePassword(hostname, port, protocol, username, password) + // Use an API key if specified, even if username/password are also present + case (_, _, Some(apiKey)) => + ElasticConfigApiKey(hostname, port, protocol, apiKey) + case _ => + throw new Throwable( + s"You must specify username and password, or apikey, in the 'es.$namespace' config") + } + } + + def buildElasticClient(config: ElasticConfig): ElasticClient = + config match { + case ElasticConfigUsernamePassword(hostname, port, protocol, username, password) => + ElasticClientBuilder.create( + hostname, + port, + protocol, + 
username, + password + ) + case ElasticConfigApiKey(hostname, port, protocol, apiKey) => + ElasticClientBuilder.create(hostname, port, protocol, apiKey) + } + + + def buildElasticClient(config: Config, + namespace: String = ""): ElasticClient = + buildElasticClient(buildElasticClientConfig(config, namespace)) +} diff --git a/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/lib/LambdaConfiguration.scala b/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/lib/LambdaConfiguration.scala index 1592c2b67f..8a8543cf75 100644 --- a/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/lib/LambdaConfiguration.scala +++ b/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/lib/LambdaConfiguration.scala @@ -3,11 +3,20 @@ package weco.pipeline.relation_embedder.lib import java.io.File import com.typesafe.config.{Config, ConfigFactory} -trait Configuration { - val config: Config +trait ApplicationConfig {} + +trait ConfigurationBuilder[C, T <: ApplicationConfig] { + protected val rawConfig: C + + def build(rawConfig: C): T + def config: T = build(rawConfig) +} + +trait TypesafeConfigurable[T <: ApplicationConfig] extends ConfigurationBuilder[Config, T] { + def build(rawConfig: Config): T } -trait LambdaConfiguration extends Configuration { +trait LambdaConfigurable[T <: ApplicationConfig] extends TypesafeConfigurable[T] { private val defaultResolveFromFile: String = "/tmp/config" private val defaultApplicationConfig: String = "application.conf" @@ -26,7 +35,7 @@ trait LambdaConfiguration extends Configuration { ConfigFactory.empty() } - lazy val config = lambdaConfig + lazy val rawConfig = lambdaConfig .withFallback(applicationConfig) .withFallback(baseConfig) .resolve() diff --git a/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/lib/RelationEmbedderConfig.scala 
b/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/lib/RelationEmbedderConfig.scala new file mode 100644 index 0000000000..4e4bf8f5fe --- /dev/null +++ b/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/lib/RelationEmbedderConfig.scala @@ -0,0 +1,35 @@ +package weco.pipeline.relation_embedder.lib + +import com.typesafe.config.Config +import weco.messaging.typesafe.SNSBuilder.buildSNSConfig +import ElasticBuilder.buildElasticClientConfig + +case class RelationEmbedderConfig( + mergedWorkIndex: String, + denormalisedWorkIndex: String, + maxBatchWeight: Int, + completeTreeScroll: Int, + affectedWorksScroll: Int, + elasticConfig: ElasticConfig, + downstreamTarget: DownstreamTarget + ) extends ApplicationConfig + +trait RelationEmbedderConfigurable extends LambdaConfigurable[RelationEmbedderConfig] { + import weco.typesafe.config.builders.EnrichConfig._ + + def build(rawConfig: Config): RelationEmbedderConfig = + RelationEmbedderConfig( + mergedWorkIndex = rawConfig.requireString("es.merged-works.index"), + denormalisedWorkIndex = rawConfig.requireString("es.denormalised-works.index"), + maxBatchWeight = rawConfig.requireInt("es.works.batch_size"), + completeTreeScroll = rawConfig.requireInt("es.works.scroll.complete_tree"), + affectedWorksScroll = rawConfig.requireInt("es.works.scroll.affected_works"), + elasticConfig = buildElasticClientConfig(rawConfig), + downstreamTarget = { + rawConfig.requireString("relation_embedder.use_downstream") match { + case "sns" => SNS(buildSNSConfig(rawConfig)) + case "stdio" => StdOut + } + } + ) +} diff --git a/pipeline/relation_embedder/relation_embedder/src/test/scala/weco/pipeline/relation_embedder/BatchProcessorTest.scala b/pipeline/relation_embedder/relation_embedder/src/test/scala/weco/pipeline/relation_embedder/BatchProcessorTest.scala index ad5ea8535b..a39a23a050 100644 --- 
a/pipeline/relation_embedder/relation_embedder/src/test/scala/weco/pipeline/relation_embedder/BatchProcessorTest.scala +++ b/pipeline/relation_embedder/relation_embedder/src/test/scala/weco/pipeline/relation_embedder/BatchProcessorTest.scala @@ -3,15 +3,13 @@ package weco.pipeline.relation_embedder import org.scalatest.funspec.AnyFunSpec import org.scalatest.matchers.should.Matchers import weco.pekko.fixtures.Pekko -import weco.pipeline.relation_embedder.fixtures.{ - BulkWriterAssertions, - SampleWorkTree -} +import weco.pipeline.relation_embedder.fixtures.{BulkWriterAssertions, SampleWorkTree} import org.apache.pekko.stream.Materializer import weco.catalogue.internal_model.work.{Availability, Relations, Work} import weco.catalogue.internal_model.work.WorkState.{Denormalised, Merged} import weco.fixtures.TestWith import weco.messaging.memory.MemoryMessageSender +import weco.pipeline.relation_embedder.lib.Downstream import weco.pipeline.relation_embedder.models.Batch import weco.pipeline.relation_embedder.models.Selector.{Descendents, Node, Tree} import weco.pipeline_storage.memory.MemoryIndexer diff --git a/pipeline/relation_embedder/relation_embedder/src/test/scala/weco/pipeline/relation_embedder/RelationEmbedderWorkerServiceTest.scala b/pipeline/relation_embedder/relation_embedder/src/test/scala/weco/pipeline/relation_embedder/RelationEmbedderWorkerServiceTest.scala index c1cf3b06c7..0890cb9190 100644 --- a/pipeline/relation_embedder/relation_embedder/src/test/scala/weco/pipeline/relation_embedder/RelationEmbedderWorkerServiceTest.scala +++ b/pipeline/relation_embedder/relation_embedder/src/test/scala/weco/pipeline/relation_embedder/RelationEmbedderWorkerServiceTest.scala @@ -22,7 +22,7 @@ import weco.messaging.sns.NotificationMessage import weco.catalogue.internal_model.work.WorkState.{Denormalised, Merged} import weco.catalogue.internal_model.work._ import weco.pipeline.relation_embedder.fixtures.SampleWorkTree - +import 
weco.pipeline.relation_embedder.lib.Downstream import weco.pipeline.relation_embedder.models._ import weco.pipeline_storage.memory.MemoryIndexer From b033c43ca0a3bc9295053d95c458f5f0c9e4d35a Mon Sep 17 00:00:00 2001 From: Github on behalf of Wellcome Collection Date: Wed, 18 Dec 2024 17:22:33 +0000 Subject: [PATCH 2/4] Apply auto-formatting rules --- .../relation_embedder/BatchProcessor.scala | 6 ++- .../lib/ElasticBuilder.scala | 54 ++++++++++++------- .../lib/LambdaConfiguration.scala | 6 ++- .../lib/RelationEmbedderConfig.scala | 32 ++++++----- 4 files changed, 62 insertions(+), 36 deletions(-) diff --git a/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/BatchProcessor.scala b/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/BatchProcessor.scala index abad859478..f7effe5935 100644 --- a/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/BatchProcessor.scala +++ b/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/BatchProcessor.scala @@ -9,7 +9,11 @@ import org.apache.pekko.stream.scaladsl.{Sink, Source} import weco.catalogue.internal_model.work.Work import weco.catalogue.internal_model.work.WorkState.Denormalised import lib.ElasticBuilder -import weco.pipeline.relation_embedder.models.{ArchiveRelationsCache, Batch, RelationWork} +import weco.pipeline.relation_embedder.models.{ + ArchiveRelationsCache, + Batch, + RelationWork +} import weco.pipeline_storage.elastic.ElasticIndexer import scala.concurrent.{ExecutionContext, Future} diff --git a/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/lib/ElasticBuilder.scala b/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/lib/ElasticBuilder.scala index 21b23c932b..0403736648 100644 --- 
a/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/lib/ElasticBuilder.scala +++ b/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/lib/ElasticBuilder.scala @@ -11,27 +11,29 @@ sealed trait ElasticConfig { } case class ElasticConfigUsernamePassword( - host: String, - port: Int, - protocol: String, - username: String, - password:String, - ) extends ElasticConfig + host: String, + port: Int, + protocol: String, + username: String, + password: String +) extends ElasticConfig case class ElasticConfigApiKey( - host: String, - port: Int, - protocol: String, - apiKey: String, - ) extends ElasticConfig + host: String, + port: Int, + protocol: String, + apiKey: String +) extends ElasticConfig // This should be moved up to scala-libs, it's copied from there! object ElasticBuilder { import weco.typesafe.config.builders.EnrichConfig._ - def buildElasticClientConfig(config: Config, - namespace: String = ""): ElasticConfig = { + def buildElasticClientConfig( + config: Config, + namespace: String = "" + ): ElasticConfig = { val hostname = config.requireString(s"es.$namespace.host") val port = config .getIntOption(s"es.$namespace.port") @@ -46,19 +48,32 @@ object ElasticBuilder { config.getStringOption(s"es.$namespace.apikey") ) match { case (Some(username), Some(password), None) => - ElasticConfigUsernamePassword(hostname, port, protocol, username, password) + ElasticConfigUsernamePassword( + hostname, + port, + protocol, + username, + password + ) // Use an API key if specified, even if username/password are also present case (_, _, Some(apiKey)) => ElasticConfigApiKey(hostname, port, protocol, apiKey) case _ => throw new Throwable( - s"You must specify username and password, or apikey, in the 'es.$namespace' config") + s"You must specify username and password, or apikey, in the 'es.$namespace' config" + ) } } def buildElasticClient(config: ElasticConfig): ElasticClient = config match { - case 
ElasticConfigUsernamePassword(hostname, port, protocol, username, password) => + case ElasticConfigUsernamePassword( + hostname, + port, + protocol, + username, + password + ) => ElasticClientBuilder.create( hostname, port, @@ -70,8 +85,9 @@ object ElasticBuilder { ElasticClientBuilder.create(hostname, port, protocol, apiKey) } - - def buildElasticClient(config: Config, - namespace: String = ""): ElasticClient = + def buildElasticClient( + config: Config, + namespace: String = "" + ): ElasticClient = buildElasticClient(buildElasticClientConfig(config, namespace)) } diff --git a/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/lib/LambdaConfiguration.scala b/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/lib/LambdaConfiguration.scala index 8a8543cf75..8d2272e7b2 100644 --- a/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/lib/LambdaConfiguration.scala +++ b/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/lib/LambdaConfiguration.scala @@ -12,11 +12,13 @@ trait ConfigurationBuilder[C, T <: ApplicationConfig] { def config: T = build(rawConfig) } -trait TypesafeConfigurable[T <: ApplicationConfig] extends ConfigurationBuilder[Config, T] { +trait TypesafeConfigurable[T <: ApplicationConfig] + extends ConfigurationBuilder[Config, T] { def build(rawConfig: Config): T } -trait LambdaConfigurable[T <: ApplicationConfig] extends TypesafeConfigurable[T] { +trait LambdaConfigurable[T <: ApplicationConfig] + extends TypesafeConfigurable[T] { private val defaultResolveFromFile: String = "/tmp/config" private val defaultApplicationConfig: String = "application.conf" diff --git a/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/lib/RelationEmbedderConfig.scala 
b/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/lib/RelationEmbedderConfig.scala index 4e4bf8f5fe..a740b59d31 100644 --- a/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/lib/RelationEmbedderConfig.scala +++ b/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/lib/RelationEmbedderConfig.scala @@ -5,30 +5,34 @@ import weco.messaging.typesafe.SNSBuilder.buildSNSConfig import ElasticBuilder.buildElasticClientConfig case class RelationEmbedderConfig( - mergedWorkIndex: String, - denormalisedWorkIndex: String, - maxBatchWeight: Int, - completeTreeScroll: Int, - affectedWorksScroll: Int, - elasticConfig: ElasticConfig, - downstreamTarget: DownstreamTarget - ) extends ApplicationConfig + mergedWorkIndex: String, + denormalisedWorkIndex: String, + maxBatchWeight: Int, + completeTreeScroll: Int, + affectedWorksScroll: Int, + elasticConfig: ElasticConfig, + downstreamTarget: DownstreamTarget +) extends ApplicationConfig -trait RelationEmbedderConfigurable extends LambdaConfigurable[RelationEmbedderConfig] { +trait RelationEmbedderConfigurable + extends LambdaConfigurable[RelationEmbedderConfig] { import weco.typesafe.config.builders.EnrichConfig._ def build(rawConfig: Config): RelationEmbedderConfig = RelationEmbedderConfig( mergedWorkIndex = rawConfig.requireString("es.merged-works.index"), - denormalisedWorkIndex = rawConfig.requireString("es.denormalised-works.index"), + denormalisedWorkIndex = + rawConfig.requireString("es.denormalised-works.index"), maxBatchWeight = rawConfig.requireInt("es.works.batch_size"), - completeTreeScroll = rawConfig.requireInt("es.works.scroll.complete_tree"), - affectedWorksScroll = rawConfig.requireInt("es.works.scroll.affected_works"), + completeTreeScroll = + rawConfig.requireInt("es.works.scroll.complete_tree"), + affectedWorksScroll = + rawConfig.requireInt("es.works.scroll.affected_works"), elasticConfig = 
buildElasticClientConfig(rawConfig), downstreamTarget = { rawConfig.requireString("relation_embedder.use_downstream") match { - case "sns" => SNS(buildSNSConfig(rawConfig)) - case "stdio" => StdOut + case "sns" => SNS(buildSNSConfig(rawConfig)) + case "stdio" => StdOut } } ) From 04772993ff597ebcf7741ee8770056b13316e313 Mon Sep 17 00:00:00 2001 From: Robert Kenny Date: Thu, 19 Dec 2024 10:20:54 +0000 Subject: [PATCH 3/4] update tests --- .../pipeline/relation_embedder/Main.scala | 6 ++- .../BatchProcessorTest.scala | 5 +- .../helpers/ConfigurationTestHelpers.scala | 49 +++++++++++++------ 3 files changed, 43 insertions(+), 17 deletions(-) diff --git a/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/Main.scala b/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/Main.scala index 9f096e59f4..0161477bc9 100644 --- a/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/Main.scala +++ b/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/Main.scala @@ -4,10 +4,12 @@ import org.apache.pekko.actor.ActorSystem import com.typesafe.config.Config import weco.messaging.sns.NotificationMessage import weco.messaging.typesafe.SQSBuilder +import weco.pipeline.relation_embedder.lib.RelationEmbedderConfigurable import weco.typesafe.WellcomeTypesafeApp + import scala.concurrent.ExecutionContext -object Main extends WellcomeTypesafeApp { +object Main extends WellcomeTypesafeApp with RelationEmbedderConfigurable { runWithConfig { config: Config => implicit val actorSystem: ActorSystem = @@ -15,7 +17,7 @@ object Main extends WellcomeTypesafeApp { implicit val ec: ExecutionContext = actorSystem.dispatcher - val processor = BatchProcessor(config) + val processor = BatchProcessor(build(config)) new RelationEmbedderWorkerService( sqsStream = SQSBuilder.buildSQSStream[NotificationMessage](config), diff --git 
a/pipeline/relation_embedder/relation_embedder/src/test/scala/weco/pipeline/relation_embedder/BatchProcessorTest.scala b/pipeline/relation_embedder/relation_embedder/src/test/scala/weco/pipeline/relation_embedder/BatchProcessorTest.scala index a39a23a050..66540751a0 100644 --- a/pipeline/relation_embedder/relation_embedder/src/test/scala/weco/pipeline/relation_embedder/BatchProcessorTest.scala +++ b/pipeline/relation_embedder/relation_embedder/src/test/scala/weco/pipeline/relation_embedder/BatchProcessorTest.scala @@ -3,7 +3,10 @@ package weco.pipeline.relation_embedder import org.scalatest.funspec.AnyFunSpec import org.scalatest.matchers.should.Matchers import weco.pekko.fixtures.Pekko -import weco.pipeline.relation_embedder.fixtures.{BulkWriterAssertions, SampleWorkTree} +import weco.pipeline.relation_embedder.fixtures.{ + BulkWriterAssertions, + SampleWorkTree +} import org.apache.pekko.stream.Materializer import weco.catalogue.internal_model.work.{Availability, Relations, Work} import weco.catalogue.internal_model.work.WorkState.{Denormalised, Merged} diff --git a/pipeline/relation_embedder/relation_embedder/src/test/scala/weco/pipeline/relation_embedder/helpers/ConfigurationTestHelpers.scala b/pipeline/relation_embedder/relation_embedder/src/test/scala/weco/pipeline/relation_embedder/helpers/ConfigurationTestHelpers.scala index 2e6c0b507f..1edc19eb64 100644 --- a/pipeline/relation_embedder/relation_embedder/src/test/scala/weco/pipeline/relation_embedder/helpers/ConfigurationTestHelpers.scala +++ b/pipeline/relation_embedder/relation_embedder/src/test/scala/weco/pipeline/relation_embedder/helpers/ConfigurationTestHelpers.scala @@ -2,15 +2,34 @@ package weco.pipeline.relation_embedder.helpers import com.typesafe.config.{Config, ConfigFactory} import weco.fixtures.TestWith -import weco.pipeline.relation_embedder.lib.LambdaConfiguration +import weco.pipeline.relation_embedder.lib.{ + ApplicationConfig, + LambdaConfigurable +} trait ConfigurationTestHelpers { + 
case class TestAppConfiguration( + config1: Option[String] = None, + config2: Option[String] = None, + config3: Option[String] = None + ) extends ApplicationConfig + class TestLambdaConfiguration( - override val baseConfig: Config, - override val applicationConfig: Config, - override val lambdaConfig: Config - ) extends LambdaConfiguration + override val baseConfig: Config, + override val applicationConfig: Config, + override val lambdaConfig: Config + ) extends LambdaConfigurable[TestAppConfiguration] { + import weco.typesafe.config.builders.EnrichConfig._ + + override def build(rawConfig: Config): TestAppConfiguration = { + TestAppConfiguration( + config1 = rawConfig.getStringOption("config1"), + config2 = rawConfig.getStringOption("config2"), + config3 = rawConfig.getStringOption("config3") + ) + } + } def createConfig(configString: String): Config = { ConfigFactory.parseString(configString.stripMargin) @@ -23,14 +42,16 @@ trait ConfigurationTestHelpers { val emptyConfig = ConfigFactory.empty() def withLayeredConfig[R]( - baseConfig: Config = emptyConfig, - applicationConfig: Config = emptyConfig, - lambdaConfig: Config = emptyConfig - )(testWith: TestWith[Config, R]): R = { - testWith(new TestLambdaConfiguration( - baseConfig, - applicationConfig, - lambdaConfig - ).config) + baseConfig: Config = emptyConfig, + applicationConfig: Config = emptyConfig, + lambdaConfig: Config = emptyConfig + )(testWith: TestWith[TestAppConfiguration, R]): R = { + testWith( + new TestLambdaConfiguration( + baseConfig, + applicationConfig, + lambdaConfig + ).config + ) } } From 4a65af61eac272b543831a3407589bf4d62558e7 Mon Sep 17 00:00:00 2001 From: Robert Kenny Date: Thu, 19 Dec 2024 13:45:08 +0000 Subject: [PATCH 4/4] add config test --- .../lib/ConfigurationTest.scala | 86 +++++++++++++++++++ 1 file changed, 86 insertions(+) create mode 100644 pipeline/relation_embedder/relation_embedder/src/test/scala/weco/pipeline/relation_embedder/lib/ConfigurationTest.scala diff --git 
a/pipeline/relation_embedder/relation_embedder/src/test/scala/weco/pipeline/relation_embedder/lib/ConfigurationTest.scala b/pipeline/relation_embedder/relation_embedder/src/test/scala/weco/pipeline/relation_embedder/lib/ConfigurationTest.scala
new file mode 100644
index 0000000000..c723439399
--- /dev/null
+++ b/pipeline/relation_embedder/relation_embedder/src/test/scala/weco/pipeline/relation_embedder/lib/ConfigurationTest.scala
@@ -0,0 +1,95 @@
+package weco.pipeline.relation_embedder.lib
+
+import org.scalatest.funspec.AnyFunSpec
+import org.scalatest.matchers.should.Matchers
+import weco.pipeline.relation_embedder.helpers.ConfigurationTestHelpers
+
+/** Checks how the layered configuration is assembled: the lambda layer takes
+  * precedence over the application layer, which takes precedence over the
+  * base layer, and substitutions are resolved across the merged layers.
+  */
+class ConfigurationTest
+    extends AnyFunSpec
+    with ConfigurationTestHelpers
+    with Matchers {
+
+  it("loads the base configuration") {
+    val baseConfig =
+      """
+        |config1 = "value"
+        |""".asConfig
+
+    withLayeredConfig(
+      baseConfig = baseConfig
+    ) {
+      config =>
+        config shouldBe TestAppConfiguration(
+          config1 = Some("value")
+        )
+    }
+  }
+
+  it("overrides the base configuration with the application configuration") {
+    withLayeredConfig(
+      baseConfig = """
+        |config1 = "valueFromBaseForConfig1"
+        |config2 = "valueFromBaseForConfig2"
+        |""".asConfig,
+      applicationConfig = """
+        |config1 = "valueFromApplicationForConfig1"
+        |""".asConfig
+    ) {
+      config =>
+        config shouldBe TestAppConfiguration(
+          config1 = Some("valueFromApplicationForConfig1"),
+          config2 = Some("valueFromBaseForConfig2")
+        )
+    }
+  }
+
+  it("overrides the application configuration with the lambda configuration") {
+    // Every layer supplies a different value for config1, so the assertion
+    // only holds if the lambda layer really wins over the other two.
+    withLayeredConfig(
+      baseConfig = """
+        |config1 = "valueFromBaseForConfig1"
+        |config2 = "valueFromBaseForConfig2"
+        |""".asConfig,
+      applicationConfig = """
+        |config1 = "valueFromApplicationForConfig1"
+        |config3 = "valueFromApplicationForConfig3"
+        |""".asConfig,
+      lambdaConfig = """
+        |config1 = "valueFromLambdaForConfig1"
+        |""".asConfig
+    ) {
+      config =>
+        config shouldBe TestAppConfiguration(
+          config1 = Some("valueFromLambdaForConfig1"),
+          config2 = Some("valueFromBaseForConfig2"),
+          config3 = Some("valueFromApplicationForConfig3")
+        )
+    }
+  }
+
+  it(
+    "resolves values in the application config from the lambda configuration"
+  ) {
+    // config1 is only a reference in the application layer; its value has to
+    // come from config2, which is defined solely in the lambda layer.
+    withLayeredConfig(
+      applicationConfig = s"""
+        |config1 = $${?config2}
+        |""".asConfig,
+      lambdaConfig = """
+        |config2 = "valueFromLambdaForConfig2"
+        |""".asConfig
+    ) {
+      config =>
+        config shouldBe TestAppConfiguration(
+          config1 = Some("valueFromLambdaForConfig2"),
+          config2 = Some("valueFromLambdaForConfig2")
+        )
+    }
+  }
+}