Skip to content

Commit

Permalink
Added Change Feed (#11692)
Browse files Browse the repository at this point in the history
  • Loading branch information
seanmcc-msft authored Jun 3, 2020
1 parent 973ca8f commit 029bf31
Show file tree
Hide file tree
Showing 103 changed files with 6,840 additions and 4 deletions.
1 change: 1 addition & 0 deletions eng/Packages.Data.props
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
<PackageReference Update="Azure.Storage.Files.Shares" Version="12.0.1" />
<PackageReference Update="Azure.Storage.Queues" Version="12.1.1" />
<PackageReference Update="Azure.Storage.Files.DataLake" Version="12.0.0" />
<PackageReference Update="Azure.Storage.Blobs.ChangeFeed" Version="12.0.0-preview.1" />
<PackageReference Update="BenchmarkDotNet" Version="0.11.5" />
<PackageReference Update="Castle.Core" Version="4.4.0" />
<PackageReference Update="FluentAssertions" Version="5.10.3" />
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
Breaking Changes
================

12.0.0-preview.1
--------------------------
- New Azure.Storage.Blobs.ChangeFeed client library.
4 changes: 4 additions & 0 deletions sdk/storage/Azure.Storage.Blobs.ChangeFeed/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Release History

## 12.0.0-preview.1
This preview is the first release supporting Azure Storage Blobs Change Feed.
148 changes: 148 additions & 0 deletions sdk/storage/Azure.Storage.Blobs.ChangeFeed/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
# Azure Storage Blobs Change Feed client library for .NET

> Server Version: 2019-12-12
The purpose of the change feed is to provide transaction logs of all the changes that occur to
the blobs and the blob metadata in your storage account. The change feed provides ordered,
guaranteed, durable, immutable, read-only log of these changes. Client applications can read these
logs at any time. The change feed enables you to build efficient and scalable solutions that
process change events that occur in your Blob Storage account at a low cost.

[Source code][source] | [Product documentation][product_docs]

## Getting started

### Install the package
- TODO after we have released.

### Prerequisites

You need an [Azure subscription][azure_sub] and a
[Storage Account][storage_account_docs] to use this package.

To create a new Storage Account, you can use the [Azure Portal][storage_account_create_portal],
[Azure PowerShell][storage_account_create_ps], or the [Azure CLI][storage_account_create_cli].
Here's an example using the Azure CLI:

```Powershell
az storage account create --name MyStorageAccount --resource-group MyResourceGroup --location westus --sku Standard_LRS
```

### Authenticate the Client

Authentication works the same as in [Azure.Storage.Blobs][authenticating_with_blobs].

## Key concepts

The change feed is stored as blobs in a special container in your storage account at standard blob
pricing cost. You can control the retention period of these files based on your requirements
(See the conditions of the current release). Change events are appended to the change feed as records
in the Apache Avro format specification: a compact, fast, binary format that provides rich data structures
with inline schema. This format is widely used in the Hadoop ecosystem, Stream Analytics, and Azure Data
Factory.

You can process these logs incrementally or in-full. Any number of client applications can independently
read the change feed, in parallel, and at their own pace. Analytics applications such as Apache Drill or
Apache Spark can consume logs directly as Avro files, which let you process them at a low-cost, with
high-bandwidth, and without having to write a custom application.

## Examples

### Get all events in the Change Feed
```C# Snippet:SampleSnippetsChangeFeed_GetAllEvents
// Get all the events in the change feed.
List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();
await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChangesAsync())
{
changeFeedEvents.Add(changeFeedEvent);
}
```

### Get events between a start and end time
```C# Snippet:SampleSnippetsChangeFeed_GetEventsBetweenStartAndEndTime
// Create the start and end time. The change feed client will round start time down to
// the nearest hour, and round endTime up to the next hour if you provide DateTimeOffsets
// with minutes and seconds.
DateTimeOffset startTime = new DateTimeOffset(2017, 3, 2, 15, 0, 0, TimeSpan.Zero);
DateTimeOffset endTime = new DateTimeOffset(2020, 10, 7, 2, 0, 0, TimeSpan.Zero);

// You can also provide just a start or end time.
await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChangesAsync(
start: startTime,
end: endTime))
{
changeFeedEvents.Add(changeFeedEvent);
}
```

### Resume with cursor
```C# Snippet:SampleSnippetsChangeFeed_ResumeWithCursor
IAsyncEnumerator<Page<BlobChangeFeedEvent>> enumerator = changeFeedClient
.GetChangesAsync()
.AsPages(pageSizeHint: 10)
.GetAsyncEnumerator();

await enumerator.MoveNextAsync();

foreach (BlobChangeFeedEvent changeFeedEvent in enumerator.Current.Values)
{
changeFeedEvents.Add(changeFeedEvent);
}

// get the change feed cursor. The cursor is not required to get each page of events,
// it is intended to be saved and used to resume iterating at a later date.
string cursor = enumerator.Current.ContinuationToken;

// Resume iterating from the pervious position with the cursor.
await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChangesAsync(
continuation: cursor))
{
changeFeedEvents.Add(changeFeedEvent);
}
```

## Troubleshooting
All Blob service operations will throw a
[RequestFailedException][RequestFailedException] on failure with
helpful [`ErrorCode`s][error_codes]. Many of these errors are recoverable.

## Next steps

Get started with our [Change Feed samples][samples]:

1. [Hello World](samples/Sample01a_HelloWorld.cs): Get changes that have occured in your storage account (or [asynchronously](samples/Sample01b_HelloWorldAsync.cs))
2. [Auth](samples/Sample02_Auth.cs): Authenticate with connection strings, public access, shared keys, shared access signatures, and Azure Active Directory.


## Contributing

See the [Storage CONTRIBUTING.md][storage_contrib] for details on building,
testing, and contributing to this library.

This project welcomes contributions and suggestions. Most contributions require
you to agree to a Contributor License Agreement (CLA) declaring that you have
the right to, and actually do, grant us the rights to use your contribution. For
details, visit [cla.microsoft.com][cla].

This project has adopted the [Microsoft Open Source Code of Conduct][coc].
For more information see the [Code of Conduct FAQ][coc_faq]
or contact [[email protected]][coc_contact] with any
additional questions or comments.

<!-- LINKS -->
[source]: https://github.com/Azure/azure-sdk-for-net/tree/master/sdk/storage/Azure.Storage.Blobs/srcs
[product_docs]: https://docs.microsoft.com/en-us/azure/storage/blobs/storage-blob-change-feed
[azure_sub]: https://azure.microsoft.com/free/
[storage_account_docs]: https://docs.microsoft.com/azure/storage/common/storage-account-overview
[storage_account_create_ps]: https://docs.microsoft.com/azure/storage/common/storage-quickstart-create-account?tabs=azure-powershell
[storage_account_create_cli]: https://docs.microsoft.com/azure/storage/common/storage-quickstart-create-account?tabs=azure-cli
[storage_account_create_portal]: https://docs.microsoft.com/azure/storage/common/storage-quickstart-create-account?tabs=azure-portal
[authenticating_with_blobs]: https://github.com/Azure/azure-sdk-for-net/blob/master/sdk/storage/Azure.Storage.Blobs/samples/Sample02_Auth.cs
[RequestFailedException]: https://github.com/Azure/azure-sdk-for-net/tree/master/sdk/core/Azure.Core/src/RequestFailedException.cs
[error_codes]: https://docs.microsoft.com/rest/api/storageservices/blob-service-error-codes
[samples]: samples/
[storage_contrib]: ../CONTRIBUTING.md
[cla]: https://cla.microsoft.com
[coc]: https://opensource.microsoft.com/codeofconduct/
[coc_faq]: https://opensource.microsoft.com/codeofconduct/faq/
[coc_contact]: mailto:[email protected]
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFrameworks>$(RequiredTargetFrameworks)</TargetFrameworks>
<AssemblyTitle>Microsoft Azure.Storage.Blobs.ChangeFeed client library samples</AssemblyTitle>
<IsPackable>false</IsPackable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Azure.Identity" />
<ProjectReference Include="$(MSBuildThisFileDirectory)..\src\Azure.Storage.Blobs.ChangeFeed.csproj" />
<ProjectReference Include="$(MSBuildThisFileDirectory)..\..\Azure.Storage.Common\src\Azure.Storage.Common.csproj" />
</ItemGroup>
<ItemGroup>
<Compile Include="$(MSBuildThisFileDirectory)..\..\Azure.Storage.Common\tests\Shared\**\*.cs" Link="Shared\%(RecursiveDir)\%(Filename)%(Extension)" />
<None Include="$(MSBuildThisFileDirectory)..\..\Azure.Storage.Common\tests\Shared\*.xml">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
</ItemGroup>
<ItemGroup>
<!-- Required Shared Source for testing -->
<Compile Include="$(AzureCoreSharedSources)ArrayBufferWriter.cs" Link="Shared\Core\%(RecursiveDir)\%(Filename)%(Extension)" />
<Compile Include="$(AzureCoreSharedSources)NoBodyResponse{T}.cs" Link="Shared\Core\%(RecursiveDir)\%(Filename)%(Extension)" />
<Compile Include="$(AzureCoreSharedSources)TaskExtensions.cs" Link="Shared\Core\%(RecursiveDir)\%(Filename)%(Extension)" />
<Compile Include="$(AzureStorageSharedSources)Constants.cs" Link="Shared\%(RecursiveDir)\%(Filename)%(Extension)" />
<Compile Include="$(AzureStorageSharedSources)Errors.cs" Link="Shared\%(RecursiveDir)\%(Filename)%(Extension)" />
<Compile Include="$(AzureStorageSharedSources)StorageConnectionString.cs" Link="Shared\%(RecursiveDir)\%(Filename)%(Extension)" />
<Compile Include="$(AzureStorageSharedSources)SharedAccessSignatureCredentials.cs" Link="Shared\%(RecursiveDir)\%(Filename)%(Extension)" />
<Compile Include="$(AzureStorageSharedSources)UriExtensions.cs" Link="Shared\%(RecursiveDir)\%(Filename)%(Extension)" />
<Compile Include="$(AzureStorageSharedSources)UriQueryParamsCollection.cs" Link="Shared\%(RecursiveDir)\%(Filename)%(Extension)" />
<Compile Include="$(AzureStorageSharedSources)StorageExceptionExtensions.cs" Link="Shared\%(RecursiveDir)\%(Filename)%(Extension)" />
</ItemGroup>
</Project>
15 changes: 15 additions & 0 deletions sdk/storage/Azure.Storage.Blobs.ChangeFeed/samples/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
---
page_type: sample
languages:
- csharp
products:
- azure
- azure-storage
name: Azure.Storage.ChangeFeed samples for .NET
description: Samples for the Azure.Storage.Blobs.ChangeFeed client library
---

# Azure.Storage.ChangeFeed Samples

- sample 0
- sample 1
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Reflection.Metadata.Ecma335;
using Azure.Storage.Blobs.ChangeFeed.Models;
using NUnit.Framework;

namespace Azure.Storage.Blobs.ChangeFeed.Samples
{
/// <summary>
/// Basic Azure ChangeFeed Storage samples.
/// </summary>
public class Sample01a_HelloWorld : SampleTest
{
/// <summary>
/// Download every event in the change feed.
/// </summary>
[Test]
public void ChangeFeed()
{
// Get a connection string to our Azure Storage account.
string connectionString = ConnectionString;

// Get a new blob service client.
BlobServiceClient blobServiceClient = new BlobServiceClient(connectionString);

// Get a new change feed client.
BlobChangeFeedClient changeFeedClient = blobServiceClient.GetChangeFeedClient();

// Get all the events in the change feed.
List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();
foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChanges())
{
changeFeedEvents.Add(changeFeedEvent);
}
}

/// <summary>
/// Download change feed events between a start and end time.
/// </summary>
[Test]
public void ChangeFeedBetweenDates()
{
// Get a connection string to our Azure Storage account.
string connectionString = ConnectionString;

// Get a new blob service client.
BlobServiceClient blobServiceClient = new BlobServiceClient(connectionString);

// Get a new change feed client.
BlobChangeFeedClient changeFeedClient = blobServiceClient.GetChangeFeedClient();
List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();

// Create the start and end time. The change feed client will round start time down to
// the nearest hour, and round endTime up to the next hour if you provide DateTimeOffsets
// with minutes and seconds.
DateTimeOffset startTime = new DateTimeOffset(2017, 3, 2, 15, 0, 0, TimeSpan.Zero);
DateTimeOffset endTime = new DateTimeOffset(2020, 10, 7, 2, 0, 0, TimeSpan.Zero);

// You can also provide just a start or end time.
foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChanges(
start: startTime,
end: endTime))
{
changeFeedEvents.Add(changeFeedEvent);
}
}

/// <summary>
/// You can use the change feed cursor to resume iterating throw the change feed
/// at a later time.
/// </summary>
[Test]
public void ChangeFeedResumeWithCursor()
{
// Get a connection string to our Azure Storage account.
string connectionString = ConnectionString;

// Get a new blob service client.
BlobServiceClient blobServiceClient = new BlobServiceClient(connectionString);

// Get a new change feed client.
BlobChangeFeedClient changeFeedClient = blobServiceClient.GetChangeFeedClient();
List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();

IEnumerator<Page<BlobChangeFeedEvent>> enumerator = changeFeedClient
.GetChanges()
.AsPages(pageSizeHint: 10)
.GetEnumerator();
;

enumerator.MoveNext();

foreach (BlobChangeFeedEvent changeFeedEvent in enumerator.Current.Values)
{
changeFeedEvents.Add(changeFeedEvent);
}

// get the change feed cursor. The cursor is not required to get each page of events,
// it is intended to be saved and used to resume iterating at a later date.
string cursor = enumerator.Current.ContinuationToken;

// Resume iterating from the pervious position with the cursor.
foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChanges(
continuation: cursor))
{
changeFeedEvents.Add(changeFeedEvent);
}
}
}
}
Loading

0 comments on commit 029bf31

Please sign in to comment.