Skip to content

Commit

Permalink
split event from flow
Browse files Browse the repository at this point in the history
  • Loading branch information
kenoir committed Dec 17, 2024
1 parent 2622c78 commit fb66b33
Show file tree
Hide file tree
Showing 9 changed files with 87 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,5 @@ object BatchProcessor {
bulkWriter = batchWriter,
downstream = Downstream(Some(config))
)

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ class CalmTransformerEndToEndTest
new TransformerWorker[CalmSourcePayload, CalmSourceData, String](
transformer = CalmTransformer,
pipelineStream = pipelineStream,
retriever = retriever,
transformedWorkRetriever = retriever,
sourceDataRetriever = new CalmSourceDataRetriever(store)
)
worker.run()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class TransformerMain[Payload <: SourcePayload, SourceData](
new TransformerWorker(
transformer = transformer,
pipelineStream = pipelineStream,
retriever = sourceWorkRetriever,
transformedWorkRetriever = sourceWorkRetriever,
sourceDataRetriever = sourceDataRetriever
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,61 +36,22 @@ trait SourceDataRetriever[Payload, SourceData] {
): Either[ReadError, Identified[Version[String, Int], SourceData]]
}

/** A TransformerWorker:
* - Takes an SQS stream that emits VHS keys
* - Gets the record of type `SourceData`
* - Runs it through a transformer and transforms the `SourceData` to
* `Work[Source]`
* - Emits the message via `MessageSender` to SNS
*/
final class TransformerWorker[Payload <: SourcePayload, SourceData, SenderDest](
transformer: Transformer[SourceData],
retriever: Retriever[Work[Source]],
pipelineStream: PipelineStorageStream[NotificationMessage, Work[
Source
], SenderDest],
sourceDataRetriever: SourceDataRetriever[Payload, SourceData]
)(implicit ec: ExecutionContext, decoder: Decoder[Payload])
extends Logging
with Runnable {
trait TransformerEventProcessor[Payload <: SourcePayload, SourceData] extends Logging {
type Result[T] = Either[TransformerWorkerError, T]
type StoreKey = Version[String, Int]

def name: String = this.getClass.getSimpleName
implicit val ec: ExecutionContext
implicit val decoder: Decoder[Payload]

def run(): Future[Done] =
pipelineStream.foreach(
name,
(notification: NotificationMessage) =>
process(notification).map {
case Left(err) =>
// We do some slightly nicer logging here to give context to the errors
err match {
case DecodePayloadError(_, notificationMsg) =>
error(s"$name: DecodePayloadError from $notificationMsg")
case StoreReadError(_, key) =>
error(s"$name: StoreReadError on $key")
case TransformerError(t, sourceData, key) =>
error(s"$name: TransformerError on $sourceData with $key ($t)")
}

throw err
val sourceDataRetriever: SourceDataRetriever[Payload, SourceData]
val transformer: Transformer[SourceData]
val transformedWorkRetriever: Retriever[Work[Source]]

case Right(None) =>
debug(
s"$name: no transformed Work returned for $notification (this means the Work is already in the pipeline)"
)
Nil
val transformerName: String

case Right(Some((work, key))) =>
info(s"$name: from $key transformed work with id ${work.id}")
List(work)
}
)

def process(
message: NotificationMessage
): Future[Result[Option[(Work[Source], StoreKey)]]] =
def processEvent(
message: NotificationMessage
): Future[Result[Option[(Work[Source], StoreKey)]]] =
Future {
for {
payload <- decodePayload(message)
Expand All @@ -107,8 +68,8 @@ final class TransformerWorker[Payload <: SourcePayload, SourceData, SenderDest](
}.flatMap { compareToStored }

private def compareToStored(
workResult: Result[(Work[Source], StoreKey)]
): Future[Result[Option[(Work[Source], StoreKey)]]] =
workResult: Result[(Work[Source], StoreKey)]
): Future[Result[Option[(Work[Source], StoreKey)]]] =
workResult match {

// Once we've transformed the Work, we query forward -- is this a work we've
Expand All @@ -123,15 +84,15 @@ final class TransformerWorker[Payload <: SourcePayload, SourceData, SenderDest](
// Calm. The records get a new modifiedDate from Sierra, but none of the data
// we care about for the pipeline is changed.
case Right((transformedWork, key)) =>
retriever
transformedWorkRetriever
.apply(workIndexable.id(transformedWork))
.map {
storedWork =>
if (shouldSend(transformedWork, storedWork)) {
Right(Some((transformedWork, key)))
} else {
info(
s"$name: from $key transformed work with id ${transformedWork.id}; already in pipeline so not re-sending"
s"$transformerName: from $key transformed work with id ${transformedWork.id}; already in pipeline so not re-sending"
)
Right(None)
}
Expand All @@ -146,10 +107,10 @@ final class TransformerWorker[Payload <: SourcePayload, SourceData, SenderDest](
}

private def work(
sourceData: SourceData,
version: Int,
key: StoreKey
): Result[Work[Source]] =
sourceData: SourceData,
version: Int,
key: StoreKey
): Result[Work[Source]] =
transformer(id = key.id, sourceData, version) match {
case Right(result) => Right(result)
case Left(err) =>
Expand Down Expand Up @@ -182,9 +143,9 @@ final class TransformerWorker[Payload <: SourcePayload, SourceData, SenderDest](
}

private def shouldSend(
transformedWork: Work[Source],
storedWork: Work[Source]
): Boolean = {
transformedWork: Work[Source],
storedWork: Work[Source]
): Boolean = {
if (transformedWork.version < storedWork.version) {
debug(
s"${transformedWork.id}: transformed Work is older than the stored Work"
Expand Down Expand Up @@ -226,9 +187,9 @@ final class TransformerWorker[Payload <: SourcePayload, SourceData, SenderDest](
}

private def areEquivalent(
transformedWork: Work[Source],
storedWork: Work[Source]
): Boolean = {
transformedWork: Work[Source],
storedWork: Work[Source]
): Boolean = {
// Sometimes we get updates from our sources even though the data hasn't necessarily changed.
// One example of that is the Sierra Calm sync script that triggers an update to
// every sierra catalogued in calm every night. It can be very expensive if we let
Expand All @@ -247,3 +208,59 @@ final class TransformerWorker[Payload <: SourcePayload, SourceData, SenderDest](
modifiedTransformedWork == modifiedSourceWork
}
}

/** Connects a [[TransformerEventProcessor]] to a `PipelineStorageStream`.
  *
  * For every `NotificationMessage` handed over by `pipelineStream`, the worker:
  *   - delegates to `processEvent` (inherited from
  *     [[TransformerEventProcessor]]) to turn the message into an optional
  *     `Work[Source]`
  *   - on failure, logs a message describing the error and then rethrows it
  *   - on success, hands back either an empty list (nothing to send) or a
  *     single-element list containing the transformed Work
  *
  * NOTE(review): what `pipelineStream` does with the returned list (storage,
  * onward messaging) is not visible in this file -- confirm against
  * `PipelineStorageStream`.
  *
  * @param transformer
  *   turns `SourceData` into a `Work[Source]`
  * @param transformedWorkRetriever
  *   retriever for `Work[Source]`, made available to the event processor
  * @param pipelineStream
  *   the stream that supplies notifications and receives the resulting Works
  * @param sourceDataRetriever
  *   retriever for the `SourceData` referenced by a payload
  */
final class TransformerWorker[Payload <: SourcePayload, SourceData, SenderDest](
  val transformer: Transformer[SourceData],
  val transformedWorkRetriever: Retriever[Work[Source]],
  pipelineStream: PipelineStorageStream[NotificationMessage, Work[
    Source
  ], SenderDest],
  val sourceDataRetriever: SourceDataRetriever[Payload, SourceData]
)(implicit val ec: ExecutionContext, val decoder: Decoder[Payload])
    extends Logging
    with TransformerEventProcessor[Payload, SourceData]
    with Runnable {

  /** Label used as the stream name and as the prefix of every log line. */
  lazy val transformerName: String = this.getClass.getSimpleName

  /** Runs the stream, transforming each notification via [[process]]. */
  def run(): Future[Done] = {

    // Picks the log line for a failure; gives a little more context than the
    // bare exception would.
    def describe(err: TransformerWorkerError): String =
      err match {
        case DecodePayloadError(_, notificationMsg) =>
          s"$transformerName: DecodePayloadError from $notificationMsg"
        case StoreReadError(_, key) =>
          s"$transformerName: StoreReadError on $key"
        case TransformerError(t, sourceData, key) =>
          s"$transformerName: TransformerError on $sourceData with $key ($t)"
      }

    pipelineStream.foreach(
      transformerName,
      (notification: NotificationMessage) =>
        process(notification).map {
          case Right(outcome) =>
            outcome match {
              case Some((work, key)) =>
                info(
                  s"$transformerName: from $key transformed work with id ${work.id}"
                )
                List(work)

              case None =>
                debug(
                  s"$transformerName: no transformed Work returned for $notification (this means the Work is already in the pipeline)"
                )
                Nil
            }

          case Left(err) =>
            // Log first, then rethrow so the failure still propagates.
            error(describe(err))
            throw err
        }
    )
  }

  /** Processes a single notification; a thin alias for `processEvent`.
    *
    * @return
    *   `Left` with the error on failure, `Right(None)` when there is no Work
    *   to send, or `Right(Some((work, key)))` with the transformed Work and
    *   the store key it came from
    */
  def process(
    message: NotificationMessage
  ): Future[Result[Option[(Work[Source], StoreKey)]]] =
    processEvent(message)
}
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ class TransformerWorkerTest
val worker = new TransformerWorker(
transformer = transformer,
pipelineStream = pipelineStream,
retriever = retriever,
transformedWorkRetriever = retriever,
sourceDataRetriever =
new ExampleSourcePayloadLookup(sourceStore = store)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ class MetsTransformerEndToEndTest
new TransformerWorker[MetsSourcePayload, MetsSourceData, String](
transformer = new MetsXmlTransformer(store),
pipelineStream = pipelineStream,
retriever = retriever,
transformedWorkRetriever = retriever,
sourceDataRetriever = new MetsSourceDataRetriever
)
worker.run()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class MiroTransformerEndToEndTest
](
transformer = new MiroRecordTransformer,
pipelineStream = pipelineStream,
retriever = retriever,
transformedWorkRetriever = retriever,
sourceDataRetriever = new MiroSourceDataRetriever(store)
)
worker.run()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ class SierraTransformerEndToEndTest
version: Int
) => SierraTransformer(transformable, version).toEither,
pipelineStream = pipelineStream,
retriever = retriever,
transformedWorkRetriever = retriever,
sourceDataRetriever = new SierraSourceDataRetriever(store)
)
worker.run()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ class TeiTransformerEndToEndTest
new TransformerWorker[TeiSourcePayload, TeiMetadata, String](
transformer = new TeiTransformer(store),
pipelineStream = pipelineStream,
retriever = retriever,
transformedWorkRetriever = retriever,
sourceDataRetriever = new TeiSourceDataRetriever
)
worker.run()
Expand Down

0 comments on commit fb66b33

Please sign in to comment.