Skip to content

Commit

Permalink
[EPH] Clean up logs that use >>>> (#19378)
Browse files Browse the repository at this point in the history
Resolves #19360

This PR cleans up the logs in the test files in the `@azure/event-processor-host` package to address the concerns brought up in #19360 around `>>>>>` being the same as git conflict markers.

I am using the prefix `Test logs` instead to differentiate these logs from the logs coming from the actual client code unless we are logging errors

Please note that the `@azure/event-processor-host` package is actually deprecated at this moment
  • Loading branch information
ramya-rao-a authored Dec 15, 2021
1 parent c3e805d commit ae4c1d8
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 39 deletions.
67 changes: 36 additions & 31 deletions sdk/eventhub/event-processor-host/test/eph.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ describe("EPH", function(): void {
doneCheckpointing = true;
}
} catch (err) {
debug(">>>>>>> An error occurred while checkpointing msg number %d: %O", num, err);
debug("An error occurred while checkpointing msg number %d: %O", num, err);
}
}
};
Expand Down Expand Up @@ -164,7 +164,7 @@ describe("EPH", function(): void {
ehc
.getPartitionIds()
.then((ids) => {
debug(">>> Received partition ids: ", ids);
debug("Test logs: Received partition ids: ", ids);
host = EventProcessorHost.createFromConnectionString(
EventProcessorHost.createHostName(),
storageConnString!,
Expand All @@ -175,24 +175,24 @@ describe("EPH", function(): void {
initialOffset: EventPosition.fromEnqueuedTime(Date.now())
}
);
debug(">>>>> Sending the test message...");
debug("Test logs: Sending the test message...");
ehc
.send({ body: "Test Message", properties: { message_id: msgId } })
.then(() => {
const onMessage: OnReceivedMessage = (context: PartitionContext, data: EventData) => {
debug(">>>>> Rx message from '%s': '%s'", context.partitionId, data);
debug("Test logs: Rx message from '%s': '%s'", context.partitionId, data);
if (data.properties!.message_id === msgId) {
debug(">>>> Checkpointing the received message...");
debug("Test logs: Checkpointing the received message...");
context
.checkpoint()
.then(() => {
debug(">>>> Checkpoint succesful...");
debug("Test logs: Checkpoint succesful...");
return context["_context"].blobReferenceByPartition[
context.partitionId
].getContent();
})
.then((content) => {
debug(">>>> Seen expected message. New lease contents: %s", content);
debug("Test logs: Seen expected message. New lease contents: %s", content);
const parsed = JSON.parse(content);
parsed.offset.should.eql(data.offset);
})
Expand All @@ -203,7 +203,7 @@ describe("EPH", function(): void {
return host.stop();
})
.then(() => {
debug(">>>> closed the sender and the eph...");
debug("Test logs: closed the sender and the eph...");
return done();
})
.catch((err) => {
Expand Down Expand Up @@ -231,7 +231,7 @@ describe("EPH", function(): void {
const msgId = uuid();
const ehc = EventHubClient.createFromConnectionString(ehConnString!, hubName!);
const leasecontainerName = EventProcessorHost.createHostName("tc");
debug(">>>>> Lease container name: %s", leasecontainerName);
debug("Test logs: Lease container name: %s", leasecontainerName);
async function sendAcrossAllPartitions(
ehc: EventHubClient,
ids: string[]
Expand All @@ -244,12 +244,12 @@ describe("EPH", function(): void {
result.push(ehc.send(data, id));
}
await Promise.all(result);
debug(">>>> Successfully finished sending messages.. %O", idMessage);
debug("Test logs: Successfully finished sending messages.. %O", idMessage);
return idMessage;
}

const ids = await ehc.getPartitionIds();
debug(">>> Received partition ids: ", ids);
debug("Test logs: Received partition ids: ", ids);
host = EventProcessorHost.createFromConnectionString(
"my-eph-1",
storageConnString!,
Expand All @@ -264,14 +264,14 @@ describe("EPH", function(): void {
}
);
await delay(1000);
debug(">>>>> Sending the first set of test messages...");
debug("Test logs: Sending the first set of test messages...");
const firstSend = await sendAcrossAllPartitions(ehc, ids);
let count = 0;
const onMessage: OnReceivedMessage = async (context: PartitionContext, data: EventData) => {
const partitionId = context.partitionId;
debug(">>>>> Rx message from '%s': '%o'", partitionId, data);
debug("Test logs: Rx message from '%s': '%o'", partitionId, data);
if (data.properties!.message_id === firstSend[partitionId].properties!.message_id) {
debug(">>>> Checkpointing the received message...");
debug("Test logs: Checkpointing the received message...");
await context.checkpoint();
count++;
} else {
Expand All @@ -286,29 +286,29 @@ describe("EPH", function(): void {
debug("An error occurred while receiving the message: %O", err);
throw err;
};
debug(">>>> Starting my-eph-1");
debug("Test logs: Starting my-eph-1");
await host.start(onMessage, onError);
while (count < ids.length) {
await delay(10000);
debug(">>>> number of partitionIds: %d, count: %d", ids.length, count);
debug("Test logs: number of partitionIds: %d, count: %d", ids.length, count);
}
await host.stop();

debug(
">>>> Restarting the same host. This time the initial offset should be ignored, and " +
"Test logs: Restarting the same host. This time the initial offset should be ignored, and " +
"the EventPosition should be from the checkpointed offset.."
);
debug(">>>>> Sending the second set of test messages...");
debug("Test logs: Sending the second set of test messages...");
const secondSend = await sendAcrossAllPartitions(ehc, ids);
let count2 = 0;
const onMessage2: OnReceivedMessage = async (
context: PartitionContext,
data: EventData
) => {
const partitionId = context.partitionId;
debug(">>>>> Rx message from '%s': '%s'", partitionId, data);
debug("Test logs: Rx message from '%s': '%s'", partitionId, data);
if (data.properties!.message_id === secondSend[partitionId].properties!.message_id) {
debug(">>>> Checkpointing the received message...");
debug("Test logs: Checkpointing the received message...");
await context.checkpoint();
count2++;
} else {
Expand All @@ -323,13 +323,13 @@ describe("EPH", function(): void {
debug("An error occurred while receiving the message: %O", err);
throw err;
};
debug(">>>> Starting my-eph-2");
debug("Test logs: Starting my-eph-2");
await host.start(onMessage2, onError2);
while (count2 < ids.length) {
await delay(10000);
debug(">>>> number of partitionIds: %d, count: %d", ids.length, count);
debug("Test logs: number of partitionIds: %d, count: %d", ids.length, count);
}
debug(">>>>>> sleeping for 10 more seconds....");
debug("Test logs: sleeping for 10 more seconds....");
await delay(10000);
await host.stop();
await ehc.close();
Expand Down Expand Up @@ -360,7 +360,7 @@ describe("EPH", function(): void {
for (const hostName in hostByName) {
receivingPartitionsByHost[hostName] = hostByName[hostName].receivingFromPartitions;
}
debug(">>> EPH -> Partitions: \n%O", receivingPartitionsByHost);
debug("Test logs: EPH -> Partitions: \n%O", receivingPartitionsByHost);
return receivingPartitionsByHost;
};

Expand All @@ -371,7 +371,7 @@ describe("EPH", function(): void {
};
sendDataByPartition[ids[i]] = data;
await ehc.send(data, ids[i]);
debug(">>> Sent data to partition: %s", ids[i]);
debug("Test logs: Sent data to partition: %s", ids[i]);
}
};

Expand All @@ -390,23 +390,28 @@ describe("EPH", function(): void {
);

const onError: OnReceivedError = (error: Error) => {
debug(`>>> [%s] Received error: %O`, hostName, error);
debug(`Test logs: [%s] Received error: %O`, hostName, error);
throw error;
};
const onMessage: OnReceivedMessage = (context: PartitionContext, data: EventData) => {
debug(">>> [%s] Rx message from '%s': '%O'", hostName, context.partitionId, data);
debug(
"Test logs: [%s] Rx message from '%s': '%O'",
hostName,
context.partitionId,
data
);
should.equal(sendDataByPartition[context.partitionId].body, data.body);
};
await hostByName[hostName].start(onMessage, onError);
debug(">>> Sleeping for 8 seconds after starting %s.", hostName);
debug("Test logs: Sleeping for 8 seconds after starting %s.", hostName);
await delay(8000);
debug(
">>> [%s] currently receiving messages from partitions : %o",
"Test logs: [%s] currently receiving messages from partitions : %o",
hostName,
hostByName[hostName].receivingFromPartitions
);
}
debug(">>> Sleeping for another 15 seconds.");
debug("Test logs: Sleeping for another 15 seconds.");
await delay(15000);
const hostToPartition = getReceivingFromPartitionsForAllEph();
for (const host in hostToPartition) {
Expand Down Expand Up @@ -470,7 +475,7 @@ describe("EPH", function(): void {
}
);
const partitionInfo = await host.getPartitionInformation("0");
debug(">>> partitionInfo: %o", partitionInfo);
debug("Test logs: partitionInfo: %o", partitionInfo);
partitionInfo.partitionId.should.equal("0");
partitionInfo.type.should.equal("com.microsoft:partition");
partitionInfo.hubPath.should.equal(hubName);
Expand Down
4 changes: 2 additions & 2 deletions sdk/eventhub/event-processor-host/test/iothub.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,14 @@ describe("EPH with iothub connection string", function(): void {
}
);
const onMessage: OnReceivedMessage = (context: PartitionContext, data: EventData) => {
debug(">>> [%s] Rx message from '%s': '%O'", hostName, context.partitionId, data);
debug("Test logs: [%s] Rx message from '%s': '%O'", hostName, context.partitionId, data);
};
const onError: OnReceivedError = (err) => {
debug("An error occurred while receiving the message: %O", err);
throw err;
};
const runtimeInfo = await host.getHubRuntimeInformation();
debug(">>>> runtimeInfo: %O", runtimeInfo);
debug("Test logs: runtimeInfo: %O", runtimeInfo);
// tslint:disable-next-line: no-unused-expression
runtimeInfo.createdAt.should.exist;
(typeof runtimeInfo.partitionCount).should.equal("number");
Expand Down
12 changes: 6 additions & 6 deletions sdk/eventhub/event-processor-host/test/negative.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,15 @@ describe("negative", function(): void {
}
);
const onMessage: OnReceivedMessage = (context: PartitionContext, data: EventData) => {
debug(">>> [%s] Rx message from '%s': '%O'", hostName, context.partitionId, data);
debug("Test logs: [%s] Rx message from '%s': '%O'", hostName, context.partitionId, data);
};
const onError: OnReceivedError = (err) => {
debug("An error occurred while receiving the message: %O", err);
throw err;
};
await host.start(onMessage, onError);
try {
debug(">>> [%s] Trying to start second time.", hostName);
debug("Test logs: [%s] Trying to start second time.", hostName);
await host.start(onMessage, onError);
throw new Error("The second call to start() should have failed.");
} catch (err) {
Expand Down Expand Up @@ -90,7 +90,7 @@ describe("negative", function(): void {
}
);
const onMessage: OnReceivedMessage = (context: PartitionContext, data: EventData) => {
debug(">>> [%s] Rx message from '%s': '%O'", hostName, context.partitionId, data);
debug("Test logs: [%s] Rx message from '%s': '%O'", hostName, context.partitionId, data);
};
const onError: OnReceivedError = (err) => {
debug("An error occurred while receiving the message: %O", err);
Expand All @@ -102,7 +102,7 @@ describe("negative", function(): void {
return Promise.reject(new Error("This statement should not have executed."));
})
.catch((err) => {
debug(">>>>>>> %s", err.action);
debug("Err action: %s", err.action);
err.action.should.equal("Getting PartitionIds");
done();
});
Expand All @@ -120,7 +120,7 @@ describe("negative", function(): void {
}
);
const onMessage: OnReceivedMessage = (context: PartitionContext, data: EventData) => {
debug(">>> [%s] Rx message from '%s': '%O'", hostName, context.partitionId, data);
debug("Test logs: [%s] Rx message from '%s': '%O'", hostName, context.partitionId, data);
};
const onError: OnReceivedError = (err) => {
debug("An error occurred while receiving the message: %O", err);
Expand All @@ -132,7 +132,7 @@ describe("negative", function(): void {
return Promise.reject(new Error("This statement should not have executed."));
})
.catch((err) => {
debug(">>>>>>> %s", err.action);
debug("Err action: %s", err.action);
err.action.should.equal("Getting PartitionIds");
done();
});
Expand Down

0 comments on commit ae4c1d8

Please sign in to comment.