Skip to content

Commit

Permalink
Use merge instead of zip study pulling stream
Browse files Browse the repository at this point in the history
With zip, each element of each stream need to wait for the other
stream to emit, which may take a long time.
  • Loading branch information
lenguyenthanh committed Dec 4, 2024
1 parent b720ecd commit b6db210
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 9 deletions.
2 changes: 1 addition & 1 deletion modules/ingestor/src/main/scala/mongo.chapter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -111,5 +111,5 @@ object ChapterRepo:
coll
.aggregateWithCodec[StudyData](Query.aggregate(ids))
.all
.flatTap(docs => Logger[IO].debug(s"Received $docs chapters"))
// .flatTap(docs => Logger[IO].debug(s"Received $docs chapters"))
.map(_.map(x => x._id -> x).toMap)
16 changes: 8 additions & 8 deletions modules/ingestor/src/main/scala/mongo.study.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,10 @@ object StudyRepo:

def fetch(since: Instant, until: Instant): fs2.Stream[IO, Result[StudySource]] =
fs2.Stream.eval(info"Fetching studies from $since to $until") *>
pullAndIndex(since, until)
.zip(pullAndDelete(since, until))
.map((toIndex, toDelete) => Result(toIndex, toDelete, until.some))
pullForIndex(since, until)
.merge(pullForDelete(since, until))

def pullAndIndex(since: Instant, until: Instant) =
def pullForIndex(since: Instant, until: Instant): fs2.Stream[IO, Result[StudySource]] =
val filter = range(F.createdAt)(since, until.some)
.or(range(F.updatedAt)(since, until.some))
studies
Expand All @@ -55,10 +54,11 @@ object StudyRepo:
.boundedStream(config.batchSize)
.chunkN(config.batchSize)
.map(_.toList)
.evalTap(_.traverse_(x => debug"received $x"))
// .evalTap(_.traverse_(x => debug"received $x"))
.evalMap(_.toSources)
.map(Result(_, Nil, until.some))

def pullAndDelete(since: Instant, until: Instant) =
def pullForDelete(since: Instant, until: Instant): fs2.Stream[IO, Result[StudySource]] =
val filter =
Filter
.gte("ts", since.asBsonTimestamp)
Expand All @@ -72,6 +72,7 @@ object StudyRepo:
.chunkN(config.batchSize)
.map(_.toList.flatMap(extractId))
.evalTap(xs => info"Deleting $xs")
.map(Result(Nil, _, until.some))

def extractId(doc: Document): Option[Id] =
doc.getNestedAs[String](F.oplogId).map(Id.apply)
Expand All @@ -91,8 +92,7 @@ object StudyRepo:
chapters
.byStudyIds(studyIds)
.flatMap: chapters =>
docs
.traverseFilter(_.toSource(chapters))
docs.traverseFilter(_.toSource(chapters))

type StudySourceWithId = (String, StudySource)
extension (doc: Document)
Expand Down

0 comments on commit b6db210

Please sign in to comment.