Skip to content

Commit

Permalink
deps(dev): bump aegir from 37.12.1 to 38.1.6 (#128)
Browse files Browse the repository at this point in the history
* deps(dev): bump aegir from 37.12.1 to 38.1.6

Bumps [aegir](https://github.com/ipfs/aegir) from 37.12.1 to 38.1.6.
- [Release notes](https://github.com/ipfs/aegir/releases)
- [Changelog](https://github.com/ipfs/aegir/blob/master/CHANGELOG.md)
- [Commits](ipfs/aegir@v37.12.1...v38.1.6)

---
updated-dependencies:
- dependency-name: aegir
  dependency-type: direct:development
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <[email protected]>

* chore: fix linting

---------

Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: achingbrain <[email protected]>
  • Loading branch information
dependabot[bot] and achingbrain authored Feb 22, 2023
1 parent 9c2c7cb commit 7609545
Show file tree
Hide file tree
Showing 11 changed files with 93 additions and 88 deletions.
8 changes: 3 additions & 5 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@
"scripts": {
"clean": "aegir clean",
"lint": "aegir lint",
"dep-check": "aegir dep-check",
"dep-check": "aegir dep-check -i protons",
"build": "aegir build",
"generate": "protons test/message/rpc.proto",
"test": "aegir test",
Expand All @@ -187,7 +187,6 @@
"@libp2p/peer-collections": "^3.0.0",
"@libp2p/peer-id": "^2.0.0",
"@libp2p/topology": "^4.0.0",
"@multiformats/multiaddr": "^11.0.0",
"abortable-iterator": "^4.0.2",
"it-length-prefixed": "^8.0.2",
"it-pipe": "^2.0.3",
Expand All @@ -199,14 +198,13 @@
},
"devDependencies": {
"@libp2p/peer-id-factory": "^2.0.0",
"aegir": "^37.9.1",
"aegir": "^38.1.6",
"delay": "^5.0.0",
"it-pair": "^2.0.2",
"p-defer": "^4.0.0",
"p-wait-for": "^5.0.0",
"protons": "^7.0.2",
"protons-runtime": "^5.0.0",
"sinon": "^15.0.1",
"util": "^0.12.4"
"sinon": "^15.0.1"
}
}
66 changes: 35 additions & 31 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ export interface PubSubComponents {
* PubSubBaseProtocol handles the peers and connections logic for pubsub routers
* and specifies the API that pubsub routers should have.
*/
export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = PubSubEvents> extends EventEmitter<Events> implements PubSub<Events> {
export abstract class PubSubBaseProtocol<Events extends Record<string, any> = PubSubEvents> extends EventEmitter<Events> implements PubSub<Events> {
public started: boolean
/**
* Map of topics to which peers are subscribed to
Expand Down Expand Up @@ -108,10 +108,8 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P

/**
* Register the pubsub protocol onto the libp2p node.
*
* @returns {void}
*/
async start () {
async start (): Promise<void> {
if (this.started || !this.enabled) {
return
}
Expand All @@ -121,10 +119,12 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
const registrar = this.components.registrar
// Incoming streams
// Called after a peer dials us
await Promise.all(this.multicodecs.map(async multicodec => await registrar.handle(multicodec, this._onIncomingStream, {
maxInboundStreams: this.maxInboundStreams,
maxOutboundStreams: this.maxOutboundStreams
})))
await Promise.all(this.multicodecs.map(async multicodec => {
await registrar.handle(multicodec, this._onIncomingStream, {
maxInboundStreams: this.maxInboundStreams,
maxOutboundStreams: this.maxOutboundStreams
})
}))

// register protocol with topology
// Topology callbacks called on connection manager changes
Expand All @@ -141,7 +141,7 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
/**
* Unregister the pubsub protocol and the streams with other peers will be closed.
*/
async stop () {
async stop (): Promise<void> {
if (!this.started || !this.enabled) {
return
}
Expand All @@ -150,10 +150,14 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P

// unregister protocol and handlers
if (this._registrarTopologyIds != null) {
this._registrarTopologyIds?.map(id => registrar.unregister(id))
this._registrarTopologyIds?.forEach(id => {
registrar.unregister(id)
})
}

await Promise.all(this.multicodecs.map(async multicodec => await registrar.unhandle(multicodec)))
await Promise.all(this.multicodecs.map(async multicodec => {
await registrar.unhandle(multicodec)
}))

log('stopping')
for (const peerStreams of this.peers.values()) {
Expand All @@ -166,14 +170,14 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
log('stopped')
}

isStarted () {
isStarted (): boolean {
return this.started
}

/**
* On an inbound stream opened
*/
protected _onIncomingStream (data: IncomingStreamData) {
protected _onIncomingStream (data: IncomingStreamData): void {
const { stream, connection } = data
const peerId = connection.remotePeer

Expand All @@ -186,13 +190,13 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
const inboundStream = peer.attachInboundStream(stream)

this.processMessages(peerId, inboundStream, peer)
.catch(err => log(err))
.catch(err => { log(err) })
}

/**
* Registrar notifies an established connection with pubsub protocol
*/
protected _onPeerConnected (peerId: PeerId, conn: Connection) {
protected _onPeerConnected (peerId: PeerId, conn: Connection): void {
log('connected %p', peerId)

void Promise.resolve().then(async () => {
Expand Down Expand Up @@ -221,7 +225,7 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
/**
* Registrar notifies a closing connection with pubsub protocol
*/
protected _onPeerDisconnected (peerId: PeerId, conn?: Connection) {
protected _onPeerDisconnected (peerId: PeerId, conn?: Connection): void {
const idB58Str = peerId.toString()

log('connection ended', idB58Str)
Expand Down Expand Up @@ -258,7 +262,7 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
/**
* Notifies the router that a peer has been disconnected
*/
protected _removePeer (peerId: PeerId) {
protected _removePeer (peerId: PeerId): PeerStreams | undefined {
const peerStreams = this.peers.get(peerId)
if (peerStreams == null) {
return
Expand All @@ -284,7 +288,7 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
/**
* Responsible for processing each RPC message received by other peers.
*/
async processMessages (peerId: PeerId, stream: AsyncIterable<Uint8ArrayList>, peerStreams: PeerStreams) {
async processMessages (peerId: PeerId, stream: AsyncIterable<Uint8ArrayList>, peerStreams: PeerStreams): Promise<void> {
try {
await pipe(
stream,
Expand Down Expand Up @@ -320,7 +324,7 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
})),
messages
})
.catch(err => log(err))
.catch(err => { log(err) })
}
}
)
Expand Down Expand Up @@ -378,7 +382,7 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
log.error(err)
}
}))
.catch(err => log(err))
.catch(err => { log(err) })
}

return true
Expand All @@ -387,7 +391,7 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
/**
* Handles a subscription change from a peer
*/
processRpcSubOpt (id: PeerId, subOpt: PubSubRPCSubscription) {
processRpcSubOpt (id: PeerId, subOpt: PubSubRPCSubscription): void {
const t = subOpt.topic

if (t == null) {
Expand All @@ -412,7 +416,7 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
/**
* Handles a message from a peer
*/
async processMessage (from: PeerId, msg: Message) {
async processMessage (from: PeerId, msg: Message): Promise<void> {
if (this.components.peerId.equals(from) && !this.emitSelf) {
return
}
Expand Down Expand Up @@ -442,7 +446,7 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
* The default msgID implementation
* Child class can override this.
*/
getMsgId (msg: Message) {
getMsgId (msg: Message): Promise<Uint8Array> | Uint8Array {
const signaturePolicy = this.globalSignaturePolicy
switch (signaturePolicy) {
case 'StrictSign':
Expand Down Expand Up @@ -470,7 +474,7 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
* Whether to accept a message from a peer
* Override to create a graylist
*/
acceptFrom (id: PeerId) {
acceptFrom (id: PeerId): boolean {
return true
}

Expand All @@ -495,10 +499,10 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
/**
* Send an rpc object to a peer
*/
send (peer: PeerId, data: { messages?: Message[], subscriptions?: string[], subscribe?: boolean }) {
send (peer: PeerId, data: { messages?: Message[], subscriptions?: string[], subscribe?: boolean }): void {
const { messages, subscriptions, subscribe } = data

return this.sendRpc(peer, {
this.sendRpc(peer, {
subscriptions: (subscriptions ?? []).map(str => ({ topic: str, subscribe: Boolean(subscribe) })),
messages: (messages ?? []).map(toRpcMessage)
})
Expand All @@ -507,7 +511,7 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
/**
* Send an rpc object to a peer
*/
sendRpc (peer: PeerId, rpc: PubSubRPC) {
sendRpc (peer: PeerId, rpc: PubSubRPC): void {
const peerStreams = this.peers.get(peer)

if (peerStreams == null || !peerStreams.isWritable) {
Expand All @@ -523,7 +527,7 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
* Validates the given message. The signature will be checked for authenticity.
* Throws an error on invalid messages
*/
async validate (from: PeerId, message: Message) { // eslint-disable-line require-await
async validate (from: PeerId, message: Message): Promise<void> { // eslint-disable-line require-await
const signaturePolicy = this.globalSignaturePolicy
switch (signaturePolicy) {
case 'StrictNoSign':
Expand Down Expand Up @@ -671,7 +675,7 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
/**
* Subscribes to a given topic.
*/
subscribe (topic: string) {
subscribe (topic: string): void {
if (!this.started) {
throw new Error('Pubsub has not started')
}
Expand All @@ -690,7 +694,7 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
/**
* Unsubscribe from the given topic
*/
unsubscribe (topic: string) {
unsubscribe (topic: string): void {
if (!this.started) {
throw new Error('Pubsub is not started')
}
Expand All @@ -713,7 +717,7 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
/**
* Get the list of topics which the peer is subscribed to.
*/
getTopics () {
getTopics (): string[] {
if (!this.started) {
throw new Error('Pubsub is not started')
}
Expand Down
14 changes: 7 additions & 7 deletions src/peer-streams.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,22 +58,22 @@ export class PeerStreams extends EventEmitter<PeerStreamEvents> {
/**
* Do we have a connection to read from?
*/
get isReadable () {
get isReadable (): boolean {
return Boolean(this.inboundStream)
}

/**
* Do we have a connection to write on?
*/
get isWritable () {
get isWritable (): boolean {
return Boolean(this.outboundStream)
}

/**
* Send a message to this peer.
* Throws if there is no `stream` to write to available.
*/
write (data: Uint8Array | Uint8ArrayList) {
write (data: Uint8Array | Uint8ArrayList): void {
if (this.outboundStream == null) {
const id = this.id.toString()
throw new Error('No writable connection to ' + id)
Expand All @@ -85,7 +85,7 @@ export class PeerStreams extends EventEmitter<PeerStreamEvents> {
/**
* Attach a raw inbound stream and setup a read stream
*/
attachInboundStream (stream: Stream) {
attachInboundStream (stream: Stream): AsyncIterable<Uint8ArrayList> {
// Create and attach a new inbound stream
// The inbound stream is:
// - abortable, set to only return on abort, rather than throw
Expand All @@ -107,12 +107,12 @@ export class PeerStreams extends EventEmitter<PeerStreamEvents> {
/**
* Attach a raw outbound stream and setup a write stream
*/
async attachOutboundStream (stream: Stream) {
async attachOutboundStream (stream: Stream): Promise<Pushable<Uint8ArrayList>> {
// If an outbound stream already exists, gently close it
const _prevStream = this.outboundStream
if (this.outboundStream != null) {
// End the stream without emitting a close event
await this.outboundStream.end()
this.outboundStream.end()
}

this._rawOutboundStream = stream
Expand Down Expand Up @@ -151,7 +151,7 @@ export class PeerStreams extends EventEmitter<PeerStreamEvents> {
/**
* Closes the open connection to peer
*/
close () {
close (): void {
if (this.closed) {
return
}
Expand Down
4 changes: 2 additions & 2 deletions src/sign.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ export async function signMessage (peerId: PeerId, message: { from: PeerId, topi
/**
* Verifies the signature of the given message
*/
export async function verifySignature (message: SignedMessage, encode: (rpc: PubSubRPCMessage) => Uint8Array) {
export async function verifySignature (message: SignedMessage, encode: (rpc: PubSubRPCMessage) => Uint8Array): Promise<boolean> {
if (message.type !== 'signed') {
throw new Error('Message type must be "signed" to be verified')
}
Expand Down Expand Up @@ -80,7 +80,7 @@ export async function verifySignature (message: SignedMessage, encode: (rpc: Pub
* Returns the PublicKey associated with the given message.
* If no valid PublicKey can be retrieved an error will be returned.
*/
export async function messagePublicKey (message: SignedMessage) {
export async function messagePublicKey (message: SignedMessage): Promise<Uint8Array> {
if (message.type !== 'signed') {
throw new Error('Message type must be "signed" to have a public key')
}
Expand Down
8 changes: 4 additions & 4 deletions src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ export function randomSeqno (): bigint {
/**
* Generate a message id, based on the `key` and `seqno`
*/
export const msgId = (key: Uint8Array, seqno: bigint) => {
export const msgId = (key: Uint8Array, seqno: bigint): Uint8Array => {
const seqnoBytes = uint8ArrayFromString(seqno.toString(16).padStart(16, '0'), 'base16')

const msgId = new Uint8Array(key.length + seqnoBytes.length)
Expand All @@ -30,15 +30,15 @@ export const msgId = (key: Uint8Array, seqno: bigint) => {
/**
* Generate a message id, based on message `data`
*/
export const noSignMsgId = (data: Uint8Array) => {
export const noSignMsgId = (data: Uint8Array): Uint8Array | Promise<Uint8Array> => {
return sha256.encode(data)
}

/**
* Check if any member of the first set is also a member
* of the second set
*/
export const anyMatch = (a: Set<number> | number[], b: Set<number> | number[]) => {
export const anyMatch = (a: Set<number> | number[], b: Set<number> | number[]): boolean => {
let bHas
if (Array.isArray(b)) {
bHas = (val: number) => b.includes(val)
Expand All @@ -58,7 +58,7 @@ export const anyMatch = (a: Set<number> | number[], b: Set<number> | number[]) =
/**
* Make everything an array
*/
export const ensureArray = function <T> (maybeArray: T | T[]) {
export const ensureArray = function <T> (maybeArray: T | T[]): T[] {
if (!Array.isArray(maybeArray)) {
return [maybeArray]
}
Expand Down
Loading

0 comments on commit 7609545

Please sign in to comment.