Skip to content

Commit

Permalink
Document 'withBackoff' srouce/flow/sink (akka#25770)
Browse files Browse the repository at this point in the history
  • Loading branch information
raboof authored and chbatey committed Oct 15, 2018
1 parent f4dd0ac commit 4b012cc
Show file tree
Hide file tree
Showing 16 changed files with 1,068 additions and 837 deletions.
1 change: 1 addition & 0 deletions akka-docs/src/main/categories/error-handling.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
For more background see the @ref[Error Handling in Streams](../stream-error.md) section.
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# RestartFlow.onFailuresWithBackoff

Wrap the given @unidoc[Flow] with a @unidoc[Flow] that will restart it when it fails using an exponential backoff. Notice that this @unidoc[Flow] will not restart on completion of the wrapped flow.

@ref[Error handling](../index.md#error-handling)

@@@div { .group-scala }

## Signature

@@signature [RestartFlow.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/RestartFlow.scala) { #onFailuresWithBackoff }

@@@

## Description

This @unidoc[Flow] will not emit any failure
The failures by the wrapped @unidoc[Flow] will be handled by
restarting the wrapping @unidoc[Flow] as long as maxRestarts is not reached.
Any termination signals sent to this @unidoc[Flow] however will terminate the wrapped @unidoc[Flow], if it's
running, and then the @unidoc[Flow] will be allowed to terminate without being restarted.

The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
messages. A termination signal from either end of the wrapped @unidoc[Flow] will cause the other end to be terminated,
and any in transit messages will be lost. During backoff, this @unidoc[Flow] will backpressure.

This uses the same exponential backoff algorithm as @unidoc[Backoff].

## Reactive Streams semantics

@@@div { .callout }

**emits** when the wrapped flow emits

**backpressures** during backoff and when the wrapped flow backpressures

@@@
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# RestartFlow.withBackoff

Wrap the given @unidoc[Flow] with a @unidoc[Flow] that will restart it when it fails or complete using an exponential backoff.

@ref[Error handling](../index.md#error-handling)

@@@div { .group-scala }

## Signature

@@signature [RestartFlow.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/RestartFlow.scala) { #withBackoff }

@@@

## Description

The resulting @unidoc[Flow] will not cancel, complete or emit a failure, until the opposite end of it has been cancelled or
completed. Any termination by the @unidoc[Flow] before that time will be handled by restarting it. Any termination
signals sent to this @unidoc[Flow] however will terminate the wrapped @unidoc[Flow], if it's running, and then the @unidoc[Flow]
will be allowed to terminate without being restarted.

The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
messages. A termination signal from either end of the wrapped @unidoc[Flow] will cause the other end to be terminated,
and any in transit messages will be lost. During backoff, this @unidoc[Flow] will backpressure.

This uses the same exponential backoff algorithm as @unidoc[Backoff].

## Reactive Streams semantics

@@@div { .callout }

**emits** when the wrapped flow emits

**backpressures** during backoff and when the wrapped flow backpressures

**completes** when the wrapped flow completes

@@@
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# RestartSink.withBackoff

Wrap the given @unidoc[Sink] with a @unidoc[Sink] that will restart it when it fails or complete using an exponential backoff.

@ref[Error handling](../index.md#error-handling)

@@@div { .group-scala }

## Signature

@@signature [RestartSink.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/RestartSink.scala) { #withBackoff }

@@@

## Description

This @unidoc[Sink] will never cancel, since cancellation by the wrapped @unidoc[Sink] is always handled by restarting it.
The wrapped @unidoc[Sink] can however be completed by feeding a completion or error into this @unidoc[Sink]. When that
happens, the @unidoc[Sink], if currently running, will terminate and will not be restarted. This can be triggered
simply by the upstream completing, or externally by introducing a [[KillSwitch]] right before this @unidoc[Sink] in the
graph.

The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
messages. When the wrapped @unidoc[Sink] does cancel, this @unidoc[Sink] will backpressure, however any elements already
sent may have been lost.

This uses the same exponential backoff algorithm as @unidoc[Backoff].
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# RestartSource.onFailuresWithBackoff

Wrap the given @unidoc[Source] with a @unidoc[Source] that will restart it when it fails using an exponential backoff.

@ref[Error handling](../index.md#error-handling)

@@@div { .group-scala }

## Signature

@@signature [RestartSource.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/RestartSource.scala) { #onFailuresWithBackoff }

@@@

## Description

This @unidoc[Source] will never emit a failure, since the failure of the wrapped @unidoc[Source] is always handled by
restarting. The wrapped @unidoc[Source] can be cancelled by cancelling this @unidoc[Source].
When that happens, the wrapped @unidoc[Source], if currently running will be cancelled, and it will not be restarted.
This can be triggered simply by the downstream cancelling, or externally by introducing a @unidoc[KillSwitch] right
after this @unidoc[Source] in the graph.

## Reactive Streams semantics

@@@div { .callout }

**emits** when the wrapped source emits

@@@
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# RestartSource.withBackoff

Wrap the given @unidoc[Source] with a @unidoc[Source] that will restart it when it fails or complete using an exponential backoff.

@ref[Error handling](../index.md#error-handling)

@@@div { .group-scala }

## Signature

@@signature [RestartSource.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/RestartSource.scala) { #withBackoff }

@@@

## Description

This @unidoc[Flow] will never emit a complete or failure, since the completion or failure of the wrapped @unidoc[Source]
is always handled by restarting it. The wrapped @unidoc[Source] can however be cancelled by cancelling this @unidoc[Source].
When that happens, the wrapped @unidoc[Source], if currently running will be cancelled, and it will not be restarted.
This can be triggered simply by the downstream cancelling, or externally by introducing a @unidoc[KillSwitch] right
after this @unidoc[Source] in the graph.

This uses the same exponential backoff algorithm as @unidoc[Backoff].

## Reactive Streams semantics

@@@div { .callout }

**emits** when the wrapped source emits

**completes** when the wrapped source completes

@@@
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ Scala
Java
: @@snip [IntegrationDocTest.java](/akka-docs/src/test/java/jdocs/stream/IntegrationDocTest.java) { #source-queue }

## Reactive Streams Semantics
## Reactive Streams semantics

@@@div { .callout }

Expand Down
17 changes: 17 additions & 0 deletions akka-docs/src/main/paradox/stream/operators/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,18 @@ Operators meant for inter-operating between Akka Streams and Actors:
|ActorSink|<a name="actorref"></a>@ref[actorRef](ActorSink/actorRef.md)|Sends the elements of the stream to the given @java[`ActorRef<T>`]@scala[`ActorRef[T]`].|
|ActorFlow|<a name="ask"></a>@ref[ask](ActorFlow/ask.md)|Use the `AskPattern` to send each element as an `ask` to the target actor, and expect a reply back that will be sent further downstream.|

## Error handling

For more background see the @ref[Error Handling in Streams](../stream-error.md) section.

| |Operator|Description|
|--|--|--|
|RestartSource|<a name="onfailureswithbackoff"></a>@ref[onFailuresWithBackoff](RestartSource/onFailuresWithBackoff.md)|Wrap the given @unidoc[Source] with a @unidoc[Source] that will restart it when it fails using an exponential backoff.|
|RestartFlow|<a name="onfailureswithbackoff"></a>@ref[onFailuresWithBackoff](RestartFlow/onFailuresWithBackoff.md)|Wrap the given @unidoc[Flow] with a @unidoc[Flow] that will restart it when it fails using an exponential backoff. Notice that this @unidoc[Flow] will not restart on completion of the wrapped flow.|
|RestartSource|<a name="withbackoff"></a>@ref[withBackoff](RestartSource/withBackoff.md)|Wrap the given @unidoc[Source] with a @unidoc[Source] that will restart it when it fails or complete using an exponential backoff.|
|RestartFlow|<a name="withbackoff"></a>@ref[withBackoff](RestartFlow/withBackoff.md)|Wrap the given @unidoc[Flow] with a @unidoc[Flow] that will restart it when it fails or complete using an exponential backoff.|
|RestartSink|<a name="withbackoff"></a>@ref[withBackoff](RestartSink/withBackoff.md)|Wrap the given @unidoc[Sink] with a @unidoc[Sink] that will restart it when it fails or complete using an exponential backoff.|

@@@ index

* [combine](Source/combine.md)
Expand Down Expand Up @@ -408,6 +420,11 @@ Operators meant for inter-operating between Akka Streams and Actors:
* [fromPath](FileIO/fromPath.md)
* [toFile](FileIO/toFile.md)
* [toPath](FileIO/toPath.md)
* [withBackoff](RestartSource/withBackoff.md)
* [onFailuresWithBackoff](RestartSource/onFailuresWithBackoff.md)
* [withBackoff](RestartFlow/withBackoff.md)
* [onFailuresWithBackoff](RestartFlow/onFailuresWithBackoff.md)
* [withBackoff](RestartSink/withBackoff.md)
* [ask](ActorFlow/ask.md)
* [actorRef](ActorSink/actorRef.md)

Expand Down
Loading

0 comments on commit 4b012cc

Please sign in to comment.