Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support APM Server intake API version 2 #465

Merged
merged 5 commits into from
Aug 2, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/agent-api.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -1084,7 +1084,7 @@ Defaults to `unnamed`
You can alternatively set this via <<span-type,`span.type`>>.
Defaults to `custom.code`

When a span is started it will measure the time until <<span-end,`span.end()`>> or <<span-truncate,`span.truncate()`>> is called.
When a span is started it will measure the time until <<span-end,`span.end()`>> is called.

See <<span-api,Span API>> docs for details on how to use custom spans.

Expand Down
19 changes: 1 addition & 18 deletions docs/span-api.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ Defaults to `unnamed`
You can alternatively set this via <<span-type,`span.type`>>.
Defaults to `custom.code`

When a span is started it will measure the time until <<span-end,`span.end()`>> or <<span-truncate,`span.truncate()`>> is called.
When a span is started it will measure the time until <<span-end,`span.end()`>> is called.

[[span-end]]
==== `span.end()`
Expand All @@ -86,20 +86,3 @@ span.end()
End the span.
If the span has already ended,
nothing happens.

A span that isn't ended before the parent transaction ends will be <<span-truncate,truncated>>.

[[span-truncate]]
==== `span.truncate()`

[source,js]
----
span.truncate()
----

Truncates and ends the span.
If the span is already ended or truncated,
nothing happens.

A truncated span is a special type of ended span.
It's used to indicate that the measured event took longer than the duration recorded by the span.
2 changes: 1 addition & 1 deletion docs/transaction-api.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ Think of the transaction result as equivalent to the status code of an HTTP resp
transaction.end([result])
----

Ends the transaction and <<span-truncate,truncates>> all un-ended child spans.
Ends the transaction.
If the transaction has already ended,
nothing happens.

Expand Down
61 changes: 54 additions & 7 deletions lib/agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ var connect = require('./middleware/connect')
var Filters = require('./filters')
var Instrumentation = require('./instrumentation')
var parsers = require('./parsers')
var request = require('./request')
var stackman = require('./stackman')
var symbols = require('./symbols')
var truncate = require('./truncate')

var IncomingMessage = http.IncomingMessage
var ServerResponse = http.ServerResponse
Expand All @@ -33,6 +33,7 @@ function Agent () {

this._instrumentation = new Instrumentation(this)
this._filters = new Filters()
this._apmServer = null

this._config()
}
Expand All @@ -43,6 +44,10 @@ Object.defineProperty(Agent.prototype, 'currentTransaction', {
}
})

Agent.prototype.destroy = function () {
if (this._apmServer) this._apmServer.destroy()
}

Agent.prototype.startTransaction = function () {
return this._instrumentation.startTransaction.apply(this._instrumentation, arguments)
}
Expand Down Expand Up @@ -131,15 +136,35 @@ Agent.prototype.start = function (opts) {
})
}

this._instrumentation.start()
this._apmServer = new ElasticAPMHttpClient({
// metadata
agentName: 'nodejs',
agentVersion: version,
serviceName: this._conf.serviceName,
serviceVersion: this._conf.serviceVersion,
frameworkName: this._conf.frameworkName,
frameworkVersion: this._conf.frameworkVersion,
hostname: this._conf.hostname,

this._httpClient = new ElasticAPMHttpClient({
// Sanitize conf
truncateStringsAt: config.INTAKE_STRING_MAX_SIZE, // TODO: Do we need to set this (it's the default value)

// HTTP conf
secretToken: this._conf.secretToken,
userAgent: userAgent,
serverUrl: this._conf.serverUrl,
rejectUnauthorized: this._conf.verifyServerCert,
serverTimeout: this._conf.serverTimeout * 1000
serverTimeout: this._conf.serverTimeout * 1000,

// Streaming conf
size: this._conf.apiRequestSize,
time: this._conf.apiRequestTime * 1000
})
this._apmServer.on('error', err => {
this.logger.error('An error occrued while communicating with the APM Server:', err.message)
})

this._instrumentation.start()

Error.stackTraceLimit = this._conf.stackTraceLimit
if (this._conf.captureExceptions) this.handleUncaughtExceptions()
Expand Down Expand Up @@ -287,8 +312,25 @@ Agent.prototype.captureError = function (err, opts, cb) {
}

function send (error) {
agent.logger.info('logging error %s with Elastic APM', id)
request.errors(agent, [error], cb)
error = agent._filters.process(error) // TODO: Update filter to expect this format

if (!error) {
agent.logger.debug('error ignored by filter %o', {id: id})
cb()
return
}

truncate.error(error, agent._conf)

if (agent._apmServer) {
agent.logger.info(`Sending error ${id} to Elastic APM`)
agent._apmServer.sendError(error, function () {
agent._apmServer.flush(cb)
})
} else {
// TODO: Swallow this error just as it's done in agent.flush()?
process.nextTick(cb.bind(null, new Error('cannot capture error before agent is started')))
}
}
}

Expand All @@ -314,7 +356,12 @@ Agent.prototype.handleUncaughtExceptions = function (cb) {
}

Agent.prototype.flush = function (cb) {
this._instrumentation.flush(cb)
if (this._apmServer) {
this._apmServer.flush(cb)
} else {
this.logger.warn(new Error('cannot flush agent before it is started'))
process.nextTick(cb)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be worthwhile for the flush callback be able to receive that error?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I left it out as this only occurs if the agent isn't started. So from the outside I want the user to use the module in the same way no matter if the agent is started or not. And normally errors coming from the _apmServer is just logged, so I think it's best to maintain that behavior here as well.

Copy link
Contributor Author

@watson watson Aug 1, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually I just realized that the callback given to captureError will pass a similar error along in the callback: https://github.com/elastic/apm-agent-nodejs/pull/465/files#diff-7403da6ce2244c65d641fdfcc095be93R331

But I think maybe that should be avoided in captureError. If so the captureError callback will never be called with an error either.

}
}

Agent.prototype.lambda = function wrapLambda (type, fn) {
Expand Down
12 changes: 8 additions & 4 deletions lib/config.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
'use strict'

var fs = require('fs')
var os = require('os')
var path = require('path')

var normalizeBool = require('normalize-bool')
Expand All @@ -28,7 +27,8 @@ var DEFAULTS = {
verifyServerCert: true,
active: true,
logLevel: 'info',
hostname: os.hostname(),
apiRequestSize: 1024 * 1024, // TODO: Is this the right default
apiRequestTime: 10, // TODO: Is this the right default. Should this be ms?
stackTraceLimit: 50,
captureExceptions: true,
filterHttpHeaders: true,
Expand All @@ -44,10 +44,10 @@ var DEFAULTS = {
sourceLinesSpanAppFrames: 0,
sourceLinesSpanLibraryFrames: 0,
errorMessageMaxLength: 2048,
flushInterval: 10,
flushInterval: 10, // TODO: Deprecate
transactionMaxSpans: Infinity,
transactionSampleRate: 1.0,
maxQueueSize: 100,
maxQueueSize: 100, // TODO: Deprecate
serverTimeout: 30,
disableInstrumentations: []
}
Expand All @@ -61,6 +61,8 @@ var ENV_TABLE = {
active: 'ELASTIC_APM_ACTIVE',
logLevel: 'ELASTIC_APM_LOG_LEVEL',
hostname: 'ELASTIC_APM_HOSTNAME',
apiRequestSize: 'ELASTIC_APM_API_REQUEST_SIZE',
apiRequestTime: 'ELASTIC_APM_API_REQUEST_TIME',
frameworkName: 'ELASTIC_APM_FRAMEWORK_NAME',
frameworkVersion: 'ELASTIC_APM_FRAMEWORK_VERSION',
stackTraceLimit: 'ELASTIC_APM_STACK_TRACE_LIMIT',
Expand Down Expand Up @@ -98,6 +100,8 @@ var BOOL_OPTS = [
]

var NUM_OPTS = [
'apiRequestSize',
'apiRequestTime',
'stackTraceLimit',
'abortedErrorThreshold',
'sourceLinesErrorAppFrames',
Expand Down
67 changes: 32 additions & 35 deletions lib/instrumentation/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,11 @@
var fs = require('fs')
var path = require('path')

var AsyncValuePromise = require('async-value-promise')
var hook = require('require-in-the-middle')
var semver = require('semver')

var Queue = require('../queue')
var request = require('../request')
var Transaction = require('./transaction')
var truncate = require('../truncate')
var shimmer = require('./shimmer')

var MODULES = [
Expand Down Expand Up @@ -43,7 +41,6 @@ module.exports = Instrumentation

function Instrumentation (agent) {
this._agent = agent
this._queue = null
this._started = false
this.currentTransaction = null
}
Expand All @@ -56,19 +53,6 @@ Instrumentation.prototype.start = function () {
var self = this
this._started = true

var qopts = {
flushInterval: this._agent._conf.flushInterval,
maxQueueSize: this._agent._conf.maxQueueSize,
logger: this._agent.logger
}
this._queue = new Queue(qopts, function onFlush (transactions, done) {
AsyncValuePromise.all(transactions).then(function (transactions) {
if (self._agent._conf.active && transactions.length > 0) {
request.transactions(self._agent, transactions, done)
}
}, done)
})

if (this._agent._conf.asyncHooks && semver.gte(process.version, '8.2.0')) {
require('./async-hooks')(this)
} else {
Expand Down Expand Up @@ -106,27 +90,44 @@ Instrumentation.prototype._patchModule = function (exports, name, version, enabl
}

Instrumentation.prototype.addEndedTransaction = function (transaction) {
var agent = this._agent

if (this._started) {
var queue = this._queue
var payload = agent._filters.process(transaction._encode()) // TODO: Update filter to expect this format
if (!payload) return agent.logger.debug('transaction ignored by filter %o', {id: transaction.id})
truncate.transaction(payload)
agent.logger.debug('sending transaction %o', {id: transaction.id})
agent._apmServer.sendTransaction(payload)
} else {
agent.logger.debug('ignoring transaction %o', {id: transaction.id})
}
}

this._agent.logger.debug('adding transaction to queue %o', {id: transaction.id})
Instrumentation.prototype.addEndedSpan = function (span) {
var agent = this._agent

var payload = new AsyncValuePromise()
if (this._started) {
agent.logger.debug('encoding span %o', {trans: span.transaction.id, name: span.name, type: span.type})
span._encode(function (err, payload) {
if (err) {
agent.logger.error('error encoding span %o', {trans: span.transaction.id, name: span.name, type: span.type, error: err.message})
return
}

payload.catch(function (err) {
this._agent.logger.error('error encoding transaction %s: %s', transaction.id, err.message)
})
payload = agent._filters.process(payload) // TODO: Update filter to expect this format

// Add the transaction payload to the queue instead of the transation
// object it self to free up the transaction for garbage collection
transaction._encode(function (err, _payload) {
if (err) payload.reject(err)
else payload.resolve(_payload)
})
if (!payload) {
agent.logger.debug('span ignored by filter %o', {trans: span.transaction.id, name: span.name, type: span.type})
return
}

queue.add(payload)
truncate.span(payload)

agent.logger.debug('sending span %o', {trans: span.transaction.id, name: span.name, type: span.type})
if (agent._apmServer) agent._apmServer.sendSpan(payload)
})
} else {
this._agent.logger.debug('ignoring transaction %o', {id: transaction.id})
agent.logger.debug('ignoring span %o', {trans: span.transaction.id, name: span.name, type: span.type})
}
}

Expand Down Expand Up @@ -216,7 +217,3 @@ Instrumentation.prototype._recoverTransaction = function (trans) {

this.currentTransaction = trans
}

Instrumentation.prototype.flush = function (cb) {
this._queue.flush(cb)
}
21 changes: 5 additions & 16 deletions lib/instrumentation/span.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ module.exports = Span
function Span (transaction) {
this.transaction = transaction
this.started = false
this.truncated = false
this.ended = false
this.name = null
this.type = null
Expand Down Expand Up @@ -50,18 +49,6 @@ Span.prototype.customStackTrace = function (stackObj) {
this._recordStackTrace(stackObj)
}

Span.prototype.truncate = function () {
if (!this.started) {
this._agent.logger.debug('tried to truncate non-started span - ignoring %o', {id: this.transaction.id, name: this.name, type: this.type})
return
} else if (this.ended) {
this._agent.logger.debug('tried to truncate already ended span - ignoring %o', {id: this.transaction.id, name: this.name, type: this.type})
return
}
this.truncated = true
this.end()
}

Span.prototype.end = function () {
if (!this.started) {
this._agent.logger.debug('tried to call span.end() on un-started span %o', {id: this.transaction.id, name: this.name, type: this.type})
Expand All @@ -75,8 +62,8 @@ Span.prototype.end = function () {
this._agent._instrumentation._recoverTransaction(this.transaction)

this.ended = true
this._agent.logger.debug('ended span %o', {id: this.transaction.id, name: this.name, type: this.type, truncated: this.truncated})
this.transaction._recordEndedSpan(this)
this._agent.logger.debug('ended span %o', {id: this.transaction.id, name: this.name, type: this.type})
this._agent._instrumentation.addEndedSpan(this)
}

Span.prototype.duration = function () {
Expand Down Expand Up @@ -157,8 +144,10 @@ Span.prototype._encode = function (cb) {
}

var payload = {
transactionId: self.transaction.id,
timestamp: self.transaction.timestamp,
name: self.name,
type: self.truncated ? self.type + '.truncated' : self.type,
type: self.type,
start: self.offsetTime(),
duration: self.duration()
}
Expand Down
Loading