From 8d0ac07406bda8bbf5e66b7d544c2d83353e11e8 Mon Sep 17 00:00:00 2001
From: Leonard Lyubich
Date: Thu, 8 Aug 2024 12:03:07 +0400
Subject: [PATCH 1/2] node/grpc: Fix calculation of max recv message size from
 object limit

Since the storage node serves the `ObjectService.Replicate` RPC, the gRPC
server must be able to accept the biggest allowed object.

Previously, the node set the max size of received gRPC messages to the sum
of the payload (`MaxObjectSize` network setting) and header (16K at the
moment) size limits. This was not entirely correct because the replication
request message also contains non-object fields (only a signature for now)
and protobuf service bytes. Thus, when the size of an object approached the
maximum allowed, it was possible to overflow the calculated limit and be
refused service.

This adds 1 KB to the calculated limit. This is a heuristic value: it is
larger than the extra request data of the current protocol version, while
leaving room for small extensions in advance. The exact value matters
little at volumes this small.

Signed-off-by: Leonard Lyubich
---
 cmd/neofs-node/grpc.go | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)

diff --git a/cmd/neofs-node/grpc.go b/cmd/neofs-node/grpc.go
index c40cdf7938..3c9aab55e1 100644
--- a/cmd/neofs-node/grpc.go
+++ b/cmd/neofs-node/grpc.go
@@ -29,9 +29,13 @@ func initGRPC(c *cfg) {
 	fatalOnErrDetails("read max object size network setting to determine gRPC recv message limit", err)
 
 	maxRecvSize := maxObjSize
-	// don't forget about meta fields
-	if maxRecvSize < uint64(math.MaxUint64-object.MaxHeaderLen) { // just in case, always true in practice
-		maxRecvSize += object.MaxHeaderLen
+	// don't forget about meta fields: object header + other ObjectService.Replicate
+	// request fields. For the latter, less is needed now, but it is still better to
+	// take a reserve for potential protocol extensions. Anyway, 1 KB is
+	// nothing IRL.
+	const maxMetadataSize = object.MaxHeaderLen + 1<<10
+	if maxRecvSize < uint64(math.MaxUint64-maxMetadataSize) { // just in case, always true in practice
+		maxRecvSize += maxMetadataSize
 	} else {
 		maxRecvSize = math.MaxUint64
 	}
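The arithmetic behind the new limit is easy to check in isolation. Below is a
minimal standalone sketch of the calculation this hunk performs, not the
repository code itself: maxHeaderLen = 16 << 10 is an assumption taken from
the commit message ("16K at the moment"); the real code uses
object.MaxHeaderLen from the SDK.

package main

import (
	"fmt"
	"math"
)

// maxHeaderLen stands in for object.MaxHeaderLen (assumed 16 KiB here).
const maxHeaderLen = 16 << 10

// recvLimit mirrors the patched calculation: payload limit + header limit
// + 1 KB reserve for the remaining ObjectService.Replicate request fields,
// saturating at MaxUint64 instead of wrapping around.
func recvLimit(maxObjSize uint64) uint64 {
	const maxMetadataSize = maxHeaderLen + 1<<10
	if maxObjSize < uint64(math.MaxUint64-maxMetadataSize) { // always true in practice
		return maxObjSize + maxMetadataSize
	}
	return math.MaxUint64
}

func main() {
	// 64 MiB payload limit -> 67126272-byte recv limit (64<<20 + 17<<10).
	fmt.Println(recvLimit(64 << 20))
}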
From 19f39df2243dcecbf7bb3b657bad6841322ff565 Mon Sep 17 00:00:00 2001
From: Leonard Lyubich
Date: Thu, 8 Aug 2024 16:49:20 +0400
Subject: [PATCH 2/2] node/grpc: Change max recv msg size on `MaxObjectSize`
 setting update

Since the storage node serves the `ObjectService.Replicate` RPC, the gRPC
server must be able to accept the biggest allowed object.

Previously, the node calculated the global message limit for the gRPC
server once, on startup. With this behavior, when the `MaxObjectSize`
network setting was increased, the node went on rejecting written objects
larger than the previous limit. This manifested itself as a denial of
replication service.

From now on, the storage node updates the max received gRPC message size
(when needed) on each refresh of the `MaxObjectSize` setting cache, and
also via polling of the Netmap contract done once per minute.

Refs #2910.

Signed-off-by: Leonard Lyubich
---
 cmd/neofs-node/cache.go     | 26 +++++++++---
 cmd/neofs-node/config.go    |  2 +
 cmd/neofs-node/grpc.go      | 84 ++++++++++++++++++++++++-------------
 cmd/neofs-node/grpc_test.go | 50 ++++++++++++++++++++++
 cmd/neofs-node/netmap.go    | 25 +++++++++++
 cmd/neofs-node/object.go    |  5 ++-
 go.mod                      |  2 +
 go.sum                      |  4 +-
 8 files changed, 160 insertions(+), 38 deletions(-)
 create mode 100644 cmd/neofs-node/grpc_test.go

diff --git a/cmd/neofs-node/cache.go b/cmd/neofs-node/cache.go
index e1d7271c45..58bfc1a423 100644
--- a/cmd/neofs-node/cache.go
+++ b/cmd/neofs-node/cache.go
@@ -446,14 +446,30 @@ type ttlMaxObjectSizeCache struct {
 	lastUpdated time.Time
 	lastSize    uint64
 	src         putsvc.MaxSizeSource
+	onChange    func(uint64)
 }
 
-func newCachedMaxObjectSizeSource(src putsvc.MaxSizeSource) putsvc.MaxSizeSource {
+func newCachedMaxObjectSizeSource(src putsvc.MaxSizeSource, onChange func(uint64)) *ttlMaxObjectSizeCache {
 	return &ttlMaxObjectSizeCache{
-		src: src,
+		src:      src,
+		onChange: onChange,
 	}
 }
 
+func (c *ttlMaxObjectSizeCache) updateLastSize(sz uint64) {
+	if c.lastSize != sz {
+		c.onChange(sz)
+	}
+	c.lastSize = sz
+	c.lastUpdated = time.Now()
+}
+
+func (c *ttlMaxObjectSizeCache) handleNewMaxObjectPayloadSize(sz uint64) {
+	c.mtx.Lock()
+	c.updateLastSize(sz)
+	c.mtx.Unlock()
+}
+
 func (c *ttlMaxObjectSizeCache) MaxObjectSize() uint64 {
 	const ttl = time.Second * 30
 
@@ -469,9 +485,9 @@ func (c *ttlMaxObjectSizeCache) MaxObjectSize() uint64 {
 	c.mtx.Lock()
 	size = c.lastSize
 	if !c.lastUpdated.After(prevUpdated) {
-		size = c.src.MaxObjectSize()
-		c.lastSize = size
-		c.lastUpdated = time.Now()
+		newSize := c.src.MaxObjectSize()
+		c.updateLastSize(newSize)
+		size = newSize
 	}
 	c.mtx.Unlock()
 
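The cache.go change boils down to a TTL cache over a slow source that fires a
callback only when the refreshed value actually differs. A simplified sketch
of that pattern follows; it collapses the real cache's two-phase locking into
a single mutex, and all names (ttlCache, source) are illustrative, not from
the repo.

package main

import (
	"fmt"
	"sync"
	"time"
)

// ttlCache caches a uint64 read from a slow source and notifies a
// subscriber only when the value actually changes.
type ttlCache struct {
	mu          sync.Mutex
	lastUpdated time.Time
	lastVal     uint64
	ttl         time.Duration
	source      func() uint64
	onChange    func(uint64)
}

func (c *ttlCache) get() uint64 {
	c.mu.Lock()
	defer c.mu.Unlock()
	if time.Since(c.lastUpdated) >= c.ttl {
		v := c.source()
		if v != c.lastVal {
			c.onChange(v) // notify subscribers only on real changes
		}
		c.lastVal = v
		c.lastUpdated = time.Now()
	}
	return c.lastVal
}

func main() {
	c := &ttlCache{
		ttl:      30 * time.Second,
		source:   func() uint64 { return 64 << 20 },
		onChange: func(v uint64) { fmt.Println("max object size changed:", v) },
	}
	fmt.Println(c.get()) // first read populates the cache and fires onChange
}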
diff --git a/cmd/neofs-node/config.go b/cmd/neofs-node/config.go
index e6de08ec90..1a6c3eac7b 100644
--- a/cmd/neofs-node/config.go
+++ b/cmd/neofs-node/config.go
@@ -424,6 +424,8 @@ type cfgGRPC struct {
 	maxChunkSize  uint64
 	maxAddrAmount uint64
+
+	maxRecvMsgSize atomic.Value // int
 }
 
 type cfgMorph struct {
diff --git a/cmd/neofs-node/grpc.go b/cmd/neofs-node/grpc.go
index 3c9aab55e1..e50982f861 100644
--- a/cmd/neofs-node/grpc.go
+++ b/cmd/neofs-node/grpc.go
@@ -28,42 +28,27 @@ func initGRPC(c *cfg) {
 	maxObjSize, err := c.nCli.MaxObjectSize()
 	fatalOnErrDetails("read max object size network setting to determine gRPC recv message limit", err)
 
-	maxRecvSize := maxObjSize
-	// don't forget about meta fields: object header + other ObjectService.Replicate
-	// request fields. For the latter, less is needed now, but it is still better to
-	// take a reserve for potential protocol extensions. Anyway, 1 KB is
-	// nothing IRL.
-	const maxMetadataSize = object.MaxHeaderLen + 1<<10
-	if maxRecvSize < uint64(math.MaxUint64-maxMetadataSize) { // just in case, always true in practice
-		maxRecvSize += maxMetadataSize
-	} else {
-		maxRecvSize = math.MaxUint64
-	}
-
-	var maxRecvMsgSizeOpt grpc.ServerOption
-	if maxRecvSize > maxMsgSize { // do not decrease default value
-		if maxRecvSize > math.MaxInt {
-			// ^2GB for 32-bit systems which is currently enough in practice. If at some
-			// point this is not enough, we'll need to expand the option
-			fatalOnErr(fmt.Errorf("cannot serve NeoFS API over gRPC: object of max size is bigger than gRPC server is able to support %d>%d",
-				maxRecvSize, math.MaxInt))
-		}
-		maxRecvMsgSizeOpt = grpc.MaxRecvMsgSize(int(maxRecvSize))
-		c.log.Debug("limit max recv gRPC message size to fit max stored objects",
-			zap.Uint64("max object size", maxObjSize), zap.Uint64("max recv msg", maxRecvSize))
+	maxRecvSize, overflowed := calculateMaxReplicationRequestSize(maxObjSize)
+	if maxRecvSize < 0 {
+		// ^2GB for 32-bit systems which is currently enough in practice. If at some
+		// point this is not enough, we'll need to expand the option
+		fatalOnErr(fmt.Errorf("cannot serve NeoFS API over gRPC: object of max size is bigger than gRPC server is able to support %d>%d",
+			overflowed, math.MaxInt))
 	}
+	c.cfgGRPC.maxRecvMsgSize.Store(maxRecvSize)
+	// TODO(@cthulhu-rider): the setting can be server-global only now, support
+	// per-RPC limits
+	maxRecvMsgSizeOpt := grpc.MaxRecvMsgSizeFunc(func() int {
+		return c.cfgGRPC.maxRecvMsgSize.Load().(int) // initialized above, so safe
+	})
+	c.log.Info("limit max recv gRPC message size to fit max stored objects",
+		zap.Uint64("max object size", maxObjSize), zap.Int("max recv msg", maxRecvSize))
 
 	var successCount int
 	grpcconfig.IterateEndpoints(c.cfgReader, func(sc *grpcconfig.Config) {
 		serverOpts := []grpc.ServerOption{
 			grpc.MaxSendMsgSize(maxMsgSize),
-		}
-		if maxRecvMsgSizeOpt != nil {
-			// TODO(@cthulhu-rider): the setting can be server-global only now, support
-			// per-RPC limits
-			// TODO(@cthulhu-rider): max object size setting may change in general,
-			// but server configuration is static now
-			serverOpts = append(serverOpts, maxRecvMsgSizeOpt)
+			maxRecvMsgSizeOpt,
 		}
 
 		tlsCfg := sc.TLS()
@@ -157,3 +142,42 @@ func stopGRPC(name string, s *grpc.Server, l *zap.Logger) {
 
 	l.Info("gRPC server stopped successfully")
 }
+
+// calculates an approximation of the max size of the ObjectService.Replicate
+// request with the given object payload limit. The second value is returned
+// when the calculation result overflows the int type; in this case, the first
+// return value is negative.
+func calculateMaxReplicationRequestSize(maxObjPayloadSize uint64) (int, uint64) {
+	res := maxObjPayloadSize
+	// don't forget about meta fields: object header + other ObjectService.Replicate
+	// request fields. For the latter, less is needed now, but it is still better to
+	// take a reserve for potential protocol extensions. Anyway, 1 KB is
+	// nothing IRL.
+	const maxMetadataSize = object.MaxHeaderLen + 1<<10
+	if res < uint64(math.MaxUint64-maxMetadataSize) { // just in case, always true in practice
+		res += maxMetadataSize
+	} else {
+		res = math.MaxUint64
+	}
+	if res > math.MaxInt {
+		return -1, res
+	}
+	if res < maxMsgSize { // do not decrease default value
+		return maxMsgSize, 0
+	}
+	return int(res), 0
+}
+
+func (c *cfg) handleNewMaxObjectPayloadSize(maxObjPayloadSize uint64) {
+	maxRecvSize, overflowed := calculateMaxReplicationRequestSize(maxObjPayloadSize)
+	if maxRecvSize < 0 {
+		// unlike at startup, we don't want to stop a running service. Moreover, this is
+		// just a limit: even if it has become incredibly large, most data is expected
+		// to be orders of magnitude smaller
+		c.log.Info("max gRPC recv msg size re-calculated for new max object payload size overflows int type, fallback to max int",
+			zap.Uint64("calculated limit", overflowed))
+		maxRecvSize = math.MaxInt
+	}
+	c.cfgGRPC.maxRecvMsgSize.Store(maxRecvSize)
+	c.log.Info("updated max gRPC recv msg size limit after max object payload size has been changed",
+		zap.Uint64("new object limit", maxObjPayloadSize), zap.Int("new gRPC limit", maxRecvSize))
+}
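Note that grpc.MaxRecvMsgSizeFunc appears to exist only in the forked grpc-go
pinned by the replace directive further down (upstream google.golang.org/grpc
v1.62.0 is dropped from go.sum); the pattern underneath it is ordinary,
though. A minimal sketch of that pattern, independent of gRPC: publish the
current limit through sync/atomic's atomic.Value and re-read it per message.
recvLimiter is an illustrative stand-in for cfgGRPC.maxRecvMsgSize, not
repository code.

package main

import (
	"fmt"
	"sync/atomic"
)

// recvLimiter holds a mutable server-global limit: writers publish a new
// int value, readers pick it up on the next message without a restart.
type recvLimiter struct {
	limit atomic.Value // always holds an int
}

func (l *recvLimiter) set(n int) { l.limit.Store(n) }

// get is what a per-message hook like grpc.MaxRecvMsgSizeFunc would call.
func (l *recvLimiter) get() int { return l.limit.Load().(int) } // safe once set() has run

func main() {
	var l recvLimiter
	l.set(4 << 20) // start from the gRPC default of 4 MiB
	fmt.Println(l.get())
	l.set(64<<20 + 17<<10) // MaxObjectSize grew: raise the limit on the fly
	fmt.Println(l.get())
}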
diff --git a/cmd/neofs-node/grpc_test.go b/cmd/neofs-node/grpc_test.go
new file mode 100644
index 0000000000..1715e80e60
--- /dev/null
+++ b/cmd/neofs-node/grpc_test.go
@@ -0,0 +1,50 @@
+package main
+
+import (
+	"math"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+	"go.uber.org/zap"
+)
+
+func Test_calculateMaxReplicationRequestSize(t *testing.T) {
+	limit, _ := calculateMaxReplicationRequestSize(64 << 20)
+	require.EqualValues(t, 67126272, limit)
+	t.Run("int overflow", func(t *testing.T) {
+		limit, overflow := calculateMaxReplicationRequestSize(math.MaxInt - 17<<10 + 1)
+		require.Negative(t, limit)
+		require.EqualValues(t, uint64(math.MaxInt)+1, overflow)
+	})
+	t.Run("uint64 overflow", func(t *testing.T) {
+		limit, overflow := calculateMaxReplicationRequestSize(math.MaxUint64 - 17<<10 + 1)
+		require.Negative(t, limit)
+		require.EqualValues(t, uint64(math.MaxUint64), overflow)
+	})
+	t.Run("smaller than gRPC default", func(t *testing.T) {
+		limit, _ := calculateMaxReplicationRequestSize(0)
+		require.EqualValues(t, 4<<20, limit)
+		limit, _ = calculateMaxReplicationRequestSize(4<<20 - 17<<10 - 1)
+		require.EqualValues(t, 4<<20, limit)
+	})
+}
+
+func Test_cfg_handleNewMaxObjectPayloadSize(t *testing.T) {
+	var c cfg
+	c.log = zap.NewNop()
+	c.cfgGRPC.maxRecvMsgSize.Store(0) // any
+
+	c.handleNewMaxObjectPayloadSize(100 << 20)
+	require.EqualValues(t, 100<<20+17<<10, c.cfgGRPC.maxRecvMsgSize.Load())
+	c.handleNewMaxObjectPayloadSize(64 << 20)
+	require.EqualValues(t, 64<<20+17<<10, c.cfgGRPC.maxRecvMsgSize.Load())
+	// int overflow
+	c.handleNewMaxObjectPayloadSize(math.MaxInt - 17<<10 + 1)
+	require.EqualValues(t, math.MaxInt, c.cfgGRPC.maxRecvMsgSize.Load())
+	// uint64 overflow
+	c.handleNewMaxObjectPayloadSize(math.MaxUint64 - 17<<10 + 1)
+	require.EqualValues(t, math.MaxInt, c.cfgGRPC.maxRecvMsgSize.Load())
+	// smaller than gRPC default
+	c.handleNewMaxObjectPayloadSize(4<<20 - 17<<10 - 1)
+	require.EqualValues(t, 4<<20, c.cfgGRPC.maxRecvMsgSize.Load())
+}
diff --git a/cmd/neofs-node/netmap.go b/cmd/neofs-node/netmap.go
index 575b88e76f..2c1629f3cd 100644
--- a/cmd/neofs-node/netmap.go
+++ b/cmd/neofs-node/netmap.go
@@ -2,9 +2,11 @@ package main
 
 import (
 	"bytes"
+	"context"
 	"errors"
 	"fmt"
 	"sync/atomic"
+	"time"
 
 	netmapGRPC "github.com/nspcc-dev/neofs-api-go/v2/netmap/grpc"
 	"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
@@ -445,3 +447,26 @@ func (n *netInfo) Dump(ver version.Version) (*netmapSDK.NetworkInfo, error) {
 
 	return &ni, nil
 }
+
+func listenMaxObjectPayloadSizeChanges(ctx context.Context, cli *nmClient.Client, lg *zap.Logger, f func(uint64)) {
+	// config rarely changes, but when it does, we do not want to wait long.
+	// Notification events would help https://github.com/nspcc-dev/neofs-contract/issues/427
+	const pollInterval = time.Minute
+	t := time.NewTimer(pollInterval)
+	defer t.Stop()
+	for {
+		select {
+		case <-ctx.Done():
+			lg.Debug("stop max object payload size net config poller by context", zap.Error(ctx.Err()))
+			return
+		case <-t.C:
+			lg.Info("rereading max object payload size net config by timer", zap.Duration("interval", pollInterval))
+			if sz, err := cli.MaxObjectSize(); err == nil {
+				f(sz)
+			} else {
+				lg.Error("failed to read max object payload size net config", zap.Error(err))
+			}
+			t.Reset(pollInterval)
+		}
+	}
+}
diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go
index fd77cae480..7850ef2109 100644
--- a/cmd/neofs-node/object.go
+++ b/cmd/neofs-node/object.go
@@ -256,10 +256,13 @@ func initObjectService(c *cfg) {
 		searchsvcV2.WithKeyStorage(keyStorage),
 	)
 
+	cachedMaxObjPayloadSizeSrc := newCachedMaxObjectSizeSource(c, c.handleNewMaxObjectPayloadSize)
+	go listenMaxObjectPayloadSizeChanges(c.ctx, c.nCli, c.log, cachedMaxObjPayloadSizeSrc.handleNewMaxObjectPayloadSize)
+
 	sPut := putsvc.NewService(&transport{clients: putConstructor},
 		putsvc.WithKeyStorage(keyStorage),
 		putsvc.WithClientConstructor(putConstructor),
-		putsvc.WithMaxSizeSource(newCachedMaxObjectSizeSource(c)),
+		putsvc.WithMaxSizeSource(cachedMaxObjPayloadSizeSrc),
 		putsvc.WithObjectStorage(storageEngine{engine: ls}),
 		putsvc.WithContainerSource(c.cfgObject.cnrSource),
 		putsvc.WithNetworkMapSource(c.netMapSource),
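The poller added to netmap.go is the standard timer-plus-context loop. A
self-contained sketch of the same shape follows; fetch stands in for
nmClient.Client.MaxObjectSize and apply for the recv-limit callback, both
illustrative rather than repository code.

package main

import (
	"context"
	"log"
	"time"
)

// poll re-reads a setting once per interval and hands changes to apply,
// stopping when the context is cancelled.
func poll(ctx context.Context, interval time.Duration, fetch func() (uint64, error), apply func(uint64)) {
	t := time.NewTimer(interval)
	defer t.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-t.C:
			if v, err := fetch(); err == nil {
				apply(v)
			} else {
				log.Printf("poll failed: %v", err)
			}
			t.Reset(interval) // the timer has fired, so Reset is safe here
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 150*time.Millisecond)
	defer cancel()
	poll(ctx, 50*time.Millisecond,
		func() (uint64, error) { return 64 << 20, nil },
		func(v uint64) { log.Printf("new max object size: %d", v) })
}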
diff --git a/go.mod b/go.mod
index cac4449df0..2301833872 100644
--- a/go.mod
+++ b/go.mod
@@ -105,3 +105,5 @@ retract (
 	v1.22.1 // Contains retraction only.
 	v1.22.0 // Published accidentally.
 )
+
+replace google.golang.org/grpc => github.com/cthulhu-rider/grpc-go v0.0.0-20240808123512-00d000d30657
diff --git a/go.sum b/go.sum
index efc571159c..52e0c3515b 100644
--- a/go.sum
+++ b/go.sum
@@ -21,6 +21,8 @@ github.com/consensys/gnark-crypto v0.12.2-0.20231013160410-1f65e75b6dfb/go.mod h
 github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
 github.com/cpuguy83/go-md2man/v2 v2.0.4 h1:wfIWP927BUkWJb2NmU/kNDYIBTh/ziUX91+lVfRxZq4=
 github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
+github.com/cthulhu-rider/grpc-go v0.0.0-20240808123512-00d000d30657 h1:khwEiSUz2Pi0dPqIPittz466RG/gFyzvTHrZHDiTcWg=
+github.com/cthulhu-rider/grpc-go v0.0.0-20240808123512-00d000d30657/go.mod h1:IWTG0VlJLCh1SkC58F7np9ka9mx/WNkjl4PGJaiq+QE=
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
@@ -273,8 +275,6 @@ golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3j
 google.golang.org/appengine v1.6.7/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc=
 google.golang.org/genproto/googleapis/rpc v0.0.0-20240227224415-6ceb2ff114de h1:cZGRis4/ot9uVm639a+rHCUaG0JJHEsdyzSQTMX+suY=
 google.golang.org/genproto/googleapis/rpc v0.0.0-20240227224415-6ceb2ff114de/go.mod h1:H4O17MA/PE9BsGx3w+a+W2VOLLD1Qf7oJneAoU6WktY=
-google.golang.org/grpc v1.62.0 h1:HQKZ/fa1bXkX1oFOvSjmZEUL8wLSaZTjCcLAlmZRtdk=
-google.golang.org/grpc v1.62.0/go.mod h1:IWTG0VlJLCh1SkC58F7np9ka9mx/WNkjl4PGJaiq+QE=
 google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
 google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
 google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=