Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Jettison generic pool from outbound #3115

Merged
merged 7 commits into from
Dec 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ jobs:
# node 8, maint. ended 2019-12
# node 10, maint. ended 2021-04
# node 12, maint. ended 2022-04
node-version: [ 14, 16, 18 ]
node-version: [ 16, 18 ]
fail-fast: false
steps:
- uses: actions/checkout@v3
Expand Down
4 changes: 1 addition & 3 deletions Changes.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@

### Changes

- dep(generic-pool): remove pooling from outbound #3115
- smtp_client: disable pooling in get_client_plugin, #3113
- fix(smtp_client): update generic-pool getPoolSize() -> pool.size
- fix(smtp_client): handle a crashing new generic-pool timeout error
- fix(smtp_client): add missing `$` char in front of interpolated string
- config/plugins: update name of uribl plugin
- uribl: timeout DNS 1 second before plugin, #3077
Expand All @@ -20,7 +19,6 @@
- style(smtp_client): pass args as objects (was positional)
- style(plugin/\*): transaction guarding #3032
- dep(iconv): removed, declared in haraka-email-message)
- dep(generic-pool): 2.5 -> 3.8 (promises) #3033, #3060
- dep(redis): 3.1 -> 4.1 #3058
- dep(nopt): 5 -> 6.0.0 #3076
- dep(haraka-plugin-fcrdns): 1.0.3 -> 1.1.0 #3076
Expand Down
11 changes: 0 additions & 11 deletions config/outbound.ini
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,3 @@

; received_header (default: "Haraka outbound")
; received_header=Haraka outbound

; pool_timeout: default : 300
; pool_timeout=0

; pool_concurrency_max: default: 10
; set to zero to disable pools
; pool_concurrency_max=0

; pool_waiting_queue_max: default: 20
; set to zero to disable queue length restrictions
; pool_waiting_queue_max=0
6 changes: 0 additions & 6 deletions config/smtp_forward.ini
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,6 @@ host=localhost
; port to connect to
port=2555
;
; timeout backend connection from pool
;timeout=300
;
; max connections in pool
;max_connections=1000
;
; uncomment to enable TLS to the backend SMTP server
;enable_tls=true
;
Expand Down
6 changes: 0 additions & 6 deletions config/smtp_proxy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,6 @@ host=localhost
; port to connect to
port=2555
;
; timeout backend connection from pool
;timeout=300
;
; max connections in pool
;max_connections=1000
;
; uncomment to enable TLS to the backend SMTP server
; enable_tls=1
;
Expand Down
28 changes: 0 additions & 28 deletions docs/Outbound.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,34 +74,6 @@ string is attached as a `Received` header to all outbound mail just before it is

Timeout for connecting to remote servers. Default: 30s

* `pool_timeout`

Outbound mail uses "pooled" connections. An unused pool connection will send
a QUIT after this time. Default: 50s

Pooled connections means that a mail to a particular IP address will hold that
connection open and use it the next time it is requested. This helps with
large scale outbound mail. If you don't send lots of mail it is advised to
lower the `pool_timeout` value since it may upset receiving mail servers.

Setting this value to `0` will effectively disable the use of pools. You may
wish to set this if you have a `get_mx` hook that picks outbound servers on
a per-email basis (rather than per-domain).

* `pool_concurrency_max`

Set this to `0` to completely disable the pooling code.

This value determines how many concurrent connections can be made to a single
IP address (destination) in the pool. Default: 10 connections.

* `pool_waiting_queue_max`

Set this to `0` to disable queue length restrictions.

This value determines max amount of connections waited to be processed in pool
of connections to single IP address (destination). Default: 20 connections.

* `local_mx_ok`

Default: false. By default, outbound to a local IP is disabled, to avoid creating
Expand Down
165 changes: 10 additions & 155 deletions outbound/client_pool.js
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
"use strict";
'use strict';

const generic_pool = require('generic-pool');
const utils = require('haraka-utils');

const sock = require('../line_socket');
const server = require('../server');
const logger = require('../logger');

const obc = require('./config');

function _create_socket (pool_name, port, host, local_addr, is_unix_socket, callback) {
function _create_socket (name, port, host, local_addr, is_unix_socket, callback) {

const socket = is_unix_socket ? sock.connect({path: host}) : sock.connect({port, host, localAddress: local_addr});
socket.__pool_name = pool_name;
socket.name = name;
socket.__uuid = utils.uuid();
socket.setTimeout(obc.cfg.connect_timeout * 1000);
logger.logdebug(`[outbound] created. host: ${host} port: ${port} pool_timeout: ${obc.cfg.pool_timeout}`, { uuid: socket.__uuid });
logger.logdebug(`[outbound] created. host: ${host} port: ${port}`, { uuid: socket.__uuid });
socket.once('connect', () => {
socket.removeAllListeners('error'); // these get added after callback
socket.removeAllListeners('timeout');
Expand All @@ -35,162 +33,19 @@ function _create_socket (pool_name, port, host, local_addr, is_unix_socket, call
});
}

// Separate pools are kept for each set of server attributes.
function get_pool (port, host, local_addr, is_unix_socket, max) {
port = port || 25;
host = host || 'localhost';
const name = `outbound::${port}:${host}:${local_addr}:${obc.cfg.pool_timeout}`;
if (!server.notes.pool) server.notes.pool = {};
if (server.notes.pool[name]) return server.notes.pool[name];

const factory = {

create () {
return new Promise(function (resolve, reject) {
_create_socket(name, port, host, local_addr, is_unix_socket, (err, socket) => {
if (err) return reject(err)
resolve(socket)
})
})
},

validate () {
return new Promise(function (resolve) {
resolve(socket => socket.__fromPool && socket.writable)
})
},

destroy (socket) {
return new Promise(function (resolve) {
logger.logdebug(`[outbound] destroying pool entry ${socket.__uuid} for ${host}:${port}`);
socket.removeAllListeners();
socket.__fromPool = false;
socket.on('line', line => {
// Just assume this is a valid response
logger.logprotocol(`[outbound] S: ${line}`);
});
socket.once('error', err => {
logger.logwarn(`[outbound] Socket got an error while shutting down: ${err}`);
});
socket.once('end', () => {
logger.loginfo("[outbound] Remote end half closed during destroy()");
socket.destroy();
})
if (socket.writable) {
logger.logprotocol(`[outbound] [${socket.__uuid}] C: QUIT`);
socket.write("QUIT\r\n");
}
socket.end(); // half close
resolve()
})
},
}

const opts = {
max: max || 10,
idleTimeoutMillis: obc.cfg.pool_timeout * 1000,
acquireTimeoutMillis: 10000 // temporary fix for #3100
}
const pool = generic_pool.createPool(factory, opts);
server.notes.pool[name] = pool;

return pool;
}

// Get a socket for the given attributes.
exports.get_client = (port, host, local_addr, is_unix_socket, callback) => {
if (obc.cfg.pool_concurrency_max == 0) {
return _create_socket(null, port, host, local_addr, is_unix_socket, callback);
}

const pool = get_pool(port, host, local_addr, is_unix_socket, obc.cfg.pool_concurrency_max);
if (obc.cfg.pool_waiting_queue_max != 0 && pool.pending >= obc.cfg.pool_waiting_queue_max) {
return callback("Too many waiting clients for pool", null);
}
port = port || 25;
host = host || 'localhost';
const name = `outbound::${port}:${host}:${local_addr}`;

pool.acquire().then(socket => {
socket.__acquired = true;
logger.loginfo(`[outbound] acquired socket ${socket.__uuid} for ${socket.__pool_name}`);
callback(null, socket);
}).catch(callback);
return _create_socket(name, port, host, local_addr, is_unix_socket, callback)
}

exports.release_client = (socket, port, host, local_addr, error) => {
logger.logdebug(`[outbound] release_client: ${socket.__uuid} ${host}:${port} to ${local_addr}`);

const name = socket.__pool_name;

if (!name && obc.cfg.pool_concurrency_max == 0) {
return sockend();
}

if (!socket.__acquired) {
logger.logwarn(`Release an un-acquired socket. Stack: ${(new Error()).stack}`);
return;
}
socket.__acquired = false;

if (!(server.notes && server.notes.pool)) {
logger.logcrit(`[outbound] Releasing a pool (${name}) that doesn't exist!`);
return;
}
const pool = server.notes.pool[name];
if (!pool) {
logger.logcrit(`[outbound] Releasing a pool (${name}) that doesn't exist!`);
return;
}

if (error) return sockend();

if (obc.cfg.pool_timeout == 0) {
logger.loginfo("[outbound] Pool_timeout is zero - shutting it down");
return sockend();
}

for (const event of ['close','error','end','timeout','line']) {
socket.removeAllListeners(event);
}

socket.__fromPool = true;

socket.once('error', err => {
logger.logwarn(`[outbound] Socket [${name}] in pool got an error: ${err}`);
sockend();
});

socket.once('end', () => {
logger.loginfo(`[outbound] Socket [${name}] in pool got FIN`);
socket.writable = false;
sockend();
});

pool.release(socket);

function sockend () {
socket.__fromPool = false;
if (server.notes.pool && server.notes.pool[name]) {
server.notes.pool[name].destroy(socket);
}
else {
socket.removeAllListeners();
socket.destroy();
}
}
}

exports.drain_pools = () => {
if (!server.notes.pool || Object.keys(server.notes.pool).length == 0) {
return logger.logdebug("[outbound] Drain pools: No pools available");
}
for (const p in server.notes.pool) {
logger.logdebug(`[outbound] Drain pools: Draining SMTP connection pool ${p}`);
server.notes.pool[p].drain(() => {
if (!server.notes.pool[p]) return;
server.notes.pool[p].drain().then(function () {
server.notes.pool[p].clear()
delete server.notes.pool[p];
})
});
}
logger.logdebug("[outbound] Drain pools: Pools shut down");
socket.removeAllListeners();
socket.destroy();
}
9 changes: 0 additions & 9 deletions outbound/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,6 @@ function load_config () {
if (!cfg.connect_timeout) {
cfg.connect_timeout = 30;
}
if (cfg.pool_timeout === undefined) {
cfg.pool_timeout = 50;
}
if (cfg.pool_concurrency_max === undefined) {
cfg.pool_concurrency_max = 10;
}
if (cfg.pool_waiting_queue_max === undefined) {
cfg.pool_waiting_queue_max = 20;
}
if (!cfg.ipv6_enabled && config.get('outbound.ipv6_enabled')) {
cfg.ipv6_enabled = true;
}
Expand Down
Loading