Skip to content

Commit

Permalink
refactor: simplify bufferTime.
Browse files Browse the repository at this point in the history
  • Loading branch information
benlesh committed Feb 28, 2024
1 parent 3d2cf10 commit e375e74
Showing 1 changed file with 27 additions and 29 deletions.
56 changes: 27 additions & 29 deletions packages/rxjs/src/internal/operators/bufferTime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -132,34 +132,32 @@ export function bufferTime<T>(bufferTimeSpan: number, ...otherArgs: any[]): Oper

startBuffer();

const bufferTimeSubscriber = operate({
destination,
next: (value: T) => {
// Copy the records, so if we need to remove one we
// don't mutate the array. It's hard, but not impossible to
// set up a buffer time that could mutate the array and
// cause issues here.
const recordsCopy = bufferRecords!.slice();
for (const record of recordsCopy) {
// Loop over all buffers and
const { buffer } = record;
buffer.push(value);
// If the buffer is over the max size, we need to emit it.
maxBufferSize <= buffer.length && emit(record);
}
},
complete: () => {
// The source completed, emit all of the active
// buffers we have before we complete.
while (bufferRecords?.length) {
destination.next(bufferRecords.shift()!.buffer);
}
bufferTimeSubscriber?.unsubscribe();
destination.complete();
destination.unsubscribe();
},
});

source.subscribe(bufferTimeSubscriber);
source.subscribe(
operate({
destination,
next: (value: T) => {
// Copy the records, so if we need to remove one we
// don't mutate the array. It's hard, but not impossible to
// set up a buffer time that could mutate the array and
// cause issues here.
const recordsCopy = bufferRecords!.slice();
for (const record of recordsCopy) {
// Loop over all buffers and
const { buffer } = record;
buffer.push(value);
// If the buffer is over the max size, we need to emit it.
maxBufferSize <= buffer.length && emit(record);
}
},
complete: () => {
// The source completed, emit all of the active
// buffers we have before we complete.
while (bufferRecords?.length) {
destination.next(bufferRecords.shift()!.buffer);
}
destination.complete();
},
})
);
});
}

0 comments on commit e375e74

Please sign in to comment.