Skip to content

Commit

Permalink
Expose available versions information in placement context (#3136)
Browse files Browse the repository at this point in the history
  • Loading branch information
benjaminpetit authored and sergeybykov committed Jun 19, 2017
1 parent da78acb commit 5604368
Show file tree
Hide file tree
Showing 23 changed files with 315 additions and 63 deletions.
1 change: 1 addition & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ The idea is to track end-user facing changes as they occur.*
- Allow `IGrainWithGuidCompoundKey` as implicit subscription grain, and sets the stream namespace as the grain key extension (subtle breaking change: previous to 1.5 `IGrainWithGuidCompoundKey` wasn't technically supported, but if you did use it, the grain key extension would have had a `null` string) #3011
- Non-breaking improvements
- Enable runtime policy change for Silo versioning #3055
- Expose available versions information in placement context #3136
- Add support for hash-based grain placement #2944
- Allow complex streaming filters in `ImplicitStreamSubscriptionAttribute` #2988
- Support fire and forget one-way grain calls using `[OneWay]` method attribute #2993
Expand Down
5 changes: 4 additions & 1 deletion src/Orleans/Placement/IPlacementContext.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Collections.Generic;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Orleans.GrainDirectory;

Expand All @@ -8,6 +9,8 @@ public interface IPlacementContext
{
IList<SiloAddress> GetCompatibleSilos(PlacementTarget target);

IReadOnlyDictionary<ushort, IReadOnlyList<SiloAddress>> GetCompatibleSilosWithVersions(PlacementTarget target);

SiloAddress LocalSilo { get; }

SiloStatus LocalSiloStatus { get; }
Expand Down
16 changes: 15 additions & 1 deletion src/OrleansRuntime/Catalog/Catalog.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Collections;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
Expand Down Expand Up @@ -237,7 +238,7 @@ public IList<SiloAddress> GetCompatibleSilos(PlacementTarget target)

var typeCode = target.GrainIdentity.TypeCode;
var silos = target.InterfaceVersion > 0
? versionSelectorManager.GetSuitableSilos(typeCode, target.InterfaceId, target.InterfaceVersion)
? versionSelectorManager.GetSuitableSilos(typeCode, target.InterfaceId, target.InterfaceVersion).SuitableSilos
: GrainTypeManager.GetSupportedSilos(typeCode);

var compatibleSilos = silos.Intersect(AllActiveSilos).ToList();
Expand All @@ -247,6 +248,19 @@ public IList<SiloAddress> GetCompatibleSilos(PlacementTarget target)
return compatibleSilos;
}

public IReadOnlyDictionary<ushort, IReadOnlyList<SiloAddress>> GetCompatibleSilosWithVersions(PlacementTarget target)
{
if (target.InterfaceVersion == 0)
throw new ArgumentException("Interface version not provided", nameof(target));

var typeCode = target.GrainIdentity.TypeCode;
var silos = versionSelectorManager
.GetSuitableSilos(typeCode, target.InterfaceId, target.InterfaceVersion)
.SuitableSilosByVersion;

return silos;
}

internal void SetStorageManager(IStorageProviderManager storageManager)
{
storageProviderManager = storageManager;
Expand Down
17 changes: 10 additions & 7 deletions src/OrleansRuntime/GrainTypeManager/GrainTypeManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -146,18 +146,21 @@ internal IReadOnlyList<SiloAddress> GetSupportedSilos(int typeCode)
return supportedSilosByTypeCode[typeCode];
}

internal IReadOnlyList<SiloAddress> GetSupportedSilos(int typeCode, int ifaceId, IReadOnlyList<ushort> versions)
internal IReadOnlyDictionary<ushort, IReadOnlyList<SiloAddress>> GetSupportedSilos(int typeCode, int ifaceId, IReadOnlyList<ushort> versions)
{
var result = new List<SiloAddress>();
var result = new Dictionary<ushort, IReadOnlyList<SiloAddress>>();
foreach (var version in versions)
{
var silosWithTypeCode = supportedSilosByTypeCode[typeCode];
var silosWithCorrectVersion = supportedSilosByInterface[ifaceId][version].Intersect(silosWithTypeCode);
result.AddRange(silosWithCorrectVersion);
// We need to sort this so the list of silos returned will
// be the same accross all silos in the cluster
var silosWithCorrectVersion = supportedSilosByInterface[ifaceId][version]
.Intersect(silosWithTypeCode)
.OrderBy(addr => addr)
.ToList();
result[version] = silosWithCorrectVersion;
}
// We need to sort this so the list of silos returned will
// be the same accross all silos in the cluster
result.Sort();

return result;
}

Expand Down
26 changes: 20 additions & 6 deletions src/OrleansRuntime/Versions/CachedVersionSelectorManager.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using Orleans.Runtime.Versions.Compatibility;
using Orleans.Runtime.Versions.Selector;
using Orleans.Utilities;
Expand All @@ -9,9 +10,16 @@ namespace Orleans.Runtime.Versions
{
internal class CachedVersionSelectorManager
{
internal struct CachedEntry
{
public List<SiloAddress> SuitableSilos { get; set; }

public IReadOnlyDictionary<ushort, IReadOnlyList<SiloAddress>> SuitableSilosByVersion { get; set; }
}

private readonly GrainTypeManager grainTypeManager;
private readonly Func<Tuple<int, int, ushort>, IReadOnlyList<SiloAddress>> getSilosFunc;
private readonly CachedReadConcurrentDictionary<Tuple<int,int,ushort>, IReadOnlyList<SiloAddress>> suitableSilosCache;
private readonly Func<Tuple<int, int, ushort>, CachedEntry> getSilosFunc;
private readonly CachedReadConcurrentDictionary<Tuple<int,int,ushort>, CachedEntry> suitableSilosCache;

public VersionSelectorManager VersionSelectorManager { get; }

Expand All @@ -23,10 +31,10 @@ public CachedVersionSelectorManager(GrainTypeManager grainTypeManager, VersionSe
this.VersionSelectorManager = versionSelectorManager;
this.CompatibilityDirectorManager = compatibilityDirectorManager;
this.getSilosFunc = GetSuitableSilosImpl;
this.suitableSilosCache = new CachedReadConcurrentDictionary<Tuple<int, int, ushort>, IReadOnlyList<SiloAddress>>();
this.suitableSilosCache = new CachedReadConcurrentDictionary<Tuple<int, int, ushort>, CachedEntry>();
}

public IReadOnlyList<SiloAddress> GetSuitableSilos(int typeCode, int ifaceId, ushort requestedVersion)
public CachedEntry GetSuitableSilos(int typeCode, int ifaceId, ushort requestedVersion)
{
var key = Tuple.Create(typeCode, ifaceId, requestedVersion);
return suitableSilosCache.GetOrAdd(key, getSilosFunc);
Expand All @@ -37,7 +45,7 @@ public void ResetCache()
this.suitableSilosCache.Clear();
}

private IReadOnlyList<SiloAddress> GetSuitableSilosImpl(Tuple<int, int, ushort> key)
private CachedEntry GetSuitableSilosImpl(Tuple<int, int, ushort> key)
{
var typeCode = key.Item1;
var ifaceId = key.Item2;
Expand All @@ -50,7 +58,13 @@ private IReadOnlyList<SiloAddress> GetSuitableSilosImpl(Tuple<int, int, ushort>
this.grainTypeManager.GetAvailableVersions(ifaceId),
compatibilityDirector);

return this.grainTypeManager.GetSupportedSilos(typeCode, ifaceId, versions);
var result = this.grainTypeManager.GetSupportedSilos(typeCode, ifaceId, versions);

return new CachedEntry
{
SuitableSilos = result.SelectMany(sv => sv.Value).OrderBy(addr => addr).ToList(),
SuitableSilosByVersion = result,
};
}
}
}
1 change: 1 addition & 0 deletions test/TestGrainInterfaces/TestGrainInterfaces.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@
<Compile Include="SQLAdapter\ICustomerGrain.cs" />
<Compile Include="SQLAdapter\IDeviceGrain.cs" />
<Compile Include="UnitTestGrainInterfaces.cs" />
<Compile Include="VersionAwarePlacementDirector.cs" />
</ItemGroup>
<ItemGroup>
<None Include="project.json" />
Expand Down
70 changes: 70 additions & 0 deletions test/TestGrainInterfaces/VersionAwarePlacementDirector.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Orleans.Placement;
using Orleans.Runtime;
using Orleans.Runtime.Placement;

namespace UnitTests.GrainInterfaces
{
[AttributeUsage(AttributeTargets.Class)]
public sealed class VersionAwareStrategyAttribute : PlacementAttribute
{
public VersionAwareStrategyAttribute()
: base(VersionAwarePlacementStrategy.Singleton)
{
}
}

[Serializable]
public class VersionAwarePlacementStrategy : PlacementStrategy
{
internal static VersionAwarePlacementStrategy Singleton { get; } = new VersionAwarePlacementStrategy();

private VersionAwarePlacementStrategy()
{ }

public override bool Equals(object obj)
{
return obj is VersionAwarePlacementStrategy;
}

public override int GetHashCode()
{
return GetType().GetHashCode();
}
}

public class VersionAwarePlacementDirector : IPlacementDirector<VersionAwarePlacementStrategy>
{
private readonly Random random = new Random();

public Task<SiloAddress> OnAddActivation(PlacementStrategy strategy, PlacementTarget target, IPlacementContext context)
{
IReadOnlyList<SiloAddress> silos;
if (target.InterfaceVersion == 0)
{
silos = (IReadOnlyList<SiloAddress>)context.GetCompatibleSilos(target);
}
else
{
var silosByVersion = context.GetCompatibleSilosWithVersions(target);
var maxSiloCount = 0;
ushort version = 0;
foreach (var kvp in silosByVersion)
{
if (kvp.Value.Count > maxSiloCount)
{
version = kvp.Key;
maxSiloCount = kvp.Value.Count;
}
}
silos = silosByVersion[version];
}

return Task.FromResult(silos[random.Next(silos.Count)]);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public async Task ChangeCompatibilityStrategy()
{
var ifaceId = GrainInterfaceUtils.GetGrainInterfaceId(typeof(IVersionUpgradeTestGrain));

await DeployCluster();
await StartSiloV1();

var grainV1 = Client.GetGrain<IVersionUpgradeTestGrain>(0);
Assert.Equal(1, await grainV1.GetVersion());
Expand Down Expand Up @@ -63,7 +63,7 @@ public async Task ChangeVersionSelectorStrategy()
{
var ifaceId = GrainInterfaceUtils.GetGrainInterfaceId(typeof(IVersionUpgradeTestGrain));

await DeployCluster();
await StartSiloV1();

// Only V1 exists
var grainV1 = Client.GetGrain<IVersionUpgradeTestGrain>(0);
Expand Down Expand Up @@ -102,7 +102,7 @@ public async Task ChangeDefaultVersionCompatibilityStrategy()
{
Assert.Equal(AllVersionsCompatible.Singleton, CompatibilityStrategy);

await DeployCluster();
await StartSiloV1();

// Only V1 exists
var grainV1 = new IVersionUpgradeTestGrain[2];
Expand Down Expand Up @@ -136,7 +136,7 @@ public async Task ChangeDefaultVersionSelectorStrategy()
{
Assert.Equal(LatestVersion.Singleton, VersionSelectorStrategy);

await DeployCluster();
await StartSiloV1();

// Only V1 exists
var grainV1 = Client.GetGrain<IVersionUpgradeTestGrain>(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public async Task CreateActivationWithBothVersion()
{
const float numberOfGrains = 300;

await DeployCluster();
await StartSiloV1();
await StartSiloV2();

var versionCounter = new int[2];
Expand Down
Loading

0 comments on commit 5604368

Please sign in to comment.