diff --git a/packages/filecoin-api/src/aggregator/buffer-reducing.js b/packages/filecoin-api/src/aggregator/buffer-reducing.js index fd134c541..c1262d0ab 100644 --- a/packages/filecoin-api/src/aggregator/buffer-reducing.js +++ b/packages/filecoin-api/src/aggregator/buffer-reducing.js @@ -227,14 +227,20 @@ export async function getBufferedPieces(bufferPieces, bufferStore) { bufferPieces.map((bufferPiece) => bufferStore.get(bufferPiece)) ) - // Concatenate pieces and sort them by policy and size + // Concatenate pieces uniquely and sort them by policy and size /** @type {BufferedPiece[]} */ let bufferedPieces = [] + const uniquePieces = new Set() for (const b of getBufferRes) { if (b.error) return b - bufferedPieces = bufferedPieces.concat(b.ok.buffer.pieces || []) + for (const piece of b.ok.buffer.pieces) { + const isDuplicate = uniquePieces.has(piece.piece.toString()) + if (!isDuplicate) { + bufferedPieces.push(piece) + uniquePieces.add(piece.piece.toString()) + } + } } - bufferedPieces.sort(sortPieces) return { diff --git a/packages/filecoin-api/test/events/aggregator.js b/packages/filecoin-api/test/events/aggregator.js index c15844c59..d2de21104 100644 --- a/packages/filecoin-api/test/events/aggregator.js +++ b/packages/filecoin-api/test/events/aggregator.js @@ -6,6 +6,7 @@ import { CBOR } from '@ucanto/core' import * as API from '../../src/types.js' import * as TestAPI from '../types.js' import * as AggregatorEvents from '../../src/aggregator/events.js' +import { getBufferedPieces } from '../../src/aggregator/buffer-reducing.js' import { FailingStore } from '../context/store.js' import { FailingQueue } from '../context/queue.js' @@ -175,6 +176,29 @@ export const test = { bufferQueue: new FailingQueue(), }) ), + 'handles buffer queue messages repeated items as unique': async ( + assert, + context + ) => { + const group = context.id.did() + const { buffers, blocks } = await getBuffers(1, group) + + // Store buffers + for (let i = 0; i < blocks.length; i++) { + const putBufferRes = await context.bufferStore.put({ + buffer: buffers[i], + block: blocks[i].cid, + }) + assert.ok(putBufferRes.ok) + } + + const bufferedPieces = await getBufferedPieces( + [blocks[0].cid, blocks[0].cid], + context.bufferStore + ) + + assert.equal(bufferedPieces.ok?.bufferedPieces.length, buffers[0].pieces.length) + }, 'handles buffer queue messages successfully to requeue bigger buffer': async ( assert, context