Skip to content

Commit

Permalink
Add maxIdleSockets and idleSocketTimeout to Elasticsearch config (ela…
Browse files Browse the repository at this point in the history
…stic#142019)

* Add maxIdleSockets and idleSocketTimeout to Elasticsearch config

* Simplify agent manager

* Fix types

* Fix types

* Reduce idleSocketTimeout default to 60s

* Fix tests

* Update docs/setup/settings.asciidoc

* Address review comments

Co-authored-by: Kibana Machine <[email protected]>
  • Loading branch information
2 people authored and WafaaNasr committed Oct 11, 2022
1 parent fb37304 commit 086ddd2
Show file tree
Hide file tree
Showing 11 changed files with 96 additions and 56 deletions.
11 changes: 10 additions & 1 deletion docs/setup/settings.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,18 @@ configuration is effectively ignored when <<csp-strict, `csp.strict`>> is enable
*Default: `true`*

[[elasticsearch-maxSockets]] `elasticsearch.maxSockets`::
The maximum number of sockets that can be used for communications with elasticsearch.
The maximum number of sockets that can be used for communications with {es}.
*Default: `Infinity`*


[[elasticsearch-maxIdleSockets]] `elasticsearch.maxIdleSockets`::
The maximum number of idle sockets to keep open between {kib} and {es}. If more sockets become idle, they will be closed.
*Default: `256`*

[[elasticsearch-idleSocketTimeout]] `elasticsearch.idleSocketTimeout`::
The timeout for idle sockets kept open between {kib} and {es}. If the socket is idle for longer than this timeout, it will be closed. If you have a transparent proxy between {kib} and {es} be sure to set this value lower than or equal to the proxy's timeout.
*Default: `60s`*

`elasticsearch.customHeaders`::
| Header names and values to send to {es}. Any custom headers cannot be
overwritten by client-side headers, regardless of the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,35 +44,27 @@ describe('AgentManager', () => {
expect(httpsAgent).toEqual(mockedHttpsAgent);
});

it('provides Agents with a valid default configuration', () => {
const agentManager = new AgentManager();
const agentFactory = agentManager.getAgentFactory();
agentFactory({ url: new URL('http://elastic-node-1:9200') });
expect(HttpAgent).toBeCalledTimes(1);
expect(HttpAgent).toBeCalledWith({
keepAlive: true,
keepAliveMsecs: 1000,
maxFreeSockets: 256,
maxSockets: 256,
scheduling: 'lifo',
});
});

it('takes into account the provided configurations', () => {
const agentManager = new AgentManager({ maxFreeSockets: 32, maxSockets: 2048 });
const agentManager = new AgentManager();
const agentFactory = agentManager.getAgentFactory({
maxSockets: 1024,
maxTotalSockets: 1024,
scheduling: 'fifo',
});
agentFactory({ url: new URL('http://elastic-node-1:9200') });
expect(HttpAgent).toBeCalledTimes(1);
const agentFactory2 = agentManager.getAgentFactory({
maxFreeSockets: 10,
scheduling: 'lifo',
});
agentFactory2({ url: new URL('http://elastic-node-2:9200') });
expect(HttpAgent).toBeCalledTimes(2);
expect(HttpAgent).toBeCalledWith({
keepAlive: true,
keepAliveMsecs: 1000,
maxFreeSockets: 32,
maxSockets: 1024,
maxTotalSockets: 1024,
scheduling: 'fifo',
});
expect(HttpAgent).toBeCalledWith({
maxFreeSockets: 10,
scheduling: 'lifo',
});
});

it('provides Agents that match the URLs protocol', () => {
Expand All @@ -86,7 +78,7 @@ describe('AgentManager', () => {
expect(HttpsAgent).toHaveBeenCalledTimes(1);
});

it('provides the same Agent iif URLs use the same protocol', () => {
it('provides the same Agent if URLs use the same protocol', () => {
const agentManager = new AgentManager();
const agentFactory = agentManager.getAgentFactory();
const agent1 = agentFactory({ url: new URL('http://elastic-node-1:9200') });
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,11 @@
* Side Public License, v 1.
*/

import { Agent as HttpAgent } from 'http';
import { Agent as HttpAgent, type AgentOptions } from 'http';
import { Agent as HttpsAgent } from 'https';
import type { ConnectionOptions, HttpAgentOptions } from '@elastic/elasticsearch';

const HTTPS = 'https:';
const DEFAULT_CONFIG: HttpAgentOptions = {
keepAlive: true,
keepAliveMsecs: 1000,
maxSockets: 256,
maxFreeSockets: 256,
scheduling: 'lifo',
};

export type NetworkAgent = HttpAgent | HttpsAgent;
export type AgentFactory = (connectionOpts: ConnectionOptions) => NetworkAgent;
Expand All @@ -44,11 +37,11 @@ export interface AgentStore {
export class AgentManager implements AgentFactoryProvider, AgentStore {
private agents: Set<HttpAgent>;

constructor(private agentOptions: HttpAgentOptions = DEFAULT_CONFIG) {
constructor() {
this.agents = new Set();
}

public getAgentFactory(agentOptions?: HttpAgentOptions): AgentFactory {
public getAgentFactory(agentOptions?: AgentOptions): AgentFactory {
// a given agent factory always provides the same Agent instances (for the same protocol)
// we keep references to the instances at factory level, to be able to reuse them
let httpAgent: HttpAgent;
Expand All @@ -57,13 +50,7 @@ export class AgentManager implements AgentFactoryProvider, AgentStore {
return (connectionOpts: ConnectionOptions): NetworkAgent => {
if (connectionOpts.url.protocol === HTTPS) {
if (!httpsAgent) {
const config = Object.assign(
{},
DEFAULT_CONFIG,
this.agentOptions,
agentOptions,
connectionOpts.tls
);
const config = Object.assign({}, agentOptions, connectionOpts.tls);
httpsAgent = new HttpsAgent(config);
this.agents.add(httpsAgent);
dereferenceOnDestroy(this.agents, httpsAgent);
Expand All @@ -73,8 +60,7 @@ export class AgentManager implements AgentFactoryProvider, AgentStore {
}

if (!httpAgent) {
const config = Object.assign({}, DEFAULT_CONFIG, this.agentOptions, agentOptions);
httpAgent = new HttpAgent(config);
httpAgent = new HttpAgent(agentOptions);
this.agents.add(httpAgent);
dereferenceOnDestroy(this.agents, httpAgent);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ const createConfig = (
customHeaders: {},
compression: false,
maxSockets: Infinity,
maxIdleSockets: 300,
idleSocketTimeout: duration(30, 'seconds'),
sniffOnStart: false,
sniffOnConnectionFault: false,
sniffInterval: false,
Expand All @@ -41,15 +43,13 @@ describe('parseClientOptions', () => {
);
});

it('specifies `headers.maxSockets` Infinity and `keepAlive` true by default', () => {
it('specifies `maxTotalSockets` Infinity and `keepAlive` true by default', () => {
const config = createConfig({});

expect(parseClientOptions(config, false, kibanaVersion)).toEqual(
expect(parseClientOptions(config, false, kibanaVersion).agent).toEqual(
expect.objectContaining({
agent: {
keepAlive: true,
maxSockets: Infinity,
},
keepAlive: true,
maxTotalSockets: Infinity,
})
);
});
Expand Down Expand Up @@ -119,18 +119,35 @@ describe('parseClientOptions', () => {
});

describe('`maxSockets` option', () => {
it('uses the specified config value', () => {
it('sets the agent.maxTotalSockets config value', () => {
const options = parseClientOptions(
createConfig({ maxSockets: 1024 }),
false,
kibanaVersion
);
expect(options.agent).toHaveProperty('maxSockets', 1024);
expect(options.agent).toHaveProperty('maxTotalSockets', 1024);
});
});

it('defaults to `Infinity` if not specified by the config', () => {
const options = parseClientOptions(createConfig({}), false, kibanaVersion);
expect(options.agent).toHaveProperty('maxSockets', Infinity);
describe('`maxIdleSockets` option', () => {
it('sets the agent.maxFreeSockets config value', () => {
const options = parseClientOptions(
createConfig({ maxIdleSockets: 1024 }),
false,
kibanaVersion
);
expect(options.agent).toHaveProperty('maxFreeSockets', 1024);
});
});

describe('`idleSocketTimeout` option', () => {
it('sets the agent.timeout config value', () => {
const options = parseClientOptions(
createConfig({ idleSocketTimeout: duration(1000, 's') }),
false,
kibanaVersion
);
expect(options.agent).toHaveProperty('timeout', 1_000_000);
});
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@
import { ConnectionOptions as TlsConnectionOptions } from 'tls';
import { URL } from 'url';
import { Duration } from 'moment';
import type { ClientOptions, HttpAgentOptions } from '@elastic/elasticsearch';
import type { ClientOptions } from '@elastic/elasticsearch';
import type { ElasticsearchClientConfig } from '@kbn/core-elasticsearch-server';
import { AgentOptions } from 'https';
import { getDefaultHeaders } from './headers';

export type ParsedClientOptions = Omit<ClientOptions, 'agent'> & { agent: HttpAgentOptions };
export type ParsedClientOptions = Omit<ClientOptions, 'agent'> & { agent: AgentOptions };

/**
* Parse the client options from given client config and `scoped` flag.
Expand All @@ -38,8 +39,10 @@ export function parseClientOptions(
// fixes https://github.com/elastic/kibana/issues/101944
disablePrototypePoisoningProtection: true,
agent: {
maxSockets: config.maxSockets,
maxTotalSockets: config.maxSockets,
keepAlive: config.keepAlive ?? true,
timeout: getDurationAsMs(config.idleSocketTimeout),
maxFreeSockets: config.maxIdleSockets,
},
compression: config.compression,
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import type { ElasticsearchClientConfig } from '@kbn/core-elasticsearch-server';
import { ClusterClient } from './cluster_client';
import { DEFAULT_HEADERS, getDefaultHeaders } from './headers';
import { AgentManager } from './agent_manager';
import { duration } from 'moment';

const createConfig = (
parts: Partial<ElasticsearchClientConfig> = {}
Expand All @@ -27,6 +28,8 @@ const createConfig = (
sniffOnConnectionFault: false,
sniffInterval: false,
maxSockets: Infinity,
maxIdleSockets: 200,
idleSocketTimeout: duration('30s'),
compression: false,
requestHeadersWhitelist: ['authorization'],
customHeaders: {},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ test('set correct defaults', () => {
"hosts": Array [
"http://localhost:9200",
],
"idleSocketTimeout": "PT1M",
"ignoreVersionMismatch": false,
"maxIdleSockets": 256,
"maxSockets": Infinity,
"password": undefined,
"pingTimeout": "PT30S",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ export const configSchema = schema.object({
defaultValue: 'http://localhost:9200',
}),
maxSockets: schema.number({ defaultValue: Infinity, min: 1 }),
maxIdleSockets: schema.number({ defaultValue: 256, min: 1 }),
idleSocketTimeout: schema.duration({ defaultValue: '60s' }),
compression: schema.boolean({ defaultValue: false }),
username: schema.maybe(
schema.string({
Expand Down Expand Up @@ -304,6 +306,16 @@ export class ElasticsearchConfig implements IElasticsearchConfig {
*/
public readonly maxSockets: number;

/**
* The maximum number of idle sockets to keep open between Kibana and Elasticsearch. If more sockets become idle, they will be closed.
*/
public readonly maxIdleSockets: number;

/**
* The timeout for idle sockets kept open between Kibana and Elasticsearch. If the socket is idle for longer than this timeout, it will be closed.
*/
public readonly idleSocketTimeout: Duration;

/**
* Whether to use compression for communications with elasticsearch.
*/
Expand Down Expand Up @@ -409,6 +421,8 @@ export class ElasticsearchConfig implements IElasticsearchConfig {
this.serviceAccountToken = rawConfig.serviceAccountToken;
this.customHeaders = rawConfig.customHeaders;
this.maxSockets = rawConfig.maxSockets;
this.maxIdleSockets = rawConfig.maxIdleSockets;
this.idleSocketTimeout = rawConfig.idleSocketTimeout;
this.compression = rawConfig.compression;
this.skipStartupConnectionCheck = rawConfig.skipStartupConnectionCheck;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ export interface ElasticsearchClientConfig {
customHeaders: Record<string, string>;
requestHeadersWhitelist: string[];
maxSockets: number;
maxIdleSockets: number;
idleSocketTimeout: Duration;
compression: boolean;
sniffOnStart: boolean;
sniffOnConnectionFault: boolean;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,16 @@ export interface IElasticsearchConfig {
*/
readonly maxSockets: number;

/**
* The maximum number of idle sockets to keep open between Kibana and Elasticsearch. If more sockets become idle, they will be closed.
*/
readonly maxIdleSockets: number;

/**
* The timeout for idle sockets kept open between Kibana and Elasticsearch. If the socket is idle for longer than this timeout, it will be closed.
*/
readonly idleSocketTimeout: Duration;

/**
* Whether to use compression for communications with elasticsearch.
*/
Expand Down
2 changes: 2 additions & 0 deletions x-pack/plugins/monitoring/server/config.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,11 @@ describe('config schema', () => {
"healthCheck": Object {
"delay": "PT2.5S",
},
"idleSocketTimeout": "PT1M",
"ignoreVersionMismatch": false,
"logFetchCount": 10,
"logQueries": false,
"maxIdleSockets": 256,
"maxSockets": Infinity,
"pingTimeout": "PT30S",
"requestHeadersWhitelist": Array [
Expand Down

0 comments on commit 086ddd2

Please sign in to comment.