Skip to content

Commit

Permalink
feat(ah-scope): add scope propagation for event emitters (#146)
Browse files Browse the repository at this point in the history
  • Loading branch information
vmarchaud authored and mayurkale22 committed Aug 16, 2019
1 parent 9a6ac90 commit 0941b7f
Show file tree
Hide file tree
Showing 2 changed files with 231 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,26 @@

import { ScopeManager } from '@opentelemetry/scope-base';
import * as asyncHooks from 'async_hooks';
import { EventEmitter } from 'events';

type Func<T> = (...args: unknown[]) => T;

type PatchedEventEmitter = {
/**
* Store a map for each event of all original listener and their "patched"
* version so when the listener is removed by the user, we remove the
* correspoding "patched" function.
*/
__ot_listeners?: { [name: string]: WeakMap<Func<void>, Func<void>> };
} & EventEmitter;

const ADD_LISTENER_METHODS = [
'addListener' as 'addListener',
'on' as 'on',
'once' as 'once',
'prependListener' as 'prependListener',
'prependOnceListener' as 'prependOnceListener',
];

export class AsyncHooksScopeManager implements ScopeManager {
private _asyncHook: asyncHooks.AsyncHook;
Expand Down Expand Up @@ -58,7 +78,9 @@ export class AsyncHooksScopeManager implements ScopeManager {
if (scope === undefined) {
scope = this.active();
}
if (typeof target === 'function') {
if (target instanceof EventEmitter) {
return this._bindEventEmitter(target, scope);
} else if (typeof target === 'function') {
return this._bindFunction(target, scope);
}
return target;
Expand Down Expand Up @@ -94,6 +116,112 @@ export class AsyncHooksScopeManager implements ScopeManager {
return contextWrapper as any;
}

/**
* By default, EventEmitter call their callback with their scope, which we do
* not want, instead we will bind a specific scope to all callbacks that
* go through it.
* @param target EventEmitter a instance of EventEmitter to patch
* @param scope the scope we want to bind
*/
private _bindEventEmitter<T extends EventEmitter>(
target: T,
scope?: unknown
): T {
const ee = (target as unknown) as PatchedEventEmitter;
if (ee.__ot_listeners !== undefined) return target;
ee.__ot_listeners = {};

// patch methods that add a listener to propagate scope
ADD_LISTENER_METHODS.forEach(methodName => {
if (ee[methodName] === undefined) return;
ee[methodName] = this._patchAddListener(ee, ee[methodName], scope);
});
// patch methods that remove a listener
if (typeof ee.removeListener === 'function') {
ee.removeListener = this._patchRemoveListener(ee, ee.removeListener);
}
if (typeof ee.off === 'function') {
ee.off = this._patchRemoveListener(ee, ee.off);
}
// patch method that remove all listeners
if (typeof ee.removeAllListeners === 'function') {
ee.removeAllListeners = this._patchRemoveAllListeners(
ee,
ee.removeAllListeners
);
}
return target;
}

/**
* Patch methods that remove a given listener so that we match the "patched"
* version of that listener (the one that propagate context).
* @param ee EventEmitter instance
* @param original reference to the patched method
*/
private _patchRemoveListener(ee: PatchedEventEmitter, original: Function) {
return function(this: {}, event: string, listener: Func<void>) {
if (
ee.__ot_listeners === undefined ||
ee.__ot_listeners[event] === undefined
) {
return original.call(this, event, listener);
}
const events = ee.__ot_listeners[event];
const patchedListener = events.get(listener);
return original.call(this, event, patchedListener || listener);
};
}

/**
* Patch methods that remove all listeners so we remove our
* internal references for a given event.
* @param ee EventEmitter instance
* @param original reference to the patched method
*/
private _patchRemoveAllListeners(
ee: PatchedEventEmitter,
original: Function
) {
return function(this: {}, event: string) {
if (
ee.__ot_listeners === undefined ||
ee.__ot_listeners[event] === undefined
) {
return original.call(this, event);
}
delete ee.__ot_listeners[event];
return original.call(this, event);
};
}

/**
* Patch methods on an event emitter instance that can add listeners so we
* can force them to propagate a given context.
* @param ee EventEmitter instance
* @param original reference to the patched method
* @param [scope] scope to propagate when calling listeners
*/
private _patchAddListener(
ee: PatchedEventEmitter,
original: Function,
scope?: unknown
) {
const scopeManager = this;
return function(this: {}, event: string, listener: Func<void>) {
if (ee.__ot_listeners === undefined) ee.__ot_listeners = {};
let listeners = ee.__ot_listeners[event];
if (listeners === undefined) {
listeners = new WeakMap();
ee.__ot_listeners[event] = listeners;
}
const patchedListener = scopeManager.bind(listener, scope);
// store a weak reference of the user listener to ours
listeners.set(listener, patchedListener);
return original.call(this, event, patchedListener);
};
}

/**
* Init hook will be called when userland create a async scope, setting the
* scope as the current one if it exist.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import * as assert from 'assert';
import { AsyncHooksScopeManager } from '../src';
import { EventEmitter } from 'events';

describe('AsyncHooksScopeManager', () => {
let scopeManager: AsyncHooksScopeManager;
Expand Down Expand Up @@ -155,4 +156,105 @@ describe('AsyncHooksScopeManager', () => {
fn();
});
});

describe('.bind(event-emitter)', () => {
it('should return the same target (when enabled)', () => {
const ee = new EventEmitter();
assert.deepStrictEqual(scopeManager.bind(ee), ee);
});

it('should return the same target (when disabled)', () => {
const ee = new EventEmitter();
scopeManager.disable();
assert.deepStrictEqual(scopeManager.bind(ee), ee);
scopeManager.enable();
});

it('should return current scope and removeListener (when enabled)', done => {
const ee = new EventEmitter();
const scope = { a: 2 };
const patchedEe = scopeManager.bind(ee, scope);
const handler = () => {
assert.deepStrictEqual(scopeManager.active(), scope);
patchedEe.removeListener('test', handler);
assert.strictEqual(patchedEe.listeners('test').length, 0);
return done();
};
patchedEe.on('test', handler);
assert.strictEqual(patchedEe.listeners('test').length, 1);
patchedEe.emit('test');
});

it('should return current scope and removeAllListener (when enabled)', done => {
const ee = new EventEmitter();
const scope = { a: 2 };
const patchedEe = scopeManager.bind(ee, scope);
const handler = () => {
assert.deepStrictEqual(scopeManager.active(), scope);
patchedEe.removeAllListeners('test');
assert.strictEqual(patchedEe.listeners('test').length, 0);
return done();
};
patchedEe.on('test', handler);
assert.strictEqual(patchedEe.listeners('test').length, 1);
patchedEe.emit('test');
});

/**
* Even if asynchooks is disabled, the scope propagation will
* still works but it might be lost after any async op.
*/
it('should return scope (when disabled)', done => {
scopeManager.disable();
const ee = new EventEmitter();
const scope = { a: 2 };
const patchedEe = scopeManager.bind(ee, scope);
const handler = () => {
assert.deepStrictEqual(scopeManager.active(), scope);
patchedEe.removeListener('test', handler);
assert.strictEqual(patchedEe.listeners('test').length, 0);
scopeManager.enable();
return done();
};
patchedEe.on('test', handler);
assert.strictEqual(patchedEe.listeners('test').length, 1);
patchedEe.emit('test');
});

it('should not return current scope (when disabled + async op)', done => {
scopeManager.disable();
const ee = new EventEmitter();
const scope = { a: 3 };
const patchedEe = scopeManager.bind(ee, scope);
const handler = () => {
setImmediate(() => {
assert.deepStrictEqual(scopeManager.active(), null);
patchedEe.removeAllListeners('test');
assert.strictEqual(patchedEe.listeners('test').length, 0);
return done();
});
};
patchedEe.on('test', handler);
assert.strictEqual(patchedEe.listeners('test').length, 1);
patchedEe.emit('test');
});

it('should return current scope (when enabled + async op)', done => {
scopeManager.enable();
const ee = new EventEmitter();
const scope = { a: 3 };
const patchedEe = scopeManager.bind(ee, scope);
const handler = () => {
setImmediate(() => {
assert.deepStrictEqual(scopeManager.active(), scope);
patchedEe.removeAllListeners('test');
assert.strictEqual(patchedEe.listeners('test').length, 0);
return done();
});
};
patchedEe.on('test', handler);
assert.strictEqual(patchedEe.listeners('test').length, 1);
patchedEe.emit('test');
});
});
});

0 comments on commit 0941b7f

Please sign in to comment.