-
Notifications
You must be signed in to change notification settings - Fork 151
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Monix removal #713
Monix removal #713
Conversation
idGenerator | ||
.nextId[Email]() | ||
.map(id => emailModel.insert(Email(id, data))) | ||
.unsafeRunSync() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should ideally run .unsafeRunSync
in one place (in main
). That's why there was the .to[]
construct, which allowed embedding an IO/Task into a ConnectionIO. For why this matters see: https://blog.softwaremill.com/from-transactional-to-type-safe-reasonable-transactions-a5019906245e
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
During my research of topics I found out that doing it is such way, using .to
method is probably impossible.
Currently suggested way to use WeakAsync.liftK[IO, ConnectionIO]
but it unfortunately uses unsafeToFuture
and we want to preserve context - fiber local
, so we cannot use any unsafe
All of this is caused mostly by updates in Cats Effect type type-class hierarchy.
val serverOptions: Http4sServerOptions[IO, IO] = Http4sServerOptions | ||
.customInterceptors[IO, IO] | ||
.decodeFailureHandler(decodeFailureHandler) | ||
.serverLog(Log.defaultServerLog[IO]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we need to specify this? I think it's the default
val openapi = OpenAPIDocsInterpreter() | ||
.serverEndpointsToOpenAPI(es.toList, "Bootzooka", "1.0") | ||
.servers(List(Server(s"$apiContextPath", None))) | ||
val yaml = openapi.toYaml | ||
new SwaggerHttp4s(yaml).routes[Task] | ||
val value: List[ServerEndpoint[_, _, _, Any, IO]] = SwaggerUI[IO](yaml) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this needs a better name ;)
.bindHttp(config.port, config.host) | ||
.withHttpApp(app) | ||
.resource | ||
} | ||
} | ||
|
||
private val staticFileBlocker = Blocker.liftExecutionContext(ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(4))) | ||
private def indexResponse(r: Request[IO]): IO[Response[IO]] = | ||
StaticFile.fromResource(s"/webapp/index.html", Some(r)).getOrElseF(IO.pure(Response.notFound[IO])) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe we can use a tapir endpoint for that?
val passwordResetCode: PasswordResetCode = PasswordResetCode(id, user.id, validUntil) | ||
passwordResetCodeModel.insert(passwordResetCode).map(_ => passwordResetCode) | ||
} | ||
passwordResetCodeIO.unsafeRunSync() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
again, we should use .to
logger.debug(s"Creating a new api key for user $userId, valid until: $validUntil") | ||
apiKeyModel.insert(apiKey).map(_ => apiKey) | ||
} | ||
apiKeyIO.unsafeRunSync() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
here as well
@@ -37,21 +35,21 @@ class Auth[T]( | |||
case None => | |||
logger.debug(s"Auth failed for: ${authTokenOps.tokenName} $id") | |||
// random sleep to prevent timing attacks | |||
Timer[Task].sleep(random.nextInt(1000).millis) >> Task.raiseError(Fail.Unauthorized("Unauthorized")) | |||
Temporal[IO].sleep(random.nextInt(1000).millis) >> IO.raiseError(Fail.Unauthorized("Unauthorized")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IO.sleep simply?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From what I read in cats 3 migration guide this is the preferred way to replace Timer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But in another place you're using IO.sleep
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But there I am replacing Task.sleep not Timer.sleep
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it any different?
apiKey <- apiKeyService.create(user.id, config.defaultApiKeyValid) | ||
} yield apiKey | ||
} | ||
apiKeyIO.unsafeRunSync() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
an dhere
|
||
override lazy val baseSttpBackend: SttpBackend[Task, Any] = SttpBackendStub(TaskMonadAsyncError) | ||
override lazy val baseSttpBackend: SttpBackend[IO, Any] = SttpBackendStub(CatsMonadError.monadError) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe you can use AsyncHttpClientFs2Backend.stub
isntead? and remove the custom CatsMonadError
impl
docs/stack.md
Outdated
@@ -26,7 +26,7 @@ React is one of the most popular JavaScript framework right now, developed and s | |||
|
|||
### Why Monix? | |||
|
|||
Monix is a concurrency toolkit, thanks to which working with side-effects is concurrency-safe and type-safe. The basic datatype, `Task`, is a lazily evaluated description of side effects. It offers a number of combinators to sequence, handle errors, allocate resources and run effects asynchronously. | |||
Monix is a concurrency toolkit, thanks to which working with side-effects is concurrency-safe and type-safe. The basic datatype, `IO`, is a lazily evaluated description of side effects. It offers a number of combinators to sequence, handle errors, allocate resources and run effects asynchronously. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
:D I think this needs more attention
|
||
/** Based on [[https://olegpy.com/better-logging-monix-1/]]. Makes the current correlation id available for logback loggers. | ||
*/ | ||
class MonixMDCAdapter extends LogbackMDCAdapter { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hm what about correlation ids - do they work in the new setup?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this logback adapter needs to stay :)
- Renamed variable for swagger routes - Removed default log addition
@@ -40,19 +41,19 @@ class EmailService(emailModel: EmailModel, idGenerator: IdGenerator, emailSender | |||
emailModel.count().transact(xa).map(_.toDouble).map(Metrics.emailQueueGauge.set) | |||
} | |||
|
|||
Task.parZip2(sendProcess, monitoringProcess) | |||
sendProcess.flatMap(r => monitoringProcess.flatMap(d => IO(r, d))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that will work differenlty. Previously, we were runing send & monitor in parallel. Now, they would run sequentially - causing monitoring never to run, as send never finishes. So we need to change this back to parallel
val openapi = OpenAPIDocsInterpreter() | ||
.serverEndpointsToOpenAPI(es.toList, "Bootzooka", "1.0") | ||
.servers(List(Server(s"$apiContextPath", None))) | ||
val yaml = openapi.toYaml | ||
new SwaggerHttp4s(yaml).routes[Task] | ||
val swaggerRoutes: List[ServerEndpoint[_, _, _, Any, IO]] = SwaggerUI[IO](openapi.toYaml) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
tapir needs updating to 0.19-final :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
also maybe we can use the bundle: https://tapir.softwaremill.com/en/latest/docs/openapi.html#using-swagger
There's quite a lot of compiler warnings - maybe we could fix them. Also, needs a rebase. |
# Conflicts: # build.sbt
import sttp.client3.SttpBackend | ||
|
||
object Main extends StrictLogging { | ||
def main(args: Array[String]): Unit = { | ||
CorrelationId.init() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we need to migrate correlation id support as well:) probably implementing https://github.com/softwaremill/monix-correlation-id using fiber-locals inside bootzooka for now, we might extract it as a separate repository later
for { | ||
emails <- emailModel.find(config.batchSize).transact(xa) | ||
_ = if (emails.nonEmpty) logger.info(s"Sending ${emails.size} emails") | ||
_ <- Task.sequence(emails.map(_.data).map(emailSender.apply)) | ||
_ <- IO(emails.map(_.data).map(emailSender.apply)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what happened to .sequence? This won't work, we're constructing an IO[IO[Unit]]
and ignoring the inner result
@@ -14,7 +14,7 @@ object DummyEmailSender extends EmailSender with StrictLogging { | |||
sentEmails.clear() | |||
} | |||
|
|||
override def apply(email: EmailData): Task[Unit] = Task { | |||
override def apply(email: EmailData): IO[Unit] = IO.pure { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This definitely isn't pure! Fixing
import org.scalatest.flatspec.AnyFlatSpec | ||
import org.scalatest.matchers.should.Matchers | ||
|
||
trait BaseTest extends AnyFlatSpec with Matchers { | ||
CorrelationId.init() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same here, this should stay in the migrated form
} yield () | ||
} yield { | ||
val confirmationEmail = emailTemplates.passwordChangeNotification(user.login) | ||
emailScheduler(EmailData(user.emailLowerCased, confirmationEmail)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This effect needs to be sequenced with the others - I'm surprised this worked (did tests pass)? Anyway, fixing
val confirmationEmail = emailTemplates.profileDetailsChangeNotification(user.login) | ||
emailScheduler(EmailData(user.emailLowerCased, confirmationEmail)) | ||
} | ||
|
||
doChange() flatMap { anyUpdate => | ||
doChange().map { anyUpdate => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here we need a .flatMap - to sequence the effects, and not ignore them. Fixing
Thanks! :) I added one follow-up: #769 |
No description provided.