Skip to content

Commit

Permalink
fix: add rpc error metrics (#412)
Browse files Browse the repository at this point in the history
* fix: log general info of a rpc

* fix: new metrics to handle rpc errors

* fix: add gossipsub_msg_received_error_total metric

* chore: address PR comments
  • Loading branch information
twoeths authored Mar 23, 2023
1 parent 1e1cf5a commit 5cd8b07
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 4 deletions.
35 changes: 31 additions & 4 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -935,16 +935,26 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements PubSub<G
// to prevent a top-level unhandled exception
// This processing of rpc messages should happen without awaiting full validation/execution of prior messages
if (this.opts.awaitRpcHandler) {
await this.handleReceivedRpc(peerId, rpc)
try {
await this.handleReceivedRpc(peerId, rpc)
} catch (err) {
this.metrics?.onRpcRecvError()
this.log(err)
}
} else {
this.handleReceivedRpc(peerId, rpc).catch((err) => this.log(err))
this.handleReceivedRpc(peerId, rpc).catch((err) => {
this.metrics?.onRpcRecvError()
this.log(err)
})
}
} catch (e) {
this.metrics?.onRpcDataError()
this.log(e as Error)
}
}
})
} catch (err) {
this.metrics?.onPeerReadStreamError()
this.handlePeerReadStreamError(err as Error, peerId)
}
}
Expand All @@ -969,7 +979,21 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements PubSub<G
return
}

this.log('rpc from %p', from)
const subscriptions = rpc.subscriptions ? rpc.subscriptions.length : 0
const messages = rpc.messages ? rpc.messages.length : 0
let ihave = 0
let iwant = 0
let graft = 0
let prune = 0
if (rpc.control) {
if (rpc.control.ihave) ihave = rpc.control.ihave.length
if (rpc.control.iwant) iwant = rpc.control.iwant.length
if (rpc.control.graft) graft = rpc.control.graft.length
if (rpc.control.prune) prune = rpc.control.prune.length
}
this.log(
`rpc.from ${from.toString()} subscriptions ${subscriptions} messages ${messages} ihave ${ihave} iwant ${iwant} graft ${graft} prune ${prune}`
)

// Handle received subscriptions
if (rpc.subscriptions && rpc.subscriptions.length > 0) {
Expand Down Expand Up @@ -1013,7 +1037,10 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements PubSub<G

const handleReceivedMessagePromise = this.handleReceivedMessage(from, message)
// Should never throw, but handle just in case
.catch((err) => this.log(err))
.catch((err) => {
this.metrics?.onMsgRecvError(message.topic)
this.log(err)
})

if (this.opts.awaitRpcMessageHandler) {
await handleReceivedMessagePromise
Expand Down
31 changes: 31 additions & 0 deletions src/metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,12 @@ export function getMetrics(
labelNames: ['hit']
}),

// peer stream
peerReadStreamError: register.gauge({
name: 'gossipsub_peer_read_stream_err_count_total',
help: 'Peer read stream error'
}),

// RPC outgoing. Track byte length + data structure sizes
rpcRecvBytes: register.gauge({ name: 'gossipsub_rpc_recv_bytes_total', help: 'RPC recv' }),
rpcRecvCount: register.gauge({ name: 'gossipsub_rpc_recv_count_total', help: 'RPC recv' }),
Expand All @@ -259,6 +265,8 @@ export function getMetrics(
rpcRecvIWant: register.gauge({ name: 'gossipsub_rpc_recv_iwant_total', help: 'RPC recv' }),
rpcRecvGraft: register.gauge({ name: 'gossipsub_rpc_recv_graft_total', help: 'RPC recv' }),
rpcRecvPrune: register.gauge({ name: 'gossipsub_rpc_recv_prune_total', help: 'RPC recv' }),
rpcDataError: register.gauge({ name: 'gossipsub_rpc_data_err_count_total', help: 'RPC data error' }),
rpcRecvError: register.gauge({ name: 'gossipsub_rpc_recv_err_count_total', help: 'RPC recv error' }),

/** Total count of RPC dropped because acceptFrom() == false */
rpcRecvNotAccepted: register.gauge({
Expand Down Expand Up @@ -323,6 +331,12 @@ export function getMetrics(
help: 'Total count of recv msgs before any validation',
labelNames: ['topic']
}),
/** Total count of recv msgs error */
msgReceivedError: register.gauge<{ topic: TopicLabel }>({
name: 'gossipsub_msg_received_error_total',
help: 'Total count of recv msgs error',
labelNames: ['topic']
}),
/** Tracks distribution of recv msgs by duplicate, invalid, valid */
msgReceivedStatus: register.gauge<{ topic: TopicLabel; status: MessageStatus }>({
name: 'gossipsub_msg_received_status_total',
Expand Down Expand Up @@ -615,6 +629,11 @@ export function getMetrics(
this.msgReceivedPreValidation.inc({ topic }, 1)
},

onMsgRecvError(topicStr: TopicStr): void {
const topic = this.toTopic(topicStr)
this.msgReceivedError.inc({ topic }, 1)
},

onMsgRecvResult(topicStr: TopicStr, status: MessageStatus): void {
const topic = this.toTopic(topicStr)
this.msgReceivedStatus.inc({ topic, status })
Expand All @@ -640,6 +659,18 @@ export function getMetrics(
this.duplicateMsgIgnored.inc({ topic }, 1)
},

onPeerReadStreamError(): void {
this.peerReadStreamError.inc(1)
},

onRpcRecvError(): void {
this.rpcRecvError.inc(1)
},

onRpcDataError(): void {
this.rpcDataError.inc(1)
},

onRpcRecv(rpc: IRPC, rpcBytes: number): void {
this.rpcRecvBytes.inc(rpcBytes)
this.rpcRecvCount.inc(1)
Expand Down

0 comments on commit 5cd8b07

Please sign in to comment.