Skip to content

Commit

Permalink
[Event Hubs] Refactor sender tests to use EventHubProducerClient (#9191)
Browse files Browse the repository at this point in the history
  • Loading branch information
ramya-rao-a authored Jun 3, 2020
1 parent e5c2a2b commit dc30784
Show file tree
Hide file tree
Showing 7 changed files with 161 additions and 492 deletions.
6 changes: 3 additions & 3 deletions sdk/eventhub/event-hubs/src/sender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,9 @@ export class EventHubProducer {
}

/**
* Send one or more of events to the associated Event Hub.
* Send events to the associated Event Hub.
*
* @param eventData An individual `EventData` object, or an array of `EventData` objects or an
* @param eventData An array of `EventData` objects or an
* instance of `EventDataBatch`.
* @param options The set of options that can be specified to influence the way in which
* events are sent to the associated Event Hub.
Expand All @@ -162,7 +162,7 @@ export class EventHubProducer {
* Create a new producer using the EventHubClient createProducer method.
*/
async send(
eventData: EventData | EventData[] | EventDataBatch,
eventData: EventData[] | EventDataBatch,
options: SendOptions = {}
): Promise<void> {
this._throwIfSenderOrConnectionClosed();
Expand Down
4 changes: 2 additions & 2 deletions sdk/eventhub/event-hubs/test/client.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ describe("Errors after close()", function(): void {

// Ensure sender link is opened
sender = client.createProducer({ partitionId: "0" });
await sender.send({ body: "dummy send to ensure AMQP connection is opened" });
await sender.send([{ body: "dummy send to ensure AMQP connection is opened" }]);

// Ensure receiver link is opened
receiver = client.createConsumer(EventHubConsumerClient.defaultConsumerGroupName, "0", {
Expand Down Expand Up @@ -345,7 +345,7 @@ describe("Errors after close()", function(): void {

const testMessage = { body: "test" };
let errorSend: string = "";
await sender.send(testMessage).catch((err) => {
await sender.send([testMessage]).catch((err) => {
errorSend = err.message;
});
should.equal(errorSend, expectedErrorMsg, "Expected error not thrown for send()");
Expand Down
29 changes: 13 additions & 16 deletions sdk/eventhub/event-hubs/test/eventProcessor.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ import {
SubscriptionEventHandlers,
earliestEventPosition,
latestEventPosition,
EventHubConsumerClient
EventHubConsumerClient,
EventHubProducerClient
} from "../src";
import { EventHubClient } from "../src/impl/eventHubClient";
import { EnvVarKeys, getEnvVars, loopUntil } from "./utils/testUtils";
Expand Down Expand Up @@ -52,6 +53,8 @@ describe("Event Processor", function(): void {
path: env[EnvVarKeys.EVENTHUB_NAME]
};
let client: EventHubClient;
let producerClient: EventHubProducerClient;

before("validate environment", async function(): Promise<void> {
should.exist(
env[EnvVarKeys.EVENTHUB_CONNECTION_STRING],
Expand All @@ -65,10 +68,12 @@ describe("Event Processor", function(): void {

beforeEach("create the client", function() {
client = new EventHubClient(service.connectionString, service.path, {});
producerClient = new EventHubProducerClient(service.connectionString, service.path);
});

afterEach("close the connection", async function(): Promise<void> {
await client.close();
await producerClient.close();
});

describe("unit tests", () => {
Expand Down Expand Up @@ -630,7 +635,7 @@ describe("Event Processor", function(): void {
processor.start();
processor.start();

const expectedMessages = await sendOneMessagePerPartition(partitionIds, client);
const expectedMessages = await sendOneMessagePerPartition(partitionIds, producerClient);
const receivedEvents = await subscriptionEventHandler.waitForEvents(partitionIds);

// shutdown the processor
Expand Down Expand Up @@ -695,7 +700,7 @@ describe("Event Processor", function(): void {
loggerForTest(`Starting processor for the first time`);
processor.start();

const expectedMessages = await sendOneMessagePerPartition(partitionIds, client);
const expectedMessages = await sendOneMessagePerPartition(partitionIds, producerClient);
const receivedEvents = await subscriptionEventHandler.waitForEvents(partitionIds);

loggerForTest(`Stopping processor for the first time`);
Expand Down Expand Up @@ -747,7 +752,7 @@ describe("Event Processor", function(): void {

processor.start();

const expectedMessages = await sendOneMessagePerPartition(partitionIds, client);
const expectedMessages = await sendOneMessagePerPartition(partitionIds, producerClient);
const receivedEvents = await subscriptionEventHandler.waitForEvents(partitionIds);

// shutdown the processor
Expand Down Expand Up @@ -876,12 +881,10 @@ describe("Event Processor", function(): void {
const events: EventData[] = [];

for (const partitionId of partitionIds) {
const producer = client.createProducer({ partitionId });
for (let index = 1; index <= 100; index++) {
events.push({ body: `${expectedMessagePrefix} ${index} ${partitionId}` });
}
await producer.send(events);
await producer.close();
await producerClient.sendBatch(events, { partitionId });
}

// set a delay to give a consumers a chance to receive a message
Expand Down Expand Up @@ -1089,9 +1092,7 @@ describe("Event Processor", function(): void {
// create messages
const expectedMessagePrefix = "EventProcessor test - multiple partitions - ";
for (const partitionId of partitionIds) {
const producer = client.createProducer({ partitionId });
await producer.send({ body: expectedMessagePrefix + partitionId });
await producer.close();
await producerClient.sendBatch([{ body: expectedMessagePrefix + partitionId }], { partitionId });
}

const processor1LoadBalancingInterval = {
Expand Down Expand Up @@ -1222,9 +1223,7 @@ describe("Event Processor", function(): void {
// create messages
const expectedMessagePrefix = "EventProcessor test - multiple partitions - ";
for (const partitionId of partitionIds) {
const producer = client.createProducer({ partitionId });
await producer.send({ body: expectedMessagePrefix + partitionId });
await producer.close();
await producerClient.sendBatch([{ body: expectedMessagePrefix + partitionId }], { partitionId });
}

for (let i = 0; i < 2; i++) {
Expand Down Expand Up @@ -1447,9 +1446,7 @@ describe("Event Processor", function(): void {
const { startPosition } = await SubscriptionHandlerForTests.startingFromHere(client);
const partitionIds = await client.getPartitionIds({});
for (const partitionId of partitionIds) {
const producer = client.createProducer({ partitionId: `${partitionId}` });
await producer.send({ body: `Hello world - ${partitionId}` });
await producer.close();
await producerClient.sendBatch([{ body: `Hello world - ${partitionId}` }], { partitionId });
}

const partitionIdsSet = new Set();
Expand Down
24 changes: 10 additions & 14 deletions sdk/eventhub/event-hubs/test/misc.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ const debug = debugModule("azure:event-hubs:misc-spec");
import {
EventData,
EventHubConsumerClient,
EventHubProducerClient,
EventHubProperties,
ReceivedEventData,
Subscription
Expand All @@ -33,6 +34,7 @@ describe("Misc tests", function(): void {
path: env[EnvVarKeys.EVENTHUB_NAME]
};
const client: EventHubClient = new EventHubClient(service.connectionString, service.path);
const producerClient = new EventHubProducerClient(service.connectionString, service.path);
let receiver: EventHubConsumer;
let hubInfo: EventHubProperties;
before("validate environment", async function(): Promise<void> {
Expand All @@ -49,6 +51,7 @@ describe("Misc tests", function(): void {

after("close the connection", async function(): Promise<void> {
await client.close();
await producerClient.close();
});

it("should be able to send and receive a large message correctly", async function(): Promise<
Expand All @@ -67,8 +70,7 @@ describe("Misc tests", function(): void {
});
let data = await receiver.receiveBatch(1, 1);
should.equal(data.length, 0, "Unexpected to receive message before client sends it");
const sender = client.createProducer({ partitionId });
await sender.send([obj]);
await producerClient.sendBatch([obj], { partitionId });
debug("Successfully sent the large message.");
data = await receiver.receiveBatch(1, 30);
debug("Closing the receiver..");
Expand Down Expand Up @@ -103,8 +105,7 @@ describe("Misc tests", function(): void {
receiver = client.createConsumer(EventHubConsumerClient.defaultConsumerGroupName, partitionId, {
offset
});
const sender = client.createProducer({ partitionId });
await sender.send([obj]);
await producerClient.sendBatch([obj], { partitionId });
debug("Successfully sent the large message.");
const data = await receiver.receiveBatch(1, 30);
await receiver.close();
Expand Down Expand Up @@ -137,8 +138,7 @@ describe("Misc tests", function(): void {
receiver = client.createConsumer(EventHubConsumerClient.defaultConsumerGroupName, partitionId, {
offset
});
const sender = client.createProducer({ partitionId });
await sender.send([obj]);
await producerClient.sendBatch([obj], { partitionId });
debug("Successfully sent the large message.");
const data = await receiver.receiveBatch(1, 30);
await receiver.close();
Expand All @@ -160,8 +160,7 @@ describe("Misc tests", function(): void {
receiver = client.createConsumer(EventHubConsumerClient.defaultConsumerGroupName, partitionId, {
offset
});
const sender = client.createProducer({ partitionId });
await sender.send([obj]);
await producerClient.sendBatch([obj], { partitionId });
debug("Successfully sent the large message.");
const data = await receiver.receiveBatch(1, 30);
await receiver.close();
Expand All @@ -187,8 +186,7 @@ describe("Misc tests", function(): void {
d.push(obj);
}

const sender = client.createProducer({ partitionId });
await sender.send(d);
await producerClient.sendBatch(d, { partitionId });
debug("Successfully sent 5 messages batched together.");

const receiver = client.createConsumer(EventHubConsumerClient.defaultConsumerGroupName, partitionId, {
Expand Down Expand Up @@ -239,8 +237,7 @@ describe("Misc tests", function(): void {
d.push(obj);
}

const sender = client.createProducer({ partitionId });
await sender.send(d);
await producerClient.sendBatch(d, { partitionId });
debug("Successfully sent 5 messages batched together.");

const receiver = client.createConsumer(EventHubConsumerClient.defaultConsumerGroupName, partitionId, {
Expand Down Expand Up @@ -286,9 +283,8 @@ describe("Misc tests", function(): void {

for (let i = 0; i < msgToSendCount; i++) {
const partitionKey = getRandomInt(10);
const sender = client.createProducer();
senderPromises.push(
sender.send([{ body: "Hello EventHub " + i }], {
producerClient.sendBatch([{ body: "Hello EventHub " + i }], {
partitionKey: partitionKey.toString()
})
);
Expand Down
Loading

0 comments on commit dc30784

Please sign in to comment.