Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Piece IO #2

Merged
merged 5 commits into from
Dec 11, 2019
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,9 @@
# Output of the go coverage tool, specifically when used with LiteIDE
*.out
filestore/_test/

# JetBrains
.idea

.filecoin-build
.update-modules
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[submodule "extern/filecoin-ffi"]
path = extern/filecoin-ffi
url = [email protected]:filecoin-project/filecoin-ffi.git
34 changes: 34 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
all: build
.PHONY: all

SUBMODULES=

FFI_PATH:=extern/filecoin-ffi/
FFI_DEPS:=libfilecoin.a filecoin.pc filecoin.h
FFI_DEPS:=$(addprefix $(FFI_PATH),$(FFI_DEPS))

$(FFI_DEPS): .filecoin-build ;

.filecoin-build: $(FFI_PATH)
$(MAKE) -C $(FFI_PATH) $(FFI_DEPS:$(FFI_PATH)%=%)
@touch $@

.update-modules:
git submodule update --init --recursive
@touch $@

pieceio: .update-modules .filecoin-build
go build ./pieceio
.PHONY: pieceio
SUBMODULES+=pieceio

filestore:
go build ./filestore
.PHONY: filestore
SUBMODULES+=filestore

build: $(SUBMODULES)

clean:
rm -f .filecoin-build
rm -f .update-modules
1 change: 1 addition & 0 deletions extern/filecoin-ffi
Submodule filecoin-ffi added at 2383ce
6 changes: 4 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/filecoin-project/go-fil-components
go 1.13

require (
github.com/filecoin-project/filecoin-ffi v0.0.0-20191210104338-2383ce072e95
github.com/filecoin-project/go-fil-filestore v0.0.0-20191202230242-40c6a5a2306c
github.com/gogo/protobuf v1.3.1 // indirect
github.com/google/go-cmp v0.3.1 // indirect
Expand All @@ -11,6 +12,7 @@ require (
github.com/ipfs/go-bitswap v0.1.8 // indirect
github.com/ipfs/go-block-format v0.0.2
github.com/ipfs/go-blockservice v0.1.3-0.20190908200855-f22eea50656c
github.com/ipfs/go-car v0.0.3-0.20191203022317-23b0a85fd1b1
github.com/ipfs/go-cid v0.0.4-0.20191112011718-79e75dffeb10
github.com/ipfs/go-datastore v0.1.0
github.com/ipfs/go-graphsync v0.0.4
Expand Down Expand Up @@ -40,6 +42,6 @@ require (
github.com/whyrusleeping/cbor-gen v0.0.0-20191116002219-891f55cd449d
golang.org/x/sys v0.0.0-20190826190057-c7b8b68b1456 // indirect
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
gopkg.in/yaml.v2 v2.2.4 // indirect
)

replace github.com/filecoin-project/filecoin-ffi => ./extern/filecoin-ffi
85 changes: 27 additions & 58 deletions go.sum

Large diffs are not rendered by default.

42 changes: 42 additions & 0 deletions pieceio/cario/cario.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package cario

import (
"context"
"fmt"
"github.com/filecoin-project/go-fil-components/pieceio"
"github.com/ipfs/go-car"
"github.com/ipfs/go-cid"
"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/traversal/selector"
"io"
)

type carIO struct {
}

func NewCarIO() pieceio.CarIO {
return &carIO{}
}

func (c carIO) WriteCar(ctx context.Context, bs pieceio.ReadStore, payloadCid cid.Cid, node ipld.Node, w io.Writer) error {
selector, err := selector.ParseSelector(node)
if err != nil {
return err
}
return car.WriteSelectiveCar(ctx, bs, []car.CarDag{{Root: payloadCid, Selector: selector}}, w)
}

func (c carIO) LoadCar(bs pieceio.WriteStore, r io.Reader) (cid.Cid, error) {
header, err := car.LoadCar(bs, r)
if err != nil {
return cid.Undef, err
}
l := len(header.Roots)
if l == 0 {
return cid.Undef, fmt.Errorf("invalid header: missing root")
}
if l > 1 {
return cid.Undef, fmt.Errorf("invalid header: contains %d roots (expecting 1)", l)
}
return header.Roots[0], nil
}
27 changes: 27 additions & 0 deletions pieceio/padreader/padreader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package padreader

import (
ffi "github.com/filecoin-project/filecoin-ffi"
"github.com/filecoin-project/go-fil-components/pieceio"
"math/bits"
)

type padReader struct {
}

func NewPadReader() pieceio.PadReader {
return &padReader{}
}

// Functions bellow copied from lotus/lib/padreader/padreader.go
func (p padReader) PaddedSize(size uint64) uint64 {
logv := 64 - bits.LeadingZeros64(size)

sectSize := uint64(1 << logv)
bound := ffi.GetMaxUserBytesPerStagedSector(sectSize)
if size <= bound {
return bound
}

return ffi.GetMaxUserBytesPerStagedSector(1 << (logv + 1))
}
84 changes: 84 additions & 0 deletions pieceio/pieceio.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package pieceio

import (
"context"
"fmt"
"github.com/filecoin-project/go-fil-components/filestore"
"github.com/ipfs/go-cid"
"github.com/ipld/go-ipld-prime"
"io"
"io/ioutil"
"os"
)

type SectorCalculator interface {
// GeneratePieceCommitment takes a PADDED io stream and a total size and generates a commP
GeneratePieceCommitment(piece io.Reader, pieceSize uint64) ([]byte, error)
}

type PadReader interface {
// PaddedSize returns the expected size of a piece after it's been padded
PaddedSize(size uint64) uint64
}

type CarIO interface {
// WriteCar writes a given payload to a CAR file and into the passed IO stream
WriteCar(ctx context.Context, bs ReadStore, payloadCid cid.Cid, selector ipld.Node, w io.Writer) error
// LoadCar loads blocks into the a store from a given CAR file
LoadCar(bs WriteStore, r io.Reader) (cid.Cid, error)
}

type pieceIO struct {
padReader PadReader
carIO CarIO
sectorCalculator SectorCalculator
tempDir filestore.Path
}

func NewPieceIO(padReader PadReader, carIO CarIO, sectorCalculator SectorCalculator, tempDir filestore.Path) PieceIO {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder -- should PieceIO just have access to a Filestore directly and be called a PieceStore? And then we don't have to worry about getting things in and out of the filestore?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In that case I'd like add a function to create a temporary file through the filestore.

return &pieceIO{padReader, carIO, sectorCalculator, tempDir}
}

func (pio *pieceIO) GeneratePieceCommitment(bs ReadStore, payloadCid cid.Cid, selector ipld.Node) ([]byte, filestore.Path, error) {
f, err := ioutil.TempFile(string(pio.tempDir), "")
if err != nil {
return nil, "", err
}
err = pio.carIO.WriteCar(context.Background(), bs, payloadCid, selector, f)
if err != nil {
os.Remove(f.Name())
return nil, "", err
}
fi, err := f.Stat()
if err != nil {
os.Remove(f.Name())
return nil, "", err
}
pieceSize := uint64(fi.Size())
paddedSize := pio.padReader.PaddedSize(pieceSize)
remaining := paddedSize - pieceSize
padbuf := make([]byte, remaining)
padded, err := f.Write(padbuf)
if err != nil {
os.Remove(f.Name())
return nil, "", err
}
if uint64(padded) != remaining {
os.Remove(f.Name())
return nil, "", fmt.Errorf("wrote %d byte of padding while expecting %d to be written", padded, remaining)
}
f.Seek(0, io.SeekStart)
commitment, err := pio.sectorCalculator.GeneratePieceCommitment(f, paddedSize)
if err != nil {
os.Remove(f.Name())
return nil, "", err
}
return commitment, filestore.Path(f.Name()), nil
}

func (pio *pieceIO) WritePayload(bs ReadStore, payloadCid cid.Cid, selector ipld.Node, w io.Writer) ([]byte, error) {
return nil, pio.carIO.WriteCar(context.Background(), bs, payloadCid, selector, w)
ergastic marked this conversation as resolved.
Show resolved Hide resolved
}
func (pio *pieceIO) ReadPiece(r io.Reader, bs WriteStore) (cid.Cid, error) {
return pio.carIO.LoadCar(bs, r)
}
165 changes: 165 additions & 0 deletions pieceio/pieceio_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
package pieceio_test

import (
"bytes"
"context"
"github.com/filecoin-project/go-fil-components/filestore"
"github.com/filecoin-project/go-fil-components/pieceio"
"github.com/filecoin-project/go-fil-components/pieceio/cario"
"github.com/filecoin-project/go-fil-components/pieceio/padreader"
"github.com/filecoin-project/go-fil-components/pieceio/sectorcalculator"
dag "github.com/ipfs/go-merkledag"
dstest "github.com/ipfs/go-merkledag/test"
ipldfree "github.com/ipld/go-ipld-prime/impl/free"
"github.com/ipld/go-ipld-prime/traversal/selector"
"github.com/ipld/go-ipld-prime/traversal/selector/builder"
"github.com/stretchr/testify/require"
"io"
"os"
"testing"
)


func Test_ThereAndBackAgain(t *testing.T) {
tempDir := filestore.Path("./tempDir")
sc := sectorcalculator.NewSectorCalculator(tempDir)
pr := padreader.NewPadReader()
cio := cario.NewCarIO()

pio := pieceio.NewPieceIO(pr, cio, sc, tempDir)

sourceBserv := dstest.Bserv()
sourceBs := sourceBserv.Blockstore()
dserv := dag.NewDAGService(sourceBserv)
a := dag.NewRawNode([]byte("aaaa"))
b := dag.NewRawNode([]byte("bbbb"))
c := dag.NewRawNode([]byte("cccc"))

nd1 := &dag.ProtoNode{}
nd1.AddNodeLink("cat", a)

nd2 := &dag.ProtoNode{}
nd2.AddNodeLink("first", nd1)
nd2.AddNodeLink("dog", b)

nd3 := &dag.ProtoNode{}
nd3.AddNodeLink("second", nd2)
nd3.AddNodeLink("bear", c)

ctx := context.Background()
dserv.Add(ctx, a)
dserv.Add(ctx, b)
dserv.Add(ctx, c)
dserv.Add(ctx, nd1)
dserv.Add(ctx, nd2)
dserv.Add(ctx, nd3)

ssb := builder.NewSelectorSpecBuilder(ipldfree.NodeBuilder())
node := ssb.ExploreFields(func(efsb builder.ExploreFieldsSpecBuilder) {
efsb.Insert("Links",
ssb.ExploreIndex(1, ssb.ExploreRecursive(selector.RecursionLimitNone(), ssb.ExploreAll(ssb.ExploreRecursiveEdge()))))
}).Node()

bytes, filename, err := pio.GeneratePieceCommitment(sourceBs, nd3.Cid(), node)
require.NoError(t, err)
for _, b := range bytes {
require.NotEqual(t, 0, b)
}
f, err := os.Open(string(filename))
require.NoError(t, err)
info, err := os.Stat(string(filename))
require.NoError(t, err)
bufSize := int64(16) // small buffer to illustrate the logic
buf := make([]byte, bufSize)
var readErr error
padStart := int64(-1)
loops := int64(-1)
read := 0
skipped, err := f.Seek(info.Size()/2, io.SeekStart)
require.NoError(t, err)
for readErr == nil {
loops++
read, readErr = f.Read(buf)
for idx := int64(0); idx < int64(read); idx++ {
if buf[idx] == 0 {
if padStart == -1 {
padStart = skipped + loops * bufSize + idx
}
} else {
padStart = -1
}
}
}
f.Seek(0, io.SeekStart)
var reader io.Reader
if padStart != -1 {
reader = io.LimitReader(f, padStart)
} else {
reader = f
}

id, err := pio.ReadPiece(reader, sourceBs)
os.Remove(string(filename))
require.NoError(t, err)
require.Equal(t, nd3.Cid(), id)
}

func Test_StoreRestoreMemoryBuffer(t *testing.T) {
tempDir := filestore.Path("./tempDir")
sc := sectorcalculator.NewSectorCalculator(tempDir)
pr := padreader.NewPadReader()
cio := cario.NewCarIO()

pio := pieceio.NewPieceIO(pr, cio, sc, tempDir)

sourceBserv := dstest.Bserv()
sourceBs := sourceBserv.Blockstore()
dserv := dag.NewDAGService(sourceBserv)
a := dag.NewRawNode([]byte("aaaa"))
b := dag.NewRawNode([]byte("bbbb"))
c := dag.NewRawNode([]byte("cccc"))

nd1 := &dag.ProtoNode{}
nd1.AddNodeLink("cat", a)

nd2 := &dag.ProtoNode{}
nd2.AddNodeLink("first", nd1)
nd2.AddNodeLink("dog", b)

nd3 := &dag.ProtoNode{}
nd3.AddNodeLink("second", nd2)
nd3.AddNodeLink("bear", c)

ctx := context.Background()
dserv.Add(ctx, a)
dserv.Add(ctx, b)
dserv.Add(ctx, c)
dserv.Add(ctx, nd1)
dserv.Add(ctx, nd2)
dserv.Add(ctx, nd3)

ssb := builder.NewSelectorSpecBuilder(ipldfree.NodeBuilder())
node := ssb.ExploreFields(func(efsb builder.ExploreFieldsSpecBuilder) {
efsb.Insert("Links",
ssb.ExploreIndex(1, ssb.ExploreRecursive(selector.RecursionLimitNone(), ssb.ExploreAll(ssb.ExploreRecursiveEdge()))))
}).Node()

commitment, filename, err := pio.GeneratePieceCommitment(sourceBs, nd3.Cid(), node)
require.NoError(t, err)
for _, b := range commitment {
require.NotEqual(t, 0, b)
}
f, err := os.Open(string(filename))
require.NoError(t, err)
defer func () {
f.Close()
os.Remove(f.Name())
}()
info, err := os.Stat(string(filename))
buf := make([]byte, info.Size())
f.Read(buf)
buffer := bytes.NewBuffer(buf)
secondCommitment, err := sc.GeneratePieceCommitment(buffer, uint64(info.Size()))
require.NoError(t, err)
require.Equal(t, commitment, secondCommitment)
}
Loading