Skip to content

Commit

Permalink
refactor(rxjs): simplify bufferTime
Browse files Browse the repository at this point in the history
  • Loading branch information
benlesh committed Feb 29, 2024
1 parent d8c4e84 commit 63fff0f
Showing 1 changed file with 28 additions and 29 deletions.
57 changes: 28 additions & 29 deletions packages/rxjs/src/internal/operators/bufferTime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,34 +131,33 @@ export function bufferTime<T>(bufferTimeSpan: number, ...otherArgs: any[]): Oper

startBuffer();

const bufferTimeSubscriber = operate({
destination,
next: (value: T) => {
// Copy the records, so if we need to remove one we
// don't mutate the array. It's hard, but not impossible to
// set up a buffer time that could mutate the array and
// cause issues here.
const recordsCopy = bufferRecords!.slice();
for (const record of recordsCopy) {
// Loop over all buffers and
const { buffer } = record;
buffer.push(value);
// If the buffer is over the max size, we need to emit it.
maxBufferSize <= buffer.length && emit(record);
}
},
complete: () => {
// The source completed, emit all of the active
// buffers we have before we complete.
while (bufferRecords?.length) {
destination.next(bufferRecords.shift()!.buffer);
}
bufferTimeSubscriber?.unsubscribe();
destination.complete();
destination.unsubscribe();
},
});

source.subscribe(bufferTimeSubscriber);
source.subscribe(
operate({
destination,
next: (value: T) => {
// Copy the records, so if we need to remove one we
// don't mutate the array. It's hard, but not impossible to
// set up a buffer time that could mutate the array and
// cause issues here.
const recordsCopy = bufferRecords!.slice();
for (const record of recordsCopy) {
// Loop over all buffers and
const { buffer } = record;
buffer.push(value);
// If the buffer is over the max size, we need to emit it.
maxBufferSize <= buffer.length && emit(record);
}
},
complete: () => {
// The source completed, emit all of the active
// buffers we have before we complete.
while (bufferRecords?.length) {
destination.next(bufferRecords.shift()!.buffer);
}
destination.complete();
destination.unsubscribe();
},
})
);
});
}

0 comments on commit 63fff0f

Please sign in to comment.