Skip to content

Commit

Permalink
feat(p2p): request response skeleton (#8076)
Browse files Browse the repository at this point in the history
---------

Co-authored-by: just-mitch <[email protected]>
  • Loading branch information
Maddiaa0 and just-mitch authored Aug 26, 2024
1 parent 0263b4c commit bfbc4b2
Show file tree
Hide file tree
Showing 5 changed files with 309 additions and 0 deletions.
7 changes: 7 additions & 0 deletions yarn-project/p2p/src/service/reqresp/handlers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
export function pingHandler(_msg: any) {
return Uint8Array.from(Buffer.from('pong'));
}

export function statusHandler(_msg: any) {
return Uint8Array.from(Buffer.from('ok'));
}
4 changes: 4 additions & 0 deletions yarn-project/p2p/src/service/reqresp/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
/**
* Request Response protocol allows nodes to ask their peers for data
* that they missed via the traditional gossip protocol.
*/
13 changes: 13 additions & 0 deletions yarn-project/p2p/src/service/reqresp/interface.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
export enum ReqRespType {
Status = 'status',
Ping = 'ping',
/** Ask peers for specific transactions */
TxsByHash = 'txs_by_hash',
}

export const PING_PROTOCOL = '/aztec/ping/0.1.0';
export const STATUS_PROTOCOL = '/aztec/status/0.1.0';

export type SubProtocol = typeof PING_PROTOCOL | typeof STATUS_PROTOCOL;

export type SubProtocolHandler = (msg: string) => Uint8Array;
155 changes: 155 additions & 0 deletions yarn-project/p2p/src/service/reqresp/reqresp.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
import { sleep } from '@aztec/foundation/sleep';

import { noise } from '@chainsafe/libp2p-noise';
import { yamux } from '@chainsafe/libp2p-yamux';
import { bootstrap } from '@libp2p/bootstrap';
import { tcp } from '@libp2p/tcp';
import { type Libp2p, type Libp2pOptions, createLibp2p } from 'libp2p';

import { PING_PROTOCOL } from './interface.js';
import { ReqResp } from './reqresp.js';

/**
* Creates a libp2p node, pre configured.
* @param boostrapAddrs - an optional list of bootstrap addresses
* @returns Lip2p node
*/
async function createLibp2pNode(boostrapAddrs: string[] = []): Promise<Libp2p> {
const options: Libp2pOptions = {
addresses: {
listen: ['/ip4/0.0.0.0/tcp/0'],
},
connectionEncryption: [noise()],
streamMuxers: [yamux()],
transports: [tcp()],
};

if (boostrapAddrs.length > 0) {
options.peerDiscovery = [
bootstrap({
list: boostrapAddrs,
}),
];
}

return await createLibp2p(options);
}

/**
* A p2p / req resp node pairing the req node will always contain the p2p node.
* they are provided as a pair to allow access the p2p node directly
*/
type ReqRespNode = {
p2p: Libp2p;
req: ReqResp;
};

/**
* @param numberOfNodes - the number of nodes to create
* @returns An array of the created nodes
*/
const createNodes = async (numberOfNodes: number): Promise<ReqRespNode[]> => {
return await Promise.all(Array.from({ length: numberOfNodes }, () => createReqResp()));
};

const startNodes = async (nodes: ReqRespNode[]) => {
for (const node of nodes) {
await node.req.start();
}
};

const stopNodes = async (nodes: ReqRespNode[]): Promise<void> => {
for (const node of nodes) {
await node.req.stop();
await node.p2p.stop();
}
};

// Create a req resp node, exposing the underlying p2p node
const createReqResp = async (): Promise<ReqRespNode> => {
const p2p = await createLibp2pNode();
const req = new ReqResp(p2p);
return {
p2p,
req,
};
};

// Given a node list; hand shake all of the nodes with each other
const connectToPeers = async (nodes: ReqRespNode[]): Promise<void> => {
for (const node of nodes) {
for (const otherNode of nodes) {
if (node === otherNode) {
continue;
}
const addr = otherNode.p2p.getMultiaddrs()[0];
await node.p2p.dial(addr);
}
}
};

// The Req Resp protocol should allow nodes to dial specific peers
// and ask for specific data that they missed via the traditional gossip protocol.
describe('ReqResp', () => {
it('Should perform a ping request', async () => {
// Create two nodes
// They need to discover each other
const nodes = await createNodes(2);
const { req: pinger } = nodes[0];

await startNodes(nodes);

// connect the nodes
await connectToPeers(nodes);

await sleep(500);

const res = await pinger.sendRequest(PING_PROTOCOL, Buffer.from('ping'));

await sleep(500);
expect(res?.toString('utf-8')).toEqual('pong');

await stopNodes(nodes);
});

it('Should handle gracefully if a peer connected peer is offline', async () => {
const nodes = await createNodes(2);

const { req: pinger } = nodes[0];
const { req: ponger } = nodes[1];
await startNodes(nodes);

// connect the nodes
await connectToPeers(nodes);
await sleep(500);

void ponger.stop();

// It should return undefined if it cannot dial the peer
const res = await pinger.sendRequest(PING_PROTOCOL, Buffer.from('ping'));

expect(res).toBeUndefined();

await stopNodes(nodes);
});

it('Should request from a later peer if other peers are offline', async () => {
const nodes = await createNodes(4);

await startNodes(nodes);
await sleep(500);
await connectToPeers(nodes);
await sleep(500);

// Stop the second middle two nodes
void nodes[1].req.stop();
void nodes[2].req.stop();

// send from the first node
const res = await nodes[0].req.sendRequest(PING_PROTOCOL, Buffer.from('ping'));

expect(res?.toString('utf-8')).toEqual('pong');

await stopNodes(nodes);
});
});
130 changes: 130 additions & 0 deletions yarn-project/p2p/src/service/reqresp/reqresp.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
// @attribution: lodestar impl for inspiration
import { type Logger, createDebugLogger } from '@aztec/foundation/log';

import { type IncomingStreamData, type PeerId } from '@libp2p/interface';
import { pipe } from 'it-pipe';
import { type Libp2p } from 'libp2p';
import { type Uint8ArrayList } from 'uint8arraylist';

import { pingHandler, statusHandler } from './handlers.js';
import { PING_PROTOCOL, STATUS_PROTOCOL, type SubProtocol, type SubProtocolHandler } from './interface.js';

/**
* A mapping from a protocol to a handler function
*/
const SUB_PROTOCOL_HANDLERS: Record<SubProtocol, SubProtocolHandler> = {
[PING_PROTOCOL]: pingHandler,
[STATUS_PROTOCOL]: statusHandler,
};

export class ReqResp {
protected readonly logger: Logger;

private abortController: AbortController = new AbortController();

constructor(protected readonly libp2p: Libp2p) {
this.logger = createDebugLogger('aztec:p2p:reqresp');
}

/**
* Start the reqresp service
*/
async start() {
// Register all protocol handlers
for (const subProtocol of Object.keys(SUB_PROTOCOL_HANDLERS)) {
await this.libp2p.handle(subProtocol, this.streamHandler.bind(this, subProtocol as SubProtocol));
}
}

/**
* Stop the reqresp service
*/
async stop() {
// Unregister all handlers
for (const protocol of Object.keys(SUB_PROTOCOL_HANDLERS)) {
await this.libp2p.unhandle(protocol);
}
await this.libp2p.stop();
this.abortController.abort();
}

/**
* Send a request to peers, returns the first response
*
* @param subProtocol - The protocol being requested
* @param payload - The payload to send
* @returns - The response from the peer, otherwise undefined
*/
async sendRequest(subProtocol: SubProtocol, payload: Buffer): Promise<Buffer | undefined> {
// Get active peers
const peers = this.libp2p.getPeers();

// Attempt to ask all of our peers
for (const peer of peers) {
const response = await this.sendRequestToPeer(peer, subProtocol, payload);

// If we get a response, return it, otherwise we iterate onto the next peer
if (response) {
return response;
}
}
return undefined;
}

/**
* Sends a request to a specific peer
*
* @param peerId - The peer to send the request to
* @param subProtocol - The protocol to use to request
* @param payload - The payload to send
* @returns If the request is successful, the response is returned, otherwise undefined
*/
async sendRequestToPeer(peerId: PeerId, subProtocol: SubProtocol, payload: Buffer): Promise<Buffer | undefined> {
try {
const stream = await this.libp2p.dialProtocol(peerId, subProtocol);

const result = await pipe([payload], stream, this.readMessage);
return result;
} catch (e) {
this.logger.warn(`Failed to send request to peer ${peerId.publicKey}`);
return undefined;
}
}

/**
* Read a message returned from a stream into a single buffer
*/
private async readMessage(source: AsyncIterable<Uint8ArrayList>): Promise<Buffer> {
const chunks: Uint8Array[] = [];
for await (const chunk of source) {
chunks.push(chunk.subarray());
}
const messageData = chunks.concat();
return Buffer.concat(messageData);
}

/**
* Stream Handler
* Reads the incoming stream, determines the protocol, then triggers the appropriate handler
*
* @param param0 - The incoming stream data
*/
private async streamHandler(protocol: SubProtocol, { stream }: IncomingStreamData) {
try {
await pipe(
stream,
async function* (source) {
for await (const chunkList of source) {
const msg = Buffer.from(chunkList.subarray()).toString();
yield SUB_PROTOCOL_HANDLERS[protocol](msg);
}
},
stream,
);
} catch (e: any) {
this.logger.warn(e);
} finally {
await stream.close();
}
}
}

0 comments on commit bfbc4b2

Please sign in to comment.