Skip to content
This repository has been archived by the owner on Jun 20, 2023. It is now read-only.

feat: plumb through datastore contexts #39

Merged
merged 1 commit into from
Nov 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions batched/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,10 +259,10 @@ func (s *BatchProvidingSystem) Run() {
s.lastReprovideBatchSize = len(keys)
s.lastReprovideDuration = dur

if err := s.ds.Put(lastReprovideKey, storeTime(time.Now())); err != nil {
if err := s.ds.Put(s.ctx, lastReprovideKey, storeTime(time.Now())); err != nil {
log.Errorf("could not store last reprovide time: %v", err)
}
if err := s.ds.Sync(lastReprovideKey); err != nil {
if err := s.ds.Sync(s.ctx, lastReprovideKey); err != nil {
log.Errorf("could not perform sync of last reprovide time: %v", err)
}
}
Expand Down Expand Up @@ -374,7 +374,7 @@ reprovideCidLoop:
}

func (s *BatchProvidingSystem) getLastReprovideTime() (time.Time, error) {
val, err := s.ds.Get(lastReprovideKey)
val, err := s.ds.Get(s.ctx, lastReprovideKey)
if errors.Is(err, datastore.ErrNotFound) {
return time.Time{}, nil
}
Expand Down
22 changes: 13 additions & 9 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,24 @@ retract [v1.0.0, v1.0.1]

require (
github.com/cenkalti/backoff v2.2.1+incompatible
github.com/ipfs/go-block-format v0.0.2
github.com/ipfs/go-blockservice v0.1.4
github.com/ipfs/go-cid v0.0.7
github.com/ipfs/go-block-format v0.0.3
github.com/ipfs/go-blockservice v0.2.0
github.com/ipfs/go-cid v0.1.0
github.com/ipfs/go-cidutil v0.0.2
github.com/ipfs/go-datastore v0.4.5
github.com/ipfs/go-datastore v0.5.0
github.com/ipfs/go-fetcher v1.5.0
github.com/ipfs/go-ipfs-blockstore v0.1.4
github.com/ipfs/go-ipfs-blockstore v0.2.0
github.com/ipfs/go-ipfs-blocksutil v0.0.1
github.com/ipfs/go-ipfs-exchange-offline v0.0.1
github.com/ipfs/go-ipfs-routing v0.1.0
github.com/ipfs/go-log v1.0.4
github.com/ipfs/go-ipfs-exchange-offline v0.1.0
github.com/ipfs/go-ipfs-routing v0.2.0
github.com/ipfs/go-log v1.0.5
github.com/ipfs/go-verifcid v0.0.1
github.com/ipld/go-ipld-prime v0.11.0
github.com/klauspost/cpuid/v2 v2.0.9 // indirect
github.com/libp2p/go-libp2p-core v0.8.5
github.com/libp2p/go-libp2p-testing v0.4.0
github.com/multiformats/go-multihash v0.0.15
github.com/multiformats/go-base32 v0.0.4 // indirect
github.com/multiformats/go-multihash v0.0.16
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 // indirect
golang.org/x/sys v0.0.0-20211025112917-711f33c9992c // indirect
)
573 changes: 531 additions & 42 deletions go.sum

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (q *Queue) work() {
c, err = cid.Parse(head.Value)
if err != nil {
log.Warningf("error parsing queue entry cid with key (%s), removing it from queue: %s", head.Key, err)
err = q.ds.Delete(k)
err = q.ds.Delete(q.ctx, k)
if err != nil {
log.Errorf("error deleting queue entry with key (%s), due to error (%s), stopping provider", head.Key, err)
return
Expand All @@ -120,12 +120,12 @@ func (q *Queue) work() {
keyPath := fmt.Sprintf("%d/%s", time.Now().UnixNano(), c.String())
nextKey := datastore.NewKey(keyPath)

if err := q.ds.Put(nextKey, toQueue.Bytes()); err != nil {
if err := q.ds.Put(q.ctx, nextKey, toQueue.Bytes()); err != nil {
log.Errorf("Failed to enqueue cid: %s", err)
continue
}
case dequeue <- c:
err := q.ds.Delete(k)
err := q.ds.Delete(q.ctx, k)

if err != nil {
log.Errorf("Failed to delete queued cid %s with key %s: %s", c, k, err)
Expand All @@ -141,7 +141,7 @@ func (q *Queue) work() {

func (q *Queue) getQueueHead() (*query.Entry, error) {
qry := query.Query{Orders: []query.Order{query.OrderByKey{}}, Limit: 1}
results, err := q.ds.Query(qry)
results, err := q.ds.Query(q.ctx, qry)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions queue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/sync"
"github.com/ipfs/go-ipfs-blocksutil"
blocksutil "github.com/ipfs/go-ipfs-blocksutil"
)

var blockGenerator = blocksutil.NewBlockGenerator()
Expand Down Expand Up @@ -72,7 +72,7 @@ func TestMangledData(t *testing.T) {

// put bad data in the queue
queueKey := datastore.NewKey("/test/0")
err = queue.ds.Put(queueKey, []byte("borked"))
err = queue.ds.Put(ctx, queueKey, []byte("borked"))
if err != nil {
t.Fatal(err)
}
Expand Down
4 changes: 2 additions & 2 deletions simple/reprovide.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type Reprovider struct {
}

// NewReprovider creates new Reprovider instance.
func NewReprovider(ctx context.Context, reprovideIniterval time.Duration, rsys routing.ContentRouting, keyProvider KeyChanFunc) *Reprovider {
func NewReprovider(ctx context.Context, reprovideInterval time.Duration, rsys routing.ContentRouting, keyProvider KeyChanFunc) *Reprovider {
ctx, cancel := context.WithCancel(ctx)
return &Reprovider{
ctx: ctx,
Expand All @@ -55,7 +55,7 @@ func NewReprovider(ctx context.Context, reprovideIniterval time.Duration, rsys r

rsys: rsys,
keyProvider: keyProvider,
tick: reprovideIniterval,
tick: reprovideInterval,
}
}

Expand Down
5 changes: 3 additions & 2 deletions simple/reprovide_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func setupDag(t *testing.T) (nodes []cid.Cid, bstore blockstore.Blockstore) {
t.Fatal(err)
}
blk := toBlock(t, nb.Build())
err = bstore.Put(blk)
err = bstore.Put(context.Background(), blk)
if err != nil {
t.Fatal(err)
}
Expand All @@ -60,7 +60,7 @@ func setupDag(t *testing.T) (nodes []cid.Cid, bstore blockstore.Blockstore) {
t.Fatal(err)
}
blk = toBlock(t, nd)
err = bstore.Put(blk)
err = bstore.Put(context.Background(), blk)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -117,6 +117,7 @@ func testReprovide(t *testing.T, trigger func(r *Reprovider, ctx context.Context

keyProvider := NewBlockstoreProvider(bstore)
reprov := NewReprovider(ctx, time.Hour, clA, keyProvider)
reprov.Trigger(context.Background())
err := trigger(reprov, ctx)
if err != nil {
t.Fatal(err)
Expand Down