Skip to content

Commit

Permalink
feat!(@libp2p/protocol-perf): continously measure perf (libp2p#2064)
Browse files Browse the repository at this point in the history
  • Loading branch information
maschad committed Sep 21, 2023
1 parent 6640116 commit c8178d4
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 16 deletions.
87 changes: 79 additions & 8 deletions packages/protocol-perf/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
* ```
*
* The `measurePerformance` function can be used to measure the latency and throughput of a connection.
* server. This will not work in browsers.
* server.
*
* @example
*
Expand Down Expand Up @@ -52,12 +52,19 @@ import type { AbortOptions } from '@libp2p/interfaces'
const log = logger('libp2p:perf')

export const defaultInit: PerfServiceInit = {
protocolName: '/perf/1.0.0',
protocolName: '/perf/1.2.0',
writeBlockSize: BigInt(64 << 10)
}

export interface PerfService {
measurePerformance: (startTime: number, connection: Connection, sendBytes: bigint, recvBytes: bigint, options?: AbortOptions) => Promise<number>
measurePerformance: (connection: Connection, sendBytes: bigint, recvBytes: bigint, options?: AbortOptions) => Promise<PerfOutput>
}

export interface PerfOutput {
type: 'intermediary' | 'final'
timeSeconds: number
uploadBytes: string
downloadBytes: string
}

export interface PerfServiceInit {
Expand Down Expand Up @@ -126,19 +133,52 @@ class DefaultPerfService implements Startable, PerfService {
throw new Error('bytesToSendBack was not set')
}

let lastAmountOfBytesSent = BigInt(0)
let lastReportedTime = Date.now()
let initialStartTime = Date.now()
let totalBytesSent = BigInt(0)

await stream.sink(async function * () {

while (bytesToSendBack > 0n) {
let toSend: bigint = writeBlockSize
if (toSend > bytesToSendBack) {
toSend = bytesToSendBack
}

bytesToSendBack = bytesToSendBack - toSend
yield uint8Buf.subarray(0, Number(toSend))

if (Date.now() - lastReportedTime > 1000) {
const output: PerfOutput = {
type: 'intermediary',
timeSeconds: (Date.now() - lastReportedTime) / 1000,
uploadBytes: BigInt(0).toString(),
downloadBytes: lastAmountOfBytesSent.toString()
}

console.log(JSON.stringify(output))

lastReportedTime = Date.now()
lastAmountOfBytesSent = BigInt(0)
}

lastAmountOfBytesSent += toSend
totalBytesSent += toSend
}
}())

const finalOutput: PerfOutput = {
type: 'final',
timeSeconds: (Date.now() - initialStartTime) / 1000,
uploadBytes: BigInt(0).toString(),
downloadBytes: totalBytesSent.toString()
}

console.log(JSON.stringify(finalOutput))
}

async measurePerformance (startTime: number, connection: Connection, sendBytes: bigint, recvBytes: bigint, options: AbortOptions = {}): Promise<number> {
async measurePerformance (connection: Connection, sendBytes: bigint, recvBytes: bigint, options: AbortOptions = {}): Promise<PerfOutput> {
log('opening stream on protocol %s to %p', this.protocol, connection.remotePeer)

const uint8Buf = new Uint8Array(this.databuf)
Expand All @@ -147,24 +187,49 @@ class DefaultPerfService implements Startable, PerfService {

const stream = await connection.newStream([this.protocol], options)

let lastAmountOfBytesSent = BigInt(0)
let lastReportedTime = Date.now()
let totalBytesSent = BigInt(0)
let initialStartTime = Date.now()

// Convert sendBytes to uint64 big endian buffer
const view = new DataView(this.databuf)
view.setBigInt64(0, recvBytes, false)

log('sending %i bytes to %p', sendBytes, connection.remotePeer)

try {

await stream.sink((async function * () {
// Send the number of bytes to receive
yield uint8Buf.subarray(0, 8)
// Send the number of bytes to send

while (sendBytes > 0n) {
let toSend: bigint = writeBlockSize
if (toSend > sendBytes) {
toSend = sendBytes
}
sendBytes = sendBytes - toSend
yield uint8Buf.subarray(0, Number(toSend))

if (Date.now() - lastReportedTime > 1000) {
const output: PerfOutput = {
type: 'intermediary',
timeSeconds: (Date.now() - lastReportedTime) / 1000,
uploadBytes: lastAmountOfBytesSent.toString(),
downloadBytes: BigInt(0).toString()
}

lastReportedTime = Date.now()
lastAmountOfBytesSent = BigInt(0)

console.log(JSON.stringify(output))
}

lastAmountOfBytesSent += toSend
totalBytesSent += toSend
}

})())

// Read the received bytes
Expand All @@ -177,15 +242,21 @@ class DefaultPerfService implements Startable, PerfService {
throw new Error(`Expected to receive ${recvBytes} bytes, but received ${actualRecvdBytes}`)
}
} catch (err) {
log('error sending %i bytes to %p: %s', sendBytes, connection.remotePeer, err)
log('error sending %s bytes to %p: %s', totalBytesSent, connection.remotePeer, err)
throw err
} finally {
log('performed %s to %p', this.protocol, connection.remotePeer)
await stream.close()
}

// Return the latency
return Date.now() - startTime
const finalOutput: PerfOutput = {
type: 'final',
timeSeconds: (Date.now() - initialStartTime) / 1000,
uploadBytes: totalBytesSent.toString(),
downloadBytes: BigInt(0).toString()
}

return finalOutput
}
}

Expand Down
9 changes: 4 additions & 5 deletions packages/protocol-perf/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,13 @@ export async function main (runServer: boolean, serverIpAddress: string, transpo

await node.start()

const startTime = Date.now()

if (!runServer) {
const connection = await node.dial(multiaddr(tcpMultiaddrAddress))
const duration = await node.services.perf.measurePerformance(startTime, connection, BigInt(uploadBytes), BigInt(downloadBytes))
// Output latency to stdout in seconds
const finalOutput = await node.services.perf.measurePerformance(connection, BigInt(uploadBytes), BigInt(downloadBytes))

// eslint-disable-next-line no-console
console.log(JSON.stringify({ latency: duration / 1000 }))
console.log(finalOutput)

await node.stop()
}
}
Expand Down
4 changes: 1 addition & 3 deletions packages/protocol-perf/test/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,7 @@ describe('perf', () => {
localComponents.events.safeDispatchEvent('connection:open', { detail: localToRemote })
remoteComponents.events.safeDispatchEvent('connection:open', { detail: remoteToLocal })

const startTime = Date.now()

// Run Perf
await expect(client.measurePerformance(startTime, localToRemote, 1024n, 1024n)).to.eventually.be.fulfilled()
await expect(client.measurePerformance(localToRemote, 1024n, 1024n)).to.eventually.be.fulfilled()
})
})

0 comments on commit c8178d4

Please sign in to comment.