Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Updated shim.recordConsume to use shim.record and added ability to invoke an after hook with callback args #2207

Merged
merged 2 commits into from
May 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions lib/instrumentation/amqplib/amqplib.js
Original file line number Diff line number Diff line change
Expand Up @@ -266,18 +266,20 @@ function wrapModel(shim, Model, promiseMode) {
destinationName: shim.FIRST,
callback: setCallback(shim, promiseMode),
promise: promiseMode,
messageHandler: function handleConsumedMessage(shim, fn, name, message) {
after: function handleConsumedMessage({ shim, result, args, segment }) {
if (!shim.agent.config.message_tracer.segment_parameters.enabled) {
shim.logger.trace('Not capturing segment parameters')
return
}

// the message is the param when using the promised based model
message = promiseMode ? message : message[1]
const message = promiseMode ? result : args?.[1]
if (!message) {
shim.logger.trace('No results from consume.')
return null
}
const parameters = getParametersFromMessage(message)

const headers = message?.properties?.headers

return { parameters, headers }
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

headers were never getting used here. I could mimic what's being done in here but we never had tests that asserting DT headers were getting propagated. I would think we should. so maybe i'll cut another story to fix this

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok i dug into this and I don't think it's apples to apples in recordSubscribecConsume it creates a transaction for the consumption. Whereas .get is just pulling one message. I can't just add the headers to DT payload as there's only 1 transaction getting created elsewhere and not within .get. So I'll leave this as is

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should that be noted in an inline comment?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure because there's no code to say we don't need to propagate headers

shim.copySegmentParameters(segment, parameters)
}
})
)
Expand Down Expand Up @@ -312,12 +314,10 @@ function wrapModel(shim, Model, promiseMode) {
* Extracts the appropriate messageHandler parameters for the consume method.
*
* @param {Shim} shim instance of shim
* @param {object} _consumer not used
* @param {string} _name not used
* @param {Array} args arguments passed to the consume method
* @returns {object} message params
*/
function describeMessage(shim, _consumer, _name, args) {
function describeMessage(shim, args) {
const [message] = args

if (!message?.properties) {
Expand Down
3 changes: 1 addition & 2 deletions lib/instrumentation/aws-sdk/v3/bedrock.js
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,7 @@ function getBedrockSpec({ commandName }, shim, _original, _name, args) {
return new RecorderSpec({
promise: true,
name: `Llm/${modelType}/Bedrock/${commandName}`,
// eslint-disable-next-line max-params
after: (shim, _fn, _fnName, err, response, segment) => {
after: ({ shim, error: err, result: response, segment }) => {
const passThroughParams = {
shim,
err,
Expand Down
2 changes: 1 addition & 1 deletion lib/instrumentation/core/inspector.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ function initialize(agent, inspector, name, shim) {
shim.wrap(sessionProto, 'post', function wrapPost(shim, fn) {
return function wrappedPost() {
const args = shim.argsToArray.apply(shim, arguments)
shim.bindCallbackSegment(args, shim.LAST)
shim.bindCallbackSegment(null, args, shim.LAST)
return fn.apply(this, args)
}
})
Expand Down
6 changes: 2 additions & 4 deletions lib/instrumentation/langchain/runnable.js
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,7 @@ function instrumentInvokeChain({ langchain, shim }) {
return new RecorderSpec({
name: `${LANGCHAIN.CHAIN}/${fnName}`,
promise: true,
// eslint-disable-next-line max-params
after(_shim, _fn, _name, err, output, segment) {
after({ error: err, result: output, segment }) {
recordChatCompletionEvents({
segment,
messages: [output],
Expand Down Expand Up @@ -97,8 +96,7 @@ function instrumentStream({ langchain, shim }) {
return new RecorderSpec({
name: `${LANGCHAIN.CHAIN}/${fnName}`,
promise: true,
// eslint-disable-next-line max-params
after(_shim, _fn, _name, err, output, segment) {
after({ error: err, result: output, segment }) {
// Input error occurred which means a stream was not created.
// Skip instrumenting streaming and create Llm Events from
// the data we have
Expand Down
3 changes: 1 addition & 2 deletions lib/instrumentation/langchain/tools.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ module.exports = function initialize(shim, tools) {
return new RecorderSpec({
name: `${LANGCHAIN.TOOL}/${name}`,
promise: true,
// eslint-disable-next-line max-params
after(_shim, _fn, _name, err, output, segment) {
after({ error: err, result: output, segment }) {
const metadata = mergeMetadata(instanceMeta, paramsMeta)
const tags = mergeTags(instanceTags, paramsTags)
segment.end()
Expand Down
3 changes: 1 addition & 2 deletions lib/instrumentation/langchain/vectorstore.js
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,7 @@ module.exports = function initialize(shim, vectorstores) {
return new RecorderSpec({
name: `${LANGCHAIN.VECTORSTORE}/${fnName}`,
promise: true,
// eslint-disable-next-line max-params
after(_shim, _fn, _name, err, output, segment) {
after({ error: err, result: output, segment }) {
if (!output) {
// If we get an error, it is possible that `output = null`.
// In that case, we define it to be an empty array.
Expand Down
2 changes: 1 addition & 1 deletion lib/instrumentation/memcached.js
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ module.exports = function initialize(agent, memcached, moduleName, shim) {
return new OperationSpec({
name: metacall.type || 'Unknown',
callback: function wrapCallback(shim, fn, fnName, opSegment) {
shim.bindCallbackSegment(metacall, 'callback', opSegment)
shim.bindCallbackSegment(null, metacall, 'callback', opSegment)
},
parameters
})
Expand Down
6 changes: 2 additions & 4 deletions lib/instrumentation/openai.js
Original file line number Diff line number Diff line change
Expand Up @@ -262,8 +262,7 @@ module.exports = function initialize(agent, openai, moduleName, shim) {
return new RecorderSpec({
name: OPENAI.COMPLETION,
promise: true,
// eslint-disable-next-line max-params
after(_shim, _fn, _name, err, response, segment) {
after({ error: err, result: response, segment }) {
if (request.stream) {
instrumentStream({ agent, shim, request, response, segment })
} else {
Expand Down Expand Up @@ -294,8 +293,7 @@ module.exports = function initialize(agent, openai, moduleName, shim) {
return new RecorderSpec({
name: OPENAI.EMBEDDING,
promise: true,
// eslint-disable-next-line max-params
after(_shim, _fn, _name, err, response, segment) {
after({ error: err, result: response, segment }) {
addLlmMeta({ agent, segment })

if (!response) {
Expand Down
6 changes: 3 additions & 3 deletions lib/instrumentation/redis.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ function registerInternalSendCommand(shim, proto) {
parameters,
callback: function bindCallback(shim, _f, _n, segment) {
if (shim.isFunction(commandObject.callback)) {
shim.bindCallbackSegment(commandObject, 'callback', segment)
shim.bindCallbackSegment(null, commandObject, 'callback', segment)
} else {
const self = this
commandObject.callback = shim.bindSegment(
Expand Down Expand Up @@ -87,9 +87,9 @@ function registerSendCommand(shim, proto) {
callback: function bindCallback(shim, _f, _n, segment) {
const last = args[args.length - 1]
if (shim.isFunction(last)) {
shim.bindCallbackSegment(args, shim.LAST, segment)
shim.bindCallbackSegment(null, args, shim.LAST, segment)
} else if (shim.isArray(last) && shim.isFunction(last[last.length - 1])) {
shim.bindCallbackSegment(last, shim.LAST, segment)
shim.bindCallbackSegment(null, last, shim.LAST, segment)
}
}
})
Expand Down
2 changes: 1 addition & 1 deletion lib/instrumentation/superagent.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ function wrapCallback(shim, callback) {
return function wrappedCallback() {
const segment = shim.getSegment(this)
if (segment && segment.transaction.isActive()) {
shim.bindCallbackSegment(this, '_callback', segment)
shim.bindCallbackSegment(null, this, '_callback', segment)
}
return callback.apply(this, arguments)
}
Expand Down
128 changes: 9 additions & 119 deletions lib/shim/message-shim/consume.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
*/

'use strict'
const TraceSegment = require('../../transaction/trace/segment')
const genericRecorder = require('../../metrics/recorders/generic')
const { _nameMessageSegment } = require('./common')
const specs = require('../specs')
Expand All @@ -25,7 +24,7 @@ module.exports = createRecorder
function updateSpecFromArgs({ shim, fn, fnName, args, spec }) {
let msgDesc = null
if (shim.isFunction(spec)) {
msgDesc = spec.call(this, shim, fn, fnName, args)
msgDesc = spec(shim, fn, fnName, args)
} else {
msgDesc = spec
const destIdx = shim.normalizeIndex(args.length, spec.destinationName)
Expand All @@ -37,130 +36,21 @@ function updateSpecFromArgs({ shim, fn, fnName, args, spec }) {
return msgDesc
}

/**
* Binds the consumer callback to the active segment.
*
* @private
* @param {object} params to function
* @param {MessageShim} params.shim instance of shim
* @param {Array} params.args arguments passed to original consume function
* @param {specs.MessageSpec} params.msgDesc spec for the wrapped consume function
* @param {TraceSegment} params.segment active segment to bind callback
* @param {boolean} params.getParams flag to copy message parameters to segment
* @param {Function} params.resHandler function to handle response from callback to obtain the message parameters
*/
function bindCallback({ shim, args, msgDesc, segment, getParams, resHandler }) {
const cbIdx = shim.normalizeIndex(args.length, msgDesc.callback)
if (cbIdx !== null) {
shim.bindCallbackSegment(args, cbIdx, segment)

// If we have a callback and a results handler, then wrap the callback so
// we can call the results handler and get the message properties.
if (resHandler) {
shim.wrap(args, cbIdx, function wrapCb(shim, cb, cbName) {
if (shim.isFunction(cb)) {
return function cbWrapper() {
const cbArgs = shim.argsToArray.apply(shim, arguments)
const msgProps = resHandler.call(this, shim, cb, cbName, cbArgs)
if (getParams && msgProps && msgProps.parameters) {
shim.copySegmentParameters(segment, msgProps.parameters)
}

return cb.apply(this, arguments)
}
}
})
}
}
}

/**
* Binds the consumer function to the async context and checks return to possibly
* bind the promise
*
* @private
* @param {object} params to function
* @param {MessageShim} params.shim instance of shim
* @param {Function} params.fn consumer function
* @param {string} params.fnName name of function
* @param {Array} params.args arguments passed to original consume function
* @param {specs.MessageSpec} params.msgDesc spec for the wrapped consume function
* @param {TraceSegment} params.segment active segment to bind callback
* @param {boolean} params.getParams flag to copy message parameters to segment
* @param {Function} params.resHandler function to handle response from callback to obtain the message parameters
* @returns {Promise|*} response from consume function
*/
function bindConsumer({ shim, fn, fnName, args, msgDesc, segment, getParams, resHandler }) {
// Call the method in the context of our segment.
let ret = shim.applySegment(fn, segment, true, this, args)

if (ret && msgDesc.promise && shim.isPromise(ret)) {
ret = shim.bindPromise(ret, segment)

// Intercept the promise to handle the result.
if (resHandler) {
ret = ret.then(function interceptValue(res) {
const msgProps = resHandler.call(this, shim, fn, fnName, res)
if (getParams && msgProps && msgProps.parameters) {
shim.copySegmentParameters(segment, msgProps.parameters)
}
return res
})
}
}

return ret
}

/**
*
* @private
* @param {object} params to function
* @param {MessageShim} params.shim instance of shim
* @param {Function} params.fn function that is being wrapped
* @param {string} params.fnName name of function
* @param params.args
* @param {specs.MessageSpec} params.spec spec for the wrapped consume function
* @returns {Function} recorder for consume function
* @returns {specs.MessageSpec} updated spec with logic to name segment and apply the genericRecorder
*/
function createRecorder({ shim, fn, fnName, spec }) {
return function consumeRecorder() {
const parent = shim.getSegment()
if (!parent || !parent.transaction.isActive()) {
shim.logger.trace('Not recording consume, no active transaction.')
return fn.apply(this, arguments)
}

// Process the message args.
const args = shim.argsToArray.apply(shim, arguments)
const msgDesc = updateSpecFromArgs.call(this, { shim, fn, fnName, args, spec })

// Make the segment if we can.
if (!msgDesc) {
shim.logger.trace('Not recording consume, no message descriptor.')
return fn.apply(this, args)
}

const name = _nameMessageSegment(shim, msgDesc, shim._metrics.CONSUME)

// Adds details needed by createSegment when used with a spec
msgDesc.name = name
msgDesc.recorder = genericRecorder
msgDesc.parent = parent

const segment = shim.createSegment(msgDesc)
const getParams = shim.agent.config.message_tracer.segment_parameters.enabled
const resHandler = shim.isFunction(msgDesc.messageHandler) ? msgDesc.messageHandler : null

bindCallback({ shim, args, msgDesc, segment, getParams, resHandler })
return bindConsumer.call(this, {
shim,
fn,
fnName,
args,
msgDesc,
segment,
getParams,
resHandler
})
}
function createRecorder({ spec, shim, fn, fnName, args }) {
const msgDesc = updateSpecFromArgs({ shim, fn, fnName, args, spec })
// Adds details needed by createSegment when used with a spec
msgDesc.name = _nameMessageSegment(shim, msgDesc, shim._metrics.CONSUME)
msgDesc.recorder = genericRecorder
return msgDesc
}
13 changes: 2 additions & 11 deletions lib/shim/message-shim/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -287,17 +287,8 @@ function recordConsume(nodule, properties, spec) {
properties = null
}

// This is using wrap instead of record because the spec allows for a messageHandler
// which is being used to handle the result of the callback or promise of the
// original wrapped consume function.
// TODO: https://github.com/newrelic/node-newrelic/issues/981
return this.wrap(nodule, properties, function wrapConsume(shim, fn, fnName) {
if (!shim.isFunction(fn)) {
shim.logger.debug('Not wrapping %s (%s) as consume', fn, fnName)
return fn
}

return createRecorder({ shim, fn, fnName, spec })
return this.record(nodule, properties, function wrapConsume(shim, fn, fnName, args) {
return createRecorder({ spec, shim, fn, fnName, args })
})
}

Expand Down
9 changes: 4 additions & 5 deletions lib/shim/message-shim/subscribe-consume.js
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,12 @@ function makeWrapConsumer({ spec, queue, destinationName, destNameIsArg }) {
spec.queue = queue
}

return function wrapConsumer(shim, consumer, cName) {
return function wrapConsumer(shim, consumer) {
if (!shim.isFunction(consumer)) {
return consumer
}

const consumerWrapper = createConsumerWrapper({ shim, consumer, cName, spec })
const consumerWrapper = createConsumerWrapper({ shim, consumer, spec })
return shim.bindCreateTransaction(
consumerWrapper,
new specs.TransactionSpec({
Expand All @@ -108,10 +108,9 @@ function makeWrapConsumer({ spec, queue, destinationName, destNameIsArg }) {
* @param {MessageShim} params.shim instance of shim
* @param {specs.MessageSubscribeSpec} params.spec spec for function
* @param {Function} params.consumer function for consuming message
* @param {string} params.cName name of consumer function
* @returns {Function} handler for the transaction being created
*/
function createConsumerWrapper({ shim, spec, consumer, cName }) {
function createConsumerWrapper({ shim, spec, consumer }) {
return function createConsumeTrans() {
// If there is no transaction or we're in a pre-existing transaction,
// then don't do anything. Note that the latter should never happen.
Expand All @@ -123,7 +122,7 @@ function createConsumerWrapper({ shim, spec, consumer, cName }) {
return consumer.apply(this, args)
}

const msgDesc = spec.messageHandler.call(this, shim, consumer, cName, args)
const msgDesc = spec.messageHandler.call(this, shim, args)

// If message could not be handled, immediately kill this transaction.
if (!msgDesc) {
Expand Down
Loading
Loading