Skip to content

Commit

Permalink
[Fleet] Retry Saved Object import on conflict error (#126900) (#127007)
Browse files Browse the repository at this point in the history
* retry SO import on conflict errors

* add jitter + increase retries

* Apply suggestions from code review

Co-authored-by: Josh Dover <[email protected]>

Co-authored-by: Josh Dover <[email protected]>
(cherry picked from commit 7c6d314)

Co-authored-by: Mark Hopkin <[email protected]>
  • Loading branch information
kibanamachine and hop-dev authored Mar 7, 2022
1 parent 027ca22 commit 50c7904
Show file tree
Hide file tree
Showing 2 changed files with 167 additions and 8 deletions.
124 changes: 124 additions & 0 deletions x-pack/plugins/fleet/server/services/epm/kibana/assets/install.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type {
ISavedObjectsImporter,
SavedObjectsImportFailure,
SavedObjectsImportSuccess,
SavedObjectsImportResponse,
} from 'src/core/server';

import { loggingSystemMock } from '../../../../../../../../src/core/server/mocks';

import type { ArchiveAsset } from './install';

jest.mock('timers/promises', () => ({
async setTimeout() {},
}));

import { installKibanaSavedObjects } from './install';

const mockLogger = loggingSystemMock.createLogger();

const mockImporter: jest.Mocked<ISavedObjectsImporter> = {
import: jest.fn(),
resolveImportErrors: jest.fn(),
};

const createImportError = (so: ArchiveAsset, type: string) =>
({ id: so.id, error: { type } } as SavedObjectsImportFailure);
const createImportSuccess = (so: ArchiveAsset) =>
({ id: so.id, type: so.type, meta: {} } as SavedObjectsImportSuccess);
const createAsset = (asset: Partial<ArchiveAsset>) =>
({ id: 1234, type: 'dashboard', attributes: {}, ...asset } as ArchiveAsset);

const createImportResponse = (
errors: SavedObjectsImportFailure[] = [],
successResults: SavedObjectsImportSuccess[] = []
) =>
({
success: !!successResults.length,
errors,
successResults,
warnings: [],
successCount: successResults.length,
} as SavedObjectsImportResponse);

describe('installKibanaSavedObjects', () => {
beforeEach(() => {
mockImporter.import.mockReset();
mockImporter.resolveImportErrors.mockReset();
});

it('should retry on conflict error', async () => {
const asset = createAsset({ attributes: { hello: 'world' } });
const conflictResponse = createImportResponse([createImportError(asset, 'conflict')]);
const successResponse = createImportResponse([], [createImportSuccess(asset)]);

mockImporter.import
.mockResolvedValueOnce(conflictResponse)
.mockResolvedValueOnce(successResponse);

await installKibanaSavedObjects({
savedObjectsImporter: mockImporter,
logger: mockLogger,
kibanaAssets: [asset],
});

expect(mockImporter.import).toHaveBeenCalledTimes(2);
});

it('should give up after 50 retries on conflict errors', async () => {
const asset = createAsset({ attributes: { hello: 'world' } });
const conflictResponse = createImportResponse([createImportError(asset, 'conflict')]);

mockImporter.import.mockImplementation(() => Promise.resolve(conflictResponse));

await expect(
installKibanaSavedObjects({
savedObjectsImporter: mockImporter,
logger: mockLogger,
kibanaAssets: [asset],
})
).rejects.toEqual(expect.any(Error));
expect(mockImporter.import).toHaveBeenCalledTimes(51);
});
it('should not retry errors that arent conflict errors', async () => {
const asset = createAsset({ attributes: { hello: 'world' } });
const errorResponse = createImportResponse([createImportError(asset, 'something_bad')]);
const successResponse = createImportResponse([], [createImportSuccess(asset)]);

mockImporter.import.mockResolvedValueOnce(errorResponse).mockResolvedValueOnce(successResponse);

expect(
installKibanaSavedObjects({
savedObjectsImporter: mockImporter,
logger: mockLogger,
kibanaAssets: [asset],
})
).rejects.toEqual(expect.any(Error));
});

it('should resolve reference errors', async () => {
const asset = createAsset({ attributes: { hello: 'world' } });
const referenceErrorResponse = createImportResponse([
createImportError(asset, 'missing_references'),
]);
const successResponse = createImportResponse([], [createImportSuccess(asset)]);

mockImporter.import.mockResolvedValueOnce(referenceErrorResponse);
mockImporter.resolveImportErrors.mockResolvedValueOnce(successResponse);

await installKibanaSavedObjects({
savedObjectsImporter: mockImporter,
logger: mockLogger,
kibanaAssets: [asset],
});

expect(mockImporter.import).toHaveBeenCalledTimes(1);
expect(mockImporter.resolveImportErrors).toHaveBeenCalledTimes(1);
});
});
51 changes: 43 additions & 8 deletions x-pack/plugins/fleet/server/services/epm/kibana/assets/install.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
* 2.0.
*/

import { setTimeout } from 'timers/promises';

import type {
SavedObject,
SavedObjectsBulkCreateObject,
Expand All @@ -13,7 +15,6 @@ import type {
Logger,
} from 'src/core/server';
import type { SavedObjectsImportSuccess, SavedObjectsImportFailure } from 'src/core/server/types';

import { createListStream } from '@kbn/utils';
import { partition } from 'lodash';

Expand Down Expand Up @@ -166,7 +167,40 @@ export async function getKibanaAssets(
return result;
}

async function installKibanaSavedObjects({
const isImportConflictError = (e: SavedObjectsImportFailure) => e?.error?.type === 'conflict';
/**
* retry saved object import if only conflict errors are encountered
*/
async function retryImportOnConflictError(
importCall: () => ReturnType<SavedObjectsImporterContract['import']>,
{
logger,
maxAttempts = 50,
_attempt = 0,
}: { logger?: Logger; _attempt?: number; maxAttempts?: number } = {}
): ReturnType<SavedObjectsImporterContract['import']> {
const result = await importCall();

const errors = result.errors ?? [];
if (_attempt < maxAttempts && errors.length && errors.every(isImportConflictError)) {
const retryCount = _attempt + 1;
const retryDelayMs = 1000 + Math.floor(Math.random() * 3000); // 1s + 0-3s of jitter

logger?.debug(
`Retrying import operation after [${
retryDelayMs * 1000
}s] due to conflict errors: ${JSON.stringify(errors)}`
);

await setTimeout(retryDelayMs);
return retryImportOnConflictError(importCall, { logger, _attempt: retryCount });
}

return result;
}

// only exported for testing
export async function installKibanaSavedObjects({
savedObjectsImporter,
kibanaAssets,
logger,
Expand All @@ -185,18 +219,19 @@ async function installKibanaSavedObjects({
return [];
} else {
const { successResults: importSuccessResults = [], errors: importErrors = [] } =
await savedObjectsImporter.import({
overwrite: true,
readStream: createListStream(toBeSavedObjects),
createNewCopies: false,
});
await retryImportOnConflictError(() =>
savedObjectsImporter.import({
overwrite: true,
readStream: createListStream(toBeSavedObjects),
createNewCopies: false,
})
);

allSuccessResults = importSuccessResults;
const [referenceErrors, otherErrors] = partition(
importErrors,
(e) => e?.error?.type === 'missing_references'
);

if (otherErrors?.length) {
throw new Error(
`Encountered ${
Expand Down

0 comments on commit 50c7904

Please sign in to comment.