feat!: auto-shard based on node size (#171)
JS counterpart to ipfs/kubo#8114

Changes the `shardSplitThreshold` parameter to mean the size of the final DAGNode (including link names, sizes, etc.) instead of the number of entries in the directory.

Fixes: #149

BREAKING CHANGE: the `shardSplitThreshold` option has been renamed to `shardSplitThresholdBytes` and now reflects the serialized DAGNode size above which sharding may kick in
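
A minimal usage sketch of the renamed option (not part of this commit; it assumes blockstore-core's MemoryBlockstore and the it-last helper, both of which already appear as devDependencies in this repo):

import { importer } from 'ipfs-unixfs-importer'
import { MemoryBlockstore } from 'blockstore-core'
import last from 'it-last'

const blockstore = new MemoryBlockstore()

// previously: shardSplitThreshold counted directory entries
// now: shardSplitThresholdBytes is compared against the estimated serialized
// size of the directory DAGNode
const result = await last(importer([{
  path: 'dir/file.txt',
  content: (async function * () {
    yield new TextEncoder().encode('hello world')
  })()
}], blockstore, {
  wrapWithDirectory: true,
  shardSplitThresholdBytes: 262144 // 256 KiB, the new default per the README
}))

console.info(result?.cid)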
achingbrain authored Feb 9, 2023
1 parent 5ebc923 commit 6ef187f
Showing 16 changed files with 212 additions and 47 deletions.
3 changes: 0 additions & 3 deletions packages/ipfs-unixfs-exporter/package.json
@@ -169,17 +169,14 @@
},
"devDependencies": {
"@types/sinon": "^10.0.0",
"abort-controller": "^3.0.0",
"aegir": "^38.1.2",
"blockstore-core": "^3.0.0",
"crypto-browserify": "^3.12.0",
"delay": "^5.0.0",
"ipfs-unixfs-importer": "^12.0.0",
"it-all": "^2.0.0",
"it-buffer-stream": "^3.0.0",
"it-first": "^2.0.0",
"merge-options": "^3.0.4",
"native-abort-controller": "^1.0.3",
"sinon": "^15.0.0"
},
"browser": {
7 changes: 4 additions & 3 deletions packages/ipfs-unixfs-exporter/test/exporter-sharded.spec.js
@@ -45,7 +45,7 @@ describe('exporter sharded', function () {
*/
const createShardWithFiles = async (files) => {
const result = await last(importer(files, block, {
shardSplitThreshold: SHARD_SPLIT_THRESHOLD,
shardSplitThresholdBytes: SHARD_SPLIT_THRESHOLD,
wrapWithDirectory: true
}))

@@ -60,7 +60,8 @@ describe('exporter sharded', function () {
/** @type {{ [key: string]: { content: Uint8Array, cid?: CID }}} */
const files = {}

for (let i = 0; i < (SHARD_SPLIT_THRESHOLD + 1); i++) {
// needs to result in a block that is larger than SHARD_SPLIT_THRESHOLD bytes
for (let i = 0; i < 100; i++) {
files[`file-${Math.random()}.txt`] = {
content: uint8ArrayConcat(await all(randomBytes(100)))
}
@@ -71,7 +72,7 @@ describe('exporter sharded', function () {
content: asAsyncIterable(files[path].content)
})), block, {
wrapWithDirectory: true,
shardSplitThreshold: SHARD_SPLIT_THRESHOLD
shardSplitThresholdBytes: SHARD_SPLIT_THRESHOLD
}))

const dirCid = imported.pop()?.cid
1 change: 0 additions & 1 deletion packages/ipfs-unixfs-exporter/test/exporter.spec.js
@@ -14,7 +14,6 @@ import all from 'it-all'
import last from 'it-last'
import first from 'it-first'
import randomBytes from 'it-buffer-stream'
import { AbortController } from 'native-abort-controller'
import blockApi from './helpers/block.js'
import { concat as uint8ArrayConcat } from 'uint8arrays/concat'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
@@ -26,7 +26,7 @@ describe('builder: directory sharding', () => {
path: 'a/b',
content: asAsyncIterable(content)
}], block, {
shardSplitThreshold: Infinity // never shard
shardSplitThresholdBytes: Infinity // never shard
}))

expect(nodes.length).to.equal(2)
@@ -62,7 +62,7 @@ describe('builder: directory sharding', () => {
path: 'a/b',
content: asAsyncIterable(uint8ArrayFromString('i have the best bytes'))
}], block, {
shardSplitThreshold: 0 // always shard
shardSplitThresholdBytes: 0 // always shard
}))

expect(nodes.length).to.equal(2)
@@ -84,7 +84,7 @@ describe('builder: directory sharding', () => {
path: 'a/b',
content: asAsyncIterable(uint8ArrayFromString(content))
}], block, {
shardSplitThreshold: Infinity // never shard
shardSplitThresholdBytes: Infinity // never shard
}))

const nonShardedHash = nodes[1].cid
@@ -121,7 +121,7 @@ describe('builder: directory sharding', () => {
path: 'a/b',
content: asAsyncIterable(uint8ArrayFromString(content))
}], block, {
shardSplitThreshold: 0 // always shard
shardSplitThresholdBytes: 0 // always shard
}))

const shardedHash = nodes[1].cid
4 changes: 2 additions & 2 deletions packages/ipfs-unixfs-exporter/test/importer.spec.js
@@ -724,7 +724,7 @@ strategies.forEach((strategy) => {
const options = {
cidVersion: 1,
// Ensures we use DirSharded for the data below
shardSplitThreshold: 3
shardSplitThresholdBytes: 3
}

const files = await all(importer(inputFiles.map(file => ({
@@ -941,7 +941,7 @@ strategies.forEach((strategy) => {
}, {
path: '/foo/qux'
}], block, {
shardSplitThreshold: 0
shardSplitThresholdBytes: 0
}))

const nodes = await all(recursive(entries[entries.length - 1].cid, block))
2 changes: 1 addition & 1 deletion packages/ipfs-unixfs-importer/README.md
@@ -120,7 +120,7 @@ The input's file paths and directory structure will be preserved in the [`dag-pb
`options` is a JavaScript object that might include the following keys:

- `wrapWithDirectory` (boolean, defaults to false): if true, a wrapping node will be created
- `shardSplitThreshold` (positive integer, defaults to 1000): the number of directory entries above which we decide to use a sharding directory builder (instead of the default flat one)
- `shardSplitThresholdBytes` (positive integer, defaults to 256 KiB): if the serialized node is larger than this, it might be converted to a HAMT sharded directory
- `chunker` (string, defaults to `"fixed"`): the chunking strategy. Supports:
- `fixed`
- `rabin`
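To make the new README wording concrete, a hedged sketch of the two extremes exercised by the tests above (assumptions: blockstore-core's MemoryBlockstore and the it-all helper; neither line is part of this diff):

import { importer } from 'ipfs-unixfs-importer'
import { MemoryBlockstore } from 'blockstore-core'
import all from 'it-all'

const blockstore = new MemoryBlockstore()
const files = [{
  path: 'dir/a.txt',
  content: (async function * () {
    yield new TextEncoder().encode('aaa')
  })()
}]

// shardSplitThresholdBytes: 0 means the estimated node size always exceeds the
// threshold, so the wrapping directory is built as a HAMT shard
const sharded = await all(importer(files, blockstore, {
  wrapWithDirectory: true,
  shardSplitThresholdBytes: 0
}))

// shardSplitThresholdBytes: Infinity means the threshold is never exceeded, so
// a flat dag-pb directory node is used instead
const flat = await all(importer(files, blockstore, {
  wrapWithDirectory: true,
  shardSplitThresholdBytes: Infinity
}))

// the last entry is the wrapping directory; its CID differs between the two runs
console.info(sharded.at(-1)?.cid, flat.at(-1)?.cid)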
2 changes: 1 addition & 1 deletion packages/ipfs-unixfs-importer/package.json
@@ -169,9 +169,9 @@
},
"devDependencies": {
"aegir": "^38.1.2",
"assert": "^2.0.0",
"blockstore-core": "^3.0.0",
"it-buffer-stream": "^3.0.0",
"it-last": "^2.0.0",
"wherearewe": "^2.0.1"
},
"browser": {
48 changes: 30 additions & 18 deletions packages/ipfs-unixfs-importer/src/dir-flat.js
@@ -1,6 +1,6 @@
import { encode, prepare } from '@ipld/dag-pb'
import { UnixFS } from 'ipfs-unixfs'
import Dir from './dir.js'
import { Dir, CID_V0, CID_V1 } from './dir.js'
import persist from './utils/persist.js'

/**
@@ -21,8 +21,8 @@ class DirFlat extends Dir {
constructor (props, options) {
super(props, options)

/** @type {{ [key: string]: InProgressImportResult | Dir }} */
this._children = {}
/** @type {Map<string, InProgressImportResult | Dir>} */
this._children = new Map()
}

/**
@@ -32,53 +32,65 @@ async put (name, value) {
async put (name, value) {
this.cid = undefined
this.size = undefined
this.nodeSize = undefined

this._children[name] = value
this._children.set(name, value)
}

/**
* @param {string} name
*/
get (name) {
return Promise.resolve(this._children[name])
return Promise.resolve(this._children.get(name))
}

childCount () {
return Object.keys(this._children).length
return this._children.size
}

directChildrenCount () {
return this.childCount()
}

onlyChild () {
return this._children[Object.keys(this._children)[0]]
return this._children.values().next().value
}

async * eachChildSeries () {
const keys = Object.keys(this._children)
for (const [key, child] of this._children.entries()) {
yield {
key,
child
}
}
}

estimateNodeSize () {
if (this.nodeSize !== undefined) {
return this.nodeSize
}

for (let i = 0; i < keys.length; i++) {
const key = keys[i]
this.nodeSize = 0

yield {
key: key,
child: this._children[key]
// estimate size only based on DAGLink name and CID byte lengths
// https://github.com/ipfs/go-unixfsnode/blob/37b47f1f917f1b2f54c207682f38886e49896ef9/data/builder/directory.go#L81-L96
for (const [name, child] of this._children.entries()) {
if (child.size != null && child.cid) {
this.nodeSize += name.length + (this.options.cidVersion === 1 ? CID_V1.bytes.byteLength : CID_V0.bytes.byteLength)
}
}

return this.nodeSize
}

/**
* @param {Blockstore} block
* @returns {AsyncIterable<ImportResult>}
*/
async * flush (block) {
const children = Object.keys(this._children)
const links = []

for (let i = 0; i < children.length; i++) {
let child = this._children[children[i]]

for (let [name, child] of this._children.entries()) {
if (child instanceof Dir) {
for await (const entry of child.flush(block)) {
child = entry
Expand All @@ -89,7 +101,7 @@ class DirFlat extends Dir {

if (child.size != null && child.cid) {
links.push({
Name: children[i],
Name: name,
Tsize: child.size,
Hash: child.cid
})
91 changes: 89 additions & 2 deletions packages/ipfs-unixfs-importer/src/dir-sharded.js
@@ -1,6 +1,6 @@
import { encode, prepare } from '@ipld/dag-pb'
import { UnixFS } from 'ipfs-unixfs'
import Dir from './dir.js'
import { Dir, CID_V0, CID_V1 } from './dir.js'
import persist from './utils/persist.js'
import { createHAMT, Bucket } from 'hamt-sharding'

@@ -35,6 +35,10 @@ class DirSharded extends Dir {
* @param {InProgressImportResult | Dir} value
*/
async put (name, value) {
this.cid = undefined
this.size = undefined
this.nodeSize = undefined

await this._bucket.put(name, value)
}

@@ -66,6 +70,16 @@
}
}

estimateNodeSize () {
if (this.nodeSize !== undefined) {
return this.nodeSize
}

this.nodeSize = calculateSize(this._bucket, this, this.options)

return this.nodeSize
}

/**
* @param {Blockstore} blockstore
* @returns {AsyncIterable<ImportResult>}
@@ -85,7 +99,7 @@ export default DirSharded
/**
* @param {Bucket<?>} bucket
* @param {Blockstore} blockstore
* @param {*} shardRoot
* @param {DirSharded | null} shardRoot
* @param {ImporterOptions} options
* @returns {AsyncIterable<ImportResult>}
*/
@@ -183,3 +197,76 @@ async function * flush (bucket, blockstore, shardRoot, options) {
size
}
}

/**
* @param {Bucket<?>} bucket
* @param {DirSharded | null} shardRoot
* @param {ImporterOptions} options
*/
function calculateSize (bucket, shardRoot, options) {
const children = bucket._children
const links = []

for (let i = 0; i < children.length; i++) {
const child = children.get(i)

if (!child) {
continue
}

const labelPrefix = i.toString(16).toUpperCase().padStart(2, '0')

if (child instanceof Bucket) {
const size = calculateSize(child, null, options)

links.push({
Name: labelPrefix,
Tsize: size,
Hash: options.cidVersion === 0 ? CID_V0 : CID_V1
})
} else if (typeof child.value.flush === 'function') {
const dir = child.value
const size = dir.nodeSize()

links.push({
Name: labelPrefix + child.key,
Tsize: size,
Hash: options.cidVersion === 0 ? CID_V0 : CID_V1
})
} else {
const value = child.value

if (!value.cid) {
continue
}

const label = labelPrefix + child.key
const size = value.size

links.push({
Name: label,
Tsize: size,
Hash: value.cid
})
}
}

// go-ipfs uses little endian, that's why we have to
// reverse the bit field before storing it
const data = Uint8Array.from(children.bitField().reverse())
const dir = new UnixFS({
type: 'hamt-sharded-directory',
data,
fanout: bucket.tableSize(),
hashType: options.hamtHashCode,
mtime: shardRoot && shardRoot.mtime,
mode: shardRoot && shardRoot.mode
})

const buffer = encode(prepare({
Data: dir.marshal(),
Links: links
}))

return buffer.length
}