Skip to content

Commit

Permalink
feat: api waits for trigger filecoin pipeline from the client (storac…
Browse files Browse the repository at this point in the history
…ha#1332)

This PR hooks up possibility to compute Piece CID when submit message is
received.

For extra context, today w3infra sets `skipFilecoinSubmitQueue` on
`filecoin/offer` from users, resulting `filecoin/offer` in a noop that
simply links receipts for effects of `filecoin/submit` and
`filecoin/accept`. This is due to the API to not care about what was
submit by client as it is calculated in bucket event anyway. However, we
want to change this behaviour to have filecoin pipeline only triggered
on `filecoin/offer`, putting it in the submit queue where validation
will now happen.

We can summarise these changes in two units:
- `handleFilecoinSubmitMessage` now reads data from store, computes
piece CID for the bytes and checks equality with provided Piece (i.e.
bucket event code
https://github.com/web3-storage/w3infra/blob/main/filecoin/index.js#L33
now lives in the handler
- `handlePieceInsertToEquivalencyClaim` now exposed in storefront
events. This behaviour was [also triggered from bucket
event](https://github.com/web3-storage/w3infra/blob/main/filecoin/index.js#L107)
by the service. Now we make it a side effect of insertion to piece
table, which happens on `handleFilecoinSubmitMessage`

Rollout plan:
- w3infra swaps `skipFilecoinSubmitQueue` to False when releasing with
this. For the old bucket we invoke `filecoin/offer` from bucket event,
given current client does not do it

Follow ups:
- [ ] `filecoin/offer` from service on old bucket event
- [ ] hook up finding where data is with datastore
- [ ] clean up `skipFilecoinSubmitQueue` option
  • Loading branch information
vasco-santos authored Mar 26, 2024
1 parent 9aba8a1 commit 421bacb
Show file tree
Hide file tree
Showing 20 changed files with 492 additions and 93 deletions.
2 changes: 2 additions & 0 deletions packages/filecoin-api/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,9 @@
"@ucanto/server": "^9.0.1",
"@ucanto/transport": "^9.1.0",
"@web3-storage/capabilities": "workspace:^",
"@web3-storage/content-claims": "^4.0.2",
"@web3-storage/data-segment": "^4.0.0",
"fr32-sha2-256-trunc254-padded-binary-tree-multihash": "^3.3.0",
"p-map": "^6.0.0"
},
"devDependencies": {
Expand Down
6 changes: 4 additions & 2 deletions packages/filecoin-api/src/aggregator/buffer-reducing.js
Original file line number Diff line number Diff line change
Expand Up @@ -184,10 +184,12 @@ export function aggregatePieces(bufferedPieces, config) {
const remainingBufferedPieces = []

// start by adding prepend buffered pieces if available
for (const bufferedPiece of (config.prependBufferedPieces || [])) {
for (const bufferedPiece of config.prependBufferedPieces || []) {
const p = Piece.fromLink(bufferedPiece.piece)
if (builder.estimate(p).error) {
throw new Error('aggregate builder is not able to create aggregates with only prepend buffered pieces')
throw new Error(
'aggregate builder is not able to create aggregates with only prepend buffered pieces'
)
}
builder.write(p)
addedBufferedPieces.push(bufferedPiece)
Expand Down
2 changes: 1 addition & 1 deletion packages/filecoin-api/src/aggregator/events.js
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ export const handleBufferQueueMessage = async (context, records) => {
maxAggregateSize: context.config.maxAggregateSize,
minAggregateSize: context.config.minAggregateSize,
minUtilizationFactor: context.config.minUtilizationFactor,
prependBufferedPieces: context.config.prependBufferedPieces
prependBufferedPieces: context.config.prependBufferedPieces,
})

// Store buffered pieces if not enough to do aggregate and re-queue them
Expand Down
33 changes: 33 additions & 0 deletions packages/filecoin-api/src/errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,36 @@ export class DecodeBlockOperationFailed extends Server.Failure {
return DecodeBlockOperationErrorName
}
}

export const BlobNotFoundErrorName = /** @type {const} */ ('BlobNotFound')
export class BlobNotFound extends Server.Failure {
get reason() {
return this.message
}

get name() {
return BlobNotFoundErrorName
}
}

export const ComputePieceErrorName = /** @type {const} */ ('ComputePieceFailed')
export class ComputePieceFailed extends Error {
get reason() {
return this.message
}

get name() {
return ComputePieceErrorName
}
}

export const UnexpectedPieceErrorName = /** @type {const} */ ('UnexpectedPiece')
export class UnexpectedPiece extends Error {
get reason() {
return this.message
}

get name() {
return UnexpectedPieceErrorName
}
}
38 changes: 37 additions & 1 deletion packages/filecoin-api/src/storefront/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import type {
Receipt,
Invocation,
Failure,
DID,
Proof,
ConnectionView,
} from '@ucanto/interface'
import { PieceLink } from '@web3-storage/data-segment'
import {
Expand All @@ -15,6 +18,7 @@ import {
import {
Store,
UpdatableAndQueryableStore,
StreammableStore,
Queue,
ServiceConfig,
} from '../types.js'
Expand All @@ -26,6 +30,7 @@ export type PieceStore = UpdatableAndQueryableStore<
>
export type FilecoinSubmitQueue = Queue<FilecoinSubmitMessage>
export type PieceOfferQueue = Queue<PieceOfferMessage>
export type DataStore = StreammableStore<UnknownLink, Uint8Array>
export type TaskStore = Store<UnknownLink, Invocation>
export type ReceiptStore = Store<UnknownLink, Receipt>

Expand Down Expand Up @@ -76,7 +81,9 @@ export interface ServiceContext {
}

export interface FilecoinSubmitMessageContext
extends Pick<ServiceContext, 'pieceStore'> {}
extends Pick<ServiceContext, 'pieceStore'> {
dataStore: DataStore
}

export interface PieceOfferMessageContext {
/**
Expand All @@ -92,6 +99,35 @@ export interface StorefrontClientContext {
storefrontService: ServiceConfig<StorefrontService>
}

export interface ClaimsInvocationConfig {
/**
* Signing authority that is issuing the UCAN invocation(s).
*/
issuer: Signer
/**
* The principal delegated to in the current UCAN.
*/
audience: Principal
/**
* The resource the invocation applies to.
*/
with: DID
/**
* Proof(s) the issuer has the capability to perform the action.
*/
proofs?: Proof[]
}

export interface ClaimsClientContext {
/**
* Claims own connection to issue claims.
*/
claimsService: {
invocationConfig: ClaimsInvocationConfig
connection: ConnectionView<import('@web3-storage/content-claims/server/service/api').Service>
}
}

export interface CronContext
extends Pick<
ServiceContext,
Expand Down
58 changes: 57 additions & 1 deletion packages/filecoin-api/src/storefront/events.js
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
import pMap from 'p-map'
import { Storefront, Aggregator } from '@web3-storage/filecoin-client'
import * as AggregatorCaps from '@web3-storage/capabilities/filecoin/aggregator'
import { Assert } from '@web3-storage/content-claims/capability'

import { computePieceCid } from './piece.js'
// eslint-disable-next-line no-unused-vars
import * as API from '../types.js'
import {
RecordNotFoundErrorName,
BlobNotFound,
StoreOperationFailed,
UnexpectedPiece,
UnexpectedState,
} from '../errors.js'

Expand Down Expand Up @@ -34,7 +38,28 @@ export const handleFilecoinSubmitMessage = async (context, message) => {
}
}

// TODO: verify piece
// read and compute piece for content
// TODO: needs to be hooked with location claims
const contentStreamRes = await context.dataStore.stream(message.content)
if (contentStreamRes.error) {
return { error: new BlobNotFound(contentStreamRes.error.message) }
}

const computedPieceCid = await computePieceCid(contentStreamRes.ok)
if (computedPieceCid.error) {
return computedPieceCid
}

// check provided piece equals the one computed
if (!message.piece.equals(computedPieceCid.ok.piece.link)) {
return {
error: new UnexpectedPiece(
`provided piece ${message.piece.toString()} is not the same as computed ${
computedPieceCid.ok.piece
}`
),
}
}

const putRes = await context.pieceStore.put({
piece: message.piece,
Expand Down Expand Up @@ -95,6 +120,37 @@ export const handlePieceInsert = async (context, record) => {
return { ok: {} }
}

/**
* On piece inserted into store, invoke equivalency claim to enable reads.
*
* @param {import('./api.js').ClaimsClientContext} context
* @param {PieceRecord} record
*/
export const handlePieceInsertToEquivalencyClaim = async (context, record) => {
const claimResult = await Assert.equals
.invoke({
issuer: context.claimsService.invocationConfig.issuer,
audience: context.claimsService.invocationConfig.audience,
with: context.claimsService.invocationConfig.with,
nb: {
content: record.content,
equals: record.piece,
},
expiration: Infinity,
proofs: context.claimsService.invocationConfig.proofs,
})
.execute(context.claimsService.connection)
if (claimResult.out.error) {
return {
error: claimResult.out.error,
}
}

return {
ok: {},
}
}

/**
* @param {import('./api.js').StorefrontClientContext} context
* @param {PieceRecord} record
Expand Down
44 changes: 44 additions & 0 deletions packages/filecoin-api/src/storefront/piece.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import { Piece } from '@web3-storage/data-segment'
import * as Hasher from 'fr32-sha2-256-trunc254-padded-binary-tree-multihash'
import * as Digest from 'multiformats/hashes/digest'

import { ComputePieceFailed } from '../errors.js'

/**
* Compute PieceCid for provided async iterable.
*
* @param {AsyncIterable<Uint8Array>} stream
*/
export async function computePieceCid(stream) {
/** @type {import('../types.js').PieceLink} */
let piece
try {
const hasher = Hasher.create()
for await (const chunk of stream) {
hasher.write(chunk)
}

// ⚠️ Because digest size will dependen on the payload (padding)
// we have to determine number of bytes needed after we're done
// writing payload
const digest = new Uint8Array(hasher.multihashByteLength())
hasher.digestInto(digest, 0, true)

// There's no GC (yet) in WASM so you should free up
// memory manually once you're done.
hasher.free()
const multihashDigest = Digest.decode(digest)
// @ts-expect-error some properties from PieceDigest are not present in MultihashDigest
piece = Piece.fromDigest(multihashDigest)
} catch (/** @type {any} */ error) {
return {
error: new ComputePieceFailed(`failed to compute piece CID for bytes`, {
cause: error,
}),
}
}

return {
ok: { piece },
}
}
1 change: 1 addition & 0 deletions packages/filecoin-api/src/storefront/service.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ export const filecoinOffer = async ({ capability }, context) => {
const { piece, content } = capability.nb

// Queue offer for filecoin submission
// We need to identify new client here...
if (!context.options?.skipFilecoinSubmitQueue) {
// dedupe
const hasRes = await context.pieceStore.has({ piece })
Expand Down
11 changes: 11 additions & 0 deletions packages/filecoin-api/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,17 @@ export interface UpdatableStore<RecKey, Rec> extends Store<RecKey, Rec> {
) => Promise<Result<Rec, StoreGetError>>
}

export interface StreammableStore<RecKey, Rec> {
/**
* Puts a record in the store.
*/
put: (record: Rec) => Promise<Result<Unit, StorePutError>>
/**
* Gets a record from the store.
*/
stream: (key: RecKey) => Promise<Result<AsyncIterable<Rec>, StoreGetError>>
}

export interface QueryableStore<RecKey, Rec, Query> extends Store<RecKey, Rec> {
/**
* Queries for record matching a given criterium.
Expand Down
4 changes: 4 additions & 0 deletions packages/filecoin-api/test/context/mocks.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const notImplemented = () => {
* piece: Partial<import('../../src/types.js').AggregatorService['piece']>
* aggregate: Partial<import('../../src/types.js').DealerService['aggregate']>
* deal: Partial<import('../../src/types.js').DealTrackerService['deal']>
* assert: Partial<import('@web3-storage/content-claims/server/service/api').AssertService>
* }>} impl
*/
export function mockService(impl) {
Expand All @@ -30,6 +31,9 @@ export function mockService(impl) {
deal: {
info: withCallParams(impl.deal?.info ?? notImplemented),
},
assert: {
equals: withCallParams(impl.assert?.equals ?? notImplemented)
}
}
}

Expand Down
8 changes: 8 additions & 0 deletions packages/filecoin-api/test/context/service.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import * as Client from '@ucanto/client'
import * as Server from '@ucanto/server'
import * as CAR from '@ucanto/transport/car'

import { Assert } from '@web3-storage/content-claims/capability'
import * as StorefrontCaps from '@web3-storage/capabilities/filecoin/storefront'
import * as AggregatorCaps from '@web3-storage/capabilities/filecoin/aggregator'
import * as DealerCaps from '@web3-storage/capabilities/filecoin/dealer'
Expand Down Expand Up @@ -215,6 +216,13 @@ export function getMockService() {
},
}),
},
assert: {
equals: Server.provide(Assert.equals, async ({ capability, invocation }) => {
return {
ok: {}
}
})
}
})
}

Expand Down
25 changes: 23 additions & 2 deletions packages/filecoin-api/test/context/store-implementations.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { UpdatableStore } from './store.js'
import { UpdatableStore, StreammableStore } from './store.js'

/**
* @typedef {import('@ucanto/interface').Link} Link
Expand All @@ -18,7 +18,8 @@ import { UpdatableStore } from './store.js'
* @typedef {import('../../src/deal-tracker/api.js').DealRecordKey} DealRecordKey
*/
export const getStoreImplementations = (
StoreImplementation = UpdatableStore
StoreImplementation = UpdatableStore,
StreammableStoreImplementation = StreammableStore
) => ({
storefront: {
pieceStore: new StoreImplementation({
Expand Down Expand Up @@ -76,6 +77,26 @@ export const getStoreImplementations = (
return Array.from(items).find((i) => i.ran.link().equals(record))
},
}),
dataStore: new StreammableStore({
streamFn: (
/** @type {Set<Uint8Array>} */ items,
/** @type {import('@ucanto/interface').UnknownLink} */ record
) => {
const item = Array.from(items).pop()
if (!item) {
return undefined
}
const asyncIterableRes = {
[Symbol.asyncIterator]: async function* () {
// Yield the Uint8Array asynchronously
if (item) {
yield item
}
},
}
return asyncIterableRes
},
}),
},
aggregator: {
pieceStore: new StoreImplementation({
Expand Down
Loading

0 comments on commit 421bacb

Please sign in to comment.