Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ability to define 'push' apis #76

Merged
merged 1 commit into from
Apr 7, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 79 additions & 33 deletions src/Bus.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ import _ from 'lodash';
import chalk from 'chalk';


const API_MODE_POLL = 'poll';
const API_MODE_PUSH = 'push';


/**
* @param {Mozaik} mozaik
* @returns {*}
Expand All @@ -15,34 +19,62 @@ const Bus = mozaik => {
const subscriptions = {};

/**
* Register a new API
* Push message to matching clients.
*
* @param {String} subscriptionId
* @param {Object} data
*/
const send = (subscriptionId, data) => {
if (!_.has(subscriptions, subscriptionId)) {
mozaik.logger.warn(chalk.red(`No subscription found mathing '${subscriptionId}'`));

return;
}

subscriptions[subscriptionId].clients.forEach((clientId) => {
clients[clientId].send(JSON.stringify(data));
});
};

/**
* Register a new API,
* which is basically an object composed of various methods.
*
* @param {String} id
* @param {Object} api
* @param {String} id unique API identifier
* @param {Object} api api function
* @param {String} mode api mode, can be one of 'poll' or 'push'
*/
const registerApi = (id, api) => {
const registerApi = (id, api, mode = API_MODE_POLL) => {
if (mode !== API_MODE_POLL && mode !== API_MODE_PUSH) {
const errMsg = `API mode '${mode}' is not a valid mode, must be one of 'poll' or 'push'`;
mozaik.logger.error(chalk.red(errMsg));

throw new Error(errMsg);
}

if (_.has(apis, id)) {
const errMsg = `API "${ id }" already registered`;
const errMsg = `API '${id}' already registered`;
mozaik.logger.error(chalk.red(errMsg));

throw new Error(errMsg);
}

apis[id] = api(mozaik);
apis[id] = { methods: api(mozaik), mode };

mozaik.logger.info(chalk.yellow(`registered API "${ id }"`));
mozaik.logger.info(chalk.yellow(`registered API '${id}' (mode: ${mode})`));
};

/**
* Register a new client.
*
* @param {Object} client
* @param {String} id
* @param {WebSocket} client
* @param {String} id
*/
const addClient = (client, id) => {
if (_.has(clients, id)) {
const errMsg = `Client with id "${ id }" already exists`;
const errMsg = `Client with id '${id}' already exists`;
mozaik.logger.error(chalk.red(errMsg));

throw new Error(errMsg);
}

Expand All @@ -54,7 +86,7 @@ const Bus = mozaik => {
/**
* Remove a client.
*
* @param id
* @param {String} id
*/
const removeClient = (id) => {
_.forOwn(subscriptions, (subscription, subscriptionId) => {
Expand All @@ -63,7 +95,7 @@ const Bus = mozaik => {
// if there's no more subscribers, clear the interval
// to avoid consuming APIs for nothing.
if (subscription.clients.length === 0 && subscription.timer) {
mozaik.logger.info(`removing interval for ${subscriptionId}`);
mozaik.logger.info(`removing interval for '${subscriptionId}'`);

clearInterval(subscription.timer);
delete subscription.timer;
Expand All @@ -76,26 +108,21 @@ const Bus = mozaik => {
};

/**
*
* @param {String} id
* @param {Function} callFn
* @param {Object} params
*/
const processApiCall = (id, callFn, params) => {
mozaik.logger.info(`Calling "${id}"`);
mozaik.logger.info(`Calling '${id}'`);

callFn(params)
.then(data => {
const message = {
id,
body: data
};
const message = { id, body: data };

// cache message
subscriptions[id].cached = message;

subscriptions[id].clients.forEach((clientId) => {
clients[clientId].send(JSON.stringify(message));
});
send(id, message);
})
.catch(err => {
mozaik.logger.error(chalk.red(`[${id.split('.')[0]}] ${id} - status code: ${err.status || err.statusCode}`));
Expand All @@ -111,7 +138,7 @@ const Bus = mozaik => {
*/
const clientSubscription = (clientId, request) => {
if (!_.has(clients, clientId)) {
mozaik.logger.error(`Unable to find a client with id "${ clientId }"`);
mozaik.logger.error(`Unable to find a client with id '${clientId}'`);

return;
}
Expand All @@ -120,48 +147,67 @@ const Bus = mozaik => {
const parts = requestId.split('.');
let errMsg;
if (parts.length < 2) {
errMsg = `Invalid request id "${ requestId }", should be something like 'api_id.method'`;
errMsg = `Invalid request id '${requestId}', should be something like 'api_id.method'`;
mozaik.logger.error(chalk.red(errMsg));

throw new Error(errMsg);
}

if (!_.has(apis, parts[0])) {
errMsg = `Unable to find API matching id "${ parts[0] }"`;
errMsg = `Unable to find API matching id '${parts[0]}'`;
mozaik.logger.error(chalk.red(errMsg));

throw new Error(errMsg);
}

const api = apis[parts[0]];
if (!_.has(api, parts[1])) {
errMsg = `Unable to find API method matching "${ parts[1] }"`;
if (!_.has(api.methods, parts[1])) {
errMsg = `Unable to find API method matching '${parts[1]}'`;
mozaik.logger.error(chalk.red(errMsg));

throw new Error(errMsg);
}

const callFn = api[parts[1]];
const callFn = api.methods[parts[1]];
if (!_.isFunction(callFn)) {
errMsg = `API method '${parts[0]}.${parts[1]}' MUST be a function`;
mozaik.logger.error(chalk.red(errMsg));

throw new Error(errMsg);
}

if (!subscriptions[requestId]) {
subscriptions[requestId] = {
clients: [],
currentResponse: null
};

mozaik.logger.info(`Added subscription "${ requestId }"`);
mozaik.logger.info(`Added subscription '${requestId}'`);

// make an immediate call to avoid waiting for the first interval.
processApiCall(requestId, callFn, request.params);
if (api.mode === API_MODE_POLL) {
// make an immediate call to avoid waiting for the first interval.
processApiCall(requestId, callFn, request.params);
} else if (api.mode === API_MODE_PUSH) {
mozaik.logger.info(`Creating producer for '${requestId}'`);
callFn(data => {
send(requestId, {
id: requestId,
body: data
});
}, request.params);
}
}

// if there is no interval running, create one
if (!subscriptions[requestId].timer) {
mozaik.logger.info(`Setting timer for "${ requestId }"`);
if (!subscriptions[requestId].timer && api.mode === API_MODE_POLL) {
mozaik.logger.info(`Setting timer for '${requestId}'`);
subscriptions[requestId].timer = setInterval(() => {
processApiCall(requestId, callFn, request.params);
}, apisPollInterval);
}

// avoid adding a client for the same API call twice
if (!_.includes(subscriptions[requestId].clients, clientId)) {
if (subscriptions[requestId].clients.indexOf(clientId) === -1) {
subscriptions[requestId].clients.push(clientId);

// if there's an available cached response, send it immediately
Expand Down
11 changes: 4 additions & 7 deletions src/CoreApi.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import request from 'superagent-bluebird-promise';
import Promise from 'bluebird';


Expand All @@ -8,12 +7,10 @@ import Promise from 'bluebird';
const CoreApi = mozaik => {
const methods = {
inspector() {
return new Promise((resolve, reject) => {
resolve({
apis: mozaik.bus.listApis(),
clientCount: mozaik.bus.clientCount(),
uptime: process.uptime()
});
return Promise.resolve({
apis: mozaik.bus.listApis(),
clientCount: mozaik.bus.clientCount(),
uptime: process.uptime()
});
},
};
Expand Down
20 changes: 18 additions & 2 deletions src/browser/mixins/ApiConsumerMixin.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,33 @@ import ApiActions from './../actions/ApiActions';

const ApiConsumerMixin = {
componentWillMount() {
const displayName = this.constructor.displayName || 'Unknown';

if (!this.getApiRequest) {
console.warn(`Seems you're trying to use 'ApiConsumerMixin' without implementing 'getApiRequest()', see '${displayName}' component`);
return;
}

this.apiRequest = this.getApiRequest();
this.listenTo(ApiStore, this._onApiData);
if (!this.apiRequest.id) {
console.error(`'getApiRequest()' MUST return an object with an 'id' property, see '${displayName}' component`);
return;
}

this.listenTo(ApiStore, this.onAllApiData);
},

_onApiData(data) {
onAllApiData(data) {
if (data.id === this.apiRequest.id) {
this.onApiData(data.body);
}
},

componentDidMount() {
if (!this.apiRequest || !this.apiRequest.id) {
return;
}

ApiActions.get(this.apiRequest.id, this.apiRequest.params || {});
}
};
Expand Down
Loading