diff --git a/packages/filecoin-api/src/aggregator/api.ts b/packages/filecoin-api/src/aggregator/api.ts index 7a6b1ae01..1bb1b7b2d 100644 --- a/packages/filecoin-api/src/aggregator/api.ts +++ b/packages/filecoin-api/src/aggregator/api.ts @@ -187,6 +187,10 @@ export interface AggregateRecord { * Insertion date ISO string. */ insertedAt: string + /** + * ISO string date of oldest piece in the pipeline included into the aggregate. + */ + oldestPieceInsertedAt: string } // TODO: probably group should also be key! @@ -314,6 +318,10 @@ export interface AggregateOfferMessage { * Grouping information for submitted piece. */ group: string + /** + * ISO string date of oldest piece in the pipeline included into the aggregate. + */ + oldestPieceInsertedAt: string } export interface AggregateConfig { diff --git a/packages/filecoin-api/src/aggregator/buffer-reducing.js b/packages/filecoin-api/src/aggregator/buffer-reducing.js index e7de30d45..124f90885 100644 --- a/packages/filecoin-api/src/aggregator/buffer-reducing.js +++ b/packages/filecoin-api/src/aggregator/buffer-reducing.js @@ -50,6 +50,15 @@ export async function handleBufferReducingWithAggregate({ ) const aggregateBlock = await CBOR.write(aggregateReducedBuffer) + // Get timestamp of oldest piece in the pipeline included in the aggregate + const oldestPieceInsertedAtDate = new Date( + Math.min( + ...aggregateInfo.addedBufferedPieces.map((bf) => + new Date(bf.insertedAt).getTime() + ) + ) + ) + // Store buffered pieces for aggregate const bufferStoreAggregatePut = await bufferStore.put({ buffer: aggregateReducedBuffer, @@ -65,6 +74,7 @@ export async function handleBufferReducingWithAggregate({ buffer: aggregateBlock.cid, pieces: piecesBlock.cid, group, + oldestPieceInsertedAt: oldestPieceInsertedAtDate.toISOString(), }) if (aggregateOfferQueueAdd.error) { return aggregateOfferQueueAdd diff --git a/packages/filecoin-api/src/aggregator/events.js b/packages/filecoin-api/src/aggregator/events.js index d8b27c96d..bc8895ea6 100644 --- a/packages/filecoin-api/src/aggregator/events.js +++ b/packages/filecoin-api/src/aggregator/events.js @@ -168,7 +168,7 @@ export const handleBufferQueueMessage = async (context, records) => { * @param {import('./api.js').AggregateOfferMessage} message */ export const handleAggregateOfferMessage = async (context, message) => { - const { pieces, aggregate, buffer, group } = message + const { pieces, aggregate, buffer, group, oldestPieceInsertedAt } = message // Store aggregate information into the store. Store events MAY be used to propagate aggregate over const putRes = await context.aggregateStore.put({ @@ -176,6 +176,7 @@ export const handleAggregateOfferMessage = async (context, message) => { aggregate, buffer, group, + oldestPieceInsertedAt, insertedAt: new Date().toISOString(), }) diff --git a/packages/filecoin-api/test/events/aggregator.js b/packages/filecoin-api/test/events/aggregator.js index 04ca2ec0d..67bf2070b 100644 --- a/packages/filecoin-api/test/events/aggregator.js +++ b/packages/filecoin-api/test/events/aggregator.js @@ -352,13 +352,26 @@ export const test = { /** @type {AggregateOfferMessage} */ // @ts-expect-error cannot infer buffer message const message = context.queuedMessages.get('aggregateOfferQueue')?.[0] - const bufferGet = await context.bufferStore.get(message.buffer) assert.ok(bufferGet.ok) assert.ok(bufferGet.ok?.block.equals(message.buffer)) assert.equal(bufferGet.ok?.buffer.group, group) assert.ok(message.aggregate.equals(bufferGet.ok?.buffer.aggregate)) assert.equal(bufferGet.ok?.buffer.pieces.length, totalPieces) + // Validate oldest piece date + assert.ok(message.oldestPieceInsertedAt) + + const oldestPieceInsertedAtDate = new Date( + Math.min( + ...(bufferGet.ok?.buffer.pieces?.map((bf) => + new Date(bf.insertedAt).getTime() + ) || []) + ) + ) + assert.equal( + oldestPieceInsertedAtDate.toISOString(), + message.oldestPieceInsertedAt + ) }, 'handles buffer queue messages successfully to queue aggregate and remaining buffer': async (assert, context) => {