[8.17] Set refresh according to stateful vs stateless when indexing alert documents (#201209) (#202221)

# Backport

This will backport the following commits from `main` to `8.17`:
- [Set refresh according to stateful vs stateless when indexing alert documents (#201209)](#201209)

<!--- Backport version: 8.9.8 -->

### Questions?
Please refer to the [Backport tool documentation](https://github.com/sqren/backport).

Source PR: [Set refresh according to stateful vs stateless when indexing alert documents (#201209)](https://github.com/elastic/kibana/pull/201209), authored by Mike Côté and merged into `main` on 2024-11-28 (commit a4cb330af2d414e383d75efce526513171098ece). Suggested target branches: `8.17` (v8.17.0) and `8.16` (v8.16.1). Original PR description:

In this PR, I'm making the change so that when Kibana is running with stateful Elasticsearch we set refresh to `wait_for` (instead of `true`), so we are not putting too much pressure on the Elasticsearch indices when under load.

## To verify

Verify using the Cloud deployment and the Serverless project created from this PR:

1. Create an always-firing ES Query rule.
2. Create an always-firing security detection rule with alert suppression.
3. Verify the ECH cluster logs and observe the `*** Refresh value when indexing alerts: wait_for` and `*** Rule registry - refresh value when indexing alerts: wait_for` messages.
4. Verify the serverless project logs on the QA overview and observe the `*** Refresh value when indexing alerts: true` and `*** Rule registry - refresh value when indexing alerts: true` messages.

## To-Do

- [x] Revert commit https://github.com//pull/201209/commits/7c19b458e6f55866bfff6a5b0f39188ae95dc6c6 that was added for testing purposes.

Co-authored-by: kibanamachine <[email protected]>
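For clarity, here is a minimal standalone sketch of the refresh selection described above, written against the `@elastic/elasticsearch` client rather than Kibana's wrapped cluster client; the function names (`resolveAlertIndexRefresh`, `indexAlertDocs`) are illustrative and not part of the actual change, which lives in `alerts_client.ts` further down.

```ts
import { Client } from '@elastic/elasticsearch';

// Stateless (serverless) Elasticsearch throttles excessive refresh calls on its
// own, so forcing `refresh: true` is safe there; stateful deployments use
// 'wait_for' to avoid pressuring the alert indices under load.
function resolveAlertIndexRefresh(isServerless: boolean): true | 'wait_for' {
  return isServerless ? true : 'wait_for';
}

async function indexAlertDocs(
  client: Client,
  index: string,
  docs: Array<Record<string, unknown>>,
  isServerless: boolean
) {
  return client.bulk({
    index,
    refresh: resolveAlertIndexRefresh(isServerless),
    // The bulk body alternates action metadata and document source.
    operations: docs.flatMap((doc) => [{ create: {} }, doc]),
  });
}
```

On stateful clusters, `wait_for` still makes the alert documents searchable before the call returns, but it piggybacks on the scheduled refresh instead of forcing an extra refresh per bulk request.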
mikecote authored Nov 29, 2024
1 parent ac792ba commit 122e46c
Showing 30 changed files with 188 additions and 37 deletions.
5 changes: 2 additions & 3 deletions x-pack/plugins/alerting/kibana.jsonc
@@ -35,12 +35,11 @@
"usageCollection",
"security",
"monitoringCollection",
"spaces",
"serverless"
"spaces"
],
"extraPublicDirs": [
"common",
"common/parse_duration"
]
}
}
}
82 changes: 66 additions & 16 deletions x-pack/plugins/alerting/server/alerts_client/alerts_client.test.ts
@@ -347,6 +347,7 @@ describe('Alerts Client', () => {
rule: alertRuleData,
kibanaVersion: '8.9.0',
spaceId: 'space1',
isServerless: false,
dataStreamAdapter: getDataStreamAdapter({ useDataStreamForAlerts }),
};
maintenanceWindowsService.getMaintenanceWindows.mockReturnValue({
@@ -543,10 +544,58 @@ describe('Alerts Client', () => {
});

describe('persistAlerts()', () => {
test('should index new alerts', async () => {
const alertsClient = new AlertsClient<{}, {}, {}, 'default', 'recovered'>(
alertsClientParams
);
test('should index new alerts with refresh: wait_for in stateful', async () => {
const alertsClient = new AlertsClient<{}, {}, {}, 'default', 'recovered'>({
...alertsClientParams,
isServerless: false,
});

await alertsClient.initializeExecution(defaultExecutionOpts);

// Report 2 new alerts
const alertExecutorService = alertsClient.factory();
alertExecutorService.create('1').scheduleActions('default');
alertExecutorService.create('2').scheduleActions('default');

await alertsClient.processAlerts(processAlertsOpts);
alertsClient.logAlerts(logAlertsOpts);

await alertsClient.persistAlerts();

const { alertsToReturn } = alertsClient.getAlertsToSerialize();
const uuid1 = alertsToReturn['1'].meta?.uuid;
const uuid2 = alertsToReturn['2'].meta?.uuid;

expect(clusterClient.bulk).toHaveBeenCalledWith({
index: '.alerts-test.alerts-default',
refresh: 'wait_for',
require_alias: !useDataStreamForAlerts,
body: [
{
create: { _id: uuid1, ...(useDataStreamForAlerts ? {} : { require_alias: true }) },
},
// new alert doc
getNewIndexedAlertDoc({ [ALERT_UUID]: uuid1 }),
{
create: { _id: uuid2, ...(useDataStreamForAlerts ? {} : { require_alias: true }) },
},
// new alert doc
getNewIndexedAlertDoc({ [ALERT_UUID]: uuid2, [ALERT_INSTANCE_ID]: '2' }),
],
});
expect(maintenanceWindowsService.getMaintenanceWindows).toHaveBeenCalledWith({
eventLogger: alertingEventLogger,
request: fakeRequest,
ruleTypeCategory: 'test',
spaceId: 'space1',
});
});

test('should index new alerts with refresh: true in stateless', async () => {
const alertsClient = new AlertsClient<{}, {}, {}, 'default', 'recovered'>({
...alertsClientParams,
isServerless: true,
});

await alertsClient.initializeExecution(defaultExecutionOpts);

@@ -659,7 +708,7 @@ describe('Alerts Client', () => {

expect(clusterClient.bulk).toHaveBeenCalledWith({
index: '.alerts-test.alerts-default',
refresh: true,
refresh: 'wait_for',
require_alias: !useDataStreamForAlerts,
body: [
{
@@ -732,7 +781,7 @@ describe('Alerts Client', () => {

expect(clusterClient.bulk).toHaveBeenCalledWith({
index: '.alerts-test.alerts-default',
refresh: true,
refresh: 'wait_for',
require_alias: !useDataStreamForAlerts,
body: [
{
@@ -867,7 +916,7 @@ describe('Alerts Client', () => {

expect(clusterClient.bulk).toHaveBeenCalledWith({
index: '.alerts-test.alerts-default',
refresh: true,
refresh: 'wait_for',
require_alias: !useDataStreamForAlerts,
body: [
{
@@ -940,7 +989,7 @@ describe('Alerts Client', () => {

expect(clusterClient.bulk).toHaveBeenCalledWith({
index: '.alerts-test.alerts-default',
refresh: true,
refresh: 'wait_for',
require_alias: !useDataStreamForAlerts,
body: [
{
@@ -1039,7 +1088,7 @@ describe('Alerts Client', () => {

expect(clusterClient.bulk).toHaveBeenCalledWith({
index: '.alerts-test.alerts-default',
refresh: true,
refresh: 'wait_for',
require_alias: !useDataStreamForAlerts,
body: [
{
@@ -1196,7 +1245,7 @@ describe('Alerts Client', () => {

expect(clusterClient.bulk).toHaveBeenCalledWith({
index: '.alerts-test.alerts-default',
refresh: true,
refresh: 'wait_for',
require_alias: !useDataStreamForAlerts,
body: [
{
@@ -1314,7 +1363,7 @@ describe('Alerts Client', () => {

expect(clusterClient.bulk).toHaveBeenCalledWith({
index: '.alerts-test.alerts-default',
refresh: true,
refresh: 'wait_for',
require_alias: !useDataStreamForAlerts,
body: [
{
@@ -1518,7 +1567,7 @@ describe('Alerts Client', () => {

expect(clusterClient.bulk).toHaveBeenCalledWith({
index: '.alerts-test.alerts-default',
refresh: true,
refresh: 'wait_for',
require_alias: !useDataStreamForAlerts,
body: [
{
@@ -1602,6 +1651,7 @@ describe('Alerts Client', () => {
shouldWrite: false,
},
},
isServerless: false,
request: fakeRequest,
namespace: 'default',
rule: alertRuleData,
@@ -2451,7 +2501,7 @@ describe('Alerts Client', () => {

expect(clusterClient.bulk).toHaveBeenCalledWith({
index: '.alerts-test.alerts-default',
refresh: true,
refresh: 'wait_for',
require_alias: !useDataStreamForAlerts,
body: [
{
@@ -2725,7 +2775,7 @@ describe('Alerts Client', () => {

expect(clusterClient.bulk).toHaveBeenCalledWith({
index: '.alerts-test.alerts-default',
refresh: true,
refresh: 'wait_for',
require_alias: !useDataStreamForAlerts,
body: [
{
@@ -2826,7 +2876,7 @@ describe('Alerts Client', () => {

expect(clusterClient.bulk).toHaveBeenCalledWith({
index: '.alerts-test.alerts-default',
refresh: true,
refresh: 'wait_for',
require_alias: !useDataStreamForAlerts,
body: [
{
@@ -2923,7 +2973,7 @@ describe('Alerts Client', () => {

expect(clusterClient.bulk).toHaveBeenCalledWith({
index: '.alerts-test.alerts-default',
refresh: true,
refresh: 'wait_for',
require_alias: !useDataStreamForAlerts,
body: [
{
x-pack/plugins/alerting/server/alerts_client/alerts_client.ts
@@ -73,6 +73,7 @@ export interface AlertsClientParams extends CreateAlertsClientParams {
elasticsearchClientPromise: Promise<ElasticsearchClient>;
kibanaVersion: string;
dataStreamAdapter: DataStreamAdapter;
isServerless: boolean;
}

interface AlertsAffectedByMaintenanceWindows {
@@ -109,6 +110,7 @@ export class AlertsClient<
private runTimestampString: string | undefined;
private rule: AlertRule;
private ruleType: UntypedNormalizedRuleType;
private readonly isServerless: boolean;

private indexTemplateAndPattern: IIndexPatternString;

@@ -143,6 +145,7 @@
this._isUsingDataStreams = this.options.dataStreamAdapter.isUsingDataStreams();
this.ruleInfoMessage = `for ${this.ruleType.id}:${this.options.rule.id} '${this.options.rule.name}'`;
this.logTags = { tags: [this.ruleType.id, this.options.rule.id, 'alerts-client'] };
this.isServerless = options.isServerless;
}

public async initializeExecution(opts: InitializeExecutionOpts) {
@@ -555,7 +558,9 @@

try {
const response = await esClient.bulk({
refresh: true,
// On serverless we can force a refresh so we don't wait for the longer refresh interval
// When too many refresh calls are done in a short period of time, they are throttled by stateless Elasticsearch
refresh: this.isServerless ? true : 'wait_for',
index: this.indexTemplateAndPattern.alias,
require_alias: !this.isUsingDataStreams(),
body: bulkBody,
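The hunks above add an `isServerless` flag to `AlertsClientParams` and use it to pick the bulk `refresh` value, but this excerpt does not show where the flag comes from (the `kibana.jsonc` change earlier drops `serverless` from the plugin dependency list, so it is presumably no longer read from that plugin). Below is a hedged sketch of one plausible wiring, assuming the flag is derived from Kibana's build flavor; the helper name and the detection mechanism are assumptions, not confirmed by this diff.

```ts
import type { PluginInitializerContext } from '@kbn/core/server';

// Assumption: derive the flag from Kibana's build flavor ('serverless' vs
// 'traditional'); the detection mechanism actually used by the PR is not
// shown in this excerpt.
export function getIsServerless(initializerContext: PluginInitializerContext): boolean {
  return initializerContext.env.packageInfo.buildFlavor === 'serverless';
}

// Hypothetical call site: thread the flag into the alerts client params.
// const alertsClient = new AlertsClient({
//   ...otherAlertsClientParams,
//   isServerless: getIsServerless(initializerContext),
// });
```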
@@ -100,6 +100,7 @@ describe('initializeAlertsClient', () => {
ruleLogPrefix: `${RULE_TYPE_ID}:${RULE_ID}: '${RULE_NAME}'`,
ruleRunMetricsStore,
spaceId: 'default',
isServerless: false,
},
executionId: 'abc',
logger,
@@ -159,6 +160,7 @@ describe('initializeAlertsClient', () => {
ruleLogPrefix: `${RULE_TYPE_ID}:${RULE_ID}: '${RULE_NAME}'`,
ruleRunMetricsStore,
spaceId: 'default',
isServerless: false,
},
executionId: 'abc',
logger,
@@ -219,6 +221,7 @@ describe('initializeAlertsClient', () => {
ruleLogPrefix: `${RULE_TYPE_ID}:${RULE_ID}: '${RULE_NAME}'`,
ruleRunMetricsStore,
spaceId: 'default',
isServerless: false,
},
executionId: 'abc',
logger,
@@ -288,6 +291,7 @@ describe('initializeAlertsClient', () => {
ruleLogPrefix: `${RULE_TYPE_ID}:${RULE_ID}: '${RULE_NAME}'`,
ruleRunMetricsStore,
spaceId: 'default',
isServerless: false,
},
executionId: 'abc',
logger,