-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathMongoChangeStreamService.cs
81 lines (72 loc) · 2.81 KB
/
MongoChangeStreamService.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using MongoDB.Bson;
using MongoDB.Driver;
/// <summary>
/// Background service that opens a MongoDB change stream on one collection and
/// logs every insert, update, and replace event it receives until the host stops it.
/// </summary>
public class MongoChangeStreamService : BackgroundService
{
    private readonly ILogger<MongoChangeStreamService> _logger;
    private readonly string _connectionString;
    private readonly string _databaseName;
    private readonly string _collectionName;

    /// <summary>
    /// Creates the service for the given MongoDB collection.
    /// </summary>
    /// <param name="logger">Logger used for lifecycle messages and detected changes.</param>
    /// <param name="connectionString">Connection string passed to <see cref="MongoClient"/>.</param>
    /// <param name="databaseName">Name of the database that contains the watched collection.</param>
    /// <param name="collectionName">Name of the collection to watch.</param>
    /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
    public MongoChangeStreamService(
        ILogger<MongoChangeStreamService> logger,
        string connectionString,
        string databaseName,
        string collectionName)
    {
        // Fail fast at construction time rather than later inside the background task.
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
        _connectionString = connectionString ?? throw new ArgumentNullException(nameof(connectionString));
        _databaseName = databaseName ?? throw new ArgumentNullException(nameof(databaseName));
        _collectionName = collectionName ?? throw new ArgumentNullException(nameof(collectionName));
    }

    /// <summary>
    /// Opens the change stream and logs each change document until
    /// <paramref name="stoppingToken"/> is signalled or the stream fails.
    /// </summary>
    /// <param name="stoppingToken">Token signalled by the host when the service should stop.</param>
    /// <returns>A task that completes when the watch loop has ended.</returns>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("MongoChangeStreamService is starting.");

        var client = new MongoClient(_connectionString);
        var database = client.GetDatabase(_databaseName);
        var collection = database.GetCollection<BsonDocument>(_collectionName);

        // Keep only insert/update/replace events, then trim each event to the listed fields.
        // '_id' is retained in the projection; NOTE(review): it is the event's resume token
        // and the server is expected to reject pipelines that drop it - confirm before editing.
        var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<BsonDocument>>()
            .Match(change =>
                change.OperationType == ChangeStreamOperationType.Insert ||
                change.OperationType == ChangeStreamOperationType.Update ||
                change.OperationType == ChangeStreamOperationType.Replace
            )
            .AppendStage<ChangeStreamDocument<BsonDocument>, ChangeStreamDocument<BsonDocument>, BsonDocument>(
                @"{
$project: {
'_id': 1,
'fullDocument': 1,
'ns': 1,
'documentKey': 1
}
}"
            );

        var options = new ChangeStreamOptions
        {
            // Ask for 'fullDocument' to be populated on update events as well.
            FullDocument = ChangeStreamFullDocumentOption.UpdateLookup
        };

        try
        {
            // Open the stream asynchronously and with the stopping token, inside the try,
            // so that neither host startup nor shutdown is blocked and failures while
            // opening the stream are handled the same way as failures while reading it.
            using var cursor = await collection.WatchAsync(pipeline, options, stoppingToken);
            _logger.LogInformation("MongoChangeStreamService is now watching for changes.");

            while (!stoppingToken.IsCancellationRequested && await cursor.MoveNextAsync(stoppingToken))
            {
                // Each successful move exposes a batch (possibly empty) of change documents.
                foreach (var change in cursor.Current)
                {
                    _logger.LogInformation("Change detected: {Change}", change.ToJson());
                }
            }
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            // Expected during host shutdown; not an error.
            _logger.LogInformation("MongoChangeStreamService is stopping.");
        }
        catch (Exception ex)
        {
            // Best-effort service: the failure is logged and the watch loop ends
            // without propagating the exception to the host.
            _logger.LogError(ex, "An error occurred in MongoChangeStreamService.");
        }
    }
}