Skip to content
This repository has been archived by the owner on Aug 29, 2023. It is now read-only.

feat!: add metrics #223

Merged
merged 2 commits into from
Nov 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@
"stream-to-it": "^0.2.2"
},
"devDependencies": {
"@libp2p/interface-metrics": "^4.0.0",
"@libp2p/interface-mocks": "^7.0.1",
"@libp2p/interface-transport-compliance-tests": "^3.0.0",
"aegir": "^37.5.3",
Expand Down
44 changes: 38 additions & 6 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { CreateListenerOptions, DialOptions, Listener, symbol, Transport } from
import type { AbortOptions, Multiaddr } from '@multiformats/multiaddr'
import type { Socket, IpcSocketConnectOpts, TcpSocketConnectOpts } from 'net'
import type { Connection } from '@libp2p/interface-connection'
import type { CounterGroup, Metrics } from '@libp2p/interface-metrics'

const log = logger('libp2p:tcp')

Expand Down Expand Up @@ -55,11 +56,36 @@ export interface TCPCreateListenerOptions extends CreateListenerOptions, TCPSock

}

export interface TCPComponents {
metrics?: Metrics
}

export interface TCPMetrics {
dialerEvents: CounterGroup
listenerEvents: CounterGroup
}

class TCP implements Transport {
private readonly opts: TCPOptions
private readonly metrics?: TCPMetrics
private readonly components: TCPComponents

constructor (options: TCPOptions = {}) {
constructor (components: TCPComponents, options: TCPOptions = {}) {
this.opts = options
this.components = components

if (components.metrics != null) {
this.metrics = {
dialerEvents: components.metrics.registerCounterGroup('libp2p_tcp_dialer_errors_total', {
label: 'event',
help: 'Total count of TCP dialer errors by error type'
}),
listenerEvents: components.metrics.registerCounterGroup('libp2p_tcp_listener_errors_total', {
label: 'event',
help: 'Total count of TCP listener errors by error type'
})
}
}
}

get [symbol] (): true {
Expand All @@ -84,7 +110,8 @@ class TCP implements Transport {
remoteAddr: ma,
signal: options.signal,
socketInactivityTimeout: this.opts.outboundSocketInactivityTimeout,
socketCloseTimeout: this.opts.socketCloseTimeout
socketCloseTimeout: this.opts.socketCloseTimeout,
metrics: this.metrics?.dialerEvents
})
log('new outbound connection %s', maConn.remoteAddr)
const conn = await options.upgrader.upgradeOutbound(maConn)
Expand All @@ -107,12 +134,14 @@ class TCP implements Transport {

const onError = (err: Error) => {
err.message = `connection error ${cOptsStr}: ${err.message}`
this.metrics?.dialerEvents.increment({ error: true })

done(err)
}

const onTimeout = () => {
log('connection timeout %s', cOptsStr)
this.metrics?.dialerEvents.increment({ timeout: true })

const err = errCode(new Error(`connection timeout after ${Date.now() - start}ms`), 'ERR_CONNECT_TIMEOUT')
// Note: this will result in onError() being called
Expand All @@ -121,11 +150,13 @@ class TCP implements Transport {

const onConnect = () => {
log('connection opened %j', cOpts)
this.metrics?.dialerEvents.increment({ connect: true })
done()
}

const onAbort = () => {
log('connection aborted %j', cOpts)
this.metrics?.dialerEvents.increment({ abort: true })
rawSocket.destroy()
done(new AbortError())
}
Expand Down Expand Up @@ -166,7 +197,8 @@ class TCP implements Transport {
...options,
maxConnections: this.opts.maxConnections,
socketInactivityTimeout: this.opts.inboundSocketInactivityTimeout,
socketCloseTimeout: this.opts.socketCloseTimeout
socketCloseTimeout: this.opts.socketCloseTimeout,
metrics: this.components.metrics
})
}

Expand All @@ -190,8 +222,8 @@ class TCP implements Transport {
}
}

export function tcp (init: TCPOptions = {}): (components?: any) => Transport {
return () => {
return new TCP(init)
export function tcp (init: TCPOptions = {}): (components?: TCPComponents) => Transport {
return (components: TCPComponents = {}) => {
return new TCP(components, init)
}
}
72 changes: 67 additions & 5 deletions src/listener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import type { MultiaddrConnection, Connection } from '@libp2p/interface-connecti
import type { Upgrader, Listener, ListenerEvents } from '@libp2p/interface-transport'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { TCPCreateListenerOptions } from './index.js'
import type { CounterGroup, Metric, Metrics } from '@libp2p/interface-metrics'

const log = logger('libp2p:tcp:listener')

Expand All @@ -31,6 +32,16 @@ interface Context extends TCPCreateListenerOptions {
socketInactivityTimeout?: number
socketCloseTimeout?: number
maxConnections?: number
metrics?: Metrics
}

const SERVER_STATUS_UP = 1
const SERVER_STATUS_DOWN = 0

export interface TCPListenerMetrics {
status: Metric
errors: CounterGroup
events: CounterGroup
}

type Status = {started: false} | {started: true, listeningAddr: Multiaddr, peerId: string | null }
Expand All @@ -39,8 +50,8 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
private readonly server: net.Server
/** Keep track of open connections to destroy in case of timeout */
private readonly connections = new Set<MultiaddrConnection>()

private status: Status = { started: false }
private metrics?: TCPListenerMetrics

constructor (private readonly context: Context) {
super()
Expand All @@ -57,26 +68,75 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
}

this.server
.on('listening', () => this.dispatchEvent(new CustomEvent('listening')))
.on('error', err => this.dispatchEvent(new CustomEvent<Error>('error', { detail: err })))
.on('close', () => this.dispatchEvent(new CustomEvent('close')))
.on('listening', () => {
if (context.metrics != null) {
// we are listening, register metrics for our port
const address = this.server.address()
let addr: string

if (address == null) {
addr = 'unknown'
} else if (typeof address === 'string') {
// unix socket
addr = address
} else {
addr = `${address.address}:${address.port}`
}

context.metrics?.registerMetric(`libp2p_tcp_connections_${addr}_count`, {
help: 'Current active connections in TCP listener',
calculate: () => {
return this.connections.size
}
})

this.metrics = {
status: context.metrics.registerMetric(`libp2p_tcp_${addr}_server_status`, {
help: 'Current status of the TCP server'
}),
errors: context.metrics.registerCounterGroup(`libp2p_tcp_${addr}_server_errors_total`, {
label: 'error',
help: 'Total count of TCP listener errors by error type'
}),
events: context.metrics.registerCounterGroup(`libp2p_tcp_$${addr}_socket_events`, {
label: 'event',
help: 'Total count of TCP socket events by event'
})
}

this.metrics?.status.update(SERVER_STATUS_UP)
}

this.dispatchEvent(new CustomEvent('listening'))
})
.on('error', err => {
this.metrics?.errors.increment({ listen_error: true })
this.dispatchEvent(new CustomEvent<Error>('error', { detail: err }))
})
.on('close', () => {
this.metrics?.status.update(SERVER_STATUS_DOWN)
this.dispatchEvent(new CustomEvent('close'))
})
}

private onSocket (socket: net.Socket) {
// Avoid uncaught errors caused by unstable connections
socket.on('error', err => {
log('socket error', err)
this.metrics?.events.increment({ error: true })
})

let maConn: MultiaddrConnection
try {
maConn = toMultiaddrConnection(socket, {
listeningAddr: this.status.started ? this.status.listeningAddr : undefined,
socketInactivityTimeout: this.context.socketInactivityTimeout,
socketCloseTimeout: this.context.socketCloseTimeout
socketCloseTimeout: this.context.socketCloseTimeout,
metrics: this.metrics?.events
})
} catch (err) {
log.error('inbound connection failed', err)
this.metrics?.errors.increment({ inbound_to_connection: true })
return
}

Expand All @@ -99,6 +159,7 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
})
.catch(async err => {
log.error('inbound connection failed', err)
this.metrics?.errors.increment({ inbound_upgrade: true })

await attemptClose(maConn)
})
Expand All @@ -111,6 +172,7 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
attemptClose(maConn)
.catch(err => {
log.error('closing inbound connection failed', err)
this.metrics?.errors.increment({ inbound_closing_failed: true })
})
}
}
Expand Down
9 changes: 7 additions & 2 deletions src/socket-to-conn.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import errCode from 'err-code'
import type { Socket } from 'net'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { MultiaddrConnection } from '@libp2p/interface-connection'
import type { CounterGroup } from '@libp2p/interface-metrics'

const log = logger('libp2p:tcp:socket')

Expand All @@ -19,14 +20,15 @@ interface ToConnectionOptions {
signal?: AbortSignal
socketInactivityTimeout?: number
socketCloseTimeout?: number
metrics?: CounterGroup
}

/**
* Convert a socket into a MultiaddrConnection
* https://github.com/libp2p/interface-transport#multiaddrconnection
*/
export const toMultiaddrConnection = (socket: Socket, options?: ToConnectionOptions) => {
options = options ?? {}
export const toMultiaddrConnection = (socket: Socket, options: ToConnectionOptions) => {
const metrics = options.metrics
const inactivityTimeout = options.socketInactivityTimeout ?? SOCKET_TIMEOUT
const closeTimeout = options.socketCloseTimeout ?? CLOSE_TIMEOUT

Expand Down Expand Up @@ -61,6 +63,7 @@ export const toMultiaddrConnection = (socket: Socket, options?: ToConnectionOpti
// https://nodejs.org/dist/latest-v16.x/docs/api/net.html#socketsettimeouttimeout-callback
socket.setTimeout(inactivityTimeout, () => {
log('%s socket read timeout', lOptsStr)
metrics?.increment({ timeout: true })

// only destroy with an error if the remote has not sent the FIN message
let err: Error | undefined
Expand All @@ -75,6 +78,7 @@ export const toMultiaddrConnection = (socket: Socket, options?: ToConnectionOpti

socket.once('close', () => {
log('%s socket read timeout', lOptsStr)
metrics?.increment({ close: true })

// In instances where `close` was not explicitly called,
// such as an iterable stream ending, ensure we have set the close
Expand All @@ -88,6 +92,7 @@ export const toMultiaddrConnection = (socket: Socket, options?: ToConnectionOpti
// the remote sent a FIN packet which means no more data will be sent
// https://nodejs.org/dist/latest-v16.x/docs/api/net.html#event-end
log('socket ended', maConn.remoteAddr.toString())
metrics?.increment({ end: true })
})

const maConn: MultiaddrConnection = {
Expand Down