Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Await fixes #95

Merged
merged 2 commits into from
Feb 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 26 additions & 8 deletions boot.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const inherits = require('util').inherits
const TimeTree = require('./time-tree')
const Plugin = require('./plugin')
const debug = require('debug')('avvio')
const kAvvio = Symbol('kAvvio')

function wrap (server, opts, instance) {
const expose = opts.expose || {}
Expand Down Expand Up @@ -33,6 +34,7 @@ function wrap (server, opts, instance) {
}

Object.defineProperty(server, 'then', { get: thenify.bind(instance) })
server[kAvvio] = true

server[afterKey] = function (func) {
if (typeof func !== 'function') {
Expand Down Expand Up @@ -188,17 +190,25 @@ function assertPlugin (plugin) {
}
}

Boot.prototype[kAvvio] = true

// load a plugin
Boot.prototype.use = function (plugin, opts) {
this._lastUsed = this._addPlugin(plugin, opts, false)
return this
}

Boot.prototype._loadRegistered = function (plugin) {
plugin = plugin || this._lastUsed
return new Promise((resolve) => {
Boot.prototype._loadRegistered = function () {
const plugin = this._current[0]
return new Promise((resolve, reject) => {
var weNeedToStart = !this.started && !this.booted
if (plugin && !plugin.loaded) {
plugin.asyncQ.push(() => {
debug('_loadRegistered deferring promise', plugin.name)
plugin.pushToAsyncQ((err) => {
if (err) {
reject(err)
return
}
resolve()
})
} else {
Expand All @@ -207,7 +217,7 @@ Boot.prototype._loadRegistered = function (plugin) {

// if the root plugin is not loaded, let's resume that
// so one can use after() befor calling ready
if (!this.started && !this.booted) {
if (weNeedToStart) {
this._root.q.resume()
}
})
Expand Down Expand Up @@ -249,6 +259,10 @@ Boot.prototype._addPlugin = function (plugin, opts, isAfter) {
}

Boot.prototype.after = function (func) {
if (!func) {
return this._loadRegistered()
}

this._addPlugin(_after.bind(this), {}, true)

function _after (s, opts, done) {
Expand Down Expand Up @@ -344,7 +358,11 @@ function thenify () {
// await server.ready() as ready() resolves
// with the server, end we will end up here
// because of automatic promise chaining.
if (this.booted) return
if (this.booted) {
debug('thenify returning null because we are already booted')
return
}
debug('thenify')
const p = this._loadRegistered()
return p.then.bind(p)
}
Expand All @@ -359,14 +377,14 @@ function callWithCbOrNextTick (func, cb, context) {
if (func.length === 0) {
this._error = err
res = func()
if (res && typeof res.then === 'function') {
if (res && !res[kAvvio] && typeof res.then === 'function') {
res.then(() => process.nextTick(cb), (e) => process.nextTick(cb, e))
} else {
process.nextTick(cb)
}
} else if (func.length === 1) {
res = func(err)
if (res && typeof res.then === 'function') {
if (res && !res[kAvvio] && typeof res.then === 'function') {
res.then(() => process.nextTick(cb), (e) => process.nextTick(cb, e))
} else {
process.nextTick(cb)
Expand Down
56 changes: 36 additions & 20 deletions plugin.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,19 @@ function getName (func) {
}

function Plugin (parent, func, optsOrFunc, isAfter, timeout) {
this.started = false
this.func = func
this.opts = optsOrFunc
this.deferred = false
this.onFinish = null
this.parent = parent
this.timeout = timeout === undefined ? parent._timeout : timeout
this.name = getName(func)
this.isAfter = isAfter
this.q = fastq(parent, loadPlugin, 1)
this.q.pause()
this.asyncQ = fastq(parent, (resolve, cb) => {
resolve(this.server)
this._error = null
this.asyncQ = fastq(parent, (done, cb) => {
done(this._error || this.parent._error)
cb()
}, 1)
this.asyncQ.pause()
Expand Down Expand Up @@ -83,12 +84,10 @@ Plugin.prototype.exec = function (server, cb) {
return
}

this._error = err

if (err) {
debug('exec errored', name)

// In case of errors, we need to kickstart
// the asyncQ as it won't get started otherwise
this.asyncQ.resume()
} else {
debug('exec completed', name)
}
Expand All @@ -114,19 +113,12 @@ Plugin.prototype.exec = function (server, cb) {
}, this.timeout)
}

this.started = true
this.emit('start', this.server ? this.server.name : null, this.name, Date.now())
var promise = func(this.server, this.opts, done)

if (promise && typeof promise.then === 'function') {
debug('resolving promise', name)
queueMicrotask(() => {
if (this.asyncQ.length() > 0) {
this.server.after(() => {
this.asyncQ.resume()
})
}
this.q.resume()
})

promise.then(
() => process.nextTick(done),
Expand All @@ -140,8 +132,31 @@ Plugin.prototype.enqueue = function (obj, cb) {
this.q.push(obj, cb)
}

Plugin.prototype.pushToAsyncQ = function (fn) {
if (!this.server) {
this.on('start', () => this._pushToAsyncQ(fn))
return
}

this._pushToAsyncQ(fn)
}

Plugin.prototype._pushToAsyncQ = function (fn) {
debug('_pushToAsyncQ', this.name)
if (this.asyncQ.length() === 0) {
this.server.after((err, cb) => {
this.q.pause()
debug('resuming asyncQ', this.name)
this.asyncQ.resume()
process.nextTick(cb, err)
})
}
this.asyncQ.push(fn)
this.q.resume()
}

Plugin.prototype.finish = function (err, cb) {
debug('finish', this.name)
debug('finish', this.name, err)
const done = () => {
if (this.loaded) {
return
Expand All @@ -155,18 +170,19 @@ Plugin.prototype.finish = function (err, cb) {
}

if (err) {
this.asyncQ.resume()
done()
return
}

const check = () => {
debug('check', this.name, this.q.length(), this.q.running())
debug('check', this.name, this.q.length(), this.q.running(), this.asyncQ.length())
if (this.q.length() === 0 && this.q.running() === 0) {
if (this.asyncQ.length() > 0) {
this.asyncQ.drain = () => {
this.asyncQ.drain = noop
this.asyncQ.pause()
check()
queueMicrotask(check)
}
this.asyncQ.resume()
} else {
Expand All @@ -181,12 +197,12 @@ Plugin.prototype.finish = function (err, cb) {

// we defer the check, as a safety net for things
// that might be scheduled in the loading callback
process.nextTick(check)
queueMicrotask(check)
}
}
}

process.nextTick(check)
queueMicrotask(check)

// we start loading the dependents plugins only once
// the current level is finished
Expand Down
34 changes: 34 additions & 0 deletions test/after-and-ready.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -753,3 +753,37 @@ test('preReady event (errored)', (t) => {
t.is(order.shift(), 3)
})
})

test('after return self', (t) => {
t.plan(6)

const app = boot()
let pluginLoaded = false
let afterCalled = false
let second = false

app.use(function (s, opts, done) {
t.notOk(afterCalled, 'after not called')
pluginLoaded = true
done()
})

app.after(function () {
t.ok(pluginLoaded, 'afterred!')
afterCalled = true
// happens with after(() => app.use(..))
return app
})

app.use(function (s, opts, done) {
t.ok(afterCalled, 'after called')
second = true
done()
})

app.on('start', () => {
t.ok(afterCalled, 'after called')
t.ok(pluginLoaded, 'plugin loaded')
t.ok(second, 'second plugin loaded')
})
})
34 changes: 34 additions & 0 deletions test/await-after.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,27 @@ test('await after - nested plugins with same tick callbacks', async (t) => {
t.pass('reachable')
})

test('await after without server', async (t) => {
const app = boot()

let secondLoaded = false

app.use(async (app) => {
t.pass('plugin init')
app.use(async () => {
t.pass('plugin2 init')
await sleep(1)
secondLoaded = true
})
})
await app.after()
t.pass('reachable')
t.is(secondLoaded, true)

await app.ready()
t.pass('reachable')
})

test('await after - nested plugins with future tick callbacks', async (t) => {
const app = {}
boot(app)
Expand Down Expand Up @@ -309,3 +330,16 @@ test('without autostart and with override', async (t) => {

await app.ready()
})

test('stop processing after errors', async (t) => {
const app = boot()

try {
await app.use(async function first (app) {
t.pass('first should be loaded')
throw new Error('kaboom')
})
} catch (e) {
t.is(e.message, 'kaboom')
}
})