Skip to content

Commit

Permalink
push decoding message up a level
Browse files Browse the repository at this point in the history
  • Loading branch information
kenoir committed Dec 18, 2024
1 parent fb66b33 commit 304eb76
Showing 1 changed file with 48 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ trait SourceDataRetriever[Payload, SourceData] {
): Either[ReadError, Identified[Version[String, Int], SourceData]]
}

trait TransformerEventProcessor[Payload <: SourcePayload, SourceData] extends Logging {
trait TransformerEventProcessor[Payload <: SourcePayload, SourceData]
extends Logging {
type Result[T] = Either[TransformerWorkerError, T]
type StoreKey = Version[String, Int]

Expand All @@ -50,26 +51,21 @@ trait TransformerEventProcessor[Payload <: SourcePayload, SourceData] extends Lo
val transformerName: String

def processEvent(
message: NotificationMessage
): Future[Result[Option[(Work[Source], StoreKey)]]] =
payload: Payload
): Future[Result[Option[(Work[Source], StoreKey)]]] =
Future {
for {
payload <- decodePayload(message)
key = Version(payload.id, payload.version)

_ = debug(s"Decoded payload $payload and key $key")
val key = Version(payload.id, payload.version)

for {
getResult <- getSourceData(payload)
(sourceData, version) = getResult
_ = debug(s"Retrieved sourceData version $version for key $key")

newWork <- work(sourceData, version, key)
} yield (newWork, key)
}.flatMap { compareToStored }

private def compareToStored(
workResult: Result[(Work[Source], StoreKey)]
): Future[Result[Option[(Work[Source], StoreKey)]]] =
workResult: Result[(Work[Source], StoreKey)]
): Future[Result[Option[(Work[Source], StoreKey)]]] =
workResult match {

// Once we've transformed the Work, we query forward -- is this a work we've
Expand Down Expand Up @@ -107,22 +103,16 @@ trait TransformerEventProcessor[Payload <: SourcePayload, SourceData] extends Lo
}

private def work(
sourceData: SourceData,
version: Int,
key: StoreKey
): Result[Work[Source]] =
sourceData: SourceData,
version: Int,
key: StoreKey
): Result[Work[Source]] =
transformer(id = key.id, sourceData, version) match {
case Right(result) => Right(result)
case Left(err) =>
Left(TransformerError(err, sourceData, key))
}

private def decodePayload(message: NotificationMessage): Result[Payload] =
fromJson[Payload](message.body) match {
case Success(storeKey) => Right(storeKey)
case Failure(err) => Left(DecodePayloadError(err, message))
}

private def getSourceData(p: Payload): Result[(SourceData, Int)] =
sourceDataRetriever
.lookupSourceData(p)
Expand All @@ -133,7 +123,9 @@ trait TransformerEventProcessor[Payload <: SourcePayload, SourceData] extends Lo
s"Stored ID ($storedId) does not match ID from message (${p.id})"
)
}

debug(
s"Retrieved sourceData version $storedVersion for key $storedId"
)
(sourceData, storedVersion)
}
.left
Expand All @@ -143,9 +135,9 @@ trait TransformerEventProcessor[Payload <: SourcePayload, SourceData] extends Lo
}

private def shouldSend(
transformedWork: Work[Source],
storedWork: Work[Source]
): Boolean = {
transformedWork: Work[Source],
storedWork: Work[Source]
): Boolean = {
if (transformedWork.version < storedWork.version) {
debug(
s"${transformedWork.id}: transformed Work is older than the stored Work"
Expand Down Expand Up @@ -187,9 +179,9 @@ trait TransformerEventProcessor[Payload <: SourcePayload, SourceData] extends Lo
}

private def areEquivalent(
transformedWork: Work[Source],
storedWork: Work[Source]
): Boolean = {
transformedWork: Work[Source],
storedWork: Work[Source]
): Boolean = {
// Sometimes we get updates from our sources even though the data hasn't necessarily changed.
// One example of that is the Sierra Calm sync script that triggers an update to
// every sierra catalogued in calm every night. It can be very expensive if we let
Expand Down Expand Up @@ -217,15 +209,15 @@ trait TransformerEventProcessor[Payload <: SourcePayload, SourceData] extends Lo
* - Emits the message via `MessageSender` to SNS
*/
final class TransformerWorker[Payload <: SourcePayload, SourceData, SenderDest](
val transformer: Transformer[SourceData],
val transformedWorkRetriever: Retriever[Work[Source]],
pipelineStream: PipelineStorageStream[NotificationMessage, Work[
val transformer: Transformer[SourceData],
val transformedWorkRetriever: Retriever[Work[Source]],
pipelineStream: PipelineStorageStream[NotificationMessage, Work[
Source
], SenderDest],
val sourceDataRetriever: SourceDataRetriever[Payload, SourceData]
val sourceDataRetriever: SourceDataRetriever[Payload, SourceData]
)(implicit val ec: ExecutionContext, val decoder: Decoder[Payload])
extends Logging
with TransformerEventProcessor[Payload, SourceData]
with TransformerEventProcessor[Payload, SourceData]
with Runnable {

lazy val transformerName: String = this.getClass.getSimpleName
Expand All @@ -239,11 +231,15 @@ final class TransformerWorker[Payload <: SourcePayload, SourceData, SenderDest](
// We do some slightly nicer logging here to give context to the errors
err match {
case DecodePayloadError(_, notificationMsg) =>
error(s"$transformerName: DecodePayloadError from $notificationMsg")
error(
s"$transformerName: DecodePayloadError from $notificationMsg"
)
case StoreReadError(_, key) =>
error(s"$transformerName: StoreReadError on $key")
case TransformerError(t, sourceData, key) =>
error(s"$transformerName: TransformerError on $sourceData with $key ($t)")
error(
s"$transformerName: TransformerError on $sourceData with $key ($t)"
)
}

throw err
Expand All @@ -255,12 +251,26 @@ final class TransformerWorker[Payload <: SourcePayload, SourceData, SenderDest](
Nil

case Right(Some((work, key))) =>
info(s"$transformerName: from $key transformed work with id ${work.id}")
info(
s"$transformerName: from $key transformed work with id ${work.id}"
)
List(work)
}
)

def process(
message: NotificationMessage
): Future[Result[Option[(Work[Source], StoreKey)]]] = processEvent(message)
): Future[Result[Option[(Work[Source], StoreKey)]]] = {
decodePayload(message).flatMap {
payload =>
debug(s"Decoded payload $payload, successfully")
processEvent(payload)
}
}

private def decodePayload(message: NotificationMessage): Future[Payload] =
fromJson[Payload](message.body) match {
case Success(storeKey) => Future.successful(storeKey)
case Failure(err) => Future.failed(DecodePayloadError(err, message))
}
}

0 comments on commit 304eb76

Please sign in to comment.