diff --git a/.codecov.yml b/.codecov.yml new file mode 100644 index 0000000..1336c3f --- /dev/null +++ b/.codecov.yml @@ -0,0 +1,2 @@ +# Disable comments on Pull Requests +comment: false diff --git a/.gitignore b/.gitignore index 3c3629e..16d4f51 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ +.nyc_output node_modules diff --git a/.npmignore b/.npmignore new file mode 100644 index 0000000..34f191d --- /dev/null +++ b/.npmignore @@ -0,0 +1,4 @@ +.codecov.yml +.travis.yml +.nyc_output +test diff --git a/.travis.yml b/.travis.yml index e27fab1..927dec4 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,9 +1,6 @@ language: node_js node_js: - '10' -- '9' - '8' -- '7' - '6' -- '5' -- '4' +after_success: npm run coverage diff --git a/README.md b/README.md index 868ade8..b7816e9 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,7 @@ # elastic-apm-http-client [![Build status](https://travis-ci.org/elastic/apm-nodejs-http-client.svg?branch=master)](https://travis-ci.org/elastic/apm-nodejs-http-client) +[![codecov](https://img.shields.io/codecov/c/github/elastic/apm-nodejs-http-client.svg)](https://codecov.io/gh/elastic/apm-nodejs-http-client) [![Standard - JavaScript Style Guide](https://img.shields.io/badge/code%20style-standard-brightgreen.svg?style=flat)](https://github.com/feross/standard) A low-level HTTP client for communicating with the Elastic APM intake @@ -22,73 +23,155 @@ npm install elastic-apm-http-client ## Example Usage ```js -var client = require('elastic-apm-http-client')({ - userAgent: '...' +const Client = require('elastic-apm-http-client') + +const client = new Client({ + userAgent: 'My Custom Elastic APM Agent', + meta: function () { + return { + // meta data object sent as the first ndjson object in all HTTP + // requests to the APM Server + } + } }) -client.request('errors', body, function (err, res, body) { - if (err) throw err - console.log(body) -}) +const span = { + name: 'SELECT FROM users', + duration: 42, + start: 0, + type: 'db.mysql.query' +} + +client.sendSpan(span) ``` ## API -The module exposes an initialize function which takes a single options -hash as the 1st argument: +### `new Client(options)` -- `userAgent` - The HTTP user agent that your module should identify it - self with -- `secretToken` - (optional) The Elastic APM intake API secret token -- `serverUrl` - (optional) The APM Server URL (default: - `http://localhost:8200`) -- `rejectUnauthorized` - (optional) Set to `false` if the client - shouldn't verify the APM Server TLS certificates (default: `true`) -- `serverTimeout` - (optional) Set request timeout in milliseconds +Construct a new `client` object. Data given to the client will be +converted to ndjson, compressed using gzip, and streamed to the APM +Server. -The init function will return a low level HTTP client primed for -communicating with the Elastic APM intake API. +Arguments: -### `client.request(endpoint[, headers], body, callback)` +- `options` - An object containing config options (see below) -#### endpoint +HTTP client configuration: -The Elastic APM intake API currently support the following endpoints: +- `userAgent` - (required) The HTTP user agent that your module should + identify it self as +- `meta` - (required) A function which will be called every time the a + new HTTP request is being made to the APM Server. It's expected that + you return a metadata object. This object will be sent as the first + ndjson object to the API +- `secretToken` - The Elastic APM intake API secret token +- `serverUrl` - The APM Server URL (default: `http://localhost:8200`) +- `headers` - An object containing extra HTTP headers that should be + used when making HTTP requests to he APM Server +- `rejectUnauthorized` - Set to `false` if the client shouldn't verify + the APM Server TLS certificates (default: `true`) +- `serverTimeout` - HTTP request timeout in milliseconds. If no data is + sent or received on the socket for this amount of time, the request + will be aborted. It's not recommended to set a `serverTimeout` lower + than the `time` config option. That might result in healthy requests + being aborted prematurely (default: `15000` ms) +- `keepAlive` - If set the `false` the client will not reuse sockets + between requests (default: `true`) +- `keepAliveMsecs` - When using the `keepAlive` option, specifies the + initial delay for TCP Keep-Alive packets. Ignored when the `keepAlive` + option is `false` or `undefined` (default: `1000` ms) +- `maxSockets` - Maximum number of sockets to allow per host (default: + `Infinity`) +- `maxFreeSockets` - Maximum number of sockets to leave open in a free + state. Only relevant if `keepAlive` is set to `true` (default: `256`) -- `errors` -- `transactions` +Streaming configuration: -The default full URL's for those are: +- `size` - The maxiumum compressed body size (in bytes) of each HTTP + request to the APM Server. An overshoot of up to the size of the + internal zlib buffer should be expected as the buffer is flushed after + this limit is reached. The default zlib buffer size is 16 kb (default: + `1048576` bytes / 1 MB) +- `time` - The maxiumum number of milliseconds a streaming HTTP request + to the APM Server can be ongoing before it's ended (default: `10000` + ms) -``` -http://localhost:8200/ -``` +### Event: `close` + +The `close` event is emitted when the client and any of its underlying +resources have been closed. The event indicates that no more events will +be emitted, and no more data can be sent by the client. + +### Event: `error` + +Emitted if an error occurs. The listener callback is passed a single +Error argument when called. + +The client is not closed when the `error` event is emitted. + +### Event: `finish` + +The `finish` event is emitted after the `client.end()` method has been +called, and all data has been flushed to the underlying system. + +### `client.sendSpan(span[, callback])` + +Send a span to the APM Server. + +Arguments: + +- `span` - A span object that can be serialized to JSON +- `callback` - Callback is called when the `span` have been flushed to + the underlying system + +### `client.sendTransaction(transaction[, callback])` + +Send a transaction to the APM Server. + +Arguments: + +- `transaction` - A transaction object that can be serialized to JSON +- `callback` - Callback is called when the `transaction` have been + flushed to the underlying system + +### `client.sendError(error[, callback])` + +Send a error to the APM Server. + +Arguments: + +- `error` - A error object that can be serialized to JSON +- `callback` - Callback is called when the `error` have been flushed to + the underlying system + +### `client.flush([callback])` + +Flush the internal buffer and end the current HTTP request to the APM +Server. If no HTTP request is in process nothing happens. -When specifying the `endpoint` argument in the `client.request()` -method, you just have to specify that last part of the URL, e.g. -"releases". +Arguments: -#### headers +- `callback` - Callback is called when the internal buffer have been + flushed and the HTTP request ended. If no HTTP request is in progress + the callback is called in the next tick. -An optional object that you can use to supply custom headers that should -be sent to the Elastic APM intake API. +### `client.end([callback])` -#### body +Calling the `client.end()` method signals that no more data will be sent +to the `client`. If the internal buffer contains any data, this is +flushed before ending. -The body should be in the form of a JavaScript object literal. The -elastic-apm-http-client will take care of encoding it correctly. +Arguments: -#### callback +- `callback` - If provided, the optional `callback` function is attached + as a listener for the 'finish' event -The callback function is called with 3 arguments: +### `client.destroy()` -1. An error when applicable (usually from the - [http.ClientRequest](https://nodejs.org/api/http.html#http_class_http_clientrequest) - object) -1. An - [http.IncomingMessage](https://nodejs.org/api/http.html#http_http_incomingmessage) - object -1. The response body (as a String) +Destroy the `client`. After this call, the client has ended and +subsequent calls to `sendSpan()`, `sendTransaction()`, `sendError()`, +`flush()`, or `end()` will result in an error. ## License diff --git a/index.js b/index.js index 3086e34..1b5e44e 100644 --- a/index.js +++ b/index.js @@ -1,75 +1,280 @@ 'use strict' -var http = require('http') -var parseUrl = require('url').parse -var zlib = require('zlib') -var stringify = require('fast-safe-stringify') -var pkg = require('./package') +const util = require('util') +const parseUrl = require('url').parse +const zlib = require('zlib') +const Writable = require('readable-stream').Writable +const pump = require('pump') +const ndjson = require('ndjson') +const eos = require('end-of-stream') +const safeStringify = require('fast-safe-stringify') +const streamToBuffer = require('fast-stream-to-buffer') +const StreamChopper = require('stream-chopper') +const pkg = require('./package') -var SUB_USER_AGENT = pkg.name + '/' + pkg.version +const flush = Symbol('flush') -var Client = module.exports = function (opts) { +module.exports = Client + +// All sockets on the agent are unreffed when they are created. This means that +// when those are the only handles left, the `beforeExit` event will be +// emitted. By listening for this we can make sure to end the requests properly +// before exiting. This way we don't keep the process running until the `time` +// timeout happens. +const clients = [] +process.once('beforeExit', function () { + clients.forEach(function (client) { + if (!client) return // clients remove them selfs from the array when they end + client.end() + }) +}) + +util.inherits(Client, Writable) + +function Client (opts) { if (!(this instanceof Client)) return new Client(opts) - opts = opts || {} + opts = normalizeOptions(opts) - if (!opts.userAgent) throw new Error('Missing required option: userAgent') + Writable.call(this, opts) - this.secretToken = opts.secretToken || null - this.userAgent = opts.userAgent + ' ' + SUB_USER_AGENT + const errorproxy = (err) => { + if (this._destroyed === false) this.emit('error', err) + } - var url = parseUrl(opts.serverUrl || 'http://localhost:8200') + const fail = () => { + if (this._writableState.ending === false) this.destroy() + } - this._api = { - hostname: url.hostname, - port: url.port, - transport: url.protocol === 'https:' ? require('https') : http, - path: url.path === '/' ? '/v1/' : url.path + '/v1/', - rejectUnauthorized: opts.rejectUnauthorized !== false, - serverTimeout: opts.serverTimeout + this._active = false + this._destroyed = false + this._onflushed = null + this._transport = require(opts.serverUrl.protocol.slice(0, -1)) // 'http:' => 'http' + this._agent = new this._transport.Agent(opts) + this._stream = ndjson.serialize() + this._chopper = new StreamChopper({ + size: opts.size, + time: opts.time, + type: StreamChopper.overflow + }).on('stream', onStream(opts, this, errorproxy)) + + this._stream.on('error', errorproxy) + this._chopper.on('error', errorproxy) + eos(this._stream, {error: false}, fail) + eos(this._chopper, {error: false}, fail) + + pump(this._stream, this._chopper) + + this._index = clients.length + clients.push(this) +} + +// re-ref the open socket handles +Client.prototype._ref = function () { + Object.keys(this._agent.sockets).forEach(remote => { + this._agent.sockets[remote].forEach(function (socket) { + socket.ref() + }) + }) +} + +Client.prototype._write = function (obj, enc, cb) { + if (this._destroyed) { + this.emit('error', new Error('write called on destroyed Elastic APM client')) + cb() + } else if (obj === flush) { + if (this._active) { + this._onflushed = cb + this._chopper.chop() + } else { + this._chopper.chop(cb) + } + } else { + this._stream.write(obj, cb) + } +} + +Client.prototype.sendSpan = function (span, cb) { + return this.write({span}, cb) +} + +Client.prototype.sendTransaction = function (transaction, cb) { + return this.write({transaction}, cb) +} + +Client.prototype.sendError = function (error, cb) { + return this.write({error}, cb) +} + +Client.prototype.flush = function (cb) { + if (this._destroyed) { + this.emit('error', new Error('flush called on destroyed Elastic APM client')) + if (cb) process.nextTick(cb) + return + } + + // Write the special "flush" signal. We do this so that the order of writes + // and flushes are kept. If we where to just flush the client right here, the + // internal Writable buffer might still contain data that hasn't yet been + // given to the _write function. + return this.write(flush, cb) +} + +Client.prototype._final = function (cb) { + if (this._destroyed) { + this.emit('error', new Error('end called on destroyed Elastic APM client')) + cb() + return } + clients[this._index] = null // remove global reference to ease garbage collection + this._ref() + this._stream.end() + cb() +} + +// Overwrite destroy instead of using _destroy because readable-stream@2 can't +// be trusted. After a stream is destroyed, we want a call to either +// client.write() or client.end() to both emit an error and call the provided +// callback. Unfortunately, in readable-stream@2 and Node.js <10, this is not +// consistent. This has been fixed in Node.js 10 and will also be fixed in +// readable-stream@3. +Client.prototype.destroy = function (err) { + if (this._destroyed) return + this._destroyed = true + if (err) this.emit('error', err) + clients[this._index] = null // remove global reference to ease garbage collection + this._stream.destroy() + this._chopper.destroy() + this._agent.destroy() + process.nextTick(() => { + this.emit('close') + }) } -Client.prototype.request = function (endpoint, headers, body, cb) { - var self = this - - if (typeof body === 'function') return this.request(endpoint, {}, headers, body) - if (!headers) headers = {} - - zlib.gzip(stringify(body), function (err, buffer) { - if (err) return cb(err) - - if (self.secretToken) headers['Authorization'] = 'Bearer ' + self.secretToken - headers['Content-Type'] = 'application/json' - headers['Content-Encoding'] = 'gzip' - headers['Content-Length'] = buffer.length - headers['Accept'] = 'application/json' - headers['User-Agent'] = self.userAgent - - var opts = { - method: 'POST', - hostname: self._api.hostname, - port: self._api.port, - path: self._api.path + endpoint, - headers: headers, - rejectUnauthorized: self._api.rejectUnauthorized +function onStream (opts, client, onerror) { + const meta = opts.meta + const serverTimeout = opts.serverTimeout + opts = getRequestOptions(opts, client._agent) + + return function (stream, next) { + const onerrorproxy = (err) => { + stream.removeListener('error', onerrorproxy) + compressor.removeListener('error', onerrorproxy) + req.removeListener('error', onerrorproxy) + stream.destroy() + onerror(err) } - var req = self._api.transport.request(opts, function (res) { - var buffers = [] - res.on('data', buffers.push.bind(buffers)) - res.on('end', function () { - cb(null, res, Buffer.concat(buffers).toString('utf8')) - }) + client._active = true + + const req = client._transport.request(opts, onResult(onerror)) + const compressor = zlib.createGzip() + + // Mointor streams for errors so that we can make sure to destory the + // output stream as soon as that occurs + stream.on('error', onerrorproxy) + compressor.on('error', onerrorproxy) + req.on('error', onerrorproxy) + + req.on('socket', function (socket) { + // Sockets will automatically be unreffed by the HTTP agent when they are + // not in use by an HTTP request, but as we're keeping the HTTP request + // open, we need to unref the socket manually + socket.unref() }) - if (isFinite(self._api.serverTimeout)) { - req.setTimeout(self._api.serverTimeout, function () { + if (Number.isFinite(serverTimeout)) { + req.setTimeout(serverTimeout, function () { req.abort() }) } - req.on('error', cb) - req.end(buffer) + pump(stream, compressor, req, function () { + // This function is technically called with an error, but because we + // manually attach error listeners on all the streams in the pipeline + // above, we can safely ignore it. + // + // We do this for two reasons: + // + // 1) This callback might be called a few ticks too late, in which case a + // race condition could occur where the user would write to the output + // stream before the rest of the system discovered that it was + // unwritable + // + // 2) The error might occur post the end of the stream. In that case we + // would not get it here as the internal error listener would have + // been removed and the stream would throw the error instead + + client._active = false + if (client._onflushed) { + client._onflushed() + client._onflushed = null + } + + next() + }) + + // All requests to the APM Server must start with a metadata object + stream.write(safeStringify({metadata: meta()}) + '\n') + } +} + +function onResult (onerror) { + return streamToBuffer.onStream(function (err, buf, res) { + if (err) return onerror(err) + if (res.statusCode < 200 || res.statusCode > 299) { + const err = new Error('Unexpected response code from APM Server: ' + res.statusCode) + if (buf.length > 0) { + err.result = buf.toString('utf8') + if (res.headers['content-type'] === 'application/json') { + try { + err.result = JSON.parse(err.result).error || err.result + } catch (e) {} + } + } + onerror(err) + } }) } + +function normalizeOptions (opts) { + if (!opts.userAgent) throw new Error('Missing required option: userAgent') + if (!opts.meta) throw new Error('Missing required option: meta') + + const normalized = Object.assign({}, opts, {objectMode: true}) + + // default values + if (!normalized.size && normalized.size !== 0) normalized.size = 1024 * 1024 + if (!normalized.time && normalized.time !== 0) normalized.time = 10000 + if (!normalized.serverTimeout && normalized.serverTimeout !== 0) normalized.serverTimeout = 15000 + if (!normalized.serverUrl) normalized.serverUrl = 'http://localhost:8200' + normalized.keepAlive = normalized.keepAlive !== false + + // process + normalized.serverUrl = parseUrl(normalized.serverUrl) + + return normalized +} + +function getRequestOptions (opts, agent) { + const defaultPath = '/v2/intake' + return { + agent: agent, + rejectUnauthorized: opts.rejectUnauthorized !== false, + hostname: opts.serverUrl.hostname, + port: opts.serverUrl.port, + method: 'POST', + path: opts.serverUrl.path === '/' ? defaultPath : opts.serverUrl.path + defaultPath, + headers: getHeaders(opts) + } +} + +function getHeaders (opts) { + const headers = {} + if (opts.secretToken) headers['Authorization'] = 'Bearer ' + opts.secretToken + headers['Content-Type'] = 'application/x-ndjson' + headers['Content-Encoding'] = 'gzip' + headers['Accept'] = 'application/json' + headers['User-Agent'] = opts.userAgent + ' ' + pkg.name + '/' + pkg.version + return Object.assign(headers, opts.headers) +} diff --git a/package.json b/package.json index 74026cc..84f59a1 100644 --- a/package.json +++ b/package.json @@ -7,19 +7,30 @@ "test": "test" }, "scripts": { - "test": "standard && tape test.js" + "coverage": "nyc report --reporter=text-lcov > coverage.lcov && codecov", + "test": "standard && nyc node test/test.js" + }, + "engines": { + "node": "6 || 8 || 10" }, "author": "Thomas Watson (https://twitter.com/wa7son)", "license": "MIT", "dependencies": { - "fast-safe-stringify": "^2.0.4" + "end-of-stream": "^1.4.1", + "fast-safe-stringify": "^2.0.4", + "fast-stream-to-buffer": "^1.0.0", + "ndjson": "^1.5.0", + "pump": "^3.0.0", + "readable-stream": "^2.3.6", + "stream-chopper": "^1.1.1" }, "devDependencies": { + "codecov": "^3.0.4", "https-pem": "^2.0.0", - "nock": "^9.2.6", + "nyc": "^12.0.2", "semver": "^5.5.0", "standard": "^11.0.1", - "tape": "^4.9.0" + "tape": "^4.9.1" }, "repository": { "type": "git", diff --git a/test.js b/test.js deleted file mode 100644 index af30de0..0000000 --- a/test.js +++ /dev/null @@ -1,221 +0,0 @@ -'use strict' - -var zlib = require('zlib') -var http = require('http') -var https = require('https') -var test = require('tape') -var nock = require('nock') -var pem = require('https-pem') -var semver = require('semver') -var Client = require('./') - -test('throw if missing required options', function (t) { - t.throws(function () { - Client() - }) - t.end() -}) - -test('only userAgent should be required', function (t) { - t.doesNotThrow(function () { - Client({userAgent: 'foo'}) - }) - t.end() -}) - -test('#request()', function (t) { - var encode = function (body, cb) { - zlib.gzip(JSON.stringify(body), function (err, buffer) { - if (err) throw err - cb(buffer) - }) - } - var body = { foo: 'bar' } - - encode(body, function (buffer) { - t.test('normal request', function (t) { - var client = Client({secretToken: 'secret', userAgent: 'foo'}) - var scope = nock('http://localhost:8200') - .matchHeader('Authorization', 'Bearer secret') - .matchHeader('Content-Type', 'application/json') - .matchHeader('Content-Encoding', 'gzip') - .matchHeader('Content-Length', String(buffer.length)) - .matchHeader('User-Agent', 'foo elastic-apm-http-client/' + require('./package').version) - .post('/v1/endpoint', function (body) { - t.equal(body, buffer.toString('hex')) - return true - }) - .reply(202) - - client.request('endpoint', body, function (err, res, body) { - t.error(err) - t.equal(res.statusCode, 202) - t.equal(body, '') - scope.done() - t.end() - }) - }) - - t.test('no secretToken', function (t) { - var client = Client({userAgent: 'foo'}) - var scope = nock('http://localhost:8200') - .post('/v1/endpoint', function (body, a, b) { - t.ok('content-encoding' in this.headers) - t.notOk('authorization' in this.headers) - return true - }) - .reply() - - client.request('endpoint', body, function (err, res, body) { - t.error(err) - scope.done() - t.end() - }) - }) - - t.test('request with error', function (t) { - var client = Client({userAgent: 'foo'}) - var scope = nock('http://localhost:8200') - .post('/v1/endpoint', function (body) { - t.equal(body, buffer.toString('hex')) - return true - }) - .reply(500, { error: 'foo' }) - - client.request('endpoint', body, function (err, res, body) { - t.error(err) - t.equal(res.statusCode, 500) - t.deepEqual(body, '{"error":"foo"}') - scope.done() - t.end() - }) - }) - - t.test('with custom header', function (t) { - var client = Client({userAgent: 'foo'}) - var scope = nock('http://localhost:8200') - .matchHeader('X-Foo', 'bar') - .post('/v1/endpoint', function (body) { - t.equal(body, buffer.toString('hex')) - return true - }) - .reply(202) - - var headers = { 'X-Foo': 'bar' } - - client.request('endpoint', headers, body, function (err, res, body) { - t.error(err) - t.equal(res.statusCode, 202) - t.equal(body, '') - scope.done() - t.end() - }) - }) - - t.test('socket hang up', function (t) { - var server = http.createServer(function (req, res) { - req.socket.destroy() - }) - - server.listen(function () { - var opts = { - secretToken: 'test', - userAgent: 'test', - serverUrl: 'http://localhost:' + server.address().port - } - - var client = Client(opts) - - client.request('endpoint', body, function (err, res, body) { - t.equal(err.message, 'socket hang up') - t.equal(err.code, 'ECONNRESET') - server.close() - t.end() - }) - }) - }) - - t.test('reject unauthorized TLS by default', function (t) { - var server = https.createServer(pem, function (req, res) { - res.end('secret') - }) - - server.listen(function () { - var opts = { - userAgent: 'test', - serverUrl: 'https://localhost:' + server.address().port - } - - var client = Client(opts) - - client.request('endpoint', body, function (err, res, body) { - if (semver.gte(process.version, '0.12.0')) { - t.equal(err.message, 'self signed certificate') - t.equal(err.code, 'DEPTH_ZERO_SELF_SIGNED_CERT') - } else { - // Node.js v0.10 had the code as the message (and no code) - t.equal(err.message, 'DEPTH_ZERO_SELF_SIGNED_CERT') - } - server.close() - t.end() - }) - }) - }) - - t.test('allow unauthorized TLS by if asked', function (t) { - var server = https.createServer(pem, function (req, res) { - res.end('secret') - }) - - server.listen(function () { - var opts = { - userAgent: 'test', - serverUrl: 'https://localhost:' + server.address().port, - rejectUnauthorized: false - } - - var client = Client(opts) - - client.request('endpoint', body, function (err, res, body) { - t.error(err) - t.equal(body, 'secret') - server.close() - t.end() - }) - }) - }) - - t.test('serverUrl contains path', function (t) { - var client = Client({userAgent: 'foo', serverUrl: 'http://localhost:8200/sub'}) - var scope = nock('http://localhost:8200') - .post('/sub/v1/endpoint') - .reply() - - client.request('endpoint', body, function (err, res, body) { - t.error(err) - scope.done() - t.end() - }) - }) - - t.test('socket timeout', function (t) { - var client = Client({ - secretToken: 'secret', - userAgent: 'foo', - serverTimeout: 1000 - }) - - var scope = nock('http://localhost:8200') - .post('/v1/endpoint') - .socketDelay(2000) - .reply(200) - - client.request('endpoint', body, function (err, res, body) { - t.ok(err) - t.equal(err.code, 'ECONNRESET') - scope.done() - t.end() - }) - }) - }) -}) diff --git a/test/test.js b/test/test.js new file mode 100644 index 0000000..fb743d5 --- /dev/null +++ b/test/test.js @@ -0,0 +1,956 @@ +'use strict' + +const path = require('path') +const exec = require('child_process').exec +const http = require('http') +const test = require('tape') +const semver = require('semver') +const utils = require('./utils') +const pkg = require('../package') +const Client = require('../') + +const APMServer = utils.APMServer +const processReq = utils.processReq +const assertReq = utils.assertReq +const onmeta = utils.onmeta + +/** + * Setup and config + */ + +test('package', function (t) { + // these values are in the User-Agent header tests, so we need to make sure + // they are as we expect + t.equal(pkg.name, 'elastic-apm-http-client') + t.ok(semver.valid(pkg.version)) + t.end() +}) + +test('throw if missing required options', function (t) { + t.throws(function () { + new Client() // eslint-disable-line no-new + }) + t.end() +}) + +test('throw if only userAgent is provided', function (t) { + t.throws(function () { + new Client({userAgent: 'foo'}) // eslint-disable-line no-new + }) + t.end() +}) + +test('throw if only meta is provided', function (t) { + t.throws(function () { + new Client({meta: onmeta}) // eslint-disable-line no-new + }) + t.end() +}) + +test('only userAgent and meta should be required', function (t) { + t.doesNotThrow(function () { + new Client({ // eslint-disable-line no-new + userAgent: 'foo', + meta: onmeta + }) + }) + t.end() +}) + +test('should work without new', function (t) { + const client = Client({ + userAgent: 'foo', + meta: onmeta + }) + t.ok(client instanceof Client) + t.end() +}) + +test('null value config options shouldn\'t throw', function (t) { + t.doesNotThrow(function () { + new Client({ // eslint-disable-line no-new + userAgent: 'foo', // valid, so we don't throw + meta: onmeta, // valid, so we don't throw + size: null, + time: null, + serverTimeout: null, + type: null, + serverUrl: null, + keepAlive: null + }) + }) + t.end() +}) + +test('no secretToken', function (t) { + t.plan(1) + const server = APMServer(function (req, res) { + t.notOk('authorization' in req.headers) + res.end() + server.close() + t.end() + }) + server.listen(function () { + const client = new Client({ + serverUrl: 'http://localhost:' + server.address().port, + userAgent: 'foo', + meta: onmeta + }) + client.sendSpan({foo: 42}) + client.end() + }) +}) + +test('custom headers', function (t) { + t.plan(1) + const server = APMServer(function (req, res) { + t.equal(req.headers['x-foo'], 'bar') + res.end() + server.close() + t.end() + }).listen(function () { + const client = new Client({ + serverUrl: 'http://localhost:' + server.address().port, + userAgent: 'foo', + meta: onmeta, + headers: { + 'X-Foo': 'bar' + } + }) + client.sendSpan({foo: 42}) + client.end() + }) +}) + +test('serverUrl contains path', function (t) { + t.plan(1) + const server = APMServer(function (req, res) { + t.equal(req.url, '/subpath/v2/intake') + res.end() + server.close() + t.end() + }).listen(function () { + const client = new Client({ + serverUrl: 'http://localhost:' + server.address().port + '/subpath', + userAgent: 'foo', + meta: onmeta + }) + client.sendSpan({foo: 42}) + client.end() + }) +}) + +test('reject unauthorized TLS by default', function (t) { + t.plan(3) + const server = APMServer({secure: true}, function (req, res) { + t.fail('should should not get request') + }).client(function (client) { + client.on('error', function (err) { + t.ok(err instanceof Error) + t.equal(err.message, 'self signed certificate') + t.equal(err.code, 'DEPTH_ZERO_SELF_SIGNED_CERT') + server.close() + t.end() + }) + client.sendSpan({foo: 42}) + client.end() + }) +}) + +test('allow unauthorized TLS if asked', function (t) { + t.plan(1) + const server = APMServer({secure: true}, function (req, res) { + t.pass('should let request through') + res.end() + server.close() + t.end() + }).client({rejectUnauthorized: false}, function (client) { + client.sendSpan({foo: 42}) + client.end() + }) +}) + +/** + * Normal operation + */ + +const dataTypes = ['span', 'transaction', 'error'] + +dataTypes.forEach(function (dataType) { + const sendFn = 'send' + dataType.charAt(0).toUpperCase() + dataType.substr(1) + + test(`client.${sendFn}() + client.flush()`, function (t) { + t.plan(2 + assertReq.asserts) + const datas = [ + {metadata: {}}, + {[dataType]: {foo: 42}} + ] + const server = APMServer(function (req, res) { + assertReq(t, req) + req = processReq(req) + req.on('data', function (obj) { + t.deepEqual(obj, datas.shift()) + }) + req.on('end', function () { + res.end() + server.close() + t.end() + }) + }).client(function (client) { + client[sendFn]({foo: 42}) + client.flush() + }) + }) + + test(`client.${sendFn}(callback) + client.flush()`, function (t) { + t.plan(3 + assertReq.asserts) + const datas = [ + {metadata: {}}, + {[dataType]: {foo: 42}} + ] + const server = APMServer(function (req, res) { + assertReq(t, req) + req = processReq(req) + req.on('data', function (obj) { + t.deepEqual(obj, datas.shift()) + }) + req.on('end', function () { + res.end() + server.close() + t.end() + }) + }).client(function (client) { + let nexttick = false + client[sendFn]({foo: 42}, function () { + t.ok(nexttick, 'should call callback') + }) + client.flush() + nexttick = true + }) + }) + + test(`client.${sendFn}() + client.end()`, function (t) { + t.plan(2 + assertReq.asserts) + const datas = [ + {metadata: {}}, + {[dataType]: {foo: 42}} + ] + const server = APMServer(function (req, res) { + assertReq(t, req) + req = processReq(req) + req.on('data', function (obj) { + t.deepEqual(obj, datas.shift()) + }) + req.on('end', function () { + res.end() + server.close() + t.end() + }) + }).client(function (client) { + client[sendFn]({foo: 42}) + client.end() + }) + }) + + test(`single client.${sendFn}`, function (t) { + t.plan(2 + assertReq.asserts) + const datas = [ + {metadata: {}}, + {[dataType]: {foo: 42}} + ] + const server = APMServer(function (req, res) { + assertReq(t, req) + req = processReq(req) + req.on('data', function (obj) { + t.deepEqual(obj, datas.shift()) + }) + req.on('end', function () { + res.end() + server.close() + t.end() + }) + }).client({time: 100}, function (client) { + client[sendFn]({foo: 42}) + }) + }) + + test(`multiple client.${sendFn} (same request)`, function (t) { + t.plan(4 + assertReq.asserts) + const datas = [ + {metadata: {}}, + {[dataType]: {req: 1}}, + {[dataType]: {req: 2}}, + {[dataType]: {req: 3}} + ] + const server = APMServer(function (req, res) { + assertReq(t, req) + req = processReq(req) + req.on('data', function (obj) { + t.deepEqual(obj, datas.shift()) + }) + req.on('end', function () { + res.end() + server.close() + t.end() + }) + }).client({time: 100}, function (client) { + client[sendFn]({req: 1}) + client[sendFn]({req: 2}) + client[sendFn]({req: 3}) + }) + }) + + test(`multiple client.${sendFn} (multiple requests)`, function (t) { + t.plan(8 + assertReq.asserts * 2) + + let clientReqNum = 0 + let clientSendNum = 0 + let serverReqNum = 0 + let client + + const datas = [ + {metadata: {}}, + {[dataType]: {req: 1, send: 1}}, + {[dataType]: {req: 1, send: 2}}, + {[dataType]: {req: 1, send: 3}}, + {metadata: {}}, + {[dataType]: {req: 2, send: 4}}, + {[dataType]: {req: 2, send: 5}}, + {[dataType]: {req: 2, send: 6}} + ] + + const server = APMServer(function (req, res) { + let reqNum = ++serverReqNum + assertReq(t, req) + req = processReq(req) + req.on('data', function (obj) { + t.deepEqual(obj, datas.shift()) + }) + req.on('end', function () { + res.end() + if (reqNum === 1) { + send() + } else { + server.close() + t.end() + } + }) + }).client({time: 100}, function (_client) { + client = _client + send() + }) + + function send () { + clientReqNum++ + for (let n = 0; n < 3; n++) { + client[sendFn]({req: clientReqNum, send: ++clientSendNum}) + } + } + }) +}) + +test('client.flush(callback) - with active request', function (t) { + t.plan(5 + assertReq.asserts) + const datas = [ + {metadata: {}}, + {span: {foo: 42}} + ] + const server = APMServer(function (req, res) { + assertReq(t, req) + req = processReq(req) + req.on('data', function (obj) { + t.deepEqual(obj, datas.shift()) + }) + req.on('end', function () { + res.end() + server.close() + t.end() + }) + }).client(function (client) { + t.equal(client._active, false, 'no outgoing HTTP request to begin with') + client.sendSpan({foo: 42}) + t.equal(client._active, true, 'an outgoing HTTP request should be active') + client.flush(function () { + t.equal(client._active, false, 'the outgoing HTTP request should be done') + }) + }) +}) + +test('client.flush(callback) - with queued request', function (t) { + t.plan(6 + assertReq.asserts * 2) + let requests = 0 + const datas = [ + {metadata: {}}, + {span: {req: 1}}, + {metadata: {}}, + {span: {req: 2}} + ] + const server = APMServer(function (req, res) { + assertReq(t, req) + req = processReq(req) + req.on('data', function (obj) { + t.deepEqual(obj, datas.shift()) + }) + req.on('end', function () { + res.end() + if (++requests === 2) { + t.end() + server.close() + } + }) + }).client(function (client) { + client.sendSpan({req: 1}) + client.flush() + client.sendSpan({req: 2}) + t.equal(client._active, true, 'an outgoing HTTP request should be active') + client.flush(function () { + t.equal(client._active, false, 'the outgoing HTTP request should be done') + }) + }) +}) + +test('2nd flush before 1st flush have finished', function (t) { + t.plan(6 + assertReq.asserts * 2) + let requestStarts = 0 + let requestEnds = 0 + const datas = [ + {metadata: {}}, + {span: {req: 1}}, + {metadata: {}}, + {span: {req: 2}} + ] + const server = APMServer(function (req, res) { + requestStarts++ + assertReq(t, req) + req = processReq(req) + req.on('data', function (obj) { + t.deepEqual(obj, datas.shift()) + }) + req.on('end', function () { + requestEnds++ + res.end() + }) + }).client(function (client) { + client.sendSpan({req: 1}) + client.flush() + client.sendSpan({req: 2}) + client.flush() + setTimeout(function () { + t.equal(requestStarts, 2, 'should have received 2 requests') + t.equal(requestEnds, 2, 'should have received 2 requests completely') + t.end() + server.close() + }, 100) + }) +}) + +test('client.end(callback)', function (t) { + t.plan(3 + assertReq.asserts) + const datas = [ + {metadata: {}}, + {span: {foo: 42}} + ] + const server = APMServer(function (req, res) { + assertReq(t, req) + req = processReq(req) + req.on('data', function (obj) { + t.deepEqual(obj, datas.shift()) + }) + req.on('end', function () { + res.end() + server.close() + t.end() + }) + }).client(function (client) { + client.sendSpan({foo: 42}) + client.end(function () { + t.pass('should call callback') + }) + }) +}) + +/** + * Side effects + */ + +test('client should not hold the process open', function (t) { + t.plan(3 + assertReq.asserts) + + const datas = [ + {metadata: {}}, + {span: {hello: 'world'}} + ] + + const server = APMServer(function (req, res) { + assertReq(t, req) + req = processReq(req) + req.on('data', function (obj) { + t.deepEqual(obj, datas.shift()) + }) + req.on('end', function () { + res.statusCode = 202 + res.end() + server.close() + }) + }) + + server.listen(function () { + const url = 'http://localhost:' + server.address().port + const file = path.join(__dirname, 'unref-client.js') + exec(`node ${file} ${url}`, function (err, stdout, stderr) { + if (err) throw err + const end = Date.now() + const start = Number(stdout) + const duration = end - start + t.ok(duration < 300, `should not take more than 300 ms to complete (was: ${duration})`) + t.end() + }) + }) +}) + +/** + * Edge cases + */ + +test('Event: close - if ndjson stream ends', function (t) { + t.plan(1) + let client + const server = APMServer(function (req, res) { + client._stream.end() + setTimeout(function () { + // wait a little to allow close to be emitted + t.end() + server.close() + }, 10) + }).listen(function () { + client = new Client({ + serverUrl: 'http://localhost:' + server.address().port, + userAgent: 'foo', + meta: onmeta + }) + + client.on('finish', function () { + t.fail('should not emit finish event') + }) + client.on('close', function () { + t.pass('should emit close event') + }) + + client.sendSpan({req: 1}) + }) +}) + +test('Event: close - if ndjson stream is destroyed', function (t) { + t.plan(1) + let client + const server = APMServer(function (req, res) { + client._stream.destroy() + setTimeout(function () { + // wait a little to allow close to be emitted + t.end() + server.close() + }, 10) + }).listen(function () { + client = new Client({ + serverUrl: 'http://localhost:' + server.address().port, + userAgent: 'foo', + meta: onmeta + }) + + client.on('finish', function () { + t.fail('should not emit finish event') + }) + client.on('close', function () { + t.pass('should emit close event') + }) + + client.sendSpan({req: 1}) + }) +}) + +test('Event: close - if chopper ends', function (t) { + t.plan(1) + let client + const server = APMServer(function (req, res) { + client._chopper.end() + setTimeout(function () { + // wait a little to allow close to be emitted + t.end() + server.close() + }, 10) + }).listen(function () { + client = new Client({ + serverUrl: 'http://localhost:' + server.address().port, + userAgent: 'foo', + meta: onmeta + }) + + client.on('finish', function () { + t.fail('should not emit finish event') + }) + client.on('close', function () { + t.pass('should emit close event') + }) + + client.sendSpan({req: 1}) + }) +}) + +test('Event: close - if chopper is destroyed', function (t) { + t.plan(1) + let client + const server = APMServer(function (req, res) { + client._chopper.destroy() + setTimeout(function () { + // wait a little to allow close to be emitted + t.end() + server.close() + }, 10) + }).listen(function () { + client = new Client({ + serverUrl: 'http://localhost:' + server.address().port, + userAgent: 'foo', + meta: onmeta + }) + + client.on('finish', function () { + t.fail('should not emit finish event') + }) + client.on('close', function () { + t.pass('should emit close event') + }) + + client.sendSpan({req: 1}) + }) +}) + +test('write after end', function (t) { + t.plan(2) + const server = APMServer(function (req, res) { + t.fail('should never get any request') + }).client(function (client) { + client.on('error', function (err) { + t.ok(err instanceof Error) + t.equal(err.message, 'write after end') + server.close() + t.end() + }) + client.end() + client.sendSpan({foo: 42}) + }) +}) + +test('request with error - no body', function (t) { + const server = APMServer(function (req, res) { + res.statusCode = 418 + res.end() + }).client(function (client) { + client.on('error', function (err) { + t.ok(err instanceof Error) + t.equal(err.message, 'Unexpected response code from APM Server: 418') + t.equal(err.result, undefined) + server.close() + t.end() + }) + client.sendSpan({foo: 42}) + client.flush() + }) +}) + +test('request with error - non json body', function (t) { + const server = APMServer(function (req, res) { + res.statusCode = 418 + res.end('boom!') + }).client(function (client) { + client.on('error', function (err) { + t.ok(err instanceof Error) + t.equal(err.message, 'Unexpected response code from APM Server: 418') + t.equal(err.result, 'boom!') + server.close() + t.end() + }) + client.sendSpan({foo: 42}) + client.flush() + }) +}) + +test('request with error - invalid json body', function (t) { + const server = APMServer(function (req, res) { + res.statusCode = 418 + res.setHeader('Content-Type', 'application/json') + res.end('boom!') + }).client(function (client) { + client.on('error', function (err) { + t.ok(err instanceof Error) + t.equal(err.message, 'Unexpected response code from APM Server: 418') + t.equal(err.result, 'boom!') + server.close() + t.end() + }) + client.sendSpan({foo: 42}) + client.flush() + }) +}) + +test('request with error - json body without error property', function (t) { + const body = JSON.stringify({foo: 'bar'}) + const server = APMServer(function (req, res) { + res.statusCode = 418 + res.setHeader('Content-Type', 'application/json') + res.end(body) + }).client(function (client) { + client.on('error', function (err) { + t.ok(err instanceof Error) + t.equal(err.message, 'Unexpected response code from APM Server: 418') + t.equal(err.result, body) + server.close() + t.end() + }) + client.sendSpan({foo: 42}) + client.flush() + }) +}) + +test('request with error - json body with error property', function (t) { + const server = APMServer(function (req, res) { + res.statusCode = 418 + res.setHeader('Content-Type', 'application/json') + res.end(JSON.stringify({error: 'bar'})) + }).client(function (client) { + client.on('error', function (err) { + t.ok(err instanceof Error) + t.equal(err.message, 'Unexpected response code from APM Server: 418') + t.equal(err.result, 'bar') + server.close() + t.end() + }) + client.sendSpan({foo: 42}) + client.flush() + }) +}) + +test('socket hang up', function (t) { + const server = APMServer(function (req, res) { + req.socket.destroy() + }).client(function (client) { + let closed = false + client.on('error', function (err) { + t.equal(err.message, 'socket hang up') + t.equal(err.code, 'ECONNRESET') + // wait a little in case 'close' is emitted async + setTimeout(function () { + t.equal(closed, false) + t.end() + server.close() + client.destroy() + }, 50) + }) + client.on('close', function () { + closed = true + }) + client.on('finish', function () { + t.fail('should not emit finish') + }) + client.sendSpan({foo: 42}) + }) +}) + +test('socket hang up - continue with new request', function (t) { + t.plan(6 + assertReq.asserts * 2) + let reqs = 0 + let client + const datas = [ + {metadata: {}}, + {span: {req: 2}} + ] + const server = APMServer(function (req, res) { + assertReq(t, req) + + if (++reqs === 1) return req.socket.destroy() + + // We have to attach the listener directly to the HTTP request stream as it + // will receive the gzip header once the write have been made on the + // client. If we were to attach it to the gunzip+ndjson, it would not fire + req.on('data', function () { + client.flush() + }) + + req = processReq(req) + req.on('data', function (obj) { + t.deepEqual(obj, datas.shift()) + }) + req.on('end', function () { + t.pass('should end request') + res.end() + server.close() + }) + }).client(function (_client) { + client = _client + client.on('error', function (err) { + t.equal(err.message, 'socket hang up') + t.equal(err.code, 'ECONNRESET') + client.sendSpan({req: 2}) + }) + client.on('finish', function () { + t.equal(reqs, 2, 'should emit finish after last request') + t.end() + }) + client.sendSpan({req: 1}) + }) +}) + +test('socket timeout - server response too slow', function (t) { + const server = APMServer(function (req, res) { + req.resume() + }).client({serverTimeout: 1000}, function (client) { + const start = Date.now() + client.on('error', function (err) { + const end = Date.now() + const delta = end - start + t.ok(delta > 1000 && delta < 2000, 'timeout should occur between 1-2 seconds') + t.equal(err.message, 'socket hang up') + t.equal(err.code, 'ECONNRESET') + server.close() + t.end() + }) + client.sendSpan({foo: 42}) + client.end() + }) +}) + +test('socket timeout - client request too slow', function (t) { + const server = APMServer(function (req, res) { + req.resume() + req.on('end', function () { + res.end() + }) + }).client({serverTimeout: 1000}, function (client) { + const start = Date.now() + client.on('error', function (err) { + const end = Date.now() + const delta = end - start + t.ok(delta > 1000 && delta < 2000, 'timeout should occur between 1-2 seconds') + t.equal(err.message, 'socket hang up') + t.equal(err.code, 'ECONNRESET') + server.close() + t.end() + }) + client.sendSpan({foo: 42}) + }) +}) + +test('client.destroy() - on fresh client', function (t) { + t.plan(1) + const client = new Client({ + userAgent: 'foo', + meta: onmeta + }) + client.on('finish', function () { + t.fail('should not emit finish') + }) + client.on('close', function () { + t.pass('should emit close') + }) + client.destroy() + process.nextTick(function () { + // wait a little to allow close to be emitted + t.end() + }) +}) + +test('client.destroy() - should not allow more writes', function (t) { + t.plan(12) + let count = 0 + + const client = new Client({ + userAgent: 'foo', + meta: onmeta + }) + client.on('error', function (err) { + t.ok(err instanceof Error, 'should emit error ' + err.message) + }) + client.on('finish', function () { + t.pass('should emit finish') // emitted because of client.end() + }) + client.on('close', function () { + t.pass('should emit close') // emitted because of client.destroy() + }) + client.destroy() + client.sendSpan({foo: 42}, done) + client.sendTransaction({foo: 42}, done) + client.sendError({foo: 42}, done) + client.flush(done) + client.end(done) + + function done () { + t.pass('should still call callback even though it\'s destroyed') + if (++count === 5) t.end() + } +}) + +test('client.destroy() - on ended client', function (t) { + t.plan(2) + let client + + // create a server that doesn't unref incoming sockets to see if + // `client.destroy()` will make the server close without hanging + const server = http.createServer(function (req, res) { + req.resume() + req.on('end', function () { + res.end() + client.destroy() + server.close() + process.nextTick(function () { + // wait a little to allow close to be emitted + t.end() + }) + }) + }) + + server.listen(function () { + client = new Client({ + serverUrl: 'http://localhost:' + server.address().port, + userAgent: 'foo', + meta: onmeta + }) + client.on('finish', function () { + t.pass('should emit finish only once') + }) + client.on('close', function () { + t.pass('should emit close event') + }) + client.sendSpan({foo: 42}) + client.end() + }) +}) + +test('client.destroy() - on client with request in progress', function (t) { + t.plan(1) + let client + + // create a server that doesn't unref incoming sockets to see if + // `client.destroy()` will make the server close without hanging + const server = http.createServer(function (req, res) { + server.close() + client.destroy() + process.nextTick(function () { + // wait a little to allow close to be emitted + t.end() + }) + }) + + server.listen(function () { + client = new Client({ + serverUrl: 'http://localhost:' + server.address().port, + userAgent: 'foo', + meta: onmeta + }) + client.on('finish', function () { + t.fail('should not emit finish') + }) + client.on('close', function () { + t.pass('should emit close event') + }) + client.sendSpan({foo: 42}) + }) +}) diff --git a/test/unref-client.js b/test/unref-client.js new file mode 100644 index 0000000..f4b48dc --- /dev/null +++ b/test/unref-client.js @@ -0,0 +1,16 @@ +'use strict' + +const Client = require('../') + +const client = new Client({ + serverUrl: process.argv[2], + secretToken: 'secret', + userAgent: 'foo', + meta: function () { + return {} + } +}) + +process.stdout.write(String(Date.now())) + +client.sendSpan({hello: 'world'}) // Don't end the stream diff --git a/test/utils.js b/test/utils.js new file mode 100644 index 0000000..5910e78 --- /dev/null +++ b/test/utils.js @@ -0,0 +1,69 @@ +'use strict' + +const http = require('http') +const https = require('https') +const zlib = require('zlib') +const pem = require('https-pem') +const ndjson = require('ndjson') +const pkg = require('../package') +const Client = require('../') + +exports.APMServer = APMServer +exports.processReq = processReq +exports.assertReq = assertReq +exports.onmeta = onmeta + +function APMServer (opts, onreq) { + if (typeof opts === 'function') return APMServer(null, opts) + opts = opts || {} + + const secure = !!opts.secure + + const server = secure + ? https.createServer(pem, onreq) + : http.createServer(onreq) + + // Because we use a keep-alive agent in the client, we need to unref the + // sockets received by the server. If not, the server would hold open the app + // even after the tests have completed + server.on('connection', function (socket) { + socket.unref() + }) + + server.client = function (opts, onclient) { + if (typeof opts === 'function') { + onclient = opts + opts = {} + } + server.listen(function () { + onclient(new Client(Object.assign({ + serverUrl: `http${secure ? 's' : ''}://localhost:${server.address().port}`, + secretToken: 'secret', + userAgent: 'foo', + meta: onmeta + }, opts))) + }) + return server + } + + return server +} + +function processReq (req) { + return req.pipe(zlib.createGunzip()).pipe(ndjson.parse()) +} + +function assertReq (t, req) { + t.equal(req.method, 'POST', 'should make a POST request') + t.equal(req.url, '/v2/intake', 'should send request to /v2/intake') + t.equal(req.headers['authorization'], 'Bearer secret', 'should add secret token') + t.equal(req.headers['content-type'], 'application/x-ndjson', 'should send reqeust as ndjson') + t.equal(req.headers['content-encoding'], 'gzip', 'should compress request') + t.equal(req.headers['accept'], 'application/json', 'should expect json in response') + t.equal(req.headers['user-agent'], `foo ${pkg.name}/${pkg.version}`, 'should add proper User-Agent') +} +assertReq.asserts = 7 + +function onmeta () { + return {} +}