diff --git a/internal/neofs/neofs.go b/internal/neofs/neofs.go index 106aa60e..a3ab60bf 100644 --- a/internal/neofs/neofs.go +++ b/internal/neofs/neofs.go @@ -8,6 +8,7 @@ import ( "io" "math" "strconv" + "sync" "time" "github.com/nspcc-dev/neofs-s3-gw/api" @@ -46,16 +47,24 @@ type NeoFS struct { anonSigner user.Signer cfg Config epochGetter EpochGetter + buffers *sync.Pool } // NewNeoFS creates new NeoFS using provided pool.Pool. func NewNeoFS(p *pool.Pool, signer user.Signer, anonSigner user.Signer, cfg Config, epochGetter EpochGetter) *NeoFS { + buffers := sync.Pool{} + buffers.New = func() any { + b := make([]byte, cfg.MaxObjectSize) + return &b + } + return &NeoFS{ pool: p, gateSigner: signer, anonSigner: anonSigner, cfg: cfg, epochGetter: epochGetter, + buffers: &buffers, } } @@ -316,8 +325,12 @@ func (x *NeoFS) CreateObject(ctx context.Context, prm layer.PrmObjectCreate) (oi return oid.ID{}, fmt.Errorf("save object via connection pool: %w", err) } - chunk := make([]byte, x.cfg.MaxObjectSize) - _, err = io.CopyBuffer(writer, prm.Payload, chunk) + data := x.buffers.Get() + chunk := data.(*[]byte) + + _, err = io.CopyBuffer(writer, prm.Payload, *chunk) + x.buffers.Put(chunk) + if err != nil { return oid.ID{}, fmt.Errorf("read payload chunk: %w", err) } diff --git a/internal/neofs/neofs_test.go b/internal/neofs/neofs_test.go index f83b1880..47343f83 100644 --- a/internal/neofs/neofs_test.go +++ b/internal/neofs/neofs_test.go @@ -1,11 +1,25 @@ package neofs import ( + "bytes" + "context" + "crypto/rand" "fmt" + "strconv" "testing" + "time" + "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neofs-s3-gw/api/layer" + "github.com/nspcc-dev/neofs-sdk-go/client" apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" + "github.com/nspcc-dev/neofs-sdk-go/container" + "github.com/nspcc-dev/neofs-sdk-go/container/acl" + cid "github.com/nspcc-dev/neofs-sdk-go/container/id" + "github.com/nspcc-dev/neofs-sdk-go/netmap" + "github.com/nspcc-dev/neofs-sdk-go/pool" + "github.com/nspcc-dev/neofs-sdk-go/user" + "github.com/nspcc-dev/neofs-sdk-go/waiter" "github.com/stretchr/testify/require" ) @@ -23,3 +37,99 @@ func TestErrorChecking(t *testing.T) { require.ErrorIs(t, wrappedError, layer.ErrAccessDenied) require.Contains(t, wrappedError.Error(), reason) } + +func Benchmark(b *testing.B) { + b.Skip("Required connection to NeoFS cluster") + + ctx := context.Background() + + pk, err := keys.NewPrivateKey() + require.NoError(b, err) + signer := user.NewAutoIDSignerRFC6979(pk.PrivateKey) + + anonPk, err := keys.NewPrivateKey() + require.NoError(b, err) + anonSigner := user.NewAutoIDSignerRFC6979(anonPk.PrivateKey) + + var prm pool.InitParameters + prm.SetSigner(signer) + prm.AddNode(pool.NewNodeParam(1, "localhost:8080", 1)) + + p, err := pool.NewPool(prm) + require.NoError(b, err) + + require.NoError(b, p.Dial(ctx)) + + ni, err := p.NetworkInfo(ctx, client.PrmNetworkInfo{}) + require.NoError(b, err) + + neofsCfg := Config{ + MaxObjectSize: int64(ni.MaxObjectSize()), + IsSlicerEnabled: false, + IsHomomorphicEnabled: !ni.HomomorphicHashingDisabled(), + } + + neo := NewNeoFS(p, signer, anonSigner, neofsCfg, ni) + + var createParams layer.PrmObjectCreate + createParams.Creator = signer.UserID() + + for i := 128; i <= 512; i += 128 { + b.Run("object upload "+strconv.Itoa(i), func(b *testing.B) { + b.StopTimer() + payload := make([]byte, i*1024) + _, err = rand.Read(payload) + require.NoError(b, err) + + id, err := createContainer(ctx, signer, p) + require.NoError(b, err) + createParams.Container = id + + defer func() { + _ = deleteContainer(ctx, id, signer, p) + }() + + b.ReportAllocs() + b.ResetTimer() + b.StartTimer() + + for i := 0; i < b.N; i++ { + b.StopTimer() + createParams.Payload = bytes.NewReader(payload) + createParams.CreationTime = time.Now() + b.StartTimer() + + _, err = neo.CreateObject(ctx, createParams) + b.StopTimer() + require.NoError(b, err) + b.StartTimer() + } + }) + } +} + +func createContainer(ctx context.Context, signer user.Signer, p *pool.Pool) (cid.ID, error) { + var cnr container.Container + cnr.Init() + cnr.SetOwner(signer.UserID()) + + var rd netmap.ReplicaDescriptor + rd.SetNumberOfObjects(1) + + var pp netmap.PlacementPolicy + pp.SetContainerBackupFactor(1) + pp.AddReplicas(rd) + + cnr.SetPlacementPolicy(pp) + cnr.SetBasicACL(acl.PublicRW) + + var prm client.PrmContainerPut + + w := waiter.NewContainerPutWaiter(p, waiter.DefaultPollInterval) + return w.ContainerPut(ctx, cnr, signer, prm) +} + +func deleteContainer(ctx context.Context, id cid.ID, signer user.Signer, p *pool.Pool) error { + var prm client.PrmContainerDelete + return p.ContainerDelete(ctx, id, signer, prm) +}