From 537a6c49e7d1ef1746c2caa25a5f6b0633b95272 Mon Sep 17 00:00:00 2001 From: Kawika Avilla Date: Wed, 31 Aug 2022 16:39:11 -0700 Subject: [PATCH] Custom healthcheck with filters (#2232) Enable filtering with custom health checks based on node attributes: ``` opensearch.optimizedHealthcheck.filters: { attribute_key: "attribute_value", } ``` Also, fixes issue that expects the response to array when it was a dictionary. Issue: https://github.com/opensearch-project/OpenSearch-Dashboards/issues/2214 https://github.com/opensearch-project/OpenSearch-Dashboards/issues/2203 Signed-off-by: Kawika Avilla --- config/opensearch_dashboards.yml | 5 +- .../opensearch/opensearch_config.test.ts | 15 +- .../server/opensearch/opensearch_config.ts | 16 +- .../server/opensearch/opensearch_service.ts | 2 +- .../ensure_opensearch_version.test.ts | 137 +++++++++++++++++- .../ensure_opensearch_version.ts | 76 +++++++--- 6 files changed, 218 insertions(+), 33 deletions(-) diff --git a/config/opensearch_dashboards.yml b/config/opensearch_dashboards.yml index 26c11904fa9a..69f1f5d3afcc 100644 --- a/config/opensearch_dashboards.yml +++ b/config/opensearch_dashboards.yml @@ -40,7 +40,10 @@ # This node attribute should assign all nodes of the same cluster an integer value that increments with each new cluster that is spun up # e.g. in opensearch.yml file you would set the value to a setting using node.attr.cluster_id: # Should only be enabled if there is a corresponding node attribute created in your OpenSearch config that matches the value here -#opensearch.optimizedHealthcheckId: "cluster_id" +#opensearch.optimizedHealthcheck.id: "cluster_id" +#opensearch.optimizedHealthcheck.filters: { +# attribute_key: "attribute_value", +#} # If your OpenSearch is protected with basic authentication, these settings provide # the username and password that the OpenSearch Dashboards server uses to perform maintenance on the OpenSearch Dashboards diff --git a/src/core/server/opensearch/opensearch_config.test.ts b/src/core/server/opensearch/opensearch_config.test.ts index afe1c7d28754..2dfaecd76dd7 100644 --- a/src/core/server/opensearch/opensearch_config.test.ts +++ b/src/core/server/opensearch/opensearch_config.test.ts @@ -80,7 +80,7 @@ test('set correct defaults', () => { ], "ignoreVersionMismatch": false, "logQueries": false, - "optimizedHealthcheckId": undefined, + "optimizedHealthcheck": undefined, "password": undefined, "pingTimeout": "PT30S", "requestHeadersWhitelist": Array [ @@ -488,11 +488,20 @@ describe('deprecations', () => { `); }); - it('logs a warning if elasticsearch.optimizedHealthcheckId is set and opensearch.optimizedHealthcheckId is not', () => { + it('logs a warning if elasticsearch.optimizedHealthcheckId is set and opensearch.optimizedHealthcheck.id is not', () => { const { messages } = applyLegacyDeprecations({ optimizedHealthcheckId: '' }); expect(messages).toMatchInlineSnapshot(` Array [ - "\\"elasticsearch.optimizedHealthcheckId\\" is deprecated and has been replaced by \\"opensearch.optimizedHealthcheckId\\"", + "\\"elasticsearch.optimizedHealthcheckId\\" is deprecated and has been replaced by \\"opensearch.optimizedHealthcheck.id\\"", + ] + `); + }); + + it('logs a warning if opensearch.optimizedHealthcheckId is set and opensearch.optimizedHealthcheck.id is not', () => { + const { messages } = applyOpenSearchDeprecations({ optimizedHealthcheckId: '' }); + expect(messages).toMatchInlineSnapshot(` + Array [ + "\\"opensearch.optimizedHealthcheckId\\" is deprecated and has been replaced by \\"opensearch.optimizedHealthcheck.id\\"", ] `); }); diff --git a/src/core/server/opensearch/opensearch_config.ts b/src/core/server/opensearch/opensearch_config.ts index 4bffca8b4b14..e5a0196eb040 100644 --- a/src/core/server/opensearch/opensearch_config.ts +++ b/src/core/server/opensearch/opensearch_config.ts @@ -84,7 +84,14 @@ export const configSchema = schema.object({ requestTimeout: schema.duration({ defaultValue: '30s' }), pingTimeout: schema.duration({ defaultValue: schema.siblingRef('requestTimeout') }), logQueries: schema.boolean({ defaultValue: false }), - optimizedHealthcheckId: schema.maybe(schema.string()), + optimizedHealthcheck: schema.maybe( + schema.object({ + id: schema.string(), + filters: schema.maybe( + schema.recordOf(schema.string(), schema.string(), { defaultValue: {} }) + ), + }) + ), ssl: schema.object( { verificationMode: schema.oneOf( @@ -148,7 +155,8 @@ const deprecations: ConfigDeprecationProvider = ({ renameFromRoot }) => [ renameFromRoot('elasticsearch.requestTimeout', 'opensearch.requestTimeout'), renameFromRoot('elasticsearch.pingTimeout', 'opensearch.pingTimeout'), renameFromRoot('elasticsearch.logQueries', 'opensearch.logQueries'), - renameFromRoot('elasticsearch.optimizedHealthcheckId', 'opensearch.optimizedHealthcheckId'), + renameFromRoot('elasticsearch.optimizedHealthcheckId', 'opensearch.optimizedHealthcheck.id'), + renameFromRoot('opensearch.optimizedHealthcheckId', 'opensearch.optimizedHealthcheck.id'), renameFromRoot('elasticsearch.ssl', 'opensearch.ssl'), renameFromRoot('elasticsearch.apiVersion', 'opensearch.apiVersion'), renameFromRoot('elasticsearch.healthCheck', 'opensearch.healthCheck'), @@ -216,7 +224,7 @@ export class OpenSearchConfig { * Specifies whether Dashboards should only query the local OpenSearch node when * all nodes in the cluster have the same node attribute value */ - public readonly optimizedHealthcheckId?: string; + public readonly optimizedHealthcheck?: OpenSearchConfigType['optimizedHealthcheck']; /** * Hosts that the client will connect to. If sniffing is enabled, this list will @@ -297,7 +305,7 @@ export class OpenSearchConfig { this.ignoreVersionMismatch = rawConfig.ignoreVersionMismatch; this.apiVersion = rawConfig.apiVersion; this.logQueries = rawConfig.logQueries; - this.optimizedHealthcheckId = rawConfig.optimizedHealthcheckId; + this.optimizedHealthcheck = rawConfig.optimizedHealthcheck; this.hosts = Array.isArray(rawConfig.hosts) ? rawConfig.hosts : [rawConfig.hosts]; this.requestHeadersWhitelist = Array.isArray(rawConfig.requestHeadersWhitelist) ? rawConfig.requestHeadersWhitelist diff --git a/src/core/server/opensearch/opensearch_service.ts b/src/core/server/opensearch/opensearch_service.ts index e853dbca6c5e..5d9d2bcf85ac 100644 --- a/src/core/server/opensearch/opensearch_service.ts +++ b/src/core/server/opensearch/opensearch_service.ts @@ -95,7 +95,7 @@ export class OpenSearchService const opensearchNodesCompatibility$ = pollOpenSearchNodesVersion({ internalClient: this.client.asInternalUser, - optimizedHealthcheckId: config.optimizedHealthcheckId, + optimizedHealthcheck: config.optimizedHealthcheck, log: this.log, ignoreVersionMismatch: config.ignoreVersionMismatch, opensearchVersionCheckInterval: config.healthCheckDelay.asMilliseconds(), diff --git a/src/core/server/opensearch/version_check/ensure_opensearch_version.test.ts b/src/core/server/opensearch/version_check/ensure_opensearch_version.test.ts index dbbc0f2236ee..99e24f192187 100644 --- a/src/core/server/opensearch/version_check/ensure_opensearch_version.test.ts +++ b/src/core/server/opensearch/version_check/ensure_opensearch_version.test.ts @@ -67,6 +67,35 @@ function createNodes(...versions: string[]): NodesInfo { return { nodes }; } +function createNodesWithAttribute( + targetId: string, + filterId: string, + targetAttributeValue: string, + filterAttributeValue: string, + ...versions: string[] +): NodesInfo { + const nodes = {} as any; + versions + .map((version, i) => { + return { + version, + http: { + publish_address: 'http_address', + }, + ip: 'ip', + attributes: { + cluster_id: i % 2 === 0 ? targetId : filterId, + custom_attribute: i % 2 === 0 ? targetAttributeValue : filterAttributeValue, + }, + }; + }) + .forEach((node, i) => { + nodes[`node-${i}`] = node; + }); + + return { nodes }; +} + describe('mapNodesVersionCompatibility', () => { function createNodesInfoWithoutHTTP(version: string): NodesInfo { return { nodes: { 'node-without-http': { version, ip: 'ip' } } } as any; @@ -180,7 +209,7 @@ describe('pollOpenSearchNodesVersion', () => { pollOpenSearchNodesVersion({ internalClient, - optimizedHealthcheckId: 'cluster_id', + optimizedHealthcheck: { id: 'cluster_id' }, opensearchVersionCheckInterval: 1, ignoreVersionMismatch: false, opensearchDashboardsVersion: OPENSEARCH_DASHBOARDS_VERSION, @@ -204,7 +233,104 @@ describe('pollOpenSearchNodesVersion', () => { pollOpenSearchNodesVersion({ internalClient, - optimizedHealthcheckId: 'cluster_id', + optimizedHealthcheck: { id: 'cluster_id' }, + opensearchVersionCheckInterval: 1, + ignoreVersionMismatch: false, + opensearchDashboardsVersion: OPENSEARCH_DASHBOARDS_VERSION, + log: mockLogger, + }) + .pipe(take(1)) + .subscribe({ + next: (result) => { + expect(result).toEqual( + mapNodesVersionCompatibility(nodes, OPENSEARCH_DASHBOARDS_VERSION, false) + ); + }, + complete: done, + error: done, + }); + }); + + it('returns compatibility results and isCompatible=true with filters', (done) => { + expect.assertions(2); + const target = { + id: '0', + attribute: 'foo', + }; + const filter = { + id: '1', + attribute: 'bar', + }; + + // will filter out every odd index + const nodes = createNodesWithAttribute( + target.id, + filter.id, + target.attribute, + filter.attribute, + '5.1.0', + '6.2.0', + '5.1.0', + '5.1.1-Beta1' + ); + + // @ts-expect-error we need to return an incompatible type to use the testScheduler here + internalClient.cluster.state.mockReturnValueOnce({ body: nodes }); + + nodeInfosSuccessOnce(nodes); + + pollOpenSearchNodesVersion({ + internalClient, + optimizedHealthcheck: { id: target.id, filters: { custom_attribute: filter.attribute } }, + opensearchVersionCheckInterval: 1, + ignoreVersionMismatch: false, + opensearchDashboardsVersion: OPENSEARCH_DASHBOARDS_VERSION, + log: mockLogger, + }) + .pipe(take(1)) + .subscribe({ + next: (result) => { + expect(result).toEqual( + mapNodesVersionCompatibility(nodes, OPENSEARCH_DASHBOARDS_VERSION, false) + ); + expect(result.isCompatible).toBe(true); + }, + complete: done, + error: done, + }); + }); + + it('returns compatibility results and isCompatible=false with filters', (done) => { + expect.assertions(2); + const target = { + id: '0', + attribute: 'foo', + }; + const filter = { + id: '1', + attribute: 'bar', + }; + + // will filter out every odd index + const nodes = createNodesWithAttribute( + target.id, + filter.id, + target.attribute, + filter.attribute, + '5.1.0', + '5.1.0', + '6.2.0', + '5.1.1-Beta1' + ); + + // @ts-expect-error we need to return an incompatible type to use the testScheduler here + internalClient.cluster.state.mockReturnValueOnce({ body: nodes }); + + nodeInfosSuccessOnce(nodes); + + pollOpenSearchNodesVersion({ + internalClient, + optimizedHealthcheck: { id: target.id, filters: { custom_attribute: filter.attribute } }, opensearchVersionCheckInterval: 1, ignoreVersionMismatch: false, opensearchDashboardsVersion: OPENSEARCH_DASHBOARDS_VERSION, @@ -216,6 +342,7 @@ describe('pollOpenSearchNodesVersion', () => { expect(result).toEqual( mapNodesVersionCompatibility(nodes, OPENSEARCH_DASHBOARDS_VERSION, false) ); + expect(result.isCompatible).toBe(false); }, complete: done, error: done, @@ -233,7 +360,7 @@ describe('pollOpenSearchNodesVersion', () => { pollOpenSearchNodesVersion({ internalClient, - optimizedHealthcheckId: 'cluster_id', + optimizedHealthcheck: { id: 'cluster_id' }, opensearchVersionCheckInterval: 1, ignoreVersionMismatch: false, opensearchDashboardsVersion: OPENSEARCH_DASHBOARDS_VERSION, @@ -269,7 +396,7 @@ describe('pollOpenSearchNodesVersion', () => { const opensearchNodesCompatibility$ = pollOpenSearchNodesVersion({ internalClient, - optimizedHealthcheckId: 'cluster_id', + optimizedHealthcheck: { id: 'cluster_id' }, opensearchVersionCheckInterval: 100, ignoreVersionMismatch: false, opensearchDashboardsVersion: OPENSEARCH_DASHBOARDS_VERSION, @@ -309,7 +436,7 @@ describe('pollOpenSearchNodesVersion', () => { const opensearchNodesCompatibility$ = pollOpenSearchNodesVersion({ internalClient, - optimizedHealthcheckId: 'cluster_id', + optimizedHealthcheck: { id: 'cluster_id' }, opensearchVersionCheckInterval: 10, ignoreVersionMismatch: false, opensearchDashboardsVersion: OPENSEARCH_DASHBOARDS_VERSION, diff --git a/src/core/server/opensearch/version_check/ensure_opensearch_version.ts b/src/core/server/opensearch/version_check/ensure_opensearch_version.ts index 3f1a88a42cb9..04fb20a988e1 100644 --- a/src/core/server/opensearch/version_check/ensure_opensearch_version.ts +++ b/src/core/server/opensearch/version_check/ensure_opensearch_version.ts @@ -38,6 +38,7 @@ import { timer, of, from, Observable } from 'rxjs'; import { map, distinctUntilChanged, catchError, exhaustMap, mergeMap } from 'rxjs/operators'; import { get } from 'lodash'; +import { ApiResponse } from '@elastic/elasticsearch'; import { opensearchVersionCompatibleWithOpenSearchDashboards, opensearchVersionEqualsOpenSearchDashboards, @@ -47,38 +48,68 @@ import type { OpenSearchClient } from '../client'; /** * Checks if all nodes in the cluster have the same cluster id node attribute - * that is supplied through the healthcheckAttributeName param. This node attribute is configurable - * in opensearch_dashboards.yml. + * that is supplied through the healthcheck param. This node attribute is configurable + * in opensearch_dashboards.yml. It can also filter attributes out by key-value pair. * If all nodes have the same cluster id then we do not fan out the healthcheck and use '_local' node * If there are multiple cluster ids then we use the default fan out behavior * If the supplied node attribute is missing then we return null and use default fan out behavior * @param {OpenSearchClient} internalClient - * @param {string} healthcheckAttributeName + * @param {OptimizedHealthcheck} healthcheck * @returns {string|null} '_local' if all nodes have the same cluster_id, otherwise null */ export const getNodeId = async ( internalClient: OpenSearchClient, - healthcheckAttributeName: string + healthcheck: OptimizedHealthcheck ): Promise => { try { - const state = await internalClient.cluster.state({ + let path = `nodes.*.attributes.${healthcheck.id}`; + const filters = healthcheck.filters; + if (filters) { + Object.keys(filters).forEach((key) => { + path += `,nodes.*.attributes.${key}`; + }); + } + + const state = (await internalClient.cluster.state({ metric: 'nodes', - filter_path: [`nodes.*.attributes.${healthcheckAttributeName}`], - }); + filter_path: [path], + })) as ApiResponse; /* Aggregate different cluster_ids from the OpenSearch nodes * if all the nodes have the same cluster_id, retrieve nodes.info from _local node only * Using _cluster/state/nodes to retrieve the cluster_id of each node from master node which is considered to be a lightweight operation * else if the nodes have different cluster_ids then fan out the request to all nodes * else there are no nodes in the cluster */ - const sharedClusterId = - state.body.nodes.length > 0 - ? get(state.body.nodes[0], `attributes.${healthcheckAttributeName}`, null) - : null; + const nodes = state.body.nodes; + let nodeIds = Object.keys(nodes); + if (nodeIds.length === 0) { + return null; + } + + /* + * If filters are set look for the key and value and filter out any node that matches + * the value for that attribute. + */ + if (filters) { + nodeIds.forEach((id) => { + Object.keys(filters).forEach((key) => { + const attributeValue = get(nodes[id], `attributes.${key}`, null); + if (attributeValue === filters[key]) { + delete nodes[id]; + } + }); + }); + + nodeIds = Object.keys(nodes); + if (nodeIds.length === 0) { + return null; + } + } + + const sharedClusterId = get(nodes[nodeIds[0]], `attributes.${healthcheck.id}`, null); + return sharedClusterId === null || - state.body.nodes.find( - (node: any) => sharedClusterId !== get(node, `attributes.${healthcheckAttributeName}`, null) - ) + nodes.find((node: any) => sharedClusterId !== get(node, `attributes.${healthcheck.id}`, null)) ? null : '_local'; } catch (e) { @@ -88,7 +119,7 @@ export const getNodeId = async ( export interface PollOpenSearchNodesVersionOptions { internalClient: OpenSearchClient; - optimizedHealthcheckId?: string; + optimizedHealthcheck?: OptimizedHealthcheck; log: Logger; opensearchDashboardsVersion: string; ignoreVersionMismatch: boolean; @@ -118,6 +149,13 @@ export interface NodesVersionCompatibility { opensearchDashboardsVersion: string; } +export interface OptimizedHealthcheck { + id?: string; + filters?: { + [key: string]: string; + }; +} + function getHumanizedNodeName(node: NodeInfo) { const publishAddress = node?.http?.publish_address + ' ' || ''; return 'v' + node.version + ' @ ' + publishAddress + '(' + node.ip + ')'; @@ -201,7 +239,7 @@ function compareNodes(prev: NodesVersionCompatibility, curr: NodesVersionCompati export const pollOpenSearchNodesVersion = ({ internalClient, - optimizedHealthcheckId, + optimizedHealthcheck, log, opensearchDashboardsVersion, ignoreVersionMismatch, @@ -214,10 +252,10 @@ export const pollOpenSearchNodesVersion = ({ * Originally, Dashboards queries OpenSearch cluster to get the version info of each node and check the version compatibility with each node. * The /nodes request could fail even one node in cluster fail to response * For better dashboards resilience, the behaviour is changed to only query the local node when all the nodes have the same cluster_id - * Using _cluster/state/nodes to retrieve the cluster_id of each node from the master node + * Using _cluster/state/nodes to retrieve the cluster_id of each node from the cluster manager node */ - if (optimizedHealthcheckId) { - return from(getNodeId(internalClient, optimizedHealthcheckId)).pipe( + if (optimizedHealthcheck) { + return from(getNodeId(internalClient, optimizedHealthcheck)).pipe( mergeMap((nodeId: any) => from( internalClient.nodes.info({