
Commit

Cleanup and fixes
darkfrog26 committed Mar 29, 2024
1 parent 3aa7a5e commit f7afa40
Showing 11 changed files with 29 additions and 13 deletions.
19 changes: 16 additions & 3 deletions core/shared/src/main/scala/lightdb/collection/Collection.scala
@@ -1,6 +1,7 @@
package lightdb.collection

import cats.effect.IO
+ import cats.implicits.toTraverseOps
import lightdb.data.DataManager
import lightdb.index.Indexer
import lightdb.query.Query
@@ -33,6 +34,10 @@ case class Collection[D <: Document[D]](db: LightDB, mapping: ObjectMapping[D],
}
}

+ def putAll(values: Seq[D]): IO[Seq[D]] = values.map(put).sequence

+ def putStream(stream: fs2.Stream[IO, D]): IO[Int] = stream.evalMap(put).compile.count.map(_.toInt)

def all(chunkSize: Int = 512, maxConcurrent: Int = 16): fs2.Stream[IO, D] = {
db.verifyInitialized()
store
@@ -60,6 +65,10 @@ case class Collection[D <: Document[D]](db: LightDB, mapping: ObjectMapping[D],
}
}

+ def deleteAll(ids: Seq[Id[D]]): IO[Unit] = ids.map(delete).sequence.map(_ => ())

+ def deleteStream(stream: fs2.Stream[IO, Id[D]]): IO[Int] = stream.evalMap(delete).compile.count.map(_.toInt)

def commit(): IO[Unit] = {
db.verifyInitialized()
for {
@@ -80,9 +89,13 @@ case class Collection[D <: Document[D]](db: LightDB, mapping: ObjectMapping[D],
}
}

- def dispose(): IO[Unit] = {
-   db.verifyInitialized()
-   indexer.dispose()
+ def dispose(): IO[Unit] = if (db.initialized) {
+   for {
+     _ <- store.dispose()
+     _ <- indexer.dispose()
+   } yield ()
+ } else {
+   IO.unit
}

protected lazy val data: CollectionData[D] = CollectionData(this)
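A quick usage sketch of the new `Collection` helpers above. The document type, collection instance, and ids here are hypothetical, and the import locations for `Document` and `Id` are assumed; only `putAll`, `putStream`, `deleteAll`, `deleteStream`, and `commit` come from the code in this commit:

```scala
import cats.effect.IO
import fs2.Stream
import lightdb.{Document, Id}           // assumed import locations
import lightdb.collection.Collection

object CollectionUsageSketch {
  // Replace a batch of stale documents: delete their ids, stream in the
  // replacements, and commit. putStream returns how many documents were written.
  def refresh[D <: Document[D]](collection: Collection[D],
                                stale: Seq[Id[D]],
                                replacements: Seq[D]): IO[Int] =
    for {
      _       <- collection.deleteAll(stale)                  // sequences individual delete(id) calls
      written <- collection.putStream(Stream.emits(replacements).covary[IO])
      _       <- collection.commit()
    } yield written
}
```

The reworked `dispose()` above now also disposes the underlying store and becomes a no-op when the database was never initialized, so a helper like this can call it unconditionally during shutdown.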
2 changes: 1 addition & 1 deletion core/shared/src/main/scala/lightdb/index/Indexer.scala
@@ -15,7 +15,7 @@ trait Indexer[D <: Document[D]] {

def commit(): IO[Unit]

- def count(): IO[Long]
+ def count(): IO[Int]

def search(query: Query[D]): IO[SearchResults[D]]

2 changes: 1 addition & 1 deletion core/shared/src/main/scala/lightdb/index/NullIndexer.scala
@@ -12,7 +12,7 @@ case class NullIndexer[D <: Document[D]](collection: Collection[D]) extends Inde

override def commit(): IO[Unit] = IO.unit

- override def count(): IO[Long] = IO.pure(0L)
+ override def count(): IO[Int] = IO.pure(0)

override def search(query: Query[D]): IO[SearchResults[D]] = IO.pure(SearchResults(query, 0, fs2.Stream.empty))

3 changes: 3 additions & 0 deletions core/shared/src/main/scala/lightdb/query/package.scala
@@ -20,5 +20,8 @@ package object query {
def &&(that: Filter[D]): Filter[D] = Filter.GroupedFilter(0, List(
filter -> Condition.Must, that -> Condition.Must
))
+ def ||(that: Filter[D]): Filter[D] = Filter.GroupedFilter(1, List(
+   filter -> Condition.Should, that -> Condition.Should
+ ))
}
}
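With the new `||` alongside the existing `&&`, filters compose into nested `GroupedFilter` values. A minimal sketch follows; the `Document` bound and import paths are assumptions, and the leading `1` passed by `||` appears to be a minimum-should-match style parameter (at least one `Should` clause must match), which is an inference from the diff rather than documented behavior:

```scala
import lightdb.Document            // assumed import location
import lightdb.query._             // brings Filter plus the && / || syntax from the package object

object FilterSketch {
  // a && b requires both clauses (Condition.Must); a || b requires at least one
  // of them (Condition.Should). The combinators nest, so parentheses build a
  // tree of GroupedFilter values.
  def activeOrRecentOwner[D <: Document[D]](active: Filter[D],
                                            recent: Filter[D],
                                            owned: Filter[D]): Filter[D] =
    (active || recent) && owned
}
```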
2 changes: 1 addition & 1 deletion core/shared/src/main/scala/lightdb/store/MapStore.scala
@@ -25,7 +25,7 @@ class MapStore extends ObjectStore {
IO.unit
}

- override def count(): IO[Long] = IO.pure(map.size)
+ override def count(): IO[Int] = IO.pure(map.size)

override def commit(): IO[Unit] = IO.unit

2 changes: 1 addition & 1 deletion core/shared/src/main/scala/lightdb/store/NullStore.scala
@@ -12,7 +12,7 @@ object NullStore extends ObjectStore {

override def dispose(): IO[Unit] = IO.unit

- override def count(): IO[Long] = IO.pure(0L)
+ override def count(): IO[Int] = IO.pure(0)

override def all[T](chunkSize: Int): fs2.Stream[IO, ObjectData[T]] = fs2.Stream.empty

2 changes: 1 addition & 1 deletion core/shared/src/main/scala/lightdb/store/ObjectStore.scala
@@ -27,7 +27,7 @@ trait ObjectStore {
}
}

- def count(): IO[Long]
+ def count(): IO[Int]

def all[T](chunkSize: Int = 512): Stream[IO, ObjectData[T]]

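The `count(): IO[Long]` → `IO[Int]` change is the contract every store and indexer now implements. Backends whose native count is a `Long` narrow explicitly, as the HaloDB and fs2-based implementations below do with `.toInt`, while Lucene's `IndexSearcher.count` already returns an `Int`; counts beyond `Int.MaxValue` would now truncate, presumably an acceptable trade-off here. A minimal sketch of the two common shapes (these names are illustrative, not part of the codebase):

```scala
import cats.effect.IO

object CountSketch {
  // Shape 1: the backend already reports an Int (e.g. a java.util.Map).
  def fromJavaMap(map: java.util.Map[String, Array[Byte]]): IO[Int] =
    IO(map.size())

  // Shape 2: the backend reports a Long and must be narrowed explicitly,
  // mirroring halo.size().toInt and compile.count.map(_.toInt) below.
  def fromLongCount(backendSize: IO[Long]): IO[Int] =
    backendSize.map(_.toInt)
}
```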
2 changes: 1 addition & 1 deletion halo/src/main/scala/lightdb/store/halo/HaloIndexer.scala
@@ -11,7 +11,7 @@ case class HaloIndexer[D <: Document[D]](collection: Collection[D]) extends Inde
override def put(value: D): IO[D] = IO.pure(value)
override def delete(id: Id[D]): IO[Unit] = IO.unit
override def commit(): IO[Unit] = IO.unit
- override def count(): IO[Long] = collection.store.all().compile.count
+ override def count(): IO[Int] = collection.store.all().compile.count.map(_.toInt)
override def search(query: Query[D]): IO[SearchResults[D]] = IO {
val stream = collection.store
.all[D]()
4 changes: 2 additions & 2 deletions halo/src/main/scala/lightdb/store/halo/HaloStore.scala
@@ -48,8 +48,8 @@ case class HaloStore(directory: Path, indexThreads: Int = 2, maxFileSize: Int =
halo.delete(id.bytes)
}

- override def count(): IO[Long] = IO {
-   halo.size()
+ override def count(): IO[Int] = IO {
+   halo.size().toInt
}

override def commit(): IO[Unit] = IO.unit
@@ -104,7 +104,7 @@ case class LuceneIndexer[D <: Document[D]](collection: Collection[D],

override def commit(): IO[Unit] = IO(commitBlocking())

- override def count(): IO[Long] = IO {
+ override def count(): IO[Int] = IO {
indexSearcher.count(new MatchAllDocsQuery)
}

2 changes: 1 addition & 1 deletion mapdb/src/main/scala/lighdb/storage/mapdb/MapDBStore.scala
@@ -28,7 +28,7 @@ case class MapDBStore(directory: Option[Path]) extends ObjectStore {

override def dispose(): IO[Unit] = IO(db.close())

- override def count(): IO[Long] = IO(map.size())
+ override def count(): IO[Int] = IO(map.size())

override def all[T](chunkSize: Int): fs2.Stream[IO, ObjectData[T]] = fs2.Stream
.fromBlockingIterator[IO](map.entrySet().iterator().asScala, chunkSize)
