Gustavo De Micheli edited this page Dec 9, 2024 · 3 revisions

Helenus integrates with Monix through Monix Reactive.

Installation

Include the library in your project definition:

libraryDependencies += "net.nmoncho" %% "helenus-monix" % "1.7.0"

Setup

The integration with Monix tries to be as seamless as possible. Marking a CqlSession implicit is the only thing required to use Helenus:

implicit val session: CqlSession = cqlSession
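
The cqlSession value above is not defined on this page. As a minimal sketch, it might be created with the DataStax Java driver's session builder. The contact point, datacenter name and keyspace below are hypothetical and assume a local single-node cluster:

```scala
import java.net.InetSocketAddress
import com.datastax.oss.driver.api.core.CqlSession

// Hypothetical connection settings: adjust host, port, datacenter and keyspace
val cqlSession: CqlSession = CqlSession
  .builder()
  .addContactPoint(new InetSocketAddress("127.0.0.1", 9042))
  .withLocalDatacenter("datacenter1")
  .withKeyspace("monix_tests")
  .build()
```

Close the session with cqlSession.close() when the application shuts down.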

Reading from Cassandra

Cassandra queries are mapped to Monix Observables. With Helenus we can leverage any defined RowMapper to adapt an Observable[Row] to an Observable[A].

import monix.reactive.Observable
import net.nmoncho.helenus._
import net.nmoncho.helenus.api.RowMapper
import net.nmoncho.helenus.api.cql.Adapter
import net.nmoncho.helenus.monix._

case class Person(id: Int, name: String, city: String)

implicit val rowMapper: RowMapper[Person] = RowMapper[Person]

val peopleQuery = "SELECT * FROM monix_people WHERE id = ?".toCQL
        .prepare[Int].as[Person]

val helenusObservable: Observable[Person] = peopleQuery.asObservable(1)

The integration usage is intended as follows:

  • Transform your query with .toCQL, as in the example above.
  • Prepare the query with .prepare, providing the query parameter types.
  • Define a query result, which uses an implicit RowMapper. If this isn't provided, Row will be used as query result type.
  • Transform your query into an Observable with asObservable. Like applying a ScalaPreparedStatement, this method takes the query parameters as arguments, so it acts as a factory for Observables.

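The result is an Observable[Person]. Executing it works as with any other Monix Observable, and requires an implicit Scheduler in scope (for example monix.execution.Scheduler.Implicits.global):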

Await.result(helenusObservable.consumeWith(Consumer.toList).runToFuture, 5.seconds)
// res4: List[Person] = List(Person(id = 1, name = "John", city = "Rome"))
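
Because the result is an ordinary Monix Observable, the usual operators apply before the stream is consumed. The following sketch uses an in-memory Observable as a stand-in for one produced by asObservable, so it runs without a Cassandra cluster. The names peopleStream, namesTask and names are illustrative only:

```scala
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import scala.concurrent.Await
import scala.concurrent.duration._

case class Person(id: Int, name: String, city: String)

// In-memory stand-in for an Observable[Person] produced by asObservable
val peopleStream: Observable[Person] = Observable(
  Person(1, "John", "Rome"),
  Person(5, "Lisa", "Paris")
)

// Standard Monix operators: filter, map, then collect into a list
val namesTask: Task[List[String]] =
  peopleStream.filter(_.city == "Rome").map(_.name).toListL

// Nothing runs until the Task is executed
val names: List[String] = Await.result(namesTask.runToFuture, 5.seconds)
```

Blocking with Await is convenient in examples and tests. In application code the Task would typically be composed further instead.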

Writing to Cassandra

We can use the same facilities when inserting into, or updating, Cassandra.

implicit val personAdapter: Adapter[Person, (Int, String, String)] = Adapter[Person]

val consumer: Consumer[Person, Unit] =
    "INSERT INTO monix_people(id, name, city) VALUES (?, ?, ?)".toCQL
        .prepare[Int, String, String]
        .from[Person]
        .asConsumer()

val people: Observable[Person] = Observable.from(List(
  Person(4, "Jane", "Amsterdam"),
  Person(5, "Lisa", "Paris"),
  Person(6, "Maria", "Madrid")
))

val insert = people.consumeWith(consumer)
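
Note that consumeWith returns a Monix Task, which is lazy: nothing is written until the Task is run. The sketch below shows this with a stand-in Consumer that records elements in memory instead of writing to Cassandra. The names written and standInConsumer are hypothetical and not part of Helenus:

```scala
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.reactive.{Consumer, Observable}
import scala.collection.mutable.ListBuffer
import scala.concurrent.Await
import scala.concurrent.duration._

case class Person(id: Int, name: String, city: String)

// Stand-in sink: records elements instead of writing to Cassandra
val written = ListBuffer.empty[Person]
val standInConsumer: Consumer[Person, Unit] =
  Consumer.foreach[Person] { p => written += p; () }

val people: Observable[Person] = Observable.fromIterable(List(
  Person(4, "Jane", "Amsterdam"),
  Person(5, "Lisa", "Paris")
))

// Building the Task does not consume the stream
val insert: Task[Unit] = people.consumeWith(standInConsumer)
val before = written.size

// Running the Task drains the Observable into the consumer
Await.result(insert.runToFuture, 5.seconds)
val after = written.size
```

With the Helenus consumer defined above, the same Await.result(insert.runToFuture, 5.seconds) call would be one way to make sure the rows are written before querying them.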

Once the insert has run, we can query this data as shown above:

Await.result(
    peopleQuery.asObservable(5).consumeWith(Consumer.toList).runToFuture,
    5.seconds
)
// res6: List[Person] = List(Person(id = 5, name = "Lisa", city = "Paris"))