Skip to content

Commit

Permalink
feat(p2p): snappy compress p2p messages (#10417)
Browse files Browse the repository at this point in the history
fixes: #10348
  • Loading branch information
Maddiaa0 authored Dec 5, 2024
1 parent c246aba commit c643a54
Show file tree
Hide file tree
Showing 9 changed files with 199 additions and 49 deletions.
1 change: 1 addition & 0 deletions yarn-project/p2p/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
"libp2p": "1.5.0",
"semver": "^7.6.0",
"sha3": "^2.1.4",
"snappy": "^7.2.2",
"tslib": "^2.4.0",
"xxhash-wasm": "^1.1.0"
},
Expand Down
2 changes: 1 addition & 1 deletion yarn-project/p2p/src/mocks/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ export type ReqRespNode = {
export const MOCK_SUB_PROTOCOL_HANDLERS: ReqRespSubProtocolHandlers = {
[PING_PROTOCOL]: pingHandler,
[STATUS_PROTOCOL]: statusHandler,
[TX_REQ_PROTOCOL]: (_msg: any) => Promise.resolve(Uint8Array.from(Buffer.from('tx'))),
[TX_REQ_PROTOCOL]: (_msg: any) => Promise.resolve(Buffer.from('tx')),
};

// By default, all requests are valid
Expand Down
13 changes: 13 additions & 0 deletions yarn-project/p2p/src/service/encoding.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
import { sha256 } from '@aztec/foundation/crypto';

import { type RPC } from '@chainsafe/libp2p-gossipsub/message';
import { type DataTransform } from '@chainsafe/libp2p-gossipsub/types';
import { type Message } from '@libp2p/interface';
import { compressSync, uncompressSync } from 'snappy';
import xxhashFactory from 'xxhash-wasm';

// Load WASM
Expand Down Expand Up @@ -46,3 +48,14 @@ export function getMsgIdFn(message: Message) {
const vec = [Buffer.from(topic), message.data];
return sha256(Buffer.concat(vec)).subarray(0, 20);
}

export class SnappyTransform implements DataTransform {
inboundTransform(_topicStr: string, data: Uint8Array): Uint8Array {
const uncompressed = Buffer.from(uncompressSync(Buffer.from(data), { asBuffer: true }));
return new Uint8Array(uncompressed);
}

outboundTransform(_topicStr: string, data: Uint8Array): Uint8Array {
return new Uint8Array(compressSync(Buffer.from(data)));
}
}
9 changes: 5 additions & 4 deletions yarn-project/p2p/src/service/libp2p_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import {
} from '../tx_validator/index.js';
import { type PubSubLibp2p, convertToMultiaddr } from '../util.js';
import { AztecDatastore } from './data_store.js';
import { fastMsgIdFn, getMsgIdFn, msgIdToStrFn } from './encoding.js';
import { SnappyTransform, fastMsgIdFn, getMsgIdFn, msgIdToStrFn } from './encoding.js';
import { PeerManager } from './peer_manager.js';
import { PeerErrorSeverity } from './peer_scoring.js';
import { pingHandler, statusHandler } from './reqresp/handlers.js';
Expand Down Expand Up @@ -246,6 +246,7 @@ export class LibP2PService extends WithTracer implements P2PService {
msgIdFn: getMsgIdFn,
msgIdToStrFn: msgIdToStrFn,
fastMsgIdFn: fastMsgIdFn,
dataTransform: new SnappyTransform(),
metricsRegister: otelMetricsAdapter,
metricsTopicStrToLabel: metricsTopicStrToLabels(),
scoreParams: createPeerScoreParams({
Expand Down Expand Up @@ -282,11 +283,11 @@ export class LibP2PService extends WithTracer implements P2PService {
* @param msg - the tx request message
* @returns the tx response message
*/
const txHandler = (msg: Buffer): Promise<Uint8Array> => {
const txHandler = (msg: Buffer): Promise<Buffer> => {
const txHash = TxHash.fromBuffer(msg);
const foundTx = mempools.txPool.getTxByHash(txHash);
const asUint8Array = Uint8Array.from(foundTx ? foundTx.toBuffer() : Buffer.alloc(0));
return Promise.resolve(asUint8Array);
const buf = foundTx ? foundTx.toBuffer() : Buffer.alloc(0);
return Promise.resolve(buf);
};

const requestResponseHandlers = {
Expand Down
8 changes: 4 additions & 4 deletions yarn-project/p2p/src/service/reqresp/handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@
* @param _msg - The ping request message.
* @returns A resolved promise with the pong response.
*/
export function pingHandler(_msg: any): Promise<Uint8Array> {
return Promise.resolve(Uint8Array.from(Buffer.from('pong')));
export function pingHandler(_msg: any): Promise<Buffer> {
return Promise.resolve(Buffer.from('pong'));
}

/**
* Handles the status request.
* @param _msg - The status request message.
* @returns A resolved promise with the ok response.
*/
export function statusHandler(_msg: any): Promise<Uint8Array> {
return Promise.resolve(Uint8Array.from(Buffer.from('ok')));
export function statusHandler(_msg: any): Promise<Buffer> {
return Promise.resolve(Buffer.from('ok'));
}
6 changes: 3 additions & 3 deletions yarn-project/p2p/src/service/reqresp/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ export type ReqRespSubProtocol = typeof PING_PROTOCOL | typeof STATUS_PROTOCOL |
* A handler for a sub protocol
* The message will arrive as a buffer, and the handler must return a buffer
*/
export type ReqRespSubProtocolHandler = (msg: Buffer) => Promise<Uint8Array>;
export type ReqRespSubProtocolHandler = (msg: Buffer) => Promise<Buffer>;

/**
* A type mapping from supprotocol to it's rate limits
Expand Down Expand Up @@ -83,8 +83,8 @@ export type SubProtocolMap = {
* Default handler for unimplemented sub protocols, this SHOULD be overwritten
* by the service, but is provided as a fallback
*/
const defaultHandler = (_msg: any): Promise<Uint8Array> => {
return Promise.resolve(Uint8Array.from(Buffer.from('unimplemented')));
const defaultHandler = (_msg: any): Promise<Buffer> => {
return Promise.resolve(Buffer.from('unimplemented'));
};

/**
Expand Down
60 changes: 25 additions & 35 deletions yarn-project/p2p/src/service/reqresp/reqresp.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { CollectiveReqRespTimeoutError, IndiviualReqRespTimeoutError } from '../
import {
MOCK_SUB_PROTOCOL_HANDLERS,
MOCK_SUB_PROTOCOL_VALIDATORS,
type ReqRespNode,
connectToPeers,
createNodes,
startNodes,
Expand All @@ -23,15 +24,22 @@ const PING_REQUEST = RequestableBuffer.fromBuffer(Buffer.from('ping'));
// and ask for specific data that they missed via the traditional gossip protocol.
describe('ReqResp', () => {
let peerManager: MockProxy<PeerManager>;
let nodes: ReqRespNode[];

beforeEach(() => {
peerManager = mock<PeerManager>();
});

afterEach(async () => {
if (nodes) {
await stopNodes(nodes as ReqRespNode[]);
}
});

it('Should perform a ping request', async () => {
// Create two nodes
// They need to discover each other
const nodes = await createNodes(peerManager, 2);
nodes = await createNodes(peerManager, 2);
const { req: pinger } = nodes[0];

await startNodes(nodes);
Expand All @@ -45,12 +53,10 @@ describe('ReqResp', () => {

await sleep(500);
expect(res?.toBuffer().toString('utf-8')).toEqual('pong');

await stopNodes(nodes);
});

it('Should handle gracefully if a peer connected peer is offline', async () => {
const nodes = await createNodes(peerManager, 2);
nodes = await createNodes(peerManager, 2);

const { req: pinger } = nodes[0];
const { req: ponger } = nodes[1];
Expand All @@ -66,12 +72,10 @@ describe('ReqResp', () => {
const res = await pinger.sendRequest(PING_PROTOCOL, PING_REQUEST);

expect(res).toBeUndefined();

await stopNodes(nodes);
});

it('Should request from a later peer if other peers are offline', async () => {
const nodes = await createNodes(peerManager, 4);
nodes = await createNodes(peerManager, 4);

await startNodes(nodes);
await sleep(500);
Expand All @@ -86,12 +90,10 @@ describe('ReqResp', () => {
const res = await nodes[0].req.sendRequest(PING_PROTOCOL, PING_REQUEST);

expect(res?.toBuffer().toString('utf-8')).toEqual('pong');

await stopNodes(nodes);
});

it('Should hit a rate limit if too many requests are made in quick succession', async () => {
const nodes = await createNodes(peerManager, 2);
nodes = await createNodes(peerManager, 2);

await startNodes(nodes);

Expand All @@ -110,8 +112,6 @@ describe('ReqResp', () => {
// Make sure the error message is logged
const errorMessage = `Rate limit exceeded for ${PING_PROTOCOL} from ${nodes[0].p2p.peerId.toString()}`;
expect(loggerSpy).toHaveBeenCalledWith(errorMessage);

await stopNodes(nodes);
});

describe('TX REQ PROTOCOL', () => {
Expand All @@ -120,15 +120,15 @@ describe('ReqResp', () => {
const txHash = tx.getTxHash();

const protocolHandlers = MOCK_SUB_PROTOCOL_HANDLERS;
protocolHandlers[TX_REQ_PROTOCOL] = (message: Buffer): Promise<Uint8Array> => {
protocolHandlers[TX_REQ_PROTOCOL] = (message: Buffer): Promise<Buffer> => {
const receivedHash = TxHash.fromBuffer(message);
if (txHash.equals(receivedHash)) {
return Promise.resolve(Uint8Array.from(tx.toBuffer()));
return Promise.resolve(tx.toBuffer());
}
return Promise.resolve(Uint8Array.from(Buffer.from('')));
return Promise.resolve(Buffer.from(''));
};

const nodes = await createNodes(peerManager, 2);
nodes = await createNodes(peerManager, 2);

await startNodes(nodes, protocolHandlers);
await sleep(500);
Expand All @@ -137,8 +137,6 @@ describe('ReqResp', () => {

const res = await nodes[0].req.sendRequest(TX_REQ_PROTOCOL, txHash);
expect(res).toEqual(tx);

await stopNodes(nodes);
});

it('Does not crash if tx hash returns undefined', async () => {
Expand All @@ -147,11 +145,11 @@ describe('ReqResp', () => {

const protocolHandlers = MOCK_SUB_PROTOCOL_HANDLERS;
// Return nothing
protocolHandlers[TX_REQ_PROTOCOL] = (_message: Buffer): Promise<Uint8Array> => {
return Promise.resolve(Uint8Array.from(Buffer.from('')));
protocolHandlers[TX_REQ_PROTOCOL] = (_message: Buffer): Promise<Buffer> => {
return Promise.resolve(Buffer.from(''));
};

const nodes = await createNodes(peerManager, 2);
nodes = await createNodes(peerManager, 2);

await startNodes(nodes, protocolHandlers);
await sleep(500);
Expand All @@ -160,12 +158,10 @@ describe('ReqResp', () => {

const res = await nodes[0].req.sendRequest(TX_REQ_PROTOCOL, txHash);
expect(res).toBeUndefined();

await stopNodes(nodes);
});

it('Should hit individual timeout if nothing is returned over the stream', async () => {
const nodes = await createNodes(peerManager, 2);
nodes = await createNodes(peerManager, 2);

await startNodes(nodes);

Expand Down Expand Up @@ -197,12 +193,10 @@ describe('ReqResp', () => {
}),
PeerErrorSeverity.HighToleranceError,
);

await stopNodes(nodes);
});

it('Should hit collective timeout if nothing is returned over the stream from multiple peers', async () => {
const nodes = await createNodes(peerManager, 4);
nodes = await createNodes(peerManager, 4);

await startNodes(nodes);

Expand All @@ -226,8 +220,6 @@ describe('ReqResp', () => {
// Make sure the error message is logged
const errorMessage = `${new CollectiveReqRespTimeoutError().message} | subProtocol: ${TX_REQ_PROTOCOL}`;
expect(loggerSpy).toHaveBeenCalledWith(errorMessage);

await stopNodes(nodes);
});

it('Should penalize peer if transaction validation fails', async () => {
Expand All @@ -236,12 +228,12 @@ describe('ReqResp', () => {

// Mock that the node will respond with the tx
const protocolHandlers = MOCK_SUB_PROTOCOL_HANDLERS;
protocolHandlers[TX_REQ_PROTOCOL] = (message: Buffer): Promise<Uint8Array> => {
protocolHandlers[TX_REQ_PROTOCOL] = (message: Buffer): Promise<Buffer> => {
const receivedHash = TxHash.fromBuffer(message);
if (txHash.equals(receivedHash)) {
return Promise.resolve(Uint8Array.from(tx.toBuffer()));
return Promise.resolve(tx.toBuffer());
}
return Promise.resolve(Uint8Array.from(Buffer.from('')));
return Promise.resolve(Buffer.from(''));
};

// Mock that the receiving node will find that the transaction is invalid
Expand All @@ -251,7 +243,7 @@ describe('ReqResp', () => {
return Promise.resolve(false);
};

const nodes = await createNodes(peerManager, 2);
nodes = await createNodes(peerManager, 2);

await startNodes(nodes, protocolHandlers, protocolValidators);
await sleep(500);
Expand All @@ -268,8 +260,6 @@ describe('ReqResp', () => {
}),
PeerErrorSeverity.LowToleranceError,
);

await stopNodes(nodes);
});
});
});
9 changes: 7 additions & 2 deletions yarn-project/p2p/src/service/reqresp/reqresp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { executeTimeoutWithCustomError } from '@aztec/foundation/timer';
import { type IncomingStreamData, type PeerId, type Stream } from '@libp2p/interface';
import { pipe } from 'it-pipe';
import { type Libp2p } from 'libp2p';
import { compressSync, uncompressSync } from 'snappy';
import { type Uint8ArrayList } from 'uint8arraylist';

import { CollectiveReqRespTimeoutError, IndiviualReqRespTimeoutError } from '../../errors/reqresp.error.js';
Expand All @@ -31,6 +32,9 @@ import { RequestResponseRateLimiter } from './rate_limiter/rate_limiter.js';
* This service implements the request response sub protocol, it is heavily inspired from
* ethereum implementations of the same name.
*
* Note, responses get compressed in streamHandler
* so they get decompressed in readMessage
*
* see: https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/p2p-interface.md#the-reqresp-domain
*/
export class ReqResp {
Expand Down Expand Up @@ -232,7 +236,7 @@ export class ReqResp {
chunks.push(chunk.subarray());
}
const messageData = chunks.concat();
return Buffer.concat(messageData);
return uncompressSync(Buffer.concat(messageData), { asBuffer: true }) as Buffer;
}

/**
Expand Down Expand Up @@ -269,7 +273,8 @@ export class ReqResp {
async function* (source: any) {
for await (const chunkList of source) {
const msg = Buffer.from(chunkList.subarray());
yield handler(msg);
const response = await handler(msg);
yield new Uint8Array(compressSync(response));
}
},
stream,
Expand Down
Loading

0 comments on commit c643a54

Please sign in to comment.