From 05e5db35544c935a6c8e65e8f27583cffcf224e1 Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Fri, 26 Apr 2024 17:46:30 +0100 Subject: [PATCH] fix!: storefront content store rename and separation for test (#1409) With the aim to facilitate https://github.com/w3s-project/w3up/issues/1349 storefront `dataStore` is renamed to `contentStore` (note alignment of name with the `content` property in Filecoin Pipeline capabilities/message namings). My goal is to make `contentStore.stream()` perform HTTP Request to read thing from Roundabout (if `filecoin/offer` comes with `content` CARCID code, it will try to get a CAR, if `content` comes as RAW, it will try to get RAW). The main point in this change is to not require `contentStore` to have a `put` method, like `dataStore` was requiring just for testing. It extracts that requirement only to run tests, so that we can implement a `contentStore` as something with just a `stream` function that behind the scenes can perform a HTTP Request to Roundabout instead of being an abstraction on top of a S3 bucket like today. Note that this is not strictly needed, we could just use this as is and in `dataStore` implementation and make `put` https://github.com/w3s-project/w3infra/blob/main/filecoin/store/data.js#L39 `throw new Error('not implemented')` and just use an extended `DataStore` class for w3infra testing. But I feel this way is nicer, and also like more new naming as it aligns with everything including content name defined in w3index https://github.com/w3s-project/specs/blob/main/w3-index.md#sharded-dag-index-example BREAKING CHANGE: dataStore in storefront renamed to contentStore --- packages/filecoin-api/src/storefront/api.ts | 13 ++++++++--- .../filecoin-api/src/storefront/events.js | 2 +- packages/filecoin-api/src/storefront/piece.js | 3 ++- packages/filecoin-api/src/types.ts | 4 ++-- .../test/context/store-implementations.js | 23 +++++++++---------- packages/filecoin-api/test/context/store.js | 8 +++---- packages/filecoin-api/test/context/types.ts | 4 ++-- .../filecoin-api/test/events/storefront.js | 2 +- packages/filecoin-api/test/storefront.spec.js | 5 ++-- packages/filecoin-api/test/types.ts | 16 ++++++++++++- 10 files changed, 51 insertions(+), 29 deletions(-) diff --git a/packages/filecoin-api/src/storefront/api.ts b/packages/filecoin-api/src/storefront/api.ts index 0667d71f1..bd666d073 100644 --- a/packages/filecoin-api/src/storefront/api.ts +++ b/packages/filecoin-api/src/storefront/api.ts @@ -8,6 +8,7 @@ import type { DID, Proof, ConnectionView, + Result, } from '@ucanto/interface' import { PieceLink } from '@web3-storage/data-segment' import { @@ -18,9 +19,9 @@ import { import { Store, UpdatableAndQueryableStore, - StreammableStore, Queue, ServiceConfig, + StoreGetError } from '../types.js' export type PieceStore = UpdatableAndQueryableStore< @@ -30,7 +31,6 @@ export type PieceStore = UpdatableAndQueryableStore< > export type FilecoinSubmitQueue = Queue export type PieceOfferQueue = Queue -export type DataStore = StreammableStore export type TaskStore = Store export type ReceiptStore = Store @@ -71,7 +71,7 @@ export interface ServiceContext { export interface FilecoinSubmitMessageContext extends Pick { - dataStore: DataStore + contentStore: ContentStore } export interface PieceOfferMessageContext { @@ -189,3 +189,10 @@ export interface PieceOfferMessage { export interface DataAggregationProofNotFound extends Failure { name: 'DataAggregationProofNotFound' } + +export interface ContentStore { + /** + * Gets a record from the store. + */ + stream: (key: RecKey) => Promise, StoreGetError>> +} diff --git a/packages/filecoin-api/src/storefront/events.js b/packages/filecoin-api/src/storefront/events.js index 85b1f5a0d..56ecac813 100644 --- a/packages/filecoin-api/src/storefront/events.js +++ b/packages/filecoin-api/src/storefront/events.js @@ -40,7 +40,7 @@ export const handleFilecoinSubmitMessage = async (context, message) => { // read and compute piece for content // TODO: needs to be hooked with location claims - const contentStreamRes = await context.dataStore.stream(message.content) + const contentStreamRes = await context.contentStore.stream(message.content) if (contentStreamRes.error) { return { error: new BlobNotFound(contentStreamRes.error.message) } } diff --git a/packages/filecoin-api/src/storefront/piece.js b/packages/filecoin-api/src/storefront/piece.js index 099c8d2a8..2257cad00 100644 --- a/packages/filecoin-api/src/storefront/piece.js +++ b/packages/filecoin-api/src/storefront/piece.js @@ -7,13 +7,14 @@ import { ComputePieceFailed } from '../errors.js' /** * Compute PieceCid for provided async iterable. * - * @param {AsyncIterable} stream + * @param {ReadableStream} stream */ export async function computePieceCid(stream) { /** @type {import('../types.js').PieceLink} */ let piece try { const hasher = Hasher.create() + // @ts-ignore Readable stream is Aync Iterator for await (const chunk of stream) { hasher.write(chunk) } diff --git a/packages/filecoin-api/src/types.ts b/packages/filecoin-api/src/types.ts index 89297d600..07d7e1d96 100644 --- a/packages/filecoin-api/src/types.ts +++ b/packages/filecoin-api/src/types.ts @@ -52,7 +52,7 @@ export interface UpdatableStore extends Store { ) => Promise> } -export interface StreammableStore { +export interface ReadableStreamStore { /** * Puts a record in the store. */ @@ -60,7 +60,7 @@ export interface StreammableStore { /** * Gets a record from the store. */ - stream: (key: RecKey) => Promise, StoreGetError>> + stream: (key: RecKey) => Promise, StoreGetError>> } export interface QueryableStore extends Store { diff --git a/packages/filecoin-api/test/context/store-implementations.js b/packages/filecoin-api/test/context/store-implementations.js index 049d60371..cce8e556a 100644 --- a/packages/filecoin-api/test/context/store-implementations.js +++ b/packages/filecoin-api/test/context/store-implementations.js @@ -1,4 +1,4 @@ -import { UpdatableStore, StreammableStore } from './store.js' +import { UpdatableStore, ReadableStreamStore } from './store.js' /** * @typedef {import('@ucanto/interface').Link} Link @@ -19,7 +19,7 @@ import { UpdatableStore, StreammableStore } from './store.js' */ export const getStoreImplementations = ( StoreImplementation = UpdatableStore, - StreammableStoreImplementation = StreammableStore + ReadableStreamStoreImplementation = ReadableStreamStore ) => ({ storefront: { pieceStore: new StoreImplementation({ @@ -77,7 +77,7 @@ export const getStoreImplementations = ( return Array.from(items).find((i) => i.ran.link().equals(record)) }, }), - dataStore: new StreammableStore({ + contentStore: new ReadableStreamStore({ streamFn: ( /** @type {Set} */ items, /** @type {import('@ucanto/interface').UnknownLink} */ record @@ -86,15 +86,14 @@ export const getStoreImplementations = ( if (!item) { return undefined } - const asyncIterableRes = { - [Symbol.asyncIterator]: async function* () { - // Yield the Uint8Array asynchronously - if (item) { - yield item - } - }, - } - return asyncIterableRes + return new ReadableStream({ + start(controller) { + // Push the data into the stream + controller.enqueue(item) + // Close the stream + controller.close() + } + }) }, }), }, diff --git a/packages/filecoin-api/test/context/store.js b/packages/filecoin-api/test/context/store.js index 2a2a10ca7..9ed687950 100644 --- a/packages/filecoin-api/test/context/store.js +++ b/packages/filecoin-api/test/context/store.js @@ -97,11 +97,11 @@ export class Store { /** * @template K * @template V - * @implements {API.StreammableStore} + * @implements {API.ReadableStreamStore} */ -export class StreammableStore { +export class ReadableStreamStore { /** - * @param {import('./types.js').StreammableStoreOptions} options + * @param {import('./types.js').ReadableStreamStoreOptions} options */ constructor(options) { /** @type {Set} */ @@ -123,7 +123,7 @@ export class StreammableStore { /** * @param {K} item - * @returns {Promise, StoreGetError>>} + * @returns {Promise, StoreGetError>>} */ async stream(item) { if (!this.streamFn) { diff --git a/packages/filecoin-api/test/context/types.ts b/packages/filecoin-api/test/context/types.ts index d31f5b410..8bfb36868 100644 --- a/packages/filecoin-api/test/context/types.ts +++ b/packages/filecoin-api/test/context/types.ts @@ -7,6 +7,6 @@ export interface UpdatableStoreOptions extends StoreOptions { updateFn?: (items: Set, key: K, item: Partial) => V } -export interface StreammableStoreOptions extends StoreOptions { - streamFn?: (items: Set, item: K) => AsyncIterable | undefined +export interface ReadableStreamStoreOptions extends StoreOptions { + streamFn?: (items: Set, item: K) => ReadableStream | undefined } diff --git a/packages/filecoin-api/test/events/storefront.js b/packages/filecoin-api/test/events/storefront.js index 58c9d464c..04018cc89 100644 --- a/packages/filecoin-api/test/events/storefront.js +++ b/packages/filecoin-api/test/events/storefront.js @@ -39,7 +39,7 @@ export const test = { } // Store bytes on datastore - await context.dataStore.put(cargo.bytes) + await context.testContentStore.put(cargo.bytes) // Handle message const handledMessageRes = diff --git a/packages/filecoin-api/test/storefront.spec.js b/packages/filecoin-api/test/storefront.spec.js index ba36540f8..bbe2eec31 100644 --- a/packages/filecoin-api/test/storefront.spec.js +++ b/packages/filecoin-api/test/storefront.spec.js @@ -103,7 +103,7 @@ describe('storefront', () => { // context const { - storefront: { pieceStore, taskStore, receiptStore, dataStore }, + storefront: { pieceStore, taskStore, receiptStore, contentStore }, } = getStoreImplementations() await test( @@ -118,7 +118,8 @@ describe('storefront', () => { pieceStore, receiptStore, taskStore, - dataStore, + contentStore, + testContentStore: contentStore, storefrontService: { connection: storefrontConnection, invocationConfig: { diff --git a/packages/filecoin-api/test/types.ts b/packages/filecoin-api/test/types.ts index d164eb73e..cb2e6fbb3 100644 --- a/packages/filecoin-api/test/types.ts +++ b/packages/filecoin-api/test/types.ts @@ -1,7 +1,13 @@ -import type { Signer } from '@ucanto/interface' +import type { + Signer, + Result, + Unit, + UnknownLink +} from '@ucanto/interface' import * as AggregatorInterface from '../src/aggregator/api.js' import * as DealerInterface from '../src/dealer/api.js' import * as StorefrontInterface from '../src/storefront/api.js' +import { StorePutError } from '../src/types.js' export interface AggregatorTestEventsContext extends AggregatorInterface.PieceMessageContext, @@ -43,6 +49,7 @@ export interface StorefrontTestEventsContext StorefrontInterface.CronContext { id: Signer aggregatorId: Signer + testContentStore: TestContentStore service: Partial<{ filecoin: Partial piece: Partial @@ -53,3 +60,10 @@ export interface StorefrontTestEventsContext > }> } + +export interface TestContentStore extends StorefrontInterface.ContentStore { + /** + * Puts a record in the store. + */ + put: (record: Rec) => Promise> +} \ No newline at end of file