Skip to content

Commit

Permalink
GCF: Pub/Sub updates (#1363)
Browse files Browse the repository at this point in the history
* Shorten + upgrade samples

* Pub/Sub: Fix sample bug + system tests + Node 8 upgrades

* Fix package.json typo

* Handle Linux's ungraceful timeouts
  • Loading branch information
Ace Nassri authored Jun 17, 2019
1 parent e4e707e commit 51dc42b
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 156 deletions.
51 changes: 17 additions & 34 deletions functions/pubsub/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,23 +39,12 @@ const {Buffer} = require('safe-buffer');
* @param {string} req.body.message Message to publish.
* @param {object} res Cloud Function response context.
*/
exports.publish = (req, res) => {
if (!req.body.topic) {
exports.publish = async (req, res) => {
if (!req.body.topic || !req.body.message) {
res
.status(500)
.send(
new Error(
'Topic not provided. Make sure you have a "topic" property in your request'
)
);
return;
} else if (!req.body.message) {
res
.status(500)
.send(
new Error(
'Message not provided. Make sure you have a "message" property in your request'
)
'Missing parameter(s); include "topic" and "subscription" properties in your request.'
);
return;
}
Expand All @@ -65,40 +54,34 @@ exports.publish = (req, res) => {
// References an existing topic
const topic = pubsub.topic(req.body.topic);

const message = {
const messageObject = {
data: {
message: req.body.message,
},
};
const messageBuffer = Buffer.from(JSON.stringify(messageObject), 'base64');

// Publishes a message
return topic
.publish(message)
.then(() => res.status(200).send('Message published.'))
.catch(err => {
console.error(err);
res.status(500).send(err);
return Promise.reject(err);
});
try {
await topic.publish(messageBuffer);
res.status(200).send('Message published.');
} catch (err) {
console.error(err);
res.status(500).send(err);
return Promise.reject(err);
}
};
// [END functions_pubsub_publish]

// [START functions_pubsub_subscribe]
/**
* Triggered from a message on a Cloud Pub/Sub topic.
*
* @param {object} event The Cloud Functions event.
* @param {object} event.data The Cloud Pub/Sub Message object.
* @param {string} event.data.data The "data" property of the Cloud Pub/Sub Message.
* @param {function} callback The callback function.
* @param {object} pubsubMessage The Cloud Pub/Sub Message object.
* @param {string} pubsubMessage.data The "data" property of the Cloud Pub/Sub Message.
*/
exports.subscribe = (event, callback) => {
const pubsubMessage = event.data;

// We're just going to log the message to prove that it worked!
exports.subscribe = pubsubMessage => {
// Print out the data from Pub/Sub, to prove that it worked
console.log(Buffer.from(pubsubMessage.data, 'base64').toString());

// Don't forget to call the callback!
callback();
};
// [END functions_pubsub_subscribe]
10 changes: 7 additions & 3 deletions functions/pubsub/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,16 @@
},
"devDependencies": {
"@google-cloud/nodejs-repo-tools": "^3.3.0",
"child-process-promise": "^2.2.1",
"mocha": "^6.0.0",
"proxyquire": "^2.1.0",
"sinon": "^7.2.7"
"request": "^2.88.0",
"requestretry": "^4.0.0"
},
"cloud-repo-tools": {
"requiresKeyFile": true,
"requiresProjectId": true
"requiresProjectId": true,
"requiredEnvVars": [
"FUNCTIONS_TOPIC"
]
}
}
190 changes: 71 additions & 119 deletions functions/pubsub/test/index.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,134 +15,86 @@

'use strict';

const proxyquire = require('proxyquire').noCallThru();
const sinon = require('sinon');
const assert = require('assert');
const tools = require('@google-cloud/nodejs-repo-tools');
const {Buffer} = require('safe-buffer');
const path = require('path');

const TOPIC = 'topic';
const execPromise = require('child-process-promise').exec;
const requestRetry = require('requestretry');

const BASE_URL = process.env.BASE_URL || 'http://localhost:8080';
const cwd = path.join(__dirname, '..');

const TOPIC = process.env.FUNCTIONS_TOPIC;
const MESSAGE = 'Hello, world!';

function getSample() {
const topicMock = {
publish: sinon.stub().returns(Promise.resolve()),
};
const pubsubMock = {
topic: sinon.stub().returns(topicMock),
};
const PubSubMock = sinon.stub().returns(pubsubMock);

return {
program: proxyquire('../', {
'@google-cloud/pubsub': {PubSub: PubSubMock},
}),
mocks: {
PubSub: PubSubMock,
pubsub: pubsubMock,
topic: topicMock,
req: {
body: {
topic: TOPIC,
message: MESSAGE,
},
},
res: {
status: sinon.stub().returnsThis(),
send: sinon.stub().returnsThis(),
describe('functions/pubsub', () => {
beforeEach(tools.stubConsole);
afterEach(tools.restoreConsole);

let ffProc;

before(() => {
ffProc = execPromise(
`functions-framework --target=publish --signature-type=http`,
{timeout: 1000, shell: true, cwd}
);
});

after(async () => {
try {
await ffProc;
} catch (err) {
// Timeouts always cause errors on Linux, so catch them
if (err.name && err.name === 'ChildProcessError') {
return;
}

throw err;
}
});

it('publish fails without parameters', async () => {
const response = await requestRetry({
url: `${BASE_URL}/`,
method: 'POST',
body: {},
retryDelay: 200,
json: true,
});

assert.strictEqual(response.statusCode, 500);
assert.strictEqual(
response.body,
'Missing parameter(s); include "topic" and "subscription" properties in your request.'
);
});

it('publishes a message', async () => {
const response = await requestRetry({
url: `${BASE_URL}/`,
method: 'POST',
body: {
topic: TOPIC,
message: 'Pub/Sub from Cloud Functions',
},
},
};
}

beforeEach(tools.stubConsole);
afterEach(tools.restoreConsole);

it('Publish fails without a topic', () => {
const expectedMsg =
'Topic not provided. Make sure you have a "topic" property in your request';
const sample = getSample();

delete sample.mocks.req.body.topic;
sample.program.publish(sample.mocks.req, sample.mocks.res);

assert.strictEqual(sample.mocks.res.status.callCount, 1);
assert.deepStrictEqual(sample.mocks.res.status.firstCall.args, [500]);
assert.strictEqual(sample.mocks.res.send.callCount, 1);
assert.strictEqual(
sample.mocks.res.send.firstCall.args[0].message,
expectedMsg
);
});
retryDelay: 200,
json: true,
});

it('Publish fails without a message', () => {
const expectedMsg =
'Message not provided. Make sure you have a "message" property in your request';
const sample = getSample();

delete sample.mocks.req.body.message;
sample.program.publish(sample.mocks.req, sample.mocks.res);

assert.strictEqual(sample.mocks.res.status.callCount, 1);
assert.deepStrictEqual(sample.mocks.res.status.firstCall.args, [500]);
assert.strictEqual(sample.mocks.res.send.callCount, 1);
assert.strictEqual(
sample.mocks.res.send.firstCall.args[0].message,
expectedMsg
);
});
assert.strictEqual(response.statusCode, 200);
assert.strictEqual(response.body, 'Message published.');
});

it('Publishes the message to the topic and calls success', async () => {
const expectedMsg = 'Message published.';
const sample = getSample();
it('prints out a message', () => {
const jsonObject = JSON.stringify({data: MESSAGE});
const jsonBuffer = Buffer.from(jsonObject).toString('base64');
const pubsubMessage = {data: jsonBuffer, attributes: {}};

await sample.program.publish(sample.mocks.req, sample.mocks.res);
assert.strictEqual(sample.mocks.topic.publish.callCount, 1);
assert.deepStrictEqual(sample.mocks.topic.publish.firstCall.args, [
{
data: {
message: MESSAGE,
},
},
]);
assert.strictEqual(sample.mocks.res.status.callCount, 1);
assert.deepStrictEqual(sample.mocks.res.status.firstCall.args, [200]);
assert.strictEqual(sample.mocks.res.send.callCount, 1);
assert.deepStrictEqual(sample.mocks.res.send.firstCall.args, [expectedMsg]);
});

it('Fails to publish the message and calls failure', async () => {
const error = new Error('error');
const sample = getSample();
sample.mocks.topic.publish.returns(Promise.reject(error));

try {
await sample.program.publish(sample.mocks.req, sample.mocks.res);
} catch (err) {
assert.deepStrictEqual(err, error);
assert.strictEqual(console.error.callCount, 1);
assert.deepStrictEqual(console.error.firstCall.args, [error]);
assert.strictEqual(sample.mocks.res.status.callCount, 1);
assert.deepStrictEqual(sample.mocks.res.status.firstCall.args, [500]);
assert.strictEqual(sample.mocks.res.send.callCount, 1);
assert.deepStrictEqual(sample.mocks.res.send.firstCall.args, [error]);
}
});
require('..').subscribe(pubsubMessage);

it('Subscribes to a message', () => {
const callback = sinon.stub();
const json = JSON.stringify({data: MESSAGE});
const event = {
data: {
data: Buffer.from(json).toString('base64'),
},
};

const sample = getSample();
sample.program.subscribe(event, callback);

assert.strictEqual(console.log.callCount, 1);
assert.deepStrictEqual(console.log.firstCall.args, [json]);
assert.strictEqual(callback.callCount, 1);
assert.deepStrictEqual(callback.firstCall.args, []);
assert.strictEqual(console.log.callCount, 1);
assert.deepStrictEqual(console.log.firstCall.args, [jsonObject]);
});
});

0 comments on commit 51dc42b

Please sign in to comment.