Skip to content

Commit

Permalink
[SO migrations] exit early if cluster routing allocation is disabled (#…
Browse files Browse the repository at this point in the history
…126612)

Co-authored-by: Kibana Machine <[email protected]>
  • Loading branch information
TinaHeiligers and kibanamachine authored Mar 8, 2022
1 parent 192309f commit 3864f63
Show file tree
Hide file tree
Showing 10 changed files with 416 additions and 13 deletions.
24 changes: 18 additions & 6 deletions src/core/server/saved_objects/migrations/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -133,24 +133,36 @@ is left out of the description for brevity.

## INIT
### Next action
`initAction`

Check that replica allocation is enabled from cluster settings (`cluster.routing.allocation.enabled`). Migrations will fail when replica allocation is disabled during the bulk index operation that waits for all active shards. Migrations wait for all active shards to ensure that saved objects are replicated to protect against data loss.

The Elasticsearch documentation mentions switching off replica allocation when restoring a cluster and this is a setting that might be overlooked when a restore is done. Migrations will fail early if replica allocation is incorrectly set to avoid adding a write block to the old index before running into a failure later.

If replica allocation is set to 'all', the migration continues to fetch the saved object indices:

`fetchIndices`

Fetch the saved object indices, mappings and aliases to find the source index
and determine whether we’re migrating from a legacy index or a v1 migrations
index.

### New control state
1. If `.kibana` and the version specific aliases both exists and are pointing
1. Two conditions have to be met before migrations begin:
1. If replica allocation is set as a persistent or transient setting to "perimaries", "new_primaries" or "none" fail the migration. Without replica allocation enabled or not set to 'all', the migration will timeout when waiting for index yellow status before bulk indexing. The check only considers persistent and transient settings and does not take static configuration in `elasticsearch.yml` into account. If `cluster.routing.allocation.enable` is configured in `elaticsearch.yml` and not set to the default of 'all', the migration will timeout. Static settings can only be returned from the `nodes/info` API.
`FATAL`

2. If `.kibana` is pointing to an index that belongs to a later version of
Kibana .e.g. a 7.11.0 instance found the `.kibana` alias pointing to
`.kibana_7.12.0_001` fail the migration
`FATAL`

2. If `.kibana` and the version specific aliases both exists and are pointing
to the same index. This version's migration has already been completed. Since
the same version could have plugins enabled at any time that would introduce
new transforms or mappings.
`OUTDATED_DOCUMENTS_SEARCH`

2. If `.kibana` is pointing to an index that belongs to a later version of
Kibana .e.g. a 7.11.0 instance found the `.kibana` alias pointing to
`.kibana_7.12.0_001` fail the migration
`FATAL`

3. If the `.kibana` alias exists we’re migrating from either a v1 or v2 index
and the migration source index is the index the `.kibana` alias points to.
`WAIT_FOR_YELLOW_SOURCE`
Expand Down
6 changes: 6 additions & 0 deletions src/core/server/saved_objects/migrations/actions/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ export {
export type { RetryableEsClientError };

// actions/* imports
export type { InitActionParams, UnsupportedClusterRoutingAllocation } from './initialize_action';
export { initAction } from './initialize_action';

export type { FetchIndexResponse, FetchIndicesParams } from './fetch_indices';
export { fetchIndices } from './fetch_indices';

Expand Down Expand Up @@ -81,6 +84,8 @@ export type {
export { updateAndPickupMappings } from './update_and_pickup_mappings';

import type { UnknownDocsFound } from './check_for_unknown_docs';
import type { UnsupportedClusterRoutingAllocation } from './initialize_action';

export type {
CheckForUnknownDocsParams,
UnknownDocsFound,
Expand Down Expand Up @@ -143,6 +148,7 @@ export interface ActionErrorTypeMap {
documents_transform_failed: DocumentsTransformFailed;
request_entity_too_large_exception: RequestEntityTooLargeException;
unknown_docs_found: UnknownDocsFound;
unsupported_cluster_routing_allocation: UnsupportedClusterRoutingAllocation;
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

import { catchRetryableEsClientErrors } from './catch_retryable_es_client_errors';
import { errors as EsErrors } from '@elastic/elasticsearch';
jest.mock('./catch_retryable_es_client_errors');
import { elasticsearchClientMock } from '../../../elasticsearch/client/mocks';
import { initAction } from './initialize_action';

describe('initAction', () => {
beforeEach(() => {
jest.clearAllMocks();
});
const retryableError = new EsErrors.ResponseError(
elasticsearchClientMock.createApiResponse({
statusCode: 503,
body: { error: { type: 'es_type', reason: 'es_reason' } },
})
);
const client = elasticsearchClientMock.createInternalClient(
elasticsearchClientMock.createErrorTransportRequestPromise(retryableError)
);
it('calls catchRetryableEsClientErrors when the promise rejects', async () => {
const task = initAction({ client, indices: ['my_index'] });
try {
await task();
} catch (e) {
/** ignore */
}
expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError);
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

import * as TaskEither from 'fp-ts/lib/TaskEither';
import * as Either from 'fp-ts/lib/Either';
import { pipe } from 'fp-ts/lib/pipeable';
import { ElasticsearchClient } from '../../../elasticsearch';
import {
catchRetryableEsClientErrors,
RetryableEsClientError,
} from './catch_retryable_es_client_errors';

import { FetchIndexResponse, fetchIndices } from './fetch_indices';

const routingAllocationEnable = 'cluster.routing.allocation.enable';
export interface ClusterRoutingAllocationEnabled {
clusterRoutingAllocationEnabled: boolean;
}

export interface InitActionParams {
client: ElasticsearchClient;
indices: string[];
}

export interface UnsupportedClusterRoutingAllocation {
type: 'unsupported_cluster_routing_allocation';
}

export const checkClusterRoutingAllocationEnabledTask =
({
client,
}: {
client: ElasticsearchClient;
}): TaskEither.TaskEither<RetryableEsClientError | UnsupportedClusterRoutingAllocation, {}> =>
() => {
return client.cluster
.getSettings({
flat_settings: true,
})
.then((settings) => {
const clusterRoutingAllocations: string[] =
settings?.transient?.[routingAllocationEnable] ??
settings?.persistent?.[routingAllocationEnable] ??
[];

const clusterRoutingAllocationEnabled =
[...clusterRoutingAllocations].length === 0 ||
[...clusterRoutingAllocations].every((s: string) => s === 'all'); // if set, only allow 'all'

if (!clusterRoutingAllocationEnabled) {
return Either.left({ type: 'unsupported_cluster_routing_allocation' as const });
} else {
return Either.right({});
}
})
.catch(catchRetryableEsClientErrors);
};

export const initAction = ({
client,
indices,
}: InitActionParams): TaskEither.TaskEither<
RetryableEsClientError | UnsupportedClusterRoutingAllocation,
FetchIndexResponse
> => {
return pipe(
checkClusterRoutingAllocationEnabledTask({ client }),
TaskEither.chainW((value) => {
return fetchIndices({ client, indices });
})
);
};
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import {
cloneIndex,
closePit,
createIndex,
fetchIndices,
openPit,
OpenPitResponse,
reindex,
Expand All @@ -35,6 +34,7 @@ import {
removeWriteBlock,
transformDocs,
waitForIndexStatusYellow,
initAction,
} from '../../actions';
import * as Either from 'fp-ts/lib/Either';
import * as Option from 'fp-ts/lib/Option';
Expand Down Expand Up @@ -111,10 +111,20 @@ describe('migration actions', () => {
await esServer.stop();
});

describe('fetchIndices', () => {
describe('initAction', () => {
afterAll(async () => {
await client.cluster.putSettings({
body: {
persistent: {
// Remove persistent test settings
cluster: { routing: { allocation: { enable: null } } },
},
},
});
});
it('resolves right empty record if no indices were found', async () => {
expect.assertions(1);
const task = fetchIndices({ client, indices: ['no_such_index'] });
const task = initAction({ client, indices: ['no_such_index'] });
await expect(task()).resolves.toMatchInlineSnapshot(`
Object {
"_tag": "Right",
Expand All @@ -124,7 +134,7 @@ describe('migration actions', () => {
});
it('resolves right record with found indices', async () => {
expect.assertions(1);
const res = (await fetchIndices({
const res = (await initAction({
client,
indices: ['no_such_index', 'existing_index_with_docs'],
})()) as Either.Right<unknown>;
Expand All @@ -139,6 +149,69 @@ describe('migration actions', () => {
})
);
});
it('resolves left with cluster routing allocation disabled', async () => {
expect.assertions(3);
await client.cluster.putSettings({
body: {
persistent: {
// Disable all routing allocation
cluster: { routing: { allocation: { enable: 'none' } } },
},
},
});
const task = initAction({
client,
indices: ['existing_index_with_docs'],
});
await expect(task()).resolves.toMatchInlineSnapshot(`
Object {
"_tag": "Left",
"left": Object {
"type": "unsupported_cluster_routing_allocation",
},
}
`);
await client.cluster.putSettings({
body: {
persistent: {
// Allow routing to existing primaries only
cluster: { routing: { allocation: { enable: 'primaries' } } },
},
},
});
const task2 = initAction({
client,
indices: ['existing_index_with_docs'],
});
await expect(task2()).resolves.toMatchInlineSnapshot(`
Object {
"_tag": "Left",
"left": Object {
"type": "unsupported_cluster_routing_allocation",
},
}
`);
await client.cluster.putSettings({
body: {
persistent: {
// Allow routing to new primaries only
cluster: { routing: { allocation: { enable: 'new_primaries' } } },
},
},
});
const task3 = initAction({
client,
indices: ['existing_index_with_docs'],
});
await expect(task3()).resolves.toMatchInlineSnapshot(`
Object {
"_tag": "Left",
"left": Object {
"type": "unsupported_cluster_routing_allocation",
},
}
`);
});
});

describe('setWriteBlock', () => {
Expand Down
Loading

0 comments on commit 3864f63

Please sign in to comment.