Skip to content

Commit

Permalink
[azservicebus,azeventhubs] Fixing issue where $cbs link being incompl…
Browse files Browse the repository at this point in the history
…etely closed prevented the connection from recovering (#20334)

You're only allowed to have one Sender+Receiver for the $cbs endpoint. In some cases, where there are connection failures or link cleanup is cancelled it's possible for the link to remain open. The only way to fix this is to restart the connection.

Fixes #19504
Likely related to dapr/components-contrib/issues/2617
  • Loading branch information
richardpark-msft authored Mar 7, 2023
1 parent 9286c60 commit ba84713
Show file tree
Hide file tree
Showing 9 changed files with 163 additions and 12 deletions.
5 changes: 3 additions & 2 deletions sdk/messaging/azeventhubs/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Release History

## 0.6.0 (Unreleased)
## 0.6.0 (2023-03-07)

### Features Added

Expand All @@ -14,7 +14,8 @@

### Bugs Fixed

### Other Changes
- Recover the connection when the $cbs Receiver/Sender is not closed properly. This would cause
clients to return an error saying "$cbs node has already been opened." (PR#20334)

## 0.5.0 (2023-02-07)

Expand Down
9 changes: 9 additions & 0 deletions sdk/messaging/azeventhubs/internal/cbs.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"errors"

"github.com/Azure/azure-sdk-for-go/sdk/internal/log"
azlog "github.com/Azure/azure-sdk-for-go/sdk/internal/log"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/amqpwrap"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/auth"
Expand Down Expand Up @@ -35,6 +36,14 @@ func NegotiateClaim(ctx context.Context, audience string, conn amqpwrap.AMQPClie
})

if err != nil {
// In some circumstances we can end up in a situation where the link closing was cancelled
// or interrupted, leaving $cbs still open by some dangling receiver or sender. The only way
// to fix this is to restart the connection.
if IsNotAllowedError(err) {
log.Writef(exported.EventAuth, "Not allowed to open, connection will be reset: %s", err)
return errConnResetNeeded
}

return err
}

Expand Down
7 changes: 7 additions & 0 deletions sdk/messaging/azeventhubs/internal/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,13 @@ func IsErrNotFound(err error) bool {
return ok
}

func IsNotAllowedError(err error) bool {
var e *amqp.Error

return errors.As(err, &e) &&
e.Condition == amqp.ErrorNotAllowed
}

func (e ErrConnectionClosed) Error() string {
return fmt.Sprintf("the connection has been closed: %s", string(e))
}
Expand Down
68 changes: 68 additions & 0 deletions sdk/messaging/azeventhubs/internal/links_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package internal

import (
"context"
"fmt"
"testing"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/amqpwrap"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/exported"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/go-amqp"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/test"
"github.com/stretchr/testify/require"
)

func TestLinksCBSLinkStillOpen(t *testing.T) {
// we're not going to use this client for these tests.
testParams := test.GetConnectionParamsForTest(t)
ns, err := NewNamespace(NamespaceWithConnectionString(testParams.ConnectionString))
require.NoError(t, err)

defer func() { _ = ns.Close(context.Background(), true) }()

session, oldConnID, err := ns.NewAMQPSession(context.Background())
require.NoError(t, err)

// opening a Sender to the $cbs endpoint. This endpoint can only be opened by a single
// sender/receiver pair in a connection.
_, err = session.NewSender(context.Background(), "$cbs", nil)
require.NoError(t, err)

newLinkFn := func(ctx context.Context, session amqpwrap.AMQPSession, entityPath string) (AMQPSenderCloser, error) {
return session.NewSender(ctx, entityPath, &amqp.SenderOptions{
SettlementMode: to.Ptr(amqp.ModeMixed),
RequestedReceiverSettleMode: to.Ptr(amqp.ModeFirst),
IgnoreDispositionErrors: true,
})
}

formatEntityPath := func(partitionID string) string {
return fmt.Sprintf("%s/Partitions/%s", testParams.EventHubName, partitionID)
}

links := NewLinks(ns, fmt.Sprintf("%s/$management", testParams.EventHubName), formatEntityPath, newLinkFn)

var lwid LinkWithID[AMQPSenderCloser]

err = links.Retry(context.Background(), exported.EventConn, "test", "0", exported.RetryOptions{
RetryDelay: -1,
MaxRetryDelay: time.Millisecond,
}, func(ctx context.Context, innerLWID LinkWithID[AMQPSenderCloser]) error {
lwid = innerLWID
return nil
})
require.NoError(t, err)

defer func() {
err := links.Close(context.Background())
require.NoError(t, err)
}()

require.NoError(t, err)
require.Equal(t, oldConnID+1, lwid.ConnID, "Connection gets incremented since it had to be reset")
}
6 changes: 5 additions & 1 deletion sdk/messaging/azeventhubs/internal/test/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,11 @@ type ConnectionParamsForTest struct {
}

func GetConnectionParamsForTest(t *testing.T) ConnectionParamsForTest {
_ = godotenv.Load()
if _, err := os.Stat("../.env"); err == nil {
_ = godotenv.Load("../.env")
} else {
_ = godotenv.Load()
}

envVars := mustGetEnvironmentVars(t, []string{
"AZURE_SUBSCRIPTION_ID",
Expand Down
12 changes: 4 additions & 8 deletions sdk/messaging/azservicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,17 +1,13 @@
# Release History

## 1.2.1 (Unreleased)

### Features Added

### Breaking Changes
## 1.2.1 (2023-03-07)

### Bugs Fixed

- Fixing issues where we could over-request credit (#19965) or allow for negative/zero credits (#19743), both of
- Prevent over-requesting credit (#19965) or requesting negative/zero credits (#19743), both of
which could cause issues with go-amqp. (PR#19992)

### Other Changes
- Recover the connection when the $cbs Receiver/Sender is not closed properly. This would cause
clients to return an error saying "$cbs node has already been opened." (PR#20334)

## 1.2.0 (2023-02-07)

Expand Down
52 changes: 51 additions & 1 deletion sdk/messaging/azservicebus/internal/amqpLinks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func TestAMQPLinksBasic(t *testing.T) {
}

func TestAMQPLinksLive(t *testing.T) {
// we're not going to use this client for tehse tests.
// we're not going to use this client for these tests.
entityPath, cleanup := test.CreateExpiringQueue(t, nil)
defer cleanup()

Expand Down Expand Up @@ -174,6 +174,56 @@ func TestAMQPLinksLive(t *testing.T) {
assertLinks(t, lwr)
}

// TestAMQPLinksCBSLinkStillOpen makes sure we can recover from an incompletely
// closed $cbs link, which can happen if a user cancels and we can't properly close
// the link as a result.
func TestAMQPLinksCBSLinkStillOpen(t *testing.T) {
// we're not going to use this client for these tests.
entityPath, cleanup := test.CreateExpiringQueue(t, nil)
defer cleanup()

cs := test.GetConnectionString(t)
ns, err := NewNamespace(NamespaceWithConnectionString(cs))
require.NoError(t, err)

defer func() { _ = ns.Close(false) }()

session, oldConnID, err := ns.NewAMQPSession(context.Background())
require.NoError(t, err)

// opening a Sender to the $cbs endpoint. This endpoint can only be opened by a single
// sender/receiver pair in a connection.
_, err = session.NewSender(context.Background(), "$cbs", nil)
require.NoError(t, err)

links := NewAMQPLinks(NewAMQPLinksArgs{
NS: ns,
EntityPath: entityPath,
CreateLinkFunc: func(ctx context.Context, session amqpwrap.AMQPSession) (amqpwrap.AMQPSenderCloser, amqpwrap.AMQPReceiverCloser, error) {
return newLinksForAMQPLinksTest(entityPath, session)
},
GetRecoveryKindFunc: GetRecoveryKind,
})

var lwid *LinksWithID

err = links.Retry(context.Background(), exported.EventConn, "test", func(ctx context.Context, innerLwid *LinksWithID, args *utils.RetryFnArgs) error {
lwid = innerLwid
return nil
}, exported.RetryOptions{
RetryDelay: -1,
MaxRetryDelay: time.Millisecond,
})

defer func() {
err := links.Close(context.Background(), true)
require.NoError(t, err)
}()

require.NoError(t, err)
require.Equal(t, oldConnID+1, lwid.ID.Conn, "Connection gets incremented since it had to be reset")
}

func TestAMQPLinksLiveRecoverLink(t *testing.T) {
// we're not going to use this client for these tests.
entityPath, cleanup := test.CreateExpiringQueue(t, nil)
Expand Down
9 changes: 9 additions & 0 deletions sdk/messaging/azservicebus/internal/cbs.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package internal
import (
"context"

"github.com/Azure/azure-sdk-for-go/sdk/internal/log"
azlog "github.com/Azure/azure-sdk-for-go/sdk/internal/log"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/amqpwrap"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/auth"
Expand All @@ -31,6 +32,14 @@ func NegotiateClaim(ctx context.Context, audience string, conn amqpwrap.AMQPClie
})

if err != nil {
// In some circumstances we can end up in a situation where the link closing was cancelled
// or interrupted, leaving $cbs still open by some dangling receiver or sender. The only way
// to fix this is to restart the connection.
if IsNotAllowedError(err) {
log.Writef(exported.EventAuth, "Not allowed to open, connection will be reset: %s", err)
return errConnResetNeeded
}

return err
}

Expand Down
7 changes: 7 additions & 0 deletions sdk/messaging/azservicebus/internal/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,13 @@ func IsDetachError(err error) bool {
return errors.As(err, &de)
}

func IsNotAllowedError(err error) bool {
var e *amqp.Error

return errors.As(err, &e) &&
e.Condition == amqp.ErrorNotAllowed
}

func IsCancelError(err error) bool {
if err == nil {
return false
Expand Down

0 comments on commit ba84713

Please sign in to comment.