From b8b11b3e1065ea324becd96cbdaaa04d0b6f1178 Mon Sep 17 00:00:00 2001 From: Lucas Molas Date: Wed, 27 Oct 2021 14:53:03 -0300 Subject: [PATCH] walk HAMT in memory --- hamt/hamt.go | 99 ++++++++++++++++++++++++++++++++++------------------ hamt/util.go | 47 +++++++++++++++++++++++++ 2 files changed, 113 insertions(+), 33 deletions(-) diff --git a/hamt/hamt.go b/hamt/hamt.go index 74a8d2759..d331222f6 100644 --- a/hamt/hamt.go +++ b/hamt/hamt.go @@ -182,7 +182,7 @@ func (ds *Shard) Node() (ipld.Node, error) { } else { // child unloaded, just copy in link with updated name lnk := ds.childer.link(sliceIndex) - label := lnk.Name[ds.maxpadlen:] + label := ds.extractLinkName(lnk) err := out.AddRawLink(ds.linkNamePrefix(childIndex)+label, lnk) if err != nil { @@ -207,6 +207,10 @@ func (ds *Shard) Node() (ipld.Node, error) { return out, nil } +func (ds *Shard) extractLinkName(lnk *ipld.Link) string { + return lnk.Name[ds.maxpadlen:] +} + func (ds *Shard) makeShardValue(lnk *ipld.Link) (*Shard, error) { lnk2 := *lnk s, err := makeShard(ds.dserv, ds.tableSize) @@ -214,7 +218,7 @@ func (ds *Shard) makeShardValue(lnk *ipld.Link) (*Shard, error) { return nil, err } - s.key = lnk.Name[ds.maxpadlen:] + s.key = ds.extractLinkName(lnk) s.val = &lnk2 return s, nil @@ -372,14 +376,9 @@ func (ds *Shard) EnumLinksAsync(ctx context.Context) <-chan format.LinkResult { go func() { defer close(linkResults) defer cancel() - getLinks := makeAsyncTrieGetLinks(ds.dserv, linkResults) + getLinks := getLinksHAMTNode(ds, linkResults) cset := cid.NewSet() - rootNode, err := ds.Node() - if err != nil { - emitResult(ctx, linkResults, format.LinkResult{Link: nil, Err: err}) - return - } - err = dag.Walk(ctx, getLinks, rootNode.Cid(), cset.Visit, dag.Concurrent()) + err := dag.Walk(ctx, getLinks, packBytesIntoCID(nil), cset.Visit, dag.Concurrent()) if err != nil { emitResult(ctx, linkResults, format.LinkResult{Link: nil, Err: err}) } @@ -387,42 +386,75 @@ func (ds *Shard) EnumLinksAsync(ctx context.Context) <-chan format.LinkResult { return linkResults } -// makeAsyncTrieGetLinks builds a getLinks function that can be used with EnumerateChildrenAsync -// to iterate a HAMT shard. It takes an IPLD Dag Service to fetch nodes, and a call back that will get called -// on all links to leaf nodes in a HAMT tree, so they can be collected for an EnumLinks operation -func makeAsyncTrieGetLinks(dagService ipld.DAGService, linkResults chan<- format.LinkResult) dag.GetLinks { +// Used only as the `GetLinks` callback for `dag.Walk` to enumerate links in +// the HAMT structure (`EnumLinksAsync`). It is bind to the root `Shard` +// node being enumerated (using its DAG service and traversing always +// from that node). +// FIXME: To traverse the HAMT in memory (not only its links but its +// child Shard nodes loaded in memory, `node.childer.children`) we are +// **abusing the CID** passing arbitrary bytes through it which enocde +// the position of a specific node in the HAMT which we will scan for its +// children. +func getLinksHAMTNode(root *Shard, linkResults chan<- format.LinkResult) dag.GetLinks { + // Traverse the HAMT position encoded in the CID to find the `node` + // we are fetching. + goToNodeFromCID := func(ctx context.Context, pos *hamtPos) (*Shard, error) { + node := root + for depth := range pos.childSlicePos { + sliceIndex := int(pos.childSlicePos[depth]) + var err error + node, err = node.childer.get(ctx, sliceIndex) + if err != nil { + return nil, err + } + } + return node, nil + } + + emitValueLink := func(ctx context.Context, name string, link *ipld.Link) { + formattedLink := *link + formattedLink.Name = name + emitResult(ctx, linkResults, format.LinkResult{Link: &formattedLink, Err: nil}) + } + + packShardPosAsLinkCID := func(currentPos *hamtPos, newSliceIndex int) *ipld.Link { + return &ipld.Link{Cid: packBytesIntoCID(posToBytes(currentPos.addChildPos(newSliceIndex)))} + } return func(ctx context.Context, currentCid cid.Cid) ([]*ipld.Link, error) { - node, err := dagService.Get(ctx, currentCid) - if err != nil { - return nil, err - } - directoryShard, err := NewHamtFromDag(dagService, node) + pos := posFromBytes(unpackBytesFromCID(currentCid)) + node, err := goToNodeFromCID(ctx, pos) if err != nil { return nil, err } - childShards := make([]*ipld.Link, 0, directoryShard.childer.length()) - links := directoryShard.childer.links - for idx := range directoryShard.childer.children { - lnk := links[idx] - lnkLinkType, err := directoryShard.childLinkType(lnk) + childShards := make([]*ipld.Link, 0, node.childer.length()) + for sliceIndex := range node.childer.children { + newPos := packShardPosAsLinkCID(pos, sliceIndex) - if err != nil { - return nil, err - } - if lnkLinkType == shardLink { - childShards = append(childShards, lnk) + if inMemoryChild := node.childer.children[sliceIndex]; inMemoryChild != nil { + // The child is already loaded in memory. + if inMemoryChild.isValueNode() { + emitValueLink(ctx, inMemoryChild.key, inMemoryChild.val) + } else { + childShards = append(childShards, newPos) + } } else { - sv, err := directoryShard.makeShardValue(lnk) + // The child is not loaded in memory but is instead stored + // in the DAG service and referenced here though its link. + link := node.childer.links[sliceIndex] + lnkLinkType, err := node.childLinkType(link) if err != nil { return nil, err } - formattedLink := sv.val - formattedLink.Name = sv.key - emitResult(ctx, linkResults, format.LinkResult{Link: formattedLink, Err: nil}) + if lnkLinkType == shardValueLink { + emitValueLink(ctx, node.extractLinkName(link), link) + } else { + childShards = append(childShards, newPos) + } } } + return childShards, nil } } @@ -596,7 +628,8 @@ type childer struct { dserv ipld.DAGService bitfield bitfield.Bitfield - // Only one of links/children will be non-nil for every child/link. + // Only one of links/children will be non-nil for every child/link, + // but they have always the same length. links []*ipld.Link children []*Shard } diff --git a/hamt/util.go b/hamt/util.go index 29f59435e..c0b3ac1b3 100644 --- a/hamt/util.go +++ b/hamt/util.go @@ -6,6 +6,8 @@ import ( "github.com/ipfs/go-unixfs/internal" + "github.com/ipfs/go-cid" + "github.com/multiformats/go-multihash" "github.com/spaolacci/murmur3" ) @@ -78,3 +80,48 @@ func murmur3Hash(val []byte) []byte { h.Write(val) return h.Sum(nil) } + +// We convert CIDs to arbitrary strings as CIDv1 raw identity hashes. +func packBytesIntoCID(data []byte) cid.Cid { + h, err := multihash.Sum(data, multihash.IDENTITY, -1) + if err != nil { + panic(fmt.Sprintf("error while packing data %s: %s", + data, err)) + } + return cid.NewCidV1(cid.Raw, h) +} + +func unpackBytesFromCID(c cid.Cid) []byte { + if c.Version() != 1 { + panic("trying to unpack CIDv0") + } + decoded, err := multihash.Decode(c.Hash()) + if err != nil { + panic(fmt.Sprintf("error while unpacking data from CID %s: %s", + c, err)) + } + return decoded.Digest +} + +// Coordinate in a HAMT to single out a single node. +// For simplicity this is independent of the hash the node encodes and just +// the position in the nodes. +// See `getLinksHAMTNode` for details. +type hamtPos struct { + // Slice of child positions (from the immediate parent perspective). + // FIXME: We are assuming a shard width <= 256 (the default) to easily + // convert to and from a byte slice. + childSlicePos []byte +} + +func (hp *hamtPos) addChildPos(childIndex int) *hamtPos { + return &hamtPos{ append(hp.childSlicePos, byte(childIndex))} +} + +func posFromBytes(data []byte) *hamtPos{ + return &hamtPos{data} +} + +func posToBytes(hp *hamtPos) []byte { + return hp.childSlicePos +}