Skip to content

Commit

Permalink
feat(bounces): Send a delayed notification email if a delivery has no…
Browse files Browse the repository at this point in the history
…t been completed after 3 hours ZMS-56 (zone-eu#363)

* Detect if should send out a delayed delivery notification email (no sending yet)

* Do not defer if MX server was not found

* Send a delayed notification email

* Fixed breaking tests
  • Loading branch information
andris9 authored Jan 19, 2024
1 parent de238ed commit a535e3a
Show file tree
Hide file tree
Showing 10 changed files with 923 additions and 723 deletions.
4 changes: 1 addition & 3 deletions bin/check-bounce.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,7 @@ process.stdin.on('data', chunk => {
});

process.stdin.on('end', () => {
let str = Buffer.concat(chunks)
.toString()
.trim();
let str = Buffer.concat(chunks).toString().trim();
let bounceInfo = bounces.check(str);
console.log('data : %s', str.replace(/\n/g, '\n' + ' '.repeat(11)));
Object.keys(bounceInfo || {}).forEach(key => {
Expand Down
7 changes: 7 additions & 0 deletions config/default.js
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,13 @@ module.exports = {
},
disableInterfaces: ['forwarder'], // do not bounce messages from this interface
sendingZone: 'bounces',

// Send a warning email about delayed delivery
delayEmail: {
enabled: true,
after: 3 * 3600 * 1000 // 3h
},

zoneConfig: {
// specify zone specific bounce options
myzonename: {
Expand Down
95 changes: 95 additions & 0 deletions lib/bounces.js
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,101 @@ module.exports.check = (input, category) => {
};
};

module.exports.canSendBounce = (delivery, options) => {
options = options || {};
let logName = options.logName || 'Bounce';

if (delivery.skipBounce) {
log.info(
logName,
'id=%s %s.%s SKIPBOUNCE Skip bounce to %s as defined by routing',
delivery.sessionId,
delivery.id,
delivery.seq,
delivery.from || '<>'
);
return false;
}

if (/^mailer-daemon@/i.test(delivery.from) || !delivery.from) {
log.info(
logName,
'id=%s %s.%s SKIPBOUNCE Skip bounce to %s due to envelope (MAIL FROM=%s)',
delivery.sessionId,
delivery.id,
delivery.seq,
delivery.from || '<>',
JSON.stringify(delivery.from || '')
.replace(/"/g, '')
.trim() || '<>'
);
return false;
}

let xAutoResponseSuppress = delivery.headers.getFirst('X-Auto-Response-Suppress');
if (/\ball\b/i.test(xAutoResponseSuppress)) {
log.info(
logName,
'id=%s %s.%s SKIPBOUNCE Skip bounce to %s due to header (%s=%s)',
delivery.sessionId,
delivery.id,
delivery.seq,
delivery.from || '<>',
'X-Auto-Response-Suppress',
JSON.stringify(xAutoResponseSuppress).replace(/"/g, '').trim()
);
return false;
}

let autoSubmitted = delivery.headers.getFirst('Auto-Submitted');
if (/\bauto-(generated|replied)\b/i.test(autoSubmitted)) {
log.info(
logName,
'id=%s %s.%s SKIPBOUNCE Skip bounce to %s due to header (%s=%s)',
delivery.sessionId,
delivery.id,
delivery.seq,
delivery.from || '<>',
'Auto-Submitted',
JSON.stringify(autoSubmitted).replace(/"/g, '').trim()
);
return false;
}

let contentType = delivery.headers.getFirst('Content-Type');
if (/^multipart\/report\b/i.test(contentType)) {
log.info(
logName,
'id=%s %s.%s SKIPBOUNCE Skip bounce to %s due to header (%s=%s)',
delivery.sessionId,
delivery.id,
delivery.seq,
delivery.from || '<>',
'Content-Type',
'multipart/report'
);
return false;
}

if (delivery.parsedEnvelope && /^mailer-daemon@/i.test(delivery.parsedEnvelope.from)) {
log.info(
logName,
'id=%s %s.%s SKIPBOUNCE Skip bounce to %s due to header (%s=%s)',
delivery.sessionId,
delivery.id,
delivery.seq,
delivery.from || '<>',
'From',
JSON.stringify(delivery.parsedEnvelope.from || '<>')
.replace(/"/g, '')
.trim() || '<>'
);
return false;
}

return true;
};

function formatSMTPResponse(str) {
str = (str || '').toString().trim();
let code = str.match(/^\d{3}[\s-]+([\d.]+\s*)?/);
Expand Down
2 changes: 2 additions & 0 deletions lib/ip-tools.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ class RedisCache {
return callback();
}

log.silly(logKey, 'DNSCACHE SET key=%s value=%s', key, JSON.stringify(value));

db.redis
.multi()
.set('dns:' + key, JSON.stringify(value))
Expand Down
96 changes: 79 additions & 17 deletions lib/mail-queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@ const QueueLocker = require('./queue-locker');
const TtlCache = require('./ttl-cache');
const crypto = require('crypto');
const plugins = require('./plugins');
const Headers = require('mailsplit').Headers;
const db = require('./db');
const GridFSBucket = require('mongodb').GridFSBucket;
const ObjectId = require('mongodb').ObjectId;
const internalCounters = require('./counters');
const bounces = require('./bounces');
const MailDrop = require('./mail-drop');
const yaml = require('js-yaml');
const fs = require('fs');
const pathlib = require('path');
Expand Down Expand Up @@ -47,6 +50,7 @@ class MailQueue {
this.closing = false;
this.garbageTimer = null;
this.seqIndex = new SeqIndex();
this.maildrop = new MailDrop(this);

this.cache = new TtlCache(); // shared cache for workers
this.locks = new QueueLocker();
Expand Down Expand Up @@ -745,39 +749,46 @@ class MailQueue {
*
* @param {Object} delivery Message object
* @param {Number} ttl TTL in ms. Once this time is over the message is reinserted to queue
* @param {Number} responseData SMTP response or description
* @param {Object} responseData SMTP response or description
* @param {Function} callback Run once the message is removed from active queue
*/
deferDelivery(delivery, ttl, responseData, callback) {
// add metainfo about postponing the delivery
delivery._deferred = delivery._deferred || {
first: Date.now(),
count: 0
};
delivery._deferred.count++;
delivery._deferred.last = Date.now();
delivery._deferred.next = Date.now() + ttl;
delivery._deferred.response = responseData.response;
delivery._deferred.log = responseData.log || delivery._deferred.log;
delivery.queued = new Date(Math.max(delivery._deferred.next, Date.now()));
delivery.locked = false;

const now = Date.now();

let updates = {
$set: {
_deferred: delivery._deferred,
queued: delivery.queued,
'_deferred.last': now,
'_deferred.next': now + ttl,
queued: new Date(now + ttl),
locked: false
},
$inc: {
'_deferred.count': 1
}
};

if (!delivery._deferred) {
updates.$set['_deferred.first'] = now;
}

if (responseData.response) {
updates.$set['_deferred.response'] = responseData.response;
}

if (responseData.log) {
updates.$set['_deferred.log'] = responseData.log;
}

if (responseData.updates && typeof responseData.updates === 'object') {
Object.keys(responseData.updates).forEach(key => {
if (key.charAt(0) === '$') {
if (!['$inc', '$mul'].includes(key)) {
return; // not allowed
}
// $inc etc.
updates[key] = responseData.updates[key];
updates[key] = Object.assign(updates[key] || {}, responseData.updates[key]);
return;
}
if (!updates.$set.hasOwnProperty(key)) {
Expand All @@ -787,13 +798,16 @@ class MailQueue {
}

let collection = this.mongodb.collection(this.options.collection);
collection.updateOne(
collection.findOneAndUpdate(
{
id: delivery.id,
seq: delivery.seq
},
updates,
err => {
{
returnOriginal: true
},
(err, item) => {
if (err) {
return callback(err);
}
Expand All @@ -802,6 +816,54 @@ class MailQueue {
log.verbose('Queue', '%s.%s UNLOCK (key="%s")', delivery.id, delivery.seq, delivery._lock);
this.locks.release(delivery._lock);

if (item && item.value) {
let firstCheck = item.value._deferred && item.value._deferred.first;
let prevLastCheck = item.value._deferred && item.value._deferred.last;
let lastCheck = now;

if (firstCheck && prevLastCheck) {
return this.getMeta(delivery.id, (err, meta) => {
if (err) {
// ignore
log.error('Queue', '%s.%s GET META %s', delivery.id, delivery.seq, err.message);
return callback(null, true);
}

let deliveryEntry = Object.assign(item.value, meta || {});
deliveryEntry.headers = new Headers(deliveryEntry.headers);

deliveryEntry.envelope = {
from: deliveryEntry.from,
to: deliveryEntry.recipient
};

if (!bounces.canSendBounce(deliveryEntry, { logName: 'Queue' })) {
return false;
}

return plugins.handler.runHooks(
'queue:delayed',
[
Object.assign({}, deliveryEntry, responseData),
this.maildrop,
{
first: firstCheck,
prev: prevLastCheck,
last: lastCheck
}
],
err => {
if (err) {
log.error('Queue', '%s.%s queue:delayed %s', deliveryEntry.id, deliveryEntry.seq, err.message);
}

return callback(null, true);
}
);
});
}
}

return callback(null, true);
}
);
Expand Down
6 changes: 4 additions & 2 deletions lib/queue-server.js
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ class QueueServer {
);
});
}
case 'DEFER':
case 'DEFER': {
if (!client.zone) {
return client.send({
req: data.req,
Expand All @@ -190,6 +190,7 @@ class QueueServer {
deliveryStatusCounter.inc({
status: 'deferred'
});

return this.deferDelivery(client.zone, client.id, data, (err, response) => {
if (!client) {
// client already errored or closed
Expand All @@ -206,11 +207,12 @@ class QueueServer {
response
});
});
}

case 'BOUNCE':
{
bounceCounter.inc();
let bounce = data;
const bounce = data;
bounce.headers = new Headers(bounce.headers || []);
plugins.handler.runHooks(
'queue:bounce',
Expand Down
Loading

0 comments on commit a535e3a

Please sign in to comment.