Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use pubsub v1beta2 #433

Merged
merged 2 commits into from
Mar 10, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 21 additions & 23 deletions lib/pubsub/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ var util = require('../common/util.js');
* @const {string} Base URL for Pub/Sub API.
* @private
*/
var PUBSUB_BASE_URL = 'https://www.googleapis.com/pubsub/v1beta1';
var PUBSUB_BASE_URL = 'https://pubsub.googleapis.com/v1beta2/';

/**
* @const {array} Required scopes for Pub/Sub API.
Expand Down Expand Up @@ -116,7 +116,7 @@ function PubSub(options) {
});

this.projectId = options.projectId;
this.projectName = '/projects/' + this.projectId;
this.projectName = 'projects/' + this.projectId;
}

/**
Expand All @@ -125,7 +125,7 @@ function PubSub(options) {
*
* @param {object=} query - Query object.
* @param {string=} query.pageToken - Page token.
* @param {number=} query.maxResults - Maximum number of results to return.
* @param {number=} query.pageSize - Max number of results to return.
* @param {function} callback - The callback function.
*
* @example
Expand All @@ -137,23 +137,23 @@ function PubSub(options) {
*
* // Customize the query.
* pubsub.getTopics({
* maxResults: 3
* pageSize: 3
* }, function(err, topics, nextQuery) {});
*/
PubSub.prototype.getTopics = function(query, callback) {
var that = this;
var self = this;
if (!callback) {
callback = query;
query = {};
}
query.query = 'cloud.googleapis.com/project in (' + this.projectName + ')';
this.makeReq_('GET', 'topics', query, true, function(err, result) {
var path = this.projectName + '/topics';
this.makeReq_('GET', path, query, true, function(err, result) {
if (err) {
callback(err);
return;
}
var topics = (result.topic || []).map(function(item) {
return new Topic(that, {
var topics = (result.topics || []).map(function(item) {
return new Topic(self, {
name: item.name
});
});
Expand All @@ -180,10 +180,8 @@ PubSub.prototype.getTopics = function(query, callback) {
PubSub.prototype.createTopic = function(name, callback) {
callback = callback || util.noop;
var topic = this.topic(name);
var req = {
name: topic.name
};
this.makeReq_('POST', 'topics', null, req, function(err) {
var path = this.projectName + '/topics/' + name;
this.makeReq_('PUT', path, null, null, function(err) {
if (err) {
callback(err);
return;
Expand Down Expand Up @@ -220,7 +218,7 @@ PubSub.prototype.topic = function(name) {
*
* @param {object=} query - Query object.
* @param {string=} query.pageToken - Page token.
* @param {number=} query.maxResults - Maximum number of results to return.
* @param {number=} query.pageSize - Maximum number of results to return.
* @param {function} callback - The callback function.
*
* @example
Expand All @@ -232,25 +230,25 @@ PubSub.prototype.topic = function(name) {
*
* // Customize the query.
* pubsub.getSubscriptions({
* maxResults: 10
* pageSize: 10
* }, function(err, subscriptions, nextQuery) {});
*/
PubSub.prototype.getSubscriptions = function(query, callback) {
var that = this;
var self = this;
if (!callback) {
callback = query;
query = {};
}
if (!query.query) {
query.query = 'cloud.googleapis.com/project in (' + this.projectName + ')';
}
this.makeReq_('GET', 'subscriptions', query, true, function(err, result) {

var path = this.projectName + '/subscriptions';
this.makeReq_('GET', path, query, true, function(err, result) {
if (err) {
callback(err);
return;
}
var subscriptions = (result.subscription || []).map(function(item) {
return new Subscription(that, {

var subscriptions = (result.subscriptions || []).map(function(item) {
return new Subscription(self, {
name: item.name
});
});
Expand Down Expand Up @@ -279,7 +277,7 @@ PubSub.prototype.makeReq_ = function(method, path, q, body, callback) {
var reqOpts = {
method: method,
qs: q,
uri: PUBSUB_BASE_URL + '/' + path
uri: PUBSUB_BASE_URL + path
};

if (body) {
Expand Down
118 changes: 76 additions & 42 deletions lib/pubsub/subscription.js
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ nodeutil.inherits(Subscription, events.EventEmitter);

/**
* Format the name of a subscription. A subscription's full name is in the
* format of /subscription/{projectId}/{name}.
* format of projects/{projectId}/subscriptions/{subName}.
*
* @private
*/
Expand All @@ -136,32 +136,35 @@ Subscription.formatName_ = function(projectId, name) {
if (name.indexOf('/') > -1) {
return name;
}
return '/subscriptions/' + projectId + '/' + name;
return 'projects/' + projectId + '/subscriptions/' + name;
};

/**
* Simplify a message from an API response to have two properties, `id` and
* `data`. `data` is always converted to a string.
* Simplify a message from an API response to have three properties, `id`,
* `data` and `attributes`. `data` is always converted to a string.
*
* @private
*/
Subscription.formatMessage_ = function(msg) {
var event = msg.pubsubEvent;

var innerMessage = msg.message;
var message = {
ackId: msg.ackId
};

if (event && event.message) {
message.id = event.message.messageId;
if (innerMessage) {
message.id = innerMessage.messageId;

if (event.message.data) {
message.data = new Buffer(event.message.data, 'base64').toString('utf-8');
if (innerMessage.data) {
message.data = new Buffer(innerMessage.data, 'base64').toString('utf-8');

try {
message.data = JSON.parse(message.data);
} catch(e) {}
}

if (innerMessage.attributes) {
message.attributes = innerMessage.attributes;
}
}

return message;
Expand All @@ -181,22 +184,22 @@ Subscription.formatMessage_ = function(msg) {
* this.listenForEvents_();
*/
Subscription.prototype.listenForEvents_ = function() {
var that = this;
var self = this;
var messageListeners = 0;

this.on('newListener', function(event) {
if (event === 'message') {
messageListeners++;
if (that.closed) {
that.closed = false;
if (self.closed) {
self.closed = false;
}
that.startPulling_();
self.startPulling_();
}
});

this.on('removeListener', function(event) {
if (event === 'message' && --messageListeners === 0) {
that.closed = true;
self.closed = true;
}
});
};
Expand All @@ -219,21 +222,22 @@ Subscription.prototype.listenForEvents_ = function() {
* subscription.startPulling_();
*/
Subscription.prototype.startPulling_ = function() {
var that = this;
var self = this;
if (this.closed) {
return;
}
this.pull({
maxResults: 1,
returnImmediately: false
}, function(err, message) {
}, function(err, messages) {
if (err) {
that.emit('error', err);
self.emit('error', err);
}
if (message) {
that.emit('message', message);
if (messages) {
messages.forEach(function(message) {
self.emit('message', message);
});
}
setTimeout(that.startPulling_.bind(that), that.interval);
setTimeout(self.startPulling_.bind(self), self.interval);
});
};

Expand All @@ -256,10 +260,10 @@ Subscription.prototype.ack = function(ids, callback) {
}
ids = util.arrayize(ids);
var body = {
subscription: this.name,
ackId: ids
ackIds: ids
};
this.makeReq_('POST', 'subscriptions/acknowledge', null, body, callback);
var path = this.name + ':acknowledge';
this.makeReq_('POST', path, null, body, callback);
};

/**
Expand All @@ -272,17 +276,17 @@ Subscription.prototype.ack = function(ids, callback) {
* subscription.delete(function(err) {});
*/
Subscription.prototype.delete = function(callback) {
var self = this;
callback = callback || util.noop;
this.makeReq_(
'DELETE', 'subscriptions/' + this.name, null, true, function(err) {
this.makeReq_('DELETE', this.name, null, true, function(err) {
if (err) {
callback(err);
return;
}
this.closed = true;
this.removeAllListeners();
self.closed = true;
self.removeAllListeners();
callback(null);
}.bind(this));
});
};

/**
Expand All @@ -295,7 +299,6 @@ Subscription.prototype.delete = function(callback) {
* `subscription.on('message', function() {})` event handler.
*
* @todo Should not be racing with other pull.
* @todo Fix API to return a list of messages.
*
* @param {object=} options - Configuration object.
* @param {boolean} options.returnImmediately - If set, the system will respond
Expand All @@ -311,9 +314,10 @@ Subscription.prototype.delete = function(callback) {
* subscription.pull(function(err, messages) {
* // messages = [
* // {
* // ackId: '', // ID used to acknowledge its receival.
* // id: '', // Unique message ID.
* // data: '' // Contents of the message.
* // ackId: '', // ID used to acknowledge its receival.

This comment was marked as spam.

This comment was marked as spam.

* // id: '', // Unique message ID.
* // data: '', // Contents of the message.
* // attributes: {} // Attributes of the message.
* // },
* // // ...
* // ]
Expand All @@ -329,9 +333,8 @@ Subscription.prototype.delete = function(callback) {
* subscription.pull(opts, function(err, messages) {});
*/
Subscription.prototype.pull = function(options, callback) {
var that = this;
var self = this;
var MAX_EVENTS_LIMIT = 1000;
var apiEndpoint = 'subscriptions/pullBatch';

if (!callback) {
callback = options;
Expand All @@ -343,26 +346,26 @@ Subscription.prototype.pull = function(options, callback) {
}

var body = {
subscription: this.name,
returnImmediately: !!options.returnImmediately,
maxEvents: options.maxResults
maxMessages: options.maxResults
};

this.makeReq_('POST', apiEndpoint, null, body, function(err, response) {
var path = this.name + ':pull';
this.makeReq_('POST', path, null, body, function(err, response) {
if (err) {
callback(err);
return;
}

var messages = response.pullResponses || [response];
var messages = response.receivedMessages || [];
messages = messages.map(Subscription.formatMessage_);

if (that.autoAck) {
if (self.autoAck) {
var ackIds = messages.map(function(message) {
return message.ackId;
});

that.ack(ackIds, function(err) {
self.ack(ackIds, function(err) {
callback(err, messages);
});
} else {
Expand All @@ -371,4 +374,35 @@ Subscription.prototype.pull = function(options, callback) {
});
};

/**
* Modify the ack deadline for a specific message. This method is useful to
* indicate that more time is needed to process a message by the subscriber, or
* to make the message available for redelivery if the processing was
* interrupted.
*
* @param {object} options - The configuration object.
* @param {number} options.ackId - The ack id to change.
* @param {number} options.seconds - Number of seconds after call is made to
* set the deadline of the ack.
* @param {Function=} callback - The callback function.

This comment was marked as spam.

This comment was marked as spam.

*
* @example
* var options = {
* ackId: 123,
* seconds: 10 // Expire in 10 seconds from call.
* };
* subscription.setAckDeadline(options, function(err) {});
*/
Subscription.prototype.setAckDeadline = function(options, callback) {
var body = {
ackId: options.ackId,
ackDeadlineSeconds: options.seconds
};

callback = callback || util.noop;

var path = this.name + ':modifyAckDeadline';
this.makeReq_('POST', path, null, body, callback);
};

module.exports = Subscription;
Loading