diff --git a/client/lib/eventData.ts b/client/lib/eventData.ts index a46555ad..449e6239 100644 --- a/client/lib/eventData.ts +++ b/client/lib/eventData.ts @@ -1,7 +1,6 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT License. See License.txt in the project root for license information. -import * as uuid from "uuid/v4"; import { Message, MessageProperties, MessageHeader, Dictionary, messageHeader, messageProperties, MessageAnnotations, DeliveryAnnotations @@ -199,9 +198,6 @@ export namespace EventData { (msg as any)[prop] = (data.properties as any)[prop]; } } - if (!msg.message_id) { - msg.message_id = uuid(); - } if (data.applicationProperties) { msg.application_properties = data.applicationProperties; } diff --git a/client/lib/eventHubSender.ts b/client/lib/eventHubSender.ts index 18f79d72..e0de1c19 100644 --- a/client/lib/eventHubSender.ts +++ b/client/lib/eventHubSender.ts @@ -310,10 +310,6 @@ export class EventHubSender extends LinkEntity { } } - if (!batchMessage.message_id) { - batchMessage.message_id = uuid(); - } - // Finally encode the envelope (batch message). const encodedBatchMessage = message.encode(batchMessage); log.sender("[%s] Sender '%s', sending encoded batch message.", @@ -365,7 +361,7 @@ export class EventHubSender extends LinkEntity { this._sender!.credit, this._sender!.session.outgoing.available()); if (this._sender!.sendable()) { log.sender("[%s] Sender '%s', sending message with id '%s'.", this._context.connectionId, - this.name, message.message_id || tag); + this.name, message.message_id || tag || ''); let onRejected: Func; let onReleased: Func; let onModified: Func; diff --git a/client/tests/misc.spec.ts b/client/tests/misc.spec.ts index 616bb85f..c4b7d914 100644 --- a/client/tests/misc.spec.ts +++ b/client/tests/misc.spec.ts @@ -2,6 +2,7 @@ // Licensed under the MIT license. See LICENSE file in the project root for full license information. import "mocha"; +import * as uuid from "uuid/v4"; import * as chai from "chai"; import * as assert from "assert"; const should = chai.should(); @@ -38,8 +39,10 @@ describe("Misc tests", function () { const msgString = "A".repeat(220 * 1024); const msgBody = Buffer.from(msgString); const obj: EventData = { body: msgBody }; + const offset = (await client.getPartitionInformation(partitionId)).lastEnqueuedOffset; + debug(`Partition ${partitionId} has last message with offset ${offset}.`); debug("Sending one message with %d bytes.", bodysize); - breceiver = BatchingReceiver.create((client as any)._context, partitionId, { eventPosition: EventPosition.fromEnqueuedTime(Date.now()) }); + breceiver = BatchingReceiver.create((client as any)._context, partitionId, { eventPosition: EventPosition.fromOffset(offset) }); let data = await breceiver.receive(5, 5); data.length.should.equal(0); await client.send(obj, partitionId); @@ -51,6 +54,7 @@ describe("Misc tests", function () { should.exist(data); data.length.should.equal(1); data[0].body.toString().should.equal(msgString); + should.not.exist((data[0].properties || {}).message_id); }); it("should be able to send and receive a JSON object as a message correctly", async function () { @@ -68,8 +72,10 @@ describe("Misc tests", function () { ] }; const obj: EventData = { body: msgBody }; + const offset = (await client.getPartitionInformation(partitionId)).lastEnqueuedOffset; + debug(`Partition ${partitionId} has last message with offset ${offset}.`); debug("Sending one message %O", obj); - breceiver = BatchingReceiver.create((client as any)._context, partitionId, { eventPosition: EventPosition.fromEnqueuedTime(Date.now()) }); + breceiver = BatchingReceiver.create((client as any)._context, partitionId, { eventPosition: EventPosition.fromOffset(offset) }); await client.send(obj, partitionId); debug("Successfully sent the large message."); const data = await breceiver.receive(5, 10); @@ -79,6 +85,7 @@ describe("Misc tests", function () { data.length.should.equal(1); debug("Received message: %O", data); assert.deepEqual(data[0].body, msgBody); + should.not.exist((data[0].properties || {}).message_id); }); it("should be able to send and receive an array as a message correctly", async function () { @@ -93,9 +100,11 @@ describe("Misc tests", function () { 20, "some string" ]; - const obj: EventData = { body: msgBody }; + const obj: EventData = { body: msgBody, properties: { message_id: uuid() } }; + const offset = (await client.getPartitionInformation(partitionId)).lastEnqueuedOffset; + debug(`Partition ${partitionId} has last message with offset ${offset}.`); debug("Sending one message %O", obj); - breceiver = BatchingReceiver.create((client as any)._context, partitionId, { eventPosition: EventPosition.fromEnqueuedTime(Date.now()) }); + breceiver = BatchingReceiver.create((client as any)._context, partitionId, { eventPosition: EventPosition.fromOffset(offset) }); await client.send(obj, partitionId); debug("Successfully sent the large message."); const data = await breceiver.receive(5, 5); @@ -105,14 +114,17 @@ describe("Misc tests", function () { data.length.should.equal(1); debug("Received message: %O", data); assert.deepEqual(data[0].body, msgBody); + assert.strictEqual(data[0].properties.message_id, obj.properties.message_id); }); it("should be able to send a boolean as a message correctly", async function () { const partitionId = hubInfo.partitionIds[0]; const msgBody = true; const obj: EventData = { body: msgBody }; + const offset = (await client.getPartitionInformation(partitionId)).lastEnqueuedOffset; + debug(`Partition ${partitionId} has last message with offset ${offset}.`); debug("Sending one message %O", obj); - breceiver = BatchingReceiver.create((client as any)._context, partitionId, { eventPosition: EventPosition.fromEnqueuedTime(Date.now()) }); + breceiver = BatchingReceiver.create((client as any)._context, partitionId, { eventPosition: EventPosition.fromOffset(offset) }); await client.send(obj, partitionId); debug("Successfully sent the large message."); const data = await breceiver.receive(5, 5); @@ -122,12 +134,15 @@ describe("Misc tests", function () { data.length.should.equal(1); debug("Received message: %O", data); assert.deepEqual(data[0].body, msgBody); + should.not.exist((data[0].properties || {}).message_id); }); it("should be able to send and receive batched messages correctly", async function () { try { const partitionId = hubInfo.partitionIds[0]; - breceiver = BatchingReceiver.create((client as any)._context, partitionId, { eventPosition: EventPosition.fromEnqueuedTime(Date.now()) }); + const offset = (await client.getPartitionInformation(partitionId)).lastEnqueuedOffset; + debug(`Partition ${partitionId} has last message with offset ${offset}.`); + breceiver = BatchingReceiver.create((client as any)._context, partitionId, { eventPosition: EventPosition.fromOffset(offset) }); let data = await breceiver.receive(5, 10); data.length.should.equal(0); const messageCount = 5; @@ -145,6 +160,9 @@ describe("Misc tests", function () { debug("received message: ", data); should.exist(data); data.length.should.equal(5); + for (const message of data) { + should.not.exist((message.properties || {}).message_id); + } } catch (err) { debug("should not have happened, uber catch....", err); throw err; @@ -154,7 +172,9 @@ describe("Misc tests", function () { it("should be able to send and receive batched messages as JSON objects correctly", async function () { try { const partitionId = hubInfo.partitionIds[0]; - breceiver = BatchingReceiver.create((client as any)._context, partitionId, { eventPosition: EventPosition.fromEnqueuedTime(Date.now()) }); + const offset = (await client.getPartitionInformation(partitionId)).lastEnqueuedOffset; + debug(`Partition ${partitionId} has last message with offset ${offset}.`); + breceiver = BatchingReceiver.create((client as any)._context, partitionId, { eventPosition: EventPosition.fromOffset(offset) }); let data = await breceiver.receive(5, 5); data.length.should.equal(0); const messageCount = 5; @@ -173,6 +193,9 @@ describe("Misc tests", function () { isBlue: false, } ] + }, + properties: { + message_id: uuid() } }; d.push(obj); @@ -187,6 +210,9 @@ describe("Misc tests", function () { should.exist(data); data[0].body.count.should.equal(0); data.length.should.equal(5); + for (const [index, message] of data.entries()) { + assert.strictEqual(message.properties.message_id, d[index].properties.message_id); + } } catch (err) { debug("should not have happened, uber catch....", err); throw err;