Skip to content

Commit

Permalink
RedisGrainDirectory: use a hash instead of serialized blob, update style
Browse files Browse the repository at this point in the history
  • Loading branch information
ReubenBond committed Jun 5, 2023
1 parent 2f56e43 commit 6eaec69
Showing 1 changed file with 59 additions and 58 deletions.
117 changes: 59 additions & 58 deletions src/Redis/Orleans.GrainDirectory.Redis/RedisGrainDirectory.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Net.NetworkInformation;
using System.Text;
using System.Text.Json;
using System.Threading;
Expand All @@ -15,34 +16,34 @@ namespace Orleans.GrainDirectory.Redis
{
public class RedisGrainDirectory : IGrainDirectory, ILifecycleParticipant<ISiloLifecycle>
{
private readonly RedisGrainDirectoryOptions directoryOptions;
private readonly ClusterOptions clusterOptions;
private readonly ILogger<RedisGrainDirectory> logger;
private readonly RedisGrainDirectoryOptions _directoryOptions;
private readonly ClusterOptions _clusterOptions;
private readonly ILogger<RedisGrainDirectory> _logger;
private readonly RedisKey _keyPrefix;
private readonly string _ttl;
private IConnectionMultiplexer redis;
private IDatabase database;
private IConnectionMultiplexer _redis;
private IDatabase _database;

public RedisGrainDirectory(
RedisGrainDirectoryOptions directoryOptions,
IOptions<ClusterOptions> clusterOptions,
ILogger<RedisGrainDirectory> logger)
{
this.directoryOptions = directoryOptions;
this.logger = logger;
this.clusterOptions = clusterOptions.Value;
_keyPrefix = Encoding.UTF8.GetBytes($"{this.clusterOptions.ClusterId}/directory/");
_directoryOptions = directoryOptions;
_logger = logger;
_clusterOptions = clusterOptions.Value;
_keyPrefix = Encoding.UTF8.GetBytes($"{_clusterOptions.ClusterId}/directory/");
_ttl = directoryOptions.EntryExpiry is { } ts ? ts.TotalSeconds.ToString(CultureInfo.InvariantCulture) : "-1";
}

public async Task<GrainAddress> Lookup(GrainId grainId)
{
try
{
var result = (string)await this.database.StringGetAsync(GetKey(grainId));
var result = (string)await _database.HashGetAsync(GetKey(grainId), "data");

if (this.logger.IsEnabled(LogLevel.Debug))
this.logger.LogDebug("Lookup {GrainId}: {Result}", grainId, string.IsNullOrWhiteSpace(result) ? "null" : result);
if (_logger.IsEnabled(LogLevel.Debug))
_logger.LogDebug("Lookup {GrainId}: {Result}", grainId, string.IsNullOrWhiteSpace(result) ? "null" : result);

if (string.IsNullOrWhiteSpace(result))
return default;
Expand All @@ -51,7 +52,7 @@ public async Task<GrainAddress> Lookup(GrainId grainId)
}
catch (Exception ex)
{
this.logger.LogError(ex, "Lookup failed for {GrainId}", grainId);
_logger.LogError(ex, "Lookup failed for {GrainId}", grainId);

if (IsRedisException(ex))
throw new OrleansException($"Lookup failed for {grainId} : {ex}");
Expand All @@ -64,56 +65,54 @@ public async Task<GrainAddress> Lookup(GrainId grainId)

public async Task<GrainAddress> Register(GrainAddress address, GrainAddress previousAddress)
{
const string UpdateScript =
const string RegisterScript =
"""
local cur = redis.call('GET', KEYS[1])
if (not cur or cur == ARGV[2]) then
redis.call('SET', KEYS[1], ARGV[1])
local etag = redis.call('HGET', KEYS[1], 'etag')
local data = redis.call('HGET', KEYS[1], 'data')
if (not etag or etag == ARGV[2]) then
redis.call('HSET', KEYS[1], 'data', ARGV[1])
redis.call('HSET', KEYS[1], 'etag', ARGV[4])
if (not ARGV[3] and ARGV[3] ~= '-1') then
redis.call('EXPIRE', KEYS[1], ARGV[3])
end
return nil
end
return cur
return data
""";

var value = JsonSerializer.Serialize(address);
try
{
var previousValue = previousAddress is { } ? JsonSerializer.Serialize(previousAddress) : "";
var etag = previousAddress is { } ? previousAddress.ActivationId.ToString() : "";
var newEtag = address.ActivationId.ToString();
var key = GetKey(address.GrainId);
var args = new RedisValue[] { value, previousValue, _ttl };
var resultRaw = await this.database.ScriptEvaluateAsync(
UpdateScript,
var args = new RedisValue[] { value, etag, _ttl, newEtag };
var entryString = (string)await _database.ScriptEvaluateAsync(
RegisterScript,
keys: new RedisKey[] { key },
values: args);
var result = (string)resultRaw;

if (this.logger.IsEnabled(LogLevel.Debug))
if (entryString is null)
{
if (result is null)
if (_logger.IsEnabled(LogLevel.Debug))
{
this.logger.LogDebug("Registered {GrainId} ({Address})", address.GrainId, value);
_logger.LogDebug("Registered {GrainId} ({Address})", address.GrainId, value);
}
else
{

this.logger.LogDebug("Failed to register {GrainId} ({Address}) in directory: Conflicted with existing value, {Result}", address.GrainId, value, result);
}
return address;
}

// This indicates success
if (result is null)
if (_logger.IsEnabled(LogLevel.Debug))
{
return address;
_logger.LogDebug("Failed to register {GrainId} ({Address}) in directory: Conflicted with existing value, {Result}", address.GrainId, value, entryString);
}

// This indicates failure
return JsonSerializer.Deserialize<GrainAddress>(result);
return JsonSerializer.Deserialize<GrainAddress>(entryString);
}
catch (Exception ex)
{
this.logger.LogError(ex, "Failed to register {GrainId} ({Address}) in directory", address.GrainId, value);
_logger.LogError(ex, "Failed to register {GrainId} ({Address}) in directory", address.GrainId, value);

if (IsRedisException(ex))
{
Expand All @@ -130,8 +129,8 @@ public async Task Unregister(GrainAddress address)
{
const string DeleteScript =
"""
local cur = redis.call('GET', KEYS[1])
if (cur == ARGV[1]) then
local etag = redis.call('HGET', KEYS[1], 'etag')
if (etag == ARGV[1]) then
return redis.call('DEL', KEYS[1])
end
return 0
Expand All @@ -140,19 +139,21 @@ public async Task Unregister(GrainAddress address)
try
{
var value = JsonSerializer.Serialize(address);
var result = (int) await this.database.ScriptEvaluateAsync(
var result = (int) await _database.ScriptEvaluateAsync(
DeleteScript,
keys: new RedisKey[] { GetKey(address.GrainId) },
values: new RedisValue[] { value });
values: new RedisValue[] { address.ActivationId.ToString() });

if (this.logger.IsEnabled(LogLevel.Debug))
this.logger.LogDebug("Unregister {GrainId} ({Address}): {Result}", address.GrainId, JsonSerializer.Serialize(address), (result != 0) ? "OK" : "Conflict");
if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug("Unregister {GrainId} ({Address}): {Result}", address.GrainId, JsonSerializer.Serialize(address), (result != 0) ? "OK" : "Conflict");
}
}
catch (Exception ex)
{
var value = JsonSerializer.Serialize(address);

this.logger.LogError(ex, "Unregister failed for {GrainId} ({Address})", address.GrainId, value);
_logger.LogError(ex, "Unregister failed for {GrainId} ({Address})", address.GrainId, value);

if (IsRedisException(ex))
throw new OrleansException($"Unregister failed for {address.GrainId} ({value}) : {ex}");
Expand All @@ -173,42 +174,42 @@ public void Participate(ISiloLifecycle lifecycle)

public async Task Initialize(CancellationToken ct = default)
{
this.redis = await directoryOptions.CreateMultiplexer(directoryOptions);
_redis = await _directoryOptions.CreateMultiplexer(_directoryOptions);

// Configure logging
this.redis.ConnectionRestored += this.LogConnectionRestored;
this.redis.ConnectionFailed += this.LogConnectionFailed;
this.redis.ErrorMessage += this.LogErrorMessage;
this.redis.InternalError += this.LogInternalError;
_redis.ConnectionRestored += LogConnectionRestored;
_redis.ConnectionFailed += LogConnectionFailed;
_redis.ErrorMessage += LogErrorMessage;
_redis.InternalError += LogInternalError;

this.database = this.redis.GetDatabase();
_database = _redis.GetDatabase();
}

private async Task Uninitialize(CancellationToken arg)
{
if (this.redis != null && this.redis.IsConnected)
if (_redis != null && _redis.IsConnected)
{
await this.redis.CloseAsync();
this.redis.Dispose();
this.redis = null;
this.database = null;
await _redis.CloseAsync();
_redis.Dispose();
_redis = null;
_database = null;
}
}

private RedisKey GetKey(GrainId grainId) => _keyPrefix.Append(grainId.ToString());

#region Logging
private void LogConnectionRestored(object sender, ConnectionFailedEventArgs e)
=> this.logger.LogInformation(e.Exception, "Connection to {EndPoint} failed: {FailureType}", e.EndPoint, e.FailureType);
=> _logger.LogInformation(e.Exception, "Connection to {EndPoint} failed: {FailureType}", e.EndPoint, e.FailureType);

private void LogConnectionFailed(object sender, ConnectionFailedEventArgs e)
=> this.logger.LogError(e.Exception, "Connection to {EndPoint} failed: {FailureType}", e.EndPoint, e.FailureType);
=> _logger.LogError(e.Exception, "Connection to {EndPoint} failed: {FailureType}", e.EndPoint, e.FailureType);

private void LogErrorMessage(object sender, RedisErrorEventArgs e)
=> this.logger.LogError(e.Message);
=> _logger.LogError(e.Message);

private void LogInternalError(object sender, InternalErrorEventArgs e)
=> this.logger.LogError(e.Exception, "Internal error");
=> _logger.LogError(e.Exception, "Internal error");
#endregion

// These exceptions are not serializable by the client
Expand Down

0 comments on commit 6eaec69

Please sign in to comment.