Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement publishBatch and pullBatch #388

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,9 @@ pubsub.createTopic('my-new-topic', function(err, topic) {});
var topic = pubsub.topic('my-existing-topic');

// Publish a message to the topic.
topic.publish('New message!', function(err) {});
topic.publish({
data: 'New message!'
}, function(err) {});

// Subscribe to the topic.
topic.subscribe('new-subscription', function(err, subscription) {
Expand Down
5 changes: 5 additions & 0 deletions lib/common/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,14 @@ module.exports.extendGlobalConfig = extendGlobalConfig;
* // [ 'Hi' ]
*/
function arrayize(input) {
if (input === null || input === undefined) {
return [];
}

if (!Array.isArray(input)) {
return [input];
}

return input;
}

Expand Down
89 changes: 67 additions & 22 deletions lib/pubsub/subscription.js
Original file line number Diff line number Diff line change
Expand Up @@ -146,16 +146,24 @@ Subscription.formatName_ = function(projectId, name) {
* @private
*/
Subscription.formatMessage_ = function(msg) {
var event = msg.pubsubEvent;

var message = {
id: msg.ackId
ackId: msg.ackId
};
var evt = msg.pubsubEvent;
if (evt && evt.message && evt.message.data) {
message.data = new Buffer(evt.message.data, 'base64').toString('utf-8');
try {
message.data = JSON.parse(message.data);
} catch(e) {}

if (event && event.message) {
message.id = event.message.messageId;

if (event.message.data) {
message.data = new Buffer(event.message.data, 'base64').toString('utf-8');

try {
message.data = JSON.parse(message.data);
} catch(e) {}
}
}

return message;
};

Expand Down Expand Up @@ -216,6 +224,7 @@ Subscription.prototype.startPulling_ = function() {
return;
}
this.pull({
maxResults: 1,
returnImmediately: false
}, function(err, message) {
if (err) {
Expand Down Expand Up @@ -243,7 +252,7 @@ Subscription.prototype.startPulling_ = function() {
Subscription.prototype.ack = function(ids, callback) {
if (!ids || ids.length === 0) {
throw new Error(
'At least one ID must be specified before it can be acknowledged');
'At least one ID must be specified before it can be acknowledged.');
}
ids = util.arrayize(ids);
var body = {
Expand Down Expand Up @@ -278,50 +287,86 @@ Subscription.prototype.delete = function(callback) {

/**
* Pull messages from the subscribed topic. If messages were found, your
* callback is executed with the message object.
* callback is executed with an array of message objects.
*
* Note that messages are pulled automatically once you register your first
* event listener to the subscription, thus the call to `pull` is handled for
* you. If you don't want to start pulling, simply don't register a
* `subscription.on('message', function() {})` event handler.
*
* @todo Should not be racing with other pull.
* @todo Fix API to return a list of messages.
*
* @param {object=} options - Configuration object.
* @param {boolean=} options.returnImmediately - If set, the system will respond
* @param {boolean} options.returnImmediately - If set, the system will respond
* immediately. Otherwise, wait until new messages are available. Returns if
* timeout is reached.
* @param {number} options.maxResults - Limit the amount of messages pulled.
* @param {function} callback - The callback function.
*
* @example
* subscription.pull(function(err, message) {
* // message.id = ID used to acknowledge its receival.
* // message.data = Contents of the message.
* //-
* // Pull all available messages.
* //-
* subscription.pull(function(err, messages) {
* // messages = [
* // {
* // ackId: '', // ID used to acknowledge its receival.
* // id: '', // Unique message ID.
* // data: '' // Contents of the message.
* // },
* // // ...
* // ]
* });
*
* //-
* // Pull a single message.
* //-
* var opts = {
* maxResults: 1
* };
*
* subscription.pull(opts, function(err, messages) {});
*/
Subscription.prototype.pull = function(options, callback) {
var that = this;
// TODO(jbd): Should not be racing with other pull.
var MAX_EVENTS_LIMIT = 1000;
var apiEndpoint = 'subscriptions/pullBatch';

if (!callback) {
callback = options;
options = {};
}

if (!util.is(options.maxResults, 'number')) {
options.maxResults = MAX_EVENTS_LIMIT;
}

var body = {
subscription: this.name,
returnImmediately: !!options.returnImmediately
returnImmediately: !!options.returnImmediately,
maxEvents: options.maxResults
};
this.makeReq_(
'POST', 'subscriptions/pull', null, body, function(err, message) {
// TODO(jbd): Fix API to return a list of messages.

this.makeReq_('POST', apiEndpoint, null, body, function(err, response) {
if (err) {
callback(err);
return;
}
message = Subscription.formatMessage_(message);

var messages = response.pullResponses || [response];
messages = messages.map(Subscription.formatMessage_);

if (that.autoAck) {
that.ack(message.id, function(err) {
callback(err, message);
var ackIds = messages.map(function(message) {
return message.ackId;
});

that.ack(ackIds, function(err) {
callback(err, messages);
});
} else {
callback(null, message);
callback(null, messages);
}
});
};
Expand Down
99 changes: 56 additions & 43 deletions lib/pubsub/topic.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,23 @@ function Topic(pubsub, options) {
this.pubsub = pubsub;
}

/**
* Format a message object as the upstream API expects it.
*
* @private
*
* @return {object}
*/
Topic.formatMessage_ = function(message) {
if (!util.is(message.data, 'buffer')) {
message.data = new Buffer(JSON.stringify(message.data));
}

message.data = message.data.toString('base64');

return message;
};

/**
* Format the name of a topic. A Topic's full name is in the format of
* /topics/{projectId}/{name}.
Expand All @@ -83,59 +100,55 @@ Topic.formatName_ = function(projectId, name) {
*
* @throws {Error} If no message is provided.
*
* @param {*} message - The message to publish.
* @param {object|object[]} message - The message(s) to publish.
* @param {*} message.data - The contents of the message.
* @param {array=} message.labels - Labels to apply to the message.
* @param {function=} callback - The callback function.
*
* @example
* topic.publish('New message!', function(err) {});
*
* topic.publish({
* user_id: 3,
* name: 'Stephen',
* message: 'Hello from me!'
* }, function(err) {});
*/
Topic.prototype.publish = function(message, callback) {
if (!message) {
throw new Error('Cannot publish an empty message.');
}
callback = callback || util.noop;
if (!util.is(message, 'string') && !util.is(message, 'buffer')) {
message = JSON.stringify(message);
}
this.publishRaw({
data: new Buffer(message).toString('base64')
}, callback);
};

/**
* Publish a raw message.
* var registrationMessage = {
* data: {
* userId: 3,
* name: 'Stephen',
* event: 'new user'
* },
* labels: [
* 'registration'
* ]
* };
* topic.publish(registrationMessage, function(err) {});
*
* @throws {Error} If no message is provided.
*
* @param {object} message - Raw message to publish.
* @param {array=} message.label - List of labels for the message.
* @param {string} message.data - The base64-encoded contents of the message.
* @param {function=} callback - The callback function.
* //-
* // You can publish a batch of messages at once by supplying an array.
* //-
* var purchaseMessage = {
* data: {
* userId: 3,
* product: 'computer',
* event: 'purchase'
* }
* };
*
* @example
* topic.publishRaw({
* data: new Buffer('New message!').toString('base64')
* }, function(err) {});
* topic.publish([
* registrationMessage,
* purchaseMessage
* ], function(err) {});
*/
Topic.prototype.publishRaw = function(message, callback) {
if (!message) {
throw new Error('Cannot publish an empty message.');
Topic.prototype.publish = function(messages, callback) {
messages = util.arrayize(messages);

if (messages.length === 0) {
throw new Error('Cannot publish without a message.');
}

callback = callback || util.noop;
if (!util.is(message.data, 'string') && !util.is(message.data, 'buffer')) {
message.data = new Buffer(JSON.stringify(message.data)).toString('base64');
}

var body = {
message: message,
topic: this.name
topic: this.name,
messages: messages.map(Topic.formatMessage_)
};
this.makeReq_('POST', 'topics/publish', null, body, callback);

this.makeReq_('POST', 'topics/publishBatch', null, body, callback);
};

/**
Expand Down
Loading