diff --git a/Microsoft.Azure.Cosmos.Samples/Usage/ChangeFeed/ChangeFeed.csproj b/Microsoft.Azure.Cosmos.Samples/Usage/ChangeFeed/ChangeFeed.csproj index 49d2d8a68e..b1f7cca2a0 100644 --- a/Microsoft.Azure.Cosmos.Samples/Usage/ChangeFeed/ChangeFeed.csproj +++ b/Microsoft.Azure.Cosmos.Samples/Usage/ChangeFeed/ChangeFeed.csproj @@ -9,6 +9,7 @@ + diff --git a/Microsoft.Azure.Cosmos.Samples/Usage/ChangeFeed/Program.cs b/Microsoft.Azure.Cosmos.Samples/Usage/ChangeFeed/Program.cs index eb9ef5b6b3..bca544e5f0 100644 --- a/Microsoft.Azure.Cosmos.Samples/Usage/ChangeFeed/Program.cs +++ b/Microsoft.Azure.Cosmos.Samples/Usage/ChangeFeed/Program.cs @@ -6,7 +6,9 @@ using System.Threading; using System.Threading.Tasks; using Microsoft.Azure.Cosmos; + using Microsoft.Azure.Documents.ChangeFeedProcessor.FeedProcessing; using Microsoft.Extensions.Configuration; + using ChangeFeedProcessorLibrary = Microsoft.Azure.Documents.ChangeFeedProcessor; // ---------------------------------------------------------------------------------------------------------- // Prerequisites - @@ -26,6 +28,8 @@ // 3. Listening for changes that happen since the container was created. // // 4. Generate Estimator metrics to expose current Change Feed Processor progress + // + // 5. Code migration template from existing Change Feed Processor library V2 //----------------------------------------------------------------------------------------------------------- @@ -66,6 +70,8 @@ static async Task Main(string[] args) await Program.RunStartFromBeginningChangeFeed("changefeed-beginning", client); Console.WriteLine($"\n4. Generate Estimator metrics to expose current Change Feed Processor progress."); await Program.RunEstimatorChangeFeed("changefeed-estimator", client); + Console.WriteLine($"\n5. Code migration template from existing Change Feed Processor library V2."); + await Program.RunMigrationSample("changefeed-migration", client, configuration); } } finally @@ -243,6 +249,89 @@ public static async Task RunEstimatorChangeFeed( await changeFeedEstimator.StopAsync(); } + /// + /// Example of a code migration template from Change Feed Processor V2 to SDK V3. + /// + /// + public static async Task RunMigrationSample( + string databaseId, + CosmosClient client, + IConfigurationRoot configuration) + { + await Program.InitializeAsync(databaseId, client); + + Console.WriteLine("Generating 10 items that will be picked up by the old Change Feed Processor library..."); + await Program.GenerateItems(10, client.GetContainer(databaseId, Program.monitoredContainer)); + + // This is how you would initialize the processor in V2 + // + ChangeFeedProcessorLibrary.DocumentCollectionInfo monitoredCollectionInfo = new ChangeFeedProcessorLibrary.DocumentCollectionInfo() + { + DatabaseName = databaseId, + CollectionName = Program.monitoredContainer, + Uri = new Uri(configuration["EndPointUrl"]), + MasterKey = configuration["AuthorizationKey"] + }; + + ChangeFeedProcessorLibrary.DocumentCollectionInfo leaseCollectionInfo = new ChangeFeedProcessorLibrary.DocumentCollectionInfo() + { + DatabaseName = databaseId, + CollectionName = Program.leasesContainer, + Uri = new Uri(configuration["EndPointUrl"]), + MasterKey = configuration["AuthorizationKey"] + }; + + ChangeFeedProcessorLibrary.ChangeFeedProcessorBuilder builder = new ChangeFeedProcessorLibrary.ChangeFeedProcessorBuilder(); + var oldChangeFeedProcessor = await builder + .WithHostName("consoleHost") + .WithProcessorOptions(new ChangeFeedProcessorLibrary.ChangeFeedProcessorOptions { + StartFromBeginning = true, + LeasePrefix = "MyLeasePrefix" }) + .WithProcessorOptions(new ChangeFeedProcessorLibrary.ChangeFeedProcessorOptions() + { + MaxItemCount = 10, + FeedPollDelay = TimeSpan.FromSeconds(1) + }) + .WithFeedCollection(monitoredCollectionInfo) + .WithLeaseCollection(leaseCollectionInfo) + .WithObserver() + .BuildAsync(); + // + + await oldChangeFeedProcessor.StartAsync(); + + // Wait random time for the delegate to output all messages after initialization is done + await Task.Delay(5000); + Console.WriteLine("Now we will stop the V2 Processor and start a V3 with the same parameters to pick up from the same state, press any key to continue..."); + Console.ReadKey(); + await oldChangeFeedProcessor.StopAsync(); + + Console.WriteLine("Generating 5 items that will be picked up by the new Change Feed Processor..."); + await Program.GenerateItems(5, client.GetContainer(databaseId, Program.monitoredContainer)); + + // This is how you would do the same initialization in V3 + // + Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer); + Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer); + ChangeFeedProcessor changeFeedProcessor = monitoredContainer + .GetChangeFeedProcessorBuilder("MyLeasePrefix", Program.HandleChangesAsync) + .WithInstanceName("consoleHost") + .WithLeaseContainer(leaseContainer) + .WithMaxItems(10) + .WithPollInterval(TimeSpan.FromSeconds(1)) + .WithStartTime(DateTime.MinValue.ToUniversalTime()) + .Build(); + // + + await changeFeedProcessor.StartAsync(); + + // Wait random time for the delegate to output all messages after initialization is done + await Task.Delay(5000); + Console.WriteLine("Press any key to continue with the next demo..."); + Console.ReadKey(); + await changeFeedProcessor.StopAsync(); + } + /// /// The delegate receives batches of changes as they are generated in the change feed and can process them. /// @@ -322,4 +411,27 @@ public class ToDoItem public DateTime creationTime { get; set; } } // + + internal class ChangeFeedObserver : IChangeFeedObserver + { + public Task CloseAsync(IChangeFeedObserverContext context, ChangeFeedObserverCloseReason reason) + { + return Task.CompletedTask; + } + + public Task OpenAsync(IChangeFeedObserverContext context) + { + return Task.CompletedTask; + } + + public Task ProcessChangesAsync(IChangeFeedObserverContext context, IReadOnlyList docs, CancellationToken cancellationToken) + { + foreach (var doc in docs) + { + Console.WriteLine($"\t[OLD Processor] Detected operation for item with id {doc.Id}, created at {doc.GetPropertyValue("creationTime")}."); + } + + return Task.CompletedTask; + } + } }