Skip to content

Commit

Permalink
neofs: Preallocate buffers to object uploads
Browse files Browse the repository at this point in the history
closes #833

goos: linux
goarch: amd64
pkg: github.com/nspcc-dev/neofs-s3-gw/internal/neofs
cpu: Intel(R) Core(TM) i7-10875H CPU @ 2.30GHz
                      │   old.txt    │               new.txt                │
                      │    sec/op    │    sec/op     vs base                │
/object_upload_128-16   13.82m ± 10%   13.86m ±  8%        ~ (p=0.631 n=10)
/object_upload_256-16   18.84m ±  2%   19.46m ± 21%        ~ (p=0.105 n=10)
/object_upload_384-16   23.99m ±  2%   26.70m ± 10%  +11.28% (p=0.003 n=10)
/object_upload_512-16   29.31m ±  2%   30.09m ±  7%        ~ (p=0.631 n=10)
geomean                 20.69m         21.58m         +4.30%

                      │    old.txt     │               new.txt                │
                      │      B/op      │     B/op      vs base                │
/object_upload_128-16   65869.5Ki ± 0%   326.1Ki ± 0%  -99.50% (p=0.000 n=10)
/object_upload_256-16   66125.6Ki ± 0%   582.3Ki ± 0%  -99.12% (p=0.000 n=10)
/object_upload_384-16   66381.8Ki ± 0%   838.5Ki ± 0%  -98.74% (p=0.000 n=10)
/object_upload_512-16    65.076Mi ± 0%   1.069Mi ± 0%  -98.36% (p=0.000 n=10)
geomean                   64.70Mi        646.1Ki       -99.02%

                      │  old.txt   │              new.txt              │
                      │ allocs/op  │ allocs/op   vs base               │
/object_upload_128-16   883.0 ± 0%   864.0 ± 0%  -2.15% (p=0.000 n=10)
/object_upload_256-16   885.0 ± 0%   866.0 ± 0%  -2.15% (p=0.000 n=10)
/object_upload_384-16   889.0 ± 0%   872.5 ± 1%  -1.86% (p=0.001 n=10)
/object_upload_512-16   895.0 ± 0%   875.0 ± 0%  -2.23% (p=0.000 n=10)
geomean                 888.0        869.4       -2.10%

Signed-off-by: Evgenii Baidakov <[email protected]>
  • Loading branch information
smallhive committed Oct 4, 2023
1 parent aa37177 commit ec47db0
Show file tree
Hide file tree
Showing 6 changed files with 146 additions and 10 deletions.
7 changes: 4 additions & 3 deletions cmd/s3-authmate/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -733,9 +733,10 @@ func createNeoFS(ctx context.Context, log *zap.Logger, cfg PoolConfig, anonSigne
}

neofsCfg := neofs.Config{
MaxObjectSize: int64(ni.MaxObjectSize()),
IsSlicerEnabled: isSlicerEnabled,
IsHomomorphicEnabled: !ni.HomomorphicHashingDisabled(),
MaxObjectSize: int64(ni.MaxObjectSize()),
IsSlicerEnabled: isSlicerEnabled,
IsHomomorphicEnabled: !ni.HomomorphicHashingDisabled(),
UploadChunkPoolLength: 1,
}

neoFS := neofs.NewNeoFS(p, signer, anonSigner, neofsCfg, ni)
Expand Down
12 changes: 9 additions & 3 deletions cmd/s3-gw/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,16 @@ func newApp(ctx context.Context, log *Logger, v *viper.Viper) *App {
log.logger.Fatal("newApp: networkInfo", zap.Error(err))
}

poolSize := v.GetInt(cfgUploadChunkPoolLength)
if poolSize == 0 {
poolSize = defaultUploadChunkPoolLength
}

neofsCfg := neofs.Config{
MaxObjectSize: int64(ni.MaxObjectSize()),
IsSlicerEnabled: v.GetBool(cfgSlicerEnabled),
IsHomomorphicEnabled: !ni.HomomorphicHashingDisabled(),
MaxObjectSize: int64(ni.MaxObjectSize()),
IsSlicerEnabled: v.GetBool(cfgSlicerEnabled),
IsHomomorphicEnabled: !ni.HomomorphicHashingDisabled(),
UploadChunkPoolLength: poolSize,
}

// If slicer is disabled, we should use "static" getter, which doesn't make periodic requests to the NeoFS.
Expand Down
5 changes: 5 additions & 0 deletions cmd/s3-gw/app_settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ const (

defaultMaxClientsCount = 100
defaultMaxClientsDeadline = time.Second * 30

defaultUploadChunkPoolLength = 64
)

const ( // Settings.
Expand Down Expand Up @@ -124,6 +126,9 @@ const ( // Settings.
// Timeout between retrieving actual epoch from NeoFS. Actual only if slicer.enabled = true.
cfgEpochUpdateInterval = "neofs.epoch_update_interval"

// Pool size for upload object chunks. They are used to upload objects payload.
cfgUploadChunkPoolLength = "neofs.upload_chunk_pool_length"

// List of allowed AccessKeyID prefixes.
cfgAllowedAccessKeyIDPrefixes = "allowed_access_key_id_prefixes"

Expand Down
3 changes: 3 additions & 0 deletions config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,9 @@ neofs:
set_copies_number: 0
# Timeout between retrieving actual epoch from NeoFS. Actual only if slicer.enabled = true.
epoch_update_interval: 2m
# Pool size for upload object chunks. They are used to upload objects payload.
# Total preallocated memory = upload_chunk_pool_length*MaxObjectSize (from NeoFS network)
upload_chunk_pool_length: 128

# List of allowed AccessKeyID prefixes
# If the parameter is omitted, S3 GW will accept all AccessKeyIDs
Expand Down
20 changes: 16 additions & 4 deletions internal/neofs/neofs.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,10 @@ import (

// Config allows to configure some [NeoFS] parameters.
type Config struct {
MaxObjectSize int64
IsSlicerEnabled bool
IsHomomorphicEnabled bool
MaxObjectSize int64
IsSlicerEnabled bool
IsHomomorphicEnabled bool
UploadChunkPoolLength int
}

// NeoFS represents virtual connection to the NeoFS network.
Expand All @@ -46,16 +47,23 @@ type NeoFS struct {
anonSigner user.Signer
cfg Config
epochGetter EpochGetter
buffers chan []byte
}

// NewNeoFS creates new NeoFS using provided pool.Pool.
func NewNeoFS(p *pool.Pool, signer user.Signer, anonSigner user.Signer, cfg Config, epochGetter EpochGetter) *NeoFS {
buffers := make(chan []byte, cfg.UploadChunkPoolLength)
for i := 0; i < cfg.UploadChunkPoolLength; i++ {
buffers <- make([]byte, cfg.MaxObjectSize)
}

Check warning on line 58 in internal/neofs/neofs.go

View check run for this annotation

Codecov / codecov/patch

internal/neofs/neofs.go#L54-L58

Added lines #L54 - L58 were not covered by tests

return &NeoFS{
pool: p,
gateSigner: signer,
anonSigner: anonSigner,
cfg: cfg,
epochGetter: epochGetter,
buffers: buffers,

Check warning on line 66 in internal/neofs/neofs.go

View check run for this annotation

Codecov / codecov/patch

internal/neofs/neofs.go#L61-L66

Added lines #L61 - L66 were not covered by tests
}
}

Expand Down Expand Up @@ -316,7 +324,11 @@ func (x *NeoFS) CreateObject(ctx context.Context, prm layer.PrmObjectCreate) (oi
return oid.ID{}, fmt.Errorf("save object via connection pool: %w", err)
}

chunk := make([]byte, x.cfg.MaxObjectSize)
chunk := <-x.buffers
defer func() {
x.buffers <- chunk
}()

Check warning on line 330 in internal/neofs/neofs.go

View check run for this annotation

Codecov / codecov/patch

internal/neofs/neofs.go#L327-L330

Added lines #L327 - L330 were not covered by tests

_, err = io.CopyBuffer(writer, prm.Payload, chunk)
if err != nil {
return oid.ID{}, fmt.Errorf("read payload chunk: %w", err)
Expand Down
109 changes: 109 additions & 0 deletions internal/neofs/neofs_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,25 @@
package neofs

import (
"bytes"
"context"
"crypto/rand"
"fmt"
"strconv"
"testing"
"time"

"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neofs-s3-gw/api/layer"
"github.com/nspcc-dev/neofs-sdk-go/client"
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
"github.com/nspcc-dev/neofs-sdk-go/container"
"github.com/nspcc-dev/neofs-sdk-go/container/acl"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
"github.com/nspcc-dev/neofs-sdk-go/netmap"
"github.com/nspcc-dev/neofs-sdk-go/pool"
"github.com/nspcc-dev/neofs-sdk-go/user"
"github.com/nspcc-dev/neofs-sdk-go/waiter"
"github.com/stretchr/testify/require"
)

Expand All @@ -23,3 +37,98 @@ func TestErrorChecking(t *testing.T) {
require.ErrorIs(t, wrappedError, layer.ErrAccessDenied)
require.Contains(t, wrappedError.Error(), reason)
}

func Benchmark(b *testing.B) {
ctx := context.Background()

pk, err := keys.NewPrivateKey()
require.NoError(b, err)
signer := user.NewAutoIDSignerRFC6979(pk.PrivateKey)

anonPk, err := keys.NewPrivateKey()
require.NoError(b, err)
anonSigner := user.NewAutoIDSignerRFC6979(anonPk.PrivateKey)

var prm pool.InitParameters
prm.SetSigner(signer)
prm.AddNode(pool.NewNodeParam(1, "localhost:8080", 1))

p, err := pool.NewPool(prm)
require.NoError(b, err)

require.NoError(b, p.Dial(ctx))

ni, err := p.NetworkInfo(ctx, client.PrmNetworkInfo{})
require.NoError(b, err)

neofsCfg := Config{
MaxObjectSize: int64(ni.MaxObjectSize()),
IsSlicerEnabled: false,
IsHomomorphicEnabled: !ni.HomomorphicHashingDisabled(),
UploadChunkPoolLength: 64,
}

neo := NewNeoFS(p, signer, anonSigner, neofsCfg, ni)

var createParams layer.PrmObjectCreate
createParams.Creator = signer.UserID()

for i := 128; i <= 512; i += 128 {
b.Run("object upload "+strconv.Itoa(i), func(b *testing.B) {
b.StopTimer()
payload := make([]byte, i*1024)
_, err = rand.Read(payload)
require.NoError(b, err)

id, err := createContainer(ctx, signer, p)
require.NoError(b, err)
createParams.Container = id

defer func() {
_ = deleteContainer(ctx, id, signer, p)
}()

b.ReportAllocs()
b.ResetTimer()
b.StartTimer()

for i := 0; i < b.N; i++ {
b.StopTimer()
createParams.Payload = bytes.NewReader(payload)
createParams.CreationTime = time.Now()
b.StartTimer()

_, err = neo.CreateObject(ctx, createParams)
b.StopTimer()
require.NoError(b, err)
b.StartTimer()
}
})
}
}

func createContainer(ctx context.Context, signer user.Signer, p *pool.Pool) (cid.ID, error) {
var cnr container.Container
cnr.Init()
cnr.SetOwner(signer.UserID())

var rd netmap.ReplicaDescriptor
rd.SetNumberOfObjects(1)

var pp netmap.PlacementPolicy
pp.SetContainerBackupFactor(1)
pp.AddReplicas(rd)

cnr.SetPlacementPolicy(pp)
cnr.SetBasicACL(acl.PublicRW)

var prm client.PrmContainerPut

w := waiter.NewContainerPutWaiter(p, waiter.DefaultPollInterval)
return w.ContainerPut(ctx, cnr, signer, prm)
}

func deleteContainer(ctx context.Context, id cid.ID, signer user.Signer, p *pool.Pool) error {
var prm client.PrmContainerDelete
return p.ContainerDelete(ctx, id, signer, prm)
}

0 comments on commit ec47db0

Please sign in to comment.