Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added Subsegment streaming strategy. #83

Merged
merged 3 commits into from
Jun 14, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions sdk/src/Core/AWSXRayRecorder.netcore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -333,9 +333,9 @@ private void ProcessEndSubsegmentInLambdaContext()
// Emit
Emitter.Send(subsegment.RootSegment);
}
else if (ShouldStreamSubsegments(subsegment))
else if (StreamingStrategy.ShouldStream(subsegment))
{
StreamSubsegments(subsegment.RootSegment);
StreamingStrategy.Stream(subsegment.RootSegment, Emitter);
}

if (TraceContext.IsEntityPresent() && TraceContext.GetEntity().GetType() == typeof(FacadeSegment)) //implies FacadeSegment in the Trace Context
Expand Down Expand Up @@ -381,7 +381,7 @@ private void EndFacadeSegment()
PrepEndSegment(facadeSegment);
if (facadeSegment.Sampled == SampleDecision.Sampled && facadeSegment.RootSegment != null && facadeSegment.RootSegment.Size >= 0)
{
StreamSubsegments(facadeSegment); //Facade segment is not emitted, all its subsegments, if emmittable, are emitted
StreamingStrategy.Stream(facadeSegment, Emitter); //Facade segment is not emitted, all its subsegments, if emmittable, are emitted
}
}

Expand Down
53 changes: 9 additions & 44 deletions sdk/src/Core/AWSXRayRecorderImpl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,11 @@ public ContextMissingStrategy ContextMissingStrategy
/// </summary>
public ExceptionSerializationStrategy ExceptionSerializationStrategy { get; set; } = new DefaultExceptionSerializationStrategy();

/// <summary>
/// Instance of <see cref="IStreamingStrategy"/>, used to define the streaming strategy for segment/subsegment.
/// </summary>
public IStreamingStrategy StreamingStrategy { get; set; } = new DefaultStreamingStrategy();

/// <summary>
/// Begin a tracing segment. A new tracing segment will be created and started.
/// </summary>
Expand Down Expand Up @@ -693,46 +698,6 @@ protected virtual void Dispose(bool disposing)
}
}

/// <summary>
/// Checks whether subsegments of the current instance of <see cref="Entity"/> should be streamed.
/// </summary>
/// <param name="entity">Instance of <see cref="Entity"/></param>
/// <returns>True if the subsegments are streamable.</returns>
protected static bool ShouldStreamSubsegments(Entity entity)
{
return entity.Sampled == SampleDecision.Sampled && entity.RootSegment != null && entity.RootSegment.Size >= MaxSubsegmentSize;
}

/// <summary>
/// Streams subsegments of instance of <see cref="Entity"/>.
/// </summary>
/// <param name="entity">Instance of <see cref="Entity"/>.</param>
protected void StreamSubsegments(Entity entity)
{
lock (entity.Subsegments)
{
foreach (var next in entity.Subsegments)
{
StreamSubsegments(next);
}

entity.Subsegments.RemoveAll(x => x.HasStreamed);
}

if (entity is Segment || entity.IsInProgress || entity.Reference > 0 || entity.IsSubsegmentsAdded)
{
return;
}

Subsegment subsegment = entity as Subsegment;
subsegment.TraceId = entity.RootSegment.TraceId;
subsegment.Type = "subsegment";
subsegment.ParentId = subsegment.Parent.Id;
Emitter.Send(subsegment);
subsegment.RootSegment.DecrementSize();
subsegment.HasStreamed = true;
}

/// <summary>
/// Returns subsegments.
/// </summary>
Expand Down Expand Up @@ -796,9 +761,9 @@ protected void ProcessEndSegment(Segment segment)
{
Emitter.Send(segment);
}
else if (ShouldStreamSubsegments(segment))
else if (StreamingStrategy.ShouldStream(segment))
{
StreamSubsegments(segment);
StreamingStrategy.Stream(segment, Emitter);
}
}

Expand Down Expand Up @@ -829,9 +794,9 @@ protected void ProcessEndSubsegment()
// Emit
Emitter.Send(subsegment.RootSegment);
}
else if (ShouldStreamSubsegments(subsegment))
else if (StreamingStrategy.ShouldStream(subsegment))
{
StreamSubsegments(subsegment.RootSegment);
StreamingStrategy.Stream(subsegment.RootSegment, Emitter);
}
}

Expand Down
18 changes: 18 additions & 0 deletions sdk/src/Core/AwsXrayRecorderBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class AWSXRayRecorderBuilder
private string _daemonAddress;
private ITraceContext _traceContext;
private ExceptionSerializationStrategy _exceptionSerializationStrategy;
private IStreamingStrategy _streamingStrategy;

/// <summary>
/// Gets a read-only copy of current plugins in the builder
Expand Down Expand Up @@ -208,6 +209,18 @@ public AWSXRayRecorderBuilder WithSamplingStrategy(ISamplingStrategy newStrategy
return this;
}

/// <summary>
/// Adds the given streaming strategy to builder. There can exist only one streaming strategy.
/// Any previous value of streaming strategy will be overwritten.
/// </summary>
/// <param name="newStreamingStrategy">A streaming strategy to add</param>
/// <returns>The builder with streaming strategy added</returns>
public AWSXRayRecorderBuilder WithStreamingStrategy(IStreamingStrategy newStreamingStrategy)
{
_streamingStrategy = newStreamingStrategy ?? throw new ArgumentNullException("StreamingStrategy");
return this;
}

/// <summary>
/// Adds the context missing strategy.
/// </summary>
Expand Down Expand Up @@ -321,6 +334,11 @@ private void PopulateRecorder(AWSXRayRecorder recorder)
recorder.SamplingStrategy = _samplingStrategy;
}

if (_streamingStrategy != null)
{
recorder.StreamingStrategy = _streamingStrategy;
}

if (_daemonAddress != null)
{
recorder.SetDaemonAddress(_daemonAddress);
Expand Down
5 changes: 5 additions & 0 deletions sdk/src/Core/IAWSXRayRecorder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ public interface IAWSXRayRecorder : IDisposable
/// </summary>
ISamplingStrategy SamplingStrategy { get; set; }

/// <summary>
/// Get or sets the streaming strategy
/// </summary>
IStreamingStrategy StreamingStrategy { get; set; }

/// <summary>
/// Gets or sets the context missing strategy.
/// </summary>
Expand Down
85 changes: 85 additions & 0 deletions sdk/src/Core/Strategies/DefaultStreamingStrategy.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
using System;
using Amazon.XRay.Recorder.Core.Internal.Entities;
using Amazon.XRay.Recorder.Core.Internal.Emitters;
using Amazon.XRay.Recorder.Core.Sampling;

namespace Amazon.XRay.Recorder.Core.Strategies
{
/// <summary>
/// The default streaming strategy. It uses the total count of a segment's children subsegments as a threshold. If the threshold is breached, it uses subtree streaming to stream out.
/// </summary>
public class DefaultStreamingStrategy : IStreamingStrategy
{
/// <summary>
/// Default max subsegment size to stream for the strategy.
/// </summary>
private const long DefaultMaxSubsegmentSize = 100;

/// <summary>
/// Max subsegment size to stream fot the strategy.
/// </summary>
public long MaxSubsegmentSize { get; private set; } = DefaultMaxSubsegmentSize;

/// <summary>
/// Initializes a new instance of the <see cref="DefaultStreamingStrategy"/> class.
/// </summary>
public DefaultStreamingStrategy() : this(DefaultMaxSubsegmentSize)
{

}

/// <summary>
/// Initializes a new instance of the <see cref="DefaultStreamingStrategy"/> class.
/// </summary>
/// <param name="maxSubsegmentSize"></param>
public DefaultStreamingStrategy(long maxSubsegmentSize)
yogiraj07 marked this conversation as resolved.
Show resolved Hide resolved
{
if(maxSubsegmentSize < 0)
{
throw new ArgumentException("maxSubsegmentSize cannot be a negative number.");
}
MaxSubsegmentSize = maxSubsegmentSize;
}

/// <summary>
/// Checks whether subsegments of the current instance of <see cref="Entity"/> should be streamed.
/// </summary>
/// <param name="entity">Instance of <see cref="Entity"/></param>
/// <returns>True if the subsegments are streamable.</returns>
public bool ShouldStream(Entity entity)
{
return entity.Sampled == SampleDecision.Sampled && entity.RootSegment != null && entity.RootSegment.Size >= MaxSubsegmentSize;
}

/// <summary>
/// Streams subsegments of instance of <see cref="Entity"/>.
/// </summary>
/// <param name="entity">Instance of <see cref="Entity"/>.</param>
/// <param name="emitter">Instance of <see cref="ISegmentEmitter"/>.</param>
public void Stream(Entity entity, ISegmentEmitter emitter)
{
lock (entity.Subsegments)
{
foreach (var next in entity.Subsegments)
{
Stream(next, emitter);
}

entity.Subsegments.RemoveAll(x => x.HasStreamed);
}

if (entity is Segment || entity.IsInProgress || entity.Reference > 0 || entity.IsSubsegmentsAdded)
{
return;
}

Subsegment subsegment = entity as Subsegment;
subsegment.TraceId = entity.RootSegment.TraceId;
subsegment.Type = "subsegment";
subsegment.ParentId = subsegment.Parent.Id;
emitter.Send(subsegment);
subsegment.RootSegment.DecrementSize();
subsegment.HasStreamed = true;
}
}
}
42 changes: 42 additions & 0 deletions sdk/src/Core/Strategies/IStreamingStrategy.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
//-----------------------------------------------------------------------------
// <copyright file="IStreamingStrategy.cs" company="Amazon.com">
// Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License").
// You may not use this file except in compliance with the License.
// A copy of the License is located at
//
// http://aws.amazon.com/apache2.0
//
// or in the "license" file accompanying this file. This file is distributed
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
// express or implied. See the License for the specific language governing
// permissions and limitations under the License.
// </copyright>
//-----------------------------------------------------------------------------

using Amazon.XRay.Recorder.Core.Internal.Entities;
using Amazon.XRay.Recorder.Core.Internal.Emitters;

namespace Amazon.XRay.Recorder.Core.Strategies
{
/// <summary>
/// Interface of streaming strategy which is used to determine when and how the subsegments will be streamed.
/// </summary>
public interface IStreamingStrategy
{
/// <summary>
/// Determines whenther or not the provided segment/subsegment requires any subsegment streaming.
/// </summary>
/// <param name="input">An instance of <see cref="Entity"/>.</param>
/// <returns>true if the segment/subsegment should be streamed.</returns>
bool ShouldStream(Entity entity);

/// <summary>
/// Streams subsegments of instance of <see cref="Entity"/>.
/// </summary>
/// <param name="entity">Instance of <see cref="Entity"/>.</param>
/// <param name="emitter">Instance if <see cref="ISegmentEmitter"/>.</param>
void Stream(Entity entity, ISegmentEmitter emitter);
}
}
10 changes: 5 additions & 5 deletions sdk/test/UnitTests/AWSSDKHandlerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public void TestInitialize()
{
_recorder = new AWSXRayRecorder();
#if NET45
AWSXRayRecorder.InitializeInstance(_recorder);
AWSXRayRecorder.InitializeInstance(recorder:_recorder);
#else
AWSXRayRecorder.InitializeInstance(recorder: _recorder);
# endif
Expand Down Expand Up @@ -101,19 +101,19 @@ public void TestS3SubsegmentNameIsCorrectForAWSSDKHandler()
{
AWSSDKHandler.RegisterXRay<IAmazonS3>();
var s3 = new AmazonS3Client(new AnonymousAWSCredentials(), RegionEndpoint.USEast1);
CustomResponses.SetResponse(s3, null, null, true);

CustomResponses.SetResponse(s3, null, null, true);

_recorder.BeginSegment("test s3", TraceId);
#if NET45
s3.GetObject("testBucket", "testKey");
#else
s3.GetObjectAsync("testBucket", "testKey").Wait();
#endif
var segment = AWSXRayRecorder.Instance.TraceContext.GetEntity();
var segment = _recorder.TraceContext.GetEntity();
_recorder.EndSegment();
Assert.AreEqual("S3", segment.Subsegments[0].Name);
Assert.IsTrue(segment.Subsegments[0].Aws.ContainsKey("version_id"));
Assert.AreEqual(segment.Subsegments[0].Aws["bucket_name"],"testBucket");
Assert.AreEqual(segment.Subsegments[0].Aws["bucket_name"], "testBucket");
Assert.AreEqual(segment.Subsegments[0].Aws["operation"], "GetObject");
}

Expand Down
31 changes: 31 additions & 0 deletions sdk/test/UnitTests/AWSXRayRecorderBuilderTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,15 @@ public class AwsXrayRecorderBuilderTests : TestBase
{
private const string PluginKey = "AWSXRayPlugins";
private const string UseRuntimeErrors = "UseRuntimeErrors";
private AWSXRayRecorder _recorder;
#if !NET45
private XRayOptions _xRayOptions = new XRayOptions();
#endif
[TestInitialize]
public void Initialize()
{
_recorder = new AWSXRayRecorder();
}

[TestCleanup]
public new void TestCleanup()
Expand All @@ -58,7 +64,11 @@ public class AwsXrayRecorderBuilderTests : TestBase
#else
_xRayOptions = new XRayOptions();
#endif
_recorder.Dispose();
AWSXRayRecorder.Instance.Dispose();
_recorder = null;


}

[TestMethod]
Expand Down Expand Up @@ -128,6 +138,13 @@ public void TestSetSamplingStrategy()
Assert.AreEqual(typeof(DummySamplingStrategy).FullName, recorder.SamplingStrategy.GetType().FullName);
}

[TestMethod]
public void TestSetStreamingStrategy()
{
var recorder = new AWSXRayRecorderBuilder().WithStreamingStrategy(new DummyStreamingStrategy()).Build();
Assert.AreEqual(typeof(DummyStreamingStrategy).FullName, recorder.StreamingStrategy.GetType().FullName);
}

[TestMethod]
public void TestDefaultValueOfContextMissingStrategy()
{
Expand Down Expand Up @@ -254,6 +271,7 @@ public void TestExceptionStrategy2() // invalid input
Assert.AreEqual(DefaultExceptionSerializationStrategy.DefaultStackFrameSize, actual.MaxStackFrameSize);
}


[TestMethod]
public void TestExceptionStrategy4() // Test custom exception strategy
{
Expand Down Expand Up @@ -385,6 +403,19 @@ public SamplingResponse ShouldTrace(SamplingInput input)
}
}

private class DummyStreamingStrategy : IStreamingStrategy
{
public bool ShouldStream(Entity entity)
{
throw new NotImplementedException();
}

public void Stream(Entity entity, ISegmentEmitter emitter)
{
throw new NotImplementedException();
}
}

private class DummyPlugin : IPlugin
{
public string Origin
Expand Down
Loading