Skip to content

Commit

Permalink
[azeventhubs] Fixing a bug where a cancelled recovery could result in…
Browse files Browse the repository at this point in the history
… a perpetually broken client (#23337)

Fixing a customer issue where they cancel in the middle of our retry loop and prevent our recovery from happening.
    
This simplifies the overall logic and should be just fine - some of the cases it was trying to handle aren't needed anymore because of changes in the recovery code, etc...
    
Also, updated stress tests to:
* Include pipefail, which makes sure that, if the underlying test fails, the script _exits_ instead of potentially reporting okay because `tee` happened to work :) 
* Remove use of the test credential (in stress tests only!) since we need to use the DefaultAzureCredential and don't need to take advantage of or care about the state of test recording.

Fixes #23282
  • Loading branch information
richardpark-msft authored Aug 15, 2024
1 parent a37cf7f commit bbad005
Show file tree
Hide file tree
Showing 7 changed files with 154 additions and 39 deletions.
8 changes: 2 additions & 6 deletions sdk/messaging/azeventhubs/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
# Release History

## 1.2.2 (Unreleased)

### Features Added

### Breaking Changes
## 1.2.2 (2024-08-15)

### Bugs Fixed

### Other Changes
- Fixed a bug that where a short context deadline could prevent recovery from ever happening. The end result would be a broken PartitionClient/ConsumerClient that would never recover from the underlying failure. (PR#23337)

## 1.2.1 (2024-05-20)

Expand Down
6 changes: 3 additions & 3 deletions sdk/messaging/azeventhubs/internal/eh/stress/Chart.lock
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
dependencies:
- name: stress-test-addons
repository: https://stresstestcharts.blob.core.windows.net/helm/
version: 0.3.2
digest: sha256:6eee71a7e8a4c0dc06d5fbbce39ef63237a0db0b7fc2da66e98e96b68985b764
generated: "2024-05-30T01:22:48.620817144Z"
version: 0.3.3
digest: sha256:1cffb5ed8ea74953ab7611f9e2de2163af2c3f0918afb9928f71210da9c19a4a
generated: "2024-08-14T17:44:22.828343827-07:00"
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ spec:
args:
- >
set -ex;
set -o pipefail;
mkdir -p "$DEBUG_SHARE";
{{if eq .Stress.testTarget "multibalance" }}
/app/stress "{{.Stress.testTarget}}" "-rounds" "{{.Stress.rounds}}" "{{.Stress.verbose}}" 2>&1 | tee -a "${DEBUG_SHARE}/{{ .Stress.Scenario }}-`date +%s`.log";
Expand Down
6 changes: 4 additions & 2 deletions sdk/messaging/azeventhubs/internal/eh/stress/tests/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ import (

"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
azlog "github.com/Azure/azure-sdk-for-go/sdk/internal/log"
"github.com/Azure/azure-sdk-for-go/sdk/internal/test/credential"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/test"
Expand Down Expand Up @@ -109,7 +109,9 @@ func newStressTestData(name string, baggage map[string]string) (*stressTestData,

td.TC = telemetryClient{tc}

td.Cred, err = credential.New(nil)
// NOTE: this isn't run in the live testing pipelines, only within stress testing
// so you shouldn't use the test credential.
td.Cred, err = azidentity.NewDefaultAzureCredential(nil)

if err != nil {
return nil, err
Expand Down
3 changes: 3 additions & 0 deletions sdk/messaging/azeventhubs/internal/links.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ type AMQPLink interface {
// LinksForPartitionClient are the functions that the PartitionClient uses within Links[T]
// (for unit testing only)
type LinksForPartitionClient[LinkT AMQPLink] interface {
// Retry is [Links.Retry]
Retry(ctx context.Context, eventName azlog.Event, operation string, partitionID string, retryOptions exported.RetryOptions, fn func(ctx context.Context, lwid LinkWithID[LinkT]) error) error

// Close is [Links.Close]
Close(ctx context.Context) error
}

Expand Down
43 changes: 15 additions & 28 deletions sdk/messaging/azeventhubs/internal/links_recover.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,13 @@ import (
)

type LinkRetrier[LinkT AMQPLink] struct {
GetLink func(ctx context.Context, partitionID string) (LinkWithID[LinkT], error)
// GetLink is set to [Links.GetLink]
GetLink func(ctx context.Context, partitionID string) (LinkWithID[LinkT], error)

// CloseLink is set to [Links.closePartitionLinkIfMatch]
CloseLink func(ctx context.Context, partitionID string, linkName string) error

// NSRecover is set to [Namespace.Recover]
NSRecover func(ctx context.Context, connID uint64) error
}

Expand All @@ -30,7 +35,6 @@ func (l LinkRetrier[LinkT]) Retry(ctx context.Context,
partitionID string,
retryOptions exported.RetryOptions,
fn RetryCallback[LinkT]) error {
didQuickRetry := false

isFatalErrorFunc := func(err error) bool {
return GetRecoveryKind(err) == RecoveryKindFatal
Expand All @@ -43,10 +47,6 @@ func (l LinkRetrier[LinkT]) Retry(ctx context.Context,
}

return utils.Retry(ctx, eventName, prefix, retryOptions, func(ctx context.Context, args *utils.RetryFnArgs) error {
if err := l.RecoverIfNeeded(ctx, args.LastErr); err != nil {
return err
}

linkWithID, err := l.GetLink(ctx, partitionID)

if err != nil {
Expand All @@ -56,37 +56,24 @@ func (l LinkRetrier[LinkT]) Retry(ctx context.Context,
currentPrefix = linkWithID.String()

if err := fn(ctx, linkWithID); err != nil {
if args.I == 0 && !didQuickRetry && IsQuickRecoveryError(err) {
// go-amqp will asynchronously handle detaches. This means errors that you get
// back from Send(), for instance, can actually be from much earlier in time
// depending on the last time you called into Send().
//
// This means we'll sometimes do an unneeded sleep after a failed retry when
// it would have just immediately worked. To counteract that we'll do a one-time
// quick attempt to recreate link immediately if we see a detach error. This might
// waste a bit of time attempting to do the creation, but since it's just link creation
// it should be fairly fast.
//
// So when we've received a detach is:
// 0th attempt
// extra immediate 0th attempt (if last error was detach)
// (actual retries)
//
// Whereas normally you'd do (for non-detach errors):
// 0th attempt
// (actual retries)
azlog.Writef(exported.EventConn, "(%s, %s) Link was previously detached. Attempting quick reconnect to recover from error: %s", linkWithID.String(), operation, err.Error())
didQuickRetry = true
args.ResetAttempts()
if recoveryErr := l.RecoverIfNeeded(ctx, err); recoveryErr != nil {
// it's okay to return this error, and we're still in an okay state. The next loop through will end
// up reopening all the closed links and will either get the same error again (ie, network is _still_
// down) or will work and then things proceed as normal.
return recoveryErr
}

// it's critical that we still return the original error here (that came from fn()) and NOT nil,
// otherwise we'll end up terminating the retry loop.
return err
}

return nil
}, isFatalErrorFunc)
}

// RecoverIfNeeded will check the error and pick the correct minimal recovery pattern (none, link only, connection and link, etc..)
// NOTE: if 'ctx' is cancelled this function will still close out all the connections/links involved.
func (l LinkRetrier[LinkT]) RecoverIfNeeded(ctx context.Context, err error) error {
rk := GetRecoveryKind(err)

Expand Down
126 changes: 126 additions & 0 deletions sdk/messaging/azeventhubs/internal/links_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/internal/log"
"github.com/Azure/azure-sdk-for-go/sdk/internal/test/credential"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/amqpwrap"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/exported"
Expand Down Expand Up @@ -317,6 +318,131 @@ func TestLinksManagementRetry(t *testing.T) {
require.Nil(t, links.managementLink)
}

func TestRecoveryWithCancelledContext_Link(t *testing.T) {
// Customer calls into our functions, has an error and the context, bring expired, causes our retries
// to abort before we attempt to do even a single recovery.
//
// https://github.com/Azure/azure-sdk-for-go/issues/23282

const partitionID = "0"

setup := func(t *testing.T) (*Links[amqpwrap.AMQPSenderCloser], LinkWithID[amqpwrap.AMQPSenderCloser]) {
ns, links := newLinksForTest(t)

t.Cleanup(func() { test.RequireClose(t, links) })
t.Cleanup(func() { test.RequireNSClose(t, ns) })

origLWID, err := links.GetLink(context.Background(), partitionID)
require.NoError(t, err)
require.NotEmpty(t, origLWID)

// force a recovery but with a pre-cancelled context
cancelledCtx, cancel := context.WithCancel(context.Background())
cancel()

first := true
err = links.Retry(cancelledCtx, log.Event("event"), "operation", partitionID, exported.RetryOptions{}, func(ctx context.Context, lwid LinkWithID[amqpwrap.AMQPSenderCloser]) error {
if first {
first = false
return amqpwrap.Error{
Err: &amqp.LinkError{},
ConnID: lwid.ConnID(),
LinkName: lwid.Link().LinkName(),
PartitionID: lwid.PartitionID(),
}
}

return nil
})
require.ErrorIs(t, err, context.Canceled)

return links, origLWID
}

t.Run("GetLinks", func(t *testing.T) {
links, origLWID := setup(t)

newLWID, err := links.GetLink(context.Background(), partitionID)
require.NoError(t, err)

require.NotEqual(t, origLWID.Link(), newLWID.Link())
require.Equal(t, origLWID.ConnID(), newLWID.ConnID())
})

t.Run("Retry", func(t *testing.T) {
links, origLWID := setup(t)

err := links.Retry(context.Background(), log.Event("event"), "operation", partitionID, exported.RetryOptions{}, func(ctx context.Context, lwid LinkWithID[amqpwrap.AMQPSenderCloser]) error {
require.NotEqual(t, origLWID.Link(), lwid.Link())
require.Equal(t, origLWID.ConnID(), lwid.ConnID())
return nil
})
require.NoError(t, err)
})
}

func TestRecoveryWithCancelledContext_Connection(t *testing.T) {
const partitionID = "0"

// Customer calls into our functions, has an error and the context, bring expired, causes our retries
// to abort before we attempt to do even a single recovery.
//
// https://github.com/Azure/azure-sdk-for-go/issues/23282
setup := func(t *testing.T) (*Links[amqpwrap.AMQPSenderCloser], LinkWithID[amqpwrap.AMQPSenderCloser]) {
ns, links := newLinksForTest(t)

t.Cleanup(func() { test.RequireClose(t, links) })
t.Cleanup(func() { test.RequireNSClose(t, ns) })

origLWID, err := links.GetLink(context.Background(), partitionID)
require.NoError(t, err)
require.NotEmpty(t, origLWID)

// force a recovery but with a pre-cancelled context
cancelledCtx, cancel := context.WithCancel(context.Background())
cancel()

first := true
err = links.Retry(cancelledCtx, log.Event("event"), "operation", partitionID, exported.RetryOptions{}, func(ctx context.Context, lwid LinkWithID[amqpwrap.AMQPSenderCloser]) error {
if first {
first = false
return amqpwrap.Error{
Err: &amqp.ConnError{},
ConnID: lwid.ConnID(),
LinkName: lwid.Link().LinkName(),
PartitionID: lwid.PartitionID(),
}
}

return nil
})
require.ErrorIs(t, err, context.Canceled)

return links, origLWID
}

t.Run("GetLinks", func(t *testing.T) {
links, origLWID := setup(t)

newLWID, err := links.GetLink(context.Background(), partitionID)
require.NoError(t, err)

require.NotEqual(t, origLWID.Link(), newLWID.Link())
require.NotEqual(t, origLWID.ConnID(), newLWID.ConnID())
})

t.Run("Retry", func(t *testing.T) {
links, origLWID := setup(t)

err := links.Retry(context.Background(), log.Event("event"), "operation", partitionID, exported.RetryOptions{}, func(ctx context.Context, lwid LinkWithID[amqpwrap.AMQPSenderCloser]) error {
require.NotEqual(t, origLWID.Link(), lwid.Link())
require.NotEqual(t, origLWID.ConnID(), lwid.ConnID())
return nil
})
require.NoError(t, err)
})
}

func requireNewLinkSameConn(t *testing.T, oldLWID LinkWithID[AMQPSenderCloser], newLWID LinkWithID[AMQPSenderCloser]) {
t.Helper()
require.NotEqual(t, oldLWID.Link().LinkName(), newLWID.Link().LinkName(), "Link should have a new ID because it was recreated")
Expand Down

0 comments on commit bbad005

Please sign in to comment.