Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Decouple relation_embedder config from typesafe config #2793

Open
wants to merge 4 commits into
base: rk/transformer-event-from-flow
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
package weco.pipeline.relation_embedder

import com.sksamuel.elastic4s.Index
import com.typesafe.config.Config
import grizzled.slf4j.Logging
import org.apache.pekko.NotUsed
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.Materializer
import org.apache.pekko.stream.scaladsl.{Sink, Source}
import weco.catalogue.internal_model.work.Work
import weco.catalogue.internal_model.work.WorkState.Denormalised
import weco.elasticsearch.typesafe.ElasticBuilder
import lib.ElasticBuilder
import weco.pipeline.relation_embedder.models.{
ArchiveRelationsCache,
Batch,
Expand All @@ -20,7 +19,7 @@ import weco.pipeline_storage.elastic.ElasticIndexer
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
import weco.catalogue.internal_model.Implicits._
import weco.typesafe.config.builders.EnrichConfig._
import weco.pipeline.relation_embedder.lib.{Downstream, RelationEmbedderConfig}

class BatchProcessor(
relationsService: RelationsService,
Expand Down Expand Up @@ -103,39 +102,38 @@ class BatchProcessor(
object BatchProcessor {

def apply(
config: Config
config: RelationEmbedderConfig
)(
implicit actorSystem: ActorSystem,
ec: ExecutionContext,
materializer: Materializer
): BatchProcessor = {

val identifiedIndex =
Index(config.requireString("es.merged-works.index"))
Index(config.mergedWorkIndex)

val esClient = ElasticBuilder.buildElasticClient(config)
val esClient = ElasticBuilder.buildElasticClient(config.elasticConfig)

val workIndexer =
new ElasticIndexer[Work[Denormalised]](
client = esClient,
index = Index(config.requireString(s"es.denormalised-works.index"))
index = Index(config.denormalisedWorkIndex)
)

val batchWriter = new BulkIndexWriter(
workIndexer = workIndexer,
maxBatchWeight = config.requireInt("es.works.batch_size")
maxBatchWeight = config.maxBatchWeight
)

new BatchProcessor(
relationsService = new PathQueryRelationsService(
esClient,
identifiedIndex,
completeTreeScroll = config.requireInt("es.works.scroll.complete_tree"),
affectedWorksScroll =
config.requireInt("es.works.scroll.affected_works")
completeTreeScroll = config.completeTreeScroll,
affectedWorksScroll = config.affectedWorksScroll
),
bulkWriter = batchWriter,
downstream = Downstream(Some(config))
downstream = Downstream(config.downstreamTarget)
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,13 @@ package weco.pipeline.relation_embedder

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.DurationInt

import org.apache.pekko.actor.ActorSystem

import com.typesafe.config.ConfigFactory
import com.sksamuel.elastic4s.Index

import weco.typesafe.config.builders.EnrichConfig._
import weco.elasticsearch.typesafe.ElasticBuilder
import weco.json.JsonUtil._
import weco.pipeline.relation_embedder.lib.StdInBatches
import weco.pipeline.relation_embedder.lib.{STDIODownstream, StdInBatches}

/** A main function providing a local CLI for the relation embedder. To invoke,
* provide a list of Batch objects in NDJSON on StdIn.
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import scala.concurrent.{Await, ExecutionContext, Future}
object LambdaMain
extends RequestHandler[SQSEvent, String]
with Logging
with LambdaConfiguration {
with RelationEmbedderConfigurable {

import SQSEventOps._

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,20 @@ import org.apache.pekko.actor.ActorSystem
import com.typesafe.config.Config
import weco.messaging.sns.NotificationMessage
import weco.messaging.typesafe.SQSBuilder
import weco.pipeline.relation_embedder.lib.RelationEmbedderConfigurable
import weco.typesafe.WellcomeTypesafeApp

import scala.concurrent.ExecutionContext

object Main extends WellcomeTypesafeApp {
object Main extends WellcomeTypesafeApp with RelationEmbedderConfigurable {
runWithConfig {
config: Config =>
implicit val actorSystem: ActorSystem =
ActorSystem("main-actor-system")
implicit val ec: ExecutionContext =
actorSystem.dispatcher

val processor = BatchProcessor(config)
val processor = BatchProcessor(build(config))

new RelationEmbedderWorkerService(
sqsStream = SQSBuilder.buildSQSStream[NotificationMessage](config),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package weco.pipeline.relation_embedder.lib

import software.amazon.awssdk.services.sns.SnsClient
import weco.messaging.sns.{SNSConfig, SNSMessageSender}

import scala.util.Try

trait Downstream {
def notify(workId: String): Try[Unit]
}

class SNSDownstream(snsConfig: SNSConfig) extends Downstream {
private val msgSender = new SNSMessageSender(
snsClient = SnsClient.builder().build(),
snsConfig = snsConfig,
subject = "Sent from relation_embedder"
)

override def notify(workId: String): Try[Unit] = Try(msgSender.send(workId))
}

object STDIODownstream extends Downstream {
override def notify(workId: String): Try[Unit] = Try(println(workId))
}

sealed trait DownstreamTarget
case class SNS(config: SNSConfig) extends DownstreamTarget
case object StdOut extends DownstreamTarget

object Downstream {
def apply(downstreamTarget: DownstreamTarget): Downstream = {
downstreamTarget match {
case SNS(config) => new SNSDownstream(config)
case StdOut => STDIODownstream
}
}
def apply(): Downstream = STDIODownstream
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package weco.pipeline.relation_embedder.lib

import com.sksamuel.elastic4s.ElasticClient
import com.typesafe.config.Config
import weco.elasticsearch.ElasticClientBuilder

sealed trait ElasticConfig {
val host: String
val port: Int
val protocol: String
}

case class ElasticConfigUsernamePassword(
host: String,
port: Int,
protocol: String,
username: String,
password: String
) extends ElasticConfig

case class ElasticConfigApiKey(
host: String,
port: Int,
protocol: String,
apiKey: String
) extends ElasticConfig

// This should be moved up to scala-libs, it's copied from there!

object ElasticBuilder {
import weco.typesafe.config.builders.EnrichConfig._

def buildElasticClientConfig(
config: Config,
namespace: String = ""
): ElasticConfig = {
val hostname = config.requireString(s"es.$namespace.host")
val port = config
.getIntOption(s"es.$namespace.port")
.getOrElse(9200)
val protocol = config
.getStringOption(s"es.$namespace.protocol")
.getOrElse("http")

(
config.getStringOption(s"es.$namespace.username"),
config.getStringOption(s"es.$namespace.password"),
config.getStringOption(s"es.$namespace.apikey")
) match {
case (Some(username), Some(password), None) =>
ElasticConfigUsernamePassword(
hostname,
port,
protocol,
username,
password
)
// Use an API key if specified, even if username/password are also present
case (_, _, Some(apiKey)) =>
ElasticConfigApiKey(hostname, port, protocol, apiKey)
case _ =>
throw new Throwable(
s"You must specify username and password, or apikey, in the 'es.$namespace' config"
)
}
}

def buildElasticClient(config: ElasticConfig): ElasticClient =
config match {
case ElasticConfigUsernamePassword(
hostname,
port,
protocol,
username,
password
) =>
ElasticClientBuilder.create(
hostname,
port,
protocol,
username,
password
)
case ElasticConfigApiKey(hostname, port, protocol, apiKey) =>
ElasticClientBuilder.create(hostname, port, protocol, apiKey)
}

def buildElasticClient(
config: Config,
namespace: String = ""
): ElasticClient =
buildElasticClient(buildElasticClientConfig(config, namespace))
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,22 @@ package weco.pipeline.relation_embedder.lib
import java.io.File
import com.typesafe.config.{Config, ConfigFactory}

trait Configuration {
val config: Config
trait ApplicationConfig {}

trait ConfigurationBuilder[C, T <: ApplicationConfig] {
protected val rawConfig: C

def build(rawConfig: C): T
def config: T = build(rawConfig)
}

trait TypesafeConfigurable[T <: ApplicationConfig]
extends ConfigurationBuilder[Config, T] {
def build(rawConfig: Config): T
}

trait LambdaConfiguration extends Configuration {
trait LambdaConfigurable[T <: ApplicationConfig]
extends TypesafeConfigurable[T] {
private val defaultResolveFromFile: String = "/tmp/config"
private val defaultApplicationConfig: String = "application.conf"

Expand All @@ -26,7 +37,7 @@ trait LambdaConfiguration extends Configuration {
ConfigFactory.empty()
}

lazy val config = lambdaConfig
lazy val rawConfig = lambdaConfig
.withFallback(applicationConfig)
.withFallback(baseConfig)
.resolve()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package weco.pipeline.relation_embedder.lib

import com.typesafe.config.Config
import weco.messaging.typesafe.SNSBuilder.buildSNSConfig
import ElasticBuilder.buildElasticClientConfig

case class RelationEmbedderConfig(
mergedWorkIndex: String,
denormalisedWorkIndex: String,
maxBatchWeight: Int,
completeTreeScroll: Int,
affectedWorksScroll: Int,
elasticConfig: ElasticConfig,
downstreamTarget: DownstreamTarget
) extends ApplicationConfig

trait RelationEmbedderConfigurable
extends LambdaConfigurable[RelationEmbedderConfig] {
import weco.typesafe.config.builders.EnrichConfig._

def build(rawConfig: Config): RelationEmbedderConfig =
RelationEmbedderConfig(
mergedWorkIndex = rawConfig.requireString("es.merged-works.index"),
denormalisedWorkIndex =
rawConfig.requireString("es.denormalised-works.index"),
maxBatchWeight = rawConfig.requireInt("es.works.batch_size"),
completeTreeScroll =
rawConfig.requireInt("es.works.scroll.complete_tree"),
affectedWorksScroll =
rawConfig.requireInt("es.works.scroll.affected_works"),
elasticConfig = buildElasticClientConfig(rawConfig),
downstreamTarget = {
rawConfig.requireString("relation_embedder.use_downstream") match {
case "sns" => SNS(buildSNSConfig(rawConfig))
case "stdio" => StdOut
}
}
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import weco.catalogue.internal_model.work.{Availability, Relations, Work}
import weco.catalogue.internal_model.work.WorkState.{Denormalised, Merged}
import weco.fixtures.TestWith
import weco.messaging.memory.MemoryMessageSender
import weco.pipeline.relation_embedder.lib.Downstream
import weco.pipeline.relation_embedder.models.Batch
import weco.pipeline.relation_embedder.models.Selector.{Descendents, Node, Tree}
import weco.pipeline_storage.memory.MemoryIndexer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import weco.messaging.sns.NotificationMessage
import weco.catalogue.internal_model.work.WorkState.{Denormalised, Merged}
import weco.catalogue.internal_model.work._
import weco.pipeline.relation_embedder.fixtures.SampleWorkTree

import weco.pipeline.relation_embedder.lib.Downstream
import weco.pipeline.relation_embedder.models._
import weco.pipeline_storage.memory.MemoryIndexer

Expand Down
Loading
Loading