Skip to content

Commit

Permalink
[SoMigV2] Fail fast if unknown document types are present in the sour…
Browse files Browse the repository at this point in the history
…ce index (#103341) (#103733)

* initial draft

* fix some tests

* fix additional unit tests

* move all the things

* create error generation fn

* add correct error message

* add unknown types to log message

* fix types

* fix existing test suites

* add IT test

* review comments

* add tests + use unknown instead of undefined for empty types

* update RFC with new step
# Conflicts:
#	rfcs/text/0013_saved_object_migrations.md
  • Loading branch information
pgayvallet authored Jun 29, 2021
1 parent 4720ea1 commit acf96e7
Show file tree
Hide file tree
Showing 26 changed files with 1,316 additions and 442 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ export class KibanaMigrator {
migrationVersionPerType: this.documentMigrator.migrationVersion,
indexPrefix: index,
migrationsConfig: this.soMigrationsConfig,
typeRegistry: this.typeRegistry,
});
},
};
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

import * as Either from 'fp-ts/lib/Either';
import { catchRetryableEsClientErrors } from './catch_retryable_es_client_errors';
import { errors as EsErrors, estypes } from '@elastic/elasticsearch';
import { elasticsearchClientMock } from '../../../elasticsearch/client/mocks';
import { checkForUnknownDocs } from './check_for_unknown_docs';

jest.mock('./catch_retryable_es_client_errors');

describe('checkForUnknownDocs', () => {
const unusedTypesQuery: estypes.QueryDslQueryContainer = {
bool: { must: [{ term: { hello: 'dolly' } }] },
};
const knownTypes = ['foo', 'bar'];

beforeEach(() => {
jest.clearAllMocks();
});

it('calls catchRetryableEsClientErrors when the promise rejects', async () => {
// Create a mock client that rejects all methods with a 503 status code response.
const retryableError = new EsErrors.ResponseError(
elasticsearchClientMock.createApiResponse({
statusCode: 503,
body: { error: { type: 'es_type', reason: 'es_reason' } },
})
);
const client = elasticsearchClientMock.createInternalClient(
elasticsearchClientMock.createErrorTransportRequestPromise(retryableError)
);

const task = checkForUnknownDocs({
client,
indexName: '.kibana_8.0.0',
knownTypes,
unusedTypesQuery,
});
try {
await task();
} catch (e) {
/** ignore */
}
expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError);
});

it('calls `client.search` with the correct parameters', async () => {
const client = elasticsearchClientMock.createInternalClient(
elasticsearchClientMock.createSuccessTransportRequestPromise({ hits: { hits: [] } })
);

const task = checkForUnknownDocs({
client,
indexName: '.kibana_8.0.0',
knownTypes,
unusedTypesQuery,
});

await task();

expect(client.search).toHaveBeenCalledTimes(1);
expect(client.search).toHaveBeenCalledWith({
index: '.kibana_8.0.0',
body: {
query: {
bool: {
must: unusedTypesQuery,
must_not: knownTypes.map((type) => ({
term: {
type,
},
})),
},
},
},
});
});

it('resolves with `Either.right` when no unknown docs are found', async () => {
const client = elasticsearchClientMock.createInternalClient(
elasticsearchClientMock.createSuccessTransportRequestPromise({ hits: { hits: [] } })
);

const task = checkForUnknownDocs({
client,
indexName: '.kibana_8.0.0',
knownTypes,
unusedTypesQuery,
});

const result = await task();

expect(Either.isRight(result)).toBe(true);
});

it('resolves with `Either.left` when unknown docs are found', async () => {
const client = elasticsearchClientMock.createInternalClient(
elasticsearchClientMock.createSuccessTransportRequestPromise({
hits: {
hits: [
{ _id: '12', _source: { type: 'foo' } },
{ _id: '14', _source: { type: 'bar' } },
],
},
})
);

const task = checkForUnknownDocs({
client,
indexName: '.kibana_8.0.0',
knownTypes,
unusedTypesQuery,
});

const result = await task();

expect(Either.isLeft(result)).toBe(true);
expect((result as Either.Left<any>).left).toEqual({
type: 'unknown_docs_found',
unknownDocs: [
{ id: '12', type: 'foo' },
{ id: '14', type: 'bar' },
],
});
});

it('uses `unknown` as the type when the document does not contain a type field', async () => {
const client = elasticsearchClientMock.createInternalClient(
elasticsearchClientMock.createSuccessTransportRequestPromise({
hits: {
hits: [{ _id: '12', _source: {} }],
},
})
);

const task = checkForUnknownDocs({
client,
indexName: '.kibana_8.0.0',
knownTypes,
unusedTypesQuery,
});

const result = await task();

expect(Either.isLeft(result)).toBe(true);
expect((result as Either.Left<any>).left).toEqual({
type: 'unknown_docs_found',
unknownDocs: [{ id: '12', type: 'unknown' }],
});
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

import * as Either from 'fp-ts/lib/Either';
import * as TaskEither from 'fp-ts/lib/TaskEither';
import { estypes } from '@elastic/elasticsearch';
import type { SavedObjectsRawDocSource } from '../../serialization';
import { ElasticsearchClient } from '../../../elasticsearch';
import {
catchRetryableEsClientErrors,
RetryableEsClientError,
} from './catch_retryable_es_client_errors';

/** @internal */
export interface CheckForUnknownDocsParams {
client: ElasticsearchClient;
indexName: string;
unusedTypesQuery: estypes.QueryDslQueryContainer;
knownTypes: string[];
}

/** @internal */
export interface CheckForUnknownDocsFoundDoc {
id: string;
type: string;
}

/** @internal */
export interface UnknownDocsFound {
type: 'unknown_docs_found';
unknownDocs: CheckForUnknownDocsFoundDoc[];
}

export const checkForUnknownDocs = ({
client,
indexName,
unusedTypesQuery,
knownTypes,
}: CheckForUnknownDocsParams): TaskEither.TaskEither<
RetryableEsClientError | UnknownDocsFound,
{}
> => () => {
const query = createUnknownDocQuery(unusedTypesQuery, knownTypes);

return client
.search<SavedObjectsRawDocSource>({
index: indexName,
body: {
query,
},
})
.then((response) => {
const { hits } = response.body.hits;
if (hits.length) {
return Either.left({
type: 'unknown_docs_found' as const,
unknownDocs: hits.map((hit) => ({ id: hit._id, type: hit._source?.type ?? 'unknown' })),
});
} else {
return Either.right({});
}
})
.catch(catchRetryableEsClientErrors);
};

const createUnknownDocQuery = (
unusedTypesQuery: estypes.QueryDslQueryContainer,
knownTypes: string[]
): estypes.QueryDslQueryContainer => {
return {
bool: {
must: unusedTypesQuery,
must_not: knownTypes.map((type) => ({
term: {
type,
},
})),
},
};
};
14 changes: 14 additions & 0 deletions src/core/server/saved_objects/migrationsv2/actions/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,14 @@ export type { ReindexResponse, ReindexParams } from './reindex';
export { reindex } from './reindex';

import type { IncompatibleMappingException } from './wait_for_reindex_task';

export { waitForReindexTask } from './wait_for_reindex_task';

export type { VerifyReindexParams } from './verify_reindex';
export { verifyReindex } from './verify_reindex';

import type { AliasNotFound, RemoveIndexNotAConcreteIndex } from './update_aliases';

export type { AliasAction, UpdateAliasesParams } from './update_aliases';
export { updateAliases } from './update_aliases';

Expand All @@ -78,6 +80,14 @@ export type {
} from './update_and_pickup_mappings';
export { updateAndPickupMappings } from './update_and_pickup_mappings';

import type { UnknownDocsFound } from './check_for_unknown_docs';
export type {
CheckForUnknownDocsParams,
UnknownDocsFound,
CheckForUnknownDocsFoundDoc,
} from './check_for_unknown_docs';
export { checkForUnknownDocs } from './check_for_unknown_docs';

export { waitForPickupUpdatedMappingsTask } from './wait_for_pickup_updated_mappings_task';

export type {
Expand All @@ -96,9 +106,11 @@ export interface IndexNotFound {
type: 'index_not_found_exception';
index: string;
}

export interface WaitForReindexTaskFailure {
readonly cause: { type: string; reason: string };
}

export interface TargetIndexHadWriteBlock {
type: 'target_index_had_write_block';
}
Expand All @@ -108,6 +120,7 @@ export interface AcknowledgeResponse {
acknowledged: boolean;
shardsAcknowledged: boolean;
}

// Map of left response 'type' string -> response interface
export interface ActionErrorTypeMap {
wait_for_task_completion_timeout: WaitForTaskCompletionTimeout;
Expand All @@ -118,6 +131,7 @@ export interface ActionErrorTypeMap {
alias_not_found_exception: AliasNotFound;
remove_index_not_a_concrete_index: RemoveIndexNotAConcreteIndex;
documents_transform_failed: DocumentsTransformFailed;
unknown_docs_found: UnknownDocsFound;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

import * as Either from 'fp-ts/lib/Either';
import * as TaskEither from 'fp-ts/lib/TaskEither';
import { ElasticsearchClient } from '../../../elasticsearch';
Expand Down
7 changes: 6 additions & 1 deletion src/core/server/saved_objects/migrationsv2/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@ import type { SavedObjectsMigrationVersion } from '../types';
import type { TransformRawDocs } from './types';
import { MigrationResult } from '../migrations/core';
import { next } from './next';
import { createInitialState, model } from './model';
import { model } from './model';
import { createInitialState } from './initial_state';
import { migrationStateActionMachine } from './migrations_state_action_machine';
import { SavedObjectsMigrationConfigType } from '../saved_objects_config';
import type { ISavedObjectTypeRegistry } from '../saved_objects_type_registry';

/**
* Migrates the provided indexPrefix index using a resilient algorithm that is
Expand All @@ -32,6 +34,7 @@ export async function runResilientMigrator({
migrationVersionPerType,
indexPrefix,
migrationsConfig,
typeRegistry,
}: {
client: ElasticsearchClient;
kibanaVersion: string;
Expand All @@ -42,6 +45,7 @@ export async function runResilientMigrator({
migrationVersionPerType: SavedObjectsMigrationVersion;
indexPrefix: string;
migrationsConfig: SavedObjectsMigrationConfigType;
typeRegistry: ISavedObjectTypeRegistry;
}): Promise<MigrationResult> {
const initialState = createInitialState({
kibanaVersion,
Expand All @@ -50,6 +54,7 @@ export async function runResilientMigrator({
migrationVersionPerType,
indexPrefix,
migrationsConfig,
typeRegistry,
});
return migrationStateActionMachine({
initialState,
Expand Down
Loading

0 comments on commit acf96e7

Please sign in to comment.