Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make Console#readLine cancelable #3465

Merged
merged 5 commits into from
Apr 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 2 additions & 10 deletions std/jvm-native/src/main/scala/cats/effect/std/Console.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,12 @@
package cats.effect.std

import cats.{~>, Show}
import cats.effect.kernel.Sync

import java.nio.charset.Charset

/**
* Effect type agnostic `Console` with common methods to write to and read from the standard
* console. Due to issues around cancellation in `readLine`, suited only for extremely simple
* console input and output in trivial applications.
* console. Suited only for extremely simple console input and output in trivial applications.
*
* @example
* {{{
Expand Down Expand Up @@ -100,13 +98,7 @@ trait Console[F[_]] extends ConsoleCrossPlatform[F] {

}

object Console extends ConsoleCompanionCrossPlatform {

/**
* Constructs a `Console` instance for `F` data types that are [[cats.effect.kernel.Sync]].
*/
def make[F[_]](implicit F: Sync[F]): Console[F] =
new SyncConsole[F]
object Console extends ConsoleCompanionPlatform {

private[std] abstract class MapKConsole[F[_], G[_]](self: Console[F], f: F ~> G)
extends Console[G] {
Expand Down
103 changes: 103 additions & 0 deletions std/jvm/src/main/scala/cats/effect/std/ConsoleCompanionPlatform.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Copyright 2020-2023 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cats.effect.std

import cats.effect.kernel.{Async, Sync}
import cats.syntax.all._

import java.nio.charset.Charset
import java.util.concurrent.LinkedTransferQueue

private[std] trait ConsoleCompanionPlatform extends ConsoleCompanionCrossPlatform {

/**
* Constructs a `Console` instance for `F` data types that are [[cats.effect.kernel.Async]].
*/
def make[F[_]](implicit F: Async[F]): Console[F] =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this now the same constraint on both JS and JVM? Do we need the platform split?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, at the moment there's still Native with the Sync constraint. I can revisit that when we get polling system and/or Scala Native multithreading.

new AsyncConsole[F]

@deprecated("Use overload with Async constraint", "3.5.0")
def make[F[_]](F: Sync[F]): Console[F] = F match {
Comment on lines +33 to +34
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know, I know, v3.5.0 shop is closed. But ... we could fix this beginner stumbling block once and for all 😇

No strong feelings 😁

Copy link
Member Author

@armanbilge armanbilge Feb 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh actually, we can fix this source-compatibly and get it into 3.5.x. We can't expose the new instance publicly without a source-breaking change, but we can replace the instance for IO right now, and do the source-breaking change in 3.6. This gets us most of the win.

case async: Async[F] => make(async)
case _ => new SyncConsole()(F)
}

private final class AsyncConsole[F[_]](implicit F: Async[F]) extends SyncConsole[F] {
override def readLineWithCharset(charset: Charset): F[String] =
F.async[String] { cb =>
F.delay(stdinReader.readLineWithCharset(charset, cb)).map { cancel =>
Some(F.delay(cancel.run()))
}
}
}

private object stdinReader extends Thread {

private final class ReadLineRequest(
val charset: Charset,
@volatile var callback: Either[Throwable, String] => Unit
) extends Runnable {
def run() = callback = null
}

private[this] val requests = new LinkedTransferQueue[ReadLineRequest]

def readLineWithCharset(
charset: Charset,
cb: Either[Throwable, String] => Unit): Runnable = {
val request = new ReadLineRequest(charset, cb)
requests.offer(request)
request
}

setName("cats-effect-stdin")
setDaemon(true)
start()

override def run(): Unit = {
var request: ReadLineRequest = null
var charset: Charset = null
var line: Either[Throwable, String] = null

while (true) {
// wait for a non-canceled request. store callback b/c it is volatile read
var callback: Either[Throwable, String] => Unit = null
while ((request eq null) || { callback = request.callback; callback eq null })
request = requests.take()

if (line eq null) { // need a line for current request
charset = request.charset // remember the charset we used
line = Either.catchNonFatal(readLineWithCharsetImpl(charset))
// we just blocked, so loop to possibly freshen current request
} else { // we have a request and a line!
if (request.charset != charset) { // not the charset we have :/
callback(Left(new IllegalStateException(s"Next read must be for $charset line")))
request = null // loop to get a new request that can handle this charset
} else { // happy days!
callback(line)
Comment on lines +90 to +91
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something that is nagging at me is that this is a leak here. The callback may have already be canceled, but we are invoking it anyway. There seems no reliable way to know for sure.

I don't see any way to plug this without changing the callback in async { cb => to return a Boolean indicating whether it completed successfully or not. I feel like we've discussed this before.

// reset our state
request = null
charset = null
line = null
}
}

}
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright 2020-2023 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cats.effect.std

import cats.effect.kernel.Sync

private[std] trait ConsoleCompanionPlatform extends ConsoleCompanionCrossPlatform {

/**
* Constructs a `Console` instance for `F` data types that are [[cats.effect.kernel.Sync]].
*/
def make[F[_]](implicit F: Sync[F]): Console[F] =
new SyncConsole[F]

}
126 changes: 64 additions & 62 deletions std/shared/src/main/scala/cats/effect/std/ConsoleCrossPlatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -173,69 +173,9 @@ private[std] abstract class ConsoleCompanionCrossPlatform {
]: Console[ReaderWriterStateT[F, E, L, S, *]] =
Console[F].mapK(ReaderWriterStateT.liftK)

private[std] final class SyncConsole[F[_]](implicit F: Sync[F]) extends Console[F] {
private[std] class SyncConsole[F[_]](implicit F: Sync[F]) extends Console[F] {
def readLineWithCharset(charset: Charset): F[String] =
F.interruptible {
val in = System.in
val decoder = charset
.newDecoder()
.onMalformedInput(CodingErrorAction.REPORT)
.onUnmappableCharacter(CodingErrorAction.REPLACE)
val bytes = ByteBuffer.allocate(64)
val builder = new JStringBuilder()

def decodeNext(): CharBuffer = {
bytes.clear()
decodeNextLoop()
}

@tailrec
def decodeNextLoop(): CharBuffer = {
val b = in.read()
if (b == -1) null
else {
bytes.put(b.toByte)
val limit = bytes.limit()
val position = bytes.position()
var result: CharBuffer = null
try {
bytes.flip()
result = decoder.decode(bytes)
} catch {
case _: MalformedInputException =>
bytes.limit(limit)
bytes.position(position)
}
if (result == null) decodeNextLoop() else result
}
}

@tailrec
def loop(): String = {
val buffer = decodeNext()
if (buffer == null) {
val result = builder.toString()
if (result.nonEmpty) result
else throw new EOFException()
} else {
val decoded = buffer.toString()
if (decoded == "\n") {
val len = builder.length()
if (len > 0) {
if (builder.charAt(len - 1) == '\r') {
builder.deleteCharAt(len - 1)
}
}
builder.toString()
} else {
builder.append(decoded)
loop()
}
}
}

loop()
}
F.blocking(readLineWithCharsetImpl(charset))

def print[A](a: A)(implicit S: Show[A] = Show.fromToString[A]): F[Unit] = {
val text = a.show
Expand All @@ -261,6 +201,68 @@ private[std] abstract class ConsoleCompanionCrossPlatform {
F.blocking(t.printStackTrace())
}

private[std] def readLineWithCharsetImpl(charset: Charset): String = {
val in = System.in
val decoder = charset
.newDecoder()
.onMalformedInput(CodingErrorAction.REPORT)
.onUnmappableCharacter(CodingErrorAction.REPLACE)
val bytes = ByteBuffer.allocate(64)
val builder = new JStringBuilder()

def decodeNext(): CharBuffer = {
bytes.clear()
decodeNextLoop()
}

@tailrec
def decodeNextLoop(): CharBuffer = {
val b = in.read()
if (b == -1) null
else {
bytes.put(b.toByte)
val limit = bytes.limit()
val position = bytes.position()
var result: CharBuffer = null
try {
bytes.flip()
result = decoder.decode(bytes)
} catch {
case _: MalformedInputException =>
bytes.limit(limit)
bytes.position(position)
}
if (result == null) decodeNextLoop() else result
}
}

@tailrec
def loop(): String = {
val buffer = decodeNext()
if (buffer == null) {
val result = builder.toString()
if (result.nonEmpty) result
else throw new EOFException()
} else {
val decoded = buffer.toString()
if (decoded == "\n") {
val len = builder.length()
if (len > 0) {
if (builder.charAt(len - 1) == '\r') {
builder.deleteCharAt(len - 1)
}
}
builder.toString()
} else {
builder.append(decoded)
loop()
}
}
}

loop()
}

private[std] def printStackTrace[F[_]](c: Console[F])(t: Throwable): F[Unit] = {
val baos = new ByteArrayOutputStream()
val ps = new PrintStream(baos)
Expand Down
54 changes: 40 additions & 14 deletions tests/jvm/src/test/scala/cats/effect/std/ConsoleJVMSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,17 @@ import cats.syntax.all._

import org.specs2.matcher.MatchResult

import scala.concurrent.duration._
import scala.io.Source

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, PrintStream}
import java.io.{
ByteArrayInputStream,
ByteArrayOutputStream,
InputStream,
PipedInputStream,
PipedOutputStream,
PrintStream
}
import java.nio.charset.{Charset, StandardCharsets}

class ConsoleJVMSpec extends BaseSpec {
Expand Down Expand Up @@ -67,6 +75,19 @@ class ConsoleJVMSpec extends BaseSpec {
private def replaceStandardErr(ps: PrintStream): Resource[IO, Unit] =
Resource.make(replace(ps, () => System.err, System.setErr))(restore(_, System.setErr)).void

private def replaceStandardIn(in: InputStream): Resource[IO, Unit] = {
def replace(in: InputStream): IO[InputStream] =
for {
std <- IO(System.in)
_ <- IO(System.setIn(in))
} yield std

def restore(in: InputStream): IO[Unit] =
IO(System.setIn(in))

Resource.make(replace(in))(restore).void
}

private def standardErrTest(io: => IO[Unit]): IO[String] = {
val test = for {
out <- Resource.eval(IO(new ByteArrayOutputStream()))
Expand Down Expand Up @@ -105,19 +126,6 @@ class ConsoleJVMSpec extends BaseSpec {
Resource.make(acquire)(release)
}

def replaceStandardIn(in: InputStream): Resource[IO, Unit] = {
def replace(in: InputStream): IO[InputStream] =
for {
std <- IO(System.in)
_ <- IO(System.setIn(in))
} yield std

def restore(in: InputStream): IO[Unit] =
IO(System.setIn(in))

Resource.make(replace(in))(restore).void
}

val test = for {
in <- inputStream
_ <- replaceStandardIn(in)
Expand Down Expand Up @@ -247,5 +255,23 @@ class ConsoleJVMSpec extends BaseSpec {
val cs = StandardCharsets.UTF_16LE
readLineTest(cs.name(), cs)
}

"readLine is cancelable and does not lose lines" in real {
IO(new PipedOutputStream).flatMap { out =>
IO(new PipedInputStream(out)).flatMap { in =>
replaceStandardIn(in).surround {
for {
read1 <- IO.readLine.timeout(100.millis).attempt
_ <- IO(read1 should beLeft)
_ <- IO(out.write("unblocked\n".getBytes()))
read2 <- Console[IO].readLineWithCharset(StandardCharsets.US_ASCII).attempt
_ <- IO(read2 should beLeft)
read3 <- IO.readLine
_ <- IO(read3 must beEqualTo("unblocked"))
} yield ok
}
}
}
}
}
}