diff --git a/go.mod b/go.mod index cdfbc3dbc..d981b36f7 100644 --- a/go.mod +++ b/go.mod @@ -1,12 +1,18 @@ module github.com/ipfs/go-unixfs require ( + github.com/alecthomas/units v0.0.0-20210927113745-59d0afb8317a github.com/gogo/protobuf v1.3.2 github.com/gopherjs/gopherjs v0.0.0-20190430165422-3e4dfb77656c // indirect github.com/ipfs/go-bitfield v1.0.0 github.com/ipfs/go-bitswap v0.1.2 // indirect + github.com/ipfs/go-block-format v0.0.2 + github.com/ipfs/go-blockservice v0.1.0 github.com/ipfs/go-cid v0.0.7 + github.com/ipfs/go-datastore v0.0.5 + github.com/ipfs/go-ipfs-blockstore v0.0.1 github.com/ipfs/go-ipfs-chunker v0.0.1 + github.com/ipfs/go-ipfs-exchange-offline v0.0.1 github.com/ipfs/go-ipfs-files v0.0.3 github.com/ipfs/go-ipfs-posinfo v0.0.1 github.com/ipfs/go-ipfs-util v0.0.1 @@ -18,7 +24,9 @@ require ( github.com/smartystreets/assertions v1.0.0 // indirect github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a // indirect github.com/spaolacci/murmur3 v1.1.0 + github.com/stretchr/testify v1.7.0 github.com/warpfork/go-wish v0.0.0-20190328234359-8b3e70f8e830 // indirect + golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 ) go 1.16 diff --git a/go.sum b/go.sum index 9c5bd5070..ca32d1143 100644 --- a/go.sum +++ b/go.sum @@ -3,6 +3,8 @@ github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/Kubuxu/go-os-helper v0.0.1/go.mod h1:N8B+I7vPCT80IcP58r50u4+gEEcsZETFUpAzWW2ep1Y= github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII= +github.com/alecthomas/units v0.0.0-20210927113745-59d0afb8317a h1:E/8AP5dFtMhl5KPJz66Kt9G0n+7Sn41Fy1wv9/jHOrc= +github.com/alecthomas/units v0.0.0-20210927113745-59d0afb8317a/go.mod h1:OMCwj8VM1Kc9e19TLln2VL61YJF0x1XFtfdL4JdbSyE= github.com/btcsuite/btcd v0.0.0-20190213025234-306aecffea32/go.mod h1:DrZx5ec/dmnfpw9KyYoQyYo7d0KEvTkk/5M/vbZjAr8= github.com/btcsuite/btcd v0.0.0-20190523000118-16327141da8c/go.mod h1:3J08xEfcugPacsc34/LKRU2yO7YmuT8yt28J8k2+rrI= github.com/btcsuite/btcd v0.0.0-20190605094302-a0d1e3e36d50 h1:4i3KsuVA0o0KoBxAC5x+MY7RbteiMK1V7gf/G08NGIQ= @@ -273,8 +275,9 @@ github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0b github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= -github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ= github.com/warpfork/go-wish v0.0.0-20180510122957-5ad1f5abf436/go.mod h1:x6AKhvSSexNrVSrViXSHUEbICjmGXhtgABaHIySUSGw= github.com/warpfork/go-wish v0.0.0-20190328234359-8b3e70f8e830 
h1:8kxMKmKzXXL4Ru1nyhvdms/JjWt+3YLpvRb/bAjO/y0= @@ -332,6 +335,7 @@ golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwY golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 h1:SQFwaSi55rU7vdNs9Yr0Z324VNlrF+0wMqRXT4St8ck= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -366,6 +370,7 @@ golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1N golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/genproto v0.0.0-20180831171423-11092d34479b/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= @@ -373,5 +378,7 @@ gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWD gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.1-2019.2.3 h1:3JgtbtFHMiCmsznwGVTUWbgGov+pVqnlf1dEJTNAXeM= honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= diff --git a/hamt/hamt.go b/hamt/hamt.go index 55b798ce4..ac1c5e458 100644 --- a/hamt/hamt.go +++ b/hamt/hamt.go @@ -24,12 +24,17 @@ import ( "context" "fmt" "os" + "sync" + + "golang.org/x/sync/errgroup" + + format "github.com/ipfs/go-unixfs" + "github.com/ipfs/go-unixfs/internal" bitfield "github.com/ipfs/go-bitfield" cid "github.com/ipfs/go-cid" ipld "github.com/ipfs/go-ipld-format" dag "github.com/ipfs/go-merkledag" - format "github.com/ipfs/go-unixfs" ) const ( @@ -37,27 +42,41 @@ const ( HashMurmur3 uint64 = 0x22 ) +func init() { + internal.HAMTHashFunction = murmur3Hash +} + func (ds *Shard) isValueNode() bool { return ds.key != "" && ds.val != nil } // A Shard represents the HAMT. It should be initialized with NewShard(). type Shard struct { - cid cid.Cid - childer *childer - tableSize int + // Entries per node (number of possible children indexed by the partial key). + tableSize int + // Bits needed to encode child indexes (log2 of number of entries). This is + // the number of bits taken from the hash key on each level of the tree.
tableSizeLg2 int builder cid.Builder hashFunc uint64 + // String format with number of zeros that will be present in the hexadecimal + // encoding of the child index to always reach the fixed maxpadlen chars. + // Example: maxpadlen = 4 => prefixPadStr: "%04X" (print number in hexadecimal + // format padding with zeros to always reach 4 characters). prefixPadStr string - maxpadlen int + // Length in chars of string that encodes child indexes. We encode indexes + // as hexadecimal strings so this is log4 of the number of entries. + maxpadlen int dserv ipld.DAGService + // FIXME: Remove. We don't actually store "value nodes". This confusing + // abstraction just removes the maxpadlen from the link names to extract + // the actual value link the trie is storing. // leaf node key string val *ipld.Link @@ -70,12 +89,13 @@ func NewShard(dserv ipld.DAGService, size int) (*Shard, error) { return nil, err } + // FIXME: Make this at least a static configuration for testing. ds.hashFunc = HashMurmur3 return ds, nil } func makeShard(ds ipld.DAGService, size int) (*Shard, error) { - lg2s, err := logtwo(size) + lg2s, err := Logtwo(size) if err != nil { return nil, err } @@ -123,7 +143,6 @@ func NewHamtFromDag(dserv ipld.DAGService, nd ipld.Node) (*Shard, error) { ds.childer.makeChilder(fsn.Data(), pbnd.Links()) - ds.cid = pbnd.Cid() ds.hashFunc = fsn.HashType() ds.builder = pbnd.CidBuilder() @@ -206,31 +225,49 @@ func (ds *Shard) makeShardValue(lnk *ipld.Link) (*Shard, error) { // Set sets 'name' = nd in the HAMT func (ds *Shard) Set(ctx context.Context, name string, nd ipld.Node) error { - hv := &hashBits{b: hash([]byte(name))} - err := ds.dserv.Add(ctx, nd) + _, err := ds.Swap(ctx, name, nd) + return err +} + +// Swap sets a link pointing to the passed node as the value under the +// name key in this Shard or its children. It also returns the previous link +// under that name key (if any). +func (ds *Shard) Swap(ctx context.Context, name string, node ipld.Node) (*ipld.Link, error) { + hv := newHashBits(name) + err := ds.dserv.Add(ctx, node) if err != nil { - return err + return nil, err } - lnk, err := ipld.MakeLink(nd) + lnk, err := ipld.MakeLink(node) if err != nil { - return err + return nil, err } + + // FIXME: We don't need to set the name here, it will get overwritten. + // This is confusing, confirm and remove this line. lnk.Name = ds.linkNamePrefix(0) + name - return ds.modifyValue(ctx, hv, name, lnk) + return ds.swapValue(ctx, hv, name, lnk) } // Remove deletes the named entry if it exists. Otherwise, it returns // os.ErrNotExist. func (ds *Shard) Remove(ctx context.Context, name string) error { - hv := &hashBits{b: hash([]byte(name))} - return ds.modifyValue(ctx, hv, name, nil) + _, err := ds.Take(ctx, name) + return err +} + +// Take is similar to the public Remove but also returns the +// old removed link (if it exists).
+func (ds *Shard) Take(ctx context.Context, name string) (*ipld.Link, error) { + hv := newHashBits(name) + return ds.swapValue(ctx, hv, name, nil) } // Find searches for a child node by 'name' within this hamt func (ds *Shard) Find(ctx context.Context, name string) (*ipld.Link, error) { - hv := &hashBits{b: hash([]byte(name))} + hv := newHashBits(name) var out *ipld.Link err := ds.getValue(ctx, hv, name, func(sv *Shard) error { @@ -338,9 +375,11 @@ func (ds *Shard) EnumLinksAsync(ctx context.Context) <-chan format.LinkResult { go func() { defer close(linkResults) defer cancel() - getLinks := makeAsyncTrieGetLinks(ds.dserv, linkResults) - cset := cid.NewSet() - err := dag.Walk(ctx, getLinks, ds.cid, cset.Visit, dag.Concurrent()) + + err := parallelShardWalk(ctx, ds, ds.dserv, func(formattedLink *ipld.Link) error { + emitResult(ctx, linkResults, format.LinkResult{Link: formattedLink, Err: nil}) + return nil + }) if err != nil { emitResult(ctx, linkResults, format.LinkResult{Link: nil, Err: err}) } @@ -348,44 +387,178 @@ func (ds *Shard) EnumLinksAsync(ctx context.Context) <-chan format.LinkResult { return linkResults } -// makeAsyncTrieGetLinks builds a getLinks function that can be used with EnumerateChildrenAsync -// to iterate a HAMT shard. It takes an IPLD Dag Service to fetch nodes, and a call back that will get called -// on all links to leaf nodes in a HAMT tree, so they can be collected for an EnumLinks operation -func makeAsyncTrieGetLinks(dagService ipld.DAGService, linkResults chan<- format.LinkResult) dag.GetLinks { - - return func(ctx context.Context, currentCid cid.Cid) ([]*ipld.Link, error) { - node, err := dagService.Get(ctx, currentCid) - if err != nil { - return nil, err - } - directoryShard, err := NewHamtFromDag(dagService, node) - if err != nil { - return nil, err - } +type listCidsAndShards struct { + cids []cid.Cid + shards []*Shard +} - childShards := make([]*ipld.Link, 0, directoryShard.childer.length()) - links := directoryShard.childer.links - for idx := range directoryShard.childer.children { - lnk := links[idx] - lnkLinkType, err := directoryShard.childLinkType(lnk) +func (ds *Shard) walkChildren(processLinkValues func(formattedLink *ipld.Link) error) (*listCidsAndShards, error) { + res := &listCidsAndShards{} + for idx, lnk := range ds.childer.links { + if nextShard := ds.childer.children[idx]; nextShard == nil { + lnkLinkType, err := ds.childLinkType(lnk) if err != nil { return nil, err } - if lnkLinkType == shardLink { - childShards = append(childShards, lnk) - } else { - sv, err := directoryShard.makeShardValue(lnk) + + switch lnkLinkType { + case shardValueLink: + sv, err := ds.makeShardValue(lnk) if err != nil { return nil, err } formattedLink := sv.val formattedLink.Name = sv.key - emitResult(ctx, linkResults, format.LinkResult{Link: formattedLink, Err: nil}) + + if err := processLinkValues(formattedLink); err != nil { + return nil, err + } + case shardLink: + res.cids = append(res.cids, lnk.Cid) + default: + return nil, fmt.Errorf("unsupported shard link type") + } + + } else { + if nextShard.val != nil { + formattedLink := &ipld.Link{ + Name: nextShard.key, + Size: nextShard.val.Size, + Cid: nextShard.val.Cid, + } + if err := processLinkValues(formattedLink); err != nil { + return nil, err + } + } else { + res.shards = append(res.shards, nextShard) } } - return childShards, nil } + return res, nil +} + +// parallelShardWalk is quite similar to the DAG walking algorithm from 
https://github.com/ipfs/go-merkledag/blob/594e515f162e764183243b72c2ba84f743424c8c/merkledag.go#L464 +// However, there are a few notable differences: +// 1. Some children are actualized Shard structs and some are in the blockstore, this will leverage walking over the in memory Shards as well as the stored blocks +// 2. Instead of just passing each child into the worker pool by itself we group them so that we can leverage optimizations from GetMany. +// This optimization also makes the walk a little more biased towards depth (as opposed to BFS) in the earlier part of the DAG. +// This is particularly helpful for operations like estimating the directory size which should complete quickly when possible. +// 3. None of the extra options from that package are needed +func parallelShardWalk(ctx context.Context, root *Shard, dserv ipld.DAGService, processShardValues func(formattedLink *ipld.Link) error) error { + const concurrency = 32 + + var visitlk sync.Mutex + visitSet := cid.NewSet() + visit := visitSet.Visit + + // Setup synchronization + grp, errGrpCtx := errgroup.WithContext(ctx) + + // Input and output queues for workers. + feed := make(chan *listCidsAndShards) + out := make(chan *listCidsAndShards) + done := make(chan struct{}) + + for i := 0; i < concurrency; i++ { + grp.Go(func() error { + for feedChildren := range feed { + for _, nextShard := range feedChildren.shards { + nextChildren, err := nextShard.walkChildren(processShardValues) + if err != nil { + return err + } + + select { + case out <- nextChildren: + case <-errGrpCtx.Done(): + return nil + } + } + + var linksToVisit []cid.Cid + for _, nextCid := range feedChildren.cids { + var shouldVisit bool + + visitlk.Lock() + shouldVisit = visit(nextCid) + visitlk.Unlock() + + if shouldVisit { + linksToVisit = append(linksToVisit, nextCid) + } + } + + chNodes := dserv.GetMany(errGrpCtx, linksToVisit) + for optNode := range chNodes { + if optNode.Err != nil { + return optNode.Err + } + + nextShard, err := NewHamtFromDag(dserv, optNode.Node) + if err != nil { + return err + } + + nextChildren, err := nextShard.walkChildren(processShardValues) + if err != nil { + return err + } + + select { + case out <- nextChildren: + case <-errGrpCtx.Done(): + return nil + } + } + + select { + case done <- struct{}{}: + case <-errGrpCtx.Done(): + } + } + return nil + }) + } + + send := feed + var todoQueue []*listCidsAndShards + var inProgress int + + next := &listCidsAndShards{ + shards: []*Shard{root}, + } + +dispatcherLoop: + for { + select { + case send <- next: + inProgress++ + if len(todoQueue) > 0 { + next = todoQueue[0] + todoQueue = todoQueue[1:] + } else { + next = nil + send = nil + } + case <-done: + inProgress-- + if inProgress == 0 && next == nil { + break dispatcherLoop + } + case nextNodes := <-out: + if next == nil { + next = nextNodes + send = feed + } else { + todoQueue = append(todoQueue, nextNodes) + } + case <-errGrpCtx.Done(): + break dispatcherLoop + } + } + close(feed) + return grp.Wait() } func emitResult(ctx context.Context, linkResults chan<- format.LinkResult, r format.LinkResult) { @@ -419,75 +592,95 @@ func (ds *Shard) walkTrie(ctx context.Context, cb func(*Shard) error) error { }) } -func (ds *Shard) modifyValue(ctx context.Context, hv *hashBits, key string, val *ipld.Link) error { +// swapValue sets the link `value` in the given key, either creating the entry +// if it didn't exist or overwriting the old one. It returns the old entry (if any). 
+func (ds *Shard) swapValue(ctx context.Context, hv *hashBits, key string, value *ipld.Link) (*ipld.Link, error) { idx, err := hv.Next(ds.tableSizeLg2) if err != nil { - return err + return nil, err } if !ds.childer.has(idx) { + // Entry does not exist, create a new one. + return nil, ds.childer.insert(key, value, idx) } i := ds.childer.sliceIndex(idx) - child, err := ds.childer.get(ctx, i) if err != nil { - return err + return nil, err } if child.isValueNode() { + // Leaf node. This is the base case of this recursive function. if child.key == key { - // value modification - if val == nil { - return ds.childer.rm(idx) + // We are in the correct shard (tree level) so we modify this child + // and return. + oldValue := child.val + + if value == nil { // Remove old entry. + return oldValue, ds.childer.rm(idx) } - child.val = val - return nil + child.val = value // Overwrite entry. + return oldValue, nil } - if val == nil { - return os.ErrNotExist + if value == nil { + return nil, os.ErrNotExist } - // replace value with another shard, one level deeper - ns, err := NewShard(ds.dserv, ds.tableSize) + // We are in the same slot with another entry with a different key + // so we need to fork this leaf node into a shard with two children: + // the old entry and the new one being inserted here. + // We don't overwrite anything here so we keep: + // `oldValue = nil` + + // The child of this shard will now be a new shard. The old child value + // will be a child of this new shard (along with the new value being + // inserted). + grandChild := child + child, err = NewShard(ds.dserv, ds.tableSize) if err != nil { - return err - } - ns.builder = ds.builder - chhv := &hashBits{ - b: hash([]byte(child.key)), - consumed: hv.consumed, + return nil, err } - - err = ns.modifyValue(ctx, hv, key, val) + child.builder = ds.builder + chhv := newConsumedHashBits(grandChild.key, hv.consumed) + + // We explicitly ignore the oldValue returned by the next two insertions + // (which will be nil) to highlight there is no overwrite here: they are + // done with different keys to a new (empty) shard. (At worst this shard + // will create new ones until we find different slots for both.) + _, err = child.swapValue(ctx, hv, key, value) if err != nil { - return err + return nil, err } - - err = ns.modifyValue(ctx, chhv, child.key, child.val) + _, err = child.swapValue(ctx, chhv, grandChild.key, grandChild.val) if err != nil { - return err + return nil, err } - ds.childer.set(ns, i) - return nil + // Replace this leaf node with the new Shard node. + ds.childer.set(child, i) + return nil, nil } else { - err := child.modifyValue(ctx, hv, key, val) + // We are in a Shard (internal node). We will recursively call this + // function until finding the leaf (the logic of the `if` case above). + oldValue, err := child.swapValue(ctx, hv, key, value) if err != nil { - return err + return nil, err } - if val == nil { + if value == nil { + // We have removed an entry, check if we should remove shards + // as well. switch child.childer.length() { case 0: // empty sub-shard, prune it // Note: this shouldnt normally ever happen // in the event of another implementation creates flawed // structures, this will help to normalize them. - return ds.childer.rm(idx) + return oldValue, ds.childer.rm(idx) case 1: // The single child _should_ be a value by // induction.
However, we allow for it to be a @@ -499,24 +692,25 @@ func (ds *Shard) modifyValue(ctx context.Context, hv *hashBits, key string, val if schild.isValueNode() { ds.childer.set(schild, i) } - return nil + return oldValue, nil } // Otherwise, work with the link. slnk := child.childer.link(0) - lnkType, err := child.childer.sd.childLinkType(slnk) + var lnkType linkType + lnkType, err = child.childer.sd.childLinkType(slnk) if err != nil { - return err + return nil, err } if lnkType == shardValueLink { // sub-shard with a single value element, collapse it ds.childer.setLink(slnk, i) } - return nil + return oldValue, nil } } - return nil + return oldValue, nil } } diff --git a/hamt/util.go b/hamt/util.go index 7ae02dfb3..29f59435e 100644 --- a/hamt/util.go +++ b/hamt/util.go @@ -2,9 +2,11 @@ package hamt import ( "fmt" + "math/bits" + + "github.com/ipfs/go-unixfs/internal" "github.com/spaolacci/murmur3" - "math/bits" ) // hashBits is a helper that allows the reading of the 'next n bits' as an integer. @@ -13,6 +15,16 @@ type hashBits struct { consumed int } +func newHashBits(val string) *hashBits { + return &hashBits{b: internal.HAMTHashFunction([]byte(val))} +} + +func newConsumedHashBits(val string, consumed int) *hashBits { + hv := &hashBits{b: internal.HAMTHashFunction([]byte(val))} + hv.consumed = consumed + return hv +} + func mkmask(n int) byte { return (1 << uint(n)) - 1 } @@ -50,7 +62,7 @@ func (hb *hashBits) next(i int) int { } } -func logtwo(v int) (int, error) { +func Logtwo(v int) (int, error) { if v <= 0 { return 0, fmt.Errorf("hamt size should be a power of two") } @@ -61,7 +73,7 @@ func logtwo(v int) (int, error) { return lg2, nil } -func hash(val []byte) []byte { +func murmur3Hash(val []byte) []byte { h := murmur3.New64() h.Write(val) return h.Sum(nil) diff --git a/internal/config.go b/internal/config.go new file mode 100644 index 000000000..9250ae2ae --- /dev/null +++ b/internal/config.go @@ -0,0 +1,3 @@ +package internal + +var HAMTHashFunction func(val []byte) []byte diff --git a/io/completehamt_test.go b/io/completehamt_test.go new file mode 100644 index 000000000..2af652e32 --- /dev/null +++ b/io/completehamt_test.go @@ -0,0 +1,97 @@ +package io + +import ( + "context" + "encoding/binary" + "fmt" + "github.com/ipfs/go-unixfs/internal" + "math" + "testing" + + mdtest "github.com/ipfs/go-merkledag/test" + "github.com/stretchr/testify/assert" + + "github.com/ipfs/go-unixfs" + "github.com/ipfs/go-unixfs/hamt" + + ipld "github.com/ipfs/go-ipld-format" +) + +// CreateCompleteHAMT creates a HAMT with the following properties: +// * its height (distance/edges from root to deepest node) is specified by treeHeight. +// * all leaf Shard nodes have the same depth (and have only 'value' links). +// * all internal Shard nodes point only to other Shards (and hence have zero 'value' links). +// * the total number of 'value' links (directory entries) is: +// childsPerNode ^ (treeHeight). +// treeHeight: The number of layers of non-value HAMT nodes (e.g. height = 1 is a single shard pointing to some values) +// FIXME: HAMTHashFunction needs to be set to idHash by the caller. We depend on +// this simplification for the current logic to work.
+func CreateCompleteHAMT(ds ipld.DAGService, treeHeight int, childsPerNode int) (ipld.Node, error) { + if treeHeight < 1 { + panic("treeHeight < 1") + } + if treeHeight > 8 { + panic("treeHeight > 8: we don't allow a key larger than what can be encoded in a 64-bit word") + } + + rootShard, err := hamt.NewShard(ds, childsPerNode) + if err != nil { + return nil, err + } + + // Assuming we are using the ID hash function we can just insert all + // the combinations of a byte slice that will reach the desired height. + totalChildren := int(math.Pow(float64(childsPerNode), float64(treeHeight))) + log2ofChilds, err := hamt.Logtwo(childsPerNode) + if err != nil { + return nil, err + } + if log2ofChilds*treeHeight%8 != 0 { + return nil, fmt.Errorf("log2(childsPerNode) * treeHeight should be a multiple of 8") + } + bytesInKey := log2ofChilds * treeHeight / 8 + for i := 0; i < totalChildren; i++ { + var hashbuf [8]byte + binary.LittleEndian.PutUint64(hashbuf[:], uint64(i)) + var oldLink *ipld.Link + oldLink, err = rootShard.Swap(context.Background(), string(hashbuf[:bytesInKey]), unixfs.EmptyFileNode()) + if err != nil { + return nil, err + } + if oldLink != nil { + // We shouldn't be overwriting any value, otherwise the tree + // won't be complete. + return nil, fmt.Errorf("we have overwritten entry %s", + oldLink.Cid) + } + } + + return rootShard.Node() +} + +// Return the same value as the hash. +func idHash(val []byte) []byte { + return val +} + +// FIXME: This is not checking the exact height of the tree but just making +// sure there are as many children as we would have with a complete HAMT. +func TestCreateCompleteShard(t *testing.T) { + oldHashFunc := internal.HAMTHashFunction + defer func() { internal.HAMTHashFunction = oldHashFunc }() + internal.HAMTHashFunction = idHash + + ds := mdtest.Mock() + childsPerNode := 16 + treeHeight := 2 + node, err := CreateCompleteHAMT(ds, treeHeight, childsPerNode) + assert.NoError(t, err) + + shard, err := hamt.NewHamtFromDag(ds, node) + assert.NoError(t, err) + links, err := shard.EnumLinks(context.Background()) + assert.NoError(t, err) + + childNodes := int(math.Pow(float64(childsPerNode), float64(treeHeight))) + assert.Equal(t, childNodes, len(links)) +} diff --git a/io/directory.go b/io/directory.go index 15c7e862a..2ec862247 100644 --- a/io/directory.go +++ b/io/directory.go @@ -5,14 +5,15 @@ import ( "context" "fmt" "os" - mdag "github.com/ipfs/go-merkledag" - - format "github.com/ipfs/go-unixfs" "github.com/ipfs/go-unixfs/hamt" + "github.com/ipfs/go-unixfs/private/linksize" + "github.com/alecthomas/units" "github.com/ipfs/go-cid" ipld "github.com/ipfs/go-ipld-format" logging "github.com/ipfs/go-log" + mdag "github.com/ipfs/go-merkledag" + format "github.com/ipfs/go-unixfs" ) var log = logging.Logger("unixfs") @@ -23,9 +24,10 @@ var log = logging.Logger("unixfs") // The size is not the *exact* block size of the encoded BasicDirectory but just // the estimated size based byte length of links name and CID (BasicDirectory's // ProtoNode doesn't use the Data field so this estimate is pretty accurate). -var HAMTShardingSize = 0 +var HAMTShardingSize = int(256 * units.KiB) // DefaultShardWidth is the default value used for hamt sharding width. +// Needs to be a power of two (shard entry size) and multiple of 8 (bitfield size). var DefaultShardWidth = 256 // Directory defines a UnixFS directory.
It is used for creating, reading and @@ -74,6 +76,16 @@ type Directory interface { // TODO: Evaluate removing `dserv` from this layer and providing it in MFS. // (The functions should in that case add a `DAGService` argument.) +// Link size estimation function. For production it's usually the one here +// but during tests we may mock it to get fixed sizes. +func productionLinkSize(linkName string, linkCid cid.Cid) int { + return len(linkName) + linkCid.ByteLen() +} + +func init() { + linksize.LinkSizeFunction = productionLinkSize +} + // BasicDirectory is the basic implementation of `Directory`. All the entries // are stored in a single node. type BasicDirectory struct { @@ -93,6 +105,10 @@ type BasicDirectory struct { type HAMTDirectory struct { shard *hamt.Shard dserv ipld.DAGService + + // Track the changes in size by the AddChild and RemoveChild calls + // for the HAMTShardingSize option. + sizeChange int } func newEmptyBasicDirectory(dserv ipld.DAGService) *BasicDirectory { @@ -110,10 +126,10 @@ func newBasicDirectoryFromNode(dserv ipld.DAGService, node *mdag.ProtoNode) *Bas basicDir := basicDir return basicDir } -// NewDirectory returns a Directory implemented by UpgradeableDirectory +// NewDirectory returns a Directory implemented by DynamicDirectory // containing a BasicDirectory that can be converted to a HAMTDirectory. func NewDirectory(dserv ipld.DAGService) Directory { - return &UpgradeableDirectory{newEmptyBasicDirectory(dserv)} + return &DynamicDirectory{newEmptyBasicDirectory(dserv)} } // ErrNotADir implies that the given node was not a unixfs directory @@ -134,16 +150,13 @@ func NewDirectoryFromNode(dserv ipld.DAGService, node ipld.Node) (Directory, err switch fsNode.Type() { case format.TDirectory: - return &UpgradeableDirectory{newBasicDirectoryFromNode(dserv, protoBufNode.Copy().(*mdag.ProtoNode))}, nil + return &DynamicDirectory{newBasicDirectoryFromNode(dserv, protoBufNode.Copy().(*mdag.ProtoNode))}, nil case format.THAMTShard: shard, err := hamt.NewHamtFromDag(dserv, node) if err != nil { return nil, err } - return &HAMTDirectory{ - dserv: dserv, - shard: shard, - }, nil + return &DynamicDirectory{&HAMTDirectory{shard, dserv, 0}}, nil } return nil, ErrNotADir @@ -155,18 +168,16 @@ func (d *BasicDirectory) computeEstimatedSize() { d.addToEstimatedSize(l.Name, l.Cid) return nil }) -} - -func estimatedLinkSize(linkName string, linkCid cid.Cid) int { - return len(linkName) + linkCid.ByteLen() + // ForEachLink will never fail traversing the BasicDirectory + // and neither will the inner callback `addToEstimatedSize`. } func (d *BasicDirectory) addToEstimatedSize(name string, linkCid cid.Cid) { - d.estimatedSize += estimatedLinkSize(name, linkCid) + d.estimatedSize += linksize.LinkSizeFunction(name, linkCid) } func (d *BasicDirectory) removeFromEstimatedSize(name string, linkCid cid.Cid) { - d.estimatedSize -= estimatedLinkSize(name, linkCid) + d.estimatedSize -= linksize.LinkSizeFunction(name, linkCid) if d.estimatedSize < 0 { // Something has gone very wrong. Log an error and recompute the // size from scratch. @@ -183,17 +194,50 @@ func (d *BasicDirectory) SetCidBuilder(builder cid.Builder) { // AddChild implements the `Directory` interface. It adds (or replaces) // a link to the given `node` under `name`. func (d *BasicDirectory) AddChild(ctx context.Context, name string, node ipld.Node) error { - // Remove old link (if it existed; ignore `ErrNotExist` otherwise).
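+ // Turn the node into a link and delegate to addLinkChild so that AddChild and switchToBasic share a single insertion path.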
+ link, err := ipld.MakeLink(node) + if err != nil { + return err + } + + return d.addLinkChild(ctx, name, link) +} + +func (d *BasicDirectory) needsToSwitchToHAMTDir(name string, nodeToAdd ipld.Node) (bool, error) { + if HAMTShardingSize == 0 { // Option disabled. + return false, nil + } + + operationSizeChange := 0 + // Find if there is an old entry under that name that will be overwritten. + entryToRemove, err := d.node.GetNodeLink(name) + if err != mdag.ErrLinkNotFound { + if err != nil { + return false, err + } + operationSizeChange -= linksize.LinkSizeFunction(name, entryToRemove.Cid) + } + if nodeToAdd != nil { + operationSizeChange += linksize.LinkSizeFunction(name, nodeToAdd.Cid()) + } + + return d.estimatedSize+operationSizeChange >= HAMTShardingSize, nil +} + +// addLinkChild adds the link as an entry to this directory under the given +// name. Plumbing function for the AddChild API. +func (d *BasicDirectory) addLinkChild(ctx context.Context, name string, link *ipld.Link) error { + // Remove old link and account for size change (if it existed; ignore + // `ErrNotExist` otherwise). err := d.RemoveChild(ctx, name) if err != nil && err != os.ErrNotExist { return err } - err = d.node.AddNodeLink(name, node) + err = d.node.AddRawLink(name, link) if err != nil { return err } - d.addToEstimatedSize(name, node.Cid()) + d.addToEstimatedSize(name, link.Cid) return nil } @@ -218,7 +262,7 @@ func (d *BasicDirectory) EnumLinksAsync(ctx context.Context) <-chan format.LinkR } // ForEachLink implements the `Directory` interface. -func (d *BasicDirectory) ForEachLink(ctx context.Context, f func(*ipld.Link) error) error { +func (d *BasicDirectory) ForEachLink(_ context.Context, f func(*ipld.Link) error) error { for _, l := range d.node.Links() { if err := f(l); err != nil { return err @@ -277,8 +321,8 @@ func (d *BasicDirectory) GetCidBuilder() cid.Builder { return d.node.CidBuilder() } -// SwitchToSharding returns a HAMT implementation of this directory. -func (d *BasicDirectory) SwitchToSharding(ctx context.Context) (Directory, error) { +// switchToSharding returns a HAMT implementation of this directory. +func (d *BasicDirectory) switchToSharding(ctx context.Context) (*HAMTDirectory, error) { hamtDir := new(HAMTDirectory) hamtDir.dserv = d.dserv @@ -311,7 +355,16 @@ func (d *HAMTDirectory) SetCidBuilder(builder cid.Builder) { // AddChild implements the `Directory` interface. func (d *HAMTDirectory) AddChild(ctx context.Context, name string, nd ipld.Node) error { - return d.shard.Set(ctx, name, nd) + oldChild, err := d.shard.Swap(ctx, name, nd) + if err != nil { + return err + } + + if oldChild != nil { + d.removeFromSizeChange(oldChild.Name, oldChild.Cid) + } + d.addToSizeChange(name, nd.Cid()) + return nil } // ForEachLink implements the `Directory` interface. @@ -342,7 +395,16 @@ func (d *HAMTDirectory) Find(ctx context.Context, name string) (ipld.Node, error // RemoveChild implements the `Directory` interface. func (d *HAMTDirectory) RemoveChild(ctx context.Context, name string) error { - return d.shard.Remove(ctx, name) + oldChild, err := d.shard.Take(ctx, name) + if err != nil { + return err + } + + if oldChild != nil { + d.removeFromSizeChange(oldChild.Name, oldChild.Cid) + } + + return nil } // GetNode implements the `Directory` interface. 
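Not part of the patch itself: the Swap/Take pair above is what makes the size accounting possible at the directory layer, since every mutation reports the link it displaced. A minimal sketch of how a caller could maintain its own running size estimate with Swap (the `estimate` variable and entry name are illustrative only, and error handling is reduced to panics):

```go
package main

import (
	"context"
	"fmt"

	mdtest "github.com/ipfs/go-merkledag/test"
	ft "github.com/ipfs/go-unixfs"
	"github.com/ipfs/go-unixfs/hamt"
)

func main() {
	ctx := context.Background()
	dserv := mdtest.Mock()

	shard, err := hamt.NewShard(dserv, 256)
	if err != nil {
		panic(err)
	}

	child := ft.EmptyFileNode()
	if err := dserv.Add(ctx, child); err != nil {
		panic(err)
	}

	// Swap returns the link previously stored under the name (nil if the
	// entry is new), so the caller can subtract the displaced entry's size
	// before adding the new one -- the same bookkeeping HAMTDirectory
	// performs internally with its sizeChange delta.
	estimate := 0
	oldLink, err := shard.Swap(ctx, "entry", child)
	if err != nil {
		panic(err)
	}
	if oldLink != nil {
		estimate -= len(oldLink.Name) + oldLink.Cid.ByteLen()
	}
	estimate += len("entry") + child.Cid().ByteLen()
	fmt.Println("estimated size:", estimate)
}
```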
@@ -355,42 +417,198 @@ func (d *HAMTDirectory) GetCidBuilder() cid.Builder { return d.shard.CidBuilder() } -// UpgradeableDirectory wraps a Directory interface and provides extra logic -// to upgrade from its BasicDirectory implementation to HAMTDirectory. -type UpgradeableDirectory struct { +// switchToBasic returns a BasicDirectory implementation of this directory. +func (d *HAMTDirectory) switchToBasic(ctx context.Context) (*BasicDirectory, error) { + basicDir := newEmptyBasicDirectory(d.dserv) + basicDir.SetCidBuilder(d.GetCidBuilder()) + + // This call enumerates all the links in the Directory requiring all + // shards to be accessible, but it is only called *after* sizeBelowThreshold + // returns true, which means we have already enumerated and fetched *all* + // shards in the first place (that's the only way we can be really sure + // we are actually below the threshold). + err := d.ForEachLink(ctx, func(lnk *ipld.Link) error { + err := basicDir.addLinkChild(ctx, lnk.Name, lnk) + if err != nil { + return err + } + + return nil + }) + if err != nil { + return nil, err + } + + return basicDir, nil +} + +func (d *HAMTDirectory) addToSizeChange(name string, linkCid cid.Cid) { + d.sizeChange += linksize.LinkSizeFunction(name, linkCid) +} + +func (d *HAMTDirectory) removeFromSizeChange(name string, linkCid cid.Cid) { + d.sizeChange -= linksize.LinkSizeFunction(name, linkCid) +} + +// Evaluate a switch from HAMTDirectory to BasicDirectory in case the size will +// go below the threshold when we are adding or removing an entry. +// In both the add/remove operations any old name will be removed, and for the +// add operation in particular a new entry will be added under that name (otherwise +// nodeToAdd is nil). We compute both (potential) future subtraction and +// addition to the size change. +func (d *HAMTDirectory) needsToSwitchToBasicDir(ctx context.Context, name string, nodeToAdd ipld.Node) (switchToBasic bool, err error) { + if HAMTShardingSize == 0 { // Option disabled. + return false, nil + } + + operationSizeChange := 0 + + // Find if there is an old entry under that name that will be overwritten + // (AddChild) or flat out removed (RemoveChild). + entryToRemove, err := d.shard.Find(ctx, name) + if err != os.ErrNotExist { + if err != nil { + return false, err + } + operationSizeChange -= linksize.LinkSizeFunction(name, entryToRemove.Cid) + } + + // For the AddChild case compute the size addition of the new entry. + if nodeToAdd != nil { + operationSizeChange += linksize.LinkSizeFunction(name, nodeToAdd.Cid()) + } + + if d.sizeChange+operationSizeChange >= 0 { + // We won't have reduced the HAMT net size. + return false, nil + } + + // We have reduced the directory size, check if it went below the + // HAMTShardingSize threshold to trigger a switch. + return d.sizeBelowThreshold(ctx, operationSizeChange) +} + +// Evaluate directory size and a future sizeChange and check if it will be below +// HAMTShardingSize threshold (to trigger a transition to a BasicDirectory). +// Instead of enumerating the entire tree we eagerly call EnumLinksAsync +// until we either reach a value above the threshold (in that case no need +// to keep counting) or an error occurs (like the context being canceled +// if we take too much time fetching the necessary shards).
+func (d *HAMTDirectory) sizeBelowThreshold(ctx context.Context, sizeChange int) (below bool, err error) { + if HAMTShardingSize == 0 { + panic("asked to compute HAMT size with HAMTShardingSize option off (0)") + } + + // We don't necessarily compute the full size of *all* shards as we might + // end early if we already know we're above the threshold or run out of time. + partialSize := 0 + + // We stop the enumeration once we have enough information and exit this function. + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + for linkResult := range d.EnumLinksAsync(ctx) { + if linkResult.Err != nil { + return false, linkResult.Err + } + + partialSize += linksize.LinkSizeFunction(linkResult.Link.Name, linkResult.Link.Cid) + if partialSize+sizeChange >= HAMTShardingSize { + // We have already fetched enough shards to assert we are + // above the threshold, so no need to keep fetching. + return false, nil + } + } + + // We enumerated *all* links in all shards and didn't reach the threshold. + return true, nil +} + +// DynamicDirectory wraps a Directory interface and provides extra logic +// to switch from BasicDirectory to HAMTDirectory and back based on +// size. +type DynamicDirectory struct { Directory } -var _ Directory = (*UpgradeableDirectory)(nil) +var _ Directory = (*DynamicDirectory)(nil) // AddChild implements the `Directory` interface. We check when adding new entries // if we should switch to HAMTDirectory according to global option(s). -func (d *UpgradeableDirectory) AddChild(ctx context.Context, name string, nd ipld.Node) error { - err := d.Directory.AddChild(ctx, name, nd) +func (d *DynamicDirectory) AddChild(ctx context.Context, name string, nd ipld.Node) error { + hamtDir, ok := d.Directory.(*HAMTDirectory) + if ok { + // We evaluate a switch in the HAMTDirectory case even for an AddChild + // as it may overwrite an existing entry and end up actually reducing + // the directory size. + switchToBasic, err := hamtDir.needsToSwitchToBasicDir(ctx, name, nd) + if err != nil { + return err + } + + if switchToBasic { + basicDir, err := hamtDir.switchToBasic(ctx) + if err != nil { + return err + } + err = basicDir.AddChild(ctx, name, nd) + if err != nil { + return err + } + d.Directory = basicDir + return nil + } + + return d.Directory.AddChild(ctx, name, nd) + } + + // BasicDirectory + basicDir := d.Directory.(*BasicDirectory) + switchToHAMT, err := basicDir.needsToSwitchToHAMTDir(name, nd) if err != nil { return err } - - // Evaluate possible HAMT upgrade. - if HAMTShardingSize == 0 { - return nil + if !switchToHAMT { + return basicDir.AddChild(ctx, name, nd) } - basicDir, ok := d.Directory.(*BasicDirectory) + hamtDir, err = basicDir.switchToSharding(ctx) + if err != nil { + return err + } + err = hamtDir.AddChild(ctx, name, nd) + if err != nil { + return err + } + d.Directory = hamtDir + return nil +} + +// RemoveChild implements the `Directory` interface. Used in the case where we wrap +// a HAMTDirectory that might need to be downgraded to a BasicDirectory. The +// upgrade path is in AddChild.
+func (d *DynamicDirectory) RemoveChild(ctx context.Context, name string) error { + hamtDir, ok := d.Directory.(*HAMTDirectory) if !ok { - return nil + return d.Directory.RemoveChild(ctx, name) } - if basicDir.estimatedSize >= HAMTShardingSize { - // Ideally to minimize performance we should check if this last - // `AddChild` call would bring the directory size over the threshold - // *before* executing it since we would end up switching anyway and - // that call would be "wasted". This is a minimal performance impact - // and we prioritize a simple code base. - hamtDir, err := basicDir.SwitchToSharding(ctx) - if err != nil { - return err - } - d.Directory = hamtDir + + switchToBasic, err := hamtDir.needsToSwitchToBasicDir(ctx, name, nil) + if err != nil { + return err } + if !switchToBasic { + return hamtDir.RemoveChild(ctx, name) + } + + basicDir, err := hamtDir.switchToBasic(ctx) + if err != nil { + return err + } + err = basicDir.RemoveChild(ctx, name) + if err != nil { + return err + } + d.Directory = basicDir return nil } diff --git a/io/directory_test.go b/io/directory_test.go index 8c5d8e109..f5fa2e564 100644 --- a/io/directory_test.go +++ b/io/directory_test.go @@ -4,13 +4,30 @@ import ( "context" "fmt" "math" + "sort" + "strconv" + "strings" + "sync" "testing" - + "time" + + blocks "github.com/ipfs/go-block-format" + bsrv "github.com/ipfs/go-blockservice" + cid "github.com/ipfs/go-cid" + ds "github.com/ipfs/go-datastore" + dssync "github.com/ipfs/go-datastore/sync" + blockstore "github.com/ipfs/go-ipfs-blockstore" + offline "github.com/ipfs/go-ipfs-exchange-offline" ipld "github.com/ipfs/go-ipld-format" mdag "github.com/ipfs/go-merkledag" mdtest "github.com/ipfs/go-merkledag/test" ft "github.com/ipfs/go-unixfs" + "github.com/ipfs/go-unixfs/hamt" + "github.com/ipfs/go-unixfs/internal" + "github.com/ipfs/go-unixfs/private/linksize" + + "github.com/stretchr/testify/assert" ) func TestEmptyNode(t *testing.T) { @@ -100,116 +117,331 @@ func TestDuplicateAddDir(t *testing.T) { } } -// FIXME: Nothing blocking but nice to have: -// * Check estimated size against link enumeration (indirectly done in the -// restored node check from NewDirectoryFromNode). -// * Check estimated size against encoded node (the difference should only be -// a small percentage for a directory with 10s of entries). func TestBasicDirectory_estimatedSize(t *testing.T) { ds := mdtest.Mock() + basicDir := newEmptyBasicDirectory(ds) + + testDirectorySizeEstimation(t, basicDir, ds, func(dir Directory) int { + return dir.(*BasicDirectory).estimatedSize + }) +} + +func TestHAMTDirectory_sizeChange(t *testing.T) { + ds := mdtest.Mock() + hamtDir, err := newEmptyHAMTDirectory(ds, DefaultShardWidth) + assert.NoError(t, err) + + testDirectorySizeEstimation(t, hamtDir, ds, func(dir Directory) int { + // Since we created a HAMTDirectory from scratch with size 0, its + // internal sizeChange delta will in fact track the directory size + // throughout this run.
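+ // (Reloading the directory from its node would reset sizeChange back to zero.)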
+ return dir.(*HAMTDirectory).sizeChange + }) +} + +func fullSizeEnumeration(dir Directory) int { + size := 0 + dir.ForEachLink(context.Background(), func(l *ipld.Link) error { + size += linksize.LinkSizeFunction(l.Name, l.Cid) + return nil + }) + return size +} + +func testDirectorySizeEstimation(t *testing.T, dir Directory, ds ipld.DAGService, size func(Directory) int) { + linksize.LinkSizeFunction = mockLinkSizeFunc(1) + defer func() { linksize.LinkSizeFunction = productionLinkSize }() + ctx := context.Background() child := ft.EmptyFileNode() - err := ds.Add(ctx, child) - if err != nil { - t.Fatal(err) - } - - basicDir := newEmptyBasicDirectory(ds) + assert.NoError(t, ds.Add(ctx, child)) // Several overwrites should not corrupt the size estimation. - basicDir.AddChild(ctx, "child", child) - basicDir.AddChild(ctx, "child", child) - basicDir.AddChild(ctx, "child", child) - basicDir.RemoveChild(ctx, "child") - basicDir.AddChild(ctx, "child", child) - basicDir.RemoveChild(ctx, "child") - // FIXME: Check errors above (abstract adds/removals in iteration). - if basicDir.estimatedSize != 0 { - t.Fatal("estimated size is not zero after removing all entries") - } - - for i := 0; i < 100; i++ { - basicDir.AddChild(ctx, fmt.Sprintf("child-%03d", i), child) // e.g., "child-045" - } - // Estimated entry size: name (9) + CID (32 from hash and 2 extra for header) - entrySize := 9 + 32 + 2 - expectedSize := 100 * entrySize - if basicDir.estimatedSize != expectedSize { - t.Fatalf("estimated size (%d) inaccurate after adding many entries (expected %d)", - basicDir.estimatedSize, expectedSize) - } - - basicDir.RemoveChild(ctx, "child-045") // just random values - basicDir.RemoveChild(ctx, "child-063") - basicDir.RemoveChild(ctx, "child-011") - basicDir.RemoveChild(ctx, "child-000") - basicDir.RemoveChild(ctx, "child-099") - - basicDir.RemoveChild(ctx, "child-045") // already removed, won't impact size - basicDir.RemoveChild(ctx, "nonexistent-name") // also doesn't count - basicDir.RemoveChild(ctx, "child-100") // same - expectedSize -= 5 * entrySize - if basicDir.estimatedSize != expectedSize { - t.Fatalf("estimated size (%d) inaccurate after removing some entries (expected %d)", - basicDir.estimatedSize, expectedSize) - } + assert.NoError(t, dir.AddChild(ctx, "child", child)) + assert.NoError(t, dir.AddChild(ctx, "child", child)) + assert.NoError(t, dir.AddChild(ctx, "child", child)) + assert.NoError(t, dir.RemoveChild(ctx, "child")) + assert.NoError(t, dir.AddChild(ctx, "child", child)) + assert.NoError(t, dir.RemoveChild(ctx, "child")) + assert.Equal(t, 0, size(dir), "estimated size is not zero after removing all entries") + + dirEntries := 100 + for i := 0; i < dirEntries; i++ { + assert.NoError(t, dir.AddChild(ctx, fmt.Sprintf("child-%03d", i), child)) + } + assert.Equal(t, dirEntries, size(dir), "estimated size inaccurate after adding many entries") + + assert.NoError(t, dir.RemoveChild(ctx, "child-045")) // just random values + assert.NoError(t, dir.RemoveChild(ctx, "child-063")) + assert.NoError(t, dir.RemoveChild(ctx, "child-011")) + assert.NoError(t, dir.RemoveChild(ctx, "child-000")) + assert.NoError(t, dir.RemoveChild(ctx, "child-099")) + dirEntries -= 5 + assert.Equal(t, dirEntries, size(dir), "estimated size inaccurate after removing some entries") + + // All of the following remove operations will fail (won't impact dirEntries): + assert.Error(t, dir.RemoveChild(ctx, "nonexistent-name")) + assert.Error(t, dir.RemoveChild(ctx, "child-045")) // already removed + assert.Error(t, 
dir.RemoveChild(ctx, "child-100")) + assert.Equal(t, dirEntries, size(dir), "estimated size inaccurate after failed remove attempts") // Restore a directory from original's node and check estimated size consistency. - basicDirSingleNode, _ := basicDir.GetNode() // no possible error - restoredBasicDir := newBasicDirectoryFromNode(ds, basicDirSingleNode.(*mdag.ProtoNode)) - if basicDir.estimatedSize != restoredBasicDir.estimatedSize { - t.Fatalf("restored basic directory size (%d) doesn't match original estimate (%d)", - basicDir.estimatedSize, restoredBasicDir.estimatedSize) + dirNode, err := dir.GetNode() + assert.NoError(t, err) + restoredDir, err := NewDirectoryFromNode(ds, dirNode.(*mdag.ProtoNode)) + assert.NoError(t, err) + assert.Equal(t, size(dir), fullSizeEnumeration(restoredDir), "restored directory's size doesn't match original's") + // We don't use the size estimation function for the restored directory + // because in the HAMT case this function depends on the sizeChange variable + // that will be cleared when loading the directory from the node. + // This also covers the case of comparing the size estimation `size()` with + // the full enumeration function `fullSizeEnumeration()` to make sure it's + // correct. +} + +// Any entry link size will be reported as the fixedSize passed. +func mockLinkSizeFunc(fixedSize int) func(linkName string, linkCid cid.Cid) int { + return func(_ string, _ cid.Cid) int { + return fixedSize } } -// Basic test on extreme threshold to trigger switch. More fine-grained sizes -// are checked in TestBasicDirectory_estimatedSize (without the swtich itself -// but focusing on the size computation). -// FIXME: Ideally, instead of checking size computation on one test and directory -// upgrade on another a better structured test should test both dimensions -// simultaneously. -func TestUpgradeableDirectory(t *testing.T) { +func checkBasicDirectory(t *testing.T, dir Directory, errorMessage string) { + if _, ok := dir.(*DynamicDirectory).Directory.(*BasicDirectory); !ok { + t.Fatal(errorMessage) + } +} + +func checkHAMTDirectory(t *testing.T, dir Directory, errorMessage string) { + if _, ok := dir.(*DynamicDirectory).Directory.(*HAMTDirectory); !ok { + t.Fatal(errorMessage) + } +} + +func TestProductionLinkSize(t *testing.T) { + link, err := ipld.MakeLink(ft.EmptyDirNode()) + assert.NoError(t, err) + link.Name = "directory_link_name" + assert.Equal(t, 53, productionLinkSize(link.Name, link.Cid)) + + link, err = ipld.MakeLink(ft.EmptyFileNode()) + assert.NoError(t, err) + link.Name = "file_link_name" + assert.Equal(t, 48, productionLinkSize(link.Name, link.Cid)) + + ds := mdtest.Mock() + basicDir := newEmptyBasicDirectory(ds) + assert.NoError(t, err) + for i := 0; i < 10; i++ { + basicDir.AddChild(context.Background(), strconv.FormatUint(uint64(i), 10), ft.EmptyFileNode()) + } + basicDirNode, err := basicDir.GetNode() + assert.NoError(t, err) + link, err = ipld.MakeLink(basicDirNode) + assert.NoError(t, err) + link.Name = "basic_dir" + assert.Equal(t, 43, productionLinkSize(link.Name, link.Cid)) +} + +// Test HAMTDirectory <-> BasicDirectory switch based on directory size. The +// switch is managed by the DynamicDirectory abstraction. +func TestDynamicDirectorySwitch(t *testing.T) { oldHamtOption := HAMTShardingSize defer func() { HAMTShardingSize = oldHamtOption }() + HAMTShardingSize = 0 // Disable automatic switch at the start.
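+ // Mock the link size so every entry counts as exactly 1 and the HAMTShardingSize thresholds below translate directly into entry counts.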
+ linksize.LinkSizeFunction = mockLinkSizeFunc(1) + defer func() { linksize.LinkSizeFunction = productionLinkSize }() ds := mdtest.Mock() dir := NewDirectory(ds) + checkBasicDirectory(t, dir, "new dir is not BasicDirectory") + ctx := context.Background() child := ft.EmptyDirNode() err := ds.Add(ctx, child) - if err != nil { - t.Fatal(err) - } + assert.NoError(t, err) - HAMTShardingSize = 0 // Create a BasicDirectory. - if _, ok := dir.(*UpgradeableDirectory).Directory.(*BasicDirectory); !ok { - t.Fatal("UpgradeableDirectory doesn't contain BasicDirectory") - } + err = dir.AddChild(ctx, "1", child) + assert.NoError(t, err) + checkBasicDirectory(t, dir, "added child, option still disabled") // Set a threshold so big a new entry won't trigger the change. HAMTShardingSize = math.MaxInt32 - err = dir.AddChild(ctx, "test", child) - if err != nil { - t.Fatal(err) - } - - if _, ok := dir.(*UpgradeableDirectory).Directory.(*HAMTDirectory); ok { - t.Fatal("UpgradeableDirectory was upgraded to HAMTDirectory for a large threshold") - } + err = dir.AddChild(ctx, "2", child) + assert.NoError(t, err) + checkBasicDirectory(t, dir, "added child, option now enabled but at max") // Now set it so low to make sure any new entry will trigger the upgrade. HAMTShardingSize = 1 - err = dir.AddChild(ctx, "test", child) // overwriting an entry should also trigger the switch - if err != nil { - t.Fatal(err) + // We are already above the threshold, we trigger the switch with an overwrite + // (any AddChild() should reevaluate the size). + err = dir.AddChild(ctx, "2", child) + assert.NoError(t, err) + checkHAMTDirectory(t, dir, "added child, option at min, should switch up") + + // Set threshold at the number of current entries and delete the last one + // to trigger a switch and evaluate if the rest of the entries are conserved. + HAMTShardingSize = 2 + err = dir.RemoveChild(ctx, "2") + assert.NoError(t, err) + checkBasicDirectory(t, dir, "removed threshold entry, option at min, should switch down") +} + +func TestIntegrityOfDirectorySwitch(t *testing.T) { + ds := mdtest.Mock() + dir := NewDirectory(ds) + checkBasicDirectory(t, dir, "new dir is not BasicDirectory") + + ctx := context.Background() + child := ft.EmptyDirNode() + err := ds.Add(ctx, child) + assert.NoError(t, err) + + basicDir := newEmptyBasicDirectory(ds) + hamtDir, err := newEmptyHAMTDirectory(ds, DefaultShardWidth) + assert.NoError(t, err) + for i := 0; i < 1000; i++ { + basicDir.AddChild(ctx, strconv.FormatUint(uint64(i), 10), child) + hamtDir.AddChild(ctx, strconv.FormatUint(uint64(i), 10), child) + } + compareDirectoryEntries(t, basicDir, hamtDir) + + hamtDirFromSwitch, err := basicDir.switchToSharding(ctx) + assert.NoError(t, err) + basicDirFromSwitch, err := hamtDir.switchToBasic(ctx) + assert.NoError(t, err) + compareDirectoryEntries(t, basicDir, basicDirFromSwitch) + compareDirectoryEntries(t, hamtDir, hamtDirFromSwitch) +} + +// This is the value of concurrent fetches during dag.Walk. Used in +// test to better predict how many nodes will be fetched. +var defaultConcurrentFetch = 32 + +// FIXME: Taken from private github.com/ipfs/go-merkledag@v0.2.3/merkledag.go. +// (We can also pass an explicit concurrency value in `(*Shard).EnumLinksAsync()` +// and take ownership of this configuration, but that departs from the more +// standard and reliable one in `go-merkledag`.) + +// Test that we fetch as few nodes as needed to reach the HAMTShardingSize +// during the sizeBelowThreshold computation.
+func TestHAMTEnumerationWhenComputingSize(t *testing.T) { + // Adjust HAMT global/static options for the test to simplify its logic. + // FIXME: These variables weren't designed to be modified and we should + // review in depth side effects. + + // Set all link sizes to a uniform 1 so the estimated directory size + // is just the count of its entry links (in HAMT/Shard terminology these + // are the "value" links pointing to anything that is *not* another Shard). + linksize.LinkSizeFunction = mockLinkSizeFunc(1) + defer func() { linksize.LinkSizeFunction = productionLinkSize }() + + // Use an identity hash function to ease the construction of "complete" HAMTs + // (see CreateCompleteHAMT below for more details). (Ideally this should be + // a parameter we pass and not a global option we modify in the caller.) + oldHashFunc := internal.HAMTHashFunction + defer func() { internal.HAMTHashFunction = oldHashFunc }() + internal.HAMTHashFunction = idHash + + oldHamtOption := HAMTShardingSize + defer func() { HAMTShardingSize = oldHamtOption }() + + // --- End of test static configuration adjustments. --- + + // Some arbitrary values below that make this test not that expensive. + treeHeight := 4 + // How many leaf shard nodes (with value links, + // i.e., directory entries) do we need to reach the threshold. + thresholdToWidthRatio := 4 + // Departing from DefaultShardWidth of 256 to reduce HAMT size in + // CreateCompleteHAMT. + shardWidth := 16 + HAMTShardingSize = shardWidth * thresholdToWidthRatio + + // We create a "complete" HAMT (see CreateCompleteHAMT for more details) + // with a regular structure to be able to predict how many Shard nodes we + // will need to fetch in order to reach the HAMTShardingSize threshold in + // sizeBelowThreshold (assuming a sequential DAG walk function). + + bstore := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())) + countGetsDS := newCountGetsDS(bstore) + dsrv := mdag.NewDAGService(bsrv.New(countGetsDS, offline.Exchange(countGetsDS))) + completeHAMTRoot, err := CreateCompleteHAMT(dsrv, treeHeight, shardWidth) + assert.NoError(t, err) + + // Calculate the optimal number of nodes to traverse + optimalNodesToFetch := 0 + nodesToProcess := HAMTShardingSize + for i := 0; i < treeHeight-1; i++ { + // divide by the shard width to get the parents and continue up the tree + parentNodes := int(math.Ceil(float64(nodesToProcess) / float64(shardWidth))) + optimalNodesToFetch += parentNodes + nodesToProcess = parentNodes + } + + // With this structure and a BFS traversal (from `parallelWalkDepth`) + // we would roughly fetch the following nodes: + nodesToFetch := 0 + // * all layers up to (but not including) the last one with leaf nodes + // (because it's a BFS) + for i := 0; i < treeHeight-1; i++ { + nodesToFetch += int(math.Pow(float64(shardWidth), float64(i))) + } + // * `thresholdToWidthRatio` leaf Shards with enough value links to reach + // the HAMTShardingSize threshold. + nodesToFetch += thresholdToWidthRatio + + hamtDir, err := newHAMTDirectoryFromNode(dsrv, completeHAMTRoot) + assert.NoError(t, err) + + countGetsDS.resetCounter() + countGetsDS.setRequestDelay(10 * time.Millisecond) + // (Without the `setRequestDelay` above the number of nodes fetched + // drops dramatically and unpredictably as the BFS starts to behave + // more like a DFS because some search paths are fetched faster than + // others.)
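+ // With shardWidth = 16 and treeHeight = 4 the tree holds 16^4 entries, far above HAMTShardingSize, so the enumeration must stop early and report that we are not below the threshold.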
+ below, err := hamtDir.sizeBelowThreshold(context.TODO(), 0) + assert.NoError(t, err) + assert.False(t, below) + t.Logf("fetched %d nodes (predicted range: %d-%d)", + countGetsDS.uniqueCidsFetched(), optimalNodesToFetch, nodesToFetch+defaultConcurrentFetch) + // Check that the actual number of nodes fetched is within the margin of the + // estimated `nodesToFetch` plus an extra of `defaultConcurrentFetch` since + // we are fetching in parallel. + assert.True(t, countGetsDS.uniqueCidsFetched() <= nodesToFetch+defaultConcurrentFetch) + assert.True(t, countGetsDS.uniqueCidsFetched() >= optimalNodesToFetch) +} + +// Compare the entries in leftDir against those in rightDir; both +// directories must contain exactly the same entries. +func compareDirectoryEntries(t *testing.T, leftDir Directory, rightDir Directory) { + leftLinks, err := getAllLinksSortedByName(leftDir) + assert.NoError(t, err) + rightLinks, err := getAllLinksSortedByName(rightDir) + assert.NoError(t, err) + + assert.Equal(t, len(leftLinks), len(rightLinks)) + + for i, leftLink := range leftLinks { + assert.Equal(t, leftLink, rightLinks[i]) // FIXME: Can we just compare the entire struct? } +} + +func getAllLinksSortedByName(d Directory) ([]*ipld.Link, error) { + entries, err := d.Links(context.Background()) + if err != nil { + return nil, err } + sortLinksByName(entries) + return entries, nil +} + +func sortLinksByName(l []*ipld.Link) { + sort.SliceStable(l, func(i, j int) bool { + return strings.Compare(l[i].Name, l[j].Name) == -1 // FIXME: Is this correct? + }) } func TestDirBuilder(t *testing.T) { @@ -296,3 +528,111 @@ func TestDirBuilder(t *testing.T) { t.Fatal("wrong number of links", len(asyncLinks), count) } } + +func newHAMTDirectoryFromNode(dserv ipld.DAGService, node ipld.Node) (*HAMTDirectory, error) { + shard, err := hamt.NewHamtFromDag(dserv, node) + if err != nil { + return nil, err + } + return &HAMTDirectory{ + dserv: dserv, + shard: shard, + }, nil +} + +func newEmptyHAMTDirectory(dserv ipld.DAGService, shardWidth int) (*HAMTDirectory, error) { + shard, err := hamt.NewShard(dserv, shardWidth) + if err != nil { + return nil, err + } + + return &HAMTDirectory{ + dserv: dserv, + shard: shard, + }, nil +} + +// countGetsDS wraps a Blockstore and keeps track of the number of +// unique CIDs fetched. +type countGetsDS struct { + blockstore.Blockstore + + cidsFetched map[cid.Cid]struct{} + mapLock sync.Mutex + started bool + + getRequestDelay time.Duration +} + +var _ blockstore.Blockstore = (*countGetsDS)(nil) + +func newCountGetsDS(bs blockstore.Blockstore) *countGetsDS { + return &countGetsDS{ + bs, + make(map[cid.Cid]struct{}), + sync.Mutex{}, + false, + 0, + } +} + +func (d *countGetsDS) resetCounter() { + d.mapLock.Lock() + defer d.mapLock.Unlock() + d.cidsFetched = make(map[cid.Cid]struct{}) + d.started = true +} + +func (d *countGetsDS) uniqueCidsFetched() int { + d.mapLock.Lock() + defer d.mapLock.Unlock() + return len(d.cidsFetched) +} + +func (d *countGetsDS) setRequestDelay(timeout time.Duration) { + d.getRequestDelay = timeout +} + +func (d *countGetsDS) maybeSleep(c cid.Cid) { + d.mapLock.Lock() + _, cidRequestedBefore := d.cidsFetched[c] + d.cidsFetched[c] = struct{}{} + d.mapLock.Unlock() + + if d.getRequestDelay != 0 && !cidRequestedBefore { + // First request gets a timeout to simulate a network fetch.
+ // Subsequent requests get no timeout simulating an in-disk cache. + time.Sleep(d.getRequestDelay) + } +} + +func (d *countGetsDS) Has(c cid.Cid) (bool, error) { + if d.started { + panic("implement me") + } + return d.Blockstore.Has(c) +} + +func (d *countGetsDS) Get(c cid.Cid) (blocks.Block, error) { + blk, err := d.Blockstore.Get(c) + if err != nil { + return nil, err + } + + d.maybeSleep(c) + return blk, nil +} + +func (d *countGetsDS) GetSize(c cid.Cid) (int, error) { + if d.started { + panic("implement me") + } + return d.Blockstore.GetSize(c) +} + +func (d *countGetsDS) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { + if d.started { + panic("implement me") + } + return d.Blockstore.AllKeysChan(ctx) +} diff --git a/private/linksize/linksize.go b/private/linksize/linksize.go new file mode 100644 index 000000000..e7ae098b6 --- /dev/null +++ b/private/linksize/linksize.go @@ -0,0 +1,5 @@ +package linksize + +import "github.com/ipfs/go-cid" + +var LinkSizeFunction func(linkName string, linkCid cid.Cid) int
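Not part of the patch itself: a minimal end-to-end sketch of the new switching behavior as seen from client code, assuming the io package is imported as uio and tweaking the global HAMTShardingSize knob the same way the tests above do (error handling reduced to panics):

```go
package main

import (
	"context"
	"fmt"

	mdtest "github.com/ipfs/go-merkledag/test"
	ft "github.com/ipfs/go-unixfs"
	uio "github.com/ipfs/go-unixfs/io"
)

func main() {
	ctx := context.Background()
	dserv := mdtest.Mock()

	// Force a tiny threshold so the first entries already push the
	// estimated directory size over HAMTShardingSize.
	uio.HAMTShardingSize = 1

	dir := uio.NewDirectory(dserv) // starts out as a BasicDirectory

	child := ft.EmptyFileNode()
	if err := dserv.Add(ctx, child); err != nil {
		panic(err)
	}

	for _, name := range []string{"a", "b"} {
		if err := dir.AddChild(ctx, name, child); err != nil {
			panic(err)
		}
	}

	// By now the DynamicDirectory wrapper has swapped its inner
	// implementation and the serialized root is a HAMT shard rather
	// than a flat list of links.
	nd, err := dir.GetNode()
	if err != nil {
		panic(err)
	}
	fmt.Println("directory root:", nd.Cid())
}
```

Removals run the same evaluation in the opposite direction: once needsToSwitchToBasicDir sees the estimated size drop below the threshold, DynamicDirectory collapses the HAMT back into a single-node BasicDirectory.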