Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[EPH] Clean up logs that use >>>> #19378

Merged
1 commit merged into from
Dec 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 36 additions & 31 deletions sdk/eventhub/event-processor-host/test/eph.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ describe("EPH", function(): void {
doneCheckpointing = true;
}
} catch (err) {
debug(">>>>>>> An error occurred while checkpointing msg number %d: %O", num, err);
debug("An error occurred while checkpointing msg number %d: %O", num, err);
}
}
};
Expand Down Expand Up @@ -164,7 +164,7 @@ describe("EPH", function(): void {
ehc
.getPartitionIds()
.then((ids) => {
debug(">>> Received partition ids: ", ids);
debug("Test logs: Received partition ids: ", ids);
host = EventProcessorHost.createFromConnectionString(
EventProcessorHost.createHostName(),
storageConnString!,
Expand All @@ -175,24 +175,24 @@ describe("EPH", function(): void {
initialOffset: EventPosition.fromEnqueuedTime(Date.now())
}
);
debug(">>>>> Sending the test message...");
debug("Test logs: Sending the test message...");
ehc
.send({ body: "Test Message", properties: { message_id: msgId } })
.then(() => {
const onMessage: OnReceivedMessage = (context: PartitionContext, data: EventData) => {
debug(">>>>> Rx message from '%s': '%s'", context.partitionId, data);
debug("Test logs: Rx message from '%s': '%s'", context.partitionId, data);
if (data.properties!.message_id === msgId) {
debug(">>>> Checkpointing the received message...");
debug("Test logs: Checkpointing the received message...");
context
.checkpoint()
.then(() => {
debug(">>>> Checkpoint succesful...");
debug("Test logs: Checkpoint succesful...");
return context["_context"].blobReferenceByPartition[
context.partitionId
].getContent();
})
.then((content) => {
debug(">>>> Seen expected message. New lease contents: %s", content);
debug("Test logs: Seen expected message. New lease contents: %s", content);
const parsed = JSON.parse(content);
parsed.offset.should.eql(data.offset);
})
Expand All @@ -203,7 +203,7 @@ describe("EPH", function(): void {
return host.stop();
})
.then(() => {
debug(">>>> closed the sender and the eph...");
debug("Test logs: closed the sender and the eph...");
return done();
})
.catch((err) => {
Expand Down Expand Up @@ -231,7 +231,7 @@ describe("EPH", function(): void {
const msgId = uuid();
const ehc = EventHubClient.createFromConnectionString(ehConnString!, hubName!);
const leasecontainerName = EventProcessorHost.createHostName("tc");
debug(">>>>> Lease container name: %s", leasecontainerName);
debug("Test logs: Lease container name: %s", leasecontainerName);
async function sendAcrossAllPartitions(
ehc: EventHubClient,
ids: string[]
Expand All @@ -244,12 +244,12 @@ describe("EPH", function(): void {
result.push(ehc.send(data, id));
}
await Promise.all(result);
debug(">>>> Successfully finished sending messages.. %O", idMessage);
debug("Test logs: Successfully finished sending messages.. %O", idMessage);
return idMessage;
}

const ids = await ehc.getPartitionIds();
debug(">>> Received partition ids: ", ids);
debug("Test logs: Received partition ids: ", ids);
host = EventProcessorHost.createFromConnectionString(
"my-eph-1",
storageConnString!,
Expand All @@ -264,14 +264,14 @@ describe("EPH", function(): void {
}
);
await delay(1000);
debug(">>>>> Sending the first set of test messages...");
debug("Test logs: Sending the first set of test messages...");
const firstSend = await sendAcrossAllPartitions(ehc, ids);
let count = 0;
const onMessage: OnReceivedMessage = async (context: PartitionContext, data: EventData) => {
const partitionId = context.partitionId;
debug(">>>>> Rx message from '%s': '%o'", partitionId, data);
debug("Test logs: Rx message from '%s': '%o'", partitionId, data);
if (data.properties!.message_id === firstSend[partitionId].properties!.message_id) {
debug(">>>> Checkpointing the received message...");
debug("Test logs: Checkpointing the received message...");
await context.checkpoint();
count++;
} else {
Expand All @@ -286,29 +286,29 @@ describe("EPH", function(): void {
debug("An error occurred while receiving the message: %O", err);
throw err;
};
debug(">>>> Starting my-eph-1");
debug("Test logs: Starting my-eph-1");
await host.start(onMessage, onError);
while (count < ids.length) {
await delay(10000);
debug(">>>> number of partitionIds: %d, count: %d", ids.length, count);
debug("Test logs: number of partitionIds: %d, count: %d", ids.length, count);
}
await host.stop();

debug(
">>>> Restarting the same host. This time the initial offset should be ignored, and " +
"Test logs: Restarting the same host. This time the initial offset should be ignored, and " +
"the EventPosition should be from the checkpointed offset.."
);
debug(">>>>> Sending the second set of test messages...");
debug("Test logs: Sending the second set of test messages...");
const secondSend = await sendAcrossAllPartitions(ehc, ids);
let count2 = 0;
const onMessage2: OnReceivedMessage = async (
context: PartitionContext,
data: EventData
) => {
const partitionId = context.partitionId;
debug(">>>>> Rx message from '%s': '%s'", partitionId, data);
debug("Test logs: Rx message from '%s': '%s'", partitionId, data);
if (data.properties!.message_id === secondSend[partitionId].properties!.message_id) {
debug(">>>> Checkpointing the received message...");
debug("Test logs: Checkpointing the received message...");
await context.checkpoint();
count2++;
} else {
Expand All @@ -323,13 +323,13 @@ describe("EPH", function(): void {
debug("An error occurred while receiving the message: %O", err);
throw err;
};
debug(">>>> Starting my-eph-2");
debug("Test logs: Starting my-eph-2");
await host.start(onMessage2, onError2);
while (count2 < ids.length) {
await delay(10000);
debug(">>>> number of partitionIds: %d, count: %d", ids.length, count);
debug("Test logs: number of partitionIds: %d, count: %d", ids.length, count);
}
debug(">>>>>> sleeping for 10 more seconds....");
debug("Test logs: sleeping for 10 more seconds....");
await delay(10000);
await host.stop();
await ehc.close();
Expand Down Expand Up @@ -360,7 +360,7 @@ describe("EPH", function(): void {
for (const hostName in hostByName) {
receivingPartitionsByHost[hostName] = hostByName[hostName].receivingFromPartitions;
}
debug(">>> EPH -> Partitions: \n%O", receivingPartitionsByHost);
debug("Test logs: EPH -> Partitions: \n%O", receivingPartitionsByHost);
return receivingPartitionsByHost;
};

Expand All @@ -371,7 +371,7 @@ describe("EPH", function(): void {
};
sendDataByPartition[ids[i]] = data;
await ehc.send(data, ids[i]);
debug(">>> Sent data to partition: %s", ids[i]);
debug("Test logs: Sent data to partition: %s", ids[i]);
}
};

Expand All @@ -390,23 +390,28 @@ describe("EPH", function(): void {
);

const onError: OnReceivedError = (error: Error) => {
debug(`>>> [%s] Received error: %O`, hostName, error);
debug(`Test logs: [%s] Received error: %O`, hostName, error);
throw error;
};
const onMessage: OnReceivedMessage = (context: PartitionContext, data: EventData) => {
debug(">>> [%s] Rx message from '%s': '%O'", hostName, context.partitionId, data);
debug(
"Test logs: [%s] Rx message from '%s': '%O'",
hostName,
context.partitionId,
data
);
should.equal(sendDataByPartition[context.partitionId].body, data.body);
};
await hostByName[hostName].start(onMessage, onError);
debug(">>> Sleeping for 8 seconds after starting %s.", hostName);
debug("Test logs: Sleeping for 8 seconds after starting %s.", hostName);
await delay(8000);
debug(
">>> [%s] currently receiving messages from partitions : %o",
"Test logs: [%s] currently receiving messages from partitions : %o",
hostName,
hostByName[hostName].receivingFromPartitions
);
}
debug(">>> Sleeping for another 15 seconds.");
debug("Test logs: Sleeping for another 15 seconds.");
await delay(15000);
const hostToPartition = getReceivingFromPartitionsForAllEph();
for (const host in hostToPartition) {
Expand Down Expand Up @@ -470,7 +475,7 @@ describe("EPH", function(): void {
}
);
const partitionInfo = await host.getPartitionInformation("0");
debug(">>> partitionInfo: %o", partitionInfo);
debug("Test logs: partitionInfo: %o", partitionInfo);
partitionInfo.partitionId.should.equal("0");
partitionInfo.type.should.equal("com.microsoft:partition");
partitionInfo.hubPath.should.equal(hubName);
Expand Down
4 changes: 2 additions & 2 deletions sdk/eventhub/event-processor-host/test/iothub.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,14 @@ describe("EPH with iothub connection string", function(): void {
}
);
const onMessage: OnReceivedMessage = (context: PartitionContext, data: EventData) => {
debug(">>> [%s] Rx message from '%s': '%O'", hostName, context.partitionId, data);
debug("Test logs: [%s] Rx message from '%s': '%O'", hostName, context.partitionId, data);
};
const onError: OnReceivedError = (err) => {
debug("An error occurred while receiving the message: %O", err);
throw err;
};
const runtimeInfo = await host.getHubRuntimeInformation();
debug(">>>> runtimeInfo: %O", runtimeInfo);
debug("Test logs: runtimeInfo: %O", runtimeInfo);
// tslint:disable-next-line: no-unused-expression
runtimeInfo.createdAt.should.exist;
(typeof runtimeInfo.partitionCount).should.equal("number");
Expand Down
12 changes: 6 additions & 6 deletions sdk/eventhub/event-processor-host/test/negative.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,15 @@ describe("negative", function(): void {
}
);
const onMessage: OnReceivedMessage = (context: PartitionContext, data: EventData) => {
debug(">>> [%s] Rx message from '%s': '%O'", hostName, context.partitionId, data);
debug("Test logs: [%s] Rx message from '%s': '%O'", hostName, context.partitionId, data);
};
const onError: OnReceivedError = (err) => {
debug("An error occurred while receiving the message: %O", err);
throw err;
};
await host.start(onMessage, onError);
try {
debug(">>> [%s] Trying to start second time.", hostName);
debug("Test logs: [%s] Trying to start second time.", hostName);
await host.start(onMessage, onError);
throw new Error("The second call to start() should have failed.");
} catch (err) {
Expand Down Expand Up @@ -90,7 +90,7 @@ describe("negative", function(): void {
}
);
const onMessage: OnReceivedMessage = (context: PartitionContext, data: EventData) => {
debug(">>> [%s] Rx message from '%s': '%O'", hostName, context.partitionId, data);
debug("Test logs: [%s] Rx message from '%s': '%O'", hostName, context.partitionId, data);
};
const onError: OnReceivedError = (err) => {
debug("An error occurred while receiving the message: %O", err);
Expand All @@ -102,7 +102,7 @@ describe("negative", function(): void {
return Promise.reject(new Error("This statement should not have executed."));
})
.catch((err) => {
debug(">>>>>>> %s", err.action);
debug("Err action: %s", err.action);
err.action.should.equal("Getting PartitionIds");
done();
});
Expand All @@ -120,7 +120,7 @@ describe("negative", function(): void {
}
);
const onMessage: OnReceivedMessage = (context: PartitionContext, data: EventData) => {
debug(">>> [%s] Rx message from '%s': '%O'", hostName, context.partitionId, data);
debug("Test logs: [%s] Rx message from '%s': '%O'", hostName, context.partitionId, data);
};
const onError: OnReceivedError = (err) => {
debug("An error occurred while receiving the message: %O", err);
Expand All @@ -132,7 +132,7 @@ describe("negative", function(): void {
return Promise.reject(new Error("This statement should not have executed."));
})
.catch((err) => {
debug(">>>>>>> %s", err.action);
debug("Err action: %s", err.action);
err.action.should.equal("Getting PartitionIds");
done();
});
Expand Down