diff --git a/x-pack/plugins/alerting/server/alerts_service/lib/create_concrete_write_index.ts b/x-pack/plugins/alerting/server/alerts_service/lib/create_concrete_write_index.ts index 31aface312913..32611f44017fb 100644 --- a/x-pack/plugins/alerting/server/alerts_service/lib/create_concrete_write_index.ts +++ b/x-pack/plugins/alerting/server/alerts_service/lib/create_concrete_write_index.ts @@ -143,7 +143,7 @@ export const createConcreteWriteIndex = async ({ indexPatterns, totalFieldsLimit, }: CreateConcreteWriteIndexOpts) => { - logger.info(`Creating concrete write index - ${indexPatterns.name}`); + logger.info(`Creating data stream - ${indexPatterns.alias}`); // check if a concrete write index already exists let concreteIndices: ConcreteIndexInfo[] = []; @@ -151,21 +151,15 @@ export const createConcreteWriteIndex = async ({ // Specify both the index pattern for the backing indices and their aliases // The alias prevents the request from finding other namespaces that could match the -* pattern const response = await retryTransientEsErrors( - () => - esClient.indices.getAlias({ - index: indexPatterns.pattern, - name: indexPatterns.basePattern, - }), + () => esClient.indices.getDataStream({ name: indexPatterns.alias, expand_wildcards: 'all' }), { logger } ); - concreteIndices = Object.entries(response).flatMap(([index, { aliases }]) => - Object.entries(aliases).map(([aliasName, aliasProperties]) => ({ - index, - alias: aliasName, - isWriteIndex: aliasProperties.is_write_index ?? false, - })) - ); + concreteIndices = response.data_streams.map((dataStream) => ({ + index: dataStream.name, + alias: dataStream.name, + isWriteIndex: true, + })); logger.debug( `Found ${concreteIndices.length} concrete indices for ${ @@ -182,11 +176,14 @@ export const createConcreteWriteIndex = async ({ } } - let concreteWriteIndicesExist = false; + // let concreteWriteIndicesExist = false; + const concreteWriteIndicesExist = concreteIndices.length > 0; + // if a concrete write index already exists, update the underlying mapping if (concreteIndices.length > 0) { await updateIndexMappings({ logger, esClient, totalFieldsLimit, concreteIndices }); + /* const concreteIndicesExist = concreteIndices.some( (index) => index.alias === indexPatterns.alias ); @@ -201,6 +198,7 @@ export const createConcreteWriteIndex = async ({ `Indices matching pattern ${indexPatterns.pattern} exist but none are set as the write index for alias ${indexPatterns.alias}` ); } + */ } // check if a concrete write index already exists @@ -208,23 +206,18 @@ export const createConcreteWriteIndex = async ({ try { await retryTransientEsErrors( () => - esClient.indices.create({ - index: indexPatterns.name, - body: { - aliases: { - [indexPatterns.alias]: { - is_write_index: true, - }, - }, - }, + esClient.indices.createDataStream({ + name: indexPatterns.alias, }), { logger } ); } catch (error) { logger.error(`Error creating concrete write index - ${error.message}`); + throw error; // If the index already exists and it's the write index for the alias, // something else created it so suppress the error. If it's not the write // index, that's bad, throw an error. + /* if (error?.meta?.body?.error?.type === 'resource_already_exists_exception') { const existingIndices = await retryTransientEsErrors( () => esClient.indices.get({ index: indexPatterns.name }), @@ -238,6 +231,7 @@ export const createConcreteWriteIndex = async ({ } else { throw error; } + */ } } }; diff --git a/x-pack/plugins/alerting/server/alerts_service/lib/create_or_update_index_template.ts b/x-pack/plugins/alerting/server/alerts_service/lib/create_or_update_index_template.ts index a17fad2d875ed..b922541724c86 100644 --- a/x-pack/plugins/alerting/server/alerts_service/lib/create_or_update_index_template.ts +++ b/x-pack/plugins/alerting/server/alerts_service/lib/create_or_update_index_template.ts @@ -43,7 +43,8 @@ export const getIndexTemplate = ({ return { name: indexPatterns.template, body: { - index_patterns: [indexPatterns.pattern], + data_stream: { hidden: true }, + index_patterns: [indexPatterns.alias], composed_of: componentTemplateRefs, template: { settings: { @@ -51,7 +52,7 @@ export const getIndexTemplate = ({ hidden: true, 'index.lifecycle': { name: ilmPolicyName, - rollover_alias: indexPatterns.alias, + // rollover_alias: indexPatterns.alias, }, 'index.mapping.total_fields.limit': totalFieldsLimit, }, diff --git a/x-pack/plugins/alerting/server/alerts_service/resource_installer_utils.ts b/x-pack/plugins/alerting/server/alerts_service/resource_installer_utils.ts index eba614e655617..17981ce14e1df 100644 --- a/x-pack/plugins/alerting/server/alerts_service/resource_installer_utils.ts +++ b/x-pack/plugins/alerting/server/alerts_service/resource_installer_utils.ts @@ -40,11 +40,11 @@ export const getIndexTemplateAndPattern = ({ const pattern = `${context}.alerts`; const patternWithNamespace = `${pattern}-${concreteNamespace}`; return { - template: `.alerts-${patternWithNamespace}-index-template`, - pattern: `.internal.alerts-${patternWithNamespace}-*`, + template: `.alerts-${patternWithNamespace}-index-template`, // still used + pattern: `.internal.alerts-${patternWithNamespace}-*`, // no longer used basePattern: `.alerts-${pattern}-*`, - name: `.internal.alerts-${patternWithNamespace}-000001`, - alias: `.alerts-${patternWithNamespace}`, + name: `.internal.alerts-${patternWithNamespace}-000001`, // no longer used + alias: `.alerts-${patternWithNamespace}`, // data stream name ...(secondaryAlias ? { secondaryAlias: `${secondaryAlias}-${concreteNamespace}` } : {}), }; }; diff --git a/x-pack/plugins/rule_registry/server/rule_data_client/rule_data_client.ts b/x-pack/plugins/rule_registry/server/rule_data_client/rule_data_client.ts index 5a652a316cc10..617756109d14a 100644 --- a/x-pack/plugins/rule_registry/server/rule_data_client/rule_data_client.ts +++ b/x-pack/plugins/rule_registry/server/rule_data_client/rule_data_client.ts @@ -6,7 +6,7 @@ */ import { errors } from '@elastic/elasticsearch'; -import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey'; +import type * as estypes from '@elastic/elasticsearch/lib/api/types'; import { Either, isLeft } from 'fp-ts/lib/Either'; import { ElasticsearchClient } from '@kbn/core/server'; @@ -118,6 +118,7 @@ export class RuleDataClient implements IRuleDataClient { ...request, index: indexPattern, ignore_unavailable: true, + seq_no_primary_term: true, })) as unknown as ESSearchResponse; } catch (err) { this.options.logger.error(`Error performing search in RuleDataClient - ${err.message}`); @@ -235,13 +236,12 @@ export class RuleDataClient implements IRuleDataClient { bulk: async (request: estypes.BulkRequest) => { try { if (this.clusterClient) { - const requestWithDefaultParameters = { - ...request, - require_alias: true, - index: alias, - }; + addCreateIndexBulkActionDoc(request, alias); + this.options.logger.debug( + `writing bulk data: alias: "${alias}" ${JSON.stringify(request, null, 4)}` + ); - const response = await this.clusterClient.bulk(requestWithDefaultParameters, { + const response = await this.clusterClient.bulk(request, { meta: true, }); @@ -261,3 +261,13 @@ export class RuleDataClient implements IRuleDataClient { }; } } + +function addCreateIndexBulkActionDoc(request: estypes.BulkRequest, alias: string) { + const docs: Array>> = ((request as any).body as any) || []; + + for (let index = 0; index < docs.length; index += 2) { + if (new Set(Object.keys(docs[index])).has('create')) { + docs[index].create._index = alias; + } + } +} diff --git a/x-pack/plugins/rule_registry/server/utils/create_lifecycle_executor.ts b/x-pack/plugins/rule_registry/server/utils/create_lifecycle_executor.ts index ce2570f7e5bcc..d23e53e434a8c 100644 --- a/x-pack/plugins/rule_registry/server/utils/create_lifecycle_executor.ts +++ b/x-pack/plugins/rule_registry/server/utils/create_lifecycle_executor.ts @@ -216,10 +216,14 @@ export const createLifecycleExecutor = `[Rule Registry] Tracking ${allAlertIds.length} alerts (${newAlertIds.length} new, ${trackedAlertStates.length} previous)` ); - const trackedAlertsDataMap: Record< - string, - { indexName: string; fields: Partial } - > = {}; + interface TrackedAlertData { + indexName: string; + fields: Partial; + seqNo: number | undefined; + primaryTerm: number | undefined; + } + + const trackedAlertsDataMap: Record = {}; if (trackedAlertStates.length) { const result = await fetchExistingAlerts( @@ -230,10 +234,18 @@ export const createLifecycleExecutor = result.forEach((hit) => { const alertInstanceId = hit._source ? hit._source[ALERT_INSTANCE_ID] : void 0; if (alertInstanceId && hit._source) { - trackedAlertsDataMap[alertInstanceId] = { - indexName: hit._index, - fields: hit._source, - }; + if (hit._seq_no == null) { + logger.error(`missing _seq_no on alert instance ${alertInstanceId}`); + } else if (hit._primary_term == null) { + logger.error(`missing _primary_term on alert instance ${alertInstanceId}`); + } else { + trackedAlertsDataMap[alertInstanceId] = { + indexName: hit._index, + fields: hit._source, + seqNo: hit._seq_no, + primaryTerm: hit._primary_term, + }; + } } }); } @@ -308,6 +320,8 @@ export const createLifecycleExecutor = return { indexName: alertData?.indexName, + seqNo: alertData?.seqNo, + primaryTerm: alertData?.primaryTerm, event, flappingHistory, flapping, @@ -334,11 +348,25 @@ export const createLifecycleExecutor = if (allEventsToIndex.length > 0 && writeAlerts) { logger.debug(`[Rule Registry] Preparing to index ${allEventsToIndex.length} alerts.`); + // ? { index: { _id: event[ALERT_UUID]!, _index: indexName, require_alias: false, if_seq_no: 0, if_primary_term: 1 } } await ruleDataClientWriter.bulk({ - body: allEventsToIndex.flatMap(({ event, indexName }) => [ + body: allEventsToIndex.flatMap(({ event, indexName, seqNo, primaryTerm }) => [ indexName - ? { index: { _id: event[ALERT_UUID]!, _index: indexName, require_alias: false } } - : { index: { _id: event[ALERT_UUID]! } }, + ? { + index: { + _id: event[ALERT_UUID]!, + _index: indexName, + if_seq_no: seqNo, + if_primary_term: primaryTerm, + require_alias: false, + }, + } + : { + create: { + _id: event[ALERT_UUID]!, + require_alias: false, + }, + }, event, ]), refresh: 'wait_for',