Skip to content

Commit

Permalink
Merge pull request #2 from filecoin-project/feat/pieceio
Browse files Browse the repository at this point in the history
Piece IO
  • Loading branch information
ergastic authored Dec 11, 2019
2 parents 21dd66b + c64a72d commit 02533f3
Show file tree
Hide file tree
Showing 13 changed files with 459 additions and 60 deletions.
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,9 @@
# Output of the go coverage tool, specifically when used with LiteIDE
*.out
filestore/_test/

# JetBrains
.idea

.filecoin-build
.update-modules
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[submodule "extern/filecoin-ffi"]
path = extern/filecoin-ffi
url = [email protected]:filecoin-project/filecoin-ffi.git
34 changes: 34 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
all: build
.PHONY: all

SUBMODULES=

FFI_PATH:=extern/filecoin-ffi/
FFI_DEPS:=libfilecoin.a filecoin.pc filecoin.h
FFI_DEPS:=$(addprefix $(FFI_PATH),$(FFI_DEPS))

$(FFI_DEPS): .filecoin-build ;

.filecoin-build: $(FFI_PATH)
$(MAKE) -C $(FFI_PATH) $(FFI_DEPS:$(FFI_PATH)%=%)
@touch $@

.update-modules:
git submodule update --init --recursive
@touch $@

pieceio: .update-modules .filecoin-build
go build ./pieceio
.PHONY: pieceio
SUBMODULES+=pieceio

filestore:
go build ./filestore
.PHONY: filestore
SUBMODULES+=filestore

build: $(SUBMODULES)

clean:
rm -f .filecoin-build
rm -f .update-modules
1 change: 1 addition & 0 deletions extern/filecoin-ffi
Submodule filecoin-ffi added at 2383ce
6 changes: 4 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/filecoin-project/go-fil-components
go 1.13

require (
github.com/filecoin-project/filecoin-ffi v0.0.0-20191210104338-2383ce072e95
github.com/filecoin-project/go-fil-filestore v0.0.0-20191202230242-40c6a5a2306c
github.com/gogo/protobuf v1.3.1 // indirect
github.com/google/go-cmp v0.3.1 // indirect
Expand All @@ -11,6 +12,7 @@ require (
github.com/ipfs/go-bitswap v0.1.8 // indirect
github.com/ipfs/go-block-format v0.0.2
github.com/ipfs/go-blockservice v0.1.3-0.20190908200855-f22eea50656c
github.com/ipfs/go-car v0.0.3-0.20191203022317-23b0a85fd1b1
github.com/ipfs/go-cid v0.0.4-0.20191112011718-79e75dffeb10
github.com/ipfs/go-datastore v0.1.0
github.com/ipfs/go-graphsync v0.0.4
Expand Down Expand Up @@ -40,6 +42,6 @@ require (
github.com/whyrusleeping/cbor-gen v0.0.0-20191116002219-891f55cd449d
golang.org/x/sys v0.0.0-20190826190057-c7b8b68b1456 // indirect
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
gopkg.in/yaml.v2 v2.2.4 // indirect
)

replace github.com/filecoin-project/filecoin-ffi => ./extern/filecoin-ffi
85 changes: 27 additions & 58 deletions go.sum

Large diffs are not rendered by default.

42 changes: 42 additions & 0 deletions pieceio/cario/cario.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package cario

import (
"context"
"fmt"
"github.com/filecoin-project/go-fil-components/pieceio"
"github.com/ipfs/go-car"
"github.com/ipfs/go-cid"
"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/traversal/selector"
"io"
)

type carIO struct {
}

func NewCarIO() pieceio.CarIO {
return &carIO{}
}

func (c carIO) WriteCar(ctx context.Context, bs pieceio.ReadStore, payloadCid cid.Cid, node ipld.Node, w io.Writer) error {
selector, err := selector.ParseSelector(node)
if err != nil {
return err
}
return car.WriteSelectiveCar(ctx, bs, []car.CarDag{{Root: payloadCid, Selector: selector}}, w)
}

func (c carIO) LoadCar(bs pieceio.WriteStore, r io.Reader) (cid.Cid, error) {
header, err := car.LoadCar(bs, r)
if err != nil {
return cid.Undef, err
}
l := len(header.Roots)
if l == 0 {
return cid.Undef, fmt.Errorf("invalid header: missing root")
}
if l > 1 {
return cid.Undef, fmt.Errorf("invalid header: contains %d roots (expecting 1)", l)
}
return header.Roots[0], nil
}
27 changes: 27 additions & 0 deletions pieceio/padreader/padreader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package padreader

import (
ffi "github.com/filecoin-project/filecoin-ffi"
"github.com/filecoin-project/go-fil-components/pieceio"
"math/bits"
)

type padReader struct {
}

func NewPadReader() pieceio.PadReader {
return &padReader{}
}

// Functions bellow copied from lotus/lib/padreader/padreader.go
func (p padReader) PaddedSize(size uint64) uint64 {
logv := 64 - bits.LeadingZeros64(size)

sectSize := uint64(1 << logv)
bound := ffi.GetMaxUserBytesPerStagedSector(sectSize)
if size <= bound {
return bound
}

return ffi.GetMaxUserBytesPerStagedSector(1 << (logv + 1))
}
81 changes: 81 additions & 0 deletions pieceio/pieceio.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package pieceio

import (
"context"
"fmt"
"github.com/filecoin-project/go-fil-components/filestore"
"github.com/ipfs/go-cid"
"github.com/ipld/go-ipld-prime"
"io"
"io/ioutil"
"os"
)

type SectorCalculator interface {
// GeneratePieceCommitment takes a PADDED io stream and a total size and generates a commP
GeneratePieceCommitment(piece io.Reader, pieceSize uint64) ([]byte, error)
}

type PadReader interface {
// PaddedSize returns the expected size of a piece after it's been padded
PaddedSize(size uint64) uint64
}

type CarIO interface {
// WriteCar writes a given payload to a CAR file and into the passed IO stream
WriteCar(ctx context.Context, bs ReadStore, payloadCid cid.Cid, selector ipld.Node, w io.Writer) error
// LoadCar loads blocks into the a store from a given CAR file
LoadCar(bs WriteStore, r io.Reader) (cid.Cid, error)
}

type pieceIO struct {
padReader PadReader
carIO CarIO
sectorCalculator SectorCalculator
tempDir filestore.Path
}

func NewPieceIO(padReader PadReader, carIO CarIO, sectorCalculator SectorCalculator, tempDir filestore.Path) PieceIO {
return &pieceIO{padReader, carIO, sectorCalculator, tempDir}
}

func (pio *pieceIO) GeneratePieceCommitment(bs ReadStore, payloadCid cid.Cid, selector ipld.Node) ([]byte, filestore.Path, error) {
f, err := ioutil.TempFile(string(pio.tempDir), "")
if err != nil {
return nil, "", err
}
err = pio.carIO.WriteCar(context.Background(), bs, payloadCid, selector, f)
if err != nil {
os.Remove(f.Name())
return nil, "", err
}
fi, err := f.Stat()
if err != nil {
os.Remove(f.Name())
return nil, "", err
}
pieceSize := uint64(fi.Size())
paddedSize := pio.padReader.PaddedSize(pieceSize)
remaining := paddedSize - pieceSize
padbuf := make([]byte, remaining)
padded, err := f.Write(padbuf)
if err != nil {
os.Remove(f.Name())
return nil, "", err
}
if uint64(padded) != remaining {
os.Remove(f.Name())
return nil, "", fmt.Errorf("wrote %d byte of padding while expecting %d to be written", padded, remaining)
}
f.Seek(0, io.SeekStart)
commitment, err := pio.sectorCalculator.GeneratePieceCommitment(f, paddedSize)
if err != nil {
os.Remove(f.Name())
return nil, "", err
}
return commitment, filestore.Path(f.Name()), nil
}

func (pio *pieceIO) ReadPiece(r io.Reader, bs WriteStore) (cid.Cid, error) {
return pio.carIO.LoadCar(bs, r)
}
165 changes: 165 additions & 0 deletions pieceio/pieceio_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
package pieceio_test

import (
"bytes"
"context"
"github.com/filecoin-project/go-fil-components/filestore"
"github.com/filecoin-project/go-fil-components/pieceio"
"github.com/filecoin-project/go-fil-components/pieceio/cario"
"github.com/filecoin-project/go-fil-components/pieceio/padreader"
"github.com/filecoin-project/go-fil-components/pieceio/sectorcalculator"
dag "github.com/ipfs/go-merkledag"
dstest "github.com/ipfs/go-merkledag/test"
ipldfree "github.com/ipld/go-ipld-prime/impl/free"
"github.com/ipld/go-ipld-prime/traversal/selector"
"github.com/ipld/go-ipld-prime/traversal/selector/builder"
"github.com/stretchr/testify/require"
"io"
"os"
"testing"
)


func Test_ThereAndBackAgain(t *testing.T) {
tempDir := filestore.Path("./tempDir")
sc := sectorcalculator.NewSectorCalculator(tempDir)
pr := padreader.NewPadReader()
cio := cario.NewCarIO()

pio := pieceio.NewPieceIO(pr, cio, sc, tempDir)

sourceBserv := dstest.Bserv()
sourceBs := sourceBserv.Blockstore()
dserv := dag.NewDAGService(sourceBserv)
a := dag.NewRawNode([]byte("aaaa"))
b := dag.NewRawNode([]byte("bbbb"))
c := dag.NewRawNode([]byte("cccc"))

nd1 := &dag.ProtoNode{}
nd1.AddNodeLink("cat", a)

nd2 := &dag.ProtoNode{}
nd2.AddNodeLink("first", nd1)
nd2.AddNodeLink("dog", b)

nd3 := &dag.ProtoNode{}
nd3.AddNodeLink("second", nd2)
nd3.AddNodeLink("bear", c)

ctx := context.Background()
dserv.Add(ctx, a)
dserv.Add(ctx, b)
dserv.Add(ctx, c)
dserv.Add(ctx, nd1)
dserv.Add(ctx, nd2)
dserv.Add(ctx, nd3)

ssb := builder.NewSelectorSpecBuilder(ipldfree.NodeBuilder())
node := ssb.ExploreFields(func(efsb builder.ExploreFieldsSpecBuilder) {
efsb.Insert("Links",
ssb.ExploreIndex(1, ssb.ExploreRecursive(selector.RecursionLimitNone(), ssb.ExploreAll(ssb.ExploreRecursiveEdge()))))
}).Node()

bytes, filename, err := pio.GeneratePieceCommitment(sourceBs, nd3.Cid(), node)
require.NoError(t, err)
for _, b := range bytes {
require.NotEqual(t, 0, b)
}
f, err := os.Open(string(filename))
require.NoError(t, err)
info, err := os.Stat(string(filename))
require.NoError(t, err)
bufSize := int64(16) // small buffer to illustrate the logic
buf := make([]byte, bufSize)
var readErr error
padStart := int64(-1)
loops := int64(-1)
read := 0
skipped, err := f.Seek(info.Size()/2, io.SeekStart)
require.NoError(t, err)
for readErr == nil {
loops++
read, readErr = f.Read(buf)
for idx := int64(0); idx < int64(read); idx++ {
if buf[idx] == 0 {
if padStart == -1 {
padStart = skipped + loops * bufSize + idx
}
} else {
padStart = -1
}
}
}
f.Seek(0, io.SeekStart)
var reader io.Reader
if padStart != -1 {
reader = io.LimitReader(f, padStart)
} else {
reader = f
}

id, err := pio.ReadPiece(reader, sourceBs)
os.Remove(string(filename))
require.NoError(t, err)
require.Equal(t, nd3.Cid(), id)
}

func Test_StoreRestoreMemoryBuffer(t *testing.T) {
tempDir := filestore.Path("./tempDir")
sc := sectorcalculator.NewSectorCalculator(tempDir)
pr := padreader.NewPadReader()
cio := cario.NewCarIO()

pio := pieceio.NewPieceIO(pr, cio, sc, tempDir)

sourceBserv := dstest.Bserv()
sourceBs := sourceBserv.Blockstore()
dserv := dag.NewDAGService(sourceBserv)
a := dag.NewRawNode([]byte("aaaa"))
b := dag.NewRawNode([]byte("bbbb"))
c := dag.NewRawNode([]byte("cccc"))

nd1 := &dag.ProtoNode{}
nd1.AddNodeLink("cat", a)

nd2 := &dag.ProtoNode{}
nd2.AddNodeLink("first", nd1)
nd2.AddNodeLink("dog", b)

nd3 := &dag.ProtoNode{}
nd3.AddNodeLink("second", nd2)
nd3.AddNodeLink("bear", c)

ctx := context.Background()
dserv.Add(ctx, a)
dserv.Add(ctx, b)
dserv.Add(ctx, c)
dserv.Add(ctx, nd1)
dserv.Add(ctx, nd2)
dserv.Add(ctx, nd3)

ssb := builder.NewSelectorSpecBuilder(ipldfree.NodeBuilder())
node := ssb.ExploreFields(func(efsb builder.ExploreFieldsSpecBuilder) {
efsb.Insert("Links",
ssb.ExploreIndex(1, ssb.ExploreRecursive(selector.RecursionLimitNone(), ssb.ExploreAll(ssb.ExploreRecursiveEdge()))))
}).Node()

commitment, filename, err := pio.GeneratePieceCommitment(sourceBs, nd3.Cid(), node)
require.NoError(t, err)
for _, b := range commitment {
require.NotEqual(t, 0, b)
}
f, err := os.Open(string(filename))
require.NoError(t, err)
defer func () {
f.Close()
os.Remove(f.Name())
}()
info, err := os.Stat(string(filename))
buf := make([]byte, info.Size())
f.Read(buf)
buffer := bytes.NewBuffer(buf)
secondCommitment, err := sc.GeneratePieceCommitment(buffer, uint64(info.Size()))
require.NoError(t, err)
require.Equal(t, commitment, secondCommitment)
}
Loading

0 comments on commit 02533f3

Please sign in to comment.