Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add uploadCAR function #329

Merged
merged 3 commits into from
Jan 11, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 32 additions & 1 deletion packages/upload-client/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ await Upload.add(conf, rootCID, carCIDs)
- [API](#api)
- [`uploadDirectory`](#uploaddirectory)
- [`uploadFile`](#uploadfile)
- [`uploadCAR`](#uploadcar)
- [`CAR.BlockStream`](#carblockstream)
- [`CAR.encode`](#carencode)
- [`ShardingStream`](#shardingstream)
- [`ShardStoringStream`](#shardstoringstream)
Expand Down Expand Up @@ -204,6 +206,35 @@ Required delegated capability proofs: `store/add`, `upload/add`

More information: [`InvocationConfig`](#invocationconfig)

### `uploadCAR`

```ts
function uploadCAR(
conf: InvocationConfig,
car: Blob,
options: {
retries?: number
signal?: AbortSignal
onShardStored?: ShardStoredCallback
alanshaw marked this conversation as resolved.
Show resolved Hide resolved
shardSize?: number
} = {}
): Promise<void>
```

Uploads a CAR file to the service. The difference between this function and [Store.add](#storeadd) is that the CAR file is automatically sharded and an "upload" is registered (see [`Upload.add`](#uploadadd)), linking the individual shards. Use the `onShardStored` callback to obtain the CIDs of the CAR file shards.

Required delegated capability proofs: `store/add`, `upload/add`

More information: [`InvocationConfig`](#invocationconfig), [`ShardStoredCallback`](#shardstoredcallback)

### `CAR.BlockStream`

```ts
class BlockStream extends ReadableStream<Block>
```

Creates a readable stream of blocks from a CAR file `Blob`.

### `CAR.encode`

```ts
Expand Down Expand Up @@ -253,7 +284,7 @@ function add(
): Promise<CID>
```

Store a CAR file to the service.
Store a CAR file to the service. Returns the CID of the CAR file stored.
alanshaw marked this conversation as resolved.
Show resolved Hide resolved

Required delegated capability proofs: `store/add`

Expand Down
61 changes: 60 additions & 1 deletion packages/upload-client/src/car.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { CarWriter } from '@ipld/car'
import { CarBlockIterator, CarWriter } from '@ipld/car'

/**
* @param {Iterable<import('@ipld/unixfs').Block>|AsyncIterable<import('@ipld/unixfs').Block>} blocks
Expand Down Expand Up @@ -29,3 +29,62 @@ export async function encode(blocks, root) {
const roots = root != null ? [root] : []
return Object.assign(new Blob(chunks), { version: 1, roots })
}

/** @extends {ReadableStream<import('@ipld/unixfs').Block>} */
export class BlockStream extends ReadableStream {
/** @param {import('./types').BlobLike} car */
constructor(car) {
/** @type {Promise<CarBlockIterator>?} */
let blocksPromise = null
const getBlocksIterable = () => {
if (blocksPromise) return blocksPromise
blocksPromise = CarBlockIterator.fromIterable(toIterable(car.stream()))
return blocksPromise
}

/** @type {AsyncIterator<import('@ipld/unixfs').Block>?} */
let iterator = null
super({
async start() {
const blocks = await getBlocksIterable()
iterator = blocks[Symbol.asyncIterator]()
},
async pull(controller) {
/* c8 ignore next */
if (!iterator) throw new Error('missing blocks iterator')
const { value, done } = await iterator.next()
if (done) return controller.close()
controller.enqueue(value)
},
})

/** @returns {Promise<import('./types').AnyLink[]>} */
this.getRoots = async () => {
const blocks = await getBlocksIterable()
return await blocks.getRoots()
}
}
}

/**
* @template T
* @param {{ getReader: () => ReadableStreamDefaultReader<T> } | AsyncIterable<T>} stream
* @returns {AsyncIterable<T>}
*/
function toIterable(stream) {
return Symbol.asyncIterator in stream
? stream
: /* c8 ignore next 12 */
(async function* () {
const reader = stream.getReader()
try {
while (true) {
const { done, value } = await reader.read()
if (done) return
yield value
}
} finally {
reader.releaseLock()
}
})()
}
33 changes: 33 additions & 0 deletions packages/upload-client/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,39 @@ export async function uploadDirectory(conf, files, options = {}) {
)
}

/**
* Uploads a CAR file to the service.
*
* The difference between this function and `Store.add` is that the CAR file is
* automatically sharded and an "upload" is registered, linking the individual
* shards (see `Upload.add`).
*
* Use the `onShardStored` callback to obtain the CIDs of the CAR file shards.
*
* Required delegated capability proofs: `store/add`, `upload/add`
*
* @param {import('./types').InvocationConfig} conf Configuration
* for the UCAN invocation. An object with `issuer`, `with` and `proofs`.
*
* The `issuer` is the signing authority that is issuing the UCAN
* invocation(s). It is typically the user _agent_.
*
* The `with` is the resource the invocation applies to. It is typically the
* DID of a space.
*
* The `proofs` are a set of capability delegations that prove the issuer
* has the capability to perform the action.
*
* The issuer needs the `store/add` and `upload/add` delegated capability.
* @param {import('./types').BlobLike} car CAR file.
* @param {import('./types').UploadOptions} [options]
*/
export async function uploadCAR(conf, car, options = {}) {
const blocks = new CAR.BlockStream(car)
options.rootCID = options.rootCID ?? (await blocks.getRoots())[0]
alanshaw marked this conversation as resolved.
Show resolved Hide resolved
return await uploadBlockStream(conf, blocks, options)
}

/**
* @param {import('./types').InvocationConfig} conf
* @param {ReadableStream<import('@ipld/unixfs').Block>} blocks
Expand Down
15 changes: 10 additions & 5 deletions packages/upload-client/src/sharding.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ const SHARD_SIZE = 1024 * 1024 * 100
const CONCURRENT_UPLOADS = 3

/**
* Shard a set of blocks into a set of CAR files. The last block is assumed to
* be the DAG root and becomes the CAR root CID for the last CAR output.
* Shard a set of blocks into a set of CAR files. By default the last block
* received is assumed to be the DAG root and becomes the CAR root CID for the
* last CAR output. Set the `rootCID` option to override.
*
* @extends {TransformStream<import('@ipld/unixfs').Block, import('./types').CARFile>}
*/
Expand Down Expand Up @@ -45,7 +46,9 @@ export class ShardingStream extends TransformStream {

const rootBlock = shard.at(-1)
if (rootBlock != null) {
controller.enqueue(await encode(shard, rootBlock.cid))
controller.enqueue(
await encode(shard, options.rootCID ?? rootBlock.cid)
)
}
},
})
Expand Down Expand Up @@ -78,10 +81,12 @@ export class ShardStoringStream extends TransformStream {
* has the capability to perform the action.
*
* The issuer needs the `store/add` delegated capability.
* @param {import('./types').RequestOptions} [options]
* @param {import('./types').ShardStoringOptions} [options]
*/
constructor(conf, options = {}) {
const queue = new Queue({ concurrency: CONCURRENT_UPLOADS })
const queue = new Queue({
concurrency: options.concurrentRequests ?? CONCURRENT_UPLOADS,
})
const abortController = new AbortController()
super({
async transform(car, controller) {
Expand Down
18 changes: 17 additions & 1 deletion packages/upload-client/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,25 @@ export interface ShardingOptions {
* header and block encoding data.
*/
shardSize?: number
/**
* The root CID of the DAG contained in the shards. By default The last block
* is assumed to be the DAG root and becomes the CAR root CID for the last CAR
* output. Set this option to use this CID instead.
*/
rootCID?: AnyLink
}

export interface ShardStoringOptions extends RequestOptions {
/**
* The number of concurrent requests to store shards. Default 3.
*/
concurrentRequests?: number
}

export interface UploadOptions extends RequestOptions, ShardingOptions {
export interface UploadOptions
extends RequestOptions,
ShardingOptions,
ShardStoringOptions {
onShardStored?: (meta: CARMetadata) => void
}

Expand Down
31 changes: 30 additions & 1 deletion packages/upload-client/test/car.test.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,35 @@
import assert from 'assert'
import { CID } from 'multiformats'
import { encode } from '../src/car.js'
import { BlockStream, encode } from '../src/car.js'
import { toCAR } from './helpers/car.js'
import { randomBytes } from './helpers/random.js'

describe('CAR.BlockStream', () => {
it('creates a stream of blocks from a CAR file', async () => {
const bytes = await randomBytes(32)
const car = await toCAR(bytes)
const stream = new BlockStream(car)
const chunks = []
await stream.pipeTo(
new WritableStream({
write: (chunk) => {
chunks.push(chunk)
},
})
)
assert.equal(chunks.length, 1) // should only be enough data for 1 block
})

it('allows access to CAR roots', async () => {
const bytes = await randomBytes(32)
const car = await toCAR(bytes)
const stream = new BlockStream(car)
const roots = await stream.getRoots()
assert(roots[0])
assert(car.roots[0])
assert.equal(roots[0]?.toString(), car.roots[0].toString())
})
})

describe('CAR.encode', () => {
it('propagates error when source throws', async () => {
Expand Down
10 changes: 10 additions & 0 deletions packages/upload-client/test/helpers/block.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { CID } from 'multiformats/cid'
import * as raw from 'multiformats/codecs/raw'
import { sha256 } from 'multiformats/hashes/sha2'

/** @param {Uint8Array} bytes */
export async function toBlock(bytes) {
const hash = await sha256.digest(bytes)
const cid = CID.create(1, raw.code, hash)
return { cid, bytes }
}
14 changes: 5 additions & 9 deletions packages/upload-client/test/helpers/car.js
Original file line number Diff line number Diff line change
@@ -1,18 +1,14 @@
import { CarWriter } from '@ipld/car'
import { CID } from 'multiformats/cid'
import * as raw from 'multiformats/codecs/raw'
import { sha256 } from 'multiformats/hashes/sha2'
import * as CAR from '@ucanto/transport/car'
import { toBlock } from './block.js'

/**
* @param {Uint8Array} bytes
**/
export async function toCAR(bytes) {
const hash = await sha256.digest(bytes)
const root = CID.create(1, raw.code, hash)

const { writer, out } = CarWriter.create(root)
writer.put({ cid: root, bytes })
const block = await toBlock(bytes)
const { writer, out } = CarWriter.create(block.cid)
writer.put(block)
writer.close()

const chunks = []
Expand All @@ -22,5 +18,5 @@ export async function toCAR(bytes) {
const blob = new Blob(chunks)
const cid = await CAR.codec.link(new Uint8Array(await blob.arrayBuffer()))

return Object.assign(blob, { cid, roots: [root] })
return Object.assign(blob, { cid, roots: [block.cid] })
}
7 changes: 7 additions & 0 deletions packages/upload-client/test/helpers/random.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { toBlock } from './block.js'
import { toCAR } from './car.js'

/** @param {number} size */
Expand Down Expand Up @@ -29,3 +30,9 @@ export async function randomCAR(size) {
const bytes = await randomBytes(size)
return toCAR(bytes)
}

/** @param {number} size */
export async function randomBlock(size) {
const bytes = await randomBytes(size)
return await toBlock(bytes)
}
Loading