Skip to content

Commit

Permalink
Add ipfs retry (#215)
Browse files Browse the repository at this point in the history
  • Loading branch information
ridenaio authored Nov 27, 2019
1 parent 974a9e2 commit 7c8b0b9
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 20 deletions.
14 changes: 7 additions & 7 deletions blockchain/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ const (
var (
MaxHash *big.Float
ParentHashIsInvalid = errors.New("parentHash is invalid")
BlockInsertionErr = errors.New("can't insert block")
)

type Blockchain struct {
Expand Down Expand Up @@ -920,17 +921,16 @@ func (chain *Blockchain) insertHeader(header *types.Header) {
}

func (chain *Blockchain) insertBlock(block *types.Block, diff *state.IdentityStateDiff) error {
chain.insertHeader(block.Header)
_, err := chain.ipfs.Add(block.Body.Bytes())
if err != nil {
return errors.Wrap(BlockInsertionErr, err.Error())
}
chain.insertHeader(block.Header)
chain.WriteIdentityStateDiff(block.Height(), diff)
chain.WriteTxIndex(block.Hash(), block.Body.Transactions)
chain.SaveTxs(block.Header, block.Body.Transactions)
chain.repo.WriteHead(block.Header)

if err == nil {
chain.setCurrentHead(block.Header)
}
return err
chain.setCurrentHead(block.Header)
return nil
}

func (chain *Blockchain) WriteTxIndex(hash common.Hash, txs types.Transactions) {
Expand Down
35 changes: 24 additions & 11 deletions ipfs/ipfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,23 +215,36 @@ func (p *ipfsProxy) Add(data []byte) (cid.Cid, error) {
defer p.rwLock.RUnlock()
api, _ := coreapi.NewCoreAPI(p.node)

ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()

file := files.NewBytesFile(data)
defer file.Close()
path, err := api.Unixfs().Add(ctx, file, options.Unixfs.Pin(true), options.Unixfs.CidVersion(1))
select {
case <-ctx.Done():
err = errors.New("timeout while writing data to ipfs")
default:
break

var ipfsPath path.Resolved
var err error
for num := 5; num > 0; num-- {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
ipfsPath, err = api.Unixfs().Add(ctx, file, options.Unixfs.Pin(true), options.Unixfs.CidVersion(1))
select {
case <-ctx.Done():
err = errors.New("timeout while writing data to ipfs")
default:
break
}
cancel()
if err == nil {
break
}
file = files.NewBytesFile(data)
defer file.Close()

time.Sleep(1 * time.Second)
}

if err != nil {
return cid.Cid{}, err
}
p.log.Debug("Add ipfs data", "cid", path.Cid().String())
return path.Cid(), nil

p.log.Debug("Add ipfs data", "cid", ipfsPath.Cid().String())
return ipfsPath.Cid(), nil
}

func (p *ipfsProxy) AddFile(absPath string, data io.ReadCloser, fi os.FileInfo) (cid.Cid, error) {
Expand Down
5 changes: 3 additions & 2 deletions protocol/full.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,11 @@ func (fs *fullSync) applyDeferredBlocks(checkState *appstate.AppState) (uint64,
if err := fs.appState.ResetTo(fs.chain.Head.Height()); err != nil {
return block.Height(), err
}
fs.pm.BanPeer(b.peerId, err)
if errors.Cause(err) != blockchain.BlockInsertionErr {
fs.pm.BanPeer(b.peerId, err)
}
fs.log.Warn(fmt.Sprintf("Block %v is invalid: %v", block.Height(), err))
time.Sleep(time.Second)
// TODO: ban bad peer
return block.Height(), err
}
if !b.Cert.Empty() {
Expand Down

0 comments on commit 7c8b0b9

Please sign in to comment.