From 421bacb9bac8c251cb41f887144e953feaa5558f Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Tue, 26 Mar 2024 11:51:25 +0000 Subject: [PATCH] feat: api waits for trigger filecoin pipeline from the client (#1332) This PR hooks up possibility to compute Piece CID when submit message is received. For extra context, today w3infra sets `skipFilecoinSubmitQueue` on `filecoin/offer` from users, resulting `filecoin/offer` in a noop that simply links receipts for effects of `filecoin/submit` and `filecoin/accept`. This is due to the API to not care about what was submit by client as it is calculated in bucket event anyway. However, we want to change this behaviour to have filecoin pipeline only triggered on `filecoin/offer`, putting it in the submit queue where validation will now happen. We can summarise these changes in two units: - `handleFilecoinSubmitMessage` now reads data from store, computes piece CID for the bytes and checks equality with provided Piece (i.e. bucket event code https://github.com/web3-storage/w3infra/blob/main/filecoin/index.js#L33 now lives in the handler - `handlePieceInsertToEquivalencyClaim` now exposed in storefront events. This behaviour was [also triggered from bucket event](https://github.com/web3-storage/w3infra/blob/main/filecoin/index.js#L107) by the service. Now we make it a side effect of insertion to piece table, which happens on `handleFilecoinSubmitMessage` Rollout plan: - w3infra swaps `skipFilecoinSubmitQueue` to False when releasing with this. For the old bucket we invoke `filecoin/offer` from bucket event, given current client does not do it Follow ups: - [ ] `filecoin/offer` from service on old bucket event - [ ] hook up finding where data is with datastore - [ ] clean up `skipFilecoinSubmitQueue` option --- packages/filecoin-api/package.json | 2 + .../src/aggregator/buffer-reducing.js | 6 +- .../filecoin-api/src/aggregator/events.js | 2 +- packages/filecoin-api/src/errors.js | 33 ++++ packages/filecoin-api/src/storefront/api.ts | 38 +++- .../filecoin-api/src/storefront/events.js | 58 +++++- packages/filecoin-api/src/storefront/piece.js | 44 +++++ .../filecoin-api/src/storefront/service.js | 1 + packages/filecoin-api/src/types.ts | 11 ++ packages/filecoin-api/test/context/mocks.js | 4 + packages/filecoin-api/test/context/service.js | 8 + .../test/context/store-implementations.js | 25 ++- packages/filecoin-api/test/context/store.js | 47 +++++ packages/filecoin-api/test/context/types.ts | 4 + .../filecoin-api/test/events/aggregator.js | 178 +++++++++--------- .../filecoin-api/test/events/storefront.js | 62 ++++++ packages/filecoin-api/test/storefront.spec.js | 14 +- packages/filecoin-api/test/types.ts | 2 + packages/filecoin-api/test/utils.js | 1 + pnpm-lock.yaml | 45 +++++ 20 files changed, 492 insertions(+), 93 deletions(-) create mode 100644 packages/filecoin-api/src/storefront/piece.js diff --git a/packages/filecoin-api/package.json b/packages/filecoin-api/package.json index d8f329751..40a8a7738 100644 --- a/packages/filecoin-api/package.json +++ b/packages/filecoin-api/package.json @@ -159,7 +159,9 @@ "@ucanto/server": "^9.0.1", "@ucanto/transport": "^9.1.0", "@web3-storage/capabilities": "workspace:^", + "@web3-storage/content-claims": "^4.0.2", "@web3-storage/data-segment": "^4.0.0", + "fr32-sha2-256-trunc254-padded-binary-tree-multihash": "^3.3.0", "p-map": "^6.0.0" }, "devDependencies": { diff --git a/packages/filecoin-api/src/aggregator/buffer-reducing.js b/packages/filecoin-api/src/aggregator/buffer-reducing.js index 0bf74d9f9..ab947fbed 100644 --- a/packages/filecoin-api/src/aggregator/buffer-reducing.js +++ b/packages/filecoin-api/src/aggregator/buffer-reducing.js @@ -184,10 +184,12 @@ export function aggregatePieces(bufferedPieces, config) { const remainingBufferedPieces = [] // start by adding prepend buffered pieces if available - for (const bufferedPiece of (config.prependBufferedPieces || [])) { + for (const bufferedPiece of config.prependBufferedPieces || []) { const p = Piece.fromLink(bufferedPiece.piece) if (builder.estimate(p).error) { - throw new Error('aggregate builder is not able to create aggregates with only prepend buffered pieces') + throw new Error( + 'aggregate builder is not able to create aggregates with only prepend buffered pieces' + ) } builder.write(p) addedBufferedPieces.push(bufferedPiece) diff --git a/packages/filecoin-api/src/aggregator/events.js b/packages/filecoin-api/src/aggregator/events.js index 66b00c911..24e96be39 100644 --- a/packages/filecoin-api/src/aggregator/events.js +++ b/packages/filecoin-api/src/aggregator/events.js @@ -114,7 +114,7 @@ export const handleBufferQueueMessage = async (context, records) => { maxAggregateSize: context.config.maxAggregateSize, minAggregateSize: context.config.minAggregateSize, minUtilizationFactor: context.config.minUtilizationFactor, - prependBufferedPieces: context.config.prependBufferedPieces + prependBufferedPieces: context.config.prependBufferedPieces, }) // Store buffered pieces if not enough to do aggregate and re-queue them diff --git a/packages/filecoin-api/src/errors.js b/packages/filecoin-api/src/errors.js index 94f27277e..b47ba05e2 100644 --- a/packages/filecoin-api/src/errors.js +++ b/packages/filecoin-api/src/errors.js @@ -71,3 +71,36 @@ export class DecodeBlockOperationFailed extends Server.Failure { return DecodeBlockOperationErrorName } } + +export const BlobNotFoundErrorName = /** @type {const} */ ('BlobNotFound') +export class BlobNotFound extends Server.Failure { + get reason() { + return this.message + } + + get name() { + return BlobNotFoundErrorName + } +} + +export const ComputePieceErrorName = /** @type {const} */ ('ComputePieceFailed') +export class ComputePieceFailed extends Error { + get reason() { + return this.message + } + + get name() { + return ComputePieceErrorName + } +} + +export const UnexpectedPieceErrorName = /** @type {const} */ ('UnexpectedPiece') +export class UnexpectedPiece extends Error { + get reason() { + return this.message + } + + get name() { + return UnexpectedPieceErrorName + } +} diff --git a/packages/filecoin-api/src/storefront/api.ts b/packages/filecoin-api/src/storefront/api.ts index 11d876b0d..aa4b4d712 100644 --- a/packages/filecoin-api/src/storefront/api.ts +++ b/packages/filecoin-api/src/storefront/api.ts @@ -5,6 +5,9 @@ import type { Receipt, Invocation, Failure, + DID, + Proof, + ConnectionView, } from '@ucanto/interface' import { PieceLink } from '@web3-storage/data-segment' import { @@ -15,6 +18,7 @@ import { import { Store, UpdatableAndQueryableStore, + StreammableStore, Queue, ServiceConfig, } from '../types.js' @@ -26,6 +30,7 @@ export type PieceStore = UpdatableAndQueryableStore< > export type FilecoinSubmitQueue = Queue export type PieceOfferQueue = Queue +export type DataStore = StreammableStore export type TaskStore = Store export type ReceiptStore = Store @@ -76,7 +81,9 @@ export interface ServiceContext { } export interface FilecoinSubmitMessageContext - extends Pick {} + extends Pick { + dataStore: DataStore +} export interface PieceOfferMessageContext { /** @@ -92,6 +99,35 @@ export interface StorefrontClientContext { storefrontService: ServiceConfig } +export interface ClaimsInvocationConfig { + /** + * Signing authority that is issuing the UCAN invocation(s). + */ + issuer: Signer + /** + * The principal delegated to in the current UCAN. + */ + audience: Principal + /** + * The resource the invocation applies to. + */ + with: DID + /** + * Proof(s) the issuer has the capability to perform the action. + */ + proofs?: Proof[] +} + +export interface ClaimsClientContext { + /** + * Claims own connection to issue claims. + */ + claimsService: { + invocationConfig: ClaimsInvocationConfig + connection: ConnectionView + } +} + export interface CronContext extends Pick< ServiceContext, diff --git a/packages/filecoin-api/src/storefront/events.js b/packages/filecoin-api/src/storefront/events.js index 488320d18..85b1f5a0d 100644 --- a/packages/filecoin-api/src/storefront/events.js +++ b/packages/filecoin-api/src/storefront/events.js @@ -1,12 +1,16 @@ import pMap from 'p-map' import { Storefront, Aggregator } from '@web3-storage/filecoin-client' import * as AggregatorCaps from '@web3-storage/capabilities/filecoin/aggregator' +import { Assert } from '@web3-storage/content-claims/capability' +import { computePieceCid } from './piece.js' // eslint-disable-next-line no-unused-vars import * as API from '../types.js' import { RecordNotFoundErrorName, + BlobNotFound, StoreOperationFailed, + UnexpectedPiece, UnexpectedState, } from '../errors.js' @@ -34,7 +38,28 @@ export const handleFilecoinSubmitMessage = async (context, message) => { } } - // TODO: verify piece + // read and compute piece for content + // TODO: needs to be hooked with location claims + const contentStreamRes = await context.dataStore.stream(message.content) + if (contentStreamRes.error) { + return { error: new BlobNotFound(contentStreamRes.error.message) } + } + + const computedPieceCid = await computePieceCid(contentStreamRes.ok) + if (computedPieceCid.error) { + return computedPieceCid + } + + // check provided piece equals the one computed + if (!message.piece.equals(computedPieceCid.ok.piece.link)) { + return { + error: new UnexpectedPiece( + `provided piece ${message.piece.toString()} is not the same as computed ${ + computedPieceCid.ok.piece + }` + ), + } + } const putRes = await context.pieceStore.put({ piece: message.piece, @@ -95,6 +120,37 @@ export const handlePieceInsert = async (context, record) => { return { ok: {} } } +/** + * On piece inserted into store, invoke equivalency claim to enable reads. + * + * @param {import('./api.js').ClaimsClientContext} context + * @param {PieceRecord} record + */ +export const handlePieceInsertToEquivalencyClaim = async (context, record) => { + const claimResult = await Assert.equals + .invoke({ + issuer: context.claimsService.invocationConfig.issuer, + audience: context.claimsService.invocationConfig.audience, + with: context.claimsService.invocationConfig.with, + nb: { + content: record.content, + equals: record.piece, + }, + expiration: Infinity, + proofs: context.claimsService.invocationConfig.proofs, + }) + .execute(context.claimsService.connection) + if (claimResult.out.error) { + return { + error: claimResult.out.error, + } + } + + return { + ok: {}, + } +} + /** * @param {import('./api.js').StorefrontClientContext} context * @param {PieceRecord} record diff --git a/packages/filecoin-api/src/storefront/piece.js b/packages/filecoin-api/src/storefront/piece.js new file mode 100644 index 000000000..099c8d2a8 --- /dev/null +++ b/packages/filecoin-api/src/storefront/piece.js @@ -0,0 +1,44 @@ +import { Piece } from '@web3-storage/data-segment' +import * as Hasher from 'fr32-sha2-256-trunc254-padded-binary-tree-multihash' +import * as Digest from 'multiformats/hashes/digest' + +import { ComputePieceFailed } from '../errors.js' + +/** + * Compute PieceCid for provided async iterable. + * + * @param {AsyncIterable} stream + */ +export async function computePieceCid(stream) { + /** @type {import('../types.js').PieceLink} */ + let piece + try { + const hasher = Hasher.create() + for await (const chunk of stream) { + hasher.write(chunk) + } + + // ⚠️ Because digest size will dependen on the payload (padding) + // we have to determine number of bytes needed after we're done + // writing payload + const digest = new Uint8Array(hasher.multihashByteLength()) + hasher.digestInto(digest, 0, true) + + // There's no GC (yet) in WASM so you should free up + // memory manually once you're done. + hasher.free() + const multihashDigest = Digest.decode(digest) + // @ts-expect-error some properties from PieceDigest are not present in MultihashDigest + piece = Piece.fromDigest(multihashDigest) + } catch (/** @type {any} */ error) { + return { + error: new ComputePieceFailed(`failed to compute piece CID for bytes`, { + cause: error, + }), + } + } + + return { + ok: { piece }, + } +} diff --git a/packages/filecoin-api/src/storefront/service.js b/packages/filecoin-api/src/storefront/service.js index b4b5c3036..5afd10a95 100644 --- a/packages/filecoin-api/src/storefront/service.js +++ b/packages/filecoin-api/src/storefront/service.js @@ -21,6 +21,7 @@ export const filecoinOffer = async ({ capability }, context) => { const { piece, content } = capability.nb // Queue offer for filecoin submission + // We need to identify new client here... if (!context.options?.skipFilecoinSubmitQueue) { // dedupe const hasRes = await context.pieceStore.has({ piece }) diff --git a/packages/filecoin-api/src/types.ts b/packages/filecoin-api/src/types.ts index e15e12001..89297d600 100644 --- a/packages/filecoin-api/src/types.ts +++ b/packages/filecoin-api/src/types.ts @@ -52,6 +52,17 @@ export interface UpdatableStore extends Store { ) => Promise> } +export interface StreammableStore { + /** + * Puts a record in the store. + */ + put: (record: Rec) => Promise> + /** + * Gets a record from the store. + */ + stream: (key: RecKey) => Promise, StoreGetError>> +} + export interface QueryableStore extends Store { /** * Queries for record matching a given criterium. diff --git a/packages/filecoin-api/test/context/mocks.js b/packages/filecoin-api/test/context/mocks.js index f554c7d77..8246b3e48 100644 --- a/packages/filecoin-api/test/context/mocks.js +++ b/packages/filecoin-api/test/context/mocks.js @@ -10,6 +10,7 @@ const notImplemented = () => { * piece: Partial * aggregate: Partial * deal: Partial + * assert: Partial * }>} impl */ export function mockService(impl) { @@ -30,6 +31,9 @@ export function mockService(impl) { deal: { info: withCallParams(impl.deal?.info ?? notImplemented), }, + assert: { + equals: withCallParams(impl.assert?.equals ?? notImplemented) + } } } diff --git a/packages/filecoin-api/test/context/service.js b/packages/filecoin-api/test/context/service.js index e614af6da..03ede702b 100644 --- a/packages/filecoin-api/test/context/service.js +++ b/packages/filecoin-api/test/context/service.js @@ -2,6 +2,7 @@ import * as Client from '@ucanto/client' import * as Server from '@ucanto/server' import * as CAR from '@ucanto/transport/car' +import { Assert } from '@web3-storage/content-claims/capability' import * as StorefrontCaps from '@web3-storage/capabilities/filecoin/storefront' import * as AggregatorCaps from '@web3-storage/capabilities/filecoin/aggregator' import * as DealerCaps from '@web3-storage/capabilities/filecoin/dealer' @@ -215,6 +216,13 @@ export function getMockService() { }, }), }, + assert: { + equals: Server.provide(Assert.equals, async ({ capability, invocation }) => { + return { + ok: {} + } + }) + } }) } diff --git a/packages/filecoin-api/test/context/store-implementations.js b/packages/filecoin-api/test/context/store-implementations.js index 2225543f5..049d60371 100644 --- a/packages/filecoin-api/test/context/store-implementations.js +++ b/packages/filecoin-api/test/context/store-implementations.js @@ -1,4 +1,4 @@ -import { UpdatableStore } from './store.js' +import { UpdatableStore, StreammableStore } from './store.js' /** * @typedef {import('@ucanto/interface').Link} Link @@ -18,7 +18,8 @@ import { UpdatableStore } from './store.js' * @typedef {import('../../src/deal-tracker/api.js').DealRecordKey} DealRecordKey */ export const getStoreImplementations = ( - StoreImplementation = UpdatableStore + StoreImplementation = UpdatableStore, + StreammableStoreImplementation = StreammableStore ) => ({ storefront: { pieceStore: new StoreImplementation({ @@ -76,6 +77,26 @@ export const getStoreImplementations = ( return Array.from(items).find((i) => i.ran.link().equals(record)) }, }), + dataStore: new StreammableStore({ + streamFn: ( + /** @type {Set} */ items, + /** @type {import('@ucanto/interface').UnknownLink} */ record + ) => { + const item = Array.from(items).pop() + if (!item) { + return undefined + } + const asyncIterableRes = { + [Symbol.asyncIterator]: async function* () { + // Yield the Uint8Array asynchronously + if (item) { + yield item + } + }, + } + return asyncIterableRes + }, + }), }, aggregator: { pieceStore: new StoreImplementation({ diff --git a/packages/filecoin-api/test/context/store.js b/packages/filecoin-api/test/context/store.js index 02f98e61e..2a2a10ca7 100644 --- a/packages/filecoin-api/test/context/store.js +++ b/packages/filecoin-api/test/context/store.js @@ -94,6 +94,53 @@ export class Store { } } +/** + * @template K + * @template V + * @implements {API.StreammableStore} + */ +export class StreammableStore { + /** + * @param {import('./types.js').StreammableStoreOptions} options + */ + constructor(options) { + /** @type {Set} */ + this.items = new Set() + this.streamFn = options.streamFn + } + + /** + * @param {V} record + * @returns {Promise>} + */ + async put(record) { + this.items.add(record) + + return Promise.resolve({ + ok: {}, + }) + } + + /** + * @param {K} item + * @returns {Promise, StoreGetError>>} + */ + async stream(item) { + if (!this.streamFn) { + throw new Error('get not supported') + } + const t = this.streamFn(this.items, item) + if (!t) { + return { + error: new RecordNotFound('not found'), + } + } + return { + ok: t, + } + } +} + /** * @template K * @template V diff --git a/packages/filecoin-api/test/context/types.ts b/packages/filecoin-api/test/context/types.ts index 1c183c3a6..d31f5b410 100644 --- a/packages/filecoin-api/test/context/types.ts +++ b/packages/filecoin-api/test/context/types.ts @@ -6,3 +6,7 @@ export interface StoreOptions { export interface UpdatableStoreOptions extends StoreOptions { updateFn?: (items: Set, key: K, item: Partial) => V } + +export interface StreammableStoreOptions extends StoreOptions { + streamFn?: (items: Set, item: K) => AsyncIterable | undefined +} diff --git a/packages/filecoin-api/test/events/aggregator.js b/packages/filecoin-api/test/events/aggregator.js index d83671367..bebadd7fb 100644 --- a/packages/filecoin-api/test/events/aggregator.js +++ b/packages/filecoin-api/test/events/aggregator.js @@ -176,29 +176,32 @@ export const test = { bufferQueue: new FailingQueue(), }) ), - 'handles buffer queue messages repeated items as unique': async ( - assert, - context - ) => { - const group = context.id.did() - const { buffers, blocks } = await getBuffers(1, group) - - // Store buffers - for (let i = 0; i < blocks.length; i++) { - const putBufferRes = await context.bufferStore.put({ - buffer: buffers[i], - block: blocks[i].cid, - }) - assert.ok(putBufferRes.ok) - } - - const bufferedPieces = await getBufferedPieces( - [blocks[0].cid, blocks[0].cid], - context.bufferStore - ) + 'handles buffer queue messages repeated items as unique': async ( + assert, + context + ) => { + const group = context.id.did() + const { buffers, blocks } = await getBuffers(1, group) - assert.equal(bufferedPieces.ok?.bufferedPieces.length, buffers[0].pieces.length) - }, + // Store buffers + for (let i = 0; i < blocks.length; i++) { + const putBufferRes = await context.bufferStore.put({ + buffer: buffers[i], + block: blocks[i].cid, + }) + assert.ok(putBufferRes.ok) + } + + const bufferedPieces = await getBufferedPieces( + [blocks[0].cid, blocks[0].cid], + context.bufferStore + ) + + assert.equal( + bufferedPieces.ok?.bufferedPieces.length, + buffers[0].pieces.length + ) + }, 'handles buffer queue messages successfully to requeue bigger buffer': async ( assert, context @@ -397,77 +400,82 @@ export const test = { message.minPieceInsertedAt ) }, - 'handles buffer queue messages successfully to queue aggregate prepended with a buffer piece': async ( - assert, - context - ) => { - const group = context.id.did() - const { buffers, blocks } = await getBuffers(2, group, { - length: 100, - size: 128, - }) + 'handles buffer queue messages successfully to queue aggregate prepended with a buffer piece': + async (assert, context) => { + const group = context.id.did() + const { buffers, blocks } = await getBuffers(2, group, { + length: 100, + size: 128, + }) - const [cargo] = await randomCargo(1, 128) - /** @type {import('../../src/aggregator/api.js').BufferedPiece} */ - const bufferedPiece = { - piece: cargo.link.link(), - policy: 0, - insertedAt: (new Date()).toISOString() - } + const [cargo] = await randomCargo(1, 128) + /** @type {import('../../src/aggregator/api.js').BufferedPiece} */ + const bufferedPiece = { + piece: cargo.link.link(), + policy: 0, + insertedAt: new Date().toISOString(), + } - const totalPieces = buffers.reduce((acc, v) => { - acc += v.pieces.length - return acc - }, 0) + const totalPieces = buffers.reduce((acc, v) => { + acc += v.pieces.length + return acc + }, 0) - // Store buffers - for (let i = 0; i < blocks.length; i++) { - const putBufferRes = await context.bufferStore.put({ - buffer: buffers[i], - block: blocks[i].cid, - }) - assert.ok(putBufferRes.ok) - } + // Store buffers + for (let i = 0; i < blocks.length; i++) { + const putBufferRes = await context.bufferStore.put({ + buffer: buffers[i], + block: blocks[i].cid, + }) + assert.ok(putBufferRes.ok) + } - // Handle messages - const handledMessageRes = await AggregatorEvents.handleBufferQueueMessage( - { - ...context, - config: { - minAggregateSize: 2 ** 19, - minUtilizationFactor: 10e5, - maxAggregateSize: 2 ** 35, - prependBufferedPieces: [bufferedPiece] + // Handle messages + const handledMessageRes = await AggregatorEvents.handleBufferQueueMessage( + { + ...context, + config: { + minAggregateSize: 2 ** 19, + minUtilizationFactor: 10e5, + maxAggregateSize: 2 ** 35, + prependBufferedPieces: [bufferedPiece], + }, }, - }, - blocks.map((b) => ({ - pieces: b.cid, - group, - })) - ) - assert.ok(handledMessageRes.ok) - assert.equal(handledMessageRes.ok?.aggregatedPieces, totalPieces + 1) + blocks.map((b) => ({ + pieces: b.cid, + group, + })) + ) + assert.ok(handledMessageRes.ok) + assert.equal(handledMessageRes.ok?.aggregatedPieces, totalPieces + 1) - // Validate queue and store - await pWaitFor( - () => - context.queuedMessages.get('aggregateOfferQueue')?.length === 1 - ) + // Validate queue and store + await pWaitFor( + () => context.queuedMessages.get('aggregateOfferQueue')?.length === 1 + ) - /** @type {AggregateOfferMessage} */ - // @ts-expect-error cannot infer buffer message - const message = context.queuedMessages.get('aggregateOfferQueue')?.[0] - const bufferGet = await context.bufferStore.get(message.buffer) - assert.ok(bufferGet.ok) - assert.ok(bufferGet.ok?.block.equals(message.buffer)) - assert.equal(bufferGet.ok?.buffer.group, group) - assert.ok(message.aggregate.equals(bufferGet.ok?.buffer.aggregate)) - assert.equal(bufferGet.ok?.buffer.pieces.length, totalPieces + 1) + /** @type {AggregateOfferMessage} */ + // @ts-expect-error cannot infer buffer message + const message = context.queuedMessages.get('aggregateOfferQueue')?.[0] + const bufferGet = await context.bufferStore.get(message.buffer) + assert.ok(bufferGet.ok) + assert.ok(bufferGet.ok?.block.equals(message.buffer)) + assert.equal(bufferGet.ok?.buffer.group, group) + assert.ok(message.aggregate.equals(bufferGet.ok?.buffer.aggregate)) + assert.equal(bufferGet.ok?.buffer.pieces.length, totalPieces + 1) - // prepended piece - assert.ok(bufferGet.ok?.buffer.pieces.find(p => p.piece.link().equals(bufferedPiece.piece.link()))) - assert.ok(bufferGet.ok?.buffer.pieces[0].piece.link().equals(bufferedPiece.piece.link())) - }, + // prepended piece + assert.ok( + bufferGet.ok?.buffer.pieces.find((p) => + p.piece.link().equals(bufferedPiece.piece.link()) + ) + ) + assert.ok( + bufferGet.ok?.buffer.pieces[0].piece + .link() + .equals(bufferedPiece.piece.link()) + ) + }, 'handles buffer queue messages successfully to queue aggregate and remaining buffer': async (assert, context) => { const group = context.id.did() diff --git a/packages/filecoin-api/test/events/storefront.js b/packages/filecoin-api/test/events/storefront.js index ceadee9c8..3f55699ce 100644 --- a/packages/filecoin-api/test/events/storefront.js +++ b/packages/filecoin-api/test/events/storefront.js @@ -9,6 +9,7 @@ import * as StorefrontEvents from '../../src/storefront/events.js' import { StoreOperationErrorName, UnexpectedStateErrorName, + BlobNotFoundErrorName } from '../../src/errors.js' import { randomCargo, randomAggregate } from '../utils.js' @@ -37,6 +38,9 @@ export const test = { group: context.id.did(), } + // Store bytes on datastore + await context.dataStore.put(cargo.bytes) + // Handle message const handledMessageRes = await StorefrontEvents.handleFilecoinSubmitMessage(context, message) @@ -49,6 +53,23 @@ export const test = { assert.ok(hasStoredPiece.ok) assert.equal(hasStoredPiece.ok?.status, 'submitted') }, + 'handles filecoin submit messages with error if blob of content is not stored': async (assert, context) => { + // Generate piece for test + const [cargo] = await randomCargo(1, 128) + + // Store piece into store + const message = { + piece: cargo.link.link(), + content: cargo.content.link(), + group: context.id.did(), + } + + // Handle message + const handledMessageRes = + await StorefrontEvents.handleFilecoinSubmitMessage(context, message) + assert.ok(handledMessageRes.error) + assert.equal(handledMessageRes.error?.name, BlobNotFoundErrorName) + }, 'handles filecoin submit messages deduping when stored': async ( assert, context @@ -234,6 +255,47 @@ export const test = { ) ) }, + 'handles piece insert event to issue equivalency claims successfully': async (assert, context) => { + // Generate piece for test + const [cargo] = await randomCargo(1, 128) + + // Store piece into store + const message = { + piece: cargo.link.link(), + content: cargo.content.link(), + group: context.id.did(), + } + /** @type {PieceRecord} */ + const pieceRecord = { + ...message, + status: 'submitted', + insertedAt: new Date(Date.now() - 10).toISOString(), + updatedAt: new Date(Date.now() - 5).toISOString(), + } + + // Handle message + const handledMessageRes = await StorefrontEvents.handlePieceInsertToEquivalencyClaim( + context, + pieceRecord + ) + assert.ok(handledMessageRes.ok) + // Verify invocation + // @ts-expect-error not typed hooks + assert.equal(context.service.assert?.equals?.callCount, 1) + assert.ok( + message.content.equals( + // @ts-expect-error not typed hooks + context.service.assert?.equals?._params[0].nb.content + ) + ) + assert.ok( + message.piece.equals( + // @ts-expect-error not typed hooks + context.service.assert?.equals?._params[0].nb.equals + ) + ) + + }, 'handles piece status update event successfully': async (assert, context) => { // Generate piece for test const [cargo] = await randomCargo(1, 128) diff --git a/packages/filecoin-api/test/storefront.spec.js b/packages/filecoin-api/test/storefront.spec.js index 677afe425..ba36540f8 100644 --- a/packages/filecoin-api/test/storefront.spec.js +++ b/packages/filecoin-api/test/storefront.spec.js @@ -87,7 +87,9 @@ describe('storefront', () => { define(name, async () => { const storefrontSigner = await Signer.generate() const aggregatorSigner = await Signer.generate() + const claimsSigner = await Signer.generate() + // TODO: Claims service const service = getMockService() const storefrontConnection = getConnection( storefrontSigner, @@ -97,10 +99,11 @@ describe('storefront', () => { aggregatorSigner, service ).connection + const claimsConnection = getConnection(claimsSigner, service).connection // context const { - storefront: { pieceStore, taskStore, receiptStore }, + storefront: { pieceStore, taskStore, receiptStore, dataStore }, } = getStoreImplementations() await test( @@ -115,6 +118,7 @@ describe('storefront', () => { pieceStore, receiptStore, taskStore, + dataStore, storefrontService: { connection: storefrontConnection, invocationConfig: { @@ -131,6 +135,14 @@ describe('storefront', () => { audience: aggregatorSigner, }, }, + claimsService: { + connection: claimsConnection, + invocationConfig: { + issuer: storefrontSigner, + with: storefrontSigner.did(), + audience: claimsSigner, + }, + }, queuedMessages: new Map(), service, errorReporter: { diff --git a/packages/filecoin-api/test/types.ts b/packages/filecoin-api/test/types.ts index d17cda9b6..03079ac1e 100644 --- a/packages/filecoin-api/test/types.ts +++ b/packages/filecoin-api/test/types.ts @@ -39,6 +39,7 @@ export interface StorefrontTestEventsContext extends StorefrontInterface.FilecoinSubmitMessageContext, StorefrontInterface.PieceOfferMessageContext, StorefrontInterface.StorefrontClientContext, + StorefrontInterface.ClaimsClientContext, StorefrontInterface.CronContext { id: Signer aggregatorId: Signer @@ -47,5 +48,6 @@ export interface StorefrontTestEventsContext piece: Partial aggregate: Partial deal: Partial + assert: Partial }> } diff --git a/packages/filecoin-api/test/utils.js b/packages/filecoin-api/test/utils.js index c13ee6b6a..ebb2dc26c 100644 --- a/packages/filecoin-api/test/utils.js +++ b/packages/filecoin-api/test/utils.js @@ -71,6 +71,7 @@ export async function randomCargo(length, size) { root: piece.root, content: car.cid, padding: piece.padding, + bytes: car.bytes, } }) } diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 3ed85d28f..5ca039474 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -257,9 +257,15 @@ importers: '@web3-storage/capabilities': specifier: workspace:^ version: link:../capabilities + '@web3-storage/content-claims': + specifier: ^4.0.2 + version: 4.0.2 '@web3-storage/data-segment': specifier: ^4.0.0 version: 4.0.0 + fr32-sha2-256-trunc254-padded-binary-tree-multihash: + specifier: ^3.3.0 + version: 3.3.0 p-map: specifier: ^6.0.0 version: 6.0.0 @@ -4388,6 +4394,17 @@ packages: web-streams-polyfill: 3.2.1 dev: false + /@web3-storage/content-claims@4.0.2: + resolution: {integrity: sha512-k6tIc7YjQtdKWi01r7+5stp2lo13ztwpIz+7NQYEbu5fZEsKKes5B4FKRqPWkZYO17+rPaihOY6sICT498c9EA==} + dependencies: + '@ucanto/client': 9.0.0 + '@ucanto/interface': 9.0.0 + '@ucanto/server': 9.0.1 + '@ucanto/transport': 9.1.0 + carstream: 1.1.1 + multiformats: 12.1.3 + dev: false + /@web3-storage/data-segment@3.2.0: resolution: {integrity: sha512-SM6eNumXzrXiQE2/J59+eEgCRZNYPxKhRoHX2QvV3/scD4qgcf4g+paWBc3UriLEY1rCboygGoPsnqYJNyZyfA==} dependencies: @@ -5233,6 +5250,14 @@ packages: redeyed: 2.1.1 dev: true + /carstream@1.1.1: + resolution: {integrity: sha512-cgn3TqHo6SPsHBTfM5QgXngv6HtwgO1bKCHcdS35vBrweLcYrIG/+UboCbvnIGA0k8NtAYl/DvDdej/9pZGZxQ==} + dependencies: + '@ipld/dag-cbor': 9.0.6 + multiformats: 12.1.3 + uint8arraylist: 2.4.8 + dev: false + /cborg@4.0.5: resolution: {integrity: sha512-q8TAjprr8pn9Fp53rOIGp/UFDdFY6os2Nq62YogPSIzczJD9M6g2b6igxMkpCiZZKJ0kn/KzDLDvG+EqBIEeCg==} hasBin: true @@ -7071,6 +7096,10 @@ packages: engines: {node: '>= 0.6'} dev: true + /fr32-sha2-256-trunc254-padded-binary-tree-multihash@3.3.0: + resolution: {integrity: sha512-O11VDxPmPvbQj5eac2BJXyieNacyd+RCMhwOzXQQM/NCI25x3c32YWB4/JwgOWPCpKnNXF6lpK/j0lj7GWOnYQ==} + dev: false + /fraction.js@4.3.7: resolution: {integrity: sha512-ZsDfxO51wGAXREY55a7la9LScWpwv9RxIrYABrlvOFBlH/ShPnrtsXeuUIfXKKOVicNxQ+o8JTbJvjS4M89yew==} dev: true @@ -9627,6 +9656,10 @@ packages: resolution: {integrity: sha512-eajQ/ZH7qXZQR2AgtfpmSMizQzmyYVmCql7pdhldPuYQi4atACekbJaQplk6dWyIi10jCaFnd6pqvcEFXjbaJw==} engines: {node: '>=16.0.0', npm: '>=7.0.0'} + /multiformats@13.1.0: + resolution: {integrity: sha512-HzdtdBwxsIkzpeXzhQ5mAhhuxcHbjEHH+JQoxt7hG/2HGFjjwyolLo7hbaexcnhoEuV4e0TNJ8kkpMjiEYY4VQ==} + dev: false + /multimatch@5.0.0: resolution: {integrity: sha512-ypMKuglUrZUD99Tk2bUQ+xNQj43lPEfAeX2o9cTteAmShXy2VHDJpuwu1o0xqoKCt9jLVAvwyFKdLTPXKAfJyA==} engines: {node: '>=10'} @@ -12437,11 +12470,23 @@ packages: dev: true optional: true + /uint8arraylist@2.4.8: + resolution: {integrity: sha512-vc1PlGOzglLF0eae1M8mLRTBivsvrGsdmJ5RbK3e+QRvRLOZfZhQROTwH/OfyF3+ZVUg9/8hE8bmKP2CvP9quQ==} + dependencies: + uint8arrays: 5.0.3 + dev: false + /uint8arrays@4.0.6: resolution: {integrity: sha512-4ZesjQhqOU2Ip6GPReIwN60wRxIupavL8T0Iy36BBHr2qyMrNxsPJvr7vpS4eFt8F8kSguWUPad6ZM9izs/vyw==} dependencies: multiformats: 12.1.3 + /uint8arrays@5.0.3: + resolution: {integrity: sha512-6LBuKji28kHjgPJMkQ6GDaBb1lRwIhyOYq6pDGwYMoDPfImE9SkuYENVmR0yu9yGgs2clHUSY9fKDukR+AXfqQ==} + dependencies: + multiformats: 13.1.0 + dev: false + /unbox-primitive@1.0.2: resolution: {integrity: sha512-61pPlCD9h51VoreyJ0BReideM3MDKMKnh6+V9L08331ipq6Q8OFXZYiqP6n/tbHx4s5I9uRhcye6BrbkizkBDw==} dependencies: