From 54e1cc7f07a71ab0e77f81cbced79363de67fe02 Mon Sep 17 00:00:00 2001 From: Maddiaa <47148561+Maddiaa0@users.noreply.github.com> Date: Thu, 29 Aug 2024 11:56:18 +0100 Subject: [PATCH] feat: request specific transactions through the p2p layer (#8185) --- yarn-project/p2p/src/client/index.ts | 2 + .../p2p/src/client/p2p_client.test.ts | 1 + yarn-project/p2p/src/client/p2p_client.ts | 12 ++ yarn-project/p2p/src/mocks/index.ts | 101 ++++++++++ yarn-project/p2p/src/service/dummy_service.ts | 18 ++ .../p2p/src/service/libp2p_service.ts | 76 ++++++- .../p2p/src/service/reqresp/handlers.ts | 18 +- .../p2p/src/service/reqresp/interface.ts | 103 +++++++++- .../reqresp/p2p_client.integration.test.ts | 190 ++++++++++++++++++ .../p2p/src/service/reqresp/reqresp.test.ts | 143 ++++++------- .../p2p/src/service/reqresp/reqresp.ts | 55 +++-- yarn-project/p2p/src/service/service.ts | 14 ++ 12 files changed, 614 insertions(+), 119 deletions(-) create mode 100644 yarn-project/p2p/src/mocks/index.ts create mode 100644 yarn-project/p2p/src/service/reqresp/p2p_client.integration.test.ts diff --git a/yarn-project/p2p/src/client/index.ts b/yarn-project/p2p/src/client/index.ts index cd891dfa03f..d2316c4db76 100644 --- a/yarn-project/p2p/src/client/index.ts +++ b/yarn-project/p2p/src/client/index.ts @@ -23,6 +23,7 @@ export const createP2PClient = async ( if (config.p2pEnabled) { // If announceTcpAddress or announceUdpAddress are not provided, query for public IP if config allows + const { tcpAnnounceAddress: configTcpAnnounceAddress, udpAnnounceAddress: configUdpAnnounceAddress, @@ -68,6 +69,7 @@ export const createP2PClient = async ( // Create peer discovery service const peerId = await createLibP2PPeerId(config.peerIdPrivateKey); const discoveryService = new DiscV5Service(peerId, config); + p2pService = await LibP2PService.new(config, discoveryService, peerId, txPool, attestationsPool, store); } else { p2pService = new DummyP2PService(); diff --git a/yarn-project/p2p/src/client/p2p_client.test.ts b/yarn-project/p2p/src/client/p2p_client.test.ts index 5d9138d961d..f795da34aee 100644 --- a/yarn-project/p2p/src/client/p2p_client.test.ts +++ b/yarn-project/p2p/src/client/p2p_client.test.ts @@ -44,6 +44,7 @@ describe('In-Memory P2P Client', () => { stop: jest.fn(), propagate: jest.fn(), registerBlockReceivedCallback: jest.fn(), + sendRequest: jest.fn(), getEnr: jest.fn(), }; diff --git a/yarn-project/p2p/src/client/p2p_client.ts b/yarn-project/p2p/src/client/p2p_client.ts index 6af0180251e..0c1890beeb8 100644 --- a/yarn-project/p2p/src/client/p2p_client.ts +++ b/yarn-project/p2p/src/client/p2p_client.ts @@ -15,6 +15,7 @@ import { type ENR } from '@chainsafe/enr'; import { type AttestationPool } from '../attestation_pool/attestation_pool.js'; import { getP2PConfigEnvVars } from '../config.js'; +import { TX_REQ_PROTOCOL } from '../service/reqresp/interface.js'; import type { P2PService } from '../service/service.js'; import { type TxPool } from '../tx_pool/index.js'; @@ -71,6 +72,12 @@ export interface P2P { // ^ This pattern is not my favorite (md) registerBlockProposalHandler(handler: (block: BlockProposal) => Promise): void; + /** + * Request a transaction from another peer by its tx hash. + * @param txHash - Hash of the tx to query. + */ + requestTxByHash(txHash: TxHash): Promise; + /** * Verifies the 'tx' and, if valid, adds it to local tx pool and forwards it to other peers. * @param tx - The transaction. @@ -276,6 +283,11 @@ export class P2PClient implements P2P { this.p2pService.registerBlockReceivedCallback(handler); } + public requestTxByHash(txHash: TxHash): Promise { + // Underlying I want to use the libp2p service to just have a request method where the subprotocol is defined here + return this.p2pService.sendRequest(TX_REQ_PROTOCOL, txHash); + } + /** * Returns all transactions in the transaction pool. * @returns An array of Txs. diff --git a/yarn-project/p2p/src/mocks/index.ts b/yarn-project/p2p/src/mocks/index.ts new file mode 100644 index 00000000000..18054a39165 --- /dev/null +++ b/yarn-project/p2p/src/mocks/index.ts @@ -0,0 +1,101 @@ +import { noise } from '@chainsafe/libp2p-noise'; +import { yamux } from '@chainsafe/libp2p-yamux'; +import { bootstrap } from '@libp2p/bootstrap'; +import { tcp } from '@libp2p/tcp'; +import { type Libp2p, type Libp2pOptions, createLibp2p } from 'libp2p'; + +import { pingHandler, statusHandler } from '../service/reqresp/handlers.js'; +import { + PING_PROTOCOL, + type ReqRespSubProtocolHandlers, + STATUS_PROTOCOL, + TX_REQ_PROTOCOL, +} from '../service/reqresp/interface.js'; +import { ReqResp } from '../service/reqresp/reqresp.js'; + +/** + * Creates a libp2p node, pre configured. + * @param boostrapAddrs - an optional list of bootstrap addresses + * @returns Lip2p node + */ +export async function createLibp2pNode(boostrapAddrs: string[] = []): Promise { + const options: Libp2pOptions = { + addresses: { + listen: ['/ip4/0.0.0.0/tcp/0'], + }, + connectionEncryption: [noise()], + streamMuxers: [yamux()], + transports: [tcp()], + }; + + if (boostrapAddrs.length > 0) { + options.peerDiscovery = [ + bootstrap({ + list: boostrapAddrs, + }), + ]; + } + + return await createLibp2p(options); +} + +/** + * A p2p / req resp node pairing the req node will always contain the p2p node. + * they are provided as a pair to allow access the p2p node directly + */ +export type ReqRespNode = { + p2p: Libp2p; + req: ReqResp; +}; + +// Mock sub protocol handlers +export const MOCK_SUB_PROTOCOL_HANDLERS: ReqRespSubProtocolHandlers = { + [PING_PROTOCOL]: pingHandler, + [STATUS_PROTOCOL]: statusHandler, + [TX_REQ_PROTOCOL]: (_msg: any) => Promise.resolve(Uint8Array.from(Buffer.from('tx'))), +}; + +/** + * @param numberOfNodes - the number of nodes to create + * @returns An array of the created nodes + */ +export const createNodes = async (numberOfNodes: number): Promise => { + return await Promise.all(Array.from({ length: numberOfNodes }, () => createReqResp())); +}; + +// TODO: think about where else this can go +export const startNodes = async (nodes: ReqRespNode[], subProtocolHandlers = MOCK_SUB_PROTOCOL_HANDLERS) => { + for (const node of nodes) { + await node.req.start(subProtocolHandlers); + } +}; + +export const stopNodes = async (nodes: ReqRespNode[]): Promise => { + for (const node of nodes) { + await node.req.stop(); + await node.p2p.stop(); + } +}; + +// Create a req resp node, exposing the underlying p2p node +export const createReqResp = async (): Promise => { + const p2p = await createLibp2pNode(); + const req = new ReqResp(p2p); + return { + p2p, + req, + }; +}; + +// Given a node list; hand shake all of the nodes with each other +export const connectToPeers = async (nodes: ReqRespNode[]): Promise => { + for (const node of nodes) { + for (const otherNode of nodes) { + if (node === otherNode) { + continue; + } + const addr = otherNode.p2p.getMultiaddrs()[0]; + await node.p2p.dial(addr); + } + } +}; diff --git a/yarn-project/p2p/src/service/dummy_service.ts b/yarn-project/p2p/src/service/dummy_service.ts index 507b3d515e4..be5b82d9c8e 100644 --- a/yarn-project/p2p/src/service/dummy_service.ts +++ b/yarn-project/p2p/src/service/dummy_service.ts @@ -3,6 +3,7 @@ import type { BlockAttestation, BlockProposal, Gossipable, TxHash } from '@aztec import type { PeerId } from '@libp2p/interface'; import EventEmitter from 'events'; +import { type ReqRespSubProtocol, type SubProtocolMap } from './reqresp/interface.js'; import { type P2PService, type PeerDiscoveryService, PeerDiscoveryState } from './service.js'; /** @@ -42,6 +43,23 @@ export class DummyP2PService implements P2PService { */ public registerBlockReceivedCallback(_: (block: BlockProposal) => Promise) {} + /** + * Sends a request to a peer. + * @param _protocol - The protocol to send the request on. + * @param _request - The request to send. + * @returns The response from the peer, otherwise undefined. + */ + public sendRequest( + _protocol: Protocol, + _request: InstanceType, + ): Promise | undefined> { + return Promise.resolve(undefined); + } + + /** + * Returns the ENR of the peer. + * @returns The ENR of the peer, otherwise undefined. + */ public getEnr(): undefined { return undefined; } diff --git a/yarn-project/p2p/src/service/libp2p_service.ts b/yarn-project/p2p/src/service/libp2p_service.ts index bc3a7c55594..c62a694034f 100644 --- a/yarn-project/p2p/src/service/libp2p_service.ts +++ b/yarn-project/p2p/src/service/libp2p_service.ts @@ -6,6 +6,7 @@ import { TopicType, TopicTypeMap, Tx, + TxHash, } from '@aztec/circuit-types'; import { createDebugLogger } from '@aztec/foundation/log'; import { SerialQueue } from '@aztec/foundation/queue'; @@ -30,6 +31,18 @@ import { type TxPool } from '../tx_pool/index.js'; import { convertToMultiaddr } from '../util.js'; import { AztecDatastore } from './data_store.js'; import { PeerManager } from './peer_manager.js'; +import { pingHandler, statusHandler } from './reqresp/handlers.js'; +import { + DEFAULT_SUB_PROTOCOL_HANDLERS, + PING_PROTOCOL, + type ReqRespSubProtocol, + type ReqRespSubProtocolHandlers, + STATUS_PROTOCOL, + type SubProtocolMap, + TX_REQ_PROTOCOL, + subProtocolMap, +} from './reqresp/interface.js'; +import { ReqResp } from './reqresp/reqresp.js'; import type { P2PService, PeerDiscoveryService } from './service.js'; export interface PubSubLibp2p extends Libp2p { @@ -61,6 +74,14 @@ export class LibP2PService implements P2PService { private peerManager: PeerManager; private discoveryRunningPromise?: RunningPromise; + // Request and response sub service + private reqresp: ReqResp; + + /** + * Callback for when a block is received from a peer. + * @param block - The block received from the peer. + * @returns The attestation for the block, if any. + */ private blockReceivedCallback: (block: BlockProposal) => Promise; constructor( @@ -69,9 +90,11 @@ export class LibP2PService implements P2PService { private peerDiscoveryService: PeerDiscoveryService, private txPool: TxPool, private attestationPool: AttestationPool, + private requestResponseHandlers: ReqRespSubProtocolHandlers = DEFAULT_SUB_PROTOCOL_HANDLERS, private logger = createDebugLogger('aztec:libp2p_service'), ) { this.peerManager = new PeerManager(node, peerDiscoveryService, config, logger); + this.reqresp = new ReqResp(node); this.blockReceivedCallback = (block: BlockProposal): Promise => { this.logger.verbose( @@ -124,6 +147,7 @@ export class LibP2PService implements P2PService { this.peerManager.discover(); }, this.config.peerCheckIntervalMS); this.discoveryRunningPromise.start(); + await this.reqresp.start(this.requestResponseHandlers); } /** @@ -140,6 +164,9 @@ export class LibP2PService implements P2PService { this.logger.debug('Stopping LibP2P...'); await this.stopLibP2P(); this.logger.info('LibP2P service stopped'); + this.logger.debug('Stopping request response service...'); + await this.reqresp.stop(); + this.logger.debug('Request response service stopped...'); } /** @@ -206,9 +233,56 @@ export class LibP2PService implements P2PService { }, }); - return new LibP2PService(config, node, peerDiscoveryService, txPool, attestationPool); + // Create request response protocol handlers + /** + * Handler for tx requests + * @param msg - the tx request message + * @returns the tx response message + */ + const txHandler = (msg: Buffer): Promise => { + const txHash = TxHash.fromBuffer(msg); + const foundTx = txPool.getTxByHash(txHash); + const asUint8Array = Uint8Array.from(foundTx ? foundTx.toBuffer() : Buffer.alloc(0)); + return Promise.resolve(asUint8Array); + }; + + const requestResponseHandlers = { + [PING_PROTOCOL]: pingHandler, + [STATUS_PROTOCOL]: statusHandler, + [TX_REQ_PROTOCOL]: txHandler, + }; + + return new LibP2PService(config, node, peerDiscoveryService, txPool, attestationPool, requestResponseHandlers); + } + + /** + * Send Request via the ReqResp service + * The subprotocol defined will determine the request and response types + * + * See the subProtocolMap for the mapping of subprotocols to request/response types in `interface.ts` + * + * @param protocol The request response protocol to use + * @param request The request type to send + * @returns + */ + async sendRequest( + protocol: SubProtocol, + request: InstanceType, + ): Promise | undefined> { + const pair = subProtocolMap[protocol]; + + const res = await this.reqresp.sendRequest(protocol, request.toBuffer()); + if (!res) { + return undefined; + } + + return pair.response.fromBuffer(res!); } + /** + * Get the ENR of the node + * @returns The ENR of the node + */ public getEnr(): ENR | undefined { return this.peerDiscoveryService.getEnr(); } diff --git a/yarn-project/p2p/src/service/reqresp/handlers.ts b/yarn-project/p2p/src/service/reqresp/handlers.ts index 339c690add8..688fab959e3 100644 --- a/yarn-project/p2p/src/service/reqresp/handlers.ts +++ b/yarn-project/p2p/src/service/reqresp/handlers.ts @@ -1,7 +1,17 @@ -export function pingHandler(_msg: any) { - return Uint8Array.from(Buffer.from('pong')); +/** + * Handles the ping request. + * @param _msg - The ping request message. + * @returns A resolved promise with the pong response. + */ +export function pingHandler(_msg: any): Promise { + return Promise.resolve(Uint8Array.from(Buffer.from('pong'))); } -export function statusHandler(_msg: any) { - return Uint8Array.from(Buffer.from('ok')); +/** + * Handles the status request. + * @param _msg - The status request message. + * @returns A resolved promise with the ok response. + */ +export function statusHandler(_msg: any): Promise { + return Promise.resolve(Uint8Array.from(Buffer.from('ok'))); } diff --git a/yarn-project/p2p/src/service/reqresp/interface.ts b/yarn-project/p2p/src/service/reqresp/interface.ts index fa66707e8be..39f27b6268f 100644 --- a/yarn-project/p2p/src/service/reqresp/interface.ts +++ b/yarn-project/p2p/src/service/reqresp/interface.ts @@ -1,13 +1,100 @@ -export enum ReqRespType { - Status = 'status', - Ping = 'ping', - /** Ask peers for specific transactions */ - TxsByHash = 'txs_by_hash', -} +import { Tx, TxHash } from '@aztec/circuit-types'; +/* + * Request Response Sub Protocols + */ export const PING_PROTOCOL = '/aztec/ping/0.1.0'; export const STATUS_PROTOCOL = '/aztec/status/0.1.0'; +export const TX_REQ_PROTOCOL = '/aztec/tx_req/0.1.0'; + +// Sum type for sub protocols +export type ReqRespSubProtocol = typeof PING_PROTOCOL | typeof STATUS_PROTOCOL | typeof TX_REQ_PROTOCOL; + +/** + * A handler for a sub protocol + * The message will arrive as a buffer, and the handler must return a buffer + */ +export type ReqRespSubProtocolHandler = (msg: Buffer) => Promise; + +/** + * A type mapping from supprotocol to it's handling funciton + */ +export type ReqRespSubProtocolHandlers = Record; + +/** + * Sub protocol map determines the request and response types for each + * Req Resp protocol + */ +export type SubProtocolMap = { + [S in ReqRespSubProtocol]: RequestResponsePair; +}; + +/** + * Default handler for unimplemented sub protocols, this SHOULD be overwritten + * by the service, but is provided as a fallback + */ +const defaultHandler = (_msg: any): Promise => { + return Promise.resolve(Uint8Array.from(Buffer.from('unimplemented'))); +}; -export type SubProtocol = typeof PING_PROTOCOL | typeof STATUS_PROTOCOL; +/** + * Default sub protocol handlers - this SHOULD be overwritten by the service, + */ +export const DEFAULT_SUB_PROTOCOL_HANDLERS: ReqRespSubProtocolHandlers = { + [PING_PROTOCOL]: defaultHandler, + [STATUS_PROTOCOL]: defaultHandler, + [TX_REQ_PROTOCOL]: defaultHandler, +}; + +/** + * The Request Response Pair interface defines the methods that each + * request response pair must implement + */ +interface RequestResponsePair { + request: new (...args: any[]) => Req; + /** + * The response must implement the static fromBuffer method (generic serialisation) + */ + response: { + new (...args: any[]): Res; + fromBuffer(buffer: Buffer): Res; + }; +} + +/** + * RequestableBuffer is a wrapper around a buffer that allows it to be + * used in generic request response protocols + * + * An instance of the RequestResponsePair defined above + */ +export class RequestableBuffer { + constructor(public buffer: Buffer) {} + + toBuffer() { + return this.buffer; + } + + static fromBuffer(buffer: Buffer) { + return new RequestableBuffer(buffer); + } +} -export type SubProtocolHandler = (msg: string) => Uint8Array; +/** + * A mapping from each protocol to their request and response types + * This defines the request and response types for each sub protocol, used primarily + * as a type rather than an object + */ +export const subProtocolMap: SubProtocolMap = { + [PING_PROTOCOL]: { + request: RequestableBuffer, + response: RequestableBuffer, + }, + [STATUS_PROTOCOL]: { + request: RequestableBuffer, + response: RequestableBuffer, + }, + [TX_REQ_PROTOCOL]: { + request: TxHash, + response: Tx, + }, +}; diff --git a/yarn-project/p2p/src/service/reqresp/p2p_client.integration.test.ts b/yarn-project/p2p/src/service/reqresp/p2p_client.integration.test.ts new file mode 100644 index 00000000000..307324f783f --- /dev/null +++ b/yarn-project/p2p/src/service/reqresp/p2p_client.integration.test.ts @@ -0,0 +1,190 @@ +// An integration test for the p2p client to test req resp protocols +import { mockTx } from '@aztec/circuit-types'; +import { createDebugLogger } from '@aztec/foundation/log'; +import { sleep } from '@aztec/foundation/sleep'; +import { type AztecKVStore } from '@aztec/kv-store'; +import { openTmpStore } from '@aztec/kv-store/utils'; + +import { describe, expect, it, jest } from '@jest/globals'; +import { generatePrivateKey } from 'viem/accounts'; + +import { type AttestationPool } from '../../attestation_pool/attestation_pool.js'; +import { BootstrapNode } from '../../bootstrap/bootstrap.js'; +import { createP2PClient } from '../../client/index.js'; +import { MockBlockSource } from '../../client/mocks.js'; +import { type P2PClient } from '../../client/p2p_client.js'; +import { type BootnodeConfig, type P2PConfig } from '../../config.js'; +import { type TxPool } from '../../tx_pool/index.js'; +import { createLibP2PPeerId } from '../index.js'; + +/** + * Mockify helper for testing purposes. + */ +type Mockify = { + [P in keyof T]: ReturnType; +}; + +const TEST_TIMEOUT = 80000; + +const BOOT_NODE_UDP_PORT = 40400; +async function createBootstrapNode(port: number) { + const peerId = await createLibP2PPeerId(); + const bootstrapNode = new BootstrapNode(); + const config: BootnodeConfig = { + udpListenAddress: `0.0.0.0:${port}`, + udpAnnounceAddress: `127.0.0.1:${port}`, + peerIdPrivateKey: Buffer.from(peerId.privateKey!).toString('hex'), + minPeerCount: 1, + maxPeerCount: 100, + }; + await bootstrapNode.start(config); + + return bootstrapNode; +} + +function generatePeerIdPrivateKeys(numberOfPeers: number): string[] { + const peerIdPrivateKeys: string[] = []; + for (let i = 0; i < numberOfPeers; i++) { + // magic number is multiaddr prefix: https://multiformats.io/multiaddr/ + peerIdPrivateKeys.push('08021220' + generatePrivateKey().substr(2, 66)); + } + return peerIdPrivateKeys; +} + +const NUMBER_OF_PEERS = 2; + +describe('Req Resp p2p client integration', () => { + let txPool: Mockify; + let attestationPool: Mockify; + let blockSource: MockBlockSource; + let kvStore: AztecKVStore; + const logger = createDebugLogger('p2p-client-integration-test'); + + const makeBootstrapNode = async (): Promise<[BootstrapNode, string]> => { + const bootstrapNode = await createBootstrapNode(BOOT_NODE_UDP_PORT); + const enr = bootstrapNode.getENR().encodeTxt(); + return [bootstrapNode, enr]; + }; + + const createClients = async (numberOfPeers: number, bootstrapNodeEnr: string): Promise => { + const clients: P2PClient[] = []; + const peerIdPrivateKeys = generatePeerIdPrivateKeys(numberOfPeers); + for (let i = 0; i < numberOfPeers; i++) { + // Note these bindings are important + const addr = `127.0.0.1:${i + 1 + BOOT_NODE_UDP_PORT}`; + const listenAddr = `0.0.0.0:${i + 1 + BOOT_NODE_UDP_PORT}`; + const config: P2PConfig = { + p2pEnabled: true, + peerIdPrivateKey: peerIdPrivateKeys[i], + tcpListenAddress: listenAddr, // run on port 0 + udpListenAddress: listenAddr, + tcpAnnounceAddress: addr, + udpAnnounceAddress: addr, + l2QueueSize: 1, + bootstrapNodes: [bootstrapNodeEnr], + blockCheckIntervalMS: 1000, + peerCheckIntervalMS: 1000, + transactionProtocol: '', + minPeerCount: 1, + maxPeerCount: 10, + keepProvenTxsInPoolFor: 0, + queryForIp: false, + }; + + txPool = { + addTxs: jest.fn(() => {}), + getTxByHash: jest.fn().mockReturnValue(undefined), + deleteTxs: jest.fn(), + getAllTxs: jest.fn().mockReturnValue([]), + getAllTxHashes: jest.fn().mockReturnValue([]), + getMinedTxHashes: jest.fn().mockReturnValue([]), + getPendingTxHashes: jest.fn().mockReturnValue([]), + getTxStatus: jest.fn().mockReturnValue(undefined), + markAsMined: jest.fn(), + }; + + attestationPool = { + addAttestations: jest.fn(), + deleteAttestations: jest.fn(), + deleteAttestationsForSlot: jest.fn(), + getAttestationsForSlot: jest.fn().mockReturnValue(undefined), + }; + + blockSource = new MockBlockSource(); + kvStore = openTmpStore(); + const client = await createP2PClient( + config, + kvStore, + txPool as unknown as TxPool, + attestationPool as unknown as AttestationPool, + blockSource, + ); + + await client.start(); + clients.push(client); + + logger.info(`Creating client ${i}`); + } + + logger.info(`Created ${NUMBER_OF_PEERS} clients`); + await Promise.all(clients.map(client => client.isReady())); + logger.info(`Clients ready`); + return clients; + }; + + // Shutdown all test clients + const shutdown = async (clients: P2PClient[], bootnode: BootstrapNode) => { + await Promise.all([bootnode.stop(), ...clients.map(client => client.stop())]); + await sleep(1000); + }; + + it( + 'Returns undefined if unable to find a transaction from another peer', + async () => { + // We want to create a set of nodes and request transaction from them + // Not using a before each as a the wind down is not working as expected + const [bootstrapNode, bootstrapNodeEnr] = await makeBootstrapNode(); + const clients = await createClients(NUMBER_OF_PEERS, bootstrapNodeEnr); + const [client1] = clients; + + await sleep(2000); + + // Perform a get tx request from client 1 + const tx = mockTx(); + const txHash = tx.getTxHash(); + + const requestedTx = await client1.requestTxByHash(txHash); + expect(requestedTx).toBeUndefined(); + + await shutdown(clients, bootstrapNode); + }, + TEST_TIMEOUT, + ); + + it( + 'Can request a transaction from another peer', + async () => { + // We want to create a set of nodes and request transaction from them + const [bootstrapNode, bootstrapNodeEnr] = await makeBootstrapNode(); + const clients = await createClients(NUMBER_OF_PEERS, bootstrapNodeEnr); + const [client1] = clients; + + // Give the nodes time to discover each other + await sleep(6000); + + // Perform a get tx request from client 1 + const tx = mockTx(); + const txHash = tx.getTxHash(); + // Mock the tx pool to return the tx we are looking for + txPool.getTxByHash.mockImplementationOnce(() => tx); + + const requestedTx = await client1.requestTxByHash(txHash); + + // Expect the tx to be the returned tx to be the same as the one we mocked + expect(requestedTx?.toBuffer()).toStrictEqual(tx.toBuffer()); + + await shutdown(clients, bootstrapNode); + }, + TEST_TIMEOUT, + ); +}); diff --git a/yarn-project/p2p/src/service/reqresp/reqresp.test.ts b/yarn-project/p2p/src/service/reqresp/reqresp.test.ts index 9f4acbb399e..de9d931bd12 100644 --- a/yarn-project/p2p/src/service/reqresp/reqresp.test.ts +++ b/yarn-project/p2p/src/service/reqresp/reqresp.test.ts @@ -1,92 +1,10 @@ +import { TxHash, mockTx } from '@aztec/circuit-types'; import { sleep } from '@aztec/foundation/sleep'; -import { noise } from '@chainsafe/libp2p-noise'; -import { yamux } from '@chainsafe/libp2p-yamux'; -import { bootstrap } from '@libp2p/bootstrap'; -import { tcp } from '@libp2p/tcp'; -import { type Libp2p, type Libp2pOptions, createLibp2p } from 'libp2p'; - -import { PING_PROTOCOL } from './interface.js'; -import { ReqResp } from './reqresp.js'; - -/** - * Creates a libp2p node, pre configured. - * @param boostrapAddrs - an optional list of bootstrap addresses - * @returns Lip2p node - */ -async function createLibp2pNode(boostrapAddrs: string[] = []): Promise { - const options: Libp2pOptions = { - addresses: { - listen: ['/ip4/0.0.0.0/tcp/0'], - }, - connectionEncryption: [noise()], - streamMuxers: [yamux()], - transports: [tcp()], - }; - - if (boostrapAddrs.length > 0) { - options.peerDiscovery = [ - bootstrap({ - list: boostrapAddrs, - }), - ]; - } - - return await createLibp2p(options); -} - -/** - * A p2p / req resp node pairing the req node will always contain the p2p node. - * they are provided as a pair to allow access the p2p node directly - */ -type ReqRespNode = { - p2p: Libp2p; - req: ReqResp; -}; - -/** - * @param numberOfNodes - the number of nodes to create - * @returns An array of the created nodes - */ -const createNodes = async (numberOfNodes: number): Promise => { - return await Promise.all(Array.from({ length: numberOfNodes }, () => createReqResp())); -}; - -const startNodes = async (nodes: ReqRespNode[]) => { - for (const node of nodes) { - await node.req.start(); - } -}; - -const stopNodes = async (nodes: ReqRespNode[]): Promise => { - for (const node of nodes) { - await node.req.stop(); - await node.p2p.stop(); - } -}; - -// Create a req resp node, exposing the underlying p2p node -const createReqResp = async (): Promise => { - const p2p = await createLibp2pNode(); - const req = new ReqResp(p2p); - return { - p2p, - req, - }; -}; - -// Given a node list; hand shake all of the nodes with each other -const connectToPeers = async (nodes: ReqRespNode[]): Promise => { - for (const node of nodes) { - for (const otherNode of nodes) { - if (node === otherNode) { - continue; - } - const addr = otherNode.p2p.getMultiaddrs()[0]; - await node.p2p.dial(addr); - } - } -}; +import { describe, expect, it } from '@jest/globals'; + +import { MOCK_SUB_PROTOCOL_HANDLERS, connectToPeers, createNodes, startNodes, stopNodes } from '../../mocks/index.js'; +import { PING_PROTOCOL, TX_REQ_PROTOCOL } from './interface.js'; // The Req Resp protocol should allow nodes to dial specific peers // and ask for specific data that they missed via the traditional gossip protocol. @@ -152,4 +70,55 @@ describe('ReqResp', () => { await stopNodes(nodes); }); + + describe('TX REQ PROTOCOL', () => { + it('Can request a Tx from TxHash', async () => { + const tx = mockTx(); + const txHash = tx.getTxHash(); + + const protocolHandlers = MOCK_SUB_PROTOCOL_HANDLERS; + protocolHandlers[TX_REQ_PROTOCOL] = (message: Buffer): Promise => { + const receivedHash = TxHash.fromBuffer(message); + if (txHash.equals(receivedHash)) { + return Promise.resolve(Uint8Array.from(tx.toBuffer())); + } + return Promise.resolve(Uint8Array.from(Buffer.from(''))); + }; + + const nodes = await createNodes(2); + + await startNodes(nodes, protocolHandlers); + await sleep(500); + await connectToPeers(nodes); + await sleep(500); + + const res = await nodes[0].req.sendRequest(TX_REQ_PROTOCOL, txHash.toBuffer()); + expect(res).toEqual(tx.toBuffer()); + + await stopNodes(nodes); + }); + + it('Does not crash if tx hash returns undefined', async () => { + const tx = mockTx(); + const txHash = tx.getTxHash(); + + const protocolHandlers = MOCK_SUB_PROTOCOL_HANDLERS; + // Return nothing + protocolHandlers[TX_REQ_PROTOCOL] = (_message: Buffer): Promise => { + return Promise.resolve(Uint8Array.from(Buffer.from(''))); + }; + + const nodes = await createNodes(2); + + await startNodes(nodes, protocolHandlers); + await sleep(500); + await connectToPeers(nodes); + await sleep(500); + + const res = await nodes[0].req.sendRequest(TX_REQ_PROTOCOL, txHash.toBuffer()); + expect(res).toBeUndefined(); + + await stopNodes(nodes); + }); + }); }); diff --git a/yarn-project/p2p/src/service/reqresp/reqresp.ts b/yarn-project/p2p/src/service/reqresp/reqresp.ts index 559a237f36f..2851c9e9fce 100644 --- a/yarn-project/p2p/src/service/reqresp/reqresp.ts +++ b/yarn-project/p2p/src/service/reqresp/reqresp.ts @@ -6,22 +6,30 @@ import { pipe } from 'it-pipe'; import { type Libp2p } from 'libp2p'; import { type Uint8ArrayList } from 'uint8arraylist'; -import { pingHandler, statusHandler } from './handlers.js'; -import { PING_PROTOCOL, STATUS_PROTOCOL, type SubProtocol, type SubProtocolHandler } from './interface.js'; +import { + DEFAULT_SUB_PROTOCOL_HANDLERS, + type ReqRespSubProtocol, + type ReqRespSubProtocolHandlers, +} from './interface.js'; /** - * A mapping from a protocol to a handler function + * The Request Response Service + * + * It allows nodes to request specific information from their peers, its use case covers recovering + * information that was missed during a syncronisation or a gossip event. + * + * This service implements the request response sub protocol, it is heavily inspired from + * ethereum implementations of the same name. + * + * see: https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/p2p-interface.md#the-reqresp-domain */ -const SUB_PROTOCOL_HANDLERS: Record = { - [PING_PROTOCOL]: pingHandler, - [STATUS_PROTOCOL]: statusHandler, -}; - export class ReqResp { protected readonly logger: Logger; private abortController: AbortController = new AbortController(); + private subProtocolHandlers: ReqRespSubProtocolHandlers = DEFAULT_SUB_PROTOCOL_HANDLERS; + constructor(protected readonly libp2p: Libp2p) { this.logger = createDebugLogger('aztec:p2p:reqresp'); } @@ -29,10 +37,11 @@ export class ReqResp { /** * Start the reqresp service */ - async start() { + async start(subProtocolHandlers: ReqRespSubProtocolHandlers) { + this.subProtocolHandlers = subProtocolHandlers; // Register all protocol handlers - for (const subProtocol of Object.keys(SUB_PROTOCOL_HANDLERS)) { - await this.libp2p.handle(subProtocol, this.streamHandler.bind(this, subProtocol as SubProtocol)); + for (const subProtocol of Object.keys(this.subProtocolHandlers)) { + await this.libp2p.handle(subProtocol, this.streamHandler.bind(this, subProtocol as ReqRespSubProtocol)); } } @@ -41,7 +50,7 @@ export class ReqResp { */ async stop() { // Unregister all handlers - for (const protocol of Object.keys(SUB_PROTOCOL_HANDLERS)) { + for (const protocol of Object.keys(this.subProtocolHandlers)) { await this.libp2p.unhandle(protocol); } await this.libp2p.stop(); @@ -55,7 +64,7 @@ export class ReqResp { * @param payload - The payload to send * @returns - The response from the peer, otherwise undefined */ - async sendRequest(subProtocol: SubProtocol, payload: Buffer): Promise { + async sendRequest(subProtocol: ReqRespSubProtocol, payload: Buffer): Promise { // Get active peers const peers = this.libp2p.getPeers(); @@ -64,7 +73,8 @@ export class ReqResp { const response = await this.sendRequestToPeer(peer, subProtocol, payload); // If we get a response, return it, otherwise we iterate onto the next peer - if (response) { + // We do not consider it a success if we have an empty buffer + if (response && response.length > 0) { return response; } } @@ -79,7 +89,11 @@ export class ReqResp { * @param payload - The payload to send * @returns If the request is successful, the response is returned, otherwise undefined */ - async sendRequestToPeer(peerId: PeerId, subProtocol: SubProtocol, payload: Buffer): Promise { + async sendRequestToPeer( + peerId: PeerId, + subProtocol: ReqRespSubProtocol, + payload: Buffer, + ): Promise { try { const stream = await this.libp2p.dialProtocol(peerId, subProtocol); @@ -109,14 +123,17 @@ export class ReqResp { * * @param param0 - The incoming stream data */ - private async streamHandler(protocol: SubProtocol, { stream }: IncomingStreamData) { + private async streamHandler(protocol: ReqRespSubProtocol, { stream }: IncomingStreamData) { + // Store a reference to from this for the async generator + const handler = this.subProtocolHandlers[protocol]; + try { await pipe( stream, - async function* (source) { + async function* (source: any) { for await (const chunkList of source) { - const msg = Buffer.from(chunkList.subarray()).toString(); - yield SUB_PROTOCOL_HANDLERS[protocol](msg); + const msg = Buffer.from(chunkList.subarray()); + yield handler(msg); } }, stream, diff --git a/yarn-project/p2p/src/service/service.ts b/yarn-project/p2p/src/service/service.ts index 5cf0525778c..607927d3454 100644 --- a/yarn-project/p2p/src/service/service.ts +++ b/yarn-project/p2p/src/service/service.ts @@ -4,6 +4,8 @@ import type { ENR } from '@chainsafe/enr'; import type { PeerId } from '@libp2p/interface'; import type EventEmitter from 'events'; +import { type ReqRespSubProtocol, type SubProtocolMap } from './reqresp/interface.js'; + export enum PeerDiscoveryState { RUNNING = 'running', STOPPED = 'stopped', @@ -31,6 +33,18 @@ export interface P2PService { */ propagate(message: T): void; + /** + * Request information from peers via the request response protocol. + * + * @param protocol - The request response protocol to use + * @param request - The request type, corresponding to the protocol + * @returns The response type, corresponding to the protocol + */ + sendRequest( + protocol: Protocol, + request: InstanceType, + ): Promise | undefined>; + // Leaky abstraction: fix https://github.com/AztecProtocol/aztec-packages/issues/7963 registerBlockReceivedCallback(callback: (block: BlockProposal) => Promise): void;