Skip to content

Commit

Permalink
[ZDT migration] Don't run document migration on non-migrator nodes (#…
Browse files Browse the repository at this point in the history
…156345)

## Summary

Part of #150309
A few enhancements to the ZDT migration algorithm.

### 1. Run the 'expand' phase (and only this one) on non-migrator nodes

Given our latest changes to the way we want the algo to function, the
non-migrator nodes will have to run the 'expand' (schema expansion)
phase. However, the document migration phase will have to be run by the
migrator node exclusively.

Note: because it was required for integration tests, a new
`migration.zdt.runOnNonMigratorNodes` option was introduced to change
this behavior and have non-migrator nodes ignore this limitation.

### 2. Don't terminate during `INIT` if higher mapping versions are
found

Any mapping changes are upward compatible, meaning that we can safely
no-op instead of failing if the mapping version check result is
`lesser`. This change is required now that mapping updates will be
performed before all nodes of the previous version are shut down (and is
also required for rollbacks)

### 3. Perform a version check during `DOCUMENTS_UPDATE_INIT`

We were always executing the full doc update cycle when entering this
stage. We're now performing a version check similar to what was done
during `INIT`.

If the check result returns:
- `greater`: we perform the document migration (as it was done before
this change)
- `equal`: we skip the document migration
- `lesser`: we skip the document migration (**NOTE**: this may change
later depending on how we handle rollbacks)
- `conflict`: we terminate with a failure, as done during `INIT`

---------

Co-authored-by: kibanamachine <[email protected]>
  • Loading branch information
pgayvallet and kibanamachine authored May 4, 2023
1 parent 87ab62e commit 8f34b96
Show file tree
Hide file tree
Showing 45 changed files with 739 additions and 370 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,23 @@ const migrationSchema = schema.object({
pollInterval: schema.number({ defaultValue: 1_500 }),
skip: schema.boolean({ defaultValue: false }),
retryAttempts: schema.number({ defaultValue: 15 }),
/**
* ZDT algorithm specific options
*/
zdt: schema.object({
/**
* The delay that the migrator will wait for, in seconds, when updating the
* index mapping's meta to let the other nodes pickup the changes.
*/
metaPickupSyncDelaySec: schema.number({ min: 1, defaultValue: 120 }),
/**
* If set to true, the document migration phase will be run even if the
* instance does not have the `migrator` role.
*
* This is mostly used for testing environments and integration tests where
* we have full control over a single node Kibana deployment.
*/
runOnNonMigratorNodes: schema.boolean({ defaultValue: false }),
}),
});

Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
* Side Public License, v 1.
*/

import { omit } from 'lodash';
import { loggerMock, type MockedLogger } from '@kbn/logging-mocks';
import { logStateTransition, type LogAwareState } from './logs';

Expand All @@ -19,62 +18,81 @@ describe('logStateTransition', () => {
logger = loggerMock.create();
});

it('logs the offset of messages between the old and the new state', () => {
const previous: LogAwareState = {
controlState: 'PREVIOUS',
logs: [],
};
const next: LogAwareState = {
controlState: 'NEXT',
logs: [
...previous.logs,
{ level: 'info', message: 'info message' },
{ level: 'warning', message: 'warning message' },
],
};
describe('when DEBUG log level is not enabled', () => {
beforeEach(() => {
logger.isLevelEnabled.mockImplementation((level) => {
return level !== 'debug';
});
});

it('logs the offset of messages between the old and the new state', () => {
logger.isLevelEnabled.mockImplementation((level) => {
return level !== 'debug';
});

const previous: LogAwareState = {
controlState: 'PREVIOUS',
logs: [],
};
const next: LogAwareState = {
controlState: 'NEXT',
logs: [
...previous.logs,
{ level: 'info', message: 'info message' },
{ level: 'warning', message: 'warning message' },
],
};

logStateTransition(logger, messagePrefix, previous, next, 500);
logStateTransition(logger, messagePrefix, previous, next, 500);

expect(omit(loggerMock.collect(logger), 'debug')).toEqual({
error: [],
fatal: [],
info: [['[PREFIX] info message'], ['[PREFIX] PREVIOUS -> NEXT. took: 500ms.']],
log: [],
trace: [],
warn: [['[PREFIX] warning message']],
expect(loggerMock.collect(logger)).toEqual({
error: [],
fatal: [],
info: [['[PREFIX] info message'], ['[PREFIX] PREVIOUS -> NEXT. took: 500ms.']],
log: [],
trace: [],
warn: [['[PREFIX] warning message']],
debug: [],
});
});
});

it('logs a debug message with the correct meta', () => {
const previous: LogAwareState = {
controlState: 'PREVIOUS',
logs: [],
};
const next: LogAwareState = {
controlState: 'NEXT',
logs: [
...previous.logs,
{ level: 'info', message: 'info message' },
{ level: 'warning', message: 'warning message' },
],
};
describe('when DEBUG log level is enabled', () => {
beforeEach(() => {
logger.isLevelEnabled.mockReturnValue(true);
});

it('logs a debug message with the correct meta', () => {
const previous: LogAwareState = {
controlState: 'PREVIOUS',
logs: [],
};
const next: LogAwareState = {
controlState: 'NEXT',
logs: [
...previous.logs,
{ level: 'info', message: 'info message' },
{ level: 'warning', message: 'warning message' },
],
};

logStateTransition(logger, messagePrefix, previous, next, 500);
logStateTransition(logger, messagePrefix, previous, next, 500);

expect(loggerMock.collect(logger).debug).toEqual([
[
'[PREFIX] PREVIOUS -> NEXT. took: 500ms.',
{
kibana: {
migrations: {
duration: 500,
state: expect.objectContaining({
controlState: 'NEXT',
}),
expect(loggerMock.collect(logger).debug).toEqual([
[
'[PREFIX] PREVIOUS -> NEXT. took: 500ms.',
{
kibana: {
migrations: {
duration: 500,
state: expect.objectContaining({
controlState: 'NEXT',
}),
},
},
},
},
],
]);
],
]);
});
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ interface StateTransitionLogMeta extends LogMeta {

export const logStateTransition = (
logger: Logger,
logMessagePrefix: string,
logPrefix: string,
prevState: LogAwareState,
currState: LogAwareState,
tookMs: number
Expand All @@ -34,31 +34,30 @@ export const logStateTransition = (
currState.logs.slice(prevState.logs.length).forEach(({ message, level }) => {
switch (level) {
case 'error':
return logger.error(logMessagePrefix + message);
return logger.error(logPrefix + message);
case 'warning':
return logger.warn(logMessagePrefix + message);
return logger.warn(logPrefix + message);
case 'info':
return logger.info(logMessagePrefix + message);
return logger.info(logPrefix + message);
default:
throw new Error(`unexpected log level ${level}`);
}
});
}

logger.info(
logMessagePrefix + `${prevState.controlState} -> ${currState.controlState}. took: ${tookMs}ms.`
);
logger.debug<StateTransitionLogMeta>(
logMessagePrefix + `${prevState.controlState} -> ${currState.controlState}. took: ${tookMs}ms.`,
{
const logMessage = `${logPrefix}${prevState.controlState} -> ${currState.controlState}. took: ${tookMs}ms.`;
if (logger.isLevelEnabled('debug')) {
logger.debug<StateTransitionLogMeta>(logMessage, {
kibana: {
migrations: {
state: currState,
duration: tookMs,
},
},
}
);
});
} else {
logger.info(logMessage);
}
};

export const logActionResponse = (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -558,10 +558,12 @@ const mockOptions = () => {
retryAttempts: 20,
zdt: {
metaPickupSyncDelaySec: 120,
runOnNonMigratorNodes: false,
},
},
client: mockedClient,
docLinks: docLinksServiceMock.createSetupContract(),
nodeRoles: { backgroundTasks: true, ui: true, migrator: true },
};
return options;
};
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import { BehaviorSubject } from 'rxjs';
import Semver from 'semver';
import type { NodeRoles } from '@kbn/core-node-server';
import type { Logger } from '@kbn/logging';
import type { DocLinksServiceStart } from '@kbn/core-doc-links-server';
import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
Expand Down Expand Up @@ -52,6 +53,7 @@ export interface KibanaMigratorOptions {
docLinks: DocLinksServiceStart;
waitForMigrationCompletion: boolean;
defaultIndexTypesMap?: IndexTypesMap;
nodeRoles: NodeRoles;
}

/**
Expand All @@ -74,6 +76,7 @@ export class KibanaMigrator implements IKibanaMigrator {
private readonly docLinks: DocLinksServiceStart;
private readonly waitForMigrationCompletion: boolean;
private readonly defaultIndexTypesMap: IndexTypesMap;
private readonly nodeRoles: NodeRoles;
public readonly kibanaVersion: string;

/**
Expand All @@ -89,6 +92,7 @@ export class KibanaMigrator implements IKibanaMigrator {
docLinks,
defaultIndexTypesMap = DEFAULT_INDEX_TYPES_MAP,
waitForMigrationCompletion,
nodeRoles,
}: KibanaMigratorOptions) {
this.client = client;
this.kibanaIndex = kibanaIndex;
Expand All @@ -105,6 +109,7 @@ export class KibanaMigrator implements IKibanaMigrator {
log: this.log,
});
this.waitForMigrationCompletion = waitForMigrationCompletion;
this.nodeRoles = nodeRoles;
// Building the active mappings (and associated md5sums) is an expensive
// operation so we cache the result
this.activeMappings = buildActiveMappings(this.mappingProperties);
Expand Down Expand Up @@ -159,6 +164,7 @@ export class KibanaMigrator implements IKibanaMigrator {
docLinks: this.docLinks,
serializer: this.serializer,
elasticsearchClient: this.client,
nodeRoles: this.nodeRoles,
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ describe('migrationsStateActionMachine', () => {
retryAttempts: 5,
zdt: {
metaPickupSyncDelaySec: 120,
runOnNonMigratorNodes: false,
},
},
typeRegistry,
Expand Down Expand Up @@ -98,7 +99,8 @@ describe('migrationsStateActionMachine', () => {
abort,
});
const logs = loggingSystemMock.collect(mockLogger);
const doneLog = logs.info.splice(8, 1)[0][0];
// the 'done' log is the 5th entry in the list
const doneLog = logs.info.splice(4, 1)[0][0];
expect(doneLog).toMatch(/\[.my-so-index\] Migration completed after \d+ms/);
expect(logs).toMatchSnapshot();
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ export const createContext = ({
indexPrefix,
typeRegistry,
serializer,
nodeRoles,
}: CreateContextOps): MigratorContext => {
return {
migrationConfig,
Expand All @@ -41,5 +42,6 @@ export const createContext = ({
migrationDocLinks: docLinks.links.kibanaUpgradeSavedObjects,
deletedTypes: REMOVED_TYPES,
discardCorruptObjects: Boolean(migrationConfig.discardCorruptObjects),
nodeRoles,
};
};
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
*/

import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import type { NodeRoles } from '@kbn/core-node-server';
import type {
ISavedObjectTypeRegistry,
ISavedObjectsSerializer,
Expand Down Expand Up @@ -48,4 +49,6 @@ export interface MigratorContext {
readonly deletedTypes: string[];
/** If true, corrupted objects will be discarded instead of failing the migration */
readonly discardCorruptObjects: boolean;
/** The node roles of the Kibana instance */
readonly nodeRoles: NodeRoles;
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import type {
} from '@kbn/core-saved-objects-server';
import type { Logger } from '@kbn/logging';
import type { DocLinksServiceStart } from '@kbn/core-doc-links-server';
import { NodeRoles } from '@kbn/core-node-server';
import { migrationStateActionMachine } from './migration_state_action_machine';
import type { VersionedTransformer } from '../document_migrator';
import { createContext } from './context';
Expand All @@ -42,6 +43,8 @@ export interface MigrateIndexOptions {
serializer: ISavedObjectsSerializer;
/** The client to use for communications with ES */
elasticsearchClient: ElasticsearchClient;
/** The node roles of the Kibana instance */
readonly nodeRoles: NodeRoles;
}

export const migrateIndex = async ({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ describe('model', () => {
retryCount: 0,
retryDelay: 0,
logs: [],
skipDocumentMigration: false,
};

const retryableError: RetryableEsClientError = {
Expand Down
Loading

0 comments on commit 8f34b96

Please sign in to comment.