Skip to content
This repository has been archived by the owner on Jan 8, 2024. It is now read-only.

Commit

Permalink
Add jaeger instrumentations for Kafka, Rest, MQTT data send/get
Browse files Browse the repository at this point in the history
  • Loading branch information
oguzcankirmemis committed Jan 17, 2019
1 parent b66ea63 commit 2632aaf
Showing 1 changed file with 34 additions and 2 deletions.
36 changes: 34 additions & 2 deletions public-interface/lib/advancedanalytics-proxy/data-proxy.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ var MQTTConnector = require('./../../lib/mqtt'),
util = require('../dateUtil'),
logger = require('../logger').init(),
contextProvider = require('./../context-provider').instance(),
tracer = require('./../express-jaeger').tracer,
errBuilder = require("../errorHandler").errBuilder,
Metric = require('./Metric.data').init(util),
responses = require('./utils/responses');
Expand Down Expand Up @@ -64,6 +65,12 @@ var buildICFALMessage = function(data) {
};
};

var createSpan = function(name) {
const routeSpan = contextProvider.get('routeSpan');
const span = tracer.startSpan(name, { childOf: routeSpan });
return span;
}

module.exports = function(config) {

var connector = new MQTTConnector.Broker(config.mqtt, logger);
Expand All @@ -86,6 +93,7 @@ module.exports = function(config) {
}

this.submitDataMQTT = function(data, callback){
const span = createSpan('submitDataMQTT');

var dataMetric = new Metric();

Expand All @@ -98,10 +106,14 @@ module.exports = function(config) {
*/
connector.publish(options.topic, options.message);

span.finish();

callback(null);
};

this.submitDataREST = function(data, callback) {
const span = createSpan('submitDataRest');

var dataMetric = new Metric();
var message = dataMetric.prepareDataIngestionMsg(data);

Expand All @@ -117,6 +129,8 @@ module.exports = function(config) {
};
logger.debug("Calling proxy to submit data");
request(options, function(err, res) {
span.finish();

logger.debug("END Calling proxy to submit data");
if (!err) {
if (res.statusCode === responses.Success.Created) {
Expand All @@ -133,6 +147,7 @@ module.exports = function(config) {
};

this.submitDataKafka = function(data, callback){
const span = createSpan('submitDataKafka');

try {
var dataMetric = new Metric(),
Expand All @@ -145,6 +160,8 @@ module.exports = function(config) {
messages: message
}
], function (err, data) {
span.finish();

if (err) {
logger.error("Error when forwarding observation to Kafka: " + JSON.stringify(err));
callback(errBuilder.build(errBuilder.Errors.Data.SubmissionError));
Expand All @@ -154,12 +171,15 @@ module.exports = function(config) {
}
});
} catch(exception) {
span.finish();

logger.error("Exception occured when forwarding observation to Kafka: " + exception);
callback(errBuilder.build(errBuilder.Errors.Data.SubmissionError));
}
};

this.dataInquiry = function(data, callback){
const span = createSpan('dataInquiry');

var dataInquiryMessage = buildDIMessage(data);
if(data.queryMeasureLocation !== undefined){
Expand All @@ -179,6 +199,8 @@ module.exports = function(config) {

request(options, function (err, res) {
try {
span.finish();

if (!err && (res.statusCode === responses.Success.OK)) {
logger.debug("data-proxy. dataInquiry, Got Response from AA API: " + res.body);
callback(null, JSON.parse(res.body));
Expand All @@ -191,7 +213,7 @@ module.exports = function(config) {
} else if (!err && (res.statusCode === responses.Errors.EntityToLarge)) {
logger.warn("data-proxy. dataInquiry, Got Entity-too-large Response from AA API : " + res.body);
callback("EntityTooLarge");
} else {
} else {
var errMsg = responses.buildErrorMessage(err, res);
logger.warn("data-proxy. dataInquiry, error: " + errMsg);
callback({message: 'error receiving data'});
Expand All @@ -213,8 +235,10 @@ module.exports = function(config) {
}
return message;
};

this.dataInquiryAdvanced = function(data, callback) {
const span = createSpan('dataInquiryAdvanced');

var accountId = data.accountId;
delete data.accountId;

Expand All @@ -233,6 +257,8 @@ module.exports = function(config) {

request(options, function(err, res){
try {
span.finish();

if (!err && (res.statusCode === responses.Success.OK)) {
logger.debug("data-proxy. dataInquiryAdvanced, Got Response from AA API: " + JSON.stringify(res.body));
callback(null, JSON.parse(res.body));
Expand Down Expand Up @@ -262,6 +288,7 @@ module.exports = function(config) {
};

this.report = function(data, callback) {
const span = createSpan('dataReport');

var domainId = data.domainId;
delete data.domainId;
Expand All @@ -280,6 +307,8 @@ module.exports = function(config) {
logger.debug("data-proxy. report, options: " + JSON.stringify(options));

request(options, function (err, res) {
span.finish();

if (!err && (res.statusCode === responses.Success.OK)) {
logger.debug("data-proxy. report, Got Response from AA API: " + JSON.stringify(res.body));
callback(null, res.body);
Expand All @@ -303,6 +332,7 @@ module.exports = function(config) {
};

this.getFirstAndLastMeasurement = function(data, callback) {
span.createSpan('getFirstAndLastMeasurement');

var domainId = data.domainId;
delete data.domainId;
Expand All @@ -321,6 +351,8 @@ module.exports = function(config) {
logger.debug("data-proxy. getFirstAndLastMeasurement, options: " + JSON.stringify(options));

request(options, function (err, res) {
span.finish();

if (!err && res.statusCode === responses.Success.OK) {
logger.debug("data-proxy. getFirstAndLastMeasurement, Got Response from AA API: " + JSON.stringify(res.body));
callback(null, res.body);
Expand Down

0 comments on commit 2632aaf

Please sign in to comment.