From 44f8c129f7cb9bd2d933a51253f97755c828217d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mike=20C=C3=B4t=C3=A9?= Date: Thu, 25 Jul 2019 21:35:42 -0400 Subject: [PATCH] Task manager enhancements for error handling in alerting and actions (#39829) (#42004) * Allow mtask definitions to overwrite default setting maxAttemps * Leverage scheduledAt from task manager * Treat maxAttempts like attempts and not retries * Add support for second intervals * Min 1 attempt * Reverse relying on scheduledAt * Add new startedAt attribute in task manager that keeps track when task started running * Don't extend runAt when claiming a task * Remove startedAt from state * Attempt trying to define custom getBackpressureDelay function * Pass error object to getBackpressureDelay * Cleanup processResultForRecurringTask code * Add backpressure to timed out tasks * Change default timeout backpressure calculation * getBackpressureDelay to return seconds instead of milliseconds * Add comment for task store query * Compress query * Revert alert / actions specific code * Add more interval tests * Fix failing jest tests * Fix test * Add more unit tests * Fix integration tests * Fix sorting of tasks to process * WIP * Always provide error when getBackpressureDelay is called * Rename getBackpressureDelay to getRetryDelay * retryAt to be calculated from timeout time by default * Remove invalid test * Add unit tests * Consider timeout before scheduling a retryAt * Remove backpressure terminology * Remove support for 0 based intervals and timeouts * Apply PR feedback * Fix last place using Math.abs * Modify migrations to allow running a script when converting an index to an alias * Convert task manager to use saved objects * Fix broken test * Fix broken tests pt1 * Remove index from task manager config schema * Accept platform changes * PR feedback * Apply PR feedback * Apply PR feedback pt2 * Apply PR feedback pt3 * Apply PR feedback pt4 * Fix feedback pt3 * Rename RawSavedObjectDoc to SavedObjectsRawDoc --- .../core/server/kibana-plugin-server.md | 3 + ...na-plugin-server.savedobjectsrawdoc._id.md | 11 + ...server.savedobjectsrawdoc._primary_term.md | 11 + ...lugin-server.savedobjectsrawdoc._seq_no.md | 11 + ...lugin-server.savedobjectsrawdoc._source.md | 11 + ...-plugin-server.savedobjectsrawdoc._type.md | 11 + ...kibana-plugin-server.savedobjectsrawdoc.md | 24 + ...server.savedobjectsschema.(constructor).md | 20 + ...rver.savedobjectsschema.getindexfortype.md | 22 + ...-server.savedobjectsschema.ishiddentype.md | 22 + ....savedobjectsschema.isnamespaceagnostic.md | 22 + ...kibana-plugin-server.savedobjectsschema.md | 26 ++ ...er.savedobjectsserializer.(constructor).md | 20 + ...er.savedobjectsserializer.generaterawid.md | 26 ++ ...savedobjectsserializer.israwsavedobject.md | 24 + ...na-plugin-server.savedobjectsserializer.md | 27 ++ ...savedobjectsserializer.rawtosavedobject.md | 24 + ...savedobjectsserializer.savedobjecttoraw.md | 24 + src/core/server/index.ts | 3 + src/core/server/saved_objects/index.ts | 2 + src/core/server/server.api.md | 42 ++ .../alerting/server/alerts_client.test.ts | 21 +- .../server/maps_telemetry/telemetry_task.js | 30 +- .../maps_telemetry/telemetry_task.test.js | 2 +- .../server/lib/get_next_midnight.test.ts | 2 +- .../server/lib/get_next_midnight.ts | 2 +- .../oss_telemetry/server/lib/tasks/index.ts | 28 +- .../tasks/visualizations/task_runner.test.ts | 2 +- x-pack/legacy/plugins/task_manager/README.md | 13 +- .../legacy/plugins/task_manager/constants.ts | 6 +- x-pack/legacy/plugins/task_manager/index.js | 36 +- .../task_manager/lib/intervals.test.ts | 123 ++++- .../plugins/task_manager/lib/intervals.ts | 82 +++- .../task_manager/lib/middleware.test.ts | 6 + .../legacy/plugins/task_manager/mappings.json | 42 ++ .../legacy/plugins/task_manager/migrations.ts | 16 + x-pack/legacy/plugins/task_manager/task.ts | 53 ++- .../plugins/task_manager/task_manager.test.ts | 71 ++- .../plugins/task_manager/task_manager.ts | 69 +-- .../plugins/task_manager/task_poller.test.ts | 32 -- .../plugins/task_manager/task_poller.ts | 8 - .../plugins/task_manager/task_runner.test.ts | 196 +++++++- .../plugins/task_manager/task_runner.ts | 54 ++- .../plugins/task_manager/task_store.test.ts | 442 ++++++++++-------- .../legacy/plugins/task_manager/task_store.ts | 406 +++++----------- .../api_integration/apis/alerting/create.ts | 2 +- .../task_manager/task_manager_integration.js | 2 +- 47 files changed, 1473 insertions(+), 659 deletions(-) create mode 100644 docs/development/core/server/kibana-plugin-server.savedobjectsrawdoc._id.md create mode 100644 docs/development/core/server/kibana-plugin-server.savedobjectsrawdoc._primary_term.md create mode 100644 docs/development/core/server/kibana-plugin-server.savedobjectsrawdoc._seq_no.md create mode 100644 docs/development/core/server/kibana-plugin-server.savedobjectsrawdoc._source.md create mode 100644 docs/development/core/server/kibana-plugin-server.savedobjectsrawdoc._type.md create mode 100644 docs/development/core/server/kibana-plugin-server.savedobjectsrawdoc.md create mode 100644 docs/development/core/server/kibana-plugin-server.savedobjectsschema.(constructor).md create mode 100644 docs/development/core/server/kibana-plugin-server.savedobjectsschema.getindexfortype.md create mode 100644 docs/development/core/server/kibana-plugin-server.savedobjectsschema.ishiddentype.md create mode 100644 docs/development/core/server/kibana-plugin-server.savedobjectsschema.isnamespaceagnostic.md create mode 100644 docs/development/core/server/kibana-plugin-server.savedobjectsschema.md create mode 100644 docs/development/core/server/kibana-plugin-server.savedobjectsserializer.(constructor).md create mode 100644 docs/development/core/server/kibana-plugin-server.savedobjectsserializer.generaterawid.md create mode 100644 docs/development/core/server/kibana-plugin-server.savedobjectsserializer.israwsavedobject.md create mode 100644 docs/development/core/server/kibana-plugin-server.savedobjectsserializer.md create mode 100644 docs/development/core/server/kibana-plugin-server.savedobjectsserializer.rawtosavedobject.md create mode 100644 docs/development/core/server/kibana-plugin-server.savedobjectsserializer.savedobjecttoraw.md create mode 100644 x-pack/legacy/plugins/task_manager/mappings.json create mode 100644 x-pack/legacy/plugins/task_manager/migrations.ts diff --git a/docs/development/core/server/kibana-plugin-server.md b/docs/development/core/server/kibana-plugin-server.md index 95b6a7793e893..9428544758e13 100644 --- a/docs/development/core/server/kibana-plugin-server.md +++ b/docs/development/core/server/kibana-plugin-server.md @@ -19,6 +19,8 @@ The plugin integrates with the core system via lifecycle events: `setup` | [KibanaRequest](./kibana-plugin-server.kibanarequest.md) | Kibana specific abstraction for an incoming request. | | [Router](./kibana-plugin-server.router.md) | | | [SavedObjectsErrorHelpers](./kibana-plugin-server.savedobjectserrorhelpers.md) | | +| [SavedObjectsSchema](./kibana-plugin-server.savedobjectsschema.md) | | +| [SavedObjectsSerializer](./kibana-plugin-server.savedobjectsserializer.md) | | | [ScopedClusterClient](./kibana-plugin-server.scopedclusterclient.md) | Serves the same purpose as "normal" ClusterClient but exposes additional callAsCurrentUser method that doesn't use credentials of the Kibana internal user (as callAsInternalUser does) to request Elasticsearch API, but rather passes HTTP headers extracted from the current user request to the API | ## Interfaces @@ -60,6 +62,7 @@ The plugin integrates with the core system via lifecycle events: `setup` | [SavedObjectsFindOptions](./kibana-plugin-server.savedobjectsfindoptions.md) | | | [SavedObjectsFindResponse](./kibana-plugin-server.savedobjectsfindresponse.md) | | | [SavedObjectsMigrationVersion](./kibana-plugin-server.savedobjectsmigrationversion.md) | A dictionary of saved object type -> version used to determine what migrations need to be applied to a saved object. | +| [SavedObjectsRawDoc](./kibana-plugin-server.savedobjectsrawdoc.md) | A raw document as represented directly in the saved object index. | | [SavedObjectsService](./kibana-plugin-server.savedobjectsservice.md) | | | [SavedObjectsUpdateOptions](./kibana-plugin-server.savedobjectsupdateoptions.md) | | | [SavedObjectsUpdateResponse](./kibana-plugin-server.savedobjectsupdateresponse.md) | | diff --git a/docs/development/core/server/kibana-plugin-server.savedobjectsrawdoc._id.md b/docs/development/core/server/kibana-plugin-server.savedobjectsrawdoc._id.md new file mode 100644 index 0000000000000..cd16eadf51931 --- /dev/null +++ b/docs/development/core/server/kibana-plugin-server.savedobjectsrawdoc._id.md @@ -0,0 +1,11 @@ + + +[Home](./index.md) > [kibana-plugin-server](./kibana-plugin-server.md) > [SavedObjectsRawDoc](./kibana-plugin-server.savedobjectsrawdoc.md) > [\_id](./kibana-plugin-server.savedobjectsrawdoc._id.md) + +## SavedObjectsRawDoc.\_id property + +Signature: + +```typescript +_id: string; +``` diff --git a/docs/development/core/server/kibana-plugin-server.savedobjectsrawdoc._primary_term.md b/docs/development/core/server/kibana-plugin-server.savedobjectsrawdoc._primary_term.md new file mode 100644 index 0000000000000..c5eef82322f58 --- /dev/null +++ b/docs/development/core/server/kibana-plugin-server.savedobjectsrawdoc._primary_term.md @@ -0,0 +1,11 @@ + + +[Home](./index.md) > [kibana-plugin-server](./kibana-plugin-server.md) > [SavedObjectsRawDoc](./kibana-plugin-server.savedobjectsrawdoc.md) > [\_primary\_term](./kibana-plugin-server.savedobjectsrawdoc._primary_term.md) + +## SavedObjectsRawDoc.\_primary\_term property + +Signature: + +```typescript +_primary_term?: number; +``` diff --git a/docs/development/core/server/kibana-plugin-server.savedobjectsrawdoc._seq_no.md b/docs/development/core/server/kibana-plugin-server.savedobjectsrawdoc._seq_no.md new file mode 100644 index 0000000000000..a3b9a943a708c --- /dev/null +++ b/docs/development/core/server/kibana-plugin-server.savedobjectsrawdoc._seq_no.md @@ -0,0 +1,11 @@ + + +[Home](./index.md) > [kibana-plugin-server](./kibana-plugin-server.md) > [SavedObjectsRawDoc](./kibana-plugin-server.savedobjectsrawdoc.md) > [\_seq\_no](./kibana-plugin-server.savedobjectsrawdoc._seq_no.md) + +## SavedObjectsRawDoc.\_seq\_no property + +Signature: + +```typescript +_seq_no?: number; +``` diff --git a/docs/development/core/server/kibana-plugin-server.savedobjectsrawdoc._source.md b/docs/development/core/server/kibana-plugin-server.savedobjectsrawdoc._source.md new file mode 100644 index 0000000000000..1babaab14f14d --- /dev/null +++ b/docs/development/core/server/kibana-plugin-server.savedobjectsrawdoc._source.md @@ -0,0 +1,11 @@ + + +[Home](./index.md) > [kibana-plugin-server](./kibana-plugin-server.md) > [SavedObjectsRawDoc](./kibana-plugin-server.savedobjectsrawdoc.md) > [\_source](./kibana-plugin-server.savedobjectsrawdoc._source.md) + +## SavedObjectsRawDoc.\_source property + +Signature: + +```typescript +_source: any; +``` diff --git a/docs/development/core/server/kibana-plugin-server.savedobjectsrawdoc._type.md b/docs/development/core/server/kibana-plugin-server.savedobjectsrawdoc._type.md new file mode 100644 index 0000000000000..31c40e15b53c0 --- /dev/null +++ b/docs/development/core/server/kibana-plugin-server.savedobjectsrawdoc._type.md @@ -0,0 +1,11 @@ + + +[Home](./index.md) > [kibana-plugin-server](./kibana-plugin-server.md) > [SavedObjectsRawDoc](./kibana-plugin-server.savedobjectsrawdoc.md) > [\_type](./kibana-plugin-server.savedobjectsrawdoc._type.md) + +## SavedObjectsRawDoc.\_type property + +Signature: + +```typescript +_type?: string; +``` diff --git a/docs/development/core/server/kibana-plugin-server.savedobjectsrawdoc.md b/docs/development/core/server/kibana-plugin-server.savedobjectsrawdoc.md new file mode 100644 index 0000000000000..5864a85465396 --- /dev/null +++ b/docs/development/core/server/kibana-plugin-server.savedobjectsrawdoc.md @@ -0,0 +1,24 @@ + + +[Home](./index.md) > [kibana-plugin-server](./kibana-plugin-server.md) > [SavedObjectsRawDoc](./kibana-plugin-server.savedobjectsrawdoc.md) + +## SavedObjectsRawDoc interface + +A raw document as represented directly in the saved object index. + +Signature: + +```typescript +export interface RawDoc +``` + +## Properties + +| Property | Type | Description | +| --- | --- | --- | +| [\_id](./kibana-plugin-server.savedobjectsrawdoc._id.md) | string | | +| [\_primary\_term](./kibana-plugin-server.savedobjectsrawdoc._primary_term.md) | number | | +| [\_seq\_no](./kibana-plugin-server.savedobjectsrawdoc._seq_no.md) | number | | +| [\_source](./kibana-plugin-server.savedobjectsrawdoc._source.md) | any | | +| [\_type](./kibana-plugin-server.savedobjectsrawdoc._type.md) | string | | + diff --git a/docs/development/core/server/kibana-plugin-server.savedobjectsschema.(constructor).md b/docs/development/core/server/kibana-plugin-server.savedobjectsschema.(constructor).md new file mode 100644 index 0000000000000..abac3bc88fac1 --- /dev/null +++ b/docs/development/core/server/kibana-plugin-server.savedobjectsschema.(constructor).md @@ -0,0 +1,20 @@ + + +[Home](./index.md) > [kibana-plugin-server](./kibana-plugin-server.md) > [SavedObjectsSchema](./kibana-plugin-server.savedobjectsschema.md) > [(constructor)](./kibana-plugin-server.savedobjectsschema.(constructor).md) + +## SavedObjectsSchema.(constructor) + +Constructs a new instance of the `SavedObjectsSchema` class + +Signature: + +```typescript +constructor(schemaDefinition?: SavedObjectsSchemaDefinition); +``` + +## Parameters + +| Parameter | Type | Description | +| --- | --- | --- | +| schemaDefinition | SavedObjectsSchemaDefinition | | + diff --git a/docs/development/core/server/kibana-plugin-server.savedobjectsschema.getindexfortype.md b/docs/development/core/server/kibana-plugin-server.savedobjectsschema.getindexfortype.md new file mode 100644 index 0000000000000..3c9b810cfe1a6 --- /dev/null +++ b/docs/development/core/server/kibana-plugin-server.savedobjectsschema.getindexfortype.md @@ -0,0 +1,22 @@ + + +[Home](./index.md) > [kibana-plugin-server](./kibana-plugin-server.md) > [SavedObjectsSchema](./kibana-plugin-server.savedobjectsschema.md) > [getIndexForType](./kibana-plugin-server.savedobjectsschema.getindexfortype.md) + +## SavedObjectsSchema.getIndexForType() method + +Signature: + +```typescript +getIndexForType(type: string): string | undefined; +``` + +## Parameters + +| Parameter | Type | Description | +| --- | --- | --- | +| type | string | | + +Returns: + +`string | undefined` + diff --git a/docs/development/core/server/kibana-plugin-server.savedobjectsschema.ishiddentype.md b/docs/development/core/server/kibana-plugin-server.savedobjectsschema.ishiddentype.md new file mode 100644 index 0000000000000..f67b12a4d14c3 --- /dev/null +++ b/docs/development/core/server/kibana-plugin-server.savedobjectsschema.ishiddentype.md @@ -0,0 +1,22 @@ + + +[Home](./index.md) > [kibana-plugin-server](./kibana-plugin-server.md) > [SavedObjectsSchema](./kibana-plugin-server.savedobjectsschema.md) > [isHiddenType](./kibana-plugin-server.savedobjectsschema.ishiddentype.md) + +## SavedObjectsSchema.isHiddenType() method + +Signature: + +```typescript +isHiddenType(type: string): boolean; +``` + +## Parameters + +| Parameter | Type | Description | +| --- | --- | --- | +| type | string | | + +Returns: + +`boolean` + diff --git a/docs/development/core/server/kibana-plugin-server.savedobjectsschema.isnamespaceagnostic.md b/docs/development/core/server/kibana-plugin-server.savedobjectsschema.isnamespaceagnostic.md new file mode 100644 index 0000000000000..2ca0abd7e4aa7 --- /dev/null +++ b/docs/development/core/server/kibana-plugin-server.savedobjectsschema.isnamespaceagnostic.md @@ -0,0 +1,22 @@ + + +[Home](./index.md) > [kibana-plugin-server](./kibana-plugin-server.md) > [SavedObjectsSchema](./kibana-plugin-server.savedobjectsschema.md) > [isNamespaceAgnostic](./kibana-plugin-server.savedobjectsschema.isnamespaceagnostic.md) + +## SavedObjectsSchema.isNamespaceAgnostic() method + +Signature: + +```typescript +isNamespaceAgnostic(type: string): boolean; +``` + +## Parameters + +| Parameter | Type | Description | +| --- | --- | --- | +| type | string | | + +Returns: + +`boolean` + diff --git a/docs/development/core/server/kibana-plugin-server.savedobjectsschema.md b/docs/development/core/server/kibana-plugin-server.savedobjectsschema.md new file mode 100644 index 0000000000000..1b9cb2ad94c22 --- /dev/null +++ b/docs/development/core/server/kibana-plugin-server.savedobjectsschema.md @@ -0,0 +1,26 @@ + + +[Home](./index.md) > [kibana-plugin-server](./kibana-plugin-server.md) > [SavedObjectsSchema](./kibana-plugin-server.savedobjectsschema.md) + +## SavedObjectsSchema class + +Signature: + +```typescript +export declare class SavedObjectsSchema +``` + +## Constructors + +| Constructor | Modifiers | Description | +| --- | --- | --- | +| [(constructor)(schemaDefinition)](./kibana-plugin-server.savedobjectsschema.(constructor).md) | | Constructs a new instance of the SavedObjectsSchema class | + +## Methods + +| Method | Modifiers | Description | +| --- | --- | --- | +| [getIndexForType(type)](./kibana-plugin-server.savedobjectsschema.getindexfortype.md) | | | +| [isHiddenType(type)](./kibana-plugin-server.savedobjectsschema.ishiddentype.md) | | | +| [isNamespaceAgnostic(type)](./kibana-plugin-server.savedobjectsschema.isnamespaceagnostic.md) | | | + diff --git a/docs/development/core/server/kibana-plugin-server.savedobjectsserializer.(constructor).md b/docs/development/core/server/kibana-plugin-server.savedobjectsserializer.(constructor).md new file mode 100644 index 0000000000000..6524ff3e17caf --- /dev/null +++ b/docs/development/core/server/kibana-plugin-server.savedobjectsserializer.(constructor).md @@ -0,0 +1,20 @@ + + +[Home](./index.md) > [kibana-plugin-server](./kibana-plugin-server.md) > [SavedObjectsSerializer](./kibana-plugin-server.savedobjectsserializer.md) > [(constructor)](./kibana-plugin-server.savedobjectsserializer.(constructor).md) + +## SavedObjectsSerializer.(constructor) + +Constructs a new instance of the `SavedObjectsSerializer` class + +Signature: + +```typescript +constructor(schema: SavedObjectsSchema); +``` + +## Parameters + +| Parameter | Type | Description | +| --- | --- | --- | +| schema | SavedObjectsSchema | | + diff --git a/docs/development/core/server/kibana-plugin-server.savedobjectsserializer.generaterawid.md b/docs/development/core/server/kibana-plugin-server.savedobjectsserializer.generaterawid.md new file mode 100644 index 0000000000000..4705f48a201ae --- /dev/null +++ b/docs/development/core/server/kibana-plugin-server.savedobjectsserializer.generaterawid.md @@ -0,0 +1,26 @@ + + +[Home](./index.md) > [kibana-plugin-server](./kibana-plugin-server.md) > [SavedObjectsSerializer](./kibana-plugin-server.savedobjectsserializer.md) > [generateRawId](./kibana-plugin-server.savedobjectsserializer.generaterawid.md) + +## SavedObjectsSerializer.generateRawId() method + +Given a saved object type and id, generates the compound id that is stored in the raw document. + +Signature: + +```typescript +generateRawId(namespace: string | undefined, type: string, id?: string): string; +``` + +## Parameters + +| Parameter | Type | Description | +| --- | --- | --- | +| namespace | string | undefined | | +| type | string | | +| id | string | | + +Returns: + +`string` + diff --git a/docs/development/core/server/kibana-plugin-server.savedobjectsserializer.israwsavedobject.md b/docs/development/core/server/kibana-plugin-server.savedobjectsserializer.israwsavedobject.md new file mode 100644 index 0000000000000..e190e7bce8c01 --- /dev/null +++ b/docs/development/core/server/kibana-plugin-server.savedobjectsserializer.israwsavedobject.md @@ -0,0 +1,24 @@ + + +[Home](./index.md) > [kibana-plugin-server](./kibana-plugin-server.md) > [SavedObjectsSerializer](./kibana-plugin-server.savedobjectsserializer.md) > [isRawSavedObject](./kibana-plugin-server.savedobjectsserializer.israwsavedobject.md) + +## SavedObjectsSerializer.isRawSavedObject() method + +Determines whether or not the raw document can be converted to a saved object. + +Signature: + +```typescript +isRawSavedObject(rawDoc: RawDoc): any; +``` + +## Parameters + +| Parameter | Type | Description | +| --- | --- | --- | +| rawDoc | RawDoc | | + +Returns: + +`any` + diff --git a/docs/development/core/server/kibana-plugin-server.savedobjectsserializer.md b/docs/development/core/server/kibana-plugin-server.savedobjectsserializer.md new file mode 100644 index 0000000000000..205e29cb0727d --- /dev/null +++ b/docs/development/core/server/kibana-plugin-server.savedobjectsserializer.md @@ -0,0 +1,27 @@ + + +[Home](./index.md) > [kibana-plugin-server](./kibana-plugin-server.md) > [SavedObjectsSerializer](./kibana-plugin-server.savedobjectsserializer.md) + +## SavedObjectsSerializer class + +Signature: + +```typescript +export declare class SavedObjectsSerializer +``` + +## Constructors + +| Constructor | Modifiers | Description | +| --- | --- | --- | +| [(constructor)(schema)](./kibana-plugin-server.savedobjectsserializer.(constructor).md) | | Constructs a new instance of the SavedObjectsSerializer class | + +## Methods + +| Method | Modifiers | Description | +| --- | --- | --- | +| [generateRawId(namespace, type, id)](./kibana-plugin-server.savedobjectsserializer.generaterawid.md) | | Given a saved object type and id, generates the compound id that is stored in the raw document. | +| [isRawSavedObject(rawDoc)](./kibana-plugin-server.savedobjectsserializer.israwsavedobject.md) | | Determines whether or not the raw document can be converted to a saved object. | +| [rawToSavedObject(doc)](./kibana-plugin-server.savedobjectsserializer.rawtosavedobject.md) | | Converts a document from the format that is stored in elasticsearch to the saved object client format. | +| [savedObjectToRaw(savedObj)](./kibana-plugin-server.savedobjectsserializer.savedobjecttoraw.md) | | Converts a document from the saved object client format to the format that is stored in elasticsearch. | + diff --git a/docs/development/core/server/kibana-plugin-server.savedobjectsserializer.rawtosavedobject.md b/docs/development/core/server/kibana-plugin-server.savedobjectsserializer.rawtosavedobject.md new file mode 100644 index 0000000000000..b36cdb3be64da --- /dev/null +++ b/docs/development/core/server/kibana-plugin-server.savedobjectsserializer.rawtosavedobject.md @@ -0,0 +1,24 @@ + + +[Home](./index.md) > [kibana-plugin-server](./kibana-plugin-server.md) > [SavedObjectsSerializer](./kibana-plugin-server.savedobjectsserializer.md) > [rawToSavedObject](./kibana-plugin-server.savedobjectsserializer.rawtosavedobject.md) + +## SavedObjectsSerializer.rawToSavedObject() method + +Converts a document from the format that is stored in elasticsearch to the saved object client format. + +Signature: + +```typescript +rawToSavedObject(doc: RawDoc): SanitizedSavedObjectDoc; +``` + +## Parameters + +| Parameter | Type | Description | +| --- | --- | --- | +| doc | RawDoc | | + +Returns: + +`SanitizedSavedObjectDoc` + diff --git a/docs/development/core/server/kibana-plugin-server.savedobjectsserializer.savedobjecttoraw.md b/docs/development/core/server/kibana-plugin-server.savedobjectsserializer.savedobjecttoraw.md new file mode 100644 index 0000000000000..4854a97a845b8 --- /dev/null +++ b/docs/development/core/server/kibana-plugin-server.savedobjectsserializer.savedobjecttoraw.md @@ -0,0 +1,24 @@ + + +[Home](./index.md) > [kibana-plugin-server](./kibana-plugin-server.md) > [SavedObjectsSerializer](./kibana-plugin-server.savedobjectsserializer.md) > [savedObjectToRaw](./kibana-plugin-server.savedobjectsserializer.savedobjecttoraw.md) + +## SavedObjectsSerializer.savedObjectToRaw() method + +Converts a document from the saved object client format to the format that is stored in elasticsearch. + +Signature: + +```typescript +savedObjectToRaw(savedObj: SanitizedSavedObjectDoc): RawDoc; +``` + +## Parameters + +| Parameter | Type | Description | +| --- | --- | --- | +| savedObj | SanitizedSavedObjectDoc | | + +Returns: + +`RawDoc` + diff --git a/src/core/server/index.ts b/src/core/server/index.ts index b0a8e554ef8ae..8d0ebb0c73ac8 100644 --- a/src/core/server/index.ts +++ b/src/core/server/index.ts @@ -103,6 +103,9 @@ export { SavedObjectsFindOptions, SavedObjectsFindResponse, SavedObjectsMigrationVersion, + SavedObjectsRawDoc, + SavedObjectsSchema, + SavedObjectsSerializer, SavedObjectsService, SavedObjectsUpdateOptions, SavedObjectsUpdateResponse, diff --git a/src/core/server/saved_objects/index.ts b/src/core/server/saved_objects/index.ts index e6e9e2d266000..623c722eb95b1 100644 --- a/src/core/server/saved_objects/index.ts +++ b/src/core/server/saved_objects/index.ts @@ -22,3 +22,5 @@ export * from './service'; export { SavedObjectsSchema } from './schema'; export { SavedObjectsManagement } from './management'; + +export { SavedObjectsSerializer, RawDoc as SavedObjectsRawDoc } from './serialization'; diff --git a/src/core/server/server.api.md b/src/core/server/server.api.md index d0bf8c704b29c..e7d3a379c948c 100644 --- a/src/core/server/server.api.md +++ b/src/core/server/server.api.md @@ -634,6 +634,48 @@ export interface SavedObjectsMigrationVersion { [pluginName: string]: string; } +// Warning: (ae-missing-release-tag) "RawDoc" is exported by the package, but it is missing a release tag (@alpha, @beta, @public, or @internal) +// +// @public +export interface SavedObjectsRawDoc { + // (undocumented) + _id: string; + // (undocumented) + _primary_term?: number; + // (undocumented) + _seq_no?: number; + // (undocumented) + _source: any; + // (undocumented) + _type?: string; +} + +// Warning: (ae-missing-release-tag) "SavedObjectsSchema" is exported by the package, but it is missing a release tag (@alpha, @beta, @public, or @internal) +// +// @public (undocumented) +export class SavedObjectsSchema { + // Warning: (ae-forgotten-export) The symbol "SavedObjectsSchemaDefinition" needs to be exported by the entry point index.d.ts + constructor(schemaDefinition?: SavedObjectsSchemaDefinition); + // (undocumented) + getIndexForType(type: string): string | undefined; + // (undocumented) + isHiddenType(type: string): boolean; + // (undocumented) + isNamespaceAgnostic(type: string): boolean; +} + +// Warning: (ae-missing-release-tag) "SavedObjectsSerializer" is exported by the package, but it is missing a release tag (@alpha, @beta, @public, or @internal) +// +// @public (undocumented) +export class SavedObjectsSerializer { + constructor(schema: SavedObjectsSchema); + generateRawId(namespace: string | undefined, type: string, id?: string): string; + isRawSavedObject(rawDoc: SavedObjectsRawDoc): any; + // Warning: (ae-forgotten-export) The symbol "SanitizedSavedObjectDoc" needs to be exported by the entry point index.d.ts + rawToSavedObject(doc: SavedObjectsRawDoc): SanitizedSavedObjectDoc; + savedObjectToRaw(savedObj: SanitizedSavedObjectDoc): SavedObjectsRawDoc; + } + // @public (undocumented) export interface SavedObjectsService { // Warning: (ae-forgotten-export) The symbol "ScopedSavedObjectsClientProvider" needs to be exported by the entry point index.d.ts diff --git a/x-pack/legacy/plugins/alerting/server/alerts_client.test.ts b/x-pack/legacy/plugins/alerting/server/alerts_client.test.ts index 2041ab7cfcefe..4421ea7435c98 100644 --- a/x-pack/legacy/plugins/alerting/server/alerts_client.test.ts +++ b/x-pack/legacy/plugins/alerting/server/alerts_client.test.ts @@ -94,12 +94,12 @@ describe('create()', () => { taskManager.schedule.mockResolvedValueOnce({ id: 'task-123', taskType: 'alerting:123', - sequenceNumber: 1, - primaryTerm: 1, scheduledAt: new Date(), attempts: 1, status: 'idle', runAt: new Date(), + startedAt: null, + retryAt: null, state: {}, params: {}, }); @@ -437,8 +437,6 @@ describe('enable()', () => { }); taskManager.schedule.mockResolvedValueOnce({ id: 'task-123', - sequenceNumber: 1, - primaryTerm: 1, scheduledAt: new Date(), attempts: 0, status: 'idle', @@ -446,6 +444,8 @@ describe('enable()', () => { state: {}, params: {}, taskType: '', + startedAt: null, + retryAt: null, }); await alertsClient.enable({ id: '1' }); @@ -737,19 +737,8 @@ describe('delete()', () => { savedObjectsClient.delete.mockResolvedValueOnce({ success: true, }); - taskManager.remove.mockResolvedValueOnce({ - index: '.task_manager', - id: 'task-123', - sequenceNumber: 1, - primaryTerm: 1, - result: '', - }); const result = await alertsClient.delete({ id: '1' }); - expect(result).toMatchInlineSnapshot(` - Object { - "success": true, - } - `); + expect(result).toEqual({ success: true }); expect(savedObjectsClient.delete).toHaveBeenCalledTimes(1); expect(savedObjectsClient.delete.mock.calls[0]).toMatchInlineSnapshot(` Array [ diff --git a/x-pack/legacy/plugins/maps/server/maps_telemetry/telemetry_task.js b/x-pack/legacy/plugins/maps/server/maps_telemetry/telemetry_task.js index 7fbbe8ef77ff5..1f1a9e369a952 100644 --- a/x-pack/legacy/plugins/maps/server/maps_telemetry/telemetry_task.js +++ b/x-pack/legacy/plugins/maps/server/maps_telemetry/telemetry_task.js @@ -13,16 +13,24 @@ export const TASK_ID = `Maps-${TELEMETRY_TASK_TYPE}`; export function scheduleTask(server, taskManager) { const { kbnServer } = server.plugins.xpack_main.status.plugin; - kbnServer.afterPluginsInit(async () => { - try { - await taskManager.schedule({ - id: TASK_ID, - taskType: TELEMETRY_TASK_TYPE, - state: { stats: {}, runs: 0 }, - }); - }catch(e) { - server.log(['warning', 'maps'], `Error scheduling telemetry task, received ${e.message}`); - } + kbnServer.afterPluginsInit(() => { + // The code block below can't await directly within "afterPluginsInit" + // callback due to circular dependency. The server isn't "ready" until + // this code block finishes. Migrations wait for server to be ready before + // executing. Saved objects repository waits for migrations to finish before + // finishing the request. To avoid this, we'll await within a separate + // function block. + (async () => { + try { + await taskManager.schedule({ + id: TASK_ID, + taskType: TELEMETRY_TASK_TYPE, + state: { stats: {}, runs: 0 }, + }); + }catch(e) { + server.log(['warning', 'maps'], `Error scheduling telemetry task, received ${e.message}`); + } + })(); }); } @@ -73,5 +81,5 @@ export function getNextMidnight() { const nextMidnight = new Date(); nextMidnight.setHours(0, 0, 0, 0); nextMidnight.setDate(nextMidnight.getDate() + 1); - return nextMidnight.toISOString(); + return nextMidnight; } diff --git a/x-pack/legacy/plugins/maps/server/maps_telemetry/telemetry_task.test.js b/x-pack/legacy/plugins/maps/server/maps_telemetry/telemetry_task.test.js index 58d53d35260c7..1e149ed62b09f 100644 --- a/x-pack/legacy/plugins/maps/server/maps_telemetry/telemetry_task.test.js +++ b/x-pack/legacy/plugins/maps/server/maps_telemetry/telemetry_task.test.js @@ -25,7 +25,7 @@ describe('telemetryTaskRunner', () => { moment() .add(1, 'days') .startOf('day') - .toISOString(); + .toDate(); const getRunner = telemetryTaskRunner(); const runResult = await getRunner( diff --git a/x-pack/legacy/plugins/oss_telemetry/server/lib/get_next_midnight.test.ts b/x-pack/legacy/plugins/oss_telemetry/server/lib/get_next_midnight.test.ts index d8df006a55b7f..c20e4a0b4be54 100644 --- a/x-pack/legacy/plugins/oss_telemetry/server/lib/get_next_midnight.test.ts +++ b/x-pack/legacy/plugins/oss_telemetry/server/lib/get_next_midnight.test.ts @@ -12,7 +12,7 @@ describe('getNextMidnight', () => { const nextMidnightMoment = moment() .add(1, 'days') .startOf('day') - .toISOString(); + .toDate(); expect(getNextMidnight()).toEqual(nextMidnightMoment); }); diff --git a/x-pack/legacy/plugins/oss_telemetry/server/lib/get_next_midnight.ts b/x-pack/legacy/plugins/oss_telemetry/server/lib/get_next_midnight.ts index c286af1854b6a..a5ee8d572343c 100644 --- a/x-pack/legacy/plugins/oss_telemetry/server/lib/get_next_midnight.ts +++ b/x-pack/legacy/plugins/oss_telemetry/server/lib/get_next_midnight.ts @@ -8,5 +8,5 @@ export function getNextMidnight() { const nextMidnight = new Date(); nextMidnight.setHours(0, 0, 0, 0); nextMidnight.setDate(nextMidnight.getDate() + 1); - return nextMidnight.toISOString(); + return nextMidnight; } diff --git a/x-pack/legacy/plugins/oss_telemetry/server/lib/tasks/index.ts b/x-pack/legacy/plugins/oss_telemetry/server/lib/tasks/index.ts index 8c0771aba1150..bc00e39a2886c 100644 --- a/x-pack/legacy/plugins/oss_telemetry/server/lib/tasks/index.ts +++ b/x-pack/legacy/plugins/oss_telemetry/server/lib/tasks/index.ts @@ -29,15 +29,23 @@ export function scheduleTasks(server: HapiServer) { const { taskManager } = server; const { kbnServer } = server.plugins.xpack_main.status.plugin; - kbnServer.afterPluginsInit(async () => { - try { - await taskManager.schedule({ - id: `${PLUGIN_ID}-${VIS_TELEMETRY_TASK}`, - taskType: VIS_TELEMETRY_TASK, - state: { stats: {}, runs: 0 }, - }); - } catch (e) { - server.log(['warning', 'telemetry'], `Error scheduling task, received ${e.message}`); - } + kbnServer.afterPluginsInit(() => { + // The code block below can't await directly within "afterPluginsInit" + // callback due to circular dependency. The server isn't "ready" until + // this code block finishes. Migrations wait for server to be ready before + // executing. Saved objects repository waits for migrations to finish before + // finishing the request. To avoid this, we'll await within a separate + // function block. + (async () => { + try { + await taskManager.schedule({ + id: `${PLUGIN_ID}-${VIS_TELEMETRY_TASK}`, + taskType: VIS_TELEMETRY_TASK, + state: { stats: {}, runs: 0 }, + }); + } catch (e) { + server.log(['warning', 'telemetry'], `Error scheduling task, received ${e.message}`); + } + })(); }); } diff --git a/x-pack/legacy/plugins/oss_telemetry/server/lib/tasks/visualizations/task_runner.test.ts b/x-pack/legacy/plugins/oss_telemetry/server/lib/tasks/visualizations/task_runner.test.ts index 879846aeed1ae..f7e3ae7091c73 100644 --- a/x-pack/legacy/plugins/oss_telemetry/server/lib/tasks/visualizations/task_runner.test.ts +++ b/x-pack/legacy/plugins/oss_telemetry/server/lib/tasks/visualizations/task_runner.test.ts @@ -43,7 +43,7 @@ describe('visualizationsTaskRunner', () => { moment() .add(1, 'days') .startOf('day') - .toISOString(); + .toDate(); const runner = visualizationsTaskRunner(mockTaskInstance, { server: mockKbnServer }); const result = await runner(); diff --git a/x-pack/legacy/plugins/task_manager/README.md b/x-pack/legacy/plugins/task_manager/README.md index 54992424f9fef..a5534dc61e81d 100644 --- a/x-pack/legacy/plugins/task_manager/README.md +++ b/x-pack/legacy/plugins/task_manager/README.md @@ -19,7 +19,8 @@ At a high-level, the task manager works like this: - `attempts` is less than the configured threshold - Attempt to claim the task by using optimistic concurrency to set: - status to `running` - - `runAt` to now + the timeout specified by the task + - `startedAt` to now + - `retryAt` to next time task should retry if it times out and is still in `running` status - Execute the task, if the previous claim succeeded - If the task fails, increment the `attempts` count and reschedule it - If the task succeeds: @@ -38,7 +39,7 @@ If a task specifies a higher `numWorkers` than the system supports, the system's The task_manager can be configured via `taskManager` config options (e.g. `taskManager.maxAttempts`): -- `max_attempts` - How many times a failing task instance will be retried before it is never run again +- `max_attempts` - The maximum number of times a task will be attempted before being abandoned as failed - `poll_interval` - How often the background worker should check the task_manager index for more work - `index` - The name of the index that the task_manager - `max_workers` - The maximum number of tasks a Kibana will run concurrently (defaults to 10) @@ -64,11 +65,15 @@ taskManager.registerTaskDefinitions({ // Optional, human-friendly, more detailed description description: 'Amazing!!', - // Optional, how long, in minutes, the system should wait before + // Optional, how long, in minutes or seconds, the system should wait before // a running instance of this task is considered to be timed out. // This defaults to 5 minutes. timeout: '5m', + // Optional, how many attempts before marking task as failed. + // This defaults to what is configured at the task manager level. + maxAttempts: 5, + // The clusterMonitoring task occupies 2 workers, so if the system has 10 worker slots, // 5 clusterMonitoring tasks could run concurrently per Kibana instance. This value is // overridden by the `override_num_workers` config value, if specified. @@ -161,7 +166,7 @@ The data stored for a task instance looks something like this: runAt: "2020-07-24T17:34:35.272Z", // Indicates that this is a recurring task. We currently only support - // 1 minute granularity. + // minute syntax `5m` or second syntax `10s`. interval: '5m', // How many times this task has been unsuccesfully attempted, diff --git a/x-pack/legacy/plugins/task_manager/constants.ts b/x-pack/legacy/plugins/task_manager/constants.ts index a43ac1893b685..1b0d0b001071c 100644 --- a/x-pack/legacy/plugins/task_manager/constants.ts +++ b/x-pack/legacy/plugins/task_manager/constants.ts @@ -4,8 +4,4 @@ * you may not use this file except in compliance with the Elastic License. */ -import xPackage from '../../../package.json'; -import { getTemplateVersion } from './lib/get_template_version'; - -export const TASK_MANAGER_API_VERSION = 1; -export const TASK_MANAGER_TEMPLATE_VERSION = getTemplateVersion(xPackage.version); +export const TASK_MANAGER_INDEX = '.kibana_task_manager'; diff --git a/x-pack/legacy/plugins/task_manager/index.js b/x-pack/legacy/plugins/task_manager/index.js index e0116820a3e05..ba92e9db50601 100644 --- a/x-pack/legacy/plugins/task_manager/index.js +++ b/x-pack/legacy/plugins/task_manager/index.js @@ -4,7 +4,11 @@ * you may not use this file except in compliance with the Elastic License. */ +import { SavedObjectsSerializer, SavedObjectsSchema } from '../../../../src/core/server'; import { TaskManager } from './task_manager'; +import mappings from './mappings.json'; +import { migrations } from './migrations'; +import { TASK_MANAGER_INDEX } from './constants'; export function taskManager(kibana) { return new kibana.Plugin({ @@ -16,15 +20,12 @@ export function taskManager(kibana) { enabled: Joi.boolean().default(true), max_attempts: Joi.number() .description('The maximum number of times a task will be attempted before being abandoned as failed') - .min(0) // no retries + .min(1) .default(3), poll_interval: Joi.number() .description('How often, in milliseconds, the task manager will look for more work.') .min(1000) .default(3000), - index: Joi.string() - .description('The name of the index used to store task information.') - .default('.kibana_task_manager'), max_workers: Joi.number() .description('The maximum number of tasks that this Kibana instance will run simultaneously.') .min(1) // disable the task manager rather than trying to specify it with 0 workers @@ -37,8 +38,33 @@ export function taskManager(kibana) { }, init(server) { const config = server.config(); - const taskManager = new TaskManager(this.kbnServer, server, config); + const schema = new SavedObjectsSchema(this.kbnServer.uiExports.savedObjectSchemas); + const serializer = new SavedObjectsSerializer(schema); + const { callWithInternalUser } = server.plugins.elasticsearch.getCluster('admin'); + const savedObjectsRepository = server.savedObjects.getSavedObjectsRepository( + callWithInternalUser, + ['task'] + ); + + const taskManager = new TaskManager({ + kbnServer: this.kbnServer, + config, + savedObjectsRepository, + serializer, + }); server.decorate('server', 'taskManager', taskManager); }, + uiExports: { + mappings, + migrations, + savedObjectSchemas: { + task: { + hidden: true, + isNamespaceAgnostic: true, + indexPattern: TASK_MANAGER_INDEX, + convertToAliasScript: `ctx._id = ctx._source.type + ':' + ctx._id`, + }, + }, + }, }); } diff --git a/x-pack/legacy/plugins/task_manager/lib/intervals.test.ts b/x-pack/legacy/plugins/task_manager/lib/intervals.test.ts index bba5cdf8591ff..a8e186c397d76 100644 --- a/x-pack/legacy/plugins/task_manager/lib/intervals.test.ts +++ b/x-pack/legacy/plugins/task_manager/lib/intervals.test.ts @@ -5,7 +5,24 @@ */ import _ from 'lodash'; -import { assertValidInterval, intervalFromNow, minutesFromNow } from './intervals'; +import sinon from 'sinon'; +import { + assertValidInterval, + intervalFromNow, + intervalFromDate, + minutesFromNow, + minutesFromDate, + secondsFromNow, + secondsFromDate, +} from './intervals'; + +let fakeTimer: sinon.SinonFakeTimers; + +beforeAll(() => { + fakeTimer = sinon.useFakeTimers(); +}); + +afterAll(() => fakeTimer.restore()); describe('taskIntervals', () => { describe('assertValidInterval', () => { @@ -13,7 +30,20 @@ describe('taskIntervals', () => { expect(() => assertValidInterval(`${_.random(1000)}m`)).not.toThrow(); }); - test('it rejects intervals are not of the form `Nm`', () => { + test('it accepts intervals in the form `Ns`', () => { + expect(() => assertValidInterval(`${_.random(1000)}s`)).not.toThrow(); + }); + + test('it rejects 0 based intervals', () => { + expect(() => assertValidInterval('0m')).toThrow( + /Invalid interval "0m"\. Intervals must be of the form {number}m. Example: 5m/ + ); + expect(() => assertValidInterval('0s')).toThrow( + /Invalid interval "0s"\. Intervals must be of the form {number}m. Example: 5m/ + ); + }); + + test('it rejects intervals are not of the form `Nm` or `Ns`', () => { expect(() => assertValidInterval(`5m 2s`)).toThrow( /Invalid interval "5m 2s"\. Intervals must be of the form {number}m. Example: 5m/ ); @@ -28,10 +58,17 @@ describe('taskIntervals', () => { const mins = _.random(1, 100); const expected = Date.now() + mins * 60 * 1000; const nextRun = intervalFromNow(`${mins}m`)!.getTime(); - expect(Math.abs(nextRun - expected)).toBeLessThan(100); + expect(nextRun).toEqual(expected); }); - test('it rejects intervals are not of the form `Nm`', () => { + test('it returns the current date plus n seconds', () => { + const secs = _.random(1, 100); + const expected = Date.now() + secs * 1000; + const nextRun = intervalFromNow(`${secs}s`)!.getTime(); + expect(nextRun).toEqual(expected); + }); + + test('it rejects intervals are not of the form `Nm` or `Ns`', () => { expect(() => intervalFromNow(`5m 2s`)).toThrow( /Invalid interval "5m 2s"\. Intervals must be of the form {number}m. Example: 5m/ ); @@ -39,6 +76,53 @@ describe('taskIntervals', () => { /Invalid interval "hello"\. Intervals must be of the form {number}m. Example: 5m/ ); }); + + test('it rejects 0 based intervals', () => { + expect(() => intervalFromNow('0m')).toThrow( + /Invalid interval "0m"\. Intervals must be of the form {number}m. Example: 5m/ + ); + expect(() => intervalFromNow('0s')).toThrow( + /Invalid interval "0s"\. Intervals must be of the form {number}m. Example: 5m/ + ); + }); + }); + + describe('intervalFromDate', () => { + test('it returns the given date plus n minutes', () => { + const originalDate = new Date(2019, 1, 1); + const mins = _.random(1, 100); + const expected = originalDate.valueOf() + mins * 60 * 1000; + const nextRun = intervalFromDate(originalDate, `${mins}m`)!.getTime(); + expect(expected).toEqual(nextRun); + }); + + test('it returns the current date plus n seconds', () => { + const originalDate = new Date(2019, 1, 1); + const secs = _.random(1, 100); + const expected = originalDate.valueOf() + secs * 1000; + const nextRun = intervalFromDate(originalDate, `${secs}s`)!.getTime(); + expect(expected).toEqual(nextRun); + }); + + test('it rejects intervals are not of the form `Nm` or `Ns`', () => { + const date = new Date(); + expect(() => intervalFromDate(date, `5m 2s`)).toThrow( + /Invalid interval "5m 2s"\. Intervals must be of the form {number}m. Example: 5m/ + ); + expect(() => intervalFromDate(date, `hello`)).toThrow( + /Invalid interval "hello"\. Intervals must be of the form {number}m. Example: 5m/ + ); + }); + + test('it rejects 0 based intervals', () => { + const date = new Date(); + expect(() => intervalFromDate(date, '0m')).toThrow( + /Invalid interval "0m"\. Intervals must be of the form {number}m. Example: 5m/ + ); + expect(() => intervalFromDate(date, '0s')).toThrow( + /Invalid interval "0s"\. Intervals must be of the form {number}m. Example: 5m/ + ); + }); }); describe('minutesFromNow', () => { @@ -46,7 +130,36 @@ describe('taskIntervals', () => { const mins = _.random(1, 100); const expected = Date.now() + mins * 60 * 1000; const nextRun = minutesFromNow(mins).getTime(); - expect(Math.abs(nextRun - expected)).toBeLessThan(100); + expect(nextRun).toEqual(expected); + }); + }); + + describe('minutesFromDate', () => { + test('it returns the given date plus a number of minutes', () => { + const originalDate = new Date(2019, 1, 1); + const mins = _.random(1, 100); + const expected = originalDate.valueOf() + mins * 60 * 1000; + const nextRun = minutesFromDate(originalDate, mins).getTime(); + expect(expected).toEqual(nextRun); + }); + }); + + describe('secondsFromNow', () => { + test('it returns the current date plus a number of seconds', () => { + const secs = _.random(1, 100); + const expected = Date.now() + secs * 1000; + const nextRun = secondsFromNow(secs).getTime(); + expect(nextRun).toEqual(expected); + }); + }); + + describe('secondsFromDate', () => { + test('it returns the given date plus a number of seconds', () => { + const originalDate = new Date(2019, 1, 1); + const secs = _.random(1, 100); + const expected = originalDate.valueOf() + secs * 1000; + const nextRun = secondsFromDate(originalDate, secs).getTime(); + expect(expected).toEqual(nextRun); }); }); }); diff --git a/x-pack/legacy/plugins/task_manager/lib/intervals.ts b/x-pack/legacy/plugins/task_manager/lib/intervals.ts index f095c336098f9..9009be5f78220 100644 --- a/x-pack/legacy/plugins/task_manager/lib/intervals.ts +++ b/x-pack/legacy/plugins/task_manager/lib/intervals.ts @@ -6,7 +6,7 @@ /** * Returns a date that is the specified interval from now. Currently, - * only minute-intervals are supported. + * only minute-intervals and second-intervals are supported. * * @param {string} interval - An interval of the form `Nm` such as `5m` */ @@ -17,29 +17,91 @@ export function intervalFromNow(interval?: string): Date | undefined { assertValidInterval(interval); + if (isSeconds(interval)) { + return secondsFromNow(parseInterval(interval)); + } + return minutesFromNow(parseInterval(interval)); } +/** + * Returns a date that is the specified interval from given date. Currently, + * only minute-intervals and second-intervals are supported. + * + * @param {Date} date - The date to add interval to + * @param {string} interval - An interval of the form `Nm` such as `5m` + */ +export function intervalFromDate(date: Date, interval?: string): Date | undefined { + if (interval === undefined) { + return; + } + + assertValidInterval(interval); + + if (isSeconds(interval)) { + return secondsFromDate(date, parseInterval(interval)); + } + + return minutesFromDate(date, parseInterval(interval)); +} + /** * Returns a date that is mins minutes from now. * * @param mins The number of mintues from now */ export function minutesFromNow(mins: number): Date { - const now = new Date(); + return minutesFromDate(new Date(), mins); +} + +/** + * Returns a date that is mins minutes from given date. + * + * @param date The date to add minutes to + * @param mins The number of mintues from given date + */ +export function minutesFromDate(date: Date, mins: number): Date { + const result = new Date(date.valueOf()); + + result.setMinutes(result.getMinutes() + mins); + + return result; +} + +/** + * Returns a date that is secs seconds from now. + * + * @param secs The number of seconds from now + */ +export function secondsFromNow(secs: number): Date { + return secondsFromDate(new Date(), secs); +} + +/** + * Returns a date that is secs seconds from given date. + * + * @param date The date to add seconds to + * @param secs The number of seconds from given date + */ +export function secondsFromDate(date: Date, secs: number): Date { + const result = new Date(date.valueOf()); - now.setMinutes(now.getMinutes() + mins); + result.setSeconds(result.getSeconds() + secs); - return now; + return result; } /** * Verifies that the specified interval matches our expected format. * - * @param {string} interval - An interval such as `5m` + * @param {string} interval - An interval such as `5m` or `10s` */ export function assertValidInterval(interval: string) { - if (/^[0-9]+m$/.test(interval)) { + if (isMinutes(interval)) { + return interval; + } + + if (isSeconds(interval)) { return interval; } @@ -51,3 +113,11 @@ export function assertValidInterval(interval: string) { function parseInterval(interval: string) { return parseInt(interval, 10); } + +function isMinutes(interval: string) { + return /^[1-9][0-9]*m$/.test(interval); +} + +function isSeconds(interval: string) { + return /^[1-9][0-9]*s$/.test(interval); +} diff --git a/x-pack/legacy/plugins/task_manager/lib/middleware.test.ts b/x-pack/legacy/plugins/task_manager/lib/middleware.test.ts index 4add3a81501a1..ff840061285c4 100644 --- a/x-pack/legacy/plugins/task_manager/lib/middleware.test.ts +++ b/x-pack/legacy/plugins/task_manager/lib/middleware.test.ts @@ -26,6 +26,8 @@ const getMockConcreteTaskInstance = () => { status: TaskStatus; runAt: Date; scheduledAt: Date; + startedAt: Date | null; + retryAt: Date | null; state: any; taskType: string; params: any; @@ -37,6 +39,8 @@ const getMockConcreteTaskInstance = () => { status: 'idle', runAt: new Date(moment('2018-09-18T05:33:09.588Z').valueOf()), scheduledAt: new Date(moment('2018-09-18T05:33:09.588Z').valueOf()), + startedAt: null, + retryAt: null, state: {}, taskType: 'nice_task', params: { abc: 'def' }, @@ -153,9 +157,11 @@ Object { "abc": "def", }, "primaryTerm": 1, + "retryAt": null, "runAt": 2018-09-18T05:33:09.588Z, "scheduledAt": 2018-09-18T05:33:09.588Z, "sequenceNumber": 1, + "startedAt": null, "state": Object {}, "status": "idle", "taskType": "nice_task", diff --git a/x-pack/legacy/plugins/task_manager/mappings.json b/x-pack/legacy/plugins/task_manager/mappings.json new file mode 100644 index 0000000000000..6638a75d82546 --- /dev/null +++ b/x-pack/legacy/plugins/task_manager/mappings.json @@ -0,0 +1,42 @@ +{ + "task": { + "properties": { + "taskType": { + "type": "keyword" + }, + "scheduledAt": { + "type": "date" + }, + "runAt": { + "type": "date" + }, + "startedAt": { + "type": "date" + }, + "retryAt": { + "type": "date" + }, + "interval": { + "type": "text" + }, + "attempts": { + "type": "integer" + }, + "status": { + "type": "keyword" + }, + "params": { + "type": "text" + }, + "state": { + "type": "text" + }, + "user": { + "type": "keyword" + }, + "scope": { + "type": "keyword" + } + } + } +} diff --git a/x-pack/legacy/plugins/task_manager/migrations.ts b/x-pack/legacy/plugins/task_manager/migrations.ts new file mode 100644 index 0000000000000..dd6651fddb90a --- /dev/null +++ b/x-pack/legacy/plugins/task_manager/migrations.ts @@ -0,0 +1,16 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { SavedObject } from 'src/core/server'; + +export const migrations = { + task: { + '7.4.0': (doc: SavedObject) => ({ + ...doc, + updated_at: new Date().toISOString(), + }), + }, +}; diff --git a/x-pack/legacy/plugins/task_manager/task.ts b/x-pack/legacy/plugins/task_manager/task.ts index 9b7191491a27e..dfb8ce6c5f9f9 100644 --- a/x-pack/legacy/plugins/task_manager/task.ts +++ b/x-pack/legacy/plugins/task_manager/task.ts @@ -94,13 +94,25 @@ export interface TaskDefinition { description?: string; /** - * How long, in minutes, the system should wait for the task to complete + * How long, in minutes or seconds, the system should wait for the task to complete * before it is considered to be timed out. (e.g. '5m', the default). If * the task takes longer than this, Kibana will send it a kill command and * the task will be re-attempted. */ timeout?: string; + /** + * Up to how many times the task should retry when it fails to run. This will + * default to the global variable. + */ + maxAttempts?: number; + + /** + * Function that returns the delay in seconds to wait before attempting the + * failed task again. + */ + getRetryDelay?: (attempts: number, error: object) => number; + /** * The numer of workers / slots a running instance of this task occupies. * This defaults to 1. @@ -126,10 +138,14 @@ export const validateTaskDefinition = Joi.object({ title: Joi.string().optional(), description: Joi.string().optional(), timeout: Joi.string().default('5m'), + maxAttempts: Joi.number() + .min(1) + .optional(), numWorkers: Joi.number() .min(1) .default(1), createTaskRunner: Joi.func().required(), + getRetryDelay: Joi.func().optional(), }).default(); /** @@ -164,6 +180,19 @@ export interface TaskInstance { */ scheduledAt?: Date; + /** + * The date and time that this task started execution. This is used to determine + * the "real" runAt that ended up running the task. This value is only set + * when status is set to "running". + */ + startedAt?: Date | null; + + /** + * The date and time that this task should re-execute if stuck in "running" / timeout + * status. This value is only set when status is set to "running". + */ + retryAt?: Date | null; + /** * The date and time that this task is scheduled to be run. It is not * guaranteed to run at this time, but it is guaranteed not to run earlier @@ -212,14 +241,9 @@ export interface ConcreteTaskInstance extends TaskInstance { id: string; /** - * The sequence number from the Elaticsearch document. - */ - sequenceNumber: number; - - /** - * The primary term from the Elaticsearch document. + * The saved object version from the Elaticsearch document. */ - primaryTerm: number; + version?: string; /** * The date and time that this task was originally scheduled. This is used @@ -244,6 +268,19 @@ export interface ConcreteTaskInstance extends TaskInstance { */ runAt: Date; + /** + * The date and time that this task started execution. This is used to determine + * the "real" runAt that ended up running the task. This value is only set + * when status is set to "running". + */ + startedAt: Date | null; + + /** + * The date and time that this task should re-execute if stuck in "running" / timeout + * status. This value is only set when status is set to "running". + */ + retryAt: Date | null; + /** * The state passed into the task's run function, and returned by the previous * run. If there was no previous run, or if the previous run did not return diff --git a/x-pack/legacy/plugins/task_manager/task_manager.test.ts b/x-pack/legacy/plugins/task_manager/task_manager.test.ts index 58f3c6f50bc33..77a8139b1d1c5 100644 --- a/x-pack/legacy/plugins/task_manager/task_manager.test.ts +++ b/x-pack/legacy/plugins/task_manager/task_manager.test.ts @@ -7,6 +7,11 @@ import _ from 'lodash'; import sinon from 'sinon'; import { TaskManager } from './task_manager'; +import { SavedObjectsClientMock } from 'src/core/server/mocks'; +import { SavedObjectsSerializer, SavedObjectsSchema } from 'src/core/server'; + +const savedObjectsClient = SavedObjectsClientMock.create(); +const serializer = new SavedObjectsSerializer(new SavedObjectsSchema()); describe('TaskManager', () => { let clock: sinon.SinonFakeTimers; @@ -28,7 +33,12 @@ describe('TaskManager', () => { test('disallows schedule before init', async () => { const { opts } = testOpts(); - const client = new TaskManager(opts.kbnServer, opts.server, opts.config); + const client = new TaskManager({ + kbnServer: opts.kbnServer, + config: opts.config, + savedObjectsRepository: savedObjectsClient, + serializer, + }); const task = { taskType: 'foo', params: {}, @@ -39,19 +49,34 @@ describe('TaskManager', () => { test('disallows fetch before init', async () => { const { opts } = testOpts(); - const client = new TaskManager(opts.kbnServer, opts.server, opts.config); + const client = new TaskManager({ + kbnServer: opts.kbnServer, + config: opts.config, + savedObjectsRepository: savedObjectsClient, + serializer, + }); await expect(client.fetch({})).rejects.toThrow(/^NotInitialized: .*/i); }); test('disallows remove before init', async () => { const { opts } = testOpts(); - const client = new TaskManager(opts.kbnServer, opts.server, opts.config); + const client = new TaskManager({ + kbnServer: opts.kbnServer, + config: opts.config, + savedObjectsRepository: savedObjectsClient, + serializer, + }); await expect(client.remove('23')).rejects.toThrow(/^NotInitialized: .*/i); }); test('allows middleware registration before init', () => { const { opts } = testOpts(); - const client = new TaskManager(opts.kbnServer, opts.server, opts.config); + const client = new TaskManager({ + kbnServer: opts.kbnServer, + config: opts.config, + savedObjectsRepository: savedObjectsClient, + serializer, + }); const middleware = { beforeSave: async (saveOpts: any) => saveOpts, beforeRun: async (runOpts: any) => runOpts, @@ -61,7 +86,12 @@ describe('TaskManager', () => { test('disallows middleware registration after init', async () => { const { $test, opts } = testOpts(); - const client = new TaskManager(opts.kbnServer, opts.server, opts.config); + const client = new TaskManager({ + kbnServer: opts.kbnServer, + config: opts.config, + savedObjectsRepository: savedObjectsClient, + serializer, + }); const middleware = { beforeSave: async (saveOpts: any) => saveOpts, beforeRun: async (runOpts: any) => runOpts, @@ -94,20 +124,23 @@ describe('TaskManager', () => { afterPluginsInit(callback: any) { $test.afterPluginsInit = callback; }, - }, - server: { - log: sinon.spy(), - decorate(...args: any[]) { - _.set(opts, args.slice(0, -1), _.last(args)); - }, - plugins: { - elasticsearch: { - getCluster() { - return { callWithInternalUser: callCluster }; - }, - status: { - on(eventName: string, callback: () => any) { - $test.events[eventName] = callback; + server: { + log: sinon.spy(), + decorate(...args: any[]) { + _.set(opts, args.slice(0, -1), _.last(args)); + }, + kibanaMigrator: { + awaitMigration: jest.fn(), + }, + plugins: { + elasticsearch: { + getCluster() { + return { callWithInternalUser: callCluster }; + }, + status: { + on(eventName: string, callback: () => any) { + $test.events[eventName] = callback; + }, }, }, }, diff --git a/x-pack/legacy/plugins/task_manager/task_manager.ts b/x-pack/legacy/plugins/task_manager/task_manager.ts index 8979f99d1074d..790a9df71256a 100644 --- a/x-pack/legacy/plugins/task_manager/task_manager.ts +++ b/x-pack/legacy/plugins/task_manager/task_manager.ts @@ -4,6 +4,7 @@ * you may not use this file except in compliance with the Elastic License. */ +import { SavedObjectsClientContract, SavedObjectsSerializer } from 'src/core/server'; import { fillPool } from './lib/fill_pool'; import { Logger, TaskManagerLogger } from './lib/logger'; import { addMiddlewareToChain, BeforeSaveMiddlewareParams, Middleware } from './lib/middleware'; @@ -13,7 +14,14 @@ import { SanitizedTaskDefinition, TaskDefinition, TaskDictionary } from './task' import { TaskPoller } from './task_poller'; import { TaskPool } from './task_pool'; import { TaskManagerRunner } from './task_runner'; -import { FetchOpts, FetchResult, RemoveResult, TaskStore } from './task_store'; +import { FetchOpts, FetchResult, TaskStore } from './task_store'; + +export interface TaskManagerOpts { + kbnServer: any; + config: any; + savedObjectsRepository: SavedObjectsClientContract; + serializer: SavedObjectsSerializer; +} /* * The TaskManager is the public interface into the task manager system. This glues together @@ -46,9 +54,10 @@ export class TaskManager { * enabling the task manipulation methods, and beginning the background polling * mechanism. */ - constructor(kbnServer: any, server: any, config: any) { - this.maxWorkers = config.get('xpack.task_manager.max_workers'); - this.overrideNumWorkers = config.get('xpack.task_manager.override_num_workers'); + constructor(opts: TaskManagerOpts) { + const { server } = opts.kbnServer; + this.maxWorkers = opts.config.get('xpack.task_manager.max_workers'); + this.overrideNumWorkers = opts.config.get('xpack.task_manager.override_num_workers'); this.definitions = {}; const logger = new TaskManagerLogger((...args: any[]) => server.log(...args)); @@ -56,12 +65,11 @@ export class TaskManager { /* Kibana UUID needs to be pulled live (not cached), as it takes a long time * to initialize, and can change after startup */ const store = new TaskStore({ + serializer: opts.serializer, + savedObjectsRepository: opts.savedObjectsRepository, callCluster: server.plugins.elasticsearch.getCluster('admin').callWithInternalUser, - index: config.get('xpack.task_manager.index'), - maxAttempts: config.get('xpack.task_manager.max_attempts'), - supportedTypes: Object.keys(this.definitions), - logger, - getKibanaUuid: () => config.get('server.uuid'), + maxAttempts: opts.config.get('xpack.task_manager.max_attempts'), + definitions: this.definitions, }); const pool = new TaskPool({ logger, @@ -70,7 +78,7 @@ export class TaskManager { const createRunner = (instance: ConcreteTaskInstance) => new TaskManagerRunner({ logger, - kbnServer, + kbnServer: opts.kbnServer, instance, store, definitions: this.definitions, @@ -78,8 +86,7 @@ export class TaskManager { }); const poller = new TaskPoller({ logger, - pollInterval: config.get('xpack.task_manager.poll_interval'), - store, + pollInterval: opts.config.get('xpack.task_manager.poll_interval'), work(): Promise { return fillPool(pool.run, store.fetchAvailableTasks, createRunner); }, @@ -89,25 +96,25 @@ export class TaskManager { this.store = store; this.poller = poller; - kbnServer.afterPluginsInit(async () => { - store.addSupportedTypes(Object.keys(this.definitions)); - const startPoller = () => { - return poller - .start() - .then(() => { - this.isInitialized = true; - }) - .catch((err: Error) => { - // FIXME: check the type of error to make sure it's actually an ES error - logger.warning(`PollError ${err.message}`); - - // rety again to initialize store and poller, using the timing of - // task_manager's configurable poll interval - const retryInterval = config.get('xpack.task_manager.poll_interval'); - setTimeout(() => startPoller(), retryInterval); - }); + opts.kbnServer.afterPluginsInit(() => { + // By this point, the plugins had their chance to register task definitions + // and we're good to start doing CRUD actions + this.isInitialized = true; + const startPoller = async () => { + await server.kibanaMigrator.awaitMigration(); + try { + await poller.start(); + } catch (err) { + // FIXME: check the type of error to make sure it's actually an ES error + logger.warning(`PollError ${err.message}`); + + // rety again to initialize store and poller, using the timing of + // task_manager's configurable poll interval + const retryInterval = opts.config.get('xpack.task_manager.poll_interval'); + setTimeout(() => startPoller(), retryInterval); + } }; - return startPoller(); + startPoller(); }); } @@ -180,7 +187,7 @@ export class TaskManager { * @param {string} id * @returns {Promise} */ - public async remove(id: string): Promise { + public async remove(id: string): Promise { this.assertInitialized('Tasks cannot be removed before task manager is initialized!'); return this.store.remove(id); } diff --git a/x-pack/legacy/plugins/task_manager/task_poller.test.ts b/x-pack/legacy/plugins/task_manager/task_poller.test.ts index 23f604e861c14..478c1a4dc1b17 100644 --- a/x-pack/legacy/plugins/task_manager/task_poller.test.ts +++ b/x-pack/legacy/plugins/task_manager/task_poller.test.ts @@ -7,24 +7,12 @@ import _ from 'lodash'; import sinon from 'sinon'; import { TaskPoller } from './task_poller'; -import { TaskStore } from './task_store'; import { mockLogger, resolvable, sleep } from './test_utils'; -let store: TaskStore; - describe('TaskPoller', () => { beforeEach(() => { const callCluster = sinon.stub(); callCluster.withArgs('indices.getTemplate').returns(Promise.resolve({ tasky: {} })); - const getKibanaUuid = sinon.stub().returns('kibana-123-uuid-test'); - store = new TaskStore({ - callCluster, - getKibanaUuid, - logger: mockLogger(), - index: 'tasky', - maxAttempts: 2, - supportedTypes: ['a', 'b', 'c'], - }); }); describe('interval tests', () => { @@ -44,7 +32,6 @@ describe('TaskPoller', () => { return Promise.resolve(); }); const poller = new TaskPoller({ - store, pollInterval, work, logger: mockLogger(), @@ -67,7 +54,6 @@ describe('TaskPoller', () => { const logger = mockLogger(); const doneWorking = resolvable(); const poller = new TaskPoller({ - store, logger, pollInterval: 1, work: async () => { @@ -98,7 +84,6 @@ describe('TaskPoller', () => { }); const poller = new TaskPoller({ - store, logger: mockLogger(), pollInterval: 1, work, @@ -117,7 +102,6 @@ describe('TaskPoller', () => { await doneWorking; }); const poller = new TaskPoller({ - store, pollInterval: 1, logger: mockLogger(), work, @@ -143,7 +127,6 @@ describe('TaskPoller', () => { doneWorking.resolve(); }); const poller = new TaskPoller({ - store, pollInterval: 1, logger: mockLogger(), work, @@ -154,19 +137,4 @@ describe('TaskPoller', () => { sinon.assert.calledOnce(work); }); - - test('start method passes through error from store.init', async () => { - store.init = () => { - throw new Error('test error'); - }; - - const poller = new TaskPoller({ - store, - pollInterval: 1, - logger: mockLogger(), - work: sinon.stub(), - }); - - await expect(poller.start()).rejects.toMatchInlineSnapshot(`[Error: test error]`); - }); }); diff --git a/x-pack/legacy/plugins/task_manager/task_poller.ts b/x-pack/legacy/plugins/task_manager/task_poller.ts index e77d9c97fba0d..739f2d35a3675 100644 --- a/x-pack/legacy/plugins/task_manager/task_poller.ts +++ b/x-pack/legacy/plugins/task_manager/task_poller.ts @@ -9,14 +9,12 @@ */ import { Logger } from './lib/logger'; -import { TaskStore } from './task_store'; type WorkFn = () => Promise; interface Opts { pollInterval: number; logger: Logger; - store: TaskStore; work: WorkFn; } @@ -30,7 +28,6 @@ export class TaskPoller { private timeout: any; private pollInterval: number; private logger: Logger; - private store: TaskStore; private work: WorkFn; /** @@ -44,7 +41,6 @@ export class TaskPoller { constructor(opts: Opts) { this.pollInterval = opts.pollInterval; this.logger = opts.logger; - this.store = opts.store; this.work = opts.work; } @@ -56,10 +52,6 @@ export class TaskPoller { return; } - if (!this.store.isInitialized) { - await this.store.init(); - } - this.isStarted = true; const poll = async () => { diff --git a/x-pack/legacy/plugins/task_manager/task_runner.test.ts b/x-pack/legacy/plugins/task_manager/task_runner.test.ts index 0909be3b5c471..95dead2013a26 100644 --- a/x-pack/legacy/plugins/task_manager/task_runner.test.ts +++ b/x-pack/legacy/plugins/task_manager/task_runner.test.ts @@ -6,10 +6,18 @@ import _ from 'lodash'; import sinon from 'sinon'; -import { minutesFromNow } from './lib/intervals'; +import { minutesFromNow, secondsFromNow } from './lib/intervals'; import { ConcreteTaskInstance } from './task'; import { TaskManagerRunner } from './task_runner'; +let fakeTimer: sinon.SinonFakeTimers; + +beforeAll(() => { + fakeTimer = sinon.useFakeTimers(); +}); + +afterAll(() => fakeTimer.restore()); + describe('TaskManagerRunner', () => { test('provides details about the task that is running', () => { const { runner } = testOpts({ @@ -53,7 +61,7 @@ describe('TaskManagerRunner', () => { state: { hey: 'there' }, }, definitions: { - testtype: { + bar: { createTaskRunner: () => ({ async run() { throw new Error('Dangit!'); @@ -69,8 +77,7 @@ describe('TaskManagerRunner', () => { const instance = store.update.args[0][0]; expect(instance.id).toEqual(id); - expect(instance.attempts).toEqual(initialAttempts + 1); - expect(instance.runAt.getTime()).toBeGreaterThan(Date.now()); + expect(instance.runAt.getTime()).toEqual(minutesFromNow(initialAttempts * 5).getTime()); expect(instance.params).toEqual({ a: 'b' }); expect(instance.state).toEqual({ hey: 'there' }); }); @@ -79,6 +86,17 @@ describe('TaskManagerRunner', () => { const { runner, store } = testOpts({ instance: { interval: '10m', + status: 'running', + startedAt: new Date(), + }, + definitions: { + bar: { + createTaskRunner: () => ({ + async run() { + return; + }, + }), + }, }, }); @@ -165,7 +183,9 @@ describe('TaskManagerRunner', () => { bar: { createTaskRunner: () => ({ async run() { - await new Promise(r => setTimeout(r, 1000)); + const promise = new Promise(r => setTimeout(r, 1000)); + fakeTimer.tick(1000); + await promise; }, async cancel() { wasCancelled = true; @@ -176,7 +196,7 @@ describe('TaskManagerRunner', () => { }); const promise = runner.run(); - await new Promise(r => setInterval(r, 1)); + await Promise.resolve(); await runner.cancel(); await promise; @@ -187,7 +207,7 @@ describe('TaskManagerRunner', () => { test('warns if cancel is called on a non-cancellable task', async () => { const { runner, logger } = testOpts({ definitions: { - testType: { + bar: { createTaskRunner: () => ({ run: async () => undefined, }), @@ -202,6 +222,166 @@ describe('TaskManagerRunner', () => { sinon.assert.calledWithMatch(logger.warning, /not cancellable/); }); + test('sets startedAt, status, attempts and retryAt when claiming a task', async () => { + const timeoutMinutes = 1; + const id = _.random(1, 20).toString(); + const initialAttempts = _.random(0, 2); + const { runner, store } = testOpts({ + instance: { + id, + attempts: initialAttempts, + interval: undefined, + }, + definitions: { + bar: { + timeout: `${timeoutMinutes}m`, + createTaskRunner: () => ({ + run: async () => undefined, + }), + }, + }, + }); + + await runner.claimOwnership(); + + sinon.assert.calledOnce(store.update); + const instance = store.update.args[0][0]; + + expect(instance.attempts).toEqual(initialAttempts + 1); + expect(instance.status).toBe('running'); + expect(instance.startedAt.getTime()).toEqual(Date.now()); + expect(instance.retryAt.getTime()).toEqual( + minutesFromNow((initialAttempts + 1) * 5).getTime() + timeoutMinutes * 60 * 1000 + ); + }); + + test('uses getRetryDelay function on error when defined', async () => { + const initialAttempts = _.random(0, 2); + const retryDelay = _.random(15, 100); + const id = Date.now().toString(); + const getRetryDelayStub = sinon.stub().returns(retryDelay); + const error = new Error('Dangit!'); + const { runner, store } = testOpts({ + instance: { + id, + attempts: initialAttempts, + }, + definitions: { + bar: { + getRetryDelay: getRetryDelayStub, + createTaskRunner: () => ({ + async run() { + throw error; + }, + }), + }, + }, + }); + + await runner.run(); + + sinon.assert.calledOnce(store.update); + sinon.assert.calledWith(getRetryDelayStub, initialAttempts, error); + const instance = store.update.args[0][0]; + + expect(instance.runAt.getTime()).toEqual(secondsFromNow(retryDelay).getTime()); + }); + + test('uses getRetryDelay to set retryAt when defined', async () => { + const id = _.random(1, 20).toString(); + const initialAttempts = _.random(0, 2); + const retryDelay = _.random(15, 100); + const timeoutMinutes = 1; + const getRetryDelayStub = sinon.stub().returns(retryDelay); + const { runner, store } = testOpts({ + instance: { + id, + attempts: initialAttempts, + interval: undefined, + }, + definitions: { + bar: { + timeout: `${timeoutMinutes}m`, + getRetryDelay: getRetryDelayStub, + createTaskRunner: () => ({ + run: async () => undefined, + }), + }, + }, + }); + + await runner.claimOwnership(); + + sinon.assert.calledOnce(store.update); + sinon.assert.calledWith(getRetryDelayStub, initialAttempts + 1); + const instance = store.update.args[0][0]; + + expect(instance.retryAt.getTime()).toEqual( + secondsFromNow(retryDelay).getTime() + timeoutMinutes * 60 * 1000 + ); + }); + + test('Fails non-recurring task when maxAttempts reached', async () => { + const id = _.random(1, 20).toString(); + const initialAttempts = 3; + const { runner, store } = testOpts({ + instance: { + id, + attempts: initialAttempts, + interval: undefined, + }, + definitions: { + bar: { + maxAttempts: 3, + createTaskRunner: () => ({ + run: async () => { + throw new Error(); + }, + }), + }, + }, + }); + + await runner.run(); + + sinon.assert.calledOnce(store.update); + const instance = store.update.args[0][0]; + expect(instance.attempts).toEqual(3); + expect(instance.status).toEqual('failed'); + expect(instance.retryAt).toBeNull(); + expect(instance.runAt.getTime()).toBeLessThanOrEqual(Date.now()); + }); + + test(`Doesn't fail recurring tasks when maxAttempts reached`, async () => { + const id = _.random(1, 20).toString(); + const initialAttempts = 3; + const { runner, store } = testOpts({ + instance: { + id, + attempts: initialAttempts, + interval: '10s', + }, + definitions: { + bar: { + maxAttempts: 3, + createTaskRunner: () => ({ + run: async () => { + throw new Error(); + }, + }), + }, + }, + }); + + await runner.run(); + + sinon.assert.calledOnce(store.update); + const instance = store.update.args[0][0]; + expect(instance.attempts).toEqual(3); + expect(instance.status).toEqual('idle'); + expect(instance.runAt.getTime()).toEqual(minutesFromNow(15).getTime()); + }); + interface TestOpts { instance?: Partial; definitions?: any; @@ -234,6 +414,8 @@ describe('TaskManagerRunner', () => { primaryTerm: 32, runAt: new Date(), scheduledAt: new Date(), + startedAt: null, + retryAt: null, attempts: 0, params: {}, scope: ['reporting'], diff --git a/x-pack/legacy/plugins/task_manager/task_runner.ts b/x-pack/legacy/plugins/task_manager/task_runner.ts index d5e0196d80864..4993648873903 100644 --- a/x-pack/legacy/plugins/task_manager/task_runner.ts +++ b/x-pack/legacy/plugins/task_manager/task_runner.ts @@ -11,7 +11,8 @@ */ import Joi from 'joi'; -import { intervalFromNow, minutesFromNow } from './lib/intervals'; +import Boom from 'boom'; +import { intervalFromDate, intervalFromNow } from './lib/intervals'; import { Logger } from './lib/logger'; import { BeforeRunFunction } from './lib/middleware'; import { @@ -23,7 +24,8 @@ import { TaskDictionary, validateRunResult, } from './task'; -import { RemoveResult } from './task_store'; + +const defaultBackoffPerFailure = 5 * 60 * 1000; export interface TaskRunner { numWorkers: number; @@ -37,7 +39,7 @@ export interface TaskRunner { interface Updatable { readonly maxAttempts: number; update(doc: ConcreteTaskInstance): Promise; - remove(id: string): Promise; + remove(id: string): Promise; } interface Opts { @@ -119,7 +121,7 @@ export class TaskManagerRunner implements TaskRunner { * Gets whether or not this task has run longer than its expiration setting allows. */ public get isExpired() { - return this.instance.runAt < new Date(); + return intervalFromDate(this.instance.startedAt!, this.definition.timeout)! < new Date(); } /** @@ -166,12 +168,20 @@ export class TaskManagerRunner implements TaskRunner { */ public async claimOwnership(): Promise { const VERSION_CONFLICT_STATUS = 409; + const attempts = this.instance.attempts + 1; + const now = new Date(); + + const timeoutDate = intervalFromNow(this.definition.timeout!)!; try { this.instance = await this.store.update({ ...this.instance, status: 'running', - runAt: intervalFromNow(this.definition.timeout)!, + startedAt: now, + attempts, + retryAt: new Date( + timeoutDate.getTime() + this.getRetryDelay(attempts, Boom.clientTimeout()) + ), }); return true; @@ -211,19 +221,21 @@ export class TaskManagerRunner implements TaskRunner { private async processResultForRecurringTask(result: RunResult): Promise { // recurring task: update the task instance + const startedAt = this.instance.startedAt!; const state = result.state || this.instance.state || {}; - const status = this.instance.attempts < this.store.maxAttempts ? 'idle' : 'failed'; + const status = this.getInstanceStatus(); let runAt; if (status === 'failed') { // task run errored, keep the same runAt runAt = this.instance.runAt; + } else if (result.runAt) { + runAt = result.runAt; + } else if (result.error) { + // when result.error is truthy, then we're retrying because it failed + runAt = new Date(Date.now() + this.getRetryDelay(this.instance.attempts, result.error)); } else { - runAt = - result.runAt || - intervalFromNow(this.instance.interval) || - // when result.error is truthy, then we're retrying because it failed - minutesFromNow((this.instance.attempts + 1) * 5); // incrementally backs off an extra 5m per failure + runAt = intervalFromDate(startedAt, this.instance.interval)!; } await this.store.update({ @@ -231,7 +243,9 @@ export class TaskManagerRunner implements TaskRunner { runAt, state, status, - attempts: result.error ? this.instance.attempts + 1 : 0, + startedAt: null, + retryAt: null, + attempts: result.error ? this.instance.attempts : 0, }); return result; @@ -262,6 +276,22 @@ export class TaskManagerRunner implements TaskRunner { } return result; } + + private getInstanceStatus() { + if (this.instance.interval) { + return 'idle'; + } + + const maxAttempts = this.definition.maxAttempts || this.store.maxAttempts; + return this.instance.attempts < maxAttempts ? 'idle' : 'failed'; + } + + private getRetryDelay(attempts: number, error: any) { + if (this.definition.getRetryDelay) { + return this.definition.getRetryDelay(attempts, error) * 1000; + } + return attempts * defaultBackoffPerFailure; + } } function sanitizeInstance(instance: ConcreteTaskInstance): ConcreteTaskInstance { diff --git a/x-pack/legacy/plugins/task_manager/task_store.test.ts b/x-pack/legacy/plugins/task_manager/task_store.test.ts index 91ed8bcad8a6a..1d50ec79a150b 100644 --- a/x-pack/legacy/plugins/task_manager/task_store.test.ts +++ b/x-pack/legacy/plugins/task_manager/task_store.test.ts @@ -6,104 +6,73 @@ import _ from 'lodash'; import sinon from 'sinon'; -import { - TASK_MANAGER_API_VERSION as API_VERSION, - TASK_MANAGER_TEMPLATE_VERSION as TEMPLATE_VERSION, -} from './constants'; -import { TaskInstance, TaskStatus } from './task'; +import { TaskDictionary, SanitizedTaskDefinition, TaskInstance, TaskStatus } from './task'; import { FetchOpts, TaskStore } from './task_store'; import { mockLogger } from './test_utils'; - -const getKibanaUuid = sinon.stub().returns('kibana-uuid-123-test'); +import { SavedObjectsClientMock } from 'src/core/server/mocks'; +import { SavedObjectsSerializer, SavedObjectsSchema, SavedObjectAttributes } from 'src/core/server'; + +const taskDefinitions: TaskDictionary = { + report: { + type: 'report', + title: '', + numWorkers: 1, + createTaskRunner: jest.fn(), + }, + dernstraight: { + type: 'dernstraight', + title: '', + numWorkers: 1, + createTaskRunner: jest.fn(), + }, + yawn: { + type: 'yawn', + title: '', + numWorkers: 1, + createTaskRunner: jest.fn(), + }, +}; + +const savedObjectsClient = SavedObjectsClientMock.create(); +const serializer = new SavedObjectsSerializer(new SavedObjectsSchema()); + +beforeEach(() => jest.resetAllMocks()); + +const mockedDate = new Date('2019-02-12T21:01:22.479Z'); +(global as any).Date = class Date { + constructor() { + return mockedDate; + } + static now() { + return mockedDate.getTime(); + } +}; describe('TaskStore', () => { - describe('init', () => { - test('creates the task manager index', async () => { - const callCluster = sinon.stub(); - callCluster.withArgs('indices.getTemplate').returns(Promise.resolve({ tasky: {} })); - const store = new TaskStore({ - callCluster, - getKibanaUuid, - logger: mockLogger(), - index: 'tasky', - maxAttempts: 2, - supportedTypes: ['a', 'b', 'c'], - }); - - await store.init(); - - sinon.assert.calledTwice(callCluster); // store.init calls twice: once to check for existing template, once to put the template (if needed) - - sinon.assert.calledWithMatch(callCluster, 'indices.putTemplate', { - body: { - index_patterns: ['tasky'], - settings: { - number_of_shards: 1, - auto_expand_replicas: '0-1', - }, - }, - name: 'tasky', - }); - }); - - test('logs a warning if newer index template exists', async () => { - const callCluster = sinon.stub(); - callCluster - .withArgs('indices.getTemplate') - .returns(Promise.resolve({ tasky: { version: Infinity } })); - - const logger = { - info: sinon.spy(), - debug: sinon.spy(), - warning: sinon.spy(), - error: sinon.spy(), - }; - - const store = new TaskStore({ - callCluster, - getKibanaUuid, - logger, - index: 'tasky', - maxAttempts: 2, - supportedTypes: ['a', 'b', 'c'], - }); - - await store.init(); - const loggingCall = logger.warning.getCall(0); - expect(loggingCall.args[0]).toBe( - `This Kibana instance defines an older template version (${TEMPLATE_VERSION}) than is currently in Elasticsearch (Infinity). ` + - `Because of the potential for non-backwards compatible changes, this Kibana instance will only be able to claim scheduled tasks with ` + - `"kibana.apiVersion" <= ${API_VERSION} in the task metadata.` - ); - expect(logger.warning.calledOnce).toBe(true); - }); - }); - describe('schedule', () => { async function testSchedule(task: TaskInstance) { - const callCluster = sinon.stub(); - callCluster.withArgs('index').returns( - Promise.resolve({ - _id: 'testid', - _seq_no: 3344, - _primary_term: 3344, + const callCluster = jest.fn(); + savedObjectsClient.create.mockImplementation( + async (type: string, attributes: SavedObjectAttributes) => ({ + id: 'testid', + type, + attributes, + references: [], + version: '123', }) ); - callCluster.withArgs('indices.getTemplate').returns(Promise.resolve({ tasky: {} })); const store = new TaskStore({ + serializer, callCluster, - getKibanaUuid, - logger: mockLogger(), - index: 'tasky', maxAttempts: 2, - supportedTypes: ['report', 'dernstraight', 'yawn'], + definitions: taskDefinitions, + savedObjectsRepository: savedObjectsClient, }); - await store.init(); const result = await store.schedule(task); - sinon.assert.calledThrice(callCluster); + expect(savedObjectsClient.create).toHaveBeenCalledTimes(1); - return { result, callCluster, arg: callCluster.args[2][1] }; + return result; } test('serializes the params and state', async () => { @@ -112,18 +81,42 @@ describe('TaskStore', () => { state: { foo: 'bar' }, taskType: 'report', }; - const { callCluster, arg } = await testSchedule(task); + const result = await testSchedule(task); - sinon.assert.calledWith(callCluster, 'index'); - - expect(arg).toMatchObject({ - index: 'tasky', - body: { - task: { - params: JSON.stringify(task.params), - state: JSON.stringify(task.state), - }, + expect(savedObjectsClient.create).toHaveBeenCalledWith( + 'task', + { + attempts: 0, + interval: undefined, + params: '{"hello":"world"}', + retryAt: null, + runAt: '2019-02-12T21:01:22.479Z', + scheduledAt: '2019-02-12T21:01:22.479Z', + scope: undefined, + startedAt: null, + state: '{"foo":"bar"}', + status: 'idle', + taskType: 'report', + user: undefined, }, + {} + ); + + expect(result).toEqual({ + id: 'testid', + attempts: 0, + interval: undefined, + params: { hello: 'world' }, + retryAt: null, + runAt: mockedDate, + scheduledAt: mockedDate, + scope: undefined, + startedAt: null, + state: { foo: 'bar' }, + status: 'idle', + taskType: 'report', + user: undefined, + version: '123', }); }); @@ -133,26 +126,27 @@ describe('TaskStore', () => { state: { foo: 'bar' }, taskType: 'report', }; - const { result } = await testSchedule(task); + const result = await testSchedule(task); expect(result).toMatchObject({ ...task, - sequenceNumber: 3344, - primaryTerm: 3344, id: 'testid', }); }); test('sets runAt to now if not specified', async () => { - const now = Date.now(); - const { arg } = await testSchedule({ taskType: 'dernstraight', params: {}, state: {} }); - expect(arg.body.task.runAt.getTime()).toBeGreaterThanOrEqual(now); + await testSchedule({ taskType: 'dernstraight', params: {}, state: {} }); + expect(savedObjectsClient.create).toHaveBeenCalledTimes(1); + const attributes = savedObjectsClient.create.mock.calls[0][1]; + expect(new Date(attributes.runAt as string).getTime()).toEqual(mockedDate.getTime()); }); test('ensures params and state are not null', async () => { - const { arg } = await testSchedule({ taskType: 'yawn' } as any); - expect(arg.body.task.params).toEqual('{}'); - expect(arg.body.task.state).toEqual('{}'); + await testSchedule({ taskType: 'yawn' } as any); + expect(savedObjectsClient.create).toHaveBeenCalledTimes(1); + const attributes = savedObjectsClient.create.mock.calls[0][1]; + expect(attributes.params).toEqual('{}'); + expect(attributes.state).toEqual('{}'); }); test('errors if the task type is unknown', async () => { @@ -166,12 +160,11 @@ describe('TaskStore', () => { async function testFetch(opts?: FetchOpts, hits: any[] = []) { const callCluster = sinon.spy(async () => ({ hits: { hits } })); const store = new TaskStore({ + serializer, callCluster, - getKibanaUuid, - logger: mockLogger(), - index: 'tasky', maxAttempts: 2, - supportedTypes: ['a', 'b', 'c'], + definitions: taskDefinitions, + savedObjectsRepository: savedObjectsClient, }); const result = await store.fetch(opts); @@ -188,7 +181,7 @@ describe('TaskStore', () => { test('empty call filters by type, sorts by runAt and id', async () => { const { args } = await testFetch(); expect(args).toMatchObject({ - index: 'tasky', + index: '.kibana_task_manager', body: { sort: [{ 'task.runAt': 'asc' }, { _id: 'desc' }], query: { term: { type: 'task' } }, @@ -301,13 +294,14 @@ describe('TaskStore', () => { interval: undefined, params: { hello: 'world' }, runAt, + scheduledAt: mockedDate, scope: ['reporting'], state: { baby: 'Henhen' }, status: 'idle', taskType: 'foo', user: 'jimbo', - sequenceNumber: undefined, - primaryTerm: undefined, + retryAt: undefined, + startedAt: undefined, }, { attempts: 2, @@ -315,13 +309,14 @@ describe('TaskStore', () => { interval: '5m', params: { shazm: 1 }, runAt, + scheduledAt: mockedDate, scope: ['reporting', 'ceo'], state: { henry: 'The 8th' }, status: 'running', taskType: 'bar', user: 'dabo', - sequenceNumber: undefined, - primaryTerm: undefined, + retryAt: undefined, + startedAt: undefined, }, ], searchAfter: ['b', 2], @@ -335,9 +330,9 @@ describe('TaskStore', () => { const store = new TaskStore({ callCluster, logger: mockLogger(), - supportedTypes: ['a', 'b', 'c'], - index: 'tasky', + definitions: taskDefinitions, maxAttempts: 2, + serializer, ...opts, }); @@ -355,12 +350,11 @@ describe('TaskStore', () => { test('it returns normally with no tasks when the index does not exist.', async () => { const callCluster = sinon.spy(async () => ({ hits: { hits: [] } })); const store = new TaskStore({ + serializer, callCluster, - getKibanaUuid, - logger: mockLogger(), - supportedTypes: ['a', 'b', 'c'], - index: 'tasky', + definitions: taskDefinitions, maxAttempts: 2, + savedObjectsRepository: savedObjectsClient, }); const result = await store.fetchAvailableTasks(); @@ -373,12 +367,25 @@ describe('TaskStore', () => { test('it filters tasks by supported types, maxAttempts, and runAt', async () => { const maxAttempts = _.random(2, 43); - const index = `index_${_.random(1, 234)}`; + const customMaxAttempts = _.random(44, 100); const { args } = await testFetchAvailableTasks({ opts: { - index, maxAttempts, - supportedTypes: ['foo', 'bar'], + definitions: { + foo: { + type: 'foo', + title: '', + numWorkers: 1, + createTaskRunner: jest.fn(), + }, + bar: { + type: 'bar', + title: '', + numWorkers: 1, + maxAttempts: customMaxAttempts, + createTaskRunner: jest.fn(), + }, + }, }, }); expect(args).toMatchObject({ @@ -390,10 +397,63 @@ describe('TaskStore', () => { { bool: { must: [ - { terms: { 'task.taskType': ['foo', 'bar'] } }, - { range: { 'task.attempts': { lte: maxAttempts } } }, - { range: { 'task.runAt': { lte: 'now' } } }, - { range: { 'kibana.apiVersion': { lte: 1 } } }, + { + bool: { + should: [ + { + bool: { + must: [ + { term: { 'task.status': 'idle' } }, + { range: { 'task.runAt': { lte: 'now' } } }, + ], + }, + }, + { + bool: { + must: [ + { term: { 'task.status': 'running' } }, + { range: { 'task.retryAt': { lte: 'now' } } }, + ], + }, + }, + ], + }, + }, + { + bool: { + should: [ + { exists: { field: 'task.interval' } }, + { + bool: { + must: [ + { term: { 'task.taskType': 'foo' } }, + { + range: { + 'task.attempts': { + lt: maxAttempts, + }, + }, + }, + ], + }, + }, + { + bool: { + must: [ + { term: { 'task.taskType': 'bar' } }, + { + range: { + 'task.attempts': { + lt: customMaxAttempts, + }, + }, + }, + ], + }, + }, + ], + }, + }, ], }, }, @@ -401,10 +461,18 @@ describe('TaskStore', () => { }, }, size: 10, - sort: { 'task.runAt': { order: 'asc' } }, + sort: { + _script: { + type: 'number', + order: 'asc', + script: { + lang: 'expression', + source: `doc['task.retryAt'].value || doc['task.runAt'].value`, + }, + }, + }, seq_no_primary_term: true, }, - index, }); }); @@ -428,6 +496,8 @@ describe('TaskStore', () => { scope: ['reporting'], }, }, + _seq_no: 1, + _primary_term: 2, sort: ['a', 1], }, { @@ -446,6 +516,8 @@ describe('TaskStore', () => { scope: ['reporting', 'ceo'], }, }, + _seq_no: 3, + _primary_term: 4, sort: ['b', 2], }, ], @@ -462,8 +534,6 @@ describe('TaskStore', () => { status: 'idle', taskType: 'foo', user: 'jimbo', - sequenceNumber: undefined, - primaryTerm: undefined, }, { attempts: 2, @@ -476,8 +546,6 @@ describe('TaskStore', () => { status: 'running', taskType: 'bar', user: 'dabo', - sequenceNumber: undefined, - primaryTerm: undefined, }, ]); }); @@ -485,60 +553,70 @@ describe('TaskStore', () => { describe('update', () => { test('refreshes the index, handles versioning', async () => { - const runAt = new Date(); const task = { - runAt, - scheduledAt: runAt, + runAt: mockedDate, + scheduledAt: mockedDate, + startedAt: null, + retryAt: null, id: 'task:324242', params: { hello: 'world' }, state: { foo: 'bar' }, taskType: 'report', - sequenceNumber: 2, - primaryTerm: 2, attempts: 3, status: 'idle' as TaskStatus, + version: '123', }; - const callCluster = sinon.spy(async () => ({ - _seq_no: task.sequenceNumber + 1, - _primary_term: task.primaryTerm + 1, - })); + savedObjectsClient.update.mockImplementation( + async (type: string, id: string, attributes: SavedObjectAttributes) => { + return { + id, + type, + attributes, + references: [], + version: '123', + }; + } + ); const store = new TaskStore({ - callCluster, - getKibanaUuid, - logger: mockLogger(), - index: 'tasky', + serializer, + callCluster: jest.fn(), maxAttempts: 2, - supportedTypes: ['a', 'b', 'c'], + definitions: taskDefinitions, + savedObjectsRepository: savedObjectsClient, }); const result = await store.update(task); - sinon.assert.calledOnce(callCluster); - sinon.assert.calledWith(callCluster, 'update'); - - expect(callCluster.args[0][1]).toMatchObject({ - id: task.id, - index: 'tasky', - if_seq_no: 2, - if_primary_term: 2, - refresh: true, - body: { - doc: { - task: { - ..._.omit(task, ['id', 'sequenceNumber', 'primaryTerm']), - params: JSON.stringify(task.params), - state: JSON.stringify(task.state), - }, - }, + expect(savedObjectsClient.update).toHaveBeenCalledWith( + 'task', + task.id, + { + attempts: task.attempts, + interval: undefined, + params: JSON.stringify(task.params), + retryAt: null, + runAt: task.runAt.toISOString(), + scheduledAt: mockedDate.toISOString(), + scope: undefined, + startedAt: null, + state: JSON.stringify(task.state), + status: task.status, + taskType: task.taskType, + user: undefined, }, - }); + { version: '123' } + ); expect(result).toEqual({ ...task, - sequenceNumber: 3, - primaryTerm: 3, + interval: undefined, + retryAt: null, + scope: undefined, + startedAt: null, + user: undefined, + version: '123', }); }); }); @@ -546,41 +624,17 @@ describe('TaskStore', () => { describe('remove', () => { test('removes the task with the specified id', async () => { const id = `id-${_.random(1, 20)}`; - const callCluster = sinon.spy(() => - Promise.resolve({ - _index: 'myindex', - _id: id, - _seq_no: 32, - _primary_term: 32, - result: 'deleted', - }) - ); + const callCluster = jest.fn(); const store = new TaskStore({ + serializer, callCluster, - getKibanaUuid, - logger: mockLogger(), - index: 'myindex', maxAttempts: 2, - supportedTypes: ['a'], + definitions: taskDefinitions, + savedObjectsRepository: savedObjectsClient, }); const result = await store.remove(id); - - sinon.assert.calledOnce(callCluster); - sinon.assert.calledWith(callCluster, 'delete'); - - expect(result).toEqual({ - id, - index: 'myindex', - sequenceNumber: 32, - primaryTerm: 32, - result: 'deleted', - }); - - expect(callCluster.args[0][1]).toMatchObject({ - id, - index: 'myindex', - refresh: true, - }); + expect(result).toBeUndefined(); + expect(savedObjectsClient.delete).toHaveBeenCalledWith('task', id); }); }); }); diff --git a/x-pack/legacy/plugins/task_manager/task_store.ts b/x-pack/legacy/plugins/task_manager/task_store.ts index 810cec6ab4b3f..79ad72259c996 100644 --- a/x-pack/legacy/plugins/task_manager/task_store.ts +++ b/x-pack/legacy/plugins/task_manager/task_store.ts @@ -8,20 +8,29 @@ * This module contains helpers for managing the task manager storage layer. */ +import { omit } from 'lodash'; import { - TASK_MANAGER_API_VERSION as API_VERSION, - TASK_MANAGER_TEMPLATE_VERSION as TEMPLATE_VERSION, -} from './constants'; -import { Logger } from './lib/logger'; -import { ConcreteTaskInstance, ElasticJs, TaskInstance, TaskStatus } from './task'; + SavedObjectsClientContract, + SavedObject, + SavedObjectAttributes, + SavedObjectsSerializer, + SavedObjectsRawDoc, +} from 'src/core/server'; +import { + ConcreteTaskInstance, + ElasticJs, + SanitizedTaskDefinition, + TaskDictionary, + TaskInstance, +} from './task'; +import { TASK_MANAGER_INDEX } from './constants'; export interface StoreOpts { callCluster: ElasticJs; - getKibanaUuid: () => string; - index: string; maxAttempts: number; - supportedTypes: string[]; - logger: Logger; + definitions: TaskDictionary; + savedObjectsRepository: SavedObjectsClientContract; + serializer: SavedObjectsSerializer; } export interface FetchOpts { @@ -35,224 +44,57 @@ export interface FetchResult { docs: ConcreteTaskInstance[]; } -export interface RemoveResult { - index: string; - id: string; - sequenceNumber: number; - primaryTerm: number; - result: string; -} - -// Internal, the raw document, as stored in the Kibana index. -export interface RawTaskDoc { - _id: string; - _index: string; - _seq_no: number; - _primary_term: number; - _source: { - type: string; - kibana: { - uuid: string; - version: number; - apiVersion: number; - }; - task: { - taskType: string; - scheduledAt: Date; - runAt: Date; - interval?: string; - attempts: number; - status: TaskStatus; - params: string; - state: string; - user?: string; - scope?: string[]; - }; - }; -} - /** * Wraps an elasticsearch connection and provides a task manager-specific * interface into the index. */ export class TaskStore { public readonly maxAttempts: number; - public getKibanaUuid: () => string; - public readonly index: string; private callCluster: ElasticJs; - private supportedTypes: string[]; - private _isInitialized = false; // eslint-disable-line @typescript-eslint/camelcase - private logger: Logger; + private definitions: TaskDictionary; + private savedObjectsRepository: SavedObjectsClientContract; + private serializer: SavedObjectsSerializer; /** * Constructs a new TaskStore. * @param {StoreOpts} opts * @prop {CallCluster} callCluster - The elastic search connection - * @prop {string} index - The name of the task manager index * @prop {number} maxAttempts - The maximum number of attempts before a task will be abandoned - * @prop {string[]} supportedTypes - The task types supported by this store - * @prop {Logger} logger - The task manager logger. + * @prop {TaskDefinition} definition - The definition of the task being run + * @prop {serializer} - The saved object serializer + * @prop {savedObjectsRepository} - An instance to the saved objects repository */ constructor(opts: StoreOpts) { this.callCluster = opts.callCluster; - this.index = opts.index; this.maxAttempts = opts.maxAttempts; - this.supportedTypes = opts.supportedTypes; - this.logger = opts.logger; - this.getKibanaUuid = opts.getKibanaUuid; + this.definitions = opts.definitions; + this.serializer = opts.serializer; + this.savedObjectsRepository = opts.savedObjectsRepository; this.fetchAvailableTasks = this.fetchAvailableTasks.bind(this); } - public addSupportedTypes(types: string[]) { - if (!this._isInitialized) { - this.supportedTypes = this.supportedTypes.concat(types); - } else { - throw new Error('Cannot add task types after initialization'); - } - } - - /** - * Initializes the store, ensuring the task manager index template is created - * and the version is up to date. - */ - public async init() { - if (this._isInitialized) { - throw new Error('TaskStore has already been initialized!'); - } - - let existingVersion = -Infinity; - const templateName = this.index; - - try { - // check if template exists - const templateCheck = await this.callCluster('indices.getTemplate', { - name: templateName, - filter_path: '*.version', - }); - // extract the existing version - const template = templateCheck[templateName] || {}; - existingVersion = template.version || 0; - } catch (err) { - if (err.statusCode !== 404) { - throw err; // ignore not found - } - } - - if (existingVersion > TEMPLATE_VERSION) { - // Do not trample a newer version template - this.logger.warning( - `This Kibana instance defines an older template version (${TEMPLATE_VERSION}) than is currently in Elasticsearch (${existingVersion}). ` + - `Because of the potential for non-backwards compatible changes, this Kibana instance will only be able to claim scheduled tasks with ` + - `"kibana.apiVersion" <= ${API_VERSION} in the task metadata.` - ); - return; - } else if (existingVersion === TEMPLATE_VERSION) { - // The latest template is already saved, so just log a debug line. - this.logger.debug( - `Not installing ${this.index} index template: version ${TEMPLATE_VERSION} already exists.` - ); - return; - } - - // Activate template creation / update - if (existingVersion > 0) { - this.logger.info( - `Upgrading ${this.index} index template. Old version: ${existingVersion}, New version: ${TEMPLATE_VERSION}.` - ); - } else { - this.logger.info(`Installing ${this.index} index template version: ${TEMPLATE_VERSION}.`); - } - - const templateResult = await this.callCluster('indices.putTemplate', { - name: templateName, - body: { - index_patterns: [this.index], - mappings: { - dynamic: false, - properties: { - type: { type: 'keyword' }, - task: { - properties: { - taskType: { type: 'keyword' }, - scheduledAt: { type: 'date' }, - runAt: { type: 'date' }, - interval: { type: 'text' }, - attempts: { type: 'integer' }, - status: { type: 'keyword' }, - params: { type: 'text' }, - state: { type: 'text' }, - user: { type: 'keyword' }, - scope: { type: 'keyword' }, - }, - }, - kibana: { - properties: { - apiVersion: { type: 'integer' }, // 1, 2, 3, etc - uuid: { type: 'keyword' }, // - version: { type: 'integer' }, // 7000099, etc - }, - }, - }, - }, - settings: { - number_of_shards: 1, - auto_expand_replicas: '0-1', - }, - version: TEMPLATE_VERSION, - }, - }); - - this._isInitialized = true; - this.logger.info( - `Installed ${this.index} index template: version ${TEMPLATE_VERSION} (API version ${API_VERSION})` - ); - - return templateResult; - } - - public get isInitialized() { - return this._isInitialized; - } - /** * Schedules a task. * * @param task - The task being scheduled. */ public async schedule(taskInstance: TaskInstance): Promise { - if (!this._isInitialized) { - await this.init(); - } - - if (!this.supportedTypes.includes(taskInstance.taskType)) { + if (!this.definitions[taskInstance.taskType]) { throw new Error( - `Unsupported task type "${ - taskInstance.taskType - }". Supported types are ${this.supportedTypes.join(', ')}` + `Unsupported task type "${taskInstance.taskType}". Supported types are ${Object.keys( + this.definitions + ).join(', ')}` ); } - const { id, ...body } = rawSource(taskInstance, this); - const result = await this.callCluster('index', { - id, - body, - index: this.index, - refresh: true, - }); + const savedObject = await this.savedObjectsRepository.create( + 'task', + taskInstanceToAttributes(taskInstance), + { id: taskInstance.id } + ); - const { task } = body; - return { - ...taskInstance, - id: result._id, - sequenceNumber: result._seq_no, - primaryTerm: result._primary_term, - attempts: 0, - status: task.status, - scheduledAt: task.scheduledAt, - runAt: task.runAt, - state: taskInstance.state || {}, - }; + return savedObjectToConcreteTaskInstance(savedObject); } /** @@ -285,15 +127,66 @@ export class TaskStore { query: { bool: { must: [ - { terms: { 'task.taskType': this.supportedTypes } }, - { range: { 'task.attempts': { lte: this.maxAttempts } } }, - { range: { 'task.runAt': { lte: 'now' } } }, - { range: { 'kibana.apiVersion': { lte: API_VERSION } } }, + // Either a task with idle status and runAt <= now or + // status running with a retryAt <= now. + { + bool: { + should: [ + { + bool: { + must: [ + { term: { 'task.status': 'idle' } }, + { range: { 'task.runAt': { lte: 'now' } } }, + ], + }, + }, + { + bool: { + must: [ + { term: { 'task.status': 'running' } }, + { range: { 'task.retryAt': { lte: 'now' } } }, + ], + }, + }, + ], + }, + }, + // Either task has an interval or the attempts < the maximum configured + { + bool: { + should: [ + { exists: { field: 'task.interval' } }, + ...Object.entries(this.definitions).map(([type, definition]) => ({ + bool: { + must: [ + { term: { 'task.taskType': type } }, + { + range: { + 'task.attempts': { + lt: definition.maxAttempts || this.maxAttempts, + }, + }, + }, + ], + }, + })), + ], + }, + }, ], }, }, size: 10, - sort: { 'task.runAt': { order: 'asc' } }, + sort: { + _script: { + type: 'number', + order: 'asc', + script: { + lang: 'expression', + source: `doc['task.retryAt'].value || doc['task.runAt'].value`, + }, + }, + }, seq_no_primary_term: true, }); @@ -308,26 +201,14 @@ export class TaskStore { * @returns {Promise} */ public async update(doc: ConcreteTaskInstance): Promise { - const rawDoc = taskDocToRaw(doc, this); - - const result = await this.callCluster('update', { - body: { - doc: rawDoc._source, - }, - id: doc.id, - index: this.index, - if_seq_no: doc.sequenceNumber, - if_primary_term: doc.primaryTerm, - // The refresh is important so that if we immediately look for work, - // we don't pick up this task. - refresh: true, - }); + const updatedSavedObject = await this.savedObjectsRepository.update( + 'task', + doc.id, + taskInstanceToAttributes(doc), + { version: doc.version } + ); - return { - ...doc, - sequenceNumber: result._seq_no, - primaryTerm: result._primary_term, - }; + return savedObjectToConcreteTaskInstance(updatedSavedObject); } /** @@ -336,22 +217,8 @@ export class TaskStore { * @param {string} id * @returns {Promise} */ - public async remove(id: string): Promise { - const result = await this.callCluster('delete', { - id, - index: this.index, - // The refresh is important so that if we immediately look for work, - // we don't pick up this task. - refresh: true, - }); - - return { - index: result._index, - id: result._id, - sequenceNumber: result._seq_no, - primaryTerm: result._primary_term, - result: result.result, - }; + public async remove(id: string): Promise { + await this.savedObjectsRepository.delete('task', id); } private async search(opts: any = {}): Promise { @@ -362,7 +229,7 @@ export class TaskStore { : queryOnlyTasks; const result = await this.callCluster('search', { - index: this.index, + index: TASK_MANAGER_INDEX, ignoreUnavailable: true, body: { ...opts, @@ -373,7 +240,10 @@ export class TaskStore { const rawDocs = result.hits.hits; return { - docs: (rawDocs as RawTaskDoc[]).map(rawToTaskDoc), + docs: (rawDocs as SavedObjectsRawDoc[]) + .map(doc => this.serializer.rawToSavedObject(doc)) + .map(doc => omit(doc, 'namespace') as SavedObject) + .map(savedObjectToConcreteTaskInstance), searchAfter: (rawDocs.length && rawDocs[rawDocs.length - 1].sort) || [], }; } @@ -393,62 +263,38 @@ function paginatableSort(sort: any[] = []) { return [...sort, sortById]; } -function rawSource(doc: TaskInstance, store: TaskStore) { - const { id, ...taskFields } = doc; - const source = { - ...taskFields, +function taskInstanceToAttributes(doc: TaskInstance): SavedObjectAttributes { + return { + ...omit(doc, 'id', 'version'), params: JSON.stringify(doc.params || {}), state: JSON.stringify(doc.state || {}), attempts: (doc as ConcreteTaskInstance).attempts || 0, - scheduledAt: doc.scheduledAt || new Date(), - runAt: doc.runAt || new Date(), + scheduledAt: (doc.scheduledAt || new Date()).toISOString(), + startedAt: (doc.startedAt && doc.startedAt.toISOString()) || null, + retryAt: (doc.retryAt && doc.retryAt.toISOString()) || null, + runAt: (doc.runAt || new Date()).toISOString(), status: (doc as ConcreteTaskInstance).status || 'idle', }; - - delete (source as any).id; - delete (source as any).sequenceNumber; - delete (source as any).primaryTerm; - delete (source as any).type; - - return { - id, - type: 'task', - task: source, - kibana: { - uuid: store.getKibanaUuid(), // needs to be pulled live - version: TEMPLATE_VERSION, - apiVersion: API_VERSION, - }, - }; -} - -function taskDocToRaw(doc: ConcreteTaskInstance, store: TaskStore): RawTaskDoc { - const { type, task, kibana } = rawSource(doc, store); - - return { - _id: doc.id, - _index: store.index, - _source: { type, task, kibana }, - _seq_no: doc.sequenceNumber, - _primary_term: doc.primaryTerm, - }; } -function rawToTaskDoc(doc: RawTaskDoc): ConcreteTaskInstance { +function savedObjectToConcreteTaskInstance(savedObject: SavedObject): ConcreteTaskInstance { return { - ...doc._source.task, - id: doc._id, - sequenceNumber: doc._seq_no, - primaryTerm: doc._primary_term, - params: parseJSONField(doc._source.task.params, 'params', doc), - state: parseJSONField(doc._source.task.state, 'state', doc), + ...savedObject.attributes, + id: savedObject.id, + version: savedObject.version, + scheduledAt: new Date(savedObject.attributes.scheduledAt), + runAt: new Date(savedObject.attributes.runAt), + startedAt: savedObject.attributes.startedAt && new Date(savedObject.attributes.startedAt), + retryAt: savedObject.attributes.retryAt && new Date(savedObject.attributes.retryAt), + state: parseJSONField(savedObject.attributes.state, 'state', savedObject.id), + params: parseJSONField(savedObject.attributes.params, 'params', savedObject.id), }; } -function parseJSONField(json: string, fieldName: string, doc: RawTaskDoc) { +function parseJSONField(json: string, fieldName: string, id: string) { try { return json ? JSON.parse(json) : {}; } catch (error) { - throw new Error(`Task "${doc._id}"'s ${fieldName} field has invalid JSON: ${json}`); + throw new Error(`Task "${id}"'s ${fieldName} field has invalid JSON: ${json}`); } } diff --git a/x-pack/test/api_integration/apis/alerting/create.ts b/x-pack/test/api_integration/apis/alerting/create.ts index 9d2b925b8999f..7a63d2df8dc55 100644 --- a/x-pack/test/api_integration/apis/alerting/create.ts +++ b/x-pack/test/api_integration/apis/alerting/create.ts @@ -33,7 +33,7 @@ export default function createAlertTests({ getService }: KibanaFunctionalTestDef async function getScheduledTask(id: string) { return await es.get({ - id, + id: `task:${id}`, index: '.kibana_task_manager', }); } diff --git a/x-pack/test/plugin_api_integration/test_suites/task_manager/task_manager_integration.js b/x-pack/test/plugin_api_integration/test_suites/task_manager/task_manager_integration.js index 4bfae4c24a882..06beb5aa08fc1 100644 --- a/x-pack/test/plugin_api_integration/test_suites/task_manager/task_manager_integration.js +++ b/x-pack/test/plugin_api_integration/test_suites/task_manager/task_manager_integration.js @@ -106,7 +106,7 @@ export default function ({ getService }) { const [scheduledTask] = (await currentTasks()).docs; expect(scheduledTask.id).to.eql(task.id); expect(scheduledTask.attempts).to.be.greaterThan(0); - expect(Date.parse(scheduledTask.runAt)).to.be.greaterThan(Date.parse(task.runAt)); + expect(Date.parse(scheduledTask.runAt)).to.be.greaterThan(Date.parse(task.runAt) + 5 * 60 * 1000); }); });