diff --git a/jest.config.base.js b/jest.config.base.js index 09daaf493cb..f5bd2ac25f9 100644 --- a/jest.config.base.js +++ b/jest.config.base.js @@ -8,7 +8,7 @@ module.exports = (projectDir) => { const name = path.basename(projectDir); const workspaceName = name === 'e2e' ? 'e2e' : 'packages'; const rootDir = name === 'e2e' ? '../' : '../../'; - const projectRoot = name === 'e2e' ? '/e2e' : `/${workspaceName}/${name}`; + const packageRoot = name === 'e2e' ? '/e2e' : `/${workspaceName}/${name}`; const isTypescript = fs.pathExistsSync(path.join(projectDir, 'tsconfig.json')); const runInPackage = projectDir === process.cwd(); @@ -18,13 +18,13 @@ module.exports = (projectDir) => { displayName: name, verbose: true, testEnvironment: 'node', - setupFilesAfterEnv: ['jest-extended'], - testMatch: [`${projectRoot}/test/**/*-spec.{ts,js}`, `${projectRoot}/test/*-spec.{ts,js}`], + setupFilesAfterEnv: ['jest-extended', '/scripts/add-test-env.js'], + testMatch: [`${packageRoot}/test/**/*-spec.{ts,js}`, `${packageRoot}/test/*-spec.{ts,js}`], testPathIgnorePatterns: [ '/assets', `/${workspaceName}/*/node_modules`, `/${workspaceName}/*/dist`, - `/${workspaceName}/teraslice-cli/test/fixtures/`, + `/${workspaceName}/teraslice-cli/test/fixtures/` ], transformIgnorePatterns: ['^.+\\.js$'], moduleNameMapper: lernaAliases({ mainFields: ['srcMain', 'main'] }), @@ -33,25 +33,25 @@ module.exports = (projectDir) => { coveragePathIgnorePatterns: ['/node_modules/', '/test/'], watchPathIgnorePatterns: [], coverageReporters: runInPackage ? ['html'] : ['lcov', 'text', 'html'], - coverageDirectory: `${projectRoot}/coverage`, + coverageDirectory: `${packageRoot}/coverage`, preset: 'ts-jest', - watchPlugins: ['jest-watch-typeahead/filename', 'jest-watch-typeahead/testname'], + watchPlugins: ['jest-watch-typeahead/filename', 'jest-watch-typeahead/testname'] }; if (fs.pathExistsSync(path.join(projectDir, 'test/global.setup.js'))) { - config.globalSetup = `${projectRoot}/test/global.setup.js`; + config.globalSetup = `${packageRoot}/test/global.setup.js`; } if (fs.pathExistsSync(path.join(projectDir, 'test/global.teardown.js'))) { - config.globalTeardown = `${projectRoot}/test/global.teardown.js`; + config.globalTeardown = `${packageRoot}/test/global.teardown.js`; } if (fs.pathExistsSync(path.join(projectDir, 'test/test.setup.js'))) { - config.setupFilesAfterEnv.push(`${projectRoot}/test/test.setup.js`); + config.setupFilesAfterEnv.push(`${packageRoot}/test/test.setup.js`); } config.globals = { - availableExtensions: ['.js', '.ts'], + availableExtensions: ['.js', '.ts'] }; if (isTypescript) { @@ -59,37 +59,37 @@ module.exports = (projectDir) => { config.globals['ts-jest'] = { tsConfig: './tsconfig.json', diagnostics: true, - pretty: true, + pretty: true }; } else { config.globals['ts-jest'] = { tsConfig: `./${workspaceName}/${name}/tsconfig.json`, diagnostics: true, - pretty: true, + pretty: true }; } } else { config.globals['ts-jest'] = { diagnostics: true, - pretty: true, + pretty: true }; } - config.roots = [`${projectRoot}/test`]; + config.roots = [`${packageRoot}/test`]; if (fs.pathExistsSync(path.join(projectDir, 'lib'))) { - config.roots.push(`${projectRoot}/lib`); + config.roots.push(`${packageRoot}/lib`); } else if (fs.pathExistsSync(path.join(projectDir, 'index.js'))) { - config.roots.push(`${projectRoot}`); + config.roots.push(`${packageRoot}`); } if (fs.pathExistsSync(path.join(projectDir, 'src'))) { - config.roots.push(`${projectRoot}/src`); + config.roots.push(`${packageRoot}/src`); } if (fs.pathExistsSync(path.join(projectDir, 'peg'))) { - config.watchPathIgnorePatterns.push(`${projectRoot}/peg/*engine*.js`); - config.roots.push(`${projectRoot}/peg`); + config.watchPathIgnorePatterns.push(`${packageRoot}/peg/*engine*.js`); + config.roots.push(`${packageRoot}/peg`); } return config; diff --git a/packages/data-access-plugin/package.json b/packages/data-access-plugin/package.json index e84a4a9ff68..0ca536b1ccc 100644 --- a/packages/data-access-plugin/package.json +++ b/packages/data-access-plugin/package.json @@ -46,7 +46,7 @@ "xlucene-evaluator": "^0.9.3" }, "devDependencies": { - "@terascope/job-components": "^0.19.0", + "@terascope/job-components": "^0.20.0", "@types/express": "^4.17.0", "@types/got": "^9.6.0", "@types/graphql-iso-date": "^3.3.1", diff --git a/packages/job-components/package.json b/packages/job-components/package.json index 27af31cd2a8..dd45417d8e3 100644 --- a/packages/job-components/package.json +++ b/packages/job-components/package.json @@ -1,6 +1,6 @@ { "name": "@terascope/job-components", - "version": "0.19.0", + "version": "0.20.0", "publishConfig": { "access": "public" }, diff --git a/packages/job-components/src/execution-context/api.ts b/packages/job-components/src/execution-context/api.ts index 5a2f2337982..dd912922907 100644 --- a/packages/job-components/src/execution-context/api.ts +++ b/packages/job-components/src/execution-context/api.ts @@ -1,10 +1,9 @@ -import { OpAPI, Context, ExecutionConfig, APIConfig } from '../interfaces'; -import { OperationAPIConstructor } from '../operations'; - -// WeakMaps are used as a memory efficient reference to private data -const _registry = new WeakMap(); -const _apis = new WeakMap(); -const _config = new WeakMap(); +import { EventEmitter } from 'events'; +import { AnyObject, Logger, isTest } from '@terascope/utils'; +import { OpAPI, Context, ExecutionConfig, APIConfig, WorkerContext } from '../interfaces'; +import { isOperationAPI, getOperationAPIType, makeContextLogger } from './utils'; +import { Observer, APIConstructor } from '../operations'; +import { JobAPIInstances } from './interfaces'; /** * A utility API exposed on the Terafoundation Context APIs. @@ -12,91 +11,128 @@ const _config = new WeakMap(); * - Registering Operation API * - Creating an API (usually done from an Operation), * it also includes attaching the API to the Execution LifeCycle. - * An API can only be created once. + * An API will only be created once. * - Getting a reference to an API -*/ + */ export class ExecutionContextAPI { + private readonly _apis: JobAPIInstances = {}; + private readonly _context: WorkerContext; + private readonly _events: EventEmitter; + private readonly _executionConfig: ExecutionConfig; + private readonly _logger: Logger; + constructor(context: Context, executionConfig: ExecutionConfig) { - _config.set(this, { - context, - executionConfig, - events: context.apis.foundation.getSystemEvents(), - }); - - _registry.set(this, {}); - _apis.set(this, {}); + this._context = context as WorkerContext; + this._events = context.apis.foundation.getSystemEvents(); + this._executionConfig = executionConfig; + this._logger = this.makeLogger('execution_context_api'); } - /** Add an API constructor to the registry */ - addToRegistry(name: string, api: OperationAPIConstructor) { - const registry = this.registry; - registry[name] = api; - _registry.set(this, registry); + /** For backwards compatibility */ + get registry() { + return {}; } - /** Get all of the registered API constructors */ - get registry(): APIRegistry { - return _registry.get(this); + get apis(): Readonly { + return this._apis; } - /** Get all of the initalized APIs */ - get apis(): APIS { - return _apis.get(this); - } + /** Add an API constructor to the registry */ + addToRegistry(name: string, API: APIConstructor) { + if (this._apis[name] != null) { + throw new Error(`Cannot register API "${name}" due to conflict`); + } + + const { apis = [] } = this._executionConfig; - /** Set an api to be used */ - addAPI(name: string, opAPI: OpAPI) { - const apis = _apis.get(this); - apis[name] = opAPI; + const apiConfig = apis.find((a: APIConfig) => a._name === name) || { + _name: name, + }; + + const instance = new API(this._context, apiConfig, this._executionConfig); + const type = getOperationAPIType(instance); + this._apis[name] = { + instance, + type, + }; + + const eventName = 'execution:add-to-lifecycle'; + const count = this._events.listenerCount(eventName); + if (!count) { + if (isTest) return; + this._logger.warn(`no listener ${eventName} available but is needed to register the api`); + } else { + this._events.emit(eventName, instance); + this._logger.trace(`registered api ${name}`); + } } /** - * Get a reference to a specific API, - * the must be initialized. + * Get a reference to a specific operation API, + * the must be initialized and created */ - getAPI(name: string) { - if (this.apis[name] == null) { + getObserver(name: string): T { + const api = this._apis[name]; + if (api == null) { throw new Error(`Unable to find API by name "${name}"`); } - return this.apis[name]; + if (api.type !== 'observer') { + throw new Error(`Unable to find observer by name "${name}"`); + } + return api.instance as T; } /** - * Initalize an API and attach it - * to the lifecycle of an Execution. + * Get a reference to a specific operation API, + * the must be initialized and created */ - async initAPI(name: string, ...params: any[]) { - const config = _config.get(this); - if (this.registry[name] == null) { + getAPI(name: string): T { + const api = this._apis[name]; + if (api == null) { throw new Error(`Unable to find API by name "${name}"`); } - - if (this.apis[name] != null) { - throw new Error(`API "${name}" can only be initalized once`); + if (api.opAPI == null) { + throw new Error(`Unable to find created API by name "${name}"`); } + return api.opAPI as T; + } - const API = this.registry[name]; - - const { apis = [] } = config.executionConfig; - - const apiConfig = apis.find((a: APIConfig) => a._name === name) || { - _name: name, - }; + /** + * Create an instance of the API + * + * @param name the name of API to create + * @param params any additional options that the API may need + */ + async initAPI(name: string, ...params: any[]): Promise { + const api = this._apis[name]; + if (api == null) { + throw new Error(`Unable to find API by name "${name}"`); + } - const api = new API(config.context, apiConfig, config.executionConfig); - await api.initialize(); + if (!isOperationAPI(api.instance)) { + throw new Error('Observers cannot be created'); + } - config.events.emit('execution:add-to-lifecycle', api); + if (api.opAPI != null) { + const msg = `using existing instance of api: "${name}"`; + if (params.length) { + this._logger.warn(`${msg}, ignoring params`); + } else { + this._logger.debug(`${msg}`); + } + return api.opAPI; + } - this.addAPI(name, await api.createAPI(...params)); - return this.apis[name]; + api.opAPI = await api.instance.createAPI(...params); + this._logger.trace(`initialized api ${name}`); + return api.opAPI; } -} - -interface APIS { - [name: string]: OpAPI; -} -interface APIRegistry { - [name: string]: OperationAPIConstructor; + /** + * Make a logger with a the job_id and ex_id in the logger context + */ + makeLogger(moduleName: string, extra: AnyObject = {}) { + const { ex_id, job_id } = this._executionConfig; + return makeContextLogger(this._context, moduleName, { ex_id, job_id, ...extra }); + } } diff --git a/packages/job-components/src/execution-context/base.ts b/packages/job-components/src/execution-context/base.ts index 29f3856050f..665eee300dc 100644 --- a/packages/job-components/src/execution-context/base.ts +++ b/packages/job-components/src/execution-context/base.ts @@ -3,14 +3,11 @@ import { isFunction, cloneDeep } from '@terascope/utils'; import { OperationLoader } from '../operation-loader'; import { registerApis } from '../register-apis'; import { ExecutionConfig, WorkerContext, OperationLifeCycle } from '../interfaces'; -import { - EventHandlers, - ExecutionContextConfig, -} from './interfaces'; +import { EventHandlers, ExecutionContextConfig } from './interfaces'; /** * A base class for an Execution Context -*/ + */ export default class BaseExecutionContext { readonly config: ExecutionConfig; readonly context: WorkerContext; @@ -56,7 +53,7 @@ export default class BaseExecutionContext { /** * Called to initialize all of the registered operations - */ + */ async initialize(initConfig?: any) { const promises = []; for (const op of this.getOperations()) { @@ -68,7 +65,7 @@ export default class BaseExecutionContext { /** * Called to cleanup all of the registered operations - */ + */ async shutdown() { const promises = []; for (const op of this.getOperations()) { @@ -77,17 +74,20 @@ export default class BaseExecutionContext { await Promise.all(promises); - Object.keys(this._handlers) - .forEach((event) => { - const listener = this._handlers[event]; - this.events.removeListener(event, listener); - }); + Object.keys(this._handlers).forEach(event => { + const listener = this._handlers[event]; + this.events.removeListener(event, listener); + }); + } + + get api() { + return this.context.apis.executionContext; } /** * Returns a list of any registered Operation that has been * initialized. - */ + */ getOperations() { return this._operations.values(); } diff --git a/packages/job-components/src/execution-context/index.ts b/packages/job-components/src/execution-context/index.ts index 61793e499e9..ecac26417ca 100644 --- a/packages/job-components/src/execution-context/index.ts +++ b/packages/job-components/src/execution-context/index.ts @@ -2,6 +2,7 @@ export * from './api'; export * from './interfaces'; export * from './slicer'; export * from './worker'; +export * from './utils'; import { Context, WorkerContext } from '../interfaces'; import { SlicerExecutionContext } from './slicer'; @@ -25,32 +26,13 @@ export function isSlicerExecutionContext(context: any): context is SlicerExecuti } export function makeExecutionContext(config: ExecutionContextConfig) { - const { ex_id, job_id } = config.executionConfig; - if (isSlicerContext(config.context)) { - const logger = makeContextLogger(config.context, 'slicer_context', { ex_id, job_id }); - return new SlicerExecutionContext(config, logger); + return new SlicerExecutionContext(config); } if (isWorkerContext(config.context)) { - const logger = makeContextLogger(config.context, 'worker_context', { ex_id, job_id }); - return new WorkerExecutionContext(config, logger); + return new WorkerExecutionContext(config); } throw new Error('ExecutionContext requires an assignment of "execution_controller" or "worker"'); } - -export function makeContextLogger(context: Context, moduleName: string, extra = {}) { - const { assignment, cluster } = context; - - return context.apis.foundation.makeLogger( - Object.assign( - { - module: moduleName, - worker_id: cluster.worker.id, - assignment, - }, - extra - ) - ); -} diff --git a/packages/job-components/src/execution-context/interfaces.ts b/packages/job-components/src/execution-context/interfaces.ts index b2518dc8751..0de5d0b0cf6 100644 --- a/packages/job-components/src/execution-context/interfaces.ts +++ b/packages/job-components/src/execution-context/interfaces.ts @@ -2,13 +2,13 @@ import { DataEntity } from '@terascope/utils'; import { Context, ExecutionConfig, - OpAPI, Slice, SliceAnalyticsData, SlicerOperationLifeCycle, WorkerOperationLifeCycle, + OpAPI, } from '../interfaces'; -import { APICore } from '../operations'; +import { APICore, OperationAPIType } from '../operations'; export interface ExecutionContextConfig { context: Context; @@ -29,7 +29,7 @@ export interface EventHandlers { export interface JobAPIInstance { instance: APICore; opAPI?: OpAPI; - type: 'api' | 'observer'; + type: OperationAPIType; } export interface JobAPIInstances { diff --git a/packages/job-components/src/execution-context/slicer.ts b/packages/job-components/src/execution-context/slicer.ts index 8efb292da81..9bf13dab852 100644 --- a/packages/job-components/src/execution-context/slicer.ts +++ b/packages/job-components/src/execution-context/slicer.ts @@ -1,11 +1,9 @@ -import { cloneDeep, debugLogger, Logger } from '@terascope/utils'; +import { cloneDeep, Logger } from '@terascope/utils'; import { SlicerOperationLifeCycle, ExecutionStats, Slice, SliceResult } from '../interfaces'; import SlicerCore from '../operations/core/slicer-core'; import { ExecutionContextConfig } from './interfaces'; import BaseExecutionContext from './base'; -const _logger = debugLogger('execution-context-slicer'); - /** * SlicerExecutionContext is designed to add more * functionality to interface with the @@ -15,9 +13,9 @@ export class SlicerExecutionContext extends BaseExecutionContext 0) return val; + return 0; +} + +export function isOperationAPI(api: any): api is OperationAPI { + return api && isFunction(api.createAPI); +} + +export function getOperationAPIType(api: any): OperationAPIType { + return isOperationAPI(api) ? 'api' : 'observer'; +} + +export function makeContextLogger(context: Context, moduleName: string, extra = {}) { + const { assignment, cluster } = context; + + return context.apis.foundation.makeLogger( + Object.assign( + { + module: moduleName, + worker_id: cluster.worker.id, + assignment, + }, + extra + ) + ); +} diff --git a/packages/job-components/src/execution-context/worker.ts b/packages/job-components/src/execution-context/worker.ts index 74589645627..ddd8092a71f 100644 --- a/packages/job-components/src/execution-context/worker.ts +++ b/packages/job-components/src/execution-context/worker.ts @@ -1,12 +1,10 @@ import * as ts from '@terascope/utils'; +import { ExecutionContextConfig, RunSliceResult, WorkerSliceState, WorkerStatus, SliceStatus } from './interfaces'; +import { WorkerOperationLifeCycle, Slice, sliceAnalyticsMetrics, SliceAnalyticsData } from '../interfaces'; import { FetcherCore, ProcessorCore, OperationCore } from '../operations/core'; -import { OperationAPI, OperationAPIConstructor } from '../operations'; -import { WorkerOperationLifeCycle, Slice, OpAPI, sliceAnalyticsMetrics, SliceAnalyticsData } from '../interfaces'; -import { ExecutionContextConfig, RunSliceResult, JobAPIInstances, WorkerSliceState, WorkerStatus, SliceStatus } from './interfaces'; import JobObserver from '../operations/job-observer'; import BaseExecutionContext from './base'; - -const _logger = ts.debugLogger('execution-context-worker'); +import { getMetric } from './utils'; /** * WorkerExecutionContext is designed to add more @@ -15,10 +13,6 @@ const _logger = ts.debugLogger('execution-context-worker'); */ export class WorkerExecutionContext extends BaseExecutionContext implements WorkerOperationLifeCycle { readonly processors: ProcessorCore[]; - readonly apis: JobAPIInstances = {}; - - private readonly jobObserver: JobObserver; - readonly logger: ts.Logger; /** the active (or last) run slice */ @@ -28,9 +22,9 @@ export class WorkerExecutionContext extends BaseExecutionContext Promise)[]; - constructor(config: ExecutionContextConfig, logger: ts.Logger = _logger) { + constructor(config: ExecutionContextConfig) { super(config); - this.logger = logger; + this.logger = this.api.makeLogger('worker_context'); this._methodRegistry.set('onSliceInitialized', new Set()); this._methodRegistry.set('onSliceStarted', new Set()); @@ -43,49 +37,44 @@ export class WorkerExecutionContext extends BaseExecutionContext { - const opAPI = await instance.createAPI(); - api.opAPI = opAPI; - - this.addAPI(name, opAPI); - this.apis[name] = api; - })() - ); + // make sure we autoload the api + const promises: Promise[] = []; + for (const { _name: name } of this.config.apis || []) { + const api = this.apis[name]; + if (api.type === 'api') { + promises.push(this.api.initAPI(name)); } } - await Promise.all(promises); this.status = 'idle'; @@ -161,7 +140,7 @@ export class WorkerExecutionContext extends BaseExecutionContext= 0) { - index = findBy; + index = findBy as number; } if (index === 0) { @@ -183,6 +162,16 @@ export class WorkerExecutionContext extends BaseExecutionContext('job-observer'); + if (jobObserver == null) throw new Error("Job Observer hasn't not be initialized"); + return jobObserver; + } + async initializeSlice(slice: Slice) { const currentSliceId = this._sliceId; if (this.status !== 'flushing') { @@ -320,23 +309,6 @@ export class WorkerExecutionContext extends BaseExecutionContext _name === name); - if (hasName) { - throw new Error(`Cannot register API ${name} due to conflict`); - } - - this.context.apis.executionContext.addToRegistry(name, API); - } - - /** Add an API to the executionContext api */ - protected addAPI(name: string, opAPI: OpAPI) { - this.context.apis.executionContext.addAPI(name, opAPI); - } - private _onOperationComplete(index: number, processed: number) { this._runMethod('onOperationComplete', this._sliceId, index, processed); } @@ -446,13 +418,3 @@ export class WorkerExecutionContext extends BaseExecutionContext 0) return val; - return 0; -} - -function isOperationAPI(api: any): api is OperationAPI { - return api && ts.isFunction(api.createAPI); -} diff --git a/packages/job-components/src/operations/interfaces.ts b/packages/job-components/src/operations/interfaces.ts index 868732aeec4..c097b1a2bea 100644 --- a/packages/job-components/src/operations/interfaces.ts +++ b/packages/job-components/src/operations/interfaces.ts @@ -9,20 +9,20 @@ import ParallelSlicer from './parallel-slicer'; import OperationAPI from './operation-api'; export type APICoreConstructor = { - new(context: WorkerContext, apiConfig: APIConfig, executionConfig: ExecutionConfig): U; + new (context: WorkerContext, apiConfig: APIConfig, executionConfig: ExecutionConfig): U; }; export type OperationCoreConstructor = { - new(context: WorkerContext, opConfig: OpConfig & T, executionConfig: ExecutionConfig): U; + new (context: WorkerContext, opConfig: OpConfig & T, executionConfig: ExecutionConfig): U; }; export type SlicerCoreConstructor = { - new(context: WorkerContext, opConfig: OpConfig & T, executionConfig: ExecutionConfig): U; + new (context: WorkerContext, opConfig: OpConfig & T, executionConfig: ExecutionConfig): U; }; export type SchemaConstructor = { type(): string; - new(context: Context, opType?: OpType): SchemaCore; + new (context: Context, opType?: OpType): SchemaCore; }; export type OperationAPIConstructor = APICoreConstructor; @@ -34,7 +34,7 @@ export type ParallelSlicerConstructor = SlicerCoreConstructor; export type FetcherConstructor = OperationCoreConstructor; export type ProcessorConstructor = OperationCoreConstructor; -export type CoreOperation = FetcherCore|SlicerCore|ProcessorCore; +export type CoreOperation = FetcherCore | SlicerCore | ProcessorCore; export interface OperationModule { Schema: SchemaConstructor; @@ -45,9 +45,10 @@ export interface SchemaModule { Schema: SchemaConstructor; } +export type OperationAPIType = 'api' | 'observer'; export interface APIModule extends SchemaModule { - API: OperationAPIConstructor|ObserverConstructor; - type: 'api'|'observer'; + API: OperationAPIConstructor | ObserverConstructor; + type: OperationAPIType; } export interface ReaderModule extends OperationModule { diff --git a/packages/job-components/src/register-apis.ts b/packages/job-components/src/register-apis.ts index 923d0da8a94..9a8d69ba903 100644 --- a/packages/job-components/src/register-apis.ts +++ b/packages/job-components/src/register-apis.ts @@ -1,7 +1,7 @@ import fs from 'fs'; import path from 'path'; import { parseJSON } from '@terascope/utils'; -import { ConnectionConfig, Context, ValidatedJobConfig, ExecutionConfig, OpConfig, GetClientConfig } from './interfaces'; +import { ConnectionConfig, Context, ValidatedJobConfig, ExecutionConfig, OpConfig, GetClientConfig, WorkerContextAPIs } from './interfaces'; import { ExecutionContextAPI } from './execution-context'; /** Get the first opConfig from an operation name */ @@ -81,18 +81,21 @@ export function getClient(context: Context, config: GetClientConfig, type: strin } export function registerApis(context: Context, job: ValidatedJobConfig | ExecutionConfig, assetIds?: string[]): void { - if (context.apis.executionContext == null) { - context.apis.registerAPI('executionContext', new ExecutionContextAPI(context, job as ExecutionConfig)); + const cleanupApis: (keyof WorkerContextAPIs)[] = ['op_runner', 'executionContext', 'job_runner', 'assets']; + for (const api of cleanupApis) { + if (context.apis[api] != null) { + delete context.apis[api]; + } } - delete context.apis.op_runner; + context.apis.registerAPI('executionContext', new ExecutionContextAPI(context, job as ExecutionConfig)); + context.apis.registerAPI('op_runner', { getClient(config: GetClientConfig, type: string): { client: any } { return getClient(context, config, type); }, }); - delete context.apis.job_runner; context.apis.registerAPI('job_runner', { getOpConfig(name: string): OpConfig | undefined { return getOpConfig(job, name); @@ -100,8 +103,6 @@ export function registerApis(context: Context, job: ValidatedJobConfig | Executi }); const assetDir = context.sysconfig.teraslice.assets_directory; - - delete context.apis.assets; context.apis.registerAPI('assets', { getPath(name: string): Promise { return getAssetPath(assetDir || '', assetIds || job.assets, name); diff --git a/packages/job-components/test/execution-context/worker-spec.ts b/packages/job-components/test/execution-context/worker-spec.ts index d143aabc26d..b01ef9d627a 100644 --- a/packages/job-components/test/execution-context/worker-spec.ts +++ b/packages/job-components/test/execution-context/worker-spec.ts @@ -1,16 +1,8 @@ import 'jest-extended'; import path from 'path'; +import { pDelay, DataEntity } from '@terascope/utils'; import { terasliceOpPath } from '../helpers'; -import { - WorkerExecutionContext, - TestContext, - newTestExecutionConfig, - DataEntity, - FetcherCore, - ProcessorCore, - newTestSlice, - pDelay, -} from '../../src'; +import { WorkerExecutionContext, TestContext, newTestExecutionConfig, FetcherCore, ProcessorCore, newTestSlice } from '../../src'; describe('WorkerExecutionContext', () => { const assetIds = ['fixtures']; @@ -48,14 +40,16 @@ describe('WorkerExecutionContext', () => { ], }); - const executionContext = new WorkerExecutionContext({ - context, - executionConfig, - assetIds, - terasliceOpPath, - }); + let executionContext: WorkerExecutionContext; beforeAll(async () => { + executionContext = new WorkerExecutionContext({ + context, + executionConfig, + assetIds, + terasliceOpPath, + }); + expect(executionContext).toHaveProperty('status', 'initializing'); await executionContext.initialize(); expect(executionContext).toHaveProperty('status', 'idle'); @@ -97,8 +91,24 @@ describe('WorkerExecutionContext', () => { } }); - it('should have the APIs', () => { - expect(executionContext.apis).toContainKeys(['example-observer', 'example-api']); + it('should have the registered APIs', () => { + const registeredAPIs = Object.keys(executionContext.apis); + // this test is order specific to ensure everything is loaded correctly + expect(registeredAPIs).toEqual(['job-observer', 'example-observer', 'example-api', 'example-reader']); + }); + + it('should be able to get the example-api', async () => { + const delay = executionContext.getOperation('delay'); + const api = await delay.getAPI('example-api'); + expect(api).not.toBeNil(); + }); + + it('should be able to create the example-api', async () => { + const delay = executionContext.getOperation('delay'); + const api = await delay.createAPI('example-api'); + expect(api).not.toBeNil(); + + expect(delay.getAPI('example-api')).toBe(api); }); it('should be able to an operation instance by index', async () => { @@ -123,10 +133,6 @@ describe('WorkerExecutionContext', () => { expect(processor.opConfig._op).toEqual('example-op'); }); - it('should have the registered apis', () => { - expect(context.apis.executionContext.registry).toContainKeys(['example-reader']); - }); - it('should have the operations initialized', () => { const ops = executionContext.getOperations(); @@ -263,14 +269,15 @@ describe('WorkerExecutionContext', () => { }, ]; - const executionContext = new WorkerExecutionContext({ - context, - executionConfig, - assetIds, - terasliceOpPath, - }); + let executionContext: WorkerExecutionContext; beforeAll(() => { + executionContext = new WorkerExecutionContext({ + context, + executionConfig, + assetIds, + terasliceOpPath, + }); return executionContext.initialize(); }); diff --git a/packages/job-components/test/operations/operation-api-spec.ts b/packages/job-components/test/operations/operation-api-spec.ts index eedb3af93aa..76494a320af 100644 --- a/packages/job-components/test/operations/operation-api-spec.ts +++ b/packages/job-components/test/operations/operation-api-spec.ts @@ -1,11 +1,5 @@ import 'jest-extended'; // require for type definitions -import { - OperationAPI, - OpAPIInstance, - ExecutionContextAPI, - newTestExecutionConfig, - TestContext -} from '../../src'; +import { OperationAPI, OpAPIInstance, ExecutionContextAPI, newTestExecutionConfig, TestContext } from '../../src'; describe('OperationAPI', () => { interface ExampleAPI extends OpAPIInstance { @@ -15,7 +9,7 @@ describe('OperationAPI', () => { class ExampleOperationAPI extends OperationAPI { public async createAPI(): Promise { return { - hi: () => 'hello' + hi: () => 'hello', }; } } @@ -35,16 +29,12 @@ describe('OperationAPI', () => { }); it('should be able to be created', async () => { - const api:ExampleAPI = await context.apis.executionContext.initAPI('example/api'); + const api: ExampleAPI = await context.apis.executionContext.initAPI('example/api'); expect(api.hi()).toEqual('hello'); }); - it('should throw an error if created again', async () => { - return expect(context.apis.executionContext.initAPI('example/api')).rejects.toThrow(); - }); - it('should be able to be fetched', async () => { - const api:ExampleAPI = await context.apis.executionContext.getAPI('example/api'); + const api: ExampleAPI = await context.apis.executionContext.getAPI('example/api'); expect(api.hi()).toEqual('hello'); }); }); diff --git a/packages/job-components/test/register-apis-spec.ts b/packages/job-components/test/register-apis-spec.ts index d6a478b0a33..583e08da8cc 100644 --- a/packages/job-components/test/register-apis-spec.ts +++ b/packages/job-components/test/register-apis-spec.ts @@ -71,10 +71,10 @@ describe('registerApis', () => { create() { return { client: { - elasticsearch: true - } + elasticsearch: true, + }, }; - } + }, }, { type: 'elasticsearch', @@ -83,10 +83,10 @@ describe('registerApis', () => { return { client: { elasticsearch: true, - endpoint: 'otherConnection' - } + endpoint: 'otherConnection', + }, }; - } + }, }, { type: 'elasticsearch', @@ -95,10 +95,10 @@ describe('registerApis', () => { return { client: { elasticsearch: true, - endpoint: 'thirdConnection' - } + endpoint: 'thirdConnection', + }, }; - } + }, }, { type: 'kafka', @@ -106,63 +106,86 @@ describe('registerApis', () => { create() { return { client: { - kafka: true - } + kafka: true, + }, }; - } + }, }, { type: 'mongo', create() { return { client: { - mongo: true - } + mongo: true, + }, }; - } - } + }, + }, ]; context.apis.setTestClients(clients); it('getClient should return a client', () => { expect(getClient({}, 'elasticsearch')).toEqual({ - elasticsearch: true + elasticsearch: true, }); - const firstResult = getClient({ - connection: 'otherConnection', - connection_cache: true - }, 'elasticsearch'); + const firstResult = getClient( + { + connection: 'otherConnection', + connection_cache: true, + }, + 'elasticsearch' + ); expect(firstResult).toEqual({ elasticsearch: true, - endpoint: 'otherConnection' + endpoint: 'otherConnection', }); - expect(getClient({ - connection: 'otherConnection', - connection_cache: true - }, 'elasticsearch')).toBe(firstResult); - - expect(getClient({ - connection: 'thirdConnection', - connection_cache: false, - }, 'elasticsearch')).toEqual({ + expect( + getClient( + { + connection: 'otherConnection', + connection_cache: true, + }, + 'elasticsearch' + ) + ).toBe(firstResult); + + expect( + getClient( + { + connection: 'thirdConnection', + connection_cache: false, + }, + 'elasticsearch' + ) + ).toEqual({ elasticsearch: true, endpoint: 'thirdConnection', }); - expect(getClient({ - connection: 'someConnection' - }, 'kafka')).toEqual({ - kafka: true + expect( + getClient( + { + connection: 'someConnection', + }, + 'kafka' + ) + ).toEqual({ + kafka: true, }); - expect(getClient({ - connection_cache: false - }, 'mongo')).toEqual({ - mongo: true + expect( + getClient( + { + connection_cache: false, + }, + 'mongo' + ) + ).toEqual({ + mongo: true, }); }); @@ -186,9 +209,7 @@ describe('registerApis', () => { failingContext.foundation.getConnection = makeError; const events = failingContext.apis.foundation.getSystemEvents(); - const errStr = - 'No configuration for endpoint default ' + - 'was found in the terafoundation connectors'; + const errStr = 'No configuration for endpoint default ' + 'was found in the terafoundation connectors'; events.once('client:initialization:error', errMsg => { expect(errMsg.error.includes(errStr)).toEqual(true); @@ -230,14 +251,9 @@ describe('registerApis', () => { expect(result()).toEqual('hello'); }); - it('should throw an error when the API is already created', async () => { - expect.hasAssertions(); - - try { - await context.apis.executionContext.initAPI('hello'); - } catch (err) { - expect(err.message).toEqual('API "hello" can only be initalized once'); - } + it('should not throw the API is already created', async () => { + const result = await context.apis.executionContext.initAPI('hello'); + expect(result()).toEqual('hello'); }); }); diff --git a/packages/teraslice-op-test-harness/package.json b/packages/teraslice-op-test-harness/package.json index 005c7cf7a47..c332ac57c14 100644 --- a/packages/teraslice-op-test-harness/package.json +++ b/packages/teraslice-op-test-harness/package.json @@ -1,6 +1,6 @@ { "name": "@terascope/teraslice-op-test-harness", - "version": "1.6.0", + "version": "1.7.0", "publishConfig": { "access": "public" }, @@ -23,7 +23,7 @@ "url": "https://github.com/terascope/teraslice/issues" }, "dependencies": { - "@terascope/job-components": "^0.19.0", + "@terascope/job-components": "^0.20.0", "bluebird": "^3.5.5", "lodash": "^4.17.11" }, diff --git a/packages/teraslice-test-harness/package.json b/packages/teraslice-test-harness/package.json index 6c70639b630..c8335153f57 100644 --- a/packages/teraslice-test-harness/package.json +++ b/packages/teraslice-test-harness/package.json @@ -1,6 +1,6 @@ { "name": "teraslice-test-harness", - "version": "0.7.0", + "version": "0.8.0", "publishConfig": { "access": "public" }, @@ -37,7 +37,7 @@ "test:debug": "env DEBUG=\"${DEBUG:-*teraslice*}\" jest --detectOpenHandles --coverage=false --runInBand" }, "dependencies": { - "@terascope/job-components": "^0.19.0", + "@terascope/job-components": "^0.20.0", "@terascope/teraslice-op-test-harness": "^1.6.0", "lodash": "^4.17.11" }, diff --git a/packages/teraslice-test-harness/src/worker-test-harness.ts b/packages/teraslice-test-harness/src/worker-test-harness.ts index 1338b1a7991..70de76b65c4 100644 --- a/packages/teraslice-test-harness/src/worker-test-harness.ts +++ b/packages/teraslice-test-harness/src/worker-test-harness.ts @@ -33,9 +33,9 @@ export default class WorkerTestHarness extends BaseTestHarness; + async flush(options: { fullResponse: false }): Promise; + async flush(options: { fullResponse: true }): Promise; async flush({ fullResponse = false } = {}) { const response = await this.executionContext.flush(); if (response != null) { diff --git a/packages/teraslice-test-harness/test/example-asset-spec.ts b/packages/teraslice-test-harness/test/example-asset-spec.ts index e06e5107d43..313f02b4a4e 100644 --- a/packages/teraslice-test-harness/test/example-asset-spec.ts +++ b/packages/teraslice-test-harness/test/example-asset-spec.ts @@ -1,13 +1,8 @@ +import 'jest-extended'; import path from 'path'; import { DataEntity, TestClientConfig } from '@terascope/job-components'; import SimpleClient from './fixtures/asset/simple-connector/client'; -import { - JobTestHarness, - newTestJobConfig, - newTestSlice, - SlicerTestHarness, - WorkerTestHarness, -} from '../src'; +import { JobTestHarness, newTestJobConfig, newTestSlice, SlicerTestHarness, WorkerTestHarness } from '../src'; import { SimpleAPI } from './fixtures/asset/simple-api/interfaces'; jest.mock('./fixtures/asset/simple-connector/client'); @@ -35,18 +30,18 @@ describe('Example Asset', () => { job.apis = [ { _name: 'simple-api', - } + }, ]; job.operations = [ { - _op: 'simple-reader' + _op: 'simple-reader', }, { _op: 'transformer', action: 'set', key: 'foo', setValue: 'bar', - } + }, ]; let harness: WorkerTestHarness; @@ -60,7 +55,7 @@ describe('Example Asset', () => { a: 'b', c: 'd', e: 'f', - } + }, }; }); @@ -85,8 +80,7 @@ describe('Example Asset', () => { testSlice.request = { count: 10 }; const results = await harness.runSlice(testSlice); - expect(Array.isArray(results)).toBe(true); - expect(results.length).toBe(10); + expect(results).toBeArrayOfSize(10); for (const result of results) { expect(DataEntity.isDataEntity(result)).toBe(true); @@ -100,9 +94,7 @@ describe('Example Asset', () => { }); it('should have use the simple api', async () => { - expect(Object.keys(harness.apis)).toEqual([ - 'simple-api' - ]); + expect(Object.keys(harness.apis)).toContain('simple-api'); const api = harness.apis['simple-api'].opAPI as SimpleAPI; @@ -115,11 +107,11 @@ describe('Example Asset', () => { job.analytics = true; job.operations = [ { - _op: 'simple-reader' + _op: 'simple-reader', }, { - _op: 'noop' - } + _op: 'noop', + }, ]; let harness: SlicerTestHarness; @@ -148,8 +140,7 @@ describe('Example Asset', () => { it('should return a list of records', async () => { const results = await harness.createSlices(); - expect(Array.isArray(results)).toBe(true); - expect(results.length).toBe(10); + expect(results).toBeArrayOfSize(10); for (const result of results) { expect(DataEntity.isDataEntity(result)).toBe(false); @@ -165,11 +156,11 @@ describe('Example Asset', () => { job.apis = [ { _name: 'simple-api', - } + }, ]; job.operations = [ { - _op: 'simple-reader' + _op: 'simple-reader', }, { _op: 'transformer', @@ -182,7 +173,7 @@ describe('Example Asset', () => { action: 'inc', key: 'scale', incBy: 1, - } + }, ]; let harness: JobTestHarness; @@ -207,12 +198,10 @@ describe('Example Asset', () => { it('should batches of results', async () => { const batches = await harness.run(); - expect(Array.isArray(batches)).toBe(true); - expect(batches.length).toBe(10); + expect(batches).toBeArrayOfSize(10); for (const results of batches) { - expect(Array.isArray(results)).toBe(true); - expect(results.length).toBe(10); + expect(results).toBeArrayOfSize(10); for (const result of results) { expect(DataEntity.isDataEntity(result)).toBe(true); @@ -222,9 +211,7 @@ describe('Example Asset', () => { }); it('should have one api', async () => { - expect(Object.keys(harness.apis)).toEqual([ - 'simple-api' - ]); + expect(Object.keys(harness.apis)).toContain('simple-api'); }); it('should be finished for the second batch of slices', async () => { @@ -233,12 +220,10 @@ describe('Example Asset', () => { // @ts-ignore simpleClient.isFinished.mockReturnValue(true); - expect(Array.isArray(batches)).toBe(true); - expect(batches.length).toBe(10); + expect(batches).toBeArrayOfSize(10); for (const results of batches) { - expect(Array.isArray(results)).toBe(true); - expect(results.length).toBe(10); + expect(results).toBeArrayOfSize(10); for (const result of results) { expect(DataEntity.isDataEntity(result)).toBe(true); diff --git a/packages/teraslice-test-harness/test/worker-test-harness-spec.ts b/packages/teraslice-test-harness/test/worker-test-harness-spec.ts index 884bf578972..36df810d3e1 100644 --- a/packages/teraslice-test-harness/test/worker-test-harness-spec.ts +++ b/packages/teraslice-test-harness/test/worker-test-harness-spec.ts @@ -1,14 +1,6 @@ import 'jest-extended'; import path from 'path'; -import { - newTestJobConfig, - newTestSlice, - DataEntity, - Fetcher, - BatchProcessor, - NoopProcessor, - RunSliceResult -} from '@terascope/job-components'; +import { newTestJobConfig, newTestSlice, DataEntity, Fetcher, BatchProcessor, NoopProcessor } from '@terascope/job-components'; import { WorkerTestHarness } from '../src'; describe('WorkerTestHarness', () => { @@ -19,10 +11,10 @@ describe('WorkerTestHarness', () => { client: { say() { return 'hello'; - } - } - }) - } + }, + }, + }), + }, ]; describe('when given a valid job config', () => { @@ -35,12 +27,12 @@ describe('WorkerTestHarness', () => { }, { _op: 'noop', - } + }, ]; const workerHarness = new WorkerTestHarness(job, { assetDir: path.join(__dirname, 'fixtures'), - clients + clients, }); workerHarness.processors[0].handle = jest.fn(async (data: DataEntity[]) => { @@ -85,7 +77,7 @@ describe('WorkerTestHarness', () => { // @ts-ignore .mockRejectedValueOnce(err); - const results = await workerHarness.runSlice({ }); + const results = await workerHarness.runSlice({}); expect(results).toBeArray(); expect(onSliceRetryEvent).toHaveBeenCalledTimes(2); @@ -103,20 +95,24 @@ describe('WorkerTestHarness', () => { }); }); - describe('can use static helper testProcessor shorthand method', () => { + describe('when using static method testProcessor', () => { const options = { assetDir: path.join(__dirname, 'fixtures') }; - let harness:WorkerTestHarness; + let harness: WorkerTestHarness; - beforeAll(async() => { + beforeAll(async () => { harness = WorkerTestHarness.testProcessor({ _op: 'noop' }, options); await harness.initialize(); }); - afterAll(async() => { + afterAll(async () => { await harness.shutdown(); }); - it('can call testProcessor for easy instantiation', async() => { + it('should return an instance of the test harness', () => { + expect(harness).toBeInstanceOf(WorkerTestHarness); + }); + + it('should be able run a slice', async () => { const data = [{ some: 'data' }]; const results = await harness.runSlice(data); @@ -124,20 +120,24 @@ describe('WorkerTestHarness', () => { }); }); - describe('can use static helper testFetcher shorthand method', () => { + describe('when using static method testFetcher', () => { const options = { assetDir: path.join(__dirname, 'fixtures') }; - let harness:WorkerTestHarness; + let harness: WorkerTestHarness; - beforeAll(async() => { + beforeAll(async () => { harness = WorkerTestHarness.testFetcher({ _op: 'test-reader', passthrough_slice: true }, options); await harness.initialize(); }); - afterAll(async() => { + afterAll(async () => { await harness.shutdown(); }); - it('can call testFetcher for easy instantiation', async() => { + it('should return an instance of the test harness', () => { + expect(harness).toBeInstanceOf(WorkerTestHarness); + }); + + it('should be able to run a slice', async () => { const data = [{ some: 'data' }]; const results = await harness.runSlice(data); @@ -145,73 +145,68 @@ describe('WorkerTestHarness', () => { }); }); - describe('can call lifecyle events', () => { - describe('shorthand flush with no analytics', () => { - const options = { assetDir: path.join(__dirname, 'fixtures') }; - let harness:WorkerTestHarness; + describe('when testing flush', () => { + const options = { assetDir: path.join(__dirname, 'fixtures') }; + let harness: WorkerTestHarness; - beforeAll(async() => { - harness = WorkerTestHarness.testProcessor({ _op: 'simple-flush' }, options); - await harness.initialize(); - }); + beforeAll(async () => { + harness = WorkerTestHarness.testProcessor({ _op: 'simple-flush' }, options); + await harness.initialize(); + }); - afterAll(async() => { - await harness.shutdown(); - }); + afterAll(async () => { + await harness.shutdown(); + }); - it('can call flush', async() => { - const data = [{ some: 'data' }, { other: 'data' }]; + it('should flush any remaining records', async () => { + const data = [{ some: 'data' }, { other: 'data' }]; - const emptyResults = await harness.runSlice(data); - expect(emptyResults).toEqual([]); + const emptyResults = await harness.runSlice(data); + expect(emptyResults).toEqual([]); - const flushedResults = await harness.flush(); - expect(flushedResults).toEqual(data); - }); + const flushedResults = await harness.flush(); + expect(flushedResults).toEqual(data); }); + }); - describe('flush with analytics returned', () => { - const options = { assetDir: path.join(__dirname, 'fixtures') }; - let harness: WorkerTestHarness; - - beforeAll(async() => { - const job = newTestJobConfig({ - max_retries: 0, - analytics: true, - operations: [ - { - _op: 'test-reader', - passthrough_slice: true - }, - { _op: 'simple-flush' } - ], - }); + describe('when testing flush with analytics', () => { + const options = { assetDir: path.join(__dirname, 'fixtures') }; + let harness: WorkerTestHarness; + + beforeAll(async () => { + const job = newTestJobConfig({ + max_retries: 0, + analytics: true, + operations: [ + { + _op: 'test-reader', + passthrough_slice: true, + }, + { _op: 'simple-flush' }, + ], + }); - harness = new WorkerTestHarness(job, options); + harness = new WorkerTestHarness(job, options); - await harness.initialize(); - }); + await harness.initialize(); + }); - afterAll(async() => { - await harness.shutdown(); - }); + afterAll(async () => { + await harness.shutdown(); + }); - it('can get full flush response', async() => { - const data = [{ some: 'data' }, { other: 'data' }]; + it('should able return the result with analytics', async () => { + const data = [{ some: 'data' }, { other: 'data' }]; - const emptyResults = await harness.runSlice(data); - expect(emptyResults).toEqual([]); + const emptyResults = await harness.runSlice(data); + expect(emptyResults).toEqual([]); - const flushedResults = await harness.flush({ fullResponse: true }) as RunSliceResult; + const flushedResults = await harness.flush({ fullResponse: true }); - expect(flushedResults).toBeDefined(); - expect(flushedResults.results).toEqual(data); - expect(flushedResults.status).toEqual('flushed'); - expect(flushedResults.analytics).toBeDefined(); - expect(flushedResults.analytics!.time).toBeDefined(); - expect(flushedResults.analytics!.memory).toBeDefined(); - expect(flushedResults.analytics!.size).toBeDefined(); - }); + expect(flushedResults).toHaveProperty('results', data); + expect(flushedResults).toHaveProperty('status', 'flushed'); + expect(flushedResults).toHaveProperty('analytics'); + expect(flushedResults!.analytics).toContainKeys(['time', 'memory', 'size']); }); }); }); diff --git a/packages/teraslice/package.json b/packages/teraslice/package.json index 038b2c774c5..c7ff6f3c48f 100644 --- a/packages/teraslice/package.json +++ b/packages/teraslice/package.json @@ -1,6 +1,6 @@ { "name": "teraslice", - "version": "0.53.0", + "version": "0.53.1", "description": "Distributed computing platform for processing JSON data", "bin": "service.js", "main": "index.js", @@ -36,7 +36,7 @@ "dependencies": { "@terascope/elasticsearch-api": "^2.0.3", "@terascope/error-parser": "^1.0.2", - "@terascope/job-components": "^0.19.0", + "@terascope/job-components": "^0.20.0", "@terascope/queue": "^1.1.6", "@terascope/teraslice-messaging": "^0.3.2", "@terascope/utils": "^0.11.0", diff --git a/scripts/add-test-env.js b/scripts/add-test-env.js new file mode 100644 index 00000000000..7aa2867a03e --- /dev/null +++ b/scripts/add-test-env.js @@ -0,0 +1,3 @@ +'use strict'; + +process.env.NODE_ENV = 'test';