feat: add uploadCAR function (#329)
The PR adds an `uploadCAR` function to the client.

It differs from `Store.add` because it automatically shards the CAR
file, calls `Store.add` for each shard and finally registers an upload
(using `Upload.add`). This is very similar to `uploadFile` or
`uploadDirectory` except it skips the UnixFS encoding step.

Once merged, I'll propagate this to
https://github.com/web3-storage/w3cli so that @lanzafame can upload BIG
CARs and have them automatically sharded.
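
The flow described above (shard, store each shard, then register the upload) can be sketched roughly as follows. This is a simplified illustration, not the client's implementation: `shardBlocks`, `uploadSketch`, `storeShard` and `registerUpload` are hypothetical names, the last two standing in for `Store.add` and `Upload.add`, and the size accounting ignores the CAR header and block encoding overhead that the real sharding has to budget for.

```js
/**
 * Group blocks into shards whose combined byte length stays within
 * `shardSize`. A single block larger than `shardSize` gets its own shard.
 */
function shardBlocks(blocks, shardSize) {
  const shards = []
  let current = []
  let currentSize = 0
  for (const block of blocks) {
    // Start a new shard when adding this block would exceed the limit.
    if (current.length > 0 && currentSize + block.bytes.length > shardSize) {
      shards.push(current)
      current = []
      currentSize = 0
    }
    current.push(block)
    currentSize += block.bytes.length
  }
  if (current.length > 0) shards.push(current)
  return shards
}

/**
 * Store every shard, then register a single upload linking them.
 * `storeShard` and `registerUpload` are injected stand-ins (assumptions).
 */
async function uploadSketch(blocks, rootCID, { shardSize, storeShard, registerUpload }) {
  const shardCIDs = []
  for (const shard of shardBlocks(blocks, shardSize)) {
    shardCIDs.push(await storeShard(shard))
  }
  await registerUpload(rootCID, shardCIDs)
  return shardCIDs
}
```

The sketch stores shards one at a time for brevity; the client stores several concurrently.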
Alan Shaw authored Jan 11, 2023
1 parent 4cfe312 commit 6e40e47
Showing 11 changed files with 339 additions and 20 deletions.
36 changes: 35 additions & 1 deletion packages/upload-client/README.md
@@ -139,6 +139,8 @@ await Upload.add(conf, rootCID, carCIDs)
- [API](#api)
- [`uploadDirectory`](#uploaddirectory)
- [`uploadFile`](#uploadfile)
- [`uploadCAR`](#uploadcar)
- [`CAR.BlockStream`](#carblockstream)
- [`CAR.encode`](#carencode)
- [`ShardingStream`](#shardingstream)
- [`ShardStoringStream`](#shardstoringstream)
@@ -173,6 +175,7 @@ function uploadDirectory(
signal?: AbortSignal
onShardStored?: ShardStoredCallback
shardSize?: number
concurrentRequests?: number
} = {}
): Promise<CID>
```
@@ -194,6 +197,7 @@ function uploadFile(
signal?: AbortSignal
onShardStored?: ShardStoredCallback
shardSize?: number
concurrentRequests?: number
} = {}
): Promise<CID>
```
@@ -204,6 +208,36 @@ Required delegated capability proofs: `store/add`, `upload/add`

More information: [`InvocationConfig`](#invocationconfig)

### `uploadCAR`

```ts
function uploadCAR(
  conf: InvocationConfig,
  car: Blob,
  options: {
    retries?: number
    signal?: AbortSignal
    onShardStored?: ShardStoredCallback
    shardSize?: number
    concurrentRequests?: number
  } = {}
): Promise<void>
```

Uploads a CAR file to the service. The difference between this function and [Store.add](#storeadd) is that the CAR file is automatically sharded and an "upload" is registered (see [`Upload.add`](#uploadadd)), linking the individual shards. Use the `onShardStored` callback to obtain the CIDs of the CAR file shards.

Required delegated capability proofs: `store/add`, `upload/add`

More information: [`InvocationConfig`](#invocationconfig), [`ShardStoredCallback`](#shardstoredcallback)
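
A usage sketch for collecting shard CIDs through `onShardStored`. To keep the snippet runnable without the real package or a network, `uploadCAR` is passed in as an argument; in practice it would be imported from the upload client. `collectShardCIDs` is a hypothetical helper name, and reading `meta.cid` inside the callback is an assumption about the shape of the shard metadata.

```js
/**
 * Upload a CAR and return the CIDs of the shards that were stored.
 * `uploadCAR` is injected so this sketch does not depend on the real client.
 */
async function collectShardCIDs(uploadCAR, conf, car, options = {}) {
  const shardCIDs = []
  await uploadCAR(conf, car, {
    ...options,
    onShardStored: (meta) => {
      // Assumption: the shard metadata exposes the shard CID as `meta.cid`.
      shardCIDs.push(meta.cid)
    },
  })
  return shardCIDs
}
```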

### `CAR.BlockStream`

```ts
class BlockStream extends ReadableStream<Block>
```

Creates a readable stream of blocks from a CAR file `Blob`.

### `CAR.encode`

```ts
@@ -253,7 +287,7 @@ function add(
): Promise<CID>
```

Store a CAR file to the service.
Store a CAR file to the service. Returns the CID of the CAR file stored.

Required delegated capability proofs: `store/add`

61 changes: 60 additions & 1 deletion packages/upload-client/src/car.js
@@ -1,4 +1,4 @@
import { CarWriter } from '@ipld/car'
import { CarBlockIterator, CarWriter } from '@ipld/car'

/**
* @param {Iterable<import('@ipld/unixfs').Block>|AsyncIterable<import('@ipld/unixfs').Block>} blocks
@@ -29,3 +29,62 @@ export async function encode(blocks, root) {
const roots = root != null ? [root] : []
return Object.assign(new Blob(chunks), { version: 1, roots })
}

/** @extends {ReadableStream<import('@ipld/unixfs').Block>} */
export class BlockStream extends ReadableStream {
  /** @param {import('./types').BlobLike} car */
  constructor(car) {
    /** @type {Promise<CarBlockIterator>?} */
    let blocksPromise = null
    const getBlocksIterable = () => {
      if (blocksPromise) return blocksPromise
      blocksPromise = CarBlockIterator.fromIterable(toIterable(car.stream()))
      return blocksPromise
    }

    /** @type {AsyncIterator<import('@ipld/unixfs').Block>?} */
    let iterator = null
    super({
      async start() {
        const blocks = await getBlocksIterable()
        iterator = blocks[Symbol.asyncIterator]()
      },
      async pull(controller) {
        /* c8 ignore next */
        if (!iterator) throw new Error('missing blocks iterator')
        const { value, done } = await iterator.next()
        if (done) return controller.close()
        controller.enqueue(value)
      },
    })

    /** @returns {Promise<import('./types').AnyLink[]>} */
    this.getRoots = async () => {
      const blocks = await getBlocksIterable()
      return await blocks.getRoots()
    }
  }
}

/**
 * @template T
 * @param {{ getReader: () => ReadableStreamDefaultReader<T> } | AsyncIterable<T>} stream
 * @returns {AsyncIterable<T>}
 */
function toIterable(stream) {
  return Symbol.asyncIterator in stream
    ? stream
    : /* c8 ignore next 12 */
      (async function* () {
        const reader = stream.getReader()
        try {
          while (true) {
            const { done, value } = await reader.read()
            if (done) return
            yield value
          }
        } finally {
          reader.releaseLock()
        }
      })()
}
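
The `start`/`pull` pattern used by `BlockStream` above can be tried in isolation. The sketch below wraps any async iterable in a pull-based `ReadableStream` and drains it with a reader, much as `toIterable` does. `streamFromAsyncIterable` and `collect` are hypothetical names, and a runtime with a global `ReadableStream` (for example Node.js 18 or later) is assumed.

```js
/**
 * Wrap an async iterable in a ReadableStream that produces one item per pull,
 * so items are only read from the source when the consumer asks for them.
 */
function streamFromAsyncIterable(iterable) {
  let iterator = null
  return new ReadableStream({
    start() {
      iterator = iterable[Symbol.asyncIterator]()
    },
    async pull(controller) {
      const { value, done } = await iterator.next()
      if (done) return controller.close()
      controller.enqueue(value)
    },
  })
}

/** Drain a stream into an array, releasing the reader lock when finished. */
async function collect(stream) {
  const items = []
  const reader = stream.getReader()
  try {
    while (true) {
      const { done, value } = await reader.read()
      if (done) return items
      items.push(value)
    }
  } finally {
    reader.releaseLock()
  }
}
```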
33 changes: 33 additions & 0 deletions packages/upload-client/src/index.js
@@ -68,6 +68,39 @@ export async function uploadDirectory(conf, files, options = {}) {
  )
}

/**
 * Uploads a CAR file to the service.
 *
 * The difference between this function and `Store.add` is that the CAR file is
 * automatically sharded and an "upload" is registered, linking the individual
 * shards (see `Upload.add`).
 *
 * Use the `onShardStored` callback to obtain the CIDs of the CAR file shards.
 *
 * Required delegated capability proofs: `store/add`, `upload/add`
 *
 * @param {import('./types').InvocationConfig} conf Configuration
 * for the UCAN invocation. An object with `issuer`, `with` and `proofs`.
 *
 * The `issuer` is the signing authority that is issuing the UCAN
 * invocation(s). It is typically the user _agent_.
 *
 * The `with` is the resource the invocation applies to. It is typically the
 * DID of a space.
 *
 * The `proofs` are a set of capability delegations that prove the issuer
 * has the capability to perform the action.
 *
 * The issuer needs the `store/add` and `upload/add` delegated capability.
 * @param {import('./types').BlobLike} car CAR file.
 * @param {import('./types').UploadOptions} [options]
 */
export async function uploadCAR(conf, car, options = {}) {
  const blocks = new CAR.BlockStream(car)
  options.rootCID = options.rootCID ?? (await blocks.getRoots())[0]
  return await uploadBlockStream(conf, blocks, options)
}

/**
 * @param {import('./types').InvocationConfig} conf
 * @param {ReadableStream<import('@ipld/unixfs').Block>} blocks
15 changes: 10 additions & 5 deletions packages/upload-client/src/sharding.js
@@ -6,8 +6,9 @@ const SHARD_SIZE = 1024 * 1024 * 100
const CONCURRENT_UPLOADS = 3

/**
* Shard a set of blocks into a set of CAR files. The last block is assumed to
* be the DAG root and becomes the CAR root CID for the last CAR output.
* Shard a set of blocks into a set of CAR files. By default the last block
* received is assumed to be the DAG root and becomes the CAR root CID for the
* last CAR output. Set the `rootCID` option to override.
*
* @extends {TransformStream<import('@ipld/unixfs').Block, import('./types').CARFile>}
*/
@@ -45,7 +46,9 @@ export class ShardingStream extends TransformStream {

const rootBlock = shard.at(-1)
if (rootBlock != null) {
controller.enqueue(await encode(shard, rootBlock.cid))
controller.enqueue(
await encode(shard, options.rootCID ?? rootBlock.cid)
)
}
},
})
@@ -78,10 +81,12 @@ export class ShardStoringStream extends TransformStream {
* has the capability to perform the action.
*
* The issuer needs the `store/add` delegated capability.
* @param {import('./types').RequestOptions} [options]
* @param {import('./types').ShardStoringOptions} [options]
*/
constructor(conf, options = {}) {
const queue = new Queue({ concurrency: CONCURRENT_UPLOADS })
const queue = new Queue({
concurrency: options.concurrentRequests ?? CONCURRENT_UPLOADS,
})
const abortController = new AbortController()
super({
async transform(car, controller) {
18 changes: 17 additions & 1 deletion packages/upload-client/src/types.ts
@@ -186,9 +186,25 @@ export interface ShardingOptions {
   * header and block encoding data.
   */
  shardSize?: number
  /**
   * The root CID of the DAG contained in the shards. By default the last block
   * is assumed to be the DAG root and becomes the CAR root CID for the last CAR
   * output. Set this option to use this CID instead.
   */
  rootCID?: AnyLink
}

export interface ShardStoringOptions extends RequestOptions {
  /**
   * The number of concurrent requests to store shards. Default 3.
   */
  concurrentRequests?: number
}

export interface UploadOptions extends RequestOptions, ShardingOptions {
export interface UploadOptions
extends RequestOptions,
ShardingOptions,
ShardStoringOptions {
onShardStored?: (meta: CARMetadata) => void
}
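
The `concurrentRequests` option caps how many shard store requests are in flight at once. The sketch below shows one way such a cap can work; it is not the `Queue` that `ShardStoringStream` uses. `mapWithConcurrency` is a hypothetical name, and its default of 3 only mirrors the `CONCURRENT_UPLOADS` constant.

```js
/**
 * Run `fn` over `items` with at most `concurrency` calls in flight at once.
 * Results are returned in the same order as `items`.
 */
async function mapWithConcurrency(items, concurrency = 3, fn) {
  const results = new Array(items.length)
  let next = 0
  // Each worker repeatedly claims the next unprocessed index until none remain.
  async function worker() {
    while (next < items.length) {
      const i = next++
      results[i] = await fn(items[i], i)
    }
  }
  const workerCount = Math.max(1, Math.min(concurrency, items.length))
  await Promise.all(Array.from({ length: workerCount }, () => worker()))
  return results
}
```

Because each worker claims an index synchronously before awaiting, two workers never process the same item.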

31 changes: 30 additions & 1 deletion packages/upload-client/test/car.test.js
@@ -1,6 +1,35 @@
import assert from 'assert'
import { CID } from 'multiformats'
import { encode } from '../src/car.js'
import { BlockStream, encode } from '../src/car.js'
import { toCAR } from './helpers/car.js'
import { randomBytes } from './helpers/random.js'

describe('CAR.BlockStream', () => {
  it('creates a stream of blocks from a CAR file', async () => {
    const bytes = await randomBytes(32)
    const car = await toCAR(bytes)
    const stream = new BlockStream(car)
    const chunks = []
    await stream.pipeTo(
      new WritableStream({
        write: (chunk) => {
          chunks.push(chunk)
        },
      })
    )
    assert.equal(chunks.length, 1) // should only be enough data for 1 block
  })

  it('allows access to CAR roots', async () => {
    const bytes = await randomBytes(32)
    const car = await toCAR(bytes)
    const stream = new BlockStream(car)
    const roots = await stream.getRoots()
    assert(roots[0])
    assert(car.roots[0])
    assert.equal(roots[0]?.toString(), car.roots[0].toString())
  })
})

describe('CAR.encode', () => {
it('propagates error when source throws', async () => {
10 changes: 10 additions & 0 deletions packages/upload-client/test/helpers/block.js
@@ -0,0 +1,10 @@
import { CID } from 'multiformats/cid'
import * as raw from 'multiformats/codecs/raw'
import { sha256 } from 'multiformats/hashes/sha2'

/** @param {Uint8Array} bytes */
export async function toBlock(bytes) {
  const hash = await sha256.digest(bytes)
  const cid = CID.create(1, raw.code, hash)
  return { cid, bytes }
}
14 changes: 5 additions & 9 deletions packages/upload-client/test/helpers/car.js
@@ -1,18 +1,14 @@
import { CarWriter } from '@ipld/car'
import { CID } from 'multiformats/cid'
import * as raw from 'multiformats/codecs/raw'
import { sha256 } from 'multiformats/hashes/sha2'
import * as CAR from '@ucanto/transport/car'
import { toBlock } from './block.js'

/**
* @param {Uint8Array} bytes
**/
export async function toCAR(bytes) {
const hash = await sha256.digest(bytes)
const root = CID.create(1, raw.code, hash)

const { writer, out } = CarWriter.create(root)
writer.put({ cid: root, bytes })
const block = await toBlock(bytes)
const { writer, out } = CarWriter.create(block.cid)
writer.put(block)
writer.close()

const chunks = []
@@ -22,5 +18,5 @@ export async function toCAR(bytes) {
const blob = new Blob(chunks)
const cid = await CAR.codec.link(new Uint8Array(await blob.arrayBuffer()))

return Object.assign(blob, { cid, roots: [root] })
return Object.assign(blob, { cid, roots: [block.cid] })
}
7 changes: 7 additions & 0 deletions packages/upload-client/test/helpers/random.js
@@ -1,3 +1,4 @@
import { toBlock } from './block.js'
import { toCAR } from './car.js'

/** @param {number} size */
@@ -29,3 +30,9 @@ export async function randomCAR(size) {
  const bytes = await randomBytes(size)
  return toCAR(bytes)
}

/** @param {number} size */
export async function randomBlock(size) {
  const bytes = await randomBytes(size)
  return await toBlock(bytes)
}