Skip to content

Commit

Permalink
feat: upgrade storefront api with filecoin offer from bucket event in…
Browse files Browse the repository at this point in the history
…stead of compute
  • Loading branch information
vasco-santos committed Mar 26, 2024
1 parent a1f171d commit e40cd0f
Show file tree
Hide file tree
Showing 18 changed files with 362 additions and 379 deletions.
30 changes: 28 additions & 2 deletions filecoin/functions/handle-filecoin-submit-message.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import * as Sentry from '@sentry/serverless'
import * as storefrontEvents from '@web3-storage/filecoin-api/storefront/events'

import { createPieceTable } from '../store/piece.js'
import { createDataStore, composeDataStoresWithOrderedStream } from '../store/data.js'
import { decodeMessage } from '../queue/filecoin-submit-queue.js'
import { mustGetEnv } from './utils.js'

Expand All @@ -13,6 +14,7 @@ Sentry.AWSLambda.init({
})

const AWS_REGION = process.env.AWS_REGION || 'us-west-2'
const R2_REGION = process.env.R2_REGION || 'auto'

/**
* Get EventRecord from the SQS Event triggering the handler.
Expand All @@ -34,12 +36,30 @@ async function handleFilecoinSubmitMessage (sqsEvent) {
})

// create context
const { pieceTableName } = getEnv()
const {
pieceTableName,
s3BucketName,
r2BucketName,
r2BucketEndpoint,
r2BucketAccessKeyId,
r2BucketSecretAccessKey
} = getEnv()
const context = {
pieceStore: createPieceTable(AWS_REGION, pieceTableName)
pieceStore: createPieceTable(AWS_REGION, pieceTableName),
dataStore: composeDataStoresWithOrderedStream(
createDataStore(AWS_REGION, s3BucketName),
createDataStore(R2_REGION, r2BucketName, {
endpoint: r2BucketEndpoint,
credentials: {
accessKeyId: r2BucketAccessKeyId,
secretAccessKey: r2BucketSecretAccessKey,
},
})
)
}

const { ok, error } = await storefrontEvents.handleFilecoinSubmitMessage(context, record)
console.log('handleFilecoinSubmitMessage - ok, error', ok, error)
if (error) {
return {
statusCode: 500,
Expand All @@ -59,6 +79,12 @@ async function handleFilecoinSubmitMessage (sqsEvent) {
function getEnv () {
return {
pieceTableName: mustGetEnv('PIECE_TABLE_NAME'),
// carpark buckets - CAR file bytes may be found here with keys like {cid}/{cid}.car
s3BucketName: mustGetEnv('STORE_BUCKET_NAME'),
r2BucketName: mustGetEnv('R2_CARPARK_BUCKET_NAME'),
r2BucketEndpoint: mustGetEnv('R2_ENDPOINT'),
r2BucketAccessKeyId: mustGetEnv('R2_ACCESS_KEY_ID'),
r2BucketSecretAccessKey: mustGetEnv('R2_SECRET_ACCESS_KEY'),
}
}

Expand Down
39 changes: 19 additions & 20 deletions filecoin/functions/handle-piece-insert-to-content-claim.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import * as Sentry from '@sentry/serverless'
import { Config } from 'sst/node/config'
import { unmarshall } from '@aws-sdk/util-dynamodb'
import { Piece } from '@web3-storage/data-segment'
import { CID } from 'multiformats/cid'
import * as Delegation from '@ucanto/core/delegation'
import { fromString } from 'uint8arrays/from-string'
import * as DID from '@ipld/dag-ucan/did'

import { reportPieceCid } from '../index.js'
import * as storefrontEvents from '@web3-storage/filecoin-api/storefront/events'

import { decodeRecord } from '../store/piece.js'
import { getServiceConnection, getServiceSigner } from '../service.js'
import { mustGetEnv } from './utils.js'

Expand Down Expand Up @@ -35,11 +35,10 @@ async function pieceCidReport (event) {

/** @type {PieceStoreRecord} */
// @ts-expect-error can't figure out type of new
const pieceRecord = unmarshall(records[0].new)
const piece = Piece.fromString(pieceRecord.piece).link
const content = CID.parse(pieceRecord.content)
const storeRecord = unmarshall(records[0].new)
const record = decodeRecord(storeRecord)

const claimsServiceConnection = getServiceConnection({
const connection = getServiceConnection({
did: contentClaimsDid,
url: contentClaimsUrl
})
Expand All @@ -56,24 +55,24 @@ async function pieceCidReport (event) {
claimsIssuer = claimsIssuer.withDID(DID.parse(contentClaimsDid).did())
}

const { ok, error } = await reportPieceCid({
piece,
content,
claimsServiceConnection,
claimsInvocationConfig: /** @type {import('../types.js').ClaimsInvocationConfig} */ ({
issuer: claimsIssuer,
audience: claimsServiceConnection.id,
with: claimsIssuer.did(),
proofs: claimsProofs
})
})
const context = {
claimsService: {
connection,
invocationConfig: {
issuer: claimsIssuer,
audience: connection.id,
with: claimsIssuer.did(),
proofs: claimsProofs
},
},
}

const { ok, error } = await storefrontEvents.handlePieceInsertToEquivalencyClaim(context, record)
if (error) {
console.error(error)

return {
statusCode: 500,
body: error.message || 'failed to add aggregate'
body: error.message || 'failed to handle piece insert event to content claim'
}
}

Expand Down
75 changes: 59 additions & 16 deletions filecoin/functions/piece-cid-compute.js
Original file line number Diff line number Diff line change
@@ -1,29 +1,30 @@
import { S3Client } from '@aws-sdk/client-s3'
import * as Sentry from '@sentry/serverless'
import { S3Client } from '@aws-sdk/client-s3'
import { Config } from 'sst/node/config'
import { Storefront } from '@web3-storage/filecoin-client'
import * as Delegation from '@ucanto/core/delegation'
import { fromString } from 'uint8arrays/from-string'
import * as DID from '@ipld/dag-ucan/did'

import { computePieceCid } from '../index.js'
import { getServiceConnection, getServiceSigner } from '../service.js'
import { mustGetEnv } from './utils.js'
import { createPieceTable } from '../store/piece.js'

Sentry.AWSLambda.init({
environment: process.env.SST_STAGE,
dsn: process.env.SENTRY_DSN,
tracesSampleRate: 1.0,
})

const AWS_REGION = process.env.AWS_REGION || 'us-west-2'

/**
* Get EventRecord from the SQS Event triggering the handler
* Get EventRecord from the SQS Event triggering the handler.
* Trigger `filecoin/offer` from bucket event
*
* @param {import('aws-lambda').SQSEvent} event
*/
async function computeHandler (event) {
const {
pieceTableName,
disablePieceCidCompute,
did
} = getEnv()
const { PRIVATE_KEY: privateKey } = Config
const { storefrontDid, storefrontUrl, did, storefrontProof, disablePieceCidCompute } = getEnv()

if (disablePieceCidCompute) {
const body = 'piece cid computation is disabled'
Expand All @@ -34,21 +35,46 @@ async function computeHandler (event) {
}
}

// Create context
let storefrontSigner = getServiceSigner({
privateKey
})
const connection = getServiceConnection({
did: storefrontDid,
url: storefrontUrl
})
const storefrontServiceProofs = []
if (storefrontProof) {
const proof = await Delegation.extract(fromString(storefrontProof, 'base64pad'))
if (!proof.ok) throw new Error('failed to extract proof', { cause: proof.error })
storefrontServiceProofs.push(proof.ok)
} else {
// if no proofs, we must be using the service private key to sign
storefrontSigner = storefrontSigner.withDID(DID.parse(did).did())
}
const storefrontService = {
connection,
invocationConfig: {
issuer: storefrontSigner,
with: storefrontSigner.did(),
audience: storefrontSigner,
proofs: storefrontServiceProofs
},
}

// Decode record
const record = parseEvent(event)
if (!record) {
throw new Error('Unexpected sqs record format')
}

const s3Client = new S3Client({ region: record.bucketRegion })
const pieceTable = createPieceTable(AWS_REGION, pieceTableName)

// Compute piece for record
const { error, ok } = await computePieceCid({
record,
s3Client,
pieceTable,
group: did
})

if (error) {
console.error(error)

Expand All @@ -58,9 +84,24 @@ async function computeHandler (event) {
}
}

// Invoke `filecoin/offer`
const filecoinSubmitInv = await Storefront.filecoinOffer(
storefrontService.invocationConfig,
ok.content,
ok.piece,
{ connection: storefrontService.connection }
)
console.log('piece cid compute', record.key, filecoinSubmitInv.out.error, filecoinSubmitInv.out.ok)

if (filecoinSubmitInv.out.error) {
return {
statusCode: 500,
body: filecoinSubmitInv.out.error,
}
}

return {
statusCode: 200,
body: ok
}
}

Expand All @@ -71,8 +112,10 @@ export const handler = Sentry.AWSLambda.wrapHandler(computeHandler)
*/
function getEnv () {
return {
pieceTableName: mustGetEnv('PIECE_TABLE_NAME'),
did: mustGetEnv('DID'),
storefrontDid: mustGetEnv('STOREFRONT_DID'),
storefrontUrl: mustGetEnv('STOREFRONT_URL'),
storefrontProof: process.env.PROOF,
disablePieceCidCompute: process.env.DISABLE_PIECE_CID_COMPUTE === 'true'
}
}
Expand Down
62 changes: 5 additions & 57 deletions filecoin/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import * as Hasher from 'fr32-sha2-256-trunc254-padded-binary-tree-multihash'
import * as Digest from 'multiformats/hashes/digest'
import { Piece } from '@web3-storage/data-segment'
import { CID } from 'multiformats/cid'
import { Assert } from '@web3-storage/content-claims/capability'

import { GetCarFailed, ComputePieceFailed } from './errors.js'

Expand All @@ -27,14 +26,10 @@ import { GetCarFailed, ComputePieceFailed } from './errors.js'
* @param {object} props
* @param {EventRecord} props.record
* @param {S3Client} props.s3Client
* @param {string} props.group
* @param {import('@web3-storage/filecoin-api/storefront/api').PieceStore} props.pieceTable
*/
export async function computePieceCid({
record,
s3Client,
pieceTable,
group
s3Client
}) {
const key = record.key
// CIDs in carpark are in format `${carCid}/${carCid}.car`
Expand Down Expand Up @@ -80,57 +75,10 @@ export async function computePieceCid({
}
}

// Write to table
const insertedAt = (new Date()).toISOString()
const { ok, error } = await pieceTable.put({
content: CID.parse(cidString),
piece: piece.link,
status: 'submitted',
insertedAt,
updatedAt: insertedAt,
group
})

return {
ok,
error
}
}

/**
* @param {object} props
* @param {import('@web3-storage/data-segment').PieceLink} props.piece
* @param {import('multiformats').CID} props.content
* @param {import('@ucanto/principal/ed25519').ConnectionView<any>} props.claimsServiceConnection
* @param {import('./types.js').ClaimsInvocationConfig} props.claimsInvocationConfig
*/
export async function reportPieceCid ({
piece,
content,
claimsServiceConnection,
claimsInvocationConfig
}) {
// Add claim for reading
const claimResult = await Assert.equals
.invoke({
issuer: claimsInvocationConfig.issuer,
audience: claimsInvocationConfig.audience,
with: claimsInvocationConfig.with,
nb: {
content,
equals: piece
},
expiration: Infinity,
proofs: claimsInvocationConfig.proofs
})
.execute(claimsServiceConnection)
if (claimResult.out.error) {
return {
error: claimResult.out.error
}
}

return {
ok: {},
ok: {
content: CID.parse(cidString),
piece: piece.link
},
}
}
5 changes: 2 additions & 3 deletions filecoin/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,21 @@
"@aws-sdk/client-dynamodb": "^3.515.0",
"@aws-sdk/client-s3": "^3.515.0",
"@aws-sdk/client-sqs": "^3.515.0",
"@ipld/car": "^5.2.6",
"@sentry/serverless": "^7.74.1",
"@ucanto/client": "^9.0.0",
"@ucanto/core": "^9.0.1",
"@ucanto/interface": "^9.0.0",
"@ucanto/principal": "^9.0.0",
"@ucanto/transport": "^9.0.0",
"@web3-storage/content-claims": "^3.2.0",
"@web3-storage/data-segment": "^5.0.0",
"@web3-storage/filecoin-api": "^4.2.0",
"@web3-storage/filecoin-api": "^4.6.0",
"@web3-storage/filecoin-client": "^3.1.3",
"fr32-sha2-256-trunc254-padded-binary-tree-multihash": "^3.1.1",
"multiformats": "^13.1.0",
"uint8arrays": "4.0.6"
},
"devDependencies": {
"@ipld/car": "^5.2.6",
"@web-std/blob": "3.0.5",
"ava": "^4.3.3",
"constructs": "*",
Expand Down
Loading

0 comments on commit e40cd0f

Please sign in to comment.