From 1e6ea1ed940d86f9356ab63a81deff95c2f089aa Mon Sep 17 00:00:00 2001 From: Matt Hicks Date: Thu, 7 Nov 2024 13:13:37 -0600 Subject: [PATCH] Added (Async)Query.process methods --- .../scala/lightdb/async/AsyncCollection.scala | 2 +- .../main/scala/lightdb/async/AsyncQuery.scala | 17 ++++++++++++++++- core/src/main/scala/lightdb/Query.scala | 12 ++++++++++++ 3 files changed, 29 insertions(+), 2 deletions(-) diff --git a/async/src/main/scala/lightdb/async/AsyncCollection.scala b/async/src/main/scala/lightdb/async/AsyncCollection.scala index f7b6bcff..a908d13f 100644 --- a/async/src/main/scala/lightdb/async/AsyncCollection.scala +++ b/async/src/main/scala/lightdb/async/AsyncCollection.scala @@ -90,7 +90,7 @@ case class AsyncCollection[Doc <: Document[Doc], Model <: DocumentModel[Doc]](un def list(implicit transaction: Transaction[Doc]): IO[List[Doc]] = stream.compile.toList - def query: AsyncQuery[Doc, Model] = AsyncQuery(underlying) + def query: AsyncQuery[Doc, Model] = AsyncQuery(this) def truncate()(implicit transaction: Transaction[Doc]): IO[Int] = IO.blocking(underlying.truncate()) diff --git a/async/src/main/scala/lightdb/async/AsyncQuery.scala b/async/src/main/scala/lightdb/async/AsyncQuery.scala index b5257b29..efb16580 100644 --- a/async/src/main/scala/lightdb/async/AsyncQuery.scala +++ b/async/src/main/scala/lightdb/async/AsyncQuery.scala @@ -17,7 +17,7 @@ import lightdb.store.Conversion import lightdb.transaction.Transaction import lightdb.util.GroupedIterator -case class AsyncQuery[Doc <: Document[Doc], Model <: DocumentModel[Doc]](collection: Collection[Doc, Model], +case class AsyncQuery[Doc <: Document[Doc], Model <: DocumentModel[Doc]](asyncCollection: AsyncCollection[Doc, Model], filter: Option[Filter[Doc]] = None, sort: List[Sort] = Nil, offset: Int = 0, @@ -26,6 +26,8 @@ case class AsyncQuery[Doc <: Document[Doc], Model <: DocumentModel[Doc]](collect scoreDocs: Boolean = false, minDocScore: Option[Double] = None, facets: List[FacetQuery[Doc]] = Nil) { query => + protected def collection: Collection[Doc, Model] = asyncCollection.underlying + def toQuery: Query[Doc, Model] = Query[Doc, Model](collection, filter, sort, offset, limit, countTotal, scoreDocs, minDocScore, facets) def scored: AsyncQuery[Doc, Model] = copy(scoreDocs = true) @@ -217,6 +219,19 @@ case class AsyncQuery[Doc <: Document[Doc], Model <: DocumentModel[Doc]](collect apply(Conversion.Distance(f(collection.model), from, sort, radius)) } + def process(establishLock: Boolean = true) + (f: Doc => IO[Doc]) + (implicit transaction: Transaction[Doc]): IO[Int] = stream + .docs + .evalMap { doc => + asyncCollection.withLock(doc._id, IO.pure(Some(doc)), establishLock) { current => + f(current.getOrElse(doc)).map(Some.apply) + } + } + .compile + .count + .map(_.toInt) + def toList(implicit transaction: Transaction[Doc]): IO[List[Doc]] = stream.docs.compile.toList def first(implicit transaction: Transaction[Doc]): IO[Option[Doc]] = stream.docs.take(1).compile.last diff --git a/core/src/main/scala/lightdb/Query.scala b/core/src/main/scala/lightdb/Query.scala index 359e5bbf..0ff60c8d 100644 --- a/core/src/main/scala/lightdb/Query.scala +++ b/core/src/main/scala/lightdb/Query.scala @@ -140,6 +140,18 @@ case class Query[Doc <: Document[Doc], Model <: DocumentModel[Doc]](collection: } } + def process(establishLock: Boolean = true) + (f: Doc => Doc) + (implicit transaction: Transaction[Doc]): Unit = if (establishLock) { + search.docs.iterator.foreach { doc => + collection.lock(doc._id, Some(doc)) { current => + Some(f(current.getOrElse(doc))) + } + } + } else { + search.docs.iterator.foreach(f) + } + def iterator(implicit transaction: Transaction[Doc]): Iterator[Doc] = search.docs.iterator def toList(implicit transaction: Transaction[Doc]): List[Doc] = search.docs.list