Skip to content

Commit

Permalink
MailBuffer to easily wait for specific mail
Browse files Browse the repository at this point in the history
  • Loading branch information
Timshel committed Sep 3, 2024
1 parent 9335034 commit cfe8a9a
Show file tree
Hide file tree
Showing 4 changed files with 229 additions and 0 deletions.
3 changes: 3 additions & 0 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,9 @@ The `close` and `delete` event subjects are reserved and cannot be used to wait
**iterator(subject): AsyncIterator<Mail>** - Generator to iterate over received email with matching event subject.
Use an internal array to store received email even when not consumming. Don't forget to use `.return()` to close it.

**buffer(subject): MailBuffer** - Return a struct which store received emails.
Then **MailBuffer.next( (Mail) => boolean )** allows to wait for a specific `Mail` independant of the order of arrival.

### Callbacks

**on('new', callback)** - Event called when a new email is received. Callback
Expand Down
67 changes: 67 additions & 0 deletions src/lib/mailbuffer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
"use strict";

import type { Attachment, Envelope, Mail, ParsedMail } from "./type";
import type { MailServer } from "./mailserver";

interface Next {
filter: (Mail) => boolean;
resolve: (Mail) => any;
reject: (Error) => any;
consume: boolean;
}

export class MailBuffer {
mails: Mail[] = [];
nexts: Next[] = [];

close: () => any;
_receive: (Mail) => any;

constructor(mailServer: MailServer, subject: String) {
this._receive = (mail) => {
this.mails.push(mail);

for (const { filter, resolve, consume, ..._ } of this.nexts) {
const index = this.mails.findIndex(filter);
if (index > -1) {
resolve(this.mails[index]);
if (consume) {
this.mails.splice(index, 1);
}
}
}
};

this.close = () => {
mailServer.removeListener("close", this.close);
mailServer.removeListener(subject, this._receive);

const error = new Error("Closing buffer");
for (const { reject, ..._ } of this.nexts) {
reject(error);
}
};

mailServer.on(subject, this._receive);
mailServer.once("close", this.close);
}

next(filter: (Mail) => boolean, consume: boolean = true): Promise<Mail> {
return new Promise((resolve, reject) => {
const index = this.mails.findIndex(filter);
if (index > -1) {
resolve(this.mails[index]);
if (consume) {
this.mails.splice(index, 1);
}
} else {
this.nexts.push({
filter,
resolve,
reject,
consume,
});
}
});
}
}
10 changes: 10 additions & 0 deletions src/lib/mailserver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import type { ReadStream } from "fs";

import { calculateBcc } from "./helpers/bcc";
import { createOnAuthCallback } from "./helpers/smtp";
import { MailBuffer } from "./mailbuffer";
import { parse as mailParser } from "./mailparser";
import { Outgoing } from "./outgoing";
import { SMTPServer } from "smtp-server";
Expand Down Expand Up @@ -131,6 +132,15 @@ export class MailServer {
return inner(subject);
}

/**
* Return a struct which store received emails.
* Then allow to obtain a `Promise<Mail>` dependant on a predicate `(Mail) => boolean`.
* Allow to wait for `Mail` independant of their order of arrival.
*/
buffer(subject: string): MailBuffer {
return new MailBuffer(this, subject);
}

constructor(
options?: MailServerOptions,
mailEventSubjectMapper: (Mail) => string | undefined = (m) => m.to[0]?.address,
Expand Down
149 changes: 149 additions & 0 deletions test/mailbuffer.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/* global describe, it, before, after */
"use strict";

/**
* MailDev - mailserver.js -- test the mailserver options
*/

const assert = require("assert");
const SMTPConnection = require("nodemailer/lib/smtp-connection");
const MailServer = require("../dist/lib/mailserver").MailServer;
const nodemailer = require("nodemailer");
const port = 9025;

async function createTransporter(port, auth) {
return nodemailer.createTransport({
port: port,
auth,
});
}

describe("MailBuffer", () => {
let mailServer;
let transporter;

before(async () => {
mailServer = new MailServer({
port: port,
auth: { user: "bodhi", pass: "surfing" },
});
await mailServer.listen();

transporter = await createTransporter(port, { type: "login", user: "bodhi", pass: "surfing" });
return transporter.verify();
});

after((done) => {
mailServer.close().finally(() => {
done();
});
});

const emailOpts = {
from: "[email protected]",
to: "[email protected]",
subject: "Test",
html: "Test",
};

function sendMail(subject = emailOpts.subject) {
const mail = {
...emailOpts,
subject,
};
transporter.sendMail(mail);
return subject;
}

it("should resolve when receiving email", async () => {
const buffer = mailServer.buffer(emailOpts.to);
const p = buffer.next((_) => true);

sendMail();

const received = await p;

assert.strictEqual(received.from[0]?.address, emailOpts.from);
assert.strictEqual(received.to[0]?.address, emailOpts.to);
assert.strictEqual(received.subject, emailOpts.subject);

buffer.close();
});

it("should resolve an already received email", async () => {
const buffer = mailServer.buffer(emailOpts.to);

sendMail();
await mailServer.next(emailOpts.to);

const received = await buffer.next((_) => true);

assert.strictEqual(received.from[0]?.address, emailOpts.from);
assert.strictEqual(received.to[0]?.address, emailOpts.to);
assert.strictEqual(received.subject, emailOpts.subject);

buffer.close();
});

it("should resolve out of order mails", async () => {
const buffer = mailServer.buffer(emailOpts.to);

const subject1 = sendMail("Not Dropped1");
const subject2 = sendMail("Not Dropped2");
const subject3 = sendMail("Not Dropped3");
const subject4 = sendMail("Not Dropped4");

const mail4 = await buffer.next((m) => m.subject === subject4);
assert.strictEqual(mail4.subject, subject4);

const mail3 = await buffer.next((m) => m.subject === subject3);
assert.strictEqual(mail3.subject, subject3);

const mail2 = await buffer.next((m) => m.subject === subject2);
assert.strictEqual(mail2.subject, subject2);

const mail1 = await buffer.next((m) => m.subject === subject1);
assert.strictEqual(mail1.subject, subject1);

buffer.close();
});

it("should consume email by default", async () => {
const buffer = mailServer.buffer(emailOpts.to);

sendMail();

const received = await buffer.next((_) => true);
assert.strictEqual(received.subject, emailOpts.subject);

const rejected = buffer.next((_) => true);
buffer.close();
await assert.rejects(rejected);
});

it("should not consume if specified", async () => {
const buffer = mailServer.buffer(emailOpts.to);

sendMail();

const received1 = await buffer.next((_) => true, false);
assert.strictEqual(received1.subject, emailOpts.subject);
const received2 = await buffer.next((_) => true, true);
assert.strictEqual(received2.subject, emailOpts.subject);
const rejected = buffer.next((_) => true);

buffer.close();
await assert.rejects(rejected);
});

it("should reject promises when closing", async () => {
const buffer = mailServer.buffer(emailOpts.to);
const p1 = buffer.next((_) => true);
const p2 = buffer.next((_) => true);

buffer.close();

await assert.rejects(p1);
await assert.rejects(p2);
});
});

0 comments on commit cfe8a9a

Please sign in to comment.