Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add withTimeout combinator #1777

Merged
merged 4 commits into from
Feb 5, 2020

Conversation

Z1kkurat
Copy link
Contributor

@Z1kkurat Z1kkurat commented Feb 2, 2020

On a day-to-day job basis we often use such a combinator for Stream, allowing us to time out and interrupt streams that do not complete within given FiniteDuration. Such a combinator may be useful as a part of the Stream itself

@Z1kkurat Z1kkurat force-pushed the add-with-timeout-combinator branch from 465ddb3 to dead200 Compare February 2, 2020 09:26
@Z1kkurat
Copy link
Contributor Author

Z1kkurat commented Feb 2, 2020

It's strange that 2.12 build has failed, b/c the failure is in the suite that was not touched

@kubukoz
Copy link
Member

kubukoz commented Feb 2, 2020

Isn't this just interruptAfter(n.seconds)?

@Z1kkurat
Copy link
Contributor Author

Z1kkurat commented Feb 2, 2020

@kubukoz seems like interruptAfter just interrupts (stops) stream, not resulting in any error if Stream wasn't completed yet, whereas this combinator will result in TimeoutException on evaluation if stream was not completed within given timeout

@kubukoz
Copy link
Member

kubukoz commented Feb 2, 2020

It won't result in any error indeed, but you can write a timeout operator on top of interruptAfter by checking the exit case, and reusing most of the logic of that (you'll probably need a Deferred with an optional TimeoutException that you'll only complete with the exception in onFinalizeCase if the case is Canceled. I can give you a snippet of what I meant later

@kubukoz
Copy link
Member

kubukoz commented Feb 3, 2020

I meant something like this:

def timeout[F[_]: Concurrent: Timer, A](duration: FiniteDuration): Pipe[F, A, A] = in =>
  Stream.eval(Deferred[F, Either[TimeoutException, Unit]])
    .flatMap { deff =>
      in.onFinalizeCase {
        case ExitCase.Canceled => deff.complete(Left(new TimeoutException(s"Timed out after $duration")))
        case _                 => deff.complete(Right(()))
      }.interruptAfter(duration) ++ Stream.eval_(deff.get.rethrow)
    }

@Z1kkurat
Copy link
Contributor Author

Z1kkurat commented Feb 3, 2020

@kubukoz Thanks! Yes, a solution like that came to my mind, but I thought that we wouldn't be able to distinguish between user-requested cancellation and one caused by the timeout. Am I wrong?

@kubukoz
Copy link
Member

kubukoz commented Feb 3, 2020

Oh, I suppose that might be true, really good point.

How about this...

def timeout[F[_]: Concurrent: Timer, A](duration: FiniteDuration): Pipe[F, A, A] = 
  _.interruptWhen(Stream.sleep[F](duration).as(true))

I don't know why I didn't come up with that sooner.

Edit: no, wait, that's too simple. here we go again...

@Z1kkurat
Copy link
Contributor Author

Z1kkurat commented Feb 3, 2020

This will not result in an error, I guess? That will just interrupt the stream. Maybe something like that will do?

def timeout[F[_]: Concurrent: Timer, A](duration: FiniteDuration): Pipe[F, A, A] = 
  _.interruptWhen(Timer[F].sleep(duration).as(Left(new TimeoutException(s"Timed out after $duration"))))

@kubukoz
Copy link
Member

kubukoz commented Feb 3, 2020

Yeah, that should do it 😅

@Z1kkurat
Copy link
Contributor Author

Z1kkurat commented Feb 3, 2020

What's your opinion on what this operator should be - a function on a Stream or a Pipe? I guess that's just a matter of usage at call-site - Stream.emit(...).timeout(...) vs Stream.emit(...).through(timeout)

@kubukoz
Copy link
Member

kubukoz commented Feb 3, 2020

I think it should be a function on Stream. Such is the convention for these, and we don't really have a good place to put pipes that are that generic in.

@kubukoz kubukoz requested a review from SystemFw February 3, 2020 21:19
@Z1kkurat
Copy link
Contributor Author

Z1kkurat commented Feb 3, 2020

Thank you very much for advice! The implementation became much shorter and clearer 😅

@@ -2713,6 +2714,17 @@ final class Stream[+F[_], +O] private[fs2] (private val free: FreeC[F, O, Unit])
)(f: (Stream[F, O], Stream[F2, O2]) => Stream[F2, O3]): Stream[F2, O3] =
f(this, s2)

/** Fails this stream with a [[TimeoutException]] if it does not complete within given `timeout`. */
def timeout[F2[x] >: F[x]](
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can use context bounds here

@@ -3786,4 +3787,14 @@ class StreamSpec extends Fs2Spec {
}
}
}

"withTimeout" - {
"timeout never-ending stream" in {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you mind also testing composability of timeouts? i.e. if you do (s1.timeout(d1) ++ s2).timeout(d2), what happens depending on the times of s1, s2, d1 > d2 (if d2 > d1, s1 might make it in time but s2 might not, so we still want to timeout, etc.)

@kubukoz
Copy link
Member

kubukoz commented Feb 3, 2020

Just a couple minor things, and I think we're good to go. I'd also like to see what @SystemFw thinks.

@Z1kkurat
Copy link
Contributor Author

Z1kkurat commented Feb 3, 2020

I added context bounds and a couple of tests as you recommended. It seems like other test cases (with different s1 and s2) are specific sub-cases of these ones and are derived from the composability of Stream. If you have any more test cases in mind - please let me know.

@kubukoz
Copy link
Member

kubukoz commented Feb 3, 2020

Sure, it makes sense. Thanks a lot!

@mpilquist mpilquist merged commit 61b8258 into typelevel:master Feb 5, 2020
@mpilquist mpilquist added this to the 2.3.0 milestone Mar 20, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants