Skip to content

Commit

Permalink
Replace CacheSizeInMb setting with DataMaxAgeInCache and DataMinTimeI…
Browse files Browse the repository at this point in the history
…nCache in stream providers (#3126)
  • Loading branch information
xiazen authored and jason-bragg committed Jun 19, 2017
1 parent 5b89ba8 commit 3559958
Show file tree
Hide file tree
Showing 52 changed files with 1,017 additions and 1,132 deletions.
9 changes: 8 additions & 1 deletion src/OrleansProviders/OrleansProviders.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,23 @@
</ItemGroup>
<ItemGroup>
<Compile Include="BuiltInProvidersConfigurationExtensions.cs" />
<Compile Include="Streams\Common\RecoverableStreamProviderSettings.cs" />
<Compile Include="Streams\Common\EventSequenceTokenV2.cs" />
<Compile Include="Streams\Common\Monitors\DefaultBlockPoolMonitor.cs" />
<Compile Include="Streams\Common\Monitors\DefaultCacheMonitor.cs" />
<Compile Include="Streams\Common\Monitors\DefaultQueueAdapterReceiverMonitor.cs" />
<Compile Include="Streams\Common\Monitors\IBlockPoolMonitor.cs" />
<Compile Include="Streams\Common\Monitors\IObjectPoolMonitor.cs" />
<Compile Include="Streams\Common\Monitors\ICacheMonitor.cs" />
<Compile Include="Streams\Common\Monitors\IQueueAdapterReceiverMonitor.cs" />
<Compile Include="Streams\Common\Monitors\MonitorAggregationDimensions.cs" />
<Compile Include="Streams\Common\PooledCache\ChronologicalEvictionStrategy.cs" />
<Compile Include="Streams\Common\PooledCache\IEvictionStrategy.cs" />
<Compile Include="Streams\Common\PooledCache\TimePurgePredicate.cs" />
<Compile Include="Streams\Memory\IMemoryStreamQueueGrain.cs" />
<Compile Include="Streams\Memory\MemoryAdapterConfig.cs" />
<Compile Include="Streams\Memory\MemoryAdapterFactory.cs" />
<Compile Include="Streams\Common\PooledCache\CachedMessageBlock.cs" />
<Compile Include="Streams\Common\PooledCache\FixedSizeObjectPool.cs" />
<Compile Include="Streams\Common\PooledCache\FixedSizeBuffer.cs" />
<Compile Include="Streams\Common\PooledCache\ICacheDataAdapter.cs" />
<Compile Include="Streams\Common\PooledCache\IObjectPool.cs" />
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
using Orleans.Runtime;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Orleans.Providers.Streams.Common
{
/// <summary>
/// block pool monitor used as a default option in GeneratorStreamprovider and MemoryStreamProvider
/// </summary>
public class DefaultBlockPoolMonitor : IBlockPoolMonitor
{
protected Logger Logger;
protected Dictionary<string, string> LogProperties;

public DefaultBlockPoolMonitor(Logger logger)
{
this.Logger = logger;
}

public DefaultBlockPoolMonitor(BlockPoolMonitorDimensions dimensions, Logger logger)
:this(logger)
{
this.LogProperties = new Dictionary<string, string>
{
{"BlockPoolId", dimensions.BlockPoolId},
{"HostName", dimensions.NodeConfig.HostNameOrIPAddress }
};
}
/// <inheritdoc cref="IBlockPoolMonitor"/>
public void Report(long totalMemoryInByte, long availableMemoryInByte, long claimedMemoryInByte)
{
this.Logger.TrackMetric("TotalMemoryInByte", totalMemoryInByte, this.LogProperties);
this.Logger.TrackMetric("AvailableMemoryInByte", availableMemoryInByte, this.LogProperties);
this.Logger.TrackMetric("ClaimedMemoryInByte", claimedMemoryInByte, this.LogProperties);
}

/// <inheritdoc cref="IBlockPoolMonitor"/>
public void TrackMemoryReleased(long releasedMemoryInByte)
{
this.Logger.TrackMetric("ReleasedMemoryInByte", releasedMemoryInByte, this.LogProperties);
}

/// <inheritdoc cref="IBlockPoolMonitor"/>
public void TrackMemoryAllocated(long allocatedMemoryInByte)
{
this.Logger.TrackMetric("AllocatedMemoryInByte", allocatedMemoryInByte, this.LogProperties);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
using Orleans.Runtime;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Orleans.Providers.Streams.Common
{
/// <summary>
/// cache monitor used as a default option in GeneratorStreamprovider and MemoryStreamProvider
/// </summary>
public class DefaultCacheMonitor : ICacheMonitor
{
protected Logger Logger;
protected Dictionary<string, string> LogProperties;

public DefaultCacheMonitor(Logger logger)
{
this.Logger = logger;
}

public DefaultCacheMonitor(CacheMonitorDimensions dimensions, Logger logger)
:this(logger)
{
this.LogProperties = new Dictionary<string, string>
{
{"QueueId", dimensions.QueueId},
{"HostName", dimensions.NodeConfig.HostNameOrIPAddress}
};
}
/// <inheritdoc cref="ICacheMonitor"/>
public void TrackCachePressureMonitorStatusChange(string pressureMonitorType, bool underPressure, double? cachePressureContributionCount, double? currentPressure,
double? flowControlThreshold)
{
this.Logger.TrackMetric($"{pressureMonitorType}-UnderPressure", underPressure ? 1 : 0, this.LogProperties);
if (cachePressureContributionCount.HasValue)
this.Logger.TrackMetric($"{pressureMonitorType}-PressureContributionCount", cachePressureContributionCount.Value, this.LogProperties);
if (currentPressure.HasValue)
this.Logger.TrackMetric($"{pressureMonitorType}-CurrentPressure", currentPressure.Value, this.LogProperties);
}

/// <inheritdoc cref="ICacheMonitor"/>
public void ReportCacheSize(long totalCacheSizeInByte)
{
this.Logger.TrackMetric("TotalCacheSizeInByte", totalCacheSizeInByte, this.LogProperties);
}

/// <inheritdoc cref="ICacheMonitor"/>
public void ReportMessageStatistics(DateTime? oldestMessageEnqueueTimeUtc, DateTime? oldestMessageDequeueTimeUtc, DateTime? newestMessageEnqueueTimeUtc, long totalMessageCount)
{
if (oldestMessageEnqueueTimeUtc.HasValue && newestMessageEnqueueTimeUtc.HasValue)
this.Logger.TrackMetric("OldestMessageRelativeAgeToNewestMessage", newestMessageEnqueueTimeUtc.Value - oldestMessageEnqueueTimeUtc.Value, this.LogProperties);

if (oldestMessageDequeueTimeUtc.HasValue)
this.Logger.TrackMetric("OldestMessageDequeueTimeToNow", DateTime.UtcNow - oldestMessageDequeueTimeUtc.Value, this.LogProperties);

this.Logger.TrackMetric("TotalMessageCount", totalMessageCount, this.LogProperties);
}

/// <inheritdoc cref="ICacheMonitor"/>
public void TrackMemoryAllocated(int memoryInByte)
{
this.Logger.TrackMetric("MemoryAllocatedInByte", memoryInByte, this.LogProperties);
}

/// <inheritdoc cref="ICacheMonitor"/>
public void TrackMemoryReleased(int memoryInByte)
{
this.Logger.TrackMetric("MemoryReleasedInByte", memoryInByte, this.LogProperties);
}

/// <inheritdoc cref="ICacheMonitor"/>
public void TrackMessagesAdded(long mesageAdded)
{
this.Logger.TrackMetric("MessageAdded", mesageAdded, this.LogProperties);
}

/// <inheritdoc cref="ICacheMonitor"/>
public void TrackMessagesPurged(long messagePurged)
{
this.Logger.TrackMetric("MessagePurged", messagePurged, this.LogProperties);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
using Orleans.Runtime;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Orleans.Providers.Streams.Common
{
/// <summary>
/// Queue adapter receiver monitor used as a default option in GeneratorStreamprovider and MemoryStreamProvider
/// </summary>
public class DefaultQueueAdapterReceiverMonitor : IQueueAdapterReceiverMonitor
{
protected Logger Logger;
protected Dictionary<string, string> LogProperties;

public DefaultQueueAdapterReceiverMonitor(Logger logger)
{
this.Logger = logger;
}

public DefaultQueueAdapterReceiverMonitor(ReceiverMonitorDimensions dimensions, Logger logger)
:this(logger)
{
this.LogProperties = new Dictionary<string, string>
{
{"QueueId", dimensions.QueueId},
{"HostName", dimensions.NodeConfig.HostNameOrIPAddress }
};
}
/// <summary>
/// Track attempts to initialize the receiver.
/// </summary>
/// <param name="success">True if read succeeded, false if read failed.</param>
/// <param name="callTime"></param>
/// <param name="exception"></param>
public void TrackInitialization(bool success, TimeSpan callTime, Exception exception)
{
this.Logger.TrackMetric("InitializationFailure", success ? 0 : 1, this.LogProperties);
this.Logger.TrackMetric("InitializationCallTime", callTime, this.LogProperties);
this.Logger.TrackMetric("InitializationException", exception == null ? 0 : 1, this.LogProperties);
}

/// <summary>
/// Track attempts to read from the queue. Tracked per queue read operation.
/// </summary>
/// <param name="success">True if read succeeded, false if read failed.</param>
/// <param name="callTime"></param>
/// <param name="exception"></param>
public void TrackRead(bool success, TimeSpan callTime, Exception exception)
{
this.Logger.TrackMetric("ReadFailure", success ? 0 : 1, this.LogProperties);
this.Logger.TrackMetric("ReadCallTime", callTime, this.LogProperties);
this.Logger.TrackMetric("ReadException", exception == null ? 0 : 1, this.LogProperties);
}

/// <summary>
/// Tracks messages read and time taken per successful read. Tracked per successful queue read operation.
/// </summary>
/// <param name="count">Messages read.</param>
/// <param name="oldestMessageEnqueueTimeUtc"></param>
/// <param name="newestMessageEnqueueTimeUtc"></param>
public void TrackMessagesReceived(long count, DateTime? oldestMessageEnqueueTimeUtc, DateTime? newestMessageEnqueueTimeUtc)
{
var now = DateTime.UtcNow;
this.Logger.TrackMetric("MessagesRecieved", count, this.LogProperties);
if (oldestMessageEnqueueTimeUtc.HasValue)
this.Logger.TrackMetric("OldestMessageReadEnqueueTimeToNow", now - oldestMessageEnqueueTimeUtc.Value, this.LogProperties);
if (newestMessageEnqueueTimeUtc.HasValue)
this.Logger.TrackMetric("NewestMessageReadEnqueueTimeToNow", now - newestMessageEnqueueTimeUtc.Value, this.LogProperties);
}

/// <summary>
/// Track attempts to shutdown the receiver.
/// </summary>
/// <param name="success">True if read succeeded, false if read failed.</param>
/// <param name="callTime"></param>
/// <param name="exception"></param>
public void TrackShutdown(bool success, TimeSpan callTime, Exception exception)
{
this.Logger.TrackMetric("ShutdownFailure", success ? 0 : 1, this.LogProperties);
this.Logger.TrackMetric("ShutdownCallTime", callTime, this.LogProperties);
this.Logger.TrackMetric("ShutdownException", exception == null ? 0 : 1, this.LogProperties);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@

using System;

namespace Orleans.ServiceBus.Providers
namespace Orleans.Providers.Streams.Common
{
/// <summary>
/// Responsible for monitoring receiver performance metrics.
/// </summary>
public interface IEventHubReceiverMonitor
public interface IQueueAdapterReceiverMonitor
{
#region event driven metrics
/// <summary>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
using Orleans.Runtime.Configuration;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Orleans.Providers.Streams.Common
{
/// <summary>
/// Base class for holding monitor aggregation dimensions
/// </summary>
public class MonitorAggregationDimensions
{
/// <summary>
/// Data object holding Silo global configuration parameters.
/// </summary>
public GlobalConfiguration GlobalConfig { get; set; }

/// <summary>
/// Individual node-specific silo configuration parameters.
/// </summary>
public NodeConfiguration NodeConfig { get; set; }

/// <summary>
/// Constructor
/// </summary>
/// <param name="globalConfig"></param>
/// <param name="nodeConfig"></param>
public MonitorAggregationDimensions(GlobalConfiguration globalConfig, NodeConfiguration nodeConfig)
{
this.GlobalConfig = globalConfig;
this.NodeConfig = nodeConfig;
}

/// <summary>
/// Constructor
/// </summary>
public MonitorAggregationDimensions()
{ }
}

/// <summary>
/// Aggregation dimensions for receiver monitor
/// </summary>
public class ReceiverMonitorDimensions : MonitorAggregationDimensions
{
/// <summary>
/// Eventhub partition
/// </summary>
public string QueueId { get; set; }

/// <summary>
/// Constructor
/// </summary>
/// <param name="dimensions"></param>
/// <param name="queueId"></param>
public ReceiverMonitorDimensions(MonitorAggregationDimensions dimensions, string queueId)
: base(dimensions.GlobalConfig, dimensions.NodeConfig)
{
this.QueueId = queueId;
}

/// <summary>
/// Zero parameter constructor
/// </summary>
public ReceiverMonitorDimensions()
{
}
}

/// <summary>
/// Aggregation dimensions for cache monitor
/// </summary>
public class CacheMonitorDimensions : ReceiverMonitorDimensions
{
/// <summary>
/// Block pool Id
/// </summary>
public string BlockPoolId { get; set; }

public CacheMonitorDimensions(MonitorAggregationDimensions dimensions, string queueId, string blockPoolId)
:base(dimensions, queueId)
{
this.BlockPoolId = blockPoolId;
}
}

/// <summary>
/// Aggregation dimensions for block pool monitors
/// </summary>
public class BlockPoolMonitorDimensions : MonitorAggregationDimensions
{
/// <summary>
/// Block pool Id
/// </summary>
public string BlockPoolId { get; set; }

public BlockPoolMonitorDimensions(MonitorAggregationDimensions dimensions, string blockPoolId)
:base(dimensions.GlobalConfig, dimensions.NodeConfig)
{
this.BlockPoolId = blockPoolId;
}
}
}
Loading

0 comments on commit 3559958

Please sign in to comment.