From ec47db0a35a75196ff8c43e8eb1a2f476b385512 Mon Sep 17 00:00:00 2001 From: Evgenii Baidakov Date: Wed, 4 Oct 2023 14:07:35 +0400 Subject: [PATCH] neofs: Preallocate buffers to object uploads MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit closes #833 goos: linux goarch: amd64 pkg: github.com/nspcc-dev/neofs-s3-gw/internal/neofs cpu: Intel(R) Core(TM) i7-10875H CPU @ 2.30GHz │ old.txt │ new.txt │ │ sec/op │ sec/op vs base │ /object_upload_128-16 13.82m ± 10% 13.86m ± 8% ~ (p=0.631 n=10) /object_upload_256-16 18.84m ± 2% 19.46m ± 21% ~ (p=0.105 n=10) /object_upload_384-16 23.99m ± 2% 26.70m ± 10% +11.28% (p=0.003 n=10) /object_upload_512-16 29.31m ± 2% 30.09m ± 7% ~ (p=0.631 n=10) geomean 20.69m 21.58m +4.30% │ old.txt │ new.txt │ │ B/op │ B/op vs base │ /object_upload_128-16 65869.5Ki ± 0% 326.1Ki ± 0% -99.50% (p=0.000 n=10) /object_upload_256-16 66125.6Ki ± 0% 582.3Ki ± 0% -99.12% (p=0.000 n=10) /object_upload_384-16 66381.8Ki ± 0% 838.5Ki ± 0% -98.74% (p=0.000 n=10) /object_upload_512-16 65.076Mi ± 0% 1.069Mi ± 0% -98.36% (p=0.000 n=10) geomean 64.70Mi 646.1Ki -99.02% │ old.txt │ new.txt │ │ allocs/op │ allocs/op vs base │ /object_upload_128-16 883.0 ± 0% 864.0 ± 0% -2.15% (p=0.000 n=10) /object_upload_256-16 885.0 ± 0% 866.0 ± 0% -2.15% (p=0.000 n=10) /object_upload_384-16 889.0 ± 0% 872.5 ± 1% -1.86% (p=0.001 n=10) /object_upload_512-16 895.0 ± 0% 875.0 ± 0% -2.23% (p=0.000 n=10) geomean 888.0 869.4 -2.10% Signed-off-by: Evgenii Baidakov --- cmd/s3-authmate/main.go | 7 ++- cmd/s3-gw/app.go | 12 +++- cmd/s3-gw/app_settings.go | 5 ++ config/config.yaml | 3 + internal/neofs/neofs.go | 20 +++++-- internal/neofs/neofs_test.go | 109 +++++++++++++++++++++++++++++++++++ 6 files changed, 146 insertions(+), 10 deletions(-) diff --git a/cmd/s3-authmate/main.go b/cmd/s3-authmate/main.go index de960212..ff3ce757 100644 --- a/cmd/s3-authmate/main.go +++ b/cmd/s3-authmate/main.go @@ -733,9 +733,10 @@ func createNeoFS(ctx context.Context, log *zap.Logger, cfg PoolConfig, anonSigne } neofsCfg := neofs.Config{ - MaxObjectSize: int64(ni.MaxObjectSize()), - IsSlicerEnabled: isSlicerEnabled, - IsHomomorphicEnabled: !ni.HomomorphicHashingDisabled(), + MaxObjectSize: int64(ni.MaxObjectSize()), + IsSlicerEnabled: isSlicerEnabled, + IsHomomorphicEnabled: !ni.HomomorphicHashingDisabled(), + UploadChunkPoolLength: 1, } neoFS := neofs.NewNeoFS(p, signer, anonSigner, neofsCfg, ni) diff --git a/cmd/s3-gw/app.go b/cmd/s3-gw/app.go index c86a24b3..f00fb25d 100644 --- a/cmd/s3-gw/app.go +++ b/cmd/s3-gw/app.go @@ -106,10 +106,16 @@ func newApp(ctx context.Context, log *Logger, v *viper.Viper) *App { log.logger.Fatal("newApp: networkInfo", zap.Error(err)) } + poolSize := v.GetInt(cfgUploadChunkPoolLength) + if poolSize == 0 { + poolSize = defaultUploadChunkPoolLength + } + neofsCfg := neofs.Config{ - MaxObjectSize: int64(ni.MaxObjectSize()), - IsSlicerEnabled: v.GetBool(cfgSlicerEnabled), - IsHomomorphicEnabled: !ni.HomomorphicHashingDisabled(), + MaxObjectSize: int64(ni.MaxObjectSize()), + IsSlicerEnabled: v.GetBool(cfgSlicerEnabled), + IsHomomorphicEnabled: !ni.HomomorphicHashingDisabled(), + UploadChunkPoolLength: poolSize, } // If slicer is disabled, we should use "static" getter, which doesn't make periodic requests to the NeoFS. diff --git a/cmd/s3-gw/app_settings.go b/cmd/s3-gw/app_settings.go index 53a2dd9f..77698815 100644 --- a/cmd/s3-gw/app_settings.go +++ b/cmd/s3-gw/app_settings.go @@ -28,6 +28,8 @@ const ( defaultMaxClientsCount = 100 defaultMaxClientsDeadline = time.Second * 30 + + defaultUploadChunkPoolLength = 64 ) const ( // Settings. @@ -124,6 +126,9 @@ const ( // Settings. // Timeout between retrieving actual epoch from NeoFS. Actual only if slicer.enabled = true. cfgEpochUpdateInterval = "neofs.epoch_update_interval" + // Pool size for upload object chunks. They are used to upload objects payload. + cfgUploadChunkPoolLength = "neofs.upload_chunk_pool_length" + // List of allowed AccessKeyID prefixes. cfgAllowedAccessKeyIDPrefixes = "allowed_access_key_id_prefixes" diff --git a/config/config.yaml b/config/config.yaml index e2972b9d..2fb0341b 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -138,6 +138,9 @@ neofs: set_copies_number: 0 # Timeout between retrieving actual epoch from NeoFS. Actual only if slicer.enabled = true. epoch_update_interval: 2m + # Pool size for upload object chunks. They are used to upload objects payload. + # Total preallocated memory = upload_chunk_pool_length*MaxObjectSize (from NeoFS network) + upload_chunk_pool_length: 128 # List of allowed AccessKeyID prefixes # If the parameter is omitted, S3 GW will accept all AccessKeyIDs diff --git a/internal/neofs/neofs.go b/internal/neofs/neofs.go index 106aa60e..127f4324 100644 --- a/internal/neofs/neofs.go +++ b/internal/neofs/neofs.go @@ -32,9 +32,10 @@ import ( // Config allows to configure some [NeoFS] parameters. type Config struct { - MaxObjectSize int64 - IsSlicerEnabled bool - IsHomomorphicEnabled bool + MaxObjectSize int64 + IsSlicerEnabled bool + IsHomomorphicEnabled bool + UploadChunkPoolLength int } // NeoFS represents virtual connection to the NeoFS network. @@ -46,16 +47,23 @@ type NeoFS struct { anonSigner user.Signer cfg Config epochGetter EpochGetter + buffers chan []byte } // NewNeoFS creates new NeoFS using provided pool.Pool. func NewNeoFS(p *pool.Pool, signer user.Signer, anonSigner user.Signer, cfg Config, epochGetter EpochGetter) *NeoFS { + buffers := make(chan []byte, cfg.UploadChunkPoolLength) + for i := 0; i < cfg.UploadChunkPoolLength; i++ { + buffers <- make([]byte, cfg.MaxObjectSize) + } + return &NeoFS{ pool: p, gateSigner: signer, anonSigner: anonSigner, cfg: cfg, epochGetter: epochGetter, + buffers: buffers, } } @@ -316,7 +324,11 @@ func (x *NeoFS) CreateObject(ctx context.Context, prm layer.PrmObjectCreate) (oi return oid.ID{}, fmt.Errorf("save object via connection pool: %w", err) } - chunk := make([]byte, x.cfg.MaxObjectSize) + chunk := <-x.buffers + defer func() { + x.buffers <- chunk + }() + _, err = io.CopyBuffer(writer, prm.Payload, chunk) if err != nil { return oid.ID{}, fmt.Errorf("read payload chunk: %w", err) diff --git a/internal/neofs/neofs_test.go b/internal/neofs/neofs_test.go index f83b1880..7a8f63e0 100644 --- a/internal/neofs/neofs_test.go +++ b/internal/neofs/neofs_test.go @@ -1,11 +1,25 @@ package neofs import ( + "bytes" + "context" + "crypto/rand" "fmt" + "strconv" "testing" + "time" + "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neofs-s3-gw/api/layer" + "github.com/nspcc-dev/neofs-sdk-go/client" apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" + "github.com/nspcc-dev/neofs-sdk-go/container" + "github.com/nspcc-dev/neofs-sdk-go/container/acl" + cid "github.com/nspcc-dev/neofs-sdk-go/container/id" + "github.com/nspcc-dev/neofs-sdk-go/netmap" + "github.com/nspcc-dev/neofs-sdk-go/pool" + "github.com/nspcc-dev/neofs-sdk-go/user" + "github.com/nspcc-dev/neofs-sdk-go/waiter" "github.com/stretchr/testify/require" ) @@ -23,3 +37,98 @@ func TestErrorChecking(t *testing.T) { require.ErrorIs(t, wrappedError, layer.ErrAccessDenied) require.Contains(t, wrappedError.Error(), reason) } + +func Benchmark(b *testing.B) { + ctx := context.Background() + + pk, err := keys.NewPrivateKey() + require.NoError(b, err) + signer := user.NewAutoIDSignerRFC6979(pk.PrivateKey) + + anonPk, err := keys.NewPrivateKey() + require.NoError(b, err) + anonSigner := user.NewAutoIDSignerRFC6979(anonPk.PrivateKey) + + var prm pool.InitParameters + prm.SetSigner(signer) + prm.AddNode(pool.NewNodeParam(1, "localhost:8080", 1)) + + p, err := pool.NewPool(prm) + require.NoError(b, err) + + require.NoError(b, p.Dial(ctx)) + + ni, err := p.NetworkInfo(ctx, client.PrmNetworkInfo{}) + require.NoError(b, err) + + neofsCfg := Config{ + MaxObjectSize: int64(ni.MaxObjectSize()), + IsSlicerEnabled: false, + IsHomomorphicEnabled: !ni.HomomorphicHashingDisabled(), + UploadChunkPoolLength: 64, + } + + neo := NewNeoFS(p, signer, anonSigner, neofsCfg, ni) + + var createParams layer.PrmObjectCreate + createParams.Creator = signer.UserID() + + for i := 128; i <= 512; i += 128 { + b.Run("object upload "+strconv.Itoa(i), func(b *testing.B) { + b.StopTimer() + payload := make([]byte, i*1024) + _, err = rand.Read(payload) + require.NoError(b, err) + + id, err := createContainer(ctx, signer, p) + require.NoError(b, err) + createParams.Container = id + + defer func() { + _ = deleteContainer(ctx, id, signer, p) + }() + + b.ReportAllocs() + b.ResetTimer() + b.StartTimer() + + for i := 0; i < b.N; i++ { + b.StopTimer() + createParams.Payload = bytes.NewReader(payload) + createParams.CreationTime = time.Now() + b.StartTimer() + + _, err = neo.CreateObject(ctx, createParams) + b.StopTimer() + require.NoError(b, err) + b.StartTimer() + } + }) + } +} + +func createContainer(ctx context.Context, signer user.Signer, p *pool.Pool) (cid.ID, error) { + var cnr container.Container + cnr.Init() + cnr.SetOwner(signer.UserID()) + + var rd netmap.ReplicaDescriptor + rd.SetNumberOfObjects(1) + + var pp netmap.PlacementPolicy + pp.SetContainerBackupFactor(1) + pp.AddReplicas(rd) + + cnr.SetPlacementPolicy(pp) + cnr.SetBasicACL(acl.PublicRW) + + var prm client.PrmContainerPut + + w := waiter.NewContainerPutWaiter(p, waiter.DefaultPollInterval) + return w.ContainerPut(ctx, cnr, signer, prm) +} + +func deleteContainer(ctx context.Context, id cid.ID, signer user.Signer, p *pool.Pool) error { + var prm client.PrmContainerDelete + return p.ContainerDelete(ctx, id, signer, prm) +}