diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/Throughput.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/Throughput.java
new file mode 100644
index 0000000000..f0f54140d1
--- /dev/null
+++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/Throughput.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright 2023 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.storage;
+
+import com.google.common.base.MoreObjects;
+import java.time.Duration;
+import java.util.Objects;
+
+/**
+ * Convenience class to encapsulate the concept of a throughput value.
+ *
+ *
Given a number of bytes and a duration compute the number of bytes per second.
+ */
+final class Throughput {
+
+ private static final double NANOS_PER_SECOND = 1_000_000_000d;
+ private final long numBytes;
+ private final Duration duration;
+
+ // TODO: is there a efficient way we can limit precision without having to use BigDecimal?
+ // Realistically, we don't need precision smaller than 1 byte per microsecond, leading to
+ // 6 digits past the decimal of needed precision.
+ private final double bytesPerSecond;
+
+ private Throughput(long numBytes, Duration duration) {
+ this.numBytes = numBytes;
+ this.duration = duration;
+ this.bytesPerSecond = numBytes / (duration.toNanos() / NANOS_PER_SECOND);
+ }
+
+ public long getNumBytes() {
+ return numBytes;
+ }
+
+ public Duration getDuration() {
+ return duration;
+ }
+
+ public double toBps() {
+ return bytesPerSecond;
+ }
+
+ public Throughput plus(Throughput other) {
+ return new Throughput(this.numBytes + other.numBytes, this.duration.plus(other.duration));
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof Throughput)) {
+ return false;
+ }
+ Throughput that = (Throughput) o;
+ return Double.compare(that.bytesPerSecond, bytesPerSecond) == 0;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(bytesPerSecond);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this).add("bytesPerSecond", bytesPerSecond).toString();
+ }
+
+ public static Throughput zero() {
+ return new Throughput(0, Duration.ZERO);
+ }
+
+ public static Throughput of(long numBytes, Duration duration) {
+ return new Throughput(numBytes, duration);
+ }
+
+ public static Throughput bytesPerSecond(long numBytes) {
+ return new Throughput(numBytes, Duration.ofSeconds(1));
+ }
+}
diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/ThroughputMovingWindow.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/ThroughputMovingWindow.java
new file mode 100644
index 0000000000..bc10ecc36c
--- /dev/null
+++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/ThroughputMovingWindow.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2023 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.storage;
+
+import com.google.common.base.MoreObjects;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Comparator;
+import java.util.PriorityQueue;
+
+/**
+ * A simple moving window implementation which will keep a {@code window}s worth of Throughput
+ * values and allow querying for the aggregate avg over that time window.
+ */
+final class ThroughputMovingWindow {
+
+ private final Duration window;
+
+ private final PriorityQueue values;
+
+ private ThroughputMovingWindow(Duration window) {
+ this.window = window;
+ this.values = new PriorityQueue<>(Entry.COMP);
+ }
+
+ void add(Instant now, Throughput value) {
+ houseKeeping(now);
+ values.add(new Entry(now, value));
+ }
+
+ Throughput avg(Instant now) {
+ houseKeeping(now);
+ return values.stream()
+ .map(Entry::getValue)
+ .reduce(
+ Throughput.zero(),
+ (tp1, tp2) -> Throughput.of(tp1.getNumBytes() + tp2.getNumBytes(), window));
+ }
+
+ private void houseKeeping(Instant now) {
+ Instant newMin = now.minus(window);
+ values.removeIf(e -> lteq(e.getAt(), newMin));
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("window", window)
+ .add("values.size()", values.size())
+ .toString();
+ }
+
+ static ThroughputMovingWindow of(Duration window) {
+ return new ThroughputMovingWindow(window);
+ }
+
+ private static boolean lteq(Instant a, Instant b) {
+ return a.equals(b) || a.isBefore(b);
+ }
+
+ private static final class Entry {
+ private static final Comparator COMP = Comparator.comparing(e -> e.at);
+ private final Instant at;
+ private final Throughput value;
+
+ private Entry(Instant at, Throughput value) {
+ this.at = at;
+ this.value = value;
+ }
+
+ public Instant getAt() {
+ return at;
+ }
+
+ public Throughput getValue() {
+ return value;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this).add("at", at).add("value", value).toString();
+ }
+ }
+}
diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/ThroughputSink.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/ThroughputSink.java
new file mode 100644
index 0000000000..5ef6e37d10
--- /dev/null
+++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/ThroughputSink.java
@@ -0,0 +1,251 @@
+/*
+ * Copyright 2023 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.storage;
+
+import com.google.common.base.MoreObjects;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Objects;
+import java.util.logging.Logger;
+
+/**
+ * Interface to mark a location in which throughput of byte movements can be recorded, and which can
+ * provide a decorated underlying channel.
+ */
+interface ThroughputSink {
+
+ void recordThroughput(Record r);
+
+ WritableByteChannel decorate(WritableByteChannel wbc);
+
+ static void computeThroughput(Clock clock, ThroughputSink sink, long numBytes, IO io)
+ throws IOException {
+ boolean exception = false;
+ Instant begin = clock.instant();
+ try {
+ io.apply();
+ } catch (IOException e) {
+ exception = true;
+ throw e;
+ } finally {
+ Instant end = clock.instant();
+ Record record = Record.of(numBytes, begin, end, exception);
+ sink.recordThroughput(record);
+ }
+ }
+
+ @FunctionalInterface
+ interface IO {
+ void apply() throws IOException;
+ }
+
+ static ThroughputSink logged(String prefix, Clock clock) {
+ return new LoggedThroughputSink(prefix, clock);
+ }
+
+ static ThroughputSink windowed(ThroughputMovingWindow w, Clock clock) {
+ return new ThroughputMovingWindowThroughputSink(w, clock);
+ }
+
+ static ThroughputSink tee(ThroughputSink a, ThroughputSink b) {
+ return new TeeThroughputSink(a, b);
+ }
+
+ final class Record {
+ private final long numBytes;
+ private final Instant begin;
+ private final Instant end;
+ private final boolean exception;
+
+ private Record(long numBytes, Instant begin, Instant end, boolean exception) {
+ this.numBytes = numBytes;
+ this.begin = begin;
+ this.end = end;
+ this.exception = exception;
+ }
+
+ public long getNumBytes() {
+ return numBytes;
+ }
+
+ public Instant getBegin() {
+ return begin;
+ }
+
+ public Instant getEnd() {
+ return end;
+ }
+
+ public Duration getDuration() {
+ return Duration.between(begin, end);
+ }
+
+ public boolean isException() {
+ return exception;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof Record)) {
+ return false;
+ }
+ Record record = (Record) o;
+ return numBytes == record.numBytes
+ && exception == record.exception
+ && Objects.equals(begin, record.begin)
+ && Objects.equals(end, record.end);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(numBytes, begin, end, exception);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("numBytes", numBytes)
+ .add("begin", begin)
+ .add("end", end)
+ .add("exception", exception)
+ .toString();
+ }
+
+ public static Record of(long numBytes, Instant begin, Instant end, boolean exception) {
+ return new Record(numBytes, begin, end, exception);
+ }
+ }
+
+ final class LoggedThroughputSink implements ThroughputSink {
+ private static final Logger LOGGER = Logger.getLogger(ThroughputSink.class.getName());
+
+ private final String prefix;
+ private final Clock clock;
+
+ private LoggedThroughputSink(String prefix, Clock clock) {
+ this.prefix = prefix;
+ this.clock = clock;
+ }
+
+ private static final double MiB = 1d / (1024 * 1024);
+
+ @Override
+ public void recordThroughput(Record r) {
+ LOGGER.info(
+ () ->
+ String.format(
+ "{%s} (%01.03f MiB/s) %s",
+ prefix,
+ ((r.numBytes * MiB)
+ / (Duration.between(r.getBegin(), r.getEnd()).toMillis() / 1000d)),
+ r));
+ }
+
+ @Override
+ public WritableByteChannel decorate(WritableByteChannel wbc) {
+ return new ThroughputRecordingWritableByteChannel(wbc, this, clock);
+ }
+ }
+
+ final class ThroughputRecordingWritableByteChannel implements WritableByteChannel {
+ private final WritableByteChannel delegate;
+ private final ThroughputSink sink;
+ private final Clock clock;
+
+ private ThroughputRecordingWritableByteChannel(
+ WritableByteChannel delegate, ThroughputSink sink, Clock clock) {
+ this.delegate = delegate;
+ this.sink = sink;
+ this.clock = clock;
+ }
+
+ @Override
+ public int write(ByteBuffer src) throws IOException {
+ boolean exception = false;
+ int remaining = src.remaining();
+ Instant begin = clock.instant();
+ try {
+ return delegate.write(src);
+ } catch (IOException e) {
+ exception = true;
+ throw e;
+ } finally {
+ Instant end = clock.instant();
+ Record record = Record.of(remaining - src.remaining(), begin, end, exception);
+ sink.recordThroughput(record);
+ }
+ }
+
+ @Override
+ public boolean isOpen() {
+ return delegate.isOpen();
+ }
+
+ @Override
+ public void close() throws IOException {
+ delegate.close();
+ }
+ }
+
+ final class TeeThroughputSink implements ThroughputSink {
+ private final ThroughputSink a;
+ private final ThroughputSink b;
+
+ private TeeThroughputSink(ThroughputSink a, ThroughputSink b) {
+ this.a = a;
+ this.b = b;
+ }
+
+ @Override
+ public void recordThroughput(Record r) {
+ a.recordThroughput(r);
+ b.recordThroughput(r);
+ }
+
+ @Override
+ public WritableByteChannel decorate(WritableByteChannel wbc) {
+ return b.decorate(a.decorate(wbc));
+ }
+ }
+
+ final class ThroughputMovingWindowThroughputSink implements ThroughputSink {
+ private final ThroughputMovingWindow w;
+ private final Clock clock;
+
+ private ThroughputMovingWindowThroughputSink(ThroughputMovingWindow w, Clock clock) {
+ this.w = w;
+ this.clock = clock;
+ }
+
+ @Override
+ public synchronized void recordThroughput(Record r) {
+ w.add(r.end, Throughput.of(r.getNumBytes(), r.getDuration()));
+ }
+
+ @Override
+ public WritableByteChannel decorate(WritableByteChannel wbc) {
+ return new ThroughputRecordingWritableByteChannel(wbc, this, clock);
+ }
+ }
+}
diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/TestClock.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/TestClock.java
new file mode 100644
index 0000000000..e37a4f0248
--- /dev/null
+++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/TestClock.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2023 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.storage;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.function.UnaryOperator;
+
+final class TestClock extends Clock {
+
+ private final Instant begin;
+ private final UnaryOperator next;
+
+ private Instant now;
+
+ private TestClock(Instant begin, UnaryOperator next) {
+ this.begin = begin;
+ this.next = next;
+ this.now = begin;
+ }
+
+ @Override
+ public ZoneId getZone() {
+ throw new UnsupportedOperationException("TestClock.getZone()");
+ }
+
+ @Override
+ public Clock withZone(ZoneId zone) {
+ throw new UnsupportedOperationException("TestClock.withZone()");
+ }
+
+ @Override
+ public Instant instant() {
+ Instant ret = now;
+ now = next.apply(now);
+ return ret;
+ }
+
+ public static TestClock tickBy(Instant begin, Duration d) {
+ return of(begin, i -> i.plus(d));
+ }
+
+ public static TestClock of(Instant begin, UnaryOperator next) {
+ return new TestClock(begin, next);
+ }
+}
diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/ThroughputMovingWindowPropertyTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/ThroughputMovingWindowPropertyTest.java
new file mode 100644
index 0000000000..181f51aedf
--- /dev/null
+++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/ThroughputMovingWindowPropertyTest.java
@@ -0,0 +1,422 @@
+/*
+ * Copyright 2023 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.storage;
+
+import static com.google.cloud.storage.ByteSizeConstants._5TiB;
+import static com.google.common.truth.Truth.assertWithMessage;
+import static java.time.Instant.EPOCH;
+import static java.time.Instant.ofEpochSecond;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableList;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+import net.jqwik.api.Arbitraries;
+import net.jqwik.api.Arbitrary;
+import net.jqwik.api.Combinators;
+import net.jqwik.api.Example;
+import net.jqwik.api.ForAll;
+import net.jqwik.api.Property;
+import net.jqwik.api.Provide;
+import net.jqwik.api.Tuple;
+import net.jqwik.api.Tuple.Tuple1;
+import net.jqwik.time.api.DateTimes;
+import net.jqwik.time.api.Times;
+
+final class ThroughputMovingWindowPropertyTest {
+
+ private static final double TOLERANCE = 0.001;
+
+ @Example
+ void canned() {
+ test(CANNED_SCENARIO);
+ }
+
+ @Example
+ void twoEntriesSameTimeDifferentThroughput() {
+ Duration ms = Duration.ofMillis(1);
+ ScenarioTimeline scenario =
+ new ScenarioTimeline(
+ ms,
+ ImmutableList.of(
+ new TimelineEntry(EPOCH, Throughput.of(1, ms), 1000.0),
+ new TimelineEntry(EPOCH, Throughput.of(0, ms), 1000.0)));
+ test(scenario);
+ }
+
+ @Property
+ void test(@ForAll("Scenarios") ScenarioTimeline scenario) {
+ ThroughputMovingWindow window = ThroughputMovingWindow.of(scenario.d);
+ for (TimelineEntry timelineEntry : scenario.timelineEntries) {
+ window.add(timelineEntry.i, timelineEntry.t);
+ Throughput throughput = window.avg(timelineEntry.i);
+ assertWithMessage(timelineEntry.toString())
+ .that(throughput.toBps())
+ .isWithin(TOLERANCE)
+ .of(timelineEntry.expectedMovingAvgBytesPerSecond);
+ }
+ }
+
+ @Provide("Scenarios")
+ static Arbitrary scenarioTimeline() {
+ return Times.durations()
+ .ofPrecision(ChronoUnit.MILLIS)
+ .between(Duration.ofMillis(1), Duration.ofMinutes(10))
+ .flatMap(
+ d ->
+ Combinators.combine(
+ Arbitraries.just(d),
+ // pick an instant, then generate 1 to 100 values between it and d * 3
+ DateTimes.instants()
+ .ofPrecision(ChronoUnit.MILLIS)
+ .flatMap(
+ i ->
+ DateTimes.instants()
+ .ofPrecision(ChronoUnit.MILLIS)
+ .between(i, i.plus(d.multipliedBy(3)))
+ .flatMap(
+ ii ->
+ Combinators.combine(
+ Arbitraries.just(ii), throughput())
+ .as(Tuple::of))
+ .list()
+ .ofMinSize(1)
+ .ofMaxSize(100)))
+ .as(Tuple::of))
+ .map(ScenarioTimeline::create);
+ }
+
+ static Arbitrary throughput() {
+ return Times.durations()
+ .ofPrecision(ChronoUnit.MILLIS)
+ .between(Duration.ofMillis(1), Duration.ofMinutes(10))
+ .flatMap(d -> Arbitraries.longs().between(0, _5TiB).map(n -> Throughput.of(n, d)));
+ }
+
+ private static final class ScenarioTimeline {
+
+ private static final Comparator> COMP =
+ Comparator.comparing(Tuple1::get1);
+ private final Duration d;
+ private final List timelineEntries;
+
+ private ScenarioTimeline(Duration d, List timelineEntries) {
+ this.d = d;
+ this.timelineEntries = timelineEntries;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("d", d)
+ .add("timelineEntries", timelineEntries)
+ .toString();
+ }
+
+ static ScenarioTimeline create(
+ Tuple.Tuple2>> tuples) {
+
+ Duration d = tuples.get1();
+ List> pairs = tuples.get2();
+
+ List> tmp =
+ pairs.stream().sorted(COMP).collect(Collectors.toList());
+
+ List>> windows = new ArrayList<>();
+ int last = tmp.size() - 1;
+ for (int i = last; i >= 0; i--) {
+ List> window = new ArrayList<>();
+ Tuple.Tuple2 t = tmp.get(i);
+ window.add(t);
+ Instant min = t.get1().minus(d);
+ for (int j = i - 1; j >= 0; j--) {
+ Tuple.Tuple2 r = tmp.get(j);
+ if (r.get1().isAfter(min)) {
+ window.add(r);
+ }
+ }
+ windows.add(ImmutableList.copyOf(window));
+ }
+
+ ImmutableList timelineEntries =
+ windows.stream()
+ .map(
+ w -> {
+ Tuple.Tuple2 max = w.get(0);
+ Throughput reduce =
+ w.stream()
+ .map(Tuple.Tuple2::get2)
+ .reduce(Throughput.zero(), Throughput::plus);
+ return new TimelineEntry(
+ max.get1(), max.get2(), Throughput.of(reduce.getNumBytes(), d).toBps());
+ })
+ .collect(ImmutableList.toImmutableList());
+ return new ScenarioTimeline(d, timelineEntries.reverse());
+ }
+ }
+
+ private static final class TimelineEntry {
+ private final Instant i;
+ private final Throughput t;
+ private final double expectedMovingAvgBytesPerSecond;
+
+ private TimelineEntry(Instant i, Throughput t, double expectedMovingAvgBytesPerSecond) {
+ this.i = i;
+ this.t = t;
+ this.expectedMovingAvgBytesPerSecond = expectedMovingAvgBytesPerSecond;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("i", i)
+ .add("t", t)
+ .add("tenSecMovingAvg", String.format("%,.03f", expectedMovingAvgBytesPerSecond))
+ .toString();
+ }
+ }
+
+ private static final ScenarioTimeline CANNED_SCENARIO =
+ new ScenarioTimeline(
+ Duration.ofSeconds(10),
+ ImmutableList.builder()
+ .add(new TimelineEntry(ofEpochSecond(1), Throughput.bytesPerSecond(192), 19.2))
+ .add(new TimelineEntry(ofEpochSecond(2), Throughput.bytesPerSecond(1185), 137.7))
+ .add(new TimelineEntry(ofEpochSecond(3), Throughput.bytesPerSecond(1363), 274.))
+ .add(new TimelineEntry(ofEpochSecond(4), Throughput.bytesPerSecond(234), 297.4))
+ .add(new TimelineEntry(ofEpochSecond(5), Throughput.bytesPerSecond(1439), 441.3))
+ .add(new TimelineEntry(ofEpochSecond(6), Throughput.bytesPerSecond(1269), 568.2))
+ .add(new TimelineEntry(ofEpochSecond(7), Throughput.bytesPerSecond(692), 637.4))
+ .add(new TimelineEntry(ofEpochSecond(8), Throughput.bytesPerSecond(667), 704.1))
+ .add(new TimelineEntry(ofEpochSecond(9), Throughput.bytesPerSecond(1318), 835.9))
+ .add(new TimelineEntry(ofEpochSecond(10), Throughput.bytesPerSecond(1125), 948.4))
+ .add(new TimelineEntry(ofEpochSecond(11), Throughput.bytesPerSecond(1124), 1041.6))
+ .add(new TimelineEntry(ofEpochSecond(12), Throughput.bytesPerSecond(3), 923.4))
+ .add(new TimelineEntry(ofEpochSecond(13), Throughput.bytesPerSecond(185), 805.6))
+ .add(new TimelineEntry(ofEpochSecond(14), Throughput.bytesPerSecond(726), 854.8))
+ .add(new TimelineEntry(ofEpochSecond(15), Throughput.bytesPerSecond(630), 773.9))
+ .add(new TimelineEntry(ofEpochSecond(16), Throughput.bytesPerSecond(874), 734.4))
+ .add(new TimelineEntry(ofEpochSecond(17), Throughput.bytesPerSecond(1401), 805.3))
+ .add(new TimelineEntry(ofEpochSecond(18), Throughput.bytesPerSecond(533), 791.9))
+ .add(new TimelineEntry(ofEpochSecond(19), Throughput.bytesPerSecond(446), 704.7))
+ .add(new TimelineEntry(ofEpochSecond(20), Throughput.bytesPerSecond(801), 672.3))
+ .add(new TimelineEntry(ofEpochSecond(21), Throughput.bytesPerSecond(61), 566.))
+ .add(new TimelineEntry(ofEpochSecond(22), Throughput.bytesPerSecond(1104), 676.1))
+ .add(new TimelineEntry(ofEpochSecond(23), Throughput.bytesPerSecond(972), 754.8))
+ .add(new TimelineEntry(ofEpochSecond(24), Throughput.bytesPerSecond(1310), 813.2))
+ .add(new TimelineEntry(ofEpochSecond(25), Throughput.bytesPerSecond(408), 791.))
+ .add(new TimelineEntry(ofEpochSecond(26), Throughput.bytesPerSecond(759), 779.5))
+ .add(new TimelineEntry(ofEpochSecond(27), Throughput.bytesPerSecond(674), 706.8))
+ .add(new TimelineEntry(ofEpochSecond(28), Throughput.bytesPerSecond(314), 684.9))
+ .add(new TimelineEntry(ofEpochSecond(29), Throughput.bytesPerSecond(1311), 771.4))
+ .add(new TimelineEntry(ofEpochSecond(30), Throughput.bytesPerSecond(449), 736.2))
+ .add(new TimelineEntry(ofEpochSecond(31), Throughput.bytesPerSecond(1273), 857.4))
+ .add(new TimelineEntry(ofEpochSecond(32), Throughput.bytesPerSecond(228), 769.8))
+ .add(new TimelineEntry(ofEpochSecond(33), Throughput.bytesPerSecond(605), 733.1))
+ .add(new TimelineEntry(ofEpochSecond(34), Throughput.bytesPerSecond(537), 655.8))
+ .add(new TimelineEntry(ofEpochSecond(35), Throughput.bytesPerSecond(1498), 764.8))
+ .add(new TimelineEntry(ofEpochSecond(36), Throughput.bytesPerSecond(694), 758.3))
+ .add(new TimelineEntry(ofEpochSecond(37), Throughput.bytesPerSecond(155), 706.4))
+ .add(new TimelineEntry(ofEpochSecond(38), Throughput.bytesPerSecond(983), 773.3))
+ .add(new TimelineEntry(ofEpochSecond(39), Throughput.bytesPerSecond(1359), 778.1))
+ .add(new TimelineEntry(ofEpochSecond(40), Throughput.bytesPerSecond(832), 816.4))
+ .add(new TimelineEntry(ofEpochSecond(41), Throughput.bytesPerSecond(1041), 793.2))
+ .add(new TimelineEntry(ofEpochSecond(42), Throughput.bytesPerSecond(1459), 916.3))
+ .add(new TimelineEntry(ofEpochSecond(43), Throughput.bytesPerSecond(1128), 968.6))
+ .add(new TimelineEntry(ofEpochSecond(44), Throughput.bytesPerSecond(1318), 1046.7))
+ .add(new TimelineEntry(ofEpochSecond(45), Throughput.bytesPerSecond(620), 958.9))
+ .add(new TimelineEntry(ofEpochSecond(46), Throughput.bytesPerSecond(1133), 1002.8))
+ .add(new TimelineEntry(ofEpochSecond(47), Throughput.bytesPerSecond(568), 1044.1))
+ .add(new TimelineEntry(ofEpochSecond(48), Throughput.bytesPerSecond(561), 1001.9))
+ .add(new TimelineEntry(ofEpochSecond(49), Throughput.bytesPerSecond(1483), 1014.3))
+ .add(new TimelineEntry(ofEpochSecond(50), Throughput.bytesPerSecond(1405), 1071.6))
+ .add(new TimelineEntry(ofEpochSecond(51), Throughput.bytesPerSecond(435), 1011.))
+ .add(new TimelineEntry(ofEpochSecond(52), Throughput.bytesPerSecond(664), 931.5))
+ .add(new TimelineEntry(ofEpochSecond(53), Throughput.bytesPerSecond(1330), 951.7))
+ .add(new TimelineEntry(ofEpochSecond(54), Throughput.bytesPerSecond(540), 873.9))
+ .add(new TimelineEntry(ofEpochSecond(55), Throughput.bytesPerSecond(847), 896.6))
+ .add(new TimelineEntry(ofEpochSecond(56), Throughput.bytesPerSecond(1231), 906.4))
+ .add(new TimelineEntry(ofEpochSecond(57), Throughput.bytesPerSecond(1331), 982.7))
+ .add(new TimelineEntry(ofEpochSecond(58), Throughput.bytesPerSecond(154), 942.))
+ .add(new TimelineEntry(ofEpochSecond(59), Throughput.bytesPerSecond(801), 873.8))
+ .add(new TimelineEntry(ofEpochSecond(60), Throughput.bytesPerSecond(499), 783.2))
+ .add(new TimelineEntry(ofEpochSecond(61), Throughput.bytesPerSecond(766), 816.3))
+ .add(new TimelineEntry(ofEpochSecond(62), Throughput.bytesPerSecond(1166), 866.5))
+ .add(new TimelineEntry(ofEpochSecond(63), Throughput.bytesPerSecond(1408), 874.3))
+ .add(new TimelineEntry(ofEpochSecond(64), Throughput.bytesPerSecond(1145), 934.8))
+ .add(new TimelineEntry(ofEpochSecond(65), Throughput.bytesPerSecond(433), 893.4))
+ .add(new TimelineEntry(ofEpochSecond(66), Throughput.bytesPerSecond(1256), 895.9))
+ .add(new TimelineEntry(ofEpochSecond(67), Throughput.bytesPerSecond(847), 847.5))
+ .add(new TimelineEntry(ofEpochSecond(68), Throughput.bytesPerSecond(1421), 974.2))
+ .add(new TimelineEntry(ofEpochSecond(69), Throughput.bytesPerSecond(347), 928.8))
+ .add(new TimelineEntry(ofEpochSecond(70), Throughput.bytesPerSecond(52), 884.1))
+ .add(new TimelineEntry(ofEpochSecond(71), Throughput.bytesPerSecond(19), 809.4))
+ .add(new TimelineEntry(ofEpochSecond(72), Throughput.bytesPerSecond(1191), 811.9))
+ .add(new TimelineEntry(ofEpochSecond(73), Throughput.bytesPerSecond(104), 681.5))
+ .add(new TimelineEntry(ofEpochSecond(74), Throughput.bytesPerSecond(640), 631.))
+ .add(new TimelineEntry(ofEpochSecond(75), Throughput.bytesPerSecond(535), 641.2))
+ .add(new TimelineEntry(ofEpochSecond(76), Throughput.bytesPerSecond(203), 535.9))
+ .add(new TimelineEntry(ofEpochSecond(77), Throughput.bytesPerSecond(51), 456.3))
+ .add(new TimelineEntry(ofEpochSecond(78), Throughput.bytesPerSecond(1117), 425.9))
+ .add(new TimelineEntry(ofEpochSecond(79), Throughput.bytesPerSecond(1390), 530.2))
+ .add(new TimelineEntry(ofEpochSecond(80), Throughput.bytesPerSecond(262), 551.2))
+ .add(new TimelineEntry(ofEpochSecond(81), Throughput.bytesPerSecond(5), 549.8))
+ .add(new TimelineEntry(ofEpochSecond(82), Throughput.bytesPerSecond(802), 510.9))
+ .add(new TimelineEntry(ofEpochSecond(83), Throughput.bytesPerSecond(529), 553.4))
+ .add(new TimelineEntry(ofEpochSecond(84), Throughput.bytesPerSecond(1261), 615.5))
+ .add(new TimelineEntry(ofEpochSecond(85), Throughput.bytesPerSecond(1192), 681.2))
+ .add(new TimelineEntry(ofEpochSecond(86), Throughput.bytesPerSecond(276), 688.5))
+ .add(new TimelineEntry(ofEpochSecond(87), Throughput.bytesPerSecond(457), 729.1))
+ .add(new TimelineEntry(ofEpochSecond(88), Throughput.bytesPerSecond(799), 697.3))
+ .add(new TimelineEntry(ofEpochSecond(89), Throughput.bytesPerSecond(443), 602.6))
+ .add(new TimelineEntry(ofEpochSecond(90), Throughput.bytesPerSecond(1281), 704.5))
+ .add(new TimelineEntry(ofEpochSecond(91), Throughput.bytesPerSecond(97), 713.7))
+ .add(new TimelineEntry(ofEpochSecond(92), Throughput.bytesPerSecond(895), 723.))
+ .add(new TimelineEntry(ofEpochSecond(93), Throughput.bytesPerSecond(1338), 803.9))
+ .add(new TimelineEntry(ofEpochSecond(94), Throughput.bytesPerSecond(554), 733.2))
+ .add(new TimelineEntry(ofEpochSecond(95), Throughput.bytesPerSecond(302), 644.2))
+ .add(new TimelineEntry(ofEpochSecond(96), Throughput.bytesPerSecond(518), 668.4))
+ .add(new TimelineEntry(ofEpochSecond(97), Throughput.bytesPerSecond(502), 672.9))
+ .add(new TimelineEntry(ofEpochSecond(98), Throughput.bytesPerSecond(517), 644.7))
+ .add(new TimelineEntry(ofEpochSecond(99), Throughput.bytesPerSecond(172), 617.6))
+ .add(new TimelineEntry(ofEpochSecond(100), Throughput.bytesPerSecond(909), 580.4))
+ .add(new TimelineEntry(ofEpochSecond(101), Throughput.bytesPerSecond(1233), 694.))
+ .add(new TimelineEntry(ofEpochSecond(102), Throughput.bytesPerSecond(189), 623.4))
+ .add(new TimelineEntry(ofEpochSecond(103), Throughput.bytesPerSecond(244), 514.))
+ .add(new TimelineEntry(ofEpochSecond(104), Throughput.bytesPerSecond(886), 547.2))
+ .add(new TimelineEntry(ofEpochSecond(105), Throughput.bytesPerSecond(796), 596.6))
+ .add(new TimelineEntry(ofEpochSecond(106), Throughput.bytesPerSecond(1072), 652.))
+ .add(new TimelineEntry(ofEpochSecond(107), Throughput.bytesPerSecond(602), 662.))
+ .add(new TimelineEntry(ofEpochSecond(108), Throughput.bytesPerSecond(507), 661.))
+ .add(new TimelineEntry(ofEpochSecond(109), Throughput.bytesPerSecond(432), 687.))
+ .add(new TimelineEntry(ofEpochSecond(110), Throughput.bytesPerSecond(661), 662.2))
+ .add(new TimelineEntry(ofEpochSecond(111), Throughput.bytesPerSecond(1085), 647.4))
+ .add(new TimelineEntry(ofEpochSecond(112), Throughput.bytesPerSecond(157), 644.2))
+ .add(new TimelineEntry(ofEpochSecond(113), Throughput.bytesPerSecond(529), 672.7))
+ .add(new TimelineEntry(ofEpochSecond(114), Throughput.bytesPerSecond(31), 587.2))
+ .add(new TimelineEntry(ofEpochSecond(115), Throughput.bytesPerSecond(464), 554.))
+ .add(new TimelineEntry(ofEpochSecond(116), Throughput.bytesPerSecond(1301), 576.9))
+ .add(new TimelineEntry(ofEpochSecond(117), Throughput.bytesPerSecond(787), 595.4))
+ .add(new TimelineEntry(ofEpochSecond(118), Throughput.bytesPerSecond(908), 635.5))
+ .add(new TimelineEntry(ofEpochSecond(119), Throughput.bytesPerSecond(1316), 723.9))
+ .add(new TimelineEntry(ofEpochSecond(120), Throughput.bytesPerSecond(764), 734.2))
+ .add(new TimelineEntry(ofEpochSecond(121), Throughput.bytesPerSecond(1391), 764.8))
+ .add(new TimelineEntry(ofEpochSecond(122), Throughput.bytesPerSecond(819), 831.))
+ .add(new TimelineEntry(ofEpochSecond(123), Throughput.bytesPerSecond(219), 800.))
+ .add(new TimelineEntry(ofEpochSecond(124), Throughput.bytesPerSecond(601), 857.))
+ .add(new TimelineEntry(ofEpochSecond(125), Throughput.bytesPerSecond(1238), 934.4))
+ .add(new TimelineEntry(ofEpochSecond(126), Throughput.bytesPerSecond(1392), 943.5))
+ .add(new TimelineEntry(ofEpochSecond(127), Throughput.bytesPerSecond(499), 914.7))
+ .add(new TimelineEntry(ofEpochSecond(128), Throughput.bytesPerSecond(1153), 939.2))
+ .add(new TimelineEntry(ofEpochSecond(129), Throughput.bytesPerSecond(1219), 929.5))
+ .add(new TimelineEntry(ofEpochSecond(130), Throughput.bytesPerSecond(519), 905.))
+ .add(new TimelineEntry(ofEpochSecond(131), Throughput.bytesPerSecond(337), 799.6))
+ .add(new TimelineEntry(ofEpochSecond(132), Throughput.bytesPerSecond(1065), 824.2))
+ .add(new TimelineEntry(ofEpochSecond(133), Throughput.bytesPerSecond(789), 881.2))
+ .add(new TimelineEntry(ofEpochSecond(134), Throughput.bytesPerSecond(32), 824.3))
+ .add(new TimelineEntry(ofEpochSecond(135), Throughput.bytesPerSecond(893), 789.8))
+ .add(new TimelineEntry(ofEpochSecond(136), Throughput.bytesPerSecond(1093), 759.9))
+ .add(new TimelineEntry(ofEpochSecond(137), Throughput.bytesPerSecond(1218), 831.8))
+ .add(new TimelineEntry(ofEpochSecond(138), Throughput.bytesPerSecond(159), 732.4))
+ .add(new TimelineEntry(ofEpochSecond(139), Throughput.bytesPerSecond(407), 651.2))
+ .add(new TimelineEntry(ofEpochSecond(140), Throughput.bytesPerSecond(615), 660.8))
+ .add(new TimelineEntry(ofEpochSecond(141), Throughput.bytesPerSecond(1392), 766.3))
+ .add(new TimelineEntry(ofEpochSecond(142), Throughput.bytesPerSecond(1431), 802.9))
+ .add(new TimelineEntry(ofEpochSecond(143), Throughput.bytesPerSecond(270), 751.))
+ .add(new TimelineEntry(ofEpochSecond(144), Throughput.bytesPerSecond(300), 777.8))
+ .add(new TimelineEntry(ofEpochSecond(145), Throughput.bytesPerSecond(1402), 828.7))
+ .add(new TimelineEntry(ofEpochSecond(146), Throughput.bytesPerSecond(308), 750.2))
+ .add(new TimelineEntry(ofEpochSecond(147), Throughput.bytesPerSecond(125), 640.9))
+ .add(new TimelineEntry(ofEpochSecond(148), Throughput.bytesPerSecond(467), 671.7))
+ .add(new TimelineEntry(ofEpochSecond(149), Throughput.bytesPerSecond(1339), 764.9))
+ .add(new TimelineEntry(ofEpochSecond(150), Throughput.bytesPerSecond(1146), 818.))
+ .add(new TimelineEntry(ofEpochSecond(151), Throughput.bytesPerSecond(765), 755.3))
+ .add(new TimelineEntry(ofEpochSecond(152), Throughput.bytesPerSecond(649), 677.1))
+ .add(new TimelineEntry(ofEpochSecond(153), Throughput.bytesPerSecond(1318), 781.9))
+ .add(new TimelineEntry(ofEpochSecond(154), Throughput.bytesPerSecond(199), 771.8))
+ .add(new TimelineEntry(ofEpochSecond(155), Throughput.bytesPerSecond(923), 723.9))
+ .add(new TimelineEntry(ofEpochSecond(156), Throughput.bytesPerSecond(430), 736.1))
+ .add(new TimelineEntry(ofEpochSecond(157), Throughput.bytesPerSecond(158), 739.4))
+ .add(new TimelineEntry(ofEpochSecond(158), Throughput.bytesPerSecond(187), 711.4))
+ .add(new TimelineEntry(ofEpochSecond(159), Throughput.bytesPerSecond(442), 621.7))
+ .add(new TimelineEntry(ofEpochSecond(160), Throughput.bytesPerSecond(82), 515.3))
+ .add(new TimelineEntry(ofEpochSecond(161), Throughput.bytesPerSecond(951), 533.9))
+ .add(new TimelineEntry(ofEpochSecond(162), Throughput.bytesPerSecond(976), 566.6))
+ .add(new TimelineEntry(ofEpochSecond(163), Throughput.bytesPerSecond(1371), 571.9))
+ .add(new TimelineEntry(ofEpochSecond(164), Throughput.bytesPerSecond(547), 606.7))
+ .add(new TimelineEntry(ofEpochSecond(165), Throughput.bytesPerSecond(370), 551.4))
+ .add(new TimelineEntry(ofEpochSecond(166), Throughput.bytesPerSecond(247), 533.1))
+ .add(new TimelineEntry(ofEpochSecond(167), Throughput.bytesPerSecond(660), 583.3))
+ .add(new TimelineEntry(ofEpochSecond(168), Throughput.bytesPerSecond(1222), 686.8))
+ .add(new TimelineEntry(ofEpochSecond(169), Throughput.bytesPerSecond(130), 655.6))
+ .add(new TimelineEntry(ofEpochSecond(170), Throughput.bytesPerSecond(512), 698.6))
+ .add(new TimelineEntry(ofEpochSecond(171), Throughput.bytesPerSecond(873), 690.8))
+ .add(new TimelineEntry(ofEpochSecond(172), Throughput.bytesPerSecond(18), 595.))
+ .add(new TimelineEntry(ofEpochSecond(173), Throughput.bytesPerSecond(817), 539.6))
+ .add(new TimelineEntry(ofEpochSecond(174), Throughput.bytesPerSecond(1090), 593.9))
+ .add(new TimelineEntry(ofEpochSecond(175), Throughput.bytesPerSecond(1201), 677.))
+ .add(new TimelineEntry(ofEpochSecond(176), Throughput.bytesPerSecond(1046), 756.9))
+ .add(new TimelineEntry(ofEpochSecond(177), Throughput.bytesPerSecond(1075), 798.4))
+ .add(new TimelineEntry(ofEpochSecond(178), Throughput.bytesPerSecond(679), 744.1))
+ .add(new TimelineEntry(ofEpochSecond(179), Throughput.bytesPerSecond(1043), 835.4))
+ .add(new TimelineEntry(ofEpochSecond(180), Throughput.bytesPerSecond(1206), 904.8))
+ .add(new TimelineEntry(ofEpochSecond(181), Throughput.bytesPerSecond(701), 887.6))
+ .add(new TimelineEntry(ofEpochSecond(182), Throughput.bytesPerSecond(849), 970.7))
+ .add(new TimelineEntry(ofEpochSecond(183), Throughput.bytesPerSecond(457), 934.7))
+ .add(new TimelineEntry(ofEpochSecond(184), Throughput.bytesPerSecond(400), 865.7))
+ .add(new TimelineEntry(ofEpochSecond(185), Throughput.bytesPerSecond(1157), 861.3))
+ .add(new TimelineEntry(ofEpochSecond(186), Throughput.bytesPerSecond(235), 780.2))
+ .add(new TimelineEntry(ofEpochSecond(187), Throughput.bytesPerSecond(525), 725.2))
+ .add(new TimelineEntry(ofEpochSecond(188), Throughput.bytesPerSecond(1415), 798.8))
+ .add(new TimelineEntry(ofEpochSecond(189), Throughput.bytesPerSecond(796), 774.1))
+ .add(new TimelineEntry(ofEpochSecond(190), Throughput.bytesPerSecond(428), 696.3))
+ .add(new TimelineEntry(ofEpochSecond(191), Throughput.bytesPerSecond(417), 667.9))
+ .add(new TimelineEntry(ofEpochSecond(192), Throughput.bytesPerSecond(436), 626.6))
+ .add(new TimelineEntry(ofEpochSecond(193), Throughput.bytesPerSecond(781), 659.))
+ .add(new TimelineEntry(ofEpochSecond(194), Throughput.bytesPerSecond(967), 715.7))
+ .add(new TimelineEntry(ofEpochSecond(195), Throughput.bytesPerSecond(398), 639.8))
+ .add(new TimelineEntry(ofEpochSecond(196), Throughput.bytesPerSecond(501), 666.4))
+ .add(new TimelineEntry(ofEpochSecond(197), Throughput.bytesPerSecond(691), 683.))
+ .add(new TimelineEntry(ofEpochSecond(198), Throughput.bytesPerSecond(1492), 690.7))
+ .add(new TimelineEntry(ofEpochSecond(199), Throughput.bytesPerSecond(1493), 760.4))
+ .add(new TimelineEntry(ofEpochSecond(200), Throughput.bytesPerSecond(5), 718.1))
+ .add(new TimelineEntry(ofEpochSecond(201), Throughput.bytesPerSecond(679), 744.3))
+ .add(new TimelineEntry(ofEpochSecond(202), Throughput.bytesPerSecond(1027), 803.4))
+ .add(new TimelineEntry(ofEpochSecond(203), Throughput.bytesPerSecond(170), 742.3))
+ .add(new TimelineEntry(ofEpochSecond(204), Throughput.bytesPerSecond(261), 671.7))
+ .add(new TimelineEntry(ofEpochSecond(205), Throughput.bytesPerSecond(309), 662.8))
+ .add(new TimelineEntry(ofEpochSecond(206), Throughput.bytesPerSecond(1483), 761.))
+ .add(new TimelineEntry(ofEpochSecond(207), Throughput.bytesPerSecond(1154), 807.3))
+ .add(new TimelineEntry(ofEpochSecond(208), Throughput.bytesPerSecond(857), 743.8))
+ .add(new TimelineEntry(ofEpochSecond(209), Throughput.bytesPerSecond(792), 673.7))
+ .add(new TimelineEntry(ofEpochSecond(210), Throughput.bytesPerSecond(819), 755.1))
+ .add(new TimelineEntry(ofEpochSecond(211), Throughput.bytesPerSecond(763), 763.5))
+ .add(new TimelineEntry(ofEpochSecond(212), Throughput.bytesPerSecond(386), 699.4))
+ .add(new TimelineEntry(ofEpochSecond(213), Throughput.bytesPerSecond(789), 761.3))
+ .add(new TimelineEntry(ofEpochSecond(214), Throughput.bytesPerSecond(1432), 878.4))
+ .add(new TimelineEntry(ofEpochSecond(215), Throughput.bytesPerSecond(205), 868.))
+ .add(new TimelineEntry(ofEpochSecond(216), Throughput.bytesPerSecond(905), 810.2))
+ .add(new TimelineEntry(ofEpochSecond(217), Throughput.bytesPerSecond(1290), 823.8))
+ .add(new TimelineEntry(ofEpochSecond(218), Throughput.bytesPerSecond(639), 802.))
+ .add(new TimelineEntry(ofEpochSecond(219), Throughput.bytesPerSecond(1246), 847.4))
+ .build());
+}
diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/ThroughputSinkTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/ThroughputSinkTest.java
new file mode 100644
index 0000000000..a7840f45c7
--- /dev/null
+++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/ThroughputSinkTest.java
@@ -0,0 +1,262 @@
+/*
+ * Copyright 2023 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.storage;
+
+import static com.google.cloud.storage.TestUtils.assertAll;
+import static com.google.common.truth.Truth.assertThat;
+
+import com.google.cloud.storage.ThroughputSink.Record;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.junit.Assert;
+import org.junit.Test;
+
+public final class ThroughputSinkTest {
+
+ @Test
+ public void tee_record() {
+ AtomicReference r1 = new AtomicReference<>(null);
+ AtomicReference r2 = new AtomicReference<>(null);
+ ThroughputSink test =
+ ThroughputSink.tee(
+ new AbstractThroughputSink() {
+ @Override
+ public void recordThroughput(Record r) {
+ r1.compareAndSet(null, r);
+ }
+ },
+ new AbstractThroughputSink() {
+ @Override
+ public void recordThroughput(Record r) {
+ r2.compareAndSet(null, r);
+ }
+ });
+
+ Record expected = Record.of(10, Instant.EPOCH, Instant.ofEpochSecond(1), false);
+ test.recordThroughput(expected);
+
+ assertThat(r1.get()).isEqualTo(expected);
+ assertThat(r2.get()).isEqualTo(expected);
+ }
+
+ @Test
+ public void tee_decorate() throws Exception {
+ AtomicReference b1 = new AtomicReference<>(null);
+ AtomicReference b2 = new AtomicReference<>(null);
+ AtomicReference b3 = new AtomicReference<>(null);
+ ThroughputSink test =
+ ThroughputSink.tee(
+ new AbstractThroughputSink() {
+ @Override
+ public WritableByteChannel decorate(WritableByteChannel wbc) {
+ return new WritableByteChannel() {
+ @Override
+ public int write(ByteBuffer src) throws IOException {
+ b1.compareAndSet(null, src.duplicate());
+ return wbc.write(src);
+ }
+
+ @Override
+ public boolean isOpen() {
+ return wbc.isOpen();
+ }
+
+ @Override
+ public void close() throws IOException {
+ wbc.close();
+ }
+ };
+ }
+ },
+ new AbstractThroughputSink() {
+ @Override
+ public WritableByteChannel decorate(WritableByteChannel wbc) {
+ return new WritableByteChannel() {
+ @Override
+ public int write(ByteBuffer src) throws IOException {
+ ByteBuffer duplicate = src.duplicate();
+ duplicate.position(src.limit());
+ b2.compareAndSet(null, duplicate);
+ return wbc.write(src);
+ }
+
+ @Override
+ public boolean isOpen() {
+ return wbc.isOpen();
+ }
+
+ @Override
+ public void close() throws IOException {
+ wbc.close();
+ }
+ };
+ }
+ });
+
+ AtomicBoolean callIsOpen = new AtomicBoolean(false);
+ AtomicBoolean callClose = new AtomicBoolean(false);
+ WritableByteChannel anon =
+ new WritableByteChannel() {
+ @Override
+ public int write(ByteBuffer src) {
+ int remaining = src.remaining();
+ src.position(src.limit());
+ b3.compareAndSet(null, src);
+ return remaining;
+ }
+
+ @Override
+ public boolean isOpen() {
+ callIsOpen.compareAndSet(false, true);
+ return true;
+ }
+
+ @Override
+ public void close() {
+ callClose.compareAndSet(false, true);
+ }
+ };
+
+ byte[] bytes = DataGenerator.base64Characters().genBytes(16);
+
+ ByteBuffer expected1 = ByteBuffer.wrap(bytes);
+ ByteBuffer expected2 = ByteBuffer.wrap(bytes);
+ expected2.position(16);
+
+ ByteBuffer buf = ByteBuffer.wrap(bytes);
+ try (WritableByteChannel decorated = test.decorate(anon)) {
+ if (decorated.isOpen()) {
+ decorated.write(buf);
+ }
+ }
+
+ assertAll(
+ () -> assertThat(b1.get()).isEqualTo(expected1),
+ () -> assertThat(b2.get()).isEqualTo(expected2),
+ () -> assertThat(b3.get()).isSameInstanceAs(buf),
+ () -> assertThat(b3.get().hasRemaining()).isFalse(),
+ () -> assertThat(callIsOpen.get()).isTrue(),
+ () -> assertThat(callClose.get()).isTrue());
+ }
+
+ @Test
+ public void computeThroughput_noError() throws IOException {
+ // create a clock that will start at Epoch UTC, and will tick in one second increments
+ TestClock clock = TestClock.tickBy(Instant.EPOCH, Duration.ofSeconds(1));
+ AtomicReference actual = new AtomicReference<>(null);
+
+ ThroughputSink.computeThroughput(
+ clock,
+ new AbstractThroughputSink() {
+ @Override
+ public void recordThroughput(Record r) {
+ actual.compareAndSet(null, r);
+ }
+ },
+ 300,
+ () -> {});
+
+ Record expected = Record.of(300, Instant.EPOCH, Instant.ofEpochSecond(1), false);
+ assertThat(actual.get()).isEqualTo(expected);
+ }
+
+ @Test
+ public void computeThroughput_ioError() {
+ // create a clock that will start at Epoch UTC, and will tick in one second increments
+ TestClock clock = TestClock.tickBy(Instant.EPOCH, Duration.ofSeconds(1));
+ AtomicReference actual = new AtomicReference<>(null);
+
+ IOException ioException =
+ Assert.assertThrows(
+ IOException.class,
+ () ->
+ ThroughputSink.computeThroughput(
+ clock,
+ new AbstractThroughputSink() {
+ @Override
+ public void recordThroughput(Record r) {
+ actual.compareAndSet(null, r);
+ }
+ },
+ 300,
+ () -> {
+ throw new IOException("kablamo!");
+ }));
+
+ Record expected = Record.of(300, Instant.EPOCH, Instant.ofEpochSecond(1), true);
+ assertThat(actual.get()).isEqualTo(expected);
+
+ assertThat(ioException).hasMessageThat().isEqualTo("kablamo!");
+ }
+
+ @Test
+ public void windowed() throws IOException {
+ // create a clock that will start at Epoch UTC, and will tick in one second increments
+ TestClock clock = TestClock.tickBy(Instant.EPOCH, Duration.ofSeconds(1));
+
+ AtomicReference b3 = new AtomicReference<>(null);
+ WritableByteChannel anon =
+ new WritableByteChannel() {
+ @Override
+ public int write(ByteBuffer src) {
+ int remaining = src.remaining();
+ src.position(src.limit());
+ b3.compareAndSet(null, src);
+ return remaining;
+ }
+
+ @Override
+ public boolean isOpen() {
+ return true;
+ }
+
+ @Override
+ public void close() {}
+ };
+
+ Duration windowDuration = Duration.ofMinutes(1);
+ ThroughputMovingWindow window = ThroughputMovingWindow.of(windowDuration);
+ ThroughputSink sink = ThroughputSink.windowed(window, clock);
+
+ int numBytes = 120;
+ ByteBuffer buf = DataGenerator.base64Characters().genByteBuffer(numBytes);
+ try (WritableByteChannel decorated = sink.decorate(anon)) {
+ decorated.write(buf);
+ }
+
+ Throughput avg = window.avg(clock.instant());
+
+ assertThat(avg).isEqualTo(Throughput.of(numBytes, windowDuration));
+ assertThat(avg).isEqualTo(Throughput.bytesPerSecond(2));
+ }
+
+ private abstract static class AbstractThroughputSink implements ThroughputSink {
+
+ @Override
+ public void recordThroughput(Record r) {}
+
+ @Override
+ public WritableByteChannel decorate(WritableByteChannel wbc) {
+ return null;
+ }
+ }
+}
diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/ThroughputTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/ThroughputTest.java
new file mode 100644
index 0000000000..e44d4c1dd5
--- /dev/null
+++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/ThroughputTest.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2023 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.storage;
+
+import static com.google.common.truth.Truth.assertThat;
+import static java.time.Duration.ofSeconds;
+
+import org.junit.Test;
+
+public final class ThroughputTest {
+
+ @Test
+ public void a() {
+ assertThat(Throughput.bytesPerSecond(1).toBps()).isEqualTo(1);
+ }
+
+ @Test
+ public void b() {
+ assertThat(Throughput.of(10, ofSeconds(10)).toBps()).isEqualTo(1);
+ }
+}