
Event Hub Producer Client should expire transport producers after a period of inactivity #8592

Closed
jsquire opened this issue Nov 1, 2019 · 25 comments
Labels: Client, Event Hubs

@jsquire
Member

jsquire commented Nov 1, 2019

Summary

In order to publish events to a specific partition, the EventHubProducerClient will need to manage a pool of transport producers which are bound to a specific partition. These transport producers are created on-demand when a request is made for a specific partition. Once created, their lifespan is tied to that of the EventHubProducerClient that created them.

In general, this is not a concern, as the scenario in which a producer targets a specific partition is not a mainline scenario. As an efficiency, however, it would be desirable to expire those transport producers that have not been accessed in a given period of time (configurable) such that unused transport producers are disposed when possible, independent of the EventHubProducerClient.

Scope of Work

  • Implement a means of tracking the last time that a transport producer for a given partition was used by the EventHubProducerClient that owns it.

  • Implement a periodic check for the transport producers owned by an EventHubProducerClient instance and dispose of those which have not been accessed for a given (configurable) period of time.

  • Ensure that removing the existing transport producer is an atomic operation and does not cause corruption or unexpected results should there be a request to publish to that partition during the check/removal.

  • Ensure that removing the existing transport producer does not block creation of a new instance in the event of concurrent calls, nor cause the publishing of events for other partitions to be blocked. The producer client should be able to handle concurrent calls.

  • Tests have been added or adjusted to cover the new functionality.
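The tracking and periodic-expiration items above could look roughly like the sketch below. It is a minimal illustration under assumptions, not the SDK's implementation: `IdleExpiringPool<TValue>` is a hypothetical name, the pooled value stands in for a `TransportProducer`, and the current time is passed in explicitly so the expiration logic can be tested deterministically. The sweep uses a conditional remove that only succeeds if the key still maps to the exact entry that was inspected; on its own, that does not close the race in which a caller obtains an entry just before the sweep removes it.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Hypothetical pool that evicts values not accessed within IdleTimeout.
// "now" is passed explicitly so the expiration logic can be tested deterministically.
public sealed class IdleExpiringPool<TValue>
{
    private sealed class Entry
    {
        public readonly TValue Value;
        private long _lastUsedTicks;

        public Entry(TValue value, DateTimeOffset now)
        {
            Value = value;
            _lastUsedTicks = now.UtcTicks;
        }

        // Interlocked keeps the 64-bit timestamp read/write atomic, even on 32-bit platforms.
        public void Touch(DateTimeOffset now) => Interlocked.Exchange(ref _lastUsedTicks, now.UtcTicks);
        public long LastUsedTicks => Interlocked.Read(ref _lastUsedTicks);
    }

    private readonly ConcurrentDictionary<string, Entry> _entries = new ConcurrentDictionary<string, Entry>();
    private readonly Func<string, TValue> _factory;

    public IdleExpiringPool(Func<string, TValue> factory, TimeSpan idleTimeout)
    {
        _factory = factory;
        IdleTimeout = idleTimeout;
    }

    public TimeSpan IdleTimeout { get; }
    public int Count => _entries.Count;

    // Gets or creates the value for a key and records the access time.
    public TValue Get(string key, DateTimeOffset now)
    {
        Entry entry = _entries.GetOrAdd(key, k => new Entry(_factory(k), now));
        entry.Touch(now);
        return entry.Value;
    }

    // Removes entries idle for at least IdleTimeout and returns their values so the caller can close them.
    public List<TValue> RemoveExpired(DateTimeOffset now)
    {
        var removed = new List<TValue>();

        foreach (KeyValuePair<string, Entry> pair in _entries)
        {
            if (now.UtcTicks - pair.Value.LastUsedTicks < IdleTimeout.Ticks)
            {
                continue;
            }

            // Conditional removal: succeeds only if the key still maps to this exact Entry instance.
            if (((ICollection<KeyValuePair<string, Entry>>)_entries).Remove(pair))
            {
                removed.Add(pair.Value.Value);
            }
        }

        return removed;
    }
}
```

One caveat worth knowing: `ConcurrentDictionary.GetOrAdd` may invoke the value factory more than once under contention while storing only one result, so a real pool would need to close any discarded producer (or wrap creation in `Lazy<T>`).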

Success Criteria

  • The refactoring and enhancements detailed by the scope have been completed.

  • The tests necessary for its validation have been created or adjusted and pass reliably.

  • The existing test suite continues to produce deterministic results and pass reliably.

References

@jsquire jsquire added the Event Hubs, Client, and Up for grabs labels Nov 1, 2019
@albertodenatale
Contributor

albertodenatale commented Dec 1, 2019

Hello @jsquire
I hope you enjoyed a nice break, and I wish you a happy start to the week.

I took a look at this task and wonder what your thoughts would be on this solution:

    internal class TransportProducerPool
    {
        // This class would be instantiated from EventHubProducerClient

        // This collection has been taken from EventHubProducerClient
        private ConcurrentDictionary<string, TransportProducer> PartitionProducers { get; } = new ConcurrentDictionary<string, TransportProducer>();

        internal async Task SendAndTrackEvent(IEnumerable<EventData> events,
                                              TransportProducer activeProducer,
                                              SendEventOptions options,
                                              CancellationToken cancellationToken)
        {
            // This method would be called on send and it would track
            // in a concurrent dictionary the tasks returned from:
            //     activeProducer.SendAsync(events, options, cancellationToken)
            // This is so that the task that periodically checks the producers can
            // wait for the send requests to complete before closing the producer.
        }

        internal TransportProducer GetPartitionProducer(string partitionId,
                                                        EventHubConnection connection,
                                                        EventHubsRetryPolicy retryPolicy)
        {
            // This method would atomically:
            // A. Invoke GetOrAdd on PartitionProducers (as it is done today)
            // B. Refresh the timestamp associated with a producer

            // This method has a race condition with ProducersCleanerHandler, as a
            // TransportProducer should not be closed after it is taken but before it is used.
            // This race condition would be resolved using a lock.

            // An assumption (to be enforced) is that after a producer has been refreshed
            // it will have enough time to start sending the events and call SendAndTrackEvent.
        }

        internal TimerCallback ProducersCleanerHandler()
        {
            // This method would be run periodically and its purpose
            // would be to remove expired partition producers.

            // It would atomically:
            // A. Search for expired producers
            // B. Remove them from PartitionProducers

            // This method has a race condition with GetPartitionProducer, as a
            // TransportProducer should not be closed after it is taken but before it is used.
            // This race condition would be resolved using a lock.
        }
    }


    public class EventHubProducerClient : IAsyncDisposable
    {
        internal virtual async Task SendAsync(IEnumerable<EventData> events,
                                              SendEventOptions options,
                                              CancellationToken cancellationToken = default)
        {
            // ( .. )

            TransportProducer activeProducer;

            if (string.IsNullOrEmpty(options.PartitionId))
            {
                activeProducer = EventHubProducer;
            }
            else
            {
                // This assertion is intended as an additional check, not as a guarantee.  There still exists a benign
                // race condition where a transport producer may be created after the client has been closed; in this case
                // the transport producer will be force-closed with the associated connection or, worst case, will close once
                // its idle timeout period elapses.

                Argument.AssertNotClosed(IsClosed, nameof(EventHubProducerClient));
                activeProducer = PartitionProducerPool.GetPartitionProducer(options.PartitionId, Connection, RetryPolicy);
            }

            using DiagnosticScope scope = CreateDiagnosticScope();

            events = events.ToList();
            InstrumentMessages(events);

            try
            {
                await PartitionProducerPool.SendAndTrackEvent(events, activeProducer, options, cancellationToken).ConfigureAwait(false);
            }
            catch (Exception ex)
            {
                scope.Failed(ex);
                throw;
            }
        }
    }

Thank you; I look forward to reading your thoughts on this.

@jsquire
Member Author

jsquire commented Dec 4, 2019

I like the general approach; it's pretty close to the initial design that was in my head when I wrote the issue up. Not sure that I understand what benefit having a SendAndTrackEvent method would give us over having it just act as a memory cache with sliding expiration. I definitely would prefer to not route sending through a resource pool.

I was thinking that System.Runtime.Caching.MemoryCache would be worth exploring to see if it may be an option with a bit of concurrency synchronization around it. Generally, cache expiration is a cornucopia of corner-cases and gotchas, so if we can get that for "free", I'm all for it.

The dictionary works nicely for the concurrency guards, but as you mentioned has some caveats for expiration. The race condition is definitely harmful here and something we'd need to work around - either by some form of two-phase deletion, an additional synchronization around the update/delete time, or some other means. I do think that storing just the TransportProducer in the dictionary wouldn't be enough. We'd need some kind of data structure for tracking the producer, expiration time, and possibly some kind of "pending status". I have a few ideas, but they're not yet fully formed.

@albertodenatale
Contributor

albertodenatale commented Dec 6, 2019

Thank you very much for your answer.

The race condition is definitely harmful here and something we'd need to work around - either by some form of two-phase deletion, an additional synchronization around the update/delete time, or some other means.

Hope you won't mind me fiddling you a little bit on this point by including some more details to our picture:

// This class would be instantiated from EventHubProducerClient
//
internal class TransportProducerPool
{
        private readonly object ProducersLock = new object();
        private ConcurrentDictionary<string, TransportProducer> PartitionProducers { get; } = new ConcurrentDictionary<string, TransportProducer>();

        internal TransportProducer GetPartitionProducer(string partitionId,
                                                        EventHubConnection connection,
                                                        EventHubsRetryPolicy retryPolicy)
        {
            lock (ProducersLock)
            {
                var activeProducer = PartitionProducers.GetOrAdd(partitionId, id => connection.CreateTransportProducer(id, retryPolicy));
                 
                // refreshes the timestamp connected with the PartitionProducers
                 
                return activeProducer;
            }

            // From here happy days as a producer will have its timestamp refreshed
            // and will have long enough to start a send
        }

        // Called from Timer
        //
        internal TimerCallback ProducersCleanerHandler()
        {
             IEnumerable<TransportProducer> removedProducers = RemoveExpiredProducers();
                
             // From here it is safe to close a producer
             // Calls and awaits CloseAsync on each producer contained in removedProducers 
        }

        private IEnumerable<TransportProducer> RemoveExpiredProducers()
        {
            List<TransportProducer> result = new List<TransportProducer>();

            lock (ProducersLock)
            {
               // Cycles through PartitionProducers
               // removes each producer and adds it to result
            }

            return result;
        }
}

Not sure that I understand what benefit having a SendAndTrackEvent method would give us over having it just act as a memory cache with sliding expiration.

Please allow me to add more details to ProducersCleanerHandler:

// Called from Timer
//
internal async Task ProducersCleanerHandler()
{
      IEnumerable<TransportProducer> removedProducers = RemoveExpiredProducers();

      // For each removed producer, collect its active sending tasks (tracked by
      // SendAndTrackEvent) into a collection named sendingTasks.
      var sendingTasks = new List<Task>();

      // Waits for open sends to finish before closing any producer
      //
      if (sendingTasks.Any())
      {
          await Task.WhenAll(sendingTasks).ConfigureAwait(false);
      }

      // Only now call CloseAsync on each removed producer, collecting the
      // tasks in a collection named removedProducersTasks.
      var removedProducersTasks = removedProducers
          .Select(producer => producer.CloseAsync(CancellationToken.None))
          .ToList();

      if (removedProducersTasks.Any())
      {
          await Task.WhenAll(removedProducersTasks).ConfigureAwait(false);
      }
}

In a nutshell, I added SendAndTrackEvent to allow tracking the sending tasks and waiting for them to finish before closing their respective producer (to be honest, I am not 100% sure that is useful).
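A rough sketch of the idea behind SendAndTrackEvent: track in-flight sends so that a cleaner can wait for them before closing a producer. `InFlightTracker`, `TrackAsync`, and `WaitForInFlightAsync` are hypothetical names, and a dictionary of tasks is only one way to do it. Note the limitation: the snapshot only covers sends already in flight, so the producer must be removed from the pool first to stop new sends from starting afterwards.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: records in-flight send tasks so a cleaner can await them before closing.
public sealed class InFlightTracker
{
    private readonly ConcurrentDictionary<long, Task> _inFlight = new ConcurrentDictionary<long, Task>();
    private long _nextId;

    public int Count => _inFlight.Count;

    // Wraps a send operation; the task stays tracked until it completes, successfully or not.
    public async Task TrackAsync(Func<Task> send)
    {
        long id = Interlocked.Increment(ref _nextId);
        Task task = send();
        _inFlight.TryAdd(id, task);

        try
        {
            await task.ConfigureAwait(false);
        }
        finally
        {
            _inFlight.TryRemove(id, out _);
        }
    }

    // Waits for every send that is in flight at the time of the call. Failures are ignored here
    // because they are already surfaced to whoever awaited TrackAsync.
    public async Task WaitForInFlightAsync()
    {
        Task[] snapshot = _inFlight.Values.ToArray();

        try
        {
            await Task.WhenAll(snapshot).ConfigureAwait(false);
        }
        catch
        {
            // Intentionally swallowed; see the comment above.
        }
    }
}
```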

I was thinking that System.Runtime.Caching.MemoryCache would be worth exploring to see if it may be an option with a bit of concurrency synchronization around it.

That was a great catch, thank you for that. I looked at it and, yes, I believe it would fit nicely. I'm just waiting for your thoughts on SendAndTrackEvent to be sure I understood your thinking correctly.

We'd need some kind of data structure for tracking the producer, expiration time, and possibly some kind of "pending status". I have a few ideas, but they're not yet fully formed.

Good point.

I look forward to reading your thoughts on this.

@jsquire
Member Author

jsquire commented Dec 6, 2019

Hope you won't mind me fiddling you a little bit on this point by including some more details to our picture:

Of course not. We can't refine things if we don't bounce them around. 😄

The lock on every call concerns me. Though I don't expect a partition-bound producer to be the norm, having it that way removes any benefit from the concurrent dictionary. When I had mentioned something two-phase, I was thinking along the lines of something like:

internal class TransportProducerPool
{
    private readonly object RemoveSync = new object();
    private ConcurrentDictionary<string, TrackingStuffs> PartitionProducers { get; } = new ConcurrentDictionary<string, TrackingStuffs>();

    public TransportProducer GetPartitionProducer(string partitionId,  EventHubConnection connection,  EventHubsRetryPolicy retryPolicy)
    {
        var trackedProducer = PartitionProducers.GetOrAdd(partitionId, id => new TrackingStuffs(connection.CreateTransportProducer(id, retryPolicy)));

        // If expiration is not within a buffer, say the next 5 minutes, and ToBeRemoved is not set,
        // extend it without taking the lock.
        if ((!trackedProducer.ToBeRemoved) && (trackedProducer.RemoveAfter > DateTimeOffset.UtcNow.Add(TimeSpan.FromMinutes(5))))
        {
            trackedProducer.RemoveAfter = DateTimeOffset.UtcNow.Add(TimeSpan.FromMinutes(10));
            return trackedProducer.Producer;
        }

        // If within the buffer or it is flagged to be removed, synchronize with the cleaner.
        lock (RemoveSync)
        {
            trackedProducer = PartitionProducers.GetOrAdd(partitionId, id => new TrackingStuffs(connection.CreateTransportProducer(id, retryPolicy)));
            trackedProducer.RemoveAfter = DateTimeOffset.UtcNow.Add(TimeSpan.FromMinutes(15));
            trackedProducer.ToBeRemoved = false;
            return trackedProducer.Producer;
        }
    }

    // Matches the TimerCallback signature so it can be passed to a Timer.
    private void ProducersCleanerHandler(object state)
    {
        // Get producers where expire time is now or earlier and are marked for removal.

        lock (RemoveSync)
        {
            // Remove from the set, then track.
        }

        // Close those that we removed from the set.

        // Get producers where expire time is now or earlier and not marked for removal. Loop and mark them for removal.
    }

    private class TrackingStuffs
    {
        public TrackingStuffs(TransportProducer producer) => Producer = producer;

        public readonly TransportProducer Producer;
        public volatile bool ToBeRemoved;
        public DateTimeOffset RemoveAfter;
    }
}

Obviously, that is just a rough sketch off the top of my head. It certainly needs more refinement, but the idea is that we avoid taking the lock path unless we think there's a pending removal. Since this isn't critical resource management, we don't have to be super-eager about cleaning, we can wait an extra tick or two.
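The two-phase idea (mark on one tick, remove on a later tick, and only take the lock when a removal might be pending) can be sketched as below. This is a minimal sketch under assumptions: `TwoPhasePool`, `Entry`, and `Sweep` are hypothetical names, and time is passed in explicitly for testability. The lock-free fast path is a check-then-extend that is not atomic with the sweep, so a caller preempted for longer than a whole sweep interval could still receive a producer that is being removed; the sketch accepts that, in line with not needing to be super-eager about cleaning.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Hypothetical pool: a sweep first marks expired entries; a later sweep removes those still marked and expired.
public sealed class TwoPhasePool<TValue>
{
    private sealed class Entry
    {
        public Entry(TValue value, DateTimeOffset removeAfter)
        {
            Value = value;
            RemoveAfterTicks = removeAfter.UtcTicks;
        }

        public TValue Value { get; }
        public volatile bool Marked;   // phase one: flagged as a removal candidate
        public long RemoveAfterTicks;  // read and written with Interlocked
    }

    private readonly object _removeSync = new object();
    private readonly ConcurrentDictionary<string, Entry> _entries = new ConcurrentDictionary<string, Entry>();
    private readonly Func<string, TValue> _factory;
    private readonly TimeSpan _lifetime;

    public TwoPhasePool(Func<string, TValue> factory, TimeSpan lifetime)
    {
        _factory = factory;
        _lifetime = lifetime;
    }

    public int Count => _entries.Count;

    public TValue Get(string key, DateTimeOffset now)
    {
        long extended = (now + _lifetime).UtcTicks;
        Entry entry = _entries.GetOrAdd(key, k => new Entry(_factory(k), now + _lifetime));

        // Fast path: no removal pending, so extend without taking the lock.
        if (!entry.Marked)
        {
            Interlocked.Exchange(ref entry.RemoveAfterTicks, extended);
            return entry.Value;
        }

        // Slow path: a removal may be pending, so synchronize with the sweep and un-mark.
        lock (_removeSync)
        {
            entry = _entries.GetOrAdd(key, k => new Entry(_factory(k), now + _lifetime));
            Interlocked.Exchange(ref entry.RemoveAfterTicks, extended);
            entry.Marked = false;
            return entry.Value;
        }
    }

    // Returns the removed values so the caller can close them outside the lock.
    public List<TValue> Sweep(DateTimeOffset now)
    {
        var removed = new List<TValue>();

        lock (_removeSync)
        {
            // Phase two: remove entries marked on an earlier sweep and not extended since.
            foreach (KeyValuePair<string, Entry> pair in _entries)
            {
                Entry candidate = pair.Value;

                if (candidate.Marked
                    && Interlocked.Read(ref candidate.RemoveAfterTicks) <= now.UtcTicks
                    && _entries.TryRemove(pair.Key, out Entry entry))
                {
                    removed.Add(entry.Value);
                }
            }
        }

        // Phase one: mark newly expired entries; they survive if used before the next sweep.
        foreach (KeyValuePair<string, Entry> pair in _entries)
        {
            Entry candidate = pair.Value;

            if (!candidate.Marked && Interlocked.Read(ref candidate.RemoveAfterTicks) <= now.UtcTicks)
            {
                candidate.Marked = true;
            }
        }

        return removed;
    }
}
```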

I'd still rank this as a fallback to the cache if that doesn't work out.

Your turn for riffing on thoughts. 😄

@albertodenatale
Contributor

albertodenatale commented Dec 10, 2019

"Tracking and refreshing producers that are about to expire" (rephrasing to ensure I got it right) would be a great improvement.

One point could be how to pick a buffer lifespan long enough to avoid the race condition and short enough (compared to the configurable producer lifespan) to provide the intended benefit.

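internal class TransportProducerPool
{
    private readonly object RemoveSync = new object();
    private readonly object CleanSync = new object();
    private bool IsCleanerRunning;
    private TimeSpan ProducerLifeSpan { get; }

    private ConcurrentDictionary<string, PooledTransportProducer> PartitionProducers { get; } = new ConcurrentDictionary<string, PooledTransportProducer>();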

    public TransportProducer GetPartitionProducer(string partitionId,
                                                  EventHubConnection connection,
                                                  EventHubsRetryPolicy retryPolicy)
    {
        var pooledProducer = PartitionProducers.GetOrAdd(partitionId, id => new PooledTransportProducer(connection.CreateTransportProducer(id, retryPolicy), ProducerLifeSpan));

        if (pooledProducer.IsAlive())
        {
            pooledProducer.Refresh();
        }
        else
        {
            lock (RemoveSync)
            {
                pooledProducer = PartitionProducers.GetOrAdd(partitionId, id => new PooledTransportProducer(connection.CreateTransportProducer(id, retryPolicy), ProducerLifeSpan));
                pooledProducer.Refresh();
                PartitionProducers[partitionId] = pooledProducer;
            }
        }

        return pooledProducer.TransportProducer;
    }

    private TimerCallback ProducersCleanerHandler()
    {
        return async _ =>
        {
            lock (CleanSync)
            {
                if (IsCleanerRunning)
                {
                    return;
                }

                IsCleanerRunning = true;
            }

            try
            {
                var removedProducersTasks = new List<Task>();

                var removedProducers = RemoveExpiredProducers();

                foreach (var removedProducer in removedProducers)
                {
                    removedProducersTasks.Add(removedProducer.TransportProducer.CloseAsync(CancellationToken.None));
                }

                if (removedProducersTasks.Any())
                {
                    await Task.WhenAll(removedProducersTasks).ConfigureAwait(false);
                }

                SetExpiringProducers();
            }
            finally
            {
                lock (CleanSync)
                {
                    IsCleanerRunning = false;
                }
            }
        };
    }

    private Timer StartRefreshTimer(double secondsInterval)
    {
        return new Timer(ProducersCleanerHandler(), null, TimeSpan.FromSeconds(secondsInterval), TimeSpan.FromSeconds(secondsInterval));
    }
}

@jsquire
Member Author

jsquire commented Dec 10, 2019

One point could be how to pick a buffer lifespan long enough to avoid the race condition and short enough (compared to the configurable producer lifespan) to provide the intended benefit.

Absolutely; that would be the trick. What just occurred to me is that we could potentially flip the paradigm and save ourselves a headache. What if we did something like:

internal class TransportProducerPool
{
    public PooledProducer GetPartitionProducer(string partitionId,  EventHubConnection connection,  EventHubsRetryPolicy retryPolicy)
    {
         // Get or add the producer.
         // Set the "is active marker"
         // Reset the "remove after" time
         // Set last updated time
         // Calculate maximum retry window using the retry policy + small buffer (5 mins?)
    }
    
    private TimerCallback CleanUp()
    {
          // Get all items where "remove after" is less than now.
          // if "is active" is set and "last updated + maximum retry interval" > now, ignore it
          //               If it is active but not eligible for retries, then something bad happened and we should clean up.
           
          // Remove it.
          // Close it.
    }

    internal class PooledProducer : IAsyncDisposable
    {
        private string PartitionId { get; }
        public TransportProducer Item { get; }

        public ValueTask DisposeAsync()
        {
            // Reset the "remove after" time.
            // Remove the "is active" marker.
            // Set the last updated time
        }
    }

    private class TrackingStuffs
    {
        public readonly TransportProducer Producer;
        public bool IsActive;
        public DateTimeOffset RemoveAfter;
        public DateTimeOffset LastUpdated;
        public TimeSpan MaximumRetryWindow;
    }
}

We'd still have to do a bit of guarding with TryGet and TryUpdate, but that may have some merit. We should probably not put too much thought into things unless we prove the memory cache is a bad option.
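The clean-up rule in the `CleanUp` comments reduces to a small predicate. A sketch, assuming a hypothetical `PoolEntryState` snapshot whose fields mirror `TrackingStuffs`, and assuming the retry window is derived from three retry-policy values (`maximumRetries`, `tryTimeout`, `maximumDelay`) plus a buffer. The formula is a conservative upper bound chosen for illustration, not something the SDK exposes.

```csharp
using System;

// Hypothetical snapshot of the tracking fields (IsActive, RemoveAfter, LastUpdated, MaximumRetryWindow).
public readonly struct PoolEntryState
{
    public PoolEntryState(bool isActive, DateTimeOffset removeAfter, DateTimeOffset lastUpdated, TimeSpan maximumRetryWindow)
    {
        IsActive = isActive;
        RemoveAfter = removeAfter;
        LastUpdated = lastUpdated;
        MaximumRetryWindow = maximumRetryWindow;
    }

    public bool IsActive { get; }
    public DateTimeOffset RemoveAfter { get; }
    public DateTimeOffset LastUpdated { get; }
    public TimeSpan MaximumRetryWindow { get; }
}

public static class CleanUpRules
{
    // Worst case for one send: every attempt runs up to tryTimeout and every retry first waits
    // up to maximumDelay. The buffer is added on top.
    public static TimeSpan MaximumRetryWindow(int maximumRetries, TimeSpan tryTimeout, TimeSpan maximumDelay, TimeSpan buffer) =>
        TimeSpan.FromTicks(tryTimeout.Ticks * (maximumRetries + 1) + maximumDelay.Ticks * maximumRetries) + buffer;

    public static bool ShouldRemove(PoolEntryState state, DateTimeOffset now)
    {
        // Only items whose "remove after" time is earlier than now are candidates.
        if (state.RemoveAfter >= now)
        {
            return false;
        }

        // Active and still inside the retry window: a send may legitimately be in progress, so skip it.
        if (state.IsActive && state.LastUpdated + state.MaximumRetryWindow > now)
        {
            return false;
        }

        // Either idle, or "active" for longer than any retry could take (something went wrong).
        return true;
    }
}
```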

@albertodenatale
Contributor

albertodenatale commented Dec 11, 2019

👏 👏 👏

That was a true masterpiece; I really loved it. Making PooledProducer implement IAsyncDisposable was a real touch.

I'd still rank this as a fallback to the cache if that doesn't work out.

Not sure I understand completely plan A though (sorry I am a bit out of my comfort zone).

I think to understand we want to use System.Runtime.Caching.MemoryCache in place of TransportProducerPool or the ConcurrentDictionary?

Something that would end up with this call?

myCache.Add(partitionId, transportProducer, new CacheItemPolicy
{
    SlidingExpiration = new TimeSpan(0, 10, 0),
    RemovedCallback = async (CacheEntryRemovedArguments arguments) =>
    {
         // CacheItem.Value is typed as object, so it needs a cast back to TransportProducer.
         var producer = (TransportProducer)arguments.CacheItem.Value;
         await producer.CloseAsync(CancellationToken.None);
    }
});

My understanding is that the remove call happens outside the lock.

However, looking at the code that handles removal, it looks to me like adding and removing items always happens under a lock (similar to what we were doing in our first version): eviction from the cache calls CacheStore.Remove, which in turn blocks additions by taking the _entriesLock.

@jsquire
Member Author

jsquire commented Dec 12, 2019

Not sure I understand completely plan A though (sorry I am a bit out of my comfort zone).

I think to understand we want to use System.Runtime.Caching.MemoryCache in place of TransportProducerPool or the ConcurrentDictionary?

No worries; I don't have a clear picture in my head, so it's quite likely that I'm not expressing myself well. My thought would be to replace the ConcurrentDictionary as the storage and delegate that to MemoryCache if we can. That would make TransportProducerPool just a lightweight manager for putting items into the cache, fetching them from the cache, and doing any synchronization needed. The big win, as I see it, would be delegating responsibility for tracking the expiration of producers and keeping track of the last time each was touched.

@jsquire
Member Author

jsquire commented Dec 12, 2019

I'd like to spin some further thoughts that I think could simplify things further and help to unify the "what happens with a MemoryCache" and "What happens with a ConcurrentDictionary" scenarios.

Here's the gist:

  • When the TransportProducerPool returns a producer, the caller provides a key that is logged as an active user of the item, and the "last used" time is updated.
  • When the pool item expires, the handler (be it the cache remove callback or the timer callback) takes that producer out of the cache. If it has any active users, it does not close it; it just removes it.
  • The pool item is still disposable. When disposed, it will check to see if the item exists in the pool and, if so, that its key is still registered as an active user.
    • If the producer is still in the pool, it will update the "last used" time and remove the associated identifier from the active users.
    • If the producer is not in the pool, it will remove the associated identifier from the active users. If there are no active users, it will close the transport producer.
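The lifecycle in the list above (active users registered per lease, eviction that skips the close while users remain, and the last release closing an evicted producer) can be sketched with a reference count. This is a minimal sketch under assumptions: `IClosable` is a hypothetical stand-in for `TransportProducer`, `LeasePool`/`Lease` are illustrative names, and `EvictAsync` stands in for whichever timer or cache callback decides an item has expired. Two choices go beyond the list: releasing always targets the item the lease was taken from rather than looking it up in the pool again, and `AcquireAsync` re-checks the evicted flag after registering, backing off to a fresh item if eviction won the race.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for TransportProducer.
public interface IClosable
{
    Task CloseAsync();
}

public sealed class LeasePool<TProducer> where TProducer : class, IClosable
{
    internal sealed class Item
    {
        public Item(TProducer producer) => Producer = producer;
        public TProducer Producer { get; }
        public int ActiveUsers;  // outstanding leases
        public int Evicted;      // 1 once removed from the pool
        public int Closed;       // 1 once CloseAsync has been started
    }

    public sealed class Lease : IAsyncDisposable
    {
        private readonly LeasePool<TProducer> _pool;
        private readonly Item _item;
        private int _released;

        internal Lease(LeasePool<TProducer> pool, Item item)
        {
            _pool = pool;
            _item = item;
        }

        public TProducer Producer => _item.Producer;

        // Releases at most once, always against the item this lease came from.
        public ValueTask DisposeAsync() =>
            Interlocked.Exchange(ref _released, 1) == 0 ? new ValueTask(_pool.ReleaseAsync(_item)) : default;
    }

    private readonly ConcurrentDictionary<string, Item> _items = new ConcurrentDictionary<string, Item>();
    private readonly Func<string, TProducer> _factory;

    public LeasePool(Func<string, TProducer> factory) => _factory = factory;

    public async Task<Lease> AcquireAsync(string key)
    {
        while (true)
        {
            Item item = _items.GetOrAdd(key, k => new Item(_factory(k)));
            Interlocked.Increment(ref item.ActiveUsers);

            // Register first, then re-check: if eviction won the race, back off and retry.
            if (Volatile.Read(ref item.Evicted) == 0)
            {
                return new Lease(this, item);
            }

            await ReleaseAsync(item).ConfigureAwait(false);
        }
    }

    public async Task EvictAsync(string key)
    {
        if (_items.TryRemove(key, out Item item))
        {
            Interlocked.Exchange(ref item.Evicted, 1);
            await TryCloseAsync(item).ConfigureAwait(false);
        }
    }

    internal async Task ReleaseAsync(Item item)
    {
        Interlocked.Decrement(ref item.ActiveUsers);
        await TryCloseAsync(item).ConfigureAwait(false);
    }

    // Closes only an evicted item with no users; CompareExchange ensures the close starts at most once.
    private static Task TryCloseAsync(Item item) =>
        Volatile.Read(ref item.Evicted) == 1
        && Volatile.Read(ref item.ActiveUsers) == 0
        && Interlocked.CompareExchange(ref item.Closed, 1, 0) == 0
            ? item.Producer.CloseAsync()
            : Task.CompletedTask;
}

// Trivial producer for trying the sketch out: counts how many times it was closed.
public sealed class RecordingProducer : IClosable
{
    public int CloseCount;

    public Task CloseAsync()
    {
        Interlocked.Increment(ref CloseCount);
        return Task.CompletedTask;
    }
}
```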

That does come with a potential trade-off. In an exception situation, it is possible that a producer would be removed from the pool, but the last active user would not properly close it. This would leave the producer in limbo. However, the resources used are bound to an idle timeout. So, in the worst case, the producer would live until the idle timeout closed the underlying AMQP link. Since we own the code that would be responsible for managing the pooled producer and closing things, I think that's a small exception case that I'm comfortable with.

I would absolutely not trust this code. It's just to illustrate, and definitely requires thinking about more deeply than I've done yet.

internal class TransportProducerPool : IDisposable
{
	private ConcurrentDictionary<string, PoolItem> Pool { get; } = new ConcurrentDictionary<string, PoolItem>();
	private Timer ExpirationTimer { get; }
	
	public TransportProducerPool()
	{
		ExpirationTimer = new Timer(PerformExpiration, null, TimeSpan.FromMinutes(10), TimeSpan.FromMinutes(10));
	}
	
	public PooledProducer GetPartitionProducer(string partitionId, EventHubConnection connection, EventHubsRetryPolicy retryPolicy)
	{
		// Get or add the producer.
		
		var identifier = Guid.NewGuid().ToString();
		var item = Pool.GetOrAdd(partitionId,id => new PoolItem(new TransportProducer())); 
		
		// These introduce a slight race, but it's benign.  The timing for removal has a large enough
		// window that it doesn't need to be exact.
		
		item.RemoveAfter = DateTime.UtcNow.Add(PoolItem.DefaultRemoveAfterDuration);
		item.ActiveInstances.TryAdd(identifier, 0);

		return new PooledProducer(item.Producer, producer =>
		{
			// If the item is in the pool, extend it and remove the active instance.
			
			if (Pool.TryGetValue(partitionId, out var cachedItem))
			{
				cachedItem.RemoveAfter = DateTime.UtcNow.Add(PoolItem.DefaultRemoveAfterDuration);
				cachedItem.ActiveInstances.Remove(identifier, out _);
			}
			
			// If the item is not in the pool and there are no active instances, then
			// close the producer.
			//
			// I'm probably being overly defensive here, but the second
			// TryGet runs after the extension would have been seen, so it
			// is intended to be sure that the item wasn't removed in the meantime.
			//
			// We're not in a performance-critical area, so the additional cost for
			// synchronization isn't a concern.
			
			if ((!Pool.TryGetValue(partitionId, out _)) && (!item.ActiveInstances.Any()))
			{
				return producer.CloseAsync();
			}
			
			return Task.CompletedTask;
		});
	}

	private void PerformExpiration(object state)
	{
		// Capture the timestamp to use a consistent value.
		
		var now = DateTimeOffset.UtcNow;
		
		foreach (var key in Pool.Keys)
		{
			if (Pool.TryGetValue(key, out var poolItem))
			{
				// Check to see if this should be removed.

				if (poolItem.RemoveAfter <= now)
				{
					// Remove it, if it should be removed and see if there are any instances
					// using it.  If not, close the producer.

					if ((Pool.TryRemove(key, out _)) && (!poolItem.ActiveInstances.Any()))
					{
						poolItem.Producer.CloseAsync().GetAwaiter().GetResult();
					}
				}
			}
		}
	}

	public void Dispose()
	{
		ExpirationTimer.Dispose();
	}

	internal class PooledProducer : IAsyncDisposable
	{
		private Func<TransportProducer, Task> CleanUp { get; }
		public TransportProducer Item { get; }		
		
		public PooledProducer(TransportProducer producer, Func<TransportProducer, Task> cleanUp)
		{
			Item = producer;
			CleanUp = cleanUp;
		}
		
		public ValueTask DisposeAsync() => new ValueTask(CleanUp(Item));
	}

	private class PoolItem
	{
		public static readonly TimeSpan DefaultRemoveAfterDuration = TimeSpan.FromMinutes(10);
		
		public TransportProducer Producer { get; }
		public ConcurrentDictionary<string, byte> ActiveInstances { get; } = new ConcurrentDictionary<string, byte>();
		public DateTimeOffset RemoveAfter { get; set; }
		
		public PoolItem(TransportProducer producer, DateTimeOffset? removeAfter = default)
		{
			Producer = producer;
			RemoveAfter = removeAfter ?? DateTimeOffset.UtcNow.Add(TimeSpan.FromMinutes(1));
		}
	}
}

@albertodenatale
Contributor

albertodenatale commented Dec 13, 2019

I think I got that, thank you for taking the time to explain in such a great detail your thinking.

// These introduce a slight race, but it's benign.  The timing for removal has a large enough
// window that it doesn't need to be exact.

You said that this race is benign. I can't understand though.

Let's say there was a context switch exactly where you put your comment.

Let's say a CloseAsync() was called on the same producer returned by Pool.GetOrAdd(partitionId,id => new PoolItem(new TransportProducer()));

Wouldn't that be bad?

Testing

How do you use to test race conditions?
Wonder if you may have part of the codebase where you addressed race conditions issues I could take inspiration from?

@jsquire
Member Author

jsquire commented Dec 17, 2019

I think I got that, thank you for taking the time to explain in such a great detail your thinking.

Oh, I'm just thinking aloud. Call it more "asking for a sanity check" than "presenting a fully formed idea". We're definitely still working through the details together here.

// These introduce a slight race, but it's benign.  The timing for removal has a large enough
// window that it doesn't need to be exact.

You said that this race is benign. I can't understand though.

Let's say there was a context switch exactly where you put your comment.

That comment was about the sliding extension, which could potentially be contended by two holders of the instance. I was indicating that I don't think we need to be concerned if the expiration ends up a few seconds earlier.
Though, in hindsight, we should put that inside of the TryAdd only on success.

if (item.ActiveInstances.TryAdd(identifier, 0))
{
    item.RemoveAfter = DateTime.UtcNow.Add(PoolItem.DefaultRemoveAfterDuration);
}
else
{
    // We're got a bad instance, we should get another and do this again... or just outright fail and let the caller
    // hold responsibilities for retrying.  Not sure which....
}

Let's say a CloseAsync() was called on the same producer returned by Pool.GetOrAdd(partitionId,id => new PoolItem(new TransportProducer()));

Wouldn't that be bad?

It wouldn't be good. That said, I think we covered that fairly well. We remove it from the pool before checking to see if there are any active instances, which is a synchronized check. We'd have to have a scenario where the item was returned from the pool just before it was removed, and the check was performed just before the other holder marked themselves as active. It is possible, but I would call it an acceptably low probability. We would certainly want to add a bit of resilience to the retry loop to account for "did I get a closed client from the pool?". I think we'd also, maybe, consider checking to see if the producer was closed before marking active. Neither would eliminate the race, but both would provide reasonable defense in depth.... I think.
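The "did I get a closed client from the pool" resilience can be sketched as a small bounded loop that asks the pool again when it hands back a closed producer. `IMaybeClosed`, `IsClosed`, `PoolGuards.GetUsableProducer`, and `StubProducer` are hypothetical names for illustration. As noted above, the check narrows the race window rather than eliminating it.

```csharp
using System;

// Hypothetical view of a producer that can report whether it has been closed.
public interface IMaybeClosed
{
    bool IsClosed { get; }
}

public static class PoolGuards
{
    // Asks the pool for a producer, skipping closed ones, up to maxAttempts times. A closed
    // instance usually means it expired between lookup and use; asking again should yield a
    // newly created one. This narrows the race window; it does not eliminate it.
    public static T GetUsableProducer<T>(Func<T> getFromPool, int maxAttempts) where T : IMaybeClosed
    {
        for (int attempt = 1; attempt <= maxAttempts; attempt++)
        {
            T producer = getFromPool();

            if (!producer.IsClosed)
            {
                return producer;
            }
        }

        throw new InvalidOperationException($"No open producer was obtained after {maxAttempts} attempts.");
    }
}

// Minimal test double.
public sealed class StubProducer : IMaybeClosed
{
    public StubProducer(bool isClosed) => IsClosed = isClosed;
    public bool IsClosed { get; }
}
```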

Testing

How do you use to test race conditions?
Wonder if you may have part of the codebase where you addressed race conditions issues I could take inspiration from?

The only way to do it reliably is to have a means to deterministically cause the condition. We'd have to have some kind of thoughts around how to override, mock, or otherwise force the pool to return an item in conflict.... or just accept it as a tiny possibility and prove that retries work to cover us there.
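One way to deterministically cause the condition is to give the pool a test-only hook at the critical point and have the test run the expiration from inside that hook. The sketch assumes a hypothetical, deliberately naive `HookedPool` with a `BetweenLookupAndUse` callback (not an SDK API); it reproduces the interleaving in which a caller receives a producer that the expiration pass closes before the caller marks itself active. The same seam can then be used to verify whichever mitigation is chosen.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical producer with an observable closed state.
public sealed class TestProducer
{
    public bool IsClosed { get; private set; }
    public void Close() => IsClosed = true;
}

// Hypothetical, deliberately naive pool mirroring the "look up, then mark active" sequence.
public sealed class HookedPool
{
    private readonly ConcurrentDictionary<string, TestProducer> _producers = new ConcurrentDictionary<string, TestProducer>();
    private readonly ConcurrentDictionary<TestProducer, int> _active = new ConcurrentDictionary<TestProducer, int>();

    // Test-only seam: invoked after the lookup and before the caller is registered as active.
    public Action BetweenLookupAndUse { get; set; } = () => { };

    public TestProducer Get(string partitionId)
    {
        TestProducer producer = _producers.GetOrAdd(partitionId, _ => new TestProducer());
        BetweenLookupAndUse();
        _active.AddOrUpdate(producer, 1, (_, count) => count + 1);
        return producer;
    }

    // Expiration pass: removes the producer and closes it if nobody is registered as using it.
    public void Expire(string partitionId)
    {
        if (_producers.TryRemove(partitionId, out TestProducer producer) && !_active.ContainsKey(producer))
        {
            producer.Close();
        }
    }
}
```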

@albertodenatale
Contributor

albertodenatale commented Dec 18, 2019

Thank you very much for your answers.

Though, in hindsight, we should put that inside of the TryAdd only on success.

if (item.ActiveInstances.TryAdd(identifier, 0))
{
    item.RemoveAfter = DateTime.UtcNow.Add(PoolItem.DefaultRemoveAfterDuration);
}
else
{
    // We're got a bad instance, we should get another and do this again... or just outright fail and let the caller
    // hold responsibilities for retrying.  Not sure which....
}

I may have lost you here: why do you think there would be a good chance that we try to add the same GUID twice?

Odd scenario

I was thinking about the following scenario.

A. The producer expires. However, it still has active instances. Consequently, it is removed from the pool, and the call to CloseAsync() is delegated to the cleanup function.

if (poolItem.RemoveAfter <= now)
{
     // Remove it, if it should be removed and see if there are any instances 
     // using it.  If not, close the producer.
				  
     if ((Pool.TryRemove(key, out _)) && (!poolItem.ActiveInstances.Any()))
     {
           poolItem.Producer.CloseAsync().GetAwaiter().GetResult();
     }
}

B. The cleanup function starts. It does not find the producer in the Pool, hence it cannot remove the active instance.

return new PooledProducer(item.Producer, producer =>
{
    if (Pool.TryGetValue(partitionId, out var cachedItem))
    {
        cachedItem.RemoveAfter = DateTime.UtcNow.Add(PoolItem.DefaultRemoveAfterDuration);
        cachedItem.ActiveInstances.Remove(identifier, out _);
    }

C. At this point, !Pool.TryGetValue(partitionId, out _) would return true; however, the active instances would not be empty.

if ((!Pool.TryGetValue(partitionId, out _)) && (!item.ActiveInstances.Any()))
{
	return producer.CloseAsync();
}

D. The producer would be hanging around and this would defeat the real purpose of this task

Instead, why don't we do this? 😉

Swap the checks in the Timed callback

if (poolItem.RemoveAfter <= now)
{
     // Remove it, if it should be removed and see if there are any instances 
     // using it.  If not, close the producer.
				  
     if (!poolItem.ActiveInstances.Any() && Pool.TryRemove(key, out _) )
     {
          poolItem.Producer.CloseAsync().GetAwaiter().GetResult();
     }
}

⌛️
⌛️
⌛️

return new PooledProducer(item.Producer, producer =>
{
    if (Pool.TryGetValue(partitionId, out var cachedItem))
    {
        cachedItem.RemoveAfter = DateTime.UtcNow.Add(PoolItem.DefaultRemoveAfterDuration);
        cachedItem.ActiveInstances.Remove(identifier, out _);
    }

    if (!Pool.TryGetValue(partitionId, out _) && !item.ActiveInstances.Any())
    {
        return producer.CloseAsync();
    }

    return Task.CompletedTask;
});

Looking forward to hearing your thoughts on this.
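A sketch of the swapped ordering, assuming the same PoolItem shape (FakeProducer and PoolSweeper.Sweep are hypothetical). One addition beyond the snippet above: removal goes through ConcurrentDictionary's explicit ICollection<KeyValuePair<TKey, TValue>>.Remove, which removes the entry only if the key still maps to that same item, so a sweep never evicts a replacement created for the partition in the meantime.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical stand-in for a transport producer.
public sealed class FakeProducer
{
    public bool IsClosed { get; private set; }
    public void Close() => IsClosed = true;
}

// Shaped like the PoolItem discussed in this thread; member names are assumptions.
public sealed class PoolItem
{
    public FakeProducer Producer { get; } = new FakeProducer();
    public DateTime RemoveAfter { get; set; }
    public ConcurrentDictionary<string, byte> ActiveInstances { get; } = new ConcurrentDictionary<string, byte>();
}

public static class PoolSweeper
{
    // Swapped ordering: check for active instances first, remove second.
    // Returns the number of producers that were closed.
    public static int Sweep(ConcurrentDictionary<string, PoolItem> pool, DateTime now)
    {
        var closed = 0;

        foreach (var pair in pool)
        {
            var item = pair.Value;

            if (item.RemoveAfter <= now
                && item.ActiveInstances.IsEmpty
                // Atomically removes the entry only if the key still maps to this exact item.
                && ((ICollection<KeyValuePair<string, PoolItem>>)pool).Remove(pair))
            {
                item.Producer.Close();
                closed++;
            }
        }

        return closed;
    }
}
```

This ordering does not remove the race: an instance can still become active between the IsEmpty check and the removal, which is why the retry behavior matters.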

@jsquire
Copy link
Member Author

jsquire commented Dec 18, 2019

Thank you very much for your answers.

Though, in hindsight, we should put that inside of the TryAdd only on success.

if (item.ActiveInstances.TryAdd(identifier, 0))
{

I may have lost you here: why do you think there is a good chance that we might try to add the same GUID twice?

Oh, I don't. It's definitely a fringe case, but we should respect it. It may be worth extending the condition with && (!item.Producer.IsClosed), just to add further defense in depth.
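A sketch of the acquisition guard being discussed, combining the TryAdd check with the suggested closed-producer check. FakeProducer, PoolAcquirer.TryAcquire and the slidingWindow parameter are hypothetical stand-ins (the thread uses PoolItem.DefaultRemoveAfterDuration). Returning null leaves the retry-or-fail decision to the caller, which the thread has not settled.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical stand-in for a transport producer.
public sealed class FakeProducer
{
    public bool IsClosed { get; private set; }
    public void Close() => IsClosed = true;
}

// Shaped like the PoolItem discussed in this thread; member names are assumptions.
public sealed class PoolItem
{
    public FakeProducer Producer { get; } = new FakeProducer();
    public DateTime RemoveAfter { get; set; }
    public ConcurrentDictionary<string, byte> ActiveInstances { get; } = new ConcurrentDictionary<string, byte>();
}

public static class PoolAcquirer
{
    // Registers identifier as an active user of the partition's item and slides its
    // expiration. Returns null when the caller should not use the item: either the
    // identifier was already registered, or the producer has already been closed.
    public static PoolItem TryAcquire(ConcurrentDictionary<string, PoolItem> pool, string partitionId, string identifier, TimeSpan slidingWindow)
    {
        var item = pool.GetOrAdd(partitionId, _ => new PoolItem());

        if (!item.ActiveInstances.TryAdd(identifier, 0))
        {
            return null;
        }

        if (item.Producer.IsClosed)
        {
            // Undo our own registration so the closed item is not kept alive on our behalf.
            item.ActiveInstances.TryRemove(identifier, out _);
            return null;
        }

        item.RemoveAfter = DateTime.UtcNow.Add(slidingWindow);
        return item;
    }
}
```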

@jsquire
Copy link
Member Author

jsquire commented Dec 18, 2019

D. The producer would be hanging around and this would defeat the real purpose of this task

Truth. In this case, the producer would hang around and potentially have an active link for 60 seconds until the AMQP session expires. The instance itself would live until garbage collected, since it is presumably an orphan now. If GC happens to kick in before the timeout, it'll force-close the resources. So, worst case, we see an orphan hold AMQP resources for 60 seconds and consume local memory until there's enough pressure for GC.

That's pretty much what I'm pitching. Given how infrequently this corner case would present itself, having an instance orphaned from the pool wouldn't be a major concern. Here's the catch, though.... we would need to confirm that having an open link to that partition wouldn't block a new producer from being created. I'm about 70% sure that is the case, but I haven't done my due diligence.

Instead, why don't we do this? 😉

Swap the checks in the Timed callback

if (poolItem.RemoveAfter <= now)
{
     // Remove it, if it should be removed and see if there are any instances 
     // using it.  If not, close the producer.
				  
     if (!poolItem.ActiveInstances.Any() && Pool.TryRemove(key, out _) )
     {
          poolItem.Producer.CloseAsync().GetAwaiter().GetResult();
     }
}

It's a good idea and worthy of consideration. Here's the counter-scenario that I'm thinking of:

  • There are no active instances in the pool
  • A context switch occurs right after that check and before the TryRemove; there's now an active instance and someone holding the reference.
  • The instance is removed, then closed.
  • The user of that instance has the link force-closed. A failure occurs and retry logic kicks in to recover.

In either case, we can't fully eliminate the race condition; it's a matter of trade-offs. As I see it, we're looking at:

An unlikely race causes an orphaned producer; things clean up automatically after 60 seconds.

  - OR -

An unlikely race causes a producer in use to be closed; retry logic allows recovery by requesting a new producer.

Both cases would seem to be acceptable, given the low probability of occurrence, both being minimally impactful (one causes a slight performance hit, the other a slight increase in resource use), and that they're not on a hot path. That said, both also hinge on proving that opening a new AMQP link to the same resource doesn't cause failures (a producer instance needs to open a link, and the partition is part of the resource path). If it causes the "old" copy to fail and lets the "new" copy take precedence, we're good. If it's the other way around, that throws a wrench into things.

... ok, that's a lot going on. Revisiting after thinking about it as I type, I think you've convinced me that your approach is better. If we have the retry there, the publishing code is more resilient and can better recover from unrelated transient issues, whereas my version gives us no additional value.

What are your thoughts?

@albertodenatale
Copy link
Contributor

albertodenatale commented Dec 19, 2019

Thank you very much for sharing your thoughts.

we would need to confirm that having an open link to that partition wouldn't block a new producer from being created

I took a look at this point and created the following test:

await using (EventHubScope scope = await EventHubScope.CreateAsync(1))
{
    var connectionString = TestEnvironment.BuildConnectionStringForEventHub(scope.EventHubName);

    await using (var connection = new TestConnectionWithTransport(connectionString))
    {
        var retryPolicy = new EventHubProducerClientOptions().RetryOptions.ToRetryPolicy();

        var firstProducer = connection.CreateTransportProducer("0", retryPolicy);

        var firstEvent = new List<EventData>
        {
            new EventData(System.Text.Encoding.UTF8.GetBytes("First Event"))
        };

        await firstProducer.SendAsync(firstEvent, default, CancellationToken.None);

        var secondProducer = connection.CreateTransportProducer("0", retryPolicy);

        var secondEvent = new List<EventData>
        {
            new EventData(System.Text.Encoding.UTF8.GetBytes("Second Event"))
        };

        await secondProducer.SendAsync(secondEvent, default, CancellationToken.None);
    }
}

It looks to me like the events go through fine.

image

Would you say this test could be conclusive?

@jsquire
Copy link
Member Author

jsquire commented Dec 19, 2019

Would you say this test could be conclusive?

Oh yeah, I'd definitely call that conclusive; you went right down to the metal and grabbed the transport producer directly. That's about as low-level as we functionally can get. If we had a problem with opening multiple links to that same resource, there would be a hard error rejecting or force-closing one.

@albertodenatale
Copy link
Contributor

albertodenatale commented Dec 23, 2019

you went right down to the metal and grabbed the transport producer directly

😉

(I could not find a dabbing man or a unicorn in GitHub's emoji list.)

Retry logic

  • The user of that instance has the link force-closed. A failure occurs and the retry logic kicks in to recover.

I triggered AmqpProducer::CloseAsync(CancellationToken.None) while paused at a breakpoint on the following line of AmqpProducer::SendAsync:

link = await SendLink.GetOrCreateAsync(UseMinimum(ConnectionScope.SessionTimeout, tryTimeout)).ConfigureAwait(false);

I then let the code run and I got the following exception:

image

   at Microsoft.Azure.Amqp.AmqpObject.ThrowIfClosed()
   at Microsoft.Azure.Amqp.SendingAmqpLink.BeginSendMessage(AmqpMessage message, ArraySegment`1 deliveryTag, ArraySegment`1 txnId, TimeSpan timeout, AsyncCallback callback, Object state)
   at Microsoft.Azure.Amqp.SendingAmqpLink.<>c__DisplayClass11_0.<SendMessageAsync>b__0(AsyncCallback c, Object s)
   at System.Threading.Tasks.TaskFactory`1.FromAsyncImpl(Func`3 beginMethod, Func`2 endFunction, Action`1 endAction, Object state, TaskCreationOptions creationOptions)
   at Microsoft.Azure.Amqp.SendingAmqpLink.SendMessageAsync(AmqpMessage message, ArraySegment`1 deliveryTag, ArraySegment`1 txnId, TimeSpan timeout)
   at Azure.Messaging.EventHubs.Amqp.AmqpProducer.<SendAsync>d__31.MoveNext() in C:\src\azure-sdk-for-net\sdk\eventhub\Azure.Messaging.EventHubs\src\Amqp\AmqpProducer.cs:line 306

The retry logic did not seem to kick in as the exception was handled inside the following catch:

catch (AmqpException amqpException)
{
    throw AmqpError.CreateExceptionForError(amqpException.Error, EventHubName);
}

I'm not sure whether this might prompt you to abandon the idea completely and stick with leaving the producer in limbo?

Alternative Scenario

I also tried an alternative scenario:

I tried to reproduce the scenario, at least partially, and I wonder whether you think this might be possible:

  • There are no active instances in the pool
  • A context switch occurs right after that check and before the TryRemove, there's now an active instance and someone holding the reference.
  • The instance is removed, then closed.
  • The user of that instance calls AmqpProducer::SendAsync and the assertion Argument.AssertNotClosed(_closed, nameof(AmqpProducer)); triggers an error
public override async Task SendAsync(IEnumerable<EventData> events,
                                        SendEventOptions sendOptions,
                                        CancellationToken cancellationToken)
{
    Argument.AssertNotNull(events, nameof(events));
    Argument.AssertNotClosed(_closed, nameof(AmqpProducer));

    AmqpMessage messageFactory() => MessageConverter.CreateBatchFromEvents(events, sendOptions?.PartitionKey);
    await SendAsync(messageFactory, sendOptions?.PartitionKey, cancellationToken).ConfigureAwait(false);
}

Resilience

Reconciling this with what we said earlier:

We would want to add a bit of resilience to the retry loop to account for "did I get a closed client from the pool", certainly.

I wonder if you think we might have to remove that assertion?

Collisions in ConcurrentDictionary

// We've got a bad instance; we should get another and do this again... or just outright fail and let the caller
// hold responsibility for retrying. Not sure which....

I wonder whether you would see the problem from this perspective:

Would the caller of GetPartitionProducer be happy to check that a PooledProducer is returned, in exchange for the performance of the lock-free reads provided by ConcurrentDictionary?

Looking forward to hearing your thoughts on this.

@jsquire
Copy link
Member Author

jsquire commented Dec 30, 2019

The retry logic did not seem to kick in as the exception was handled inside the following catch:

catch (AmqpException amqpException)
{
    throw AmqpError.CreateExceptionForError(amqpException.Error, EventHubName);
}

I'm not sure whether this might prompt you to abandon the idea completely and stick with leaving the producer in limbo?

That's what I'd expect. That will surface as an EventHubsException with the Reason set for the client being closed. We would want to handle that exception (which is normally fatal) only in this one very specific scenario. It's definitely not a general-case thing, nor something that is implemented today.

Alternative Scenario

I also tried an alternative scenario:

I tried to reproduce the scenario, at least partially, and I wonder whether you think this might be possible:

  • There are no active instances in the pool
  • A context switch occurs right after that check and before the TryRemove, there's now an active instance and someone holding the reference.
  • The instance is removed, then closed.
  • The user of that instance calls AmqpProducer::SendAsync and the assertion Argument.AssertNotClosed(_closed, nameof(AmqpProducer)); triggers an error

Yup. Completely plausible; I'd say that strengthens the case for your approach, as we'd need the proposed retry logic enhancement to properly handle this case.

Resilience

Reconciling this with what we said earlier:

We would want to add a bit of resilience to the retry loop to account for "did I get a closed client from the pool", certainly.

I wonder if you think we might have to remove that assertion?

We'd certainly need to tweak the logic. Given that we could have a race condition between that assertion and the send, I suspect we'll want to keep the assertion and wrap the entire body in that try/catch, with logic to request another producer from the pool if that condition is hit in either the assertion or the SendAsync call.
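A sketch of that retry idea, under stated assumptions: in the real client the closed condition would presumably surface as an EventHubsException whose Reason is EventHubsException.FailureReason.ClientClosed, but here a hypothetical ProducerClosedException stands in for it, and getProducer stands in for a request to the pool. The exception filter treats the normally fatal condition as retriable only inside this wrapper, and only for a bounded number of attempts.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-in for "the producer was closed".
public sealed class ProducerClosedException : Exception
{
    public ProducerClosedException() : base("The producer has been closed.") { }
}

// Hypothetical producer; SendAsync mirrors the AssertNotClosed guard at the top of AmqpProducer.SendAsync.
public sealed class FakeProducer
{
    public bool IsClosed { get; set; }
    public int SentCount { get; private set; }

    public Task SendAsync(string payload)
    {
        if (IsClosed)
        {
            throw new ProducerClosedException();
        }

        SentCount++;
        return Task.CompletedTask;
    }
}

public static class PooledSend
{
    // Sends with a producer obtained from getProducer (standing in for a pool request).
    // A closed producer, whether detected by the guard or during the send, triggers a
    // request for another producer, up to maxAttempts; after that the exception surfaces.
    public static async Task SendWithPoolRetryAsync(Func<FakeProducer> getProducer, string payload, int maxAttempts = 3)
    {
        for (var attempt = 1; ; attempt++)
        {
            var producer = getProducer();

            try
            {
                await producer.SendAsync(payload).ConfigureAwait(false);
                return;
            }
            catch (ProducerClosedException) when (attempt < maxAttempts)
            {
                // Normally fatal; treated as retriable only for the pooled-producer race.
            }
        }
    }
}
```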

Collisions in ConcurrentDictionary

I wonder whether you would see the problem from this perspective:

Would the caller of GetPartitionProducer be happy to check that a PooledProducer is returned, in exchange for the performance of the lock-free reads provided by ConcurrentDictionary?

Looking forward to hearing your thoughts on this.

I'm not sure that I understand the proposal; we've got a very tight and exclusive use case here, so we don't have to worry about being overly generic and friendly. Any chance you can help me understand the approach?

@albertodenatale
Copy link
Contributor

Thank you very much for your answer.

Before we move on to the next pieces, would you allow me to take a step back and inspect our solution further?

The scenario we are considering is:

Odd scenario

I was thinking about the following scenario.

A. The producer expires. However, it still has active instances. Consequently, it is removed from the pool, and the call to CloseAsync() is delegated to the cleanup function.

if (poolItem.RemoveAfter <= now)
{
     // Remove it, if it should be removed and see if there are any instances 
     // using it.  If not, close the producer.
				  
     if ((Pool.TryRemove(key, out _)) && (!poolItem.ActiveInstances.Any()))
     {
           poolItem.Producer.CloseAsync().GetAwaiter().GetResult();
     }
}

B. The cleanup function starts. It does not find the producer in the Pool, hence it cannot remove the active instance.

return new PooledProducer(item.Producer, producer =>
{
    if (Pool.TryGetValue(partitionId, out var cachedItem))
    {
        cachedItem.RemoveAfter = DateTime.UtcNow.Add(PoolItem.DefaultRemoveAfterDuration);
        cachedItem.ActiveInstances.Remove(identifier, out _);
    }

C. At this point, !Pool.TryGetValue(partitionId, out _) would return true; however, the active instances would not be empty.

if ((!Pool.TryGetValue(partitionId, out _)) && (!item.ActiveInstances.Any()))
{
	return producer.CloseAsync();
}

D. The producer would be hanging around and this would defeat the real purpose of this task

We then said that we could change the code at point A as follows:

if (!poolItem.ActiveInstances.Any() && Pool.TryRemove(key, out var _))
{
    poolItem.Producer.CloseAsync(CancellationToken.None).GetAwaiter().GetResult();
}

A third solution might be to modify point B as follows:

return new PooledProducer(item.Producer, producer =>
{
     item.RemoveAfter = DateTime.UtcNow.Add(PoolItem.DefaultRemoveAfterDuration);
     item.ActiveInstances.Remove(identifier, out _);
     
     (...)

We would always refresh PoolItem.RemoveAfter and always remove the identifier from item.ActiveInstances. In this way, we would ensure that item.ActiveInstances would always be empty at point C.
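A sketch of this third option, assuming the same PoolItem shape: the cleanup callback acts on the captured item rather than on a value re-read from the pool. It is synchronous for brevity (the thread's version returns the CloseAsync() task), and FakeProducer, PooledRelease.CreateRelease and slidingWindow are hypothetical. One small variation on the thread's check: "still pooled" is tested by reference, so a newer item cached for the same partition does not keep the evicted one alive.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical stand-in for a transport producer; Close is idempotent.
public sealed class FakeProducer
{
    public bool IsClosed { get; private set; }
    public void Close() => IsClosed = true;
}

// Shaped like the PoolItem discussed in this thread; member names are assumptions.
public sealed class PoolItem
{
    public FakeProducer Producer { get; } = new FakeProducer();
    public DateTime RemoveAfter { get; set; }
    public ConcurrentDictionary<string, byte> ActiveInstances { get; } = new ConcurrentDictionary<string, byte>();
}

public static class PooledRelease
{
    public static Action CreateRelease(ConcurrentDictionary<string, PoolItem> pool, string partitionId, PoolItem item, string identifier, TimeSpan slidingWindow)
    {
        return () =>
        {
            // Always act on the captured item, whether or not it is still pooled.
            item.RemoveAfter = DateTime.UtcNow.Add(slidingWindow);
            item.ActiveInstances.TryRemove(identifier, out _);

            // "Still pooled" means the partition key maps to this very item.
            var stillPooled = pool.TryGetValue(partitionId, out var current) && ReferenceEquals(current, item);

            if (!stillPooled && item.ActiveInstances.IsEmpty)
            {
                // Two holders releasing at once could both reach this point; Close must be idempotent.
                item.Producer.Close();
            }
        };
    }
}
```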

I wonder, though, whether referencing the PoolItem through the cachedItem variable was meant to avoid some exceptions?

Looking forward to hearing back from you on this.

@jsquire
Copy link
Member Author

jsquire commented Jan 9, 2020

A third solution might be to modify point B as follows:

return new PooledProducer(item.Producer, producer =>
{
     item.RemoveAfter = DateTime.UtcNow.Add(PoolItem.DefaultRemoveAfterDuration);
     item.ActiveInstances.Remove(identifier, out _);
     
     (...)

We would always refresh PoolItem.RemoveAfter and always remove the identifier from item.ActiveInstances. In this way, we would ensure that item.ActiveInstances would always be empty at point C.

I wonder, though, whether referencing the PoolItem through the cachedItem variable was meant to avoid some exceptions?

Looking forward to hearing back from you on this.

I won't lie to you.... it's been long enough that this made my brain hurt. I had to read things over a few times, so forgive me if I'm still a bit slow on the uptake. I think the idea behind using cachedItem was to ensure that we didn't extend the RemoveAfter for items that were no longer in the pool. Removing the active instance seems safe... and current me can't figure out whether past me had a good reason to be afraid of setting that RemoveAfter. Can you think of any?

If we can collectively determine that it is safe to always update RemoveAfter, then I think your new proposal is our best option. I have this nagging feeling that current me is missing something, though.

@albertodenatale
Copy link
Contributor

Can you think of any?

Not off the top of my head, but personally I don't see a compelling case for refreshing removed producers.

I won't lie to you.... it's been long enough that this made my brain hurt.

I agree with you. I created a draft PR that I hope will make things easier.

I am definitely missing some tests though.

@albertodenatale
Copy link
Contributor

albertodenatale commented Jan 13, 2020

Hi @jsquire
I will be on annual leave tomorrow and the day after. Apologies in advance for slow responses and awkward silences.

@jsquire
Copy link
Member Author

jsquire commented Jan 14, 2020

Hi @jsquire
I will be on annual leave tomorrow and the day after. Apologies in advance for slow responses and awkward silences.

Thanks for the heads-up; no worries. It's been a week of unexpected fire drills, so I also apologize for being so slow to respond.

@albertodenatale
Copy link
Contributor

albertodenatale commented Jan 17, 2020

Thanks for the heads-up; no worries. It's been a week of unexpected fire drills, so I also apologize for being so slow to respond.

No worries 😄 happy to have you back

albertodenatale added a commit to albertodenatale/azure-sdk-for-net that referenced this issue Feb 16, 2020
albertodenatale added a commit to albertodenatale/azure-sdk-for-net that referenced this issue Feb 29, 2020
albertodenatale added a commit to albertodenatale/azure-sdk-for-net that referenced this issue Feb 29, 2020
albertodenatale added a commit to albertodenatale/azure-sdk-for-net that referenced this issue Mar 1, 2020
@AlexGhiondea AlexGhiondea added this to the Backlog milestone Mar 3, 2020
albertodenatale added a commit to albertodenatale/azure-sdk-for-net that referenced this issue Mar 4, 2020
albertodenatale added a commit to albertodenatale/azure-sdk-for-net that referenced this issue Mar 5, 2020
albertodenatale added a commit to albertodenatale/azure-sdk-for-net that referenced this issue Mar 5, 2020
albertodenatale added a commit to albertodenatale/azure-sdk-for-net that referenced this issue Mar 6, 2020
albertodenatale added a commit to albertodenatale/azure-sdk-for-net that referenced this issue Mar 8, 2020
albertodenatale added a commit to albertodenatale/azure-sdk-for-net that referenced this issue Mar 8, 2020
albertodenatale added a commit to albertodenatale/azure-sdk-for-net that referenced this issue Mar 8, 2020
jsquire pushed a commit that referenced this issue Apr 15, 2020
The focus of these changes is to add a pool for transport producers used by the EventHubProducerClient which manages their lifespan using a sliding time window approach.


Authored-by: Alberto De Natale
@jsquire jsquire modified the milestones: Backlog, [2020] May Apr 15, 2020
@jsquire
Copy link
Member Author

jsquire commented Apr 15, 2020

Fixed by #9431

@jsquire jsquire closed this as completed Apr 15, 2020
@github-actions github-actions bot locked and limited conversation to collaborators Mar 29, 2023
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
Client This issue points to a problem in the data-plane of the library. Event Hubs
Projects
None yet
Development

No branches or pull requests

3 participants