From d904ba9230babb5386d3382d7812e75fedb4d254 Mon Sep 17 00:00:00 2001 From: Alex Coseru Date: Mon, 26 Aug 2024 22:49:08 +0300 Subject: [PATCH] Bug/remove_pubsub (#652) * remove pubsub * remove broadcast command & cleanup --- docs/API.md | 26 ----- src/@types/OceanNode.ts | 6 - src/@types/commands.ts | 5 - src/components/P2P/handleBroadcasts.ts | 29 ----- src/components/P2P/handlers.ts | 1 - src/components/P2P/index.ts | 109 ++++++------------ src/components/httpRoutes/commands.ts | 27 +---- src/components/httpRoutes/index.ts | 4 +- src/components/httpRoutes/routeUtils.ts | 4 - src/components/httpRoutes/validateCommands.ts | 6 - 10 files changed, 39 insertions(+), 178 deletions(-) delete mode 100644 src/components/P2P/handleBroadcasts.ts diff --git a/docs/API.md b/docs/API.md index 60c006bd1..968cd1070 100644 --- a/docs/API.md +++ b/docs/API.md @@ -404,32 +404,6 @@ returns list of logs --- -## Broadcast Command - -### `HTTP` POST /broadcastCommand - -#### Description - -returns an empty if command is valid - -#### Parameters - -| name | type | required | description | -| ------- | ------ | -------- | ---------------------------- | -| command | string | v | command name | -| ... | any | | any other command parameters | - -#### Request - -```json -{ - "command": "echo", - "...": "..." -} -``` - ---- - ## Advertise Did ### `HTTP` GET /advertiseDid/?did=did:op:123" diff --git a/src/@types/OceanNode.ts b/src/@types/OceanNode.ts index cdf854055..0109262df 100644 --- a/src/@types/OceanNode.ts +++ b/src/@types/OceanNode.ts @@ -125,12 +125,6 @@ export interface OceanNodeStatus { supportedSchemas?: Schema[] } -export interface P2PBroadcastResponse { - command: string // original broadcast command - message: any // original broadcast message - response: any // the actual response to the original command and message -} - export interface FindDDOResponse { provider: string id: string diff --git a/src/@types/commands.ts b/src/@types/commands.ts index 6384f0494..03f95de2c 100644 --- a/src/@types/commands.ts +++ b/src/@types/commands.ts @@ -137,11 +137,6 @@ export interface ICommandHandler { validate(command: Command): ValidateParams } -export interface BroadcastCommand { - command: string // the name of the command - message: any // the message to broadcast -} - export interface ComputeGetEnvironmentsCommand extends Command { chainId?: number } diff --git a/src/components/P2P/handleBroadcasts.ts b/src/components/P2P/handleBroadcasts.ts deleted file mode 100644 index 80a7872c8..000000000 --- a/src/components/P2P/handleBroadcasts.ts +++ /dev/null @@ -1,29 +0,0 @@ -import { BroadcastCommand } from '../../@types/commands.js' -import { LOG_LEVELS_STR, getLoggerLevelEmoji } from '../../utils/logging/Logger.js' -import { P2P_LOGGER } from '../../utils/logging/common.js' - -export function handleBroadcasts(topic: string, _message: any) { - // can only register one handler for the protocol - - if (_message.detail.topic === topic) { - // 'broadcast from ', _message.detail.from - P2P_LOGGER.logMessage('Received broadcast msg... ', true) - const rawMessage = new TextDecoder('utf8').decode(_message.detail.data) - P2P_LOGGER.logMessageWithEmoji( - `Decoded broadcast: ${rawMessage}`, - true, - getLoggerLevelEmoji(LOG_LEVELS_STR.LEVEL_INFO), - LOG_LEVELS_STR.LEVEL_INFO - ) - - const command: BroadcastCommand = JSON.parse(rawMessage) as BroadcastCommand - - P2P_LOGGER.log( - LOG_LEVELS_STR.LEVEL_WARN, - `Broadcast command "${command.command}" not implemented yet!`, - true - ) - } else { - // console.log('Got some relays...', message.detail) - } -} diff --git a/src/components/P2P/handlers.ts b/src/components/P2P/handlers.ts index e4dde59cd..d6e0b86e1 100644 --- a/src/components/P2P/handlers.ts +++ b/src/components/P2P/handlers.ts @@ -1,2 +1 @@ -export * from './handleBroadcasts.js' export * from './handleProtocolCommands.js' diff --git a/src/components/P2P/index.ts b/src/components/P2P/index.ts index 0d21c6f87..c0b82ce56 100644 --- a/src/components/P2P/index.ts +++ b/src/components/P2P/index.ts @@ -4,7 +4,6 @@ import EventEmitter from 'node:events' import clone from 'lodash.clonedeep' import { - // handleBroadcasts, // handlePeerConnect, // handlePeerDiscovery, // handlePeerDisconnect, @@ -19,7 +18,7 @@ import { mdns } from '@libp2p/mdns' import { yamux } from '@chainsafe/libp2p-yamux' import { peerIdFromString } from '@libp2p/peer-id' import { pipe } from 'it-pipe' -import { pubsubPeerDiscovery } from '@libp2p/pubsub-peer-discovery' +// import { pubsubPeerDiscovery } from '@libp2p/pubsub-peer-discovery' import { tcp } from '@libp2p/tcp' import { webSockets } from '@libp2p/websockets' @@ -31,7 +30,7 @@ import { uPnPNAT } from '@libp2p/upnp-nat' import { ping } from '@libp2p/ping' import { dcutr } from '@libp2p/dcutr' import { kadDHT, passthroughMapper } from '@libp2p/kad-dht' -import { gossipsub } from '@chainsafe/libp2p-gossipsub' +// import { gossipsub } from '@chainsafe/libp2p-gossipsub' import { EVENTS, cidFromRawString } from '../../utils/index.js' import { Transform } from 'stream' @@ -40,11 +39,7 @@ import { OceanNodeConfig, FindDDOResponse } from '../../@types/OceanNode' // eslint-disable-next-line camelcase import is_ip_private from 'private-ip' import ip from 'ip' -import { - GENERIC_EMOJIS, - LOG_LEVELS_STR, - getLoggerLevelEmoji -} from '../../utils/logging/Logger.js' +import { GENERIC_EMOJIS, LOG_LEVELS_STR } from '../../utils/logging/Logger.js' import { INDEXER_DDO_EVENT_EMITTER } from '../Indexer/index.js' import { P2P_LOGGER } from '../../utils/logging/common.js' import { CoreHandlersRegistry } from '../core/handler/coreHandlersRegistry' @@ -119,9 +114,10 @@ export class OceanP2P extends EventEmitter { this._libp2p.addEventListener('peer:disconnect', (evt: any) => { this.handlePeerDisconnect(evt) }) - this._libp2p.addEventListener('peer:discovery', (evt: any) => { - this.handlePeerDiscovery(evt) + this._libp2p.addEventListener('peer:discovery', (details: any) => { + this.handlePeerDiscovery(details) }) + this._options = Object.assign({}, clone(DEFAULT_OPTIONS), clone(options)) this._peers = [] this._connections = {} @@ -161,9 +157,22 @@ export class OceanP2P extends EventEmitter { P2P_LOGGER.debug('Connection closed to:' + peerId.toString()) // Emitted when a peer has been found } - handlePeerDiscovery(details: any) { - const peerInfo = details.detail - P2P_LOGGER.debug('Discovered new peer:' + peerInfo.id.toString()) + async handlePeerDiscovery(details: any) { + try { + const peerInfo = details.detail + P2P_LOGGER.debug('Discovered new peer:' + peerInfo.id.toString()) + if (peerInfo.multiaddrs) { + await this._libp2p.peerStore.save(peerInfo.id, { + multiaddrs: peerInfo.multiaddrs + }) + await this._libp2p.peerStore.patch(peerInfo.id, { + multiaddrs: peerInfo.multiaddrs + }) + } + } catch (e) { + // no panic if it failed + // console.error(e) + } } handlePeerJoined(details: any) { @@ -233,7 +242,6 @@ export class OceanP2P extends EventEmitter { this._privateKey = config.keys.privateKey /** @type {import('libp2p').Libp2pOptions} */ // start with some default, overwrite based on config later - let doPx = false const bindInterfaces = [] if (config.p2pConfig.enableIPV4) { P2P_LOGGER.info('Binding P2P sockets to IPV4') @@ -258,7 +266,6 @@ export class OceanP2P extends EventEmitter { config.p2pConfig.announceAddresses && config.p2pConfig.announceAddresses.length > 0 ) { - doPx = true addresses = { listen: bindInterfaces, announceFilter: (multiaddrs: any[]) => @@ -274,6 +281,7 @@ export class OceanP2P extends EventEmitter { } let servicesConfig = { identify: identify(), + /* pubsub: gossipsub({ fallbackToFloodsub: false, batchPublish: false, @@ -286,7 +294,7 @@ export class OceanP2P extends EventEmitter { // canRelayMessage: true, // enabled: true allowedTopics: ['oceanprotocol._peer-discovery._p2p._pubsub', 'oceanprotocol'] - }), + }), */ dht: kadDHT({ // this is necessary because this node is not connected to the public network // it can be removed if, for example bootstrappers are configured @@ -372,7 +380,7 @@ export class OceanP2P extends EventEmitter { }), mdns({ interval: config.p2pConfig.mDNSInterval - }), + }) /*, pubsubPeerDiscovery({ interval: config.p2pConfig.pubsubPeerDiscoveryInterval, topics: [ @@ -381,7 +389,7 @@ export class OceanP2P extends EventEmitter { // '_peer-discovery._p2p._pubsub' // Include if you want to participate in the global space ], listenOnly: false - }) + }) */ ] } } @@ -393,7 +401,7 @@ export class OceanP2P extends EventEmitter { peerDiscovery: [ mdns({ interval: config.p2pConfig.mDNSInterval - }), + }) /*, pubsubPeerDiscovery({ interval: config.p2pConfig.pubsubPeerDiscoveryInterval, topics: [ @@ -402,7 +410,7 @@ export class OceanP2P extends EventEmitter { // '_peer-discovery._p2p._pubsub' // Include if you want to participate in the global space ], listenOnly: false - }) + }) */ ] } } @@ -410,29 +418,6 @@ export class OceanP2P extends EventEmitter { const node = await createLibp2p(options) await node.start() - // node.services.pubsub.addEventListener( 'peer joined', (evt:any) => {handlePeerJoined(evt)}) - // node.services.pubsub.addEventListener('peer left', (evt:any) => {handlePeerLeft(evt)}) - // node.services.pubsub.addEventListener('subscription-change', (evt:any) => { handleSubscriptionCHange(evt)}) - - // this._libp2p.services.pubsub.on('peer joined', (peer:any) => { - // console.log('New peer joined us:', peer) - // }) - // this._libp2p.services.pubsub.addEventListener('peer left', (evt:any) => { - // console.log('Peer left...', evt) - // }) - // this._libp2p.services.pubsub.on('peer left', (peer:any) => { - // console.log('Peer left...', peer) - // }) - - /* since we don't have broadcasts implemented, comment this part of the code - node.services.pubsub.addEventListener('message', (message: any) => { - handleBroadcasts(this._topic, message) - }) - */ - - node.services.pubsub.subscribe(this._topic) - node.services.pubsub.publish(this._topic, encoding('online')) - const upnpService = (node.services as any).upnpNAT if (config.p2pConfig.upnp && upnpService) { this._upnp_interval = setInterval(this.UPnpCron.bind(this), 3000) @@ -489,24 +474,24 @@ export class OceanP2P extends EventEmitter { async getOceanPeers(running: boolean = true, known: boolean = true) { const peers: string[] = [] - if (running) { + /* if (running) { // get pubsub peers const node = this._libp2p const newPeers = (await node.services.pubsub.getSubscribers(this._topic)).sort() for (const peer of newPeers.slice(0)) { if (!peers.includes(peer.toString)) peers.push(peer.toString()) } - } + } */ if (known) { // get p2p peers and filter them by protocol for (const peer of await this._libp2p.peerStore.all()) { - if (peer && peer.protocols) { - for (const protocol of peer.protocols) { - if (protocol === this._protocol) { - if (!peers.includes(peer.id.toString())) peers.push(peer.id.toString()) - } - } - } + // if (peer && peer.protocols) { + // for (const protocol of peer.protocols) { + // if (protocol === this._protocol) { + if (!peers.includes(peer.id.toString())) peers.push(peer.id.toString()) + // } + // } + // } } } @@ -518,18 +503,6 @@ export class OceanP2P extends EventEmitter { return Boolean(s.find((p: any) => p.toString() === peer.toString())) } - async broadcast(_message: any) { - P2P_LOGGER.logMessage('Broadcasting:', true) - P2P_LOGGER.logMessageWithEmoji( - _message, - true, - getLoggerLevelEmoji(LOG_LEVELS_STR.LEVEL_INFO), - LOG_LEVELS_STR.LEVEL_INFO - ) - const message = encoding(_message) - await this._libp2p.services.pubsub.publish(this._topic, message) - } - async getPeerDetails(peerName: string) { try { const peerId = peerIdFromString(peerName) @@ -1013,11 +986,3 @@ export class OceanP2P extends EventEmitter { this._upnp_interval = setInterval(this.UPnpCron.bind(this), 3000) } } - -function encoding(message: any) { - if (!(message instanceof Uint8Array)) { - return uint8ArrayFromString(message) - } - - return message -} diff --git a/src/components/httpRoutes/commands.ts b/src/components/httpRoutes/commands.ts index b8b395e05..49ca7558e 100644 --- a/src/components/httpRoutes/commands.ts +++ b/src/components/httpRoutes/commands.ts @@ -3,35 +3,10 @@ import express, { Request, Response } from 'express' import { P2PCommandResponse } from '../../@types' import { toString as uint8ArrayToString } from 'uint8arrays/to-string' -import { getDefaultLevel } from '../../utils/logging/Logger.js' - import { HTTP_LOGGER } from '../../utils/logging/common.js' -import { hasP2PInterface, sendMissingP2PResponse } from './index.js' +import { hasP2PInterface } from './index.js' import { validateCommandParameters } from './validateCommands.js' -export const broadcastCommandRoute = express.Router() - -broadcastCommandRoute.post( - '/broadcastCommand', - express.json(), - async (req: Request, res: Response): Promise => { - const validate = validateCommandParameters(req.body, []) - if (!validate.valid) { - res.status(validate.status).send(validate.reason) - return - } - - HTTP_LOGGER.log(getDefaultLevel(), `broadcastCommand received ${req.body}`, true) - - if (hasP2PInterface) { - await req.oceanNode.getP2PNode().broadcast(JSON.stringify(req.body)) - res.sendStatus(200) - } else { - sendMissingP2PResponse(res) - } - } -) - export const directCommandRoute = express.Router() directCommandRoute.post( '/directCommand', diff --git a/src/components/httpRoutes/index.ts b/src/components/httpRoutes/index.ts index aa4b80d5e..4a4a28b74 100644 --- a/src/components/httpRoutes/index.ts +++ b/src/components/httpRoutes/index.ts @@ -1,7 +1,7 @@ import express, { Response } from 'express' import { getOceanPeersRoute, getP2PPeersRoute, getP2PPeerRoute } from './getOceanPeers.js' import { advertiseDidRoute, getProvidersForDidRoute } from './dids.js' -import { broadcastCommandRoute, directCommandRoute } from './commands.js' +import { directCommandRoute } from './commands.js' import { logRoutes } from './logs.js' import { providerRoutes } from './provider.js' import { aquariusRoutes } from './aquarius.js' @@ -34,8 +34,6 @@ httpRoutes.use(getP2PPeerRoute) httpRoutes.use(advertiseDidRoute) // /getProvidersForDid httpRoutes.use(getProvidersForDidRoute) -// /broadcastCommand -httpRoutes.use(broadcastCommandRoute) // /directCommand httpRoutes.use(directCommandRoute) // /logs diff --git a/src/components/httpRoutes/routeUtils.ts b/src/components/httpRoutes/routeUtils.ts index f72c0e578..4843f5d0e 100644 --- a/src/components/httpRoutes/routeUtils.ts +++ b/src/components/httpRoutes/routeUtils.ts @@ -92,10 +92,6 @@ routesNames.set('directCommand', { method: 'post' }) -routesNames.set('broadcastCommand', { - path: '/broadcastCommand', - method: 'post' -}) // fileInfo routesNames.set('fileInfo', { path: `${SERVICES_API_BASE_PATH}/fileInfo`, diff --git a/src/components/httpRoutes/validateCommands.ts b/src/components/httpRoutes/validateCommands.ts index f36e5dfcb..49850f496 100644 --- a/src/components/httpRoutes/validateCommands.ts +++ b/src/components/httpRoutes/validateCommands.ts @@ -10,12 +10,6 @@ export type ValidateParams = { status?: number } -export function validateBroadcastParameters(requestBody: any): ValidateParams { - // for now we can use the same validation function, - // but later we might need to have separate validation functions - // if we many different commands of each type - return validateCommandParameters(requestBody, []) -} // add others when we add suppor // request level validation, just check if we have a "command" field and its a supported one