-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathAzureBlobStorageService.cs
301 lines (273 loc) · 12 KB
/
AzureBlobStorageService.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
using Akka;
using Akka.IO;
using Akka.Streams.Dsl;
using Azure;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;
using Azure.Storage.Blobs.Specialized;
using Azure.Storage.Sas;
using Microsoft.Extensions.Logging;
using Snd.Sdk.Tasks;
using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.Linq;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;
using Snd.Sdk.Helpers;
using Snd.Sdk.Storage.Base;
using Snd.Sdk.Storage.Models;
using Snd.Sdk.Storage.Models.BlobPath;
namespace Snd.Sdk.Storage.Azure
{
/// <summary>
/// Blob Service implementation for Azure.
/// Blob path for this service should be in format container@my/blob/path
/// </summary>
/// <remarks>
/// Read operations (<see cref="GetBlobContent{T}"/>, <see cref="GetBlobContentAsync{T}"/>,
/// <see cref="GetBlobMetadata"/>, <see cref="GetBlobMetadataAsync"/>) log failures and return <c>default</c>
/// instead of throwing. Content hashes reported in <see cref="StoredBlob"/> and <see cref="UploadedBlob"/>
/// are base64-encoded.
/// </remarks>
public class AzureBlobStorageService : IBlobStorageService<AdlsGen2Path>
{
    /// <summary>
    /// SAS validity, in seconds, used by <see cref="GetBlobUri"/> when no "validForSeconds" option is supplied.
    /// </summary>
    private const double DefaultSasValiditySeconds = 60;

    private readonly BlobServiceClient blobServiceClient;
    private readonly ILogger<AzureBlobStorageService> logger;

    /// <summary>
    /// Resolves a <see cref="BlobClient"/> for <paramref name="path"/>: the container is taken from
    /// <c>path.Container</c> and the blob name is built as "{BlobPath}/{BlobName}".
    /// </summary>
    private BlobClient GetBlobClient(AdlsGen2Path path)
    {
        var bcc = this.blobServiceClient.GetBlobContainerClient(path.Container);
        return bcc.GetBlobClient($"{path.BlobPath}/{path.BlobName}");
    }

    /// <summary>
    /// Creates an instance of <see cref="AzureBlobStorageService"/>.
    /// </summary>
    /// <param name="blobServiceClient">Client used to resolve container, blob and append-blob clients.</param>
    /// <param name="logger">Logger used for failures that are reported as default results instead of exceptions.</param>
    public AzureBlobStorageService(BlobServiceClient blobServiceClient, ILogger<AzureBlobStorageService> logger)
    {
        this.blobServiceClient = blobServiceClient;
        this.logger = logger;
    }

    /// <inheritdoc />
    public T GetBlobContent<T>(AdlsGen2Path path, Func<BinaryData, T> deserializer)
    {
        var bc = GetBlobClient(path);
        try
        {
            var content = bc.DownloadContent().Value.Content;
            return deserializer(content);
        }
        catch (RequestFailedException rfex)
        {
            this.logger.LogError(rfex, "File {blobName} does not exist under {blobPath}.", path.BlobName, path.BlobPath);
            return default;
        }
        catch (JsonException jex)
        {
            this.logger.LogError(jex,
                "Content of {blobName} under {blobPath} is not a valid json. Specify a different serializer or check blob contents.",
                path.BlobName, path.BlobPath);
            return default;
        }
        catch (Exception other)
        {
            this.logger.LogError(other, "Failed to process content of {blobName} under {blobPath}.", path.BlobName,
                path.BlobPath);
            return default;
        }
    }

    /// <inheritdoc />
    /// <remarks>
    /// The download and the deserializer run inside the awaited try block, so service failures and
    /// deserialization failures are logged and mapped to <c>default</c>, the same as
    /// <see cref="GetBlobContent{T}"/>. Without awaiting, those exceptions would only surface
    /// inside the returned task and bypass the handlers.
    /// </remarks>
    public async Task<T> GetBlobContentAsync<T>(AdlsGen2Path path, Func<BinaryData, T> deserializer)
    {
        var bc = GetBlobClient(path);
        try
        {
            var response = await bc.DownloadContentAsync().ConfigureAwait(false);
            return deserializer(response.Value.Content);
        }
        catch (RequestFailedException rfex)
        {
            this.logger.LogError(rfex, "File {blobName} does not exist under {blobPath}.", path.BlobName, path.BlobPath);
            return default;
        }
        catch (JsonException jex)
        {
            this.logger.LogError(jex,
                "Content of {blobName} under {blobPath} is not a valid json. Specify a different serializer or check blob contents.",
                path.BlobName, path.BlobPath);
            return default;
        }
        catch (Exception other)
        {
            this.logger.LogError(other, "Failed to process content of {blobName} under {blobPath}.", path.BlobName,
                path.BlobPath);
            return default;
        }
    }

    /// <inheritdoc />
    /// <remarks>Opens a read stream with <c>allowModifications: true</c> (the <see cref="BlobOpenReadOptions"/> constructor argument).</remarks>
    public Stream StreamBlobContent(string blobPath, string blobName)
    {
        var bc = GetBlobClient($"{blobPath}/{blobName}".AsAdlsGen2Path());
        return bc.OpenRead(new BlobOpenReadOptions(true));
    }

    /// <inheritdoc />
    /// <remarks>Returns <c>null</c> (after logging) when the properties request fails.</remarks>
    public IDictionary<string, string> GetBlobMetadata(string blobPath, string blobName)
    {
        try
        {
            return GetBlobClient($"{blobPath}/{blobName}".AsAdlsGen2Path()).GetProperties().Value.Metadata;
        }
        catch (RequestFailedException ex)
        {
            this.logger.LogWarning(exception: ex,
                message: "Unable to get metadata for a blob {blobName} on {blobPath}", blobName, blobPath);
            return default;
        }
        catch (Exception other)
        {
            this.logger.LogError(exception: other,
                message: "Fatal error when reading metadata for a blob {blobName} on {blobPath}", blobName, blobPath);
            return default;
        }
    }

    /// <inheritdoc />
    /// <remarks>
    /// Returns <c>null</c> (after logging) when the properties request fails. The request is awaited inside
    /// the try block so asynchronous failures reach the handlers, as in <see cref="GetBlobMetadata"/>.
    /// </remarks>
    public async Task<IDictionary<string, string>> GetBlobMetadataAsync(string blobPath, string blobName)
    {
        try
        {
            var properties = await GetBlobClient($"{blobPath}/{blobName}".AsAdlsGen2Path())
                .GetPropertiesAsync()
                .ConfigureAwait(false);
            return properties.Value.Metadata;
        }
        catch (RequestFailedException ex)
        {
            this.logger.LogWarning(exception: ex,
                message: "Unable to get metadata for a blob {blobName} on {blobPath}", blobName, blobPath);
            return default;
        }
        catch (Exception other)
        {
            this.logger.LogError(exception: other,
                message: "Fatal error when reading metadata for a blob {blobName} on {blobPath}", blobName, blobPath);
            return default;
        }
    }

    /// <inheritdoc />
    /// <remarks>
    /// Generates a read-only SAS URI. An optional ("validForSeconds", value) option sets the expiry.
    /// Any numeric value (int, long, double, ...) or numeric string is accepted. The default is 60 seconds.
    /// </remarks>
    [ExcludeFromCodeCoverage]
    public Uri GetBlobUri(AdlsGen2Path path, params ValueTuple<string, object>[] kwOptions)
    {
        var blobClient = GetBlobClient(path);
        // Convert.ToDouble rather than a (double) cast: unboxing a boxed int as double throws InvalidCastException.
        var validForSeconds = (kwOptions ?? Array.Empty<ValueTuple<string, object>>())
            .Where(opt => opt.Item1 == "validForSeconds")
            .Select(opt => Convert.ToDouble(opt.Item2, System.Globalization.CultureInfo.InvariantCulture))
            .DefaultIfEmpty(DefaultSasValiditySeconds)
            .First();
        return blobClient.GenerateSasUri(BlobSasPermissions.Read,
            DateTimeOffset.UtcNow.AddSeconds(validForSeconds));
    }

    /// <summary>
    /// Lists blob items in the container named by <paramref name="blobPath"/>, filtered by its full path as prefix.
    /// </summary>
    private Pageable<BlobItem> ListBlobItems(string blobPath)
    {
        var adlsPath = blobPath.AsAdlsGen2Path();
        var containerClient = this.blobServiceClient.GetBlobContainerClient(adlsPath.Container);
        return containerClient.GetBlobs(prefix: adlsPath.FullPath);
    }

    /// <summary>
    /// Encodes raw content-hash bytes returned by the service as base64.
    /// Returns <c>null</c> when the service did not report a hash.
    /// </summary>
    /// <remarks>
    /// Base64 is lossless. Decoding the raw hash bytes as UTF-8 would replace invalid sequences
    /// with U+FFFD and make distinct hashes indistinguishable.
    /// </remarks>
    private static string EncodeContentHash(byte[] contentHash)
    {
        return contentHash is null ? null : Convert.ToBase64String(contentHash);
    }

    /// <summary>
    /// Maps an Azure <see cref="BlobItem"/> to the SDK-neutral <see cref="StoredBlob"/>.
    /// If the item carries no properties, all property-derived fields are left null.
    /// </summary>
    private static StoredBlob MapBlobItem(BlobItem blobItem)
    {
        var properties = blobItem.Properties;
        return new StoredBlob
        {
            Metadata = blobItem.Metadata,
            Name = blobItem.Name,
            ContentEncoding = properties?.ContentEncoding,
            ContentHash = EncodeContentHash(properties?.ContentHash),
            LastModified = properties?.LastModified,
            CreatedOn = properties?.CreatedOn,
            ContentType = properties?.ContentType,
            ContentLength = properties?.ContentLength
        };
    }

    /// <inheritdoc />
    public Source<StoredBlob, NotUsed> ListBlobs(string blobPath)
    {
        return Source.From(ListBlobItems(blobPath)).Select(MapBlobItem);
    }

    /// <inheritdoc />
    public IEnumerable<StoredBlob> ListBlobsAsEnumerable(string blobPath)
    {
        return ListBlobItems(blobPath).Select(MapBlobItem);
    }

    /// <inheritdoc />
    /// <remarks>
    /// Copies the source to the target via a synchronous server-side copy, authorized by a 5-minute read SAS
    /// on the source. The source is deleted only if the copy reports <see cref="CopyStatus.Success"/>.
    /// Otherwise the task yields <c>false</c>.
    /// </remarks>
    public Task<bool> MoveBlob(string sourcePath, string sourceName, string targetPath, string targetBlobName)
    {
        var sourceBlobClient = GetBlobClient($"{sourcePath}/{sourceName}".AsAdlsGen2Path());
        var targetBlobClient = GetBlobClient($"{targetPath}/{targetBlobName}".AsAdlsGen2Path());
        return targetBlobClient
            .SyncCopyFromUriAsync(sourceBlobClient.GenerateSasUri(BlobSasPermissions.Read,
                DateTimeOffset.UtcNow.AddMinutes(5)))
            .Map(result =>
            {
                if (result.Value.CopyStatus == CopyStatus.Success)
                {
                    return sourceBlobClient.DeleteIfExistsAsync().Map(dr => dr.Value);
                }

                return Task.FromResult(false);
            })
            .Flatten();
    }

    /// <inheritdoc />
    /// <remarks>Yields the service's <c>DeleteIfExists</c> result, i.e. whether a blob was deleted.</remarks>
    public Task<bool> RemoveBlob(AdlsGen2Path path)
    {
        var blobClient = GetBlobClient(path);
        return blobClient.DeleteIfExistsAsync().Map(v => v.Value);
    }

    /// <inheritdoc />
    /// <remarks>
    /// Uploads <paramref name="text"/> encoded as UTF-8 to <c>path.FullPath</c> in <c>path.Container</c>.
    /// The returned <see cref="UploadedBlob.ContentHash"/> is base64-encoded, or <c>null</c> if the service returned none.
    /// </remarks>
    public Task<UploadedBlob> SaveTextAsBlob(string text, AdlsGen2Path path)
    {
        var containerClient = this.blobServiceClient.GetBlobContainerClient(path.Container);
        return containerClient
            .UploadBlobAsync(blobName: path.FullPath,
                content: new BinaryData(Encoding.UTF8.GetBytes(text))).Map(result => new UploadedBlob
            {
                Name = path.FullPath,
                ContentHash = EncodeContentHash(result.Value.ContentHash),
                LastModified = result.Value.LastModified
            });
    }

    /// <summary>
    /// Gets an instance of <see cref="AppendBlobClient"/> for a specified blob path and blob name.
    /// If the blob doesn't exist, it will be created.
    /// </summary>
    /// <param name="blobPath">The path of the blob in Azure Data Lake Storage Gen2.</param>
    /// <param name="blobName">The name of the blob to retrieve.</param>
    /// <returns>Returns an instance of <see cref="AppendBlobClient"/> for the specified blob.</returns>
    [ExcludeFromCodeCoverage]
    protected virtual AppendBlobClient GetAppendBlobClient(string blobPath, string blobName)
    {
        var adlsPath = $"{blobPath}/{blobName}".AsAdlsGen2Path();
        var blobClient = this.blobServiceClient.GetBlobContainerClient(adlsPath.Container)
            .GetAppendBlobClient(adlsPath.FullPath);
        blobClient.CreateIfNotExists();
        return blobClient;
    }

    /// <inheritdoc />
    /// <remarks>
    /// The append blob is created (if missing) when the flow is built, not when it is materialized.
    /// Each incoming <see cref="ByteString"/> is appended as one block, one at a time. On the first failure
    /// the error is logged, <c>false</c> is emitted, and the stream completes. The flow emits the logical AND
    /// of all per-block results.
    /// </remarks>
    public Flow<ByteString, bool, NotUsed> StreamToBlob(string blobPath, string blobName)
    {
        var blobClient = GetAppendBlobClient(blobPath, blobName);
        return Flow.Create<ByteString, NotUsed>()
            .SelectAsync(1, block =>
            {
                var blockStream = new MemoryStream((byte[])block);
                return blobClient.AppendBlockAsync(blockStream).Map(_ => true);
            })
            .RecoverWithRetries(ex =>
            {
                this.logger.LogError(ex, "Failed to append a block to blob {blobName} under path {blobPath}.",
                    blobName, blobPath);
                return Source.Single(false);
            }, 1)
            .Aggregate(true, (agg, element) => agg && element);
    }

    /// <inheritdoc />
    /// <remarks>
    /// The returned <see cref="UploadedBlob.ContentHash"/> is base64-encoded, or <c>null</c> if the service
    /// returned no hash. Previously a missing hash faulted the task.
    /// </remarks>
    public Task<UploadedBlob> SaveBytesAsBlob(BinaryData bytes, AdlsGen2Path path, bool overwrite = false)
    {
        var blobClient = GetBlobClient(path);
        return blobClient.UploadAsync(bytes, overwrite: overwrite).Map(result => new UploadedBlob
        {
            Name = path.BlobName,
            ContentHash = EncodeContentHash(result.Value.ContentHash),
            LastModified = result.Value.LastModified
        });
    }
}
}