
Commit

Merge pull request #384 from lichess-org/fix/study-ingestor-stream-zipping

Fix study ingestor stream zipping
lenguyenthanh authored Dec 4, 2024
2 parents b720ecd + 6ed509b commit 47ea964
Showing 3 changed files with 16 additions and 14 deletions.
11 changes: 6 additions & 5 deletions modules/ingestor/src/main/scala/ingestor.scala
@@ -38,7 +38,7 @@ object Ingestor:
.eval(startAt)
.flatMap(repo.watch)
.evalMap: result =>
-  updateElastic(result, false) *> saveLastIndexedTimestamp(index, result.timestamp)
+  updateElastic(result, false) *> saveLastIndexedTimestamp(result.timestamp)
.compile
.drain

@@ -87,7 +87,8 @@ object Ingestor:
.whenA(sources.nonEmpty)
*> Logger[IO].info(s"Indexed ${sources.size} ${index.value}s")

-  private def saveLastIndexedTimestamp(index: Index, time: Option[Instant]): IO[Unit] =
-    val savedTime = time.getOrElse(Instant.now())
-    store.put(index.value, savedTime)
-      *> Logger[IO].info(s"Stored last indexed time ${savedTime.getEpochSecond} for $index")
+  private val saveLastIndexedTimestamp: Option[Instant] => IO[Unit] =
+    _.traverse_(time =>
+      store.put(index.value, time)
+        *> Logger[IO].info(s"Stored last indexed time ${time.getEpochSecond} for $index")
+    )
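
Note the behavior change in the hunk above: a missing timestamp is no longer replaced by Instant.now(). traverse_ over an Option runs the effect for Some and is a no-op for None, so nothing gets stored when the stream produced no timestamp. A minimal sketch of that pattern, assuming cats and cats-effect on the classpath; put is a hypothetical stand-in for the ingestor's store:

    import cats.effect.IO
    import cats.syntax.all.*
    import java.time.Instant

    // Hypothetical stand-in for the ingestor's key-value store.
    def put(key: String, time: Instant): IO[Unit] =
      IO.println(s"put $key -> ${time.getEpochSecond}")

    // traverse_ runs the effect for Some and does nothing for None,
    // so a missing timestamp is skipped instead of being replaced
    // by Instant.now() as the old code did.
    val save: Option[Instant] => IO[Unit] =
      _.traverse_(time => put("study", time))

    // save(Instant.now().some) performs the put; save(none) does nothing.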
2 changes: 1 addition & 1 deletion modules/ingestor/src/main/scala/mongo.chapter.scala
@@ -111,5 +111,5 @@ object ChapterRepo:
coll
.aggregateWithCodec[StudyData](Query.aggregate(ids))
.all
-  .flatTap(docs => Logger[IO].debug(s"Received $docs chapters"))
+  // .flatTap(docs => Logger[IO].debug(s"Received $docs chapters"))
.map(_.map(x => x._id -> x).toMap)
17 changes: 9 additions & 8 deletions modules/ingestor/src/main/scala/mongo.study.scala
@@ -42,11 +42,11 @@ object StudyRepo:

def fetch(since: Instant, until: Instant): fs2.Stream[IO, Result[StudySource]] =
fs2.Stream.eval(info"Fetching studies from $since to $until") *>
-    pullAndIndex(since, until)
-      .zip(pullAndDelete(since, until))
-      .map((toIndex, toDelete) => Result(toIndex, toDelete, until.some))
+    pullForIndex(since, until)
+      .merge(pullForDelete(since, until))
+      ++ fs2.Stream(Result(Nil, Nil, until.some))

-  def pullAndIndex(since: Instant, until: Instant) =
+  def pullForIndex(since: Instant, until: Instant): fs2.Stream[IO, Result[StudySource]] =
val filter = range(F.createdAt)(since, until.some)
.or(range(F.updatedAt)(since, until.some))
studies
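
The fetch hunk above is the core of the fix. zip pairs the two streams element by element and terminates as soon as the shorter side does: when the index and delete streams emit different numbers of batches, trailing batches are silently dropped, and if either side is empty nothing is emitted at all, including the timestamp. merge emits every element from both sides and completes only when both are exhausted. A minimal self-contained sketch of the difference, assuming fs2 and cats-effect; toIndex and toDelete are hypothetical stand-ins for pullForIndex and pullForDelete:

    import cats.effect.{IO, IOApp}
    import fs2.Stream

    object ZipVsMerge extends IOApp.Simple:
      // Hypothetical stand-ins: three batches to index, one to delete.
      val toIndex  = Stream("i1", "i2", "i3").covary[IO]
      val toDelete = Stream("d1").covary[IO]

      def run: IO[Unit] =
        for
          // zip stops with the shorter side: only ("i1", "d1") comes out,
          // i2 and i3 are silently dropped.
          zipped <- toIndex.zip(toDelete).compile.toList
          _      <- IO.println(s"zip:   $zipped")
          // merge emits all four elements (order is nondeterministic) and
          // terminates only when both streams are done.
          merged <- toIndex.merge(toDelete).compile.toList
          _      <- IO.println(s"merge: $merged")
        yield ()

Because the merged stream no longer produces paired (toIndex, toDelete) tuples, each side now wraps its batches in a partial Result, as the later hunks show: Result(_, Nil, none) for indexing and Result(Nil, _, none) for deletion, while the appended fs2.Stream(Result(Nil, Nil, until.some)) emits the timestamp exactly once, after both sides have finished.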
@@ -55,10 +55,11 @@
.boundedStream(config.batchSize)
.chunkN(config.batchSize)
.map(_.toList)
-  .evalTap(_.traverse_(x => debug"received $x"))
+  // .evalTap(_.traverse_(x => debug"received $x"))
  .evalMap(_.toSources)
+  .map(Result(_, Nil, none))

-  def pullAndDelete(since: Instant, until: Instant) =
+  def pullForDelete(since: Instant, until: Instant): fs2.Stream[IO, Result[StudySource]] =
val filter =
Filter
.gte("ts", since.asBsonTimestamp)
@@ -72,6 +73,7 @@
.chunkN(config.batchSize)
.map(_.toList.flatMap(extractId))
.evalTap(xs => info"Deleting $xs")
+  .map(Result(Nil, _, none))

def extractId(doc: Document): Option[Id] =
doc.getNestedAs[String](F.oplogId).map(Id.apply)
@@ -91,8 +93,7 @@
chapters
.byStudyIds(studyIds)
.flatMap: chapters =>
-    docs
-      .traverseFilter(_.toSource(chapters))
+    docs.traverseFilter(_.toSource(chapters))

type StudySourceWithId = (String, StudySource)
extension (doc: Document)
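
For context, traverseFilter (used above to build the study sources) combines an effectful map with a filter: documents whose toSource yields None are simply dropped from the result. A small sketch, assuming cats and cats-effect; halveIfEven is a hypothetical stand-in for toSource:

    import cats.effect.IO
    import cats.syntax.all.*

    // Hypothetical stand-in for toSource: Some for even numbers,
    // None for odd ones.
    def halveIfEven(n: Int): IO[Option[Int]] =
      IO.pure(Option.when(n % 2 == 0)(n / 2))

    // traverseFilter runs the effect over every element and keeps only
    // the Some results: this yields List(1, 2).
    val kept: IO[List[Int]] = List(1, 2, 3, 4).traverseFilter(halveIfEven)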
