Skip to content

Commit

Permalink
Merge pull request #3314 from ipfs/kevina/posinfo-2
Browse files Browse the repository at this point in the history
Create a FilestoreNode object to carry PosInfo
  • Loading branch information
whyrusleeping authored Oct 25, 2016
2 parents 6f3ae5d + 65ffff2 commit 465044a
Show file tree
Hide file tree
Showing 9 changed files with 231 additions and 14 deletions.
5 changes: 5 additions & 0 deletions commands/files/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,8 @@ type SizeFile interface {

Size() (int64, error)
}

type FileInfo interface {
FullPath() string
Stat() os.FileInfo
}
12 changes: 11 additions & 1 deletion core/coreunix/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,12 @@ func (adder *Adder) addFile(file files.File) error {
// progress updates to the client (over the output channel)
var reader io.Reader = file
if adder.Progress {
reader = &progressReader{file: file, out: adder.Out}
rdr := &progressReader{file: file, out: adder.Out}
if fi, ok := file.(files.FileInfo); ok {
reader = &progressReader2{rdr, fi}
} else {
reader = rdr
}
}

dagnode, err := adder.add(reader)
Expand Down Expand Up @@ -520,3 +525,8 @@ func (i *progressReader) Read(p []byte) (int, error) {

return n, err
}

type progressReader2 struct {
*progressReader
files.FileInfo
}
116 changes: 115 additions & 1 deletion core/coreunix/add_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,26 @@ package coreunix

import (
"bytes"
"context"
"io"
"io/ioutil"
"math/rand"
"os"
"testing"
"time"

"github.com/ipfs/go-ipfs/blocks"
"github.com/ipfs/go-ipfs/blocks/blockstore"
"github.com/ipfs/go-ipfs/blockservice"
"github.com/ipfs/go-ipfs/commands/files"
"github.com/ipfs/go-ipfs/core"
dag "github.com/ipfs/go-ipfs/merkledag"
"github.com/ipfs/go-ipfs/pin/gc"
"github.com/ipfs/go-ipfs/repo"
"github.com/ipfs/go-ipfs/repo/config"
pi "github.com/ipfs/go-ipfs/thirdparty/posinfo"
"github.com/ipfs/go-ipfs/thirdparty/testutil"

"context"
cid "gx/ipfs/QmXUuRadqDq5BuFWzVU6VuKaSjTcNm1gNCtLvvP1TJCW4z/go-cid"
)

Expand Down Expand Up @@ -162,3 +168,111 @@ func TestAddGCLive(t *testing.T) {
t.Fatal(err)
}
}

func testAddWPosInfo(t *testing.T, rawLeaves bool) {
r := &repo.Mock{
C: config.Config{
Identity: config.Identity{
PeerID: "Qmfoo", // required by offline node
},
},
D: testutil.ThreadSafeCloserMapDatastore(),
}
node, err := core.NewNode(context.Background(), &core.BuildCfg{Repo: r})
if err != nil {
t.Fatal(err)
}

bs := &testBlockstore{GCBlockstore: node.Blockstore, expectedPath: "/tmp/foo.txt", t: t}
bserv := blockservice.New(bs, node.Exchange)
dserv := dag.NewDAGService(bserv)
adder, err := NewAdder(context.Background(), node.Pinning, bs, dserv)
if err != nil {
t.Fatal(err)
}
adder.Out = make(chan interface{})
adder.Progress = true
adder.RawLeaves = rawLeaves

data := make([]byte, 5*1024*1024)
rand.New(rand.NewSource(2)).Read(data) // Rand.Read never returns an error
fileData := ioutil.NopCloser(bytes.NewBuffer(data))
fileInfo := dummyFileInfo{"foo.txt", int64(len(data)), time.Now()}
file := files.NewReaderFile("foo.txt", "/tmp/foo.txt", fileData, &fileInfo)

go func() {
defer close(adder.Out)
err = adder.AddFile(file)
if err != nil {
t.Fatal(err)
}
}()
for _ = range adder.Out {
}

if bs.countAtOffsetZero != 2 {
t.Fatal("expected 2 blocks with an offset at zero (one root and one leafh), got", bs.countAtOffsetZero)
}
if bs.countAtOffsetNonZero != 19 {
// note: the exact number will depend on the size and the sharding algo. used
t.Fatal("expected 19 blocks with an offset > 0, got", bs.countAtOffsetNonZero)
}
}

func TestAddWPosInfo(t *testing.T) {
testAddWPosInfo(t, false)
}

func TestAddWPosInfoAndRawLeafs(t *testing.T) {
testAddWPosInfo(t, true)
}


type testBlockstore struct {
blockstore.GCBlockstore
expectedPath string
t *testing.T
countAtOffsetZero int
countAtOffsetNonZero int
}

func (bs *testBlockstore) Put(block blocks.Block) error {
bs.CheckForPosInfo(block)
return bs.GCBlockstore.Put(block)
}

func (bs *testBlockstore) PutMany(blocks []blocks.Block) error {
for _, blk := range blocks {
bs.CheckForPosInfo(blk)
}
return bs.GCBlockstore.PutMany(blocks)
}

func (bs *testBlockstore) CheckForPosInfo(block blocks.Block) error {
fsn, ok := block.(*pi.FilestoreNode)
if ok {
posInfo := fsn.PosInfo
if posInfo.FullPath != bs.expectedPath {
bs.t.Fatal("PosInfo does not have the expected path")
}
if posInfo.Offset == 0 {
bs.countAtOffsetZero += 1
} else {
bs.countAtOffsetNonZero += 1
}
}
return nil
}

type dummyFileInfo struct {
name string
size int64
modTime time.Time
}

func (fi *dummyFileInfo) Name() string { return fi.name }
func (fi *dummyFileInfo) Size() int64 { return fi.size }
func (fi *dummyFileInfo) Mode() os.FileMode { return 0 }
func (fi *dummyFileInfo) ModTime() time.Time { return fi.modTime }
func (fi *dummyFileInfo) IsDir() bool { return false }
func (fi *dummyFileInfo) Sys() interface{} { return nil }
12 changes: 9 additions & 3 deletions importer/balanced/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ import (
)

func BalancedLayout(db *h.DagBuilderHelper) (node.Node, error) {
var offset uint64 = 0
var root *h.UnixfsNode
for level := 0; !db.Done(); level++ {

nroot := h.NewUnixfsNode()
db.SetPosInfo(nroot, 0)

// add our old root as a child of the new root.
if root != nil { // nil if it's the first node.
Expand All @@ -22,11 +24,13 @@ func BalancedLayout(db *h.DagBuilderHelper) (node.Node, error) {
}

// fill it up.
if err := fillNodeRec(db, nroot, level); err != nil {
if err := fillNodeRec(db, nroot, level, offset); err != nil {
return nil, err
}

offset = nroot.FileSize()
root = nroot

}
if root == nil {
root = h.NewUnixfsNode()
Expand All @@ -50,7 +54,7 @@ func BalancedLayout(db *h.DagBuilderHelper) (node.Node, error) {
// it returns the total dataSize of the node, and a potential error
//
// warning: **children** pinned indirectly, but input node IS NOT pinned.
func fillNodeRec(db *h.DagBuilderHelper, node *h.UnixfsNode, depth int) error {
func fillNodeRec(db *h.DagBuilderHelper, node *h.UnixfsNode, depth int, offset uint64) error {
if depth < 0 {
return errors.New("attempt to fillNode at depth < 0")
}
Expand All @@ -69,15 +73,17 @@ func fillNodeRec(db *h.DagBuilderHelper, node *h.UnixfsNode, depth int) error {
// while we have room AND we're not done
for node.NumChildren() < db.Maxlinks() && !db.Done() {
child := h.NewUnixfsNode()
db.SetPosInfo(child, offset)

err := fillNodeRec(db, child, depth-1)
err := fillNodeRec(db, child, depth-1, offset)
if err != nil {
return err
}

if err := node.AddChild(child, db); err != nil {
return err
}
offset += child.FileSize()
}

return nil
Expand Down
10 changes: 8 additions & 2 deletions importer/chunk/rabin.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ import (
var IpfsRabinPoly = chunker.Pol(17437180132763653)

type Rabin struct {
r *chunker.Chunker
r *chunker.Chunker
reader io.Reader
}

func NewRabin(r io.Reader, avgBlkSize uint64) *Rabin {
Expand All @@ -25,7 +26,8 @@ func NewRabinMinMax(r io.Reader, min, avg, max uint64) *Rabin {
ch := chunker.New(r, IpfsRabinPoly, h, avg, min, max)

return &Rabin{
r: ch,
r: ch,
reader: r,
}
}

Expand All @@ -37,3 +39,7 @@ func (r *Rabin) NextBytes() ([]byte, error) {

return ch.Data, nil
}

func (r *Rabin) Reader() io.Reader {
return r.reader
}
5 changes: 5 additions & 0 deletions importer/chunk/splitting.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ var log = logging.Logger("chunk")
var DefaultBlockSize int64 = 1024 * 256

type Splitter interface {
Reader() io.Reader
NextBytes() ([]byte, error)
}

Expand Down Expand Up @@ -77,3 +78,7 @@ func (ss *sizeSplitterv2) NextBytes() ([]byte, error) {

return buf[:n], nil
}

func (ss *sizeSplitterv2) Reader() io.Reader {
return ss.r
}
44 changes: 37 additions & 7 deletions importer/helpers/dagbuilder.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package helpers

import (
"io"
"os"

"github.com/ipfs/go-ipfs/commands/files"
"github.com/ipfs/go-ipfs/importer/chunk"
dag "github.com/ipfs/go-ipfs/merkledag"

Expand All @@ -17,6 +21,8 @@ type DagBuilderHelper struct {
nextData []byte // the next item to return.
maxlinks int
batch *dag.Batch
fullPath string
stat os.FileInfo
}

type DagBuilderParams struct {
Expand All @@ -34,44 +40,58 @@ type DagBuilderParams struct {
// Generate a new DagBuilderHelper from the given params, which data source comes
// from chunks object
func (dbp *DagBuilderParams) New(spl chunk.Splitter) *DagBuilderHelper {
return &DagBuilderHelper{
db := &DagBuilderHelper{
dserv: dbp.Dagserv,
spl: spl,
rawLeaves: dbp.RawLeaves,
maxlinks: dbp.Maxlinks,
batch: dbp.Dagserv.Batch(),
}
if fi, ok := spl.Reader().(files.FileInfo); ok {
db.fullPath = fi.FullPath()
db.stat = fi.Stat()
}
return db
}

// prepareNext consumes the next item from the splitter and puts it
// in the nextData field. it is idempotent-- if nextData is full
// it will do nothing.
func (db *DagBuilderHelper) prepareNext() {
// if we already have data waiting to be consumed, we're ready
if db.nextData != nil {
if db.nextData != nil || db.recvdErr != nil {
return
}

// TODO: handle err (which wasn't handled either when the splitter was channeled)
db.nextData, _ = db.spl.NextBytes()
db.nextData, db.recvdErr = db.spl.NextBytes()
if db.recvdErr == io.EOF {
db.recvdErr = nil
}
}

// Done returns whether or not we're done consuming the incoming data.
func (db *DagBuilderHelper) Done() bool {
// ensure we have an accurate perspective on data
// as `done` this may be called before `next`.
db.prepareNext() // idempotent
if db.recvdErr != nil {
return false
}
return db.nextData == nil
}

// Next returns the next chunk of data to be inserted into the dag
// if it returns nil, that signifies that the stream is at an end, and
// that the current building operation should finish
func (db *DagBuilderHelper) Next() []byte {
func (db *DagBuilderHelper) Next() ([]byte, error) {
db.prepareNext() // idempotent
d := db.nextData
db.nextData = nil // signal we've consumed it
return d
if db.recvdErr != nil {
return nil, db.recvdErr
} else {
return d, nil
}
}

// GetDagServ returns the dagservice object this Helper is using
Expand Down Expand Up @@ -100,7 +120,11 @@ func (db *DagBuilderHelper) FillNodeLayer(node *UnixfsNode) error {
}

func (db *DagBuilderHelper) GetNextDataNode() (*UnixfsNode, error) {
data := db.Next()
data, err := db.Next()
if err != nil {
return nil, err
}

if data == nil { // we're done!
return nil, nil
}
Expand All @@ -121,6 +145,12 @@ func (db *DagBuilderHelper) GetNextDataNode() (*UnixfsNode, error) {
}
}

func (db *DagBuilderHelper) SetPosInfo(node *UnixfsNode, offset uint64) {
if db.stat != nil {
node.SetPosInfo(offset, db.fullPath, db.stat)
}
}

func (db *DagBuilderHelper) Add(node *UnixfsNode) (node.Node, error) {
dn, err := node.GetDagNode()
if err != nil {
Expand Down
Loading

0 comments on commit 465044a

Please sign in to comment.