Skip to content

Commit

Permalink
Merge pull request Azure#3 from devigned/feature/dups
Browse files Browse the repository at this point in the history
add duplicate detection functionality
  • Loading branch information
devigned authored Jan 23, 2018
2 parents 9b4011f + 82602c8 commit 9d7e208
Show file tree
Hide file tree
Showing 5 changed files with 191 additions and 92 deletions.
102 changes: 102 additions & 0 deletions queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package servicebus

import (
"context"
mgmt "github.com/Azure/azure-sdk-for-go/services/servicebus/mgmt/2017-04-01/servicebus"
"github.com/Azure/go-autorest/autorest"
log "github.com/sirupsen/logrus"
)

// QueueOption represents named options for assisting queue creation
type QueueOption func(queue *mgmt.SBQueue) error

/*
QueueWithPartitioning ensure the created queue will be a partitioned queue. Partitioned queues offer increased
storage and availability compared to non-partitioned queues with the trade-off of requiring the following to ensure
FIFO message retreival:
SessionId. If a message has the SessionId property set, then Service Bus uses the SessionId property as the
partition key. This way, all messages that belong to the same session are assigned to the same fragment and handled
by the same message broker. This allows Service Bus to guarantee message ordering as well as the consistency of
session states.
PartitionKey. If a message has the PartitionKey property set but not the SessionId property, then Service Bus uses
the PartitionKey property as the partition key. Use the PartitionKey property to send non-sessionful transactional
messages. The partition key ensures that all messages that are sent within a transaction are handled by the same
messaging broker.
MessageId. If the queue or topic has the RequiresDuplicationDetection property set to true, then the MessageId
property serves as the partition key if the SessionId or a PartitionKey properties are not set. This ensures that
all copies of the same message are handled by the same message broker and, thus, allows Service Bus to detect and
eliminate duplicate messages
*/
func QueueWithPartitioning() QueueOption {
return func(queue *mgmt.SBQueue) error {
queue.SBQueueProperties.EnablePartitioning = ptrBool(true)
return nil
}
}

// QueueWithDuplicateDetection will ensure the queue has the ability to detected duplicate messages based on
// the message's MessageID
func QueueWithDuplicateDetection() QueueOption {
return func(queue *mgmt.SBQueue) error {
queue.SBQueueProperties.RequiresDuplicateDetection = ptrBool(true)
return nil
}
}

// QueueWithRequiredSessions will ensure the queue requires senders and receivers to have sessionIDs
func QueueWithRequiredSessions() QueueOption {
return func(queue *mgmt.SBQueue) error {
queue.SBQueueProperties.RequiresSession = ptrBool(true)
return nil
}
}

// QueueWithMessageExpiration will ensure the queue sends expired messages to the dead letter queue
func QueueWithMessageExpiration() QueueOption {
return func(queue *mgmt.SBQueue) error {
queue.DeadLetteringOnMessageExpiration = ptrBool(true)
return nil
}
}

// EnsureQueue makes sure a queue exists in the given namespace. If the queue doesn't exist, it will create it with
// the specified name and properties. If properties are not specified, it will build a default partitioned queue.
func (sb *serviceBus) EnsureQueue(ctx context.Context, queueName string, opts ...QueueOption) (*mgmt.SBQueue, error) {
log.Debugf("ensuring exists queue %s", queueName)
queueClient := sb.getQueueMgmtClient()
queue, err := queueClient.Get(ctx, sb.resourceGroup, sb.namespace, queueName)

// TODO: check if the queue properties are the same as the requested. If not, throw error or build new queue??
if err != nil {
newQueue := &mgmt.SBQueue{
Name: &queueName,
SBQueueProperties: &mgmt.SBQueueProperties{},
}

for _, opt := range opts {
opt(newQueue)
}

queue, err = queueClient.CreateOrUpdate(ctx, sb.resourceGroup, sb.namespace, queueName, *newQueue)
if err != nil {
return nil, err
}
}
return &queue, nil
}

// DeleteQueue deletes an existing queue
func (sb *serviceBus) DeleteQueue(ctx context.Context, queueName string) error {
queueClient := sb.getQueueMgmtClient()
_, err := queueClient.Delete(ctx, sb.resourceGroup, sb.namespace, queueName)
return err
}

func (sb *serviceBus) getQueueMgmtClient() mgmt.QueuesClient {
client := mgmt.NewQueuesClientWithBaseURI(sb.environment.ResourceManagerEndpoint, sb.subscriptionID)
client.Authorizer = autorest.NewBearerAuthorizer(sb.token)
return client
}
7 changes: 5 additions & 2 deletions receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,15 +105,18 @@ func (r *Receiver) listenForMessages(msgChan chan *amqp.Message) {
close(msgChan)
return
default:
log.Debug("attempting to receive messages")
waitCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
//log.Debug("attempting to receive messages")
waitCtx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
msg, err := r.receiver.Receive(waitCtx)
cancel()

// TODO: handle receive errors better. It's not sufficient to check only for timeout
if err, ok := err.(net.Error); ok && err.Timeout() {
log.Debug("attempting to receive messages timed out")
continue
} else if err != nil {
log.Fatalln(err)
time.Sleep(10 * time.Second)
}
if msg != nil {
id := interface{}("null")
Expand Down
29 changes: 27 additions & 2 deletions sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,14 @@ func (s *Sender) Close() error {
return nil
}

// Send will send a message using the session and link
func (s *Sender) Send(ctx context.Context, msg *amqp.Message) error {
// Send will send a message to the entity path with options
func (s *Sender) Send(ctx context.Context, msg *amqp.Message, opts ...SendOption) error {
// TODO: Add in recovery logic in case the link / session has gone down
s.prepareMessage(msg)
for _, opt := range opts {
opt(msg)
}

err := s.sender.Send(ctx, msg)
if err != nil {
return err
Expand Down Expand Up @@ -95,3 +99,24 @@ func (s *Sender) newSessionAndLink() error {
s.sender = amqpSender
return nil
}

// SendOption provides a way to customize a message on sending
type SendOption func(message *amqp.Message) error

// SendWithMessageID provides an option of adding a message ID for the sent message
func SendWithMessageID(msgID interface{}) SendOption {
return func(msg *amqp.Message) error {
msg.Properties.MessageID = msgID
return nil
}
}

// SendWithoutSessionID will set the SessionID to nil. If sending to a partitioned Service Bus queue, this will cause
// the queue distributed the message in a round robin fashion to the next available partition with the effect of not
// enforcing FIFO ordering of messages, but enabling more efficient distribution of messages across partitions.
func SendWithoutSessionID() SendOption {
return func(msg *amqp.Message) error {
msg.Properties.GroupID = ""
return nil
}
}
53 changes: 5 additions & 48 deletions servicebus.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ var (

// SenderReceiver provides the ability to send and receive messages
type SenderReceiver interface {
Send(ctx context.Context, entityPath string, msg *amqp.Message) error
Send(ctx context.Context, entityPath string, msg *amqp.Message, opts ...SendOption) error
Receive(entityPath string, handler Handler) error
Close() error
}

// EntityManager provides the ability to manage Service Bus entities (Queues, Topics, Subscriptions, etc.)
type EntityManager interface {
EnsureQueue(ctx context.Context, queueName string, properties *mgmt.SBQueueProperties) (*mgmt.SBQueue, error)
EnsureQueue(ctx context.Context, queueName string, opts ...QueueOption) (*mgmt.SBQueue, error)
DeleteQueue(ctx context.Context, queueName string) error
}

Expand Down Expand Up @@ -215,14 +215,14 @@ func (sb *serviceBus) Receive(entityPath string, handler Handler) error {
return nil
}

// Send sends a message to a provided entity path.
func (sb *serviceBus) Send(ctx context.Context, entityPath string, msg *amqp.Message) error {
// Send sends a message to a provided entity path with options
func (sb *serviceBus) Send(ctx context.Context, entityPath string, msg *amqp.Message, opts ...SendOption) error {
sender, err := sb.fetchSender(entityPath)
if err != nil {
return err
}

return sender.Send(ctx, msg)
return sender.Send(ctx, msg, opts...)
}

func (sb *serviceBus) fetchSender(entityPath string) (*Sender, error) {
Expand All @@ -231,7 +231,6 @@ func (sb *serviceBus) fetchSender(entityPath string) (*Sender, error) {

entry, ok := sb.senders[entityPath]
if ok {
log.Debugf("found sender for entity path %s", entityPath)
return entry, nil
}

Expand All @@ -243,45 +242,3 @@ func (sb *serviceBus) fetchSender(entityPath string) (*Sender, error) {
sb.senders[entityPath] = sender
return sender, nil
}

// EnsureQueue makes sure a queue exists in the given namespace. If the queue doesn't exist, it will create it with
// the specified name and properties. If properties are not specified, it will build a default partitioned queue.
func (sb *serviceBus) EnsureQueue(ctx context.Context, queueName string, properties *mgmt.SBQueueProperties) (*mgmt.SBQueue, error) {
log.Debugf("ensuring exists queue %s", queueName)
queueClient := sb.getQueueMgmtClient()
queue, err := queueClient.Get(ctx, sb.resourceGroup, sb.namespace, queueName)
// TODO: check if the queue properties are the same as the requested. If not, throw error or build new queue??

if properties == nil {
log.Debugf("no properties specified, so using default partitioned queue for %s", queueName)
properties = &mgmt.SBQueueProperties{
EnablePartitioning: ptrBool(true),
}
}

if err != nil {
log.Debugf("building a new queue %s", queueName)
newQueue := mgmt.SBQueue{
Name: &queueName,
SBQueueProperties: properties,
}
queue, err = queueClient.CreateOrUpdate(ctx, sb.resourceGroup, sb.namespace, queueName, newQueue)
if err != nil {
return nil, err
}
}
return &queue, nil
}

// DeleteQueue deletes an existing queue
func (sb *serviceBus) DeleteQueue(ctx context.Context, queueName string) error {
queueClient := sb.getQueueMgmtClient()
_, err := queueClient.Delete(ctx, sb.resourceGroup, sb.namespace, queueName)
return err
}

func (sb *serviceBus) getQueueMgmtClient() mgmt.QueuesClient {
client := mgmt.NewQueuesClientWithBaseURI(sb.environment.ResourceManagerEndpoint, sb.subscriptionID)
client.Authorizer = autorest.NewBearerAuthorizer(sb.token)
return client
}
Loading

0 comments on commit 9d7e208

Please sign in to comment.