Skip to content

Commit

Permalink
neofs: Add test for consistency
Browse files Browse the repository at this point in the history
The test requires real neofs cluster to execute. We are checking the object after uploading.

Signed-off-by: Evgenii Baidakov <[email protected]>
  • Loading branch information
smallhive committed Oct 9, 2023
1 parent bbb1e04 commit f0f5a22
Showing 1 changed file with 101 additions and 0 deletions.
101 changes: 101 additions & 0 deletions internal/neofs/neofs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ import (
"context"
"crypto/rand"
"fmt"
"io"
"runtime"
"strconv"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -133,3 +136,101 @@ func deleteContainer(ctx context.Context, id cid.ID, signer user.Signer, p *pool
var prm client.PrmContainerDelete
return p.ContainerDelete(ctx, id, signer, prm)
}

func TestConcurrencyAndConsistency(t *testing.T) {
t.Skip("Required connection to NeoFS cluster")

ctx, cancel := context.WithCancel(context.Background())

pk, err := keys.NewPrivateKey()
require.NoError(t, err)
signer := user.NewAutoIDSignerRFC6979(pk.PrivateKey)

anonPk, err := keys.NewPrivateKey()
require.NoError(t, err)
anonSigner := user.NewAutoIDSignerRFC6979(anonPk.PrivateKey)

var prm pool.InitParameters
prm.SetSigner(signer)
prm.AddNode(pool.NewNodeParam(1, "localhost:8080", 1))

p, err := pool.NewPool(prm)
require.NoError(t, err)

require.NoError(t, p.Dial(ctx))

ni, err := p.NetworkInfo(ctx, client.PrmNetworkInfo{})
require.NoError(t, err)

gorutines := runtime.GOMAXPROCS(0)

neofsCfg := Config{
MaxObjectSize: int64(ni.MaxObjectSize()),
IsSlicerEnabled: false,
IsHomomorphicEnabled: !ni.HomomorphicHashingDisabled(),
}

neo := NewNeoFS(p, signer, anonSigner, neofsCfg, ni)

var createParams layer.PrmObjectCreate
createParams.Creator = signer.UserID()

wg := sync.WaitGroup{}
wg.Add(gorutines)

for i := 0; i < gorutines; i++ {
go func() {
uploadDownload(ctx, t, neo, p, signer, createParams, &wg)
}()
}

<-time.After(30 * time.Second)
cancel()
wg.Wait()
}

func uploadDownload(ctx context.Context, t *testing.T, neo *NeoFS, p *pool.Pool, signer user.Signer, createParams layer.PrmObjectCreate, wg *sync.WaitGroup) {
defer wg.Done()

payload := make([]byte, 32*1024)

id, err := createContainer(ctx, signer, p)
require.NoError(t, err)
createParams.Container = id

defer func() {
_ = deleteContainer(ctx, id, signer, p)
}()

// separate context to operations to catch ContextCanceled only in Select.
opContext := context.Background()

for {
select {
case <-ctx.Done():
return
default:
}

_, err = rand.Read(payload)
require.NoError(t, err)

createParams.Payload = bytes.NewReader(payload)
createParams.CreationTime = time.Now()

objID, err := neo.CreateObject(opContext, createParams)
require.NoError(t, err)

var objReadPrm layer.PrmObjectRead
objReadPrm.Object = objID
objReadPrm.Container = id

op, err := neo.ReadObject(opContext, objReadPrm)
require.NoError(t, err)

pl, err := io.ReadAll(op.Payload)
require.NoError(t, err)

require.True(t, bytes.Equal(payload, pl))
}
}

0 comments on commit f0f5a22

Please sign in to comment.