Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(p2p): snappy compress p2p messages #10417

Merged
merged 7 commits into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions yarn-project/circuit-types/src/p2p/block_proposal.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Buffer32 } from '@aztec/foundation/buffer';
import { recoverAddress } from '@aztec/foundation/crypto';
import { keccak256, recoverAddress } from '@aztec/foundation/crypto';
import { type EthAddress } from '@aztec/foundation/eth-address';
import { Signature } from '@aztec/foundation/eth-signature';
import { type Fr } from '@aztec/foundation/fields';
Expand Down Expand Up @@ -42,7 +42,7 @@ export class BlockProposal extends Gossipable {
}

override p2pMessageIdentifier(): Buffer32 {
return BlockProposalHash.fromField(this.payload.archive);
return new BlockProposalHash(keccak256(this.signature.toBuffer()));
}

get archive(): Fr {
Expand Down
12 changes: 12 additions & 0 deletions yarn-project/circuit-types/src/p2p/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,15 @@ export const TopicTypeMap: Record<string, typeof Gossipable> = {
[TopicType.block_attestation]: BlockAttestation as unknown as typeof Gossipable,
[TopicType.epoch_proof_quote]: EpochProofQuote as unknown as typeof Gossipable,
};

/**
* Map from topic to deserialiser
*
* Used in msgIdFn libp2p to get the p2pMessageIdentifier from a message
*/
export const TopicToDeserializer = {
[Tx.p2pTopic]: Tx.fromBuffer,
[BlockProposal.p2pTopic]: BlockProposal.fromBuffer,
[BlockAttestation.p2pTopic]: BlockAttestation.fromBuffer,
[EpochProofQuote.p2pTopic]: EpochProofQuote.fromBuffer,
};
4 changes: 3 additions & 1 deletion yarn-project/p2p/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,9 @@
"libp2p": "1.5.0",
"semver": "^7.6.0",
"sha3": "^2.1.4",
"tslib": "^2.4.0"
"snappy": "^7.2.2",
"tslib": "^2.4.0",
"xxhash-wasm": "^1.1.0"
},
"devDependencies": {
"@aztec/archiver": "workspace:^",
Expand Down
2 changes: 1 addition & 1 deletion yarn-project/p2p/src/mocks/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ export type ReqRespNode = {
export const MOCK_SUB_PROTOCOL_HANDLERS: ReqRespSubProtocolHandlers = {
[PING_PROTOCOL]: pingHandler,
[STATUS_PROTOCOL]: statusHandler,
[TX_REQ_PROTOCOL]: (_msg: any) => Promise.resolve(Uint8Array.from(Buffer.from('tx'))),
[TX_REQ_PROTOCOL]: (_msg: any) => Promise.resolve(Buffer.from('tx')),
};

// By default, all requests are valid
Expand Down
61 changes: 61 additions & 0 deletions yarn-project/p2p/src/service/encoding.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Taken from lodestar: https://github.com/ChainSafe/lodestar
import { sha256 } from '@aztec/foundation/crypto';

import { type RPC } from '@chainsafe/libp2p-gossipsub/message';
import { type DataTransform } from '@chainsafe/libp2p-gossipsub/types';
import { type Message } from '@libp2p/interface';
import { compressSync, uncompressSync } from 'snappy';
import xxhashFactory from 'xxhash-wasm';

// Load WASM
const xxhash = await xxhashFactory();

// Use salt to prevent msgId from being mined for collisions
const h64Seed = BigInt(Math.floor(Math.random() * 1e9));

// Shared buffer to convert msgId to string
const sharedMsgIdBuf = Buffer.alloc(20);

/**
* The function used to generate a gossipsub message id
* We use the first 8 bytes of SHA256(data) for content addressing
*/
export function fastMsgIdFn(rpcMsg: RPC.Message): string {
if (rpcMsg.data) {
return xxhash.h64Raw(rpcMsg.data, h64Seed).toString(16);
}
return '0000000000000000';
}

export function msgIdToStrFn(msgId: Uint8Array): string {
// This happens serially, no need to reallocate the buffer
sharedMsgIdBuf.set(msgId);
return `0x${sharedMsgIdBuf.toString('hex')}`;
}

/**
* Get the message identifier from a libp2p message
*
* Follows similarly to:
* https://github.com/ethereum/consensus-specs/blob/v1.1.0-alpha.7/specs/altair/p2p-interface.md#topics-and-messages
*
* @param message - The libp2p message
* @returns The message identifier
*/
export function getMsgIdFn(message: Message) {
const { topic } = message;

const vec = [Buffer.from(topic), message.data];
return sha256(Buffer.concat(vec)).subarray(0, 20);
}

export class SnappyTransform implements DataTransform {
inboundTransform(_topicStr: string, data: Uint8Array): Uint8Array {
const uncompressed = Buffer.from(uncompressSync(Buffer.from(data), { asBuffer: true }));
return new Uint8Array(uncompressed);
}

outboundTransform(_topicStr: string, data: Uint8Array): Uint8Array {
return new Uint8Array(compressSync(Buffer.from(data)));
}
}
11 changes: 8 additions & 3 deletions yarn-project/p2p/src/service/libp2p_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import {
} from '../tx_validator/index.js';
import { type PubSubLibp2p, convertToMultiaddr } from '../util.js';
import { AztecDatastore } from './data_store.js';
import { SnappyTransform, fastMsgIdFn, getMsgIdFn, msgIdToStrFn } from './encoding.js';
import { PeerManager } from './peer_manager.js';
import { PeerErrorSeverity } from './peer_scoring.js';
import { pingHandler, statusHandler } from './reqresp/handlers.js';
Expand Down Expand Up @@ -242,6 +243,10 @@ export class LibP2PService extends WithTracer implements P2PService {
heartbeatInterval: config.gossipsubInterval,
mcacheLength: config.gossipsubMcacheLength,
mcacheGossip: config.gossipsubMcacheGossip,
msgIdFn: getMsgIdFn,
msgIdToStrFn: msgIdToStrFn,
fastMsgIdFn: fastMsgIdFn,
dataTransform: new SnappyTransform(),
metricsRegister: otelMetricsAdapter,
metricsTopicStrToLabel: metricsTopicStrToLabels(),
scoreParams: createPeerScoreParams({
Expand Down Expand Up @@ -278,11 +283,11 @@ export class LibP2PService extends WithTracer implements P2PService {
* @param msg - the tx request message
* @returns the tx response message
*/
const txHandler = (msg: Buffer): Promise<Uint8Array> => {
const txHandler = (msg: Buffer): Promise<Buffer> => {
const txHash = TxHash.fromBuffer(msg);
const foundTx = mempools.txPool.getTxByHash(txHash);
const asUint8Array = Uint8Array.from(foundTx ? foundTx.toBuffer() : Buffer.alloc(0));
return Promise.resolve(asUint8Array);
const buf = foundTx ? foundTx.toBuffer() : Buffer.alloc(0);
return Promise.resolve(buf);
};

const requestResponseHandlers = {
Expand Down
8 changes: 4 additions & 4 deletions yarn-project/p2p/src/service/reqresp/handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@
* @param _msg - The ping request message.
* @returns A resolved promise with the pong response.
*/
export function pingHandler(_msg: any): Promise<Uint8Array> {
return Promise.resolve(Uint8Array.from(Buffer.from('pong')));
export function pingHandler(_msg: any): Promise<Buffer> {
return Promise.resolve(Buffer.from('pong'));
}

/**
* Handles the status request.
* @param _msg - The status request message.
* @returns A resolved promise with the ok response.
*/
export function statusHandler(_msg: any): Promise<Uint8Array> {
return Promise.resolve(Uint8Array.from(Buffer.from('ok')));
export function statusHandler(_msg: any): Promise<Buffer> {
return Promise.resolve(Buffer.from('ok'));
}
6 changes: 3 additions & 3 deletions yarn-project/p2p/src/service/reqresp/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ export type ReqRespSubProtocol = typeof PING_PROTOCOL | typeof STATUS_PROTOCOL |
* A handler for a sub protocol
* The message will arrive as a buffer, and the handler must return a buffer
*/
export type ReqRespSubProtocolHandler = (msg: Buffer) => Promise<Uint8Array>;
export type ReqRespSubProtocolHandler = (msg: Buffer) => Promise<Buffer>;

/**
* A type mapping from supprotocol to it's rate limits
Expand Down Expand Up @@ -83,8 +83,8 @@ export type SubProtocolMap = {
* Default handler for unimplemented sub protocols, this SHOULD be overwritten
* by the service, but is provided as a fallback
*/
const defaultHandler = (_msg: any): Promise<Uint8Array> => {
return Promise.resolve(Uint8Array.from(Buffer.from('unimplemented')));
const defaultHandler = (_msg: any): Promise<Buffer> => {
return Promise.resolve(Buffer.from('unimplemented'));
};

/**
Expand Down
60 changes: 25 additions & 35 deletions yarn-project/p2p/src/service/reqresp/reqresp.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { CollectiveReqRespTimeoutError, IndiviualReqRespTimeoutError } from '../
import {
MOCK_SUB_PROTOCOL_HANDLERS,
MOCK_SUB_PROTOCOL_VALIDATORS,
type ReqRespNode,
connectToPeers,
createNodes,
startNodes,
Expand All @@ -23,15 +24,22 @@ const PING_REQUEST = RequestableBuffer.fromBuffer(Buffer.from('ping'));
// and ask for specific data that they missed via the traditional gossip protocol.
describe('ReqResp', () => {
let peerManager: MockProxy<PeerManager>;
let nodes: ReqRespNode[];

beforeEach(() => {
peerManager = mock<PeerManager>();
});

afterEach(async () => {
if (nodes) {
await stopNodes(nodes as ReqRespNode[]);
}
});

it('Should perform a ping request', async () => {
// Create two nodes
// They need to discover each other
const nodes = await createNodes(peerManager, 2);
nodes = await createNodes(peerManager, 2);
const { req: pinger } = nodes[0];

await startNodes(nodes);
Expand All @@ -45,12 +53,10 @@ describe('ReqResp', () => {

await sleep(500);
expect(res?.toBuffer().toString('utf-8')).toEqual('pong');

await stopNodes(nodes);
});

it('Should handle gracefully if a peer connected peer is offline', async () => {
const nodes = await createNodes(peerManager, 2);
nodes = await createNodes(peerManager, 2);

const { req: pinger } = nodes[0];
const { req: ponger } = nodes[1];
Expand All @@ -66,12 +72,10 @@ describe('ReqResp', () => {
const res = await pinger.sendRequest(PING_PROTOCOL, PING_REQUEST);

expect(res).toBeUndefined();

await stopNodes(nodes);
});

it('Should request from a later peer if other peers are offline', async () => {
const nodes = await createNodes(peerManager, 4);
nodes = await createNodes(peerManager, 4);

await startNodes(nodes);
await sleep(500);
Expand All @@ -86,12 +90,10 @@ describe('ReqResp', () => {
const res = await nodes[0].req.sendRequest(PING_PROTOCOL, PING_REQUEST);

expect(res?.toBuffer().toString('utf-8')).toEqual('pong');

await stopNodes(nodes);
});

it('Should hit a rate limit if too many requests are made in quick succession', async () => {
const nodes = await createNodes(peerManager, 2);
nodes = await createNodes(peerManager, 2);

await startNodes(nodes);

Expand All @@ -110,8 +112,6 @@ describe('ReqResp', () => {
// Make sure the error message is logged
const errorMessage = `Rate limit exceeded for ${PING_PROTOCOL} from ${nodes[0].p2p.peerId.toString()}`;
expect(loggerSpy).toHaveBeenCalledWith(errorMessage);

await stopNodes(nodes);
});

describe('TX REQ PROTOCOL', () => {
Expand All @@ -120,15 +120,15 @@ describe('ReqResp', () => {
const txHash = tx.getTxHash();

const protocolHandlers = MOCK_SUB_PROTOCOL_HANDLERS;
protocolHandlers[TX_REQ_PROTOCOL] = (message: Buffer): Promise<Uint8Array> => {
protocolHandlers[TX_REQ_PROTOCOL] = (message: Buffer): Promise<Buffer> => {
const receivedHash = TxHash.fromBuffer(message);
if (txHash.equals(receivedHash)) {
return Promise.resolve(Uint8Array.from(tx.toBuffer()));
return Promise.resolve(tx.toBuffer());
}
return Promise.resolve(Uint8Array.from(Buffer.from('')));
return Promise.resolve(Buffer.from(''));
};

const nodes = await createNodes(peerManager, 2);
nodes = await createNodes(peerManager, 2);

await startNodes(nodes, protocolHandlers);
await sleep(500);
Expand All @@ -137,8 +137,6 @@ describe('ReqResp', () => {

const res = await nodes[0].req.sendRequest(TX_REQ_PROTOCOL, txHash);
expect(res).toEqual(tx);

await stopNodes(nodes);
});

it('Does not crash if tx hash returns undefined', async () => {
Expand All @@ -147,11 +145,11 @@ describe('ReqResp', () => {

const protocolHandlers = MOCK_SUB_PROTOCOL_HANDLERS;
// Return nothing
protocolHandlers[TX_REQ_PROTOCOL] = (_message: Buffer): Promise<Uint8Array> => {
return Promise.resolve(Uint8Array.from(Buffer.from('')));
protocolHandlers[TX_REQ_PROTOCOL] = (_message: Buffer): Promise<Buffer> => {
return Promise.resolve(Buffer.from(''));
};

const nodes = await createNodes(peerManager, 2);
nodes = await createNodes(peerManager, 2);

await startNodes(nodes, protocolHandlers);
await sleep(500);
Expand All @@ -160,12 +158,10 @@ describe('ReqResp', () => {

const res = await nodes[0].req.sendRequest(TX_REQ_PROTOCOL, txHash);
expect(res).toBeUndefined();

await stopNodes(nodes);
});

it('Should hit individual timeout if nothing is returned over the stream', async () => {
const nodes = await createNodes(peerManager, 2);
nodes = await createNodes(peerManager, 2);

await startNodes(nodes);

Expand Down Expand Up @@ -197,12 +193,10 @@ describe('ReqResp', () => {
}),
PeerErrorSeverity.HighToleranceError,
);

await stopNodes(nodes);
});

it('Should hit collective timeout if nothing is returned over the stream from multiple peers', async () => {
const nodes = await createNodes(peerManager, 4);
nodes = await createNodes(peerManager, 4);

await startNodes(nodes);

Expand All @@ -226,8 +220,6 @@ describe('ReqResp', () => {
// Make sure the error message is logged
const errorMessage = `${new CollectiveReqRespTimeoutError().message} | subProtocol: ${TX_REQ_PROTOCOL}`;
expect(loggerSpy).toHaveBeenCalledWith(errorMessage);

await stopNodes(nodes);
});

it('Should penalize peer if transaction validation fails', async () => {
Expand All @@ -236,12 +228,12 @@ describe('ReqResp', () => {

// Mock that the node will respond with the tx
const protocolHandlers = MOCK_SUB_PROTOCOL_HANDLERS;
protocolHandlers[TX_REQ_PROTOCOL] = (message: Buffer): Promise<Uint8Array> => {
protocolHandlers[TX_REQ_PROTOCOL] = (message: Buffer): Promise<Buffer> => {
const receivedHash = TxHash.fromBuffer(message);
if (txHash.equals(receivedHash)) {
return Promise.resolve(Uint8Array.from(tx.toBuffer()));
return Promise.resolve(tx.toBuffer());
}
return Promise.resolve(Uint8Array.from(Buffer.from('')));
return Promise.resolve(Buffer.from(''));
};

// Mock that the receiving node will find that the transaction is invalid
Expand All @@ -251,7 +243,7 @@ describe('ReqResp', () => {
return Promise.resolve(false);
};

const nodes = await createNodes(peerManager, 2);
nodes = await createNodes(peerManager, 2);

await startNodes(nodes, protocolHandlers, protocolValidators);
await sleep(500);
Expand All @@ -268,8 +260,6 @@ describe('ReqResp', () => {
}),
PeerErrorSeverity.LowToleranceError,
);

await stopNodes(nodes);
});
});
});
Loading
Loading