Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: updates agent-store api to unblock integration with w3infra #1479

Merged
merged 4 commits into from
May 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 23 additions & 6 deletions packages/upload-api/src/lib.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,23 @@ import { createService as createW3sService } from './service.js'
import { createService as createPlanService } from './plan.js'
import { createService as createUsageService } from './usage.js'
import { createService as createFilecoinService } from '@web3-storage/filecoin-api/storefront/service'
import * as AgentMessage from './utils/agent-message.js'

export * from './types.js'
export { AgentMessage }

/**
* @param {Omit<Types.UcantoServerContext, 'validateAuthorization'>} options
* @returns {Agent<Types.Service>}
*/
export const createServer = ({ codec = Legacy.inbound, ...context }) => {
export const createServer = ({ codec = Legacy.inbound, ...options }) => {
const context = {
...options,
...createRevocationChecker(options),
}

const server = Server.create({
...createRevocationChecker(context),
id: context.id,
...context,
codec,
service: createService(context),
catch: (error) => context.errorReporter.catch(error),
Expand Down Expand Up @@ -69,6 +75,7 @@ export const createServer = ({ codec = Legacy.inbound, ...context }) => {
* @template {Types.Tuple<Types.ServiceInvocation<Types.Capability, S>>} I
* @param {Agent<S>} agent
* @param {Types.HTTPRequest<Types.AgentMessage<{ In: Types.InferInvocations<I>, Out: Types.Tuple<Types.Receipt> }>>} request
* @returns {Promise<Types.HTTPResponse<Types.AgentMessage<{ Out: Types.InferReceipts<I, S>, In: Types.Tuple<Types.Invocation> }>>>}
*/
export const handle = async (agent, request) => {
const selection = agent.codec.accept(request)
Expand All @@ -86,7 +93,12 @@ export const handle = async (agent, request) => {
// Save invocation inside agent store so we can find it later. If we fail
// to save it we return 500 as we do not want to run the invocation that
// we are unable to service.
const save = await agent.context.agentStore.messages.write(input)
const save = await agent.context.agentStore.messages.write({
data: input,
source: request,
index: AgentMessage.index(input),
})

if (save.error) {
return {
status: 500,
Expand All @@ -96,8 +108,14 @@ export const handle = async (agent, request) => {
}

const output = await execute(agent, input)
const response = await encoder.encode(output)

const { error } = await agent.context.agentStore.messages.write({
data: output,
source: response,
index: AgentMessage.index(output),
})

const { error } = await agent.context.agentStore.messages.write(output)
// Failure to write a receipt is not something we can recover from. Throwing
// or returning HTTP 500 is also a not a great option because invocation may
// have change state and we would not want to rerun it. Which is why we
Expand All @@ -106,7 +124,6 @@ export const handle = async (agent, request) => {
agent.catch(error)
}

const response = await encoder.encode(output)
return response
}
}
Expand Down
44 changes: 40 additions & 4 deletions packages/upload-api/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type {
Failure,
ServiceMethod,
UCANLink,
Link,
HandlerExecutionError,
Signer,
DID,
Expand All @@ -20,6 +21,13 @@ import type {
AgentMessage,
Invocation,
Receipt,
AgentMessageModel,
UCAN,
Capability,
ReceiptModel,
Variant,
HTTPRequest,
HTTPResponse,
} from '@ucanto/interface'
import type { ProviderInput, ConnectionView } from '@ucanto/server'

Expand Down Expand Up @@ -189,8 +197,7 @@ import { SubscriptionsStorage } from './types/subscriptions.js'
export type { SubscriptionsStorage }
import { UsageStorage } from './types/usage.js'
export type { UsageStorage }
import { StorageGetError, TasksScheduler } from './types/service.js'
export type { TasksScheduler }
import { StorageGetError } from './types/storage.js'
import { AllocationsStorage, BlobsStorage, BlobAddInput } from './types/blob.js'
export type { AllocationsStorage, BlobsStorage, BlobAddInput }
import { IPNIService, IndexServiceContext } from './types/index.js'
Expand All @@ -199,7 +206,7 @@ export type {
IPNIService,
BlobRetriever,
BlobNotFound,
ShardedDAGIndex
ShardedDAGIndex,
} from './types/index.js'

export interface Service extends StorefrontService, W3sService {
Expand Down Expand Up @@ -495,11 +502,40 @@ export interface AgentContext {
* {@link Invocation} and {@link Receipt} lookups.
*/
export interface AgentStore {
messages: Writer<AgentMessage>
messages: Writer<ParsedAgentMessage>
invocations: Accessor<UnknownLink, Invocation>
receipts: Accessor<UnknownLink, Receipt>
}

export type TaskLink = Link

export type InvocationLink = Link<UCAN.UCAN<[Capability]>>
export type ReceiptLink = Link<ReceiptModel>
export type AgentMessageLink = Link<AgentMessageModel<unknown>>

export interface ParsedAgentMessage {
source: HTTPRequest | HTTPResponse
data: AgentMessage
index: Iterable<AgentMessageIndexRecord>
}

export interface InvocationSource {
task: TaskLink
invocation: Invocation
message: AgentMessageLink
}

export interface ReceiptSource {
task: TaskLink
receipt: Receipt
message: AgentMessageLink
}

export type AgentMessageIndexRecord = Variant<{
invocation: InvocationSource
receipt: ReceiptSource
}>

/**
* Read interface for the key value store.
*/
Expand Down
17 changes: 0 additions & 17 deletions packages/upload-api/src/types/service.ts

This file was deleted.

31 changes: 31 additions & 0 deletions packages/upload-api/src/utils/agent-message.js
Original file line number Diff line number Diff line change
Expand Up @@ -118,3 +118,34 @@ export class Iterator {
export function* iterate(message, options) {
yield* new Iterator(message, options)
}

/**
* @param {API.AgentMessage} message
* @returns {Iterable<API.AgentMessageIndexRecord>}
*/
export const index = function* (message) {
const source = message.root.cid
for (const { receipt, invocation } of iterate(message)) {
if (invocation) {
// TODO: actually derive task CID
const task = invocation.link()
yield {
invocation: {
task,
invocation,
message: source,
},
}
}

if (receipt) {
yield {
receipt: {
task: receipt.ran.link(),
receipt,
message: source,
},
}
}
}
}
4 changes: 3 additions & 1 deletion packages/upload-api/test/handlers/blob.js
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,9 @@ export const test = {
}

assert.ok(
String(accept.ok.out.error).match(/Blob not found/),
/** @type {{message:string}} */ (accept.ok.out.error).message.match(
/Blob not found/
),
'accept was not successful'
)
},
Expand Down
16 changes: 10 additions & 6 deletions packages/upload-api/test/handlers/ucan.js
Original file line number Diff line number Diff line change
Expand Up @@ -403,17 +403,21 @@ export const test = {
assert.ok(conclude.out.ok)
assert.ok(conclude.out.ok?.time)

assert.deepEqual(
await context.agentStore.invocations.get(invocation.link()),
{ ok: invocation }
)
const stored = await context.agentStore.invocations.get(invocation.link())
assert.equal(stored.ok?.link().toString(), invocation.link().toString())

const storedReceipt = await context.agentStore.receipts.get(
invocation.link()
)
assert.ok(storedReceipt.ok)
assert.deepEqual(storedReceipt.ok?.link(), receipt.link())
assert.deepEqual(storedReceipt.ok?.ran, invocation)
assert.deepEqual(
storedReceipt.ok?.link().toString(),
receipt.link().toString()
)
assert.deepEqual(
storedReceipt.ok?.ran.link().toString(),
invocation.link().toString()
)
},
'ucan/conclude schedules web3.storage/blob/accept if invoked with the http/put receipt':
async (assert, context) => {
Expand Down
42 changes: 25 additions & 17 deletions packages/upload-api/test/storage/agent-store-tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import { Console } from '@web3-storage/capabilities'

import { alice, registerSpace } from '../util.js'
import { Message, Receipt } from '@ucanto/core'
import * as CAR from '@ucanto/transport/car'
import { createConcludeInvocation } from '../../src/ucan/conclude.js'
import * as AgentMessage from '../../src/utils/agent-message.js'

/**
* @type {API.Tests}
Expand Down Expand Up @@ -109,7 +111,7 @@ export const test = {
assert.ok(hiReceipt.out.ok)

const storedHi = await context.agentStore.invocations.get(hi.link())
assert.deepEqual(storedHi.ok?.link(), hi.link())
assert.deepEqual(storedHi.ok?.link().toString(), hi.link().toString())

const storedHiReceipt = await context.agentStore.receipts.get(hi.link())
assert.equal(
Expand All @@ -119,11 +121,11 @@ export const test = {

const [byeReceipt, hiReceipt2] = await context.connection.execute(bye, hi)

assert.deepEqual(hiReceipt2.ran.link(), hi.link())
assert.deepEqual(hiReceipt2.ran.link().toString(), hi.link().toString())
assert.ok(byeReceipt.out.ok)

const storedBye = await context.agentStore.invocations.get(bye.link())
assert.deepEqual(storedBye.ok?.link(), bye.link())
assert.deepEqual(storedBye.ok?.link().toString(), bye.link().toString())

const storedByeReceipt = await context.agentStore.receipts.get(bye.link())
assert.equal(
Expand Down Expand Up @@ -160,27 +162,29 @@ export const test = {
receipts: [receipt],
})

const result = await context.agentStore.messages.write(message)
const result = await context.agentStore.messages.write({
data: message,
source: CAR.request.encode(message),
index: AgentMessage.index(message),
})
assert.ok(result.ok)

const storedReceipt = await context.agentStore.receipts.get(
receipt.ran.link()
)
assert.deepEqual(
storedReceipt.ok?.link(),
receipt.link(),
storedReceipt.ok?.link().toString(),
receipt.link().toString(),
'receipt was stored and indexed by invocation'
)

const storedInvocation = await context.agentStore.invocations.get(
receipt.ran.link()
)

console.log(storedInvocation)

assert.deepEqual(
storedInvocation.ok?.link(),
hi.link(),
storedInvocation.ok?.link().toString(),
hi.link().toString(),
'invocation was stored and indexed by invocation'
)
},
Expand Down Expand Up @@ -213,15 +217,19 @@ export const test = {
invocations: [conclude],
})

const result = await context.agentStore.messages.write(message)
const result = await context.agentStore.messages.write({
data: message,
source: CAR.request.encode(message),
index: AgentMessage.index(message),
})
assert.ok(result.ok)

const storedReceipt = await context.agentStore.receipts.get(
receipt.ran.link()
)
assert.deepEqual(
storedReceipt.ok?.link(),
receipt.link(),
storedReceipt.ok?.link().toString(),
receipt.link().toString(),
'receipt was stored and indexed by invocation'
)

Expand All @@ -230,8 +238,8 @@ export const test = {
)

assert.deepEqual(
storedInvocation.ok?.link(),
hi.link(),
storedInvocation.ok?.link().toString(),
hi.link().toString(),
'invocation was stored and indexed by invocation'
)

Expand All @@ -240,8 +248,8 @@ export const test = {
)

assert.deepEqual(
storedConclude.ok?.link(),
conclude.link(),
storedConclude.ok?.link().toString(),
conclude.link().toString(),
'store conclude invocation was stored and indexed by invocation'
)
},
Expand Down
Loading
Loading