Skip to content

Commit

Permalink
Use Indexable in store and storeBulk
Browse files Browse the repository at this point in the history
  • Loading branch information
lenguyenthanh committed May 10, 2024
1 parent 39eb2f3 commit 2a3eadb
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 18 deletions.
16 changes: 7 additions & 9 deletions modules/core/src/main/scala/ESClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,10 @@ package lila.search
import com.sksamuel.elastic4s.ElasticDsl.{ RichFuture => _, _ }
import com.sksamuel.elastic4s.fields.ElasticField
import com.sksamuel.elastic4s.{ ElasticClient, ElasticDsl, Index => ESIndex, Response }
import com.sksamuel.elastic4s.{ Executor, Functor }
import com.sksamuel.elastic4s.{ Executor, Functor, Indexable }
import cats.syntax.all.*
import cats.MonadThrow

case class JsonObject(json: String) extends AnyVal

case class Index(name: String) extends AnyVal {
def toES: ESIndex = ESIndex(name)
}
Expand All @@ -17,8 +15,8 @@ trait ESClient[F[_]] {

def search[A](index: Index, query: A, from: From, size: Size)(implicit q: Queryable[A]): F[SearchResponse]
def count[A](index: Index, query: A)(implicit q: Queryable[A]): F[CountResponse]
def store(index: Index, id: Id, obj: JsonObject): F[Unit]
def storeBulk(index: Index, objs: List[(String, JsonObject)]): F[Unit]
def store[A](index: Index, id: Id, obj: A)(implicit indexable: Indexable[A]): F[Unit]
def storeBulk[A](index: Index, objs: Seq[(String, A)])(implicit indexable: Indexable[A]): F[Unit]
def deleteOne(index: Index, id: Id): F[Unit]
def deleteMany(index: Index, ids: List[Id]): F[Unit]
def putMapping(index: Index, fields: Seq[ElasticField]): F[Unit]
Expand Down Expand Up @@ -48,16 +46,16 @@ object ESClient {
.flatMap(toResult)
.map(CountResponse.apply)

def store(index: Index, id: Id, obj: JsonObject): F[Unit] =
client.execute(indexInto(index.name).source(obj.json).id(id.value)).void
def store[A](index: Index, id: Id, obj: A)(implicit indexable: Indexable[A]): F[Unit] =
client.execute(indexInto(index.name).source(obj).id(id.value)).void

def storeBulk(index: Index, objs: List[(String, JsonObject)]): F[Unit] =
def storeBulk[A](index: Index, objs: Seq[(String, A)])(implicit indexable: Indexable[A]): F[Unit] =
if (objs.isEmpty) ().pure[F]
else
client.execute {
ElasticDsl.bulk {
objs.map { case (id, obj) =>
indexInto(index.name).source(obj.json).id(id)
indexInto(index.name).source(obj).id(id)
}
}
}.void
Expand Down
13 changes: 9 additions & 4 deletions modules/e2e/src/test/scala/CompatSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import play.api.libs.ws.*
import play.api.libs.ws.ahc.*
import lila.search.spec.{ Query, Index as SpecIndex }
import scala.concurrent.ExecutionContext.Implicits.*
import com.sksamuel.elastic4s.Indexable

object CompatSuite extends weaver.IOSuite:

Expand Down Expand Up @@ -56,13 +57,17 @@ object CompatSuite extends weaver.IOSuite:

def fakeClient: ESClient[IO] = new ESClient[IO]:

override def putMapping(index: Index, fields: Seq[ElasticField]): IO[Unit] = IO.unit
override def store[A](index: lila.search.Index, id: Id, obj: A)(implicit
indexable: Indexable[A]
): IO[Unit] = IO.unit

override def refreshIndex(index: Index): IO[Unit] = IO.unit
override def storeBulk[A](index: lila.search.Index, objs: Seq[(String, A)])(implicit
indexable: Indexable[A]
): IO[Unit] = IO.unit

override def storeBulk(index: Index, objs: List[(String, JsonObject)]): IO[Unit] = IO.unit
override def putMapping(index: Index, fields: Seq[ElasticField]): IO[Unit] = IO.unit

override def store(index: Index, id: Id, obj: JsonObject): IO[Unit] = IO.unit
override def refreshIndex(index: Index): IO[Unit] = IO.unit

override def deleteOne(index: Index, id: Id): IO[Unit] = IO.unit

Expand Down
11 changes: 6 additions & 5 deletions play/app/src/main/scala/controllers/WebApi.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,17 @@ import play.api.libs.json._
import play.api.mvc._
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import com.sksamuel.elastic4s.Indexable

class WebApi @Inject() (cc: ControllerComponents, client: ESClient[Future])(implicit ec: ExecutionContext)
extends AbstractController(cc) {

implicit val indexableJsValue: Indexable[JsValue] = (t: JsValue) => Json.stringify(t)
implicit val indexableJsObject: Indexable[JsObject] = (t: JsObject) => Json.stringify(t)

def store(index: String, id: String) =
JsObjectBody { obj =>
client.store(Index(index), Id(id), JsonObject(Json.stringify(obj))).inject(Ok(s"inserted $index/$id"))
client.store(Index(index), Id(id), obj).inject(Ok(s"inserted $index/$id"))
}

def deleteById(index: String, id: String) =
Expand Down Expand Up @@ -64,11 +68,8 @@ class WebApi @Inject() (cc: ControllerComponents, client: ESClient[Future])(impl

def storeBulk(index: String) =
JsObjectBody { objs =>
val jsonObjs = objs.fields.collect { case (id, obj) =>
(id, JsonObject(Json.stringify(obj)))
}.toList
Chronometer(s"bulk ${objs.fields.size} $index") {
client.storeBulk(Index(index), jsonObjs).map { _ =>
client.storeBulk(Index(index), objs.fields.toList).map { _ =>
Ok("thx")
}
}
Expand Down

0 comments on commit 2a3eadb

Please sign in to comment.