Skip to content

Commit

Permalink
feat: export metrics (libp2p#71)
Browse files Browse the repository at this point in the history
Co-authored-by: David D <[email protected]>
Co-authored-by: chad <[email protected]>
  • Loading branch information
3 people authored May 9, 2023
1 parent f404e19 commit b3cb445
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 17 deletions.
8 changes: 5 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -133,12 +133,13 @@
"lint": "aegir lint",
"lint:fix": "aegir lint --fix",
"clean": "aegir clean",
"dep-check": "aegir dep-check",
"dep-check": "aegir dep-check -i protons",
"release": "aegir release"
},
"dependencies": {
"@chainsafe/libp2p-noise": "^11.0.4",
"@libp2p/interface-connection": "^5.0.2",
"@libp2p/interface-metrics": "^4.0.8",
"@libp2p/interface-peer-id": "^2.0.2",
"@libp2p/interface-peer-store": "^2.0.2",
"@libp2p/interface-registrar": "^2.0.12",
Expand Down Expand Up @@ -169,10 +170,11 @@
"@libp2p/peer-id-factory": "^2.0.3",
"@protobuf-ts/protoc": "^2.9.0",
"@types/sinon": "^10.0.14",
"aegir": "^39.0.5",
"aegir": "^39.0.6",
"eslint-plugin-etc": "^2.0.2",
"it-pair": "^2.0.6",
"protons": "^7.0.2",
"sinon": "^15.0.4"
"sinon": "^15.0.4",
"sinon-ts": "^1.0.0"
}
}
13 changes: 12 additions & 1 deletion src/maconn.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { logger } from '@libp2p/logger'
import { nopSink, nopSource } from './util.js'
import type { MultiaddrConnection, MultiaddrConnectionTimeline } from '@libp2p/interface-connection'
import type { CounterGroup } from '@libp2p/interface-metrics'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { Source, Sink } from 'it-stream-types'

Expand All @@ -21,6 +22,11 @@ interface WebRTCMultiaddrConnectionInit {
* Holds the relevant events timestamps of the connection
*/
timeline: MultiaddrConnectionTimeline

/**
* Optional metrics counter group for this connection
*/
metrics?: CounterGroup
}

export class WebRTCMultiaddrConnection implements MultiaddrConnection {
Expand All @@ -39,6 +45,11 @@ export class WebRTCMultiaddrConnection implements MultiaddrConnection {
*/
timeline: MultiaddrConnectionTimeline

/**
* Optional metrics counter group for this connection
*/
metrics?: CounterGroup

/**
* The stream source, a no-op as the transport natively supports multiplexing
*/
Expand All @@ -59,10 +70,10 @@ export class WebRTCMultiaddrConnection implements MultiaddrConnection {
if (err !== undefined) {
log.error('error closing connection', err)
}

log.trace('closing connection')

this.timeline.close = Date.now()
this.peerConnection.close()
this.metrics?.increment({ close: true })
}
}
27 changes: 23 additions & 4 deletions src/muxer.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,25 @@
import { WebRTCStream } from './stream.js'
import { nopSink, nopSource } from './util.js'
import type { Stream } from '@libp2p/interface-connection'
import type { CounterGroup } from '@libp2p/interface-metrics'
import type { StreamMuxer, StreamMuxerFactory, StreamMuxerInit } from '@libp2p/interface-stream-muxer'
import type { Source, Sink } from 'it-stream-types'
import type { Uint8ArrayList } from 'uint8arraylist'

export interface DataChannelMuxerFactoryInit {
peerConnection: RTCPeerConnection
metrics?: CounterGroup
}

export class DataChannelMuxerFactory implements StreamMuxerFactory {
/**
* WebRTC Peer Connection
*/
private readonly peerConnection: RTCPeerConnection
private streamBuffer: WebRTCStream[] = []
private readonly metrics?: CounterGroup

constructor (peerConnection: RTCPeerConnection, readonly protocol = '/webrtc') {
constructor (peerConnection: RTCPeerConnection, metrics?: CounterGroup, readonly protocol = '/webrtc') {
this.peerConnection = peerConnection
// store any datachannels opened before upgrade has been completed
this.peerConnection.ondatachannel = ({ channel }) => {
Expand All @@ -28,10 +35,11 @@ export class DataChannelMuxerFactory implements StreamMuxerFactory {
})
this.streamBuffer.push(stream)
}
this.metrics = metrics
}

createStreamMuxer (init?: StreamMuxerInit | undefined): StreamMuxer {
return new DataChannelMuxer(this.peerConnection, this.streamBuffer, this.protocol, init)
return new DataChannelMuxer(this.peerConnection, this.streamBuffer, this.protocol, init, this.metrics)
}
}

Expand All @@ -44,6 +52,11 @@ export class DataChannelMuxer implements StreamMuxer {
*/
private readonly peerConnection: RTCPeerConnection

/**
* Optional metrics for this data channel muxer
*/
private readonly metrics?: CounterGroup

/**
* Array of streams in the data channel
*/
Expand All @@ -69,7 +82,7 @@ export class DataChannelMuxer implements StreamMuxer {
*/
sink: Sink<Source<Uint8Array | Uint8ArrayList>, Promise<void>> = nopSink

constructor (peerConnection: RTCPeerConnection, streams: Stream[], readonly protocol = '/webrtc', init?: StreamMuxerInit) {
constructor (peerConnection: RTCPeerConnection, streams: Stream[], readonly protocol: string = '/webrtc', init?: StreamMuxerInit, metrics?: CounterGroup) {
/**
* Initialized stream muxer
*/
Expand Down Expand Up @@ -100,6 +113,7 @@ export class DataChannelMuxer implements StreamMuxer {

this.streams.push(stream)
if ((init?.onIncomingStream) != null) {
this.metrics?.increment({ incoming_stream: true })
init.onIncomingStream(stream)
}
}
Expand All @@ -120,6 +134,10 @@ export class DataChannelMuxer implements StreamMuxer {
newStream (): Stream {
// The spec says the label SHOULD be an empty string: https://github.com/libp2p/specs/blob/master/webrtc/README.md#rtcdatachannel-label
const channel = this.peerConnection.createDataChannel('')
const closeCb = (stream: Stream): void => {
this.metrics?.increment({ stream_end: true })
this.init?.onStreamEnd?.(stream)
}
const stream = new WebRTCStream({
channel,
stat: {
Expand All @@ -128,9 +146,10 @@ export class DataChannelMuxer implements StreamMuxer {
open: 0
}
},
closeCb: this.wrapStreamEnd(this.init?.onStreamEnd)
closeCb: this.wrapStreamEnd(closeCb)
})
this.streams.push(stream)
this.metrics?.increment({ outgoing_stream: true })

return stream
}
Expand Down
30 changes: 24 additions & 6 deletions src/transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import { WebRTCStream } from './stream.js'
import { genUfrag } from './util.js'
import type { WebRTCDialOptions } from './options.js'
import type { Connection } from '@libp2p/interface-connection'
import type { CounterGroup, Metrics } from '@libp2p/interface-metrics'
import type { PeerId } from '@libp2p/interface-peer-id'
import type { Multiaddr } from '@multiformats/multiaddr'

Expand Down Expand Up @@ -42,19 +43,29 @@ export const CERTHASH_CODE: number = protocols('certhash').code
/**
* The peer for this transport
*/
// @TODO(ddimaria): seems like an unnessary abstraction, consider removing
export interface WebRTCDirectTransportComponents {
peerId: PeerId
metrics?: Metrics
}

export interface WebRTCMetrics {
dialerEvents: CounterGroup
}

export class WebRTCDirectTransport implements Transport {
/**
* The peer for this transport
*/
private readonly metrics?: WebRTCMetrics
private readonly components: WebRTCDirectTransportComponents

constructor (components: WebRTCDirectTransportComponents) {
this.components = components
if (components.metrics != null) {
this.metrics = {
dialerEvents: components.metrics.registerCounterGroup('libp2p_webrtc_dialer_events_total', {
label: 'event',
help: 'Total count of WebRTC dial events by type'
})
}
}
}

/**
Expand Down Expand Up @@ -125,6 +136,7 @@ export class WebRTCDirectTransport implements Transport {
const handshakeTimeout = setTimeout(() => {
const error = `Data channel was never opened: state: ${handshakeDataChannel.readyState}`
log.error(error)
this.metrics?.dialerEvents.increment({ open_error: true })
reject(dataChannelError('data', error))
}, HANDSHAKE_TIMEOUT_MS)

Expand All @@ -139,6 +151,8 @@ export class WebRTCDirectTransport implements Transport {
const errorTarget = event.target?.toString() ?? 'not specified'
const error = `Error opening a data channel for handshaking: ${errorTarget}`
log.error(error)
// NOTE: We use unknown error here but this could potentially be considered a reset by some standards.
this.metrics?.dialerEvents.increment({ unknown_error: true })
reject(dataChannelError('data', error))
}
})
Expand Down Expand Up @@ -193,7 +207,8 @@ export class WebRTCDirectTransport implements Transport {
remoteAddr: ma,
timeline: {
open: Date.now()
}
},
metrics: this.metrics?.dialerEvents
})

const eventListeningName = isFirefox ? 'iceconnectionstatechange' : 'connectionstatechange'
Expand All @@ -215,7 +230,10 @@ export class WebRTCDirectTransport implements Transport {
}
}, { signal })

const muxerFactory = new DataChannelMuxerFactory(peerConnection)
// Track opened peer connection
this.metrics?.dialerEvents.increment({ peer_connection: true })

const muxerFactory = new DataChannelMuxerFactory(peerConnection, this.metrics?.dialerEvents)

// For outbound connections, the remote is expected to start the noise handshake.
// Therefore, we need to secure an inbound noise connection from the remote.
Expand Down
10 changes: 9 additions & 1 deletion test/maconn.browser.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,33 @@

import { multiaddr } from '@multiformats/multiaddr'
import { expect } from 'aegir/chai'
import { stubObject } from 'sinon-ts'
import { WebRTCMultiaddrConnection } from './../src/maconn.js'
import type { CounterGroup } from '@libp2p/interface-metrics'

describe('Multiaddr Connection', () => {
it('can open and close', async () => {
const peerConnection = new RTCPeerConnection()
peerConnection.createDataChannel('whatever', { negotiated: true, id: 91 })
const remoteAddr = multiaddr('/ip4/1.2.3.4/udp/1234/webrtc/certhash/uEiAUqV7kzvM1wI5DYDc1RbcekYVmXli_Qprlw3IkiEg6tQ')
const metrics = stubObject<CounterGroup>({
increment: () => {},
reset: () => {}
})
const maConn = new WebRTCMultiaddrConnection({
peerConnection,
remoteAddr,
timeline: {
open: (new Date()).getTime()
}
},
metrics
})

expect(maConn.timeline.close).to.be.undefined

await maConn.close()

expect(maConn.timeline.close).to.not.be.undefined
expect(metrics.increment.calledWith({ close: true })).to.be.true
})
})
8 changes: 6 additions & 2 deletions test/transport.browser.spec.ts
Original file line number Diff line number Diff line change
@@ -1,25 +1,29 @@
/* eslint-disable @typescript-eslint/no-floating-promises */

import { mockUpgrader } from '@libp2p/interface-mocks'
import { mockMetrics, mockUpgrader } from '@libp2p/interface-mocks'
import { type CreateListenerOptions, symbol } from '@libp2p/interface-transport'
import { createEd25519PeerId } from '@libp2p/peer-id-factory'
import { multiaddr, type Multiaddr } from '@multiformats/multiaddr'
import { expect, assert } from 'aegir/chai'
import { UnimplementedError } from './../src/error.js'
import * as underTest from './../src/transport.js'
import { expectError } from './util.js'
import type { Metrics } from '@libp2p/interface-metrics'

function ignoredDialOption (): CreateListenerOptions {
const upgrader = mockUpgrader({})
return { upgrader }
}

describe('WebRTC Transport', () => {
let metrics: Metrics
let components: underTest.WebRTCDirectTransportComponents

before(async () => {
metrics = mockMetrics()()
components = {
peerId: await createEd25519PeerId()
peerId: await createEd25519PeerId(),
metrics
}
})

Expand Down

0 comments on commit b3cb445

Please sign in to comment.