-
Notifications
You must be signed in to change notification settings - Fork 13
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #1163 from terascope/fix-register-api
v0.53.1 - Fix/Improve Operation API usage
- Loading branch information
Showing
22 changed files
with
455 additions
and
445 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,102 +1,138 @@ | ||
import { OpAPI, Context, ExecutionConfig, APIConfig } from '../interfaces'; | ||
import { OperationAPIConstructor } from '../operations'; | ||
|
||
// WeakMaps are used as a memory efficient reference to private data | ||
const _registry = new WeakMap(); | ||
const _apis = new WeakMap(); | ||
const _config = new WeakMap(); | ||
import { EventEmitter } from 'events'; | ||
import { AnyObject, Logger, isTest } from '@terascope/utils'; | ||
import { OpAPI, Context, ExecutionConfig, APIConfig, WorkerContext } from '../interfaces'; | ||
import { isOperationAPI, getOperationAPIType, makeContextLogger } from './utils'; | ||
import { Observer, APIConstructor } from '../operations'; | ||
import { JobAPIInstances } from './interfaces'; | ||
|
||
/** | ||
* A utility API exposed on the Terafoundation Context APIs. | ||
* The following functionality is included: | ||
* - Registering Operation API | ||
* - Creating an API (usually done from an Operation), | ||
* it also includes attaching the API to the Execution LifeCycle. | ||
* An API can only be created once. | ||
* An API will only be created once. | ||
* - Getting a reference to an API | ||
*/ | ||
*/ | ||
export class ExecutionContextAPI { | ||
private readonly _apis: JobAPIInstances = {}; | ||
private readonly _context: WorkerContext; | ||
private readonly _events: EventEmitter; | ||
private readonly _executionConfig: ExecutionConfig; | ||
private readonly _logger: Logger; | ||
|
||
constructor(context: Context, executionConfig: ExecutionConfig) { | ||
_config.set(this, { | ||
context, | ||
executionConfig, | ||
events: context.apis.foundation.getSystemEvents(), | ||
}); | ||
|
||
_registry.set(this, {}); | ||
_apis.set(this, {}); | ||
this._context = context as WorkerContext; | ||
this._events = context.apis.foundation.getSystemEvents(); | ||
this._executionConfig = executionConfig; | ||
this._logger = this.makeLogger('execution_context_api'); | ||
} | ||
|
||
/** Add an API constructor to the registry */ | ||
addToRegistry(name: string, api: OperationAPIConstructor) { | ||
const registry = this.registry; | ||
registry[name] = api; | ||
_registry.set(this, registry); | ||
/** For backwards compatibility */ | ||
get registry() { | ||
return {}; | ||
} | ||
|
||
/** Get all of the registered API constructors */ | ||
get registry(): APIRegistry { | ||
return _registry.get(this); | ||
get apis(): Readonly<JobAPIInstances> { | ||
return this._apis; | ||
} | ||
|
||
/** Get all of the initalized APIs */ | ||
get apis(): APIS { | ||
return _apis.get(this); | ||
} | ||
/** Add an API constructor to the registry */ | ||
addToRegistry(name: string, API: APIConstructor) { | ||
if (this._apis[name] != null) { | ||
throw new Error(`Cannot register API "${name}" due to conflict`); | ||
} | ||
|
||
const { apis = [] } = this._executionConfig; | ||
|
||
/** Set an api to be used */ | ||
addAPI(name: string, opAPI: OpAPI) { | ||
const apis = _apis.get(this); | ||
apis[name] = opAPI; | ||
const apiConfig = apis.find((a: APIConfig) => a._name === name) || { | ||
_name: name, | ||
}; | ||
|
||
const instance = new API(this._context, apiConfig, this._executionConfig); | ||
const type = getOperationAPIType(instance); | ||
this._apis[name] = { | ||
instance, | ||
type, | ||
}; | ||
|
||
const eventName = 'execution:add-to-lifecycle'; | ||
const count = this._events.listenerCount(eventName); | ||
if (!count) { | ||
if (isTest) return; | ||
this._logger.warn(`no listener ${eventName} available but is needed to register the api`); | ||
} else { | ||
this._events.emit(eventName, instance); | ||
this._logger.trace(`registered api ${name}`); | ||
} | ||
} | ||
|
||
/** | ||
* Get a reference to a specific API, | ||
* the must be initialized. | ||
* Get a reference to a specific operation API, | ||
* the must be initialized and created | ||
*/ | ||
getAPI(name: string) { | ||
if (this.apis[name] == null) { | ||
getObserver<T extends Observer = Observer>(name: string): T { | ||
const api = this._apis[name]; | ||
if (api == null) { | ||
throw new Error(`Unable to find API by name "${name}"`); | ||
} | ||
return this.apis[name]; | ||
if (api.type !== 'observer') { | ||
throw new Error(`Unable to find observer by name "${name}"`); | ||
} | ||
return api.instance as T; | ||
} | ||
|
||
/** | ||
* Initalize an API and attach it | ||
* to the lifecycle of an Execution. | ||
* Get a reference to a specific operation API, | ||
* the must be initialized and created | ||
*/ | ||
async initAPI(name: string, ...params: any[]) { | ||
const config = _config.get(this); | ||
if (this.registry[name] == null) { | ||
getAPI<T extends OpAPI = OpAPI>(name: string): T { | ||
const api = this._apis[name]; | ||
if (api == null) { | ||
throw new Error(`Unable to find API by name "${name}"`); | ||
} | ||
|
||
if (this.apis[name] != null) { | ||
throw new Error(`API "${name}" can only be initalized once`); | ||
if (api.opAPI == null) { | ||
throw new Error(`Unable to find created API by name "${name}"`); | ||
} | ||
return api.opAPI as T; | ||
} | ||
|
||
const API = this.registry[name]; | ||
|
||
const { apis = [] } = config.executionConfig; | ||
|
||
const apiConfig = apis.find((a: APIConfig) => a._name === name) || { | ||
_name: name, | ||
}; | ||
/** | ||
* Create an instance of the API | ||
* | ||
* @param name the name of API to create | ||
* @param params any additional options that the API may need | ||
*/ | ||
async initAPI(name: string, ...params: any[]): Promise<OpAPI> { | ||
const api = this._apis[name]; | ||
if (api == null) { | ||
throw new Error(`Unable to find API by name "${name}"`); | ||
} | ||
|
||
const api = new API(config.context, apiConfig, config.executionConfig); | ||
await api.initialize(); | ||
if (!isOperationAPI(api.instance)) { | ||
throw new Error('Observers cannot be created'); | ||
} | ||
|
||
config.events.emit('execution:add-to-lifecycle', api); | ||
if (api.opAPI != null) { | ||
const msg = `using existing instance of api: "${name}"`; | ||
if (params.length) { | ||
this._logger.warn(`${msg}, ignoring params`); | ||
} else { | ||
this._logger.debug(`${msg}`); | ||
} | ||
return api.opAPI; | ||
} | ||
|
||
this.addAPI(name, await api.createAPI(...params)); | ||
return this.apis[name]; | ||
api.opAPI = await api.instance.createAPI(...params); | ||
this._logger.trace(`initialized api ${name}`); | ||
return api.opAPI; | ||
} | ||
} | ||
|
||
interface APIS { | ||
[name: string]: OpAPI; | ||
} | ||
|
||
interface APIRegistry { | ||
[name: string]: OperationAPIConstructor; | ||
/** | ||
* Make a logger with a the job_id and ex_id in the logger context | ||
*/ | ||
makeLogger(moduleName: string, extra: AnyObject = {}) { | ||
const { ex_id, job_id } = this._executionConfig; | ||
return makeContextLogger(this._context, moduleName, { ex_id, job_id, ...extra }); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.