Skip to content

Commit

Permalink
feat(car): allow block hooks when using two step write
Browse files Browse the repository at this point in the history
  • Loading branch information
hannahhoward committed Nov 17, 2020
1 parent c2f1ff2 commit 7f9fecd
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 6 deletions.
11 changes: 10 additions & 1 deletion car_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,9 @@ func TestRoundtripSelective(t *testing.T) {
// write car in one step
buf := new(bytes.Buffer)
blockCount := 0
var oneStepBlocks []Block
err := sc.Write(buf, func(block Block) error {
oneStepBlocks = append(oneStepBlocks, block)
blockCount++
return nil
})
Expand All @@ -128,7 +130,11 @@ func TestRoundtripSelective(t *testing.T) {
sc2 := NewSelectiveCar(context.Background(), sourceBs, []Dag{Dag{Root: nd3.Cid(), Selector: selector}})

// write car in two steps
scp, err := sc2.Prepare()
var twoStepBlocks []Block
scp, err := sc2.Prepare(func(block Block) error {
twoStepBlocks = append(twoStepBlocks, block)
return nil
})
require.NoError(t, err)
buf2 := new(bytes.Buffer)
err = scp.Dump(buf2)
Expand All @@ -141,6 +147,9 @@ func TestRoundtripSelective(t *testing.T) {
// verify equal data written by both methods
require.Equal(t, buf.Bytes(), buf2.Bytes())

// verify equal blocks were passed to user block hook funcs
require.Equal(t, oneStepBlocks, twoStepBlocks)

// readout car and verify contents
bserv := dstest.Bserv()
ch, err := LoadCar(bserv.Blockstore(), buf)
Expand Down
28 changes: 23 additions & 5 deletions selectivecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,10 @@ type OnNewCarBlockFunc func(Block) error
// the Car file like size and number of blocks that go into it
type SelectiveCarPrepared struct {
SelectiveCar
size uint64
header CarHeader
cids []cid.Cid
size uint64
header CarHeader
cids []cid.Cid
userOnNewCarBlocks []OnNewCarBlockFunc
}

// NewSelectiveCar creates a new SelectiveCar for the given car file based
Expand All @@ -72,7 +73,7 @@ func (sc SelectiveCar) traverse(onCarHeader OnCarHeaderFunc, onNewCarBlock OnNew

// Prepare traverse a car file and collects data on what is about to be written, but
// does not actually write the file
func (sc SelectiveCar) Prepare() (SelectiveCarPrepared, error) {
func (sc SelectiveCar) Prepare(userOnNewCarBlocks ...OnNewCarBlockFunc) (SelectiveCarPrepared, error) {
var header CarHeader
var cids []cid.Cid

Expand All @@ -88,7 +89,7 @@ func (sc SelectiveCar) Prepare() (SelectiveCarPrepared, error) {
if err != nil {
return SelectiveCarPrepared{}, err
}
return SelectiveCarPrepared{sc, size, header, cids}, nil
return SelectiveCarPrepared{sc, size, header, cids, userOnNewCarBlocks}, nil
}

func (sc SelectiveCar) Write(w io.Writer, userOnNewCarBlocks ...OnNewCarBlockFunc) error {
Expand Down Expand Up @@ -133,6 +134,10 @@ func (sc SelectiveCarPrepared) Cids() []cid.Cid {
// Dump writes the car file as quickly as possible based on information already
// collected
func (sc SelectiveCarPrepared) Dump(w io.Writer) error {
offset, err := HeaderSize(&sc.header)
if err != nil {
return fmt.Errorf("failed to size car header: %s", err)
}
if err := WriteHeader(&sc.header, w); err != nil {
return fmt.Errorf("failed to write car header: %s", err)
}
Expand All @@ -142,10 +147,23 @@ func (sc SelectiveCarPrepared) Dump(w io.Writer) error {
return err
}
raw := blk.RawData()
size := util.LdSize(c.Bytes(), raw)
err = util.LdWrite(w, c.Bytes(), raw)
if err != nil {
return err
}
for _, userOnNewCarBlock := range sc.userOnNewCarBlocks {
err := userOnNewCarBlock(Block{
BlockCID: c,
Data: raw,
Offset: offset,
Size: size,
})
if err != nil {
return err
}
}
offset += size
}
return nil
}
Expand Down

0 comments on commit 7f9fecd

Please sign in to comment.