Skip to content

Commit

Permalink
add return to pool; remove autoconnect
Browse files Browse the repository at this point in the history
  • Loading branch information
mathe42 committed Apr 23, 2022
1 parent 027e4a3 commit c95525e
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 11 deletions.
48 changes: 40 additions & 8 deletions pool.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,24 @@
import { ConnectConfigWithAuthentication, SendConfig } from "./config.ts";

export class SMTPWorker {
id = 1
#timeout: number;

constructor(
config: ConnectConfigWithAuthentication,
{ timeout = 60000, autoconnect = true } = {},
{ timeout = 60000 } = {},
) {
this.#config = config;
this.#timeout = timeout;
if(autoconnect) {
this.#startup();
}
}
#w!: Worker;
#idleTO: number | null = null;
#idleMode2 = false;
#noCon = true;
#config: ConnectConfigWithAuthentication;

#resolver = new Map<number, {res: (res: any) => void, rej: (err: Error) => void}>()

#startup() {
this.#w = new Worker(new URL("./worker.ts", import.meta.url), {
type: "module",
Expand All @@ -33,7 +33,20 @@ export class SMTPWorker {
// This allowes the deno option so only for pool and worker we need --unstable
} as any);

this.#w.addEventListener("message", (ev: MessageEvent<boolean>) => {
this.#w.addEventListener("message", (ev: MessageEvent<boolean|{__ret: number, res: any, err: any}>) => {
if(typeof ev.data === 'object') {
if('err' in ev.data) {
this.#resolver.get(ev.data.__ret)?.rej(ev.data.err)
}

if('res' in ev.data) {
this.#resolver.get(ev.data.__ret)?.res(ev.data.res)
}

this.#resolver.delete(ev.data.__ret)
return
}

if (ev.data) {
this.#stopIdle();
} else {
Expand Down Expand Up @@ -77,11 +90,26 @@ export class SMTPWorker {
}

public send(mail: SendConfig) {
const myID = this.id
this.id++
this.#stopIdle();
if (this.#noCon) {
this.#startup();
}
this.#w.postMessage(mail);
this.#w.postMessage({
__mail: myID,
mail
});

return new Promise((res, rej) => {
this.#resolver.set(myID, {res, rej})
})
}

close() {
this.#w.terminate()
if(this.#idleTO) clearTimeout(this.#idleTO)

}
}

Expand All @@ -93,7 +121,7 @@ export class SMTPWorkerPool {
{ timeout = 60000, size = 2 } = {}
) {
for (let i = 0; i < size; i++) {
this.pool.push(new SMTPWorker(config, {timeout, autoconnect: i === 0}))
this.pool.push(new SMTPWorker(config, {timeout}))
}
}

Expand All @@ -102,7 +130,11 @@ export class SMTPWorkerPool {
send(mail: SendConfig) {
this.#lastUsed = (this.#lastUsed + 1) % this.pool.length

this.pool[this.#lastUsed].send(mail)
return this.pool[this.#lastUsed].send(mail)
}

close() {
this.pool.forEach(v=>v.close())
}
}

20 changes: 17 additions & 3 deletions worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,28 @@ async function send(config: SendConfig) {

addEventListener("message", async (ev: MessageEvent) => {
if (ev.data.__setup) {
await client.connectTLS(ev.data.__setup);
await client.connect(ev.data.__setup);
cb();
return;
}
if (ev.data.__check_idle) {
postMessage(client.isSending);
return;
}
await readyPromise;
send(ev.data);

if(ev.data.__mail) {
await readyPromise;
try {
const data = await send(ev.data.mail)
postMessage({
__ret: ev.data.__mail,
res: data
})
} catch (ex) {
postMessage({
__ret: ev.data.__mail,
err: ex
})
}
}
});

0 comments on commit c95525e

Please sign in to comment.