Set refresh according to stateful vs stateless when indexing alert documents (elastic#201209)

In this PR, I'm making the change so that when Kibana is running against
stateful Elasticsearch we set refresh to `wait_for` (instead of `true`) when
indexing alert documents, so we don't put too much pressure on the
Elasticsearch indices when they're under load.
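
For reference, a minimal sketch of the decision described above (names are
illustrative; the actual change is in `alerts_client.ts` further down in this
diff):

```ts
// Sketch only: choose the bulk `refresh` value based on the Elasticsearch flavor.
// `isServerless` mirrors the flag this PR threads through AlertsClientParams.
function getAlertsRefreshValue(isServerless: boolean): 'wait_for' | true {
  // Stateless (serverless) Elasticsearch throttles overly frequent refresh
  // calls itself, so forcing `refresh: true` is safe and avoids waiting for
  // the longer default refresh interval. On stateful clusters, `wait_for`
  // avoids hammering the alerts indices with forced refreshes under load.
  return isServerless ? true : 'wait_for';
}
```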

## To verify

Verify using the Cloud deployment and the Serverless project created from this
PR:

1. Create an always firing ES Query rule (a scripted example follows this list)
2. Create an always firing security detection rule w/ alert suppression
3. Verify the ECH cluster logs and observe `*** Refresh value when
indexing alerts: wait_for` and `*** Rule registry - refresh value when
indexing alerts: wait_for` messages
4. Verify the serverless project logs on QA overview and observe `***
Refresh value when indexing alerts: true` and `*** Rule registry -
refresh value when indexing alerts: true` messages

## To-Do

- [x] Revert commit
elastic@7c19b45
that was added for testing purposes

---------

Co-authored-by: kibanamachine <[email protected]>
(cherry picked from commit a4cb330)

# Conflicts:
#	x-pack/plugins/alerting/kibana.jsonc
#	x-pack/plugins/alerting/server/plugin.ts
mikecote committed Nov 28, 2024
1 parent 2e1ed36 commit a7f052a
Showing 30 changed files with 187 additions and 36 deletions.
3 changes: 1 addition & 2 deletions x-pack/plugins/alerting/kibana.jsonc
@@ -31,8 +31,7 @@
"usageCollection",
"security",
"monitoringCollection",
"spaces",
"serverless",
"spaces"
],
"extraPublicDirs": [
"common",
82 changes: 66 additions & 16 deletions x-pack/plugins/alerting/server/alerts_client/alerts_client.test.ts
@@ -346,6 +346,7 @@ describe('Alerts Client', () => {
rule: alertRuleData,
kibanaVersion: '8.9.0',
spaceId: 'space1',
isServerless: false,
dataStreamAdapter: getDataStreamAdapter({ useDataStreamForAlerts }),
};
maintenanceWindowsService.getMaintenanceWindows.mockReturnValue({
@@ -542,10 +543,58 @@ describe('Alerts Client', () => {
});

describe('persistAlerts()', () => {
test('should index new alerts', async () => {
const alertsClient = new AlertsClient<{}, {}, {}, 'default', 'recovered'>(
alertsClientParams
);
test('should index new alerts with refresh: wait_for in stateful', async () => {
const alertsClient = new AlertsClient<{}, {}, {}, 'default', 'recovered'>({
...alertsClientParams,
isServerless: false,
});

await alertsClient.initializeExecution(defaultExecutionOpts);

// Report 2 new alerts
const alertExecutorService = alertsClient.factory();
alertExecutorService.create('1').scheduleActions('default');
alertExecutorService.create('2').scheduleActions('default');

await alertsClient.processAlerts(processAlertsOpts);
alertsClient.logAlerts(logAlertsOpts);

await alertsClient.persistAlerts();

const { alertsToReturn } = alertsClient.getAlertsToSerialize();
const uuid1 = alertsToReturn['1'].meta?.uuid;
const uuid2 = alertsToReturn['2'].meta?.uuid;

expect(clusterClient.bulk).toHaveBeenCalledWith({
index: '.alerts-test.alerts-default',
refresh: 'wait_for',
require_alias: !useDataStreamForAlerts,
body: [
{
create: { _id: uuid1, ...(useDataStreamForAlerts ? {} : { require_alias: true }) },
},
// new alert doc
getNewIndexedAlertDoc({ [ALERT_UUID]: uuid1 }),
{
create: { _id: uuid2, ...(useDataStreamForAlerts ? {} : { require_alias: true }) },
},
// new alert doc
getNewIndexedAlertDoc({ [ALERT_UUID]: uuid2, [ALERT_INSTANCE_ID]: '2' }),
],
});
expect(maintenanceWindowsService.getMaintenanceWindows).toHaveBeenCalledWith({
eventLogger: alertingEventLogger,
request: fakeRequest,
ruleTypeCategory: 'test',
spaceId: 'space1',
});
});

test('should index new alerts with refresh: true in stateless', async () => {
const alertsClient = new AlertsClient<{}, {}, {}, 'default', 'recovered'>({
...alertsClientParams,
isServerless: true,
});

await alertsClient.initializeExecution(defaultExecutionOpts);

@@ -658,7 +707,7 @@ describe('Alerts Client', () => {

expect(clusterClient.bulk).toHaveBeenCalledWith({
index: '.alerts-test.alerts-default',
refresh: true,
refresh: 'wait_for',
require_alias: !useDataStreamForAlerts,
body: [
{
@@ -731,7 +780,7 @@ describe('Alerts Client', () => {

expect(clusterClient.bulk).toHaveBeenCalledWith({
index: '.alerts-test.alerts-default',
refresh: true,
refresh: 'wait_for',
require_alias: !useDataStreamForAlerts,
body: [
{
@@ -866,7 +915,7 @@ describe('Alerts Client', () => {

expect(clusterClient.bulk).toHaveBeenCalledWith({
index: '.alerts-test.alerts-default',
refresh: true,
refresh: 'wait_for',
require_alias: !useDataStreamForAlerts,
body: [
{
@@ -939,7 +988,7 @@ describe('Alerts Client', () => {

expect(clusterClient.bulk).toHaveBeenCalledWith({
index: '.alerts-test.alerts-default',
refresh: true,
refresh: 'wait_for',
require_alias: !useDataStreamForAlerts,
body: [
{
@@ -1038,7 +1087,7 @@ describe('Alerts Client', () => {

expect(clusterClient.bulk).toHaveBeenCalledWith({
index: '.alerts-test.alerts-default',
refresh: true,
refresh: 'wait_for',
require_alias: !useDataStreamForAlerts,
body: [
{
@@ -1203,7 +1252,7 @@ describe('Alerts Client', () => {

expect(clusterClient.bulk).toHaveBeenCalledWith({
index: '.alerts-test.alerts-default',
refresh: true,
refresh: 'wait_for',
require_alias: !useDataStreamForAlerts,
body: [
{
@@ -1321,7 +1370,7 @@ describe('Alerts Client', () => {

expect(clusterClient.bulk).toHaveBeenCalledWith({
index: '.alerts-test.alerts-default',
refresh: true,
refresh: 'wait_for',
require_alias: !useDataStreamForAlerts,
body: [
{
@@ -1525,7 +1574,7 @@ describe('Alerts Client', () => {

expect(clusterClient.bulk).toHaveBeenCalledWith({
index: '.alerts-test.alerts-default',
refresh: true,
refresh: 'wait_for',
require_alias: !useDataStreamForAlerts,
body: [
{
@@ -1609,6 +1658,7 @@ describe('Alerts Client', () => {
shouldWrite: false,
},
},
isServerless: false,
request: fakeRequest,
namespace: 'default',
rule: alertRuleData,
@@ -2458,7 +2508,7 @@ describe('Alerts Client', () => {

expect(clusterClient.bulk).toHaveBeenCalledWith({
index: '.alerts-test.alerts-default',
refresh: true,
refresh: 'wait_for',
require_alias: !useDataStreamForAlerts,
body: [
{
@@ -2732,7 +2782,7 @@ describe('Alerts Client', () => {

expect(clusterClient.bulk).toHaveBeenCalledWith({
index: '.alerts-test.alerts-default',
refresh: true,
refresh: 'wait_for',
require_alias: !useDataStreamForAlerts,
body: [
{
@@ -2833,7 +2883,7 @@ describe('Alerts Client', () => {

expect(clusterClient.bulk).toHaveBeenCalledWith({
index: '.alerts-test.alerts-default',
refresh: true,
refresh: 'wait_for',
require_alias: !useDataStreamForAlerts,
body: [
{
@@ -2930,7 +2980,7 @@ describe('Alerts Client', () => {

expect(clusterClient.bulk).toHaveBeenCalledWith({
index: '.alerts-test.alerts-default',
refresh: true,
refresh: 'wait_for',
require_alias: !useDataStreamForAlerts,
body: [
{
x-pack/plugins/alerting/server/alerts_client/alerts_client.ts
@@ -73,6 +73,7 @@ export interface AlertsClientParams extends CreateAlertsClientParams {
elasticsearchClientPromise: Promise<ElasticsearchClient>;
kibanaVersion: string;
dataStreamAdapter: DataStreamAdapter;
isServerless: boolean;
}

interface AlertsAffectedByMaintenanceWindows {
@@ -109,6 +110,7 @@ export class AlertsClient<
private runTimestampString: string | undefined;
private rule: AlertRule;
private ruleType: UntypedNormalizedRuleType;
private readonly isServerless: boolean;

private indexTemplateAndPattern: IIndexPatternString;

@@ -143,6 +145,7 @@
this._isUsingDataStreams = this.options.dataStreamAdapter.isUsingDataStreams();
this.ruleInfoMessage = `for ${this.ruleType.id}:${this.options.rule.id} '${this.options.rule.name}'`;
this.logTags = { tags: [this.ruleType.id, this.options.rule.id, 'alerts-client'] };
this.isServerless = options.isServerless;
}

public async initializeExecution(opts: InitializeExecutionOpts) {
@@ -555,7 +558,9 @@

try {
const response = await esClient.bulk({
refresh: true,
// On serverless we can force a refresh so we don't wait for the longer refresh interval
// When too many refresh calls are done in a short period of time, they are throttled by stateless Elasticsearch
refresh: this.isServerless ? true : 'wait_for',
index: this.indexTemplateAndPattern.alias,
require_alias: !this.isUsingDataStreams(),
body: bulkBody,
@@ -100,6 +100,7 @@ describe('initializeAlertsClient', () => {
ruleLogPrefix: `${RULE_TYPE_ID}:${RULE_ID}: '${RULE_NAME}'`,
ruleRunMetricsStore,
spaceId: 'default',
isServerless: false,
},
executionId: 'abc',
logger,
@@ -159,6 +160,7 @@ describe('initializeAlertsClient', () => {
ruleLogPrefix: `${RULE_TYPE_ID}:${RULE_ID}: '${RULE_NAME}'`,
ruleRunMetricsStore,
spaceId: 'default',
isServerless: false,
},
executionId: 'abc',
logger,
@@ -219,6 +221,7 @@ describe('initializeAlertsClient', () => {
ruleLogPrefix: `${RULE_TYPE_ID}:${RULE_ID}: '${RULE_NAME}'`,
ruleRunMetricsStore,
spaceId: 'default',
isServerless: false,
},
executionId: 'abc',
logger,
@@ -288,6 +291,7 @@ describe('initializeAlertsClient', () => {
ruleLogPrefix: `${RULE_TYPE_ID}:${RULE_ID}: '${RULE_NAME}'`,
ruleRunMetricsStore,
spaceId: 'default',
isServerless: false,
},
executionId: 'abc',
logger,