Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GCF: Pub/Sub updates #1363

Merged
merged 9 commits into from
Jun 17, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 17 additions & 34 deletions functions/pubsub/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,23 +39,12 @@ const {Buffer} = require('safe-buffer');
* @param {string} req.body.message Message to publish.
* @param {object} res Cloud Function response context.
*/
exports.publish = (req, res) => {
if (!req.body.topic) {
exports.publish = async (req, res) => {
if (!req.body.topic || !req.body.message) {
res
.status(500)
.send(
new Error(
'Topic not provided. Make sure you have a "topic" property in your request'
)
);
return;
} else if (!req.body.message) {
res
.status(500)
.send(
new Error(
'Message not provided. Make sure you have a "message" property in your request'
)
'Missing parameter(s); include "topic" and "subscription" properties in your request.'
);
return;
}
Expand All @@ -65,40 +54,34 @@ exports.publish = (req, res) => {
// References an existing topic
const topic = pubsub.topic(req.body.topic);

const message = {
const messageObject = {
data: {
message: req.body.message,
},
};
const messageBuffer = Buffer.from(JSON.stringify(messageObject), 'base64');

// Publishes a message
return topic
.publish(message)
.then(() => res.status(200).send('Message published.'))
.catch(err => {
console.error(err);
res.status(500).send(err);
return Promise.reject(err);
});
try {
await topic.publish(messageBuffer);
res.status(200).send('Message published.');
} catch (err) {
console.error(err);
res.status(500).send(err);
return Promise.reject(err);
}
};
// [END functions_pubsub_publish]

// [START functions_pubsub_subscribe]
/**
* Triggered from a message on a Cloud Pub/Sub topic.
*
* @param {object} event The Cloud Functions event.
* @param {object} event.data The Cloud Pub/Sub Message object.
* @param {string} event.data.data The "data" property of the Cloud Pub/Sub Message.
* @param {function} callback The callback function.
* @param {object} pubsubMessage The Cloud Pub/Sub Message object.
* @param {string} pubsubMessage.data The "data" property of the Cloud Pub/Sub Message.
*/
exports.subscribe = (event, callback) => {
const pubsubMessage = event.data;

// We're just going to log the message to prove that it worked!
exports.subscribe = pubsubMessage => {
// Print out the data from Pub/Sub, to prove that it worked
console.log(Buffer.from(pubsubMessage.data, 'base64').toString());

// Don't forget to call the callback!
callback();
};
// [END functions_pubsub_subscribe]
10 changes: 7 additions & 3 deletions functions/pubsub/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,16 @@
},
"devDependencies": {
"@google-cloud/nodejs-repo-tools": "^3.3.0",
"child-process-promise": "^2.2.1",
"mocha": "^6.0.0",
"proxyquire": "^2.1.0",
"sinon": "^7.2.7"
"request": "^2.88.0",
"requestretry": "^4.0.0"
},
"cloud-repo-tools": {
"requiresKeyFile": true,
"requiresProjectId": true
"requiresProjectId": true,
"requiredEnvVars": [
"FUNCTIONS_TOPIC"
]
}
}
190 changes: 71 additions & 119 deletions functions/pubsub/test/index.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,134 +15,86 @@

'use strict';

const proxyquire = require('proxyquire').noCallThru();
const sinon = require('sinon');
const assert = require('assert');
const tools = require('@google-cloud/nodejs-repo-tools');
const {Buffer} = require('safe-buffer');
const path = require('path');

const TOPIC = 'topic';
const execPromise = require('child-process-promise').exec;
const requestRetry = require('requestretry');

const BASE_URL = process.env.BASE_URL || 'http://localhost:8080';
const cwd = path.join(__dirname, '..');

const TOPIC = process.env.FUNCTIONS_TOPIC;
const MESSAGE = 'Hello, world!';

function getSample() {
const topicMock = {
publish: sinon.stub().returns(Promise.resolve()),
};
const pubsubMock = {
topic: sinon.stub().returns(topicMock),
};
const PubSubMock = sinon.stub().returns(pubsubMock);

return {
program: proxyquire('../', {
'@google-cloud/pubsub': {PubSub: PubSubMock},
}),
mocks: {
PubSub: PubSubMock,
pubsub: pubsubMock,
topic: topicMock,
req: {
body: {
topic: TOPIC,
message: MESSAGE,
},
},
res: {
status: sinon.stub().returnsThis(),
send: sinon.stub().returnsThis(),
describe('functions/pubsub', () => {
beforeEach(tools.stubConsole);
afterEach(tools.restoreConsole);

let ffProc;

before(() => {
ffProc = execPromise(
`functions-framework --target=publish --signature-type=http`,
{timeout: 1000, shell: true, cwd}
);
});

after(async () => {
try {
await ffProc;
} catch (err) {
// Timeouts always cause errors on Linux, so catch them
if (err.name && err.name === 'ChildProcessError') {
return;
}

throw err;
}
});

it('publish fails without parameters', async () => {
const response = await requestRetry({
url: `${BASE_URL}/`,
method: 'POST',
body: {},
retryDelay: 200,
json: true,
});

assert.strictEqual(response.statusCode, 500);
assert.strictEqual(
response.body,
'Missing parameter(s); include "topic" and "subscription" properties in your request.'
);
});

it('publishes a message', async () => {
const response = await requestRetry({
url: `${BASE_URL}/`,
method: 'POST',
body: {
topic: TOPIC,
message: 'Pub/Sub from Cloud Functions',
},
},
};
}

beforeEach(tools.stubConsole);
afterEach(tools.restoreConsole);

it('Publish fails without a topic', () => {
const expectedMsg =
'Topic not provided. Make sure you have a "topic" property in your request';
const sample = getSample();

delete sample.mocks.req.body.topic;
sample.program.publish(sample.mocks.req, sample.mocks.res);

assert.strictEqual(sample.mocks.res.status.callCount, 1);
assert.deepStrictEqual(sample.mocks.res.status.firstCall.args, [500]);
assert.strictEqual(sample.mocks.res.send.callCount, 1);
assert.strictEqual(
sample.mocks.res.send.firstCall.args[0].message,
expectedMsg
);
});
retryDelay: 200,
json: true,
});

it('Publish fails without a message', () => {
const expectedMsg =
'Message not provided. Make sure you have a "message" property in your request';
const sample = getSample();

delete sample.mocks.req.body.message;
sample.program.publish(sample.mocks.req, sample.mocks.res);

assert.strictEqual(sample.mocks.res.status.callCount, 1);
assert.deepStrictEqual(sample.mocks.res.status.firstCall.args, [500]);
assert.strictEqual(sample.mocks.res.send.callCount, 1);
assert.strictEqual(
sample.mocks.res.send.firstCall.args[0].message,
expectedMsg
);
});
assert.strictEqual(response.statusCode, 200);
assert.strictEqual(response.body, 'Message published.');
});

it('Publishes the message to the topic and calls success', async () => {
const expectedMsg = 'Message published.';
const sample = getSample();
it('prints out a message', () => {
const jsonObject = JSON.stringify({data: MESSAGE});
const jsonBuffer = Buffer.from(jsonObject).toString('base64');
const pubsubMessage = {data: jsonBuffer, attributes: {}};

await sample.program.publish(sample.mocks.req, sample.mocks.res);
assert.strictEqual(sample.mocks.topic.publish.callCount, 1);
assert.deepStrictEqual(sample.mocks.topic.publish.firstCall.args, [
{
data: {
message: MESSAGE,
},
},
]);
assert.strictEqual(sample.mocks.res.status.callCount, 1);
assert.deepStrictEqual(sample.mocks.res.status.firstCall.args, [200]);
assert.strictEqual(sample.mocks.res.send.callCount, 1);
assert.deepStrictEqual(sample.mocks.res.send.firstCall.args, [expectedMsg]);
});

it('Fails to publish the message and calls failure', async () => {
const error = new Error('error');
const sample = getSample();
sample.mocks.topic.publish.returns(Promise.reject(error));

try {
await sample.program.publish(sample.mocks.req, sample.mocks.res);
} catch (err) {
assert.deepStrictEqual(err, error);
assert.strictEqual(console.error.callCount, 1);
assert.deepStrictEqual(console.error.firstCall.args, [error]);
assert.strictEqual(sample.mocks.res.status.callCount, 1);
assert.deepStrictEqual(sample.mocks.res.status.firstCall.args, [500]);
assert.strictEqual(sample.mocks.res.send.callCount, 1);
assert.deepStrictEqual(sample.mocks.res.send.firstCall.args, [error]);
}
});
require('..').subscribe(pubsubMessage);

it('Subscribes to a message', () => {
const callback = sinon.stub();
const json = JSON.stringify({data: MESSAGE});
const event = {
data: {
data: Buffer.from(json).toString('base64'),
},
};

const sample = getSample();
sample.program.subscribe(event, callback);

assert.strictEqual(console.log.callCount, 1);
assert.deepStrictEqual(console.log.firstCall.args, [json]);
assert.strictEqual(callback.callCount, 1);
assert.deepStrictEqual(callback.firstCall.args, []);
assert.strictEqual(console.log.callCount, 1);
assert.deepStrictEqual(console.log.firstCall.args, [jsonObject]);
});
});