Skip to content

Commit

Permalink
fix!: switch back to protons
Browse files Browse the repository at this point in the history
Updates code from ChainSafe#468
  • Loading branch information
achingbrain committed Jan 30, 2024
1 parent b77e6ca commit 3ba5374
Show file tree
Hide file tree
Showing 22 changed files with 8,530 additions and 4,762 deletions.
9,716 changes: 7,639 additions & 2,077 deletions package-lock.json

Large diffs are not rendered by default.

7 changes: 4 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@
"scripts": {
"lint": "aegir lint",
"release": "aegir release --no-types",
"copy": "mkdirp dist/src/message && cp src/message/*.* dist/src/message",
"build": "npm run copy && aegir build",
"build": "aegir build",
"generate": "protons ./src/message/index.proto",
"prepare": "npm run build",
"pretest": "npm run build",
"pretest:e2e": "npm run build",
Expand Down Expand Up @@ -84,7 +84,7 @@
"it-pipe": "^3.0.1",
"it-pushable": "^3.2.0",
"multiformats": "^12.0.1",
"protobufjs": "^7.2.4",
"protons-runtime": "^5.2.2",
"uint8arraylist": "^2.4.3",
"uint8arrays": "^4.0.4"
},
Expand All @@ -105,6 +105,7 @@
"p-event": "^6.0.0",
"p-retry": "^5.1.2",
"p-wait-for": "^5.0.2",
"protons": "^7.3.4",
"sinon": "^15.1.2",
"time-cache": "^0.3.0",
"ts-node": "^10.7.0",
Expand Down
99 changes: 56 additions & 43 deletions src/index.ts

Large diffs are not rendered by default.

10 changes: 5 additions & 5 deletions src/message-cache.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { RPC } from './message/rpc.js'
import type { RPC } from './message/index.js'
import type { MessageId, MsgIdStr, PeerIdStr, TopicStr, MsgIdToStrFn } from './types.js'

export type CacheEntry = MessageId & {
Expand All @@ -8,7 +8,7 @@ export type CacheEntry = MessageId & {
export type MessageCacheRecord = Pick<MessageCacheEntry, 'message' | 'originatingPeers'>

interface MessageCacheEntry {
message: RPC.IMessage
message: RPC.Message
/**
* Tracks if the message has been validated by the app layer and thus forwarded
*/
Expand Down Expand Up @@ -60,7 +60,7 @@ export class MessageCache {
* Adds a message to the current window and the cache
* Returns true if the message is not known and is inserted in the cache
*/
put (messageId: MessageId, msg: RPC.IMessage, validated = false): boolean {
put (messageId: MessageId, msg: RPC.Message, validated = false): boolean {
const { msgIdStr } = messageId
// Don't add duplicate entries to the cache.
if (this.msgs.has(msgIdStr)) {
Expand Down Expand Up @@ -99,15 +99,15 @@ export class MessageCache {
/**
* Retrieves a message from the cache by its ID, if it is still present
*/
get (msgId: Uint8Array): RPC.IMessage | undefined {
get (msgId: Uint8Array): RPC.Message | undefined {
return this.msgs.get(this.msgIdToStrFn(msgId))?.message
}

/**
* Increases the iwant count for the given message by one and returns the message together
* with the iwant if the message exists.
*/
getWithIWantCount (msgIdStr: string, p: string): { msg: RPC.IMessage, count: number } | null {
getWithIWantCount (msgIdStr: string, p: string): { msg: RPC.Message, count: number } | null {
const msg = this.msgs.get(msgIdStr)
if (msg == null) {
return null
Expand Down
38 changes: 19 additions & 19 deletions src/message/decodeRpc.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import protobuf from 'protobufjs/minimal.js'
import type { IRPC, RPC } from './rpc.js'
import type { RPC } from './index.js'

export interface DecodeRPCLimits {
maxSubscriptions: number
Expand All @@ -22,7 +22,7 @@ export const defaultDecodeRpcLimits: DecodeRPCLimits = {
/**
* Copied code from src/message/rpc.cjs but with decode limits to prevent OOM attacks
*/
export function decodeRpc (bytes: Uint8Array, opts: DecodeRPCLimits): IRPC {
export function decodeRpc (bytes: Uint8Array, opts: DecodeRPCLimits): RPC {
// Mutate to use the option as stateful counter. Must limit the total count of messageIDs across all IWANT, IHAVE
// else one count put 100 messageIDs into each 100 IWANT and "get around" the limit
opts = { ...opts }
Expand All @@ -31,7 +31,7 @@ export function decodeRpc (bytes: Uint8Array, opts: DecodeRPCLimits): IRPC {
const l = bytes.length

const c = l === undefined ? r.len : r.pos + l
const m: IRPC = {}
const m: RPC = { subscriptions: [], messages: [] }
while (r.pos < c) {
const t = r.uint32()
switch (t >>> 3) {
Expand All @@ -56,9 +56,9 @@ export function decodeRpc (bytes: Uint8Array, opts: DecodeRPCLimits): IRPC {
return m
}

function decodeSubOpts (r: protobuf.Reader, l: number): RPC.ISubOpts {
function decodeSubOpts (r: protobuf.Reader, l: number): RPC.SubOpts {
const c = l === undefined ? r.len : r.pos + l
const m: RPC.ISubOpts = {}
const m: RPC.SubOpts = {}
while (r.pos < c) {
const t = r.uint32()
switch (t >>> 3) {
Expand All @@ -76,10 +76,10 @@ function decodeSubOpts (r: protobuf.Reader, l: number): RPC.ISubOpts {
return m
}

function decodeMessage (r: protobuf.Reader, l: number): RPC.IMessage {
function decodeMessage (r: protobuf.Reader, l: number): RPC.Message {
const c = l === undefined ? r.len : r.pos + l
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
const m = {} as RPC.IMessage
const m = {} as RPC.Message
while (r.pos < c) {
const t = r.uint32()
switch (t >>> 3) {
Expand Down Expand Up @@ -111,10 +111,10 @@ function decodeMessage (r: protobuf.Reader, l: number): RPC.IMessage {
return m
}

function decodeControlMessage (r: protobuf.Reader, l: number, opts: DecodeRPCLimits): RPC.IControlMessage {
function decodeControlMessage (r: protobuf.Reader, l: number, opts: DecodeRPCLimits): RPC.ControlMessage {
const c = l === undefined ? r.len : r.pos + l
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
const m = {} as RPC.IControlMessage
const m = {} as RPC.ControlMessage
while (r.pos < c) {
const t = r.uint32()
switch (t >>> 3) {
Expand Down Expand Up @@ -146,10 +146,10 @@ function decodeControlMessage (r: protobuf.Reader, l: number, opts: DecodeRPCLim
return m
}

function decodeControlIHave (r: protobuf.Reader, l: number, opts: DecodeRPCLimits): RPC.IControlIHave {
function decodeControlIHave (r: protobuf.Reader, l: number, opts: DecodeRPCLimits): RPC.ControlIHave {
const c = l === undefined ? r.len : r.pos + l
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
const m = {} as RPC.IControlIHave
const m = {} as RPC.ControlIHave
while (r.pos < c) {
const t = r.uint32()
switch (t >>> 3) {
Expand All @@ -169,10 +169,10 @@ function decodeControlIHave (r: protobuf.Reader, l: number, opts: DecodeRPCLimit
return m
}

function decodeControlIWant (r: protobuf.Reader, l: number, opts: DecodeRPCLimits): RPC.IControlIWant {
function decodeControlIWant (r: protobuf.Reader, l: number, opts: DecodeRPCLimits): RPC.ControlIWant {
const c = l === undefined ? r.len : r.pos + l
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
const m = {} as RPC.IControlIWant
const m = {} as RPC.ControlIWant
while (r.pos < c) {
const t = r.uint32()
switch (t >>> 3) {
Expand All @@ -189,10 +189,10 @@ function decodeControlIWant (r: protobuf.Reader, l: number, opts: DecodeRPCLimit
return m
}

function decodeControlGraft (r: protobuf.Reader, l: number): RPC.IControlGraft {
function decodeControlGraft (r: protobuf.Reader, l: number): RPC.ControlGraft {
const c = l === undefined ? r.len : r.pos + l
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
const m = {} as RPC.IControlGraft
const m = {} as RPC.ControlGraft
while (r.pos < c) {
const t = r.uint32()
switch (t >>> 3) {
Expand All @@ -207,10 +207,10 @@ function decodeControlGraft (r: protobuf.Reader, l: number): RPC.IControlGraft {
return m
}

function decodeControlPrune (r: protobuf.Reader, l: number, opts: DecodeRPCLimits): RPC.IControlPrune {
function decodeControlPrune (r: protobuf.Reader, l: number, opts: DecodeRPCLimits): RPC.ControlPrune {
const c = l === undefined ? r.len : r.pos + l
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
const m = {} as RPC.IControlPrune
const m = {} as RPC.ControlPrune
while (r.pos < c) {
const t = r.uint32()
switch (t >>> 3) {
Expand All @@ -233,10 +233,10 @@ function decodeControlPrune (r: protobuf.Reader, l: number, opts: DecodeRPCLimit
return m
}

function decodePeerInfo (r: protobuf.Reader, l: number): RPC.IPeerInfo {
function decodePeerInfo (r: protobuf.Reader, l: number): RPC.PeerInfo {
const c = l === undefined ? r.len : r.pos + l
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
const m = {} as RPC.IPeerInfo
const m = {} as RPC.PeerInfo
while (r.pos < c) {
const t = r.uint32()
switch (t >>> 3) {
Expand Down
16 changes: 8 additions & 8 deletions src/message/rpc.proto → src/message/index.proto
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
syntax = "proto3";

message RPC {
repeated SubOpts subscriptions = 1;
repeated Message messages = 2;
repeated SubOpts subscriptions = 1 [(protons.options).limit = 1024];
repeated Message messages = 2 [(protons.options).limit = 1024];
optional ControlMessage control = 3;

message SubOpts {
Expand All @@ -14,14 +14,14 @@ message RPC {
optional bytes from = 1;
optional bytes data = 2;
optional bytes seqno = 3;
required string topic = 4;
string topic = 4;
optional bytes signature = 5;
optional bytes key = 6;
}

message ControlMessage {
repeated ControlIHave ihave = 1;
repeated ControlIWant iwant = 2;
repeated ControlIHave ihave = 1 [(protons.options).limit = 1024];
repeated ControlIWant iwant = 2 [(protons.options).limit = 1024];
repeated ControlGraft graft = 3;
repeated ControlPrune prune = 4;
}
Expand All @@ -41,12 +41,12 @@ message RPC {

message ControlPrune {
optional string topicID = 1;
repeated PeerInfo peers = 2;
optional uint64 backoff = 3;
repeated PeerInfo peers = 2 [(protons.options).limit = 1024];
optional uint64 backoff = 3 [jstype = JS_NUMBER];
}

message PeerInfo {
optional bytes peerID = 1;
optional bytes signedPeerRecord = 2;
}
}
}
Loading

0 comments on commit 3ba5374

Please sign in to comment.