From 684639bb8e9b18e6cd0b74c3a802b950a5dbefab Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Tue, 6 Feb 2024 13:34:28 +0100 Subject: [PATCH 1/2] feat: add support to prepend pieces while buffering to aggregate --- packages/filecoin-api/src/aggregator/api.ts | 1 + .../src/aggregator/buffer-reducing.js | 25 ++++--- .../filecoin-api/src/aggregator/events.js | 1 + .../filecoin-api/test/events/aggregator.js | 71 +++++++++++++++++++ 4 files changed, 88 insertions(+), 10 deletions(-) diff --git a/packages/filecoin-api/src/aggregator/api.ts b/packages/filecoin-api/src/aggregator/api.ts index aab00ffb8..8e2f7d37f 100644 --- a/packages/filecoin-api/src/aggregator/api.ts +++ b/packages/filecoin-api/src/aggregator/api.ts @@ -328,6 +328,7 @@ export interface AggregateConfig { maxAggregateSize: number minAggregateSize: number minUtilizationFactor: number + prependBufferedPieces?: BufferedPiece[] } // Enums diff --git a/packages/filecoin-api/src/aggregator/buffer-reducing.js b/packages/filecoin-api/src/aggregator/buffer-reducing.js index c1262d0ab..7f10a4f07 100644 --- a/packages/filecoin-api/src/aggregator/buffer-reducing.js +++ b/packages/filecoin-api/src/aggregator/buffer-reducing.js @@ -152,28 +152,33 @@ export async function handleBufferReducingWithoutAggregate({ * Attempt to build an aggregate with buffered pieces within ranges. * * @param {BufferedPiece[]} bufferedPieces - * @param {object} sizes - * @param {number} sizes.maxAggregateSize - * @param {number} sizes.minAggregateSize - * @param {number} sizes.minUtilizationFactor + * @param {object} config + * @param {number} config.maxAggregateSize + * @param {number} config.minAggregateSize + * @param {number} config.minUtilizationFactor + * @param {BufferedPiece[]} [config.prependBufferedPieces] */ -export function aggregatePieces(bufferedPieces, sizes) { +export function aggregatePieces(bufferedPieces, config) { + const transformedBufferedPieces = [ + ...(config.prependBufferedPieces || []), + ...bufferedPieces + ] // Guarantee buffered pieces total size is bigger than the minimum utilization - const bufferUtilizationSize = bufferedPieces.reduce((total, p) => { + const bufferUtilizationSize = transformedBufferedPieces.reduce((total, p) => { const piece = Piece.fromLink(p.piece) total += piece.size return total }, 0n) if ( bufferUtilizationSize < - sizes.maxAggregateSize / sizes.minUtilizationFactor + config.maxAggregateSize / config.minUtilizationFactor ) { return } // Create builder with maximum size and try to fill it up const builder = Aggregate.createBuilder({ - size: Aggregate.Size.from(sizes.maxAggregateSize), + size: Aggregate.Size.from(config.maxAggregateSize), }) // add pieces to an aggregate until there is no more space, or no more pieces @@ -182,7 +187,7 @@ export function aggregatePieces(bufferedPieces, sizes) { /** @type {BufferedPiece[]} */ const remainingBufferedPieces = [] - for (const bufferedPiece of bufferedPieces) { + for (const bufferedPiece of transformedBufferedPieces) { const p = Piece.fromLink(bufferedPiece.piece) if (builder.estimate(p).error) { remainingBufferedPieces.push(bufferedPiece) @@ -196,7 +201,7 @@ export function aggregatePieces(bufferedPieces, sizes) { BigInt(builder.limit) * BigInt(Index.EntrySize) // If not enough space return undefined - if (totalUsedSpace < BigInt(sizes.minAggregateSize)) { + if (totalUsedSpace < BigInt(config.minAggregateSize)) { return } diff --git a/packages/filecoin-api/src/aggregator/events.js b/packages/filecoin-api/src/aggregator/events.js index 71a59f2ee..66b00c911 100644 --- a/packages/filecoin-api/src/aggregator/events.js +++ b/packages/filecoin-api/src/aggregator/events.js @@ -114,6 +114,7 @@ export const handleBufferQueueMessage = async (context, records) => { maxAggregateSize: context.config.maxAggregateSize, minAggregateSize: context.config.minAggregateSize, minUtilizationFactor: context.config.minUtilizationFactor, + prependBufferedPieces: context.config.prependBufferedPieces }) // Store buffered pieces if not enough to do aggregate and re-queue them diff --git a/packages/filecoin-api/test/events/aggregator.js b/packages/filecoin-api/test/events/aggregator.js index d2de21104..d83671367 100644 --- a/packages/filecoin-api/test/events/aggregator.js +++ b/packages/filecoin-api/test/events/aggregator.js @@ -397,6 +397,77 @@ export const test = { message.minPieceInsertedAt ) }, + 'handles buffer queue messages successfully to queue aggregate prepended with a buffer piece': async ( + assert, + context + ) => { + const group = context.id.did() + const { buffers, blocks } = await getBuffers(2, group, { + length: 100, + size: 128, + }) + + const [cargo] = await randomCargo(1, 128) + /** @type {import('../../src/aggregator/api.js').BufferedPiece} */ + const bufferedPiece = { + piece: cargo.link.link(), + policy: 0, + insertedAt: (new Date()).toISOString() + } + + const totalPieces = buffers.reduce((acc, v) => { + acc += v.pieces.length + return acc + }, 0) + + // Store buffers + for (let i = 0; i < blocks.length; i++) { + const putBufferRes = await context.bufferStore.put({ + buffer: buffers[i], + block: blocks[i].cid, + }) + assert.ok(putBufferRes.ok) + } + + // Handle messages + const handledMessageRes = await AggregatorEvents.handleBufferQueueMessage( + { + ...context, + config: { + minAggregateSize: 2 ** 19, + minUtilizationFactor: 10e5, + maxAggregateSize: 2 ** 35, + prependBufferedPieces: [bufferedPiece] + }, + }, + blocks.map((b) => ({ + pieces: b.cid, + group, + })) + ) + assert.ok(handledMessageRes.ok) + assert.equal(handledMessageRes.ok?.aggregatedPieces, totalPieces + 1) + + // Validate queue and store + await pWaitFor( + () => + context.queuedMessages.get('aggregateOfferQueue')?.length === 1 + ) + + /** @type {AggregateOfferMessage} */ + // @ts-expect-error cannot infer buffer message + const message = context.queuedMessages.get('aggregateOfferQueue')?.[0] + const bufferGet = await context.bufferStore.get(message.buffer) + assert.ok(bufferGet.ok) + assert.ok(bufferGet.ok?.block.equals(message.buffer)) + assert.equal(bufferGet.ok?.buffer.group, group) + assert.ok(message.aggregate.equals(bufferGet.ok?.buffer.aggregate)) + assert.equal(bufferGet.ok?.buffer.pieces.length, totalPieces + 1) + + // prepended piece + assert.ok(bufferGet.ok?.buffer.pieces.find(p => p.piece.link().equals(bufferedPiece.piece.link()))) + assert.ok(bufferGet.ok?.buffer.pieces[0].piece.link().equals(bufferedPiece.piece.link())) + }, 'handles buffer queue messages successfully to queue aggregate and remaining buffer': async (assert, context) => { const group = context.id.did() From 88233565d174bc091bbc6381e4647577e3f81299 Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Tue, 6 Feb 2024 14:51:07 +0100 Subject: [PATCH 2/2] chore: address review comments --- .../src/aggregator/buffer-reducing.js | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/packages/filecoin-api/src/aggregator/buffer-reducing.js b/packages/filecoin-api/src/aggregator/buffer-reducing.js index 7f10a4f07..0bf74d9f9 100644 --- a/packages/filecoin-api/src/aggregator/buffer-reducing.js +++ b/packages/filecoin-api/src/aggregator/buffer-reducing.js @@ -159,12 +159,8 @@ export async function handleBufferReducingWithoutAggregate({ * @param {BufferedPiece[]} [config.prependBufferedPieces] */ export function aggregatePieces(bufferedPieces, config) { - const transformedBufferedPieces = [ - ...(config.prependBufferedPieces || []), - ...bufferedPieces - ] // Guarantee buffered pieces total size is bigger than the minimum utilization - const bufferUtilizationSize = transformedBufferedPieces.reduce((total, p) => { + const bufferUtilizationSize = bufferedPieces.reduce((total, p) => { const piece = Piece.fromLink(p.piece) total += piece.size return total @@ -187,7 +183,17 @@ export function aggregatePieces(bufferedPieces, config) { /** @type {BufferedPiece[]} */ const remainingBufferedPieces = [] - for (const bufferedPiece of transformedBufferedPieces) { + // start by adding prepend buffered pieces if available + for (const bufferedPiece of (config.prependBufferedPieces || [])) { + const p = Piece.fromLink(bufferedPiece.piece) + if (builder.estimate(p).error) { + throw new Error('aggregate builder is not able to create aggregates with only prepend buffered pieces') + } + builder.write(p) + addedBufferedPieces.push(bufferedPiece) + } + + for (const bufferedPiece of bufferedPieces) { const p = Piece.fromLink(bufferedPiece.piece) if (builder.estimate(p).error) { remainingBufferedPieces.push(bufferedPiece)