-
Notifications
You must be signed in to change notification settings - Fork 225
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: support APM Server intake API version 2 #465
Changes from 1 commit
4427eda
4db7e11
f19a007
7c80d2e
fee45aa
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,9 +16,9 @@ var connect = require('./middleware/connect') | |
var Filters = require('./filters') | ||
var Instrumentation = require('./instrumentation') | ||
var parsers = require('./parsers') | ||
var request = require('./request') | ||
var stackman = require('./stackman') | ||
var symbols = require('./symbols') | ||
var truncate = require('./truncate') | ||
|
||
var IncomingMessage = http.IncomingMessage | ||
var ServerResponse = http.ServerResponse | ||
|
@@ -33,6 +33,7 @@ function Agent () { | |
|
||
this._instrumentation = new Instrumentation(this) | ||
this._filters = new Filters() | ||
this._apmServer = null | ||
|
||
this._config() | ||
} | ||
|
@@ -43,6 +44,10 @@ Object.defineProperty(Agent.prototype, 'currentTransaction', { | |
} | ||
}) | ||
|
||
Agent.prototype.destroy = function () { | ||
if (this._apmServer) this._apmServer.destroy() | ||
} | ||
|
||
Agent.prototype.startTransaction = function () { | ||
return this._instrumentation.startTransaction.apply(this._instrumentation, arguments) | ||
} | ||
|
@@ -131,15 +136,35 @@ Agent.prototype.start = function (opts) { | |
}) | ||
} | ||
|
||
this._instrumentation.start() | ||
this._apmServer = new ElasticAPMHttpClient({ | ||
// metadata | ||
agentName: 'nodejs', | ||
agentVersion: version, | ||
serviceName: this._conf.serviceName, | ||
serviceVersion: this._conf.serviceVersion, | ||
frameworkName: this._conf.frameworkName, | ||
frameworkVersion: this._conf.frameworkVersion, | ||
hostname: this._conf.hostname, | ||
|
||
this._httpClient = new ElasticAPMHttpClient({ | ||
// Sanitize conf | ||
truncateStringsAt: config.INTAKE_STRING_MAX_SIZE, // TODO: Do we need to set this (it's the default value) | ||
|
||
// HTTP conf | ||
secretToken: this._conf.secretToken, | ||
userAgent: userAgent, | ||
serverUrl: this._conf.serverUrl, | ||
rejectUnauthorized: this._conf.verifyServerCert, | ||
serverTimeout: this._conf.serverTimeout * 1000 | ||
serverTimeout: this._conf.serverTimeout * 1000, | ||
|
||
// Streaming conf | ||
size: this._conf.apiRequestSize, | ||
time: this._conf.apiRequestTime * 1000 | ||
}) | ||
this._apmServer.on('error', err => { | ||
this.logger.error('An error occrued while communicating with the APM Server:', err.message) | ||
}) | ||
|
||
this._instrumentation.start() | ||
|
||
Error.stackTraceLimit = this._conf.stackTraceLimit | ||
if (this._conf.captureExceptions) this.handleUncaughtExceptions() | ||
|
@@ -287,8 +312,24 @@ Agent.prototype.captureError = function (err, opts, cb) { | |
} | ||
|
||
function send (error) { | ||
agent.logger.info('logging error %s with Elastic APM', id) | ||
request.errors(agent, [error], cb) | ||
error = agent._filters.process(error) // TODO: Update filter to expect this format | ||
|
||
if (!error) { | ||
agent.logger.debug('error ignored by filter %o', {id: id}) | ||
cb() | ||
return | ||
} | ||
|
||
truncate.error(error, agent._conf) | ||
|
||
if (agent._apmServer) { | ||
agent.logger.info(`Sending error ${id} to Elastic APM`) | ||
agent._apmServer.sendError(error, function () { | ||
agent._apmServer.flush(cb) | ||
}) | ||
} else { | ||
process.nextTick(cb.bind(null, new Error('cannot capture error before agent is started'))) | ||
} | ||
} | ||
} | ||
|
||
|
@@ -314,7 +355,12 @@ Agent.prototype.handleUncaughtExceptions = function (cb) { | |
} | ||
|
||
Agent.prototype.flush = function (cb) { | ||
this._instrumentation.flush(cb) | ||
if (this._apmServer) { | ||
this._apmServer.flush(cb) | ||
} else { | ||
this.logger.warn(new Error('cannot flush agent before it is started')) | ||
process.nextTick(cb) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would it be worthwhile for the flush callback be able to receive that error? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I left it out as this only occurs if the agent isn't started. So from the outside I want the user to use the module in the same way no matter if the agent is started or not. And normally errors coming from the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Actually I just realized that the callback given to But I think maybe that should be avoided in |
||
} | ||
} | ||
|
||
Agent.prototype.lambda = function wrapLambda (type, fn) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -28,7 +28,9 @@ var DEFAULTS = { | |
verifyServerCert: true, | ||
active: true, | ||
logLevel: 'info', | ||
hostname: os.hostname(), | ||
hostname: os.hostname(), // TODO: Should we just let the http client default to this? | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Defaulting in the http client sounds reasonable to me. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Removed |
||
apiRequestSize: 1024 * 1024, // TODO: Is this the right default | ||
apiRequestTime: 10, // TODO: Is this the right default. Should this be ms? | ||
stackTraceLimit: 50, | ||
captureExceptions: true, | ||
filterHttpHeaders: true, | ||
|
@@ -44,10 +46,10 @@ var DEFAULTS = { | |
sourceLinesSpanAppFrames: 0, | ||
sourceLinesSpanLibraryFrames: 0, | ||
errorMessageMaxLength: 2048, | ||
flushInterval: 10, | ||
flushInterval: 10, // TODO: Deprecate | ||
transactionMaxSpans: Infinity, | ||
transactionSampleRate: 1.0, | ||
maxQueueSize: 100, | ||
maxQueueSize: 100, // TODO: Deprecate | ||
serverTimeout: 30, | ||
disableInstrumentations: [] | ||
} | ||
|
@@ -61,6 +63,8 @@ var ENV_TABLE = { | |
active: 'ELASTIC_APM_ACTIVE', | ||
logLevel: 'ELASTIC_APM_LOG_LEVEL', | ||
hostname: 'ELASTIC_APM_HOSTNAME', | ||
apiRequestSize: 'ELASTIC_APM_API_REQUEST_SIZE', | ||
apiRequestTime: 'ELASTIC_APM_API_REQUEST_TIME', | ||
frameworkName: 'ELASTIC_APM_FRAMEWORK_NAME', | ||
frameworkVersion: 'ELASTIC_APM_FRAMEWORK_VERSION', | ||
stackTraceLimit: 'ELASTIC_APM_STACK_TRACE_LIMIT', | ||
|
@@ -98,6 +102,8 @@ var BOOL_OPTS = [ | |
] | ||
|
||
var NUM_OPTS = [ | ||
'apiRequestSize', | ||
'apiRequestTime', | ||
'stackTraceLimit', | ||
'abortedErrorThreshold', | ||
'sourceLinesErrorAppFrames', | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,13 +3,11 @@ | |
var fs = require('fs') | ||
var path = require('path') | ||
|
||
var AsyncValuePromise = require('async-value-promise') | ||
var hook = require('require-in-the-middle') | ||
var semver = require('semver') | ||
|
||
var Queue = require('../queue') | ||
var request = require('../request') | ||
var Transaction = require('./transaction') | ||
var truncate = require('../truncate') | ||
var shimmer = require('./shimmer') | ||
|
||
var MODULES = [ | ||
|
@@ -43,7 +41,6 @@ module.exports = Instrumentation | |
|
||
function Instrumentation (agent) { | ||
this._agent = agent | ||
this._queue = null | ||
this._started = false | ||
this.currentTransaction = null | ||
} | ||
|
@@ -56,19 +53,6 @@ Instrumentation.prototype.start = function () { | |
var self = this | ||
this._started = true | ||
|
||
var qopts = { | ||
flushInterval: this._agent._conf.flushInterval, | ||
maxQueueSize: this._agent._conf.maxQueueSize, | ||
logger: this._agent.logger | ||
} | ||
this._queue = new Queue(qopts, function onFlush (transactions, done) { | ||
AsyncValuePromise.all(transactions).then(function (transactions) { | ||
if (self._agent._conf.active && transactions.length > 0) { | ||
request.transactions(self._agent, transactions, done) | ||
} | ||
}, done) | ||
}) | ||
|
||
if (this._agent._conf.asyncHooks && semver.gte(process.version, '8.2.0')) { | ||
require('./async-hooks')(this) | ||
} else { | ||
|
@@ -106,27 +90,44 @@ Instrumentation.prototype._patchModule = function (exports, name, version, enabl | |
} | ||
|
||
Instrumentation.prototype.addEndedTransaction = function (transaction) { | ||
var agent = this._agent | ||
|
||
if (this._started) { | ||
var queue = this._queue | ||
var payload = agent._filters.process(transaction._encode()) // TODO: Update filter to expect this format | ||
if (!payload) return agent.logger.debug('transaction ignored by filter %o', {id: transaction.id}) | ||
truncate.transaction(payload) | ||
agent.logger.debug('sending transaction %o', {id: transaction.id}) | ||
if (agent._apmServer) agent._apmServer.sendTransaction(payload) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there any way for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No you're right - if There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I just made that comment, because this check is already in an outer There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh yes... I'll remove it There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed |
||
} else { | ||
agent.logger.debug('ignoring transaction %o', {id: transaction.id}) | ||
} | ||
} | ||
|
||
this._agent.logger.debug('adding transaction to queue %o', {id: transaction.id}) | ||
Instrumentation.prototype.addEndedSpan = function (span) { | ||
var agent = this._agent | ||
|
||
var payload = new AsyncValuePromise() | ||
if (this._started) { | ||
agent.logger.debug('encoding span %o', {trans: span.transaction.id, name: span.name, type: span.type}) | ||
span._encode(function (err, payload) { | ||
if (err) { | ||
agent.logger.error('error encoding span %o', {trans: span.transaction.id, name: span.name, type: span.type, error: err.message}) | ||
return | ||
} | ||
|
||
payload.catch(function (err) { | ||
this._agent.logger.error('error encoding transaction %s: %s', transaction.id, err.message) | ||
}) | ||
payload = agent._filters.process(payload) // TODO: Update filter to expect this format | ||
|
||
// Add the transaction payload to the queue instead of the transation | ||
// object it self to free up the transaction for garbage collection | ||
transaction._encode(function (err, _payload) { | ||
if (err) payload.reject(err) | ||
else payload.resolve(_payload) | ||
}) | ||
if (!payload) { | ||
agent.logger.debug('span ignored by filter %o', {trans: span.transaction.id, name: span.name, type: span.type}) | ||
return | ||
} | ||
|
||
queue.add(payload) | ||
truncate.span(payload) | ||
|
||
agent.logger.debug('sending span %o', {trans: span.transaction.id, name: span.name, type: span.type}) | ||
if (agent._apmServer) agent._apmServer.sendSpan(payload) | ||
}) | ||
} else { | ||
this._agent.logger.debug('ignoring transaction %o', {id: transaction.id}) | ||
agent.logger.debug('ignoring span %o', {trans: span.transaction.id, name: span.name, type: span.type}) | ||
} | ||
} | ||
|
||
|
@@ -216,7 +217,3 @@ Instrumentation.prototype._recoverTransaction = function (trans) { | |
|
||
this.currentTransaction = trans | ||
} | ||
|
||
Instrumentation.prototype.flush = function (cb) { | ||
this._queue.flush(cb) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Again, log something so we know when
_apmServer
is not set.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed