From 8db8379fafb1a2d3941ec07c6c30070c2b9d5c89 Mon Sep 17 00:00:00 2001 From: Christopher Radek Date: Tue, 19 Jan 2021 09:38:09 -0800 Subject: [PATCH 01/20] [event-hubs] fix missing return type eslint errors/warnings --- .../event-hubs/src/connectionContext.ts | 6 +-- .../event-hubs/src/eventHubConsumerClient.ts | 6 +-- sdk/eventhub/event-hubs/src/eventHubSender.ts | 12 +++--- sdk/eventhub/event-hubs/src/eventPosition.ts | 4 +- sdk/eventhub/event-hubs/src/eventProcessor.ts | 10 ++--- .../event-hubs/src/impl/partitionGate.ts | 4 +- sdk/eventhub/event-hubs/src/log.ts | 2 +- .../event-hubs/src/managementClient.ts | 9 ++-- .../event-hubs/src/partitionProcessor.ts | 10 ++--- sdk/eventhub/event-hubs/test/client.spec.ts | 2 +- .../test/eventHubConsumerClient.spec.ts | 2 +- .../event-hubs/test/eventProcessor.spec.ts | 42 +++++++++++-------- .../utils/fakeSubscriptionEventHandlers.ts | 4 +- .../event-hubs/test/utils/logHelpers.ts | 8 ++-- .../test/utils/receivedMessagesTester.ts | 2 +- .../test/utils/subscriptionHandlerForTests.ts | 10 ++--- 16 files changed, 70 insertions(+), 63 deletions(-) diff --git a/sdk/eventhub/event-hubs/src/connectionContext.ts b/sdk/eventhub/event-hubs/src/connectionContext.ts index efe67cf368c6..0d77bfffc617 100644 --- a/sdk/eventhub/event-hubs/src/connectionContext.ts +++ b/sdk/eventhub/event-hubs/src/connectionContext.ts @@ -387,7 +387,7 @@ export namespace ConnectionContext { } }; - function addConnectionListeners(connection: Connection) { + function addConnectionListeners(connection: Connection): void { // Add listeners on the connection object. connection.on(ConnectionEvents.connectionOpen, onConnectionOpen); connection.on(ConnectionEvents.disconnected, onDisconnected); @@ -395,7 +395,7 @@ export namespace ConnectionContext { connection.on(ConnectionEvents.error, error); } - function cleanConnectionContext(connectionContext: ConnectionContext) { + function cleanConnectionContext(connectionContext: ConnectionContext): Promise { // Remove listeners from the connection object. connectionContext.connection.removeListener( ConnectionEvents.connectionOpen, @@ -408,7 +408,7 @@ export namespace ConnectionContext { return connectionContext.connection.close(); } - async function refreshConnection(connectionContext: ConnectionContext) { + async function refreshConnection(connectionContext: ConnectionContext): Promise { const originalConnectionId = connectionContext.connectionId; try { await cleanConnectionContext(connectionContext); diff --git a/sdk/eventhub/event-hubs/src/eventHubConsumerClient.ts b/sdk/eventhub/event-hubs/src/eventHubConsumerClient.ts index 93b1b90112f0..9a92f43bfb1e 100644 --- a/sdk/eventhub/event-hubs/src/eventHubConsumerClient.ts +++ b/sdk/eventhub/event-hubs/src/eventHubConsumerClient.ts @@ -533,7 +533,7 @@ export class EventHubConsumerClient { private createEventProcessorForAllPartitions( subscriptionEventHandlers: SubscriptionEventHandlers, options?: SubscribeOptions - ) { + ): { targetedPartitionId: string; eventProcessor: EventProcessor } { this._partitionGate.add("all"); if (this._userChoseCheckpointStore) { @@ -569,7 +569,7 @@ export class EventHubConsumerClient { partitionId: string, eventHandlers: SubscriptionEventHandlers, options?: SubscribeOptions - ) { + ): { targetedPartitionId: string; eventProcessor: EventProcessor } { this._partitionGate.add(partitionId); const subscribeOptions = options as SubscribeOptions | undefined; @@ -607,7 +607,7 @@ export class EventHubConsumerClient { subscriptionEventHandlers: SubscriptionEventHandlers, checkpointStore: CheckpointStore, options: FullEventProcessorOptions - ) { + ): EventProcessor { return new EventProcessor( this._consumerGroup, connectionContext, diff --git a/sdk/eventhub/event-hubs/src/eventHubSender.ts b/sdk/eventhub/event-hubs/src/eventHubSender.ts index cbbaaefadec9..8ceb4e53cff8 100644 --- a/sdk/eventhub/event-hubs/src/eventHubSender.ts +++ b/sdk/eventhub/event-hubs/src/eventHubSender.ts @@ -226,7 +226,7 @@ export class EventHubSender extends LinkEntity { return this._sender!.maxMessageSize; } return new Promise(async (resolve, reject) => { - const rejectOnAbort = () => { + const rejectOnAbort = (): void => { const desc: string = `[${this._context.connectionId}] The create batch operation has been cancelled by the user.`; // Cancellation is user-intented, so treat as info instead of warning. logger.info(desc); @@ -234,7 +234,7 @@ export class EventHubSender extends LinkEntity { reject(error); }; - const onAbort = () => { + const onAbort = (): void => { if (abortSignal) { abortSignal.removeEventListener("abort", onAbort); } @@ -402,9 +402,9 @@ export class EventHubSender extends LinkEntity { const retryOptions = options.retryOptions || {}; const timeoutInMs = getRetryAttemptTimeoutInMs(retryOptions); retryOptions.timeoutInMs = timeoutInMs; - const sendEventPromise = () => + const sendEventPromise = (): Promise => new Promise(async (resolve, reject) => { - const rejectOnAbort = () => { + const rejectOnAbort = (): void => { const desc: string = `[${this._context.connectionId}] The send operation on the Sender "${this.name}" with ` + `address "${this.address}" has been cancelled by the user.`; @@ -425,7 +425,7 @@ export class EventHubSender extends LinkEntity { } }; - const onAborted = () => { + const onAborted = (): void => { removeListeners(); return rejectOnAbort(); }; @@ -434,7 +434,7 @@ export class EventHubSender extends LinkEntity { abortSignal.addEventListener("abort", onAborted); } - const actionAfterTimeout = () => { + const actionAfterTimeout = (): void => { removeListeners(); const desc: string = `[${this._context.connectionId}] Sender "${this.name}" with ` + diff --git a/sdk/eventhub/event-hubs/src/eventPosition.ts b/sdk/eventhub/event-hubs/src/eventPosition.ts index 201bc3f621e6..231e94102da6 100644 --- a/sdk/eventhub/event-hubs/src/eventPosition.ts +++ b/sdk/eventhub/event-hubs/src/eventPosition.ts @@ -113,7 +113,7 @@ export const latestEventPosition: EventPosition = { */ export function validateEventPositions( position: EventPosition | { [partitionId: string]: EventPosition } -) { +): void { if (position == undefined) { return; } @@ -166,7 +166,7 @@ export function isEventPosition(position: any): position is EventPosition { return false; } -function validateEventPosition(position: EventPosition) { +function validateEventPosition(position: EventPosition): void { if (position == undefined) { return; } diff --git a/sdk/eventhub/event-hubs/src/eventProcessor.ts b/sdk/eventhub/event-hubs/src/eventProcessor.ts index cb51278f0b9c..61c120dbcd0f 100644 --- a/sdk/eventhub/event-hubs/src/eventProcessor.ts +++ b/sdk/eventhub/event-hubs/src/eventProcessor.ts @@ -301,7 +301,7 @@ export class EventProcessor { } } - private async _startPump(partitionId: string, abortSignal: AbortSignalLike) { + private async _startPump(partitionId: string, abortSignal: AbortSignalLike): Promise { if (abortSignal.aborted) { logger.verbose( `[${this._id}] The subscription was closed before starting to read from ${partitionId}.` @@ -402,7 +402,7 @@ export class EventProcessor { private async _runLoopWithLoadBalancing( loadBalancingStrategy: LoadBalancingStrategy, abortSignal: AbortSignalLike - ) { + ): Promise { let cancelLoopResolver; // This provides a mechanism for exiting the loop early // if the subscription has had `close` called. @@ -453,7 +453,7 @@ export class EventProcessor { loadBalancingStrategy: LoadBalancingStrategy, partitionIds: string[], abortSignal: AbortSignalLike - ) { + ): Promise { if (abortSignal.aborted) throw new AbortError("The operation was aborted."); // Retrieve current partition ownership details from the datastore. @@ -573,7 +573,7 @@ export class EventProcessor { } } - isRunning() { + isRunning(): boolean { return this._isRunning; } @@ -613,7 +613,7 @@ export class EventProcessor { } } - private async abandonPartitionOwnerships() { + private async abandonPartitionOwnerships(): Promise { logger.verbose(`[${this._id}] Abandoning owned partitions`); const allOwnerships = await this._checkpointStore.listOwnership( this._fullyQualifiedNamespace, diff --git a/sdk/eventhub/event-hubs/src/impl/partitionGate.ts b/sdk/eventhub/event-hubs/src/impl/partitionGate.ts index e5f59173971b..bff956ac0bb2 100644 --- a/sdk/eventhub/event-hubs/src/impl/partitionGate.ts +++ b/sdk/eventhub/event-hubs/src/impl/partitionGate.ts @@ -21,7 +21,7 @@ export class PartitionGate { * * @param partitionId A partition ID or the constant "all" */ - add(partitionId: string | "all") { + add(partitionId: string | "all"): void { if ( (partitionId === "all" && this._partitions.size > 0) || this._partitions.has(partitionId) || @@ -38,7 +38,7 @@ export class PartitionGate { * * @param partitionId A partition ID or the constant "all" */ - remove(partitionId: string | "all") { + remove(partitionId: string | "all"): void { this._partitions.delete(partitionId); } } diff --git a/sdk/eventhub/event-hubs/src/log.ts b/sdk/eventhub/event-hubs/src/log.ts index 587f23af9ddb..21d5fd046a15 100644 --- a/sdk/eventhub/event-hubs/src/log.ts +++ b/sdk/eventhub/event-hubs/src/log.ts @@ -14,7 +14,7 @@ export const logger = createClientLogger("event-hubs"); * @param error Error containing a stack trace. * @hidden */ -export function logErrorStackTrace(error: any) { +export function logErrorStackTrace(error: any): void { if (error && error.stack) { logger.verbose(error.stack); } diff --git a/sdk/eventhub/event-hubs/src/managementClient.ts b/sdk/eventhub/event-hubs/src/managementClient.ts index 824fa18cb4fa..af6b89c8b37f 100644 --- a/sdk/eventhub/event-hubs/src/managementClient.ts +++ b/sdk/eventhub/event-hubs/src/managementClient.ts @@ -13,6 +13,7 @@ import { retry, translate } from "@azure/core-amqp"; +import { AccessToken } from "@azure/core-auth"; import { EventContext, Message, @@ -141,7 +142,7 @@ export class ManagementClient extends LinkEntity { * @hidden * @internal */ - async getSecurityToken() { + async getSecurityToken(): Promise { if (this._context.tokenCredential instanceof SharedKeyCredential) { // the security_token has the $management address removed from the end of the audience // expected audience: sb://fully.qualified.namespace/event-hub-name/$management @@ -393,14 +394,14 @@ export class ManagementClient extends LinkEntity { try { const abortSignal: AbortSignalLike | undefined = options && options.abortSignal; - const sendOperationPromise = () => + const sendOperationPromise = (): Promise => new Promise(async (resolve, reject) => { let count = 0; const retryTimeoutInMs = getRetryAttemptTimeoutInMs(options.retryOptions); let timeTakenByInit = 0; - const rejectOnAbort = () => { + const rejectOnAbort = (): void => { const requestName = options.requestName; const desc: string = `[${this._context.connectionId}] The request "${requestName}" ` + @@ -428,7 +429,7 @@ export class ManagementClient extends LinkEntity { const initOperationStartTime = Date.now(); - const actionAfterTimeout = () => { + const actionAfterTimeout = (): void => { const desc: string = `The request with message_id "${request.message_id}" timed out. Please try again later.`; const e: Error = { name: "OperationTimeoutError", diff --git a/sdk/eventhub/event-hubs/src/partitionProcessor.ts b/sdk/eventhub/event-hubs/src/partitionProcessor.ts index 62b1614c7014..14cca63bb952 100644 --- a/sdk/eventhub/event-hubs/src/partitionProcessor.ts +++ b/sdk/eventhub/event-hubs/src/partitionProcessor.ts @@ -96,7 +96,7 @@ export class PartitionProcessor implements PartitionContext { * @property The fully qualified namespace from where the current partition is being processed. It is set by the `EventProcessor` * @readonly */ - public get fullyQualifiedNamespace() { + public get fullyQualifiedNamespace(): string { return this._context.fullyQualifiedNamespace; } @@ -104,7 +104,7 @@ export class PartitionProcessor implements PartitionContext { * @property The name of the consumer group from where the current partition is being processed. It is set by the `EventProcessor` * @readonly */ - public get consumerGroup() { + public get consumerGroup(): string { return this._context.consumerGroup!; } @@ -112,7 +112,7 @@ export class PartitionProcessor implements PartitionContext { * @property The name of the event hub to which the current partition belongs. It is set by the `EventProcessor` * @readonly */ - public get eventHubName() { + public get eventHubName(): string { return this._context.eventHubName; } @@ -120,14 +120,14 @@ export class PartitionProcessor implements PartitionContext { * @property The identifier of the Event Hub partition that is being processed. It is set by the `EventProcessor` * @readonly */ - public get partitionId() { + public get partitionId(): string { return this._context.partitionId; } /** * @property The unique identifier of the `EventProcessor` that has spawned the current instance of `PartitionProcessor`. This is set by the `EventProcessor` */ - public get eventProcessorId() { + public get eventProcessorId(): string { return this._context.eventProcessorId; } diff --git a/sdk/eventhub/event-hubs/test/client.spec.ts b/sdk/eventhub/event-hubs/test/client.spec.ts index 14ebaedb4399..4cd70ba37a0e 100644 --- a/sdk/eventhub/event-hubs/test/client.spec.ts +++ b/sdk/eventhub/event-hubs/test/client.spec.ts @@ -579,7 +579,7 @@ describe("EventHubProducerClient User Agent String", function(): void { }); }); -function testUserAgentString(context: ConnectionContext, customValue?: string) { +function testUserAgentString(context: ConnectionContext, customValue?: string): void { const packageVersion = packageJsonInfo.version; const properties = context.connection.options.properties; properties!["user-agent"].should.startWith( diff --git a/sdk/eventhub/event-hubs/test/eventHubConsumerClient.spec.ts b/sdk/eventhub/event-hubs/test/eventHubConsumerClient.spec.ts index 74eee6afa19c..280128e69b8b 100644 --- a/sdk/eventhub/event-hubs/test/eventHubConsumerClient.spec.ts +++ b/sdk/eventhub/event-hubs/test/eventHubConsumerClient.spec.ts @@ -64,7 +64,7 @@ describe("EventHubConsumerClient", () => { subscriptionEventHandlers: SubscriptionEventHandlers, checkpointStore: CheckpointStore, options: FullEventProcessorOptions - ) => { + ): SinonStubbedInstance => { subscriptionEventHandlers.should.equal(subscriptionHandlers); should.exist(connectionContext.managementSession); isCheckpointStore(checkpointStore).should.be.ok; diff --git a/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts b/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts index b7ee6c80f8bb..62f6f880a6e1 100644 --- a/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts +++ b/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts @@ -164,7 +164,7 @@ describe("Event Processor", function(): void { function createEventProcessor( checkpointStore: CheckpointStore, startPosition?: FullEventProcessorOptions["startPosition"] - ) { + ): EventProcessor { return new EventProcessor( EventHubConsumerClient.defaultConsumerGroupName, consumerClient["_context"], @@ -871,7 +871,7 @@ describe("Event Processor", function(): void { let partionCount: { [x: string]: number } = {}; class FooPartitionProcessor { - async processEvents(events: ReceivedEventData[], context: PartitionContext) { + async processEvents(events: ReceivedEventData[], context: PartitionContext): Promise { processedAtLeastOneEvent.add(context.partitionId); !partionCount[context.partitionId] @@ -890,7 +890,7 @@ describe("Event Processor", function(): void { } } } - async processError() { + async processError(): Promise { didError = true; } } @@ -999,7 +999,7 @@ describe("Event Processor", function(): void { const checkpointStore = new InMemoryCheckpointStore(); const allObjects = new Set(); - const assertUnique = (...objects: any[]) => { + const assertUnique = (...objects: any[]): void => { const size = allObjects.size; for (const obj of objects) { @@ -1101,20 +1101,20 @@ describe("Event Processor", function(): void { // The partitionProcess will need to add events to the partitionResultsMap as they are received class FooPartitionProcessor implements Required { - async processInitialize(context: PartitionContext) { + async processInitialize(context: PartitionContext): Promise { loggerForTest(`processInitialize(${context.partitionId})`); partitionResultsMap.get(context.partitionId)!.initialized = true; } - async processClose(reason: CloseReason, context: PartitionContext) { + async processClose(reason: CloseReason, context: PartitionContext): Promise { loggerForTest(`processClose(${context.partitionId})`); partitionResultsMap.get(context.partitionId)!.closeReason = reason; } - async processEvents(events: ReceivedEventData[], context: PartitionContext) { + async processEvents(events: ReceivedEventData[], context: PartitionContext): Promise { partitionOwnershipArr.add(context.partitionId); const existingEvents = partitionResultsMap.get(context.partitionId)!.events; existingEvents.push(...events.map((event) => event.body)); } - async processError(err: Error, context: PartitionContext) { + async processError(err: Error, context: PartitionContext): Promise { loggerForTest(`processError(${context.partitionId})`); const errorName = (err as any).code; if (errorName === "ReceiverDisconnectedError") { @@ -1207,7 +1207,7 @@ describe("Event Processor", function(): void { // if stealing has occurred we just want to make sure that _all_ // the stealing has completed. - const isBalanced = (friendlyName: string) => { + const isBalanced = (friendlyName: string): boolean => { const n = Math.floor(partitionIds.length / 2); const numPartitions = partitionOwnershipMap.get(processorByName[friendlyName].id)! .length; @@ -1258,20 +1258,20 @@ describe("Event Processor", function(): void { // The partitionProcess will need to add events to the partitionResultsMap as they are received class FooPartitionProcessor implements Required { - async processInitialize(context: PartitionContext) { + async processInitialize(context: PartitionContext): Promise { loggerForTest(`processInitialize(${context.partitionId})`); partitionResultsMap.get(context.partitionId)!.initialized = true; } - async processClose(reason: CloseReason, context: PartitionContext) { + async processClose(reason: CloseReason, context: PartitionContext): Promise { loggerForTest(`processClose(${context.partitionId})`); partitionResultsMap.get(context.partitionId)!.closeReason = reason; } - async processEvents(events: ReceivedEventData[], context: PartitionContext) { + async processEvents(events: ReceivedEventData[], context: PartitionContext): Promise { partitionOwnershipArr.add(context.partitionId); const existingEvents = partitionResultsMap.get(context.partitionId)!.events; existingEvents.push(...events.map((event) => event.body)); } - async processError(err: Error, context: PartitionContext) { + async processError(err: Error, context: PartitionContext): Promise { loggerForTest(`processError(${context.partitionId})`); const errorName = (err as any).code; if (errorName === "ReceiverDisconnectedError") { @@ -1364,7 +1364,7 @@ describe("Event Processor", function(): void { // if stealing has occurred we just want to make sure that _all_ // the stealing has completed. - const isBalanced = (friendlyName: string) => { + const isBalanced = (friendlyName: string): boolean => { const n = Math.floor(partitionIds.length / 2); const numPartitions = partitionOwnershipMap.get(processorByName[friendlyName].id)! .length; @@ -1407,10 +1407,13 @@ describe("Event Processor", function(): void { // The partitionProcess will need to add events to the partitionResultsMap as they are received class FooPartitionProcessor { - async processEvents(_events: ReceivedEventData[], context: PartitionContext) { + async processEvents( + _events: ReceivedEventData[], + context: PartitionContext + ): Promise { partitionOwnershipArr.add(context.partitionId); } - async processError() { + async processError(): Promise { didError = true; } } @@ -1487,10 +1490,13 @@ describe("Event Processor", function(): void { // The partitionProcess will need to add events to the partitionResultsMap as they are received class FooPartitionProcessor { - async processEvents(_events: ReceivedEventData[], context: PartitionContext) { + async processEvents( + _events: ReceivedEventData[], + context: PartitionContext + ): Promise { partitionOwnershipArr.add(context.partitionId); } - async processError() {} + async processError(): Promise {} } // create messages diff --git a/sdk/eventhub/event-hubs/test/utils/fakeSubscriptionEventHandlers.ts b/sdk/eventhub/event-hubs/test/utils/fakeSubscriptionEventHandlers.ts index fe486579be14..6d49eb8f7bfb 100644 --- a/sdk/eventhub/event-hubs/test/utils/fakeSubscriptionEventHandlers.ts +++ b/sdk/eventhub/event-hubs/test/utils/fakeSubscriptionEventHandlers.ts @@ -7,7 +7,7 @@ export class FakeSubscriptionEventHandlers implements SubscriptionEventHandlers public events: Map = new Map(); public errors: Error[] = []; - async processEvents(events: ReceivedEventData[], context: PartitionContext) { + async processEvents(events: ReceivedEventData[], context: PartitionContext): Promise { for (const event of events) { let receivedEvents = this.events.get(context.partitionId); @@ -20,7 +20,7 @@ export class FakeSubscriptionEventHandlers implements SubscriptionEventHandlers } } - async processError(err: Error) { + async processError(err: Error): Promise { this.errors.push(err); } } diff --git a/sdk/eventhub/event-hubs/test/utils/logHelpers.ts b/sdk/eventhub/event-hubs/test/utils/logHelpers.ts index b45e3925f41a..b83c64767e93 100644 --- a/sdk/eventhub/event-hubs/test/utils/logHelpers.ts +++ b/sdk/eventhub/event-hubs/test/utils/logHelpers.ts @@ -24,7 +24,7 @@ export class LogTester { debugModule.enable(loggers.map((logger) => logger.namespace).join(",")); } - assert() { + assert(): void { this.close(); if (this._expectedMessages.length > 0) { @@ -32,7 +32,7 @@ export class LogTester { } } - private check(message: string) { + private check(message: string): void { for (let i = 0; i < this._expectedMessages.length; ++i) { const expectedMessage = this._expectedMessages[i]; if (typeof expectedMessage === "string") { @@ -49,7 +49,7 @@ export class LogTester { } } - private close() { + private close(): void { for (const logger of this._attachedLoggers) { logger.logger.enabled = logger.wasEnabled; logger.logger.log = logger.previousLogFunction; @@ -59,7 +59,7 @@ export class LogTester { debugModule.disable(); } - private attach(logger: debugModule.Debugger) { + private attach(logger: debugModule.Debugger): void { this._attachedLoggers.push({ logger, wasEnabled: logger.enabled, diff --git a/sdk/eventhub/event-hubs/test/utils/receivedMessagesTester.ts b/sdk/eventhub/event-hubs/test/utils/receivedMessagesTester.ts index 0604dbcc4a5c..2a9d490e4d31 100644 --- a/sdk/eventhub/event-hubs/test/utils/receivedMessagesTester.ts +++ b/sdk/eventhub/event-hubs/test/utils/receivedMessagesTester.ts @@ -138,7 +138,7 @@ export class ReceivedMessagesTester implements Required { const expectedMessagePrefix = `EventHubConsumerClient test - ${Date.now().toString()}`; const messagesToSend = []; diff --git a/sdk/eventhub/event-hubs/test/utils/subscriptionHandlerForTests.ts b/sdk/eventhub/event-hubs/test/utils/subscriptionHandlerForTests.ts index 61d1a93401b8..410ca140c9bc 100644 --- a/sdk/eventhub/event-hubs/test/utils/subscriptionHandlerForTests.ts +++ b/sdk/eventhub/event-hubs/test/utils/subscriptionHandlerForTests.ts @@ -56,15 +56,15 @@ export class SubscriptionHandlerForTests implements Required { this.data.set(context.partitionId, {}); } - async processClose(reason: CloseReason, context: PartitionContext) { + async processClose(reason: CloseReason, context: PartitionContext): Promise { this.data.get(context.partitionId)!.closeReason = reason; } - async processEvents(events: ReceivedEventData[], context: PartitionContext) { + async processEvents(events: ReceivedEventData[], context: PartitionContext): Promise { // by default we don't fill out the lastEnqueuedEventInfo field (they have to enable it // explicitly in the options for the processor). should.not.exist(context.lastEnqueuedEventProperties); @@ -79,7 +79,7 @@ export class SubscriptionHandlerForTests implements Required { loggerForTest(`Error in partition ${context.partitionId}: ${err}`); should.exist( context.partitionId, @@ -147,7 +147,7 @@ export class SubscriptionHandlerForTests implements Required Date: Tue, 19 Jan 2021 09:59:19 -0800 Subject: [PATCH 02/20] [event-hubs] fix eqeqeq eslint errors/warnings --- sdk/eventhub/event-hubs/src/eventData.ts | 3 ++- sdk/eventhub/event-hubs/src/eventDataBatch.ts | 5 ++-- .../event-hubs/src/eventHubProducerClient.ts | 9 ++++---- sdk/eventhub/event-hubs/src/eventPosition.ts | 23 ++++++++++--------- sdk/eventhub/event-hubs/src/util/isDefined.ts | 12 ++++++++++ sdk/eventhub/event-hubs/src/util/retries.ts | 3 ++- .../event-hubs/test/eventProcessor.spec.ts | 4 ++-- 7 files changed, 38 insertions(+), 21 deletions(-) create mode 100644 sdk/eventhub/event-hubs/src/util/isDefined.ts diff --git a/sdk/eventhub/event-hubs/src/eventData.ts b/sdk/eventhub/event-hubs/src/eventData.ts index 4cc9f70f1aab..9f15d0e1f043 100644 --- a/sdk/eventhub/event-hubs/src/eventData.ts +++ b/sdk/eventhub/event-hubs/src/eventData.ts @@ -3,6 +3,7 @@ import { DeliveryAnnotations, Message as RheaMessage, MessageAnnotations } from "rhea-promise"; import { Constants } from "@azure/core-amqp"; +import { isDefined } from "./util/isDefined"; /** * Describes the delivery annotations. @@ -203,7 +204,7 @@ export function toRheaMessage(data: EventData, partitionKey?: string): RheaMessa if (data.properties) { msg.application_properties = data.properties; } - if (partitionKey != undefined) { + if (isDefined(partitionKey)) { msg.message_annotations[Constants.partitionKey] = partitionKey; // Event Hub service cannot route messages to a specific partition based on the partition key // if AMQP message header is an empty object. Hence we make sure that header is always present diff --git a/sdk/eventhub/event-hubs/src/eventDataBatch.ts b/sdk/eventhub/event-hubs/src/eventDataBatch.ts index 8734378ab30f..277fa3ba03e9 100644 --- a/sdk/eventhub/event-hubs/src/eventDataBatch.ts +++ b/sdk/eventhub/event-hubs/src/eventDataBatch.ts @@ -9,6 +9,7 @@ import { Span, SpanContext } from "@opentelemetry/api"; import { TRACEPARENT_PROPERTY, instrumentEventData } from "./diagnostics/instrumentEventData"; import { createMessageSpan } from "./diagnostics/messageSpan"; import { defaultDataTransformer } from "./dataTransformer"; +import { isDefined } from "./util/isDefined"; /** * The amount of bytes to reserve as overhead for a small message. @@ -192,8 +193,8 @@ export class EventDataBatchImpl implements EventDataBatch { ) { this._context = context; this._maxSizeInBytes = maxSizeInBytes; - this._partitionKey = partitionKey != undefined ? String(partitionKey) : partitionKey; - this._partitionId = partitionId != undefined ? String(partitionId) : partitionId; + this._partitionKey = isDefined(partitionKey) ? String(partitionKey) : partitionKey; + this._partitionId = isDefined(partitionId) ? String(partitionId) : partitionId; this._sizeInBytes = 0; this._count = 0; } diff --git a/sdk/eventhub/event-hubs/src/eventHubProducerClient.ts b/sdk/eventhub/event-hubs/src/eventHubProducerClient.ts index caffb7603869..13f25ec63b93 100644 --- a/sdk/eventhub/event-hubs/src/eventHubProducerClient.ts +++ b/sdk/eventhub/event-hubs/src/eventHubProducerClient.ts @@ -21,6 +21,7 @@ import { SendBatchOptions } from "./models/public"; import { throwErrorIfConnectionClosed, throwTypeErrorIfParameterMissing } from "./util/error"; +import { isDefined } from "./util/isDefined"; import { getParentSpan, OperationOptions } from "./util/operationOptions"; /** @@ -174,7 +175,7 @@ export class EventHubProducerClient { async createBatch(options: CreateBatchOptions = {}): Promise { throwErrorIfConnectionClosed(this._context); - if (options.partitionId != undefined && options.partitionKey != undefined) { + if (isDefined(options.partitionId) && isDefined(options.partitionKey)) { throw new Error("partitionId and partitionKey cannot both be set when creating a batch"); } @@ -316,16 +317,16 @@ export class EventHubProducerClient { } } } - if (partitionId != undefined && partitionKey != undefined) { + if (isDefined(partitionId) && isDefined(partitionKey)) { throw new Error( `The partitionId (${partitionId}) and partitionKey (${partitionKey}) cannot both be specified.` ); } - if (partitionId != undefined) { + if (isDefined(partitionId)) { partitionId = String(partitionId); } - if (partitionKey != undefined) { + if (isDefined(partitionKey)) { partitionKey = String(partitionKey); } diff --git a/sdk/eventhub/event-hubs/src/eventPosition.ts b/sdk/eventhub/event-hubs/src/eventPosition.ts index 231e94102da6..3d73f164b92f 100644 --- a/sdk/eventhub/event-hubs/src/eventPosition.ts +++ b/sdk/eventhub/event-hubs/src/eventPosition.ts @@ -2,6 +2,7 @@ // Licensed under the MIT license. import { Constants, ErrorNameConditionMapper, translate } from "@azure/core-amqp"; +import { isDefined } from "./util/isDefined"; /** * Represents the position of an event in an Event Hub partition, typically used when calling the `subscribe()` @@ -51,15 +52,15 @@ export interface EventPosition { export function getEventPositionFilter(eventPosition: EventPosition): string { let result; // order of preference - if (eventPosition.offset != undefined) { + if (isDefined(eventPosition.offset)) { result = eventPosition.isInclusive ? `${Constants.offsetAnnotation} >= '${eventPosition.offset}'` : `${Constants.offsetAnnotation} > '${eventPosition.offset}'`; - } else if (eventPosition.sequenceNumber != undefined) { + } else if (isDefined(eventPosition.sequenceNumber)) { result = eventPosition.isInclusive ? `${Constants.sequenceNumberAnnotation} >= '${eventPosition.sequenceNumber}'` : `${Constants.sequenceNumberAnnotation} > '${eventPosition.sequenceNumber}'`; - } else if (eventPosition.enqueuedOn != undefined) { + } else if (isDefined(eventPosition.enqueuedOn)) { const time = eventPosition.enqueuedOn instanceof Date ? eventPosition.enqueuedOn.getTime() @@ -114,7 +115,7 @@ export const latestEventPosition: EventPosition = { export function validateEventPositions( position: EventPosition | { [partitionId: string]: EventPosition } ): void { - if (position == undefined) { + if (!isDefined(position)) { return; } @@ -151,15 +152,15 @@ export function isEventPosition(position: any): position is EventPosition { return false; } - if (position.offset != undefined) { + if (isDefined(position.offset)) { return true; } - if (position.sequenceNumber != undefined) { + if (isDefined(position.sequenceNumber)) { return true; } - if (position.enqueuedOn != undefined) { + if (isDefined(position.enqueuedOn)) { return true; } @@ -167,12 +168,12 @@ export function isEventPosition(position: any): position is EventPosition { } function validateEventPosition(position: EventPosition): void { - if (position == undefined) { + if (!isDefined(position)) { return; } - const offsetPresent = position.offset != undefined; - const sequenceNumberPresent = position.sequenceNumber != undefined; - const enqueuedOnPresent = position.enqueuedOn != undefined; + const offsetPresent = isDefined(position.offset); + const sequenceNumberPresent = isDefined(position.sequenceNumber); + const enqueuedOnPresent = isDefined(position.enqueuedOn); if ( (offsetPresent && sequenceNumberPresent) || diff --git a/sdk/eventhub/event-hubs/src/util/isDefined.ts b/sdk/eventhub/event-hubs/src/util/isDefined.ts new file mode 100644 index 000000000000..83b932a9d5b7 --- /dev/null +++ b/sdk/eventhub/event-hubs/src/util/isDefined.ts @@ -0,0 +1,12 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +/** + * Helper TypeGuard that checks if something is defined or not. + * @param thing Anything + * @internal + * @hidden + */ +export function isDefined(thing: T | undefined | null): thing is T { + return typeof thing !== "undefined" && thing !== null; +} diff --git a/sdk/eventhub/event-hubs/src/util/retries.ts b/sdk/eventhub/event-hubs/src/util/retries.ts index e4d69b513f5e..7f0a69f88269 100644 --- a/sdk/eventhub/event-hubs/src/util/retries.ts +++ b/sdk/eventhub/event-hubs/src/util/retries.ts @@ -2,6 +2,7 @@ // Licensed under the MIT license. import { Constants, RetryOptions } from "@azure/core-amqp"; +import { isDefined } from "./isDefined"; /** * @internal @@ -9,7 +10,7 @@ import { Constants, RetryOptions } from "@azure/core-amqp"; */ export function getRetryAttemptTimeoutInMs(retryOptions: RetryOptions | undefined): number { const timeoutInMs = - retryOptions == undefined || + !isDefined(retryOptions) || typeof retryOptions.timeoutInMs !== "number" || !isFinite(retryOptions.timeoutInMs) || retryOptions.timeoutInMs < Constants.defaultOperationTimeoutInMs diff --git a/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts b/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts index 62f6f880a6e1..fd9646ba9600 100644 --- a/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts +++ b/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts @@ -1211,7 +1211,7 @@ describe("Event Processor", function(): void { const n = Math.floor(partitionIds.length / 2); const numPartitions = partitionOwnershipMap.get(processorByName[friendlyName].id)! .length; - return numPartitions == n || numPartitions == n + 1; + return numPartitions === n || numPartitions === n + 1; }; if (!isBalanced(`processor-1`) || !isBalanced(`processor-2`)) { @@ -1368,7 +1368,7 @@ describe("Event Processor", function(): void { const n = Math.floor(partitionIds.length / 2); const numPartitions = partitionOwnershipMap.get(processorByName[friendlyName].id)! .length; - return numPartitions == n || numPartitions == n + 1; + return numPartitions === n || numPartitions === n + 1; }; if (!isBalanced(`processor-1`) || !isBalanced(`processor-2`)) { From 03e516b9e3036035ad1b0c54ce5c08d5f34f236c Mon Sep 17 00:00:00 2001 From: Christopher Radek Date: Tue, 19 Jan 2021 10:11:19 -0800 Subject: [PATCH 03/20] [event-hubs] fix @typescript-eslint/ban-ts-comment eslint errors/wanrings --- sdk/eventhub/event-hubs/test/receiver.spec.ts | 2 +- sdk/eventhub/event-hubs/test/sender.spec.ts | 16 ++++++++-------- sdk/eventhub/event-hubs/test/utils/testUtils.ts | 3 ++- 3 files changed, 11 insertions(+), 10 deletions(-) diff --git a/sdk/eventhub/event-hubs/test/receiver.spec.ts b/sdk/eventhub/event-hubs/test/receiver.spec.ts index f20d4f9b9d16..ed774e554561 100644 --- a/sdk/eventhub/event-hubs/test/receiver.spec.ts +++ b/sdk/eventhub/event-hubs/test/receiver.spec.ts @@ -63,7 +63,7 @@ describe("EventHubConsumerClient", function(): void { let subscription: Subscription | undefined; await new Promise((resolve, reject) => { subscription = consumerClient.subscribe( - // @ts-expect-error + // @ts-expect-error Testing the value 0 can be provided as a number for JS users. 0, { processEvents: async () => { diff --git a/sdk/eventhub/event-hubs/test/sender.spec.ts b/sdk/eventhub/event-hubs/test/sender.spec.ts index 04260cc55c45..8a33d229c7e5 100644 --- a/sdk/eventhub/event-hubs/test/sender.spec.ts +++ b/sdk/eventhub/event-hubs/test/sender.spec.ts @@ -92,7 +92,7 @@ describe("EventHub Sender", function(): void { it("partitionId is set as expected when it is 0 i.e. falsy", async () => { const batch = await producerClient.createBatch({ - //@ts-expect-error + // @ts-expect-error Testing the value 0 is not ignored. partitionId: 0 }); should.equal(batch.partitionId, "0"); @@ -107,7 +107,7 @@ describe("EventHub Sender", function(): void { it("partitionKey is set as expected when it is 0 i.e. falsy", async () => { const batch = await producerClient.createBatch({ - //@ts-expect-error + // @ts-expect-error Testing the value 0 is not ignored. partitionKey: 0 }); should.equal(batch.partitionKey, "0"); @@ -165,7 +165,7 @@ describe("EventHub Sender", function(): void { const list = ["Albert", "Marie"]; const batch = await producerClient.createBatch({ - //@ts-expect-error + // @ts-expect-error Testing the value 0 is not ignored. partitionId: 0 }); @@ -206,7 +206,7 @@ describe("EventHub Sender", function(): void { const list = ["Albert", "Marie"]; const batch = await producerClient.createBatch({ - //@ts-expect-error + // @ts-expect-error Testing the value 0 is not ignored. partitionKey: 0 }); @@ -1033,7 +1033,7 @@ describe("EventHub Sender", function(): void { it("throws an error if partitionId and partitionKey are set and partitionId is 0 i.e. falsy", async () => { try { await producerClient.createBatch({ - //@ts-expect-error + // @ts-expect-error Testing the value 0 is not ignored. partitionId: 0, partitionKey: "boo" }); @@ -1049,7 +1049,7 @@ describe("EventHub Sender", function(): void { try { await producerClient.createBatch({ partitionId: "1", - //@ts-expect-error + // @ts-expect-error Testing the value 0 is not ignored. partitionKey: 0 }); throw new Error("Test failure"); @@ -1202,7 +1202,7 @@ describe("EventHub Sender", function(): void { it("throws an error if partitionId and partitionKey are set with partitionId set to 0 i.e. falsy", async () => { const badOptions: SendBatchOptions = { partitionKey: "foo", - //@ts-expect-error + // @ts-expect-error Testing the value 0 is not ignored. partitionId: 0 }; const batch = [{ body: "Hello 1" }, { body: "Hello 2" }]; @@ -1217,7 +1217,7 @@ describe("EventHub Sender", function(): void { }); it("throws an error if partitionId and partitionKey are set with partitionKey set to 0 i.e. falsy", async () => { const badOptions: SendBatchOptions = { - //@ts-expect-error + // @ts-expect-error Testing the value 0 is not ignored. partitionKey: 0, partitionId: "0" }; diff --git a/sdk/eventhub/event-hubs/test/utils/testUtils.ts b/sdk/eventhub/event-hubs/test/utils/testUtils.ts index cf30d8c8aa55..611dcca49ca8 100644 --- a/sdk/eventhub/event-hubs/test/utils/testUtils.ts +++ b/sdk/eventhub/event-hubs/test/utils/testUtils.ts @@ -11,6 +11,8 @@ import { NoOpTracer, TestTracer, setTracer } from "@azure/core-tracing"; dotenv.config(); +declare const window: any; + export const isNode = !!process && !!process.version && !!process.versions && !!process.versions.node; @@ -26,7 +28,6 @@ function getEnvVarValue(name: string): string | undefined { if (isNode) { return process.env[name]; } else { - // @ts-ignore return window.__env__[name]; } } From 968de4f8d7d2de7187c7a168f9cfd8786cd8db18 Mon Sep 17 00:00:00 2001 From: Christopher Radek Date: Tue, 19 Jan 2021 12:07:32 -0800 Subject: [PATCH 04/20] [event-hubs] fixes @typescript-eslint/explicit-module-boundary-types eslint errors/warnings --- .../event-hubs/src/dataTransformer.ts | 9 ++-- sdk/eventhub/event-hubs/src/eventData.ts | 2 +- sdk/eventhub/event-hubs/src/eventDataBatch.ts | 6 +-- .../event-hubs/src/eventHubProducerClient.ts | 2 +- sdk/eventhub/event-hubs/src/eventPosition.ts | 10 ++-- sdk/eventhub/event-hubs/src/log.ts | 5 +- sdk/eventhub/event-hubs/src/util/error.ts | 5 +- sdk/eventhub/event-hubs/src/util/isDefined.ts | 12 ----- sdk/eventhub/event-hubs/src/util/retries.ts | 2 +- .../event-hubs/src/util/typeGuards.ts | 54 +++++++++++++++++++ 10 files changed, 76 insertions(+), 31 deletions(-) delete mode 100644 sdk/eventhub/event-hubs/src/util/isDefined.ts create mode 100644 sdk/eventhub/event-hubs/src/util/typeGuards.ts diff --git a/sdk/eventhub/event-hubs/src/dataTransformer.ts b/sdk/eventhub/event-hubs/src/dataTransformer.ts index f09470d8983a..fc96c95bc08b 100644 --- a/sdk/eventhub/event-hubs/src/dataTransformer.ts +++ b/sdk/eventhub/event-hubs/src/dataTransformer.ts @@ -5,6 +5,7 @@ import { message } from "rhea-promise"; import isBuffer from "is-buffer"; import { Buffer } from "buffer"; import { logErrorStackTrace, logger } from "./log"; +import { isObjectWithProperties } from "./util/typeGuards"; /** * The default data transformer that will be used by the Azure SDK. @@ -23,7 +24,7 @@ export const defaultDataTransformer = { * - content: The given AMQP message body as a Buffer. * - multiple: true | undefined. */ - encode(body: any): any { + encode(body: unknown): any { let result: any; if (isBuffer(body)) { result = message.data_section(body); @@ -31,7 +32,7 @@ export const defaultDataTransformer = { // string, undefined, null, boolean, array, object, number should end up here // coercing undefined to null as that will ensure that null value will be given to the // customer on receive. - if (body === undefined) body = null; // tslint:disable-line + if (body === undefined) body = null; try { const bodyStr = JSON.stringify(body); result = message.data_section(Buffer.from(bodyStr, "utf8")); @@ -56,10 +57,10 @@ export const defaultDataTransformer = { * @param {DataSection} body The AMQP message body * @return {*} decoded body or the given body as-is. */ - decode(body: any): any { + decode(body: unknown): any { let processedBody: any = body; try { - if (body.content && isBuffer(body.content)) { + if (isObjectWithProperties(body, ["content"]) && isBuffer(body.content)) { // This indicates that we are getting the AMQP described type. Let us try decoding it. processedBody = body.content; } diff --git a/sdk/eventhub/event-hubs/src/eventData.ts b/sdk/eventhub/event-hubs/src/eventData.ts index 9f15d0e1f043..a2a1cd7b6de4 100644 --- a/sdk/eventhub/event-hubs/src/eventData.ts +++ b/sdk/eventhub/event-hubs/src/eventData.ts @@ -3,7 +3,7 @@ import { DeliveryAnnotations, Message as RheaMessage, MessageAnnotations } from "rhea-promise"; import { Constants } from "@azure/core-amqp"; -import { isDefined } from "./util/isDefined"; +import { isDefined } from "./util/typeGuards"; /** * Describes the delivery annotations. diff --git a/sdk/eventhub/event-hubs/src/eventDataBatch.ts b/sdk/eventhub/event-hubs/src/eventDataBatch.ts index 277fa3ba03e9..3f85de9a8a89 100644 --- a/sdk/eventhub/event-hubs/src/eventDataBatch.ts +++ b/sdk/eventhub/event-hubs/src/eventDataBatch.ts @@ -9,7 +9,7 @@ import { Span, SpanContext } from "@opentelemetry/api"; import { TRACEPARENT_PROPERTY, instrumentEventData } from "./diagnostics/instrumentEventData"; import { createMessageSpan } from "./diagnostics/messageSpan"; import { defaultDataTransformer } from "./dataTransformer"; -import { isDefined } from "./util/isDefined"; +import { isDefined, isObjectWithProperties } from "./util/typeGuards"; /** * The amount of bytes to reserve as overhead for a small message. @@ -30,9 +30,9 @@ const smallMessageMaxBytes = 255; * @internal * @hidden */ -export function isEventDataBatch(eventDataBatch: any): eventDataBatch is EventDataBatch { +export function isEventDataBatch(eventDataBatch: unknown): eventDataBatch is EventDataBatch { return ( - eventDataBatch && + isObjectWithProperties(eventDataBatch, ["count", "sizeInBytes", "tryAdd"]) && typeof eventDataBatch.tryAdd === "function" && typeof eventDataBatch.count === "number" && typeof eventDataBatch.sizeInBytes === "number" diff --git a/sdk/eventhub/event-hubs/src/eventHubProducerClient.ts b/sdk/eventhub/event-hubs/src/eventHubProducerClient.ts index 13f25ec63b93..55d6236d3a19 100644 --- a/sdk/eventhub/event-hubs/src/eventHubProducerClient.ts +++ b/sdk/eventhub/event-hubs/src/eventHubProducerClient.ts @@ -21,7 +21,7 @@ import { SendBatchOptions } from "./models/public"; import { throwErrorIfConnectionClosed, throwTypeErrorIfParameterMissing } from "./util/error"; -import { isDefined } from "./util/isDefined"; +import { isDefined } from "./util/typeGuards"; import { getParentSpan, OperationOptions } from "./util/operationOptions"; /** diff --git a/sdk/eventhub/event-hubs/src/eventPosition.ts b/sdk/eventhub/event-hubs/src/eventPosition.ts index 3d73f164b92f..95e9a1b0e78b 100644 --- a/sdk/eventhub/event-hubs/src/eventPosition.ts +++ b/sdk/eventhub/event-hubs/src/eventPosition.ts @@ -2,7 +2,7 @@ // Licensed under the MIT license. import { Constants, ErrorNameConditionMapper, translate } from "@azure/core-amqp"; -import { isDefined } from "./util/isDefined"; +import { isDefined, objectHasProperty } from "./util/typeGuards"; /** * Represents the position of an event in an Event Hub partition, typically used when calling the `subscribe()` @@ -147,20 +147,20 @@ export function validateEventPositions( * @hidden * @internal */ -export function isEventPosition(position: any): position is EventPosition { +export function isEventPosition(position: unknown): position is EventPosition { if (!position) { return false; } - if (isDefined(position.offset)) { + if (objectHasProperty(position, "offset") && isDefined(position.offset)) { return true; } - if (isDefined(position.sequenceNumber)) { + if (objectHasProperty(position, "sequenceNumber") && isDefined(position.sequenceNumber)) { return true; } - if (isDefined(position.enqueuedOn)) { + if (objectHasProperty(position, "enqueuedOn") && isDefined(position.enqueuedOn)) { return true; } diff --git a/sdk/eventhub/event-hubs/src/log.ts b/sdk/eventhub/event-hubs/src/log.ts index 21d5fd046a15..8fb9a5680966 100644 --- a/sdk/eventhub/event-hubs/src/log.ts +++ b/sdk/eventhub/event-hubs/src/log.ts @@ -2,6 +2,7 @@ // Licensed under the MIT license. import { createClientLogger } from "@azure/logger"; +import { isObjectWithProperties } from "./util/typeGuards"; /** * The @azure/logger configuration for this package. @@ -14,8 +15,8 @@ export const logger = createClientLogger("event-hubs"); * @param error Error containing a stack trace. * @hidden */ -export function logErrorStackTrace(error: any): void { - if (error && error.stack) { +export function logErrorStackTrace(error: unknown): void { + if (isObjectWithProperties(error, ["stack"]) && error.stack) { logger.verbose(error.stack); } } diff --git a/sdk/eventhub/event-hubs/src/util/error.ts b/sdk/eventhub/event-hubs/src/util/error.ts index 83ef87678e8a..9bc513d90997 100644 --- a/sdk/eventhub/event-hubs/src/util/error.ts +++ b/sdk/eventhub/event-hubs/src/util/error.ts @@ -3,6 +3,7 @@ import { logErrorStackTrace, logger } from "../log"; import { ConnectionContext } from "../connectionContext"; +import { isDefined } from "./typeGuards"; /** * @internal @@ -33,9 +34,9 @@ export function throwTypeErrorIfParameterMissing( connectionId: string, methodName: string, parameterName: string, - parameterValue: any + parameterValue: unknown ): void { - if (parameterValue === undefined || parameterValue === null) { + if (!isDefined(parameterValue)) { const error = new TypeError( `${methodName} called without required argument "${parameterName}"` ); diff --git a/sdk/eventhub/event-hubs/src/util/isDefined.ts b/sdk/eventhub/event-hubs/src/util/isDefined.ts deleted file mode 100644 index 83b932a9d5b7..000000000000 --- a/sdk/eventhub/event-hubs/src/util/isDefined.ts +++ /dev/null @@ -1,12 +0,0 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT license. - -/** - * Helper TypeGuard that checks if something is defined or not. - * @param thing Anything - * @internal - * @hidden - */ -export function isDefined(thing: T | undefined | null): thing is T { - return typeof thing !== "undefined" && thing !== null; -} diff --git a/sdk/eventhub/event-hubs/src/util/retries.ts b/sdk/eventhub/event-hubs/src/util/retries.ts index 7f0a69f88269..281723fbaaea 100644 --- a/sdk/eventhub/event-hubs/src/util/retries.ts +++ b/sdk/eventhub/event-hubs/src/util/retries.ts @@ -2,7 +2,7 @@ // Licensed under the MIT license. import { Constants, RetryOptions } from "@azure/core-amqp"; -import { isDefined } from "./isDefined"; +import { isDefined } from "./typeGuards"; /** * @internal diff --git a/sdk/eventhub/event-hubs/src/util/typeGuards.ts b/sdk/eventhub/event-hubs/src/util/typeGuards.ts new file mode 100644 index 000000000000..9394c1d4c583 --- /dev/null +++ b/sdk/eventhub/event-hubs/src/util/typeGuards.ts @@ -0,0 +1,54 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +/** + * Helper TypeGuard that checks if something is defined or not. + * @param thing Anything + * @internal + * @hidden + */ +export function isDefined(thing: T | undefined | null): thing is T { + return typeof thing !== "undefined" && thing !== null; +} + +/** + * Helper TypeGuard that checks if the input is an object with the specified properties. + * @param thing - Anything. + * @param properties - The name of the properties that should appear in the object. + * @internal + * @hidden + */ +export function isObjectWithProperties( + thing: Thing, + properties: PropertyName[] +): thing is Thing & Record { + if (!isDefined(thing) || typeof thing !== "object") { + return false; + } + + for (const property of properties) { + if (!objectHasProperty(thing, property)) { + return false; + } + } + + return true; +} + +/** + * Helper TypeGuard that checks if the input is an object with the specified property. + * @param thing - Any object. + * @param property - The name of the property that should appear in the object. + * @internal + * @hidden + */ +export function objectHasProperty( + thing: Thing, + property: PropertyName +): thing is Thing & Record { + if (!(property in thing)) { + return false; + } + + return true; +} From 032f9c02d5ac50baf9d2ed035c8f1ff5efd4d44c Mon Sep 17 00:00:00 2001 From: Christopher Radek Date: Tue, 19 Jan 2021 12:21:38 -0800 Subject: [PATCH 05/20] [event-hubs] fixes @typescript-eslint/no-empty-function eslint errors/warnings --- sdk/eventhub/event-hubs/src/eventProcessor.ts | 4 +- sdk/eventhub/event-hubs/test/client.spec.ts | 12 +- .../test/eventHubConsumerClient.spec.ts | 60 +++++++--- .../event-hubs/test/eventProcessor.spec.ts | 104 +++++++++++++----- .../event-hubs/test/partitionPump.spec.ts | 4 +- sdk/eventhub/event-hubs/test/receiver.spec.ts | 8 +- sdk/eventhub/event-hubs/test/sender.spec.ts | 16 ++- 7 files changed, 156 insertions(+), 52 deletions(-) diff --git a/sdk/eventhub/event-hubs/src/eventProcessor.ts b/sdk/eventhub/event-hubs/src/eventProcessor.ts index 61c120dbcd0f..175b6aafa2e3 100644 --- a/sdk/eventhub/event-hubs/src/eventProcessor.ts +++ b/sdk/eventhub/event-hubs/src/eventProcessor.ts @@ -527,7 +527,9 @@ export class EventProcessor { eventHubName: this._eventHubName, consumerGroup: this._consumerGroup, partitionId: "", - updateCheckpoint: async () => {} + updateCheckpoint: async () => { + /* no-op */ + } }); } catch (err) { logger.verbose( diff --git a/sdk/eventhub/event-hubs/test/client.spec.ts b/sdk/eventhub/event-hubs/test/client.spec.ts index 4cd70ba37a0e..8b0a508609f8 100644 --- a/sdk/eventhub/event-hubs/test/client.spec.ts +++ b/sdk/eventhub/event-hubs/test/client.spec.ts @@ -275,7 +275,9 @@ describe("EventHubConsumerClient with non existent namespace", function(): void let subscription: Subscription | undefined; const caughtErr = await new Promise((resolve) => { subscription = client.subscribe({ - processEvents: async () => {}, + processEvents: async () => { + /* no-op */ + }, processError: async (err) => { resolve(err); } @@ -419,7 +421,9 @@ describe("EventHubConsumerClient with non existent event hub", function(): void let subscription: Subscription | undefined; const caughtErr = await new Promise((resolve) => { subscription = client.subscribe({ - processEvents: async () => {}, + processEvents: async () => { + /* no-op */ + }, processError: async (err) => { resolve(err); } @@ -666,7 +670,9 @@ describe("EventHubConsumerClient after close()", function(): void { let subscription: Subscription | undefined; const caughtErr = await new Promise((resolve) => { subscription = client.subscribe({ - processEvents: async () => {}, + processEvents: async () => { + /* no-op */ + }, processError: async (err) => { resolve(err); } diff --git a/sdk/eventhub/event-hubs/test/eventHubConsumerClient.spec.ts b/sdk/eventhub/event-hubs/test/eventHubConsumerClient.spec.ts index 280128e69b8b..4da61b312e87 100644 --- a/sdk/eventhub/event-hubs/test/eventHubConsumerClient.spec.ts +++ b/sdk/eventhub/event-hubs/test/eventHubConsumerClient.spec.ts @@ -45,8 +45,12 @@ describe("EventHubConsumerClient", () => { describe("unit tests", () => { it("isCheckpointStore", () => { isCheckpointStore({ - processEvents: async () => {}, - processClose: async () => {} + processEvents: async () => { + /* no-op */ + }, + processClose: async () => { + /* no-op */ + } }).should.not.ok; isCheckpointStore("hello").should.not.ok; @@ -94,8 +98,12 @@ describe("EventHubConsumerClient", () => { ); subscriptionHandlers = { - processEvents: async () => {}, - processError: async () => {} + processEvents: async () => { + /* no-op */ + }, + processError: async () => { + /* no-op */ + } }; (client as any)["_createEventProcessor"] = fakeEventProcessorConstructor; @@ -103,7 +111,9 @@ describe("EventHubConsumerClient", () => { }); it("conflicting subscribes", () => { - validateOptions = () => {}; + validateOptions = () => { + /* no-op */ + }; client.subscribe(subscriptionHandlers); // invalid - we're already subscribed to a conflicting partition @@ -639,7 +649,9 @@ describe("EventHubConsumerClient", () => { close: 0 }; const subscriptionHandlers1: SubscriptionEventHandlers = { - async processError() {}, + async processError() { + /* no-op */ + }, async processEvents() { if (!handlerCalls.close) { // start the 2nd subscription that will kick the 1st subscription off @@ -661,7 +673,9 @@ describe("EventHubConsumerClient", () => { } }; const subscriptionHandlers2: SubscriptionEventHandlers = { - async processError() {}, + async processError() { + /* no-op */ + }, async processEvents() { // stop this subscription since it already should have forced the 1st subscription to have an error. await subscription2!.close(); @@ -719,7 +733,9 @@ describe("EventHubConsumerClient", () => { } const subscriptionHandlers1: SubscriptionEventHandlers = { - async processError() {}, + async processError() { + /* no-op */ + }, async processEvents(_, context) { partitionHandlerCalls[context.partitionId].processEvents = true; }, @@ -754,7 +770,9 @@ describe("EventHubConsumerClient", () => { const partitionsReadFromSub2 = new Set(); const subscriptionHandlers2: SubscriptionEventHandlers = { - async processError() {}, + async processError() { + /* no-op */ + }, async processEvents(_, context) { partitionsReadFromSub2.add(context.partitionId); } @@ -1071,8 +1089,12 @@ describe("EventHubConsumerClient", () => { let closeCalled = 0; const subscription = client.subscribe(partitionId, { - async processError() {}, - async processEvents() {}, + async processError() { + /* no-op */ + }, + async processEvents() { + /* no-op */ + }, async processClose() { closeCalled++; }, @@ -1115,8 +1137,12 @@ describe("EventHubConsumerClient", () => { let closeCalled = 0; const subscription = client.subscribe({ - async processError() {}, - async processEvents() {}, + async processError() { + /* no-op */ + }, + async processEvents() { + /* no-op */ + }, async processClose() { closeCalled++; }, @@ -1158,7 +1184,9 @@ describe("EventHubConsumerClient", () => { let subscription: Subscription; const caughtErr: Error = await new Promise((resolve) => { subscription = client.subscribe({ - processEvents: async () => {}, + processEvents: async () => { + /* no-op */ + }, processError: async (err, context) => { if (!context.partitionId) { await subscription.close(); @@ -1187,7 +1215,9 @@ describe("EventHubConsumerClient", () => { const caughtErr: Error = await new Promise((resolve) => { // Subscribe to an invalid partition id to trigger a partition-specific error. subscription = client.subscribe("-1", { - processEvents: async () => {}, + processEvents: async () => { + /* no-op */ + }, processError: async (err, context) => { if (context.partitionId) { await subscription.close(); diff --git a/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts b/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts index fd9646ba9600..fb942b3b3cab 100644 --- a/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts +++ b/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts @@ -169,8 +169,12 @@ describe("Event Processor", function(): void { EventHubConsumerClient.defaultConsumerGroupName, consumerClient["_context"], { - processEvents: async () => {}, - processError: async () => {} + processEvents: async () => { + /* no-op */ + }, + processError: async () => { + /* no-op */ + } }, checkpointStore, { @@ -207,7 +211,9 @@ describe("Event Processor", function(): void { listOwnership: async () => { return []; }, - updateCheckpoint: async () => {} + updateCheckpoint: async () => { + /* no-op */ + } }; } }); @@ -229,7 +235,9 @@ describe("Event Processor", function(): void { EventHubConsumerClient.defaultConsumerGroupName, consumerClient["_context"], { - processEvents: async () => {}, + processEvents: async () => { + /* no-op */ + }, processError: async (err, context) => { // simulate the user messing up and accidentally throwing an error // we should just log it and not kill anything. @@ -284,7 +292,9 @@ describe("Event Processor", function(): void { async listOwnership(): Promise { return []; }, - async updateCheckpoint(): Promise {}, + async updateCheckpoint(): Promise { + /* no-op */ + }, async listCheckpoints(): Promise { return []; } @@ -297,7 +307,9 @@ describe("Event Processor", function(): void { pumpManager.createPumpCalled = true; }, - async removeAllPumps() {}, + async removeAllPumps() { + /* no-op */ + }, isReceivingFromPartition() { return false; @@ -312,8 +324,12 @@ describe("Event Processor", function(): void { EventHubConsumerClient.defaultConsumerGroupName, consumerClient["_context"], { - processEvents: async () => {}, - processError: async () => {} + processEvents: async () => { + /* no-op */ + }, + processError: async () => { + /* no-op */ + } }, checkpointStore, { @@ -387,8 +403,12 @@ describe("Event Processor", function(): void { loopIntervalInMs: 1, maxWaitTimeInSeconds: 1, pumpManager: { - async createPump() {}, - async removeAllPumps(): Promise {}, + async createPump() { + /* no-op */ + }, + async removeAllPumps(): Promise { + /* no-op */ + }, isReceivingFromPartition() { return false; } @@ -505,7 +525,9 @@ describe("Event Processor", function(): void { claimOwnership: async () => { throw new Error("Some random failure!"); }, - updateCheckpoint: async () => {}, + updateCheckpoint: async () => { + /* no-op */ + }, listCheckpoints: async () => [] }; @@ -513,7 +535,9 @@ describe("Event Processor", function(): void { EventHubConsumerClient.defaultConsumerGroupName, consumerClient["_context"], { - processEvents: async () => {}, + processEvents: async () => { + /* no-op */ + }, processError: async (err, _) => { errors.push(err); } @@ -617,8 +641,12 @@ describe("Event Processor", function(): void { EventHubConsumerClient.defaultConsumerGroupName, consumerClient["_context"], { - processEvents: async () => {}, - processError: async () => {} + processEvents: async () => { + /* no-op */ + }, + processError: async () => { + /* no-op */ + } }, new InMemoryCheckpointStore(), { @@ -636,8 +664,12 @@ describe("Event Processor", function(): void { EventHubConsumerClient.defaultConsumerGroupName, consumerClient["_context"], { - processEvents: async () => {}, - processError: async () => {} + processEvents: async () => { + /* no-op */ + }, + processError: async () => { + /* no-op */ + } }, new InMemoryCheckpointStore(), { ...defaultOptions, ownerId: "hello", startPosition: latestEventPosition } @@ -694,8 +726,12 @@ describe("Event Processor", function(): void { processInitialize: async () => { didPartitionProcessorStart = true; }, - processEvents: async () => {}, - processError: async () => {} + processEvents: async () => { + /* no-op */ + }, + processError: async () => { + /* no-op */ + } }, new InMemoryCheckpointStore(), { @@ -1496,7 +1532,9 @@ describe("Event Processor", function(): void { ): Promise { partitionOwnershipArr.add(context.partitionId); } - async processError(): Promise {} + async processError(): Promise { + /* no-op */ + } } // create messages @@ -1588,8 +1626,12 @@ describe("Event Processor", function(): void { claimedPartitions.add(partitionId); claimedPartitionsMap[eventProcessorId] = claimedPartitions; }, - async processEvents() {}, - async processError() {}, + async processEvents() { + /* no-op */ + }, + async processError() { + /* no-op */ + }, async processClose(reason, context) { const eventProcessorId: string = (context as any).eventProcessorId; const partitionId = context.partitionId; @@ -1751,8 +1793,12 @@ describe("Event Processor", function(): void { claimedPartitions.add(partitionId); claimedPartitionsMap[eventProcessorId] = claimedPartitions; }, - async processEvents() {}, - async processError() {}, + async processEvents() { + /* no-op */ + }, + async processError() { + /* no-op */ + }, async processClose(reason, context) { const eventProcessorId: string = (context as any).eventProcessorId; const partitionId = context.partitionId; @@ -1917,9 +1963,15 @@ function triggerAbortedSignalAfterNumCalls(maxCalls: number): AbortSignal { return false; }, - addEventListener: () => {}, - removeEventListener: () => {}, - onabort: () => {}, + addEventListener: () => { + /* no-op */ + }, + removeEventListener: () => { + /* no-op */ + }, + onabort: () => { + /* no-op */ + }, dispatchEvent: () => true }; diff --git a/sdk/eventhub/event-hubs/test/partitionPump.spec.ts b/sdk/eventhub/event-hubs/test/partitionPump.spec.ts index 1a2ec1c262b8..90f747360c3d 100644 --- a/sdk/eventhub/event-hubs/test/partitionPump.spec.ts +++ b/sdk/eventhub/event-hubs/test/partitionPump.spec.ts @@ -105,7 +105,9 @@ describe("PartitionPump", () => { const tracer = new TestTracer(); const span = tracer.startSpan("whatever"); - await trace(async () => {}, span); + await trace(async () => { + /* no-op */ + }, span); span.status!.code.should.equal(CanonicalCode.OK); span.endCalled.should.be.ok; diff --git a/sdk/eventhub/event-hubs/test/receiver.spec.ts b/sdk/eventhub/event-hubs/test/receiver.spec.ts index ed774e554561..971a4be50e9b 100644 --- a/sdk/eventhub/event-hubs/test/receiver.spec.ts +++ b/sdk/eventhub/event-hubs/test/receiver.spec.ts @@ -704,7 +704,9 @@ describe("EventHubConsumerClient", function(): void { let subscription: Subscription | undefined; const caughtErr = await new Promise((resolve) => { subscription = badConsumerClient.subscribe({ - processEvents: async () => {}, + processEvents: async () => { + /* no-op */ + }, processError: async (err) => { resolve(err); } @@ -723,7 +725,9 @@ describe("EventHubConsumerClient", function(): void { let subscription: Subscription | undefined; const caughtErr = await new Promise((resolve) => { subscription = consumerClient.subscribe("boo", { - processEvents: async () => {}, + processEvents: async () => { + /* no-op */ + }, processError: async (err) => { resolve(err); } diff --git a/sdk/eventhub/event-hubs/test/sender.spec.ts b/sdk/eventhub/event-hubs/test/sender.spec.ts index 8a33d229c7e5..1c7f8d90befb 100644 --- a/sdk/eventhub/event-hubs/test/sender.spec.ts +++ b/sdk/eventhub/event-hubs/test/sender.spec.ts @@ -269,7 +269,9 @@ describe("EventHub Sender", function(): void { const subscriber = consumerClient.subscribe( "0", { - async processError() {}, + async processError() { + /* no-op */ + }, async processEvents(events) { receivedEvents.push(...events); if (receivedEvents.length >= 3) { @@ -745,7 +747,9 @@ describe("EventHub Sender", function(): void { const receivingPromise = new Promise((r) => (receivingResolver = r)); const subscription = consumerClient.subscribe( { - async processError() {}, + async processError() { + /* no-op */ + }, async processEvents(events) { receivedEvents.push(...events); receivingResolver(); @@ -773,7 +777,9 @@ describe("EventHub Sender", function(): void { const receivingPromise = new Promise((r) => (receivingResolver = r)); const subscription = consumerClient.subscribe( { - async processError() {}, + async processError() { + /* no-op */ + }, async processEvents(events) { receivedEvents.push(...events); receivingResolver(); @@ -806,7 +812,9 @@ describe("EventHub Sender", function(): void { const subscription = consumerClient.subscribe( partitionId, { - async processError() {}, + async processError() { + /* no-op */ + }, async processEvents(events) { receivedEvents.push(...events); receivingResolver(); From 9166606ebe0cf486038929c9ad718b39028bfb51 Mon Sep 17 00:00:00 2001 From: Christopher Radek Date: Tue, 19 Jan 2021 12:47:43 -0800 Subject: [PATCH 06/20] [event-hubs] fixes no-unused-expressions eslint errors/warnings --- .../test/eventHubConsumerClient.spec.ts | 8 ++-- .../event-hubs/test/eventProcessor.spec.ts | 47 ++++++++++--------- .../test/loadBalancingStrategy.spec.ts | 4 +- .../event-hubs/test/partitionPump.spec.ts | 4 +- sdk/eventhub/event-hubs/test/sender.spec.ts | 20 ++++---- .../test/utils/receivedMessagesTester.ts | 10 ++-- 6 files changed, 48 insertions(+), 45 deletions(-) diff --git a/sdk/eventhub/event-hubs/test/eventHubConsumerClient.spec.ts b/sdk/eventhub/event-hubs/test/eventHubConsumerClient.spec.ts index 4da61b312e87..4a8e269a02b9 100644 --- a/sdk/eventhub/event-hubs/test/eventHubConsumerClient.spec.ts +++ b/sdk/eventhub/event-hubs/test/eventHubConsumerClient.spec.ts @@ -51,11 +51,11 @@ describe("EventHubConsumerClient", () => { processClose: async () => { /* no-op */ } - }).should.not.ok; + }).should.not.equal(true); - isCheckpointStore("hello").should.not.ok; + isCheckpointStore("hello").should.not.equal(true); - isCheckpointStore(new InMemoryCheckpointStore()).should.ok; + isCheckpointStore(new InMemoryCheckpointStore()).should.equal(true); }); describe("subscribe() overloads route properly", () => { @@ -71,7 +71,7 @@ describe("EventHubConsumerClient", () => { ): SinonStubbedInstance => { subscriptionEventHandlers.should.equal(subscriptionHandlers); should.exist(connectionContext.managementSession); - isCheckpointStore(checkpointStore).should.be.ok; + isCheckpointStore(checkpointStore).should.equal(true); validateOptions(options); diff --git a/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts b/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts index fb942b3b3cab..8567c365f680 100644 --- a/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts +++ b/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts @@ -97,7 +97,7 @@ describe("Event Processor", function(): void { const processor = createEventProcessor(emptyCheckpointStore); const eventPosition = await processor["_getStartingPosition"]("0"); - isLatestPosition(eventPosition).should.be.ok; + should.equal(isLatestPosition(eventPosition), true); }); it("has a checkpoint", async () => { @@ -158,7 +158,7 @@ describe("Event Processor", function(): void { should.not.exist(eventPositionForPartitionZero!.sequenceNumber); const eventPositionForPartitionOne = await processor["_getStartingPosition"]("1"); - isLatestPosition(eventPositionForPartitionOne).should.be.ok; + should.equal(isLatestPosition(eventPositionForPartitionOne), true); }); function createEventProcessor( @@ -351,10 +351,10 @@ describe("Event Processor", function(): void { // when we fail to claim a partition we should _definitely_ // not attempt to start a pump. - pumpManager.createPumpCalled.should.be.false; + should.equal(pumpManager.createPumpCalled, false); // we'll attempt to claim a partition (but won't succeed) - checkpointStore.claimOwnershipCalled.should.be.true; + should.equal(checkpointStore.claimOwnershipCalled, true); }); it("abandoned claims are treated as unowned claims", async () => { @@ -436,7 +436,7 @@ describe("Event Processor", function(): void { triggerAbortedSignalAfterNumCalls(partitionIds.length * numTimesAbortedIsCheckedInLoop) ); - handlers.errors.should.be.empty; + handlers.errors.should.deep.equal([]); const currentOwnerships = await checkpointStore.listOwnership( commonFields.fullyQualifiedNamespace, @@ -712,8 +712,8 @@ describe("Event Processor", function(): void { receivedEvents.should.deep.equal(expectedMessages); - subscriptionEventHandler.hasErrors(partitionIds).should.be.false; - subscriptionEventHandler.allShutdown(partitionIds).should.be.true; + subscriptionEventHandler.hasErrors(partitionIds).should.equal(false); + subscriptionEventHandler.allShutdown(partitionIds).should.equal(true); }); it("should not throw if stop is called without start", async function(): Promise { @@ -743,7 +743,7 @@ describe("Event Processor", function(): void { // shutdown the processor await processor.stop(); - didPartitionProcessorStart.should.be.false; + didPartitionProcessorStart.should.equal(false); }); it("should support start after stopping", async function(): Promise { @@ -779,8 +779,8 @@ describe("Event Processor", function(): void { receivedEvents.should.deep.equal(expectedMessages); - subscriptionEventHandler.hasErrors(partitionIds).should.be.false; - subscriptionEventHandler.allShutdown(partitionIds).should.be.true; + subscriptionEventHandler.hasErrors(partitionIds).should.equal(false); + subscriptionEventHandler.allShutdown(partitionIds).should.equal(true); // validate correct events captured for each partition @@ -795,8 +795,8 @@ describe("Event Processor", function(): void { loggerForTest(`Stopping processor again`); await processor.stop(); - subscriptionEventHandler.hasErrors(partitionIds).should.be.false; - subscriptionEventHandler.allShutdown(partitionIds).should.be.true; + subscriptionEventHandler.hasErrors(partitionIds).should.equal(false); + subscriptionEventHandler.allShutdown(partitionIds).should.equal(true); }); describe("Partition processor", function(): void { @@ -828,8 +828,8 @@ describe("Event Processor", function(): void { // shutdown the processor await processor.stop(); - subscriptionEventHandler.hasErrors(partitionIds).should.be.false; - subscriptionEventHandler.allShutdown(partitionIds).should.be.true; + subscriptionEventHandler.hasErrors(partitionIds).should.equal(false); + subscriptionEventHandler.allShutdown(partitionIds).should.equal(true); receivedEvents.should.deep.equal(expectedMessages); }); @@ -910,9 +910,10 @@ describe("Event Processor", function(): void { async processEvents(events: ReceivedEventData[], context: PartitionContext): Promise { processedAtLeastOneEvent.add(context.partitionId); - !partionCount[context.partitionId] - ? (partionCount[context.partitionId] = 1) - : partionCount[context.partitionId]++; + if (!partionCount[context.partitionId]) { + partionCount[context.partitionId] = 0; + } + partionCount[context.partitionId]++; const existingEvents = checkpointMap.get(context.partitionId)!; @@ -1019,7 +1020,7 @@ describe("Event Processor", function(): void { firstEventsReceivedFromProcessor2[index++] = receivedEvents[0]; } - didError.should.be.false; + didError.should.equal(false); index = 0; // validate correct events captured for each partition using checkpoint for (const partitionId of partitionIds) { @@ -1270,8 +1271,8 @@ describe("Event Processor", function(): void { for (const partitionId of partitionIds) { const results = partitionResultsMap.get(partitionId)!; results.events.length.should.be.gte(1); - results.initialized.should.be.true; - (results.closeReason === CloseReason.Shutdown).should.be.true; + results.initialized.should.equal(true); + (results.closeReason === CloseReason.Shutdown).should.equal(true); } }); @@ -1427,8 +1428,8 @@ describe("Event Processor", function(): void { for (const partitionId of partitionIds) { const results = partitionResultsMap.get(partitionId)!; results.events.length.should.be.gte(1); - results.initialized.should.be.true; - (results.closeReason === CloseReason.Shutdown).should.be.true; + results.initialized.should.equal(true); + (results.closeReason === CloseReason.Shutdown).should.equal(true); } }); @@ -1510,7 +1511,7 @@ describe("Event Processor", function(): void { } } - didError.should.be.false; + didError.should.equal(false); const n = Math.floor(partitionIds.length / 2); partitionOwnershipMap.get(processorByName[`processor-0`].id)!.length.should.oneOf([n, n + 1]); partitionOwnershipMap.get(processorByName[`processor-1`].id)!.length.should.oneOf([n, n + 1]); diff --git a/sdk/eventhub/event-hubs/test/loadBalancingStrategy.spec.ts b/sdk/eventhub/event-hubs/test/loadBalancingStrategy.spec.ts index 927376da4d2b..a9b83acd8049 100644 --- a/sdk/eventhub/event-hubs/test/loadBalancingStrategy.spec.ts +++ b/sdk/eventhub/event-hubs/test/loadBalancingStrategy.spec.ts @@ -1,10 +1,12 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. +import chai from "chai"; import { PartitionOwnership } from "../src/eventProcessor"; import { BalancedLoadBalancingStrategy } from "../src/loadBalancerStrategies/balancedStrategy"; import { GreedyLoadBalancingStrategy } from "../src/loadBalancerStrategies/greedyStrategy"; import { UnbalancedLoadBalancingStrategy } from "../src/loadBalancerStrategies/unbalancedStrategy"; +const should = chai.should(); describe("LoadBalancingStrategy", () => { function createOwnershipMap( @@ -33,7 +35,7 @@ describe("LoadBalancingStrategy", () => { const lb = new UnbalancedLoadBalancingStrategy(); lb.getPartitionsToCliam("ownerId", m, ["1", "2", "3"]).should.deep.eq(["1", "2", "3"]); - m.should.be.empty; + should.equal(m.size, 0); }); it("claim partitions we already own", () => { diff --git a/sdk/eventhub/event-hubs/test/partitionPump.spec.ts b/sdk/eventhub/event-hubs/test/partitionPump.spec.ts index 90f747360c3d..6bc02f895911 100644 --- a/sdk/eventhub/event-hubs/test/partitionPump.spec.ts +++ b/sdk/eventhub/event-hubs/test/partitionPump.spec.ts @@ -110,7 +110,7 @@ describe("PartitionPump", () => { }, span); span.status!.code.should.equal(CanonicalCode.OK); - span.endCalled.should.be.ok; + should.equal(span.endCalled, true); }); it("trace - throws", async () => { @@ -123,7 +123,7 @@ describe("PartitionPump", () => { span.status!.code.should.equal(CanonicalCode.UNKNOWN); span.status!.message!.should.equal("error thrown from fn"); - span.endCalled.should.be.ok; + should.equal(span.endCalled, true); }); }); }); diff --git a/sdk/eventhub/event-hubs/test/sender.spec.ts b/sdk/eventhub/event-hubs/test/sender.spec.ts index 1c7f8d90befb..202dca6cc4ae 100644 --- a/sdk/eventhub/event-hubs/test/sender.spec.ts +++ b/sdk/eventhub/event-hubs/test/sender.spec.ts @@ -129,9 +129,9 @@ describe("EventHub Sender", function(): void { should.not.exist(batch.partitionKey); batch.maxSizeInBytes.should.be.gt(0); - batch.tryAdd({ body: list[0] }).should.be.ok; - batch.tryAdd({ body: list[1] }).should.not.be.ok; // The Mike message will be rejected - it's over the limit. - batch.tryAdd({ body: list[2] }).should.be.ok; // Marie should get added"; + should.equal(batch.tryAdd({ body: list[0] }), true); + should.equal(batch.tryAdd({ body: list[1] }), false); // The Mike message will be rejected - it's over the limit. + should.equal(batch.tryAdd({ body: list[2] }), true); // Marie should get added"; const { subscriptionEventHandler, @@ -173,8 +173,8 @@ describe("EventHub Sender", function(): void { should.not.exist(batch.partitionKey); batch.maxSizeInBytes.should.be.gt(0); - batch.tryAdd({ body: list[0] }).should.be.ok; - batch.tryAdd({ body: list[1] }).should.be.ok; + should.equal(batch.tryAdd({ body: list[0] }), true); + should.equal(batch.tryAdd({ body: list[1] }), true); const { subscriptionEventHandler, @@ -214,8 +214,8 @@ describe("EventHub Sender", function(): void { should.not.exist(batch.partitionId); batch.maxSizeInBytes.should.be.gt(0); - batch.tryAdd({ body: list[0] }).should.be.ok; - batch.tryAdd({ body: list[1] }).should.be.ok; + should.equal(batch.tryAdd({ body: list[0] }), true); + should.equal(batch.tryAdd({ body: list[1] }), true); const { subscriptionEventHandler, @@ -255,9 +255,9 @@ describe("EventHub Sender", function(): void { batch.maxSizeInBytes.should.be.gt(0); - batch.tryAdd(list[0]).should.be.ok; - batch.tryAdd(list[1]).should.be.ok; - batch.tryAdd(list[2]).should.be.ok; + should.equal(batch.tryAdd(list[0]), true); + should.equal(batch.tryAdd(list[1]), true); + should.equal(batch.tryAdd(list[2]), true); const receivedEvents: ReceivedEventData[] = []; let waitUntilEventsReceivedResolver: Function; diff --git a/sdk/eventhub/event-hubs/test/utils/receivedMessagesTester.ts b/sdk/eventhub/event-hubs/test/utils/receivedMessagesTester.ts index 2a9d490e4d31..1b6239b0167c 100644 --- a/sdk/eventhub/event-hubs/test/utils/receivedMessagesTester.ts +++ b/sdk/eventhub/event-hubs/test/utils/receivedMessagesTester.ts @@ -85,7 +85,7 @@ export class ReceivedMessagesTester implements Required Date: Tue, 19 Jan 2021 13:13:20 -0800 Subject: [PATCH 07/20] [event-hubs] fixes no-constant-condition eslint errors/warnings --- .../test/utils/subscriptionHandlerForTests.ts | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/sdk/eventhub/event-hubs/test/utils/subscriptionHandlerForTests.ts b/sdk/eventhub/event-hubs/test/utils/subscriptionHandlerForTests.ts index 410ca140c9bc..175941097579 100644 --- a/sdk/eventhub/event-hubs/test/utils/subscriptionHandlerForTests.ts +++ b/sdk/eventhub/event-hubs/test/utils/subscriptionHandlerForTests.ts @@ -110,7 +110,8 @@ export class SubscriptionHandlerForTests implements Required { - const akey = `${a.partitionId}:${a.event.body}`; - const bkey = `${b.partitionId}:${b.event.body}`; - return akey.localeCompare(bkey); - }); - - return this.events; + isWaiting = false; } } + + this.events.sort((a, b) => { + const akey = `${a.partitionId}:${a.event.body}`; + const bkey = `${b.partitionId}:${b.event.body}`; + return akey.localeCompare(bkey); + }); + + return this.events; } async waitForEvents( From 9f355b821f7cb2fd0f11a52fea42f9e5313870f4 Mon Sep 17 00:00:00 2001 From: Christopher Radek Date: Tue, 19 Jan 2021 13:17:36 -0800 Subject: [PATCH 08/20] [event-hubs] fixes promise/param-names eslint errors/warnings --- sdk/eventhub/event-hubs/test/sender.spec.ts | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/sdk/eventhub/event-hubs/test/sender.spec.ts b/sdk/eventhub/event-hubs/test/sender.spec.ts index 202dca6cc4ae..719b4935c0b4 100644 --- a/sdk/eventhub/event-hubs/test/sender.spec.ts +++ b/sdk/eventhub/event-hubs/test/sender.spec.ts @@ -261,7 +261,9 @@ describe("EventHub Sender", function(): void { const receivedEvents: ReceivedEventData[] = []; let waitUntilEventsReceivedResolver: Function; - const waitUntilEventsReceived = new Promise((r) => (waitUntilEventsReceivedResolver = r)); + const waitUntilEventsReceived = new Promise( + (resolve) => (waitUntilEventsReceivedResolver = resolve) + ); const sequenceNumber = (await consumerClient.getPartitionProperties("0")) .lastEnqueuedSequenceNumber; @@ -744,7 +746,7 @@ describe("EventHub Sender", function(): void { const receivedEvents: ReceivedEventData[] = []; let receivingResolver: Function; - const receivingPromise = new Promise((r) => (receivingResolver = r)); + const receivingPromise = new Promise((resolve) => (receivingResolver = resolve)); const subscription = consumerClient.subscribe( { async processError() { @@ -774,7 +776,7 @@ describe("EventHub Sender", function(): void { const data: EventData[] = [{ body: "Hello World 1" }, { body: "Hello World 2" }]; const receivedEvents: ReceivedEventData[] = []; let receivingResolver: Function; - const receivingPromise = new Promise((r) => (receivingResolver = r)); + const receivingPromise = new Promise((resolve) => (receivingResolver = resolve)); const subscription = consumerClient.subscribe( { async processError() { @@ -808,7 +810,7 @@ describe("EventHub Sender", function(): void { const data: EventData[] = [{ body: "Hello World 1" }, { body: "Hello World 2" }]; const receivedEvents: ReceivedEventData[] = []; let receivingResolver: Function; - const receivingPromise = new Promise((r) => (receivingResolver = r)); + const receivingPromise = new Promise((resolve) => (receivingResolver = resolve)); const subscription = consumerClient.subscribe( partitionId, { From 42f8d23eca40fe3b1e5e0b0b94a90e9bb179ad88 Mon Sep 17 00:00:00 2001 From: Christopher Radek Date: Tue, 19 Jan 2021 13:39:44 -0800 Subject: [PATCH 09/20] [event-hubs] fixes no-ex-assign eslint errors/warnings --- sdk/eventhub/event-hubs/src/eventHubSender.ts | 24 ++++++++--------- .../event-hubs/src/managementClient.ts | 27 +++++++++---------- 2 files changed, 25 insertions(+), 26 deletions(-) diff --git a/sdk/eventhub/event-hubs/src/eventHubSender.ts b/sdk/eventhub/event-hubs/src/eventHubSender.ts index 8ceb4e53cff8..115d93c7f12f 100644 --- a/sdk/eventhub/event-hubs/src/eventHubSender.ts +++ b/sdk/eventhub/event-hubs/src/eventHubSender.ts @@ -464,15 +464,15 @@ export class EventHubSender extends LinkEntity { }); } catch (err) { removeListeners(); - err = translate(err); + const translatedError = translate(err); logger.warning( "[%s] An error occurred while creating the sender %s: %s", this._context.connectionId, this.name, - `${err?.name}: ${err?.message}` + `${translatedError?.name}: ${translatedError?.message}` ); - logErrorStackTrace(err); - return reject(err); + logErrorStackTrace(translatedError); + return reject(translatedError); } } const timeTakenByInit = Date.now() - initStartTime; @@ -505,14 +505,14 @@ export class EventHubSender extends LinkEntity { ); return resolve(); } catch (err) { - err = translate(err.innerError || err); + const translatedError = translate(err.innerError || err); logger.warning( "[%s] An error occurred while sending the message %s", this._context.connectionId, - `${err?.name}: ${err?.message}` + `${translatedError?.name}: ${translatedError?.message}` ); - logErrorStackTrace(err); - return reject(err); + logErrorStackTrace(translatedError); + return reject(translatedError); } finally { removeListeners(); } @@ -586,15 +586,15 @@ export class EventHubSender extends LinkEntity { } } catch (err) { this.isConnecting = false; - err = translate(err); + const translatedError = translate(err); logger.warning( "[%s] An error occurred while creating the sender %s: %s", this._context.connectionId, this.name, - `${err?.name}: ${err?.message}` + `${translatedError?.name}: ${translatedError?.message}` ); - logErrorStackTrace(err); - throw err; + logErrorStackTrace(translatedError); + throw translatedError; } } diff --git a/sdk/eventhub/event-hubs/src/managementClient.ts b/sdk/eventhub/event-hubs/src/managementClient.ts index af6b89c8b37f..66895209cc87 100644 --- a/sdk/eventhub/event-hubs/src/managementClient.ts +++ b/sdk/eventhub/event-hubs/src/managementClient.ts @@ -368,12 +368,12 @@ export class ManagementClient extends LinkEntity { await this._ensureTokenRenewal(); } } catch (err) { - err = translate(err); + const translatedError = translate(err); logger.warning( - `[${this._context.connectionId}] An error occured while establishing the $management links: ${err?.name}: ${err?.message}` + `[${this._context.connectionId}] An error occured while establishing the $management links: ${translatedError?.name}: ${translatedError?.message}` ); - logErrorStackTrace(err); - throw err; + logErrorStackTrace(translatedError); + throw translatedError; } } @@ -474,16 +474,15 @@ export class ManagementClient extends LinkEntity { const result = await this._mgmtReqResLink!.sendRequest(request, sendRequestOptions); resolve(result); } catch (err) { - err = translate(err); + const translatedError = translate(err); logger.warning( - "[%s] An error occurred during send on management request-response link with address " + - "'%s': %s", + "[%s] An error occurred during send on management request-response link with address '%s': %s", this._context.connectionId, this.address, - `${err?.name}: ${err?.message}` + `${translatedError?.name}: ${translatedError?.message}` ); - logErrorStackTrace(err); - reject(err); + logErrorStackTrace(translatedError); + reject(translatedError); } }); @@ -496,12 +495,12 @@ export class ManagementClient extends LinkEntity { }; return (await retry(config)).body; } catch (err) { - err = translate(err); + const translatedError = translate(err); logger.warning( - `An error occurred while making the request to $management endpoint: ${err?.name}: ${err?.message}` + `An error occurred while making the request to $management endpoint: ${translatedError?.name}: ${translatedError?.message}` ); - logErrorStackTrace(err); - throw err; + logErrorStackTrace(translatedError); + throw translatedError; } } From 917ad31ccfb55e6ac508a15f7cdb9f014fae8c87 Mon Sep 17 00:00:00 2001 From: Christopher Radek Date: Tue, 19 Jan 2021 14:27:16 -0800 Subject: [PATCH 10/20] [event-hubs] fixes @typescript-eslint/no-shadow eslint errors/warnings --- .../event-hubs/src/connectionContext.ts | 29 +++++++-------- sdk/eventhub/event-hubs/src/eventHubSender.ts | 28 +++++++-------- sdk/eventhub/event-hubs/src/eventProcessor.ts | 4 +-- sdk/eventhub/event-hubs/src/partitionPump.ts | 8 ++--- .../test/eventHubConsumerClient.spec.ts | 6 ++-- .../event-hubs/test/eventProcessor.spec.ts | 12 +++---- .../event-hubs/test/eventdata.spec.ts | 2 +- .../test/loadBalancingStrategy.spec.ts | 20 +++++------ sdk/eventhub/event-hubs/test/misc.spec.ts | 6 ---- sdk/eventhub/event-hubs/test/sender.spec.ts | 36 ++++++++----------- 10 files changed, 66 insertions(+), 85 deletions(-) diff --git a/sdk/eventhub/event-hubs/src/connectionContext.ts b/sdk/eventhub/event-hubs/src/connectionContext.ts index 0d77bfffc617..eb38a2d9e5ea 100644 --- a/sdk/eventhub/event-hubs/src/connectionContext.ts +++ b/sdk/eventhub/event-hubs/src/connectionContext.ts @@ -395,35 +395,32 @@ export namespace ConnectionContext { connection.on(ConnectionEvents.error, error); } - function cleanConnectionContext(connectionContext: ConnectionContext): Promise { + function cleanConnectionContext(context: ConnectionContext): Promise { // Remove listeners from the connection object. - connectionContext.connection.removeListener( - ConnectionEvents.connectionOpen, - onConnectionOpen - ); - connectionContext.connection.removeListener(ConnectionEvents.disconnected, onDisconnected); - connectionContext.connection.removeListener(ConnectionEvents.protocolError, protocolError); - connectionContext.connection.removeListener(ConnectionEvents.error, error); + context.connection.removeListener(ConnectionEvents.connectionOpen, onConnectionOpen); + context.connection.removeListener(ConnectionEvents.disconnected, onDisconnected); + context.connection.removeListener(ConnectionEvents.protocolError, protocolError); + context.connection.removeListener(ConnectionEvents.error, error); // Close the connection - return connectionContext.connection.close(); + return context.connection.close(); } - async function refreshConnection(connectionContext: ConnectionContext): Promise { - const originalConnectionId = connectionContext.connectionId; + async function refreshConnection(context: ConnectionContext): Promise { + const originalConnectionId = context.connectionId; try { - await cleanConnectionContext(connectionContext); + await cleanConnectionContext(context); } catch (err) { logger.verbose( - `[${connectionContext.connectionId}] There was an error closing the connection before reconnecting: %O`, + `[${context.connectionId}] There was an error closing the connection before reconnecting: %O`, err ); } // Create a new connection, id, locks, and cbs client. - connectionContext.refreshConnection(); - addConnectionListeners(connectionContext.connection); + context.refreshConnection(); + addConnectionListeners(context.connection); logger.verbose( - `The connection "${originalConnectionId}" has been updated to "${connectionContext.connectionId}".` + `The connection "${originalConnectionId}" has been updated to "${context.connectionId}".` ); } diff --git a/sdk/eventhub/event-hubs/src/eventHubSender.ts b/sdk/eventhub/event-hubs/src/eventHubSender.ts index 115d93c7f12f..7cab6a5fae26 100644 --- a/sdk/eventhub/event-hubs/src/eventHubSender.ts +++ b/sdk/eventhub/event-hubs/src/eventHubSender.ts @@ -89,8 +89,8 @@ export class EventHubSender extends LinkEntity { this.address = context.config.getSenderAddress(partitionId); this.audience = context.config.getSenderAudience(partitionId); - this._onAmqpError = (context: EventContext) => { - const senderError = context.sender && context.sender.error; + this._onAmqpError = (eventContext: EventContext) => { + const senderError = eventContext.sender && eventContext.sender.error; logger.verbose( "[%s] 'sender_error' event occurred on the sender '%s' with address '%s'. " + "The associated error is: %O", @@ -102,8 +102,8 @@ export class EventHubSender extends LinkEntity { // TODO: Consider rejecting promise in trySendBatch() or createBatch() }; - this._onSessionError = (context: EventContext) => { - const sessionError = context.session && context.session.error; + this._onSessionError = (eventContext: EventContext) => { + const sessionError = eventContext.session && eventContext.session.error; logger.verbose( "[%s] 'session_error' event occurred on the session of sender '%s' with address '%s'. " + "The associated error is: %O", @@ -115,8 +115,8 @@ export class EventHubSender extends LinkEntity { // TODO: Consider rejecting promise in trySendBatch() or createBatch() }; - this._onAmqpClose = async (context: EventContext) => { - const sender = this._sender || context.sender!; + this._onAmqpClose = async (eventContext: EventContext) => { + const sender = this._sender || eventContext.sender!; logger.verbose( "[%s] 'sender_close' event occurred on the sender '%s' with address '%s'. " + "Value for isItselfClosed on the receiver is: '%s' " + @@ -140,8 +140,8 @@ export class EventHubSender extends LinkEntity { } }; - this._onSessionClose = async (context: EventContext) => { - const sender = this._sender || context.sender!; + this._onSessionClose = async (eventContext: EventContext) => { + const sender = this._sender || eventContext.sender!; logger.verbose( "[%s] 'session_close' event occurred on the session of sender '%s' with address '%s'. " + "Value for isSessionItselfClosed on the session is: '%s' " + @@ -322,9 +322,9 @@ export class EventHubSender extends LinkEntity { const messages: RheaMessage[] = []; // Convert EventData to RheaMessage. for (let i = 0; i < events.length; i++) { - const message = toRheaMessage(events[i], partitionKey); - message.body = defaultDataTransformer.encode(events[i].body); - messages[i] = message; + const rheaMessage = toRheaMessage(events[i], partitionKey); + rheaMessage.body = defaultDataTransformer.encode(events[i].body); + messages[i] = rheaMessage; } // Encode every amqp message and then convert every encoded message to amqp data section const batchMessage: RheaMessage = { @@ -391,11 +391,11 @@ export class EventHubSender extends LinkEntity { * We have implemented a synchronous send over here in the sense that we shall be waiting * for the message to be accepted or rejected and accordingly resolve or reject the promise. * @hidden - * @param message The message to be sent to EventHub. + * @param rheaMessage The message to be sent to EventHub. * @returns Promise */ private _trySendBatch( - message: RheaMessage | Buffer, + rheaMessage: RheaMessage | Buffer, options: SendOptions & EventHubProducerOptions = {} ): Promise { const abortSignal: AbortSignalLike | undefined = options.abortSignal; @@ -496,7 +496,7 @@ export class EventHubSender extends LinkEntity { } try { this._sender!.sendTimeoutInSeconds = (timeoutInMs - timeTakenByInit) / 1000; - const delivery = await this._sender!.send(message, undefined, 0x80013700); + const delivery = await this._sender!.send(rheaMessage, undefined, 0x80013700); logger.info( "[%s] Sender '%s', sent message with delivery id: %d", this._context.connectionId, diff --git a/sdk/eventhub/event-hubs/src/eventProcessor.ts b/sdk/eventhub/event-hubs/src/eventProcessor.ts index 175b6aafa2e3..45dedea78c09 100644 --- a/sdk/eventhub/event-hubs/src/eventProcessor.ts +++ b/sdk/eventhub/event-hubs/src/eventProcessor.ts @@ -531,9 +531,9 @@ export class EventProcessor { /* no-op */ } }); - } catch (err) { + } catch (errorFromUser) { logger.verbose( - `[${this._id}] An error was thrown from the user's processError handler: ${err}` + `[${this._id}] An error was thrown from the user's processError handler: ${errorFromUser}` ); } } diff --git a/sdk/eventhub/event-hubs/src/partitionPump.ts b/sdk/eventhub/event-hubs/src/partitionPump.ts index 95b1e2872cde..a1ba0a852157 100644 --- a/sdk/eventhub/event-hubs/src/partitionPump.ts +++ b/sdk/eventhub/event-hubs/src/partitionPump.ts @@ -155,9 +155,9 @@ export class PartitionPump { // forward error to user's processError and swallow errors they may throw try { await this._partitionProcessor.processError(err); - } catch (err) { + } catch (errorFromUser) { // Using verbose over warning because this error is swallowed. - logger.verbose("An error was thrown by user's processError method: ", err); + logger.verbose("An error was thrown by user's processError method: ", errorFromUser); } // close the partition processor if a non-retryable error was encountered @@ -170,11 +170,11 @@ export class PartitionPump { } // this will close the pump and will break us out of the while loop return await this.stop(CloseReason.Shutdown); - } catch (err) { + } catch (errorFromStop) { // Using verbose over warning because this error is swallowed. logger.verbose( `An error occurred while closing the receiver with reason ${CloseReason.Shutdown}: `, - err + errorFromStop ); } } diff --git a/sdk/eventhub/event-hubs/test/eventHubConsumerClient.spec.ts b/sdk/eventhub/event-hubs/test/eventHubConsumerClient.spec.ts index 4a8e269a02b9..38a0b7ba2bee 100644 --- a/sdk/eventhub/event-hubs/test/eventHubConsumerClient.spec.ts +++ b/sdk/eventhub/event-hubs/test/eventHubConsumerClient.spec.ts @@ -485,7 +485,7 @@ describe("EventHubConsumerClient", () => { let clients: EventHubConsumerClient[]; let producerClient: EventHubProducerClient; let partitionIds: string[]; - const subscriptions: Subscription[] = []; + let subscriptions: Subscription[]; beforeEach(async () => { producerClient = new EventHubProducerClient(service.connectionString!, service.path!, {}); @@ -496,6 +496,7 @@ describe("EventHubConsumerClient", () => { partitionIds.length.should.gte(2); clients = []; + subscriptions = []; }); afterEach(async () => { @@ -513,7 +514,6 @@ describe("EventHubConsumerClient", () => { describe("#close()", function(): void { it("stops any actively running subscriptions", async function(): Promise { - const subscriptions: Subscription[] = []; const client = new EventHubConsumerClient( EventHubConsumerClient.defaultConsumerGroupName, service.connectionString, @@ -717,8 +717,6 @@ describe("EventHubConsumerClient", () => { clients.push(consumerClient1, consumerClient2); - const partitionIds = await consumerClient1.getPartitionIds(); - const partitionHandlerCalls: { [partitionId: string]: { initialize: number; diff --git a/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts b/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts index 8567c365f680..8555cbb30e05 100644 --- a/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts +++ b/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts @@ -1721,10 +1721,10 @@ describe("Event Processor", function(): void { } // All partitions must be claimed. - const allPartitionsClaimed = + const innerAllPartitionsClaimed = aProcessorPartitions.size + bProcessorPartitions.size === partitionIds.length; - if (!allPartitionsClaimed) { + if (!innerAllPartitionsClaimed) { lastLoopError = { reason: "All partitions not claimed", partitionIds, @@ -1734,7 +1734,7 @@ describe("Event Processor", function(): void { }; } - return allPartitionsClaimed; + return innerAllPartitionsClaimed; } }); } catch (err) { @@ -1888,10 +1888,10 @@ describe("Event Processor", function(): void { } // All partitions must be claimed. - const allPartitionsClaimed = + const innerAllPartitionsClaimed = aProcessorPartitions.size + bProcessorPartitions.size === partitionIds.length; - if (!allPartitionsClaimed) { + if (!innerAllPartitionsClaimed) { lastLoopError = { reason: "All partitions not claimed", partitionIds, @@ -1901,7 +1901,7 @@ describe("Event Processor", function(): void { }; } - return allPartitionsClaimed; + return innerAllPartitionsClaimed; } }); } catch (err) { diff --git a/sdk/eventhub/event-hubs/test/eventdata.spec.ts b/sdk/eventhub/event-hubs/test/eventdata.spec.ts index 66ce16c9e79b..d9aaf851d3d6 100644 --- a/sdk/eventhub/event-hubs/test/eventdata.spec.ts +++ b/sdk/eventhub/event-hubs/test/eventdata.spec.ts @@ -35,12 +35,12 @@ const testSourceEventData: EventData = { properties: properties }; -const testEventData = fromRheaMessage(testMessage); const messageFromED = toRheaMessage(testSourceEventData); describe("EventData", function(): void { describe("fromRheaMessage", function(): void { it("populates body with the message body", function(): void { + const testEventData = fromRheaMessage(testMessage); testEventData.body.should.equal(testBody); }); diff --git a/sdk/eventhub/event-hubs/test/loadBalancingStrategy.spec.ts b/sdk/eventhub/event-hubs/test/loadBalancingStrategy.spec.ts index a9b83acd8049..936c0821fe0e 100644 --- a/sdk/eventhub/event-hubs/test/loadBalancingStrategy.spec.ts +++ b/sdk/eventhub/event-hubs/test/loadBalancingStrategy.spec.ts @@ -203,7 +203,7 @@ describe("LoadBalancingStrategy", () => { // meet the minimum. const partitions = ["0", "1", "2", "3", "4", "5", "6", "7", "8", "9"]; - const lb = new BalancedLoadBalancingStrategy(1000 * 60); + const lbs = new BalancedLoadBalancingStrategy(1000 * 60); // we'll do 4 consumers const initialOwnershipMap = createOwnershipMap({ @@ -222,7 +222,7 @@ describe("LoadBalancingStrategy", () => { "9": "d" }); - const requestedPartitions = lb.getPartitionsToCliam("c", initialOwnershipMap, partitions); + const requestedPartitions = lbs.getPartitionsToCliam("c", initialOwnershipMap, partitions); requestedPartitions.sort(); requestedPartitions.should.deep.equal( @@ -299,7 +299,7 @@ describe("LoadBalancingStrategy", () => { it("honors the partitionOwnershipExpirationIntervalInMs", () => { const intervalInMs = 1000; - const lb = new BalancedLoadBalancingStrategy(intervalInMs); + const lbs = new BalancedLoadBalancingStrategy(intervalInMs); const allPartitions = ["0", "1"]; const ownershipMap = createOwnershipMap({ "0": "b", @@ -307,14 +307,14 @@ describe("LoadBalancingStrategy", () => { }); // At this point, 'a' has its fair share of partitions, and none should be returned. - let partitionsToOwn = lb.getPartitionsToCliam("a", ownershipMap, allPartitions); + let partitionsToOwn = lbs.getPartitionsToCliam("a", ownershipMap, allPartitions); partitionsToOwn.length.should.equal(0, "Expected to not claim any new partitions."); // Change the ownership of partition "0" so it is older than the interval. const ownership = ownershipMap.get("0")!; ownership.lastModifiedTimeInMs = Date.now() - (intervalInMs + 1); // Add 1 to the interval to ensure it has just expired. - partitionsToOwn = lb.getPartitionsToCliam("a", ownershipMap, allPartitions); + partitionsToOwn = lbs.getPartitionsToCliam("a", ownershipMap, allPartitions); partitionsToOwn.should.deep.equal(["0"]); }); }); @@ -483,7 +483,7 @@ describe("LoadBalancingStrategy", () => { // meet the minimum. const partitions = ["0", "1", "2", "3", "4", "5", "6", "7", "8", "9"]; - const lb = new BalancedLoadBalancingStrategy(1000 * 60); + const lbs = new BalancedLoadBalancingStrategy(1000 * 60); // we'll do 4 consumers const initialOwnershipMap = createOwnershipMap({ @@ -502,7 +502,7 @@ describe("LoadBalancingStrategy", () => { "9": "d" }); - const requestedPartitions = lb.getPartitionsToCliam("c", initialOwnershipMap, partitions); + const requestedPartitions = lbs.getPartitionsToCliam("c", initialOwnershipMap, partitions); requestedPartitions.sort(); requestedPartitions.should.deep.equal( @@ -579,7 +579,7 @@ describe("LoadBalancingStrategy", () => { it("honors the partitionOwnershipExpirationIntervalInMs", () => { const intervalInMs = 1000; - const lb = new GreedyLoadBalancingStrategy(intervalInMs); + const lbs = new GreedyLoadBalancingStrategy(intervalInMs); const allPartitions = ["0", "1", "2", "3"]; const ownershipMap = createOwnershipMap({ "0": "b", @@ -587,7 +587,7 @@ describe("LoadBalancingStrategy", () => { }); // At this point, "a" should only grab 1 partition since both "a" and "b" should end up with 2 partitions each. - let partitionsToOwn = lb.getPartitionsToCliam("a", ownershipMap, allPartitions); + let partitionsToOwn = lbs.getPartitionsToCliam("a", ownershipMap, allPartitions); partitionsToOwn.length.should.equal(1, "Expected to claim 1 new partitions."); // Change the ownership of partition "0" so it is older than the interval. @@ -597,7 +597,7 @@ describe("LoadBalancingStrategy", () => { // At this point, "a" should grab partitions 0, 2, and 3. // This is because "b" only owned 1 partition and that claim is expired, // so "a" as treated as if it is the only owner. - partitionsToOwn = lb.getPartitionsToCliam("a", ownershipMap, allPartitions); + partitionsToOwn = lbs.getPartitionsToCliam("a", ownershipMap, allPartitions); partitionsToOwn.sort(); partitionsToOwn.should.deep.equal(["0", "2", "3"]); }); diff --git a/sdk/eventhub/event-hubs/test/misc.spec.ts b/sdk/eventhub/event-hubs/test/misc.spec.ts index ba0fb25f7125..f693c0a92059 100644 --- a/sdk/eventhub/event-hubs/test/misc.spec.ts +++ b/sdk/eventhub/event-hubs/test/misc.spec.ts @@ -335,12 +335,6 @@ describe("Misc tests", function(): void { it("should consistently send messages with partitionkey to a partitionId", async function(): Promise< void > { - const consumerClient = new EventHubConsumerClient( - EventHubConsumerClient.defaultConsumerGroupName, - service.connectionString!, - service.path - ); - const { subscriptionEventHandler, startPosition diff --git a/sdk/eventhub/event-hubs/test/sender.spec.ts b/sdk/eventhub/event-hubs/test/sender.spec.ts index 719b4935c0b4..d11fd9564368 100644 --- a/sdk/eventhub/event-hubs/test/sender.spec.ts +++ b/sdk/eventhub/event-hubs/test/sender.spec.ts @@ -133,14 +133,11 @@ describe("EventHub Sender", function(): void { should.equal(batch.tryAdd({ body: list[1] }), false); // The Mike message will be rejected - it's over the limit. should.equal(batch.tryAdd({ body: list[2] }), true); // Marie should get added"; - const { - subscriptionEventHandler, - startPosition - } = await SubscriptionHandlerForTests.startingFromHere(producerClient); + const { subscriptionEventHandler } = await SubscriptionHandlerForTests.startingFromHere( + producerClient + ); - const subscriber = consumerClient.subscribe("0", subscriptionEventHandler, { - startPosition - }); + const subscriber = consumerClient.subscribe("0", subscriptionEventHandler, { startPosition }); await producerClient.sendBatch(batch); let receivedEvents; @@ -176,14 +173,11 @@ describe("EventHub Sender", function(): void { should.equal(batch.tryAdd({ body: list[0] }), true); should.equal(batch.tryAdd({ body: list[1] }), true); - const { - subscriptionEventHandler, - startPosition - } = await SubscriptionHandlerForTests.startingFromHere(producerClient); + const { subscriptionEventHandler } = await SubscriptionHandlerForTests.startingFromHere( + producerClient + ); - const subscriber = consumerClient.subscribe("0", subscriptionEventHandler, { - startPosition - }); + const subscriber = consumerClient.subscribe("0", subscriptionEventHandler, { startPosition }); await producerClient.sendBatch(batch); let receivedEvents; @@ -217,10 +211,9 @@ describe("EventHub Sender", function(): void { should.equal(batch.tryAdd({ body: list[0] }), true); should.equal(batch.tryAdd({ body: list[1] }), true); - const { - subscriptionEventHandler, - startPosition - } = await SubscriptionHandlerForTests.startingFromHere(producerClient); + const { subscriptionEventHandler } = await SubscriptionHandlerForTests.startingFromHere( + producerClient + ); const subscriber = consumerClient.subscribe(subscriptionEventHandler, { startPosition @@ -511,10 +504,9 @@ describe("EventHub Sender", function(): void { describe("Multiple sendBatch calls", function(): void { it("should be sent successfully in parallel", async function(): Promise { - const { - subscriptionEventHandler, - startPosition - } = await SubscriptionHandlerForTests.startingFromHere(consumerClient); + const { subscriptionEventHandler } = await SubscriptionHandlerForTests.startingFromHere( + consumerClient + ); const promises = []; for (let i = 0; i < 5; i++) { From b0a1d04e255e70765902a8bb38d0cc0ba044ef02 Mon Sep 17 00:00:00 2001 From: Christopher Radek Date: Tue, 19 Jan 2021 14:33:08 -0800 Subject: [PATCH 11/20] [event-hubs] fixes @typescript-eslint/ban-types eslint errors/warnings --- sdk/eventhub/event-hubs/src/connectionContext.ts | 2 +- sdk/eventhub/event-hubs/test/sender.spec.ts | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/sdk/eventhub/event-hubs/src/connectionContext.ts b/sdk/eventhub/event-hubs/src/connectionContext.ts index eb38a2d9e5ea..5d20810c2aff 100644 --- a/sdk/eventhub/event-hubs/src/connectionContext.ts +++ b/sdk/eventhub/event-hubs/src/connectionContext.ts @@ -111,7 +111,7 @@ export interface ConnectionContextOptions extends EventHubClientOptions { /** * Helper type to get the names of all the functions on an object. */ -type FunctionPropertyNames = { [K in keyof T]: T[K] extends Function ? K : never }[keyof T]; +type FunctionPropertyNames = { [K in keyof T]: T[K] extends Function ? K : never }[keyof T]; // eslint-disable-line @typescript-eslint/ban-types /** * Helper type to get the types of all the functions on an object. */ diff --git a/sdk/eventhub/event-hubs/test/sender.spec.ts b/sdk/eventhub/event-hubs/test/sender.spec.ts index d11fd9564368..b4de88c43614 100644 --- a/sdk/eventhub/event-hubs/test/sender.spec.ts +++ b/sdk/eventhub/event-hubs/test/sender.spec.ts @@ -253,7 +253,7 @@ describe("EventHub Sender", function(): void { should.equal(batch.tryAdd(list[2]), true); const receivedEvents: ReceivedEventData[] = []; - let waitUntilEventsReceivedResolver: Function; + let waitUntilEventsReceivedResolver: (value?: any) => void; const waitUntilEventsReceived = new Promise( (resolve) => (waitUntilEventsReceivedResolver = resolve) ); @@ -736,7 +736,7 @@ describe("EventHub Sender", function(): void { it("should be sent successfully", async () => { const data: EventData[] = [{ body: "Hello World 1" }, { body: "Hello World 2" }]; const receivedEvents: ReceivedEventData[] = []; - let receivingResolver: Function; + let receivingResolver: (value?: unknown) => void; const receivingPromise = new Promise((resolve) => (receivingResolver = resolve)); const subscription = consumerClient.subscribe( @@ -767,7 +767,7 @@ describe("EventHub Sender", function(): void { it("should be sent successfully with partitionKey", async () => { const data: EventData[] = [{ body: "Hello World 1" }, { body: "Hello World 2" }]; const receivedEvents: ReceivedEventData[] = []; - let receivingResolver: Function; + let receivingResolver: (value?: unknown) => void; const receivingPromise = new Promise((resolve) => (receivingResolver = resolve)); const subscription = consumerClient.subscribe( { @@ -801,7 +801,7 @@ describe("EventHub Sender", function(): void { const partitionId = "0"; const data: EventData[] = [{ body: "Hello World 1" }, { body: "Hello World 2" }]; const receivedEvents: ReceivedEventData[] = []; - let receivingResolver: Function; + let receivingResolver: (value?: unknown) => void; const receivingPromise = new Promise((resolve) => (receivingResolver = resolve)); const subscription = consumerClient.subscribe( partitionId, From 3b1c7226677aa5a354ca09abaae397f62f9c9c8d Mon Sep 17 00:00:00 2001 From: Christopher Radek Date: Tue, 19 Jan 2021 14:35:10 -0800 Subject: [PATCH 12/20] [event-hubs] fixes @typescript-eslint/no-useless-constructor eslint errors/warnings --- sdk/eventhub/event-hubs/test/partitionPump.spec.ts | 4 ---- 1 file changed, 4 deletions(-) diff --git a/sdk/eventhub/event-hubs/test/partitionPump.spec.ts b/sdk/eventhub/event-hubs/test/partitionPump.spec.ts index 6bc02f895911..d88a4cd402be 100644 --- a/sdk/eventhub/event-hubs/test/partitionPump.spec.ts +++ b/sdk/eventhub/event-hubs/test/partitionPump.spec.ts @@ -22,10 +22,6 @@ describe("PartitionPump", () => { public spanOptions: SpanOptions | undefined; public spanName: string | undefined; - constructor() { - super(); - } - startSpan(nameArg: string, optionsArg?: SpanOptions): TestSpan { this.spanName = nameArg; this.spanOptions = optionsArg; From 81ec0e7853797ab4d39fa24b07adb330324a2945 Mon Sep 17 00:00:00 2001 From: Christopher Radek Date: Tue, 19 Jan 2021 14:37:04 -0800 Subject: [PATCH 13/20] [event-hubs] fixes prefer-const eslint errors/warnings --- sdk/eventhub/event-hubs/src/eventProcessor.ts | 4 +--- sdk/eventhub/event-hubs/test/loadBalancingStrategy.spec.ts | 2 +- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/sdk/eventhub/event-hubs/src/eventProcessor.ts b/sdk/eventhub/event-hubs/src/eventProcessor.ts index 45dedea78c09..80c431bf51f0 100644 --- a/sdk/eventhub/event-hubs/src/eventProcessor.ts +++ b/sdk/eventhub/event-hubs/src/eventProcessor.ts @@ -500,9 +500,7 @@ export class EventProcessor { const uniquePartitionsToClaim = new Set(partitionsToClaim); for (const partitionToClaim of uniquePartitionsToClaim) { - let partitionOwnershipRequest: PartitionOwnership; - - partitionOwnershipRequest = this._createPartitionOwnershipRequest( + const partitionOwnershipRequest = this._createPartitionOwnershipRequest( partitionOwnershipMap, partitionToClaim ); diff --git a/sdk/eventhub/event-hubs/test/loadBalancingStrategy.spec.ts b/sdk/eventhub/event-hubs/test/loadBalancingStrategy.spec.ts index 936c0821fe0e..2e862d818566 100644 --- a/sdk/eventhub/event-hubs/test/loadBalancingStrategy.spec.ts +++ b/sdk/eventhub/event-hubs/test/loadBalancingStrategy.spec.ts @@ -443,7 +443,7 @@ describe("LoadBalancingStrategy", () => { allPartitions.push(`${i}`); } - let partitionsToOwn = lb.getPartitionsToCliam( + const partitionsToOwn = lb.getPartitionsToCliam( "a", createOwnershipMap({ "0": "", From 19c531f1eca02e60f7183f85a2d35274097cebc6 Mon Sep 17 00:00:00 2001 From: Christopher Radek Date: Tue, 19 Jan 2021 14:59:00 -0800 Subject: [PATCH 14/20] [event-hubs] fixes @typescript-eslint/no-use-before-define eslint errors/warnings --- sdk/eventhub/event-hubs/src/eventHubSender.ts | 4 +- .../test/eventHubConsumerClient.spec.ts | 77 ++++++------ .../event-hubs/test/eventProcessor.spec.ts | 112 +++++++++--------- 3 files changed, 98 insertions(+), 95 deletions(-) diff --git a/sdk/eventhub/event-hubs/src/eventHubSender.ts b/sdk/eventhub/event-hubs/src/eventHubSender.ts index 7cab6a5fae26..a993b4e3ed40 100644 --- a/sdk/eventhub/event-hubs/src/eventHubSender.ts +++ b/sdk/eventhub/event-hubs/src/eventHubSender.ts @@ -419,9 +419,9 @@ export class EventHubSender extends LinkEntity { } const removeListeners = (): void => { - clearTimeout(waitTimer); + clearTimeout(waitTimer); // eslint-disable-line @typescript-eslint/no-use-before-define if (abortSignal) { - abortSignal.removeEventListener("abort", onAborted); + abortSignal.removeEventListener("abort", onAborted); // eslint-disable-line @typescript-eslint/no-use-before-define } }; diff --git a/sdk/eventhub/event-hubs/test/eventHubConsumerClient.spec.ts b/sdk/eventhub/event-hubs/test/eventHubConsumerClient.spec.ts index 38a0b7ba2bee..e1ff120d4570 100644 --- a/sdk/eventhub/event-hubs/test/eventHubConsumerClient.spec.ts +++ b/sdk/eventhub/event-hubs/test/eventHubConsumerClient.spec.ts @@ -63,6 +63,7 @@ describe("EventHubConsumerClient", () => { let clientWithCheckpointStore: EventHubConsumerClient; let subscriptionHandlers: SubscriptionEventHandlers; let fakeEventProcessor: SinonStubbedInstance; + let validateOptions: (options: FullEventProcessorOptions) => void; const fakeEventProcessorConstructor = ( connectionContext: ConnectionContext, subscriptionEventHandlers: SubscriptionEventHandlers, @@ -78,8 +79,6 @@ describe("EventHubConsumerClient", () => { return fakeEventProcessor; }; - let validateOptions: (options: FullEventProcessorOptions) => void; - beforeEach(() => { fakeEventProcessor = createStubInstance(EventProcessor); @@ -642,36 +641,7 @@ describe("EventHubConsumerClient", () => { ); clients.push(consumerClient1, consumerClient2); - - // keep track of the handlers called on subscription 1 - const handlerCalls = { - initialize: 0, - close: 0 - }; - const subscriptionHandlers1: SubscriptionEventHandlers = { - async processError() { - /* no-op */ - }, - async processEvents() { - if (!handlerCalls.close) { - // start the 2nd subscription that will kick the 1st subscription off - subscription2 = consumerClient2.subscribe(partitionId, subscriptionHandlers2, { - ownerLevel: 1, - maxBatchSize: 1, - maxWaitTimeInSeconds: 1 - }); - } else { - // stop this subscription, we know close was called so we've restarted - await subscription1.close(); - } - }, - async processClose() { - handlerCalls.close++; - }, - async processInitialize() { - handlerCalls.initialize++; - } - }; + let subscription2: Subscription | undefined; const subscriptionHandlers2: SubscriptionEventHandlers = { async processError() { /* no-op */ @@ -681,11 +651,44 @@ describe("EventHubConsumerClient", () => { await subscription2!.close(); } }; - let subscription2: Subscription | undefined; - const subscription1 = consumerClient1.subscribe(partitionId, subscriptionHandlers1, { - maxBatchSize: 1, - maxWaitTimeInSeconds: 1 - }); + + // keep track of the handlers called on subscription 1 + const handlerCalls = { + initialize: 0, + close: 0 + }; + + const subscription1 = consumerClient1.subscribe( + partitionId, + { + async processError() { + /* no-op */ + }, + async processEvents() { + if (!handlerCalls.close) { + // start the 2nd subscription that will kick the 1st subscription off + subscription2 = consumerClient2.subscribe(partitionId, subscriptionHandlers2, { + ownerLevel: 1, + maxBatchSize: 1, + maxWaitTimeInSeconds: 1 + }); + } else { + // stop this subscription, we know close was called so we've restarted + await subscription1.close(); + } + }, + async processClose() { + handlerCalls.close++; + }, + async processInitialize() { + handlerCalls.initialize++; + } + }, + { + maxBatchSize: 1, + maxWaitTimeInSeconds: 1 + } + ); await loopUntil({ maxTimes: 10, diff --git a/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts b/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts index 8555cbb30e05..cee10f1b397e 100644 --- a/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts +++ b/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts @@ -83,6 +83,62 @@ describe("Event Processor", function(): void { describe("unit tests", () => { describe("_getStartingPosition", () => { + function createEventProcessor( + checkpointStore: CheckpointStore, + startPosition?: FullEventProcessorOptions["startPosition"] + ): EventProcessor { + return new EventProcessor( + EventHubConsumerClient.defaultConsumerGroupName, + consumerClient["_context"], + { + processEvents: async () => { + /* no-op */ + }, + processError: async () => { + /* no-op */ + } + }, + checkpointStore, + { + startPosition, + maxBatchSize: 1, + maxWaitTimeInSeconds: 1, + loadBalancingStrategy: defaultOptions.loadBalancingStrategy, + loopIntervalInMs: defaultOptions.loopIntervalInMs + } + ); + } + + const emptyCheckpointStore = createCheckpointStore([]); + + function createCheckpointStore( + checkpointsForTest: Pick[] + ): CheckpointStore { + return { + claimOwnership: async () => { + return []; + }, + listCheckpoints: async () => { + return checkpointsForTest.map((cp) => { + return { + fullyQualifiedNamespace: "not-used-for-this-test", + consumerGroup: "not-used-for-this-test", + eventHubName: "not-used-for-this-test", + offset: cp.offset, + sequenceNumber: cp.sequenceNumber, + partitionId: cp.partitionId + }; + }); + }, + listOwnership: async () => { + return []; + }, + updateCheckpoint: async () => { + /* no-op */ + } + }; + } + before(() => { consumerClient["_context"].managementSession!.getEventHubProperties = async () => { return Promise.resolve({ @@ -160,62 +216,6 @@ describe("Event Processor", function(): void { const eventPositionForPartitionOne = await processor["_getStartingPosition"]("1"); should.equal(isLatestPosition(eventPositionForPartitionOne), true); }); - - function createEventProcessor( - checkpointStore: CheckpointStore, - startPosition?: FullEventProcessorOptions["startPosition"] - ): EventProcessor { - return new EventProcessor( - EventHubConsumerClient.defaultConsumerGroupName, - consumerClient["_context"], - { - processEvents: async () => { - /* no-op */ - }, - processError: async () => { - /* no-op */ - } - }, - checkpointStore, - { - startPosition, - maxBatchSize: 1, - maxWaitTimeInSeconds: 1, - loadBalancingStrategy: defaultOptions.loadBalancingStrategy, - loopIntervalInMs: defaultOptions.loopIntervalInMs - } - ); - } - - const emptyCheckpointStore = createCheckpointStore([]); - - function createCheckpointStore( - checkpointsForTest: Pick[] - ): CheckpointStore { - return { - claimOwnership: async () => { - return []; - }, - listCheckpoints: async () => { - return checkpointsForTest.map((cp) => { - return { - fullyQualifiedNamespace: "not-used-for-this-test", - consumerGroup: "not-used-for-this-test", - eventHubName: "not-used-for-this-test", - offset: cp.offset, - sequenceNumber: cp.sequenceNumber, - partitionId: cp.partitionId - }; - }); - }, - listOwnership: async () => { - return []; - }, - updateCheckpoint: async () => { - /* no-op */ - } - }; - } }); describe("_handleSubscriptionError", () => { From b7574b20773f4166847370c3ef1e1296123a5747 Mon Sep 17 00:00:00 2001 From: Christopher Radek Date: Tue, 19 Jan 2021 15:02:04 -0800 Subject: [PATCH 15/20] [event-hubs] fixes no-irregular-whitespace eslint errors/warnings --- sdk/eventhub/event-hubs/src/linkEntity.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/eventhub/event-hubs/src/linkEntity.ts b/sdk/eventhub/event-hubs/src/linkEntity.ts index f41395637c3c..8f95b32aabf5 100644 --- a/sdk/eventhub/event-hubs/src/linkEntity.ts +++ b/sdk/eventhub/event-hubs/src/linkEntity.ts @@ -235,7 +235,7 @@ export class LinkEntity { clearTimeout(this._tokenRenewalTimer as NodeJS.Timer); if (link) { try { - // Closing the link and its underlying sessionĀ if the link is open. This should also + // Closing the link and its underlying session if the link is open. This should also // remove them from the internal map. await link.close(); logger.verbose( From e9180e502540b1993dc02d607ec64e7412f91bbd Mon Sep 17 00:00:00 2001 From: Christopher Radek Date: Tue, 19 Jan 2021 15:03:56 -0800 Subject: [PATCH 16/20] [event-hubs] fixes no-empty eslint errors/warnings --- sdk/eventhub/event-hubs/src/util/delayWithoutThrow.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sdk/eventhub/event-hubs/src/util/delayWithoutThrow.ts b/sdk/eventhub/event-hubs/src/util/delayWithoutThrow.ts index 0b4dfcc4e6e9..a37d0c73b8e8 100644 --- a/sdk/eventhub/event-hubs/src/util/delayWithoutThrow.ts +++ b/sdk/eventhub/event-hubs/src/util/delayWithoutThrow.ts @@ -16,5 +16,7 @@ export async function delayWithoutThrow( ): Promise { try { await delay(delayInMs, abortSignal); - } catch {} // swallow AbortError + } catch { + /* no-op to swallow AbortError */ + } } From 340154059892d50d3440a4329c861f95156b47b7 Mon Sep 17 00:00:00 2001 From: Christopher Radek Date: Tue, 19 Jan 2021 15:07:27 -0800 Subject: [PATCH 17/20] [event-hubs] fixes no-return-await eslint errors/warnings --- sdk/eventhub/event-hubs/src/eventHubReceiver.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/eventhub/event-hubs/src/eventHubReceiver.ts b/sdk/eventhub/event-hubs/src/eventHubReceiver.ts index 51c5b4b9d9fb..e7f7f5555bd6 100644 --- a/sdk/eventhub/event-hubs/src/eventHubReceiver.ts +++ b/sdk/eventhub/event-hubs/src/eventHubReceiver.ts @@ -706,7 +706,7 @@ export class EventHubReceiver extends LinkEntity { // operation has been cancelled, so exit immediately if (abortSignal && abortSignal.aborted) { - return await rejectOnAbort(); + return rejectOnAbort(); } // updates the prefetch count so that the baseConsumer adds From 9aeb8acbd13869f6828a5e2b1f0d5faeaf2ec88f Mon Sep 17 00:00:00 2001 From: Christopher Radek Date: Tue, 19 Jan 2021 15:08:34 -0800 Subject: [PATCH 18/20] [event-hubs] fixes no-unsafe-finally eslint errors/warnings --- sdk/eventhub/event-hubs/src/eventHubReceiver.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/eventhub/event-hubs/src/eventHubReceiver.ts b/sdk/eventhub/event-hubs/src/eventHubReceiver.ts index e7f7f5555bd6..de500c8967b2 100644 --- a/sdk/eventhub/event-hubs/src/eventHubReceiver.ts +++ b/sdk/eventhub/event-hubs/src/eventHubReceiver.ts @@ -700,7 +700,7 @@ export class EventHubReceiver extends LinkEntity { try { await this.close(); } finally { - return reject(new AbortError("The receive operation has been cancelled by the user.")); + reject(new AbortError("The receive operation has been cancelled by the user.")); } }; From 3e375a9947d836b2748b7b174ee45f756e9d1b88 Mon Sep 17 00:00:00 2001 From: Christopher Radek Date: Tue, 19 Jan 2021 15:16:55 -0800 Subject: [PATCH 19/20] [event-hubs] fixes promise/always-return eslint errors/warnings --- sdk/eventhub/event-hubs/src/eventHubReceiver.ts | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/sdk/eventhub/event-hubs/src/eventHubReceiver.ts b/sdk/eventhub/event-hubs/src/eventHubReceiver.ts index de500c8967b2..2b9f11b2256a 100644 --- a/sdk/eventhub/event-hubs/src/eventHubReceiver.ts +++ b/sdk/eventhub/event-hubs/src/eventHubReceiver.ts @@ -462,7 +462,10 @@ export class EventHubReceiver extends LinkEntity { await this.abort(); } } catch (err) { - return this._onError === onError && onError(err); + if (this._onError === onError) { + onError(err); + } + return; } } else { logger.verbose( @@ -478,6 +481,7 @@ export class EventHubReceiver extends LinkEntity { 0 ); this._addCredit(creditsToAdd); + return; }) .catch((err) => { // something really unexpected happened, so attempt to call user's error handler From f49241c06c0581092f36f910c99d04a41c937d96 Mon Sep 17 00:00:00 2001 From: Christopher Radek Date: Tue, 19 Jan 2021 15:19:27 -0800 Subject: [PATCH 20/20] [event-hubs] fixes @azure/azure-sdk/ts-naming-options eslint errors/warnings --- sdk/eventhub/event-hubs/src/eventHubProducerClient.ts | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/sdk/eventhub/event-hubs/src/eventHubProducerClient.ts b/sdk/eventhub/event-hubs/src/eventHubProducerClient.ts index 55d6236d3a19..b7157c99d4bf 100644 --- a/sdk/eventhub/event-hubs/src/eventHubProducerClient.ts +++ b/sdk/eventhub/event-hubs/src/eventHubProducerClient.ts @@ -81,7 +81,7 @@ export class EventHubProducerClient { * - `webSocketOptions`: Configures the channelling of the AMQP connection over Web Sockets. * - `userAgent` : A string to append to the built in user agent string that is passed to the service. */ - constructor(connectionString: string, options?: EventHubClientOptions); + constructor(connectionString: string, options?: EventHubClientOptions); // eslint-disable-line @azure/azure-sdk/ts-naming-options /** * The `EventHubProducerClient` class is used to send events to an Event Hub. * Use the `options` parmeter to configure retry policy or proxy settings. @@ -95,7 +95,7 @@ export class EventHubProducerClient { * - `webSocketOptions`: Configures the channelling of the AMQP connection over Web Sockets. * - `userAgent` : A string to append to the built in user agent string that is passed to the service. */ - constructor(connectionString: string, eventHubName: string, options?: EventHubClientOptions); + constructor(connectionString: string, eventHubName: string, options?: EventHubClientOptions); // eslint-disable-line @azure/azure-sdk/ts-naming-options /** * The `EventHubProducerClient` class is used to send events to an Event Hub. * Use the `options` parmeter to configure retry policy or proxy settings. @@ -114,13 +114,13 @@ export class EventHubProducerClient { fullyQualifiedNamespace: string, eventHubName: string, credential: TokenCredential, - options?: EventHubClientOptions + options?: EventHubClientOptions // eslint-disable-line @azure/azure-sdk/ts-naming-options ); constructor( fullyQualifiedNamespaceOrConnectionString1: string, eventHubNameOrOptions2?: string | EventHubClientOptions, credentialOrOptions3?: TokenCredential | EventHubClientOptions, - options4?: EventHubClientOptions + options4?: EventHubClientOptions // eslint-disable-line @azure/azure-sdk/ts-naming-options ) { this._context = createConnectionContext( fullyQualifiedNamespaceOrConnectionString1, @@ -261,7 +261,7 @@ export class EventHubProducerClient { * @throws MessagingError if an error is encountered while sending a message. * @throws Error if the underlying connection or sender has been closed. */ - async sendBatch(batch: EventDataBatch, options?: OperationOptions): Promise; + async sendBatch(batch: EventDataBatch, options?: OperationOptions): Promise; // eslint-disable-line @azure/azure-sdk/ts-naming-options async sendBatch( batch: EventDataBatch | EventData[], options: SendBatchOptions | OperationOptions = {}