
v1.4.22 Release #5170

Merged · 40 commits · Aug 5, 2021
623140a
Added v1.4.22 placeholder for nightlies (#5097)
Aaronontheweb Jun 16, 2021
b297c02
Use ranged nuget versioning for Newtonsoft.Json (#5099)
Arkatufus Jun 19, 2021
43bfec7
[FLAKY TESTS] Update and clean up DilatedTests (#5104)
Arkatufus Jun 23, 2021
d9242bf
Make deserialization message more verbose to help with debugging. (#5…
Arkatufus Jun 23, 2021
eb19cc4
[FLAKY TESTS] Update and clean up FlowThrottleSpec (#5107)
Arkatufus Jun 23, 2021
f5e1b4a
Add conditional skip to BackoffOnRestart if we can't satisfy the time…
Arkatufus Jun 24, 2021
38b7443
[FLAKY TESTS] Fix several flaky specs (#5110)
Arkatufus Jun 28, 2021
dba9097
[ShardedDaemonProcess] Support for custom stop message (#5112)
ismaelhamed Jun 28, 2021
4a53729
Reduce AwaitAssert default interval to help tests that have tight tim…
Arkatufus Jun 29, 2021
188657d
IsNobody for remote DeadLetters (#5114)
Zetanova Jun 30, 2021
a4d5491
Bump Hyperion from 0.10.1 to 0.10.2 (#5115)
dependabot[bot] Jul 1, 2021
bf8a983
Bump Google.Protobuf from 3.17.2 to 3.17.3 (#5080)
dependabot[bot] Jul 5, 2021
e5c0946
log full exception upon deserialization failure (#5121)
Aaronontheweb Jul 5, 2021
1c3eaf0
`MessageContainerSerializer` reports on serializerId and manifest of …
Aaronontheweb Jul 6, 2021
78dd689
Fix persistence cluster sharding serialization failure (#5125)
Arkatufus Jul 6, 2021
951be4a
Change references to MyGet in the nightly nuget package documentation…
Arkatufus Jul 6, 2021
0a21bc2
include attempted bind address in DotNetty binding failure messages (…
Aaronontheweb Jul 6, 2021
0b64872
Fix actor TestActorRef<> loosing Self reference in async context (#4861)
IgorFedchenko Jul 6, 2021
303d632
Pipe of Canceled Tasks (#5123)
Zetanova Jul 8, 2021
1927ac1
Akka.Cluster.Sharding should not use Hyperion serializer (#5140)
Arkatufus Jul 13, 2021
d81767e
Restart(Source|Flow|Sink): Configurable stream restart deadline (#5122)
ismaelhamed Jul 13, 2021
c835e85
Proposes a new combinator, the most common partition-case, as divertT…
ismaelhamed Jul 14, 2021
731107f
ddata replicator stops but doesn't look like it can be restarted easi…
andyfurnival Jul 14, 2021
b4f3ac4
Add a HOCON setting to control DData replicator restart on failure (#…
Arkatufus Jul 14, 2021
8d81e5e
Reworked `Ask<T>` to allocate fewer objects, `await` less (#5098)
Aaronontheweb Jul 16, 2021
fb9a2d5
Fix typo in logging (#5151)
Arkatufus Jul 23, 2021
f2b99b4
Fixed CB open state should return a faulted Task instead of throwing …
ismaelhamed Jul 25, 2021
a4e26fb
Catch possible InvalidCastException thrown from inside the Hyperion s…
Arkatufus Jul 26, 2021
620ef56
Add Hyperion surrogate option to HOCON and HyperionSerializerSetup (#…
Arkatufus Jul 28, 2021
fb5d79c
fixed racy Akka.DistributedData.Tests.ReplicatorResiliencySpecs (#5162)
Aaronontheweb Aug 2, 2021
044622f
ddata ReadMajorityPlus and WriteMajorityPlus (#5146)
zbynek001 Aug 2, 2021
6730175
Change comparator from >= to > (#5163)
Arkatufus Aug 3, 2021
23aead4
exposed `CreateRemoteRef` method overload for creating remotely-deplo…
Aaronontheweb Aug 3, 2021
e873ecb
SBR fix & update (#5147)
zbynek001 Aug 3, 2021
f759a1e
Bump Microsoft.Data.SQLite from 5.0.7 to 5.0.8 (#5141)
dependabot[bot] Aug 3, 2021
b1c9085
Bump Microsoft.Extensions.DependencyInjection from 5.0.1 to 5.0.2 (#5…
dependabot[bot] Aug 3, 2021
6967338
Add a way to signal a manual flush to stream FileSubscriber (#4871)
Arkatufus Aug 3, 2021
743c0d0
Bump Fsharp.Core from 5.0.1 to 5.0.2 (#5106)
dependabot[bot] Aug 4, 2021
d8841d6
updated Akka.Cluster.Sharding docs to mention Akka.Cluster.Sharding.R…
Aaronontheweb Aug 5, 2021
ca94fcc
added v1.4.22 release notes (#5169)
Aaronontheweb Aug 5, 2021
56 changes: 56 additions & 0 deletions RELEASE_NOTES.md
@@ -1,3 +1,59 @@
#### 1.4.22 August 05 2021 ####
**Maintenance Release for Akka.NET 1.4**

Akka.NET v1.4.22 is a fairly large release that includes an assortment of performance and bug fixes.

**Performance Fixes**
Akka.NET v1.4.22 includes a _significant_ performance improvement for `Ask<T>`, which now requires 1 internal `await` operation instead of 3:

*Before*

| Method | Iterations | Mean | Error | StdDev | Gen 0 | Gen 1 | Gen 2 | Allocated |
|------------------------------ |----------- |----------:|----------:|----------:|----------:|------:|------:|----------:|
| RequestResponseActorSelection | 10000 | 83.313 ms | 0.7553 ms | 0.7065 ms | 4666.6667 | - | - | 19 MB |
| CreateActorSelection | 10000 | 5.572 ms | 0.1066 ms | 0.1140 ms | 953.1250 | - | - | 4 MB |


*After*

| Method | Iterations | Mean | Error | StdDev | Gen 0 | Gen 1 | Gen 2 | Allocated |
|------------------------------ |----------- |----------:|----------:|----------:|----------:|------:|------:|----------:|
| RequestResponseActorSelection | 10000 | 71.216 ms | 0.9885 ms | 0.9246 ms | 4285.7143 | - | - | 17 MB |
| CreateActorSelection | 10000 | 5.462 ms | 0.0495 ms | 0.0439 ms | 953.1250 | - | - | 4 MB |
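A rough sketch of the pattern being measured above: a single request/response round-trip over an `ActorSelection` (the `MyRequest`/`MyResponse` types and actor path are placeholders, not part of the release):

```csharp
var selection = system.ActorSelection("/user/echo");

// v1.4.22 completes this round-trip with one internal await instead of three,
// which is where the allocation savings in the benchmark above come from
var response = await selection.Ask<MyResponse>(new MyRequest(), TimeSpan.FromSeconds(3));
```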

**Bug Fixes and Improvements**

* [Akka: Use ranged nuget versioning for Newtonsoft.Json](https://github.com/akkadotnet/akka.net/pull/5099)
* [Akka: Pipe of Canceled Tasks](https://github.com/akkadotnet/akka.net/pull/5123)
* [Akka: CircuitBreaker's Open state should return a faulted Task instead of throwing](https://github.com/akkadotnet/akka.net/issues/5117)
* [Akka.Remote: Can DotNetty socket exception include information about the address?](https://github.com/akkadotnet/akka.net/issues/5130)
* [Akka.Remote: log full exception upon deserialization failure](https://github.com/akkadotnet/akka.net/pull/5121)
* [Akka.Cluster: SBR fix & update](https://github.com/akkadotnet/akka.net/pull/5147)
* [Akka.Streams: Restart Source|Flow|Sink: Configurable stream restart deadline](https://github.com/akkadotnet/akka.net/pull/5122)
* [Akka.DistributedData: ddata replicator stops but doesn't look like it can be restarted easily](https://github.com/akkadotnet/akka.net/pull/5145)
* [Akka.DistributedData: ddata ReadMajorityPlus and WriteMajorityPlus](https://github.com/akkadotnet/akka.net/pull/5146)
* [Akka.DistributedData: DData Max-Delta-Elements may not be fully honoured](https://github.com/akkadotnet/akka.net/issues/5157)

You can [see the full set of changes introduced in Akka.NET v1.4.22 here](https://github.com/akkadotnet/akka.net/milestone/52).

**Akka.Cluster.Sharding.RepairTool**
In addition to the work done on Akka.NET itself, we've also created a separate tool for cleaning up any left-over data in the event that an Akka.Cluster.Sharding cluster running with `akka.cluster.sharding.state-store-mode=persistence` is terminated abruptly before it has a chance to clean up.

We've added documentation to the Akka.NET website that explains how to use this tool here: https://getakka.net/articles/clustering/cluster-sharding.html#cleaning-up-akkapersistence-shard-state

And the tool itself has documentation here: https://github.com/petabridge/Akka.Cluster.Sharding.RepairTool

| COMMITS | LOC+ | LOC- | AUTHOR |
| --- | --- | --- | --- |
| 16 | 1254 | 160 | Gregorius Soedharmo |
| 7 | 104 | 83 | Aaron Stannard |
| 5 | 8 | 8 | dependabot[bot] |
| 4 | 876 | 302 | Ismael Hamed |
| 2 | 3942 | 716 | zbynek001 |
| 2 | 17 | 3 | Andreas Dirnberger |
| 1 | 187 | 2 | andyfurnival |
| 1 | 110 | 5 | Igor Fedchenko |

#### 1.4.21 June 16 2021 ####
**Maintenance Release for Akka.NET 1.4**

46 changes: 32 additions & 14 deletions docs/articles/clustering/cluster-sharding.md
@@ -43,7 +43,7 @@ public sealed class MessageExtractor : IMessageExtractor
// register actor type as a sharded entity
var region = await ClusterSharding.Get(system).StartAsync(
    typeName: "my-actor",
    entityPropsFactory: s => Props.Create(() => new MyActor(s)),
    settings: ClusterShardingSettings.Create(system),
    messageExtractor: new MessageExtractor());

@@ -55,6 +55,10 @@ In this example, we first specify a way to resolve our message recipients in the context of sharded entities

The second part of the example registers a custom actor type as a sharded entity using the `ClusterSharding.Start` or `ClusterSharding.StartAsync` methods. The result is an `IActorRef` for the shard region, used to communicate between the current actor system and the target entities. A shard region must be started once for each entity type on each node that is expected to participate in sharding entities of that type. Keep in mind that it's recommended to wait for the current node to fully join the cluster before initializing shard regions, in order to avoid potential timeouts.

> N.B. Sharded entity actors are automatically created by the Akka.Cluster.Sharding guardian actor hierarchy, hence why they live under the `/system` portion of the actor hierarchy. This is done intentionally - in the event of an `ActorSystem` termination, the `/user` side of the actor hierarchy is always terminated before the `/system` actors are.
>
> Therefore, this design gives the sharding system a chance to hand all of the sharded entity actors running on the terminating node over to the other remaining nodes in the cluster.

In some cases, the actor may need to know the `entityId` associated with it. This can be achieved using the `entityPropsFactory` parameter to `ClusterSharding.Start` or `ClusterSharding.StartAsync`. The entity ID will be passed to the factory as a parameter, which can then be used in the creation of the actor.

If you want to send messages to entities from a specific node, but you don't want that node to participate in sharding itself, you can use a `ShardRegionProxy` for that.
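A sketch of starting such a proxy (assuming the `StartProxy` overload that takes a message extractor; the `"worker"` role name is a placeholder):

```csharp
// Starts a proxy-only region on this node: it routes messages to the real
// shard regions hosted on nodes with the "worker" role, but hosts no entities
var proxy = ClusterSharding.Get(system).StartProxy(
    typeName: "my-actor",
    role: "worker",
    messageExtractor: new MessageExtractor());
```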
@@ -84,7 +88,7 @@ To reduce memory consumption, you may decide to stop entities after some period

The entities can be configured to be automatically passivated if they haven't received a message for a while using the `akka.cluster.sharding.passivate-idle-entity-after` setting, or by explicitly setting `ClusterShardingSettings.PassivateIdleEntityAfter` to a suitable time to keep the actor alive. Note that only messages sent through sharding are counted, so direct messages to the `ActorRef` of the actor or messages that it sends to itself are not counted as activity. Passivation can be disabled by setting `akka.cluster.sharding.passivate-idle-entity-after = off`. It is always disabled if [Remembering Entities](#remembering-entities) is enabled.
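For illustration, a minimal HOCON sketch of this setting (the 2-minute value is an arbitrary example, not a recommended default):

```
akka.cluster.sharding {
  # passivate entities that have received no sharding-routed message for 2 minutes
  passivate-idle-entity-after = 2m

  # or disable automatic passivation entirely
  # passivate-idle-entity-after = off
}
```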

## Remembering Entities

By default, when a shard is rebalanced to another node, the entities it hosted before the migration are NOT started immediately. Instead they are recreated ad hoc, as new messages arrive. This behavior can be modified via the `akka.cluster.sharding.remember-entities = true` configuration setting, which instructs shards to keep their state between rebalances - this also comes with an extra cost, due to the necessity of persisting information about started/stopped entities. Additionally, the message extractor logic must be aware of the `ShardRegion.StartEntity` message:

Expand Down Expand Up @@ -115,7 +119,7 @@ public sealed class MessageExtractor : HashCodeMessageExtractor

Using `ShardRegion.StartEntity` implies that you're able to infer a shard id given an entity id alone. For this reason, in the example above we modified the cluster sharding routing logic to make use of `HashCodeMessageExtractor` - in this variant, the shard id doesn't have to be provided explicitly, as it is computed from the hash of the entity id itself. Notice the `maxNumberOfShards` parameter, which is the maximum number of shards allowed for this type of actor - this value must never change during the lifetime of a cluster.
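A sketch of what such an extractor might look like (this mirrors, but is not verbatim from, the collapsed example above; `ShardEnvelope` is a hypothetical user-defined envelope type):

```csharp
public sealed class ShardMessageExtractor : HashCodeMessageExtractor
{
    // maxNumberOfShards must never change during the lifetime of the cluster
    public ShardMessageExtractor() : base(maxNumberOfShards: 100) { }

    public override string EntityId(object message)
    {
        switch (message)
        {
            case ShardRegion.StartEntity start: return start.EntityId; // remember-entities restart
            case ShardEnvelope envelope: return envelope.EntityId;     // hypothetical user envelope
            default: return null;
        }
    }

    public override object EntityMessage(object message)
        => message is ShardEnvelope envelope ? envelope.Message : message;
}
```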

### Terminating Remembered Entities
One complication that `akka.cluster.sharding.remember-entities = true` introduces is that your sharded entity actors can no longer be terminated through the normal Akka.NET channels, i.e. `Context.Stop(Self)`, `PoisonPill.Instance`, and the like. This is because, as part of the `remember-entities` contract, the sharding system is going to insist on keeping all remembered entities alive until explicitly told to stop.

To terminate a remembered entity, the sharded entity actor needs to send a [`Passivate` command](xref:Akka.Cluster.Sharding.Passivate) _to its parent actor_ in order to signal to the sharding system that we no longer need to remember this particular entity.
@@ -152,22 +156,28 @@ It is common to simply use `Context.Parent.Tell(new Passivate(PoisonPill.Instance))`

To recreate a remembered entity actor after it has been passivated, all you have to do is send the `ShardRegion` actor a message containing the entity's `EntityId` again, just like when you instantiated the actor the first time.
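A minimal sketch of both halves of that lifecycle (the entity id and the `ShardEnvelope`/`Hello` types are placeholders):

```csharp
// Inside the sharded entity actor: ask the parent shard to stop remembering us
Context.Parent.Tell(new Passivate(PoisonPill.Instance));

// Later, from any code holding the ShardRegion IActorRef: recreate the entity
// simply by messaging it with its EntityId again
region.Tell(new ShardEnvelope("entity-1", new Hello()));
```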

## Retrieving Sharding State

You can inspect the current sharding stats by using the following messages (see the sketch after this list):

- On `GetShardRegionState`, the shard region will reply with a `ShardRegionState` containing data about the shards living in the current actor system and the entities that are alive on each of them.
- On `GetClusterShardingStats`, the shard region will reply with a `ClusterShardingStats` containing information about the shards living in the whole cluster and how many entities are alive in each of them.
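A sketch of issuing these queries with `Ask<T>` (the timeout values are illustrative):

```csharp
// Shards and entities hosted by the current actor system
var regionState = await region.Ask<ShardRegionState>(
    GetShardRegionState.Instance, TimeSpan.FromSeconds(3));

// Cluster-wide stats; the message carries its own data-collection timeout
var clusterStats = await region.Ask<ClusterShardingStats>(
    new GetClusterShardingStats(TimeSpan.FromSeconds(3)), TimeSpan.FromSeconds(5));
```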

## Integrating Cluster Sharding with Persistent Actors

One of the most common scenarios where cluster sharding is used is to combine it with event-sourced persistent actors from the [Akka.Persistence](xref:persistence-architecture) module.

Entity actors are instantiated automatically by Akka.Cluster.Sharding - but in order for persistent actors to recover and persist their state correctly, they must be given a globally unique `PersistenceId`. This can be most easily accomplished using the `entityPropsFactory` overload on the `ClusterSharding.Start` call used to create a new `ShardRegion`:

```csharp
// register actor type as a sharded entity
var region = ClusterSharding.Get(system).Start(
    typeName: "aggregate",
    entityPropsFactory: s => Props.Create(() => new Aggregate(s)),
    settings: ClusterShardingSettings.Create(system),
    messageExtractor: new MessageExtractor());
```

Given these values we can build consistent, unique `PersistenceId`s on the fly using the `entityId` (the expectation is that `entityId`s are globally unique), as in the following example:

@@ -176,11 +186,19 @@

```csharp
public class Aggregate : PersistentActor
{
    public override string PersistenceId { get; }

    // Passed in via entityPropsFactory via the ShardRegion
    public Aggregate(string persistentId)
    {
        PersistenceId = persistentId;
    }

    // rest of class
}
```

## Cleaning Up Akka.Persistence Shard State
In the normal operation of an Akka.NET cluster, the sharding system automatically terminates and rebalances Akka.Cluster.Sharding regions gracefully whenever an `ActorSystem` terminates.

However, in the event that an `ActorSystem` is aborted as a result of a process or hardware failure, it's possible that when using `akka.cluster.sharding.state-store-mode=persistence`, leftover sharding data can still be present inside the Akka.Persistence journal and snapshot store - which will prevent the Akka.Cluster.Sharding system from recovering and starting up correctly the next time it's launched.

This is a _rare_, but not impossible, occurrence. In the event that this happens, you'll need to purge the old Akka.Cluster.Sharding data before restarting the sharding system. You can purge this data automatically by [using the Akka.Cluster.Sharding.RepairTool](https://github.com/petabridge/Akka.Cluster.Sharding.RepairTool) produced by [Petabridge](https://petabridge.com/).
106 changes: 106 additions & 0 deletions docs/articles/networking/serialization.md
@@ -380,3 +380,109 @@ var system = ActorSystem.Create("actorSystem", bootstrap);
```

In the example above, we're using compiler directives to make sure that the correct name transforms are used during compilation.

## Complex object serialization using Hyperion

One limitation of a reflection-based serializer is that it can fail to serialize objects with complex circular references among their properties or fields, throwing a stack overflow exception as it tries to recurse through all of the looping references. The `XmlDocument` class is one example of such an object, yet we may still need to send one over the wire to another remote node in our cluster.

While it has a very complex internal structure, an `XmlDocument` object can be simplified into a string that can be sent safely across the wire. But doing so by hand would require special code that handles every occurrence of `XmlDocument`, or storing it in string format inside our messages - and converting XML documents every time we need to access that information is an expensive operation we would like to avoid in our code.

Hyperion introduces a simple adapter called `Surrogate` that helps de/serialize these kinds of complex objects by acting as a man in the middle, intercepting the complex type and de/serializing it as a much simpler type for wire transfer.

For this example we will use two classes: `Foo` is an imaginary "complex" class that we want to send across the wire, and `FooSurrogate` is the class that we actually serialize and send across the wire:

```c#
public class Foo
{
    public Foo(string bar)
    {
        Bar = bar;
        ComplexProperty = ComputeComplexProperty();
    }

    public string Bar { get; }
    public HighlyComplexComputedProperty ComplexProperty { get; }

    private HighlyComplexComputedProperty ComputeComplexProperty()
    {
        // ...
    }
}

public class FooSurrogate
{
    public FooSurrogate(string bar)
    {
        Bar = bar;
    }

    public string Bar { get; }
}
```

### Creating and declaring `Surrogate`s via HOCON

To create a serializer surrogate in HOCON, we would first create a class that inherits from
the `Surrogate` class:

```c#
public class FooHyperionSurrogate : Surrogate
{
    public FooHyperionSurrogate()
    {
        From = typeof(Foo);
        To = typeof(FooSurrogate);
        ToSurrogate = obj => new FooSurrogate(((Foo)obj).Bar);
        FromSurrogate = obj => new Foo(((FooSurrogate)obj).Bar);
    }
}
```

This class tells the Hyperion serializer to intercept any `Foo` instance and, instead of reflecting over the actual fields and properties of `Foo`, use the much simpler `FooSurrogate` class. To make Hyperion use this information, we need to pass the surrogate declaration inside the HOCON settings:

```
akka.actor {
  serializers.hyperion = "Akka.Serialization.HyperionSerializer, Akka.Serialization.Hyperion"
  serialization-bindings {
    "System.Object" = hyperion
  }
  serialization-settings.hyperion {
    surrogates = [
      "MyAssembly.FooHyperionSurrogate, MyAssembly"
    ]
  }
}
```

### Creating and declaring `Surrogate`s programmatically using `HyperionSerializerSetup`

We can also use `HyperionSerializerSetup` to declare our surrogates:

```c#
var hyperionSetup = HyperionSerializerSetup.Empty
    .WithSurrogates(new [] { Surrogate.Create<Foo, FooSurrogate>(
        foo => new FooSurrogate(foo.Bar),
        surrogate => new Foo(surrogate.Bar))
    });

var bootstrap = BootstrapSetup.Create().And(hyperionSetup);
var system = ActorSystem.Create("actorSystem", bootstrap);
```

Note that we do not need to declare any bindings in HOCON for this to work; if we do, `HyperionSerializerSetup` will override the HOCON settings with the ones declared programmatically.
9 changes: 9 additions & 0 deletions docs/articles/streams/builtinstages.md
@@ -739,6 +739,15 @@ a function has to be provided to calculate the individual cost of each element.

**completes** when upstream completes

### DivertTo

Each upstream element will be diverted either to the given sink or to the downstream consumer, according to the predicate function applied to the element.

**emits** when the chosen output stops backpressuring and there is an input element available

**backpressures** when the chosen output backpressures

**completes** when upstream completes and no output is pending
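A sketch of `DivertTo` in use, assuming the C# combinator takes the side sink first and then the predicate that routes an element to it (names are illustrative; `materializer` is an existing `ActorMaterializer`):

```csharp
// Divert odd numbers to a side sink; even numbers continue downstream
var oddSink = Sink.ForEach<int>(i => Console.WriteLine($"diverted: {i}"));

Source.From(Enumerable.Range(1, 10))
    .DivertTo(oddSink, i => i % 2 == 1) // predicate true => element goes to the sink
    .RunForeach(i => Console.WriteLine($"downstream: {i}"), materializer);
```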

# Asynchronous processing stages

10 changes: 10 additions & 0 deletions docs/articles/streams/error-handling.md
@@ -96,6 +96,16 @@ when a WebSocket connection fails due to the HTTP server it's running on going down
By using an exponential backoff, we avoid going into a tight reconnect loop, which both gives the HTTP server some time to recover and avoids using needless resources on the client side.

The various restart shapes mentioned all expect an `Akka.Streams.RestartSettings` instance, which configures the restart behavior.

Configurable parameters are (see the sketch after this list):

* `minBackoff` is the initial duration until the underlying stream is restarted
* `maxBackoff` caps the exponential backoff
* `randomFactor` allows addition of a random delay following backoff calculation
* `maxRestarts` caps the total number of restarts
* `maxRestartsWithin` sets a timeframe during which restarts are counted towards the same total for `maxRestarts`
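A sketch of constructing these settings, assuming the builder-style API mirrors the JVM implementation this feature was ported from:

```csharp
var settings = RestartSettings.Create(
        minBackoff: TimeSpan.FromSeconds(3),
        maxBackoff: TimeSpan.FromSeconds(30),
        randomFactor: 0.2) // up to 20% random jitter added to each delay
    .WithMaxRestarts(20, TimeSpan.FromMinutes(5)); // at most 20 restarts within 5 minutes
```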

The following snippet shows how to create a backoff supervisor using `Akka.Streams.Dsl.RestartSource`
which will supervise the given `Source`. The `Source` in this case is a
`HttpResponseMessage`, produced by `HttpClient`. If the stream fails or completes at any point, the request will