From 2998a938628a924361450d24c5fc7be572acef3e Mon Sep 17 00:00:00 2001 From: Irakli Gozalishvili Date: Thu, 30 May 2024 03:20:13 -0700 Subject: [PATCH] feat!: updates agent-store api to unblock integration with w3infra (#1479) While working on https://github.com/w3s-project/w3infra/pull/380 I have realized that agent-store API was poorly designed because it imposed: 1. Store to traverse agent message in order to index each invocations and receipts within it and code that dealt with traversal was trapped in util module here that was not even exported. 2. Store has to encode message into bytes in order to persist it, which is redundant given that we received message in encoded form. This PR fixes above limitations by switching store interface from receiving `AgentMessage` to `ParsedAgentMessage` which wraps `AgentMessage` along with message bytes and index freeing store from doing any kind of traversal or encoding. I also removed legacy code that was left behind by previous PR. --- packages/upload-api/src/lib.js | 29 ++++- packages/upload-api/src/types.ts | 44 ++++++- packages/upload-api/src/types/service.ts | 17 --- .../upload-api/src/utils/agent-message.js | 31 +++++ packages/upload-api/test/handlers/blob.js | 4 +- packages/upload-api/test/handlers/ucan.js | 16 ++- .../test/storage/agent-store-tests.js | 42 ++++--- .../upload-api/test/storage/agent-store.js | 113 ++++++++++++------ .../test/storage/receipts-storage.js | 64 ---------- .../upload-api/test/storage/tasks-storage.js | 72 ----------- 10 files changed, 207 insertions(+), 225 deletions(-) delete mode 100644 packages/upload-api/src/types/service.ts delete mode 100644 packages/upload-api/test/storage/receipts-storage.js delete mode 100644 packages/upload-api/test/storage/tasks-storage.js diff --git a/packages/upload-api/src/lib.js b/packages/upload-api/src/lib.js index 615ce690d..14d90884e 100644 --- a/packages/upload-api/src/lib.js +++ b/packages/upload-api/src/lib.js @@ -24,17 +24,23 @@ import { createService as createW3sService } from './service.js' import { createService as createPlanService } from './plan.js' import { createService as createUsageService } from './usage.js' import { createService as createFilecoinService } from '@web3-storage/filecoin-api/storefront/service' +import * as AgentMessage from './utils/agent-message.js' export * from './types.js' +export { AgentMessage } /** * @param {Omit} options * @returns {Agent} */ -export const createServer = ({ codec = Legacy.inbound, ...context }) => { +export const createServer = ({ codec = Legacy.inbound, ...options }) => { + const context = { + ...options, + ...createRevocationChecker(options), + } + const server = Server.create({ - ...createRevocationChecker(context), - id: context.id, + ...context, codec, service: createService(context), catch: (error) => context.errorReporter.catch(error), @@ -69,6 +75,7 @@ export const createServer = ({ codec = Legacy.inbound, ...context }) => { * @template {Types.Tuple>} I * @param {Agent} agent * @param {Types.HTTPRequest, Out: Types.Tuple }>>} request + * @returns {Promise, In: Types.Tuple }>>>} */ export const handle = async (agent, request) => { const selection = agent.codec.accept(request) @@ -86,7 +93,12 @@ export const handle = async (agent, request) => { // Save invocation inside agent store so we can find it later. If we fail // to save it we return 500 as we do not want to run the invocation that // we are unable to service. - const save = await agent.context.agentStore.messages.write(input) + const save = await agent.context.agentStore.messages.write({ + data: input, + source: request, + index: AgentMessage.index(input), + }) + if (save.error) { return { status: 500, @@ -96,8 +108,14 @@ export const handle = async (agent, request) => { } const output = await execute(agent, input) + const response = await encoder.encode(output) + + const { error } = await agent.context.agentStore.messages.write({ + data: output, + source: response, + index: AgentMessage.index(output), + }) - const { error } = await agent.context.agentStore.messages.write(output) // Failure to write a receipt is not something we can recover from. Throwing // or returning HTTP 500 is also a not a great option because invocation may // have change state and we would not want to rerun it. Which is why we @@ -106,7 +124,6 @@ export const handle = async (agent, request) => { agent.catch(error) } - const response = await encoder.encode(output) return response } } diff --git a/packages/upload-api/src/types.ts b/packages/upload-api/src/types.ts index e785e88bd..8f4e0e288 100644 --- a/packages/upload-api/src/types.ts +++ b/packages/upload-api/src/types.ts @@ -2,6 +2,7 @@ import type { Failure, ServiceMethod, UCANLink, + Link, HandlerExecutionError, Signer, DID, @@ -20,6 +21,13 @@ import type { AgentMessage, Invocation, Receipt, + AgentMessageModel, + UCAN, + Capability, + ReceiptModel, + Variant, + HTTPRequest, + HTTPResponse, } from '@ucanto/interface' import type { ProviderInput, ConnectionView } from '@ucanto/server' @@ -189,8 +197,7 @@ import { SubscriptionsStorage } from './types/subscriptions.js' export type { SubscriptionsStorage } import { UsageStorage } from './types/usage.js' export type { UsageStorage } -import { StorageGetError, TasksScheduler } from './types/service.js' -export type { TasksScheduler } +import { StorageGetError } from './types/storage.js' import { AllocationsStorage, BlobsStorage, BlobAddInput } from './types/blob.js' export type { AllocationsStorage, BlobsStorage, BlobAddInput } import { IPNIService, IndexServiceContext } from './types/index.js' @@ -199,7 +206,7 @@ export type { IPNIService, BlobRetriever, BlobNotFound, - ShardedDAGIndex + ShardedDAGIndex, } from './types/index.js' export interface Service extends StorefrontService, W3sService { @@ -495,11 +502,40 @@ export interface AgentContext { * {@link Invocation} and {@link Receipt} lookups. */ export interface AgentStore { - messages: Writer + messages: Writer invocations: Accessor receipts: Accessor } +export type TaskLink = Link + +export type InvocationLink = Link> +export type ReceiptLink = Link +export type AgentMessageLink = Link> + +export interface ParsedAgentMessage { + source: HTTPRequest | HTTPResponse + data: AgentMessage + index: Iterable +} + +export interface InvocationSource { + task: TaskLink + invocation: Invocation + message: AgentMessageLink +} + +export interface ReceiptSource { + task: TaskLink + receipt: Receipt + message: AgentMessageLink +} + +export type AgentMessageIndexRecord = Variant<{ + invocation: InvocationSource + receipt: ReceiptSource +}> + /** * Read interface for the key value store. */ diff --git a/packages/upload-api/src/types/service.ts b/packages/upload-api/src/types/service.ts deleted file mode 100644 index e0609464d..000000000 --- a/packages/upload-api/src/types/service.ts +++ /dev/null @@ -1,17 +0,0 @@ -import type { - UnknownLink, - Receipt, - Result, - Unit, - Failure, - ServiceInvocation, -} from '@ucanto/interface' -import type { Storage, StorageGetError, StoragePutError } from './storage.js' - -export type { StorageGetError, StoragePutError } - -// eslint-disable-next-line @typescript-eslint/no-explicit-any -export type ReceiptsStorage = Storage> -export interface TasksScheduler { - schedule: (invocation: ServiceInvocation) => Promise> -} diff --git a/packages/upload-api/src/utils/agent-message.js b/packages/upload-api/src/utils/agent-message.js index f7685839f..17906b080 100644 --- a/packages/upload-api/src/utils/agent-message.js +++ b/packages/upload-api/src/utils/agent-message.js @@ -118,3 +118,34 @@ export class Iterator { export function* iterate(message, options) { yield* new Iterator(message, options) } + +/** + * @param {API.AgentMessage} message + * @returns {Iterable} + */ +export const index = function* (message) { + const source = message.root.cid + for (const { receipt, invocation } of iterate(message)) { + if (invocation) { + // TODO: actually derive task CID + const task = invocation.link() + yield { + invocation: { + task, + invocation, + message: source, + }, + } + } + + if (receipt) { + yield { + receipt: { + task: receipt.ran.link(), + receipt, + message: source, + }, + } + } + } +} diff --git a/packages/upload-api/test/handlers/blob.js b/packages/upload-api/test/handlers/blob.js index b78098403..33da67d0a 100644 --- a/packages/upload-api/test/handlers/blob.js +++ b/packages/upload-api/test/handlers/blob.js @@ -409,7 +409,9 @@ export const test = { } assert.ok( - String(accept.ok.out.error).match(/Blob not found/), + /** @type {{message:string}} */ (accept.ok.out.error).message.match( + /Blob not found/ + ), 'accept was not successful' ) }, diff --git a/packages/upload-api/test/handlers/ucan.js b/packages/upload-api/test/handlers/ucan.js index cc47b5665..d0e952839 100644 --- a/packages/upload-api/test/handlers/ucan.js +++ b/packages/upload-api/test/handlers/ucan.js @@ -403,17 +403,21 @@ export const test = { assert.ok(conclude.out.ok) assert.ok(conclude.out.ok?.time) - assert.deepEqual( - await context.agentStore.invocations.get(invocation.link()), - { ok: invocation } - ) + const stored = await context.agentStore.invocations.get(invocation.link()) + assert.equal(stored.ok?.link().toString(), invocation.link().toString()) const storedReceipt = await context.agentStore.receipts.get( invocation.link() ) assert.ok(storedReceipt.ok) - assert.deepEqual(storedReceipt.ok?.link(), receipt.link()) - assert.deepEqual(storedReceipt.ok?.ran, invocation) + assert.deepEqual( + storedReceipt.ok?.link().toString(), + receipt.link().toString() + ) + assert.deepEqual( + storedReceipt.ok?.ran.link().toString(), + invocation.link().toString() + ) }, 'ucan/conclude schedules web3.storage/blob/accept if invoked with the http/put receipt': async (assert, context) => { diff --git a/packages/upload-api/test/storage/agent-store-tests.js b/packages/upload-api/test/storage/agent-store-tests.js index f3aeafc32..0dda643e9 100644 --- a/packages/upload-api/test/storage/agent-store-tests.js +++ b/packages/upload-api/test/storage/agent-store-tests.js @@ -6,7 +6,9 @@ import { Console } from '@web3-storage/capabilities' import { alice, registerSpace } from '../util.js' import { Message, Receipt } from '@ucanto/core' +import * as CAR from '@ucanto/transport/car' import { createConcludeInvocation } from '../../src/ucan/conclude.js' +import * as AgentMessage from '../../src/utils/agent-message.js' /** * @type {API.Tests} @@ -109,7 +111,7 @@ export const test = { assert.ok(hiReceipt.out.ok) const storedHi = await context.agentStore.invocations.get(hi.link()) - assert.deepEqual(storedHi.ok?.link(), hi.link()) + assert.deepEqual(storedHi.ok?.link().toString(), hi.link().toString()) const storedHiReceipt = await context.agentStore.receipts.get(hi.link()) assert.equal( @@ -119,11 +121,11 @@ export const test = { const [byeReceipt, hiReceipt2] = await context.connection.execute(bye, hi) - assert.deepEqual(hiReceipt2.ran.link(), hi.link()) + assert.deepEqual(hiReceipt2.ran.link().toString(), hi.link().toString()) assert.ok(byeReceipt.out.ok) const storedBye = await context.agentStore.invocations.get(bye.link()) - assert.deepEqual(storedBye.ok?.link(), bye.link()) + assert.deepEqual(storedBye.ok?.link().toString(), bye.link().toString()) const storedByeReceipt = await context.agentStore.receipts.get(bye.link()) assert.equal( @@ -160,15 +162,19 @@ export const test = { receipts: [receipt], }) - const result = await context.agentStore.messages.write(message) + const result = await context.agentStore.messages.write({ + data: message, + source: CAR.request.encode(message), + index: AgentMessage.index(message), + }) assert.ok(result.ok) const storedReceipt = await context.agentStore.receipts.get( receipt.ran.link() ) assert.deepEqual( - storedReceipt.ok?.link(), - receipt.link(), + storedReceipt.ok?.link().toString(), + receipt.link().toString(), 'receipt was stored and indexed by invocation' ) @@ -176,11 +182,9 @@ export const test = { receipt.ran.link() ) - console.log(storedInvocation) - assert.deepEqual( - storedInvocation.ok?.link(), - hi.link(), + storedInvocation.ok?.link().toString(), + hi.link().toString(), 'invocation was stored and indexed by invocation' ) }, @@ -213,15 +217,19 @@ export const test = { invocations: [conclude], }) - const result = await context.agentStore.messages.write(message) + const result = await context.agentStore.messages.write({ + data: message, + source: CAR.request.encode(message), + index: AgentMessage.index(message), + }) assert.ok(result.ok) const storedReceipt = await context.agentStore.receipts.get( receipt.ran.link() ) assert.deepEqual( - storedReceipt.ok?.link(), - receipt.link(), + storedReceipt.ok?.link().toString(), + receipt.link().toString(), 'receipt was stored and indexed by invocation' ) @@ -230,8 +238,8 @@ export const test = { ) assert.deepEqual( - storedInvocation.ok?.link(), - hi.link(), + storedInvocation.ok?.link().toString(), + hi.link().toString(), 'invocation was stored and indexed by invocation' ) @@ -240,8 +248,8 @@ export const test = { ) assert.deepEqual( - storedConclude.ok?.link(), - conclude.link(), + storedConclude.ok?.link().toString(), + conclude.link().toString(), 'store conclude invocation was stored and indexed by invocation' ) }, diff --git a/packages/upload-api/test/storage/agent-store.js b/packages/upload-api/test/storage/agent-store.js index 9e5b146ae..42ed60137 100644 --- a/packages/upload-api/test/storage/agent-store.js +++ b/packages/upload-api/test/storage/agent-store.js @@ -1,68 +1,105 @@ import * as API from '../../src/types.js' -import { TasksStorage } from './tasks-storage.js' -import { ReceiptsStorage } from './receipts-storage.js' -import * as AgentMessage from '../../src/utils/agent-message.js' +import { CAR, Invocation, Receipt } from '@ucanto/core' +import { RecordNotFound } from '../../src/errors.js' export const memory = () => new AgentStore() /** + * @typedef {object} Model + * @property {Record} store + * @property {Record} index + * * @implements {API.AgentStore} */ class AgentStore { - constructor() { - this.invocations = new TasksStorage() - this.receipts = new ReceiptsStorage() + /** + * @param {Partial} [model] + */ + constructor({ + store = Object.create(null), + index = Object.create(null), + } = {}) { + const model = { store, index } + this.model = model + + this.invocations = new InvocationLookup(model) + this.receipts = new ReceiptLookup(model) } get messages() { return this } /** - * @param {API.AgentMessage} message - * @returns {Promise>>} + * @param {API.ParsedAgentMessage} message + * @returns {Promise>>} */ async write(message) { - const promises = [] - for (const { invocation, receipt } of AgentMessage.iterate(message)) { + const { index, store } = this.model + const at = message.data.root.cid.toString() + store[at] = CAR.decode(/** @type {Uint8Array} */ (message.source.body)) + + for (const { invocation, receipt } of message.index) { if (invocation) { - promises.push(this.invocations.put(invocation)) + let entry = index[`/${invocation.task.toString()}/invocation/`] ?? [] + entry.push({ root: invocation.invocation.link(), at }) + index[`/${invocation.task.toString()}/invocation/`] = entry } + if (receipt) { - promises.push(this.receipts.put(receipt)) + let entry = index[`/${receipt.task.toString()}/receipt/`] ?? [] + entry.push({ root: receipt.receipt.link(), at }) + index[`/${receipt.task.toString()}/receipt/`] = entry } } - const results = await Promise.all(promises) - const failure = results.find((result) => result.error) + return { ok: {} } + } +} - return failure?.error - ? { - error: new WriteError({ - payload: message, - writer: this, - cause: failure.error, - }), - } - : { ok: {} } +class InvocationLookup { + /** + * @param {Model} model + */ + constructor(model) { + this.model = model + } + /** + * + * @param {API.UnknownLink} key + * @returns {Promise>} + */ + async get(key) { + const { index, store } = this.model + const record = index[`/${key.toString()}/invocation/`]?.[0] + const archive = record ? store[record.at] : null + const value = archive + ? Invocation.view({ root: record.root, blocks: archive.blocks }, null) + : null + + return value ? { ok: value } : { error: new RecordNotFound() } } } -/** - * @template T - * @implements {API.WriteError} - */ -class WriteError extends Error { - name = /** @type {const} */ ('WriteError') +class ReceiptLookup { /** - * @param {object} input - * @param {Error} input.cause - * @param {T} input.payload - * @param {API.Writer} input.writer + * @param {Model} model */ - constructor({ cause, payload, writer }) { - super(`Write to store has failed: ${cause}`) - this.cause = cause - this.payload = payload - this.writer = writer + constructor(model) { + this.model = model + } + /** + * + * @param {API.UnknownLink} key + * @returns {Promise>} + */ + async get(key) { + const { index, store } = this.model + const record = index[`/${key.toString()}/receipt/`]?.[0] + const archive = record ? store[record.at] : null + const value = archive + ? Receipt.view({ root: record.root, blocks: archive.blocks }, null) + : null + + return value ? { ok: value } : { error: new RecordNotFound() } } } diff --git a/packages/upload-api/test/storage/receipts-storage.js b/packages/upload-api/test/storage/receipts-storage.js deleted file mode 100644 index 3bcc5f6fe..000000000 --- a/packages/upload-api/test/storage/receipts-storage.js +++ /dev/null @@ -1,64 +0,0 @@ -import * as API from '../../src/types.js' - -import { RecordNotFound } from '../../src/errors.js' - -/** - * @typedef {import('@web3-storage/capabilities/types').StorageGetError} StorageGetError - * @typedef {import('@web3-storage/capabilities/types').StoragePutError} StoragePutError - * @typedef {import('@ucanto/interface').UnknownLink} UnknownLink - * @typedef {import('@ucanto/interface').Receipt} Receipt - */ - -/** - * @implements {API.Accessor} - */ -export class ReceiptsStorage { - constructor() { - /** @type {Map} */ - this.items = new Map() - } - - /** - * @param {Receipt} record - * @returns {Promise>} - */ - async put(record) { - this.items.set(record.ran.link().toString(), record) - - return Promise.resolve({ - ok: {}, - }) - } - - /** - * @param {UnknownLink} link - * @returns {Promise>} - */ - async get(link) { - const record = this.items.get(link.toString()) - if (!record) { - return { - error: new RecordNotFound('not found'), - } - } - return { - ok: record, - } - } - - /** - * @param {UnknownLink} link - * @returns {Promise>} - */ - async has(link) { - const record = this.items.get(link.toString()) - if (!record) { - return { - ok: false, - } - } - return { - ok: Boolean(record), - } - } -} diff --git a/packages/upload-api/test/storage/tasks-storage.js b/packages/upload-api/test/storage/tasks-storage.js deleted file mode 100644 index 82efa82a3..000000000 --- a/packages/upload-api/test/storage/tasks-storage.js +++ /dev/null @@ -1,72 +0,0 @@ -import * as API from '../../src/types.js' - -import { RecordNotFound } from '../../src/errors.js' - -/** - * @typedef {import('@web3-storage/capabilities/types').StorageGetError} StorageGetError - * @typedef {import('@web3-storage/capabilities/types').StoragePutError} StoragePutError - * @typedef {import('@ucanto/interface').UnknownLink} UnknownLink - * @typedef {import('@ucanto/interface').Invocation} Invocation - */ - -/** - * @implements {API.Accessor} - */ -export class TasksStorage { - constructor() { - /** @type {Map} */ - this.items = new Map() - } - - /** - * @param {Invocation} record - * @returns {Promise>} - */ - async put(record) { - this.items.set(record.cid.toString(), record) - - // TODO: store implementation - // const archiveDelegationRes = await task.archive() - // if (archiveDelegationRes.error) { - // return { - // error: archiveDelegationRes.error - // } - // } - - return Promise.resolve({ - ok: {}, - }) - } - - /** - * @param {UnknownLink} link - * @returns {Promise>} - */ - async get(link) { - const record = this.items.get(link.toString()) - if (!record) { - return { - error: new RecordNotFound(), - } - } - return { - ok: record, - } - } - - /** - * @param {UnknownLink} link - * @returns {Promise>} - */ - async has(link) { - const record = this.items.get(link.toString()) - if (!record) { - return { - ok: false, - } - } - return { - ok: Boolean(record), - } - } -}