Skip to content

Commit

Permalink
Akka.Persistence: Made DateTime.UtcNow the default timestamp for `S…
Browse files Browse the repository at this point in the history
…napshotMetdata` (#7313)

* Made `DateTime.UtcNow` the default timestamp for `SnapshotMetdata`

* fixed all `SnapshotMetadata` calls

* added API approvals

* standardized on `Sys.Scheduler.Now.DateTime`

* Fix SQL query error in QueryExecutor

* Update API Approval list

---------

Co-authored-by: Gregorius Soedharmo <[email protected]>
  • Loading branch information
Aaronontheweb and Arkatufus authored Aug 8, 2024
1 parent 5e7be3a commit d6a8063
Show file tree
Hide file tree
Showing 10 changed files with 82 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ public virtual async Task DeleteAsync(
await connection.ExecuteInTransaction(WriteIsolationLevel, cancellationToken, async (tx, token) =>
{
var sql = timestamp.HasValue
? DeleteSnapshotRangeSql + " AND { Configuration.TimestampColumnName} = @Timestamp"
? DeleteSnapshotSql + $" AND {Configuration.TimestampColumnName} <= @Timestamp"
: DeleteSnapshotSql;
using var command = GetCommand(connection, sql);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -587,7 +587,8 @@ namespace Akka.Persistence
}
public sealed class SnapshotMetadata : System.IEquatable<Akka.Persistence.SnapshotMetadata>
{
public static System.DateTime TimestampNotSpecified;
[System.ObsoleteAttribute("This constructor is deprecated and will be removed in v1.6. Use the constructor w" +
"ith the timestamp parameter instead. Since v1.5.28", true)]
public SnapshotMetadata(string persistenceId, long sequenceNr) { }
[Newtonsoft.Json.JsonConstructorAttribute()]
public SnapshotMetadata(string persistenceId, long sequenceNr, System.DateTime timestamp) { }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -587,7 +587,8 @@ namespace Akka.Persistence
}
public sealed class SnapshotMetadata : System.IEquatable<Akka.Persistence.SnapshotMetadata>
{
public static System.DateTime TimestampNotSpecified;
[System.ObsoleteAttribute("This constructor is deprecated and will be removed in v1.6. Use the constructor w" +
"ith the timestamp parameter instead. Since v1.5.28", true)]
public SnapshotMetadata(string persistenceId, long sequenceNr) { }
[Newtonsoft.Json.JsonConstructorAttribute()]
public SnapshotMetadata(string persistenceId, long sequenceNr, System.DateTime timestamp) { }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ protected override void AfterAll()
public void LocalSnapshotStore_can_snapshot_actors_with_PersistenceId_containing_invalid_path_characters()
{
var pid = @"p\/:*?-1";
SnapshotStore.Tell(new SaveSnapshot(new SnapshotMetadata(pid, 1), "sample data"), TestActor);
SnapshotStore.Tell(new SaveSnapshot(new SnapshotMetadata(pid, 1, Sys.Scheduler.Now.DateTime), "sample data"), TestActor);
ExpectMsg<SaveSnapshotSuccess>();

SnapshotStore.Tell(new LoadSnapshot(pid, SnapshotSelectionCriteria.Latest, long.MaxValue), TestActor);
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka.Persistence.TCK/Query/PersistenceIdsSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ protected IActorRef WriteSnapshot(string persistenceId, int n)
ExpectMsg($"{persistenceId}-{i}-done");
}

var metadata = new SnapshotMetadata(persistenceId, n + 10);
var metadata = new SnapshotMetadata(persistenceId, n + 10, Sys.Scheduler.Now.DateTime);
SnapshotStore.Tell(new SaveSnapshot(metadata, $"s-{n}"), _senderProbe.Ref);
_senderProbe.ExpectMsg<SaveSnapshotSuccess>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public virtual void SnapshotStore_should_serialize_Payload()

var snapshot = new Test.MySnapshot("a");

var metadata = new SnapshotMetadata(Pid, 1);
var metadata = new SnapshotMetadata(Pid, 1, Sys.Scheduler.Now.DateTime);
SnapshotStore.Tell(new SaveSnapshot(metadata, snapshot), probe.Ref);
probe.ExpectMsg<SaveSnapshotSuccess>();

Expand All @@ -85,7 +85,7 @@ public virtual void SnapshotStore_should_serialize_Payload_with_string_manifest(

var snapshot = new Test.MySnapshot2("a");

var metadata = new SnapshotMetadata(Pid, 1);
var metadata = new SnapshotMetadata(Pid, 1, Sys.Scheduler.Now.DateTime);
SnapshotStore.Tell(new SaveSnapshot(metadata, snapshot), probe.Ref);
probe.ExpectMsg<SaveSnapshotSuccess>();

Expand All @@ -107,7 +107,7 @@ public virtual void SnapshotStore_should_serialize_AtLeastOnceDeliverySnapshot()
};
var atLeastOnceDeliverySnapshot = new AtLeastOnceDeliverySnapshot(17, unconfirmed);

var metadata = new SnapshotMetadata(Pid, 2);
var metadata = new SnapshotMetadata(Pid, 2, Sys.Scheduler.Now.DateTime);
SnapshotStore.Tell(new SaveSnapshot(metadata, atLeastOnceDeliverySnapshot), probe.Ref);
probe.ExpectMsg<SaveSnapshotSuccess>();

Expand All @@ -123,7 +123,7 @@ public virtual void SnapshotStore_should_serialize_AtLeastOnceDeliverySnapshot_w
var unconfirmed = Array.Empty<UnconfirmedDelivery>();
var atLeastOnceDeliverySnapshot = new AtLeastOnceDeliverySnapshot(13, unconfirmed);

var metadata = new SnapshotMetadata(Pid, 2);
var metadata = new SnapshotMetadata(Pid, 2, Sys.Scheduler.Now.DateTime);
SnapshotStore.Tell(new SaveSnapshot(metadata, atLeastOnceDeliverySnapshot), probe.Ref);
probe.ExpectMsg<SaveSnapshotSuccess>();

Expand All @@ -138,7 +138,7 @@ public virtual void SnapshotStore_should_serialize_PersistentFSMSnapshot()

var persistentFSMSnapshot = new PersistentFSM.PersistentFSMSnapshot<string>("mystate", "mydata", TimeSpan.FromDays(4));

var metadata = new SnapshotMetadata(Pid, 2);
var metadata = new SnapshotMetadata(Pid, 2, Sys.Scheduler.Now.DateTime);
SnapshotStore.Tell(new SaveSnapshot(metadata, persistentFSMSnapshot), probe.Ref);
probe.ExpectMsg<SaveSnapshotSuccess>();

Expand Down
59 changes: 55 additions & 4 deletions src/core/Akka.Persistence.TCK/Snapshot/SnapshotStoreSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
using Akka.Persistence.Fsm;
using Akka.Persistence.TCK.Serialization;
using Akka.TestKit;
using FluentAssertions.Extensions;
using Xunit;
using Xunit.Abstractions;

Expand Down Expand Up @@ -100,7 +101,7 @@ private IEnumerable<SnapshotMetadata> WriteSnapshots()
{
for (int i = 1; i <= 5; i++)
{
var metadata = new SnapshotMetadata(Pid, i + 10);
var metadata = new SnapshotMetadata(Pid, i + 10, Sys.Scheduler.Now.DateTime);
SnapshotStore.Tell(new SaveSnapshot(metadata, $"s-{i}"), _senderProbe.Ref);
yield return _senderProbe.ExpectMsg<SaveSnapshotSuccess>().Metadata;
}
Expand Down Expand Up @@ -177,11 +178,13 @@ public virtual void SnapshotStore_should_load_the_most_recent_snapshot_matching_
&& result.Snapshot.Snapshot.ToString() == "s-3");
}

// Issue #7312
// Backward compatibility check, SnapshotMetadata .ctor should work if we pass in UtcNow
[Fact]
public virtual void SnapshotStore_should_delete_a_single_snapshot_identified_by_SequenceNr_in_snapshot_metadata()
{
var md = Metadata[2];
md = new SnapshotMetadata(md.PersistenceId, md.SequenceNr); // don't care about timestamp for delete of a single snap
md = new SnapshotMetadata(md.PersistenceId, md.SequenceNr, Sys.Scheduler.Now.DateTime);
var command = new DeleteSnapshot(md);
var sub = CreateTestProbe();

Expand All @@ -198,6 +201,54 @@ public virtual void SnapshotStore_should_delete_a_single_snapshot_identified_by_
&& result.Snapshot.Snapshot.ToString() == "s-2");
}

// Issue #7312
// Backward compatibility check, old SnapshotMetadata .ctor default value should work as expected
[Fact]
public virtual void SnapshotStore_should_delete_a_single_snapshot_identified_by_SequenceNr_in_snapshot_metadata_if_timestamp_is_MinValue()
{
var md = Metadata[2];
// In previous incarnation, timestamp argument defaults to DateTime.MinValue
md = new SnapshotMetadata(md.PersistenceId, md.SequenceNr, DateTime.MinValue);
var command = new DeleteSnapshot(md);
var sub = CreateTestProbe();

Subscribe<DeleteSnapshot>(sub.Ref);
SnapshotStore.Tell(command, _senderProbe.Ref);
sub.ExpectMsg(command);
_senderProbe.ExpectMsg<DeleteSnapshotSuccess>();

SnapshotStore.Tell(new LoadSnapshot(Pid, new SnapshotSelectionCriteria(md.SequenceNr), long.MaxValue), _senderProbe.Ref);
_senderProbe.ExpectMsg<LoadSnapshotResult>(result =>
result.ToSequenceNr == long.MaxValue
&& result.Snapshot != null
&& result.Snapshot.Metadata.Equals(Metadata[1])
&& result.Snapshot.Snapshot.ToString() == "s-2");
}

// Issue #7312, this is a side effect of the ctor signature changes
// DeleteSnapshot should not delete snapshot if timestamp value does not meet deletion criteria
[Fact]
public virtual void SnapshotStore_should_not_delete_snapshot_identified_by_SequenceNr_if_metadata_timestamp_is_less_than_stored_timestamp()
{
var md = Metadata[2];
// timestamp argument is less than the actual metadata data stored in the database, no deletion occured
md = new SnapshotMetadata(md.PersistenceId, md.SequenceNr, md.Timestamp - 2.Seconds());
var command = new DeleteSnapshot(md);
var sub = CreateTestProbe();

Subscribe<DeleteSnapshot>(sub.Ref);
SnapshotStore.Tell(command, _senderProbe.Ref);
sub.ExpectMsg(command);
_senderProbe.ExpectMsg<DeleteSnapshotSuccess>();

SnapshotStore.Tell(new LoadSnapshot(Pid, new SnapshotSelectionCriteria(md.SequenceNr), long.MaxValue), _senderProbe.Ref);
_senderProbe.ExpectMsg<LoadSnapshotResult>(result =>
result.ToSequenceNr == long.MaxValue
&& result.Snapshot != null
&& result.Snapshot.Metadata.Equals(Metadata[2])
&& result.Snapshot.Snapshot.ToString() == "s-3");
}

[Fact]
public virtual void SnapshotStore_should_delete_all_snapshots_matching_upper_sequence_number_and_timestamp_bounds()
{
Expand Down Expand Up @@ -260,7 +311,7 @@ public virtual void SnapshotStore_should_save_and_overwrite_snapshot_with_same_s
[Fact]
public virtual void SnapshotStore_should_save_bigger_size_snapshot()
{
var metadata = new SnapshotMetadata(Pid, 100);
var metadata = new SnapshotMetadata(Pid, 100, Sys.Scheduler.Now.DateTime);
var bigSnapshot = new byte[SnapshotByteSizeLimit];
new Random().NextBytes(bigSnapshot);
SnapshotStore.Tell(new SaveSnapshot(metadata, bigSnapshot), _senderProbe.Ref);
Expand All @@ -274,7 +325,7 @@ public virtual void ShouldSerializeSnapshots()
if (!SupportsSerialization) return;

var probe = CreateTestProbe();
var metadata = new SnapshotMetadata(Pid, 100L);
var metadata = new SnapshotMetadata(Pid, 100L, Sys.Scheduler.Now.DateTime);
var snap = new TestPayload(probe.Ref);

SnapshotStore.Tell(new SaveSnapshot(metadata, snap), _senderProbe.Ref);
Expand Down
4 changes: 2 additions & 2 deletions src/core/Akka.Persistence/Eventsourced.cs
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ public void LoadSnapshot(string persistenceId, SnapshotSelectionCriteria criteri
/// <param name="snapshot">TBD</param>
public void SaveSnapshot(object snapshot)
{
SnapshotStore.Tell(new SaveSnapshot(new SnapshotMetadata(SnapshotterId, SnapshotSequenceNr), snapshot));
SnapshotStore.Tell(new SaveSnapshot(new SnapshotMetadata(SnapshotterId, SnapshotSequenceNr, Context.System.Scheduler.Now.Date), snapshot));
}

/// <summary>
Expand All @@ -230,7 +230,7 @@ public void SaveSnapshot(object snapshot)
/// <param name="sequenceNr">TBD</param>
public void DeleteSnapshot(long sequenceNr)
{
SnapshotStore.Tell(new DeleteSnapshot(new SnapshotMetadata(SnapshotterId, sequenceNr)));
SnapshotStore.Tell(new DeleteSnapshot(new SnapshotMetadata(SnapshotterId, sequenceNr, DateTime.MinValue)));
}

/// <summary>
Expand Down
22 changes: 12 additions & 10 deletions src/core/Akka.Persistence/SnapshotProtocol.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,32 +52,28 @@ public int Compare(SnapshotMetadata x, SnapshotMetadata y)
}

/// <summary>
/// TBD
/// The singleton comparer instance.
/// </summary>
public static IComparer<SnapshotMetadata> Comparer { get; } = new SnapshotMetadataComparer();

/// <summary>
/// TBD
/// </summary>
public static DateTime TimestampNotSpecified = DateTime.MinValue;

/// <summary>
/// Initializes a new instance of the <see cref="SnapshotMetadata"/> class.
/// </summary>
/// <param name="persistenceId">The id of the persistent actor fro which the snapshot was taken.</param>
/// <param name="sequenceNr">The sequence number at which the snapshot was taken.</param>
[Obsolete("This constructor is deprecated and will be removed in v1.6. Use the constructor with the timestamp parameter instead. Since v1.5.28", true)]
public SnapshotMetadata(string persistenceId, long sequenceNr)
: this(persistenceId, sequenceNr, TimestampNotSpecified)
: this(persistenceId, sequenceNr, DateTime.UtcNow)
{
}

/// <summary>
/// Initializes a new instance of the <see cref="SnapshotMetadata"/> class.
/// </summary>
/// <param name="persistenceId">The id of the persistent actor fro mwhich the snapshot was taken.</param>
/// <param name="persistenceId">The id of the persistent actor from which the snapshot was taken.</param>
/// <param name="sequenceNr">The sequence number at which the snapshot was taken.</param>
/// <param name="timestamp">The time at which the snapshot was saved.</param>
[JsonConstructor]
[JsonConstructor] // TODO: remove this
public SnapshotMetadata(string persistenceId, long sequenceNr, DateTime timestamp)
{
PersistenceId = persistenceId;
Expand All @@ -100,7 +96,13 @@ public SnapshotMetadata(string persistenceId, long sequenceNr, DateTime timestam
/// </summary>
public DateTime Timestamp { get; }


/// <summary>
/// We will probably use nullable in the future, but for the time being
/// we use <see cref="DateTime.MinValue"/> to represent "no timestamp"
/// </summary>
internal static DateTime TimestampNotSpecified => DateTime.MinValue;


public override bool Equals(object obj) => Equals(obj as SnapshotMetadata);


Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka/Actor/Scheduler/SchedulerBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ void IActionScheduler.ScheduleRepeatedly(TimeSpan initialDelay, TimeSpan interva
DateTimeOffset ITimeProvider.Now { get { return TimeNow; } }

/// <summary>
/// TBD
/// The current time in UTC.
/// </summary>
protected abstract DateTimeOffset TimeNow { get; }

Expand Down

0 comments on commit d6a8063

Please sign in to comment.