Skip to content

Commit

Permalink
[RAC] Change index bootstrapping strategy (elastic#113389)
Browse files Browse the repository at this point in the history
* Change index bootstrapping to cater for non-additive changes only
  • Loading branch information
Kerry350 authored and kibanamachine committed Oct 13, 2021
1 parent 291ea9d commit dbb1f1d
Show file tree
Hide file tree
Showing 7 changed files with 138 additions and 105 deletions.
5 changes: 4 additions & 1 deletion x-pack/plugins/rule_registry/server/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ import { schema, TypeOf } from '@kbn/config-schema';
import { PluginConfigDescriptor } from 'src/core/server';

export const config: PluginConfigDescriptor = {
deprecations: ({ deprecate }) => [deprecate('enabled', '8.0.0')],
deprecations: ({ deprecate, unused }) => [
deprecate('enabled', '8.0.0'),
unused('unsafe.indexUpgrade.enabled'),
],
schema: schema.object({
enabled: schema.boolean({ defaultValue: true }),
write: schema.object({
Expand Down
1 change: 0 additions & 1 deletion x-pack/plugins/rule_registry/server/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ export class RuleRegistryPlugin
logger,
kibanaVersion,
isWriteEnabled: isWriteEnabled(this.config, this.legacyConfig),
isIndexUpgradeEnabled: this.config.unsafe.indexUpgrade.enabled,
getClusterClient: async () => {
const deps = await startDependencies;
return deps.core.elasticsearch.client.asInternalUser;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,18 @@
* 2.0.
*/

import { BulkRequest } from '@elastic/elasticsearch/api/types';
import { ResponseError } from '@elastic/elasticsearch/lib/errors';
import { Either, isLeft } from 'fp-ts/lib/Either';

import { ElasticsearchClient } from 'kibana/server';
import { Logger } from 'kibana/server';
import { IndexPatternsFetcher } from '../../../../../src/plugins/data/server';

import { RuleDataWriteDisabledError } from '../rule_data_plugin_service/errors';
import {
RuleDataWriteDisabledError,
RuleDataWriterInitializationError,
} from '../rule_data_plugin_service/errors';
import { IndexInfo } from '../rule_data_plugin_service/index_info';
import { ResourceInstaller } from '../rule_data_plugin_service/resource_installer';
import { IRuleDataClient, IRuleDataReader, IRuleDataWriter } from './types';
Expand All @@ -22,12 +27,21 @@ interface ConstructorOptions {
isWriteEnabled: boolean;
waitUntilReadyForReading: Promise<WaitResult>;
waitUntilReadyForWriting: Promise<WaitResult>;
logger: Logger;
}

export type WaitResult = Either<Error, ElasticsearchClient>;

export class RuleDataClient implements IRuleDataClient {
constructor(private readonly options: ConstructorOptions) {}
private _isWriteEnabled: boolean = false;

// Writers cached by namespace
private writerCache: Map<string, IRuleDataWriter>;

constructor(private readonly options: ConstructorOptions) {
this.writeEnabled = this.options.isWriteEnabled;
this.writerCache = new Map();
}

public get indexName(): string {
return this.options.indexInfo.baseName;
Expand All @@ -37,8 +51,16 @@ export class RuleDataClient implements IRuleDataClient {
return this.options.indexInfo.kibanaVersion;
}

private get writeEnabled(): boolean {
return this._isWriteEnabled;
}

private set writeEnabled(isEnabled: boolean) {
this._isWriteEnabled = isEnabled;
}

public isWriteEnabled(): boolean {
return this.options.isWriteEnabled;
return this.writeEnabled;
}

public getReader(options: { namespace?: string } = {}): IRuleDataReader {
Expand Down Expand Up @@ -95,62 +117,89 @@ export class RuleDataClient implements IRuleDataClient {
}

public getWriter(options: { namespace?: string } = {}): IRuleDataWriter {
const { indexInfo, resourceInstaller } = this.options;

const namespace = options.namespace || 'default';
const cachedWriter = this.writerCache.get(namespace);

// There is no cached writer, so we'll install / update the namespace specific resources now.
if (!cachedWriter) {
const writerForNamespace = this.initializeWriter(namespace);
this.writerCache.set(namespace, writerForNamespace);
return writerForNamespace;
} else {
return cachedWriter;
}
}

private initializeWriter(namespace: string): IRuleDataWriter {
const isWriteEnabled = () => this.writeEnabled;
const turnOffWrite = () => (this.writeEnabled = false);

const { indexInfo, resourceInstaller } = this.options;
const alias = indexInfo.getPrimaryAlias(namespace);
const isWriteEnabled = this.isWriteEnabled();

const waitUntilReady = async () => {
const result = await this.options.waitUntilReadyForWriting;
if (isLeft(result)) {
throw result.left;
// Wait until both index and namespace level resources have been installed / updated.
const prepareForWriting = async () => {
if (!isWriteEnabled()) {
throw new RuleDataWriteDisabledError();
}

const indexLevelResourcesResult = await this.options.waitUntilReadyForWriting;

if (isLeft(indexLevelResourcesResult)) {
throw new RuleDataWriterInitializationError(
'index',
indexInfo.indexOptions.registrationContext,
indexLevelResourcesResult.left
);
} else {
return result.right;
try {
await resourceInstaller.installAndUpdateNamespaceLevelResources(indexInfo, namespace);
return indexLevelResourcesResult.right;
} catch (e) {
throw new RuleDataWriterInitializationError(
'namespace',
indexInfo.indexOptions.registrationContext,
e
);
}
}
};

return {
bulk: async (request) => {
if (!isWriteEnabled) {
throw new RuleDataWriteDisabledError();
}
const prepareForWritingResult = prepareForWriting();

const clusterClient = await waitUntilReady();
return {
bulk: async (request: BulkRequest) => {
return prepareForWritingResult
.then((clusterClient) => {
const requestWithDefaultParameters = {
...request,
require_alias: true,
index: alias,
};

const requestWithDefaultParameters = {
...request,
require_alias: true,
index: alias,
};

return clusterClient.bulk(requestWithDefaultParameters).then((response) => {
if (response.body.errors) {
if (
response.body.items.length > 0 &&
(response.body.items.every(
(item) => item.index?.error?.type === 'index_not_found_exception'
) ||
response.body.items.every(
(item) => item.index?.error?.type === 'illegal_argument_exception'
))
) {
return resourceInstaller
.installNamespaceLevelResources(indexInfo, namespace)
.then(() => {
return clusterClient.bulk(requestWithDefaultParameters).then((retryResponse) => {
if (retryResponse.body.errors) {
throw new ResponseError(retryResponse);
}
return retryResponse;
});
});
return clusterClient.bulk(requestWithDefaultParameters).then((response) => {
if (response.body.errors) {
const error = new ResponseError(response);
throw error;
}
return response;
});
})
.catch((error) => {
if (error instanceof RuleDataWriterInitializationError) {
this.options.logger.error(error);
this.options.logger.error(
`The writer for the Rule Data Client for the ${indexInfo.indexOptions.registrationContext} registration context was not initialized properly, bulk() cannot continue, and writing will be disabled.`
);
turnOffWrite();
} else if (error instanceof RuleDataWriteDisabledError) {
this.options.logger.debug(`Writing is disabled, bulk() will not write any data.`);
} else {
this.options.logger.error(error);
}
const error = new ResponseError(response);
throw error;
}
return response;
});

return undefined;
});
},
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,5 @@ export interface IRuleDataReader {
}

export interface IRuleDataWriter {
bulk(request: BulkRequest): Promise<ApiResponse<BulkResponse>>;
bulk(request: BulkRequest): Promise<ApiResponse<BulkResponse> | undefined>;
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,24 @@
* 2.0.
*/

/* eslint-disable max-classes-per-file */
export class RuleDataWriteDisabledError extends Error {
constructor(message?: string) {
super(message);
Object.setPrototypeOf(this, new.target.prototype);
this.name = 'RuleDataWriteDisabledError';
}
}

export class RuleDataWriterInitializationError extends Error {
constructor(
resourceType: 'index' | 'namespace',
registrationContext: string,
error: string | Error
) {
super(`There has been a catastrophic error trying to install ${resourceType} level resources for the following registration context: ${registrationContext}.
This may have been due to a non-additive change to the mappings, removal and type changes are not permitted. Full error: ${error.toString()}`);
Object.setPrototypeOf(this, new.target.prototype);
this.name = 'RuleDataWriterInitializationError';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import { ecsComponentTemplate } from '../../common/assets/component_templates/ec
import { defaultLifecyclePolicy } from '../../common/assets/lifecycle_policies/default_lifecycle_policy';

import { IndexInfo } from './index_info';
import { incrementIndexName } from './utils';

const INSTALLATION_TIMEOUT = 20 * 60 * 1000; // 20 minutes

Expand All @@ -29,7 +28,6 @@ interface ConstructorOptions {
getClusterClient: () => Promise<ElasticsearchClient>;
logger: Logger;
isWriteEnabled: boolean;
isIndexUpgradeEnabled: boolean;
}

export class ResourceInstaller {
Expand Down Expand Up @@ -111,12 +109,10 @@ export class ResourceInstaller {
* Installs index-level resources shared between all namespaces of this index:
* - custom ILM policy if it was provided
* - component templates
* - attempts to update mappings of existing concrete indices
*/
public async installIndexLevelResources(indexInfo: IndexInfo): Promise<void> {
await this.installWithTimeout(`resources for index ${indexInfo.baseName}`, async () => {
const { componentTemplates, ilmPolicy } = indexInfo.indexOptions;
const { isIndexUpgradeEnabled } = this.options;

if (ilmPolicy != null) {
await this.createOrUpdateLifecyclePolicy({
Expand All @@ -139,35 +135,30 @@ export class ResourceInstaller {
});
})
);

if (isIndexUpgradeEnabled) {
// TODO: Update all existing namespaced index templates matching this index' base name

await this.updateIndexMappings(indexInfo);
}
});
}

private async updateIndexMappings(indexInfo: IndexInfo) {
private async updateIndexMappings(indexInfo: IndexInfo, namespace: string) {
const { logger } = this.options;

const aliases = indexInfo.basePattern;
const backingIndices = indexInfo.getPatternForBackingIndices();
const backingIndices = indexInfo.getPatternForBackingIndices(namespace);

logger.debug(`Updating mappings of existing concrete indices for ${indexInfo.baseName}`);

// Find all concrete indices for all namespaces of the index.
const concreteIndices = await this.fetchConcreteIndices(aliases, backingIndices);
const concreteWriteIndices = concreteIndices.filter((item) => item.isWriteIndex);

// Update mappings of the found write indices.
await Promise.all(concreteWriteIndices.map((item) => this.updateAliasWriteIndexMapping(item)));
// Update mappings of the found indices.
await Promise.all(concreteIndices.map((item) => this.updateAliasWriteIndexMapping(item)));
}

// NOTE / IMPORTANT: Please note this will update the mappings of backing indices but
// *not* the settings. This is due to the fact settings can be classed as dynamic and static,
// and static updates will fail on an index that isn't closed. New settings *will* be applied as part
// of the ILM policy rollovers. More info: https://github.com/elastic/kibana/pull/113389#issuecomment-940152654
private async updateAliasWriteIndexMapping({ index, alias }: ConcreteIndexInfo) {
const { logger, getClusterClient } = this.options;
const clusterClient = await getClusterClient();

const simulatedIndexMapping = await clusterClient.indices.simulateIndexTemplate({
name: index,
});
Expand All @@ -180,47 +171,21 @@ export class ResourceInstaller {
});
return;
} catch (err) {
if (err.meta?.body?.error?.type !== 'illegal_argument_exception') {
/**
* We skip the rollover if we catch anything except for illegal_argument_exception - that's the error
* returned by ES when the mapping update contains a conflicting field definition (e.g., a field changes types).
* We expect to get that error for some mapping changes we might make, and in those cases,
* we want to continue to rollover the index. Other errors are unexpected.
*/
logger.error(`Failed to PUT mapping for alias ${alias}: ${err.message}`);
return;
}
const newIndexName = incrementIndexName(index);
if (newIndexName == null) {
logger.error(`Failed to increment write index name for alias: ${alias}`);
return;
}
try {
await clusterClient.indices.rollover({
alias,
new_index: newIndexName,
});
} catch (e) {
/**
* If we catch resource_already_exists_exception, that means that the index has been
* rolled over already — nothing to do for us in this case.
*/
if (e?.meta?.body?.error?.type !== 'resource_already_exists_exception') {
logger.error(`Failed to rollover index for alias ${alias}: ${e.message}`);
}
}
logger.error(`Failed to PUT mapping for alias ${alias}: ${err.message}`);
throw err;
}
}

// -----------------------------------------------------------------------------------------------
// Namespace-level resources

/**
* Installs resources tied to concrete namespace of an index:
* Installs and updates resources tied to concrete namespace of an index:
* - namespaced index template
* - Index mappings for existing concrete indices
* - concrete index (write target) if it doesn't exist
*/
public async installNamespaceLevelResources(
public async installAndUpdateNamespaceLevelResources(
indexInfo: IndexInfo,
namespace: string
): Promise<void> {
Expand All @@ -230,15 +195,19 @@ export class ResourceInstaller {

logger.info(`Installing namespace-level resources and creating concrete index for ${alias}`);

// Install / update the index template
await this.installNamespacedIndexTemplate(indexInfo, namespace);
// Update index mappings for indices matching this namespace.
await this.updateIndexMappings(indexInfo, namespace);

// If we find a concrete backing index which is the write index for the alias here, we shouldn't
// be making a new concrete index. We return early because we don't need a new write target.
const indexExists = await this.checkIfConcreteWriteIndexExists(indexInfo, namespace);
if (indexExists) {
return;
} else {
await this.createConcreteWriteIndex(indexInfo, namespace);
}

await this.installNamespacedIndexTemplate(indexInfo, namespace);
await this.createConcreteWriteIndex(indexInfo, namespace);
}

private async checkIfConcreteWriteIndexExists(
Expand Down
Loading

0 comments on commit dbb1f1d

Please sign in to comment.