Skip to content

Commit

Permalink
refactor: Object storage IDs are not limited to GUID
Browse files Browse the repository at this point in the history
  • Loading branch information
aneojgurhem committed Dec 5, 2024
1 parent 732bb95 commit 7c628f1
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 28 deletions.
12 changes: 7 additions & 5 deletions Adaptors/LocalStorage/src/ObjectStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
using System.Collections.Generic;
using System.IO;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

Expand Down Expand Up @@ -97,10 +98,11 @@ public Task<HealthCheckResult> Check(HealthCheckTag tag)
IAsyncEnumerable<ReadOnlyMemory<byte>> valueChunks,
CancellationToken cancellationToken = default)
{
var id = Guid.NewGuid();
long size = 0;
var key = Guid.NewGuid()
.ToString();
var filename = Path.Combine(path_,
id.ToString());
key);


// Write to temporary file
Expand Down Expand Up @@ -134,14 +136,14 @@ public Task<HealthCheckResult> Check(HealthCheckTag tag)
await file.FlushAsync(cancellationToken)
.ConfigureAwait(false);

return (id.ToByteArray(), size);
return (Encoding.UTF8.GetBytes(key), size);
}

/// <inheritdoc />
public async IAsyncEnumerable<byte[]> GetValuesAsync(byte[] id,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
var key = new Guid(id).ToString();
var key = Encoding.UTF8.GetString(id);

var filename = Path.Combine(path_,
key);
Expand Down Expand Up @@ -191,7 +193,7 @@ public async Task TryDeleteAsync(IEnumerable<byte[]> ids,
{
foreach (var id in ids)
{
var key = new Guid(id).ToString();
var key = Encoding.UTF8.GetString(id);
await TryDeleteAsync(key,
cancellationToken)
.ConfigureAwait(false);
Expand Down
12 changes: 7 additions & 5 deletions Adaptors/Memory/src/ObjectStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

Expand Down Expand Up @@ -55,25 +56,26 @@ public Task<HealthCheckResult> Check(HealthCheckTag tag)
{
var array = new List<byte>();

var id = Guid.NewGuid();
var key = Guid.NewGuid()
.ToString();

await foreach (var val in valueChunks.WithCancellation(cancellationToken)
.ConfigureAwait(false))
{
array.AddRange(val.ToArray());
}

store_[id.ToString()] = array.ToArray();
store_[key] = array.ToArray();

return (id.ToByteArray(), array.Count);
return (Encoding.UTF8.GetBytes(key), array.Count);
}

#pragma warning disable CS1998
public async IAsyncEnumerable<byte[]> GetValuesAsync(byte[] id,
#pragma warning restore CS1998
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
var key = new Guid(id).ToString();
var key = Encoding.UTF8.GetString(id);
if (!store_.TryGetValue(key,
out var value))
{
Expand All @@ -91,7 +93,7 @@ public Task TryDeleteAsync(IEnumerable<byte[]> ids,
{
foreach (var id in ids)
{
var key = new Guid(id).ToString();
var key = Encoding.UTF8.GetString(id);

store_.TryRemove(key,
out _);
Expand Down
18 changes: 11 additions & 7 deletions Adaptors/Redis/src/ObjectStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

Expand Down Expand Up @@ -90,9 +91,10 @@ public Task<HealthCheckResult> Check(HealthCheckTag tag)
IAsyncEnumerable<ReadOnlyMemory<byte>> valueChunks,
CancellationToken cancellationToken = default)
{
var id = Guid.NewGuid();
var key = Guid.NewGuid()
.ToString();
var storageNameKey = objectStorageName_ + key;
long size = 0;
var storageNameKey = objectStorageName_ + id;

var idx = 0;
var taskList = new List<Task>();
Expand All @@ -111,14 +113,14 @@ public Task<HealthCheckResult> Check(HealthCheckTag tag)
await taskList.WhenAll()
.ConfigureAwait(false);

return (id.ToByteArray(), size);
return (Encoding.UTF8.GetBytes(key), size);
}

/// <inheritdoc />
public async IAsyncEnumerable<byte[]> GetValuesAsync(byte[] id,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
var key = new Guid(id);
var key = Encoding.UTF8.GetString(id);
var value = await PerformActionWithRetry(() => redis_.StringGetAsync(objectStorageName_ + key + "_count"))
.ConfigureAwait(false);

Expand Down Expand Up @@ -146,13 +148,15 @@ public async IAsyncEnumerable<byte[]> GetValuesAsync(byte[]
/// <inheritdoc />
public async Task TryDeleteAsync(IEnumerable<byte[]> ids,
CancellationToken cancellationToken = default)
=> await ids.ParallelForEach(key => TryDeleteAsync(new Guid(key).ToString(),
cancellationToken))
=> await ids.ParallelForEach(id => TryDeleteAsync(id,
cancellationToken))
.ConfigureAwait(false);

private async Task TryDeleteAsync(string key,
private async Task TryDeleteAsync(byte[] id,
CancellationToken cancellationToken = default)
{
var key = Encoding.UTF8.GetString(id);

var value = await PerformActionWithRetry(() => redis_.StringGetAsync(objectStorageName_ + key + "_count"))
.ConfigureAwait(false);

Expand Down
17 changes: 10 additions & 7 deletions Adaptors/S3/src/ObjectStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
using System.IO;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

Expand Down Expand Up @@ -98,7 +99,7 @@ public Task<HealthCheckResult> Check(HealthCheckTag tag)
public async IAsyncEnumerable<byte[]> GetValuesAsync(byte[] id,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
var key = new Guid(id);
var key = Encoding.UTF8.GetString(id);
var objectStorageFullName = $"{objectStorageName_}{key}";

try
Expand Down Expand Up @@ -173,8 +174,9 @@ await s3Client_.GetObjectAsync(options_.BucketName,
{
0,
};
var id = Guid.NewGuid();
var objectStorageFullName = $"{objectStorageName_}{id.ToString()}";
var key = Guid.NewGuid()
.ToString();
var objectStorageFullName = $"{objectStorageName_}{key}";

logger_.LogDebug("Upload object");
var initRequest = new InitiateMultipartUploadRequest
Expand Down Expand Up @@ -226,19 +228,20 @@ await s3Client_.AbortMultipartUploadAsync(abortMpuRequest,
throw;
}

return (id.ToByteArray(), sizeBox[0]);
return (Encoding.UTF8.GetBytes(key), sizeBox[0]);
}

/// <inheritdoc />
public async Task TryDeleteAsync(IEnumerable<byte[]> ids,
CancellationToken cancellationToken = default)
=> await ids.ParallelForEach(key => TryDeleteAsync(new Guid(key).ToString(),
cancellationToken))
=> await ids.ParallelForEach(id => TryDeleteAsync(id,
cancellationToken))
.ConfigureAwait(false);

private async Task TryDeleteAsync(string key,
private async Task TryDeleteAsync(byte[] id,
CancellationToken cancellationToken = default)
{
var key = Encoding.UTF8.GetString(id);
var objectStorageFullName = $"{objectStorageName_}{key}";

var objectDeleteRequest = new DeleteObjectRequest
Expand Down
9 changes: 5 additions & 4 deletions Common/tests/TestBase/ObjectStorageTestBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,7 @@ public async Task GetValuesAsyncShouldFail()
{
if (RunTests)
{
var id = Guid.NewGuid()
.ToByteArray();
var id = Encoding.UTF8.GetBytes("IdThatShouldFail");
var data = new List<byte>();

try
Expand Down Expand Up @@ -266,7 +265,8 @@ public async Task PayloadShouldBeEqual()

var str = Encoding.ASCII.GetString(data.ToArray());
Console.WriteLine(str);
Assert.IsTrue(str.SequenceEqual("AAAABBBB"));
Assert.AreEqual("AAAABBBB",
str);
}
}

Expand All @@ -286,7 +286,8 @@ public async Task Payload2ShouldBeEqual()

var str = Encoding.ASCII.GetString(data.ToArray());
Console.WriteLine(str);
Assert.IsTrue(str.SequenceEqual("AAAABBBBCCCCDDDD"));
Assert.AreEqual("AAAABBBBCCCCDDDD",
str);
}
}

Expand Down

0 comments on commit 7c628f1

Please sign in to comment.