Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(p2p): snappy compress p2p messages #10417

Merged
merged 7 commits into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions yarn-project/p2p/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
"libp2p": "1.5.0",
"semver": "^7.6.0",
"sha3": "^2.1.4",
"snappy": "^7.2.2",
"tslib": "^2.4.0",
"xxhash-wasm": "^1.1.0"
},
Expand Down
2 changes: 1 addition & 1 deletion yarn-project/p2p/src/mocks/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ export type ReqRespNode = {
export const MOCK_SUB_PROTOCOL_HANDLERS: ReqRespSubProtocolHandlers = {
[PING_PROTOCOL]: pingHandler,
[STATUS_PROTOCOL]: statusHandler,
[TX_REQ_PROTOCOL]: (_msg: any) => Promise.resolve(Uint8Array.from(Buffer.from('tx'))),
[TX_REQ_PROTOCOL]: (_msg: any) => Promise.resolve(Buffer.from('tx')),
};

// By default, all requests are valid
Expand Down
13 changes: 13 additions & 0 deletions yarn-project/p2p/src/service/encoding.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
import { sha256 } from '@aztec/foundation/crypto';

import { type RPC } from '@chainsafe/libp2p-gossipsub/message';
import { type DataTransform } from '@chainsafe/libp2p-gossipsub/types';
import { type Message } from '@libp2p/interface';
import { compressSync, uncompressSync } from 'snappy';
import xxhashFactory from 'xxhash-wasm';

// Load WASM
Expand Down Expand Up @@ -46,3 +48,14 @@ export function getMsgIdFn(message: Message) {
const vec = [Buffer.from(topic), message.data];
return sha256(Buffer.concat(vec)).subarray(0, 20);
}

export class SnappyTransform implements DataTransform {
inboundTransform(_topicStr: string, data: Uint8Array): Uint8Array {
const uncompressed = Buffer.from(uncompressSync(Buffer.from(data), { asBuffer: true }));
return new Uint8Array(uncompressed);
}

outboundTransform(_topicStr: string, data: Uint8Array): Uint8Array {
return new Uint8Array(compressSync(Buffer.from(data)));
}
}
9 changes: 5 additions & 4 deletions yarn-project/p2p/src/service/libp2p_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import {
} from '../tx_validator/index.js';
import { type PubSubLibp2p, convertToMultiaddr } from '../util.js';
import { AztecDatastore } from './data_store.js';
import { fastMsgIdFn, getMsgIdFn, msgIdToStrFn } from './encoding.js';
import { SnappyTransform, fastMsgIdFn, getMsgIdFn, msgIdToStrFn } from './encoding.js';
import { PeerManager } from './peer_manager.js';
import { PeerErrorSeverity } from './peer_scoring.js';
import { pingHandler, statusHandler } from './reqresp/handlers.js';
Expand Down Expand Up @@ -246,6 +246,7 @@ export class LibP2PService extends WithTracer implements P2PService {
msgIdFn: getMsgIdFn,
msgIdToStrFn: msgIdToStrFn,
fastMsgIdFn: fastMsgIdFn,
dataTransform: new SnappyTransform(),
metricsRegister: otelMetricsAdapter,
metricsTopicStrToLabel: metricsTopicStrToLabels(),
scoreParams: createPeerScoreParams({
Expand Down Expand Up @@ -282,11 +283,11 @@ export class LibP2PService extends WithTracer implements P2PService {
* @param msg - the tx request message
* @returns the tx response message
*/
const txHandler = (msg: Buffer): Promise<Uint8Array> => {
const txHandler = (msg: Buffer): Promise<Buffer> => {
const txHash = TxHash.fromBuffer(msg);
const foundTx = mempools.txPool.getTxByHash(txHash);
const asUint8Array = Uint8Array.from(foundTx ? foundTx.toBuffer() : Buffer.alloc(0));
return Promise.resolve(asUint8Array);
const buf = foundTx ? foundTx.toBuffer() : Buffer.alloc(0);
return Promise.resolve(buf);
};

const requestResponseHandlers = {
Expand Down
8 changes: 4 additions & 4 deletions yarn-project/p2p/src/service/reqresp/handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@
* @param _msg - The ping request message.
* @returns A resolved promise with the pong response.
*/
export function pingHandler(_msg: any): Promise<Uint8Array> {
return Promise.resolve(Uint8Array.from(Buffer.from('pong')));
export function pingHandler(_msg: any): Promise<Buffer> {
return Promise.resolve(Buffer.from('pong'));
}

/**
* Handles the status request.
* @param _msg - The status request message.
* @returns A resolved promise with the ok response.
*/
export function statusHandler(_msg: any): Promise<Uint8Array> {
return Promise.resolve(Uint8Array.from(Buffer.from('ok')));
export function statusHandler(_msg: any): Promise<Buffer> {
return Promise.resolve(Buffer.from('ok'));
}
6 changes: 3 additions & 3 deletions yarn-project/p2p/src/service/reqresp/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ export type ReqRespSubProtocol = typeof PING_PROTOCOL | typeof STATUS_PROTOCOL |
* A handler for a sub protocol
* The message will arrive as a buffer, and the handler must return a buffer
*/
export type ReqRespSubProtocolHandler = (msg: Buffer) => Promise<Uint8Array>;
export type ReqRespSubProtocolHandler = (msg: Buffer) => Promise<Buffer>;

/**
* A type mapping from supprotocol to it's rate limits
Expand Down Expand Up @@ -83,8 +83,8 @@ export type SubProtocolMap = {
* Default handler for unimplemented sub protocols, this SHOULD be overwritten
* by the service, but is provided as a fallback
*/
const defaultHandler = (_msg: any): Promise<Uint8Array> => {
return Promise.resolve(Uint8Array.from(Buffer.from('unimplemented')));
const defaultHandler = (_msg: any): Promise<Buffer> => {
return Promise.resolve(Buffer.from('unimplemented'));
};

/**
Expand Down
60 changes: 25 additions & 35 deletions yarn-project/p2p/src/service/reqresp/reqresp.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { CollectiveReqRespTimeoutError, IndiviualReqRespTimeoutError } from '../
import {
MOCK_SUB_PROTOCOL_HANDLERS,
MOCK_SUB_PROTOCOL_VALIDATORS,
type ReqRespNode,
connectToPeers,
createNodes,
startNodes,
Expand All @@ -23,15 +24,22 @@ const PING_REQUEST = RequestableBuffer.fromBuffer(Buffer.from('ping'));
// and ask for specific data that they missed via the traditional gossip protocol.
describe('ReqResp', () => {
let peerManager: MockProxy<PeerManager>;
let nodes: ReqRespNode[];

beforeEach(() => {
peerManager = mock<PeerManager>();
});

afterEach(async () => {
if (nodes) {
await stopNodes(nodes as ReqRespNode[]);
}
});

it('Should perform a ping request', async () => {
// Create two nodes
// They need to discover each other
const nodes = await createNodes(peerManager, 2);
nodes = await createNodes(peerManager, 2);
const { req: pinger } = nodes[0];

await startNodes(nodes);
Expand All @@ -45,12 +53,10 @@ describe('ReqResp', () => {

await sleep(500);
expect(res?.toBuffer().toString('utf-8')).toEqual('pong');

await stopNodes(nodes);
});

it('Should handle gracefully if a peer connected peer is offline', async () => {
const nodes = await createNodes(peerManager, 2);
nodes = await createNodes(peerManager, 2);

const { req: pinger } = nodes[0];
const { req: ponger } = nodes[1];
Expand All @@ -66,12 +72,10 @@ describe('ReqResp', () => {
const res = await pinger.sendRequest(PING_PROTOCOL, PING_REQUEST);

expect(res).toBeUndefined();

await stopNodes(nodes);
});

it('Should request from a later peer if other peers are offline', async () => {
const nodes = await createNodes(peerManager, 4);
nodes = await createNodes(peerManager, 4);

await startNodes(nodes);
await sleep(500);
Expand All @@ -86,12 +90,10 @@ describe('ReqResp', () => {
const res = await nodes[0].req.sendRequest(PING_PROTOCOL, PING_REQUEST);

expect(res?.toBuffer().toString('utf-8')).toEqual('pong');

await stopNodes(nodes);
});

it('Should hit a rate limit if too many requests are made in quick succession', async () => {
const nodes = await createNodes(peerManager, 2);
nodes = await createNodes(peerManager, 2);

await startNodes(nodes);

Expand All @@ -110,8 +112,6 @@ describe('ReqResp', () => {
// Make sure the error message is logged
const errorMessage = `Rate limit exceeded for ${PING_PROTOCOL} from ${nodes[0].p2p.peerId.toString()}`;
expect(loggerSpy).toHaveBeenCalledWith(errorMessage);

await stopNodes(nodes);
});

describe('TX REQ PROTOCOL', () => {
Expand All @@ -120,15 +120,15 @@ describe('ReqResp', () => {
const txHash = tx.getTxHash();

const protocolHandlers = MOCK_SUB_PROTOCOL_HANDLERS;
protocolHandlers[TX_REQ_PROTOCOL] = (message: Buffer): Promise<Uint8Array> => {
protocolHandlers[TX_REQ_PROTOCOL] = (message: Buffer): Promise<Buffer> => {
const receivedHash = TxHash.fromBuffer(message);
if (txHash.equals(receivedHash)) {
return Promise.resolve(Uint8Array.from(tx.toBuffer()));
return Promise.resolve(tx.toBuffer());
}
return Promise.resolve(Uint8Array.from(Buffer.from('')));
return Promise.resolve(Buffer.from(''));
};

const nodes = await createNodes(peerManager, 2);
nodes = await createNodes(peerManager, 2);

await startNodes(nodes, protocolHandlers);
await sleep(500);
Expand All @@ -137,8 +137,6 @@ describe('ReqResp', () => {

const res = await nodes[0].req.sendRequest(TX_REQ_PROTOCOL, txHash);
expect(res).toEqual(tx);

await stopNodes(nodes);
});

it('Does not crash if tx hash returns undefined', async () => {
Expand All @@ -147,11 +145,11 @@ describe('ReqResp', () => {

const protocolHandlers = MOCK_SUB_PROTOCOL_HANDLERS;
// Return nothing
protocolHandlers[TX_REQ_PROTOCOL] = (_message: Buffer): Promise<Uint8Array> => {
return Promise.resolve(Uint8Array.from(Buffer.from('')));
protocolHandlers[TX_REQ_PROTOCOL] = (_message: Buffer): Promise<Buffer> => {
return Promise.resolve(Buffer.from(''));
};

const nodes = await createNodes(peerManager, 2);
nodes = await createNodes(peerManager, 2);

await startNodes(nodes, protocolHandlers);
await sleep(500);
Expand All @@ -160,12 +158,10 @@ describe('ReqResp', () => {

const res = await nodes[0].req.sendRequest(TX_REQ_PROTOCOL, txHash);
expect(res).toBeUndefined();

await stopNodes(nodes);
});

it('Should hit individual timeout if nothing is returned over the stream', async () => {
const nodes = await createNodes(peerManager, 2);
nodes = await createNodes(peerManager, 2);

await startNodes(nodes);

Expand Down Expand Up @@ -197,12 +193,10 @@ describe('ReqResp', () => {
}),
PeerErrorSeverity.HighToleranceError,
);

await stopNodes(nodes);
});

it('Should hit collective timeout if nothing is returned over the stream from multiple peers', async () => {
const nodes = await createNodes(peerManager, 4);
nodes = await createNodes(peerManager, 4);

await startNodes(nodes);

Expand All @@ -226,8 +220,6 @@ describe('ReqResp', () => {
// Make sure the error message is logged
const errorMessage = `${new CollectiveReqRespTimeoutError().message} | subProtocol: ${TX_REQ_PROTOCOL}`;
expect(loggerSpy).toHaveBeenCalledWith(errorMessage);

await stopNodes(nodes);
});

it('Should penalize peer if transaction validation fails', async () => {
Expand All @@ -236,12 +228,12 @@ describe('ReqResp', () => {

// Mock that the node will respond with the tx
const protocolHandlers = MOCK_SUB_PROTOCOL_HANDLERS;
protocolHandlers[TX_REQ_PROTOCOL] = (message: Buffer): Promise<Uint8Array> => {
protocolHandlers[TX_REQ_PROTOCOL] = (message: Buffer): Promise<Buffer> => {
const receivedHash = TxHash.fromBuffer(message);
if (txHash.equals(receivedHash)) {
return Promise.resolve(Uint8Array.from(tx.toBuffer()));
return Promise.resolve(tx.toBuffer());
}
return Promise.resolve(Uint8Array.from(Buffer.from('')));
return Promise.resolve(Buffer.from(''));
};

// Mock that the receiving node will find that the transaction is invalid
Expand All @@ -251,7 +243,7 @@ describe('ReqResp', () => {
return Promise.resolve(false);
};

const nodes = await createNodes(peerManager, 2);
nodes = await createNodes(peerManager, 2);

await startNodes(nodes, protocolHandlers, protocolValidators);
await sleep(500);
Expand All @@ -268,8 +260,6 @@ describe('ReqResp', () => {
}),
PeerErrorSeverity.LowToleranceError,
);

await stopNodes(nodes);
});
});
});
9 changes: 7 additions & 2 deletions yarn-project/p2p/src/service/reqresp/reqresp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { executeTimeoutWithCustomError } from '@aztec/foundation/timer';
import { type IncomingStreamData, type PeerId, type Stream } from '@libp2p/interface';
import { pipe } from 'it-pipe';
import { type Libp2p } from 'libp2p';
import { compressSync, uncompressSync } from 'snappy';
import { type Uint8ArrayList } from 'uint8arraylist';

import { CollectiveReqRespTimeoutError, IndiviualReqRespTimeoutError } from '../../errors/reqresp.error.js';
Expand All @@ -31,6 +32,9 @@ import { RequestResponseRateLimiter } from './rate_limiter/rate_limiter.js';
* This service implements the request response sub protocol, it is heavily inspired from
* ethereum implementations of the same name.
*
* Note, responses get compressed in streamHandler
* so they get decompressed in readMessage
*
* see: https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/p2p-interface.md#the-reqresp-domain
*/
export class ReqResp {
Expand Down Expand Up @@ -232,7 +236,7 @@ export class ReqResp {
chunks.push(chunk.subarray());
}
const messageData = chunks.concat();
return Buffer.concat(messageData);
return uncompressSync(Buffer.concat(messageData), { asBuffer: true }) as Buffer;
}

/**
Expand Down Expand Up @@ -269,7 +273,8 @@ export class ReqResp {
async function* (source: any) {
for await (const chunkList of source) {
const msg = Buffer.from(chunkList.subarray());
yield handler(msg);
const response = await handler(msg);
yield new Uint8Array(compressSync(response));
}
},
stream,
Expand Down
Loading
Loading