Skip to content
This repository has been archived by the owner on Jun 27, 2023. It is now read-only.

feat(Directory): Add EnumLinksAsync method #39

Merged
merged 6 commits into from
Oct 30, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 46 additions & 19 deletions hamt/hamt.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"context"
"fmt"
"os"
"sync"

bitfield "github.com/Stebalien/go-bitfield"
cid "github.com/ipfs/go-cid"
Expand Down Expand Up @@ -400,21 +399,16 @@ func (ds *Shard) getValue(ctx context.Context, hv *hashBits, key string, cb func
// EnumLinks collects all links in the Shard.
func (ds *Shard) EnumLinks(ctx context.Context) ([]*ipld.Link, error) {
var links []*ipld.Link
var setlk sync.Mutex

getLinks := makeAsyncTrieGetLinks(ds.dserv, func(sv *Shard) error {
lnk := sv.val
lnk.Name = sv.key
setlk.Lock()
links = append(links, lnk)
setlk.Unlock()
return nil
})

cset := cid.NewSet()
linkResults := ds.EnumLinksAsync(ctx)

err := dag.EnumerateChildrenAsync(ctx, getLinks, ds.nd.Cid(), cset.Visit)
return links, err
for linkResult := range linkResults {
if linkResult.Err != nil {
return links, linkResult.Err
}
links = append(links, linkResult.Link)
}
return links, nil
}

// ForEachLink walks the Shard and calls the given function.
Expand All @@ -427,10 +421,28 @@ func (ds *Shard) ForEachLink(ctx context.Context, f func(*ipld.Link) error) erro
})
}

// EnumLinksAsync returns a channel which will receive Links in the directory
// as they are enumerated, where order is not gauranteed
func (ds *Shard) EnumLinksAsync(ctx context.Context) <-chan format.LinkResult {
linkResults := make(chan format.LinkResult)
ctx, cancel := context.WithCancel(ctx)
go func() {
defer close(linkResults)
defer cancel()
getLinks := makeAsyncTrieGetLinks(ds.dserv, linkResults)
cset := cid.NewSet()
err := dag.EnumerateChildrenAsync(ctx, getLinks, ds.nd.Cid(), cset.Visit)
if err != nil {
emitResult(ctx, linkResults, format.LinkResult{Link: nil, Err: err})
}
}()
return linkResults
}

// makeAsyncTrieGetLinks builds a getLinks function that can be used with EnumerateChildrenAsync
// to iterate a HAMT shard. It takes an IPLD Dag Service to fetch nodes, and a call back that will get called
// on all links to leaf nodes in a HAMT tree, so they can be collected for an EnumLinks operation
func makeAsyncTrieGetLinks(dagService ipld.DAGService, onShardValue func(shard *Shard) error) dag.GetLinks {
func makeAsyncTrieGetLinks(dagService ipld.DAGService, linkResults chan<- format.LinkResult) dag.GetLinks {

return func(ctx context.Context, currentCid cid.Cid) ([]*ipld.Link, error) {
node, err := dagService.Get(ctx, currentCid)
Expand Down Expand Up @@ -458,16 +470,31 @@ func makeAsyncTrieGetLinks(dagService ipld.DAGService, onShardValue func(shard *
if err != nil {
return nil, err
}
err = onShardValue(sv)
if err != nil {
return nil, err
}
formattedLink := sv.val
formattedLink.Name = sv.key
emitResult(ctx, linkResults, format.LinkResult{Link: formattedLink, Err: nil})
}
}
return childShards, nil
}
}

func emitResult(ctx context.Context, linkResults chan<- format.LinkResult, r format.LinkResult) {
// make sure that context cancel is processed first
// the reason is due to the concurrency of EnumerateChildrenAsync
// it's possible for EnumLinksAsync to complete and close the linkResults
// channel before this code runs
select {
case <-ctx.Done():
return
default:
}
select {
case linkResults <- r:
case <-ctx.Done():
}
}

func (ds *Shard) walkTrie(ctx context.Context, cb func(*Shard) error) error {
for idx := range ds.children {
c, err := ds.getChild(ctx, idx)
Expand Down
89 changes: 67 additions & 22 deletions hamt/hamt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,28 +74,7 @@ func assertLink(s *Shard, name string, found bool) error {
}
}

func assertSerializationWorks(ds ipld.DAGService, s *Shard) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
nd, err := s.Node()
if err != nil {
return err
}

nds, err := NewHamtFromDag(ds, nd)
if err != nil {
return err
}

linksA, err := s.EnumLinks(ctx)
if err != nil {
return err
}

linksB, err := nds.EnumLinks(ctx)
if err != nil {
return err
}
func assertLinksEqual(linksA []*ipld.Link, linksB []*ipld.Link) error {

if len(linksA) != len(linksB) {
return fmt.Errorf("links arrays are different sizes")
Expand All @@ -121,6 +100,32 @@ func assertSerializationWorks(ds ipld.DAGService, s *Shard) error {
return nil
}

func assertSerializationWorks(ds ipld.DAGService, s *Shard) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
nd, err := s.Node()
if err != nil {
return err
}

nds, err := NewHamtFromDag(ds, nd)
if err != nil {
return err
}

linksA, err := s.EnumLinks(ctx)
if err != nil {
return err
}

linksB, err := nds.EnumLinks(ctx)
if err != nil {
return err
}

return assertLinksEqual(linksA, linksB)
}

func TestBasicSet(t *testing.T) {
ds := mdtest.Mock()
for _, w := range []int{128, 256, 512, 1024, 2048, 4096} {
Expand Down Expand Up @@ -309,6 +314,46 @@ func TestSetAfterMarshal(t *testing.T) {
}
}

func TestEnumLinksAsync(t *testing.T) {
ds := mdtest.Mock()
_, s, err := makeDir(ds, 300)
if err != nil {
t.Fatal(err)
}
ctx := context.Background()

nd, err := s.Node()
if err != nil {
t.Fatal(err)
}

nds, err := NewHamtFromDag(ds, nd)
if err != nil {
t.Fatal(err)
}

linksA, err := nds.EnumLinks(ctx)
if err != nil {
t.Fatal(err)
}

linkResults := nds.EnumLinksAsync(ctx)

var linksB []*ipld.Link

for linkResult := range linkResults {
if linkResult.Err != nil {
t.Fatal(linkResult.Err)
}
linksB = append(linksB, linkResult.Link)
}

err = assertLinksEqual(linksA, linksB)
if err != nil {
t.Fatal(err)
}
}

func TestDuplicateAddShard(t *testing.T) {
ds := mdtest.Mock()
dir, _ := NewShard(ds, 256)
Expand Down
31 changes: 31 additions & 0 deletions io/directory.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"os"

mdag "github.com/ipfs/go-merkledag"

format "github.com/ipfs/go-unixfs"
hamt "github.com/ipfs/go-unixfs/hamt"

Expand Down Expand Up @@ -38,6 +39,10 @@ type Directory interface {
// ForEachLink applies the given function to Links in the directory.
ForEachLink(context.Context, func(*ipld.Link) error) error

// EnumLinksAsync returns a channel which will receive Links in the directory
// as they are enumerated, where order is not gauranteed
EnumLinksAsync(context.Context) <-chan format.LinkResult

// Links returns the all the links in the directory node.
Links(context.Context) ([]*ipld.Link, error)

Expand Down Expand Up @@ -141,6 +146,26 @@ func (d *BasicDirectory) AddChild(ctx context.Context, name string, node ipld.No
return d.node.AddNodeLink(name, node)
}

// EnumLinksAsync returns a channel which will receive Links in the directory
// as they are enumerated, where order is not gauranteed
func (d *BasicDirectory) EnumLinksAsync(ctx context.Context) <-chan format.LinkResult {
linkResults := make(chan format.LinkResult)
go func() {
defer close(linkResults)
for _, l := range d.node.Links() {
select {
case linkResults <- format.LinkResult{
Link: l,
Err: nil,
}:
case <-ctx.Done():
return
}
}
}()
return linkResults
}

// ForEachLink implements the `Directory` interface.
func (d *BasicDirectory) ForEachLink(ctx context.Context, f func(*ipld.Link) error) error {
for _, l := range d.node.Links() {
Expand Down Expand Up @@ -226,6 +251,12 @@ func (d *HAMTDirectory) ForEachLink(ctx context.Context, f func(*ipld.Link) erro
return d.shard.ForEachLink(ctx, f)
}

// EnumLinksAsync returns a channel which will receive Links in the directory
// as they are enumerated, where order is not gauranteed
func (d *HAMTDirectory) EnumLinksAsync(ctx context.Context) <-chan format.LinkResult {
return d.shard.EnumLinksAsync(ctx)
}

// Links implements the `Directory` interface.
func (d *HAMTDirectory) Links(ctx context.Context) ([]*ipld.Link, error) {
return d.shard.EnumLinks(ctx)
Expand Down
26 changes: 26 additions & 0 deletions io/directory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (
"fmt"
"testing"

ipld "github.com/ipfs/go-ipld-format"
mdtest "github.com/ipfs/go-merkledag/test"

ft "github.com/ipfs/go-unixfs"
)

Expand Down Expand Up @@ -155,4 +157,28 @@ func TestDirBuilder(t *testing.T) {
if len(links) != count {
t.Fatal("wrong number of links", len(links), count)
}

linkResults := dir.EnumLinksAsync(ctx)

asyncNames := make(map[string]bool)
var asyncLinks []*ipld.Link

for linkResult := range linkResults {
if linkResult.Err != nil {
t.Fatal(linkResult.Err)
}
asyncNames[linkResult.Link.Name] = true
asyncLinks = append(asyncLinks, linkResult.Link)
}

for i := 0; i < count; i++ {
n := fmt.Sprintf("entry %d", i)
if !asyncNames[n] {
t.Fatal("COULDNT FIND: ", n)
}
}

if len(asyncLinks) != count {
t.Fatal("wrong number of links", len(asyncLinks), count)
}
}
9 changes: 9 additions & 0 deletions unixfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,18 @@ import (
proto "github.com/gogo/protobuf/proto"

dag "github.com/ipfs/go-merkledag"

ipld "github.com/ipfs/go-ipld-format"
pb "github.com/ipfs/go-unixfs/pb"
)

// A LinkResult for any parallel enumeration of links
// TODO: Should this live in go-ipld-format?
type LinkResult struct {
Link *ipld.Link
Err error
}

// Shorthands for protobuffer types
const (
TRaw = pb.Data_Raw
Expand Down