From 1ac9bc493bfdee02a180315a1ba6dcc9429564f3 Mon Sep 17 00:00:00 2001 From: Matt Hicks Date: Wed, 18 Dec 2024 20:59:09 -0600 Subject: [PATCH 1/2] Migrated async from cats-effect and fs2 to rapid --- .../test/scala/spec/AbstractAsyncSpec.scala | 69 ++++++------ all/src/test/scala/spec/AirportSpec.scala | 2 - .../lightdb/async/AsyncAggregateQuery.scala | 12 +- .../scala/lightdb/async/AsyncCollection.scala | 84 +++++++------- .../lightdb/async/AsyncDatabaseUpgrade.scala | 4 +- .../scala/lightdb/async/AsyncLightDB.scala | 23 ++-- .../main/scala/lightdb/async/AsyncQuery.scala | 106 +++++++++--------- .../lightdb/async/AsyncSearchResults.scala | 10 +- .../lightdb/async/AsyncStoredValue.scala | 10 +- .../async/AsyncTransactionConvenience.scala | 34 +++--- .../scala/benchmark/FlushingBacklog.scala | 18 +-- .../src/main/scala/benchmark/IOIterator.scala | 8 +- .../imdb/BenchmarkImplementation.scala | 18 +-- .../scala/benchmark/imdb/IMDBBenchmark.scala | 40 +++---- .../imdb/LightDBImplementation.scala | 20 ++-- .../imdb/MariaDBImplementation.scala | 22 ++-- .../imdb/MongoDBImplementation.scala | 22 ++-- .../imdb/PostgresImplementation.scala | 22 ++-- .../benchmark/imdb/SQLiteImplementation.scala | 22 ++-- .../imdb/ScarangoImplementation.scala | 22 ++-- build.sbt | 12 +- 21 files changed, 283 insertions(+), 297 deletions(-) diff --git a/all/src/test/scala/spec/AbstractAsyncSpec.scala b/all/src/test/scala/spec/AbstractAsyncSpec.scala index a616ce60..a21c1ad4 100644 --- a/all/src/test/scala/spec/AbstractAsyncSpec.scala +++ b/all/src/test/scala/spec/AbstractAsyncSpec.scala @@ -1,7 +1,5 @@ package spec -import cats.effect.IO -import cats.effect.testing.scalatest.AsyncIOSpec import fabric.rw._ import lightdb.async.{AsyncCollection, AsyncDatabaseUpgrade, AsyncLightDB, AsyncStoredValue} import lightdb.collection.Collection @@ -13,10 +11,11 @@ import lightdb.{Id, LightDB, Sort, StoredValue} import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.{AnyWordSpec, AsyncWordSpec} import perfolation.double2Implicits +import rapid.Task import java.nio.file.Path -abstract class AbstractAsyncSpec extends AsyncWordSpec with AsyncIOSpec with Matchers { spec => +abstract class AbstractAsyncSpec extends AnyWordSpec with Matchers { spec => protected def aggregationSupported: Boolean = true private val adam = Person("Adam", 21, Person.id("adam")) @@ -61,46 +60,46 @@ abstract class AbstractAsyncSpec extends AsyncWordSpec with AsyncIOSpec with Mat specName should { "initialize the database" in { - db.init().map(b => b should be(true)) + db.init().map(b => b should be(true)).sync() } "verify the database is empty" in { db.people.transaction { implicit transaction => db.people.count.map(c => c should be(0)) - } + }.sync() } "insert the records" in { db.people.transaction { implicit transaction => db.people.insert(names).map(_ should not be None) - } + }.sync() } "retrieve the first record by _id -> id" in { db.people.transaction { implicit transaction => db.people(_._id -> adam._id).map(_ should be(adam)) - } + }.sync() } "retrieve the first record by id" in { db.people.transaction { implicit transaction => db.people(adam._id).map(_ should be(adam)) - } + }.sync() } "count the records in the database" in { db.people.transaction { implicit transaction => db.people.count.map(_ should be(26)) - } + }.sync() } "stream the ids in the database" in { db.people.transaction { implicit transaction => - db.people.query.search.id.flatMap(_.stream.compile.toList).map(_.toSet).map { ids => + 
db.people.query.search.id.flatMap(_.stream.toList).map(_.toSet).map { ids => ids should be(names.map(_._id).toSet) } - } + }.sync() } "stream the records in the database" in { db.people.transaction { implicit transaction => - db.people.stream.compile.toList.map(_.map(_.age).toSet).map { ages => + db.people.stream.toList.map(_.map(_.age).toSet).map { ages => ages should be(Set(101, 42, 89, 102, 53, 13, 2, 22, 12, 81, 35, 63, 99, 23, 30, 4, 21, 33, 11, 72, 15, 62)) } - } + }.sync() } "query with aggregate functions" in { if (aggregationSupported) { @@ -119,28 +118,28 @@ abstract class AbstractAsyncSpec extends AsyncWordSpec with AsyncIOSpec with Mat list.map(m => m(_.age.avg).f(f = 6)).toSet should be(Set("41.807692")) list.map(m => m(_.age.sum)).toSet should be(Set(1087)) } - } + }.sync() } else { succeed } } "search by age range" in { db.people.transaction { implicit transaction => - db.people.query.filter(_.age BETWEEN 19 -> 22).search.id.flatMap(_.stream.compile.toList).map { ids => + db.people.query.filter(_.age BETWEEN 19 -> 22).search.id.flatMap(_.stream.toList).map { ids => ids.toSet should be(Set(adam._id, nancy._id, oscar._id, uba._id)) } - } + }.sync() } "sort by age" in { db.people.transaction { implicit transaction => - db.people.query.sort(Sort.ByField(Person.age).descending).search.docs.flatMap(_.stream.compile.toList).map { people => + db.people.query.sort(Sort.ByField(Person.age).descending).search.docs.flatMap(_.stream.toList).map { people => people.map(_.name).take(3) should be(List("Ruth", "Zoey", "Quintin")) } - } + }.sync() } "group by age" in { db.people.transaction { implicit transaction => - db.people.query.grouped(_.age).compile.toList.map { list => + db.people.query.grouped(_.age).toList.map { list => list.map(_._1) should be(List(2, 4, 11, 12, 13, 15, 21, 22, 23, 30, 33, 35, 42, 53, 62, 63, 72, 81, 89, 99, 101, 102)) list.map(_._2.map(_.name).toSet) should be(List( Set("Penny"), Set("Jenna"), Set("Brenda"), Set("Greg"), Set("Veronica"), Set("Diana"), @@ -149,7 +148,7 @@ abstract class AbstractAsyncSpec extends AsyncWordSpec with AsyncIOSpec with Mat Set("Quintin"), Set("Zoey"), Set("Ruth") )) } - } + }.sync() } "delete some records" in { db.people.transaction { implicit transaction => @@ -157,59 +156,59 @@ abstract class AbstractAsyncSpec extends AsyncWordSpec with AsyncIOSpec with Mat b1 <- db.people.delete(_._id -> linda._id) b2 <- db.people.delete(_._id -> yuri._id) } yield (b1, b2) should be((true, true)) - } + }.sync() } "verify the records were deleted" in { db.people.transaction { implicit transaction => db.people.count.map(_ should be(24)) - } + }.sync() } "modify a record" in { db.people.transaction { implicit transaction => db.people.modify(adam._id) { - case Some(p) => IO(Some(p.copy(name = "Allan"))) + case Some(p) => Task(Some(p.copy(name = "Allan"))) case None => fail("Adam was not found!") } }.map { case Some(p) => p.name should be("Allan") case None => fail("Allan was not returned!") - } + }.sync() } "verify the record has been renamed" in { db.people.transaction { implicit transaction => db.people(_._id -> adam._id).map(_.name should be("Allan")) - } + }.sync() } "verify start time has been set" in { - db.startTime.get.map(_ should be > 0L) + db.startTime.get.map(_ should be > 0L).sync() } "dispose the database before creating a new instance" in { - db.dispose().map(_ should be(true)) + db.dispose().map(_ should be(true)).sync() } "prepare a new instance" in { db = new DB - db.init().map(_ should be(true)) + db.init().map(_ should be(true)).sync() 
} "query the database to verify records were persisted properly" in { db.people.transaction { implicit transaction => - db.people.stream.compile.toList.map(_.map(_.name).toSet).map(_ should be(Set( + db.people.stream.toList.map(_.map(_.name).toSet).map(_ should be(Set( "Tori", "Ruth", "Nancy", "Jenna", "Hanna", "Wyatt", "Diana", "Ian", "Quintin", "Uba", "Oscar", "Kevin", "Penny", "Charlie", "Evan", "Sam", "Mike", "Brenda", "Zoey", "Allan", "Xena", "Fiona", "Greg", "Veronica" ))) - } + }.sync() } "truncate the collection" in { db.people.transaction { implicit transaction => db.people.truncate().map(_ should be(24)) - } + }.sync() } "verify the collection is empty" in { db.people.transaction { implicit transaction => db.people.count.map(_ should be(0)) - } + }.sync() } "dispose the database" in { - db.dispose().map(_ should be(true)) + db.dispose().map(_ should be(true)).sync() } } @@ -248,8 +247,8 @@ abstract class AbstractAsyncSpec extends AsyncWordSpec with AsyncIOSpec with Mat override def alwaysRun: Boolean = false - override def upgrade(ldb: AsyncLightDB): IO[Unit] = { - db.startTime.set(System.currentTimeMillis()).void + override def upgrade(ldb: AsyncLightDB): Task[Unit] = { + db.startTime.set(System.currentTimeMillis()).unit } } } diff --git a/all/src/test/scala/spec/AirportSpec.scala b/all/src/test/scala/spec/AirportSpec.scala index 8a8b7144..0b01486d 100644 --- a/all/src/test/scala/spec/AirportSpec.scala +++ b/all/src/test/scala/spec/AirportSpec.scala @@ -1,7 +1,5 @@ package spec -import cats.effect.IO -import cats.effect.testing.scalatest.AsyncIOSpec import fabric.rw._ import lightdb.collection.Collection import lightdb.doc.{Document, DocumentModel, JsonConversion} diff --git a/async/src/main/scala/lightdb/async/AsyncAggregateQuery.scala b/async/src/main/scala/lightdb/async/AsyncAggregateQuery.scala index cbe451bc..94617fb5 100644 --- a/async/src/main/scala/lightdb/async/AsyncAggregateQuery.scala +++ b/async/src/main/scala/lightdb/async/AsyncAggregateQuery.scala @@ -1,11 +1,11 @@ package lightdb.async -import cats.effect.IO import lightdb.aggregate.{AggregateFilter, AggregateFunction, AggregateQuery} import lightdb.doc.{Document, DocumentModel} import lightdb.materialized.MaterializedAggregate import lightdb.transaction.Transaction import lightdb.{Query, SortDirection} +import rapid.Task case class AsyncAggregateQuery[Doc <: Document[Doc], Model <: DocumentModel[Doc]](query: Query[Doc, Model], functions: List[AggregateFunction[_, _, Doc]], @@ -45,13 +45,11 @@ case class AsyncAggregateQuery[Doc <: Document[Doc], Model <: DocumentModel[Doc] sort = sort ) - def count(implicit transaction: Transaction[Doc]): IO[Int] = - IO.blocking(query.store.aggregateCount(aggregateQuery)) + def count(implicit transaction: Transaction[Doc]): Task[Int] = Task(query.store.aggregateCount(aggregateQuery)) - def stream(implicit transaction: Transaction[Doc]): fs2.Stream[IO, MaterializedAggregate[Doc, Model]] = { - val iterator = query.store.aggregate(aggregateQuery) - fs2.Stream.fromBlockingIterator[IO](iterator, 100) + def stream(implicit transaction: Transaction[Doc]): rapid.Stream[MaterializedAggregate[Doc, Model]] = { + rapid.Stream.fromIterator(Task(query.store.aggregate(aggregateQuery))) } - def toList(implicit transaction: Transaction[Doc]): IO[List[MaterializedAggregate[Doc, Model]]] = stream.compile.toList + def toList(implicit transaction: Transaction[Doc]): Task[List[MaterializedAggregate[Doc, Model]]] = stream.toList } diff --git a/async/src/main/scala/lightdb/async/AsyncCollection.scala 
b/async/src/main/scala/lightdb/async/AsyncCollection.scala index 0ea31b2f..66b18901 100644 --- a/async/src/main/scala/lightdb/async/AsyncCollection.scala +++ b/async/src/main/scala/lightdb/async/AsyncCollection.scala @@ -1,16 +1,18 @@ package lightdb.async -import cats.effect.IO import lightdb._ import lightdb.field.Field._ import lightdb.collection.Collection import lightdb.doc.{Document, DocumentModel} import lightdb.transaction.Transaction +import rapid.Task + +import scala.util.{Failure, Success} case class AsyncCollection[Doc <: Document[Doc], Model <: DocumentModel[Doc]](underlying: Collection[Doc, Model]) extends AnyVal { - def transaction[Return](f: Transaction[Doc] => IO[Return]): IO[Return] = { + def transaction[Return](f: Transaction[Doc] => Task[Return]): Task[Return] = { val transaction = underlying.transaction.create() - f(transaction).guarantee(IO { + f(transaction).guarantee(Task { underlying.transaction.release(transaction) }) } @@ -20,41 +22,41 @@ case class AsyncCollection[Doc <: Document[Doc], Model <: DocumentModel[Doc]](un */ def t: AsyncTransactionConvenience[Doc, Model] = AsyncTransactionConvenience(this) - def insert(doc: Doc)(implicit transaction: Transaction[Doc]): IO[Doc] = IO.blocking(underlying.insert(doc)) + def insert(doc: Doc)(implicit transaction: Transaction[Doc]): Task[Doc] = Task(underlying.insert(doc)) - def insert(docs: Seq[Doc])(implicit transaction: Transaction[Doc]): IO[Seq[Doc]] = IO.blocking(underlying.insert(docs)) + def insert(docs: Seq[Doc])(implicit transaction: Transaction[Doc]): Task[Seq[Doc]] = Task(underlying.insert(docs)) - def upsert(doc: Doc)(implicit transaction: Transaction[Doc]): IO[Doc] = IO.blocking(underlying.upsert(doc)) + def upsert(doc: Doc)(implicit transaction: Transaction[Doc]): Task[Doc] = Task(underlying.upsert(doc)) - def upsert(docs: Seq[Doc])(implicit transaction: Transaction[Doc]): IO[Seq[Doc]] = IO.blocking(underlying.upsert(docs)) + def upsert(docs: Seq[Doc])(implicit transaction: Transaction[Doc]): Task[Seq[Doc]] = Task(underlying.upsert(docs)) - def get[V](f: Model => (UniqueIndex[Doc, V], V))(implicit transaction: Transaction[Doc]): IO[Option[Doc]] = - IO.blocking(underlying.get(f)) + def get[V](f: Model => (UniqueIndex[Doc, V], V))(implicit transaction: Transaction[Doc]): Task[Option[Doc]] = + Task(underlying.get(f)) - def apply[V](f: Model => (UniqueIndex[Doc, V], V))(implicit transaction: Transaction[Doc]): IO[Doc] = - IO.blocking(underlying(f)) + def apply[V](f: Model => (UniqueIndex[Doc, V], V))(implicit transaction: Transaction[Doc]): Task[Doc] = + Task(underlying(f)) - def get(id: Id[Doc])(implicit transaction: Transaction[Doc]): IO[Option[Doc]] = - IO.blocking(underlying.get(id)) + def get(id: Id[Doc])(implicit transaction: Transaction[Doc]): Task[Option[Doc]] = + Task(underlying.get(id)) - def getAll(ids: Seq[Id[Doc]])(implicit transaction: Transaction[Doc]): fs2.Stream[IO, Doc] = - fs2.Stream.fromBlockingIterator[IO](underlying.getAll(ids), 512) + def getAll(ids: Seq[Id[Doc]])(implicit transaction: Transaction[Doc]): rapid.Stream[Doc] = + rapid.Stream.fromIterator(Task(underlying.getAll(ids))) - def apply(id: Id[Doc])(implicit transaction: Transaction[Doc]): IO[Doc] = - IO.blocking(underlying(id)) + def apply(id: Id[Doc])(implicit transaction: Transaction[Doc]): Task[Doc] = + Task(underlying(id)) - def withLock(id: Id[Doc], doc: IO[Option[Doc]], establishLock: Boolean = true) - (f: Option[Doc] => IO[Option[Doc]]): IO[Option[Doc]] = if (establishLock) { + def withLock(id: Id[Doc], doc: Task[Option[Doc]], 
establishLock: Boolean = true) + (f: Option[Doc] => Task[Option[Doc]]): Task[Option[Doc]] = if (establishLock) { doc.map(d => if (establishLock) underlying.lock.acquire(id, d) else d).flatMap { existing => f(existing) .attempt .flatMap { - case Left(err) => - if (establishLock) underlying.lock.release(id, existing) - IO.raiseError(err) - case Right(modified) => + case Success(modified) => if (establishLock) underlying.lock.release(id, modified) - IO.pure(modified) + Task.pure(modified) + case Failure(err) => + if (establishLock) underlying.lock.release(id, existing) + Task.error(err) } } } else { @@ -62,39 +64,39 @@ case class AsyncCollection[Doc <: Document[Doc], Model <: DocumentModel[Doc]](un } def modify(id: Id[Doc], establishLock: Boolean = true, deleteOnNone: Boolean = false) - (f: Option[Doc] => IO[Option[Doc]]) - (implicit transaction: Transaction[Doc]): IO[Option[Doc]] = withLock(id, get(id), establishLock) { existing => + (f: Option[Doc] => Task[Option[Doc]]) + (implicit transaction: Transaction[Doc]): Task[Option[Doc]] = withLock(id, get(id), establishLock) { existing => f(existing).flatMap { case Some(doc) => upsert(doc).map(doc => Some(doc)) case None if deleteOnNone => delete(id).map(_ => None) - case None => IO.pure(None) + case None => Task.pure(None) } } - def getOrCreate(id: Id[Doc], create: => IO[Doc], lock: Boolean = true) - (implicit transaction: Transaction[Doc]): IO[Doc] = modify(id, establishLock = lock) { - case Some(doc) => IO.pure(Some(doc)) + def getOrCreate(id: Id[Doc], create: => Task[Doc], lock: Boolean = true) + (implicit transaction: Transaction[Doc]): Task[Doc] = modify(id, establishLock = lock) { + case Some(doc) => Task.pure(Some(doc)) case None => create.map(Some.apply) }.map(_.get) - def delete[V](f: Model => (UniqueIndex[Doc, V], V))(implicit transaction: Transaction[Doc]): IO[Boolean] = - IO.blocking(underlying.delete(f)) + def delete[V](f: Model => (UniqueIndex[Doc, V], V))(implicit transaction: Transaction[Doc]): Task[Boolean] = + Task(underlying.delete(f)) - def delete(id: Id[Doc])(implicit transaction: Transaction[Doc], ev: Model <:< DocumentModel[_]): IO[Boolean] = - IO.blocking(underlying.delete(id)) + def delete(id: Id[Doc])(implicit transaction: Transaction[Doc], ev: Model <:< DocumentModel[_]): Task[Boolean] = + Task(underlying.delete(id)) - def count(implicit transaction: Transaction[Doc]): IO[Int] = IO.blocking(underlying.count) + def count(implicit transaction: Transaction[Doc]): Task[Int] = Task(underlying.count) - def stream(implicit transaction: Transaction[Doc]): fs2.Stream[IO, Doc] = fs2.Stream - .fromBlockingIterator[IO](underlying.iterator, 512) + def stream(implicit transaction: Transaction[Doc]): rapid.Stream[Doc] = + rapid.Stream.fromIterator(Task(underlying.iterator)) - def list(implicit transaction: Transaction[Doc]): IO[List[Doc]] = stream.compile.toList + def list(implicit transaction: Transaction[Doc]): Task[List[Doc]] = stream.toList def query: AsyncQuery[Doc, Model] = AsyncQuery(this) - def truncate()(implicit transaction: Transaction[Doc]): IO[Int] = IO.blocking(underlying.truncate()) + def truncate()(implicit transaction: Transaction[Doc]): Task[Int] = Task(underlying.truncate()) - def reIndex(): IO[Boolean] = IO.blocking(underlying.reIndex()) + def reIndex(): Task[Boolean] = Task(underlying.reIndex()) - def dispose(): IO[Unit] = IO.blocking(underlying.dispose()) + def dispose(): Task[Unit] = Task(underlying.dispose()) } \ No newline at end of file diff --git 
a/async/src/main/scala/lightdb/async/AsyncDatabaseUpgrade.scala b/async/src/main/scala/lightdb/async/AsyncDatabaseUpgrade.scala index 5804f244..42a7ba69 100644 --- a/async/src/main/scala/lightdb/async/AsyncDatabaseUpgrade.scala +++ b/async/src/main/scala/lightdb/async/AsyncDatabaseUpgrade.scala @@ -1,6 +1,6 @@ package lightdb.async -import cats.effect.IO +import rapid.Task trait AsyncDatabaseUpgrade { def label: String = getClass.getSimpleName.replace("$", "") @@ -8,5 +8,5 @@ trait AsyncDatabaseUpgrade { def blockStartup: Boolean def alwaysRun: Boolean - def upgrade(db: AsyncLightDB): IO[Unit] + def upgrade(db: AsyncLightDB): Task[Unit] } \ No newline at end of file diff --git a/async/src/main/scala/lightdb/async/AsyncLightDB.scala b/async/src/main/scala/lightdb/async/AsyncLightDB.scala index 6dd114cb..617f7242 100644 --- a/async/src/main/scala/lightdb/async/AsyncLightDB.scala +++ b/async/src/main/scala/lightdb/async/AsyncLightDB.scala @@ -1,7 +1,5 @@ package lightdb.async -import cats.effect.IO -import cats.effect.unsafe.implicits.global import fabric.rw.RW import lightdb.collection.Collection import lightdb.doc.{Document, DocumentModel} @@ -9,6 +7,7 @@ import lightdb.feature.{DBFeatureKey, FeatureSupport} import lightdb.{KeyValue, LightDB, Persistence, StoredValue} import lightdb.store.{Store, StoreManager} import lightdb.upgrade.DatabaseUpgrade +import rapid.Task import java.nio.file.Path @@ -32,7 +31,7 @@ trait AsyncLightDB extends FeatureSupport[DBFeatureKey] { db => override def alwaysRun: Boolean = u.alwaysRun - override def upgrade(ldb: LightDB): Unit = u.upgrade(db).unsafeRunSync() + override def upgrade(ldb: LightDB): Unit = u.upgrade(db).sync() } } } @@ -72,13 +71,11 @@ trait AsyncLightDB extends FeatureSupport[DBFeatureKey] { db => storeManager: Option[StoreManager] = None): AsyncCollection[Doc, Model] = AsyncCollection(underlying.collection[Doc, Model](model, name, storeManager)) - def reIndex(): IO[Int] = fs2.Stream(underlying.collections: _*) - .covary[IO] - .parEvalMap(32)(c => AsyncCollection[KeyValue, KeyValue.type](c.asInstanceOf[Collection[KeyValue, KeyValue.type]]).reIndex()) - .filter(identity) - .compile + def reIndex(): Task[Int] = rapid.Stream.emits(underlying.collections) + .par(maxThreads = 32) { collection => + AsyncCollection[KeyValue, KeyValue.type](collection.asInstanceOf[Collection[KeyValue, KeyValue.type]]).reIndex() + } .count - .map(_.toInt) object stored { def apply[T](key: String, @@ -102,14 +99,14 @@ trait AsyncLightDB extends FeatureSupport[DBFeatureKey] { db => )) } - final def init(): IO[Boolean] = IO.blocking(underlying.init()).flatMap { + final def init(): Task[Boolean] = Task(underlying.init()).flatMap { case true => initialize().map(_ => true) - case false => IO.pure(false) + case false => Task.pure(false) } - protected def initialize(): IO[Unit] = IO.unit + protected def initialize(): Task[Unit] = Task.unit - def dispose(): IO[Boolean] = IO.blocking { + def dispose(): Task[Boolean] = Task { if (underlying.disposed) { false } else { diff --git a/async/src/main/scala/lightdb/async/AsyncQuery.scala b/async/src/main/scala/lightdb/async/AsyncQuery.scala index c1fc7651..1dde984c 100644 --- a/async/src/main/scala/lightdb/async/AsyncQuery.scala +++ b/async/src/main/scala/lightdb/async/AsyncQuery.scala @@ -1,6 +1,5 @@ package lightdb.async -import cats.effect.IO import fabric.Json import lightdb.aggregate.AggregateFunction import lightdb._ @@ -16,6 +15,7 @@ import lightdb.spatial.{DistanceAndDoc, Geo} import lightdb.store.Conversion import 
lightdb.transaction.Transaction import lightdb.util.GroupedIterator +import rapid.Task case class AsyncQuery[Doc <: Document[Doc], Model <: DocumentModel[Doc]](asyncCollection: AsyncCollection[Doc, Model], filter: Option[Filter[Doc]] = None, @@ -80,37 +80,37 @@ case class AsyncQuery[Doc <: Document[Doc], Model <: DocumentModel[Doc]](asyncCo object stream { object scored { def apply[V](conversion: Conversion[Doc, V]) - (implicit transaction: Transaction[Doc]): fs2.Stream[IO, (V, Double)] = { + (implicit transaction: Transaction[Doc]): rapid.Stream[(V, Double)] = { val io = search(conversion) .map(_.scoredStream) - fs2.Stream.force(io) + rapid.Stream.force(io) } - def docs(implicit transaction: Transaction[Doc]): fs2.Stream[IO, (Doc, Double)] = apply(Conversion.Doc()) + def docs(implicit transaction: Transaction[Doc]): rapid.Stream[(Doc, Double)] = apply(Conversion.Doc()) def value[F](f: Model => Field[Doc, F]) - (implicit transaction: Transaction[Doc]): fs2.Stream[IO, (F, Double)] = + (implicit transaction: Transaction[Doc]): rapid.Stream[(F, Double)] = apply(Conversion.Value(f(collection.model))) - def id(implicit transaction: Transaction[Doc], ev: Model <:< DocumentModel[_]): fs2.Stream[IO, (Id[Doc], Double)] = + def id(implicit transaction: Transaction[Doc], ev: Model <:< DocumentModel[_]): rapid.Stream[(Id[Doc], Double)] = value(m => ev(m)._id.asInstanceOf[UniqueIndex[Doc, Id[Doc]]]) - def json(f: Model => List[Field[Doc, _]])(implicit transaction: Transaction[Doc]): fs2.Stream[IO, (Json, Double)] = + def json(f: Model => List[Field[Doc, _]])(implicit transaction: Transaction[Doc]): rapid.Stream[(Json, Double)] = apply(Conversion.Json(f(collection.model))) - def converted[T](f: Doc => T)(implicit transaction: Transaction[Doc]): fs2.Stream[IO, (T, Double)] = + def converted[T](f: Doc => T)(implicit transaction: Transaction[Doc]): rapid.Stream[(T, Double)] = apply(Conversion.Converted(f)) def materialized(f: Model => List[Field[Doc, _]]) - (implicit transaction: Transaction[Doc]): fs2.Stream[IO, (MaterializedIndex[Doc, Model], Double)] = + (implicit transaction: Transaction[Doc]): rapid.Stream[(MaterializedIndex[Doc, Model], Double)] = apply(Conversion.Materialized[Doc, Model](f(collection.model))) - def indexes()(implicit transaction: Transaction[Doc]): fs2.Stream[IO, (MaterializedIndex[Doc, Model], Double)] = { + def indexes()(implicit transaction: Transaction[Doc]): rapid.Stream[(MaterializedIndex[Doc, Model], Double)] = { val fields = collection.model.fields.filter(_.indexed) apply(Conversion.Materialized[Doc, Model](fields)) } - def docAndIndexes()(implicit transaction: Transaction[Doc]): fs2.Stream[IO, (MaterializedAndDoc[Doc, Model], Double)] = { + def docAndIndexes()(implicit transaction: Transaction[Doc]): rapid.Stream[(MaterializedAndDoc[Doc, Model], Double)] = { apply(Conversion.DocAndIndexes[Doc, Model]()) } @@ -118,42 +118,42 @@ case class AsyncQuery[Doc <: Document[Doc], Model <: DocumentModel[Doc]](asyncCo from: Geo.Point, sort: Boolean = true, radius: Option[Distance] = None) - (implicit transaction: Transaction[Doc]): fs2.Stream[IO, (DistanceAndDoc[Doc], Double)] = + (implicit transaction: Transaction[Doc]): rapid.Stream[(DistanceAndDoc[Doc], Double)] = apply(Conversion.Distance(f(collection.model), from, sort, radius)) } def apply[V](conversion: Conversion[Doc, V]) - (implicit transaction: Transaction[Doc]): fs2.Stream[IO, V] = { - val io = search(conversion) + (implicit transaction: Transaction[Doc]): rapid.Stream[V] = { + val task = search(conversion) .map(_.stream) - 
fs2.Stream.force(io) + rapid.Stream.force(task) } - def docs(implicit transaction: Transaction[Doc]): fs2.Stream[IO, Doc] = apply(Conversion.Doc()) + def docs(implicit transaction: Transaction[Doc]): rapid.Stream[Doc] = apply(Conversion.Doc()) def value[F](f: Model => Field[Doc, F]) - (implicit transaction: Transaction[Doc]): fs2.Stream[IO, F] = + (implicit transaction: Transaction[Doc]): rapid.Stream[F] = apply(Conversion.Value(f(collection.model))) - def id(implicit transaction: Transaction[Doc], ev: Model <:< DocumentModel[_]): fs2.Stream[IO, Id[Doc]] = + def id(implicit transaction: Transaction[Doc], ev: Model <:< DocumentModel[_]): rapid.Stream[Id[Doc]] = value(m => ev(m)._id.asInstanceOf[UniqueIndex[Doc, Id[Doc]]]) - def json(f: Model => List[Field[Doc, _]])(implicit transaction: Transaction[Doc]): fs2.Stream[IO, Json] = + def json(f: Model => List[Field[Doc, _]])(implicit transaction: Transaction[Doc]): rapid.Stream[Json] = apply(Conversion.Json(f(collection.model))) - def converted[T](f: Doc => T)(implicit transaction: Transaction[Doc]): fs2.Stream[IO, T] = + def converted[T](f: Doc => T)(implicit transaction: Transaction[Doc]): rapid.Stream[T] = apply(Conversion.Converted(f)) def materialized(f: Model => List[Field[Doc, _]]) - (implicit transaction: Transaction[Doc]): fs2.Stream[IO, MaterializedIndex[Doc, Model]] = + (implicit transaction: Transaction[Doc]): rapid.Stream[MaterializedIndex[Doc, Model]] = apply(Conversion.Materialized[Doc, Model](f(collection.model))) - def indexes()(implicit transaction: Transaction[Doc]): fs2.Stream[IO, MaterializedIndex[Doc, Model]] = { + def indexes()(implicit transaction: Transaction[Doc]): rapid.Stream[MaterializedIndex[Doc, Model]] = { val fields = collection.model.fields.filter(_.indexed) apply(Conversion.Materialized[Doc, Model](fields)) } - def docAndIndexes()(implicit transaction: Transaction[Doc]): fs2.Stream[IO, MaterializedAndDoc[Doc, Model]] = { + def docAndIndexes()(implicit transaction: Transaction[Doc]): rapid.Stream[MaterializedAndDoc[Doc, Model]] = { apply(Conversion.DocAndIndexes[Doc, Model]()) } @@ -161,14 +161,14 @@ case class AsyncQuery[Doc <: Document[Doc], Model <: DocumentModel[Doc]](asyncCo from: Geo.Point, sort: Boolean = true, radius: Option[Distance] = None) - (implicit transaction: Transaction[Doc]): fs2.Stream[IO, DistanceAndDoc[Doc]] = + (implicit transaction: Transaction[Doc]): rapid.Stream[DistanceAndDoc[Doc]] = apply(Conversion.Distance(f(collection.model), from, sort, radius)) } object search { def apply[V](conversion: Conversion[Doc, V]) - (implicit transaction: Transaction[Doc]): IO[AsyncSearchResults[Doc, Model, V]] = - IO.blocking(collection.store.doSearch( + (implicit transaction: Transaction[Doc]): Task[AsyncSearchResults[Doc, Model, V]] = + Task(collection.store.doSearch( query = toQuery, conversion = conversion )).map { searchResults => @@ -177,37 +177,37 @@ case class AsyncQuery[Doc <: Document[Doc], Model <: DocumentModel[Doc]](asyncCo offset = searchResults.offset, limit = searchResults.limit, total = searchResults.total, - scoredStream = fs2.Stream.fromBlockingIterator[IO](searchResults.iteratorWithScore, 512), + scoredStream = rapid.Stream.fromIterator(Task(searchResults.iteratorWithScore)), facetResults = searchResults.facetResults, transaction = transaction ) } - def docs(implicit transaction: Transaction[Doc]): IO[AsyncSearchResults[Doc, Model, Doc]] = apply(Conversion.Doc()) + def docs(implicit transaction: Transaction[Doc]): Task[AsyncSearchResults[Doc, Model, Doc]] = apply(Conversion.Doc()) 
def value[F](f: Model => Field[Doc, F]) - (implicit transaction: Transaction[Doc]): IO[AsyncSearchResults[Doc, Model, F]] = + (implicit transaction: Transaction[Doc]): Task[AsyncSearchResults[Doc, Model, F]] = apply(Conversion.Value(f(collection.model))) - def id(implicit transaction: Transaction[Doc], ev: Model <:< DocumentModel[_]): IO[AsyncSearchResults[Doc, Model, Id[Doc]]] = + def id(implicit transaction: Transaction[Doc], ev: Model <:< DocumentModel[_]): Task[AsyncSearchResults[Doc, Model, Id[Doc]]] = value(m => ev(m)._id.asInstanceOf[UniqueIndex[Doc, Id[Doc]]]) - def json(f: Model => List[Field[Doc, _]])(implicit transaction: Transaction[Doc]): IO[AsyncSearchResults[Doc, Model, Json]] = + def json(f: Model => List[Field[Doc, _]])(implicit transaction: Transaction[Doc]): Task[AsyncSearchResults[Doc, Model, Json]] = apply(Conversion.Json(f(collection.model))) - def converted[T](f: Doc => T)(implicit transaction: Transaction[Doc]): IO[AsyncSearchResults[Doc, Model, T]] = + def converted[T](f: Doc => T)(implicit transaction: Transaction[Doc]): Task[AsyncSearchResults[Doc, Model, T]] = apply(Conversion.Converted(f)) def materialized(f: Model => List[Field[Doc, _]]) - (implicit transaction: Transaction[Doc]): IO[AsyncSearchResults[Doc, Model, MaterializedIndex[Doc, Model]]] = + (implicit transaction: Transaction[Doc]): Task[AsyncSearchResults[Doc, Model, MaterializedIndex[Doc, Model]]] = apply(Conversion.Materialized(f(collection.model))) - def indexes()(implicit transaction: Transaction[Doc]): IO[AsyncSearchResults[Doc, Model, MaterializedIndex[Doc, Model]]] = { + def indexes()(implicit transaction: Transaction[Doc]): Task[AsyncSearchResults[Doc, Model, MaterializedIndex[Doc, Model]]] = { val fields = collection.model.fields.filter(_.indexed) apply(Conversion.Materialized(fields)) } - def docAndIndexes()(implicit transaction: Transaction[Doc]): IO[AsyncSearchResults[Doc, Model, MaterializedAndDoc[Doc, Model]]] = { + def docAndIndexes()(implicit transaction: Transaction[Doc]): Task[AsyncSearchResults[Doc, Model, MaterializedAndDoc[Doc, Model]]] = { apply(Conversion.DocAndIndexes()) } @@ -215,7 +215,7 @@ case class AsyncQuery[Doc <: Document[Doc], Model <: DocumentModel[Doc]](asyncCo from: Geo.Point, sort: Boolean = true, radius: Option[Distance] = None) - (implicit transaction: Transaction[Doc]): IO[AsyncSearchResults[Doc, Model, DistanceAndDoc[Doc]]] = + (implicit transaction: Transaction[Doc]): Task[AsyncSearchResults[Doc, Model, DistanceAndDoc[Doc]]] = apply(Conversion.Distance(f(collection.model), from, sort, radius)) } @@ -233,40 +233,38 @@ case class AsyncQuery[Doc <: Document[Doc], Model <: DocumentModel[Doc]](asyncCo deleteOnNone: Boolean = true, safeModify: Boolean = true, maxConcurrent: Int = 1) - (f: Doc => IO[Option[Doc]]) - (implicit transaction: Transaction[Doc]): IO[Int] = stream + (f: Doc => Task[Option[Doc]]) + (implicit transaction: Transaction[Doc]): Task[Int] = stream .docs - .parEvalMap(maxConcurrent) { doc => + .par(maxThreads = maxConcurrent) { doc => if (safeModify) { asyncCollection.modify(doc._id, establishLock, deleteOnNone) { case Some(doc) => f(doc) - case None => IO.pure(None) + case None => Task.pure(None) } } else { - asyncCollection.withLock(doc._id, IO.pure(Some(doc)), establishLock) { current => - val io = current match { + asyncCollection.withLock(doc._id, Task.pure(Some(doc)), establishLock) { current => + val io: Task[Option[Doc]] = current match { case Some(doc) => f(doc) - case None => IO.pure(None) + case None => Task.pure(None) } io.flatTap { case 
Some(modified) if !current.contains(modified) => asyncCollection.upsert(modified) case None if deleteOnNone => asyncCollection.delete(doc._id) - case _ => IO.unit + case _ => Task.unit } } } } - .compile .count - .map(_.toInt) - def toList(implicit transaction: Transaction[Doc]): IO[List[Doc]] = stream.docs.compile.toList + def toList(implicit transaction: Transaction[Doc]): Task[List[Doc]] = stream.docs.toList - def first(implicit transaction: Transaction[Doc]): IO[Option[Doc]] = stream.docs.take(1).compile.last + def first(implicit transaction: Transaction[Doc]): Task[Option[Doc]] = stream.docs.take(1).lastOption - def one(implicit transaction: Transaction[Doc]): IO[Doc] = stream.docs.take(1).compile.lastOrError + def one(implicit transaction: Transaction[Doc]): Task[Doc] = stream.docs.take(1).last - def count(implicit transaction: Transaction[Doc]): IO[Int] = copy(limit = Some(1), countTotal = true) + def count(implicit transaction: Transaction[Doc]): Task[Int] = copy(limit = Some(1), countTotal = true) .search.docs.map(_.total.get) def aggregate(f: Model => List[AggregateFunction[_, _, Doc]]): AsyncAggregateQuery[Doc, Model] = @@ -274,17 +272,17 @@ case class AsyncQuery[Doc <: Document[Doc], Model <: DocumentModel[Doc]](asyncCo def grouped[F](f: Model => Field[Doc, F], direction: SortDirection = SortDirection.Ascending) - (implicit transaction: Transaction[Doc]): fs2.Stream[IO, (F, List[Doc])] = { + (implicit transaction: Transaction[Doc]): rapid.Stream[(F, List[Doc])] = { val field = f(collection.model) val state = new IndexingState - val io = IO.blocking(sort(Sort.ByField(field, direction)) + val io = Task(sort(Sort.ByField(field, direction)) .toQuery .search .docs .iterator).map { iterator => val grouped = GroupedIterator[Doc, F](iterator, doc => field.get(doc, field, state)) - fs2.Stream.fromBlockingIterator[IO](grouped, 512) + rapid.Stream.fromIterator[(F, List[Doc])](Task(grouped)) } - fs2.Stream.force(io) + rapid.Stream.force(io) } } \ No newline at end of file diff --git a/async/src/main/scala/lightdb/async/AsyncSearchResults.scala b/async/src/main/scala/lightdb/async/AsyncSearchResults.scala index c2082102..79233951 100644 --- a/async/src/main/scala/lightdb/async/AsyncSearchResults.scala +++ b/async/src/main/scala/lightdb/async/AsyncSearchResults.scala @@ -1,23 +1,23 @@ package lightdb.async -import cats.effect.IO import lightdb.doc.{Document, DocumentModel} import lightdb.facet.FacetResult import lightdb.field.Field.FacetField import lightdb.transaction.Transaction +import rapid.Task case class AsyncSearchResults[Doc <: Document[Doc], Model <: DocumentModel[Doc], V](model: Model, offset: Int, limit: Option[Int], total: Option[Int], - scoredStream: fs2.Stream[IO, (V, Double)], + scoredStream: rapid.Stream[(V, Double)], facetResults: Map[FacetField[Doc], FacetResult], transaction: Transaction[Doc]) { - def stream: fs2.Stream[IO, V] = scoredStream.map(_._1) + def stream: rapid.Stream[V] = scoredStream.map(_._1) - def first: IO[Option[V]] = stream.take(1).compile.toList.map(_.headOption) + def first: Task[Option[V]] = stream.take(1).lastOption - def one: IO[V] = first.map { + def one: Task[V] = first.map { case Some(v) => v case None => throw new NullPointerException("No results for search") } diff --git a/async/src/main/scala/lightdb/async/AsyncStoredValue.scala b/async/src/main/scala/lightdb/async/AsyncStoredValue.scala index 5c4a176a..0a230135 100644 --- a/async/src/main/scala/lightdb/async/AsyncStoredValue.scala +++ b/async/src/main/scala/lightdb/async/AsyncStoredValue.scala 
@@ -1,11 +1,11 @@ package lightdb.async -import cats.effect.IO import lightdb.StoredValue +import rapid.Task case class AsyncStoredValue[T](underlying: StoredValue[T]) { - def get: IO[T] = IO.blocking(underlying.get()) - def exists: IO[Boolean] = IO.blocking(underlying.exists()) - def set(value: T): IO[T] = IO.blocking(underlying.set(value)) - def clear(): IO[Unit] = IO.blocking(underlying.clear()) + def get: Task[T] = Task(underlying.get()) + def exists: Task[Boolean] = Task(underlying.exists()) + def set(value: T): Task[T] = Task(underlying.set(value)) + def clear(): Task[Unit] = Task(underlying.clear()) } diff --git a/async/src/main/scala/lightdb/async/AsyncTransactionConvenience.scala b/async/src/main/scala/lightdb/async/AsyncTransactionConvenience.scala index 34544286..da9c1789 100644 --- a/async/src/main/scala/lightdb/async/AsyncTransactionConvenience.scala +++ b/async/src/main/scala/lightdb/async/AsyncTransactionConvenience.scala @@ -1,69 +1,69 @@ package lightdb.async -import cats.effect.IO import lightdb.doc.{Document, DocumentModel} import lightdb._ import lightdb.field.Field._ +import rapid.Task case class AsyncTransactionConvenience[Doc <: Document[Doc], Model <: DocumentModel[Doc]](collection: AsyncCollection[Doc, Model]) { - def insert(doc: Doc): IO[Doc] = collection.transaction { implicit transaction => + def insert(doc: Doc): Task[Doc] = collection.transaction { implicit transaction => collection.insert(doc) } - def insert(docs: Seq[Doc]): IO[Seq[Doc]] = collection.transaction { implicit transaction => + def insert(docs: Seq[Doc]): Task[Seq[Doc]] = collection.transaction { implicit transaction => collection.insert(docs) } - def upsert(doc: Doc): IO[Doc] = collection.transaction { implicit transaction => + def upsert(doc: Doc): Task[Doc] = collection.transaction { implicit transaction => collection.upsert(doc) } - def upsert(docs: Seq[Doc]): IO[Seq[Doc]] = collection.transaction { implicit transaction => + def upsert(docs: Seq[Doc]): Task[Seq[Doc]] = collection.transaction { implicit transaction => collection.upsert(docs) } - def get[V](f: Model => (UniqueIndex[Doc, V], V)): IO[Option[Doc]] = collection.transaction { implicit transaction => + def get[V](f: Model => (UniqueIndex[Doc, V], V)): Task[Option[Doc]] = collection.transaction { implicit transaction => collection.get(f) } - def apply[V](f: Model => (UniqueIndex[Doc, V], V)): IO[Doc] = collection.transaction { implicit transaction => + def apply[V](f: Model => (UniqueIndex[Doc, V], V)): Task[Doc] = collection.transaction { implicit transaction => collection(f) } - def get(id: Id[Doc]): IO[Option[Doc]] = collection.transaction { implicit transaction => + def get(id: Id[Doc]): Task[Option[Doc]] = collection.transaction { implicit transaction => collection.get(id) } - def apply(id: Id[Doc]): IO[Doc] = collection.transaction { implicit transaction => + def apply(id: Id[Doc]): Task[Doc] = collection.transaction { implicit transaction => collection(id) } - def list: IO[List[Doc]] = collection.transaction { implicit transaction => - collection.stream.compile.toList + def list: Task[List[Doc]] = collection.transaction { implicit transaction => + collection.stream.toList } def modify(id: Id[Doc], lock: Boolean = true, deleteOnNone: Boolean = false) - (f: Option[Doc] => IO[Option[Doc]]): IO[Option[Doc]] = collection.transaction { implicit transaction => + (f: Option[Doc] => Task[Option[Doc]]): Task[Option[Doc]] = collection.transaction { implicit transaction => collection.modify(id, lock, deleteOnNone)(f) } - def getOrCreate(id: 
Id[Doc], create: => IO[Doc], lock: Boolean = true): IO[Doc] = collection.transaction { implicit transaction =>
+  def getOrCreate(id: Id[Doc], create: => Task[Doc], lock: Boolean = true): Task[Doc] = collection.transaction { implicit transaction =>
     collection.getOrCreate(id, create, lock)
   }
 
-  def delete[V](f: Model => (UniqueIndex[Doc, V], V)): IO[Boolean] = collection.transaction { implicit transaction =>
+  def delete[V](f: Model => (UniqueIndex[Doc, V], V)): Task[Boolean] = collection.transaction { implicit transaction =>
     collection.delete(f)
   }
 
-  def delete(id: Id[Doc])(implicit ev: Model <:< DocumentModel[_]): IO[Boolean] = collection.transaction { implicit transaction =>
+  def delete(id: Id[Doc])(implicit ev: Model <:< DocumentModel[_]): Task[Boolean] = collection.transaction { implicit transaction =>
     collection.delete(id)
   }
 
-  def count: IO[Int] = collection.transaction { implicit transaction =>
+  def count: Task[Int] = collection.transaction { implicit transaction =>
     collection.count
   }
 
-  def truncate(): IO[Int] = collection.transaction { implicit transaction =>
+  def truncate(): Task[Int] = collection.transaction { implicit transaction =>
     collection.truncate()
   }
 }
diff --git a/benchmark/src/main/scala/benchmark/FlushingBacklog.scala b/benchmark/src/main/scala/benchmark/FlushingBacklog.scala
index a65e53b3..2d7bd98d 100644
--- a/benchmark/src/main/scala/benchmark/FlushingBacklog.scala
+++ b/benchmark/src/main/scala/benchmark/FlushingBacklog.scala
@@ -12,7 +12,7 @@ abstract class FlushingBacklog[Key, Value](val batchSize: Int, val maxSize: Int)
   private val size = new AtomicInteger(0)
   private val flushing = new AtomicBoolean(false)
 
-  def enqueue(key: Key, value: Value): IO[Value] = IO.blocking {
+  def enqueue(key: Key, value: Value): Task[Value] = Task {
     val exists = map.put(key, value) != null
     var doFlush = false
     if (!exists) {
@@ -35,10 +35,10 @@ abstract class FlushingBacklog[Key, Value](val batchSize: Int, val maxSize: Int)
     b
   }
 
-  private def waitForBuffer(): IO[Unit] = if (size.get() > maxSize) {
-    IO.sleep(1.second).flatMap(_ => waitForBuffer())
+  private def waitForBuffer(): Task[Unit] = if (size.get() > maxSize) {
+    Task(Thread.sleep(1.second.toMillis)).flatMap(_ => waitForBuffer())
   } else {
-    IO.unit
+    Task.unit
   }
 
   private def shouldFlush(): Boolean = synchronized {
@@ -50,7 +50,7 @@ abstract class FlushingBacklog[Key, Value](val batchSize: Int, val maxSize: Int)
     }
   }
 
-  private def pollingStream: fs2.Stream[IO, Value] = fs2.Stream
-    .fromBlockingIterator[IO](map.keys().asIterator().asScala, 512)
+  private def pollingStream: rapid.Stream[Value] = rapid.Stream
+    .fromIterator(Task(map.keys().asIterator().asScala))
     .map { key =>
       val o = Option(map.remove(key))
@@ -65,7 +65,6 @@ abstract class FlushingBacklog[Key, Value](val batchSize: Int, val maxSize: Int)
     }
     .unNone
 
-  private def prepareWrite(): IO[Unit] = pollingStream
-    .compile
+  private def prepareWrite(): Task[Unit] = pollingStream
     .toList
     .flatMap { list =>
@@ -75,7 +74,7 @@ abstract class FlushingBacklog[Key, Value](val batchSize: Int, val maxSize: Int)
       flushing.set(false)
     }
 
-  private def writeBatched(list: List[Value]): IO[Unit] = {
+  private def writeBatched(list: List[Value]): Task[Unit] = {
     val (current, more) = list.splitAt(batchSize)
     val w = write(current)
     if (more.nonEmpty) {
@@ -85,10 +84,10 @@ abstract class FlushingBacklog[Key, Value](val batchSize: Int, val maxSize: Int)
     }
   }
 
-  protected def write(list: List[Value]): IO[Unit]
+  protected def write(list: List[Value]): Task[Unit]
 
-  def flush(): IO[Unit] = if (map.isEmpty) {
-    IO.unit
+  def flush(): Task[Unit] = if (map.isEmpty) {
+    Task.unit
   } else {
     prepareWrite()
   }
diff --git 
a/benchmark/src/main/scala/benchmark/IOIterator.scala b/benchmark/src/main/scala/benchmark/IOIterator.scala index 20dda037..4f7b9416 100644 --- a/benchmark/src/main/scala/benchmark/IOIterator.scala +++ b/benchmark/src/main/scala/benchmark/IOIterator.scala @@ -9,9 +9,9 @@ import scala.concurrent.{ExecutionContext, Future} trait IOIterator[T] { val running = new AtomicInteger(0) - def next(): IO[Option[T]] + def next(): Task[Option[T]] - def stream(concurrency: Int)(f: T => IO[Unit]): IO[Unit] = { + def stream(concurrency: Int)(f: T => Task[Unit]): Task[Unit] = { val ios = (0 until concurrency).toList.map { _ => running.incrementAndGet() recursiveStream(f) @@ -19,13 +19,13 @@ trait IOIterator[T] { ios.sequence.map(_ => ()) } - private def recursiveStream(f: T => IO[Unit]): IO[Unit] = next().flatMap { + private def recursiveStream(f: T => Task[Unit]): Task[Unit] = next().flatMap { case Some(t) => f(t).flatMap { _ => recursiveStream(f) } case None => { running.decrementAndGet() - IO.unit + Task.unit } } } \ No newline at end of file diff --git a/benchmark/src/main/scala/benchmark/imdb/BenchmarkImplementation.scala b/benchmark/src/main/scala/benchmark/imdb/BenchmarkImplementation.scala index 91f252d1..f34383a4 100644 --- a/benchmark/src/main/scala/benchmark/imdb/BenchmarkImplementation.scala +++ b/benchmark/src/main/scala/benchmark/imdb/BenchmarkImplementation.scala @@ -8,25 +8,25 @@ trait BenchmarkImplementation { def name: String - def init(): IO[Unit] = IO.unit + def init(): Task[Unit] = Task.unit def map2TitleAka(map: Map[String, String]): TitleAka def map2TitleBasics(map: Map[String, String]): TitleBasics - def persistTitleAka(t: TitleAka): IO[Unit] - def persistTitleBasics(t: TitleBasics): IO[Unit] + def persistTitleAka(t: TitleAka): Task[Unit] + def persistTitleBasics(t: TitleBasics): Task[Unit] - def flush(): IO[Unit] + def flush(): Task[Unit] def idFor(t: TitleAka): String def titleIdFor(t: TitleAka): String - def streamTitleAka(): fs2.Stream[IO, TitleAka] - def verifyTitleAka(): IO[Unit] - def verifyTitleBasics(): IO[Unit] + def streamTitleAka(): rapid.Stream[TitleAka] + def verifyTitleAka(): Task[Unit] + def verifyTitleBasics(): Task[Unit] - def get(id: String): IO[TitleAka] - def findByTitleId(titleId: String): IO[List[TitleAka]] + def get(id: String): Task[TitleAka] + def findByTitleId(titleId: String): Task[List[TitleAka]] private val excludeChars = Set(2.toChar) diff --git a/benchmark/src/main/scala/benchmark/imdb/IMDBBenchmark.scala b/benchmark/src/main/scala/benchmark/imdb/IMDBBenchmark.scala index 0d015a8e..bfd780e2 100644 --- a/benchmark/src/main/scala/benchmark/imdb/IMDBBenchmark.scala +++ b/benchmark/src/main/scala/benchmark/imdb/IMDBBenchmark.scala @@ -25,15 +25,15 @@ // // type TitleAka = implementation.TitleAka // -// implicit class ElapsedIO[Return](io: IO[Return]) { -// def elapsed: IO[Elapsed[Return]] = { +// implicit class ElapsedTask[Return](io: Task[Return]) { +// def elapsed: Task[Elapsed[Return]] = { // val start = System.currentTimeMillis() // io.map { r => // Elapsed(r, (System.currentTimeMillis() - start) / 1000.0) // } // } // -// def elapsedValue: IO[Double] = elapsed.map(_.elapsed) +// def elapsedValue: Task[Double] = elapsed.map(_.elapsed) // } // // case class Elapsed[Return](value: Return, elapsed: Double) @@ -78,7 +78,7 @@ // sys.exit(0) // } // -// private def process[T](file: File, map2t: Map[String, String] => T, persist: T => IO[Unit]): IO[Int] = IO.blocking { +// private def process[T](file: File, map2t: Map[String, String] => T, persist: T => 
Task[Unit]): Task[Int] = Task { // val reader = new BufferedReader(new FileReader(file)) // val counter = new AtomicInteger(0) // val concurrency = 32 @@ -99,7 +99,7 @@ // } // // val iterator = new IOIterator[T] { -// override def next(): IO[Option[T]] = IO.blocking { +// override def next(): Task[Option[T]] = Task { // nextLine() // }.map(_.map { line => // val values = line.split('\t').toList @@ -140,7 +140,7 @@ // // private val counter = new AtomicInteger(0) // -// def cycleThroughEntireCollection(idEvery: Int): IO[Unit] = implementation.streamTitleAka().map { titleAka => +// def cycleThroughEntireCollection(idEvery: Int): Task[Unit] = implementation.streamTitleAka().map { titleAka => // val v = counter.incrementAndGet() // if (v % idEvery == 0) { // ids = Ids(implementation.idFor(titleAka), implementation.titleIdFor(titleAka)) :: ids @@ -149,8 +149,8 @@ // scribe.info(s"Counter for entire collection: ${counter.get()}") // } // -// def validateIds(idsList: List[Ids]): IO[Unit] = if (idsList.isEmpty) { -// IO.unit +// def validateIds(idsList: List[Ids]): Task[Unit] = if (idsList.isEmpty) { +// Task.unit // } else { // val ids = idsList.head // implementation.get(ids.id).flatMap { titleAka => @@ -161,8 +161,8 @@ // } // } // -// def validateTitleIds(idsList: List[Ids]): IO[Unit] = if (idsList.isEmpty) { -// IO.unit +// def validateTitleIds(idsList: List[Ids]): Task[Unit] = if (idsList.isEmpty) { +// Task.unit // } else { // val ids = idsList.head // implementation.findByTitleId(ids.titleId).flatMap { titleAkas => @@ -176,8 +176,8 @@ // } // } // -//// def validateTitleIds(idsList: List[Ids]): IO[Unit] = { -//// fs2.Stream[IO, Ids](idsList: _*) +//// def validateTitleIds(idsList: List[Ids]): Task[Unit] = { +//// rapid.Stream[Ids](idsList: _*) //// .parEvalMap(8) { ids => //// implementation.findByTitleId(ids.titleId).map { titleAkas => //// val results = titleAkas.map(ta => implementation.titleIdFor(ta)) @@ -192,7 +192,7 @@ //// .drain //// } // -//// override def run(args: List[String]): IO[ExitCode] = { +//// override def run(args: List[String]): Task[ExitCode] = { //// val baseDirectory = new File("data") //// val start = System.currentTimeMillis() //// for { @@ -210,9 +210,9 @@ //// isOriginalTitle = map.boolOption("isOriginalTitle") //// ) ////// db.titleAka.put(t) -//// IO.unit +//// Task.unit //// } else { -//// IO.unit +//// Task.unit //// } //// }.foldMap(_ => 1L).compile.lastOrError //// } @@ -225,10 +225,10 @@ //// } //// } // -// private def downloadFile(file: File, limit: Limit): IO[File] = (if (file.exists()) { -// IO.pure(file) +// private def downloadFile(file: File, limit: Limit): Task[File] = (if (file.exists()) { +// Task.pure(file) // } else { -// IO.blocking { +// Task { // scribe.info(s"File doesn't exist, downloading ${file.getName}...") // file.getParentFile.mkdirs() // val fileName = s"${file.getName}.gz" @@ -265,8 +265,8 @@ // } // }).flatMap { file => // limit match { -// case Limit.Unlimited => IO.pure(file) -// case _ => IO.blocking { +// case Limit.Unlimited => Task.pure(file) +// case _ => Task { // val source = Source.fromFile(file) // try { // val (pre, post) = file.getName.splitAt(file.getName.lastIndexOf(".")) diff --git a/benchmark/src/main/scala/benchmark/imdb/LightDBImplementation.scala b/benchmark/src/main/scala/benchmark/imdb/LightDBImplementation.scala index cf99a28e..ec7bfc28 100644 --- a/benchmark/src/main/scala/benchmark/imdb/LightDBImplementation.scala +++ b/benchmark/src/main/scala/benchmark/imdb/LightDBImplementation.scala @@ -20,7 +20,7 @@ 
// // override def name: String = "LightDB" // -// override def init(): IO[Unit] = IO(DB.init()) +// override def init(): Task[Unit] = IO(DB.init()) // // override def map2TitleAka(map: Map[String, String]): TitleAkaLDB = TitleAkaLDB( // titleId = map.value("titleId"), @@ -50,25 +50,25 @@ // private implicit var akaTransaction: Transaction[TitleAkaLDB] = _ // private implicit var basicsTransaction: Transaction[TitleBasicsLDB] = _ // -// override def persistTitleAka(t: TitleAkaLDB): IO[Unit] = IO.blocking { +// override def persistTitleAka(t: TitleAkaLDB): Task[Unit] = Task { // if (akaTransaction == null) akaTransaction = DB.titleAka.transaction.create() // DB.titleAka.set(t) // } // -// override def persistTitleBasics(t: TitleBasicsLDB): IO[Unit] = IO.blocking { +// override def persistTitleBasics(t: TitleBasicsLDB): Task[Unit] = Task { // if (basicsTransaction == null) basicsTransaction = DB.titleBasics.transaction.create() // DB.titleBasics.set(t) // } // -// override def streamTitleAka(): fs2.Stream[IO, TitleAkaLDB] = fs2.Stream.fromBlockingIterator[IO](DB.titleAka.iterator, 100) +// override def streamTitleAka(): rapid.Stream[TitleAkaLDB] = fs2.Stream.fromBlockingIterator[IO](DB.titleAka.iterator, 100) // // override def idFor(t: TitleAkaLDB): String = t._id.value // // override def titleIdFor(t: TitleAkaLDB): String = t.titleId // -// override def get(id: String): IO[TitleAkaLDB] = IO(DB.titleAka(Id[TitleAkaLDB](id))) +// override def get(id: String): Task[TitleAkaLDB] = IO(DB.titleAka(Id[TitleAkaLDB](id))) // -// override def findByTitleId(titleId: String): IO[List[TitleAkaLDB]] = IO.blocking { +// override def findByTitleId(titleId: String): Task[List[TitleAkaLDB]] = Task { // DB.titleAka // .query // .filter(_.titleId === titleId) @@ -77,20 +77,20 @@ // .list // .map(_.json.as[TitleAkaLDB]) // } -// // override def findByTitleId(titleId: String): IO[List[TitleAkaLDB]] = TitleAkaLDB.titleId.query(titleId).compile.toList +// // override def findByTitleId(titleId: String): Task[List[TitleAkaLDB]] = TitleAkaLDB.titleId.query(titleId).compile.toList // -// override def flush(): IO[Unit] = IO.blocking { +// override def flush(): Task[Unit] = Task { // akaTransaction.commit() // basicsTransaction.commit() // } // -// override def verifyTitleAka(): IO[Unit] = IO.blocking { +// override def verifyTitleAka(): Task[Unit] = Task { // val haloCount = DB.titleAka.count // val luceneCount = DB.titleAka.indexer.count // scribe.info(s"TitleAka counts -- Halo: $haloCount, Lucene: $luceneCount") // } // -// override def verifyTitleBasics(): IO[Unit] = IO.blocking { +// override def verifyTitleBasics(): Task[Unit] = Task { // val haloCount = DB.titleBasics.count // // luceneCount <- DB.titleBasics.indexer.count() // scribe.info(s"TitleBasic counts -- Halo: $haloCount") //, Lucene: $luceneCount") diff --git a/benchmark/src/main/scala/benchmark/imdb/MariaDBImplementation.scala b/benchmark/src/main/scala/benchmark/imdb/MariaDBImplementation.scala index 01f88dc9..94f75b28 100644 --- a/benchmark/src/main/scala/benchmark/imdb/MariaDBImplementation.scala +++ b/benchmark/src/main/scala/benchmark/imdb/MariaDBImplementation.scala @@ -19,7 +19,7 @@ object MariaDBImplementation extends BenchmarkImplementation { } private lazy val backlogAka = new FlushingBacklog[String, TitleAka](1000, 10000) { - override protected def write(list: List[TitleAkaPG]): IO[Unit] = IO.blocking { + override protected def write(list: List[TitleAkaPG]): Task[Unit] = Task { val ps = connection.prepareStatement("INSERT INTO title_aka(id, 
titleId, ordering, title, region, language, types, attributes, isOriginalTitle) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)")
      try {
        list.foreach { t =>
@@ -42,7 +42,7 @@ object MariaDBImplementation extends BenchmarkImplementation {
   }
 
   private lazy val backlogBasics = new FlushingBacklog[String, TitleBasics](1000, 10000) {
-    override protected def write(list: List[TitleBasicsPG]): IO[Unit] = IO.blocking {
+    override protected def write(list: List[TitleBasicsPG]): Task[Unit] = Task {
       val ps = connection.prepareStatement("INSERT INTO title_basics(id, tconst, titleType, primaryTitle, originalTitle, isAdult, startYear, endYear, runtimeMinutes, genres) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
       try {
         list.foreach { t =>
@@ -67,7 +67,7 @@ object MariaDBImplementation extends BenchmarkImplementation {
 
   override def name: String = "MariaDB"
 
-  override def init(): IO[Unit] = IO.blocking {
+  override def init(): Task[Unit] = Task {
     executeUpdate("DROP TABLE IF EXISTS title_aka")
     executeUpdate("DROP TABLE IF EXISTS title_basics")
     executeUpdate("CREATE TABLE title_aka(id VARCHAR(128) NOT NULL, titleId TEXT, ordering INTEGER, title TEXT, region TEXT, language TEXT, types TEXT, attributes TEXT, isOriginalTitle SMALLINT, PRIMARY KEY (id))")
@@ -100,9 +100,9 @@ object MariaDBImplementation extends BenchmarkImplementation {
     genres = map.value("genres")
   )
 
-  override def persistTitleAka(t: TitleAka): IO[Unit] = backlogAka.enqueue(t.id, t).map(_ => ())
+  override def persistTitleAka(t: TitleAka): Task[Unit] = backlogAka.enqueue(t.id, t).map(_ => ())
 
-  override def persistTitleBasics(t: TitleBasicsPG): IO[Unit] = backlogBasics.enqueue(t.id, t).map(_ => ())
+  override def persistTitleBasics(t: TitleBasicsPG): Task[Unit] = backlogBasics.enqueue(t.id, t).map(_ => ())
 
   private def fromRS(rs: ResultSet): TitleAkaPG = TitleAkaPG(
     id = rs.getString("id"),
@@ -116,7 +116,7 @@ object MariaDBImplementation extends BenchmarkImplementation {
     isOriginalTitle = rs.getInt("isOriginalTitle")
   )
 
-  override def streamTitleAka(): fs2.Stream[IO, TitleAkaPG] = {
+  override def streamTitleAka(): rapid.Stream[TitleAkaPG] = {
     val s = connection.createStatement()
     try {
       val rs = s.executeQuery("SELECT * FROM title_aka")
@@ -137,7 +137,7 @@ object MariaDBImplementation extends BenchmarkImplementation {
 
   override def titleIdFor(t: TitleAkaPG): String = t.titleId
 
-  override def get(id: String): IO[TitleAkaPG] = IO.blocking {
+  override def get(id: String): Task[TitleAkaPG] = Task {
     val s = connection.prepareStatement("SELECT * FROM title_aka WHERE id = ?")
     try {
       s.setString(1, id)
@@ -153,7 +153,7 @@ object MariaDBImplementation extends BenchmarkImplementation {
     }
   }
 
-  override def findByTitleId(titleId: String): IO[List[TitleAkaPG]] = IO.blocking {
+  override def findByTitleId(titleId: String): Task[List[TitleAkaPG]] = Task {
     val s = connection.prepareStatement("SELECT * FROM title_aka WHERE titleId = ?")
     try {
       s.setString(1, titleId)
@@ -172,14 +172,14 @@ object MariaDBImplementation extends BenchmarkImplementation {
     }
   }
 
-  override def flush(): IO[Unit] = for {
+  override def flush(): Task[Unit] = for {
     _ <- backlogAka.flush()
-    _ <- IO(commit())
+    _ <- Task(commit())
   } yield {
     ()
   }
 
-  override def verifyTitleAka(): IO[Unit] = IO.blocking {
+  override def verifyTitleAka(): Task[Unit] = Task {
     val s = connection.createStatement()
     val rs = s.executeQuery("SELECT COUNT(1) FROM title_aka")
    rs.next()
@@ -187,7 +187,7 @@ object MariaDBImplementation extends BenchmarkImplementation {
     scribe.info(s"Counted $count records in title_aka table")
   }
 
-  override def verifyTitleBasics(): 
IO[Unit] = IO.blocking {
+  override def verifyTitleBasics(): Task[Unit] = Task {
     val s = connection.createStatement()
     val rs = s.executeQuery("SELECT COUNT(1) FROM title_basics")
     rs.next()
diff --git a/benchmark/src/main/scala/benchmark/imdb/MongoDBImplementation.scala b/benchmark/src/main/scala/benchmark/imdb/MongoDBImplementation.scala
index 6ad326ac..e7bbe69b 100644
--- a/benchmark/src/main/scala/benchmark/imdb/MongoDBImplementation.scala
+++ b/benchmark/src/main/scala/benchmark/imdb/MongoDBImplementation.scala
@@ -54,7 +54,7 @@ object MongoDBImplementation extends BenchmarkImplementation {
   }
 
   private lazy val backlogAka = new FlushingBacklog[String, Document](1000, 10000) {
-    override protected def write(list: List[Document]): IO[Unit] = IO.blocking {
+    override protected def write(list: List[Document]): Task[Unit] = Task {
       val javaList = new util.ArrayList[Document](batchSize)
       list.foreach(javaList.add)
       titleAka.insertMany(javaList)
@@ -63,7 +63,7 @@ object MongoDBImplementation extends BenchmarkImplementation {
   }
 
   private lazy val backlogBasics = new FlushingBacklog[String, Document](1000, 10000) {
-    override protected def write(list: List[Document]): IO[Unit] = IO.blocking {
+    override protected def write(list: List[Document]): Task[Unit] = Task {
       val javaList = new util.ArrayList[Document](batchSize)
       list.foreach(javaList.add)
       titleBasics.insertMany(javaList)
@@ -71,15 +71,15 @@ object MongoDBImplementation extends BenchmarkImplementation {
     }
   }
 
-  override def init(): IO[Unit] = IO.blocking {
+  override def init(): Task[Unit] = Task {
     titleAka.createIndex(Indexes.ascending("titleId"))
   }
 
-  override def persistTitleAka(t: Document): IO[Unit] = backlogAka.enqueue(t.getString("_id"), t).map(_ => ())
+  override def persistTitleAka(t: Document): Task[Unit] = backlogAka.enqueue(t.getString("_id"), t).map(_ => ())
 
-  override def persistTitleBasics(t: Document): IO[Unit] = backlogBasics.enqueue(t.getString("_id"), t).map(_ => ())
+  override def persistTitleBasics(t: Document): Task[Unit] = backlogBasics.enqueue(t.getString("_id"), t).map(_ => ())
 
-  override def streamTitleAka(): fs2.Stream[IO, Document] = {
+  override def streamTitleAka(): rapid.Stream[Document] = {
     val iterator: Iterator[Document] = titleAka.find().iterator().asScala
-    fs2.Stream.fromBlockingIterator[IO](iterator, 512)
+    rapid.Stream.fromIterator(Task(iterator))
   }
@@ -90,27 +90,27 @@ object MongoDBImplementation extends BenchmarkImplementation {
 
   import com.mongodb.client.model.Filters
 
-  override def get(id: String): IO[Document] = IO.blocking {
+  override def get(id: String): Task[Document] = Task {
     titleAka.find(Filters.eq("_id", id)).first()
   }
 
-  override def findByTitleId(titleId: String): IO[List[Document]] = IO.blocking {
+  override def findByTitleId(titleId: String): Task[List[Document]] = Task {
     titleAka.find(Filters.eq("titleId", titleId)).iterator().asScala.toList
   }
 
-  override def flush(): IO[Unit] = for {
+  override def flush(): Task[Unit] = for {
     _ <- backlogAka.flush()
     _ <- backlogBasics.flush()
   } yield {
     ()
   }
 
-  override def verifyTitleAka(): IO[Unit] = IO.blocking {
+  override def verifyTitleAka(): Task[Unit] = Task {
     val docs = titleAka.countDocuments()
     scribe.info(s"TitleAka counts -- $docs")
   }
 
-  override def verifyTitleBasics(): IO[Unit] = IO.blocking {
+  override def verifyTitleBasics(): Task[Unit] = Task {
     val docs = titleBasics.countDocuments()
     scribe.info(s"TitleBasics counts -- $docs")
   }
diff --git a/benchmark/src/main/scala/benchmark/imdb/PostgresImplementation.scala b/benchmark/src/main/scala/benchmark/imdb/PostgresImplementation.scala
index 
index 7b2027e8..ccdc9094 100644
--- a/benchmark/src/main/scala/benchmark/imdb/PostgresImplementation.scala
+++ b/benchmark/src/main/scala/benchmark/imdb/PostgresImplementation.scala
@@ -19,7 +19,7 @@ object PostgresImplementation extends BenchmarkImplementation {
   }
 
   private lazy val backlogAka = new FlushingBacklog[String, TitleAka](1000, 10000) {
-    override protected def write(list: List[TitleAkaPG]): IO[Unit] = IO.blocking {
+    override protected def write(list: List[TitleAkaPG]): Task[Unit] = Task {
       val ps = connection.prepareStatement("INSERT INTO title_aka(id, titleId, ordering, title, region, language, types, attributes, isOriginalTitle) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)")
       try {
         list.foreach { t =>
@@ -42,7 +42,7 @@ object PostgresImplementation extends BenchmarkImplementation {
   }
 
   private lazy val backlogBasics = new FlushingBacklog[String, TitleBasics](1000, 10000) {
-    override protected def write(list: List[TitleBasicsPG]): IO[Unit] = IO.blocking {
+    override protected def write(list: List[TitleBasicsPG]): Task[Unit] = Task {
       val ps = connection.prepareStatement("INSERT INTO title_basics(id, tconst, titleType, primaryTitle, originalTitle, isAdult, startYear, endYear, runtimeMinutes, genres) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
       try {
         list.foreach { t =>
@@ -67,7 +67,7 @@ object PostgresImplementation extends BenchmarkImplementation {
 
   override def name: String = "PostgreSQL"
 
-  override def init(): IO[Unit] = IO.blocking {
+  override def init(): Task[Unit] = Task {
     executeUpdate("DROP TABLE IF EXISTS title_aka")
     executeUpdate("DROP TABLE IF EXISTS title_basics")
     executeUpdate("CREATE TABLE title_aka(id VARCHAR NOT NULL, titleId TEXT, ordering INTEGER, title TEXT, region TEXT, language TEXT, types TEXT, attributes TEXT, isOriginalTitle SMALLINT, PRIMARY KEY (id))")
@@ -100,9 +100,9 @@ object PostgresImplementation extends BenchmarkImplementation {
     genres = map.value("genres")
   )
 
-  override def persistTitleAka(t: TitleAka): IO[Unit] = backlogAka.enqueue(t.id, t).map(_ => ())
+  override def persistTitleAka(t: TitleAka): Task[Unit] = backlogAka.enqueue(t.id, t).map(_ => ())
 
-  override def persistTitleBasics(t: TitleBasicsPG): IO[Unit] = backlogBasics.enqueue(t.id, t).map(_ => ())
+  override def persistTitleBasics(t: TitleBasicsPG): Task[Unit] = backlogBasics.enqueue(t.id, t).map(_ => ())
 
   private def fromRS(rs: ResultSet): TitleAkaPG = TitleAkaPG(
     id = rs.getString("id"),
@@ -116,7 +116,7 @@ object PostgresImplementation extends BenchmarkImplementation {
     isOriginalTitle = rs.getInt("isOriginalTitle")
   )
 
-  override def streamTitleAka(): fs2.Stream[IO, TitleAkaPG] = {
+  override def streamTitleAka(): rapid.Stream[TitleAkaPG] = {
     val s = connection.createStatement()
     try {
       val rs = s.executeQuery("SELECT * FROM title_aka")
@@ -137,7 +137,7 @@ object PostgresImplementation extends BenchmarkImplementation {
 
   override def titleIdFor(t: TitleAkaPG): String = t.titleId
 
-  override def get(id: String): IO[TitleAkaPG] = IO.blocking {
+  override def get(id: String): Task[TitleAkaPG] = Task {
     val s = connection.prepareStatement("SELECT * FROM title_aka WHERE id = ?")
     try {
       s.setString(1, id)
@@ -153,7 +153,7 @@ object PostgresImplementation extends BenchmarkImplementation {
     }
   }
 
-  override def findByTitleId(titleId: String): IO[List[TitleAkaPG]] = IO.blocking {
+  override def findByTitleId(titleId: String): Task[List[TitleAkaPG]] = Task {
     val s = connection.prepareStatement("SELECT * FROM title_aka WHERE titleId = ?")
     try {
       s.setString(1, titleId)
@@ -172,14 +172,14 @@ object PostgresImplementation extends BenchmarkImplementation {
     }
   }
 
-  override def flush(): IO[Unit] = for {
+  override def flush(): Task[Unit] = for {
     _ <- backlogAka.flush()
-    _ <- IO(commit())
+    _ <- Task(commit())
   } yield {
     ()
   }
 
-  override def verifyTitleAka(): IO[Unit] = IO.blocking {
+  override def verifyTitleAka(): Task[Unit] = Task {
     val s = connection.createStatement()
     val rs = s.executeQuery("SELECT COUNT(1) FROM title_aka")
     rs.next()
@@ -187,7 +187,7 @@ object PostgresImplementation extends BenchmarkImplementation {
     scribe.info(s"Counted $count records in title_aka table")
   }
 
-  override def verifyTitleBasics(): IO[Unit] = IO.blocking {
+  override def verifyTitleBasics(): Task[Unit] = Task {
     val s = connection.createStatement()
     val rs = s.executeQuery("SELECT COUNT(1) FROM title_basics")
     rs.next()
diff --git a/benchmark/src/main/scala/benchmark/imdb/SQLiteImplementation.scala b/benchmark/src/main/scala/benchmark/imdb/SQLiteImplementation.scala
index 4616b17d..1020578d 100644
--- a/benchmark/src/main/scala/benchmark/imdb/SQLiteImplementation.scala
+++ b/benchmark/src/main/scala/benchmark/imdb/SQLiteImplementation.scala
@@ -20,7 +20,7 @@ object SQLiteImplementation extends BenchmarkImplementation {
   }
 
   private lazy val backlogAka = new FlushingBacklog[String, TitleAka](1000, 10000) {
-    override protected def write(list: List[TitleAkaPG]): IO[Unit] = IO.blocking {
+    override protected def write(list: List[TitleAkaPG]): Task[Unit] = Task {
      val ps = connection.prepareStatement("INSERT INTO title_aka(id, titleId, ordering, title, region, language, types, attributes, isOriginalTitle) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)")
       try {
         list.foreach { t =>
@@ -43,7 +43,7 @@ object SQLiteImplementation extends BenchmarkImplementation {
   }
 
   private lazy val backlogBasics = new FlushingBacklog[String, TitleBasics](1000, 10000) {
-    override protected def write(list: List[TitleBasicsPG]): IO[Unit] = IO.blocking {
+    override protected def write(list: List[TitleBasicsPG]): Task[Unit] = Task {
       val ps = connection.prepareStatement("INSERT INTO title_basics(id, tconst, titleType, primaryTitle, originalTitle, isAdult, startYear, endYear, runtimeMinutes, genres) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
       try {
         list.foreach { t =>
@@ -68,7 +68,7 @@ object SQLiteImplementation extends BenchmarkImplementation {
 
   override def name: String = "SQLite"
 
-  override def init(): IO[Unit] = IO.blocking {
+  override def init(): Task[Unit] = Task {
     executeUpdate("DROP TABLE IF EXISTS title_aka")
     executeUpdate("DROP TABLE IF EXISTS title_basics")
     executeUpdate("CREATE TABLE title_aka(id VARCHAR NOT NULL, titleId TEXT, ordering INTEGER, title TEXT, region TEXT, language TEXT, types TEXT, attributes TEXT, isOriginalTitle SMALLINT, PRIMARY KEY (id))")
@@ -101,9 +101,9 @@ object SQLiteImplementation extends BenchmarkImplementation {
     genres = map.value("genres")
   )
 
-  override def persistTitleAka(t: TitleAka): IO[Unit] = backlogAka.enqueue(t.id, t).map(_ => ())
+  override def persistTitleAka(t: TitleAka): Task[Unit] = backlogAka.enqueue(t.id, t).map(_ => ())
 
-  override def persistTitleBasics(t: TitleBasicsPG): IO[Unit] = backlogBasics.enqueue(t.id, t).map(_ => ())
+  override def persistTitleBasics(t: TitleBasicsPG): Task[Unit] = backlogBasics.enqueue(t.id, t).map(_ => ())
 
   private def fromRS(rs: ResultSet): TitleAkaPG = TitleAkaPG(
     id = rs.getString("id"),
@@ -117,7 +117,7 @@ object SQLiteImplementation extends BenchmarkImplementation {
     isOriginalTitle = rs.getInt("isOriginalTitle")
   )
 
-  override def streamTitleAka(): fs2.Stream[IO, TitleAkaPG] = {
+  override def streamTitleAka(): rapid.Stream[TitleAkaPG] = {
     val s = connection.createStatement()
     try {
       val rs = s.executeQuery("SELECT * FROM title_aka")
@@ -138,7 +138,7 @@ object SQLiteImplementation extends BenchmarkImplementation {
 
   override def titleIdFor(t: TitleAkaPG): String = t.titleId
 
-  override def get(id: String): IO[TitleAkaPG] = IO.blocking {
+  override def get(id: String): Task[TitleAkaPG] = Task {
     val s = connection.prepareStatement("SELECT * FROM title_aka WHERE id = ?")
     try {
       s.setString(1, id)
@@ -154,7 +154,7 @@ object SQLiteImplementation extends BenchmarkImplementation {
     }
   }
 
-  override def findByTitleId(titleId: String): IO[List[TitleAkaPG]] = IO.blocking {
+  override def findByTitleId(titleId: String): Task[List[TitleAkaPG]] = Task {
     val s = connection.prepareStatement("SELECT * FROM title_aka WHERE titleId = ?")
     try {
       s.setString(1, titleId)
@@ -173,14 +173,14 @@ object SQLiteImplementation extends BenchmarkImplementation {
     }
   }
 
-  override def flush(): IO[Unit] = for {
+  override def flush(): Task[Unit] = for {
     _ <- backlogAka.flush()
-    _ <- IO(commit())
+    _ <- Task(commit())
   } yield {
     ()
   }
 
-  override def verifyTitleAka(): IO[Unit] = IO.blocking {
+  override def verifyTitleAka(): Task[Unit] = Task {
     val s = connection.createStatement()
     val rs = s.executeQuery("SELECT COUNT(1) FROM title_aka")
     rs.next()
@@ -188,7 +188,7 @@ object SQLiteImplementation extends BenchmarkImplementation {
     scribe.info(s"Counted $count records in title_aka table")
   }
 
-  override def verifyTitleBasics(): IO[Unit] = IO.blocking {
+  override def verifyTitleBasics(): Task[Unit] = Task {
     val s = connection.createStatement()
     val rs = s.executeQuery("SELECT COUNT(1) FROM title_basics")
     rs.next()
diff --git a/benchmark/src/main/scala/benchmark/imdb/ScarangoImplementation.scala b/benchmark/src/main/scala/benchmark/imdb/ScarangoImplementation.scala
index 13a75107..4c7e45ea 100644
--- a/benchmark/src/main/scala/benchmark/imdb/ScarangoImplementation.scala
+++ b/benchmark/src/main/scala/benchmark/imdb/ScarangoImplementation.scala
@@ -13,17 +13,17 @@ object ScarangoImplementation extends BenchmarkImplementation {
   override type TitleBasics = TitleBasicsADB
 
   private lazy val backlogAka = new FlushingBacklog[Id[TitleAkaADB], TitleAkaADB](1000, 10000) {
-    override protected def write(list: List[TitleAkaADB]): IO[Unit] = db.titleAka.batch.insert(list).map(_ => ())
+    override protected def write(list: List[TitleAkaADB]): Task[Unit] = db.titleAka.batch.insert(list).map(_ => ())
   }
 
   private lazy val backlogBasics = new FlushingBacklog[Id[TitleBasicsADB], TitleBasicsADB](1000, 10000) {
-    override protected def write(list: List[TitleBasicsADB]): IO[Unit] =
+    override protected def write(list: List[TitleBasicsADB]): Task[Unit] =
       db.titleBasics.batch.insert(list).map(_ => ())
   }
 
   override def name: String = "Scarango"
 
-  override def init(): IO[Unit] = db.init()
+  override def init(): Task[Unit] = db.init()
 
   override def map2TitleAka(map: Map[String, String]): TitleAkaADB = {
     val title = map.value("title")
@@ -52,11 +52,11 @@ object ScarangoImplementation extends BenchmarkImplementation {
     genres = map.list("genres")
   )
 
-  override def persistTitleAka(t: TitleAkaADB): IO[Unit] = backlogAka.enqueue(t._id, t).map(_ => ())
+  override def persistTitleAka(t: TitleAkaADB): Task[Unit] = backlogAka.enqueue(t._id, t).map(_ => ())
 
-  override def persistTitleBasics(t: TitleBasicsADB): IO[Unit] = backlogBasics.enqueue(t._id, t).map(_ => ())
+  override def persistTitleBasics(t: TitleBasicsADB): Task[Unit] = backlogBasics.enqueue(t._id, t).map(_ => ())
 
-  override def flush(): IO[Unit] = for {
+  override def flush(): Task[Unit] = for {
     _ <- backlogAka.flush()
     _ <- backlogBasics.flush()
   } yield {
@@ -67,9 +67,9 @@ object ScarangoImplementation extends BenchmarkImplementation {
 
   override def titleIdFor(t: TitleAkaADB): String = t.titleId
 
-  override def streamTitleAka(): fs2.Stream[IO, TitleAkaADB] = db.titleAka.query.stream()
+  override def streamTitleAka(): rapid.Stream[TitleAkaADB] = db.titleAka.query.stream()
 
-  override def verifyTitleAka(): IO[Unit] = db.titleAka
+  override def verifyTitleAka(): Task[Unit] = db.titleAka
     .query(aql"FOR d IN titleAka COLLECT WITH COUNT INTO length RETURN length")
     .as[Int]
     .one
@@ -77,7 +77,7 @@ object ScarangoImplementation extends BenchmarkImplementation {
     scribe.info(s"TitleAka counts -- $count")
   }
 
-  override def verifyTitleBasics(): IO[Unit] = db.titleAka
+  override def verifyTitleBasics(): Task[Unit] = db.titleAka
     .query(aql"FOR d IN titleBasics COLLECT WITH COUNT INTO length RETURN length")
     .as[Int]
     .one
@@ -85,9 +85,9 @@ object ScarangoImplementation extends BenchmarkImplementation {
     scribe.info(s"TitleBasics counts -- $count")
   }
 
-  override def get(id: String): IO[TitleAkaADB] = db.titleAka(TitleAkaADB.id(id))
+  override def get(id: String): Task[TitleAkaADB] = db.titleAka(TitleAkaADB.id(id))
 
-  override def findByTitleId(titleId: String): IO[List[TitleAkaADB]] = db.titleAka
+  override def findByTitleId(titleId: String): Task[List[TitleAkaADB]] = db.titleAka
     .query
     .byFilter(ref => ref.titleId === titleId)
     .toList
diff --git a/build.sbt b/build.sbt
index 2117116e..0c9bc45f 100644
--- a/build.sbt
+++ b/build.sbt
@@ -82,14 +82,10 @@ val h2Version: String = "2.3.232"
 
 val postgresqlVersion: String = "42.7.3"
 
-val catsVersion: String = "3.5.7"
-
-val fs2Version: String = "3.11.0"
+val rapidVersion: String = "0.3.1-SNAPSHOT"
 
 val scalaTestVersion: String = "3.2.19"
 
-val catsEffectTestingVersion: String = "1.6.0"
-
 lazy val root = project.in(file("."))
   .aggregate(core.jvm, sql, sqlite, postgresql, duckdb, h2, lucene, halodb, rocksdb, mapdb, redis, async, all)
   .settings(
@@ -253,8 +249,7 @@ lazy val async = project.in(file("async"))
     name := s"$projectName-async",
     fork := true,
     libraryDependencies ++= Seq(
-      "org.typelevel" %% "cats-effect" % catsVersion,
-      "co.fs2" %% "fs2-core" % fs2Version
+      "com.outr" %% "rapid-core" % rapidVersion
     )
   )
 
@@ -264,8 +259,7 @@ lazy val all = project.in(file("all"))
     name := s"$projectName-all",
     fork := true,
     libraryDependencies ++= Seq(
-      "org.scalatest" %% "scalatest" % scalaTestVersion % Test,
-      "org.typelevel" %% "cats-effect-testing-scalatest" % catsEffectTestingVersion % Test
+      "org.scalatest" %% "scalatest" % scalaTestVersion % Test
     )
   )
 

From 693bda7c63ee797eb95889f795131e4df151ec8c Mon Sep 17 00:00:00 2001
From: Matt Hicks
Date: Fri, 20 Dec 2024 09:12:39 -0600
Subject: [PATCH 2/2] Preparing for next release

---
 build.sbt | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/build.sbt b/build.sbt
index 16e0883c..c6d7caf4 100644
--- a/build.sbt
+++ b/build.sbt
@@ -15,7 +15,7 @@ val developerURL: String = "https://matthicks.com"
 
 name := projectName
 ThisBuild / organization := org
-ThisBuild / version := "1.1.2"
+ThisBuild / version := "1.2.0-SNAPSHOT"
 ThisBuild / scalaVersion := scala213
 ThisBuild / crossScalaVersions := allScalaVersions
 ThisBuild / scalacOptions ++= Seq("-unchecked", "-deprecation")
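
The substitution applied across every benchmark implementation above is mechanical: cats-effect's IO.blocking { ... } becomes rapid's Task { ... }, fs2.Stream[IO, T] becomes rapid.Stream[T], and sequencing with map/flatMap/for-comprehensions carries over unchanged. The standalone sketch below illustrates that translation in isolation; it assumes rapid exposes Task.apply and Stream.fromIterator(Task(...)) as used in the MongoDBImplementation change above, and the loadRow/rows parameters are hypothetical stand-ins for the blocking JDBC calls, not part of this patch.

    import rapid.{Stream, Task}

    object RapidMigrationSketch {
      // Before: def get(id: String): IO[String] = IO.blocking { loadRow }
      // After:  Task.apply suspends the (possibly blocking) side effect
      //         until the task is actually run.
      def get(loadRow: => String): Task[String] = Task(loadRow)

      // Before: fs2.Stream.fromBlockingIterator[IO](iterator, 512)
      // After:  wrap the iterator construction in a Task so it is
      //         evaluated lazily when the stream is consumed.
      def stream(rows: => Iterator[String]): Stream[String] =
        Stream.fromIterator(Task(rows))

      // Tasks compose with for-comprehensions exactly as IO did, which is
      // why the flush() methods above only needed IO -> Task substitutions.
      def flush(backlog: Task[Unit], commit: => Unit): Task[Unit] = for {
        _ <- backlog
        _ <- Task(commit)
      } yield ()
    }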