Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle cluster_block_exception during reindexing the TM index #201297

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ export class KibanaDiscoveryService {
}

private async scheduleUpsertCurrentNode() {
let retryInterval = this.discoveryInterval;
if (!this.stopped) {
const lastSeenDate = new Date();
const lastSeen = lastSeenDate.toISOString();
Expand All @@ -71,18 +72,21 @@ export class KibanaDiscoveryService {
} catch (e) {
if (!this.started) {
this.logger.error(
`Kibana Discovery Service couldn't be started and will be retried in ${this.discoveryInterval}ms, error:${e.message}`
`Kibana Discovery Service couldn't be started and will be retried in ${retryInterval}ms, error:${e.message}`
);
} else {
this.logger.error(
`Kibana Discovery Service couldn't update this node's last_seen timestamp. id: ${this.currentNode}, last_seen: ${lastSeen}, error:${e.message}`
);
}
if (e.message.includes('cluster_block_exception')) {
retryInterval = 60000;
}
} finally {
this.timer = setTimeout(
async () => await this.scheduleUpsertCurrentNode(),
// The timeout should not be less than the default timeout of two seconds
Math.max(this.discoveryInterval - (Date.now() - lastSeenDate.getTime()), DEFAULT_TIMEOUT)
Math.max(retryInterval - (Date.now() - lastSeenDate.getTime()), DEFAULT_TIMEOUT)
);
}
}
Expand Down
4 changes: 4 additions & 0 deletions x-pack/plugins/task_manager/server/lib/bulk_update_error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,7 @@ export function getBulkUpdateErrorType(error: Error | BulkUpdateError): string |
return (error as BulkUpdateError).type;
}
}

export function isClusterBlockException(error: Error | BulkUpdateError): boolean {
return getBulkUpdateErrorType(error) === 'cluster_block_exception';
}
137 changes: 85 additions & 52 deletions x-pack/plugins/task_manager/server/lib/create_managed_configuration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@ import { isEsCannotExecuteScriptError } from './identify_es_error';
import { CLAIM_STRATEGY_MGET, DEFAULT_CAPACITY, MAX_CAPACITY, TaskManagerConfig } from '../config';
import { TaskCost } from '../task';
import { getMsearchStatusCode } from './msearch_error';
import { getBulkUpdateStatusCode } from './bulk_update_error';
import { getBulkUpdateStatusCode, isClusterBlockException } from './bulk_update_error';

const FLUSH_MARKER = Symbol('flush');
export const ADJUST_THROUGHPUT_INTERVAL = 10 * 1000;
export const PREFERRED_MAX_POLL_INTERVAL = 60 * 1000;
export const INTERVAL_AFTER_BLOCK_EXCEPTION = 61 * 1000;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make it 1 sec longer than the max limit, so I can check the previousPollInterval on error flush and set the interval back to default.


// Capacity is measured in number of normal cost tasks that can be run
// At a minimum, we need to be able to run a single task with the greatest cost
Expand Down Expand Up @@ -46,6 +47,11 @@ interface ManagedConfigurationOpts {
logger: Logger;
}

interface ErrorScanResult {
count: number;
isBlockException: boolean;
}

export interface ManagedConfiguration {
startingCapacity: number;
capacityConfiguration$: Observable<number>;
Expand Down Expand Up @@ -77,7 +83,7 @@ export function createManagedConfiguration({
}

function createCapacityScan(config: TaskManagerConfig, logger: Logger, startingCapacity: number) {
return scan((previousCapacity: number, errorCount: number) => {
return scan((previousCapacity: number, { count: errorCount }: ErrorScanResult) => {
let newCapacity: number;
if (errorCount > 0) {
const minCapacity = getMinCapacity(config);
Expand Down Expand Up @@ -112,52 +118,66 @@ function createCapacityScan(config: TaskManagerConfig, logger: Logger, startingC
}

function createPollIntervalScan(logger: Logger, startingPollInterval: number) {
return scan((previousPollInterval: number, errorCount: number) => {
let newPollInterval: number;
if (errorCount > 0) {
// Increase poll interval by POLL_INTERVAL_INCREASE_PERCENTAGE and use Math.ceil to
// make sure the number is different than previous while not being a decimal value.
// Also ensure we don't go over PREFERRED_MAX_POLL_INTERVAL or startingPollInterval,
// whichever is greater.
newPollInterval = Math.min(
Math.ceil(previousPollInterval * POLL_INTERVAL_INCREASE_PERCENTAGE),
Math.ceil(Math.max(PREFERRED_MAX_POLL_INTERVAL, startingPollInterval))
);
if (!Number.isSafeInteger(newPollInterval) || newPollInterval < 0) {
logger.error(
`Poll interval configuration had an issue calculating the new poll interval: Math.min(Math.ceil(${previousPollInterval} * ${POLL_INTERVAL_INCREASE_PERCENTAGE}), Math.max(${PREFERRED_MAX_POLL_INTERVAL}, ${startingPollInterval})) = ${newPollInterval}, will keep the poll interval unchanged (${previousPollInterval})`
);
newPollInterval = previousPollInterval;
}
} else {
// Decrease poll interval by POLL_INTERVAL_DECREASE_PERCENTAGE and use Math.floor to
// make sure the number is different than previous while not being a decimal value.
newPollInterval = Math.max(
startingPollInterval,
Math.floor(previousPollInterval * POLL_INTERVAL_DECREASE_PERCENTAGE)
);
if (!Number.isSafeInteger(newPollInterval) || newPollInterval < 0) {
logger.error(
`Poll interval configuration had an issue calculating the new poll interval: Math.max(${startingPollInterval}, Math.floor(${previousPollInterval} * ${POLL_INTERVAL_DECREASE_PERCENTAGE})) = ${newPollInterval}, will keep the poll interval unchanged (${previousPollInterval})`
);
newPollInterval = previousPollInterval;
return scan(
(previousPollInterval: number, { count: errorCount, isBlockException }: ErrorScanResult) => {
let newPollInterval: number;
if (isBlockException) {
newPollInterval = INTERVAL_AFTER_BLOCK_EXCEPTION;
} else {
if (errorCount > 0) {
// Increase poll interval by POLL_INTERVAL_INCREASE_PERCENTAGE and use Math.ceil to
// make sure the number is different than previous while not being a decimal value.
// Also ensure we don't go over PREFERRED_MAX_POLL_INTERVAL or startingPollInterval,
// whichever is greater.
newPollInterval = Math.min(
Math.ceil(previousPollInterval * POLL_INTERVAL_INCREASE_PERCENTAGE),
Math.ceil(Math.max(PREFERRED_MAX_POLL_INTERVAL, startingPollInterval))
);
if (!Number.isSafeInteger(newPollInterval) || newPollInterval < 0) {
logger.error(
`Poll interval configuration had an issue calculating the new poll interval: Math.min(Math.ceil(${previousPollInterval} * ${POLL_INTERVAL_INCREASE_PERCENTAGE}), Math.max(${PREFERRED_MAX_POLL_INTERVAL}, ${startingPollInterval})) = ${newPollInterval}, will keep the poll interval unchanged (${previousPollInterval})`
);
newPollInterval = previousPollInterval;
}
} else {
if (previousPollInterval === INTERVAL_AFTER_BLOCK_EXCEPTION) {
previousPollInterval = startingPollInterval;
ymao1 marked this conversation as resolved.
Show resolved Hide resolved
}
// Decrease poll interval by POLL_INTERVAL_DECREASE_PERCENTAGE and use Math.floor to
// make sure the number is different than previous while not being a decimal value.
newPollInterval = Math.max(
startingPollInterval,
Math.floor(previousPollInterval * POLL_INTERVAL_DECREASE_PERCENTAGE)
);
if (!Number.isSafeInteger(newPollInterval) || newPollInterval < 0) {
logger.error(
`Poll interval configuration had an issue calculating the new poll interval: Math.max(${startingPollInterval}, Math.floor(${previousPollInterval} * ${POLL_INTERVAL_DECREASE_PERCENTAGE})) = ${newPollInterval}, will keep the poll interval unchanged (${previousPollInterval})`
);
newPollInterval = previousPollInterval;
}
}
}
}
if (newPollInterval !== previousPollInterval) {
logger.debug(
`Poll interval configuration changing from ${previousPollInterval} to ${newPollInterval} after seeing ${errorCount} "too many request" and/or "execute [inline] script" error(s)`
);
if (previousPollInterval === startingPollInterval) {
logger.warn(
`Poll interval configuration is temporarily increased after Elasticsearch returned ${errorCount} "too many request" and/or "execute [inline] script" error(s).`

if (newPollInterval !== previousPollInterval) {
logger.debug(
`Poll interval configuration changing from ${previousPollInterval} to ${newPollInterval} after seeing ${errorCount} "too many request" and/or "execute [inline] script" and/or "cluster_block_exception" error(s)`
);
if (previousPollInterval === startingPollInterval) {
logger.warn(
`Poll interval configuration is temporarily increased after Elasticsearch returned ${errorCount} "too many request" and/or "execute [inline] script and/or "cluster_block_exception"" error(s).`
);
}
}
}
return newPollInterval;
}, startingPollInterval);
return newPollInterval;
},
startingPollInterval
);
}

function countErrors(errors$: Observable<Error>, countInterval: number): Observable<number> {
function countErrors(
errors$: Observable<Error>,
countInterval: number
): Observable<ErrorScanResult> {
return merge(
// Flush error count at fixed interval
interval(countInterval).pipe(map(() => FLUSH_MARKER)),
Expand All @@ -173,43 +193,56 @@ function countErrors(errors$: Observable<Error>, countInterval: number): Observa
getMsearchStatusCode(e) === 503 ||
getBulkUpdateStatusCode(e) === 429 ||
getBulkUpdateStatusCode(e) === 500 ||
getBulkUpdateStatusCode(e) === 503
getBulkUpdateStatusCode(e) === 503 ||
isClusterBlockException(e)
)
)
).pipe(
// When tag is "flush", reset the error counter
// Otherwise increment the error counter
mergeScan(({ count }, next) => {
mergeScan(({ count, isBlockException }, next) => {
return next === FLUSH_MARKER
? of(emitErrorCount(count), resetErrorCount())
: of(incementErrorCount(count));
}, emitErrorCount(0)),
? of(emitErrorCount(count, isBlockException), resetErrorCount())
: of(incrementOrEmitErrorCount(count, isClusterBlockException(next as Error)));
}, emitErrorCount(0, false)),
filter(isEmitEvent),
map(({ count }) => count)
map(({ count, isBlockException }) => {
return { count, isBlockException };
})
);
}

function emitErrorCount(count: number) {
function emitErrorCount(count: number, isBlockException: boolean) {
return {
tag: 'emit',
isBlockException,
count,
};
}

function isEmitEvent(event: { tag: string; count: number }) {
function isEmitEvent(event: { tag: string; count: number; isBlockException: boolean }) {
return event.tag === 'emit';
}

function incementErrorCount(count: number) {
function incrementOrEmitErrorCount(count: number, isBlockException: boolean) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want to emit the error event as soon as possible in case of ClusterBlockException

if (isBlockException) {
return {
tag: 'emit',
isBlockException,
count: count + 1,
};
}
return {
tag: 'inc',
isBlockException,
count: count + 1,
};
}

function resetErrorCount() {
return {
tag: 'initial',
isBlockException: false,
count: 0,
};
}
Expand Down