From 1233ac63f6a794a08339bf5956ee798552a6ce9c Mon Sep 17 00:00:00 2001 From: Ryan Seys Date: Fri, 6 Mar 2015 18:54:25 -0500 Subject: [PATCH 1/2] Use pubsub v1beta2 --- lib/pubsub/index.js | 44 +++++---- lib/pubsub/subscription.js | 118 +++++++++++++++--------- lib/pubsub/topic.js | 77 +++++++++++----- regression/pubsub.js | 39 +++++++- test/pubsub/index.js | 31 +++---- test/pubsub/subscription.js | 174 ++++++++++++------------------------ test/pubsub/topic.js | 49 ++++------ 7 files changed, 271 insertions(+), 261 deletions(-) diff --git a/lib/pubsub/index.js b/lib/pubsub/index.js index 77acea061a2..0252f0a8529 100644 --- a/lib/pubsub/index.js +++ b/lib/pubsub/index.js @@ -42,7 +42,7 @@ var util = require('../common/util.js'); * @const {string} Base URL for Pub/Sub API. * @private */ -var PUBSUB_BASE_URL = 'https://www.googleapis.com/pubsub/v1beta1'; +var PUBSUB_BASE_URL = 'https://pubsub.googleapis.com/v1beta2/'; /** * @const {array} Required scopes for Pub/Sub API. @@ -116,7 +116,7 @@ function PubSub(options) { }); this.projectId = options.projectId; - this.projectName = '/projects/' + this.projectId; + this.projectName = 'projects/' + this.projectId; } /** @@ -125,7 +125,7 @@ function PubSub(options) { * * @param {object=} query - Query object. * @param {string=} query.pageToken - Page token. - * @param {number=} query.maxResults - Maximum number of results to return. + * @param {number=} query.pageSize - Max number of results to return. * @param {function} callback - The callback function. * * @example @@ -137,23 +137,23 @@ function PubSub(options) { * * // Customize the query. * pubsub.getTopics({ - * maxResults: 3 + * pageSize: 3 * }, function(err, topics, nextQuery) {}); */ PubSub.prototype.getTopics = function(query, callback) { - var that = this; + var self = this; if (!callback) { callback = query; query = {}; } - query.query = 'cloud.googleapis.com/project in (' + this.projectName + ')'; - this.makeReq_('GET', 'topics', query, true, function(err, result) { + var path = this.projectName + '/topics'; + this.makeReq_('GET', path, query, true, function(err, result) { if (err) { callback(err); return; } - var topics = (result.topic || []).map(function(item) { - return new Topic(that, { + var topics = (result.topics || []).map(function(item) { + return new Topic(self, { name: item.name }); }); @@ -180,10 +180,8 @@ PubSub.prototype.getTopics = function(query, callback) { PubSub.prototype.createTopic = function(name, callback) { callback = callback || util.noop; var topic = this.topic(name); - var req = { - name: topic.name - }; - this.makeReq_('POST', 'topics', null, req, function(err) { + var path = this.projectName + '/topics/' + name; + this.makeReq_('PUT', path, null, null, function(err) { if (err) { callback(err); return; @@ -220,7 +218,7 @@ PubSub.prototype.topic = function(name) { * * @param {object=} query - Query object. * @param {string=} query.pageToken - Page token. - * @param {number=} query.maxResults - Maximum number of results to return. + * @param {number=} query.pageSize - Maximum number of results to return. * @param {function} callback - The callback function. * * @example @@ -232,25 +230,25 @@ PubSub.prototype.topic = function(name) { * * // Customize the query. * pubsub.getSubscriptions({ - * maxResults: 10 + * pageSize: 10 * }, function(err, subscriptions, nextQuery) {}); */ PubSub.prototype.getSubscriptions = function(query, callback) { - var that = this; + var self = this; if (!callback) { callback = query; query = {}; } - if (!query.query) { - query.query = 'cloud.googleapis.com/project in (' + this.projectName + ')'; - } - this.makeReq_('GET', 'subscriptions', query, true, function(err, result) { + + var path = this.projectName + '/subscriptions'; + this.makeReq_('GET', path, query, true, function(err, result) { if (err) { callback(err); return; } - var subscriptions = (result.subscription || []).map(function(item) { - return new Subscription(that, { + + var subscriptions = (result.subscriptions || []).map(function(item) { + return new Subscription(self, { name: item.name }); }); @@ -279,7 +277,7 @@ PubSub.prototype.makeReq_ = function(method, path, q, body, callback) { var reqOpts = { method: method, qs: q, - uri: PUBSUB_BASE_URL + '/' + path + uri: PUBSUB_BASE_URL + path }; if (body) { diff --git a/lib/pubsub/subscription.js b/lib/pubsub/subscription.js index 1f31fc35280..d7cc5da7844 100644 --- a/lib/pubsub/subscription.js +++ b/lib/pubsub/subscription.js @@ -127,7 +127,7 @@ nodeutil.inherits(Subscription, events.EventEmitter); /** * Format the name of a subscription. A subscription's full name is in the - * format of /subscription/{projectId}/{name}. + * format of projects/{projectId}/subscriptions/{subName}. * * @private */ @@ -136,32 +136,35 @@ Subscription.formatName_ = function(projectId, name) { if (name.indexOf('/') > -1) { return name; } - return '/subscriptions/' + projectId + '/' + name; + return 'projects/' + projectId + '/subscriptions/' + name; }; /** - * Simplify a message from an API response to have two properties, `id` and - * `data`. `data` is always converted to a string. + * Simplify a message from an API response to have three properties, `id`, + * `data` and `attributes`. `data` is always converted to a string. * * @private */ Subscription.formatMessage_ = function(msg) { - var event = msg.pubsubEvent; - + var innerMessage = msg.message; var message = { ackId: msg.ackId }; - if (event && event.message) { - message.id = event.message.messageId; + if (innerMessage) { + message.id = innerMessage.messageId; - if (event.message.data) { - message.data = new Buffer(event.message.data, 'base64').toString('utf-8'); + if (innerMessage.data) { + message.data = new Buffer(innerMessage.data, 'base64').toString('utf-8'); try { message.data = JSON.parse(message.data); } catch(e) {} } + + if (innerMessage.attributes) { + message.attributes = innerMessage.attributes; + } } return message; @@ -181,22 +184,22 @@ Subscription.formatMessage_ = function(msg) { * this.listenForEvents_(); */ Subscription.prototype.listenForEvents_ = function() { - var that = this; + var self = this; var messageListeners = 0; this.on('newListener', function(event) { if (event === 'message') { messageListeners++; - if (that.closed) { - that.closed = false; + if (self.closed) { + self.closed = false; } - that.startPulling_(); + self.startPulling_(); } }); this.on('removeListener', function(event) { if (event === 'message' && --messageListeners === 0) { - that.closed = true; + self.closed = true; } }); }; @@ -219,21 +222,22 @@ Subscription.prototype.listenForEvents_ = function() { * subscription.startPulling_(); */ Subscription.prototype.startPulling_ = function() { - var that = this; + var self = this; if (this.closed) { return; } this.pull({ - maxResults: 1, returnImmediately: false - }, function(err, message) { + }, function(err, messages) { if (err) { - that.emit('error', err); + self.emit('error', err); } - if (message) { - that.emit('message', message); + if (messages) { + messages.forEach(function(message) { + self.emit('message', message); + }); } - setTimeout(that.startPulling_.bind(that), that.interval); + setTimeout(self.startPulling_.bind(self), self.interval); }); }; @@ -256,10 +260,10 @@ Subscription.prototype.ack = function(ids, callback) { } ids = util.arrayize(ids); var body = { - subscription: this.name, - ackId: ids + ackIds: ids }; - this.makeReq_('POST', 'subscriptions/acknowledge', null, body, callback); + var path = this.name + ':acknowledge'; + this.makeReq_('POST', path, null, body, callback); }; /** @@ -272,17 +276,17 @@ Subscription.prototype.ack = function(ids, callback) { * subscription.delete(function(err) {}); */ Subscription.prototype.delete = function(callback) { + var self = this; callback = callback || util.noop; - this.makeReq_( - 'DELETE', 'subscriptions/' + this.name, null, true, function(err) { + this.makeReq_('DELETE', this.name, null, true, function(err) { if (err) { callback(err); return; } - this.closed = true; - this.removeAllListeners(); + self.closed = true; + self.removeAllListeners(); callback(null); - }.bind(this)); + }); }; /** @@ -295,7 +299,6 @@ Subscription.prototype.delete = function(callback) { * `subscription.on('message', function() {})` event handler. * * @todo Should not be racing with other pull. - * @todo Fix API to return a list of messages. * * @param {object=} options - Configuration object. * @param {boolean} options.returnImmediately - If set, the system will respond @@ -311,9 +314,10 @@ Subscription.prototype.delete = function(callback) { * subscription.pull(function(err, messages) { * // messages = [ * // { - * // ackId: '', // ID used to acknowledge its receival. - * // id: '', // Unique message ID. - * // data: '' // Contents of the message. + * // ackId: '', // ID used to acknowledge its receival. + * // id: '', // Unique message ID. + * // data: '', // Contents of the message. + * // attributes: {} // Attributes of the message. * // }, * // // ... * // ] @@ -329,9 +333,8 @@ Subscription.prototype.delete = function(callback) { * subscription.pull(opts, function(err, messages) {}); */ Subscription.prototype.pull = function(options, callback) { - var that = this; + var self = this; var MAX_EVENTS_LIMIT = 1000; - var apiEndpoint = 'subscriptions/pullBatch'; if (!callback) { callback = options; @@ -343,26 +346,26 @@ Subscription.prototype.pull = function(options, callback) { } var body = { - subscription: this.name, returnImmediately: !!options.returnImmediately, - maxEvents: options.maxResults + maxMessages: options.maxResults }; - this.makeReq_('POST', apiEndpoint, null, body, function(err, response) { + var path = this.name + ':pull'; + this.makeReq_('POST', path, null, body, function(err, response) { if (err) { callback(err); return; } - var messages = response.pullResponses || [response]; + var messages = response.receivedMessages || []; messages = messages.map(Subscription.formatMessage_); - if (that.autoAck) { + if (self.autoAck) { var ackIds = messages.map(function(message) { return message.ackId; }); - that.ack(ackIds, function(err) { + self.ack(ackIds, function(err) { callback(err, messages); }); } else { @@ -371,4 +374,35 @@ Subscription.prototype.pull = function(options, callback) { }); }; +/** + * Modify the ack deadline for a specific message. This method is useful to + * indicate that more time is needed to process a message by the subscriber, or + * to make the message available for redelivery if the processing was + * interrupted. + * + * @param {object} options - The configuration object. + * @param {number} options.ackId - The ack id to change. + * @param {number} options.seconds - Number of seconds after call is made to + * set the deadline of the ack. + * @param {Function=} callback - The callback function. + * + * @example + * var options = { + * ackId: 123, + * seconds: 10 // Expire in 10 seconds from call. + * } + * subscription.setAckDeadline(options, function(err) {}); + */ +Subscription.prototype.setAckDeadline = function(options, callback) { + var body = { + ackId: options.ackId, + ackDeadlineSeconds: options.seconds + }; + + callback = callback || util.noop; + + var path = this.name + ':modifyAckDeadline'; + this.makeReq_('POST', path, null, body, callback); +}; + module.exports = Subscription; diff --git a/lib/pubsub/topic.js b/lib/pubsub/topic.js index 179ff534888..83e50f70f20 100644 --- a/lib/pubsub/topic.js +++ b/lib/pubsub/topic.js @@ -81,7 +81,7 @@ Topic.formatMessage_ = function(message) { /** * Format the name of a topic. A Topic's full name is in the format of - * /topics/{projectId}/{name}. + * 'projects/{projectId}/topics/{topicName}. * * @private * @@ -92,17 +92,19 @@ Topic.formatName_ = function(projectId, name) { if (name.indexOf('/') > -1) { return name; } - return '/topics/' + projectId + '/' + name; + return 'projects/' + projectId + '/topics/' + name; }; /** - * Publish the provided message. A message can be of any type. + * Publish the provided message or array of messages. A message can be of any + * type. On success, an array of messageIds is returned in the response. * * @throws {Error} If no message is provided. * * @param {object|object[]} message - The message(s) to publish. * @param {*} message.data - The contents of the message. - * @param {array=} message.labels - Labels to apply to the message. + * @param {array=} message.attributes - Key/value pair of attributes to apply to + * the message. All values must be strings. * @param {function=} callback - The callback function. * * @example @@ -112,11 +114,12 @@ Topic.formatName_ = function(projectId, name) { * name: 'Stephen', * event: 'new user' * }, - * labels: [ - * 'registration' - * ] + * attributes: { + * 'key': 'value', + * 'hello': 'world' + * } * }; - * topic.publish(registrationMessage, function(err) {}); + * topic.publish(registrationMessage, function(err, messageIds) {}); * * //- * // You can publish a batch of messages at once by supplying an array. @@ -132,7 +135,7 @@ Topic.formatName_ = function(projectId, name) { * topic.publish([ * registrationMessage, * purchaseMessage - * ], function(err) {}); + * ], function(err, messageIds) {}); */ Topic.prototype.publish = function(messages, callback) { messages = util.arrayize(messages); @@ -144,11 +147,17 @@ Topic.prototype.publish = function(messages, callback) { callback = callback || util.noop; var body = { - topic: this.name, messages: messages.map(Topic.formatMessage_) }; - this.makeReq_('POST', 'topics/publishBatch', null, body, callback); + var path = this.name + ':publish'; + this.makeReq_('POST', path, null, body, function(err, result) { + if (err) { + callback(err); + return; + } + callback(null, result && result.messageIds || []); + }); }; /** @@ -161,11 +170,11 @@ Topic.prototype.publish = function(messages, callback) { */ Topic.prototype.delete = function(callback) { callback = callback || util.noop; - this.makeReq_('DELETE', 'topics/' + this.name, null, true, callback); + this.makeReq_('DELETE', this.name, null, null, callback); }; /** - * Get a list of the subscriptions registered to this topic. You may optionally + * Lists the name of the subscriptions for this topic. You may optionally * provide a query object as the first argument to customize the response. * * Your provided callback will either be invoked with an error object, if an API @@ -174,11 +183,11 @@ Topic.prototype.delete = function(callback) { * * @param {object=} query - Query object. * @param {string=} query.pageToken - Page token. - * @param {number=} query.maxResults - Maximum number of results to return. + * @param {number=} query.pageSize - Maximum number of results to return. * @param {function} callback - The callback function. * * @example - * // Get all subscriptions. + * // Get all subscriptions for this topic. * topic.getSubscriptions(function(err, subscriptions, nextQuery) { * // If `nextQuery` is non-null, there may be more results to fetch. To do * // so, run `topic.getSubscriptions(nextQuery, callback);`. @@ -186,18 +195,35 @@ Topic.prototype.delete = function(callback) { * * // Customize the query. * topic.getSubscriptions({ - * maxResults: 3 + * pageSize: 3 * }, function(err, subscriptions, nextQuery) {}); */ Topic.prototype.getSubscriptions = function(query, callback) { + var self = this; if (util.is(query, 'function')) { callback = query; query = {}; } - query.query = 'pubsub.googleapis.com/topic in (' + this.name + ')'; + var path = this.name + '/subscriptions'; + this.makeReq_('GET', path, query, true, function(err, result) { + if (err) { + callback(err); + return; + } - this.pubsub.getSubscriptions(query, callback); + var subscriptions = (result.subscriptions || []).map(function(name) { + return new Subscription(self, { + name: name + }); + }); + var nextQuery = null; + if (result.nextPageToken) { + nextQuery = query; + nextQuery.pageToken = result.nextPageToken; + } + callback(null, subscriptions, nextQuery); + }); }; /** @@ -231,6 +257,7 @@ Topic.prototype.getSubscriptions = function(query, callback) { * }, function(err, subscription) {}); */ Topic.prototype.subscribe = function(name, options, callback) { + var self = this; if (!name) { throw new Error('A name is required for a new subscription.'); } @@ -238,20 +265,23 @@ Topic.prototype.subscribe = function(name, options, callback) { callback = options; options = {}; } + var body = { - topic: this.name, - name: Subscription.formatName_(this.projectId, name) + topic: this.name }; + if (options.ackDeadlineSeconds) { body.ackDeadlineSeconds = options.ackDeadlineSeconds; } - this.makeReq_('POST', 'subscriptions', null, body, function(err) { + + var path = Subscription.formatName_(this.projectId, name); + this.makeReq_('PUT', path, null, body, function(err) { if (err) { callback(err); return; } - callback(null, this.subscription(name, options)); - }.bind(this)); + callback(null, self.subscription(name, options)); + }); }; /** @@ -278,6 +308,7 @@ Topic.prototype.subscribe = function(name, options, callback) { * // Called every time a message is received. * // message.id = ID used to acknowledge its receival. * // message.data = Contents of the message. + * // message.attributes = Attributes of the message. * }); */ Topic.prototype.subscription = function(name, options) { diff --git a/regression/pubsub.js b/regression/pubsub.js index fc3267afa59..3a1bace86dd 100644 --- a/regression/pubsub.js +++ b/regression/pubsub.js @@ -82,11 +82,11 @@ describe('pubsub', function() { it('should return a nextQuery if there are more results', function(done) { pubsub.getTopics({ - maxResults: TOPIC_NAMES.length - 1 + pageSize: TOPIC_NAMES.length - 1 }, function(err, topics, next) { assert.ifError(err); assert(topics.length, TOPIC_NAMES.length - 1); - assert(next.maxResults, TOPIC_NAMES.length - 1); + assert(next.pageSize, TOPIC_NAMES.length - 1); assert(!!next.pageToken, true); done(); }); @@ -101,7 +101,12 @@ describe('pubsub', function() { }); it('should publish a message', function(done) { - pubsub.topic(TOPIC_NAMES[0]).publish({ data: 'message from me' }, done); + var topic = pubsub.topic(TOPIC_NAMES[0]); + topic.publish({ data: 'message from me' }, function(err, messageIds) { + assert.ifError(err); + assert.equal(messageIds.length, 1); + done(); + }); }); }); @@ -162,6 +167,14 @@ describe('pubsub', function() { }); }); + it('should list all subscriptions regardless of topic', function(done) { + pubsub.getSubscriptions(function(err, subscriptions) { + assert.ifError(err); + assert(subscriptions instanceof Array); + done(); + }); + }); + it('should allow creation and deletion of a topic', function(done) { var subName = generateSubName(); topic.subscribe(subName, function(err, sub) { @@ -196,6 +209,26 @@ describe('pubsub', function() { }); }); + it('should be able to set a new ack deadline', function(done) { + var subscription = topic.subscription(SUB_NAMES[0]); + + topic.publish({ data: 'hello' }, function(err) { + assert.ifError(err); + + subscription.pull({ + returnImmediately: true, + maxResults: 1 + }, function(err, msgs) { + assert.ifError(err); + var options = { + ackId: msgs[0].ackId, + seconds: 10 + }; + subscription.setAckDeadline(options, done); + }); + }); + }); + it('should receive the published message', function(done) { var subscription = topic.subscription(SUB_NAMES[1]); diff --git a/test/pubsub/index.js b/test/pubsub/index.js index cb2940e47e2..f613c6f22a0 100644 --- a/test/pubsub/index.js +++ b/test/pubsub/index.js @@ -65,7 +65,7 @@ describe('PubSub', function() { describe('getTopics', function() { beforeEach(function() { pubsub.makeReq_ = function(method, path, q, body, callback) { - callback(null, { topic: [{ name: 'fake-topic' }] }); + callback(null, { topics: [{ name: 'fake-topic' }] }); }; }); @@ -77,13 +77,10 @@ describe('PubSub', function() { pubsub.getTopics(done); }); - it('should build a project-wide query', function() { - pubsub.makeReq_ = function(method, path, q) { - var query = - 'cloud.googleapis.com/project in (/projects/' + PROJECT_ID + ')'; + it('should build the right request', function() { + pubsub.makeReq_ = function(method, path) { assert.equal(method, 'GET'); - assert.equal(path, 'topics'); - assert.equal(q.query, query); + assert.equal(path, 'projects/' + PROJECT_ID + '/topics'); }; pubsub.getTopics(function() {}); }); @@ -100,10 +97,10 @@ describe('PubSub', function() { pubsub.makeReq_ = function(method, path, q, body, callback) { callback(null, { nextPageToken: token }); }; - var query = { maxResults: 1 }; + var query = { pageSize: 1 }; pubsub.getTopics(query, function(err, topics, nextQuery) { assert.ifError(err); - assert.strictEqual(query.maxResults, nextQuery.maxResults); + assert.strictEqual(query.pageSize, nextQuery.pageSize); assert.equal(query.pageToken, token); }); }); @@ -123,9 +120,9 @@ describe('PubSub', function() { it('should create a topic', function() { var topicName = 'new-topic-name'; pubsub.makeReq_ = function(method, path, q, body) { - assert.equal(method, 'POST'); - assert.equal(path, 'topics'); - assert.equal(body.name, '/topics/' + PROJECT_ID + '/' + topicName); + assert.equal(method, 'PUT'); + assert.equal(path, 'projects/' + PROJECT_ID + '/topics/' + topicName); + assert.equal(body, null); }; pubsub.createTopic(topicName, function() {}); }); @@ -153,7 +150,7 @@ describe('PubSub', function() { describe('getSubscriptions', function() { beforeEach(function() { pubsub.makeReq_ = function(method, path, q, body, callback) { - callback(null, { subscription: [{ name: 'fake-subscription' }] }); + callback(null, { subscriptions: [{ name: 'fake-subscription' }] }); }; }); @@ -165,13 +162,11 @@ describe('PubSub', function() { pubsub.getSubscriptions(done); }); - it('should build a project-wide query', function() { + it('should pass the right parameters', function() { pubsub.makeReq_ = function(method, path, q) { - var query = - 'cloud.googleapis.com/project in (/projects/' + PROJECT_ID + ')'; assert.equal(method, 'GET'); - assert.equal(path, 'subscriptions'); - assert.equal(q.query, query); + assert.equal(path, 'projects/' + PROJECT_ID + '/subscriptions'); + assert.equal(q.query, undefined); }; pubsub.getSubscriptions(function() {}); }); diff --git a/test/pubsub/subscription.js b/test/pubsub/subscription.js index dd2427ee2cf..2be34b0782a 100644 --- a/test/pubsub/subscription.js +++ b/test/pubsub/subscription.js @@ -25,7 +25,7 @@ var Subscription = require('../../lib/pubsub/subscription.js'); describe('Subscription', function() { var PROJECT_ID = 'test-project'; var SUB_NAME = 'test-subscription'; - var SUB_FULL_NAME = '/subscriptions/' + PROJECT_ID + '/' + SUB_NAME; + var SUB_FULL_NAME = 'projects/' + PROJECT_ID + '/subscriptions/' + SUB_NAME; var pubsubMock = { projectId: PROJECT_ID, makeReq_: util.noop @@ -33,13 +33,13 @@ describe('Subscription', function() { var message = 'howdy'; var messageBuffer = new Buffer(message).toString('base64'); var messageObj = { - ackId: 3, - pubsubEvent: { + receivedMessages: [{ + ackId: 3, message: { data: messageBuffer, messageId: 7 } - } + }] }; var expectedMessage = { ackId: 3, @@ -152,8 +152,7 @@ describe('Subscription', function() { it('should make an array out of ids', function(done) { var ID = 1; subscription.makeReq_ = function(method, path, qs, body) { - assert.equal(body.subscription, SUB_FULL_NAME); - assert.deepEqual(body.ackId, [ID]); + assert.deepEqual(body.ackIds, [ID]); done(); }; subscription.ack(ID, assert.ifError); @@ -162,8 +161,8 @@ describe('Subscription', function() { it('should make correct api request', function(done) { var IDS = [1, 2, 3]; subscription.makeReq_ = function(method, path, qs, body) { - assert.equal(body.subscription, SUB_FULL_NAME); - assert.deepEqual(body.ackId, IDS); + assert.equal(path, SUB_FULL_NAME + ':acknowledge'); + assert.deepEqual(body.ackIds, IDS); done(); }; subscription.ack(IDS, assert.ifError); @@ -205,116 +204,38 @@ describe('Subscription', function() { subscription.pull({ returnImmediately: true }, assert.ifError); }); - it('should default to batching', function(done) { - subscription.makeReq_ = function(method, path, query, body) { - assert.equal(path, 'subscriptions/pullBatch'); - assert.equal(body.maxEvents, 1000); + it('should make correct api request', function(done) { + subscription.makeReq_ = function(method, path, qs, body) { + assert.equal(method, 'POST'); + assert.equal(path, SUB_FULL_NAME + ':pull'); + assert.strictEqual(body.returnImmediately, false); + assert.equal(body.maxMessages, 1); done(); }; - subscription.pull(assert.ifError); + subscription.pull({ maxResults: 1 }, assert.ifError); }); - describe('single pull', function() { - it('should make correct api request', function(done) { - subscription.makeReq_ = function(method, path, qs, body) { - assert.equal(method, 'POST'); - assert.equal(path, 'subscriptions/pullBatch'); - assert.equal(body.subscription, SUB_FULL_NAME); - assert.strictEqual(body.returnImmediately, false); - assert.equal(body.maxEvents, 1); - done(); - }; - - subscription.pull({ maxResults: 1 }, assert.ifError); - }); - - it('should execute callback with a message', function(done) { - var apiResponse = { + it('should execute callback with a message', function(done) { + var apiResponse = { + receivedMessages: [{ ackId: 1, - pubsubEvent: { - message: { - data: new Buffer('message').toString('base64') - } + message: { + messageId: '123', + data: new Buffer('message').toString('base64') } - }; - - subscription.makeReq_ = function(method, path, query, body, callback) { - callback(null, apiResponse); - }; - - subscription.pull(function(err, msgs) { - assert.ifError(err); - - assert.deepEqual(msgs, [Subscription.formatMessage_(apiResponse)]); - - done(); - }); - }); - }); - - describe('batching', function() { - it('should make correct api request', function(done) { - subscription.makeReq_ = function(method, path, query, body) { - assert.equal(method, 'POST'); - assert.equal(path, 'subscriptions/pullBatch'); - assert.strictEqual(query, null); - assert.deepEqual(body, { - subscription: subscription.name, - returnImmediately: false, - maxEvents: 3 - }); - - done(); - }; - - subscription.pull({ maxResults: 3 }, assert.ifError); - }); - - it('should execute callback with the messages', function(done) { - var apiResponse = { - pullResponses: [ - { - ackId: 1, - pubsubEvent: { - message: { - data: new Buffer('message').toString('base64') - } - } - }, - { - ackId: 2, - pubsubEvent: { - message: { - data: new Buffer('message').toString('base64') - } - } - }, - { - ackId: 3, - pubsubEvent: { - message: { - data: new Buffer('message').toString('base64') - } - } - } - ] - }; - - subscription.makeReq_ = function(method, path, query, body, callback) { - callback(null, apiResponse); - }; - - subscription.pull({ maxResults: 3 }, function(err, messages) { - assert.ifError(err); + }] + }; - assert.deepEqual( - messages, - apiResponse.pullResponses.map(Subscription.formatMessage_) - ); + subscription.makeReq_ = function(method, path, query, body, callback) { + callback(null, apiResponse); + }; - done(); - }); + subscription.pull(function(err, msgs) { + assert.ifError(err); + var msg = Subscription.formatMessage_(apiResponse.receivedMessages[0]); + assert.deepEqual(msgs, [msg]); + done(); }); }); @@ -401,7 +322,6 @@ describe('Subscription', function() { it('should pull at specified interval', function(done) { var INTERVAL = 5; subscription.pull = function(options, callback) { - assert.equal(options.maxResults, 1); assert.strictEqual(options.returnImmediately, false); // After pull is called once, overwrite with `done`. // This is to override the function passed to `setTimeout`, so we are @@ -462,7 +382,7 @@ describe('Subscription', function() { it('should emit a message event', function(done) { subscription.pull = function(options, callback) { - callback(null, { hi: 'there' }); + callback(null, [{ hi: 'there' }]); }; subscription .once('message', function(msg) { @@ -476,7 +396,7 @@ describe('Subscription', function() { it('should delete a subscription', function(done) { subscription.makeReq_ = function(method, path) { assert.equal(method, 'DELETE'); - assert.equal(path, 'subscriptions/' + subscription.name); + assert.equal(path, subscription.name); done(); }; subscription.delete(); @@ -520,6 +440,26 @@ describe('Subscription', function() { }); }); + describe('setAckDeadline', function() { + it('should set the ack deadline', function(done) { + subscription.makeReq_ = function(method, path, qs, body) { + assert.equal(method, 'POST'); + assert.equal(path, this.name + ':modifyAckDeadline'); + assert.equal(qs, null); + assert.deepEqual(body, { ackId: 123, ackDeadlineSeconds: 10 }); + done(); + }; + subscription.setAckDeadline({ ackId: 123, seconds: 10 }, done); + }); + + it('should execute the callback', function(done) { + subscription.makeReq_ = function(method, path, qs, body, callback) { + callback(); + }; + subscription.setAckDeadline({}, done); + }); + }); + describe('formatMessage_', function() { it('should decode stringified JSON to object', function() { var obj = { hi: 'there' }; @@ -527,11 +467,9 @@ describe('Subscription', function() { var msg = Subscription.formatMessage_({ ackId: 3, - pubsubEvent: { - message: { - data: stringified, - messageId: 7 - } + message: { + data: stringified, + messageId: 7 } }); @@ -543,7 +481,7 @@ describe('Subscription', function() { }); it('should decode buffer to string', function() { - var msg = Subscription.formatMessage_(messageObj); + var msg = Subscription.formatMessage_(messageObj.receivedMessages[0]); assert.deepEqual(msg, expectedMessage); }); }); diff --git a/test/pubsub/topic.js b/test/pubsub/topic.js index 06138672bd6..f01044b133c 100644 --- a/test/pubsub/topic.js +++ b/test/pubsub/topic.js @@ -109,7 +109,7 @@ describe('Topic', function() { }); describe('formatName_', function() { - var fullName = '/topics/' + PROJECT_ID + '/' + TOPIC_NAME; + var fullName = 'projects/' + PROJECT_ID + '/topics/' + TOPIC_NAME; it('should format name', function() { var formattedName = Topic.formatName_(PROJECT_ID, TOPIC_NAME); @@ -139,10 +139,9 @@ describe('Topic', function() { it('should send correct api request', function(done) { topic.makeReq_ = function(method, path, query, body) { assert.equal(method, 'POST'); - assert.equal(path, 'topics/publishBatch'); + assert.equal(path, topic.name + ':publish'); assert.strictEqual(query, null); assert.deepEqual(body, { - topic: topic.name, messages: [ { data: new Buffer(JSON.stringify(message)).toString('base64') } ] @@ -166,29 +165,30 @@ describe('Topic', function() { it('should delete a topic', function(done) { topic.makeReq_ = function(method, path) { assert.equal(method, 'DELETE'); - assert.equal(path, 'topics/' + topic.name); + assert.equal(path, topic.name); done(); }; topic.delete(); }); + + it('should call the callback', function(done) { + topic.makeReq_ = function(method, path, q, body, callback) { + callback(); + }; + topic.delete(done); + }); }); describe('subscriptions', function() { var SUB_NAME = 'new-sub-name'; - var SUB_FULL_NAME = '/subscriptions/' + PROJECT_ID + '/' + SUB_NAME; + var SUB_FULL_NAME = 'projects/' + PROJECT_ID + '/subscriptions/' + SUB_NAME; var CONFIG = { autoAck: true, interval: 90 }; describe('getSubscriptions', function() { - it('should call parent getSubscriptions', function(done) { - topic.pubsub.getSubscriptions = function() { - done(); - }; - topic.getSubscriptions(assert.ifError); - }); it('should pass query', function(done) { var query = { pageToken: 1, maxResults: 3 }; - topic.pubsub.getSubscriptions = function(q) { + topic.getSubscriptions = function(q) { assert.strictEqual(q.pageToken, query.pageToken); assert.strictEqual(q.maxResults, query.maxResults); done(); @@ -197,29 +197,11 @@ describe('Topic', function() { }); it('should pass callback', function(done) { - topic.pubsub.getSubscriptions = function(q, callback) { + topic.getSubscriptions = function(q, callback) { callback(); }; topic.getSubscriptions({}, done); }); - - it('should attach scoped topic query to query object', function(done) { - topic.pubsub.getSubscriptions = function(q) { - assert.equal( - q.query, 'pubsub.googleapis.com/topic in (' + topic.name + ')'); - done(); - }; - topic.getSubscriptions({}, assert.ifError); - }); - - it('should attach scoped topic query without a query', function(done) { - topic.pubsub.getSubscriptions = function(q) { - assert.equal( - q.query, 'pubsub.googleapis.com/topic in (' + topic.name + ')'); - done(); - }; - topic.getSubscriptions(assert.ifError); - }); }); describe('subscribe', function() { @@ -245,10 +227,9 @@ describe('Topic', function() { it('should send correct request', function(done) { topic.makeReq_ = function(method, path, qs, body) { - assert.equal(method, 'POST'); - assert.equal(path, 'subscriptions'); + assert.equal(method, 'PUT'); + assert.equal(path, SUB_FULL_NAME); assert.equal(body.topic, topic.name); - assert.equal(body.name, SUB_FULL_NAME); done(); }; topic.subscribe(SUB_NAME, assert.ifError); From 27ebdc1ad7fb3ae4372d7881f8080e62852ed985 Mon Sep 17 00:00:00 2001 From: Ryan Seys Date: Tue, 10 Mar 2015 11:09:07 -0400 Subject: [PATCH 2/2] Fix documentation issues for pubsub v1beta2 --- lib/pubsub/subscription.js | 12 ++++++------ lib/pubsub/topic.js | 6 +++--- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/lib/pubsub/subscription.js b/lib/pubsub/subscription.js index d7cc5da7844..006b3cdbed7 100644 --- a/lib/pubsub/subscription.js +++ b/lib/pubsub/subscription.js @@ -380,17 +380,17 @@ Subscription.prototype.pull = function(options, callback) { * to make the message available for redelivery if the processing was * interrupted. * - * @param {object} options - The configuration object. - * @param {number} options.ackId - The ack id to change. - * @param {number} options.seconds - Number of seconds after call is made to + * @param {object} options - The configuration object. + * @param {number} options.ackId - The ack id to change. + * @param {number} options.seconds - Number of seconds after call is made to * set the deadline of the ack. * @param {Function=} callback - The callback function. * * @example * var options = { - * ackId: 123, - * seconds: 10 // Expire in 10 seconds from call. - * } + * ackId: 123, + * seconds: 10 // Expire in 10 seconds from call. + * }; * subscription.setAckDeadline(options, function(err) {}); */ Subscription.prototype.setAckDeadline = function(options, callback) { diff --git a/lib/pubsub/topic.js b/lib/pubsub/topic.js index 83e50f70f20..7c339e63f86 100644 --- a/lib/pubsub/topic.js +++ b/lib/pubsub/topic.js @@ -115,8 +115,8 @@ Topic.formatName_ = function(projectId, name) { * event: 'new user' * }, * attributes: { - * 'key': 'value', - * 'hello': 'world' + * key: 'value', + * hello: 'world' * } * }; * topic.publish(registrationMessage, function(err, messageIds) {}); @@ -174,7 +174,7 @@ Topic.prototype.delete = function(callback) { }; /** - * Lists the name of the subscriptions for this topic. You may optionally + * Get a list of the subscriptions registered to this topic. You may optionally * provide a query object as the first argument to customize the response. * * Your provided callback will either be invoked with an error object, if an API