Skip to content
This repository has been archived by the owner on Oct 11, 2023. It is now read-only.

Only set message_id when provided by caller #169

Merged
merged 1 commit into from
Oct 25, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions client/lib/eventData.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

import * as uuid from "uuid/v4";
import {
Message, MessageProperties, MessageHeader, Dictionary, messageHeader, messageProperties,
MessageAnnotations, DeliveryAnnotations
Expand Down Expand Up @@ -199,9 +198,6 @@ export namespace EventData {
(msg as any)[prop] = (data.properties as any)[prop];
}
}
if (!msg.message_id) {
msg.message_id = uuid();
}
if (data.applicationProperties) {
msg.application_properties = data.applicationProperties;
}
Expand Down
6 changes: 1 addition & 5 deletions client/lib/eventHubSender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -310,10 +310,6 @@ export class EventHubSender extends LinkEntity {
}
}

if (!batchMessage.message_id) {
batchMessage.message_id = uuid();
}

// Finally encode the envelope (batch message).
const encodedBatchMessage = message.encode(batchMessage);
log.sender("[%s] Sender '%s', sending encoded batch message.",
Expand Down Expand Up @@ -365,7 +361,7 @@ export class EventHubSender extends LinkEntity {
this._sender!.credit, this._sender!.session.outgoing.available());
if (this._sender!.sendable()) {
log.sender("[%s] Sender '%s', sending message with id '%s'.", this._context.connectionId,
this.name, message.message_id || tag);
this.name, message.message_id || tag || '<not specified>');
let onRejected: Func<EventContext, void>;
let onReleased: Func<EventContext, void>;
let onModified: Func<EventContext, void>;
Expand Down
40 changes: 33 additions & 7 deletions client/tests/misc.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

import "mocha";
import * as uuid from "uuid/v4";
import * as chai from "chai";
import * as assert from "assert";
const should = chai.should();
Expand Down Expand Up @@ -38,8 +39,10 @@ describe("Misc tests", function () {
const msgString = "A".repeat(220 * 1024);
const msgBody = Buffer.from(msgString);
const obj: EventData = { body: msgBody };
const offset = (await client.getPartitionInformation(partitionId)).lastEnqueuedOffset;
debug(`Partition ${partitionId} has last message with offset ${offset}.`);
debug("Sending one message with %d bytes.", bodysize);
breceiver = BatchingReceiver.create((client as any)._context, partitionId, { eventPosition: EventPosition.fromEnqueuedTime(Date.now()) });
breceiver = BatchingReceiver.create((client as any)._context, partitionId, { eventPosition: EventPosition.fromOffset(offset) });
let data = await breceiver.receive(5, 5);
data.length.should.equal(0);
await client.send(obj, partitionId);
Expand All @@ -51,6 +54,7 @@ describe("Misc tests", function () {
should.exist(data);
data.length.should.equal(1);
data[0].body.toString().should.equal(msgString);
should.not.exist((data[0].properties || {}).message_id);
});

it("should be able to send and receive a JSON object as a message correctly", async function () {
Expand All @@ -68,8 +72,10 @@ describe("Misc tests", function () {
]
};
const obj: EventData = { body: msgBody };
const offset = (await client.getPartitionInformation(partitionId)).lastEnqueuedOffset;
debug(`Partition ${partitionId} has last message with offset ${offset}.`);
debug("Sending one message %O", obj);
breceiver = BatchingReceiver.create((client as any)._context, partitionId, { eventPosition: EventPosition.fromEnqueuedTime(Date.now()) });
breceiver = BatchingReceiver.create((client as any)._context, partitionId, { eventPosition: EventPosition.fromOffset(offset) });
await client.send(obj, partitionId);
debug("Successfully sent the large message.");
const data = await breceiver.receive(5, 10);
Expand All @@ -79,6 +85,7 @@ describe("Misc tests", function () {
data.length.should.equal(1);
debug("Received message: %O", data);
assert.deepEqual(data[0].body, msgBody);
should.not.exist((data[0].properties || {}).message_id);
});

it("should be able to send and receive an array as a message correctly", async function () {
Expand All @@ -93,9 +100,11 @@ describe("Misc tests", function () {
20,
"some string"
];
const obj: EventData = { body: msgBody };
const obj: EventData = { body: msgBody, properties: { message_id: uuid() } };
const offset = (await client.getPartitionInformation(partitionId)).lastEnqueuedOffset;
debug(`Partition ${partitionId} has last message with offset ${offset}.`);
debug("Sending one message %O", obj);
breceiver = BatchingReceiver.create((client as any)._context, partitionId, { eventPosition: EventPosition.fromEnqueuedTime(Date.now()) });
breceiver = BatchingReceiver.create((client as any)._context, partitionId, { eventPosition: EventPosition.fromOffset(offset) });
await client.send(obj, partitionId);
debug("Successfully sent the large message.");
const data = await breceiver.receive(5, 5);
Expand All @@ -105,14 +114,17 @@ describe("Misc tests", function () {
data.length.should.equal(1);
debug("Received message: %O", data);
assert.deepEqual(data[0].body, msgBody);
assert.strictEqual(data[0].properties.message_id, obj.properties.message_id);
});

it("should be able to send a boolean as a message correctly", async function () {
const partitionId = hubInfo.partitionIds[0];
const msgBody = true;
const obj: EventData = { body: msgBody };
const offset = (await client.getPartitionInformation(partitionId)).lastEnqueuedOffset;
debug(`Partition ${partitionId} has last message with offset ${offset}.`);
debug("Sending one message %O", obj);
breceiver = BatchingReceiver.create((client as any)._context, partitionId, { eventPosition: EventPosition.fromEnqueuedTime(Date.now()) });
breceiver = BatchingReceiver.create((client as any)._context, partitionId, { eventPosition: EventPosition.fromOffset(offset) });
await client.send(obj, partitionId);
debug("Successfully sent the large message.");
const data = await breceiver.receive(5, 5);
Expand All @@ -122,12 +134,15 @@ describe("Misc tests", function () {
data.length.should.equal(1);
debug("Received message: %O", data);
assert.deepEqual(data[0].body, msgBody);
should.not.exist((data[0].properties || {}).message_id);
});

it("should be able to send and receive batched messages correctly", async function () {
try {
const partitionId = hubInfo.partitionIds[0];
breceiver = BatchingReceiver.create((client as any)._context, partitionId, { eventPosition: EventPosition.fromEnqueuedTime(Date.now()) });
const offset = (await client.getPartitionInformation(partitionId)).lastEnqueuedOffset;
debug(`Partition ${partitionId} has last message with offset ${offset}.`);
breceiver = BatchingReceiver.create((client as any)._context, partitionId, { eventPosition: EventPosition.fromOffset(offset) });
let data = await breceiver.receive(5, 10);
data.length.should.equal(0);
const messageCount = 5;
Expand All @@ -145,6 +160,9 @@ describe("Misc tests", function () {
debug("received message: ", data);
should.exist(data);
data.length.should.equal(5);
for (const message of data) {
should.not.exist((message.properties || {}).message_id);
}
} catch (err) {
debug("should not have happened, uber catch....", err);
throw err;
Expand All @@ -154,7 +172,9 @@ describe("Misc tests", function () {
it("should be able to send and receive batched messages as JSON objects correctly", async function () {
try {
const partitionId = hubInfo.partitionIds[0];
breceiver = BatchingReceiver.create((client as any)._context, partitionId, { eventPosition: EventPosition.fromEnqueuedTime(Date.now()) });
const offset = (await client.getPartitionInformation(partitionId)).lastEnqueuedOffset;
debug(`Partition ${partitionId} has last message with offset ${offset}.`);
breceiver = BatchingReceiver.create((client as any)._context, partitionId, { eventPosition: EventPosition.fromOffset(offset) });
let data = await breceiver.receive(5, 5);
data.length.should.equal(0);
const messageCount = 5;
Expand All @@ -173,6 +193,9 @@ describe("Misc tests", function () {
isBlue: false,
}
]
},
properties: {
message_id: uuid()
}
};
d.push(obj);
Expand All @@ -187,6 +210,9 @@ describe("Misc tests", function () {
should.exist(data);
data[0].body.count.should.equal(0);
data.length.should.equal(5);
for (const [index, message] of data.entries()) {
assert.strictEqual(message.properties.message_id, d[index].properties.message_id);
}
} catch (err) {
debug("should not have happened, uber catch....", err);
throw err;
Expand Down