From 6ed509b7d111d7c2e61df1c3d1fe2521401efdc8 Mon Sep 17 00:00:00 2001 From: Thanh Le Date: Wed, 4 Dec 2024 17:14:51 +0100 Subject: [PATCH] Ony save time when result contains timestamp --- modules/ingestor/src/main/scala/ingestor.scala | 11 ++++++----- modules/ingestor/src/main/scala/mongo.study.scala | 5 +++-- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/modules/ingestor/src/main/scala/ingestor.scala b/modules/ingestor/src/main/scala/ingestor.scala index 27fbd8f..9a04648 100644 --- a/modules/ingestor/src/main/scala/ingestor.scala +++ b/modules/ingestor/src/main/scala/ingestor.scala @@ -38,7 +38,7 @@ object Ingestor: .eval(startAt) .flatMap(repo.watch) .evalMap: result => - updateElastic(result, false) *> saveLastIndexedTimestamp(index, result.timestamp) + updateElastic(result, false) *> saveLastIndexedTimestamp(result.timestamp) .compile .drain @@ -87,7 +87,8 @@ object Ingestor: .whenA(sources.nonEmpty) *> Logger[IO].info(s"Indexed ${sources.size} ${index.value}s") - private def saveLastIndexedTimestamp(index: Index, time: Option[Instant]): IO[Unit] = - val savedTime = time.getOrElse(Instant.now()) - store.put(index.value, savedTime) - *> Logger[IO].info(s"Stored last indexed time ${savedTime.getEpochSecond} for $index") + private val saveLastIndexedTimestamp: Option[Instant] => IO[Unit] = + _.traverse_(time => + store.put(index.value, time) + *> Logger[IO].info(s"Stored last indexed time ${time.getEpochSecond} for $index") + ) diff --git a/modules/ingestor/src/main/scala/mongo.study.scala b/modules/ingestor/src/main/scala/mongo.study.scala index 15d933c..19a76df 100644 --- a/modules/ingestor/src/main/scala/mongo.study.scala +++ b/modules/ingestor/src/main/scala/mongo.study.scala @@ -44,6 +44,7 @@ object StudyRepo: fs2.Stream.eval(info"Fetching studies from $since to $until") *> pullForIndex(since, until) .merge(pullForDelete(since, until)) + ++ fs2.Stream(Result(Nil, Nil, until.some)) def pullForIndex(since: Instant, until: Instant): fs2.Stream[IO, Result[StudySource]] = val filter = range(F.createdAt)(since, until.some) @@ -56,7 +57,7 @@ object StudyRepo: .map(_.toList) // .evalTap(_.traverse_(x => debug"received $x")) .evalMap(_.toSources) - .map(Result(_, Nil, until.some)) + .map(Result(_, Nil, none)) def pullForDelete(since: Instant, until: Instant): fs2.Stream[IO, Result[StudySource]] = val filter = @@ -72,7 +73,7 @@ object StudyRepo: .chunkN(config.batchSize) .map(_.toList.flatMap(extractId)) .evalTap(xs => info"Deleting $xs") - .map(Result(Nil, _, until.some)) + .map(Result(Nil, _, none)) def extractId(doc: Document): Option[Id] = doc.getNestedAs[String](F.oplogId).map(Id.apply)