Skip to content

Commit

Permalink
[ResponseOps] change AAD indices/alias to a data stream
Browse files Browse the repository at this point in the history
  • Loading branch information
pmuellr committed Jun 5, 2023
1 parent 9abed02 commit cd1d6de
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -143,29 +143,23 @@ export const createConcreteWriteIndex = async ({
indexPatterns,
totalFieldsLimit,
}: CreateConcreteWriteIndexOpts) => {
logger.info(`Creating concrete write index - ${indexPatterns.name}`);
logger.info(`Creating data stream - ${indexPatterns.alias}`);

// check if a concrete write index already exists
let concreteIndices: ConcreteIndexInfo[] = [];
try {
// Specify both the index pattern for the backing indices and their aliases
// The alias prevents the request from finding other namespaces that could match the -* pattern
const response = await retryTransientEsErrors(
() =>
esClient.indices.getAlias({
index: indexPatterns.pattern,
name: indexPatterns.basePattern,
}),
() => esClient.indices.getDataStream({ name: indexPatterns.alias, expand_wildcards: 'all' }),
{ logger }
);

concreteIndices = Object.entries(response).flatMap(([index, { aliases }]) =>
Object.entries(aliases).map(([aliasName, aliasProperties]) => ({
index,
alias: aliasName,
isWriteIndex: aliasProperties.is_write_index ?? false,
}))
);
concreteIndices = response.data_streams.map((dataStream) => ({
index: dataStream.name,
alias: dataStream.name,
isWriteIndex: true,
}));

logger.debug(
`Found ${concreteIndices.length} concrete indices for ${
Expand All @@ -182,11 +176,14 @@ export const createConcreteWriteIndex = async ({
}
}

let concreteWriteIndicesExist = false;
// let concreteWriteIndicesExist = false;
const concreteWriteIndicesExist = concreteIndices.length > 0;

// if a concrete write index already exists, update the underlying mapping
if (concreteIndices.length > 0) {
await updateIndexMappings({ logger, esClient, totalFieldsLimit, concreteIndices });

/*
const concreteIndicesExist = concreteIndices.some(
(index) => index.alias === indexPatterns.alias
);
Expand All @@ -201,30 +198,26 @@ export const createConcreteWriteIndex = async ({
`Indices matching pattern ${indexPatterns.pattern} exist but none are set as the write index for alias ${indexPatterns.alias}`
);
}
*/
}

// check if a concrete write index already exists
if (!concreteWriteIndicesExist) {
try {
await retryTransientEsErrors(
() =>
esClient.indices.create({
index: indexPatterns.name,
body: {
aliases: {
[indexPatterns.alias]: {
is_write_index: true,
},
},
},
esClient.indices.createDataStream({
name: indexPatterns.alias,
}),
{ logger }
);
} catch (error) {
logger.error(`Error creating concrete write index - ${error.message}`);
throw error;
// If the index already exists and it's the write index for the alias,
// something else created it so suppress the error. If it's not the write
// index, that's bad, throw an error.
/*
if (error?.meta?.body?.error?.type === 'resource_already_exists_exception') {
const existingIndices = await retryTransientEsErrors(
() => esClient.indices.get({ index: indexPatterns.name }),
Expand All @@ -238,6 +231,7 @@ export const createConcreteWriteIndex = async ({
} else {
throw error;
}
*/
}
}
};
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,16 @@ export const getIndexTemplate = ({
return {
name: indexPatterns.template,
body: {
index_patterns: [indexPatterns.pattern],
data_stream: { hidden: true },
index_patterns: [indexPatterns.alias],
composed_of: componentTemplateRefs,
template: {
settings: {
auto_expand_replicas: '0-1',
hidden: true,
'index.lifecycle': {
name: ilmPolicyName,
rollover_alias: indexPatterns.alias,
// rollover_alias: indexPatterns.alias,
},
'index.mapping.total_fields.limit': totalFieldsLimit,
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ export const getIndexTemplateAndPattern = ({
const pattern = `${context}.alerts`;
const patternWithNamespace = `${pattern}-${concreteNamespace}`;
return {
template: `.alerts-${patternWithNamespace}-index-template`,
pattern: `.internal.alerts-${patternWithNamespace}-*`,
template: `.alerts-${patternWithNamespace}-index-template`, // still used
pattern: `.internal.alerts-${patternWithNamespace}-*`, // no longer used
basePattern: `.alerts-${pattern}-*`,
name: `.internal.alerts-${patternWithNamespace}-000001`,
alias: `.alerts-${patternWithNamespace}`,
name: `.internal.alerts-${patternWithNamespace}-000001`, // no longer used
alias: `.alerts-${patternWithNamespace}`, // data stream name
...(secondaryAlias ? { secondaryAlias: `${secondaryAlias}-${concreteNamespace}` } : {}),
};
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
*/

import { errors } from '@elastic/elasticsearch';
import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import type * as estypes from '@elastic/elasticsearch/lib/api/types';
import { Either, isLeft } from 'fp-ts/lib/Either';

import { ElasticsearchClient } from '@kbn/core/server';
Expand Down Expand Up @@ -118,6 +118,7 @@ export class RuleDataClient implements IRuleDataClient {
...request,
index: indexPattern,
ignore_unavailable: true,
seq_no_primary_term: true,
})) as unknown as ESSearchResponse<TAlertDoc, TSearchRequest>;
} catch (err) {
this.options.logger.error(`Error performing search in RuleDataClient - ${err.message}`);
Expand Down Expand Up @@ -235,13 +236,12 @@ export class RuleDataClient implements IRuleDataClient {
bulk: async (request: estypes.BulkRequest) => {
try {
if (this.clusterClient) {
const requestWithDefaultParameters = {
...request,
require_alias: true,
index: alias,
};
addCreateIndexBulkActionDoc(request, alias);
this.options.logger.debug(
`writing bulk data: alias: "${alias}" ${JSON.stringify(request, null, 4)}`
);

const response = await this.clusterClient.bulk(requestWithDefaultParameters, {
const response = await this.clusterClient.bulk(request, {
meta: true,
});

Expand All @@ -261,3 +261,13 @@ export class RuleDataClient implements IRuleDataClient {
};
}
}

function addCreateIndexBulkActionDoc(request: estypes.BulkRequest, alias: string) {
const docs: Array<Record<string, Record<string, unknown>>> = ((request as any).body as any) || [];

for (let index = 0; index < docs.length; index += 2) {
if (new Set(Object.keys(docs[index])).has('create')) {
docs[index].create._index = alias;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -216,10 +216,14 @@ export const createLifecycleExecutor =
`[Rule Registry] Tracking ${allAlertIds.length} alerts (${newAlertIds.length} new, ${trackedAlertStates.length} previous)`
);

const trackedAlertsDataMap: Record<
string,
{ indexName: string; fields: Partial<ParsedTechnicalFields & ParsedExperimentalFields> }
> = {};
interface TrackedAlertData {
indexName: string;
fields: Partial<ParsedTechnicalFields & ParsedExperimentalFields>;
seqNo: number | undefined;
primaryTerm: number | undefined;
}

const trackedAlertsDataMap: Record<string, TrackedAlertData> = {};

if (trackedAlertStates.length) {
const result = await fetchExistingAlerts(
Expand All @@ -230,10 +234,18 @@ export const createLifecycleExecutor =
result.forEach((hit) => {
const alertInstanceId = hit._source ? hit._source[ALERT_INSTANCE_ID] : void 0;
if (alertInstanceId && hit._source) {
trackedAlertsDataMap[alertInstanceId] = {
indexName: hit._index,
fields: hit._source,
};
if (hit._seq_no == null) {
logger.error(`missing _seq_no on alert instance ${alertInstanceId}`);
} else if (hit._primary_term == null) {
logger.error(`missing _primary_term on alert instance ${alertInstanceId}`);
} else {
trackedAlertsDataMap[alertInstanceId] = {
indexName: hit._index,
fields: hit._source,
seqNo: hit._seq_no,
primaryTerm: hit._primary_term,
};
}
}
});
}
Expand Down Expand Up @@ -308,6 +320,8 @@ export const createLifecycleExecutor =

return {
indexName: alertData?.indexName,
seqNo: alertData?.seqNo,
primaryTerm: alertData?.primaryTerm,
event,
flappingHistory,
flapping,
Expand All @@ -334,11 +348,25 @@ export const createLifecycleExecutor =
if (allEventsToIndex.length > 0 && writeAlerts) {
logger.debug(`[Rule Registry] Preparing to index ${allEventsToIndex.length} alerts.`);

// ? { index: { _id: event[ALERT_UUID]!, _index: indexName, require_alias: false, if_seq_no: 0, if_primary_term: 1 } }
await ruleDataClientWriter.bulk({
body: allEventsToIndex.flatMap(({ event, indexName }) => [
body: allEventsToIndex.flatMap(({ event, indexName, seqNo, primaryTerm }) => [
indexName
? { index: { _id: event[ALERT_UUID]!, _index: indexName, require_alias: false } }
: { index: { _id: event[ALERT_UUID]! } },
? {
index: {
_id: event[ALERT_UUID]!,
_index: indexName,
if_seq_no: seqNo,
if_primary_term: primaryTerm,
require_alias: false,
},
}
: {
create: {
_id: event[ALERT_UUID]!,
require_alias: false,
},
},
event,
]),
refresh: 'wait_for',
Expand Down

0 comments on commit cd1d6de

Please sign in to comment.