From bf102b38e75ddb7b84503f36268e94f32b55923d Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Wed, 18 Jan 2023 21:50:03 -0800 Subject: [PATCH 01/18] rcmgr: Change LimitConfig to use LimitVal type --- p2p/host/resource-manager/limit.go | 76 ++- .../limit_config_test.backwards-compat.json | 45 ++ .../resource-manager/limit_config_test.go | 78 ++- p2p/host/resource-manager/limit_defaults.go | 545 +++++++++++++----- p2p/host/resource-manager/limit_test.go | 124 +++- p2p/host/resource-manager/rcmgr_test.go | 42 +- p2p/test/resource-manager/rcmgr_test.go | 52 +- 7 files changed, 717 insertions(+), 245 deletions(-) create mode 100644 p2p/host/resource-manager/limit_config_test.backwards-compat.json diff --git a/p2p/host/resource-manager/limit.go b/p2p/host/resource-manager/limit.go index c3d6dd88c8..22765fdeb1 100644 --- a/p2p/host/resource-manager/limit.go +++ b/p2p/host/resource-manager/limit.go @@ -52,11 +52,11 @@ type Limiter interface { // NewDefaultLimiterFromJSON creates a new limiter by parsing a json configuration, // using the default limits for fallback. func NewDefaultLimiterFromJSON(in io.Reader) (Limiter, error) { - return NewLimiterFromJSON(in, DefaultLimits.AutoScale()) + return NewLimiterFromJSON(in, DefaultReifiedLimits) } // NewLimiterFromJSON creates a new limiter by parsing a json configuration. -func NewLimiterFromJSON(in io.Reader, defaults LimitConfig) (Limiter, error) { +func NewLimiterFromJSON(in io.Reader, defaults ReifiedLimitConfig) (Limiter, error) { cfg, err := readLimiterConfigFromJSON(in, defaults) if err != nil { return nil, err @@ -64,25 +64,24 @@ func NewLimiterFromJSON(in io.Reader, defaults LimitConfig) (Limiter, error) { return &fixedLimiter{cfg}, nil } -func readLimiterConfigFromJSON(in io.Reader, defaults LimitConfig) (LimitConfig, error) { +func readLimiterConfigFromJSON(in io.Reader, defaults ReifiedLimitConfig) (ReifiedLimitConfig, error) { var cfg LimitConfig if err := json.NewDecoder(in).Decode(&cfg); err != nil { - return LimitConfig{}, err + return ReifiedLimitConfig{}, err } - cfg.Apply(defaults) - return cfg, nil + return cfg.Reify(defaults), nil } // fixedLimiter is a limiter with fixed limits. type fixedLimiter struct { - LimitConfig + ReifiedLimitConfig } var _ Limiter = (*fixedLimiter)(nil) -func NewFixedLimiter(conf LimitConfig) Limiter { +func NewFixedLimiter(conf ReifiedLimitConfig) Limiter { log.Debugw("initializing new limiter with config", "limits", conf) - return &fixedLimiter{LimitConfig: conf} + return &fixedLimiter{conf} } // BaseLimit is a mixin type for basic resource limits. @@ -97,6 +96,33 @@ type BaseLimit struct { Memory int64 `json:",omitempty"` } +func valueOrBlockAll(n int) LimitVal { + if n == 0 { + return BlockAllLimit + } + return LimitVal(n) +} +func valueOrBlockAll64(n int64) LimitVal64 { + if n == 0 { + return BlockAllLimit64 + } + return LimitVal64(n) +} + +// ToResourceLimits converts the BaseLimit to a ResourceLimits +func (l BaseLimit) ToResourceLimits() ResourceLimits { + return ResourceLimits{ + Streams: valueOrBlockAll(l.Streams), + StreamsInbound: valueOrBlockAll(l.StreamsInbound), + StreamsOutbound: valueOrBlockAll(l.StreamsOutbound), + Conns: valueOrBlockAll(l.Conns), + ConnsInbound: valueOrBlockAll(l.ConnsInbound), + ConnsOutbound: valueOrBlockAll(l.ConnsOutbound), + FD: valueOrBlockAll(l.FD), + Memory: valueOrBlockAll64(l.Memory), + } +} + // Apply overwrites all zero-valued limits with the values of l2 // Must not use a pointer receiver. func (l *BaseLimit) Apply(l2 BaseLimit) { @@ -202,65 +228,65 @@ func (l *BaseLimit) GetMemoryLimit() int64 { } func (l *fixedLimiter) GetSystemLimits() Limit { - return &l.System + return &l.system } func (l *fixedLimiter) GetTransientLimits() Limit { - return &l.Transient + return &l.transient } func (l *fixedLimiter) GetAllowlistedSystemLimits() Limit { - return &l.AllowlistedSystem + return &l.allowlistedSystem } func (l *fixedLimiter) GetAllowlistedTransientLimits() Limit { - return &l.AllowlistedTransient + return &l.allowlistedTransient } func (l *fixedLimiter) GetServiceLimits(svc string) Limit { - sl, ok := l.Service[svc] + sl, ok := l.service[svc] if !ok { - return &l.ServiceDefault + return &l.serviceDefault } return &sl } func (l *fixedLimiter) GetServicePeerLimits(svc string) Limit { - pl, ok := l.ServicePeer[svc] + pl, ok := l.servicePeer[svc] if !ok { - return &l.ServicePeerDefault + return &l.servicePeerDefault } return &pl } func (l *fixedLimiter) GetProtocolLimits(proto protocol.ID) Limit { - pl, ok := l.Protocol[proto] + pl, ok := l.protocol[proto] if !ok { - return &l.ProtocolDefault + return &l.protocolDefault } return &pl } func (l *fixedLimiter) GetProtocolPeerLimits(proto protocol.ID) Limit { - pl, ok := l.ProtocolPeer[proto] + pl, ok := l.protocolPeer[proto] if !ok { - return &l.ProtocolPeerDefault + return &l.protocolPeerDefault } return &pl } func (l *fixedLimiter) GetPeerLimits(p peer.ID) Limit { - pl, ok := l.Peer[p] + pl, ok := l.peer[p] if !ok { - return &l.PeerDefault + return &l.peerDefault } return &pl } func (l *fixedLimiter) GetStreamLimits(_ peer.ID) Limit { - return &l.Stream + return &l.stream } func (l *fixedLimiter) GetConnLimits() Limit { - return &l.Conn + return &l.conn } diff --git a/p2p/host/resource-manager/limit_config_test.backwards-compat.json b/p2p/host/resource-manager/limit_config_test.backwards-compat.json new file mode 100644 index 0000000000..b1a5e9ecb7 --- /dev/null +++ b/p2p/host/resource-manager/limit_config_test.backwards-compat.json @@ -0,0 +1,45 @@ +{ + "System": { + "Memory": 65536, + "Conns": 16, + "ConnsInbound": 8, + "ConnsOutbound": 16, + "FD": 16 + }, + "ServiceDefault": { + "Memory": 8765 + }, + "Service": { + "A": { + "Memory": 8192 + }, + "B": {} + }, + "ServicePeerDefault": { + "Memory": 2048 + }, + "ServicePeer": { + "A": { + "Memory": 4096 + } + }, + "ProtocolDefault": { + "Memory": 2048 + }, + "ProtocolPeerDefault": { + "Memory": 1024 + }, + "Protocol": { + "/A": { + "Memory": 8192 + } + }, + "PeerDefault": { + "Memory": 4096 + }, + "Peer": { + "12D3KooWPFH2Bx2tPfw6RLxN8k2wh47GRXgkt9yrAHU37zFwHWzS": { + "Memory": 4097 + } + } +} \ No newline at end of file diff --git a/p2p/host/resource-manager/limit_config_test.go b/p2p/host/resource-manager/limit_config_test.go index 21dede5641..8a23486d35 100644 --- a/p2p/host/resource-manager/limit_config_test.go +++ b/p2p/host/resource-manager/limit_config_test.go @@ -17,6 +17,43 @@ func withMemoryLimit(l BaseLimit, m int64) BaseLimit { return l2 } +func TestLimitConfigParserBackwardsCompat(t *testing.T) { + in, err := os.Open("limit_config_test.json") + require.NoError(t, err) + defer in.Close() + + DefaultLimits.AddServiceLimit("C", DefaultLimits.ServiceBaseLimit, BaseLimitIncrease{}) + DefaultLimits.AddProtocolPeerLimit("C", DefaultLimits.ServiceBaseLimit, BaseLimitIncrease{}) + defaults := DefaultLimits.AutoScale() + cfg, err := readLimiterConfigFromJSON(in, defaults) + require.NoError(t, err) + + require.Equal(t, int64(65536), cfg.system.Memory) + require.Equal(t, defaults.system.Streams, cfg.system.Streams) + require.Equal(t, defaults.system.StreamsInbound, cfg.system.StreamsInbound) + require.Equal(t, defaults.system.StreamsOutbound, cfg.system.StreamsOutbound) + require.Equal(t, 16, cfg.system.Conns) + require.Equal(t, 8, cfg.system.ConnsInbound) + require.Equal(t, 16, cfg.system.ConnsOutbound) + require.Equal(t, 16, cfg.system.FD) + + require.Equal(t, defaults.transient, cfg.transient) + require.Equal(t, int64(8765), cfg.serviceDefault.Memory) + + require.Contains(t, cfg.service, "A") + require.Equal(t, withMemoryLimit(cfg.serviceDefault, 8192), cfg.service["A"]) + require.Contains(t, cfg.service, "B") + require.Equal(t, cfg.serviceDefault, cfg.service["B"]) + require.Contains(t, cfg.service, "C") + require.Equal(t, defaults.service["C"], cfg.service["C"]) + + require.Equal(t, int64(4096), cfg.peerDefault.Memory) + peerID, err := peer.Decode("12D3KooWPFH2Bx2tPfw6RLxN8k2wh47GRXgkt9yrAHU37zFwHWzS") + require.NoError(t, err) + require.Contains(t, cfg.peer, peerID) + require.Equal(t, int64(4097), cfg.peer[peerID].Memory) +} + func TestLimitConfigParser(t *testing.T) { in, err := os.Open("limit_config_test.json") require.NoError(t, err) @@ -28,33 +65,34 @@ func TestLimitConfigParser(t *testing.T) { cfg, err := readLimiterConfigFromJSON(in, defaults) require.NoError(t, err) - require.Equal(t, int64(65536), cfg.System.Memory) - require.Equal(t, defaults.System.Streams, cfg.System.Streams) - require.Equal(t, defaults.System.StreamsInbound, cfg.System.StreamsInbound) - require.Equal(t, defaults.System.StreamsOutbound, cfg.System.StreamsOutbound) - require.Equal(t, 16, cfg.System.Conns) - require.Equal(t, 8, cfg.System.ConnsInbound) - require.Equal(t, 16, cfg.System.ConnsOutbound) - require.Equal(t, 16, cfg.System.FD) + require.Equal(t, int64(65536), cfg.system.Memory) + require.Equal(t, defaults.system.Streams, cfg.system.Streams) + require.Equal(t, defaults.system.StreamsInbound, cfg.system.StreamsInbound) + require.Equal(t, defaults.system.StreamsOutbound, cfg.system.StreamsOutbound) + require.Equal(t, 16, cfg.system.Conns) + require.Equal(t, 8, cfg.system.ConnsInbound) + require.Equal(t, 16, cfg.system.ConnsOutbound) + require.Equal(t, 16, cfg.system.FD) - require.Equal(t, defaults.Transient, cfg.Transient) - require.Equal(t, int64(8765), cfg.ServiceDefault.Memory) + require.Equal(t, defaults.transient, cfg.transient) + require.Equal(t, int64(8765), cfg.serviceDefault.Memory) - require.Contains(t, cfg.Service, "A") - require.Equal(t, withMemoryLimit(cfg.ServiceDefault, 8192), cfg.Service["A"]) - require.Contains(t, cfg.Service, "B") - require.Equal(t, cfg.ServiceDefault, cfg.Service["B"]) - require.Contains(t, cfg.Service, "C") - require.Equal(t, defaults.Service["C"], cfg.Service["C"]) + require.Contains(t, cfg.service, "A") + require.Equal(t, withMemoryLimit(cfg.serviceDefault, 8192), cfg.service["A"]) + require.Contains(t, cfg.service, "B") + require.Equal(t, cfg.serviceDefault, cfg.service["B"]) + require.Contains(t, cfg.service, "C") + require.Equal(t, defaults.service["C"], cfg.service["C"]) - require.Equal(t, int64(4096), cfg.PeerDefault.Memory) + require.Equal(t, int64(4096), cfg.peerDefault.Memory) peerID, err := peer.Decode("12D3KooWPFH2Bx2tPfw6RLxN8k2wh47GRXgkt9yrAHU37zFwHWzS") require.NoError(t, err) - require.Contains(t, cfg.Peer, peerID) - require.Equal(t, int64(4097), cfg.Peer[peerID].Memory) + require.Contains(t, cfg.peer, peerID) + require.Equal(t, int64(4097), cfg.peer[peerID].Memory) // Roundtrip - jsonBytes, err := json.Marshal(&cfg) + limitConfig := FromReifiedLimitConfig(cfg, defaults) + jsonBytes, err := json.Marshal(&limitConfig) require.NoError(t, err) cfgAfterRoundTrip, err := readLimiterConfigFromJSON(bytes.NewReader(jsonBytes), defaults) require.NoError(t, err) diff --git a/p2p/host/resource-manager/limit_defaults.go b/p2p/host/resource-manager/limit_defaults.go index a9c73a4d9e..dba341927a 100644 --- a/p2p/host/resource-manager/limit_defaults.go +++ b/p2p/host/resource-manager/limit_defaults.go @@ -2,7 +2,9 @@ package rcmgr import ( "encoding/json" + "fmt" "math" + "strconv" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/protocol" @@ -108,38 +110,299 @@ func (cfg *ScalingLimitConfig) AddProtocolPeerLimit(proto protocol.ID, base Base } } +type LimitVal int + +const ( + // DefaultLimit is the default value for resources. The exact value depends on the context, but will get values from `DefaultLimits`. + DefaultLimit LimitVal = 0 + // Unlimited is the value for unlimited resources. An arbitrarily high number will also work. + Unlimited LimitVal = -1 + // BlockAllLimit is the LimitVal for allowing no amount of resources. + BlockAllLimit LimitVal = -2 +) + +func (l LimitVal) MarshalJSON() ([]byte, error) { + if l == Unlimited { + return json.Marshal("unlimited") + } else if l == DefaultLimit { + return json.Marshal("default") + } else if l == BlockAllLimit { + return json.Marshal("blockAll") + } + return json.Marshal(int(l)) +} + +func (l *LimitVal) UnmarshalJSON(b []byte) error { + if string(b) == `"default"` { + *l = DefaultLimit + return nil + } else if string(b) == `"unlimited"` { + *l = Unlimited + return nil + } else if string(b) == `"blockAll"` { + *l = BlockAllLimit + return nil + } + + var val int + if err := json.Unmarshal(b, &val); err != nil { + return err + } + *l = LimitVal(val) + return nil +} + +func (l LimitVal) Reify(defaultVal int) int { + if l == DefaultLimit { + return defaultVal + } + if l == Unlimited { + return math.MaxInt32 + } + if l == BlockAllLimit { + return 0 + } + return int(l) +} + +type LimitVal64 int64 + +const ( + // Default is the default value for resources. + DefaultLimit64 LimitVal64 = 0 + // Unlimited is the value for unlimited resources. + Unlimited64 LimitVal64 = -1 + // BlockAllLimit64 is the LimitVal for allowing no amount of resources. + BlockAllLimit64 LimitVal64 = -2 +) + +func (l LimitVal64) MarshalJSON() ([]byte, error) { + if l == Unlimited64 { + return json.Marshal("unlimited") + } else if l == DefaultLimit64 { + return json.Marshal("default") + } else if l == BlockAllLimit64 { + return json.Marshal("blockAll") + } + + // Convert this to a string because JSON doesn't support 64-bit integers. + return json.Marshal(strconv.FormatInt(int64(l), 10)) +} + +func (l *LimitVal64) UnmarshalJSON(b []byte) error { + if string(b) == `"default"` { + *l = DefaultLimit64 + return nil + } else if string(b) == `"unlimited"` { + *l = Unlimited64 + return nil + } else if string(b) == `"blockAll"` { + *l = BlockAllLimit64 + return nil + } + + var val string + if err := json.Unmarshal(b, &val); err != nil { + // Is this an integer? Possible because of backwards compatibility. + var val int + if err := json.Unmarshal(b, &val); err != nil { + return fmt.Errorf("failed to unmarshal limit value: %w", err) + } + + *l = LimitVal64(val) + return nil + } + + i, err := strconv.ParseInt(val, 10, 64) + if err != nil { + return err + } + *l = LimitVal64(i) + + return nil +} + +func (l LimitVal64) Reify(defaultVal int64) int64 { + if l == DefaultLimit64 { + return defaultVal + } + if l == Unlimited64 { + return math.MaxInt32 + } + if l == BlockAllLimit64 { + return 0 + } + return int64(l) +} + +// ResourceLimits is the type for basic resource limits. +type ResourceLimits struct { + Streams LimitVal `json:",omitempty"` + StreamsInbound LimitVal `json:",omitempty"` + StreamsOutbound LimitVal `json:",omitempty"` + Conns LimitVal `json:",omitempty"` + ConnsInbound LimitVal `json:",omitempty"` + ConnsOutbound LimitVal `json:",omitempty"` + FD LimitVal `json:",omitempty"` + Memory LimitVal64 `json:",omitempty"` +} + +// Apply overwrites all default limits with the values of l2 +func (l *ResourceLimits) Apply(l2 ResourceLimits) { + if l.Streams == DefaultLimit { + l.Streams = l2.Streams + } + if l.StreamsInbound == DefaultLimit { + l.StreamsInbound = l2.StreamsInbound + } + if l.StreamsOutbound == DefaultLimit { + l.StreamsOutbound = l2.StreamsOutbound + } + if l.Conns == DefaultLimit { + l.Conns = l2.Conns + } + if l.ConnsInbound == DefaultLimit { + l.ConnsInbound = l2.ConnsInbound + } + if l.ConnsOutbound == DefaultLimit { + l.ConnsOutbound = l2.ConnsOutbound + } + if l.FD == DefaultLimit { + l.FD = l2.FD + } + if l.Memory == DefaultLimit64 { + l.Memory = l2.Memory + } +} + +func (l *ResourceLimits) Reify(defaults BaseLimit) BaseLimit { + out := defaults + out.Streams = l.Streams.Reify(defaults.Streams) + out.StreamsInbound = l.StreamsInbound.Reify(defaults.StreamsInbound) + out.StreamsOutbound = l.StreamsOutbound.Reify(defaults.StreamsOutbound) + out.Conns = l.Conns.Reify(defaults.Conns) + out.ConnsInbound = l.ConnsInbound.Reify(defaults.ConnsInbound) + out.ConnsOutbound = l.ConnsOutbound.Reify(defaults.ConnsOutbound) + out.FD = l.FD.Reify(defaults.FD) + out.Memory = l.Memory.Reify(defaults.Memory) + + return out +} + type LimitConfig struct { - System BaseLimit `json:",omitempty"` - Transient BaseLimit `json:",omitempty"` + System ResourceLimits `json:",omitempty"` + Transient ResourceLimits `json:",omitempty"` // Limits that are applied to resources with an allowlisted multiaddr. // These will only be used if the normal System & Transient limits are // reached. - AllowlistedSystem BaseLimit `json:",omitempty"` - AllowlistedTransient BaseLimit `json:",omitempty"` + AllowlistedSystem ResourceLimits `json:",omitempty"` + AllowlistedTransient ResourceLimits `json:",omitempty"` + + ServiceDefault ResourceLimits `json:",omitempty"` + Service map[string]ResourceLimits `json:",omitempty"` + + ServicePeerDefault ResourceLimits `json:",omitempty"` + ServicePeer map[string]ResourceLimits `json:",omitempty"` + + ProtocolDefault ResourceLimits `json:",omitempty"` + Protocol map[protocol.ID]ResourceLimits `json:",omitempty"` + + ProtocolPeerDefault ResourceLimits `json:",omitempty"` + ProtocolPeer map[protocol.ID]ResourceLimits `json:",omitempty"` + + PeerDefault ResourceLimits `json:",omitempty"` + Peer map[peer.ID]ResourceLimits `json:",omitempty"` + + Conn ResourceLimits `json:",omitempty"` + Stream ResourceLimits `json:",omitempty"` +} + +// FromReifiedLimitConfig converts a ReifiedLimitConfig to a LimitConfig. Uses the defaults config to know what was specifically set and what was left as default. +func FromReifiedLimitConfig(cfg ReifiedLimitConfig, defaults ReifiedLimitConfig) LimitConfig { + out := LimitConfig{} + + out.System = resourceLimitsFromBaseLimit(cfg.system, defaults.system) + out.Transient = resourceLimitsFromBaseLimit(cfg.transient, defaults.transient) + + out.AllowlistedSystem = resourceLimitsFromBaseLimit(cfg.allowlistedSystem, defaults.allowlistedSystem) + out.AllowlistedTransient = resourceLimitsFromBaseLimit(cfg.allowlistedTransient, defaults.allowlistedTransient) + + out.ServiceDefault = resourceLimitsFromBaseLimit(cfg.serviceDefault, defaults.serviceDefault) + out.Service = resourceLimitsMapFromBaseLimitMap(cfg.service, defaults.service, defaults.serviceDefault) + + out.ServicePeerDefault = resourceLimitsFromBaseLimit(cfg.servicePeerDefault, defaults.servicePeerDefault) + out.ServicePeer = resourceLimitsMapFromBaseLimitMap(cfg.servicePeer, defaults.servicePeer, defaults.servicePeerDefault) + + out.ProtocolDefault = resourceLimitsFromBaseLimit(cfg.protocolDefault, defaults.protocolDefault) + out.Protocol = resourceLimitsMapFromBaseLimitMap(cfg.protocol, defaults.protocol, defaults.protocolDefault) + + out.ProtocolPeerDefault = resourceLimitsFromBaseLimit(cfg.protocolPeerDefault, defaults.protocolPeerDefault) + out.ProtocolPeer = resourceLimitsMapFromBaseLimitMap(cfg.protocolPeer, defaults.protocolPeer, defaults.protocolPeerDefault) + + out.PeerDefault = resourceLimitsFromBaseLimit(cfg.peerDefault, defaults.peerDefault) + out.Peer = resourceLimitsMapFromBaseLimitMap(cfg.peer, defaults.peer, defaults.peerDefault) + + out.Conn = resourceLimitsFromBaseLimit(cfg.conn, defaults.conn) + out.Stream = resourceLimitsFromBaseLimit(cfg.stream, defaults.stream) - ServiceDefault BaseLimit `json:",omitempty"` - Service map[string]BaseLimit `json:",omitempty"` + return out +} - ServicePeerDefault BaseLimit `json:",omitempty"` - ServicePeer map[string]BaseLimit `json:",omitempty"` +func resourceLimitsMapFromBaseLimitMap[K comparable](m map[K]BaseLimit, defaultLimits map[K]BaseLimit, fallbackDefault BaseLimit) map[K]ResourceLimits { + if len(m) == 0 { + return nil + } - ProtocolDefault BaseLimit `json:",omitempty"` - Protocol map[protocol.ID]BaseLimit `json:",omitempty"` + out := make(map[K]ResourceLimits, len(m)) + for k, v := range m { + if defaultForKey, ok := defaultLimits[k]; ok { + out[k] = resourceLimitsFromBaseLimit(v, defaultForKey) + } else { + out[k] = resourceLimitsFromBaseLimit(v, fallbackDefault) + } + } + return out +} - ProtocolPeerDefault BaseLimit `json:",omitempty"` - ProtocolPeer map[protocol.ID]BaseLimit `json:",omitempty"` +func resourceLimitsFromBaseLimit(l BaseLimit, defaultLimit BaseLimit) ResourceLimits { + return ResourceLimits{ + Streams: limitValFromInt(l.Streams, defaultLimit.Streams), + StreamsInbound: limitValFromInt(l.StreamsInbound, defaultLimit.StreamsInbound), + StreamsOutbound: limitValFromInt(l.StreamsOutbound, defaultLimit.StreamsOutbound), + Conns: limitValFromInt(l.Conns, defaultLimit.Conns), + ConnsInbound: limitValFromInt(l.ConnsInbound, defaultLimit.ConnsInbound), + ConnsOutbound: limitValFromInt(l.ConnsOutbound, defaultLimit.ConnsOutbound), + FD: limitValFromInt(l.FD, defaultLimit.FD), + Memory: limitValFromInt64(l.Memory, defaultLimit.Memory), + } +} - PeerDefault BaseLimit `json:",omitempty"` - Peer map[peer.ID]BaseLimit `json:",omitempty"` +func limitValFromInt(i int, defaultVal int) LimitVal { + if i == defaultVal { + return DefaultLimit + } else if i == math.MaxInt { + return Unlimited + } else if i == 0 { + return BlockAllLimit + } + return LimitVal(i) +} - Conn BaseLimit `json:",omitempty"` - Stream BaseLimit `json:",omitempty"` +func limitValFromInt64(i int64, defaultVal int64) LimitVal64 { + if i == defaultVal { + return DefaultLimit64 + } else if i == math.MaxInt { + return Unlimited64 + } else if i == 0 { + return BlockAllLimit64 + } + return LimitVal64(i) } func (cfg *LimitConfig) MarshalJSON() ([]byte, error) { // we want to marshal the encoded peer id - encodedPeerMap := make(map[string]BaseLimit, len(cfg.Peer)) + encodedPeerMap := make(map[string]ResourceLimits, len(cfg.Peer)) for p, v := range cfg.Peer { encodedPeerMap[p.String()] = v } @@ -147,13 +410,32 @@ func (cfg *LimitConfig) MarshalJSON() ([]byte, error) { type Alias LimitConfig return json.Marshal(&struct { *Alias - Peer map[string]BaseLimit `json:",omitempty"` + Peer map[string]ResourceLimits `json:",omitempty"` }{ Alias: (*Alias)(cfg), Peer: encodedPeerMap, }) } +func applyResourceLimitsMap[K comparable](this *map[K]ResourceLimits, other map[K]ResourceLimits, fallbackDefault ResourceLimits) { + for k, l := range *this { + r := fallbackDefault + if l2, ok := other[k]; ok { + r = l2 + } + l.Apply(r) + (*this)[k] = l + } + if *this == nil && other != nil { + *this = make(map[K]ResourceLimits) + } + for k, l := range other { + if _, ok := (*this)[k]; !ok { + (*this)[k] = l + } + } +} + func (cfg *LimitConfig) Apply(c LimitConfig) { cfg.System.Apply(c.System) cfg.Transient.Apply(c.Transient) @@ -167,145 +449,142 @@ func (cfg *LimitConfig) Apply(c LimitConfig) { cfg.Conn.Apply(c.Conn) cfg.Stream.Apply(c.Stream) - // TODO: the following could be solved a lot nicer, if only we could use generics - for s, l := range cfg.Service { - r := cfg.ServiceDefault - if l2, ok := c.Service[s]; ok { - r = l2 - } - l.Apply(r) - cfg.Service[s] = l - } - if c.Service != nil && cfg.Service == nil { - cfg.Service = make(map[string]BaseLimit) - } - for s, l := range c.Service { - if _, ok := cfg.Service[s]; !ok { - cfg.Service[s] = l - } - } + applyResourceLimitsMap(&cfg.Service, c.Service, cfg.ServiceDefault) + applyResourceLimitsMap(&cfg.ServicePeer, c.ServicePeer, cfg.ServicePeerDefault) + applyResourceLimitsMap(&cfg.Protocol, c.Protocol, cfg.ProtocolDefault) + applyResourceLimitsMap(&cfg.ProtocolPeer, c.ProtocolPeer, cfg.ProtocolPeerDefault) + applyResourceLimitsMap(&cfg.Peer, c.Peer, cfg.PeerDefault) +} - for s, l := range cfg.ServicePeer { - r := cfg.ServicePeerDefault - if l2, ok := c.ServicePeer[s]; ok { - r = l2 - } - l.Apply(r) - cfg.ServicePeer[s] = l - } - if c.ServicePeer != nil && cfg.ServicePeer == nil { - cfg.ServicePeer = make(map[string]BaseLimit) - } - for s, l := range c.ServicePeer { - if _, ok := cfg.ServicePeer[s]; !ok { - cfg.ServicePeer[s] = l - } - } +func (cfg LimitConfig) Reify(defaults ReifiedLimitConfig) ReifiedLimitConfig { + out := defaults + + out.system = cfg.System.Reify(defaults.system) + out.transient = cfg.Transient.Reify(defaults.transient) + out.allowlistedSystem = cfg.AllowlistedSystem.Reify(defaults.allowlistedSystem) + out.allowlistedTransient = cfg.AllowlistedTransient.Reify(defaults.allowlistedTransient) + out.serviceDefault = cfg.ServiceDefault.Reify(defaults.serviceDefault) + out.servicePeerDefault = cfg.ServicePeerDefault.Reify(defaults.servicePeerDefault) + out.protocolDefault = cfg.ProtocolDefault.Reify(defaults.protocolDefault) + out.protocolPeerDefault = cfg.ProtocolPeerDefault.Reify(defaults.protocolPeerDefault) + out.peerDefault = cfg.PeerDefault.Reify(defaults.peerDefault) + out.conn = cfg.Conn.Reify(defaults.conn) + out.stream = cfg.Stream.Reify(defaults.stream) + + out.service = reifyMapWithDefault(cfg.Service, defaults.service, out.serviceDefault) + out.servicePeer = reifyMapWithDefault(cfg.ServicePeer, defaults.servicePeer, out.servicePeerDefault) + out.protocol = reifyMapWithDefault(cfg.Protocol, defaults.protocol, out.protocolDefault) + out.protocolPeer = reifyMapWithDefault(cfg.ProtocolPeer, defaults.protocolPeer, out.protocolPeerDefault) + out.peer = reifyMapWithDefault(cfg.Peer, defaults.peer, out.peerDefault) + + return out +} - for s, l := range cfg.Protocol { - r := cfg.ProtocolDefault - if l2, ok := c.Protocol[s]; ok { - r = l2 - } - l.Apply(r) - cfg.Protocol[s] = l - } - if c.Protocol != nil && cfg.Protocol == nil { - cfg.Protocol = make(map[protocol.ID]BaseLimit) - } - for s, l := range c.Protocol { - if _, ok := cfg.Protocol[s]; !ok { - cfg.Protocol[s] = l - } +func reifyMapWithDefault[K comparable](definedLimits map[K]ResourceLimits, defaults map[K]BaseLimit, fallbackDefault BaseLimit) map[K]BaseLimit { + if definedLimits == nil && defaults == nil { + return nil } - for s, l := range cfg.ProtocolPeer { - r := cfg.ProtocolPeerDefault - if l2, ok := c.ProtocolPeer[s]; ok { - r = l2 - } - l.Apply(r) - cfg.ProtocolPeer[s] = l - } - if c.ProtocolPeer != nil && cfg.ProtocolPeer == nil { - cfg.ProtocolPeer = make(map[protocol.ID]BaseLimit) - } - for s, l := range c.ProtocolPeer { - if _, ok := cfg.ProtocolPeer[s]; !ok { - cfg.ProtocolPeer[s] = l - } + out := make(map[K]BaseLimit) + for k, l := range defaults { + out[k] = l } - for s, l := range cfg.Peer { - r := cfg.PeerDefault - if l2, ok := c.Peer[s]; ok { - r = l2 - } - l.Apply(r) - cfg.Peer[s] = l - } - if c.Peer != nil && cfg.Peer == nil { - cfg.Peer = make(map[peer.ID]BaseLimit) - } - for s, l := range c.Peer { - if _, ok := cfg.Peer[s]; !ok { - cfg.Peer[s] = l + for k, l := range definedLimits { + if defaultForKey, ok := out[k]; ok { + out[k] = l.Reify(defaultForKey) + } else { + out[k] = l.Reify(fallbackDefault) } } + + return out +} + +// ReifiedLimitConfig is similar to LimitConfig, but all values are defined. +// There is no unset "default" value. Commonly constructed by calling +// LimitConfig.Reify(DefaultReifiedLimits) +type ReifiedLimitConfig struct { + system BaseLimit + transient BaseLimit + + // Limits that are applied to resources with an allowlisted multiaddr. + // These will only be used if the normal System & Transient limits are + // reached. + allowlistedSystem BaseLimit + allowlistedTransient BaseLimit + + serviceDefault BaseLimit + service map[string]BaseLimit + + servicePeerDefault BaseLimit + servicePeer map[string]BaseLimit + + protocolDefault BaseLimit + protocol map[protocol.ID]BaseLimit + + protocolPeerDefault BaseLimit + protocolPeer map[protocol.ID]BaseLimit + + peerDefault BaseLimit + peer map[peer.ID]BaseLimit + + conn BaseLimit + stream BaseLimit } // Scale scales up a limit configuration. // memory is the amount of memory that the stack is allowed to consume, // for a dedicated node it's recommended to use 1/8 of the installed system memory. // If memory is smaller than 128 MB, the base configuration will be used. -func (cfg *ScalingLimitConfig) Scale(memory int64, numFD int) LimitConfig { - lc := LimitConfig{ - System: scale(cfg.SystemBaseLimit, cfg.SystemLimitIncrease, memory, numFD), - Transient: scale(cfg.TransientBaseLimit, cfg.TransientLimitIncrease, memory, numFD), - AllowlistedSystem: scale(cfg.AllowlistedSystemBaseLimit, cfg.AllowlistedSystemLimitIncrease, memory, numFD), - AllowlistedTransient: scale(cfg.AllowlistedTransientBaseLimit, cfg.AllowlistedTransientLimitIncrease, memory, numFD), - ServiceDefault: scale(cfg.ServiceBaseLimit, cfg.ServiceLimitIncrease, memory, numFD), - ServicePeerDefault: scale(cfg.ServicePeerBaseLimit, cfg.ServicePeerLimitIncrease, memory, numFD), - ProtocolDefault: scale(cfg.ProtocolBaseLimit, cfg.ProtocolLimitIncrease, memory, numFD), - ProtocolPeerDefault: scale(cfg.ProtocolPeerBaseLimit, cfg.ProtocolPeerLimitIncrease, memory, numFD), - PeerDefault: scale(cfg.PeerBaseLimit, cfg.PeerLimitIncrease, memory, numFD), - Conn: scale(cfg.ConnBaseLimit, cfg.ConnLimitIncrease, memory, numFD), - Stream: scale(cfg.StreamBaseLimit, cfg.ConnLimitIncrease, memory, numFD), +func (cfg *ScalingLimitConfig) Scale(memory int64, numFD int) ReifiedLimitConfig { + lc := ReifiedLimitConfig{ + system: scale(cfg.SystemBaseLimit, cfg.SystemLimitIncrease, memory, numFD), + transient: scale(cfg.TransientBaseLimit, cfg.TransientLimitIncrease, memory, numFD), + allowlistedSystem: scale(cfg.AllowlistedSystemBaseLimit, cfg.AllowlistedSystemLimitIncrease, memory, numFD), + allowlistedTransient: scale(cfg.AllowlistedTransientBaseLimit, cfg.AllowlistedTransientLimitIncrease, memory, numFD), + serviceDefault: scale(cfg.ServiceBaseLimit, cfg.ServiceLimitIncrease, memory, numFD), + servicePeerDefault: scale(cfg.ServicePeerBaseLimit, cfg.ServicePeerLimitIncrease, memory, numFD), + protocolDefault: scale(cfg.ProtocolBaseLimit, cfg.ProtocolLimitIncrease, memory, numFD), + protocolPeerDefault: scale(cfg.ProtocolPeerBaseLimit, cfg.ProtocolPeerLimitIncrease, memory, numFD), + peerDefault: scale(cfg.PeerBaseLimit, cfg.PeerLimitIncrease, memory, numFD), + conn: scale(cfg.ConnBaseLimit, cfg.ConnLimitIncrease, memory, numFD), + stream: scale(cfg.StreamBaseLimit, cfg.ConnLimitIncrease, memory, numFD), } if cfg.ServiceLimits != nil { - lc.Service = make(map[string]BaseLimit) + lc.service = make(map[string]BaseLimit) for svc, l := range cfg.ServiceLimits { - lc.Service[svc] = scale(l.BaseLimit, l.BaseLimitIncrease, memory, numFD) + lc.service[svc] = scale(l.BaseLimit, l.BaseLimitIncrease, memory, numFD) } } if cfg.ProtocolLimits != nil { - lc.Protocol = make(map[protocol.ID]BaseLimit) + lc.protocol = make(map[protocol.ID]BaseLimit) for proto, l := range cfg.ProtocolLimits { - lc.Protocol[proto] = scale(l.BaseLimit, l.BaseLimitIncrease, memory, numFD) + lc.protocol[proto] = scale(l.BaseLimit, l.BaseLimitIncrease, memory, numFD) } } if cfg.PeerLimits != nil { - lc.Peer = make(map[peer.ID]BaseLimit) + lc.peer = make(map[peer.ID]BaseLimit) for p, l := range cfg.PeerLimits { - lc.Peer[p] = scale(l.BaseLimit, l.BaseLimitIncrease, memory, numFD) + lc.peer[p] = scale(l.BaseLimit, l.BaseLimitIncrease, memory, numFD) } } if cfg.ServicePeerLimits != nil { - lc.ServicePeer = make(map[string]BaseLimit) + lc.servicePeer = make(map[string]BaseLimit) for svc, l := range cfg.ServicePeerLimits { - lc.ServicePeer[svc] = scale(l.BaseLimit, l.BaseLimitIncrease, memory, numFD) + lc.servicePeer[svc] = scale(l.BaseLimit, l.BaseLimitIncrease, memory, numFD) } } if cfg.ProtocolPeerLimits != nil { - lc.ProtocolPeer = make(map[protocol.ID]BaseLimit) + lc.protocolPeer = make(map[protocol.ID]BaseLimit) for p, l := range cfg.ProtocolPeerLimits { - lc.ProtocolPeer[p] = scale(l.BaseLimit, l.BaseLimitIncrease, memory, numFD) + lc.protocolPeer[p] = scale(l.BaseLimit, l.BaseLimitIncrease, memory, numFD) } } return lc } -func (cfg *ScalingLimitConfig) AutoScale() LimitConfig { +func (cfg *ScalingLimitConfig) AutoScale() ReifiedLimitConfig { return cfg.Scale( int64(memory.TotalMemory())/8, getNumFDs()/2, @@ -340,6 +619,8 @@ func scale(base BaseLimit, inc BaseLimitIncrease, memory int64, numFD int) BaseL return l } +var DefaultReifiedLimits = DefaultLimits.AutoScale() + // DefaultLimits are the limits used by the default limiter constructors. var DefaultLimits = ScalingLimitConfig{ SystemBaseLimit: BaseLimit{ @@ -540,18 +821,18 @@ var infiniteBaseLimit = BaseLimit{ Memory: math.MaxInt64, } -// InfiniteLimits are a limiter configuration that uses infinite limits, thus effectively not limiting anything. +// InfiniteLimits are a limiter configuration that uses unlimited limits, thus effectively not limiting anything. // Keep in mind that the operating system limits the number of file descriptors that an application can use. -var InfiniteLimits = LimitConfig{ - System: infiniteBaseLimit, - Transient: infiniteBaseLimit, - AllowlistedSystem: infiniteBaseLimit, - AllowlistedTransient: infiniteBaseLimit, - ServiceDefault: infiniteBaseLimit, - ServicePeerDefault: infiniteBaseLimit, - ProtocolDefault: infiniteBaseLimit, - ProtocolPeerDefault: infiniteBaseLimit, - PeerDefault: infiniteBaseLimit, - Conn: infiniteBaseLimit, - Stream: infiniteBaseLimit, +var InfiniteLimits = ReifiedLimitConfig{ + system: infiniteBaseLimit, + transient: infiniteBaseLimit, + allowlistedSystem: infiniteBaseLimit, + allowlistedTransient: infiniteBaseLimit, + serviceDefault: infiniteBaseLimit, + servicePeerDefault: infiniteBaseLimit, + protocolDefault: infiniteBaseLimit, + protocolPeerDefault: infiniteBaseLimit, + peerDefault: infiniteBaseLimit, + conn: infiniteBaseLimit, + stream: infiniteBaseLimit, } diff --git a/p2p/host/resource-manager/limit_test.go b/p2p/host/resource-manager/limit_test.go index 9070045bc7..342b1447b4 100644 --- a/p2p/host/resource-manager/limit_test.go +++ b/p2p/host/resource-manager/limit_test.go @@ -2,6 +2,7 @@ package rcmgr import ( "encoding/json" + "math" "runtime" "testing" @@ -32,7 +33,7 @@ func TestScaling(t *testing.T) { t.Run("no scaling if no increase is defined", func(t *testing.T) { cfg := ScalingLimitConfig{ServiceBaseLimit: base} scaled := cfg.Scale(8<<30, 100) - require.Equal(t, base, scaled.ServiceDefault) + require.Equal(t, base, scaled.serviceDefault) }) t.Run("scaling", func(t *testing.T) { @@ -50,14 +51,14 @@ func TestScaling(t *testing.T) { }, } scaled := cfg.Scale(128<<20+4<<30, 1000) - require.Equal(t, 500, scaled.Transient.FD) - require.Equal(t, base.Streams+4, scaled.Transient.Streams) - require.Equal(t, base.StreamsInbound+4*2, scaled.Transient.StreamsInbound) - require.Equal(t, base.StreamsOutbound+4*3, scaled.Transient.StreamsOutbound) - require.Equal(t, base.Conns+4*4, scaled.Transient.Conns) - require.Equal(t, base.ConnsInbound+4*5, scaled.Transient.ConnsInbound) - require.Equal(t, base.ConnsOutbound+4*6, scaled.Transient.ConnsOutbound) - require.Equal(t, base.Memory+4*7, scaled.Transient.Memory) + require.Equal(t, 500, scaled.transient.FD) + require.Equal(t, base.Streams+4, scaled.transient.Streams) + require.Equal(t, base.StreamsInbound+4*2, scaled.transient.StreamsInbound) + require.Equal(t, base.StreamsOutbound+4*3, scaled.transient.StreamsOutbound) + require.Equal(t, base.Conns+4*4, scaled.transient.Conns) + require.Equal(t, base.ConnsInbound+4*5, scaled.transient.ConnsInbound) + require.Equal(t, base.ConnsOutbound+4*6, scaled.transient.ConnsOutbound) + require.Equal(t, base.Memory+4*7, scaled.transient.Memory) }) t.Run("scaling and using the base amounts", func(t *testing.T) { @@ -75,14 +76,14 @@ func TestScaling(t *testing.T) { }, } scaled := cfg.Scale(1, 10) - require.Equal(t, 1, scaled.Transient.FD) - require.Equal(t, base.Streams, scaled.Transient.Streams) - require.Equal(t, base.StreamsInbound, scaled.Transient.StreamsInbound) - require.Equal(t, base.StreamsOutbound, scaled.Transient.StreamsOutbound) - require.Equal(t, base.Conns, scaled.Transient.Conns) - require.Equal(t, base.ConnsInbound, scaled.Transient.ConnsInbound) - require.Equal(t, base.ConnsOutbound, scaled.Transient.ConnsOutbound) - require.Equal(t, base.Memory, scaled.Transient.Memory) + require.Equal(t, 1, scaled.transient.FD) + require.Equal(t, base.Streams, scaled.transient.Streams) + require.Equal(t, base.StreamsInbound, scaled.transient.StreamsInbound) + require.Equal(t, base.StreamsOutbound, scaled.transient.StreamsOutbound) + require.Equal(t, base.Conns, scaled.transient.Conns) + require.Equal(t, base.ConnsInbound, scaled.transient.ConnsInbound) + require.Equal(t, base.ConnsOutbound, scaled.transient.ConnsOutbound) + require.Equal(t, base.Memory, scaled.transient.Memory) }) t.Run("scaling limits in maps", func(t *testing.T) { @@ -99,16 +100,16 @@ func TestScaling(t *testing.T) { } scaled := cfg.Scale(128<<20+4<<30, 1000) - require.Len(t, scaled.Service, 2) - require.Contains(t, scaled.Service, "A") - require.Equal(t, 10, scaled.Service["A"].Streams) - require.Equal(t, int64(100), scaled.Service["A"].Memory) - require.Equal(t, 9, scaled.Service["A"].FD) + require.Len(t, scaled.service, 2) + require.Contains(t, scaled.service, "A") + require.Equal(t, 10, scaled.service["A"].Streams) + require.Equal(t, int64(100), scaled.service["A"].Memory) + require.Equal(t, 9, scaled.service["A"].FD) - require.Contains(t, scaled.Service, "B") - require.Equal(t, 20+4*2, scaled.Service["B"].Streams) - require.Equal(t, int64(200+4*3), scaled.Service["B"].Memory) - require.Equal(t, 400, scaled.Service["B"].FD) + require.Contains(t, scaled.service, "B") + require.Equal(t, 20+4*2, scaled.service["B"].Streams) + require.Equal(t, int64(200+4*3), scaled.service["B"].Memory) + require.Equal(t, 400, scaled.service["B"].FD) }) } @@ -139,8 +140,75 @@ func TestReadmeExample(t *testing.T) { limitConf := scalingLimits.Scale(4<<30, 1000) - require.Equal(t, 384, limitConf.System.Conns) - require.Equal(t, 1000, limitConf.System.FD) + require.Equal(t, 384, limitConf.system.Conns) + require.Equal(t, 1000, limitConf.system.FD) +} + +func TestJSONMarshalling(t *testing.T) { + bl := ResourceLimits{ + Streams: DefaultLimit, + StreamsInbound: 10, + StreamsOutbound: 10, + Conns: 10, + // ConnsInbound: DefaultLimit, + ConnsOutbound: Unlimited, + Memory: Unlimited64, + } + + jsonEncoded, err := json.Marshal(bl) + require.NoError(t, err) + + require.Equal(t, string(jsonEncoded), `{"StreamsInbound":10,"StreamsOutbound":10,"Conns":10,"ConnsOutbound":"unlimited","Memory":"unlimited"}`) + + // Roundtrip + var blDecoded ResourceLimits + err = json.Unmarshal(jsonEncoded, &blDecoded) + require.NoError(t, err) + + require.Equal(t, bl, blDecoded) +} + +func TestJSONRoundTripInt64(t *testing.T) { + bl := ResourceLimits{ + Memory: math.MaxInt64, + } + + jsonEncoded, err := json.Marshal(bl) + require.NoError(t, err) + + require.Equal(t, string(jsonEncoded), `{"Memory":"9223372036854775807"}`) + + // Roundtrip + var blDecoded ResourceLimits + err = json.Unmarshal(jsonEncoded, &blDecoded) + require.NoError(t, err) + + require.Equal(t, bl, blDecoded) +} + +func TestRoundTripFromReifyAndBack(t *testing.T) { + l := LimitConfig{ + System: ResourceLimits{ + Conns: 1234, + Memory: 54321, + }, + + ServiceDefault: ResourceLimits{ + Conns: 2, + }, + + Service: map[string]ResourceLimits{ + "foo": { + Conns: 3, + }, + }, + } + + reified := l.Reify(InfiniteLimits) + + // Roundtrip + fromReified := FromReifiedLimitConfig(reified, InfiniteLimits) + require.Equal(t, l, fromReified) } func TestSerializeJSON(t *testing.T) { diff --git a/p2p/host/resource-manager/rcmgr_test.go b/p2p/host/resource-manager/rcmgr_test.go index 12420ac712..7e8e6ac8dc 100644 --- a/p2p/host/resource-manager/rcmgr_test.go +++ b/p2p/host/resource-manager/rcmgr_test.go @@ -21,8 +21,8 @@ func TestResourceManager(t *testing.T) { svcA := "A.svc" svcB := "B.svc" nmgr, err := NewResourceManager( - NewFixedLimiter(LimitConfig{ - System: BaseLimit{ + NewFixedLimiter(ReifiedLimitConfig{ + system: BaseLimit{ Memory: 16384, StreamsInbound: 3, StreamsOutbound: 3, @@ -32,7 +32,7 @@ func TestResourceManager(t *testing.T) { Conns: 6, FD: 2, }, - Transient: BaseLimit{ + transient: BaseLimit{ Memory: 4096, StreamsInbound: 1, StreamsOutbound: 1, @@ -42,7 +42,7 @@ func TestResourceManager(t *testing.T) { Conns: 2, FD: 1, }, - ServiceDefault: BaseLimit{ + serviceDefault: BaseLimit{ Memory: 4096, StreamsInbound: 1, StreamsOutbound: 1, @@ -52,13 +52,13 @@ func TestResourceManager(t *testing.T) { Conns: 2, FD: 1, }, - ServicePeerDefault: BaseLimit{ + servicePeerDefault: BaseLimit{ Memory: 4096, StreamsInbound: 5, StreamsOutbound: 5, Streams: 10, }, - Service: map[string]BaseLimit{ + service: map[string]BaseLimit{ svcA: { Memory: 8192, StreamsInbound: 2, @@ -80,7 +80,7 @@ func TestResourceManager(t *testing.T) { FD: 1, }, }, - ServicePeer: map[string]BaseLimit{ + servicePeer: map[string]BaseLimit{ svcB: { Memory: 8192, StreamsInbound: 1, @@ -88,13 +88,13 @@ func TestResourceManager(t *testing.T) { Streams: 2, }, }, - ProtocolDefault: BaseLimit{ + protocolDefault: BaseLimit{ Memory: 4096, StreamsInbound: 1, StreamsOutbound: 1, Streams: 2, }, - Protocol: map[protocol.ID]BaseLimit{ + protocol: map[protocol.ID]BaseLimit{ protoA: { Memory: 8192, StreamsInbound: 2, @@ -102,7 +102,7 @@ func TestResourceManager(t *testing.T) { Streams: 2, }, }, - ProtocolPeer: map[protocol.ID]BaseLimit{ + protocolPeer: map[protocol.ID]BaseLimit{ protoB: { Memory: 8192, StreamsInbound: 1, @@ -110,7 +110,7 @@ func TestResourceManager(t *testing.T) { Streams: 2, }, }, - PeerDefault: BaseLimit{ + peerDefault: BaseLimit{ Memory: 4096, StreamsInbound: 1, StreamsOutbound: 1, @@ -120,13 +120,13 @@ func TestResourceManager(t *testing.T) { Conns: 2, FD: 1, }, - ProtocolPeerDefault: BaseLimit{ + protocolPeerDefault: BaseLimit{ Memory: 4096, StreamsInbound: 5, StreamsOutbound: 5, Streams: 10, }, - Peer: map[peer.ID]BaseLimit{ + peer: map[peer.ID]BaseLimit{ peerA: { Memory: 8192, StreamsInbound: 2, @@ -138,14 +138,14 @@ func TestResourceManager(t *testing.T) { FD: 1, }, }, - Conn: BaseLimit{ + conn: BaseLimit{ Memory: 4096, ConnsInbound: 1, ConnsOutbound: 1, Conns: 1, FD: 1, }, - Stream: BaseLimit{ + stream: BaseLimit{ Memory: 4096, StreamsInbound: 1, StreamsOutbound: 1, @@ -979,24 +979,24 @@ func TestResourceManagerWithAllowlist(t *testing.T) { peerA := test.RandPeerIDFatal(t) limits := DefaultLimits.AutoScale() - limits.System.Conns = 0 - limits.Transient.Conns = 0 + limits.system.Conns = 0 + limits.transient.Conns = 0 baseLimit := BaseLimit{ Conns: 2, ConnsInbound: 2, ConnsOutbound: 1, } - baseLimit.Apply(limits.AllowlistedSystem) - limits.AllowlistedSystem = baseLimit + baseLimit.Apply(limits.allowlistedSystem) + limits.allowlistedSystem = baseLimit baseLimit = BaseLimit{ Conns: 1, ConnsInbound: 1, ConnsOutbound: 1, } - baseLimit.Apply(limits.AllowlistedTransient) - limits.AllowlistedTransient = baseLimit + baseLimit.Apply(limits.allowlistedTransient) + limits.allowlistedTransient = baseLimit rcmgr, err := NewResourceManager(NewFixedLimiter(limits), WithAllowlistedMultiaddrs([]multiaddr.Multiaddr{ multiaddr.StringCast("/ip4/1.2.3.4"), diff --git a/p2p/test/resource-manager/rcmgr_test.go b/p2p/test/resource-manager/rcmgr_test.go index ae546c5af9..4c82296681 100644 --- a/p2p/test/resource-manager/rcmgr_test.go +++ b/p2p/test/resource-manager/rcmgr_test.go @@ -17,7 +17,7 @@ import ( "github.com/stretchr/testify/require" ) -func makeRcmgrOption(t *testing.T, cfg rcmgr.LimitConfig) func(int) libp2p.Option { +func makeRcmgrOption(t *testing.T, cfg rcmgr.ReifiedLimitConfig) func(int) libp2p.Option { return func(i int) libp2p.Option { var opts []rcmgr.Option if os.Getenv("LIBP2P_TEST_RCMGR_TRACE") == "1" { @@ -46,13 +46,19 @@ func waitForConnection(t *testing.T, src, dest *Echo) { func TestResourceManagerConnInbound(t *testing.T) { // this test checks that we can not exceed the inbound conn limit at system level // we specify: 1 conn per peer, 3 conns total, and we try to create 4 conns - cfg := rcmgr.DefaultLimits.AutoScale() - cfg.System.ConnsInbound = 3 - cfg.System.ConnsOutbound = 1024 - cfg.System.Conns = 1024 - cfg.PeerDefault.Conns = 1 - cfg.PeerDefault.ConnsInbound = 1 - cfg.PeerDefault.ConnsOutbound = 1 + cfg := rcmgr.LimitConfig{ + + System: rcmgr.ResourceLimits{ + ConnsInbound: 3, + ConnsOutbound: 1024, + Conns: 1024, + }, + PeerDefault: rcmgr.ResourceLimits{ + ConnsInbound: 1, + ConnsOutbound: 1, + Conns: 1, + }, + }.Reify(rcmgr.DefaultReifiedLimits) echos := createEchos(t, 5, makeRcmgrOption(t, cfg)) defer closeEchos(echos) @@ -82,13 +88,18 @@ func TestResourceManagerConnInbound(t *testing.T) { func TestResourceManagerConnOutbound(t *testing.T) { // this test checks that we can not exceed the inbound conn limit at system level // we specify: 1 conn per peer, 3 conns total, and we try to create 4 conns - cfg := rcmgr.DefaultLimits.AutoScale() - cfg.System.ConnsInbound = 1024 - cfg.System.ConnsOutbound = 3 - cfg.System.Conns = 1024 - cfg.PeerDefault.Conns = 1 - cfg.PeerDefault.ConnsInbound = 1 - cfg.PeerDefault.ConnsOutbound = 1 + cfg := rcmgr.LimitConfig{ + System: rcmgr.ResourceLimits{ + ConnsInbound: 1024, + ConnsOutbound: 3, + Conns: 1024, + }, + PeerDefault: rcmgr.ResourceLimits{ + ConnsInbound: 1, + ConnsOutbound: 1, + Conns: 1, + }, + }.Reify(rcmgr.DefaultReifiedLimits) echos := createEchos(t, 5, makeRcmgrOption(t, cfg)) defer closeEchos(echos) defer closeRcmgrs(echos) @@ -117,10 +128,13 @@ func TestResourceManagerConnOutbound(t *testing.T) { func TestResourceManagerServiceInbound(t *testing.T) { // this test checks that we can not exceed the inbound stream limit at service level // we specify: 3 streams for the service, and we try to create 4 streams - cfg := rcmgr.DefaultLimits.AutoScale() - cfg.ServiceDefault.StreamsInbound = 3 - cfg.ServiceDefault.StreamsOutbound = 1024 - cfg.ServiceDefault.Streams = 1024 + cfg := rcmgr.LimitConfig{ + ServiceDefault: rcmgr.ResourceLimits{ + StreamsInbound: 3, + StreamsOutbound: 1024, + Streams: 1024, + }, + }.Reify(rcmgr.DefaultReifiedLimits) echos := createEchos(t, 5, makeRcmgrOption(t, cfg)) defer closeEchos(echos) defer closeRcmgrs(echos) From 2259dd947a428e8ebbf45e5118107f08eb73e498 Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Wed, 18 Jan 2023 22:05:26 -0800 Subject: [PATCH 02/18] Nits --- p2p/host/resource-manager/limit_test.go | 5 ++--- p2p/test/resource-manager/rcmgr_test.go | 1 - 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/p2p/host/resource-manager/limit_test.go b/p2p/host/resource-manager/limit_test.go index 342b1447b4..3428309a0b 100644 --- a/p2p/host/resource-manager/limit_test.go +++ b/p2p/host/resource-manager/limit_test.go @@ -148,7 +148,7 @@ func TestJSONMarshalling(t *testing.T) { bl := ResourceLimits{ Streams: DefaultLimit, StreamsInbound: 10, - StreamsOutbound: 10, + StreamsOutbound: BlockAllLimit, Conns: 10, // ConnsInbound: DefaultLimit, ConnsOutbound: Unlimited, @@ -157,8 +157,7 @@ func TestJSONMarshalling(t *testing.T) { jsonEncoded, err := json.Marshal(bl) require.NoError(t, err) - - require.Equal(t, string(jsonEncoded), `{"StreamsInbound":10,"StreamsOutbound":10,"Conns":10,"ConnsOutbound":"unlimited","Memory":"unlimited"}`) + require.Equal(t, string(jsonEncoded), `{"StreamsInbound":10,"StreamsOutbound":"blockAll","Conns":10,"ConnsOutbound":"unlimited","Memory":"unlimited"}`) // Roundtrip var blDecoded ResourceLimits diff --git a/p2p/test/resource-manager/rcmgr_test.go b/p2p/test/resource-manager/rcmgr_test.go index 4c82296681..f5288b0651 100644 --- a/p2p/test/resource-manager/rcmgr_test.go +++ b/p2p/test/resource-manager/rcmgr_test.go @@ -47,7 +47,6 @@ func TestResourceManagerConnInbound(t *testing.T) { // this test checks that we can not exceed the inbound conn limit at system level // we specify: 1 conn per peer, 3 conns total, and we try to create 4 conns cfg := rcmgr.LimitConfig{ - System: rcmgr.ResourceLimits{ ConnsInbound: 3, ConnsOutbound: 1024, From ecf109499082aa910bc847690baa2c9e2265f656 Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Thu, 19 Jan 2023 08:17:43 -0800 Subject: [PATCH 03/18] Remove DefaultReifiedLimits --- p2p/host/resource-manager/limit.go | 2 +- p2p/host/resource-manager/limit_defaults.go | 4 +--- p2p/test/resource-manager/rcmgr_test.go | 6 +++--- 3 files changed, 5 insertions(+), 7 deletions(-) diff --git a/p2p/host/resource-manager/limit.go b/p2p/host/resource-manager/limit.go index 22765fdeb1..e60d41e60b 100644 --- a/p2p/host/resource-manager/limit.go +++ b/p2p/host/resource-manager/limit.go @@ -52,7 +52,7 @@ type Limiter interface { // NewDefaultLimiterFromJSON creates a new limiter by parsing a json configuration, // using the default limits for fallback. func NewDefaultLimiterFromJSON(in io.Reader) (Limiter, error) { - return NewLimiterFromJSON(in, DefaultReifiedLimits) + return NewLimiterFromJSON(in, DefaultLimits.AutoScale()) } // NewLimiterFromJSON creates a new limiter by parsing a json configuration. diff --git a/p2p/host/resource-manager/limit_defaults.go b/p2p/host/resource-manager/limit_defaults.go index dba341927a..5d3935ff09 100644 --- a/p2p/host/resource-manager/limit_defaults.go +++ b/p2p/host/resource-manager/limit_defaults.go @@ -503,7 +503,7 @@ func reifyMapWithDefault[K comparable](definedLimits map[K]ResourceLimits, defau // ReifiedLimitConfig is similar to LimitConfig, but all values are defined. // There is no unset "default" value. Commonly constructed by calling -// LimitConfig.Reify(DefaultReifiedLimits) +// LimitConfig.Reify(rcmgr.DefaultLimits.AutoScale()) type ReifiedLimitConfig struct { system BaseLimit transient BaseLimit @@ -619,8 +619,6 @@ func scale(base BaseLimit, inc BaseLimitIncrease, memory int64, numFD int) BaseL return l } -var DefaultReifiedLimits = DefaultLimits.AutoScale() - // DefaultLimits are the limits used by the default limiter constructors. var DefaultLimits = ScalingLimitConfig{ SystemBaseLimit: BaseLimit{ diff --git a/p2p/test/resource-manager/rcmgr_test.go b/p2p/test/resource-manager/rcmgr_test.go index f5288b0651..60f223f2ec 100644 --- a/p2p/test/resource-manager/rcmgr_test.go +++ b/p2p/test/resource-manager/rcmgr_test.go @@ -57,7 +57,7 @@ func TestResourceManagerConnInbound(t *testing.T) { ConnsOutbound: 1, Conns: 1, }, - }.Reify(rcmgr.DefaultReifiedLimits) + }.Reify(rcmgr.DefaultLimits.AutoScale()) echos := createEchos(t, 5, makeRcmgrOption(t, cfg)) defer closeEchos(echos) @@ -98,7 +98,7 @@ func TestResourceManagerConnOutbound(t *testing.T) { ConnsOutbound: 1, Conns: 1, }, - }.Reify(rcmgr.DefaultReifiedLimits) + }.Reify(rcmgr.DefaultLimits.AutoScale()) echos := createEchos(t, 5, makeRcmgrOption(t, cfg)) defer closeEchos(echos) defer closeRcmgrs(echos) @@ -133,7 +133,7 @@ func TestResourceManagerServiceInbound(t *testing.T) { StreamsOutbound: 1024, Streams: 1024, }, - }.Reify(rcmgr.DefaultReifiedLimits) + }.Reify(rcmgr.DefaultLimits.AutoScale()) echos := createEchos(t, 5, makeRcmgrOption(t, cfg)) defer closeEchos(echos) defer closeRcmgrs(echos) From 4e0689dc908bfb64c5029e66ceabecf455432577 Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Thu, 19 Jan 2023 09:23:50 -0800 Subject: [PATCH 04/18] Use pointers for optional ResourceLimits --- p2p/host/resource-manager/README.md | 64 +++++++- p2p/host/resource-manager/limit.go | 4 +- .../resource-manager/limit_config_test.go | 66 +++++++- p2p/host/resource-manager/limit_defaults.go | 142 ++++++++++++------ p2p/host/resource-manager/limit_test.go | 6 +- p2p/test/resource-manager/rcmgr_test.go | 59 +++++++- 6 files changed, 268 insertions(+), 73 deletions(-) diff --git a/p2p/host/resource-manager/README.md b/p2p/host/resource-manager/README.md index 96edd1d0cf..cb798252e7 100644 --- a/p2p/host/resource-manager/README.md +++ b/p2p/host/resource-manager/README.md @@ -28,9 +28,21 @@ scalingLimits := rcmgr.DefaultLimits // Add limits around included libp2p protocols libp2p.SetDefaultServiceLimits(&scalingLimits) -// Turn the scaling limits into a static set of limits using `.AutoScale`. This +// Turn the scaling limits into a reified set of limits using `.AutoScale`. This // scales the limits proportional to your system memory. -limits := scalingLimits.AutoScale() +scaledDefaultLimits := scalingLimits.AutoScale() + +// Tweak certain settings +cfg := rcmgr.LimitConfig{ + System: &rcmgr.ResourceLimits{ + // Allow unlimited outbound streams + StreamsOutbound: rcmgr.Unlimited, + }, + // Everything else is default. The exact values will come from `scaledDefaultLimits` above. +} + +// Create our limits by using our cfg and replacing the default values with values from `scaledDefaultLimits` +limits := cfg.Reify(scaledDefaultLimits) // The resource manager expects a limiter, se we create one from our limits. limiter := rcmgr.NewFixedLimiter(limits) @@ -51,6 +63,54 @@ if err != nil { host, err := libp2p.New(libp2p.ResourceManager(rm)) ``` +### Saving the limits config +The easiest way to save the defined limits is to serialize the `LimitConfig` +type as JSON. + +```go +noisyNeighbor, _ := peer.Decode("QmVvtzcZgCkMnSFf2dnrBPXrWuNFWNM9J3MpZQCvWPuVZf") +cfg := rcmgr.LimitConfig{ + System: &rcmgr.ResourceLimits{ + // Allow unlimited outbound streams + StreamsOutbound: rcmgr.Unlimited, + }, + Peer: map[peer.ID]rcmgr.ResourceLimits{ + noisyNeighbor: { + // No inbound connections from this peer + ConnsInbound: rcmgr.BlockAllLimit, + // But let me open connections to them + Conns: rcmgr.DefaultLimit, + ConnsOutbound: rcmgr.DefaultLimit, + // No inbound streams from this peer + StreamsInbound: rcmgr.BlockAllLimit, + // And let me open unlimited (by me) outbound streams (the peer may have their own limits on me) + StreamsOutbound: rcmgr.Unlimited, + }, + }, +} +jsonBytes, _ := json.Marshal(&cfg) + +// string(jsonBytes) +// { +// "System": { +// "StreamsOutbound": "unlimited" +// }, +// "Peer": { +// "QmVvtzcZgCkMnSFf2dnrBPXrWuNFWNM9J3MpZQCvWPuVZf": { +// "StreamsInbound": "blockAll", +// "StreamsOutbound": "unlimited", +// "ConnsInbound": "blockAll" +// } +// } +// } +``` + +This will omit defaults from the JSON output. It will also serialize the +blockAll, and unlimited values explicitly. + +The `Memory` field is serialized as a string to workaround the JSON limitation +of 32 bit integers (`Memory` is an int64). + ## Basic Resources ### Memory diff --git a/p2p/host/resource-manager/limit.go b/p2p/host/resource-manager/limit.go index e60d41e60b..3a2178bc9d 100644 --- a/p2p/host/resource-manager/limit.go +++ b/p2p/host/resource-manager/limit.go @@ -110,8 +110,8 @@ func valueOrBlockAll64(n int64) LimitVal64 { } // ToResourceLimits converts the BaseLimit to a ResourceLimits -func (l BaseLimit) ToResourceLimits() ResourceLimits { - return ResourceLimits{ +func (l BaseLimit) ToResourceLimits() *ResourceLimits { + return &ResourceLimits{ Streams: valueOrBlockAll(l.Streams), StreamsInbound: valueOrBlockAll(l.StreamsInbound), StreamsOutbound: valueOrBlockAll(l.StreamsOutbound), diff --git a/p2p/host/resource-manager/limit_config_test.go b/p2p/host/resource-manager/limit_config_test.go index 8a23486d35..43f77bcf6f 100644 --- a/p2p/host/resource-manager/limit_config_test.go +++ b/p2p/host/resource-manager/limit_config_test.go @@ -18,13 +18,15 @@ func withMemoryLimit(l BaseLimit, m int64) BaseLimit { } func TestLimitConfigParserBackwardsCompat(t *testing.T) { - in, err := os.Open("limit_config_test.json") + // Tests that we can parse the old limit config format. + in, err := os.Open("limit_config_test.backwards-compat.json") require.NoError(t, err) defer in.Close() - DefaultLimits.AddServiceLimit("C", DefaultLimits.ServiceBaseLimit, BaseLimitIncrease{}) - DefaultLimits.AddProtocolPeerLimit("C", DefaultLimits.ServiceBaseLimit, BaseLimitIncrease{}) - defaults := DefaultLimits.AutoScale() + defaultScaledLimits := DefaultLimits + defaultScaledLimits.AddServiceLimit("C", DefaultLimits.ServiceBaseLimit, BaseLimitIncrease{}) + defaultScaledLimits.AddProtocolPeerLimit("C", DefaultLimits.ServiceBaseLimit, BaseLimitIncrease{}) + defaults := defaultScaledLimits.AutoScale() cfg, err := readLimiterConfigFromJSON(in, defaults) require.NoError(t, err) @@ -59,9 +61,10 @@ func TestLimitConfigParser(t *testing.T) { require.NoError(t, err) defer in.Close() - DefaultLimits.AddServiceLimit("C", DefaultLimits.ServiceBaseLimit, BaseLimitIncrease{}) - DefaultLimits.AddProtocolPeerLimit("C", DefaultLimits.ServiceBaseLimit, BaseLimitIncrease{}) - defaults := DefaultLimits.AutoScale() + defaultScaledLimits := DefaultLimits + defaultScaledLimits.AddServiceLimit("C", DefaultLimits.ServiceBaseLimit, BaseLimitIncrease{}) + defaultScaledLimits.AddProtocolPeerLimit("C", DefaultLimits.ServiceBaseLimit, BaseLimitIncrease{}) + defaults := defaultScaledLimits.AutoScale() cfg, err := readLimiterConfigFromJSON(in, defaults) require.NoError(t, err) @@ -91,10 +94,57 @@ func TestLimitConfigParser(t *testing.T) { require.Equal(t, int64(4097), cfg.peer[peerID].Memory) // Roundtrip - limitConfig := FromReifiedLimitConfig(cfg, defaults) + limitConfig := cfg.ToLimitConfigWithDefaults(defaults) jsonBytes, err := json.Marshal(&limitConfig) require.NoError(t, err) cfgAfterRoundTrip, err := readLimiterConfigFromJSON(bytes.NewReader(jsonBytes), defaults) require.NoError(t, err) require.Equal(t, cfg, cfgAfterRoundTrip) } + +func TestLimitConfigRoundTrip(t *testing.T) { + // Tests that we can roundtrip a LimitConfig to a ReifiedLimitConfig and back. + in, err := os.Open("limit_config_test.json") + require.NoError(t, err) + defer in.Close() + + defaults := DefaultLimits + defaults.AddServiceLimit("C", DefaultLimits.ServiceBaseLimit, BaseLimitIncrease{}) + defaults.AddProtocolPeerLimit("C", DefaultLimits.ServiceBaseLimit, BaseLimitIncrease{}) + reifiedCfg, err := readLimiterConfigFromJSON(in, defaults.AutoScale()) + require.NoError(t, err) + + // Roundtrip + limitConfig := reifiedCfg.ToLimitConfig() + // Using InfiniteLimits because it's different then the defaults used above. + // If anything was marked "default" in the round trip, it would show up as a + // difference here. + reifiedCfgRT := limitConfig.Reify(InfiniteLimits) + require.Equal(t, reifiedCfg, reifiedCfgRT) +} + +func TestReadmeLimitConfigSerialization(t *testing.T) { + noisyNeighbor, _ := peer.Decode("QmVvtzcZgCkMnSFf2dnrBPXrWuNFWNM9J3MpZQCvWPuVZf") + cfg := LimitConfig{ + System: &ResourceLimits{ + // Allow unlimited outbound streams + StreamsOutbound: Unlimited, + }, + Peer: map[peer.ID]ResourceLimits{ + noisyNeighbor: { + // No inbound connections from this peer + ConnsInbound: BlockAllLimit, + // But let me open connections to them + Conns: DefaultLimit, + ConnsOutbound: DefaultLimit, + // No inbound streams from this peer + StreamsInbound: BlockAllLimit, + // And let me open unlimited (by me) outbound streams (the peer may have their own limits on me) + StreamsOutbound: Unlimited, + }, + }, + } + jsonBytes, err := json.Marshal(&cfg) + require.NoError(t, err) + require.Equal(t, `{"System":{"StreamsOutbound":"unlimited"},"Peer":{"QmVvtzcZgCkMnSFf2dnrBPXrWuNFWNM9J3MpZQCvWPuVZf":{"StreamsInbound":"blockAll","StreamsOutbound":"unlimited","ConnsInbound":"blockAll"}}}`, string(jsonBytes)) +} diff --git a/p2p/host/resource-manager/limit_defaults.go b/p2p/host/resource-manager/limit_defaults.go index 5d3935ff09..b899a47f05 100644 --- a/p2p/host/resource-manager/limit_defaults.go +++ b/p2p/host/resource-manager/limit_defaults.go @@ -248,7 +248,10 @@ type ResourceLimits struct { } // Apply overwrites all default limits with the values of l2 -func (l *ResourceLimits) Apply(l2 ResourceLimits) { +func (l *ResourceLimits) Apply(l2 *ResourceLimits) { + if l2 == nil { + return + } if l.Streams == DefaultLimit { l.Streams = l2.Streams } @@ -290,66 +293,35 @@ func (l *ResourceLimits) Reify(defaults BaseLimit) BaseLimit { } type LimitConfig struct { - System ResourceLimits `json:",omitempty"` - Transient ResourceLimits `json:",omitempty"` + System *ResourceLimits `json:",omitempty"` + Transient *ResourceLimits `json:",omitempty"` // Limits that are applied to resources with an allowlisted multiaddr. // These will only be used if the normal System & Transient limits are // reached. - AllowlistedSystem ResourceLimits `json:",omitempty"` - AllowlistedTransient ResourceLimits `json:",omitempty"` + AllowlistedSystem *ResourceLimits `json:",omitempty"` + AllowlistedTransient *ResourceLimits `json:",omitempty"` - ServiceDefault ResourceLimits `json:",omitempty"` + ServiceDefault *ResourceLimits `json:",omitempty"` Service map[string]ResourceLimits `json:",omitempty"` - ServicePeerDefault ResourceLimits `json:",omitempty"` + ServicePeerDefault *ResourceLimits `json:",omitempty"` ServicePeer map[string]ResourceLimits `json:",omitempty"` - ProtocolDefault ResourceLimits `json:",omitempty"` + ProtocolDefault *ResourceLimits `json:",omitempty"` Protocol map[protocol.ID]ResourceLimits `json:",omitempty"` - ProtocolPeerDefault ResourceLimits `json:",omitempty"` + ProtocolPeerDefault *ResourceLimits `json:",omitempty"` ProtocolPeer map[protocol.ID]ResourceLimits `json:",omitempty"` - PeerDefault ResourceLimits `json:",omitempty"` + PeerDefault *ResourceLimits `json:",omitempty"` Peer map[peer.ID]ResourceLimits `json:",omitempty"` - Conn ResourceLimits `json:",omitempty"` - Stream ResourceLimits `json:",omitempty"` -} - -// FromReifiedLimitConfig converts a ReifiedLimitConfig to a LimitConfig. Uses the defaults config to know what was specifically set and what was left as default. -func FromReifiedLimitConfig(cfg ReifiedLimitConfig, defaults ReifiedLimitConfig) LimitConfig { - out := LimitConfig{} - - out.System = resourceLimitsFromBaseLimit(cfg.system, defaults.system) - out.Transient = resourceLimitsFromBaseLimit(cfg.transient, defaults.transient) - - out.AllowlistedSystem = resourceLimitsFromBaseLimit(cfg.allowlistedSystem, defaults.allowlistedSystem) - out.AllowlistedTransient = resourceLimitsFromBaseLimit(cfg.allowlistedTransient, defaults.allowlistedTransient) - - out.ServiceDefault = resourceLimitsFromBaseLimit(cfg.serviceDefault, defaults.serviceDefault) - out.Service = resourceLimitsMapFromBaseLimitMap(cfg.service, defaults.service, defaults.serviceDefault) - - out.ServicePeerDefault = resourceLimitsFromBaseLimit(cfg.servicePeerDefault, defaults.servicePeerDefault) - out.ServicePeer = resourceLimitsMapFromBaseLimitMap(cfg.servicePeer, defaults.servicePeer, defaults.servicePeerDefault) - - out.ProtocolDefault = resourceLimitsFromBaseLimit(cfg.protocolDefault, defaults.protocolDefault) - out.Protocol = resourceLimitsMapFromBaseLimitMap(cfg.protocol, defaults.protocol, defaults.protocolDefault) - - out.ProtocolPeerDefault = resourceLimitsFromBaseLimit(cfg.protocolPeerDefault, defaults.protocolPeerDefault) - out.ProtocolPeer = resourceLimitsMapFromBaseLimitMap(cfg.protocolPeer, defaults.protocolPeer, defaults.protocolPeerDefault) - - out.PeerDefault = resourceLimitsFromBaseLimit(cfg.peerDefault, defaults.peerDefault) - out.Peer = resourceLimitsMapFromBaseLimitMap(cfg.peer, defaults.peer, defaults.peerDefault) - - out.Conn = resourceLimitsFromBaseLimit(cfg.conn, defaults.conn) - out.Stream = resourceLimitsFromBaseLimit(cfg.stream, defaults.stream) - - return out + Conn *ResourceLimits `json:",omitempty"` + Stream *ResourceLimits `json:",omitempty"` } -func resourceLimitsMapFromBaseLimitMap[K comparable](m map[K]BaseLimit, defaultLimits map[K]BaseLimit, fallbackDefault BaseLimit) map[K]ResourceLimits { +func resourceLimitsMapFromBaseLimitMapWithDefaults[K comparable](m map[K]BaseLimit, defaultLimits map[K]BaseLimit, fallbackDefault BaseLimit) map[K]ResourceLimits { if len(m) == 0 { return nil } @@ -357,16 +329,16 @@ func resourceLimitsMapFromBaseLimitMap[K comparable](m map[K]BaseLimit, defaultL out := make(map[K]ResourceLimits, len(m)) for k, v := range m { if defaultForKey, ok := defaultLimits[k]; ok { - out[k] = resourceLimitsFromBaseLimit(v, defaultForKey) + out[k] = *resourceLimitsFromBaseLimit(v, defaultForKey) } else { - out[k] = resourceLimitsFromBaseLimit(v, fallbackDefault) + out[k] = *resourceLimitsFromBaseLimit(v, fallbackDefault) } } return out } -func resourceLimitsFromBaseLimit(l BaseLimit, defaultLimit BaseLimit) ResourceLimits { - return ResourceLimits{ +func resourceLimitsFromBaseLimit(l BaseLimit, defaultLimit BaseLimit) *ResourceLimits { + return &ResourceLimits{ Streams: limitValFromInt(l.Streams, defaultLimit.Streams), StreamsInbound: limitValFromInt(l.StreamsInbound, defaultLimit.StreamsInbound), StreamsOutbound: limitValFromInt(l.StreamsOutbound, defaultLimit.StreamsOutbound), @@ -417,11 +389,11 @@ func (cfg *LimitConfig) MarshalJSON() ([]byte, error) { }) } -func applyResourceLimitsMap[K comparable](this *map[K]ResourceLimits, other map[K]ResourceLimits, fallbackDefault ResourceLimits) { +func applyResourceLimitsMap[K comparable](this *map[K]ResourceLimits, other map[K]ResourceLimits, fallbackDefault *ResourceLimits) { for k, l := range *this { r := fallbackDefault if l2, ok := other[k]; ok { - r = l2 + r = &l2 } l.Apply(r) (*this)[k] = l @@ -533,6 +505,76 @@ type ReifiedLimitConfig struct { stream BaseLimit } +// ToLimitConfigWithDefaults converts a ReifiedLimitConfig to a LimitConfig. +// Uses the defaults config to know what was specifically set and what was left +// as default. Returns a minimal LimitConfig. Reify the returned LimitConfig +// with the defaults to get back to the original ReifiedLimitConfig. +func (cfg ReifiedLimitConfig) ToLimitConfigWithDefaults(defaults ReifiedLimitConfig) LimitConfig { + out := LimitConfig{} + + out.System = resourceLimitsFromBaseLimit(cfg.system, defaults.system) + out.Transient = resourceLimitsFromBaseLimit(cfg.transient, defaults.transient) + + out.AllowlistedSystem = resourceLimitsFromBaseLimit(cfg.allowlistedSystem, defaults.allowlistedSystem) + out.AllowlistedTransient = resourceLimitsFromBaseLimit(cfg.allowlistedTransient, defaults.allowlistedTransient) + + out.ServiceDefault = resourceLimitsFromBaseLimit(cfg.serviceDefault, defaults.serviceDefault) + out.Service = resourceLimitsMapFromBaseLimitMapWithDefaults(cfg.service, defaults.service, defaults.serviceDefault) + + out.ServicePeerDefault = resourceLimitsFromBaseLimit(cfg.servicePeerDefault, defaults.servicePeerDefault) + out.ServicePeer = resourceLimitsMapFromBaseLimitMapWithDefaults(cfg.servicePeer, defaults.servicePeer, defaults.servicePeerDefault) + + out.ProtocolDefault = resourceLimitsFromBaseLimit(cfg.protocolDefault, defaults.protocolDefault) + out.Protocol = resourceLimitsMapFromBaseLimitMapWithDefaults(cfg.protocol, defaults.protocol, defaults.protocolDefault) + + out.ProtocolPeerDefault = resourceLimitsFromBaseLimit(cfg.protocolPeerDefault, defaults.protocolPeerDefault) + out.ProtocolPeer = resourceLimitsMapFromBaseLimitMapWithDefaults(cfg.protocolPeer, defaults.protocolPeer, defaults.protocolPeerDefault) + + out.PeerDefault = resourceLimitsFromBaseLimit(cfg.peerDefault, defaults.peerDefault) + out.Peer = resourceLimitsMapFromBaseLimitMapWithDefaults(cfg.peer, defaults.peer, defaults.peerDefault) + + out.Conn = resourceLimitsFromBaseLimit(cfg.conn, defaults.conn) + out.Stream = resourceLimitsFromBaseLimit(cfg.stream, defaults.stream) + + return out +} + +func resourceLimitsMapFromBaseLimitMap[K comparable](baseLimitMap map[K]BaseLimit) map[K]ResourceLimits { + if baseLimitMap == nil { + return nil + } + + out := make(map[K]ResourceLimits) + for k, l := range baseLimitMap { + out[k] = *l.ToResourceLimits() + } + + return out +} + +// ToLimitConfig converts a ReifiedLimitConfig to a LimitConfig. The returned +// LimitConfig will have no default values. +func (cfg ReifiedLimitConfig) ToLimitConfig() LimitConfig { + return LimitConfig{ + System: cfg.system.ToResourceLimits(), + Transient: cfg.transient.ToResourceLimits(), + AllowlistedSystem: cfg.allowlistedSystem.ToResourceLimits(), + AllowlistedTransient: cfg.allowlistedTransient.ToResourceLimits(), + ServiceDefault: cfg.serviceDefault.ToResourceLimits(), + Service: resourceLimitsMapFromBaseLimitMap(cfg.service), + ServicePeerDefault: cfg.servicePeerDefault.ToResourceLimits(), + ServicePeer: resourceLimitsMapFromBaseLimitMap(cfg.servicePeer), + ProtocolDefault: cfg.protocolDefault.ToResourceLimits(), + Protocol: resourceLimitsMapFromBaseLimitMap(cfg.protocol), + ProtocolPeerDefault: cfg.protocolPeerDefault.ToResourceLimits(), + ProtocolPeer: resourceLimitsMapFromBaseLimitMap(cfg.protocolPeer), + PeerDefault: cfg.peerDefault.ToResourceLimits(), + Peer: resourceLimitsMapFromBaseLimitMap(cfg.peer), + Conn: cfg.conn.ToResourceLimits(), + Stream: cfg.stream.ToResourceLimits(), + } +} + // Scale scales up a limit configuration. // memory is the amount of memory that the stack is allowed to consume, // for a dedicated node it's recommended to use 1/8 of the installed system memory. diff --git a/p2p/host/resource-manager/limit_test.go b/p2p/host/resource-manager/limit_test.go index 3428309a0b..b00f1c219f 100644 --- a/p2p/host/resource-manager/limit_test.go +++ b/p2p/host/resource-manager/limit_test.go @@ -187,12 +187,12 @@ func TestJSONRoundTripInt64(t *testing.T) { func TestRoundTripFromReifyAndBack(t *testing.T) { l := LimitConfig{ - System: ResourceLimits{ + System: &ResourceLimits{ Conns: 1234, Memory: 54321, }, - ServiceDefault: ResourceLimits{ + ServiceDefault: &ResourceLimits{ Conns: 2, }, @@ -206,7 +206,7 @@ func TestRoundTripFromReifyAndBack(t *testing.T) { reified := l.Reify(InfiniteLimits) // Roundtrip - fromReified := FromReifiedLimitConfig(reified, InfiniteLimits) + fromReified := reified.ToLimitConfigWithDefaults(InfiniteLimits) require.Equal(t, l, fromReified) } diff --git a/p2p/test/resource-manager/rcmgr_test.go b/p2p/test/resource-manager/rcmgr_test.go index 60f223f2ec..8ee1af5a13 100644 --- a/p2p/test/resource-manager/rcmgr_test.go +++ b/p2p/test/resource-manager/rcmgr_test.go @@ -47,12 +47,13 @@ func TestResourceManagerConnInbound(t *testing.T) { // this test checks that we can not exceed the inbound conn limit at system level // we specify: 1 conn per peer, 3 conns total, and we try to create 4 conns cfg := rcmgr.LimitConfig{ - System: rcmgr.ResourceLimits{ - ConnsInbound: 3, - ConnsOutbound: 1024, - Conns: 1024, + System: &rcmgr.ResourceLimits{ + ConnsInbound: 3, + ConnsOutbound: 1024, + Conns: 1024, + StreamsOutbound: rcmgr.Unlimited, }, - PeerDefault: rcmgr.ResourceLimits{ + PeerDefault: &rcmgr.ResourceLimits{ ConnsInbound: 1, ConnsOutbound: 1, Conns: 1, @@ -88,12 +89,12 @@ func TestResourceManagerConnOutbound(t *testing.T) { // this test checks that we can not exceed the inbound conn limit at system level // we specify: 1 conn per peer, 3 conns total, and we try to create 4 conns cfg := rcmgr.LimitConfig{ - System: rcmgr.ResourceLimits{ + System: &rcmgr.ResourceLimits{ ConnsInbound: 1024, ConnsOutbound: 3, Conns: 1024, }, - PeerDefault: rcmgr.ResourceLimits{ + PeerDefault: &rcmgr.ResourceLimits{ ConnsInbound: 1, ConnsOutbound: 1, Conns: 1, @@ -128,7 +129,7 @@ func TestResourceManagerServiceInbound(t *testing.T) { // this test checks that we can not exceed the inbound stream limit at service level // we specify: 3 streams for the service, and we try to create 4 streams cfg := rcmgr.LimitConfig{ - ServiceDefault: rcmgr.ResourceLimits{ + ServiceDefault: &rcmgr.ResourceLimits{ StreamsInbound: 3, StreamsOutbound: 1024, Streams: 1024, @@ -289,3 +290,45 @@ func waitForChannel(ready chan struct{}, timeout time.Duration) func() error { } } } + +func TestReadmeLimitConfigSerialization(t *testing.T) { + // Start with the default scaling limits. + scalingLimits := rcmgr.DefaultLimits + + // Add limits around included libp2p protocols + libp2p.SetDefaultServiceLimits(&scalingLimits) + + // Turn the scaling limits into a reified set of limits using `.AutoScale`. This + // scales the limits proportional to your system memory. + scaledDefaultLimits := scalingLimits.AutoScale() + + // Tweak certain settings + cfg := rcmgr.LimitConfig{ + System: &rcmgr.ResourceLimits{ + // Allow unlimited outbound streams + StreamsOutbound: rcmgr.Unlimited, + }, + // Everything else is default. The exact values will come from `scaledDefaultLimits` above. + } + + // Create our limits by using our cfg and replacing the default values with values from `scaledDefaultLimits` + limits := cfg.Reify(scaledDefaultLimits) + + // The resource manager expects a limiter, se we create one from our limits. + limiter := rcmgr.NewFixedLimiter(limits) + + // (Optional if you want metrics) Construct the OpenCensus metrics reporter. + str, err := rcmgrObs.NewStatsTraceReporter() + if err != nil { + panic(err) + } + + // Initialize the resource manager + rm, err := rcmgr.NewResourceManager(limiter, rcmgr.WithTraceReporter(str)) + if err != nil { + panic(err) + } + + // Create a libp2p host + host, err := libp2p.New(libp2p.ResourceManager(rm)) +} From 421a07d1d9726db777c916833d1b9533403710b2 Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Thu, 19 Jan 2023 09:42:03 -0800 Subject: [PATCH 05/18] Reify a nil pointer with defaults --- p2p/host/resource-manager/limit_defaults.go | 4 ++++ p2p/test/resource-manager/rcmgr_test.go | 8 +++++++- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/p2p/host/resource-manager/limit_defaults.go b/p2p/host/resource-manager/limit_defaults.go index b899a47f05..a203bc5099 100644 --- a/p2p/host/resource-manager/limit_defaults.go +++ b/p2p/host/resource-manager/limit_defaults.go @@ -279,6 +279,10 @@ func (l *ResourceLimits) Apply(l2 *ResourceLimits) { } func (l *ResourceLimits) Reify(defaults BaseLimit) BaseLimit { + if l == nil { + return defaults + } + out := defaults out.Streams = l.Streams.Reify(defaults.Streams) out.StreamsInbound = l.StreamsInbound.Reify(defaults.StreamsInbound) diff --git a/p2p/test/resource-manager/rcmgr_test.go b/p2p/test/resource-manager/rcmgr_test.go index 8ee1af5a13..b56efcec1e 100644 --- a/p2p/test/resource-manager/rcmgr_test.go +++ b/p2p/test/resource-manager/rcmgr_test.go @@ -13,6 +13,7 @@ import ( "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager" + rcmgrObs "github.com/libp2p/go-libp2p/p2p/host/resource-manager/obs" "github.com/stretchr/testify/require" ) @@ -291,7 +292,7 @@ func waitForChannel(ready chan struct{}, timeout time.Duration) func() error { } } -func TestReadmeLimitConfigSerialization(t *testing.T) { +func TestReadmeExample(t *testing.T) { // Start with the default scaling limits. scalingLimits := rcmgr.DefaultLimits @@ -331,4 +332,9 @@ func TestReadmeLimitConfigSerialization(t *testing.T) { // Create a libp2p host host, err := libp2p.New(libp2p.ResourceManager(rm)) + if err != nil { + panic(err) + } + + host.Close() } From 75f2fc45295bf50ba9dfd4139ed74a723f85dd43 Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Thu, 19 Jan 2023 09:43:27 -0800 Subject: [PATCH 06/18] Fix maxints --- p2p/host/resource-manager/limit_defaults.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/p2p/host/resource-manager/limit_defaults.go b/p2p/host/resource-manager/limit_defaults.go index a203bc5099..d78b47ab59 100644 --- a/p2p/host/resource-manager/limit_defaults.go +++ b/p2p/host/resource-manager/limit_defaults.go @@ -157,7 +157,7 @@ func (l LimitVal) Reify(defaultVal int) int { return defaultVal } if l == Unlimited { - return math.MaxInt32 + return math.MaxInt } if l == BlockAllLimit { return 0 @@ -227,7 +227,7 @@ func (l LimitVal64) Reify(defaultVal int64) int64 { return defaultVal } if l == Unlimited64 { - return math.MaxInt32 + return math.MaxInt64 } if l == BlockAllLimit64 { return 0 From e3999f15d8b3a7c036cf97fe860e29f43e030728 Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Thu, 19 Jan 2023 09:47:42 -0800 Subject: [PATCH 07/18] Move helper --- p2p/host/resource-manager/limit.go | 36 ++++++++++++ p2p/host/resource-manager/limit_defaults.go | 61 +++++---------------- 2 files changed, 49 insertions(+), 48 deletions(-) diff --git a/p2p/host/resource-manager/limit.go b/p2p/host/resource-manager/limit.go index 3a2178bc9d..c77dee5f4d 100644 --- a/p2p/host/resource-manager/limit.go +++ b/p2p/host/resource-manager/limit.go @@ -12,6 +12,7 @@ package rcmgr import ( "encoding/json" "io" + "math" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" @@ -109,6 +110,28 @@ func valueOrBlockAll64(n int64) LimitVal64 { return LimitVal64(n) } +func limitValFromInt(i int, defaultVal int) LimitVal { + if i == defaultVal { + return DefaultLimit + } else if i == math.MaxInt { + return Unlimited + } else if i == 0 { + return BlockAllLimit + } + return LimitVal(i) +} + +func limitValFromInt64(i int64, defaultVal int64) LimitVal64 { + if i == defaultVal { + return DefaultLimit64 + } else if i == math.MaxInt { + return Unlimited64 + } else if i == 0 { + return BlockAllLimit64 + } + return LimitVal64(i) +} + // ToResourceLimits converts the BaseLimit to a ResourceLimits func (l BaseLimit) ToResourceLimits() *ResourceLimits { return &ResourceLimits{ @@ -123,6 +146,19 @@ func (l BaseLimit) ToResourceLimits() *ResourceLimits { } } +func (l BaseLimit) ToResourceLimitsWithDefault(defaultLimit BaseLimit) *ResourceLimits { + return &ResourceLimits{ + Streams: limitValFromInt(l.Streams, defaultLimit.Streams), + StreamsInbound: limitValFromInt(l.StreamsInbound, defaultLimit.StreamsInbound), + StreamsOutbound: limitValFromInt(l.StreamsOutbound, defaultLimit.StreamsOutbound), + Conns: limitValFromInt(l.Conns, defaultLimit.Conns), + ConnsInbound: limitValFromInt(l.ConnsInbound, defaultLimit.ConnsInbound), + ConnsOutbound: limitValFromInt(l.ConnsOutbound, defaultLimit.ConnsOutbound), + FD: limitValFromInt(l.FD, defaultLimit.FD), + Memory: limitValFromInt64(l.Memory, defaultLimit.Memory), + } +} + // Apply overwrites all zero-valued limits with the values of l2 // Must not use a pointer receiver. func (l *BaseLimit) Apply(l2 BaseLimit) { diff --git a/p2p/host/resource-manager/limit_defaults.go b/p2p/host/resource-manager/limit_defaults.go index d78b47ab59..cb1fd2de85 100644 --- a/p2p/host/resource-manager/limit_defaults.go +++ b/p2p/host/resource-manager/limit_defaults.go @@ -333,49 +333,14 @@ func resourceLimitsMapFromBaseLimitMapWithDefaults[K comparable](m map[K]BaseLim out := make(map[K]ResourceLimits, len(m)) for k, v := range m { if defaultForKey, ok := defaultLimits[k]; ok { - out[k] = *resourceLimitsFromBaseLimit(v, defaultForKey) + out[k] = *v.ToResourceLimitsWithDefault(defaultForKey) } else { - out[k] = *resourceLimitsFromBaseLimit(v, fallbackDefault) + out[k] = *v.ToResourceLimitsWithDefault(fallbackDefault) } } return out } -func resourceLimitsFromBaseLimit(l BaseLimit, defaultLimit BaseLimit) *ResourceLimits { - return &ResourceLimits{ - Streams: limitValFromInt(l.Streams, defaultLimit.Streams), - StreamsInbound: limitValFromInt(l.StreamsInbound, defaultLimit.StreamsInbound), - StreamsOutbound: limitValFromInt(l.StreamsOutbound, defaultLimit.StreamsOutbound), - Conns: limitValFromInt(l.Conns, defaultLimit.Conns), - ConnsInbound: limitValFromInt(l.ConnsInbound, defaultLimit.ConnsInbound), - ConnsOutbound: limitValFromInt(l.ConnsOutbound, defaultLimit.ConnsOutbound), - FD: limitValFromInt(l.FD, defaultLimit.FD), - Memory: limitValFromInt64(l.Memory, defaultLimit.Memory), - } -} - -func limitValFromInt(i int, defaultVal int) LimitVal { - if i == defaultVal { - return DefaultLimit - } else if i == math.MaxInt { - return Unlimited - } else if i == 0 { - return BlockAllLimit - } - return LimitVal(i) -} - -func limitValFromInt64(i int64, defaultVal int64) LimitVal64 { - if i == defaultVal { - return DefaultLimit64 - } else if i == math.MaxInt { - return Unlimited64 - } else if i == 0 { - return BlockAllLimit64 - } - return LimitVal64(i) -} - func (cfg *LimitConfig) MarshalJSON() ([]byte, error) { // we want to marshal the encoded peer id encodedPeerMap := make(map[string]ResourceLimits, len(cfg.Peer)) @@ -516,29 +481,29 @@ type ReifiedLimitConfig struct { func (cfg ReifiedLimitConfig) ToLimitConfigWithDefaults(defaults ReifiedLimitConfig) LimitConfig { out := LimitConfig{} - out.System = resourceLimitsFromBaseLimit(cfg.system, defaults.system) - out.Transient = resourceLimitsFromBaseLimit(cfg.transient, defaults.transient) + out.System = cfg.system.ToResourceLimitsWithDefault(defaults.system) + out.Transient = cfg.transient.ToResourceLimitsWithDefault(defaults.transient) - out.AllowlistedSystem = resourceLimitsFromBaseLimit(cfg.allowlistedSystem, defaults.allowlistedSystem) - out.AllowlistedTransient = resourceLimitsFromBaseLimit(cfg.allowlistedTransient, defaults.allowlistedTransient) + out.AllowlistedSystem = cfg.allowlistedSystem.ToResourceLimitsWithDefault(defaults.allowlistedSystem) + out.AllowlistedTransient = cfg.allowlistedTransient.ToResourceLimitsWithDefault(defaults.allowlistedTransient) - out.ServiceDefault = resourceLimitsFromBaseLimit(cfg.serviceDefault, defaults.serviceDefault) + out.ServiceDefault = cfg.serviceDefault.ToResourceLimitsWithDefault(defaults.serviceDefault) out.Service = resourceLimitsMapFromBaseLimitMapWithDefaults(cfg.service, defaults.service, defaults.serviceDefault) - out.ServicePeerDefault = resourceLimitsFromBaseLimit(cfg.servicePeerDefault, defaults.servicePeerDefault) + out.ServicePeerDefault = cfg.servicePeerDefault.ToResourceLimitsWithDefault(defaults.servicePeerDefault) out.ServicePeer = resourceLimitsMapFromBaseLimitMapWithDefaults(cfg.servicePeer, defaults.servicePeer, defaults.servicePeerDefault) - out.ProtocolDefault = resourceLimitsFromBaseLimit(cfg.protocolDefault, defaults.protocolDefault) + out.ProtocolDefault = cfg.protocolDefault.ToResourceLimitsWithDefault(defaults.protocolDefault) out.Protocol = resourceLimitsMapFromBaseLimitMapWithDefaults(cfg.protocol, defaults.protocol, defaults.protocolDefault) - out.ProtocolPeerDefault = resourceLimitsFromBaseLimit(cfg.protocolPeerDefault, defaults.protocolPeerDefault) + out.ProtocolPeerDefault = cfg.protocolPeerDefault.ToResourceLimitsWithDefault(defaults.protocolPeerDefault) out.ProtocolPeer = resourceLimitsMapFromBaseLimitMapWithDefaults(cfg.protocolPeer, defaults.protocolPeer, defaults.protocolPeerDefault) - out.PeerDefault = resourceLimitsFromBaseLimit(cfg.peerDefault, defaults.peerDefault) + out.PeerDefault = cfg.peerDefault.ToResourceLimitsWithDefault(defaults.peerDefault) out.Peer = resourceLimitsMapFromBaseLimitMapWithDefaults(cfg.peer, defaults.peer, defaults.peerDefault) - out.Conn = resourceLimitsFromBaseLimit(cfg.conn, defaults.conn) - out.Stream = resourceLimitsFromBaseLimit(cfg.stream, defaults.stream) + out.Conn = cfg.conn.ToResourceLimitsWithDefault(defaults.conn) + out.Stream = cfg.stream.ToResourceLimitsWithDefault(defaults.stream) return out } From 88df42eb485f4c44ffdd8a040c724ec6d5f9caa2 Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Fri, 20 Jan 2023 01:02:52 +0000 Subject: [PATCH 08/18] Return nil if everything is default --- p2p/host/resource-manager/limit.go | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/p2p/host/resource-manager/limit.go b/p2p/host/resource-manager/limit.go index c77dee5f4d..9e97769deb 100644 --- a/p2p/host/resource-manager/limit.go +++ b/p2p/host/resource-manager/limit.go @@ -147,7 +147,7 @@ func (l BaseLimit) ToResourceLimits() *ResourceLimits { } func (l BaseLimit) ToResourceLimitsWithDefault(defaultLimit BaseLimit) *ResourceLimits { - return &ResourceLimits{ + out := ResourceLimits{ Streams: limitValFromInt(l.Streams, defaultLimit.Streams), StreamsInbound: limitValFromInt(l.StreamsInbound, defaultLimit.StreamsInbound), StreamsOutbound: limitValFromInt(l.StreamsOutbound, defaultLimit.StreamsOutbound), @@ -157,6 +157,19 @@ func (l BaseLimit) ToResourceLimitsWithDefault(defaultLimit BaseLimit) *Resource FD: limitValFromInt(l.FD, defaultLimit.FD), Memory: limitValFromInt64(l.Memory, defaultLimit.Memory), } + + if out.Streams == DefaultLimit && + out.StreamsInbound == DefaultLimit && + out.StreamsOutbound == DefaultLimit && + out.Conns == DefaultLimit && + out.ConnsInbound == DefaultLimit && + out.ConnsOutbound == DefaultLimit && + out.FD == DefaultLimit && + out.Memory == DefaultLimit64 { + return nil + } + + return &out } // Apply overwrites all zero-valued limits with the values of l2 From 7e524724d312dfdc480aa78264f0ab7eea0d240e Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Fri, 20 Jan 2023 16:17:06 +0000 Subject: [PATCH 09/18] Check for nil pointer --- p2p/host/resource-manager/limit_defaults.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/p2p/host/resource-manager/limit_defaults.go b/p2p/host/resource-manager/limit_defaults.go index cb1fd2de85..6eed4e9335 100644 --- a/p2p/host/resource-manager/limit_defaults.go +++ b/p2p/host/resource-manager/limit_defaults.go @@ -332,10 +332,13 @@ func resourceLimitsMapFromBaseLimitMapWithDefaults[K comparable](m map[K]BaseLim out := make(map[K]ResourceLimits, len(m)) for k, v := range m { + def := fallbackDefault if defaultForKey, ok := defaultLimits[k]; ok { - out[k] = *v.ToResourceLimitsWithDefault(defaultForKey) - } else { - out[k] = *v.ToResourceLimitsWithDefault(fallbackDefault) + def = defaultForKey + } + rl := v.ToResourceLimitsWithDefault(def) + if rl != nil { + out[k] = *rl } } return out From 7e9135ec00970fde616ab384cc2143d609356e4f Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Fri, 20 Jan 2023 12:38:43 -0800 Subject: [PATCH 10/18] Change terms --- p2p/host/resource-manager/README.md | 12 +- p2p/host/resource-manager/limit.go | 14 +-- .../resource-manager/limit_config_test.go | 6 +- p2p/host/resource-manager/limit_defaults.go | 104 +++++++++--------- p2p/host/resource-manager/limit_test.go | 6 +- p2p/host/resource-manager/rcmgr_test.go | 2 +- p2p/test/resource-manager/rcmgr_test.go | 18 +-- 7 files changed, 81 insertions(+), 81 deletions(-) diff --git a/p2p/host/resource-manager/README.md b/p2p/host/resource-manager/README.md index cb798252e7..3ec9d6a208 100644 --- a/p2p/host/resource-manager/README.md +++ b/p2p/host/resource-manager/README.md @@ -33,7 +33,7 @@ libp2p.SetDefaultServiceLimits(&scalingLimits) scaledDefaultLimits := scalingLimits.AutoScale() // Tweak certain settings -cfg := rcmgr.LimitConfig{ +cfg := rcmgr.PartialLimitConfig{ System: &rcmgr.ResourceLimits{ // Allow unlimited outbound streams StreamsOutbound: rcmgr.Unlimited, @@ -42,7 +42,7 @@ cfg := rcmgr.LimitConfig{ } // Create our limits by using our cfg and replacing the default values with values from `scaledDefaultLimits` -limits := cfg.Reify(scaledDefaultLimits) +limits := cfg.Build(scaledDefaultLimits) // The resource manager expects a limiter, se we create one from our limits. limiter := rcmgr.NewFixedLimiter(limits) @@ -64,12 +64,12 @@ host, err := libp2p.New(libp2p.ResourceManager(rm)) ``` ### Saving the limits config -The easiest way to save the defined limits is to serialize the `LimitConfig` +The easiest way to save the defined limits is to serialize the `PartialLimitConfig` type as JSON. ```go noisyNeighbor, _ := peer.Decode("QmVvtzcZgCkMnSFf2dnrBPXrWuNFWNM9J3MpZQCvWPuVZf") -cfg := rcmgr.LimitConfig{ +cfg := rcmgr.PartialLimitConfig{ System: &rcmgr.ResourceLimits{ // Allow unlimited outbound streams StreamsOutbound: rcmgr.Unlimited, @@ -338,7 +338,7 @@ This is done using the `ScalingLimitConfig`. For every scope, this configuration struct defines the absolutely bare minimum limits, and an (optional) increase of these limits, which will be applied on nodes that have sufficient memory. -A `ScalingLimitConfig` can be converted into a `LimitConfig` (which can then be +A `ScalingLimitConfig` can be converted into a `ConcreteLimitConfig` (which can then be used to initialize a fixed limiter with `NewFixedLimiter`) by calling the `Scale` method. The `Scale` method takes two parameters: the amount of memory and the number of file descriptors that an application is willing to dedicate to libp2p. @@ -406,7 +406,7 @@ go-libp2p process. For the default definitions see [`DefaultLimits` and If the defaults seem mostly okay, but you want to adjust one facet you can simply copy the default struct object and update the field you want to change. You can -apply changes to a `BaseLimit`, `BaseLimitIncrease`, and `LimitConfig` with +apply changes to a `BaseLimit`, `BaseLimitIncrease`, and `ConcreteLimitConfig` with `.Apply`. Example diff --git a/p2p/host/resource-manager/limit.go b/p2p/host/resource-manager/limit.go index 9e97769deb..807dcac2df 100644 --- a/p2p/host/resource-manager/limit.go +++ b/p2p/host/resource-manager/limit.go @@ -57,7 +57,7 @@ func NewDefaultLimiterFromJSON(in io.Reader) (Limiter, error) { } // NewLimiterFromJSON creates a new limiter by parsing a json configuration. -func NewLimiterFromJSON(in io.Reader, defaults ReifiedLimitConfig) (Limiter, error) { +func NewLimiterFromJSON(in io.Reader, defaults ConcreteLimitConfig) (Limiter, error) { cfg, err := readLimiterConfigFromJSON(in, defaults) if err != nil { return nil, err @@ -65,22 +65,22 @@ func NewLimiterFromJSON(in io.Reader, defaults ReifiedLimitConfig) (Limiter, err return &fixedLimiter{cfg}, nil } -func readLimiterConfigFromJSON(in io.Reader, defaults ReifiedLimitConfig) (ReifiedLimitConfig, error) { - var cfg LimitConfig +func readLimiterConfigFromJSON(in io.Reader, defaults ConcreteLimitConfig) (ConcreteLimitConfig, error) { + var cfg PartialLimitConfig if err := json.NewDecoder(in).Decode(&cfg); err != nil { - return ReifiedLimitConfig{}, err + return ConcreteLimitConfig{}, err } - return cfg.Reify(defaults), nil + return cfg.Build(defaults), nil } // fixedLimiter is a limiter with fixed limits. type fixedLimiter struct { - ReifiedLimitConfig + ConcreteLimitConfig } var _ Limiter = (*fixedLimiter)(nil) -func NewFixedLimiter(conf ReifiedLimitConfig) Limiter { +func NewFixedLimiter(conf ConcreteLimitConfig) Limiter { log.Debugw("initializing new limiter with config", "limits", conf) return &fixedLimiter{conf} } diff --git a/p2p/host/resource-manager/limit_config_test.go b/p2p/host/resource-manager/limit_config_test.go index 43f77bcf6f..67a52b7eae 100644 --- a/p2p/host/resource-manager/limit_config_test.go +++ b/p2p/host/resource-manager/limit_config_test.go @@ -103,7 +103,7 @@ func TestLimitConfigParser(t *testing.T) { } func TestLimitConfigRoundTrip(t *testing.T) { - // Tests that we can roundtrip a LimitConfig to a ReifiedLimitConfig and back. + // Tests that we can roundtrip a PartialLimitConfig to a ConcreteLimitConfig and back. in, err := os.Open("limit_config_test.json") require.NoError(t, err) defer in.Close() @@ -119,13 +119,13 @@ func TestLimitConfigRoundTrip(t *testing.T) { // Using InfiniteLimits because it's different then the defaults used above. // If anything was marked "default" in the round trip, it would show up as a // difference here. - reifiedCfgRT := limitConfig.Reify(InfiniteLimits) + reifiedCfgRT := limitConfig.Build(InfiniteLimits) require.Equal(t, reifiedCfg, reifiedCfgRT) } func TestReadmeLimitConfigSerialization(t *testing.T) { noisyNeighbor, _ := peer.Decode("QmVvtzcZgCkMnSFf2dnrBPXrWuNFWNM9J3MpZQCvWPuVZf") - cfg := LimitConfig{ + cfg := PartialLimitConfig{ System: &ResourceLimits{ // Allow unlimited outbound streams StreamsOutbound: Unlimited, diff --git a/p2p/host/resource-manager/limit_defaults.go b/p2p/host/resource-manager/limit_defaults.go index 6eed4e9335..70cf076360 100644 --- a/p2p/host/resource-manager/limit_defaults.go +++ b/p2p/host/resource-manager/limit_defaults.go @@ -152,7 +152,7 @@ func (l *LimitVal) UnmarshalJSON(b []byte) error { return nil } -func (l LimitVal) Reify(defaultVal int) int { +func (l LimitVal) Build(defaultVal int) int { if l == DefaultLimit { return defaultVal } @@ -222,7 +222,7 @@ func (l *LimitVal64) UnmarshalJSON(b []byte) error { return nil } -func (l LimitVal64) Reify(defaultVal int64) int64 { +func (l LimitVal64) Build(defaultVal int64) int64 { if l == DefaultLimit64 { return defaultVal } @@ -278,25 +278,25 @@ func (l *ResourceLimits) Apply(l2 *ResourceLimits) { } } -func (l *ResourceLimits) Reify(defaults BaseLimit) BaseLimit { +func (l *ResourceLimits) Build(defaults BaseLimit) BaseLimit { if l == nil { return defaults } out := defaults - out.Streams = l.Streams.Reify(defaults.Streams) - out.StreamsInbound = l.StreamsInbound.Reify(defaults.StreamsInbound) - out.StreamsOutbound = l.StreamsOutbound.Reify(defaults.StreamsOutbound) - out.Conns = l.Conns.Reify(defaults.Conns) - out.ConnsInbound = l.ConnsInbound.Reify(defaults.ConnsInbound) - out.ConnsOutbound = l.ConnsOutbound.Reify(defaults.ConnsOutbound) - out.FD = l.FD.Reify(defaults.FD) - out.Memory = l.Memory.Reify(defaults.Memory) + out.Streams = l.Streams.Build(defaults.Streams) + out.StreamsInbound = l.StreamsInbound.Build(defaults.StreamsInbound) + out.StreamsOutbound = l.StreamsOutbound.Build(defaults.StreamsOutbound) + out.Conns = l.Conns.Build(defaults.Conns) + out.ConnsInbound = l.ConnsInbound.Build(defaults.ConnsInbound) + out.ConnsOutbound = l.ConnsOutbound.Build(defaults.ConnsOutbound) + out.FD = l.FD.Build(defaults.FD) + out.Memory = l.Memory.Build(defaults.Memory) return out } -type LimitConfig struct { +type PartialLimitConfig struct { System *ResourceLimits `json:",omitempty"` Transient *ResourceLimits `json:",omitempty"` @@ -344,14 +344,14 @@ func resourceLimitsMapFromBaseLimitMapWithDefaults[K comparable](m map[K]BaseLim return out } -func (cfg *LimitConfig) MarshalJSON() ([]byte, error) { +func (cfg *PartialLimitConfig) MarshalJSON() ([]byte, error) { // we want to marshal the encoded peer id encodedPeerMap := make(map[string]ResourceLimits, len(cfg.Peer)) for p, v := range cfg.Peer { encodedPeerMap[p.String()] = v } - type Alias LimitConfig + type Alias PartialLimitConfig return json.Marshal(&struct { *Alias Peer map[string]ResourceLimits `json:",omitempty"` @@ -380,7 +380,7 @@ func applyResourceLimitsMap[K comparable](this *map[K]ResourceLimits, other map[ } } -func (cfg *LimitConfig) Apply(c LimitConfig) { +func (cfg *PartialLimitConfig) Apply(c PartialLimitConfig) { cfg.System.Apply(c.System) cfg.Transient.Apply(c.Transient) cfg.AllowlistedSystem.Apply(c.AllowlistedSystem) @@ -400,31 +400,31 @@ func (cfg *LimitConfig) Apply(c LimitConfig) { applyResourceLimitsMap(&cfg.Peer, c.Peer, cfg.PeerDefault) } -func (cfg LimitConfig) Reify(defaults ReifiedLimitConfig) ReifiedLimitConfig { +func (cfg PartialLimitConfig) Build(defaults ConcreteLimitConfig) ConcreteLimitConfig { out := defaults - out.system = cfg.System.Reify(defaults.system) - out.transient = cfg.Transient.Reify(defaults.transient) - out.allowlistedSystem = cfg.AllowlistedSystem.Reify(defaults.allowlistedSystem) - out.allowlistedTransient = cfg.AllowlistedTransient.Reify(defaults.allowlistedTransient) - out.serviceDefault = cfg.ServiceDefault.Reify(defaults.serviceDefault) - out.servicePeerDefault = cfg.ServicePeerDefault.Reify(defaults.servicePeerDefault) - out.protocolDefault = cfg.ProtocolDefault.Reify(defaults.protocolDefault) - out.protocolPeerDefault = cfg.ProtocolPeerDefault.Reify(defaults.protocolPeerDefault) - out.peerDefault = cfg.PeerDefault.Reify(defaults.peerDefault) - out.conn = cfg.Conn.Reify(defaults.conn) - out.stream = cfg.Stream.Reify(defaults.stream) - - out.service = reifyMapWithDefault(cfg.Service, defaults.service, out.serviceDefault) - out.servicePeer = reifyMapWithDefault(cfg.ServicePeer, defaults.servicePeer, out.servicePeerDefault) - out.protocol = reifyMapWithDefault(cfg.Protocol, defaults.protocol, out.protocolDefault) - out.protocolPeer = reifyMapWithDefault(cfg.ProtocolPeer, defaults.protocolPeer, out.protocolPeerDefault) - out.peer = reifyMapWithDefault(cfg.Peer, defaults.peer, out.peerDefault) + out.system = cfg.System.Build(defaults.system) + out.transient = cfg.Transient.Build(defaults.transient) + out.allowlistedSystem = cfg.AllowlistedSystem.Build(defaults.allowlistedSystem) + out.allowlistedTransient = cfg.AllowlistedTransient.Build(defaults.allowlistedTransient) + out.serviceDefault = cfg.ServiceDefault.Build(defaults.serviceDefault) + out.servicePeerDefault = cfg.ServicePeerDefault.Build(defaults.servicePeerDefault) + out.protocolDefault = cfg.ProtocolDefault.Build(defaults.protocolDefault) + out.protocolPeerDefault = cfg.ProtocolPeerDefault.Build(defaults.protocolPeerDefault) + out.peerDefault = cfg.PeerDefault.Build(defaults.peerDefault) + out.conn = cfg.Conn.Build(defaults.conn) + out.stream = cfg.Stream.Build(defaults.stream) + + out.service = buildMapWithDefault(cfg.Service, defaults.service, out.serviceDefault) + out.servicePeer = buildMapWithDefault(cfg.ServicePeer, defaults.servicePeer, out.servicePeerDefault) + out.protocol = buildMapWithDefault(cfg.Protocol, defaults.protocol, out.protocolDefault) + out.protocolPeer = buildMapWithDefault(cfg.ProtocolPeer, defaults.protocolPeer, out.protocolPeerDefault) + out.peer = buildMapWithDefault(cfg.Peer, defaults.peer, out.peerDefault) return out } -func reifyMapWithDefault[K comparable](definedLimits map[K]ResourceLimits, defaults map[K]BaseLimit, fallbackDefault BaseLimit) map[K]BaseLimit { +func buildMapWithDefault[K comparable](definedLimits map[K]ResourceLimits, defaults map[K]BaseLimit, fallbackDefault BaseLimit) map[K]BaseLimit { if definedLimits == nil && defaults == nil { return nil } @@ -436,19 +436,19 @@ func reifyMapWithDefault[K comparable](definedLimits map[K]ResourceLimits, defau for k, l := range definedLimits { if defaultForKey, ok := out[k]; ok { - out[k] = l.Reify(defaultForKey) + out[k] = l.Build(defaultForKey) } else { - out[k] = l.Reify(fallbackDefault) + out[k] = l.Build(fallbackDefault) } } return out } -// ReifiedLimitConfig is similar to LimitConfig, but all values are defined. +// ConcreteLimitConfig is similar to PartialLimitConfig, but all values are defined. // There is no unset "default" value. Commonly constructed by calling -// LimitConfig.Reify(rcmgr.DefaultLimits.AutoScale()) -type ReifiedLimitConfig struct { +// PartialLimitConfig.Build(rcmgr.DefaultLimits.AutoScale()) +type ConcreteLimitConfig struct { system BaseLimit transient BaseLimit @@ -477,12 +477,12 @@ type ReifiedLimitConfig struct { stream BaseLimit } -// ToLimitConfigWithDefaults converts a ReifiedLimitConfig to a LimitConfig. +// ToLimitConfigWithDefaults converts a ConcreteLimitConfig to a PartialLimitConfig. // Uses the defaults config to know what was specifically set and what was left -// as default. Returns a minimal LimitConfig. Reify the returned LimitConfig -// with the defaults to get back to the original ReifiedLimitConfig. -func (cfg ReifiedLimitConfig) ToLimitConfigWithDefaults(defaults ReifiedLimitConfig) LimitConfig { - out := LimitConfig{} +// as default. Returns a minimal PartialLimitConfig. Build the returned PartialLimitConfig +// with the defaults to get back to the original ConcreteLimitConfig. +func (cfg ConcreteLimitConfig) ToLimitConfigWithDefaults(defaults ConcreteLimitConfig) PartialLimitConfig { + out := PartialLimitConfig{} out.System = cfg.system.ToResourceLimitsWithDefault(defaults.system) out.Transient = cfg.transient.ToResourceLimitsWithDefault(defaults.transient) @@ -524,10 +524,10 @@ func resourceLimitsMapFromBaseLimitMap[K comparable](baseLimitMap map[K]BaseLimi return out } -// ToLimitConfig converts a ReifiedLimitConfig to a LimitConfig. The returned -// LimitConfig will have no default values. -func (cfg ReifiedLimitConfig) ToLimitConfig() LimitConfig { - return LimitConfig{ +// ToLimitConfig converts a ReifiedLimitConfig to a PartialLimitConfig. The returned +// PartialLimitConfig will have no default values. +func (cfg ConcreteLimitConfig) ToLimitConfig() PartialLimitConfig { + return PartialLimitConfig{ System: cfg.system.ToResourceLimits(), Transient: cfg.transient.ToResourceLimits(), AllowlistedSystem: cfg.allowlistedSystem.ToResourceLimits(), @@ -551,8 +551,8 @@ func (cfg ReifiedLimitConfig) ToLimitConfig() LimitConfig { // memory is the amount of memory that the stack is allowed to consume, // for a dedicated node it's recommended to use 1/8 of the installed system memory. // If memory is smaller than 128 MB, the base configuration will be used. -func (cfg *ScalingLimitConfig) Scale(memory int64, numFD int) ReifiedLimitConfig { - lc := ReifiedLimitConfig{ +func (cfg *ScalingLimitConfig) Scale(memory int64, numFD int) ConcreteLimitConfig { + lc := ConcreteLimitConfig{ system: scale(cfg.SystemBaseLimit, cfg.SystemLimitIncrease, memory, numFD), transient: scale(cfg.TransientBaseLimit, cfg.TransientLimitIncrease, memory, numFD), allowlistedSystem: scale(cfg.AllowlistedSystemBaseLimit, cfg.AllowlistedSystemLimitIncrease, memory, numFD), @@ -598,7 +598,7 @@ func (cfg *ScalingLimitConfig) Scale(memory int64, numFD int) ReifiedLimitConfig return lc } -func (cfg *ScalingLimitConfig) AutoScale() ReifiedLimitConfig { +func (cfg *ScalingLimitConfig) AutoScale() ConcreteLimitConfig { return cfg.Scale( int64(memory.TotalMemory())/8, getNumFDs()/2, @@ -835,7 +835,7 @@ var infiniteBaseLimit = BaseLimit{ // InfiniteLimits are a limiter configuration that uses unlimited limits, thus effectively not limiting anything. // Keep in mind that the operating system limits the number of file descriptors that an application can use. -var InfiniteLimits = ReifiedLimitConfig{ +var InfiniteLimits = ConcreteLimitConfig{ system: infiniteBaseLimit, transient: infiniteBaseLimit, allowlistedSystem: infiniteBaseLimit, diff --git a/p2p/host/resource-manager/limit_test.go b/p2p/host/resource-manager/limit_test.go index b00f1c219f..3dd3304acf 100644 --- a/p2p/host/resource-manager/limit_test.go +++ b/p2p/host/resource-manager/limit_test.go @@ -185,8 +185,8 @@ func TestJSONRoundTripInt64(t *testing.T) { require.Equal(t, bl, blDecoded) } -func TestRoundTripFromReifyAndBack(t *testing.T) { - l := LimitConfig{ +func TestRoundTripFromConcreteAndBack(t *testing.T) { + l := PartialLimitConfig{ System: &ResourceLimits{ Conns: 1234, Memory: 54321, @@ -203,7 +203,7 @@ func TestRoundTripFromReifyAndBack(t *testing.T) { }, } - reified := l.Reify(InfiniteLimits) + reified := l.Build(InfiniteLimits) // Roundtrip fromReified := reified.ToLimitConfigWithDefaults(InfiniteLimits) diff --git a/p2p/host/resource-manager/rcmgr_test.go b/p2p/host/resource-manager/rcmgr_test.go index 7e8e6ac8dc..61d53bebb3 100644 --- a/p2p/host/resource-manager/rcmgr_test.go +++ b/p2p/host/resource-manager/rcmgr_test.go @@ -21,7 +21,7 @@ func TestResourceManager(t *testing.T) { svcA := "A.svc" svcB := "B.svc" nmgr, err := NewResourceManager( - NewFixedLimiter(ReifiedLimitConfig{ + NewFixedLimiter(ConcreteLimitConfig{ system: BaseLimit{ Memory: 16384, StreamsInbound: 3, diff --git a/p2p/test/resource-manager/rcmgr_test.go b/p2p/test/resource-manager/rcmgr_test.go index b56efcec1e..d3f66bd5f3 100644 --- a/p2p/test/resource-manager/rcmgr_test.go +++ b/p2p/test/resource-manager/rcmgr_test.go @@ -18,7 +18,7 @@ import ( "github.com/stretchr/testify/require" ) -func makeRcmgrOption(t *testing.T, cfg rcmgr.ReifiedLimitConfig) func(int) libp2p.Option { +func makeRcmgrOption(t *testing.T, cfg rcmgr.ConcreteLimitConfig) func(int) libp2p.Option { return func(i int) libp2p.Option { var opts []rcmgr.Option if os.Getenv("LIBP2P_TEST_RCMGR_TRACE") == "1" { @@ -47,7 +47,7 @@ func waitForConnection(t *testing.T, src, dest *Echo) { func TestResourceManagerConnInbound(t *testing.T) { // this test checks that we can not exceed the inbound conn limit at system level // we specify: 1 conn per peer, 3 conns total, and we try to create 4 conns - cfg := rcmgr.LimitConfig{ + cfg := rcmgr.PartialLimitConfig{ System: &rcmgr.ResourceLimits{ ConnsInbound: 3, ConnsOutbound: 1024, @@ -59,7 +59,7 @@ func TestResourceManagerConnInbound(t *testing.T) { ConnsOutbound: 1, Conns: 1, }, - }.Reify(rcmgr.DefaultLimits.AutoScale()) + }.Build(rcmgr.DefaultLimits.AutoScale()) echos := createEchos(t, 5, makeRcmgrOption(t, cfg)) defer closeEchos(echos) @@ -89,7 +89,7 @@ func TestResourceManagerConnInbound(t *testing.T) { func TestResourceManagerConnOutbound(t *testing.T) { // this test checks that we can not exceed the inbound conn limit at system level // we specify: 1 conn per peer, 3 conns total, and we try to create 4 conns - cfg := rcmgr.LimitConfig{ + cfg := rcmgr.PartialLimitConfig{ System: &rcmgr.ResourceLimits{ ConnsInbound: 1024, ConnsOutbound: 3, @@ -100,7 +100,7 @@ func TestResourceManagerConnOutbound(t *testing.T) { ConnsOutbound: 1, Conns: 1, }, - }.Reify(rcmgr.DefaultLimits.AutoScale()) + }.Build(rcmgr.DefaultLimits.AutoScale()) echos := createEchos(t, 5, makeRcmgrOption(t, cfg)) defer closeEchos(echos) defer closeRcmgrs(echos) @@ -129,13 +129,13 @@ func TestResourceManagerConnOutbound(t *testing.T) { func TestResourceManagerServiceInbound(t *testing.T) { // this test checks that we can not exceed the inbound stream limit at service level // we specify: 3 streams for the service, and we try to create 4 streams - cfg := rcmgr.LimitConfig{ + cfg := rcmgr.PartialLimitConfig{ ServiceDefault: &rcmgr.ResourceLimits{ StreamsInbound: 3, StreamsOutbound: 1024, Streams: 1024, }, - }.Reify(rcmgr.DefaultLimits.AutoScale()) + }.Build(rcmgr.DefaultLimits.AutoScale()) echos := createEchos(t, 5, makeRcmgrOption(t, cfg)) defer closeEchos(echos) defer closeRcmgrs(echos) @@ -304,7 +304,7 @@ func TestReadmeExample(t *testing.T) { scaledDefaultLimits := scalingLimits.AutoScale() // Tweak certain settings - cfg := rcmgr.LimitConfig{ + cfg := rcmgr.PartialLimitConfig{ System: &rcmgr.ResourceLimits{ // Allow unlimited outbound streams StreamsOutbound: rcmgr.Unlimited, @@ -313,7 +313,7 @@ func TestReadmeExample(t *testing.T) { } // Create our limits by using our cfg and replacing the default values with values from `scaledDefaultLimits` - limits := cfg.Reify(scaledDefaultLimits) + limits := cfg.Build(scaledDefaultLimits) // The resource manager expects a limiter, se we create one from our limits. limiter := rcmgr.NewFixedLimiter(limits) From c68e24ccd2868c62715c821f48e7d2f5bdfa26e1 Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Fri, 20 Jan 2023 12:41:55 -0800 Subject: [PATCH 11/18] Add IsDefault helper --- p2p/host/resource-manager/limit.go | 9 +-------- p2p/host/resource-manager/limit_defaults.go | 18 ++++++++++++++++++ 2 files changed, 19 insertions(+), 8 deletions(-) diff --git a/p2p/host/resource-manager/limit.go b/p2p/host/resource-manager/limit.go index 807dcac2df..657aff4a77 100644 --- a/p2p/host/resource-manager/limit.go +++ b/p2p/host/resource-manager/limit.go @@ -158,14 +158,7 @@ func (l BaseLimit) ToResourceLimitsWithDefault(defaultLimit BaseLimit) *Resource Memory: limitValFromInt64(l.Memory, defaultLimit.Memory), } - if out.Streams == DefaultLimit && - out.StreamsInbound == DefaultLimit && - out.StreamsOutbound == DefaultLimit && - out.Conns == DefaultLimit && - out.ConnsInbound == DefaultLimit && - out.ConnsOutbound == DefaultLimit && - out.FD == DefaultLimit && - out.Memory == DefaultLimit64 { + if out.IsDefault() { return nil } diff --git a/p2p/host/resource-manager/limit_defaults.go b/p2p/host/resource-manager/limit_defaults.go index 70cf076360..3a4c5b29a1 100644 --- a/p2p/host/resource-manager/limit_defaults.go +++ b/p2p/host/resource-manager/limit_defaults.go @@ -247,6 +247,24 @@ type ResourceLimits struct { Memory LimitVal64 `json:",omitempty"` } +func (l *ResourceLimits) IsDefault() bool { + if l == nil { + return true + } + + if l.Streams == DefaultLimit && + l.StreamsInbound == DefaultLimit && + l.StreamsOutbound == DefaultLimit && + l.Conns == DefaultLimit && + l.ConnsInbound == DefaultLimit && + l.ConnsOutbound == DefaultLimit && + l.FD == DefaultLimit && + l.Memory == DefaultLimit64 { + return true + } + return false +} + // Apply overwrites all default limits with the values of l2 func (l *ResourceLimits) Apply(l2 *ResourceLimits) { if l2 == nil { From 2f95f285d2226a9ef034920b045d3acc1d42aee9 Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Mon, 30 Jan 2023 13:06:16 -0800 Subject: [PATCH 12/18] Use values instead of pointers for PartialLimitConfig --- p2p/host/resource-manager/limit.go | 14 +--- .../resource-manager/limit_config_test.go | 4 +- p2p/host/resource-manager/limit_defaults.go | 76 ++++++++++++++----- p2p/host/resource-manager/limit_test.go | 4 +- p2p/test/resource-manager/rcmgr_test.go | 12 +-- 5 files changed, 69 insertions(+), 41 deletions(-) diff --git a/p2p/host/resource-manager/limit.go b/p2p/host/resource-manager/limit.go index 657aff4a77..8803b0566f 100644 --- a/p2p/host/resource-manager/limit.go +++ b/p2p/host/resource-manager/limit.go @@ -133,8 +133,8 @@ func limitValFromInt64(i int64, defaultVal int64) LimitVal64 { } // ToResourceLimits converts the BaseLimit to a ResourceLimits -func (l BaseLimit) ToResourceLimits() *ResourceLimits { - return &ResourceLimits{ +func (l BaseLimit) ToResourceLimits() ResourceLimits { + return ResourceLimits{ Streams: valueOrBlockAll(l.Streams), StreamsInbound: valueOrBlockAll(l.StreamsInbound), StreamsOutbound: valueOrBlockAll(l.StreamsOutbound), @@ -146,8 +146,8 @@ func (l BaseLimit) ToResourceLimits() *ResourceLimits { } } -func (l BaseLimit) ToResourceLimitsWithDefault(defaultLimit BaseLimit) *ResourceLimits { - out := ResourceLimits{ +func (l BaseLimit) ToResourceLimitsWithDefault(defaultLimit BaseLimit) ResourceLimits { + return ResourceLimits{ Streams: limitValFromInt(l.Streams, defaultLimit.Streams), StreamsInbound: limitValFromInt(l.StreamsInbound, defaultLimit.StreamsInbound), StreamsOutbound: limitValFromInt(l.StreamsOutbound, defaultLimit.StreamsOutbound), @@ -157,12 +157,6 @@ func (l BaseLimit) ToResourceLimitsWithDefault(defaultLimit BaseLimit) *Resource FD: limitValFromInt(l.FD, defaultLimit.FD), Memory: limitValFromInt64(l.Memory, defaultLimit.Memory), } - - if out.IsDefault() { - return nil - } - - return &out } // Apply overwrites all zero-valued limits with the values of l2 diff --git a/p2p/host/resource-manager/limit_config_test.go b/p2p/host/resource-manager/limit_config_test.go index 67a52b7eae..69d16453e6 100644 --- a/p2p/host/resource-manager/limit_config_test.go +++ b/p2p/host/resource-manager/limit_config_test.go @@ -126,7 +126,7 @@ func TestLimitConfigRoundTrip(t *testing.T) { func TestReadmeLimitConfigSerialization(t *testing.T) { noisyNeighbor, _ := peer.Decode("QmVvtzcZgCkMnSFf2dnrBPXrWuNFWNM9J3MpZQCvWPuVZf") cfg := PartialLimitConfig{ - System: &ResourceLimits{ + System: ResourceLimits{ // Allow unlimited outbound streams StreamsOutbound: Unlimited, }, @@ -146,5 +146,5 @@ func TestReadmeLimitConfigSerialization(t *testing.T) { } jsonBytes, err := json.Marshal(&cfg) require.NoError(t, err) - require.Equal(t, `{"System":{"StreamsOutbound":"unlimited"},"Peer":{"QmVvtzcZgCkMnSFf2dnrBPXrWuNFWNM9J3MpZQCvWPuVZf":{"StreamsInbound":"blockAll","StreamsOutbound":"unlimited","ConnsInbound":"blockAll"}}}`, string(jsonBytes)) + require.Equal(t, `{"Peer":{"QmVvtzcZgCkMnSFf2dnrBPXrWuNFWNM9J3MpZQCvWPuVZf":{"StreamsInbound":"blockAll","StreamsOutbound":"unlimited","ConnsInbound":"blockAll"}},"System":{"StreamsOutbound":"unlimited"}}`, string(jsonBytes)) } diff --git a/p2p/host/resource-manager/limit_defaults.go b/p2p/host/resource-manager/limit_defaults.go index 3a4c5b29a1..41e5f2fe46 100644 --- a/p2p/host/resource-manager/limit_defaults.go +++ b/p2p/host/resource-manager/limit_defaults.go @@ -265,11 +265,15 @@ func (l *ResourceLimits) IsDefault() bool { return false } -// Apply overwrites all default limits with the values of l2 -func (l *ResourceLimits) Apply(l2 *ResourceLimits) { - if l2 == nil { - return +func (l *ResourceLimits) ToMaybeNilPtr() *ResourceLimits { + if l.IsDefault() { + return nil } + return l +} + +// Apply overwrites all default limits with the values of l2 +func (l *ResourceLimits) Apply(l2 ResourceLimits) { if l.Streams == DefaultLimit { l.Streams = l2.Streams } @@ -315,32 +319,32 @@ func (l *ResourceLimits) Build(defaults BaseLimit) BaseLimit { } type PartialLimitConfig struct { - System *ResourceLimits `json:",omitempty"` - Transient *ResourceLimits `json:",omitempty"` + System ResourceLimits `json:",omitempty"` + Transient ResourceLimits `json:",omitempty"` // Limits that are applied to resources with an allowlisted multiaddr. // These will only be used if the normal System & Transient limits are // reached. - AllowlistedSystem *ResourceLimits `json:",omitempty"` - AllowlistedTransient *ResourceLimits `json:",omitempty"` + AllowlistedSystem ResourceLimits `json:",omitempty"` + AllowlistedTransient ResourceLimits `json:",omitempty"` - ServiceDefault *ResourceLimits `json:",omitempty"` + ServiceDefault ResourceLimits `json:",omitempty"` Service map[string]ResourceLimits `json:",omitempty"` - ServicePeerDefault *ResourceLimits `json:",omitempty"` + ServicePeerDefault ResourceLimits `json:",omitempty"` ServicePeer map[string]ResourceLimits `json:",omitempty"` - ProtocolDefault *ResourceLimits `json:",omitempty"` + ProtocolDefault ResourceLimits `json:",omitempty"` Protocol map[protocol.ID]ResourceLimits `json:",omitempty"` - ProtocolPeerDefault *ResourceLimits `json:",omitempty"` + ProtocolPeerDefault ResourceLimits `json:",omitempty"` ProtocolPeer map[protocol.ID]ResourceLimits `json:",omitempty"` - PeerDefault *ResourceLimits `json:",omitempty"` + PeerDefault ResourceLimits `json:",omitempty"` Peer map[peer.ID]ResourceLimits `json:",omitempty"` - Conn *ResourceLimits `json:",omitempty"` - Stream *ResourceLimits `json:",omitempty"` + Conn ResourceLimits `json:",omitempty"` + Stream ResourceLimits `json:",omitempty"` } func resourceLimitsMapFromBaseLimitMapWithDefaults[K comparable](m map[K]BaseLimit, defaultLimits map[K]BaseLimit, fallbackDefault BaseLimit) map[K]ResourceLimits { @@ -355,9 +359,7 @@ func resourceLimitsMapFromBaseLimitMapWithDefaults[K comparable](m map[K]BaseLim def = defaultForKey } rl := v.ToResourceLimitsWithDefault(def) - if rl != nil { - out[k] = *rl - } + out[k] = rl } return out } @@ -372,18 +374,50 @@ func (cfg *PartialLimitConfig) MarshalJSON() ([]byte, error) { type Alias PartialLimitConfig return json.Marshal(&struct { *Alias + // String so we can have the properly marshalled peer id Peer map[string]ResourceLimits `json:",omitempty"` + + // The rest of the fields as pointers so that we omit empty values in the serialized result + System *ResourceLimits `json:",omitempty"` + Transient *ResourceLimits `json:",omitempty"` + AllowlistedSystem *ResourceLimits `json:",omitempty"` + AllowlistedTransient *ResourceLimits `json:",omitempty"` + + ServiceDefault *ResourceLimits `json:",omitempty"` + + ServicePeerDefault *ResourceLimits `json:",omitempty"` + + ProtocolDefault *ResourceLimits `json:",omitempty"` + + ProtocolPeerDefault *ResourceLimits `json:",omitempty"` + + PeerDefault *ResourceLimits `json:",omitempty"` + + Conn *ResourceLimits `json:",omitempty"` + Stream *ResourceLimits `json:",omitempty"` }{ Alias: (*Alias)(cfg), Peer: encodedPeerMap, + + System: cfg.System.ToMaybeNilPtr(), + Transient: cfg.Transient.ToMaybeNilPtr(), + AllowlistedSystem: cfg.AllowlistedSystem.ToMaybeNilPtr(), + AllowlistedTransient: cfg.AllowlistedTransient.ToMaybeNilPtr(), + ServiceDefault: cfg.ServiceDefault.ToMaybeNilPtr(), + ServicePeerDefault: cfg.ServicePeerDefault.ToMaybeNilPtr(), + ProtocolDefault: cfg.ProtocolDefault.ToMaybeNilPtr(), + ProtocolPeerDefault: cfg.ProtocolPeerDefault.ToMaybeNilPtr(), + PeerDefault: cfg.PeerDefault.ToMaybeNilPtr(), + Conn: cfg.Conn.ToMaybeNilPtr(), + Stream: cfg.Stream.ToMaybeNilPtr(), }) } -func applyResourceLimitsMap[K comparable](this *map[K]ResourceLimits, other map[K]ResourceLimits, fallbackDefault *ResourceLimits) { +func applyResourceLimitsMap[K comparable](this *map[K]ResourceLimits, other map[K]ResourceLimits, fallbackDefault ResourceLimits) { for k, l := range *this { r := fallbackDefault if l2, ok := other[k]; ok { - r = &l2 + r = l2 } l.Apply(r) (*this)[k] = l @@ -536,7 +570,7 @@ func resourceLimitsMapFromBaseLimitMap[K comparable](baseLimitMap map[K]BaseLimi out := make(map[K]ResourceLimits) for k, l := range baseLimitMap { - out[k] = *l.ToResourceLimits() + out[k] = l.ToResourceLimits() } return out diff --git a/p2p/host/resource-manager/limit_test.go b/p2p/host/resource-manager/limit_test.go index 3dd3304acf..3f5dadcfd2 100644 --- a/p2p/host/resource-manager/limit_test.go +++ b/p2p/host/resource-manager/limit_test.go @@ -187,12 +187,12 @@ func TestJSONRoundTripInt64(t *testing.T) { func TestRoundTripFromConcreteAndBack(t *testing.T) { l := PartialLimitConfig{ - System: &ResourceLimits{ + System: ResourceLimits{ Conns: 1234, Memory: 54321, }, - ServiceDefault: &ResourceLimits{ + ServiceDefault: ResourceLimits{ Conns: 2, }, diff --git a/p2p/test/resource-manager/rcmgr_test.go b/p2p/test/resource-manager/rcmgr_test.go index d3f66bd5f3..52c8dd6949 100644 --- a/p2p/test/resource-manager/rcmgr_test.go +++ b/p2p/test/resource-manager/rcmgr_test.go @@ -48,13 +48,13 @@ func TestResourceManagerConnInbound(t *testing.T) { // this test checks that we can not exceed the inbound conn limit at system level // we specify: 1 conn per peer, 3 conns total, and we try to create 4 conns cfg := rcmgr.PartialLimitConfig{ - System: &rcmgr.ResourceLimits{ + System: rcmgr.ResourceLimits{ ConnsInbound: 3, ConnsOutbound: 1024, Conns: 1024, StreamsOutbound: rcmgr.Unlimited, }, - PeerDefault: &rcmgr.ResourceLimits{ + PeerDefault: rcmgr.ResourceLimits{ ConnsInbound: 1, ConnsOutbound: 1, Conns: 1, @@ -90,12 +90,12 @@ func TestResourceManagerConnOutbound(t *testing.T) { // this test checks that we can not exceed the inbound conn limit at system level // we specify: 1 conn per peer, 3 conns total, and we try to create 4 conns cfg := rcmgr.PartialLimitConfig{ - System: &rcmgr.ResourceLimits{ + System: rcmgr.ResourceLimits{ ConnsInbound: 1024, ConnsOutbound: 3, Conns: 1024, }, - PeerDefault: &rcmgr.ResourceLimits{ + PeerDefault: rcmgr.ResourceLimits{ ConnsInbound: 1, ConnsOutbound: 1, Conns: 1, @@ -130,7 +130,7 @@ func TestResourceManagerServiceInbound(t *testing.T) { // this test checks that we can not exceed the inbound stream limit at service level // we specify: 3 streams for the service, and we try to create 4 streams cfg := rcmgr.PartialLimitConfig{ - ServiceDefault: &rcmgr.ResourceLimits{ + ServiceDefault: rcmgr.ResourceLimits{ StreamsInbound: 3, StreamsOutbound: 1024, Streams: 1024, @@ -305,7 +305,7 @@ func TestReadmeExample(t *testing.T) { // Tweak certain settings cfg := rcmgr.PartialLimitConfig{ - System: &rcmgr.ResourceLimits{ + System: rcmgr.ResourceLimits{ // Allow unlimited outbound streams StreamsOutbound: rcmgr.Unlimited, }, From 15f6fa59e61e8259406fb5ee0c94bccfe541d591 Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Tue, 31 Jan 2023 10:03:04 -0800 Subject: [PATCH 13/18] Remove references to reify --- p2p/host/resource-manager/README.md | 2 +- p2p/host/resource-manager/limit_config_test.go | 8 ++++---- p2p/host/resource-manager/limit_defaults.go | 8 +++++++- p2p/host/resource-manager/limit_test.go | 6 +++--- p2p/test/resource-manager/rcmgr_test.go | 2 +- 5 files changed, 16 insertions(+), 10 deletions(-) diff --git a/p2p/host/resource-manager/README.md b/p2p/host/resource-manager/README.md index 3ec9d6a208..f81b90d9f3 100644 --- a/p2p/host/resource-manager/README.md +++ b/p2p/host/resource-manager/README.md @@ -28,7 +28,7 @@ scalingLimits := rcmgr.DefaultLimits // Add limits around included libp2p protocols libp2p.SetDefaultServiceLimits(&scalingLimits) -// Turn the scaling limits into a reified set of limits using `.AutoScale`. This +// Turn the scaling limits into a concrete set of limits using `.AutoScale`. This // scales the limits proportional to your system memory. scaledDefaultLimits := scalingLimits.AutoScale() diff --git a/p2p/host/resource-manager/limit_config_test.go b/p2p/host/resource-manager/limit_config_test.go index 69d16453e6..40e3b233b9 100644 --- a/p2p/host/resource-manager/limit_config_test.go +++ b/p2p/host/resource-manager/limit_config_test.go @@ -111,16 +111,16 @@ func TestLimitConfigRoundTrip(t *testing.T) { defaults := DefaultLimits defaults.AddServiceLimit("C", DefaultLimits.ServiceBaseLimit, BaseLimitIncrease{}) defaults.AddProtocolPeerLimit("C", DefaultLimits.ServiceBaseLimit, BaseLimitIncrease{}) - reifiedCfg, err := readLimiterConfigFromJSON(in, defaults.AutoScale()) + concreteCfg, err := readLimiterConfigFromJSON(in, defaults.AutoScale()) require.NoError(t, err) // Roundtrip - limitConfig := reifiedCfg.ToLimitConfig() + limitConfig := concreteCfg.ToLimitConfig() // Using InfiniteLimits because it's different then the defaults used above. // If anything was marked "default" in the round trip, it would show up as a // difference here. - reifiedCfgRT := limitConfig.Build(InfiniteLimits) - require.Equal(t, reifiedCfg, reifiedCfgRT) + concreteCfgRT := limitConfig.Build(InfiniteLimits) + require.Equal(t, concreteCfg, concreteCfgRT) } func TestReadmeLimitConfigSerialization(t *testing.T) { diff --git a/p2p/host/resource-manager/limit_defaults.go b/p2p/host/resource-manager/limit_defaults.go index 41e5f2fe46..c111943438 100644 --- a/p2p/host/resource-manager/limit_defaults.go +++ b/p2p/host/resource-manager/limit_defaults.go @@ -148,6 +148,12 @@ func (l *LimitVal) UnmarshalJSON(b []byte) error { if err := json.Unmarshal(b, &val); err != nil { return err } + + if val == 0 { + // If there is an explicit 0 in the JSON we should interpret this as block all. + *l = BlockAllLimit + } + *l = LimitVal(val) return nil } @@ -576,7 +582,7 @@ func resourceLimitsMapFromBaseLimitMap[K comparable](baseLimitMap map[K]BaseLimi return out } -// ToLimitConfig converts a ReifiedLimitConfig to a PartialLimitConfig. The returned +// ToLimitConfig converts a ConcreteLimitConfig to a PartialLimitConfig. The returned // PartialLimitConfig will have no default values. func (cfg ConcreteLimitConfig) ToLimitConfig() PartialLimitConfig { return PartialLimitConfig{ diff --git a/p2p/host/resource-manager/limit_test.go b/p2p/host/resource-manager/limit_test.go index 3f5dadcfd2..1bf5103c59 100644 --- a/p2p/host/resource-manager/limit_test.go +++ b/p2p/host/resource-manager/limit_test.go @@ -203,11 +203,11 @@ func TestRoundTripFromConcreteAndBack(t *testing.T) { }, } - reified := l.Build(InfiniteLimits) + concrete := l.Build(InfiniteLimits) // Roundtrip - fromReified := reified.ToLimitConfigWithDefaults(InfiniteLimits) - require.Equal(t, l, fromReified) + fromConcrete := concrete.ToLimitConfigWithDefaults(InfiniteLimits) + require.Equal(t, l, fromConcrete) } func TestSerializeJSON(t *testing.T) { diff --git a/p2p/test/resource-manager/rcmgr_test.go b/p2p/test/resource-manager/rcmgr_test.go index 52c8dd6949..017b49293b 100644 --- a/p2p/test/resource-manager/rcmgr_test.go +++ b/p2p/test/resource-manager/rcmgr_test.go @@ -299,7 +299,7 @@ func TestReadmeExample(t *testing.T) { // Add limits around included libp2p protocols libp2p.SetDefaultServiceLimits(&scalingLimits) - // Turn the scaling limits into a reified set of limits using `.AutoScale`. This + // Turn the scaling limits into a concrete set of limits using `.AutoScale`. This // scales the limits proportional to your system memory. scaledDefaultLimits := scalingLimits.AutoScale() From 56056a33125b34550bc4952fb4af5f8c359b2206 Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Tue, 31 Jan 2023 12:45:20 -0800 Subject: [PATCH 14/18] Unmarshal 0 limits as block all --- p2p/host/resource-manager/limit_defaults.go | 15 ++++++++++++++- p2p/host/resource-manager/limit_test.go | 21 +++++++++++++++++++++ 2 files changed, 35 insertions(+), 1 deletion(-) diff --git a/p2p/host/resource-manager/limit_defaults.go b/p2p/host/resource-manager/limit_defaults.go index c111943438..9ef442e44b 100644 --- a/p2p/host/resource-manager/limit_defaults.go +++ b/p2p/host/resource-manager/limit_defaults.go @@ -152,6 +152,7 @@ func (l *LimitVal) UnmarshalJSON(b []byte) error { if val == 0 { // If there is an explicit 0 in the JSON we should interpret this as block all. *l = BlockAllLimit + return nil } *l = LimitVal(val) @@ -215,6 +216,12 @@ func (l *LimitVal64) UnmarshalJSON(b []byte) error { return fmt.Errorf("failed to unmarshal limit value: %w", err) } + if val == 0 { + // If there is an explicit 0 in the JSON we should interpret this as block all. + *l = BlockAllLimit64 + return nil + } + *l = LimitVal64(val) return nil } @@ -223,8 +230,14 @@ func (l *LimitVal64) UnmarshalJSON(b []byte) error { if err != nil { return err } - *l = LimitVal64(i) + if i == 0 { + // If there is an explicit 0 in the JSON we should interpret this as block all. + *l = BlockAllLimit64 + return nil + } + + *l = LimitVal64(i) return nil } diff --git a/p2p/host/resource-manager/limit_test.go b/p2p/host/resource-manager/limit_test.go index 1bf5103c59..a719f845d4 100644 --- a/p2p/host/resource-manager/limit_test.go +++ b/p2p/host/resource-manager/limit_test.go @@ -227,3 +227,24 @@ func TestSerializeJSON(t *testing.T) { require.NoError(t, err) require.Equal(t, "{\"Streams\":10}", string(out)) } + +func TestWhatIsZeroInResourceLimits(t *testing.T) { + l := ResourceLimits{ + Streams: BlockAllLimit, + Memory: BlockAllLimit64, + } + + out, err := json.Marshal(l) + require.NoError(t, err) + require.Equal(t, `{"Streams":"blockAll","Memory":"blockAll"}`, string(out)) + + l2 := ResourceLimits{} + err = json.Unmarshal([]byte(`{"Streams":0,"Memory":0}`), &l2) + require.NoError(t, err) + require.Equal(t, l, l2) + + l3 := ResourceLimits{} + err = json.Unmarshal([]byte(`{"Streams":0,"Memory":"0"}`), &l3) + require.NoError(t, err) + require.Equal(t, l, l3) +} From 0702c199046b989ff617192d41d3151139281d67 Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Tue, 31 Jan 2023 13:04:53 -0800 Subject: [PATCH 15/18] ResourceLimits can build on Limits interface objs --- p2p/host/resource-manager/limit.go | 12 +++---- p2p/host/resource-manager/limit_defaults.go | 35 +++++++++++++-------- 2 files changed, 28 insertions(+), 19 deletions(-) diff --git a/p2p/host/resource-manager/limit.go b/p2p/host/resource-manager/limit.go index 8803b0566f..96b85dda00 100644 --- a/p2p/host/resource-manager/limit.go +++ b/p2p/host/resource-manager/limit.go @@ -231,7 +231,7 @@ func (l *BaseLimitIncrease) Apply(l2 BaseLimitIncrease) { } } -func (l *BaseLimit) GetStreamLimit(dir network.Direction) int { +func (l BaseLimit) GetStreamLimit(dir network.Direction) int { if dir == network.DirInbound { return l.StreamsInbound } else { @@ -239,11 +239,11 @@ func (l *BaseLimit) GetStreamLimit(dir network.Direction) int { } } -func (l *BaseLimit) GetStreamTotalLimit() int { +func (l BaseLimit) GetStreamTotalLimit() int { return l.Streams } -func (l *BaseLimit) GetConnLimit(dir network.Direction) int { +func (l BaseLimit) GetConnLimit(dir network.Direction) int { if dir == network.DirInbound { return l.ConnsInbound } else { @@ -251,15 +251,15 @@ func (l *BaseLimit) GetConnLimit(dir network.Direction) int { } } -func (l *BaseLimit) GetConnTotalLimit() int { +func (l BaseLimit) GetConnTotalLimit() int { return l.Conns } -func (l *BaseLimit) GetFDLimit() int { +func (l BaseLimit) GetFDLimit() int { return l.FD } -func (l *BaseLimit) GetMemoryLimit() int64 { +func (l BaseLimit) GetMemoryLimit() int64 { return l.Memory } diff --git a/p2p/host/resource-manager/limit_defaults.go b/p2p/host/resource-manager/limit_defaults.go index 9ef442e44b..6086f6d124 100644 --- a/p2p/host/resource-manager/limit_defaults.go +++ b/p2p/host/resource-manager/limit_defaults.go @@ -6,6 +6,7 @@ import ( "math" "strconv" + "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/protocol" @@ -319,22 +320,30 @@ func (l *ResourceLimits) Apply(l2 ResourceLimits) { } } -func (l *ResourceLimits) Build(defaults BaseLimit) BaseLimit { +func (l *ResourceLimits) Build(defaults Limit) BaseLimit { if l == nil { - return defaults + return BaseLimit{ + Streams: defaults.GetStreamTotalLimit(), + StreamsInbound: defaults.GetStreamLimit(network.DirInbound), + StreamsOutbound: defaults.GetStreamLimit(network.DirOutbound), + Conns: defaults.GetConnTotalLimit(), + ConnsInbound: defaults.GetConnLimit(network.DirInbound), + ConnsOutbound: defaults.GetConnLimit(network.DirOutbound), + FD: defaults.GetFDLimit(), + Memory: defaults.GetMemoryLimit(), + } } - out := defaults - out.Streams = l.Streams.Build(defaults.Streams) - out.StreamsInbound = l.StreamsInbound.Build(defaults.StreamsInbound) - out.StreamsOutbound = l.StreamsOutbound.Build(defaults.StreamsOutbound) - out.Conns = l.Conns.Build(defaults.Conns) - out.ConnsInbound = l.ConnsInbound.Build(defaults.ConnsInbound) - out.ConnsOutbound = l.ConnsOutbound.Build(defaults.ConnsOutbound) - out.FD = l.FD.Build(defaults.FD) - out.Memory = l.Memory.Build(defaults.Memory) - - return out + return BaseLimit{ + Streams: l.Streams.Build(defaults.GetStreamTotalLimit()), + StreamsInbound: l.StreamsInbound.Build(defaults.GetStreamLimit(network.DirInbound)), + StreamsOutbound: l.StreamsOutbound.Build(defaults.GetStreamLimit(network.DirOutbound)), + Conns: l.Conns.Build(defaults.GetConnTotalLimit()), + ConnsInbound: l.ConnsInbound.Build(defaults.GetConnLimit(network.DirInbound)), + ConnsOutbound: l.ConnsOutbound.Build(defaults.GetConnLimit(network.DirOutbound)), + FD: l.FD.Build(defaults.GetFDLimit()), + Memory: l.Memory.Build(defaults.GetMemoryLimit()), + } } type PartialLimitConfig struct { From 75c0b1418e10453fb47cc72b41d0fb2f1aef3c74 Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Tue, 31 Jan 2023 13:06:43 -0800 Subject: [PATCH 16/18] Convert max int to unlimited --- p2p/host/resource-manager/limit.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/p2p/host/resource-manager/limit.go b/p2p/host/resource-manager/limit.go index 96b85dda00..9577b94c8a 100644 --- a/p2p/host/resource-manager/limit.go +++ b/p2p/host/resource-manager/limit.go @@ -100,12 +100,16 @@ type BaseLimit struct { func valueOrBlockAll(n int) LimitVal { if n == 0 { return BlockAllLimit + } else if n == math.MaxInt { + return Unlimited } return LimitVal(n) } func valueOrBlockAll64(n int64) LimitVal64 { if n == 0 { return BlockAllLimit64 + } else if n == math.MaxInt { + return Unlimited64 } return LimitVal64(n) } From ba009ae8bf6c51e2084dd85a155600d3eeece8ca Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Tue, 7 Feb 2023 08:37:57 -0800 Subject: [PATCH 17/18] Remove ToLimitConfigWithDefaults --- p2p/host/resource-manager/limit.go | 35 ------------- .../resource-manager/limit_config_test.go | 4 +- p2p/host/resource-manager/limit_defaults.go | 51 ------------------- p2p/host/resource-manager/limit_test.go | 4 +- 4 files changed, 4 insertions(+), 90 deletions(-) diff --git a/p2p/host/resource-manager/limit.go b/p2p/host/resource-manager/limit.go index 9577b94c8a..ef7fcdc9b4 100644 --- a/p2p/host/resource-manager/limit.go +++ b/p2p/host/resource-manager/limit.go @@ -114,28 +114,6 @@ func valueOrBlockAll64(n int64) LimitVal64 { return LimitVal64(n) } -func limitValFromInt(i int, defaultVal int) LimitVal { - if i == defaultVal { - return DefaultLimit - } else if i == math.MaxInt { - return Unlimited - } else if i == 0 { - return BlockAllLimit - } - return LimitVal(i) -} - -func limitValFromInt64(i int64, defaultVal int64) LimitVal64 { - if i == defaultVal { - return DefaultLimit64 - } else if i == math.MaxInt { - return Unlimited64 - } else if i == 0 { - return BlockAllLimit64 - } - return LimitVal64(i) -} - // ToResourceLimits converts the BaseLimit to a ResourceLimits func (l BaseLimit) ToResourceLimits() ResourceLimits { return ResourceLimits{ @@ -150,19 +128,6 @@ func (l BaseLimit) ToResourceLimits() ResourceLimits { } } -func (l BaseLimit) ToResourceLimitsWithDefault(defaultLimit BaseLimit) ResourceLimits { - return ResourceLimits{ - Streams: limitValFromInt(l.Streams, defaultLimit.Streams), - StreamsInbound: limitValFromInt(l.StreamsInbound, defaultLimit.StreamsInbound), - StreamsOutbound: limitValFromInt(l.StreamsOutbound, defaultLimit.StreamsOutbound), - Conns: limitValFromInt(l.Conns, defaultLimit.Conns), - ConnsInbound: limitValFromInt(l.ConnsInbound, defaultLimit.ConnsInbound), - ConnsOutbound: limitValFromInt(l.ConnsOutbound, defaultLimit.ConnsOutbound), - FD: limitValFromInt(l.FD, defaultLimit.FD), - Memory: limitValFromInt64(l.Memory, defaultLimit.Memory), - } -} - // Apply overwrites all zero-valued limits with the values of l2 // Must not use a pointer receiver. func (l *BaseLimit) Apply(l2 BaseLimit) { diff --git a/p2p/host/resource-manager/limit_config_test.go b/p2p/host/resource-manager/limit_config_test.go index 40e3b233b9..546f9779d8 100644 --- a/p2p/host/resource-manager/limit_config_test.go +++ b/p2p/host/resource-manager/limit_config_test.go @@ -94,12 +94,12 @@ func TestLimitConfigParser(t *testing.T) { require.Equal(t, int64(4097), cfg.peer[peerID].Memory) // Roundtrip - limitConfig := cfg.ToLimitConfigWithDefaults(defaults) + limitConfig := cfg.ToLimitConfig() jsonBytes, err := json.Marshal(&limitConfig) require.NoError(t, err) cfgAfterRoundTrip, err := readLimiterConfigFromJSON(bytes.NewReader(jsonBytes), defaults) require.NoError(t, err) - require.Equal(t, cfg, cfgAfterRoundTrip) + require.Equal(t, limitConfig, cfgAfterRoundTrip.ToLimitConfig()) } func TestLimitConfigRoundTrip(t *testing.T) { diff --git a/p2p/host/resource-manager/limit_defaults.go b/p2p/host/resource-manager/limit_defaults.go index 6086f6d124..db242af7d9 100644 --- a/p2p/host/resource-manager/limit_defaults.go +++ b/p2p/host/resource-manager/limit_defaults.go @@ -375,23 +375,6 @@ type PartialLimitConfig struct { Stream ResourceLimits `json:",omitempty"` } -func resourceLimitsMapFromBaseLimitMapWithDefaults[K comparable](m map[K]BaseLimit, defaultLimits map[K]BaseLimit, fallbackDefault BaseLimit) map[K]ResourceLimits { - if len(m) == 0 { - return nil - } - - out := make(map[K]ResourceLimits, len(m)) - for k, v := range m { - def := fallbackDefault - if defaultForKey, ok := defaultLimits[k]; ok { - def = defaultForKey - } - rl := v.ToResourceLimitsWithDefault(def) - out[k] = rl - } - return out -} - func (cfg *PartialLimitConfig) MarshalJSON() ([]byte, error) { // we want to marshal the encoded peer id encodedPeerMap := make(map[string]ResourceLimits, len(cfg.Peer)) @@ -557,40 +540,6 @@ type ConcreteLimitConfig struct { stream BaseLimit } -// ToLimitConfigWithDefaults converts a ConcreteLimitConfig to a PartialLimitConfig. -// Uses the defaults config to know what was specifically set and what was left -// as default. Returns a minimal PartialLimitConfig. Build the returned PartialLimitConfig -// with the defaults to get back to the original ConcreteLimitConfig. -func (cfg ConcreteLimitConfig) ToLimitConfigWithDefaults(defaults ConcreteLimitConfig) PartialLimitConfig { - out := PartialLimitConfig{} - - out.System = cfg.system.ToResourceLimitsWithDefault(defaults.system) - out.Transient = cfg.transient.ToResourceLimitsWithDefault(defaults.transient) - - out.AllowlistedSystem = cfg.allowlistedSystem.ToResourceLimitsWithDefault(defaults.allowlistedSystem) - out.AllowlistedTransient = cfg.allowlistedTransient.ToResourceLimitsWithDefault(defaults.allowlistedTransient) - - out.ServiceDefault = cfg.serviceDefault.ToResourceLimitsWithDefault(defaults.serviceDefault) - out.Service = resourceLimitsMapFromBaseLimitMapWithDefaults(cfg.service, defaults.service, defaults.serviceDefault) - - out.ServicePeerDefault = cfg.servicePeerDefault.ToResourceLimitsWithDefault(defaults.servicePeerDefault) - out.ServicePeer = resourceLimitsMapFromBaseLimitMapWithDefaults(cfg.servicePeer, defaults.servicePeer, defaults.servicePeerDefault) - - out.ProtocolDefault = cfg.protocolDefault.ToResourceLimitsWithDefault(defaults.protocolDefault) - out.Protocol = resourceLimitsMapFromBaseLimitMapWithDefaults(cfg.protocol, defaults.protocol, defaults.protocolDefault) - - out.ProtocolPeerDefault = cfg.protocolPeerDefault.ToResourceLimitsWithDefault(defaults.protocolPeerDefault) - out.ProtocolPeer = resourceLimitsMapFromBaseLimitMapWithDefaults(cfg.protocolPeer, defaults.protocolPeer, defaults.protocolPeerDefault) - - out.PeerDefault = cfg.peerDefault.ToResourceLimitsWithDefault(defaults.peerDefault) - out.Peer = resourceLimitsMapFromBaseLimitMapWithDefaults(cfg.peer, defaults.peer, defaults.peerDefault) - - out.Conn = cfg.conn.ToResourceLimitsWithDefault(defaults.conn) - out.Stream = cfg.stream.ToResourceLimitsWithDefault(defaults.stream) - - return out -} - func resourceLimitsMapFromBaseLimitMap[K comparable](baseLimitMap map[K]BaseLimit) map[K]ResourceLimits { if baseLimitMap == nil { return nil diff --git a/p2p/host/resource-manager/limit_test.go b/p2p/host/resource-manager/limit_test.go index a719f845d4..15700ab46e 100644 --- a/p2p/host/resource-manager/limit_test.go +++ b/p2p/host/resource-manager/limit_test.go @@ -206,8 +206,8 @@ func TestRoundTripFromConcreteAndBack(t *testing.T) { concrete := l.Build(InfiniteLimits) // Roundtrip - fromConcrete := concrete.ToLimitConfigWithDefaults(InfiniteLimits) - require.Equal(t, l, fromConcrete) + fromConcrete := concrete.ToLimitConfig().Build(InfiniteLimits) + require.Equal(t, concrete, fromConcrete) } func TestSerializeJSON(t *testing.T) { From e7a41cdf60736d0d1f5452764dac34225b76bce0 Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Tue, 7 Feb 2023 08:38:36 -0800 Subject: [PATCH 18/18] Rename to ToPartialLimitConfig --- p2p/host/resource-manager/limit_config_test.go | 6 +++--- p2p/host/resource-manager/limit_defaults.go | 6 +++--- p2p/host/resource-manager/limit_test.go | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/p2p/host/resource-manager/limit_config_test.go b/p2p/host/resource-manager/limit_config_test.go index 546f9779d8..d1c5a81619 100644 --- a/p2p/host/resource-manager/limit_config_test.go +++ b/p2p/host/resource-manager/limit_config_test.go @@ -94,12 +94,12 @@ func TestLimitConfigParser(t *testing.T) { require.Equal(t, int64(4097), cfg.peer[peerID].Memory) // Roundtrip - limitConfig := cfg.ToLimitConfig() + limitConfig := cfg.ToPartialLimitConfig() jsonBytes, err := json.Marshal(&limitConfig) require.NoError(t, err) cfgAfterRoundTrip, err := readLimiterConfigFromJSON(bytes.NewReader(jsonBytes), defaults) require.NoError(t, err) - require.Equal(t, limitConfig, cfgAfterRoundTrip.ToLimitConfig()) + require.Equal(t, limitConfig, cfgAfterRoundTrip.ToPartialLimitConfig()) } func TestLimitConfigRoundTrip(t *testing.T) { @@ -115,7 +115,7 @@ func TestLimitConfigRoundTrip(t *testing.T) { require.NoError(t, err) // Roundtrip - limitConfig := concreteCfg.ToLimitConfig() + limitConfig := concreteCfg.ToPartialLimitConfig() // Using InfiniteLimits because it's different then the defaults used above. // If anything was marked "default" in the round trip, it would show up as a // difference here. diff --git a/p2p/host/resource-manager/limit_defaults.go b/p2p/host/resource-manager/limit_defaults.go index db242af7d9..e7489c45d1 100644 --- a/p2p/host/resource-manager/limit_defaults.go +++ b/p2p/host/resource-manager/limit_defaults.go @@ -553,9 +553,9 @@ func resourceLimitsMapFromBaseLimitMap[K comparable](baseLimitMap map[K]BaseLimi return out } -// ToLimitConfig converts a ConcreteLimitConfig to a PartialLimitConfig. The returned -// PartialLimitConfig will have no default values. -func (cfg ConcreteLimitConfig) ToLimitConfig() PartialLimitConfig { +// ToPartialLimitConfig converts a ConcreteLimitConfig to a PartialLimitConfig. +// The returned PartialLimitConfig will have no default values. +func (cfg ConcreteLimitConfig) ToPartialLimitConfig() PartialLimitConfig { return PartialLimitConfig{ System: cfg.system.ToResourceLimits(), Transient: cfg.transient.ToResourceLimits(), diff --git a/p2p/host/resource-manager/limit_test.go b/p2p/host/resource-manager/limit_test.go index 15700ab46e..68d6bf29cd 100644 --- a/p2p/host/resource-manager/limit_test.go +++ b/p2p/host/resource-manager/limit_test.go @@ -206,7 +206,7 @@ func TestRoundTripFromConcreteAndBack(t *testing.T) { concrete := l.Build(InfiniteLimits) // Roundtrip - fromConcrete := concrete.ToLimitConfig().Build(InfiniteLimits) + fromConcrete := concrete.ToPartialLimitConfig().Build(InfiniteLimits) require.Equal(t, concrete, fromConcrete) }