Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: versioned transport protocol #274

Merged
merged 15 commits into from
Apr 11, 2023
2 changes: 1 addition & 1 deletion packages/client/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
"scripts": {
"test:web": "playwright-test test/**/*.spec.js --cov && nyc report",
"test:node": "c8 --check-coverage --branches 100 --functions 100 --lines 100 mocha test/**/*.spec.js",
"test": "npm run test:node",
"test": "c8 --check-coverage --branches 100 --functions 100 --lines 100 mocha --bail test/**/*.spec.js",
"coverage": "c8 --reporter=html mocha test/test-*.js && npm_config_yes=true npx st -d coverage -p 8080",
"check": "tsc --build",
"build": "tsc --build"
Expand Down
25 changes: 14 additions & 11 deletions packages/client/src/connection.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import * as API from '@ucanto/interface'
import { Receipt, Signature, sha256 } from '@ucanto/core'
import { Signature, Message, Receipt, sha256 } from '@ucanto/core'

/**
* Creates a connection to a service.
Expand Down Expand Up @@ -29,8 +29,9 @@ class Connection {
* @template {API.Capability} C
* @template {API.Tuple<API.ServiceInvocation<C, T>>} I
* @param {I} invocations
* @returns {Promise<API.InferReceipts<I, T>>}
*/
execute(...invocations) {
async execute(...invocations) {
return execute(invocations, this)
}
}
Expand All @@ -40,28 +41,30 @@ class Connection {
* @template {Record<string, any>} T
* @template {API.Tuple<API.ServiceInvocation<C, T>>} I
* @param {API.Connection<T>} connection
* @param {I} workflow
* @returns {Promise<API.InferWorkflowReceipts<I, T>>}
* @param {I} invocations
* @returns {Promise<API.InferReceipts<I, T>>}
*/
export const execute = async (workflow, connection) => {
const request = await connection.codec.encode(workflow, connection)
export const execute = async (invocations, connection) => {
const input = await Message.build({ invocations })
const request = await connection.codec.encode(input, connection)
const response = await connection.channel.request(request)
// We may fail to decode the response if content type is not supported
// or if data was corrupted. We do not want to throw in such case however,
// because client will get an Error object as opposed to a receipt, to retain
// consistent client API with two kinds of errors we encode caught error as
// a receipts per workflow invocation.
try {
return await connection.codec.decode(response)
const output = await connection.codec.decode(response)
const receipts = input.invocationLinks.map(link => output.get(link))
return /** @type {API.InferReceipts<I, T>} */ (receipts)
} catch (error) {
// No third party code is run during decode and we know
// we only throw an Error
const { message, ...cause } = /** @type {Error} */ (error)
const receipts = []
for await (const invocation of workflow) {
const { cid } = await invocation.delegate()
for await (const ran of input.invocationLinks) {
const receipt = await Receipt.issue({
ran: cid,
ran,
result: { error: { ...cause, message } },
// @ts-expect-error - we can not really sign a receipt without having
// an access to a signer which client does not have. In the future
Expand All @@ -80,6 +83,6 @@ export const execute = async (workflow, connection) => {
receipts.push(receipt)
}

return /** @type {any} */ (receipts)
return /** @type {API.InferReceipts<I, T>} */ (receipts)
}
}
50 changes: 28 additions & 22 deletions packages/client/test/client.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import * as Client from '../src/lib.js'
import * as HTTP from '@ucanto/transport/http'
import { CAR, Codec } from '@ucanto/transport'
import * as Service from './service.js'
import { Receipt, CBOR } from '@ucanto/core'
import { Receipt, Message, CBOR } from '@ucanto/core'
import { alice, bob, mallory, service as w3 } from './fixtures.js'
import fetch from '@web-std/fetch'

Expand All @@ -30,18 +30,19 @@ test('encode invocation', async () => {
proofs: [],
})

const payload = await connection.codec.encode([add])
const message = await Message.build({ invocations: [add] })
const payload = await connection.codec.encode(message)

assert.deepEqual(payload.headers, {
'content-type': 'application/car',
accept: 'application/car',
'content-type': 'application/vnd.ipld.car',
accept: 'application/vnd.ipld.car',
})
assert.ok(payload.body instanceof Uint8Array)

const request = await CAR.decode(payload)
const request = await CAR.request.decode(payload)

const [invocation] = request
assert.equal(request.length, 1)
const [invocation] = request.invocations
assert.equal(request.invocations.length, 1)
assert.equal(invocation.issuer.did(), alice.did())
assert.equal(invocation.audience.did(), w3.did())
assert.deepEqual(invocation.proofs, [])
Expand Down Expand Up @@ -98,11 +99,12 @@ test('encode delegated invocation', async () => {
},
})

const payload = await connection.codec.encode([add, remove])
const request = await CAR.decode(payload)
const message = await Message.build({ invocations: [add, remove] })
const payload = await connection.codec.encode(message)
const request = await CAR.request.decode(payload)
{
const [add, remove] = request
assert.equal(request.length, 2)
const [add, remove] = request.invocations
assert.equal(request.invocations.length, 2)

assert.equal(add.issuer.did(), bob.did())
assert.equal(add.audience.did(), w3.did())
Expand All @@ -125,13 +127,16 @@ test('encode delegated invocation', async () => {
assert.equal(remove.issuer.did(), alice.did())
assert.equal(remove.audience.did(), w3.did())
assert.deepEqual(remove.proofs, [])
assert.deepEqual(remove.capabilities, [
{
can: 'store/remove',
with: alice.did(),
link: car.cid,
},
])
assert.deepEqual(
[
Object({
can: 'store/remove',
with: alice.did(),
link: car.cid,
}),
],
remove.capabilities
)
}
})

Expand All @@ -140,8 +145,7 @@ const service = Service.create()
const channel = HTTP.open({
url: new URL('about:blank'),
fetch: async (url, input) => {
/** @type {Client.Tuple<Client.Invocation>} */
const invocations = await CAR.request.decode(input)
const { invocations } = await CAR.request.decode(input)
const promises = invocations.map(async invocation => {
const [capability] = invocation.capabilities
switch (capability.can) {
Expand Down Expand Up @@ -172,7 +176,9 @@ const channel = HTTP.open({
await Promise.all(promises)
)

const { headers, body } = await CAR.response.encode(receipts)
const message = await Message.build({ receipts })

const { headers, body } = await CAR.response.encode(message)

return {
ok: true,
Expand Down Expand Up @@ -317,7 +323,7 @@ test('decode error', async () => {
error: {
error: true,
message:
"Can not decode response with content-type 'application/car' because no matching transport decoder is configured.",
"Can not decode response with content-type 'application/vnd.ipld.car' because no matching transport decoder is configured.",
},
})
})
18 changes: 10 additions & 8 deletions packages/core/src/car.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,23 @@ import { base32 } from 'multiformats/bases/base32'
import { create as createLink } from './link.js'
import { sha256 } from 'multiformats/hashes/sha2'

// @see https://www.iana.org/assignments/media-types/application/vnd.ipld.car
export const contentType = 'application/vnd.ipld.car'
export const name = 'CAR'

/** @type {API.MulticodecCode<0x0202, 'CAR'>} */
export const code = 0x0202

/**
* @typedef {API.Block<unknown, number, number, 0|1>} Block
* @typedef {{
* roots: Block[]
* blocks: Map<string, Block>
* roots: API.IPLDBlock[]
* blocks: Map<string, API.IPLDBlock>
* }} Model
*/

class Writer {
/**
* @param {Block[]} blocks
* @param {API.IPLDBlock[]} blocks
* @param {number} byteLength
*/
constructor(blocks = [], byteLength = 0) {
Expand All @@ -29,23 +30,23 @@ class Writer {
this.byteLength = byteLength
}
/**
* @param {Block[]} blocks
* @param {API.IPLDBlock[]} blocks
*/
write(...blocks) {
for (const block of blocks) {
const id = block.cid.toString(base32)
if (!this.written.has(id)) {
this.blocks.push(block)
this.byteLength += CarBufferWriter.blockLength(
/** @type {CarBufferWriter.Block} */ (block)
/** @type {any} */ (block)
)
this.written.add(id)
}
}
return this
}
/**
* @param {Block[]} rootBlocks
* @param {API.IPLDBlock[]} rootBlocks
*/
flush(...rootBlocks) {
const roots = []
Expand Down Expand Up @@ -99,11 +100,12 @@ export const encode = ({ roots = [], blocks }) => {
*/
export const decode = bytes => {
const reader = CarBufferReader.fromBytes(bytes)
/** @type {API.IPLDBlock[]} */
const roots = []
const blocks = new Map()

for (const root of reader.getRoots()) {
const block = reader.get(root)
const block = /** @type {API.IPLDBlock} */ (reader.get(root))
if (block) {
roots.push(block)
}
Expand Down
3 changes: 3 additions & 0 deletions packages/core/src/cbor.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ export { code, name, decode } from '@ipld/dag-cbor'
import { sha256 } from 'multiformats/hashes/sha2'
import { create as createLink, isLink } from 'multiformats/link'

// @see https://www.iana.org/assignments/media-types/application/vnd.ipld.dag-cbor
export const contentType = 'application/vnd.ipld.dag-cbor'

/**
* @param {unknown} data
* @param {Set<unknown>} seen
Expand Down
28 changes: 20 additions & 8 deletions packages/core/src/dag.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import * as MF from 'multiformats/interface'
import * as CBOR from './cbor.js'
import { identity } from 'multiformats/hashes/identity'

export { CBOR, sha256, identity }

/**
* Function takes arbitrary value and if it happens to be an `IPLDView`
* it will iterate over it's blocks. It is just a convenience for traversing
Expand All @@ -27,15 +29,20 @@ export const iterate = function* (value) {
}

/**
* @template T
* @typedef {Map<API.ToString<API.Link>, API.Block<T>>} BlockStore
* @template [T=unknown]
* @typedef {Map<API.ToString<API.Link>, API.Block<T, number, number, 0>|API.Block<T, number, number, 1>>} BlockStore
*/

/**
* @template [T=unknown]
* @param {API.Block<T>[]} blocks
* @returns {BlockStore<T>}
*/
export const createStore = () => new Map()
export const createStore = (blocks = []) => {
const store = new Map()
addEveryInto(blocks, store)
return store
}

/** @type {API.MulticodecCode<typeof identity.code, typeof identity.name>} */
const EMBED_CODE = identity.code
Expand All @@ -45,21 +52,26 @@ const EMBED_CODE = identity.code
* contain the block, `fallback` is returned. If `fallback` is not provided, it
* will throw an error.
*
* @template {0|1} V
* @template {T} U
* @template T
* @template {API.MulticodecCode} Format
* @template {API.MulticodecCode} Alg
* @template [E=never]
* @param {API.Link<U>} cid
* @param {API.Link<U, Format, Alg, V>} cid
* @param {BlockStore<T>} store
* @param {E} [fallback]
* @returns {API.Block<U>|E}
* @returns {API.Block<U, Format, Alg, V>|E}
*/
export const get = (cid, store, fallback) => {
// If CID uses identity hash, we can return the block data directly
if (cid.multihash.code === EMBED_CODE) {
return { cid, bytes: cid.multihash.digest }
}

const block = /** @type {API.Block<U>|undefined} */ (store.get(`${cid}`))
const block = /** @type {API.Block<U, Format, Alg, V>|undefined} */ (
store.get(`${cid}`)
)
return block ? block : fallback === undefined ? notFound(cid) : fallback
}

Expand All @@ -84,10 +96,10 @@ export const embed = (source, { codec } = {}) => {
}

/**
* @param {API.Link} link
* @param {API.Link<*, *, *, *>} link
* @returns {never}
*/
const notFound = link => {
export const notFound = link => {
throw new Error(`Block for the ${link} is not found`)
}

Expand Down
10 changes: 5 additions & 5 deletions packages/core/src/delegation.js
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ const matchAbility = (provided, claimed) => {
export class Delegation {
/**
* @param {API.UCANBlock<C>} root
* @param {Map<string, API.Block>} [blocks]
* @param {DAG.BlockStore} [blocks]
*/
constructor(root, blocks = new Map()) {
this.root = root
Expand Down Expand Up @@ -364,14 +364,14 @@ export const delegate = async (
/**
* @template {API.Capabilities} C
* @param {API.UCANBlock<C>} root
* @param {Map<string, API.Block>} blocks
* @param {DAG.BlockStore} blocks
* @returns {IterableIterator<API.Block>}
*/

export const exportDAG = function* (root, blocks) {
for (const link of decode(root).proofs) {
// Check if block is included in this delegation
const root = /** @type {UCAN.Block} */ (blocks.get(link.toString()))
const root = /** @type {UCAN.Block} */ (blocks.get(`${link}`))
if (root) {
yield* exportDAG(root, blocks)
}
Expand Down Expand Up @@ -409,7 +409,7 @@ export const importDAG = dag => {
* @template {API.Capabilities} C
* @param {object} dag
* @param {API.UCANBlock<C>} dag.root
* @param {Map<string, API.Block<unknown>>} [dag.blocks]
* @param {DAG.BlockStore} [dag.blocks]
* @returns {API.Delegation<C>}
*/
export const create = ({ root, blocks }) => new Delegation(root, blocks)
Expand All @@ -419,7 +419,7 @@ export const create = ({ root, blocks }) => new Delegation(root, blocks)
* @template [T=undefined]
* @param {object} dag
* @param {API.UCANLink<C>} dag.root
* @param {Map<string, API.Block>} dag.blocks
* @param {DAG.BlockStore} dag.blocks
* @param {T} [fallback]
* @returns {API.Delegation<C>|T}
*/
Expand Down
Loading