Skip to content

Commit

Permalink
Added Subsegment streaming strategy. (#83)
Browse files Browse the repository at this point in the history
* Added Subsegment streaming strategy.

* Refactored ShouldStreamSubsegments() to ShouldStream() and StreamSubsegments() to Stream(). Added check for negative number in DefaultStreamingStrategy(). Added unit tests for the same.

* added separate tests for custom and negative value of maxSubsegmentSize for DefaultStreamingStrategy.
  • Loading branch information
srprash authored and Yogiraj Awati committed Jun 14, 2019
1 parent 4ee0660 commit f41ad03
Show file tree
Hide file tree
Showing 9 changed files with 230 additions and 52 deletions.
6 changes: 3 additions & 3 deletions sdk/src/Core/AWSXRayRecorder.netcore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -333,9 +333,9 @@ private void ProcessEndSubsegmentInLambdaContext()
// Emit
Emitter.Send(subsegment.RootSegment);
}
else if (ShouldStreamSubsegments(subsegment))
else if (StreamingStrategy.ShouldStream(subsegment))
{
StreamSubsegments(subsegment.RootSegment);
StreamingStrategy.Stream(subsegment.RootSegment, Emitter);
}

if (TraceContext.IsEntityPresent() && TraceContext.GetEntity().GetType() == typeof(FacadeSegment)) //implies FacadeSegment in the Trace Context
Expand Down Expand Up @@ -381,7 +381,7 @@ private void EndFacadeSegment()
PrepEndSegment(facadeSegment);
if (facadeSegment.Sampled == SampleDecision.Sampled && facadeSegment.RootSegment != null && facadeSegment.RootSegment.Size >= 0)
{
StreamSubsegments(facadeSegment); //Facade segment is not emitted, all its subsegments, if emmittable, are emitted
StreamingStrategy.Stream(facadeSegment, Emitter); //Facade segment is not emitted, all its subsegments, if emmittable, are emitted
}
}

Expand Down
53 changes: 9 additions & 44 deletions sdk/src/Core/AWSXRayRecorderImpl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,11 @@ public ContextMissingStrategy ContextMissingStrategy
/// </summary>
public ExceptionSerializationStrategy ExceptionSerializationStrategy { get; set; } = new DefaultExceptionSerializationStrategy();

/// <summary>
/// Instance of <see cref="IStreamingStrategy"/>, used to define the streaming strategy for segment/subsegment.
/// </summary>
public IStreamingStrategy StreamingStrategy { get; set; } = new DefaultStreamingStrategy();

/// <summary>
/// Begin a tracing segment. A new tracing segment will be created and started.
/// </summary>
Expand Down Expand Up @@ -693,46 +698,6 @@ protected virtual void Dispose(bool disposing)
}
}

/// <summary>
/// Checks whether subsegments of the current instance of <see cref="Entity"/> should be streamed.
/// </summary>
/// <param name="entity">Instance of <see cref="Entity"/></param>
/// <returns>True if the subsegments are streamable.</returns>
protected static bool ShouldStreamSubsegments(Entity entity)
{
return entity.Sampled == SampleDecision.Sampled && entity.RootSegment != null && entity.RootSegment.Size >= MaxSubsegmentSize;
}

/// <summary>
/// Streams subsegments of instance of <see cref="Entity"/>.
/// </summary>
/// <param name="entity">Instance of <see cref="Entity"/>.</param>
protected void StreamSubsegments(Entity entity)
{
lock (entity.Subsegments)
{
foreach (var next in entity.Subsegments)
{
StreamSubsegments(next);
}

entity.Subsegments.RemoveAll(x => x.HasStreamed);
}

if (entity is Segment || entity.IsInProgress || entity.Reference > 0 || entity.IsSubsegmentsAdded)
{
return;
}

Subsegment subsegment = entity as Subsegment;
subsegment.TraceId = entity.RootSegment.TraceId;
subsegment.Type = "subsegment";
subsegment.ParentId = subsegment.Parent.Id;
Emitter.Send(subsegment);
subsegment.RootSegment.DecrementSize();
subsegment.HasStreamed = true;
}

/// <summary>
/// Returns subsegments.
/// </summary>
Expand Down Expand Up @@ -796,9 +761,9 @@ protected void ProcessEndSegment(Segment segment)
{
Emitter.Send(segment);
}
else if (ShouldStreamSubsegments(segment))
else if (StreamingStrategy.ShouldStream(segment))
{
StreamSubsegments(segment);
StreamingStrategy.Stream(segment, Emitter);
}
}

Expand Down Expand Up @@ -829,9 +794,9 @@ protected void ProcessEndSubsegment()
// Emit
Emitter.Send(subsegment.RootSegment);
}
else if (ShouldStreamSubsegments(subsegment))
else if (StreamingStrategy.ShouldStream(subsegment))
{
StreamSubsegments(subsegment.RootSegment);
StreamingStrategy.Stream(subsegment.RootSegment, Emitter);
}
}

Expand Down
18 changes: 18 additions & 0 deletions sdk/src/Core/AwsXrayRecorderBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class AWSXRayRecorderBuilder
private string _daemonAddress;
private ITraceContext _traceContext;
private ExceptionSerializationStrategy _exceptionSerializationStrategy;
private IStreamingStrategy _streamingStrategy;

/// <summary>
/// Gets a read-only copy of current plugins in the builder
Expand Down Expand Up @@ -208,6 +209,18 @@ public AWSXRayRecorderBuilder WithSamplingStrategy(ISamplingStrategy newStrategy
return this;
}

/// <summary>
/// Adds the given streaming strategy to builder. There can exist only one streaming strategy.
/// Any previous value of streaming strategy will be overwritten.
/// </summary>
/// <param name="newStreamingStrategy">A streaming strategy to add</param>
/// <returns>The builder with streaming strategy added</returns>
public AWSXRayRecorderBuilder WithStreamingStrategy(IStreamingStrategy newStreamingStrategy)
{
_streamingStrategy = newStreamingStrategy ?? throw new ArgumentNullException("StreamingStrategy");
return this;
}

/// <summary>
/// Adds the context missing strategy.
/// </summary>
Expand Down Expand Up @@ -321,6 +334,11 @@ private void PopulateRecorder(AWSXRayRecorder recorder)
recorder.SamplingStrategy = _samplingStrategy;
}

if (_streamingStrategy != null)
{
recorder.StreamingStrategy = _streamingStrategy;
}

if (_daemonAddress != null)
{
recorder.SetDaemonAddress(_daemonAddress);
Expand Down
5 changes: 5 additions & 0 deletions sdk/src/Core/IAWSXRayRecorder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ public interface IAWSXRayRecorder : IDisposable
/// </summary>
ISamplingStrategy SamplingStrategy { get; set; }

/// <summary>
/// Get or sets the streaming strategy
/// </summary>
IStreamingStrategy StreamingStrategy { get; set; }

/// <summary>
/// Gets or sets the context missing strategy.
/// </summary>
Expand Down
85 changes: 85 additions & 0 deletions sdk/src/Core/Strategies/DefaultStreamingStrategy.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
using System;
using Amazon.XRay.Recorder.Core.Internal.Entities;
using Amazon.XRay.Recorder.Core.Internal.Emitters;
using Amazon.XRay.Recorder.Core.Sampling;

namespace Amazon.XRay.Recorder.Core.Strategies
{
/// <summary>
/// The default streaming strategy. It uses the total count of a segment's children subsegments as a threshold. If the threshold is breached, it uses subtree streaming to stream out.
/// </summary>
public class DefaultStreamingStrategy : IStreamingStrategy
{
/// <summary>
/// Default max subsegment size to stream for the strategy.
/// </summary>
private const long DefaultMaxSubsegmentSize = 100;

/// <summary>
/// Max subsegment size to stream fot the strategy.
/// </summary>
public long MaxSubsegmentSize { get; private set; } = DefaultMaxSubsegmentSize;

/// <summary>
/// Initializes a new instance of the <see cref="DefaultStreamingStrategy"/> class.
/// </summary>
public DefaultStreamingStrategy() : this(DefaultMaxSubsegmentSize)
{

}

/// <summary>
/// Initializes a new instance of the <see cref="DefaultStreamingStrategy"/> class.
/// </summary>
/// <param name="maxSubsegmentSize"></param>
public DefaultStreamingStrategy(long maxSubsegmentSize)
{
if(maxSubsegmentSize < 0)
{
throw new ArgumentException("maxSubsegmentSize cannot be a negative number.");
}
MaxSubsegmentSize = maxSubsegmentSize;
}

/// <summary>
/// Checks whether subsegments of the current instance of <see cref="Entity"/> should be streamed.
/// </summary>
/// <param name="entity">Instance of <see cref="Entity"/></param>
/// <returns>True if the subsegments are streamable.</returns>
public bool ShouldStream(Entity entity)
{
return entity.Sampled == SampleDecision.Sampled && entity.RootSegment != null && entity.RootSegment.Size >= MaxSubsegmentSize;
}

/// <summary>
/// Streams subsegments of instance of <see cref="Entity"/>.
/// </summary>
/// <param name="entity">Instance of <see cref="Entity"/>.</param>
/// <param name="emitter">Instance of <see cref="ISegmentEmitter"/>.</param>
public void Stream(Entity entity, ISegmentEmitter emitter)
{
lock (entity.Subsegments)
{
foreach (var next in entity.Subsegments)
{
Stream(next, emitter);
}

entity.Subsegments.RemoveAll(x => x.HasStreamed);
}

if (entity is Segment || entity.IsInProgress || entity.Reference > 0 || entity.IsSubsegmentsAdded)
{
return;
}

Subsegment subsegment = entity as Subsegment;
subsegment.TraceId = entity.RootSegment.TraceId;
subsegment.Type = "subsegment";
subsegment.ParentId = subsegment.Parent.Id;
emitter.Send(subsegment);
subsegment.RootSegment.DecrementSize();
subsegment.HasStreamed = true;
}
}
}
42 changes: 42 additions & 0 deletions sdk/src/Core/Strategies/IStreamingStrategy.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
//-----------------------------------------------------------------------------
// <copyright file="IStreamingStrategy.cs" company="Amazon.com">
// Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License").
// You may not use this file except in compliance with the License.
// A copy of the License is located at
//
// http://aws.amazon.com/apache2.0
//
// or in the "license" file accompanying this file. This file is distributed
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
// express or implied. See the License for the specific language governing
// permissions and limitations under the License.
// </copyright>
//-----------------------------------------------------------------------------

using Amazon.XRay.Recorder.Core.Internal.Entities;
using Amazon.XRay.Recorder.Core.Internal.Emitters;

namespace Amazon.XRay.Recorder.Core.Strategies
{
/// <summary>
/// Interface of streaming strategy which is used to determine when and how the subsegments will be streamed.
/// </summary>
public interface IStreamingStrategy
{
/// <summary>
/// Determines whenther or not the provided segment/subsegment requires any subsegment streaming.
/// </summary>
/// <param name="input">An instance of <see cref="Entity"/>.</param>
/// <returns>true if the segment/subsegment should be streamed.</returns>
bool ShouldStream(Entity entity);

/// <summary>
/// Streams subsegments of instance of <see cref="Entity"/>.
/// </summary>
/// <param name="entity">Instance of <see cref="Entity"/>.</param>
/// <param name="emitter">Instance if <see cref="ISegmentEmitter"/>.</param>
void Stream(Entity entity, ISegmentEmitter emitter);
}
}
10 changes: 5 additions & 5 deletions sdk/test/UnitTests/AWSSDKHandlerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public void TestInitialize()
{
_recorder = new AWSXRayRecorder();
#if NET45
AWSXRayRecorder.InitializeInstance(_recorder);
AWSXRayRecorder.InitializeInstance(recorder:_recorder);
#else
AWSXRayRecorder.InitializeInstance(recorder: _recorder);
# endif
Expand Down Expand Up @@ -101,19 +101,19 @@ public void TestS3SubsegmentNameIsCorrectForAWSSDKHandler()
{
AWSSDKHandler.RegisterXRay<IAmazonS3>();
var s3 = new AmazonS3Client(new AnonymousAWSCredentials(), RegionEndpoint.USEast1);
CustomResponses.SetResponse(s3, null, null, true);

CustomResponses.SetResponse(s3, null, null, true);

_recorder.BeginSegment("test s3", TraceId);
#if NET45
s3.GetObject("testBucket", "testKey");
#else
s3.GetObjectAsync("testBucket", "testKey").Wait();
#endif
var segment = AWSXRayRecorder.Instance.TraceContext.GetEntity();
var segment = _recorder.TraceContext.GetEntity();
_recorder.EndSegment();
Assert.AreEqual("S3", segment.Subsegments[0].Name);
Assert.IsTrue(segment.Subsegments[0].Aws.ContainsKey("version_id"));
Assert.AreEqual(segment.Subsegments[0].Aws["bucket_name"],"testBucket");
Assert.AreEqual(segment.Subsegments[0].Aws["bucket_name"], "testBucket");
Assert.AreEqual(segment.Subsegments[0].Aws["operation"], "GetObject");
}

Expand Down
31 changes: 31 additions & 0 deletions sdk/test/UnitTests/AWSXRayRecorderBuilderTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,15 @@ public class AwsXrayRecorderBuilderTests : TestBase
{
private const string PluginKey = "AWSXRayPlugins";
private const string UseRuntimeErrors = "UseRuntimeErrors";
private AWSXRayRecorder _recorder;
#if !NET45
private XRayOptions _xRayOptions = new XRayOptions();
#endif
[TestInitialize]
public void Initialize()
{
_recorder = new AWSXRayRecorder();
}

[TestCleanup]
public new void TestCleanup()
Expand All @@ -58,7 +64,11 @@ public class AwsXrayRecorderBuilderTests : TestBase
#else
_xRayOptions = new XRayOptions();
#endif
_recorder.Dispose();
AWSXRayRecorder.Instance.Dispose();
_recorder = null;


}

[TestMethod]
Expand Down Expand Up @@ -128,6 +138,13 @@ public void TestSetSamplingStrategy()
Assert.AreEqual(typeof(DummySamplingStrategy).FullName, recorder.SamplingStrategy.GetType().FullName);
}

[TestMethod]
public void TestSetStreamingStrategy()
{
var recorder = new AWSXRayRecorderBuilder().WithStreamingStrategy(new DummyStreamingStrategy()).Build();
Assert.AreEqual(typeof(DummyStreamingStrategy).FullName, recorder.StreamingStrategy.GetType().FullName);
}

[TestMethod]
public void TestDefaultValueOfContextMissingStrategy()
{
Expand Down Expand Up @@ -254,6 +271,7 @@ public void TestExceptionStrategy2() // invalid input
Assert.AreEqual(DefaultExceptionSerializationStrategy.DefaultStackFrameSize, actual.MaxStackFrameSize);
}


[TestMethod]
public void TestExceptionStrategy4() // Test custom exception strategy
{
Expand Down Expand Up @@ -385,6 +403,19 @@ public SamplingResponse ShouldTrace(SamplingInput input)
}
}

private class DummyStreamingStrategy : IStreamingStrategy
{
public bool ShouldStream(Entity entity)
{
throw new NotImplementedException();
}

public void Stream(Entity entity, ISegmentEmitter emitter)
{
throw new NotImplementedException();
}
}

private class DummyPlugin : IPlugin
{
public string Origin
Expand Down
Loading

0 comments on commit f41ad03

Please sign in to comment.