Skip to content

Commit

Permalink
pubsub: fix setMetadata method (#2643)
Browse files Browse the repository at this point in the history
  • Loading branch information
callmehiphop authored and stephenplusplus committed Oct 3, 2017
1 parent c7d0e67 commit f2a3c0a
Show file tree
Hide file tree
Showing 6 changed files with 159 additions and 52 deletions.
1 change: 1 addition & 0 deletions packages/pubsub/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
"google-proto-files": "^0.12.0",
"grpc": "^1.6.0",
"is": "^3.0.1",
"lodash.snakecase": "^4.1.1",
"uuid": "^3.0.1"
},
"devDependencies": {
Expand Down
24 changes: 4 additions & 20 deletions packages/pubsub/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -173,33 +173,17 @@ PubSub.prototype.createSubscription = function(topic, name, options, callback) {

options = options || {};

var subscription = this.subscription(name, options);
var metadata = Subscription.formatMetadata_(options);
var subscription = this.subscription(name, metadata);

var reqOpts = extend({
var reqOpts = extend(metadata, {
topic: topic.name,
name: subscription.name
}, options);
});

delete reqOpts.gaxOpts;
delete reqOpts.flowControl;

if (options.messageRetentionDuration) {
reqOpts.retainAckedMessages = true;

reqOpts.messageRetentionDuration = {
seconds: options.messageRetentionDuration,
nanos: 0
};
}

if (options.pushEndpoint) {
delete reqOpts.pushEndpoint;

reqOpts.pushConfig = {
pushEndpoint: options.pushEndpoint
};
}

this.request({
client: 'subscriberClient',
method: 'createSubscription',
Expand Down
40 changes: 38 additions & 2 deletions packages/pubsub/src/subscription.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ var events = require('events');
var extend = require('extend');
var is = require('is');
var os = require('os');
var snakeCase = require('lodash.snakecase');
var util = require('util');

/**
Expand Down Expand Up @@ -231,6 +232,34 @@ function Subscription(pubsub, name, options) {

util.inherits(Subscription, events.EventEmitter);

/**
* Formats Subscription metadata.
*
* @private
*/
Subscription.formatMetadata_ = function(metadata) {
var formatted = extend({}, metadata);

if (metadata.messageRetentionDuration) {
formatted.retainAckedMessages = true;

formatted.messageRetentionDuration = {
seconds: metadata.messageRetentionDuration,
nanos: 0
};
}

if (metadata.pushEndpoint) {
delete formatted.pushEndpoint;

formatted.pushConfig = {
pushEndpoint: metadata.pushEndpoint
};
}

return formatted;
};

/**
* Format the name of a subscription. A subscription's full name is in the
* format of projects/{projectId}/subscriptions/{subName}.
Expand Down Expand Up @@ -1042,9 +1071,16 @@ Subscription.prototype.setMetadata = function(metadata, gaxOpts, callback) {
gaxOpts = {};
}

var subscription = Subscription.formatMetadata_(metadata);
var fields = Object.keys(subscription).map(snakeCase);

subscription.name = this.name;

var reqOpts = {
subscription: this.name,
updateMask: metadata
subscription: subscription,
updateMask: {
paths: fields
}
};

this.request({
Expand Down
28 changes: 28 additions & 0 deletions packages/pubsub/system-test/pubsub.js
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,34 @@ describe('pubsub', function() {
});
});

it('should set metadata for a subscription', function() {
var subscription = topic.subscription(generateSubName());
var threeDaysInSeconds = 3 * 24 * 60 * 60;

return subscription.create()
.then(function() {
return subscription.setMetadata({
messageRetentionDuration: threeDaysInSeconds
});
})
.then(function() {
return subscription.getMetadata();
})
.then(function(data) {
var metadata = data[0];

assert.strictEqual(metadata.retainAckedMessages, true);
assert.strictEqual(
parseInt(metadata.messageRetentionDuration.seconds, 10),
threeDaysInSeconds
);
assert.strictEqual(
parseInt(metadata.messageRetentionDuration.nanos, 10),
0
);
});
});

it('should re-use an existing subscription', function(done) {
pubsub.createSubscription(topic, SUB_NAMES[0], done);
});
Expand Down
46 changes: 21 additions & 25 deletions packages/pubsub/test/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,12 @@ describe('PubSub', function() {
name: 'subscription-name'
};

beforeEach(function() {
Subscription.formatMetadata_ = function(metadata) {
return extend({}, metadata);
};
});

it('should throw if no Topic is provided', function() {
assert.throws(function() {
pubsub.createSubscription();
Expand Down Expand Up @@ -353,13 +359,7 @@ describe('PubSub', function() {
var expectedBody = extend({
topic: TOPIC.name,
name: SUB_NAME
}, options, {
pushConfig: {
pushEndpoint: options.pushEndpoint
}
});

delete expectedBody.pushEndpoint;
}, options);

pubsub.topic = function() {
return {
Expand Down Expand Up @@ -413,27 +413,23 @@ describe('PubSub', function() {
pubsub.createSubscription(TOPIC, SUB_NAME, options, assert.ifError);
});

describe('message retention', function() {
it('should accept a number', function(done) {
var threeDaysInSeconds = 3 * 24 * 60 * 60;

pubsub.request = function(config) {
assert.strictEqual(config.reqOpts.retainAckedMessages, true);

assert.strictEqual(
config.reqOpts.messageRetentionDuration.seconds,
threeDaysInSeconds
);
it('should format the metadata', function(done) {
var fakeMetadata = {};
var formatted = {
a: 'a'
};

assert.strictEqual(config.reqOpts.messageRetentionDuration.nanos, 0);
Subscription.formatMetadata_ = function(metadata) {
assert.strictEqual(metadata, fakeMetadata);
return formatted;
};

done();
};
pubsub.request = function(config) {
assert.strictEqual(config.reqOpts, formatted);
done();
};

pubsub.createSubscription(TOPIC_NAME, SUB_NAME, {
messageRetentionDuration: threeDaysInSeconds
}, assert.ifError);
});
pubsub.createSubscription(TOPIC, SUB_NAME, fakeMetadata, assert.ifError);
});

describe('error', function() {
Expand Down
72 changes: 67 additions & 5 deletions packages/pubsub/test/subscription.js
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,47 @@ describe('Subscription', function() {
});
});

describe('formatMetadata_', function() {
it('should make a copy of the metadata', function() {
var metadata = { a: 'a' };
var formatted = Subscription.formatMetadata_(metadata);

assert.deepEqual(metadata, formatted);
assert.notStrictEqual(metadata, formatted);
});

it('should format messageRetentionDuration', function() {
var threeDaysInSeconds = 3 * 24 * 60 * 60;

var metadata = {
messageRetentionDuration: threeDaysInSeconds
};

var formatted = Subscription.formatMetadata_(metadata);

assert.strictEqual(formatted.retainAckedMessages, true);
assert.strictEqual(formatted.messageRetentionDuration.nanos, 0);

assert.strictEqual(
formatted.messageRetentionDuration.seconds,
threeDaysInSeconds
);
});

it('should format pushEndpoint', function() {
var pushEndpoint = 'http://noop.com/push';

var metadata = {
pushEndpoint: pushEndpoint
};

var formatted = Subscription.formatMetadata_(metadata);

assert.strictEqual(formatted.pushConfig.pushEndpoint, pushEndpoint);
assert.strictEqual(formatted.pushEndpoint, undefined);
});
});

describe('formatName_', function() {
it('should format name', function() {
var formattedName = Subscription.formatName_(PROJECT_ID, SUB_NAME);
Expand Down Expand Up @@ -1797,16 +1838,37 @@ describe('Subscription', function() {
});

describe('setMetadata', function() {
var METADATA = {};
var METADATA = {
pushEndpoint: 'http://noop.com/push'
};

beforeEach(function() {
Subscription.formatMetadata_ = function(metadata) {
return extend({}, metadata);
};
});

it('should make the correct request', function(done) {
var formattedMetadata = {
pushConfig: {
pushEndpoint: METADATA.pushEndpoint
}
};

var expectedBody = extend({
name: SUB_FULL_NAME
}, formattedMetadata);

Subscription.formatMetadata_ = function(metadata) {
assert.strictEqual(metadata, METADATA);
return formattedMetadata;
};

subscription.request = function(config, callback) {
assert.strictEqual(config.client, 'subscriberClient');
assert.strictEqual(config.method, 'updateSubscription');
assert.deepEqual(config.reqOpts, {
subscription: subscription.name,
updateMask: METADATA
});
assert.deepEqual(config.reqOpts.subscription, expectedBody);
assert.deepEqual(config.reqOpts.updateMask, { paths: ['push_config'] });
callback(); // the done fn
};

Expand Down

0 comments on commit f2a3c0a

Please sign in to comment.