Skip to content

Commit

Permalink
feat: ucan endpoint supports agent message
Browse files Browse the repository at this point in the history
  • Loading branch information
vasco-santos committed Apr 11, 2023
1 parent 04198ef commit 0f11106
Show file tree
Hide file tree
Showing 13 changed files with 13,556 additions and 20,375 deletions.
9 changes: 5 additions & 4 deletions docs/ucan-invocation-stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ UCAN is a chained-capability format. A UCAN contains all of the information that

We can identify three core components on our services built relying on UCANs:
- Task to be executed (`with`, `can` and `nb` fields of UCAN)
- Bear in mind that in UCAN world `task` is now referred as `instruction`.
- Invocation (task to be executed together with the provable authority to do so `proofs` + `signature`)
- Workflow (file containing one or more invocations to be executed)

Expand All @@ -17,7 +18,7 @@ With the above components, we can say that:

## Architecture

The entry point for the UCAN Invocation stream is an HTTP endpoint `POST /ucan`. It will receive `workflows` and `receipts` from other services. All invocations and their receipts are persisted in buckets and added into the Stream.
The entry point for the UCAN Invocation stream is an HTTP endpoint `POST /ucan`. It will receive [Agent Messages](https://github.com/web3-storage/ucanto/blob/main/packages/core/src/message.js) from from other services with invocations to be executed and/or reported receipts. All invocations and their receipts are persisted in buckets and added into the UCAN Stream.

AWS Kinesis is the central piece of this architecture. Multiple stream consumers can be hooked into AWS Kinesis for post processing of UCAN invocations.

Expand All @@ -29,11 +30,11 @@ Note that at the time of writing Event Archival flow is still to be implemented.

UCAN Invocation Stack contains 3 buckets so that it can keep an audit of the entire system, while allowing this information to be queried in multiple fashions.

Firstly, the **`workflow-store` bucket** stores the entire encoded file containing one or more invocations to be executed. It is stored as received from UCAN services interacting with UCAN Invocation Stream. It is keyed as `${workflow.cid}/${workflow.cid}` and its value is likely in CAR format. However, CID codec should tell if it is something else.
Firstly, the **`workflow-store` bucket** stores the entire encoded agent message files containing invocations to be executed, and/or created receipts for ran invocations. It is stored as received from UCAN services interacting with UCAN Invocation Stream. It is keyed as `${agentMessage.cid}/${agentMessage.cid}` and its value is likely in CAR format. However, CID codec should tell if it is something else.

At the invocation level, the **`invocation-store` bucket** is responsible for storing two types of values related to UCAN invocations:
- a pseudo symlink to `/${workflow.cid}/${workflow.cid}` via key `${invocation.cid}/${workflow.cid}.workflow` to track where each invocation lives in a Workflow file. As a pseudo symlink, it is an empty object.
- a receipt block issued for a specific task invocation via key `${invocation.cid}/${invocation.cid}.receipt` with block bytes as value.
- a pseudo symlink to `/${agentMessage.cid}/${agentMessage.cid}` via key `${invocation.cid}/${agentMessage.cid}.in` to track where each invocation lives in a agent message file. As a pseudo symlink, it is an empty object.
- a pseudo symlink to `/${agentMessage.cid}/${agentMessage.cid}` via key `${invocation.cid}/${agentMessage.cid}.out` to track where each receipt lives in a agent message file. As a pseudo symlink, it is an empty object.

In the tasks context, the **`task-store` bucket** stores two types of values related to executed tasks:
- a pseudo symlink to `/${invocation.cid}/${invocation.cid}` via `${task.cid}/${invocation.cid}.invocation` to enable looking up invocations and receipts by a task. As a pseudo symlink, it is an empty object.
Expand Down
33,172 changes: 12,852 additions & 20,320 deletions package-lock.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion upload-api/access.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import * as Space from '@web3-storage/capabilities/space'
import { connect } from '@ucanto/client'
import { Failure } from '@ucanto/server'
import { CAR, CBOR, HTTP } from '@ucanto/transport'
import { CAR, CBOR, HTTP } from '@ucanto/transport-legacy'
import fetch from '@web-std/fetch'

/**
Expand Down
46 changes: 46 additions & 0 deletions upload-api/buckets/invocation-store.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,52 @@ export function createInvocationStore(region, bucketName, options = {}) {
*/
export const useInvocationStore = (s3client, bucketName) => {
return {
/**
* Put mapping for where each invocation lives in agent message file.
*
* @param {string} invocationCid
* @param {string} messageCid
*/
putInLink: async (invocationCid, messageCid) => {
const putCmd = new PutObjectCommand({
Bucket: bucketName,
Key: `${invocationCid}/${messageCid}.in`,
})
await pRetry(() => s3client.send(putCmd))
},
/**
* Put mapping for where each receipt lives in agent message file.
*
* @param {string} invocationCid
* @param {string} messageCid
*/
putOutLink: async (invocationCid, messageCid) => {
const putCmd = new PutObjectCommand({
Bucket: bucketName,
Key: `${invocationCid}/${messageCid}.out`,
})
await pRetry(() => s3client.send(putCmd))
},
/**
* Get the agent message file CID for an invocation.
*
* @param {string} invocationCid
*/
getInLink: async (invocationCid) => {
const prefix = `${invocationCid}/`
const listObjectCmd = new ListObjectsV2Command({
Bucket: bucketName,
Prefix: prefix,
})
const listObject = await s3client.send(listObjectCmd)
const carEntry = listObject.Contents?.find(
content => content.Key?.endsWith('.in')
)
if (!carEntry) {
return
}
return carEntry.Key?.replace(prefix, '').replace('.in', '')
},
/**
* Put mapping for where each invocation lives in a Workflow file.
*
Expand Down
4 changes: 2 additions & 2 deletions upload-api/functions/ucan-invocation-router.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { DID } from '@ucanto/core'
import * as Server from '@ucanto/server'
import * as CAR from '@ucanto/transport/car'
import * as CBOR from '@ucanto/transport/cbor'
import * as CAR from '@ucanto/transport-legacy/car'
import * as CBOR from '@ucanto/transport-legacy/cbor'
import { Kinesis } from '@aws-sdk/client-kinesis'
import * as Sentry from '@sentry/serverless'

Expand Down
2 changes: 0 additions & 2 deletions upload-api/functions/ucan.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ Sentry.AWSLambda.init({
const kinesisClient = new Kinesis({})
const AWS_REGION = process.env.AWS_REGION || 'us-west-2'



/**
* AWS HTTP Gateway handler for POST / with ucan invocation router.
*
Expand Down
9 changes: 6 additions & 3 deletions upload-api/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@
"@sentry/serverless": "^7.22.0",
"@serverless-stack/node": "^1.18.2",
"@ucanto/client": "^5.1.0",
"@ucanto/interface": "^6.0.0",
"@ucanto/core": "^5.1.0",
"@ucanto/core-next": "npm:@ucanto/core@^7.0.1",
"@ucanto/interface": "^6.2.0",
"@ucanto/principal": "^5.1.0",
"@ucanto/server": "^6.0.0",
"@ucanto/transport": "^5.1.0",
"@ucanto/server": "^6.1.0",
"@ucanto/transport-legacy": "npm:@ucanto/transport@^5.1.1",
"@ucanto/transport": "^7.0.2",
"@web-std/fetch": "^4.1.0",
"@web3-storage/access": "^10.0.0",
"@web3-storage/capabilities": "^3.2.0",
Expand Down
2 changes: 1 addition & 1 deletion upload-api/test/access-verifier.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import * as API from '@web3-storage/upload-api'
import { Space } from '@web3-storage/capabilities'
import * as Server from '@ucanto/server'
import * as Client from '@ucanto/client'
import { CAR, CBOR } from '@ucanto/transport'
import { CAR, CBOR } from '@ucanto/transport-legacy'
import { Failure } from '@ucanto/server'

/**
Expand Down
2 changes: 1 addition & 1 deletion upload-api/test/helpers/resources.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { S3Client, CreateBucketCommand } from '@aws-sdk/client-s3'
import { DynamoDBClient, CreateTableCommand } from '@aws-sdk/client-dynamodb'
import * as Signer from '@ucanto/principal/ed25519'
import * as Server from '@ucanto/server'
import { CAR, CBOR } from '@ucanto/transport'
import { CAR, CBOR } from '@ucanto/transport-legacy'
import * as HTTP from 'http'

/**
Expand Down
67 changes: 66 additions & 1 deletion upload-api/test/helpers/ucan.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import * as ucanto from '@ucanto/core'
import { invoke, Receipt } from '@ucanto/core-next'
import * as Signer from '@ucanto/principal/ed25519'
import * as UcantoClient from '@ucanto/client'
import * as CBOR from '@ucanto/transport/cbor'
import * as CBOR from '@ucanto/transport-legacy/cbor'

/**
* @param {import('@ucanto/interface').Principal} audience
Expand Down Expand Up @@ -79,3 +80,67 @@ export async function createUcanInvocation (can, nb, options = {}) {
proofs,
})
}

/**
* @param {import('@ucanto/interface').Ability} can
* @param {any} nb
* @param {object} [options]
* @param {Signer.EdSigner} [options.audience]
* @param {Signer.EdSigner} [options.issuer]
* @param {`did:key:${string}`} [options.withDid]
* @param {Signer.Delegation[]} [options.proofs]
*/
export async function createAgentMessageInvocation (can, nb, options = {}) {
const audience = options.audience || await Signer.generate()
const issuer = options.issuer || await Signer.generate()

let proofs
let withDid
if (!options.withDid || !options.proofs) {
const { proof, spaceDid } = await createSpace(issuer)

proofs = [proof]
withDid = spaceDid
} else {
proofs = options.proofs
withDid = options.withDid
}

const invocation = invoke({
issuer,
audience,
capability: {
can,
with: withDid,
nb,
},
// @ts-expect-error old client still in use
proofs,
})

return invocation
}

/**
* @param {import('@ucanto/core-next').API.IssuedInvocation} run
* @param {object} options
* @param {any} [options.result]
* @param {any} [options.meta]
*/
export async function createAgentMessageReceipt (run, {
result = { ok: {} },
meta = { test: 'metadata' },
}) {
const delegation = await run.buildIPLDView()

return await Receipt.issue({
// @ts-ignore
issuer: run.audience,
result,
ran: delegation.link(),
meta,
fx: {
fork: [],
},
})
}
Loading

0 comments on commit 0f11106

Please sign in to comment.