Skip to content

Commit

Permalink
feat: request specific transactions through the p2p layer (#8185)
Browse files Browse the repository at this point in the history
  • Loading branch information
Maddiaa0 authored Aug 29, 2024
1 parent 1c6b478 commit 54e1cc7
Show file tree
Hide file tree
Showing 12 changed files with 614 additions and 119 deletions.
2 changes: 2 additions & 0 deletions yarn-project/p2p/src/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ export const createP2PClient = async (

if (config.p2pEnabled) {
// If announceTcpAddress or announceUdpAddress are not provided, query for public IP if config allows

const {
tcpAnnounceAddress: configTcpAnnounceAddress,
udpAnnounceAddress: configUdpAnnounceAddress,
Expand Down Expand Up @@ -68,6 +69,7 @@ export const createP2PClient = async (
// Create peer discovery service
const peerId = await createLibP2PPeerId(config.peerIdPrivateKey);
const discoveryService = new DiscV5Service(peerId, config);

p2pService = await LibP2PService.new(config, discoveryService, peerId, txPool, attestationsPool, store);
} else {
p2pService = new DummyP2PService();
Expand Down
1 change: 1 addition & 0 deletions yarn-project/p2p/src/client/p2p_client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ describe('In-Memory P2P Client', () => {
stop: jest.fn(),
propagate: jest.fn(),
registerBlockReceivedCallback: jest.fn(),
sendRequest: jest.fn(),
getEnr: jest.fn(),
};

Expand Down
12 changes: 12 additions & 0 deletions yarn-project/p2p/src/client/p2p_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import { type ENR } from '@chainsafe/enr';

import { type AttestationPool } from '../attestation_pool/attestation_pool.js';
import { getP2PConfigEnvVars } from '../config.js';
import { TX_REQ_PROTOCOL } from '../service/reqresp/interface.js';
import type { P2PService } from '../service/service.js';
import { type TxPool } from '../tx_pool/index.js';

Expand Down Expand Up @@ -71,6 +72,12 @@ export interface P2P {
// ^ This pattern is not my favorite (md)
registerBlockProposalHandler(handler: (block: BlockProposal) => Promise<BlockAttestation>): void;

/**
* Request a transaction from another peer by its tx hash.
* @param txHash - Hash of the tx to query.
*/
requestTxByHash(txHash: TxHash): Promise<Tx | undefined>;

/**
* Verifies the 'tx' and, if valid, adds it to local tx pool and forwards it to other peers.
* @param tx - The transaction.
Expand Down Expand Up @@ -276,6 +283,11 @@ export class P2PClient implements P2P {
this.p2pService.registerBlockReceivedCallback(handler);
}

public requestTxByHash(txHash: TxHash): Promise<Tx | undefined> {
// Underlying I want to use the libp2p service to just have a request method where the subprotocol is defined here
return this.p2pService.sendRequest(TX_REQ_PROTOCOL, txHash);
}

/**
* Returns all transactions in the transaction pool.
* @returns An array of Txs.
Expand Down
101 changes: 101 additions & 0 deletions yarn-project/p2p/src/mocks/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import { noise } from '@chainsafe/libp2p-noise';
import { yamux } from '@chainsafe/libp2p-yamux';
import { bootstrap } from '@libp2p/bootstrap';
import { tcp } from '@libp2p/tcp';
import { type Libp2p, type Libp2pOptions, createLibp2p } from 'libp2p';

import { pingHandler, statusHandler } from '../service/reqresp/handlers.js';
import {
PING_PROTOCOL,
type ReqRespSubProtocolHandlers,
STATUS_PROTOCOL,
TX_REQ_PROTOCOL,
} from '../service/reqresp/interface.js';
import { ReqResp } from '../service/reqresp/reqresp.js';

/**
* Creates a libp2p node, pre configured.
* @param boostrapAddrs - an optional list of bootstrap addresses
* @returns Lip2p node
*/
export async function createLibp2pNode(boostrapAddrs: string[] = []): Promise<Libp2p> {
const options: Libp2pOptions = {
addresses: {
listen: ['/ip4/0.0.0.0/tcp/0'],
},
connectionEncryption: [noise()],
streamMuxers: [yamux()],
transports: [tcp()],
};

if (boostrapAddrs.length > 0) {
options.peerDiscovery = [
bootstrap({
list: boostrapAddrs,
}),
];
}

return await createLibp2p(options);
}

/**
* A p2p / req resp node pairing the req node will always contain the p2p node.
* they are provided as a pair to allow access the p2p node directly
*/
export type ReqRespNode = {
p2p: Libp2p;
req: ReqResp;
};

// Mock sub protocol handlers
export const MOCK_SUB_PROTOCOL_HANDLERS: ReqRespSubProtocolHandlers = {
[PING_PROTOCOL]: pingHandler,
[STATUS_PROTOCOL]: statusHandler,
[TX_REQ_PROTOCOL]: (_msg: any) => Promise.resolve(Uint8Array.from(Buffer.from('tx'))),
};

/**
* @param numberOfNodes - the number of nodes to create
* @returns An array of the created nodes
*/
export const createNodes = async (numberOfNodes: number): Promise<ReqRespNode[]> => {
return await Promise.all(Array.from({ length: numberOfNodes }, () => createReqResp()));
};

// TODO: think about where else this can go
export const startNodes = async (nodes: ReqRespNode[], subProtocolHandlers = MOCK_SUB_PROTOCOL_HANDLERS) => {
for (const node of nodes) {
await node.req.start(subProtocolHandlers);
}
};

export const stopNodes = async (nodes: ReqRespNode[]): Promise<void> => {
for (const node of nodes) {
await node.req.stop();
await node.p2p.stop();
}
};

// Create a req resp node, exposing the underlying p2p node
export const createReqResp = async (): Promise<ReqRespNode> => {
const p2p = await createLibp2pNode();
const req = new ReqResp(p2p);
return {
p2p,
req,
};
};

// Given a node list; hand shake all of the nodes with each other
export const connectToPeers = async (nodes: ReqRespNode[]): Promise<void> => {
for (const node of nodes) {
for (const otherNode of nodes) {
if (node === otherNode) {
continue;
}
const addr = otherNode.p2p.getMultiaddrs()[0];
await node.p2p.dial(addr);
}
}
};
18 changes: 18 additions & 0 deletions yarn-project/p2p/src/service/dummy_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import type { BlockAttestation, BlockProposal, Gossipable, TxHash } from '@aztec
import type { PeerId } from '@libp2p/interface';
import EventEmitter from 'events';

import { type ReqRespSubProtocol, type SubProtocolMap } from './reqresp/interface.js';
import { type P2PService, type PeerDiscoveryService, PeerDiscoveryState } from './service.js';

/**
Expand Down Expand Up @@ -42,6 +43,23 @@ export class DummyP2PService implements P2PService {
*/
public registerBlockReceivedCallback(_: (block: BlockProposal) => Promise<BlockAttestation>) {}

/**
* Sends a request to a peer.
* @param _protocol - The protocol to send the request on.
* @param _request - The request to send.
* @returns The response from the peer, otherwise undefined.
*/
public sendRequest<Protocol extends ReqRespSubProtocol>(
_protocol: Protocol,
_request: InstanceType<SubProtocolMap[Protocol]['request']>,
): Promise<InstanceType<SubProtocolMap[Protocol]['response']> | undefined> {
return Promise.resolve(undefined);
}

/**
* Returns the ENR of the peer.
* @returns The ENR of the peer, otherwise undefined.
*/
public getEnr(): undefined {
return undefined;
}
Expand Down
76 changes: 75 additions & 1 deletion yarn-project/p2p/src/service/libp2p_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
TopicType,
TopicTypeMap,
Tx,
TxHash,
} from '@aztec/circuit-types';
import { createDebugLogger } from '@aztec/foundation/log';
import { SerialQueue } from '@aztec/foundation/queue';
Expand All @@ -30,6 +31,18 @@ import { type TxPool } from '../tx_pool/index.js';
import { convertToMultiaddr } from '../util.js';
import { AztecDatastore } from './data_store.js';
import { PeerManager } from './peer_manager.js';
import { pingHandler, statusHandler } from './reqresp/handlers.js';
import {
DEFAULT_SUB_PROTOCOL_HANDLERS,
PING_PROTOCOL,
type ReqRespSubProtocol,
type ReqRespSubProtocolHandlers,
STATUS_PROTOCOL,
type SubProtocolMap,
TX_REQ_PROTOCOL,
subProtocolMap,
} from './reqresp/interface.js';
import { ReqResp } from './reqresp/reqresp.js';
import type { P2PService, PeerDiscoveryService } from './service.js';

export interface PubSubLibp2p extends Libp2p {
Expand Down Expand Up @@ -61,6 +74,14 @@ export class LibP2PService implements P2PService {
private peerManager: PeerManager;
private discoveryRunningPromise?: RunningPromise;

// Request and response sub service
private reqresp: ReqResp;

/**
* Callback for when a block is received from a peer.
* @param block - The block received from the peer.
* @returns The attestation for the block, if any.
*/
private blockReceivedCallback: (block: BlockProposal) => Promise<BlockAttestation | undefined>;

constructor(
Expand All @@ -69,9 +90,11 @@ export class LibP2PService implements P2PService {
private peerDiscoveryService: PeerDiscoveryService,
private txPool: TxPool,
private attestationPool: AttestationPool,
private requestResponseHandlers: ReqRespSubProtocolHandlers = DEFAULT_SUB_PROTOCOL_HANDLERS,
private logger = createDebugLogger('aztec:libp2p_service'),
) {
this.peerManager = new PeerManager(node, peerDiscoveryService, config, logger);
this.reqresp = new ReqResp(node);

this.blockReceivedCallback = (block: BlockProposal): Promise<BlockAttestation | undefined> => {
this.logger.verbose(
Expand Down Expand Up @@ -124,6 +147,7 @@ export class LibP2PService implements P2PService {
this.peerManager.discover();
}, this.config.peerCheckIntervalMS);
this.discoveryRunningPromise.start();
await this.reqresp.start(this.requestResponseHandlers);
}

/**
Expand All @@ -140,6 +164,9 @@ export class LibP2PService implements P2PService {
this.logger.debug('Stopping LibP2P...');
await this.stopLibP2P();
this.logger.info('LibP2P service stopped');
this.logger.debug('Stopping request response service...');
await this.reqresp.stop();
this.logger.debug('Request response service stopped...');
}

/**
Expand Down Expand Up @@ -206,9 +233,56 @@ export class LibP2PService implements P2PService {
},
});

return new LibP2PService(config, node, peerDiscoveryService, txPool, attestationPool);
// Create request response protocol handlers
/**
* Handler for tx requests
* @param msg - the tx request message
* @returns the tx response message
*/
const txHandler = (msg: Buffer): Promise<Uint8Array> => {
const txHash = TxHash.fromBuffer(msg);
const foundTx = txPool.getTxByHash(txHash);
const asUint8Array = Uint8Array.from(foundTx ? foundTx.toBuffer() : Buffer.alloc(0));
return Promise.resolve(asUint8Array);
};

const requestResponseHandlers = {
[PING_PROTOCOL]: pingHandler,
[STATUS_PROTOCOL]: statusHandler,
[TX_REQ_PROTOCOL]: txHandler,
};

return new LibP2PService(config, node, peerDiscoveryService, txPool, attestationPool, requestResponseHandlers);
}

/**
* Send Request via the ReqResp service
* The subprotocol defined will determine the request and response types
*
* See the subProtocolMap for the mapping of subprotocols to request/response types in `interface.ts`
*
* @param protocol The request response protocol to use
* @param request The request type to send
* @returns
*/
async sendRequest<SubProtocol extends ReqRespSubProtocol>(
protocol: SubProtocol,
request: InstanceType<SubProtocolMap[SubProtocol]['request']>,
): Promise<InstanceType<SubProtocolMap[SubProtocol]['response']> | undefined> {
const pair = subProtocolMap[protocol];

const res = await this.reqresp.sendRequest(protocol, request.toBuffer());
if (!res) {
return undefined;
}

return pair.response.fromBuffer(res!);
}

/**
* Get the ENR of the node
* @returns The ENR of the node
*/
public getEnr(): ENR | undefined {
return this.peerDiscoveryService.getEnr();
}
Expand Down
18 changes: 14 additions & 4 deletions yarn-project/p2p/src/service/reqresp/handlers.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,17 @@
export function pingHandler(_msg: any) {
return Uint8Array.from(Buffer.from('pong'));
/**
* Handles the ping request.
* @param _msg - The ping request message.
* @returns A resolved promise with the pong response.
*/
export function pingHandler(_msg: any): Promise<Uint8Array> {
return Promise.resolve(Uint8Array.from(Buffer.from('pong')));
}

export function statusHandler(_msg: any) {
return Uint8Array.from(Buffer.from('ok'));
/**
* Handles the status request.
* @param _msg - The status request message.
* @returns A resolved promise with the ok response.
*/
export function statusHandler(_msg: any): Promise<Uint8Array> {
return Promise.resolve(Uint8Array.from(Buffer.from('ok')));
}
Loading

0 comments on commit 54e1cc7

Please sign in to comment.