Skip to content

Commit

Permalink
Ensure the last element of reduction in the throttle is emitted and u…
Browse files Browse the repository at this point in the history
…se appropriate delay (#292)
  • Loading branch information
phausler authored Sep 21, 2023
1 parent c889832 commit 220f86f
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 3 deletions.
11 changes: 10 additions & 1 deletion Sources/AsyncAlgorithms/AsyncThrottleSequence.swift
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,16 @@ extension AsyncThrottleSequence: AsyncSequence {
let start = last ?? clock.now
repeat {
guard let element = try await base.next() else {
return nil
if reduced != nil, let last {
// ensure the rate of elements never exceeds the given interval
let amount = interval - last.duration(to: clock.now)
if amount > .zero {
try? await clock.sleep(for: amount)
}
}
// the last value is unable to have any subsequent
// values so always return the last reduction
return reduced
}
let reduction = await reducing(reduced, element)
let now = clock.now
Expand Down
22 changes: 20 additions & 2 deletions Tests/AsyncAlgorithmsTests/TestThrottle.swift
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ final class TestThrottle: XCTestCase {
validate {
"abcdefghijk|"
$0.inputs[0].throttle(for: .steps(3), clock: $0.clock)
"a--d--g--j-|"
"a--d--g--j--[k|]"
}
}

Expand All @@ -81,7 +81,7 @@ final class TestThrottle: XCTestCase {
validate {
"abcdefghijk|"
$0.inputs[0].throttle(for: .steps(3), clock: $0.clock, latest: false)
"a--b--e--h-|"
"a--b--e--h--[k|]"
}
}

Expand Down Expand Up @@ -138,4 +138,22 @@ final class TestThrottle: XCTestCase {
"-a---c---e---g---i---k-|"
}
}

func test_trailing_delay_without_latest() throws {
guard #available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) else { throw XCTSkip("Skipped due to Clock/Instant/Duration availability") }
validate {
"abcdefghijkl|"
$0.inputs[0].throttle(for: .steps(3), clock: $0.clock, latest: false)
"a--b--e--h--[k|]"
}
}

func test_trailing_delay_with_latest() throws {
guard #available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) else { throw XCTSkip("Skipped due to Clock/Instant/Duration availability") }
validate {
"abcdefghijkl|"
$0.inputs[0].throttle(for: .steps(3), clock: $0.clock, latest: true)
"a--d--g--j--[l|]"
}
}
}

0 comments on commit 220f86f

Please sign in to comment.