Merge pull request #375 from lichess-org/ingestor-refactor/0
Refactor ingestor module
lenguyenthanh authored Nov 29, 2024

2 parents 247064a + 905b2c5 commit d5ad73e
Showing 15 changed files with 344 additions and 400 deletions.
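At a glance: each per-index ingestor (forum, game, study, team) is split into a plain data-access Repo[A] plus one generic Ingestor, wired together by the new Ingestors class. Elasticsearch writes, dry-run handling, error logging, and timestamp checkpointing now live once in ingestor.scala instead of being repeated in four files.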
4 changes: 2 additions & 2 deletions modules/app/src/main/scala/app.scala
@@ -3,7 +3,7 @@ package app

import cats.effect.*
import cats.syntax.all.*
import org.typelevel.log4cats.slf4j.{ Slf4jFactory, Slf4jLogger }
import org.typelevel.log4cats.slf4j.Slf4jFactory
import org.typelevel.log4cats.{ Logger, LoggerFactory }
import org.typelevel.otel4s.experimental.metrics.*
import org.typelevel.otel4s.metrics.Meter
@@ -14,8 +14,8 @@ import org.typelevel.otel4s.sdk.metrics.exporter.MetricExporter

object App extends IOApp.Simple:

given Logger[IO] = Slf4jLogger.getLogger[IO]
given LoggerFactory[IO] = Slf4jFactory.create[IO]
given Logger[IO] = LoggerFactory[IO].getLogger

override def run: IO[Unit] = app.useForever
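
The refactor applies the same two-line logging change in app.scala, cli.scala, and both services: keep a single Slf4jFactory as the LoggerFactory[IO] given, and derive Logger[IO] from it instead of constructing both independently. A minimal sketch of the pattern:

```scala
import cats.effect.IO
import org.typelevel.log4cats.{ Logger, LoggerFactory }
import org.typelevel.log4cats.slf4j.Slf4jFactory

given LoggerFactory[IO] = Slf4jFactory.create[IO]
given Logger[IO]        = LoggerFactory[IO].getLogger // summons the factory given above
```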

2 changes: 1 addition & 1 deletion modules/app/src/main/scala/service.health.scala
@@ -8,7 +8,7 @@ import org.typelevel.log4cats.{ Logger, LoggerFactory }

class HealthServiceImpl(esClient: ESClient[IO])(using LoggerFactory[IO]) extends HealthService[IO]:

given logger: Logger[IO] = summon[LoggerFactory[IO]].getLogger
given logger: Logger[IO] = LoggerFactory[IO].getLogger

override def healthCheck(): IO[HealthCheckOutput] =
esClient.status
9 changes: 1 addition & 8 deletions modules/app/src/main/scala/service.search.scala
@@ -2,7 +2,6 @@ package lila.search
package app

import cats.effect.*
import com.sksamuel.elastic4s.Indexable
import io.github.arainko.ducktape.*
import lila.search.forum.Forum
import lila.search.game.Game
@@ -11,15 +10,14 @@ import lila.search.study.Study
import lila.search.team.Team
import org.typelevel.log4cats.{ Logger, LoggerFactory }
import smithy4s.Timestamp
import smithy4s.schema.Schema

import java.time.Instant

class SearchServiceImpl(esClient: ESClient[IO])(using LoggerFactory[IO]) extends SearchService[IO]:

import SearchServiceImpl.given

given logger: Logger[IO] = summon[LoggerFactory[IO]].getLogger
given logger: Logger[IO] = LoggerFactory[IO].getLogger

override def count(query: Query): IO[CountOutput] =
esClient
@@ -68,8 +66,3 @@ object SearchServiceImpl:
case _: Query.Game => Index.Game
case _: Query.Study => Index.Study
case _: Query.Team => Index.Team

import smithy4s.json.Json.given
import com.github.plokhotnyuk.jsoniter_scala.core.*

given [A: Schema]: Indexable[A] = (a: A) => writeToString(a)
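
The Indexable given deleted here is not lost: it moves into the ingestor module (see ingestor.scala below). For reference, the derivation serializes any type that has a smithy4s Schema to a JSON string via jsoniter-scala, which is all elastic4s needs to index a document:

```scala
import com.github.plokhotnyuk.jsoniter_scala.core.*
import com.sksamuel.elastic4s.Indexable
import smithy4s.json.Json.given // JsonValueCodec[A] for any A with a Schema
import smithy4s.schema.Schema

given [A: Schema]: Indexable[A] = (a: A) => writeToString(a)
```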
4 changes: 2 additions & 2 deletions modules/e2e/src/test/scala/IntegrationSuite.scala
@@ -5,7 +5,7 @@ package test
import cats.effect.{ IO, Resource }
import cats.syntax.all.*
import com.comcast.ip4s.*
import lila.search.ingestor.given
import lila.search.ingestor.Ingestor.given
import lila.search.spec.*
import org.http4s.Uri
import org.typelevel.log4cats.noop.{ NoOpFactory, NoOpLogger }
@@ -41,7 +41,7 @@ object IntegrationSuite extends IOSuite:

def testAppConfig(elastic: ElasticConfig) = AppConfig(
server =
HttpServerConfig(ip"0.0.0.0", port"9999", apiLogger = false, shutdownTimeout = 30, enableDocs = false),
HttpServerConfig(ip"0.0.0.0", port"9999", apiLogger = false, shutdownTimeout = 1, enableDocs = false),
elastic = elastic
)

45 changes: 45 additions & 0 deletions modules/ingestor/src/main/scala/Repo.scala
@@ -0,0 +1,45 @@
package lila.search
package ingestor

import cats.effect.IO

import java.time.Instant

trait Repo[A]:
def watch(since: Option[Instant]): fs2.Stream[IO, Repo.Result[A]]
def fetch(since: Instant, until: Instant): fs2.Stream[IO, Repo.Result[A]]

object Repo:
type SourceWithId[A] = (String, A)
case class Result[A](toIndex: List[SourceWithId[A]], toDelete: List[Id], timestamp: Option[Instant])

import cats.effect.IO
import mongo4cats.bson.Document
import mongo4cats.collection.GenericMongoCollection
import mongo4cats.models.collection.ChangeStreamDocument
import mongo4cats.operations.Filter
import org.bson.BsonTimestamp

import java.time.Instant

val _id = "_id"

type MongoCollection = GenericMongoCollection[IO, Document, [A] =>> fs2.Stream[IO, A]]

given [A]: HasDocId[ChangeStreamDocument[A]] with
extension (change: ChangeStreamDocument[A])
def docId: Option[String] =
change.documentKey.flatMap(_.id)

extension (doc: Document)
def id: Option[String] =
doc.getString(_id)

extension (instant: Instant)
inline def asBsonTimestamp: BsonTimestamp = BsonTimestamp(instant.getEpochSecond.toInt, 1)

def range(field: String)(since: Instant, until: Option[Instant]): Filter =
inline def gtes = Filter.gte(field, since)
until.fold(gtes)(until => gtes.and(Filter.lt(field, until)))

extension (s: String) def dollarPrefix = "$" + s
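
Repo.scala centralizes the Mongo helpers previously duplicated across the ingestors. A hedged sketch of how they compose (the field names are illustrative):

```scala
import java.time.Instant
import mongo4cats.operations.Filter
import Repo.*

val since = Instant.parse("2024-11-01T00:00:00Z")
val until = Instant.parse("2024-11-29T00:00:00Z")

// range is half-open when `until` is given: field >= since && field < until
val window: Filter =
  range("createdAt")(since, Some(until)).or(range("updatedAt")(since, Some(until)))

// oplog entries are keyed by BsonTimestamp, hence the conversion extension
val ts = since.asBsonTimestamp
```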
6 changes: 3 additions & 3 deletions modules/ingestor/src/main/scala/app.scala
@@ -2,7 +2,7 @@ package lila.search
package ingestor

import cats.effect.*
import org.typelevel.log4cats.slf4j.{ Slf4jFactory, Slf4jLogger }
import org.typelevel.log4cats.slf4j.Slf4jFactory
import org.typelevel.log4cats.{ Logger, LoggerFactory }
import org.typelevel.otel4s.experimental.metrics.*
import org.typelevel.otel4s.metrics.Meter
@@ -11,8 +11,8 @@ import org.typelevel.otel4s.sdk.metrics.SdkMetrics

object App extends IOApp.Simple:

given Logger[IO] = Slf4jLogger.getLogger[IO]
given LoggerFactory[IO] = Slf4jFactory.create[IO]
given Logger[IO] = LoggerFactory[IO].getLogger

override def run: IO[Unit] = app.useForever

@@ -33,7 +33,7 @@ object App extends IOApp.Simple:

class IngestorApp(res: AppResources, config: AppConfig)(using Logger[IO], LoggerFactory[IO]):
def run(): Resource[IO, Unit] =
Ingestor(res.lichess, res.study, res.studyLocal, res.elastic, res.store, config.ingestor)
Ingestors(res.lichess, res.study, res.studyLocal, res.store, res.elastic, config.ingestor)
.flatMap(_.run())
.toResource
.evalTap(_ => Logger[IO].info("Ingestor started"))
68 changes: 33 additions & 35 deletions modules/ingestor/src/main/scala/cli.scala
@@ -7,7 +7,7 @@ import cats.syntax.all.*
import com.monovore.decline.*
import com.monovore.decline.effect.*
import lila.search.ingestor.opts.{ IndexOpts, WatchOpts }
import org.typelevel.log4cats.slf4j.{ Slf4jFactory, Slf4jLogger }
import org.typelevel.log4cats.slf4j.Slf4jFactory
import org.typelevel.log4cats.{ Logger, LoggerFactory }
import org.typelevel.otel4s.metrics.Meter

@@ -20,36 +20,29 @@ object cli
version = "3.0.0"
):

given Logger[IO] = Slf4jLogger.getLogger[IO]
given LoggerFactory[IO] = Slf4jFactory.create[IO]
given Logger[IO] = LoggerFactory[IO].getLogger
given Meter[IO] = Meter.noop[IO]

override def main: Opts[IO[ExitCode]] =
opts.parse.map: opts =>
makeExecutor.use(_.execute(opts).as(ExitCode.Success))
makeIngestor.use(_.execute(opts).as(ExitCode.Success))

def makeExecutor: Resource[IO, Executor] =
def makeIngestor: Resource[IO, Ingestors] =
for
config <- AppConfig.load.toResource
res <- AppResources.instance(config)
forum <- ForumIngestor(res.lichess, res.elastic, res.store, config.ingestor.forum).toResource
team <- TeamIngestor(res.lichess, res.elastic, res.store, config.ingestor.team).toResource
study <- StudyIngestor(
ingestor <- Ingestors(
res.lichess,
res.study,
res.studyLocal,
res.elastic,
res.store,
config.ingestor.study
res.elastic,
config.ingestor
).toResource
game <- GameIngestor(res.lichess, res.elastic, res.store, config.ingestor.game).toResource
yield Executor(forum, study, game, team)

class Executor(
val forum: ForumIngestor,
val study: StudyIngestor,
val game: GameIngestor,
val team: TeamIngestor
):
yield ingestor

extension (ingestor: Ingestors)
def execute(opts: IndexOpts | WatchOpts): IO[Unit] =
opts match
case opts: IndexOpts => index(opts)
@@ -58,28 +51,38 @@ object cli
def index(opts: IndexOpts): IO[Unit] =
opts.index match
case Index.Forum =>
forum.run(opts.since, opts.until, opts.dry).compile.drain
ingestor.forum.run(opts.since, opts.until, opts.dry)
case Index.Study =>
study.run(opts.since, opts.until, opts.dry).compile.drain
ingestor.study.run(opts.since, opts.until, opts.dry)
case Index.Game =>
game.run(opts.since, opts.until, opts.dry).compile.drain
ingestor.game.run(opts.since, opts.until, opts.dry)
case Index.Team =>
team.run(opts.since, opts.until, opts.dry).compile.drain
ingestor.team.run(opts.since, opts.until, opts.dry)
case _ =>
forum.run(opts.since, opts.until, opts.dry).compile.drain *>
study.run(opts.since, opts.until, opts.dry).compile.drain *>
game.run(opts.since, opts.until, opts.dry).compile.drain *>
team.run(opts.since, opts.until, opts.dry).compile.drain
ingestor.forum.run(opts.since, opts.until, opts.dry) *>
ingestor.study.run(opts.since, opts.until, opts.dry) *>
ingestor.game.run(opts.since, opts.until, opts.dry) *>
ingestor.team.run(opts.since, opts.until, opts.dry)

def watch(opts: WatchOpts): IO[Unit] =
opts.index match
case Index.Game =>
game.watch(opts.since.some, opts.dry).compile.drain
case _ => IO.println("We only support game watch for now")
ingestor.game.watch(opts.since.some, opts.dry)
case Index.Forum =>
ingestor.forum.watch(opts.since.some, opts.dry)
case Index.Team =>
ingestor.team.watch(opts.since.some, opts.dry)
case Index.Study =>
ingestor.study.watch(opts.since.some, opts.dry)
case _ =>
ingestor.forum.watch(opts.since.some, opts.dry) *>
ingestor.team.watch(opts.since.some, opts.dry) *>
ingestor.study.watch(opts.since.some, opts.dry) *>
ingestor.game.watch(opts.since.some, opts.dry)

object opts:
case class IndexOpts(index: Index | Unit, since: Instant, until: Instant, dry: Boolean)
case class WatchOpts(index: Index, since: Instant, dry: Boolean)
case class WatchOpts(index: Index | Unit, since: Instant, dry: Boolean)

def parse = Opts.subcommand("index", "index documents")(indexOpt) <+>
Opts.subcommand("watch", "watch change events and index documents")(watchOpt)
@@ -128,12 +131,7 @@ object opts:
)

val watchOpt = (
Opts.option[Index](
long = "index",
help = "Target index (only `game` for now)",
short = "i",
metavar = "forum|team|study|game"
),
singleIndexOpt orElse allIndexOpt,
Opts
.option[Instant](
long = "since",
106 changes: 82 additions & 24 deletions modules/ingestor/src/main/scala/ingestor.scala
@@ -3,33 +3,91 @@ package ingestor

import cats.effect.*
import cats.syntax.all.*
import mongo4cats.database.MongoDatabase
import org.typelevel.log4cats.LoggerFactory
import com.github.plokhotnyuk.jsoniter_scala.core.*
import com.sksamuel.elastic4s.Indexable
import org.typelevel.log4cats.syntax.*
import org.typelevel.log4cats.{ Logger, LoggerFactory }
import smithy4s.json.Json.given
import smithy4s.schema.Schema

import java.time.Instant

trait Ingestor:
def run(): IO[Unit]
// watch change events from database and ingest documents into elastic search
def watch: IO[Unit]
// Similar to watch but started from a given timestamp
def watch(since: Option[Instant], dryRun: Boolean): IO[Unit]
// Fetch documents in [since, until] and ingest into elastic search
def run(since: Instant, until: Instant, dryRun: Boolean): IO[Unit]

object Ingestor:

def apply(
lichess: MongoDatabase[IO],
study: MongoDatabase[IO],
local: MongoDatabase[IO],
elastic: ESClient[IO],
given [A: Schema]: Indexable[A] = (a: A) => writeToString(a)

def apply[A: Schema](
index: Index,
repo: Repo[A],
store: KVStore,
config: IngestorConfig
)(using LoggerFactory[IO]): IO[Ingestor] =
(
ForumIngestor(lichess, elastic, store, config.forum),
TeamIngestor(lichess, elastic, store, config.team),
StudyIngestor(study, local, elastic, store, config.study),
GameIngestor(lichess, elastic, store, config.game)
).mapN: (forum, team, study, game) =>
new Ingestor:
def run() =
fs2
.Stream(forum.watch, team.watch, study.watch, game.watch)
.covary[IO]
.parJoinUnbounded
.compile
.drain
elastic: ESClient[IO],
defaultStartAt: Option[Instant]
)(using LoggerFactory[IO]): Ingestor = new:
given Logger[IO] = LoggerFactory[IO].getLogger

def watch: IO[Unit] =
fs2.Stream
.eval(startAt)
.flatMap(repo.watch)
.evalMap: result =>
updateElastic(result, false) *> saveLastIndexedTimestamp(index, result.timestamp)
.compile
.drain

def watch(since: Option[Instant], dryRun: Boolean): IO[Unit] =
repo
.watch(since)
.evalMap(updateElastic(_, dryRun))
.compile
.drain

def run(since: Instant, until: Instant, dryRun: Boolean): IO[Unit] =
repo
.fetch(since, until)
.evalMap(updateElastic(_, dryRun))
.compile
.drain

private def updateElastic(result: Repo.Result[A], dryRun: Boolean): IO[Unit] =
dryRun.fold(
info"Would index total ${result.toIndex.size} games and delete ${result.toDelete.size} games" *>
result.toIndex.traverse_(x => debug"Would index $x")
*> result.toDelete.traverse_(x => debug"Would delete $x"),
storeBulk(index, result.toIndex)
*> deleteMany(index, result.toDelete)
)

private def startAt: IO[Option[Instant]] =
defaultStartAt
.fold(store.get(index.value))(_.some.pure[IO])
.flatTap(since => info"Starting ${index.value} ingestor from $since")

private def deleteMany(index: Index, ids: List[Id]): IO[Unit] =
elastic
.deleteMany(index, ids)
.flatTap(_ => Logger[IO].info(s"Deleted ${ids.size} ${index.value}s"))
.handleErrorWith: e =>
Logger[IO].error(e)(s"Failed to delete ${index.value}: ${ids.map(_.value).mkString(", ")}")
.whenA(ids.nonEmpty)

private def storeBulk(index: Index, sources: List[(String, A)]): IO[Unit] =
Logger[IO].info(s"Received ${sources.size} docs to ${index.value}") *>
elastic
.storeBulk(index, sources)
.handleErrorWith: e =>
Logger[IO].error(e)(s"Failed to ${index.value} index: ${sources.map(_._1).mkString(", ")}")
.whenA(sources.nonEmpty)
*> Logger[IO].info(s"Indexed ${sources.size} ${index.value}s")

private def saveLastIndexedTimestamp(index: Index, time: Option[Instant]): IO[Unit] =
val savedTime = time.getOrElse(Instant.now())
store.put(index.value, savedTime)
*> Logger[IO].info(s"Stored last indexed time ${savedTime.getEpochSecond} for $index")
39 changes: 39 additions & 0 deletions modules/ingestor/src/main/scala/ingestors.scala
@@ -0,0 +1,39 @@
package lila.search
package ingestor

import cats.effect.*
import cats.syntax.all.*
import mongo4cats.database.MongoDatabase
import org.typelevel.log4cats.LoggerFactory

class Ingestors(
val forum: Ingestor,
val study: Ingestor,
val game: Ingestor,
val team: Ingestor
):
def run(): IO[Unit] =
List(forum.watch, team.watch, study.watch, game.watch).parSequence_

object Ingestors:

def apply(
lichess: MongoDatabase[IO],
study: MongoDatabase[IO],
local: MongoDatabase[IO],
store: KVStore,
elastic: ESClient[IO],
config: IngestorConfig
)(using LoggerFactory[IO]): IO[Ingestors] =
(
ForumRepo(lichess, config.forum),
StudyRepo(study, local, config.study),
GameRepo(lichess, config.game),
TeamRepo(lichess, config.team)
).mapN: (forums, studies, games, teams) =>
new Ingestors(
Ingestor(Index.Forum, forums, store, elastic, config.forum.startAt),
Ingestor(Index.Study, studies, store, elastic, config.study.startAt),
Ingestor(Index.Game, games, store, elastic, config.game.startAt),
Ingestor(Index.Team, teams, store, elastic, config.team.startAt)
)
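
run() swaps the old fs2.Stream(...).parJoinUnbounded pipeline for cats' parSequence_: run the four watch loops concurrently, discard their results, and fail (cancelling the rest) if any loop fails. In miniature:

```scala
import cats.effect.IO
import cats.syntax.all.*

val loops: List[IO[Unit]] = List(IO.never, IO.never, IO.never, IO.never)
val runAll: IO[Unit]      = loops.parSequence_ // concurrent, results discarded
```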
6 changes: 3 additions & 3 deletions modules/ingestor/src/main/scala/mongo.chapter.scala
@@ -11,6 +11,8 @@ import mongo4cats.database.MongoDatabase
import mongo4cats.operations.{ Accumulator, Aggregate, Filter }
import org.typelevel.log4cats.Logger

import Repo.*

trait ChapterRepo:
// Aggregate chapters data and convert them to StudyChapterText by their study ids
def byStudyIds(ids: List[String]): IO[Map[String, StudyData]]
@@ -108,8 +110,6 @@ object ChapterRepo:
def byStudyIds(ids: List[String]): IO[Map[String, StudyData]] =
coll
.aggregateWithCodec[StudyData](Query.aggregate(ids))
.stream
.compile
.toList
.all
.flatTap(docs => Logger[IO].debug(s"Received $docs chapters"))
.map(_.map(x => x._id -> x).toMap)
@@ -15,15 +15,9 @@ import org.typelevel.log4cats.{ Logger, LoggerFactory }
import java.time.Instant
import scala.concurrent.duration.*

trait ForumIngestor:
// watch change events from MongoDB and ingest forum posts into elastic search
def watch: fs2.Stream[IO, Unit]
// Fetch posts in [since, until] and ingest into elastic search
def run(since: Instant, until: Instant, dryRun: Boolean): fs2.Stream[IO, Unit]
import Repo.{ *, given }

object ForumIngestor:

private val index = Index.Forum
object ForumRepo:

private val interestedOperations = List(DELETE, INSERT, REPLACE, UPDATE).map(_.getValue)

@@ -43,74 +37,37 @@ object ForumIngestor:
private def aggregate(maxPostLength: Int) =
Aggregate.matchBy(eventFilter(maxPostLength)).combinedWith(Aggregate.project(eventProjection))

def apply(mongo: MongoDatabase[IO], elastic: ESClient[IO], store: KVStore, config: IngestorConfig.Forum)(
using LoggerFactory[IO]
): IO[ForumIngestor] =
given Logger[IO] = summon[LoggerFactory[IO]].getLogger
(mongo.getCollection("f_topic"), mongo.getCollection("f_post")).mapN(apply(elastic, store, config))
def apply(mongo: MongoDatabase[IO], config: IngestorConfig.Forum)(using
LoggerFactory[IO]
): IO[Repo[ForumSource]] =
given Logger[IO] = LoggerFactory[IO].getLogger
(mongo.getCollection("f_topic"), mongo.getCollection("f_post")).mapN(apply(config))

def apply(elastic: ESClient[IO], store: KVStore, config: IngestorConfig.Forum)(
def apply(config: IngestorConfig.Forum)(
topics: MongoCollection,
posts: MongoCollection
)(using Logger[IO]): ForumIngestor = new:

def watch: fs2.Stream[IO, Unit] =
fs2.Stream
.eval(startAt.flatTap(since => info"Starting forum ingestor from $since"))
.flatMap: last =>
changes(last)
.evalMap: events =>
val lastEventTimestamp = events.flatten(_.clusterTime.flatMap(_.asInstant)).maxOption
val (toDelete, toIndex) = events.partition(_.isDelete)
storeBulk(toIndex.flatten(_.fullDocument))
*> elastic.deleteMany(index, toDelete)
*> saveLastIndexedTimestamp(lastEventTimestamp.getOrElse(Instant.now()))

def run(since: Instant, until: Instant, dryRun: Boolean): fs2.Stream[IO, Unit] =
)(using Logger[IO]): Repo[ForumSource] = new:

def fetch(since: Instant, until: Instant) =
val filter = range(F.createdAt)(since, until.some)
.or(range(F.updatedAt)(since, until.some))
.or(range(F.erasedAt)(since, until.some))
posts
.find(filter)
.projection(postProjection)
.boundedStream(config.batchSize)
.filter(_.validText)
.chunkN(config.batchSize)
.map(_.toList)
.metered(1.second) // to avoid overloading the elasticsearch
.evalMap: docs =>
val (toDelete, toIndex) = docs.partition(_.isErased)
dryRun.fold(
toIndex.traverse_(doc => debug"Would index $doc")
*> toDelete.traverse_(doc => debug"Would delete $doc"),
storeBulk(toIndex) *> elastic.deleteMany(index, toDelete)
)

private def storeBulk(docs: List[Document]): IO[Unit] =
info"Received ${docs.size} forum posts to index" *>
docs.toSources
.flatMap: sources =>
elastic.storeBulk(index, sources) *> info"Indexed ${sources.size} forum posts"
.handleErrorWith: e =>
Logger[IO].error(e)(s"Failed to index forum posts: ${docs.map(_.id).mkString(", ")}")
.whenA(docs.nonEmpty)

private def saveLastIndexedTimestamp(time: Instant): IO[Unit] =
store.put(index.value, time)
*> info"Stored last indexed time ${time.getEpochSecond} for $index"

private def startAt: IO[Option[Instant]] =
config.startAt.fold(store.get(index.value))(_.some.pure[IO])

// Fetches topic names by their ids
private def topicByIds(ids: Seq[String]): IO[Map[String, String]] =
topics
.find(Filter.in(_id, ids))
.projection(Projection.include(List(_id, Topic.name)))
.all
.map(_.map(doc => (doc.id, doc.getString(Topic.name)).mapN(_ -> _)).flatten.toMap)

private def changes(since: Option[Instant]): fs2.Stream[IO, List[ChangeStreamDocument[Document]]] =
fs2.Stream.eval(info"Fetching teams from $since to $until") *>
posts
.find(filter)
.projection(postProjection)
.boundedStream(config.batchSize)
.filter(_.validText)
.chunkN(config.batchSize)
.map(_.toList)
.metered(1.second)
.evalMap: events =>
val (toDelete, toIndex) = events.partition(_.isErased)
toIndex.toSources
.map: sources =>
Result(sources, toDelete.flatten(_.id.map(Id.apply)), none)

def watch(since: Option[Instant]): fs2.Stream[IO, Result[ForumSource]] =
val builder = posts.watch(aggregate(config.maxPostLength))
// skip the first event if we're starting from a specific timestamp
// since the event at that timestamp is already indexed
@@ -124,11 +81,25 @@ object ForumIngestor:
.groupWithin(config.batchSize, config.timeWindows.second)
.evalTap(_.traverse_(x => debug"received $x"))
.map(_.toList.distincByDocId)
.evalMap: events =>
val lastEventTimestamp = events.flatten(_.clusterTime.flatMap(_.asInstant)).maxOption
val (toDelete, toIndex) = events.partition(_.isDelete)
toIndex
.flatten(_.fullDocument)
.toSources
.map: sources =>
Result(sources, toDelete.flatten(_.docId.map(Id.apply)), lastEventTimestamp)

private type SourceWithId = (String, ForumSource)
// Fetches topic names by their ids
private def topicByIds(ids: Seq[String]): IO[Map[String, String]] =
topics
.find(Filter.in(_id, ids))
.projection(Projection.include(List(_id, Topic.name)))
.all
.map(_.map(doc => (doc.id, doc.getString(Topic.name)).mapN(_ -> _)).flatten.toMap)

extension (events: List[Document])
private def toSources: IO[List[SourceWithId]] =
private def toSources: IO[List[SourceWithId[ForumSource]]] =
val topicIds = events.flatMap(_.topicId).distinct
topicIds.isEmpty.fold(
info"no topics found for posts: $events".as(Nil),
@@ -141,7 +112,7 @@ object ForumIngestor:

extension (doc: Document)

private def toSource(topicMap: Map[String, String]): IO[Option[SourceWithId]] =
private def toSource(topicMap: Map[String, String]): IO[Option[SourceWithId[ForumSource]]] =
(doc.id, doc.topicId)
.flatMapN: (id, topicId) =>
doc.toSource(topicMap.get(topicId), topicId).map(id -> _)
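
Every batch of change events or fetched documents now collapses into a single Repo.Result, and the generic Ingestor decides whether to index, delete, or merely log it. An illustrative value (the ids and the forumSource parameter are hypothetical):

```scala
import java.time.Instant

def example(forumSource: ForumSource): Repo.Result[ForumSource] =
  Repo.Result(
    toIndex   = List("post1" -> forumSource), // (document id, source) pairs to upsert
    toDelete  = List(Id("post2")),            // ids of erased or deleted documents
    timestamp = Some(Instant.now())           // newest cluster time; None for backfills
  )
```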
@@ -20,17 +20,9 @@ import org.typelevel.log4cats.{ Logger, LoggerFactory }
import java.time.Instant
import scala.concurrent.duration.*

trait GameIngestor:
// watch change events from game5 collection and ingest games into elastic search
def watch: fs2.Stream[IO, Unit]
// Similar to watch but started from a given timestamp
def watch(since: Option[Instant], dryRun: Boolean): fs2.Stream[IO, Unit]
// Fetch posts in [since, until] and ingest into elastic search
def run(since: Instant, until: Instant, dryRun: Boolean): fs2.Stream[IO, Unit]
import Repo.{ *, given }

object GameIngestor:

private val index = Index.Game
object GameRepo:

private val interestedOperations = List(UPDATE, DELETE).map(_.getValue)
private val eventFilter = Filter.in("operationType", interestedOperations)
@@ -70,63 +62,39 @@ object GameIngestor:
private val aggregate =
Aggregate.matchBy(eventFilter.and(changeFilter)).combinedWith(Aggregate.project(eventProjection))

def apply(mongo: MongoDatabase[IO], elastic: ESClient[IO], store: KVStore, config: IngestorConfig.Game)(
using LoggerFactory[IO]
): IO[GameIngestor] =
given Logger[IO] = summon[LoggerFactory[IO]].getLogger
mongo.getCollectionWithCodec[DbGame]("game5").map(apply(elastic, store, config))

def apply(
elastic: ESClient[IO],
store: KVStore,
config: IngestorConfig.Game
)(games: MongoCollection[IO, DbGame])(using Logger[IO]): GameIngestor = new:
def apply(mongo: MongoDatabase[IO], config: IngestorConfig.Game)(using
LoggerFactory[IO]
): IO[Repo[GameSource]] =
given Logger[IO] = LoggerFactory[IO].getLogger
mongo.getCollectionWithCodec[DbGame]("game5").map(apply(config))

def watch: fs2.Stream[IO, Unit] =
fs2.Stream
.eval(startAt.flatTap(since => info"Starting game ingestor from $since"))
.flatMap(watch(_, dryRun = false))
def apply(config: IngestorConfig.Game)(games: MongoCollection[IO, DbGame])(using
Logger[IO]
): Repo[GameSource] = new:

def watch(since: Option[Instant], dryRun: Boolean): fs2.Stream[IO, Unit] =
def watch(since: Option[Instant]): fs2.Stream[IO, Result[GameSource]] =
changes(since)
.evalMap: events =>
.map: events =>
val lastEventTimestamp = events.lastOption.flatMap(_.clusterTime).flatMap(_.asInstant)
val (toDelete, toIndex) = events.partition(_.operationType == DELETE)
dryRun.fold(
info"Would index total ${toIndex.size} games and delete ${toDelete.size} games" *>
toIndex.flatMap(_.fullDocument).traverse_(x => debug"Would index ${x.debug}")
*> toDelete.traverse_(x => debug"Would delete ${x.docId}"),
storeBulk(toIndex.flatten(_.fullDocument))
*> elastic.deleteMany(index, toDelete)
*> saveLastIndexedTimestamp(lastEventTimestamp.getOrElse(Instant.now))
Result(
toIndex.flatten(_.fullDocument.map(_.toSource)),
toDelete.flatten(_.docId.map(Id.apply)),
lastEventTimestamp
)

def run(since: Instant, until: Instant, dryRun: Boolean): fs2.Stream[IO, Unit] =
def fetch(since: Instant, until: Instant): fs2.Stream[IO, Result[GameSource]] =
val filter = range(F.createdAt)(since, until.some)
.or(range(F.updatedAt)(since, until.some))
games
.find(filter.and(gameFilter))
// .projection(postProjection)
.boundedStream(config.batchSize)
.chunkN(config.batchSize)
.map(_.toList)
.metered(1.second) // to avoid overloading the elasticsearch
.evalMap: docs =>
dryRun.fold(
info"Would index total ${docs.size} games" *>
docs.traverse_(doc => debug"Would index $doc"),
storeBulk(docs)
)

private def storeBulk(docs: List[DbGame]): IO[Unit] =
val sources = docs.map(_.toSource)
info"Received ${docs.size} ${index.value}s to index" *>
elastic
.storeBulk(index, sources)
.handleErrorWith: e =>
Logger[IO].error(e)(s"Failed to index ${index.value}s: ${docs.map(_.id).mkString(", ")}")
.whenA(sources.nonEmpty)
*> info"Indexed ${sources.size} ${index.value}s"
fs2.Stream.eval(info"Fetching teams from $since to $until") *>
games
.find(filter.and(gameFilter))
// .projection(postProjection)
.boundedStream(config.batchSize)
.chunkN(config.batchSize)
.map(_.toList)
.metered(1.second) // to avoid overloading the elasticsearch
.map(ds => Result(ds.map(_.toSource), Nil, none))

private def changes(since: Option[Instant]): fs2.Stream[IO, List[ChangeStreamDocument[DbGame]]] =
val builder = games.watch(aggregate)
@@ -144,13 +112,6 @@ object GameIngestor:
.map(_.toList.distincByDocId)
.evalTap(_.traverse_(x => x.fullDocument.traverse_(x => debug"${x.debug}")))

private def saveLastIndexedTimestamp(time: Instant): IO[Unit] =
store.put(index.value, time)
*> info"Stored last indexed time ${time.getEpochSecond} for $index"

private def startAt: IO[Option[Instant]] =
config.startAt.fold(store.get(index.value))(_.some.pure[IO])

object F:
val createdAt = "ca"
val updatedAt = "ua"
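
The forum, game, and team fetch paths share one fs2 backpressure idiom: stream from Mongo in bounded batches, regroup into lists of batchSize, and meter one batch per second so bulk indexing cannot overload Elasticsearch. Reduced to its essentials:

```scala
import cats.effect.IO
import scala.concurrent.duration.*

def throttled[A](docs: fs2.Stream[IO, A], batchSize: Int): fs2.Stream[IO, List[A]] =
  docs
    .chunkN(batchSize)  // regroup into fixed-size chunks
    .map(_.toList)
    .metered(1.second)  // emit at most one batch per second
```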
@@ -11,14 +11,9 @@ import org.typelevel.log4cats.{ Logger, LoggerFactory }

import java.time.Instant

trait StudyIngestor:
// pull changes from study MongoDB and ingest into elastic search
def watch: fs2.Stream[IO, Unit]
def run(since: Instant, until: Instant, dryRun: Boolean): fs2.Stream[IO, Unit]
import Repo.*

object StudyIngestor:

private val index = Index.Study
object StudyRepo:

private val interestedfields = List("_id", F.name, F.members, F.ownerId, F.visibility, F.topics, F.likes)

@@ -28,33 +23,30 @@ object StudyIngestor:
def apply(
study: MongoDatabase[IO],
local: MongoDatabase[IO],
elastic: ESClient[IO],
store: KVStore,
config: IngestorConfig.Study
)(using LoggerFactory[IO]): IO[StudyIngestor] =
given Logger[IO] = summon[LoggerFactory[IO]].getLogger
)(using LoggerFactory[IO]): IO[Repo[StudySource]] =
given Logger[IO] = LoggerFactory[IO].getLogger
(study.getCollection("study"), ChapterRepo(study), local.getCollection("oplog.rs"))
.mapN(apply(elastic, store, config))
.mapN(apply(config))

def apply(elastic: ESClient[IO], store: KVStore, config: IngestorConfig.Study)(
def apply(config: IngestorConfig.Study)(
studies: MongoCollection,
chapters: ChapterRepo,
oplogs: MongoCollection
)(using Logger[IO]): StudyIngestor = new:
def watch: fs2.Stream[IO, Unit] =
intervalStream
)(using Logger[IO]): Repo[StudySource] = new:

def watch(since: Option[Instant]): fs2.Stream[IO, Result[StudySource]] =
intervalStream(since)
.meteredStartImmediately(config.interval)
.flatMap: (since, until) =>
run(since, until, dryRun = false)
.flatMap(fetch)

def run(since: Instant, until: Instant, dryRun: Boolean): fs2.Stream[IO, Unit] =
fs2.Stream.eval(info"Indexing studies from $since to $until") ++
pullAndIndex(since, until, dryRun) ++
fs2.Stream.eval(info"deleting studies from $since to $until") ++
pullAndDelete(since, until, dryRun)
++ fs2.Stream.eval(saveLastIndexedTimestamp(until))
def fetch(since: Instant, until: Instant): fs2.Stream[IO, Result[StudySource]] =
fs2.Stream.eval(info"Fetching studies from $since to $until") *>
pullAndIndex(since, until)
.zip(pullAndDelete(since, until))
.map((toIndex, toDelete) => Result(toIndex, toDelete, until.some))

def pullAndIndex(since: Instant, until: Instant, dryRun: Boolean = false): fs2.Stream[IO, Unit] =
def pullAndIndex(since: Instant, until: Instant) =
val filter = range(F.createdAt)(since, until.some)
.or(range(F.updatedAt)(since, until.some))
studies
@@ -64,9 +56,9 @@ object StudyIngestor:
.chunkN(config.batchSize)
.map(_.toList)
.evalTap(_.traverse_(x => debug"received $x"))
.evalMap(storeBulk(_, dryRun))
.evalMap(_.toSources)

def pullAndDelete(since: Instant, until: Instant, dryRun: Boolean = false): fs2.Stream[IO, Unit] =
def pullAndDelete(since: Instant, until: Instant) =
val filter =
Filter
.gte("ts", since.asBsonTimestamp)
@@ -80,40 +72,17 @@ object StudyIngestor:
.chunkN(config.batchSize)
.map(_.toList.flatMap(extractId))
.evalTap(xs => info"Deleting $xs")
.evalMap:
dryRun.fold(
xs => xs.traverse_(x => debug"Would delete $x"),
elastic.deleteMany(index, _)
)

def storeBulk(docs: List[Document], dryRun: Boolean = false): IO[Unit] =
info"Received ${docs.size} studies to index" *>
docs.toSources.flatMap: sources =>
dryRun.fold(
sources.traverse_(source => debug"Would index $source"),
elastic.storeBulk(index, sources) *> info"Indexed ${sources.size} studies"
.handleErrorWith: e =>
Logger[IO].error(e)(s"Failed to index studies: ${docs.map(_.id).mkString(", ")}")
.whenA(docs.nonEmpty)
)

def saveLastIndexedTimestamp(time: Instant): IO[Unit] =
store.put(index.value, time)
*> info"Stored last indexed time ${time.getEpochSecond} for $index"

def extractId(doc: Document): Option[Id] =
doc.getNestedAs[String](F.oplogId).map(Id.apply)

def intervalStream: fs2.Stream[IO, (Instant, Instant)] =
fs2.Stream
.eval:
config.startAt.fold(store.get(index.value))(_.some.pure[IO])
.flatMap: startAt =>
startAt.fold(fs2.Stream.empty)(since => fs2.Stream(since))
++ fs2.Stream
.eval(IO.realTimeInstant)
.flatMap(now => fs2.Stream.unfold(now)(s => (s, s.plusSeconds(config.interval.toSeconds)).some))
.zipWithNext
def intervalStream(startAt: Option[Instant]): fs2.Stream[IO, (Instant, Instant)] =
(startAt.fold(fs2.Stream.empty)(since => fs2.Stream(since))
++ fs2.Stream
.eval(IO.realTimeInstant)
.flatMap(now =>
fs2.Stream.unfold(now)(s => (s, s.plusSeconds(config.interval.toSeconds)).some)
)).zipWithNext
.map((since, until) => since -> until.get)

extension (docs: List[Document])
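
StudyRepo polls on a fixed interval instead of tailing a change stream: intervalStream turns an optional resume point into an endless series of consecutive (since, until) windows, each handed to fetch. A pure sketch of the window generation:

```scala
import java.time.Instant

// consecutive windows: (t0, t1), (t1, t2), (t2, t3), ...
def windows(start: Instant, stepSeconds: Long): LazyList[(Instant, Instant)] =
  val points = LazyList.iterate(start)(_.plusSeconds(stepSeconds))
  points.zip(points.tail)
```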
@@ -15,15 +15,9 @@ import org.typelevel.log4cats.{ Logger, LoggerFactory }
import java.time.Instant
import scala.concurrent.duration.*

trait TeamIngestor:
// watch change events from MongoDB and ingest team data into elastic search
def watch: fs2.Stream[IO, Unit]
// Fetch teams in [since, until] and ingest into elastic search
def run(since: Instant, until: Instant, dryRun: Boolean): fs2.Stream[IO, Unit]
import Repo.{ *, given }

object TeamIngestor:

private val index = Index.Team
object TeamRepo:

private val interestedOperations = List(DELETE, INSERT, UPDATE, REPLACE).map(_.getValue)
private val eventFilter = Filter.in("operationType", interestedOperations)
@@ -37,65 +31,15 @@ object TeamIngestor:

private val aggregate = Aggregate.matchBy(eventFilter).combinedWith(Aggregate.project(eventProjection))

def apply(mongo: MongoDatabase[IO], elastic: ESClient[IO], store: KVStore, config: IngestorConfig.Team)(
using LoggerFactory[IO]
): IO[TeamIngestor] =
given Logger[IO] = summon[LoggerFactory[IO]].getLogger
mongo.getCollection("team").map(apply(elastic, store, config))

def apply(elastic: ESClient[IO], store: KVStore, config: IngestorConfig.Team)(teams: MongoCollection)(using
Logger[IO]
): TeamIngestor = new:
def watch =
fs2.Stream
.eval(startAt.flatTap(since => info"Starting team ingestor from $since"))
.flatMap: last =>
changeStream(last)
.filterNot(_.isEmpty)
.evalMap: events =>
val lastEventTimestamp = events.lastOption.flatMap(_.clusterTime).flatMap(_.asInstant)
val (toDelete, toIndex) = events.partition(_.isDelete)
storeBulk(toIndex.flatten(_.fullDocument))
*> elastic.deleteMany(index, toDelete)
*> saveLastIndexedTimestamp(lastEventTimestamp.getOrElse(Instant.now))

def run(since: Instant, until: Instant, dryRun: Boolean) =
val filter = range(F.createdAt)(since, until.some)
.or(range(F.updatedAt)(since, until.some))
.or(range(F.erasedAt)(since, until.some))
teams
.find(filter)
.projection(postProjection)
.boundedStream(config.batchSize)
.chunkN(config.batchSize)
.map(_.toList)
.metered(1.second) // to avoid overloading the elasticsearch
.evalMap: docs =>
val (toDelete, toIndex) = docs.partition(!_.isEnabled)
dryRun.fold(
toIndex.traverse_(doc => debug"Would index $doc")
*> toDelete.traverse_(doc => debug"Would delete $doc"),
storeBulk(toIndex) *> elastic.deleteMany(index, toDelete)
)

private def storeBulk(docs: List[Document]): IO[Unit] =
val sources = docs.toSources
info"Received ${docs.size} teams to index" *>
elastic
.storeBulk(index, sources)
.handleErrorWith: e =>
Logger[IO].error(e)(s"Failed to index teams: ${docs.map(_.id).mkString(", ")}")
.whenA(sources.nonEmpty)
*> info"Indexed ${sources.size} teams"
def apply(mongo: MongoDatabase[IO], config: IngestorConfig.Team)(using
LoggerFactory[IO]
): IO[Repo[TeamSource]] =
given Logger[IO] = LoggerFactory[IO].getLogger
mongo.getCollection("team").map(apply(config))

private def saveLastIndexedTimestamp(time: Instant): IO[Unit] =
store.put(index.value, time)
*> info"Stored last indexed time ${time.getEpochSecond} for $index"
def apply(config: IngestorConfig.Team)(teams: MongoCollection)(using Logger[IO]): Repo[TeamSource] = new:

private def startAt: IO[Option[Instant]] =
config.startAt.fold(store.get(index.value))(_.some.pure[IO])

private def changeStream(since: Option[Instant]): fs2.Stream[IO, List[ChangeStreamDocument[Document]]] =
def watch(since: Option[Instant]) =
// skip the first event if we're starting from a specific timestamp
// since the event at that timestamp is already indexed
val skip = since.fold(0)(_ => 1)
@@ -109,6 +53,34 @@ object TeamIngestor:
.evalTap(x => debug"Team change stream event: $x")
.groupWithin(config.batchSize, config.timeWindows.second)
.map(_.toList.distincByDocId)
.map: docs =>
val lastEventTimestamp = docs.lastOption.flatMap(_.clusterTime).flatMap(_.asInstant)
val (toDelete, toIndex) = docs.partition(_.isDelete)
Result(
toIndex.flatten(_.fullDocument).toSources,
toDelete.flatten(_.docId.map(Id.apply)),
lastEventTimestamp
)

def fetch(since: Instant, until: Instant) =
val filter = range(F.createdAt)(since, until.some)
.or(range(F.updatedAt)(since, until.some))
.or(range(F.erasedAt)(since, until.some))
fs2.Stream.eval(info"Fetching teams from $since to $until") *>
teams
.find(filter)
.projection(postProjection)
.boundedStream(config.batchSize)
.chunkN(config.batchSize)
.map(_.toList)
.metered(1.second) // to avoid overloading the elasticsearch
.map: docs =>
val (toDelete, toIndex) = docs.partition(!_.isEnabled)
Result(
toIndex.toSources,
toDelete.flatten(_.id.map(Id.apply)),
none
)

extension (docs: List[Document])
private def toSources: List[(String, TeamSource)] =
62 changes: 0 additions & 62 deletions modules/ingestor/src/main/scala/package.scala

This file was deleted.
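Judging by the additions above, the 62 deleted lines held the shared Mongo helpers (the MongoCollection alias, the HasDocId instance, and the _id/range/asBsonTimestamp extensions) that now live in object Repo, plus the package-level Indexable given that moved into object Ingestor; that move is also why IntegrationSuite switches from lila.search.ingestor.given to lila.search.ingestor.Ingestor.given.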
