forked from perrich/Hangfire.MemoryStorage
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathCountersAggregator.cs
81 lines (66 loc) · 2.54 KB
/
CountersAggregator.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
using System;
using System.Linq;
using System.Threading;
using Hangfire.MemoryStorage.Database;
using Hangfire.MemoryStorage.Dto;
using Hangfire.Server;
namespace Hangfire.MemoryStorage
{
/// <summary>
/// Server component that folds raw <see cref="CounterDto"/> records into one
/// <see cref="AggregatedCounterDto"/> per key and then deletes the raw records.
/// </summary>
public class CountersAggregator : IServerComponent
{
    /// <summary>Maximum number of raw counters taken from the store in one pass.</summary>
    private const int NumberOfRecordsInSinglePass = 1000;

    /// <summary>Pause inserted after every pass that processed at least one counter.</summary>
    private static readonly TimeSpan DelayBetweenPasses = TimeSpan.FromSeconds(1);

    /// <summary>How long <see cref="Execute"/> waits after the final (empty) pass before returning.</summary>
    private readonly TimeSpan _aggregateInterval;

    /// <summary>
    /// Creates the aggregator.
    /// </summary>
    /// <param name="aggregateInterval">
    /// Time to wait at the end of <see cref="Execute"/>, once a pass has found no counters.
    /// </param>
    public CountersAggregator(TimeSpan aggregateInterval)
    {
        _aggregateInterval = aggregateInterval;
    }

    /// <summary>
    /// Aggregates counters in passes of at most <see cref="NumberOfRecordsInSinglePass"/> records
    /// until a pass finds none, then waits for the configured interval (or until cancellation
    /// is signalled, whichever comes first).
    /// </summary>
    /// <remarks>
    /// Each pass groups the fetched counters by key, sums their values, takes the greatest
    /// <c>ExpireAt</c> of the group, applies both to the aggregate with the same key
    /// (creating it when missing) and finally deletes the fetched counters.
    /// </remarks>
    /// <param name="cancellationToken">Token used to interrupt the waits between passes.</param>
    /// <exception cref="OperationCanceledException">
    /// Cancellation was requested by the time the delay following a non-empty pass ended.
    /// The final interval wait does not throw.
    /// </exception>
    public void Execute(CancellationToken cancellationToken)
    {
        int removedCount;

        do
        {
            // Materialise the batch so that exactly the records aggregated below are the ones deleted.
            var counters = Data.GetEnumeration<CounterDto>().Take(NumberOfRecordsInSinglePass).ToList();

            var groupedCounters = counters.GroupBy(c => c.Key).Select(g => new
            {
                g.Key,
                Value = g.Sum(c => c.Value),
                ExpireAt = g.Max(c => c.ExpireAt)
            });

            foreach (var counter in groupedCounters)
            {
                var aggregate = Data.GetEnumeration<AggregatedCounterDto>()
                    .FirstOrDefault(a => a.Key == counter.Key);
                var isNew = aggregate == null;

                if (isNew)
                {
                    // NOTE(review): when no counter in the group has an ExpireAt greater than
                    // DateTime.MinValue, this placeholder stays on the stored record. Confirm how
                    // the component that expires records treats that value.
                    aggregate = new AggregatedCounterDto
                    {
                        Id = AutoIncrementIdGenerator.GenerateId(typeof(AggregatedCounterDto)),
                        Key = counter.Key,
                        Value = 0,
                        ExpireAt = DateTime.MinValue
                    };
                }

                aggregate.Value += counter.Value;

                // The aggregate keeps the latest expiry seen for its key.
                if (counter.ExpireAt > aggregate.ExpireAt)
                {
                    aggregate.ExpireAt = counter.ExpireAt;
                }

                if (isNew)
                {
                    // Register the record only after it carries its real totals, so nothing reading
                    // the store can observe the zero-value / placeholder-expiry intermediate state.
                    // Existing records are updated in place, exactly as before.
                    Data.Create(aggregate);
                }
            }

            removedCount = counters.Count;
            Data.Delete(counters);

            if (removedCount > 0)
            {
                // Returns early if cancellation is signalled during the delay.
                cancellationToken.WaitHandle.WaitOne(DelayBetweenPasses);
                cancellationToken.ThrowIfCancellationRequested();
            }

            // NOTE(review): any non-empty pass, even a partial batch, triggers another pass after
            // the delay; the loop only ends on an empty pass. Left unchanged to preserve timing.
        } while (removedCount != 0);

        cancellationToken.WaitHandle.WaitOne(_aggregateInterval);
    }

    /// <summary>Returns the display name of this component.</summary>
    public override string ToString()
    {
        return "Counter Table Aggregator";
    }
}
}