Skip to content

Commit

Permalink
switch shortid to hyperid for performance (fixes mcollina#7) & remove…
Browse files Browse the repository at this point in the history
… setImmediates (fixes mcollina#8)
  • Loading branch information
behrad committed Jun 6, 2018
1 parent 2b3d148 commit 4062f69
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 23 deletions.
32 changes: 11 additions & 21 deletions mqemitter-redis.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

var Redis = require('ioredis')
var MQEmitter = require('mqemitter')
var shortid = require('shortid')
var hyperid = require('hyperid')()
var inherits = require('inherits')
var LRU = require('lru-cache')
var msgpack = require('msgpack-lite')
Expand Down Expand Up @@ -97,26 +97,22 @@ MQEmitterRedis.prototype._subTopic = function (topic) {

MQEmitterRedis.prototype.on = function on (topic, cb, done) {
var subTopic = this._subTopic(topic)
var onFinish = function () {
if (done) {
setImmediate(done)
}
}
done = done || function () {}

this._on(topic, cb)

if (this._topics[subTopic]) {
this._topics[subTopic]++
onFinish()
done()
return this
}

this._topics[subTopic] = 1

if (this._containsWildcard(topic)) {
this.subConn.psubscribe(subTopic, onFinish)
this.subConn.psubscribe(subTopic, done)
} else {
this.subConn.subscribe(subTopic, onFinish)
this.subConn.subscribe(subTopic, done)
}

return this
Expand All @@ -128,37 +124,31 @@ MQEmitterRedis.prototype.emit = function (msg, done) {
}

var packet = {
id: shortid(),
id: hyperid(),
msg: msg
}

this.pubConn.publish(msg.topic, msgpack.encode(packet))
if (done) {
setImmediate(done)
}
done && done()
}

MQEmitterRedis.prototype.removeListener = function (topic, cb, done) {
var subTopic = this._subTopic(topic)
var onFinish = function () {
if (done) {
setImmediate(done)
}
}
done = done || function () {}

this._removeListener(topic, cb)

if (--this._topics[subTopic] > 0) {
onFinish()
done()
return this
}

delete this._topics[subTopic]

if (this._containsWildcard(topic)) {
this.subConn.punsubscribe(subTopic, onFinish)
this.subConn.punsubscribe(subTopic, done)
} else if (this._matcher.match(topic)) {
this.subConn.unsubscribe(subTopic, onFinish)
this.subConn.unsubscribe(subTopic, done)
}

return this
Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
"description": "Redis-based MQEmitter",
"main": "mqemitter-redis.js",
"dependencies": {
"hyperid": "^1.4.1",
"inherits": "^2.0.1",
"ioredis": "^3.0.0",
"lru-cache": "^4.1.1",
"mqemitter": "^2.0.0",
"msgpack-lite": "^0.1.14",
"shortid": "^2.2.2"
"msgpack-lite": "^0.1.14"
},
"devDependencies": {
"faucet": "^0.0.1",
Expand Down

0 comments on commit 4062f69

Please sign in to comment.