diff --git a/extern/boostd-data/svc/setup_yugabyte_test_util.go b/extern/boostd-data/svc/setup_yugabyte_test_util.go index 179b6b8f1..6934a0390 100644 --- a/extern/boostd-data/svc/setup_yugabyte_test_util.go +++ b/extern/boostd-data/svc/setup_yugabyte_test_util.go @@ -18,6 +18,13 @@ var TestYugabyteSettings = yugabyte.DBSettings{ ConnectString: "postgresql://postgres:postgres@yugabyte:5433", } +// Use when testing against a local yugabyte instance. +// Warning: This will delete all tables in the local yugabyte instance. +var TestYugabyteSettingsLocal = yugabyte.DBSettings{ + Hosts: []string{"localhost"}, + ConnectString: "postgresql://postgres:postgres@localhost:5433", +} + func SetupYugabyte(t *testing.T) { ctx := context.Background() diff --git a/extern/boostd-data/svc/svc_size_test.go b/extern/boostd-data/svc/svc_size_test.go index 403e0c8fe..0810d938e 100644 --- a/extern/boostd-data/svc/svc_size_test.go +++ b/extern/boostd-data/svc/svc_size_test.go @@ -14,8 +14,6 @@ import ( "github.com/filecoin-project/boostd-data/model" "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" - "github.com/ipld/go-car/v2/index" - "github.com/multiformats/go-multihash" "github.com/stretchr/testify/require" ) @@ -122,20 +120,3 @@ func generateRandomCid(baseCid []byte) (cid.Cid, error) { return c, nil } - -func toEntries(idx index.Index) (map[string]uint64, bool) { - it, ok := idx.(index.IterableIndex) - if !ok { - return nil, false - } - - entries := make(map[string]uint64) - err := it.ForEach(func(mh multihash.Multihash, o uint64) error { - entries[mh.String()] = o - return nil - }) - if err != nil { - return nil, false - } - return entries, true -} diff --git a/extern/boostd-data/svc/svc_test.go b/extern/boostd-data/svc/svc_test.go index 31edc0c8d..511b35d9a 100644 --- a/extern/boostd-data/svc/svc_test.go +++ b/extern/boostd-data/svc/svc_test.go @@ -1,527 +1,46 @@ -//go:build test_lid -// +build test_lid - package svc import ( - "bufio" - "bytes" "context" - "encoding/hex" - "fmt" - "math/rand" - "os" - "testing" - "time" - - "github.com/filecoin-project/boost/testutil" - "github.com/filecoin-project/boostd-data/client" - "github.com/filecoin-project/boostd-data/model" - "github.com/filecoin-project/go-state-types/abi" - "github.com/google/uuid" - "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" - "github.com/ipld/go-car/v2" - "github.com/ipld/go-car/v2/index" - "github.com/multiformats/go-multicodec" - "github.com/multiformats/go-multihash" "github.com/stretchr/testify/require" - "golang.org/x/sync/errgroup" + "testing" + "time" ) -func TestService(t *testing.T) { +func TestServiceLevelDB(t *testing.T) { _ = logging.SetLogLevel("cbtest", "debug") - t.Run("leveldb", func(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - bdsvc, err := NewLevelDB("") - require.NoError(t, err) - - testService(ctx, t, bdsvc, "localhost:0") - }) - - t.Run("yugabyte", func(t *testing.T) { - _ = logging.SetLogLevel("boostd-data-yb", "debug") - - // Running yugabyte tests may require download the docker container - // so set a high timeout - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) - defer cancel() - - SetupYugabyte(t) - - bdsvc := NewYugabyte(TestYugabyteSettings) - - addr := "localhost:0" - testService(ctx, t, bdsvc, addr) - }) -} - -func testService(ctx context.Context, t *testing.T, bdsvc *Service, addr string) { - ln, err := bdsvc.Start(ctx, addr) - require.NoError(t, err) - - cl := client.NewStore() - err = cl.Dial(context.Background(), fmt.Sprintf("ws://%s", ln)) - require.NoError(t, err) - defer cl.Close(ctx) - - sampleidx := "fixtures/baga6ea4seaqnfhocd544oidrgsss2ahoaomvxuaqxfmlsizljtzsuivjl5hamka.full.idx" - - pieceCid, err := cid.Parse("baga6ea4seaqnfhocd544oidrgsss2ahoaomvxuaqxfmlsizljtzsuivjl5hamka") - require.NoError(t, err) - - subject, err := loadIndex(sampleidx) - require.NoError(t, err) - - records, err := getRecords(subject) - require.NoError(t, err) - - randomuuid, err := uuid.Parse("4d8f5ce6-dbfd-40dc-8b03-29308e97357b") - require.NoError(t, err) - - err = cl.AddIndex(ctx, pieceCid, records, true) - require.NoError(t, err) - - di := model.DealInfo{ - DealUuid: randomuuid.String(), - SectorID: abi.SectorNumber(1), - PieceOffset: 1, - PieceLength: 2, - CarLength: 3, - } - - // Add a deal for the piece - err = cl.AddDealForPiece(ctx, pieceCid, di) - require.NoError(t, err) - - // Add the same deal a second time to test uniqueness - err = cl.AddDealForPiece(ctx, pieceCid, di) - require.NoError(t, err) - - // There should only be one deal - dis, err := cl.GetPieceDeals(ctx, pieceCid) - require.NoError(t, err) - require.Len(t, dis, 1) - require.Equal(t, di, dis[0]) - - // Add a second deal - di2 := model.DealInfo{ - DealUuid: uuid.NewString(), - SectorID: abi.SectorNumber(11), - PieceOffset: 11, - PieceLength: 12, - CarLength: 13, - } - err = cl.AddDealForPiece(ctx, pieceCid, di2) - require.NoError(t, err) - - // There should now be two deals - dis, err = cl.GetPieceDeals(ctx, pieceCid) - require.NoError(t, err) - require.Len(t, dis, 2) - require.Contains(t, dis, di) - require.Contains(t, dis, di2) - - b, err := hex.DecodeString("1220ff63d7689e2d9567d1a90a7a68425f430137142e1fbc28fe4780b9ee8a5ef842") - require.NoError(t, err) - - mhash, err := multihash.Cast(b) - require.NoError(t, err) - - offset, err := cl.GetOffsetSize(ctx, pieceCid, mhash) - require.NoError(t, err) - require.EqualValues(t, 3039040395, offset.Offset) - require.EqualValues(t, 0, offset.Size) - - pcids, err := cl.PiecesContainingMultihash(ctx, mhash) - require.NoError(t, err) - require.Len(t, pcids, 1) - require.Equal(t, pieceCid, pcids[0]) - - allPieceCids, err := cl.ListPieces(ctx) - require.NoError(t, err) - require.Len(t, allPieceCids, 1) - require.Equal(t, pieceCid, allPieceCids[0]) - - indexed, err := cl.IsIndexed(ctx, pieceCid) - require.NoError(t, err) - require.True(t, indexed) - - recs, err := cl.GetRecords(ctx, pieceCid) - require.NoError(t, err) - require.Equal(t, len(records), len(recs)) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() - loadedSubject, err := cl.GetIndex(ctx, pieceCid) + bdsvc, err := NewLevelDB("") require.NoError(t, err) - ok, err := compareIndices(subject, loadedSubject) - require.NoError(t, err) - require.True(t, ok) + testService(ctx, t, bdsvc, "localhost:0") } -func TestServiceFuzz(t *testing.T) { +func TestServiceFuzzLevelDB(t *testing.T) { t.Skip() _ = logging.SetLogLevel("*", "info") ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - t.Run("leveldb", func(t *testing.T) { - bdsvc, err := NewLevelDB("") - require.NoError(t, err) - addr := "localhost:0" - ln, err := bdsvc.Start(ctx, addr) - require.NoError(t, err) - testServiceFuzz(ctx, t, ln.String()) - }) - - t.Run("yugabyte", func(t *testing.T) { - SetupYugabyte(t) - bdsvc := NewYugabyte(TestYugabyteSettings) - - addr := "localhost:0" - ln, err := bdsvc.Start(ctx, addr) - require.NoError(t, err) - - testServiceFuzz(ctx, t, ln.String()) - }) -} - -func testServiceFuzz(ctx context.Context, t *testing.T, addr string) { - cl := client.NewStore() - err := cl.Dial(context.Background(), "ws://"+addr) + bdsvc, err := NewLevelDB("") require.NoError(t, err) - defer cl.Close(ctx) - - var idxs []index.Index - for i := 0; i < 10; i++ { - size := (5 + (i % 3)) << 20 - idxs = append(idxs, createCarIndex(t, size, i+1)) - } - - throttle := make(chan struct{}, 64) - var eg errgroup.Group - for _, idx := range idxs { - idx := idx - eg.Go(func() error { - records, err := getRecords(idx) - require.NoError(t, err) - - randomuuid := uuid.New() - pieceCid := testutil.GenerateCid() - err = cl.AddIndex(ctx, pieceCid, records, true) - require.NoError(t, err) - - di := model.DealInfo{ - DealUuid: randomuuid.String(), - SectorID: abi.SectorNumber(1), - PieceOffset: 1, - PieceLength: 2, - CarLength: 3, - } - - err = cl.AddDealForPiece(ctx, pieceCid, di) - require.NoError(t, err) - - dis, err := cl.GetPieceDeals(ctx, pieceCid) - require.NoError(t, err) - require.Len(t, dis, 1) - require.Equal(t, di, dis[0]) - - indexed, err := cl.IsIndexed(ctx, pieceCid) - require.NoError(t, err) - require.True(t, indexed) - - recs, err := cl.GetRecords(ctx, pieceCid) - require.NoError(t, err) - require.Equal(t, len(records), len(recs)) - - var offsetEG errgroup.Group - for _, r := range recs { - if rand.Float32() > 0.1 { - continue - } - - idx := idx - c := r.Cid - throttle <- struct{}{} - offsetEG.Go(func() error { - defer func() { <-throttle }() - - mhash := c.Hash() - var err error - err1 := idx.GetAll(c, func(expected uint64) bool { - var offsetSize *model.OffsetSize - offsetSize, err = cl.GetOffsetSize(ctx, pieceCid, mhash) - if err != nil { - return false - } - if expected != offsetSize.Offset { - err = fmt.Errorf("cid %s: expected offset %d, got offset %d", c, expected, offsetSize.Offset) - return false - } - return true - }) - if err != nil { - return err - } - if err1 != nil { - return err1 - } - - pcids, err := cl.PiecesContainingMultihash(ctx, mhash) - if err != nil { - return err - } - if len(pcids) != 1 { - return fmt.Errorf("expected 1 piece, got %d", len(pcids)) - } - if pieceCid != pcids[0] { - return fmt.Errorf("expected piece %s, got %s", pieceCid, pcids[0]) - } - return nil - }) - } - err = offsetEG.Wait() - require.NoError(t, err) - - loadedSubject, err := cl.GetIndex(ctx, pieceCid) - require.NoError(t, err) - - ok, err := compareIndices(idx, loadedSubject) - require.NoError(t, err) - require.True(t, ok) - - return nil - }) - } - - err = eg.Wait() - require.NoError(t, err) -} - -func createCarIndex(t *testing.T, size int, rseed int) index.Index { - // Create a CAR file - randomFilePath, err := testutil.CreateRandomFile(t.TempDir(), rseed, size) - require.NoError(t, err) - _, carFilePath, err := testutil.CreateDenseCARv2(t.TempDir(), randomFilePath) - require.NoError(t, err) - carFile, err := os.Open(carFilePath) - require.NoError(t, err) - defer carFile.Close() - idx, err := car.ReadOrGenerateIndex(carFile) + addr := "localhost:0" + ln, err := bdsvc.Start(ctx, addr) require.NoError(t, err) - return idx + testServiceFuzz(ctx, t, ln.String()) } -func loadIndex(path string) (index.Index, error) { - defer func(now time.Time) { - log.Debugw("loadindex", "took", time.Since(now).String()) - }(time.Now()) - - idxf, err := os.Open(path) - if err != nil { - return nil, err - } - defer idxf.Close() - - subject, err := index.ReadFrom(idxf) - if err != nil { - return nil, err - } - - return subject, nil -} - -func getRecords(subject index.Index) ([]model.Record, error) { - records := make([]model.Record, 0) - - switch idx := subject.(type) { - case index.IterableIndex: - err := idx.ForEach(func(m multihash.Multihash, offset uint64) error { - - cid := cid.NewCidV1(cid.Raw, m) - - records = append(records, model.Record{ - Cid: cid, - OffsetSize: model.OffsetSize{ - Offset: offset, - Size: 0, - }, - }) - - return nil - }) - if err != nil { - return nil, err - } - default: - return nil, fmt.Errorf("wanted %v but got %v\n", multicodec.CarMultihashIndexSorted, idx.Codec()) - } - return records, nil -} - -func compareIndices(subject, subjectDb index.Index) (bool, error) { - var b bytes.Buffer - w := bufio.NewWriter(&b) - - _, err := subject.Marshal(w) - if err != nil { - return false, err - } - - var b2 bytes.Buffer - w2 := bufio.NewWriter(&b2) - - _, err = subjectDb.Marshal(w2) - if err != nil { - return false, err - } - - equal := bytes.Equal(b.Bytes(), b2.Bytes()) - - if !equal { - a, oka := toEntries(subject) - b, okb := toEntries(subjectDb) - if oka && okb { - if len(a) != len(b) { - return false, fmt.Errorf("index length mismatch: first %d / second %d", len(a), len(b)) - } - for mh, oa := range a { - ob, ok := b[mh] - if !ok { - return false, fmt.Errorf("second index missing multihash %s", mh) - } - if oa != ob { - return false, fmt.Errorf("offset mismatch for multihash %s: first %d / second %d", mh, oa, ob) - } - } - } - } - - return equal, nil -} - -func TestCleanup(t *testing.T) { +func TestCleanupLevelDB(t *testing.T) { _ = logging.SetLogLevel("*", "debug") ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - t.Run("leveldb", func(t *testing.T) { - bdsvc, err := NewLevelDB("") - require.NoError(t, err) - testCleanup(ctx, t, bdsvc, "localhost:0") - }) - - t.Run("yugabyte", func(t *testing.T) { - // Running yugabyte tests may require download the docker container - // so set a high timeout - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) - defer cancel() - - SetupYugabyte(t) - - bdsvc := NewYugabyte(TestYugabyteSettings) - testCleanup(ctx, t, bdsvc, "localhost:0") - }) -} - -func testCleanup(ctx context.Context, t *testing.T, bdsvc *Service, addr string) { - ln, err := bdsvc.Start(ctx, addr) - require.NoError(t, err) - - cl := client.NewStore() - err = cl.Dial(context.Background(), fmt.Sprintf("ws://%s", ln)) - require.NoError(t, err) - defer cl.Close(ctx) - - sampleidx := "fixtures/baga6ea4seaqnfhocd544oidrgsss2ahoaomvxuaqxfmlsizljtzsuivjl5hamka.full.idx" - - pieceCid, err := cid.Parse("baga6ea4seaqnfhocd544oidrgsss2ahoaomvxuaqxfmlsizljtzsuivjl5hamka") - require.NoError(t, err) - - subject, err := loadIndex(sampleidx) - require.NoError(t, err) - - records, err := getRecords(subject) - require.NoError(t, err) - - err = cl.AddIndex(ctx, pieceCid, records, true) - require.NoError(t, err) - - di := model.DealInfo{ - DealUuid: uuid.NewString(), - SectorID: abi.SectorNumber(1), - PieceOffset: 1, - PieceLength: 2, - CarLength: 3, - } - di2 := model.DealInfo{ - DealUuid: uuid.NewString(), - SectorID: abi.SectorNumber(10), - PieceOffset: 11, - PieceLength: 12, - CarLength: 13, - } - - // Add two deals for the piece - err = cl.AddDealForPiece(ctx, pieceCid, di) - require.NoError(t, err) - err = cl.AddDealForPiece(ctx, pieceCid, di2) - require.NoError(t, err) - - dis, err := cl.GetPieceDeals(ctx, pieceCid) - require.NoError(t, err) - require.Len(t, dis, 2) - - // Remove one deal for the piece - err = cl.RemoveDealForPiece(ctx, pieceCid, di.DealUuid) - require.NoError(t, err) - - dis, err = cl.GetPieceDeals(ctx, pieceCid) - require.NoError(t, err) - require.Len(t, dis, 1) - - b, err := hex.DecodeString("1220ff63d7689e2d9567d1a90a7a68425f430137142e1fbc28fe4780b9ee8a5ef842") - require.NoError(t, err) - - mhash, err := multihash.Cast(b) - require.NoError(t, err) - - offset, err := cl.GetOffsetSize(ctx, pieceCid, mhash) - require.NoError(t, err) - require.EqualValues(t, 3039040395, offset.Offset) - - pcids, err := cl.PiecesContainingMultihash(ctx, mhash) - require.NoError(t, err) - require.Len(t, pcids, 1) - require.Equal(t, pieceCid, pcids[0]) - - indexed, err := cl.IsIndexed(ctx, pieceCid) - require.NoError(t, err) - require.True(t, indexed) - - // Remove the other deal for the piece. - // After this call there are no deals left, so it should also cause the - // piece metadata and indexes to be removed. - err = cl.RemoveDealForPiece(ctx, pieceCid, di2.DealUuid) - require.NoError(t, err) - - _, err = cl.GetPieceDeals(ctx, pieceCid) - require.ErrorContains(t, err, "not found") - - _, err = cl.GetOffsetSize(ctx, pieceCid, mhash) - require.ErrorContains(t, err, "not found") - - _, err = cl.GetRecords(ctx, pieceCid) - require.ErrorContains(t, err, "not found") - - _, err = cl.PiecesContainingMultihash(ctx, mhash) - require.ErrorContains(t, err, "not found") - - indexed, err = cl.IsIndexed(ctx, pieceCid) + bdsvc, err := NewLevelDB("") require.NoError(t, err) - require.False(t, indexed) + testCleanup(ctx, t, bdsvc, "localhost:0") } diff --git a/extern/boostd-data/svc/svc_test_util.go b/extern/boostd-data/svc/svc_test_util.go new file mode 100644 index 000000000..f97a3cb2f --- /dev/null +++ b/extern/boostd-data/svc/svc_test_util.go @@ -0,0 +1,459 @@ +package svc + +import ( + "bufio" + "bytes" + "context" + "encoding/hex" + "fmt" + "math/rand" + "os" + "testing" + "time" + + "github.com/filecoin-project/boost/testutil" + "github.com/filecoin-project/boostd-data/client" + "github.com/filecoin-project/boostd-data/model" + "github.com/filecoin-project/go-state-types/abi" + "github.com/google/uuid" + "github.com/ipfs/go-cid" + "github.com/ipld/go-car/v2" + "github.com/ipld/go-car/v2/index" + "github.com/multiformats/go-multicodec" + "github.com/multiformats/go-multihash" + "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" +) + +func testService(ctx context.Context, t *testing.T, bdsvc *Service, addr string) { + ln, err := bdsvc.Start(ctx, addr) + require.NoError(t, err) + + cl := client.NewStore() + err = cl.Dial(context.Background(), fmt.Sprintf("ws://%s", ln)) + require.NoError(t, err) + defer cl.Close(ctx) + + sampleidx := "fixtures/baga6ea4seaqnfhocd544oidrgsss2ahoaomvxuaqxfmlsizljtzsuivjl5hamka.full.idx" + + pieceCid, err := cid.Parse("baga6ea4seaqnfhocd544oidrgsss2ahoaomvxuaqxfmlsizljtzsuivjl5hamka") + require.NoError(t, err) + + subject, err := loadIndex(sampleidx) + require.NoError(t, err) + + records, err := getRecords(subject) + require.NoError(t, err) + + randomuuid, err := uuid.Parse("4d8f5ce6-dbfd-40dc-8b03-29308e97357b") + require.NoError(t, err) + + err = cl.AddIndex(ctx, pieceCid, records, true) + require.NoError(t, err) + + di := model.DealInfo{ + DealUuid: randomuuid.String(), + SectorID: abi.SectorNumber(1), + PieceOffset: 1, + PieceLength: 2, + CarLength: 3, + } + + // Add a deal for the piece + err = cl.AddDealForPiece(ctx, pieceCid, di) + require.NoError(t, err) + + // Add the same deal a second time to test uniqueness + err = cl.AddDealForPiece(ctx, pieceCid, di) + require.NoError(t, err) + + // There should only be one deal + dis, err := cl.GetPieceDeals(ctx, pieceCid) + require.NoError(t, err) + require.Len(t, dis, 1) + require.Equal(t, di, dis[0]) + + // Add a second deal + di2 := model.DealInfo{ + DealUuid: uuid.NewString(), + SectorID: abi.SectorNumber(11), + PieceOffset: 11, + PieceLength: 12, + CarLength: 13, + } + err = cl.AddDealForPiece(ctx, pieceCid, di2) + require.NoError(t, err) + + // There should now be two deals + dis, err = cl.GetPieceDeals(ctx, pieceCid) + require.NoError(t, err) + require.Len(t, dis, 2) + require.Contains(t, dis, di) + require.Contains(t, dis, di2) + + b, err := hex.DecodeString("1220ff63d7689e2d9567d1a90a7a68425f430137142e1fbc28fe4780b9ee8a5ef842") + require.NoError(t, err) + + mhash, err := multihash.Cast(b) + require.NoError(t, err) + + offset, err := cl.GetOffsetSize(ctx, pieceCid, mhash) + require.NoError(t, err) + require.EqualValues(t, 3039040395, offset.Offset) + require.EqualValues(t, 0, offset.Size) + + pcids, err := cl.PiecesContainingMultihash(ctx, mhash) + require.NoError(t, err) + require.Len(t, pcids, 1) + require.Equal(t, pieceCid, pcids[0]) + + allPieceCids, err := cl.ListPieces(ctx) + require.NoError(t, err) + require.Len(t, allPieceCids, 1) + require.Equal(t, pieceCid, allPieceCids[0]) + + indexed, err := cl.IsIndexed(ctx, pieceCid) + require.NoError(t, err) + require.True(t, indexed) + + recs, err := cl.GetRecords(ctx, pieceCid) + require.NoError(t, err) + require.Equal(t, len(records), len(recs)) + + loadedSubject, err := cl.GetIndex(ctx, pieceCid) + require.NoError(t, err) + + ok, err := compareIndices(subject, loadedSubject) + require.NoError(t, err) + require.True(t, ok) +} + +func testServiceFuzz(ctx context.Context, t *testing.T, addr string) { + cl := client.NewStore() + err := cl.Dial(context.Background(), "ws://"+addr) + require.NoError(t, err) + defer cl.Close(ctx) + + var idxs []index.Index + for i := 0; i < 10; i++ { + size := (5 + (i % 3)) << 20 + idxs = append(idxs, createCarIndex(t, size, i+1)) + } + + throttle := make(chan struct{}, 64) + var eg errgroup.Group + for _, idx := range idxs { + idx := idx + eg.Go(func() error { + records, err := getRecords(idx) + require.NoError(t, err) + + randomuuid := uuid.New() + pieceCid := testutil.GenerateCid() + err = cl.AddIndex(ctx, pieceCid, records, true) + require.NoError(t, err) + + di := model.DealInfo{ + DealUuid: randomuuid.String(), + SectorID: abi.SectorNumber(1), + PieceOffset: 1, + PieceLength: 2, + CarLength: 3, + } + + err = cl.AddDealForPiece(ctx, pieceCid, di) + require.NoError(t, err) + + dis, err := cl.GetPieceDeals(ctx, pieceCid) + require.NoError(t, err) + require.Len(t, dis, 1) + require.Equal(t, di, dis[0]) + + indexed, err := cl.IsIndexed(ctx, pieceCid) + require.NoError(t, err) + require.True(t, indexed) + + recs, err := cl.GetRecords(ctx, pieceCid) + require.NoError(t, err) + require.Equal(t, len(records), len(recs)) + + var offsetEG errgroup.Group + for _, r := range recs { + if rand.Float32() > 0.1 { + continue + } + + idx := idx + c := r.Cid + throttle <- struct{}{} + offsetEG.Go(func() error { + defer func() { <-throttle }() + + mhash := c.Hash() + var err error + err1 := idx.GetAll(c, func(expected uint64) bool { + var offsetSize *model.OffsetSize + offsetSize, err = cl.GetOffsetSize(ctx, pieceCid, mhash) + if err != nil { + return false + } + if expected != offsetSize.Offset { + err = fmt.Errorf("cid %s: expected offset %d, got offset %d", c, expected, offsetSize.Offset) + return false + } + return true + }) + if err != nil { + return err + } + if err1 != nil { + return err1 + } + + pcids, err := cl.PiecesContainingMultihash(ctx, mhash) + if err != nil { + return err + } + if len(pcids) != 1 { + return fmt.Errorf("expected 1 piece, got %d", len(pcids)) + } + if pieceCid != pcids[0] { + return fmt.Errorf("expected piece %s, got %s", pieceCid, pcids[0]) + } + return nil + }) + } + err = offsetEG.Wait() + require.NoError(t, err) + + loadedSubject, err := cl.GetIndex(ctx, pieceCid) + require.NoError(t, err) + + ok, err := compareIndices(idx, loadedSubject) + require.NoError(t, err) + require.True(t, ok) + + return nil + }) + } + + err = eg.Wait() + require.NoError(t, err) +} + +func createCarIndex(t *testing.T, size int, rseed int) index.Index { + // Create a CAR file + randomFilePath, err := testutil.CreateRandomFile(t.TempDir(), rseed, size) + require.NoError(t, err) + _, carFilePath, err := testutil.CreateDenseCARv2(t.TempDir(), randomFilePath) + require.NoError(t, err) + carFile, err := os.Open(carFilePath) + require.NoError(t, err) + defer carFile.Close() + idx, err := car.ReadOrGenerateIndex(carFile) + require.NoError(t, err) + return idx +} + +func loadIndex(path string) (index.Index, error) { + defer func(now time.Time) { + log.Debugw("loadindex", "took", time.Since(now).String()) + }(time.Now()) + + idxf, err := os.Open(path) + if err != nil { + return nil, err + } + defer idxf.Close() + + subject, err := index.ReadFrom(idxf) + if err != nil { + return nil, err + } + + return subject, nil +} + +func getRecords(subject index.Index) ([]model.Record, error) { + records := make([]model.Record, 0) + + switch idx := subject.(type) { + case index.IterableIndex: + err := idx.ForEach(func(m multihash.Multihash, offset uint64) error { + + cid := cid.NewCidV1(cid.Raw, m) + + records = append(records, model.Record{ + Cid: cid, + OffsetSize: model.OffsetSize{ + Offset: offset, + Size: 0, + }, + }) + + return nil + }) + if err != nil { + return nil, err + } + default: + return nil, fmt.Errorf("wanted %v but got %v\n", multicodec.CarMultihashIndexSorted, idx.Codec()) + } + return records, nil +} + +func compareIndices(subject, subjectDb index.Index) (bool, error) { + var b bytes.Buffer + w := bufio.NewWriter(&b) + + _, err := subject.Marshal(w) + if err != nil { + return false, err + } + + var b2 bytes.Buffer + w2 := bufio.NewWriter(&b2) + + _, err = subjectDb.Marshal(w2) + if err != nil { + return false, err + } + + equal := bytes.Equal(b.Bytes(), b2.Bytes()) + + if !equal { + a, oka := toEntries(subject) + b, okb := toEntries(subjectDb) + if oka && okb { + if len(a) != len(b) { + return false, fmt.Errorf("index length mismatch: first %d / second %d", len(a), len(b)) + } + for mh, oa := range a { + ob, ok := b[mh] + if !ok { + return false, fmt.Errorf("second index missing multihash %s", mh) + } + if oa != ob { + return false, fmt.Errorf("offset mismatch for multihash %s: first %d / second %d", mh, oa, ob) + } + } + } + } + + return equal, nil +} + +func testCleanup(ctx context.Context, t *testing.T, bdsvc *Service, addr string) { + ln, err := bdsvc.Start(ctx, addr) + require.NoError(t, err) + + cl := client.NewStore() + err = cl.Dial(context.Background(), fmt.Sprintf("ws://%s", ln)) + require.NoError(t, err) + defer cl.Close(ctx) + + sampleidx := "fixtures/baga6ea4seaqnfhocd544oidrgsss2ahoaomvxuaqxfmlsizljtzsuivjl5hamka.full.idx" + + pieceCid, err := cid.Parse("baga6ea4seaqnfhocd544oidrgsss2ahoaomvxuaqxfmlsizljtzsuivjl5hamka") + require.NoError(t, err) + + subject, err := loadIndex(sampleidx) + require.NoError(t, err) + + records, err := getRecords(subject) + require.NoError(t, err) + + err = cl.AddIndex(ctx, pieceCid, records, true) + require.NoError(t, err) + + di := model.DealInfo{ + DealUuid: uuid.NewString(), + SectorID: abi.SectorNumber(1), + PieceOffset: 1, + PieceLength: 2, + CarLength: 3, + } + di2 := model.DealInfo{ + DealUuid: uuid.NewString(), + SectorID: abi.SectorNumber(10), + PieceOffset: 11, + PieceLength: 12, + CarLength: 13, + } + + // Add two deals for the piece + err = cl.AddDealForPiece(ctx, pieceCid, di) + require.NoError(t, err) + err = cl.AddDealForPiece(ctx, pieceCid, di2) + require.NoError(t, err) + + dis, err := cl.GetPieceDeals(ctx, pieceCid) + require.NoError(t, err) + require.Len(t, dis, 2) + + // Remove one deal for the piece + err = cl.RemoveDealForPiece(ctx, pieceCid, di.DealUuid) + require.NoError(t, err) + + dis, err = cl.GetPieceDeals(ctx, pieceCid) + require.NoError(t, err) + require.Len(t, dis, 1) + + b, err := hex.DecodeString("1220ff63d7689e2d9567d1a90a7a68425f430137142e1fbc28fe4780b9ee8a5ef842") + require.NoError(t, err) + + mhash, err := multihash.Cast(b) + require.NoError(t, err) + + offset, err := cl.GetOffsetSize(ctx, pieceCid, mhash) + require.NoError(t, err) + require.EqualValues(t, 3039040395, offset.Offset) + + pcids, err := cl.PiecesContainingMultihash(ctx, mhash) + require.NoError(t, err) + require.Len(t, pcids, 1) + require.Equal(t, pieceCid, pcids[0]) + + indexed, err := cl.IsIndexed(ctx, pieceCid) + require.NoError(t, err) + require.True(t, indexed) + + // Remove the other deal for the piece. + // After this call there are no deals left, so it should also cause the + // piece metadata and indexes to be removed. + err = cl.RemoveDealForPiece(ctx, pieceCid, di2.DealUuid) + require.NoError(t, err) + + _, err = cl.GetPieceDeals(ctx, pieceCid) + require.ErrorContains(t, err, "not found") + + _, err = cl.GetOffsetSize(ctx, pieceCid, mhash) + require.ErrorContains(t, err, "not found") + + _, err = cl.GetRecords(ctx, pieceCid) + require.ErrorContains(t, err, "not found") + + _, err = cl.PiecesContainingMultihash(ctx, mhash) + require.ErrorContains(t, err, "not found") + + indexed, err = cl.IsIndexed(ctx, pieceCid) + require.NoError(t, err) + require.False(t, indexed) +} + +func toEntries(idx index.Index) (map[string]uint64, bool) { + it, ok := idx.(index.IterableIndex) + if !ok { + return nil, false + } + + entries := make(map[string]uint64) + err := it.ForEach(func(mh multihash.Multihash, o uint64) error { + entries[mh.String()] = o + return nil + }) + if err != nil { + return nil, false + } + return entries, true +} diff --git a/extern/boostd-data/svc/svc_yugabyte_test.go b/extern/boostd-data/svc/svc_yugabyte_test.go new file mode 100644 index 000000000..eafcea215 --- /dev/null +++ b/extern/boostd-data/svc/svc_yugabyte_test.go @@ -0,0 +1,59 @@ +//go:build test_lid +// +build test_lid + +package svc + +import ( + "context" + logging "github.com/ipfs/go-log/v2" + "github.com/stretchr/testify/require" + "testing" + "time" +) + +func TestServiceYugabyte(t *testing.T) { + _ = logging.SetLogLevel("cbtest", "debug") + _ = logging.SetLogLevel("boostd-data-yb", "debug") + + // Running yugabyte tests may require download the docker container + // so set a high timeout + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) + defer cancel() + + SetupYugabyte(t) + + bdsvc := NewYugabyte(TestYugabyteSettings) + + addr := "localhost:0" + testService(ctx, t, bdsvc, addr) +} + +func TestServiceFuzzYugabyte(t *testing.T) { + t.Skip() + _ = logging.SetLogLevel("*", "info") + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + SetupYugabyte(t) + bdsvc := NewYugabyte(TestYugabyteSettings) + + addr := "localhost:0" + ln, err := bdsvc.Start(ctx, addr) + require.NoError(t, err) + + testServiceFuzz(ctx, t, ln.String()) +} + +func TestCleanupYugabyte(t *testing.T) { + _ = logging.SetLogLevel("*", "debug") + + // Running yugabyte tests may require download the docker container + // so set a high timeout + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) + defer cancel() + + SetupYugabyte(t) + + bdsvc := NewYugabyte(TestYugabyteSettings) + testCleanup(ctx, t, bdsvc, "localhost:0") +} diff --git a/piecedirectory/piece_directory_test.go b/piecedirectory/piece_directory_test.go new file mode 100644 index 000000000..10c28f796 --- /dev/null +++ b/piecedirectory/piece_directory_test.go @@ -0,0 +1,14 @@ +package piecedirectory + +import ( + "context" + "github.com/filecoin-project/boostd-data/svc" + "github.com/stretchr/testify/require" + "testing" +) + +func TestPieceDirectoryLevelDB(t *testing.T) { + bdsvc, err := svc.NewLevelDB("") + require.NoError(t, err) + testPieceDirectory(context.Background(), t, bdsvc) +} diff --git a/piecedirectory/piece_directory_yugabyte_test.go b/piecedirectory/piece_directory_yugabyte_test.go new file mode 100644 index 000000000..515552c22 --- /dev/null +++ b/piecedirectory/piece_directory_yugabyte_test.go @@ -0,0 +1,21 @@ +//go:build test_lid +// +build test_lid + +package piecedirectory + +import ( + "context" + "testing" + "time" + + "github.com/filecoin-project/boostd-data/svc" +) + +func TestPieceDirectoryYugabyte(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 300*time.Second) + defer cancel() + + svc.SetupYugabyte(t) + bdsvc := svc.NewYugabyte(svc.TestYugabyteSettings) + testPieceDirectory(ctx, t, bdsvc) +} diff --git a/piecedirectory/piecedirectory_test.go b/piecedirectory/piecedirectory_test_util.go similarity index 96% rename from piecedirectory/piecedirectory_test.go rename to piecedirectory/piecedirectory_test_util.go index 6e114713b..3dc628f39 100644 --- a/piecedirectory/piecedirectory_test.go +++ b/piecedirectory/piecedirectory_test_util.go @@ -1,17 +1,9 @@ -//go:build test_lid -// +build test_lid - package piecedirectory import ( "bytes" "context" "fmt" - "io" - "os" - "testing" - "time" - pdTypes "github.com/filecoin-project/boost/piecedirectory/types" mock_piecedirectory "github.com/filecoin-project/boost/piecedirectory/types/mocks" "github.com/filecoin-project/boostd-data/client" @@ -25,25 +17,11 @@ import ( "github.com/ipld/go-car/v2" "github.com/ipld/go-car/v2/blockstore" "github.com/stretchr/testify/require" + "io" + "os" + "testing" ) -func TestPieceDirectory(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 300*time.Second) - defer cancel() - - t.Run("leveldb", func(t *testing.T) { - bdsvc, err := svc.NewLevelDB("") - require.NoError(t, err) - testPieceDirectory(ctx, t, bdsvc) - }) - - t.Run("yugabyte", func(t *testing.T) { - svc.SetupYugabyte(t) - bdsvc := svc.NewYugabyte(svc.TestYugabyteSettings) - testPieceDirectory(ctx, t, bdsvc) - }) -} - func testPieceDirectory(ctx context.Context, t *testing.T, bdsvc *svc.Service) { ln, err := bdsvc.Start(ctx, "localhost:0") require.NoError(t, err)