Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Message properties that are optional need to be nil-able #93

Merged
merged 11 commits into from
Dec 7, 2021
3 changes: 2 additions & 1 deletion azure-pipelines.yml
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ jobs:
$(Pipeline.Workspace)/azure-amqp/bin/Debug/TestAmqpBroker/net461/TestAmqpBroker.exe $AMQP_BROKER_ADDR /headless &
brokerPID=$!
echo '##[section]Starting tests'
go test -tags -race -v -coverprofile=coverage.txt -covermode atomic ./... 2>&1 | go-junit-report > report.xml
go test -tags -race -v -coverprofile=coverage.txt -covermode atomic ./... 2>&1 | tee gotestoutput.log
go-junit-report < gotestoutput.log > report.xml
kill $brokerPID
gocov convert coverage.txt > coverage.json
gocov-xml < coverage.json > coverage.xml
Expand Down
47 changes: 27 additions & 20 deletions marshal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,17 +481,17 @@ var (
Properties: &MessageProperties{
MessageID: "yo",
UserID: []byte("baz"),
To: "me",
Subject: "sup?",
ReplyTo: "you",
To: stringPtr("me"),
Subject: stringPtr("sup?"),
ReplyTo: stringPtr("you"),
CorrelationID: uint64(34513),
ContentType: "text/plain",
ContentEncoding: "UTF-8",
AbsoluteExpiryTime: time.Date(2018, 01, 13, 14, 24, 07, 0, time.UTC),
CreationTime: time.Date(2018, 01, 13, 14, 14, 07, 0, time.UTC),
GroupID: "fooGroup",
GroupSequence: 89324,
ReplyToGroupID: "barGroup",
ContentType: stringPtr("text/plain"),
ContentEncoding: stringPtr("UTF-8"),
AbsoluteExpiryTime: timePtr(time.Date(2018, 01, 13, 14, 24, 07, 0, time.UTC)),
CreationTime: timePtr(time.Date(2018, 01, 13, 14, 14, 07, 0, time.UTC)),
GroupID: stringPtr("fooGroup"),
GroupSequence: uint32Ptr(89324),
ReplyToGroupID: stringPtr("barGroup"),
},
ApplicationProperties: map[string]interface{}{
"baz": "foo",
Expand All @@ -515,17 +515,17 @@ var (
&MessageProperties{
MessageID: "yo",
UserID: []byte("baz"),
To: "me",
Subject: "sup?",
ReplyTo: "you",
To: stringPtr("me"),
Subject: stringPtr("sup?"),
ReplyTo: stringPtr("you"),
CorrelationID: uint64(34513),
ContentType: "text/plain",
ContentEncoding: "UTF-8",
AbsoluteExpiryTime: time.Date(2018, 01, 13, 14, 24, 07, 0, time.UTC),
CreationTime: time.Date(2018, 01, 13, 14, 14, 07, 0, time.UTC),
GroupID: "fooGroup",
GroupSequence: 89324,
ReplyToGroupID: "barGroup",
ContentType: stringPtr("text/plain"),
ContentEncoding: stringPtr("UTF-8"),
AbsoluteExpiryTime: timePtr(time.Date(2018, 01, 13, 14, 24, 07, 0, time.UTC)),
CreationTime: timePtr(time.Date(2018, 01, 13, 14, 14, 07, 0, time.UTC)),
GroupID: stringPtr("fooGroup"),
GroupSequence: uint32Ptr(89324),
ReplyToGroupID: stringPtr("barGroup"),
},
&encoding.StateReceived{
SectionNumber: 234,
Expand Down Expand Up @@ -651,3 +651,10 @@ func rcvSettle(m ReceiverSettleMode) *ReceiverSettleMode {
func uint32Ptr(u uint32) *uint32 {
return &u
}
func stringPtr(s string) *string {
return &s
}

func timePtr(t time.Time) *time.Time {
return &t
}
75 changes: 52 additions & 23 deletions message.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,35 @@ func (h *MessageHeader) Unmarshal(r *buffer.Buffer) error {
}...)
}

// AMQP types
type (
// AMQPAddress corresponds to the 'address' type in the AMQP spec.
// <type name="address-string" class="restricted" source="string" provides="address"/>
AMQPAddress = string

// AMQPMessageID corresponds to the 'message-id' type in the AMQP spec. Internally it can
// be one of the following:
// - uint64: <type name="message-id-ulong" class="restricted" source="ulong" provides="message-id"/>
// - amqp.UUID: <type name="message-id-uuid" class="restricted" source="uuid" provides="message-id"/>
// - []byte: <type name="message-id-binary" class="restricted" source="binary" provides="message-id"/>
// - string: <type name="message-id-string" class="restricted" source="string" provides="message-id"/>
AMQPMessageID = interface{}

// AMQPSymbol corresponds to the 'symbol' type in the AMQP spec.
// <type name="symbol" class="primitive"/>
// And either:
// - variable-width, 1 byte size up to 2^8 - 1 seven bit ASCII characters representing a symbolic value
// - variable-width, 4 byte size up to 2^32 - 1 seven bit ASCII characters representing a symbolic value
AMQPSymbol = string

// AMQPSequenceNumber corresponds to the `sequence-no` type in the AMQP spec.
// <type name="sequence-no" class="restricted" source="uint"/>
AMQPSequenceNumber = uint32

// AMQPBinary corresponds to the `binary` type in the AMQP spec.
AMQPBinary = []byte
richardpark-msft marked this conversation as resolved.
Show resolved Hide resolved
)

/*
<type name="properties" class="composite" source="list" provides="section">
<descriptor name="amqp:properties:list" code="0x00000000:0x00000073"/>
Expand All @@ -354,25 +383,25 @@ type MessageProperties struct {
// such a way that it is assured to be globally unique. A broker MAY discard a
// message as a duplicate if the value of the message-id matches that of a
// previously received message sent to the same node.
MessageID interface{} // uint64, UUID, []byte, or string
MessageID AMQPMessageID // uint64, UUID, []byte, or string

// The identity of the user responsible for producing the message.
// The client sets this value, and it MAY be authenticated by intermediaries.
UserID []byte
UserID AMQPBinary

// The to field identifies the node that is the intended destination of the message.
// On any given transfer this might not be the node at the receiving end of the link.
To string
To *AMQPAddress

// A common field for summary information about the message content and purpose.
Subject string
Subject *string

// The address of the node to send replies to.
ReplyTo string
ReplyTo *AMQPAddress

// This is a client-specific id that can be used to mark or identify messages
// between clients.
CorrelationID interface{} // uint64, UUID, []byte, or string
CorrelationID AMQPMessageID // uint64, UUID, []byte, or string

// The RFC-2046 [RFC2046] MIME type for the message's application-data section
// (body). As per RFC-2046 [RFC2046] this can contain a charset parameter defining
Expand All @@ -385,7 +414,7 @@ type MessageProperties struct {
//
// When using an application-data section with a section code other than data,
// content-type SHOULD NOT be set.
ContentType string
ContentType *AMQPSymbol

// The content-encoding property is used as a modifier to the content-type.
// When present, its value indicates what additional content encodings have been
Expand All @@ -410,40 +439,40 @@ type MessageProperties struct {
//
// Implementations SHOULD NOT specify multiple content-encoding values except as to
// be compatible with messages originally sent with other protocols, e.g. HTTP or SMTP.
ContentEncoding string
ContentEncoding *AMQPSymbol

// An absolute time when this message is considered to be expired.
AbsoluteExpiryTime time.Time
AbsoluteExpiryTime *time.Time

// An absolute time when this message was created.
CreationTime time.Time
CreationTime *time.Time

// Identifies the group the message belongs to.
GroupID string
GroupID *string

// The relative position of this message within its group.
GroupSequence uint32 // RFC-1982 sequence number
GroupSequence *AMQPSequenceNumber // RFC-1982 sequence number

// This is a client-specific id that is used so that client can send replies to this
// message to a specific group.
ReplyToGroupID string
ReplyToGroupID *string
}

func (p *MessageProperties) Marshal(wr *buffer.Buffer) error {
return encoding.MarshalComposite(wr, encoding.TypeCodeMessageProperties, []encoding.MarshalField{
{Value: p.MessageID, Omit: p.MessageID == nil},
{Value: &p.UserID, Omit: len(p.UserID) == 0},
{Value: &p.To, Omit: p.To == ""},
{Value: &p.Subject, Omit: p.Subject == ""},
{Value: &p.ReplyTo, Omit: p.ReplyTo == ""},
{Value: p.To, Omit: p.To == nil},
{Value: p.Subject, Omit: p.Subject == nil},
{Value: p.ReplyTo, Omit: p.ReplyTo == nil},
{Value: p.CorrelationID, Omit: p.CorrelationID == nil},
{Value: (*encoding.Symbol)(&p.ContentType), Omit: p.ContentType == ""},
{Value: (*encoding.Symbol)(&p.ContentEncoding), Omit: p.ContentEncoding == ""},
{Value: &p.AbsoluteExpiryTime, Omit: p.AbsoluteExpiryTime.IsZero()},
{Value: &p.CreationTime, Omit: p.CreationTime.IsZero()},
{Value: &p.GroupID, Omit: p.GroupID == ""},
{Value: &p.GroupSequence},
{Value: &p.ReplyToGroupID, Omit: p.ReplyToGroupID == ""},
{Value: (*encoding.Symbol)(p.ContentType), Omit: p.ContentType == nil},
{Value: (*encoding.Symbol)(p.ContentEncoding), Omit: p.ContentEncoding == nil},
{Value: p.AbsoluteExpiryTime, Omit: p.AbsoluteExpiryTime.IsZero()},
{Value: p.CreationTime, Omit: p.CreationTime.IsZero()},
{Value: p.GroupID, Omit: p.GroupID == nil},
{Value: p.GroupSequence},
{Value: p.ReplyToGroupID, Omit: p.ReplyToGroupID == nil},
})
}

Expand Down