Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge main into datalake feature branch #20833

Merged
Merged
Changes from 1 commit
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
e8167a2
Enable gocritic during linting (#20715)
jhendrixMSFT Apr 28, 2023
86627ae
Cosmos DB: Enable merge support (#20716)
ealsur Apr 28, 2023
8ac8c6d
[azservicebus, azeventhubs] Stress test and logging improvement (#20710)
richardpark-msft May 1, 2023
9111616
update proxy version (#20712)
azure-sdk May 1, 2023
d6bf190
Return an error when you try to send a message that's too large. (#20…
richardpark-msft May 1, 2023
e2693bd
Changes in test that is failing in pipeline (#20693)
siminsavani-msft May 2, 2023
03f0ac3
[azservicebus, azeventhubs] Treat 'entity full' as a fatal error (#20…
richardpark-msft May 2, 2023
838842d
[azservicebus/azeventhubs] Redirect stderr and stdout to tee (#20726)
richardpark-msft May 3, 2023
20b4dd8
Update changelog with latest features (#20730)
jhendrixMSFT May 3, 2023
745d967
pass along the artifact name so we can override it later (#20732)
azure-sdk May 3, 2023
6dfd0cb
[azeventhubs] Fixing checkpoint store race condition (#20727)
richardpark-msft May 3, 2023
ed7f3c7
Fix azidentity troubleshooting guide link (#20736)
chlowell May 3, 2023
b2cddab
[Release] sdk/resourcemanager/paloaltonetworksngfw/armpanngfw/0.1.0 (…
Alancere May 4, 2023
2a8d96d
add sdk/resourcemanager/postgresql/armpostgresql live test (#20685)
Alancere May 4, 2023
0d22aed
add sdk/resourcemanager/eventhub/armeventhub live test (#20686)
Alancere May 4, 2023
5fa7df4
add sdk/resourcemanager/compute/armcompute live test (#20048)
Alancere May 4, 2023
c005ed6
sdk/resourcemanager/network/armnetwork live test (#20331)
Alancere May 4, 2023
36f766d
add sdk/resourcemanager/cosmos/armcosmos live test (#20705)
Alancere May 4, 2023
9c9d62a
Increment package version after release of azcore (#20740)
azure-sdk May 4, 2023
8bc3450
[azeventhubs] Improperly resetting etag in the checkpoint store (#20737)
richardpark-msft May 4, 2023
e1a6152
Eng workflows sync and branch cleanup additions (#20743)
azure-sdk May 4, 2023
04b463d
[azeventhubs] Latest start position can also be inclusive (ie, get th…
richardpark-msft May 4, 2023
8849196
Update GitHubEventProcessor version and remove pull_request_review pr…
azure-sdk May 5, 2023
27f5ee0
Rename DisableAuthorityValidationAndInstanceDiscovery (#20746)
chlowell May 5, 2023
2eec707
fix (#20707)
Alancere May 6, 2023
22db2d4
AzFile (#20739)
souravgupta-msft May 8, 2023
0cbfd88
azfile: Fixing connection string parsing logic (#20798)
souravgupta-msft May 8, 2023
d54fb08
[azadmin] fix flaky test (#20758)
gracewilcox May 8, 2023
ad8ebd9
Prepare azidentity v1.3.0 for release (#20756)
chlowell May 8, 2023
e2a6f70
Fix broken podman link (#20801)
azure-sdk May 8, 2023
a59d912
[azquery] update doc comments (#20755)
gracewilcox May 8, 2023
bd3b467
Fixed contribution section (#20752)
bobtabor-msft May 8, 2023
132a01a
[azeventhubs,azservicebus] Some API cleanup, renames (#20754)
richardpark-msft May 8, 2023
8db51ca
Add supporting features to enable distributed tracing (#20301) (#20708)
jhendrixMSFT May 9, 2023
4a66b4f
Restore ARM CAE support for azcore beta (#20657)
chlowell May 9, 2023
7d4a3cb
Upgrade to stable azcore (#20808)
chlowell May 9, 2023
068c3be
Increment package version after release of data/azcosmos (#20807)
azure-sdk May 9, 2023
8e0f66e
Updating changelog (#20810)
souravgupta-msft May 9, 2023
ce926c4
Add fake package to azcore (#20711)
jhendrixMSFT May 9, 2023
1a145c5
Updating CHANGELOG.md (#20809)
siminsavani-msft May 9, 2023
90dfc5c
changelog (#20811)
tasherif-msft May 9, 2023
c7eda59
Increment package version after release of storage/azfile (#20813)
azure-sdk May 9, 2023
7fac0b5
Update changelog (azblob) (#20815)
siminsavani-msft May 9, 2023
498a2ef
[azquery] migration guide (#20742)
gracewilcox May 9, 2023
ccb967e
Increment package version after release of monitor/azquery (#20820)
azure-sdk May 9, 2023
f4e6a22
[keyvault] prep for release (#20819)
gracewilcox May 10, 2023
8fd8eda
Merge branch 'main' into feature/azdatalake
tasherif-msft May 11, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Return an error when you try to send a message that's too large. (#20721
)

This now works just like the message batch - you'll get an ErrMessageTooLarge
if you attempt to send a message that's too large for the link's configured
size.

NOTE: there's a patch to `internal/go-amqp/Sender.go` to match what's in go-amqp's
main so it returns a programmatically useful error when the message is too large.

Fixes #20647
richardpark-msft authored May 1, 2023

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
commit d6bf19024564205922e0225ba4d70449d7ab0e02
10 changes: 8 additions & 2 deletions sdk/messaging/azservicebus/internal/go-amqp/sender.go
Original file line number Diff line number Diff line change
@@ -101,7 +101,10 @@ func (s *Sender) send(ctx context.Context, msg *Message, opts *SendOptions) (cha
maxTransferFrameHeader = 66 // determined by calcMaxTransferFrameHeader
)
if len(msg.DeliveryTag) > maxDeliveryTagLength {
return nil, fmt.Errorf("delivery tag is over the allowed %v bytes, len: %v", maxDeliveryTagLength, len(msg.DeliveryTag))
return nil, &Error{
Condition: ErrCondMessageSizeExceeded,
Description: fmt.Sprintf("delivery tag is over the allowed %v bytes, len: %v", maxDeliveryTagLength, len(msg.DeliveryTag)),
}
}

s.mu.Lock()
@@ -114,7 +117,10 @@ func (s *Sender) send(ctx context.Context, msg *Message, opts *SendOptions) (cha
}

if s.l.maxMessageSize != 0 && uint64(s.buf.Len()) > s.l.maxMessageSize {
return nil, fmt.Errorf("encoded message size exceeds max of %d", s.l.maxMessageSize)
return nil, &Error{
Condition: ErrCondMessageSizeExceeded,
Description: fmt.Sprintf("encoded message size exceeds max of %d", s.l.maxMessageSize),
}
}

senderSettled := senderSettleModeValue(s.l.senderSettleMode) == SenderSettleModeSettled
3 changes: 2 additions & 1 deletion sdk/messaging/azservicebus/message_batch.go
Original file line number Diff line number Diff line change
@@ -12,7 +12,8 @@ import (
)

// ErrMessageTooLarge is returned when a message cannot fit into a batch when using MessageBatch.Add()
var ErrMessageTooLarge = errors.New("the message could not be added because it is too large for the batch")
// or if the message is being sent on its own and is too large for the link.
var ErrMessageTooLarge = errors.New("the message is too large")

type (
// MessageBatch represents a batch of messages to send to Service Bus in a single message
15 changes: 12 additions & 3 deletions sdk/messaging/azservicebus/sender.go
Original file line number Diff line number Diff line change
@@ -5,6 +5,7 @@ package azservicebus

import (
"context"
"errors"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal"
@@ -33,7 +34,7 @@ type MessageBatchOptions struct {
// NewMessageBatch can be used to create a batch that contain multiple
// messages. Sending a batch of messages is more efficient than sending the
// messages one at a time.
// If the operation fails it can return an *azservicebus.Error type if the failure is actionable.
// If the operation fails it can return an [*azservicebus.Error] type if the failure is actionable.
func (s *Sender) NewMessageBatch(ctx context.Context, options *MessageBatchOptions) (*MessageBatch, error) {
var batch *MessageBatch

@@ -61,7 +62,9 @@ type SendMessageOptions struct {
}

// SendMessage sends a Message to a queue or topic.
// If the operation fails it can return an *azservicebus.Error type if the failure is actionable.
// If the operation fails it can return:
// - [ErrMessageTooLarge] if the message is larger than the maximum allowed link size.
// - An [*azservicebus.Error] type if the failure is actionable.
func (s *Sender) SendMessage(ctx context.Context, message *Message, options *SendMessageOptions) error {
return s.sendMessage(ctx, message)
}
@@ -74,7 +77,9 @@ type SendAMQPAnnotatedMessageOptions struct {
// SendAMQPAnnotatedMessage sends an AMQPMessage to a queue or topic.
// Using an AMQPMessage allows for advanced use cases, like payload encoding, as well as better
// interoperability with pure AMQP clients.
// If the operation fails it can return an *azservicebus.Error type if the failure is actionable.
// If the operation fails it can return:
// - [ErrMessageTooLarge] if the message is larger than the maximum allowed link size.
// - An [*azservicebus.Error] type if the failure is actionable.
func (s *Sender) SendAMQPAnnotatedMessage(ctx context.Context, message *AMQPAnnotatedMessage, options *SendAMQPAnnotatedMessageOptions) error {
return s.sendMessage(ctx, message)
}
@@ -171,6 +176,10 @@ func (s *Sender) sendMessage(ctx context.Context, message amqpCompatibleMessage)
return lwid.Sender.Send(ctx, message.toAMQPMessage(), nil)
}, RetryOptions(s.retryOptions))

if amqpErr := (*amqp.Error)(nil); errors.As(err, &amqpErr) && amqpErr.Condition == amqp.ErrCondMessageSizeExceeded {
return ErrMessageTooLarge
}

return internal.TransformError(err)
}

44 changes: 44 additions & 0 deletions sdk/messaging/azservicebus/sender_test.go
Original file line number Diff line number Diff line change
@@ -734,3 +734,47 @@ func (rm receivedMessages) Less(i, j int) bool {
func (rm receivedMessages) Swap(i, j int) {
rm[i], rm[j] = rm[j], rm[i]
}

func Test_Sender_Send_MessageTooBig(t *testing.T) {
client, cleanup, queueName := setupLiveTest(t, &liveTestOptions{
ClientOptions: &ClientOptions{
RetryOptions: RetryOptions{
// This is a purposefully ridiculous wait time but we'll never hit it
// because exceeding the max message size is NOT a retryable error.
RetryDelay: time.Hour,
},
},
QueueProperties: &admin.QueueProperties{
EnablePartitioning: to.Ptr(true),
}})
defer cleanup()

sender, err := client.NewSender(queueName, nil)
require.NoError(t, err)

hugePayload := []byte{}

for i := 0; i < 1000*1000; i++ {
hugePayload = append(hugePayload, 100)
}

ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
err = sender.SendMessage(ctx, &Message{
MessageID: to.Ptr("message with a message ID"),
Body: hugePayload,
}, nil)

require.ErrorIs(t, err, ErrMessageTooLarge)

ctx, cancel = context.WithTimeout(context.Background(), time.Minute)
defer cancel()

err = sender.SendAMQPAnnotatedMessage(ctx, &AMQPAnnotatedMessage{
Body: AMQPAnnotatedMessageBody{
Data: [][]byte{hugePayload},
},
}, nil)

require.ErrorIs(t, err, ErrMessageTooLarge)
}