Skip to content

Commit

Permalink
tests: egress traffic
Browse files Browse the repository at this point in the history
  • Loading branch information
fforbeck committed Oct 17, 2024
1 parent 5f86e7d commit 2194d13
Show file tree
Hide file tree
Showing 9 changed files with 190 additions and 15 deletions.
13 changes: 12 additions & 1 deletion billing/data/egress.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,15 @@ export const decode = input => {
error: new DecodeFailure(`decoding egress event: ${err.message}`, { cause: err })
}
}
}
}

/** @type {import('../lib/api').Decoder<string, import('../lib/api').EgressTrafficData>} */
export const decodeStr = input => {
try {
return decode(JSON.parse(input))
} catch (/** @type {any} */ err) {
return {
error: new DecodeFailure(`decoding str egress traffic event: ${err.message}`, { cause: err })
}
}
}
5 changes: 2 additions & 3 deletions billing/functions/egress-traffic-handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ export const handler = Sentry.AWSLambda.wrapHandler(
* @param {import('aws-lambda').SQSEvent} event
* @param {import('aws-lambda').Context} context
*/

async (event, context) => {
/** @type {CustomHandlerContext|undefined} */
const customContext = context?.clientContext?.Custom
Expand Down Expand Up @@ -82,10 +81,10 @@ export const handler = Sentry.AWSLambda.wrapHandler(
/**
* Finds the Stripe customer ID for the given customer and records the egress traffic data in the Stripe Billing Meter API.
*
* @param {import('../lib/api.ts').CustomerStore} customerStore
* @param {import('../lib/api.js').CustomerStore} customerStore
* @param {import('stripe').Stripe} stripe
* @param {string} billingMeterName
* @param {import('../lib/api.ts').EgressTrafficData} egressEventData
* @param {import('../lib/api.js').EgressTrafficData} egressEventData
*/
async function recordEgress(customerStore, stripe, billingMeterName, egressEventData) {
const response = await customerStore.get({ customer: egressEventData.customer })
Expand Down
58 changes: 58 additions & 0 deletions billing/test/helpers/context.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import dotenv from 'dotenv'
import { createDynamoDB, createSQS, createQueue, createTable } from './aws.js'
import { createCustomerStore, customerTableProps } from '../../tables/customer.js'
import { encode as encodeCustomer, validate as validateCustomer } from '../../data/customer.js'
Expand All @@ -6,6 +7,7 @@ import { decode as decodeSpaceBillingInstruction } from '../../data/space-billin
import { encode as encodeSubscription, validate as validateSubscription } from '../../data/subscription.js'
import { encode as encodeConsumer, validate as validateConsumer } from '../../data/consumer.js'
import { decode as decodeUsage, lister as usageLister } from '../../data/usage.js'
import { decodeStr as decodeEgressTrafficEvent } from '../../data/egress.js'
import { createCustomerBillingQueue } from '../../queues/customer.js'
import { createSpaceBillingQueue } from '../../queues/space.js'
import { consumerTableProps, subscriptionTableProps } from '../../../upload-api/tables/index.js'
Expand All @@ -16,6 +18,11 @@ import { createSpaceDiffStore, spaceDiffTableProps } from '../../tables/space-di
import { createSpaceSnapshotStore, spaceSnapshotTableProps } from '../../tables/space-snapshot.js'
import { createUsageStore, usageTableProps } from '../../tables/usage.js'
import { createQueueRemoverClient } from './queue.js'
import { createEgressTrafficQueue } from '../../queues/egress-traffic.js'
import { handler as createEgressTrafficHandler } from '../../functions/egress-traffic-handler.js'
import { fileURLToPath } from 'node:url'
import Stripe from 'stripe'
dotenv.config({ path: fileURLToPath(new URL('../../.env.local', import.meta.url)) })

/**
* @typedef {{
Expand Down Expand Up @@ -137,6 +144,57 @@ export const createUCANStreamTestContext = async () => {
return { consumerStore, spaceDiffStore }
}

/**
* @returns {Promise<import('../lib/api').EgressTrafficTestContext>}
*/
export const createEgressTrafficTestContext = async () => {
await createAWSServices()
const stripeSecretKey = process.env.STRIPE_TEST_SECRET_KEY
if (!stripeSecretKey) {
throw new Error('STRIPE_TEST_SECRET_KEY environment variable is not set')
}

const egressQueueURL = new URL(await createQueue(awsServices.sqs.client, 'egress-traffic-'))
const egressTrafficQueue = {
add: createEgressTrafficQueue(awsServices.sqs.client, { url: egressQueueURL }).add,
remove: createQueueRemoverClient(awsServices.sqs.client, { url: egressQueueURL, decode: decodeEgressTrafficEvent }).remove,
}

const accountId = (await awsServices.sqs.client.config.credentials()).accountId
const region = await awsServices.sqs.client.config.region()

return {
egressTrafficQueue,
egressTrafficQueueUrl: egressQueueURL.toString(),
egressTrafficHandler: createEgressTrafficHandler,
accountId: accountId ?? '',
region: region ?? '',
stripeSecretKey,
stripe: new Stripe(stripeSecretKey, { apiVersion: '2023-10-16' }),
// Add mock properties for default Context
callbackWaitsForEmptyEventLoop: false,
functionName: 'egress-traffic-handler',
functionVersion: '1',
invokedFunctionArn: `arn:aws:lambda:${region}:${accountId}:function:egress-traffic-handler`,
memoryLimitInMB: '128',
awsRequestId: 'mockRequestId',
logGroupName: 'mockLogGroup',
logStreamName: 'mockLogStream',
identity: undefined,
clientContext: undefined,
getRemainingTimeInMillis: () => 30000, // mock implementation
done: () => {
console.log('Egress traffic handler done')
},
fail: () => {
console.log('Egress traffic handler fail')
},
succeed: () => {
console.log('Egress traffic handler succeed')
}
}
}

/**
* @template C
* @param {import('../lib/api').TestSuite<C>} suite
Expand Down
14 changes: 14 additions & 0 deletions billing/test/helpers/egress.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import { randomDIDMailto } from './did.js'
import { randomLink } from './dag.js'

/**
* @param {Partial<import('../../lib/api').EgressTrafficData>} [base]
* @returns {Promise<import('../../lib/api').EgressTrafficData>}
*/
export const randomEgressEvent = async (base = {}) => ({
customer: await randomDIDMailto(),
resource: randomLink(),
bytes: BigInt(Math.floor(Math.random() * 1000000)),
servedAt: new Date(),
...base
})
4 changes: 4 additions & 0 deletions billing/test/lib.egress-traffic.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
import * as EgressTrafficSuite from './lib/egress-traffic.js'
import { bindTestContext, createEgressTrafficTestContext } from './helpers/context.js'

export const test = bindTestContext(EgressTrafficSuite.test, createEgressTrafficTestContext)
18 changes: 17 additions & 1 deletion billing/test/lib/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,12 @@ import {
SpaceSnapshotStore,
UsageStore,
UsageListKey,
Usage
Usage,
EgressTrafficQueue,
EgressTrafficData
} from '../../lib/api.js'
import { Context, Handler, SQSEvent } from 'aws-lambda'
import Stripe from 'stripe'

export interface BillingCronTestContext {
customerStore: CustomerStore & StorePutter<Customer>
Expand Down Expand Up @@ -47,12 +51,24 @@ export interface UCANStreamTestContext {
consumerStore: ConsumerStore & StorePutter<Consumer>
}


export interface EgressTrafficTestContext extends Context {
egressTrafficQueue: EgressTrafficQueue & QueueRemover<EgressTrafficData>
egressTrafficQueueUrl: string
egressTrafficHandler: Handler<SQSEvent, { statusCode: number, body: string }>
accountId: string
region: string
stripeSecretKey: string
stripe: Stripe
}

export type TestContext =
& BillingCronTestContext
& CustomerBillingQueueTestContext
& SpaceBillingQueueTestContext
& StripeTestContext
& UCANStreamTestContext
& EgressTrafficTestContext

/** QueueRemover can remove items from the head of the queue. */
export interface QueueRemover<T> {
Expand Down
67 changes: 67 additions & 0 deletions billing/test/lib/egress-traffic.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import { randomEgressEvent } from '../helpers/egress.js'
import { collectQueueMessages } from '../helpers/queue.js'

/** @type {import('./api').TestSuite<import('./api').EgressTrafficTestContext>} */
export const test = {
'should process egress events': async (/** @type {import('entail').assert} */ assert, ctx) => {
const maxEvents = 100
const events = await Promise.all(
Array.from({ length: maxEvents }, () => randomEgressEvent())
)

// 1. Add egress events to the queue to simulate events from the Freeway worker
for (const e of events) {
console.log(`Adding egress event to the queue: CustomerId: ${e.customer}, ResourceId: ${e.resource}, ServedAt: ${e.servedAt.toISOString()}`)
await ctx.egressTrafficQueue.add(e)
}

// 2. Simulate the egress traffic (SQS events) to trigger the handler
// FIXME (fforbeck): why the events are not collected?
const collected = await collectQueueMessages(ctx.egressTrafficQueue)
assert.ok(collected.ok, 'Failed to collect queue messages')
assert.equal(collected.ok.length, events.length, 'Collected queue messages length does not match')

// @type {import('aws-lambda').SQSEvent}
const sqsEventBatch = {
Records: collected.ok.map(e => ({
// @type {import('aws-lambda').SQSRecord}
body: JSON.stringify(e),
messageId: Math.random().toString(),
receiptHandle: Math.random().toString(),
awsRegion: ctx.region,
eventSource: 'aws:sqs',
eventSourceARN: `arn:aws:sqs:${ctx.region}:${ctx.accountId}:${ctx.egressTrafficQueueUrl}`,
awsAccountId: ctx.accountId,
md5OfBody: '',
md5OfMessageAttributes: '',
attributes: {
ApproximateReceiveCount: '1',
SentTimestamp: e.servedAt.getTime().toString(),
SenderId: ctx.accountId,
ApproximateFirstReceiveTimestamp: e.servedAt.getTime().toString(),
},
messageAttributes: {},
}))
}

// 3. Process the SQS event to trigger the handler
await ctx.egressTrafficHandler(sqsEventBatch, ctx, (err, res) => {
if (err) {
assert.fail(err)
}
assert.ok(res)
assert.equal(res.statusCode, 200)
assert.equal(res.body, 'Egress events processed successfully')
})

// 4. Ensure we got a billing meter event or each egress event in the queue
// query stripe for the billing meter events
// const billingMeterEvents = await ctx.stripe.billing.meterEvents.list({
// limit: maxEvents,
// })
// assert.equal(billingMeterEvents.data.length, events.length)
// FIXME (fforbeck): how to check we send the events to stripe?
// we need to mock the stripe client
// and check that the correct events are sent to stripe
}
}
13 changes: 8 additions & 5 deletions filecoin/test/filecoin-events.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,15 @@ test.after(async t => {
})

for (const [title, unit] of Object.entries(filecoinApiTest.events.storefront)) {
const define = title.startsWith('only ')
let define;
if (title.startsWith('only ')) {
// eslint-disable-next-line no-only-tests/no-only-tests
? test.only
: title.startsWith('skip ')
? test.skip
: test
define = test.only;
} else if (title.startsWith('skip ')) {
define = test.skip;
} else {
define = test;
}

define(title, async (t) => {
const queues = getQueues(t.context)
Expand Down
13 changes: 8 additions & 5 deletions filecoin/test/filecoin-service.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,15 @@ test.after(async t => {
})

for (const [title, unit] of Object.entries(filecoinApiTest.service.storefront)) {
const define = title.startsWith('only ')
let define;
if (title.startsWith('only ')) {
// eslint-disable-next-line no-only-tests/no-only-tests
? test.only
: title.startsWith('skip ')
? test.skip
: test
define = test.only;
} else if (title.startsWith('skip ')) {
define = test.skip;
} else {
define = test;
}

define(title, async (t) => {
const queues = getQueues(t.context)
Expand Down

0 comments on commit 2194d13

Please sign in to comment.