From dcab5795cdfce80340b1b07ed01b2c5e8ae506d9 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Tue, 16 Aug 2016 11:43:01 -0700 Subject: [PATCH] datastore: blockstore should retry when it encounters temp errors License: MIT Signed-off-by: Jeromy --- core/builder.go | 26 +++++++++++++++++++++++--- exchange/bitswap/bitswap.go | 14 +------------- exchange/bitswap/bitswap_test.go | 7 +++++-- package.json | 12 ++++++++++++ 4 files changed, 41 insertions(+), 18 deletions(-) diff --git a/core/builder.go b/core/builder.go index db282748a5e9..c3a222b01365 100644 --- a/core/builder.go +++ b/core/builder.go @@ -4,6 +4,9 @@ import ( "crypto/rand" "encoding/base64" "errors" + "os" + "syscall" + "time" bstore "github.com/ipfs/go-ipfs/blocks/blockstore" key "github.com/ipfs/go-ipfs/blocks/key" @@ -14,12 +17,13 @@ import ( pin "github.com/ipfs/go-ipfs/pin" repo "github.com/ipfs/go-ipfs/repo" cfg "github.com/ipfs/go-ipfs/repo/config" - ds "gx/ipfs/QmTxLSvdhwg68WJimdS6icLPhZi28aTp6b7uihC2Yb47Xk/go-datastore" - dsync "gx/ipfs/QmTxLSvdhwg68WJimdS6icLPhZi28aTp6b7uihC2Yb47Xk/go-datastore/sync" pstore "gx/ipfs/QmQdnfvZQuhdT93LNc5bos52wAmdr3G2p6G8teLJMEN32P/go-libp2p-peerstore" goprocessctx "gx/ipfs/QmQopLATEYMNg7dVqZRNDfeE2S1yKy8zrRh5xnYiuqeZBn/goprocess/context" + ds "gx/ipfs/QmTxLSvdhwg68WJimdS6icLPhZi28aTp6b7uihC2Yb47Xk/go-datastore" + dsync "gx/ipfs/QmTxLSvdhwg68WJimdS6icLPhZi28aTp6b7uihC2Yb47Xk/go-datastore/sync" ci "gx/ipfs/QmUWER4r4qMvaCnX5zREcfyiWN7cXN9g3a7fkRqNz8qWPP/go-libp2p-crypto" + retry "gx/ipfs/QmY6UVhgS2ZxhbM5qU23Fnz3daJwfyAuNErd3StmVofnAU/retry-datastore" context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context" ) @@ -127,14 +131,30 @@ func NewNode(ctx context.Context, cfg *BuildCfg) (*IpfsNode, error) { return n, nil } +func isTooManyFDError(err error) bool { + perr, ok := err.(*os.PathError) + if ok && perr.Err == syscall.EMFILE { + return true + } + + return false +} + func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error { // setup local peer ID (private key is loaded in online setup) if err := n.loadID(); err != nil { return err } + rds := &retry.Datastore{ + Batching: n.Repo.Datastore(), + Delay: time.Millisecond * 200, + Retries: 6, + TempErrFunc: isTooManyFDError, + } + var err error - bs := bstore.NewBlockstore(n.Repo.Datastore()) + bs := bstore.NewBlockstore(rds) opts := bstore.DefaultCacheOpts() conf, err := n.Repo.Config() if err != nil { diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index f14fe9162e69..732700894d87 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -261,7 +261,7 @@ func (bs *Bitswap) HasBlock(blk blocks.Block) error { default: } - err := bs.tryPutBlock(blk, 4) // attempt to store block up to four times + err := bs.blockstore.Put(blk) if err != nil { log.Errorf("Error writing block to datastore: %s", err) return err @@ -280,18 +280,6 @@ func (bs *Bitswap) HasBlock(blk blocks.Block) error { return nil } -func (bs *Bitswap) tryPutBlock(blk blocks.Block, attempts int) error { - var err error - for i := 0; i < attempts; i++ { - if err = bs.blockstore.Put(blk); err == nil { - break - } - - time.Sleep(time.Millisecond * time.Duration(400*(i+1))) - } - return err -} - func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) { // This call records changes to wantlists, blocks received, // and number of bytes transfered. diff --git a/exchange/bitswap/bitswap_test.go b/exchange/bitswap/bitswap_test.go index 6cbfe2b62a90..8887e72dec6e 100644 --- a/exchange/bitswap/bitswap_test.go +++ b/exchange/bitswap/bitswap_test.go @@ -4,7 +4,6 @@ import ( "bytes" "sync" "testing" - "time" detectrace "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-detect-race" travis "github.com/ipfs/go-ipfs/thirdparty/testutil/ci/travis" @@ -24,8 +23,12 @@ import ( // well under varying conditions const kNetworkDelay = 0 * time.Millisecond +func getVirtualNetwork() tn.Network { + return tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) +} + func TestClose(t *testing.T) { - vnet := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) + vnet := getVirtualNetwork() sesgen := NewTestSessionGenerator(vnet) defer sesgen.Close() bgen := blocksutil.NewBlockGenerator() diff --git a/package.json b/package.json index 89ac26d3480a..8c72676b7ced 100644 --- a/package.json +++ b/package.json @@ -189,6 +189,18 @@ "hash": "QmVvJ27GcLaLSXvcB4auk3Gn3xuWK5ti5ENkZ2pCoJEYW4", "name": "autobatch", "version": "0.2.0" + }, + { + "author": "whyrusleeping", + "hash": "QmY6UVhgS2ZxhbM5qU23Fnz3daJwfyAuNErd3StmVofnAU", + "name": "retry-datastore", + "version": "1.1.0" + }, + { + "author": "whyrusleeping", + "hash": "QmdjfJJFxgqqR9skVZDmgiGrbKomSqxpaw12rjLNim5NYR", + "name": "failstore", + "version": "1.0.0" } ], "gxVersion": "0.4.0",