Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix replication service denial after MaxObjectSize increase #2911

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 21 additions & 5 deletions cmd/neofs-node/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,14 +446,30 @@
lastUpdated time.Time
lastSize uint64
src putsvc.MaxSizeSource
onChange func(uint64)
}

func newCachedMaxObjectSizeSource(src putsvc.MaxSizeSource) putsvc.MaxSizeSource {
func newCachedMaxObjectSizeSource(src putsvc.MaxSizeSource, onChange func(uint64)) *ttlMaxObjectSizeCache {

Check warning on line 452 in cmd/neofs-node/cache.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/cache.go#L452

Added line #L452 was not covered by tests
return &ttlMaxObjectSizeCache{
src: src,
src: src,
onChange: onChange,

Check warning on line 455 in cmd/neofs-node/cache.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/cache.go#L454-L455

Added lines #L454 - L455 were not covered by tests
}
}

func (c *ttlMaxObjectSizeCache) updateLastSize(sz uint64) {
if c.lastSize != sz {
c.onChange(sz)

Check warning on line 461 in cmd/neofs-node/cache.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/cache.go#L459-L461

Added lines #L459 - L461 were not covered by tests
}
c.lastSize = sz
c.lastUpdated = time.Now()

Check warning on line 464 in cmd/neofs-node/cache.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/cache.go#L463-L464

Added lines #L463 - L464 were not covered by tests
}

func (c *ttlMaxObjectSizeCache) handleNewMaxObjectPayloadSize(sz uint64) {
c.mtx.Lock()
c.updateLastSize(sz)
c.mtx.Unlock()

Check warning on line 470 in cmd/neofs-node/cache.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/cache.go#L467-L470

Added lines #L467 - L470 were not covered by tests
}

func (c *ttlMaxObjectSizeCache) MaxObjectSize() uint64 {
const ttl = time.Second * 30

Expand All @@ -469,9 +485,9 @@
c.mtx.Lock()
size = c.lastSize
if !c.lastUpdated.After(prevUpdated) {
size = c.src.MaxObjectSize()
c.lastSize = size
c.lastUpdated = time.Now()
newSize := c.src.MaxObjectSize()
c.updateLastSize(newSize)
size = newSize

Check warning on line 490 in cmd/neofs-node/cache.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/cache.go#L488-L490

Added lines #L488 - L490 were not covered by tests
}
c.mtx.Unlock()

Expand Down
2 changes: 2 additions & 0 deletions cmd/neofs-node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,8 @@ type cfgGRPC struct {
maxChunkSize uint64

maxAddrAmount uint64

maxRecvMsgSize atomic.Value // int
}

type cfgMorph struct {
Expand Down
80 changes: 54 additions & 26 deletions cmd/neofs-node/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,38 +28,27 @@
maxObjSize, err := c.nCli.MaxObjectSize()
fatalOnErrDetails("read max object size network setting to determine gRPC recv message limit", err)

maxRecvSize := maxObjSize
// don't forget about meta fields
if maxRecvSize < uint64(math.MaxUint64-object.MaxHeaderLen) { // just in case, always true in practice
maxRecvSize += object.MaxHeaderLen
} else {
maxRecvSize = math.MaxUint64
}

var maxRecvMsgSizeOpt grpc.ServerOption
if maxRecvSize > maxMsgSize { // do not decrease default value
if maxRecvSize > math.MaxInt {
// ^2GB for 32-bit systems which is currently enough in practice. If at some
// point this is not enough, we'll need to expand the option
fatalOnErr(fmt.Errorf("cannot serve NeoFS API over gRPC: object of max size is bigger than gRPC server is able to support %d>%d",
maxRecvSize, math.MaxInt))
}
maxRecvMsgSizeOpt = grpc.MaxRecvMsgSize(int(maxRecvSize))
c.log.Debug("limit max recv gRPC message size to fit max stored objects",
zap.Uint64("max object size", maxObjSize), zap.Uint64("max recv msg", maxRecvSize))
maxRecvSize, overflowed := calculateMaxReplicationRequestSize(maxObjSize)
if maxRecvSize < 0 {

Check warning on line 32 in cmd/neofs-node/grpc.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/grpc.go#L31-L32

Added lines #L31 - L32 were not covered by tests
// ^2GB for 32-bit systems which is currently enough in practice. If at some
// point this is not enough, we'll need to expand the option
fatalOnErr(fmt.Errorf("cannot serve NeoFS API over gRPC: object of max size is bigger than gRPC server is able to support %d>%d",
overflowed, math.MaxInt))

Check warning on line 36 in cmd/neofs-node/grpc.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/grpc.go#L35-L36

Added lines #L35 - L36 were not covered by tests
}
c.cfgGRPC.maxRecvMsgSize.Store(maxRecvSize)

Check warning on line 38 in cmd/neofs-node/grpc.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/grpc.go#L38

Added line #L38 was not covered by tests
// TODO(@cthulhu-rider): the setting can be server-global only now, support
// per-RPC limits
maxRecvMsgSizeOpt := grpc.MaxRecvMsgSizeFunc(func() int {
return c.cfgGRPC.maxRecvMsgSize.Load().(int) // initialized above, so safe
})
c.log.Info("limit max recv gRPC message size to fit max stored objects",
zap.Uint64("max object size", maxObjSize), zap.Int("max recv msg", maxRecvSize))

Check warning on line 45 in cmd/neofs-node/grpc.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/grpc.go#L41-L45

Added lines #L41 - L45 were not covered by tests

var successCount int
grpcconfig.IterateEndpoints(c.cfgReader, func(sc *grpcconfig.Config) {
serverOpts := []grpc.ServerOption{
grpc.MaxSendMsgSize(maxMsgSize),
}
if maxRecvMsgSizeOpt != nil {
// TODO(@cthulhu-rider): the setting can be server-global only now, support
// per-RPC limits
// TODO(@cthulhu-rider): max object size setting may change in general,
// but server configuration is static now
serverOpts = append(serverOpts, maxRecvMsgSizeOpt)
maxRecvMsgSizeOpt,

Check warning on line 51 in cmd/neofs-node/grpc.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/grpc.go#L51

Added line #L51 was not covered by tests
}

tlsCfg := sc.TLS()
Expand Down Expand Up @@ -153,3 +142,42 @@

l.Info("gRPC server stopped successfully")
}

// calculates approximation for max size of the ObjectService.Replicate request
// with given object payload limit. Second value is returned when calculation
// result overflows int type. In this case, first return is negative.
func calculateMaxReplicationRequestSize(maxObjPayloadSize uint64) (int, uint64) {
res := maxObjPayloadSize
// don't forget about meta fields: object header + other ObjectService.Replicate
// request fields. For the latter, less is needed now, but it is still better to
// take with a reserve for potential protocol extensions. Anyway, 1 KB is
// nothing IRL.
const maxMetadataSize = object.MaxHeaderLen + 1<<10
if res < uint64(math.MaxUint64-maxMetadataSize) { // just in case, always true in practice
res += maxMetadataSize
} else {
res = math.MaxUint64
}
if res > math.MaxInt {
return -1, res
}
if res < maxMsgSize { // do not decrease default value
return maxMsgSize, 0
}
return int(res), 0
}

func (c *cfg) handleNewMaxObjectPayloadSize(maxObjPayloadSize uint64) {
maxRecvSize, overflowed := calculateMaxReplicationRequestSize(maxObjPayloadSize)
if maxRecvSize < 0 {
// unlike a startup, we don't want to stop a running service. Moreover, this is
// just a limit: even if it has become incredibly large, most data is expected
// to be of smaller degrees
c.log.Info("max gRPC recv msg size re-calculated for new max object payload size overflows int type, fallback to max int",
zap.Uint64("calculated limit", overflowed))
maxRecvSize = math.MaxInt
}
c.cfgGRPC.maxRecvMsgSize.Store(maxRecvSize)
c.log.Info("updated max gRPC recv msg size limit after max object payload size has been changed",
zap.Uint64("new object limit", maxObjPayloadSize), zap.Int("new gRPC limit", maxRecvSize))
}
50 changes: 50 additions & 0 deletions cmd/neofs-node/grpc_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package main

import (
"math"
"testing"

"github.com/stretchr/testify/require"
"go.uber.org/zap"
)

func Test_calculateMaxReplicationRequestSize(t *testing.T) {
limit, _ := calculateMaxReplicationRequestSize(64 << 20)
require.EqualValues(t, 67126272, limit)
t.Run("int overflow", func(t *testing.T) {
limit, overflow := calculateMaxReplicationRequestSize(math.MaxInt - 17<<10 + 1)
require.Negative(t, limit)
require.EqualValues(t, uint64(math.MaxInt)+1, overflow)
})
t.Run("uint64 overflow", func(t *testing.T) {
limit, overflow := calculateMaxReplicationRequestSize(math.MaxUint64 - 17<<10 + 1)
require.Negative(t, limit)
require.EqualValues(t, uint64(math.MaxUint64), overflow)
})
t.Run("smaller than gRPC default", func(t *testing.T) {
limit, _ := calculateMaxReplicationRequestSize(0)
require.EqualValues(t, 4<<20, limit)
limit, _ = calculateMaxReplicationRequestSize(4<<20 - 17<<10 - 1)
require.EqualValues(t, 4<<20, limit)
})
}

func Test_cfg_handleNewMaxObjectPayloadSize(t *testing.T) {
var c cfg
c.log = zap.NewNop()
c.cfgGRPC.maxRecvMsgSize.Store(0) // any

c.handleNewMaxObjectPayloadSize(100 << 20)
require.EqualValues(t, 100<<20+17<<10, c.cfgGRPC.maxRecvMsgSize.Load())
c.handleNewMaxObjectPayloadSize(64 << 20)
require.EqualValues(t, 64<<20+17<<10, c.cfgGRPC.maxRecvMsgSize.Load())
// int overflow
c.handleNewMaxObjectPayloadSize(math.MaxInt - 17<<10 + 1)
require.EqualValues(t, math.MaxInt, c.cfgGRPC.maxRecvMsgSize.Load())
// uint64 overflow
c.handleNewMaxObjectPayloadSize(math.MaxUint64 - 17<<10 + 1)
require.EqualValues(t, math.MaxInt, c.cfgGRPC.maxRecvMsgSize.Load())
// smaller than gRPC default
c.handleNewMaxObjectPayloadSize(4<<20 - 17<<10 - 1)
require.EqualValues(t, 4<<20, c.cfgGRPC.maxRecvMsgSize.Load())
}
25 changes: 25 additions & 0 deletions cmd/neofs-node/netmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@

import (
"bytes"
"context"
"errors"
"fmt"
"sync/atomic"
"time"

netmapGRPC "github.com/nspcc-dev/neofs-api-go/v2/netmap/grpc"
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
Expand Down Expand Up @@ -445,3 +447,26 @@

return &ni, nil
}

func listenMaxObjectPayloadSizeChanges(ctx context.Context, cli *nmClient.Client, lg *zap.Logger, f func(uint64)) {

Check warning on line 451 in cmd/neofs-node/netmap.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/netmap.go#L451

Added line #L451 was not covered by tests
// config rarely changes, but when it does - we do not want to wait long.
// Notification events would help https://github.com/nspcc-dev/neofs-contract/issues/427
const pollInterval = time.Minute
t := time.NewTimer(pollInterval)
defer t.Stop()
for {
select {
case <-ctx.Done():
lg.Debug("stop max object payload size net config poller by context", zap.Error(ctx.Err()))
return
case <-t.C:
lg.Info("rereading max object payload size net config by timer", zap.Duration("interval", pollInterval))
if sz, err := cli.MaxObjectSize(); err == nil {
f(sz)
} else {
lg.Error("failed to read max object payload size net config", zap.Error(err))

Check warning on line 467 in cmd/neofs-node/netmap.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/netmap.go#L454-L467

Added lines #L454 - L467 were not covered by tests
}
t.Reset(pollInterval)

Check warning on line 469 in cmd/neofs-node/netmap.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/netmap.go#L469

Added line #L469 was not covered by tests
}
}
}
5 changes: 4 additions & 1 deletion cmd/neofs-node/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,10 +256,13 @@
searchsvcV2.WithKeyStorage(keyStorage),
)

cachedMaxObjPayloadSizeSrc := newCachedMaxObjectSizeSource(c, c.handleNewMaxObjectPayloadSize)
go listenMaxObjectPayloadSizeChanges(c.ctx, c.nCli, c.log, cachedMaxObjPayloadSizeSrc.handleNewMaxObjectPayloadSize)

Check warning on line 260 in cmd/neofs-node/object.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/object.go#L259-L260

Added lines #L259 - L260 were not covered by tests

sPut := putsvc.NewService(&transport{clients: putConstructor},
putsvc.WithKeyStorage(keyStorage),
putsvc.WithClientConstructor(putConstructor),
putsvc.WithMaxSizeSource(newCachedMaxObjectSizeSource(c)),
putsvc.WithMaxSizeSource(cachedMaxObjPayloadSizeSrc),

Check warning on line 265 in cmd/neofs-node/object.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/object.go#L265

Added line #L265 was not covered by tests
putsvc.WithObjectStorage(storageEngine{engine: ls}),
putsvc.WithContainerSource(c.cfgObject.cnrSource),
putsvc.WithNetworkMapSource(c.netMapSource),
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -105,3 +105,5 @@ retract (
v1.22.1 // Contains retraction only.
v1.22.0 // Published accidentally.
)

replace google.golang.org/grpc => github.com/cthulhu-rider/grpc-go v0.0.0-20240808123512-00d000d30657
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ github.com/consensys/gnark-crypto v0.12.2-0.20231013160410-1f65e75b6dfb/go.mod h
github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/cpuguy83/go-md2man/v2 v2.0.4 h1:wfIWP927BUkWJb2NmU/kNDYIBTh/ziUX91+lVfRxZq4=
github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/cthulhu-rider/grpc-go v0.0.0-20240808123512-00d000d30657 h1:khwEiSUz2Pi0dPqIPittz466RG/gFyzvTHrZHDiTcWg=
github.com/cthulhu-rider/grpc-go v0.0.0-20240808123512-00d000d30657/go.mod h1:IWTG0VlJLCh1SkC58F7np9ka9mx/WNkjl4PGJaiq+QE=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
Expand Down Expand Up @@ -273,8 +275,6 @@ golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3j
google.golang.org/appengine v1.6.7/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240227224415-6ceb2ff114de h1:cZGRis4/ot9uVm639a+rHCUaG0JJHEsdyzSQTMX+suY=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240227224415-6ceb2ff114de/go.mod h1:H4O17MA/PE9BsGx3w+a+W2VOLLD1Qf7oJneAoU6WktY=
google.golang.org/grpc v1.62.0 h1:HQKZ/fa1bXkX1oFOvSjmZEUL8wLSaZTjCcLAlmZRtdk=
google.golang.org/grpc v1.62.0/go.mod h1:IWTG0VlJLCh1SkC58F7np9ka9mx/WNkjl4PGJaiq+QE=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
Expand Down
Loading