From 304eb76d2f86f7d75e316860364590dd2911aef6 Mon Sep 17 00:00:00 2001 From: Robert Kenny Date: Wed, 18 Dec 2024 15:26:29 +0000 Subject: [PATCH] push decoding message up a level --- .../transformer/TransformerWorker.scala | 86 +++++++++++-------- 1 file changed, 48 insertions(+), 38 deletions(-) diff --git a/pipeline/transformer/transformer_common/src/main/scala/weco/pipeline/transformer/TransformerWorker.scala b/pipeline/transformer/transformer_common/src/main/scala/weco/pipeline/transformer/TransformerWorker.scala index e2362cdb66..716fa24aab 100644 --- a/pipeline/transformer/transformer_common/src/main/scala/weco/pipeline/transformer/TransformerWorker.scala +++ b/pipeline/transformer/transformer_common/src/main/scala/weco/pipeline/transformer/TransformerWorker.scala @@ -36,7 +36,8 @@ trait SourceDataRetriever[Payload, SourceData] { ): Either[ReadError, Identified[Version[String, Int], SourceData]] } -trait TransformerEventProcessor[Payload <: SourcePayload, SourceData] extends Logging { +trait TransformerEventProcessor[Payload <: SourcePayload, SourceData] + extends Logging { type Result[T] = Either[TransformerWorkerError, T] type StoreKey = Version[String, Int] @@ -50,26 +51,21 @@ trait TransformerEventProcessor[Payload <: SourcePayload, SourceData] extends Lo val transformerName: String def processEvent( - message: NotificationMessage - ): Future[Result[Option[(Work[Source], StoreKey)]]] = + payload: Payload + ): Future[Result[Option[(Work[Source], StoreKey)]]] = Future { - for { - payload <- decodePayload(message) - key = Version(payload.id, payload.version) - - _ = debug(s"Decoded payload $payload and key $key") + val key = Version(payload.id, payload.version) + for { getResult <- getSourceData(payload) (sourceData, version) = getResult - _ = debug(s"Retrieved sourceData version $version for key $key") - newWork <- work(sourceData, version, key) } yield (newWork, key) }.flatMap { compareToStored } private def compareToStored( - workResult: Result[(Work[Source], StoreKey)] - ): Future[Result[Option[(Work[Source], StoreKey)]]] = + workResult: Result[(Work[Source], StoreKey)] + ): Future[Result[Option[(Work[Source], StoreKey)]]] = workResult match { // Once we've transformed the Work, we query forward -- is this a work we've @@ -107,22 +103,16 @@ trait TransformerEventProcessor[Payload <: SourcePayload, SourceData] extends Lo } private def work( - sourceData: SourceData, - version: Int, - key: StoreKey - ): Result[Work[Source]] = + sourceData: SourceData, + version: Int, + key: StoreKey + ): Result[Work[Source]] = transformer(id = key.id, sourceData, version) match { case Right(result) => Right(result) case Left(err) => Left(TransformerError(err, sourceData, key)) } - private def decodePayload(message: NotificationMessage): Result[Payload] = - fromJson[Payload](message.body) match { - case Success(storeKey) => Right(storeKey) - case Failure(err) => Left(DecodePayloadError(err, message)) - } - private def getSourceData(p: Payload): Result[(SourceData, Int)] = sourceDataRetriever .lookupSourceData(p) @@ -133,7 +123,9 @@ trait TransformerEventProcessor[Payload <: SourcePayload, SourceData] extends Lo s"Stored ID ($storedId) does not match ID from message (${p.id})" ) } - + debug( + s"Retrieved sourceData version $storedVersion for key $storedId" + ) (sourceData, storedVersion) } .left @@ -143,9 +135,9 @@ trait TransformerEventProcessor[Payload <: SourcePayload, SourceData] extends Lo } private def shouldSend( - transformedWork: Work[Source], - storedWork: Work[Source] - ): Boolean = { + transformedWork: Work[Source], + storedWork: Work[Source] + ): Boolean = { if (transformedWork.version < storedWork.version) { debug( s"${transformedWork.id}: transformed Work is older than the stored Work" @@ -187,9 +179,9 @@ trait TransformerEventProcessor[Payload <: SourcePayload, SourceData] extends Lo } private def areEquivalent( - transformedWork: Work[Source], - storedWork: Work[Source] - ): Boolean = { + transformedWork: Work[Source], + storedWork: Work[Source] + ): Boolean = { // Sometimes we get updates from our sources even though the data hasn't necessarily changed. // One example of that is the Sierra Calm sync script that triggers an update to // every sierra catalogued in calm every night. It can be very expensive if we let @@ -217,15 +209,15 @@ trait TransformerEventProcessor[Payload <: SourcePayload, SourceData] extends Lo * - Emits the message via `MessageSender` to SNS */ final class TransformerWorker[Payload <: SourcePayload, SourceData, SenderDest]( - val transformer: Transformer[SourceData], - val transformedWorkRetriever: Retriever[Work[Source]], - pipelineStream: PipelineStorageStream[NotificationMessage, Work[ + val transformer: Transformer[SourceData], + val transformedWorkRetriever: Retriever[Work[Source]], + pipelineStream: PipelineStorageStream[NotificationMessage, Work[ Source ], SenderDest], - val sourceDataRetriever: SourceDataRetriever[Payload, SourceData] + val sourceDataRetriever: SourceDataRetriever[Payload, SourceData] )(implicit val ec: ExecutionContext, val decoder: Decoder[Payload]) extends Logging - with TransformerEventProcessor[Payload, SourceData] + with TransformerEventProcessor[Payload, SourceData] with Runnable { lazy val transformerName: String = this.getClass.getSimpleName @@ -239,11 +231,15 @@ final class TransformerWorker[Payload <: SourcePayload, SourceData, SenderDest]( // We do some slightly nicer logging here to give context to the errors err match { case DecodePayloadError(_, notificationMsg) => - error(s"$transformerName: DecodePayloadError from $notificationMsg") + error( + s"$transformerName: DecodePayloadError from $notificationMsg" + ) case StoreReadError(_, key) => error(s"$transformerName: StoreReadError on $key") case TransformerError(t, sourceData, key) => - error(s"$transformerName: TransformerError on $sourceData with $key ($t)") + error( + s"$transformerName: TransformerError on $sourceData with $key ($t)" + ) } throw err @@ -255,12 +251,26 @@ final class TransformerWorker[Payload <: SourcePayload, SourceData, SenderDest]( Nil case Right(Some((work, key))) => - info(s"$transformerName: from $key transformed work with id ${work.id}") + info( + s"$transformerName: from $key transformed work with id ${work.id}" + ) List(work) } ) def process( message: NotificationMessage - ): Future[Result[Option[(Work[Source], StoreKey)]]] = processEvent(message) + ): Future[Result[Option[(Work[Source], StoreKey)]]] = { + decodePayload(message).flatMap { + payload => + debug(s"Decoded payload $payload, successfully") + processEvent(payload) + } + } + + private def decodePayload(message: NotificationMessage): Future[Payload] = + fromJson[Payload](message.body) match { + case Success(storeKey) => Future.successful(storeKey) + case Failure(err) => Future.failed(DecodePayloadError(err, message)) + } }