From b0c9cc1287349c0d72af22836f6f9a8936db7937 Mon Sep 17 00:00:00 2001 From: BenWhitehead Date: Mon, 17 Jul 2023 14:54:30 -0400 Subject: [PATCH] chore: introduce Throughput and ThroughputSink _Pre-work_ In some upcoming work we want to be able to keep a running window of throughput performance in order to provide improved throughput. This PR introduces a new utility class to model and compute throughput, and the concept of a ThroughputSink which values can be appended to. --- .../com/google/cloud/storage/Throughput.java | 94 ++++ .../cloud/storage/ThroughputMovingWindow.java | 98 ++++ .../google/cloud/storage/ThroughputSink.java | 251 +++++++++++ .../com/google/cloud/storage/TestClock.java | 62 +++ .../ThroughputMovingWindowPropertyTest.java | 422 ++++++++++++++++++ .../cloud/storage/ThroughputSinkTest.java | 262 +++++++++++ .../google/cloud/storage/ThroughputTest.java | 35 ++ 7 files changed, 1224 insertions(+) create mode 100644 google-cloud-storage/src/main/java/com/google/cloud/storage/Throughput.java create mode 100644 google-cloud-storage/src/main/java/com/google/cloud/storage/ThroughputMovingWindow.java create mode 100644 google-cloud-storage/src/main/java/com/google/cloud/storage/ThroughputSink.java create mode 100644 google-cloud-storage/src/test/java/com/google/cloud/storage/TestClock.java create mode 100644 google-cloud-storage/src/test/java/com/google/cloud/storage/ThroughputMovingWindowPropertyTest.java create mode 100644 google-cloud-storage/src/test/java/com/google/cloud/storage/ThroughputSinkTest.java create mode 100644 google-cloud-storage/src/test/java/com/google/cloud/storage/ThroughputTest.java diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/Throughput.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/Throughput.java new file mode 100644 index 0000000000..f0f54140d1 --- /dev/null +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/Throughput.java @@ -0,0 +1,94 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.storage; + +import com.google.common.base.MoreObjects; +import java.time.Duration; +import java.util.Objects; + +/** + * Convenience class to encapsulate the concept of a throughput value. + * + *

Given a number of bytes and a duration compute the number of bytes per second. + */ +final class Throughput { + + private static final double NANOS_PER_SECOND = 1_000_000_000d; + private final long numBytes; + private final Duration duration; + + // TODO: is there a efficient way we can limit precision without having to use BigDecimal? + // Realistically, we don't need precision smaller than 1 byte per microsecond, leading to + // 6 digits past the decimal of needed precision. + private final double bytesPerSecond; + + private Throughput(long numBytes, Duration duration) { + this.numBytes = numBytes; + this.duration = duration; + this.bytesPerSecond = numBytes / (duration.toNanos() / NANOS_PER_SECOND); + } + + public long getNumBytes() { + return numBytes; + } + + public Duration getDuration() { + return duration; + } + + public double toBps() { + return bytesPerSecond; + } + + public Throughput plus(Throughput other) { + return new Throughput(this.numBytes + other.numBytes, this.duration.plus(other.duration)); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof Throughput)) { + return false; + } + Throughput that = (Throughput) o; + return Double.compare(that.bytesPerSecond, bytesPerSecond) == 0; + } + + @Override + public int hashCode() { + return Objects.hash(bytesPerSecond); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this).add("bytesPerSecond", bytesPerSecond).toString(); + } + + public static Throughput zero() { + return new Throughput(0, Duration.ZERO); + } + + public static Throughput of(long numBytes, Duration duration) { + return new Throughput(numBytes, duration); + } + + public static Throughput bytesPerSecond(long numBytes) { + return new Throughput(numBytes, Duration.ofSeconds(1)); + } +} diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/ThroughputMovingWindow.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/ThroughputMovingWindow.java new file mode 100644 index 0000000000..bc10ecc36c --- /dev/null +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/ThroughputMovingWindow.java @@ -0,0 +1,98 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.storage; + +import com.google.common.base.MoreObjects; +import java.time.Duration; +import java.time.Instant; +import java.util.Comparator; +import java.util.PriorityQueue; + +/** + * A simple moving window implementation which will keep a {@code window}s worth of Throughput + * values and allow querying for the aggregate avg over that time window. + */ +final class ThroughputMovingWindow { + + private final Duration window; + + private final PriorityQueue values; + + private ThroughputMovingWindow(Duration window) { + this.window = window; + this.values = new PriorityQueue<>(Entry.COMP); + } + + void add(Instant now, Throughput value) { + houseKeeping(now); + values.add(new Entry(now, value)); + } + + Throughput avg(Instant now) { + houseKeeping(now); + return values.stream() + .map(Entry::getValue) + .reduce( + Throughput.zero(), + (tp1, tp2) -> Throughput.of(tp1.getNumBytes() + tp2.getNumBytes(), window)); + } + + private void houseKeeping(Instant now) { + Instant newMin = now.minus(window); + values.removeIf(e -> lteq(e.getAt(), newMin)); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("window", window) + .add("values.size()", values.size()) + .toString(); + } + + static ThroughputMovingWindow of(Duration window) { + return new ThroughputMovingWindow(window); + } + + private static boolean lteq(Instant a, Instant b) { + return a.equals(b) || a.isBefore(b); + } + + private static final class Entry { + private static final Comparator COMP = Comparator.comparing(e -> e.at); + private final Instant at; + private final Throughput value; + + private Entry(Instant at, Throughput value) { + this.at = at; + this.value = value; + } + + public Instant getAt() { + return at; + } + + public Throughput getValue() { + return value; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this).add("at", at).add("value", value).toString(); + } + } +} diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/ThroughputSink.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/ThroughputSink.java new file mode 100644 index 0000000000..5ef6e37d10 --- /dev/null +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/ThroughputSink.java @@ -0,0 +1,251 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.storage; + +import com.google.common.base.MoreObjects; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.WritableByteChannel; +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.util.Objects; +import java.util.logging.Logger; + +/** + * Interface to mark a location in which throughput of byte movements can be recorded, and which can + * provide a decorated underlying channel. + */ +interface ThroughputSink { + + void recordThroughput(Record r); + + WritableByteChannel decorate(WritableByteChannel wbc); + + static void computeThroughput(Clock clock, ThroughputSink sink, long numBytes, IO io) + throws IOException { + boolean exception = false; + Instant begin = clock.instant(); + try { + io.apply(); + } catch (IOException e) { + exception = true; + throw e; + } finally { + Instant end = clock.instant(); + Record record = Record.of(numBytes, begin, end, exception); + sink.recordThroughput(record); + } + } + + @FunctionalInterface + interface IO { + void apply() throws IOException; + } + + static ThroughputSink logged(String prefix, Clock clock) { + return new LoggedThroughputSink(prefix, clock); + } + + static ThroughputSink windowed(ThroughputMovingWindow w, Clock clock) { + return new ThroughputMovingWindowThroughputSink(w, clock); + } + + static ThroughputSink tee(ThroughputSink a, ThroughputSink b) { + return new TeeThroughputSink(a, b); + } + + final class Record { + private final long numBytes; + private final Instant begin; + private final Instant end; + private final boolean exception; + + private Record(long numBytes, Instant begin, Instant end, boolean exception) { + this.numBytes = numBytes; + this.begin = begin; + this.end = end; + this.exception = exception; + } + + public long getNumBytes() { + return numBytes; + } + + public Instant getBegin() { + return begin; + } + + public Instant getEnd() { + return end; + } + + public Duration getDuration() { + return Duration.between(begin, end); + } + + public boolean isException() { + return exception; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof Record)) { + return false; + } + Record record = (Record) o; + return numBytes == record.numBytes + && exception == record.exception + && Objects.equals(begin, record.begin) + && Objects.equals(end, record.end); + } + + @Override + public int hashCode() { + return Objects.hash(numBytes, begin, end, exception); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("numBytes", numBytes) + .add("begin", begin) + .add("end", end) + .add("exception", exception) + .toString(); + } + + public static Record of(long numBytes, Instant begin, Instant end, boolean exception) { + return new Record(numBytes, begin, end, exception); + } + } + + final class LoggedThroughputSink implements ThroughputSink { + private static final Logger LOGGER = Logger.getLogger(ThroughputSink.class.getName()); + + private final String prefix; + private final Clock clock; + + private LoggedThroughputSink(String prefix, Clock clock) { + this.prefix = prefix; + this.clock = clock; + } + + private static final double MiB = 1d / (1024 * 1024); + + @Override + public void recordThroughput(Record r) { + LOGGER.info( + () -> + String.format( + "{%s} (%01.03f MiB/s) %s", + prefix, + ((r.numBytes * MiB) + / (Duration.between(r.getBegin(), r.getEnd()).toMillis() / 1000d)), + r)); + } + + @Override + public WritableByteChannel decorate(WritableByteChannel wbc) { + return new ThroughputRecordingWritableByteChannel(wbc, this, clock); + } + } + + final class ThroughputRecordingWritableByteChannel implements WritableByteChannel { + private final WritableByteChannel delegate; + private final ThroughputSink sink; + private final Clock clock; + + private ThroughputRecordingWritableByteChannel( + WritableByteChannel delegate, ThroughputSink sink, Clock clock) { + this.delegate = delegate; + this.sink = sink; + this.clock = clock; + } + + @Override + public int write(ByteBuffer src) throws IOException { + boolean exception = false; + int remaining = src.remaining(); + Instant begin = clock.instant(); + try { + return delegate.write(src); + } catch (IOException e) { + exception = true; + throw e; + } finally { + Instant end = clock.instant(); + Record record = Record.of(remaining - src.remaining(), begin, end, exception); + sink.recordThroughput(record); + } + } + + @Override + public boolean isOpen() { + return delegate.isOpen(); + } + + @Override + public void close() throws IOException { + delegate.close(); + } + } + + final class TeeThroughputSink implements ThroughputSink { + private final ThroughputSink a; + private final ThroughputSink b; + + private TeeThroughputSink(ThroughputSink a, ThroughputSink b) { + this.a = a; + this.b = b; + } + + @Override + public void recordThroughput(Record r) { + a.recordThroughput(r); + b.recordThroughput(r); + } + + @Override + public WritableByteChannel decorate(WritableByteChannel wbc) { + return b.decorate(a.decorate(wbc)); + } + } + + final class ThroughputMovingWindowThroughputSink implements ThroughputSink { + private final ThroughputMovingWindow w; + private final Clock clock; + + private ThroughputMovingWindowThroughputSink(ThroughputMovingWindow w, Clock clock) { + this.w = w; + this.clock = clock; + } + + @Override + public synchronized void recordThroughput(Record r) { + w.add(r.end, Throughput.of(r.getNumBytes(), r.getDuration())); + } + + @Override + public WritableByteChannel decorate(WritableByteChannel wbc) { + return new ThroughputRecordingWritableByteChannel(wbc, this, clock); + } + } +} diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/TestClock.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/TestClock.java new file mode 100644 index 0000000000..e37a4f0248 --- /dev/null +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/TestClock.java @@ -0,0 +1,62 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.storage; + +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.time.ZoneId; +import java.util.function.UnaryOperator; + +final class TestClock extends Clock { + + private final Instant begin; + private final UnaryOperator next; + + private Instant now; + + private TestClock(Instant begin, UnaryOperator next) { + this.begin = begin; + this.next = next; + this.now = begin; + } + + @Override + public ZoneId getZone() { + throw new UnsupportedOperationException("TestClock.getZone()"); + } + + @Override + public Clock withZone(ZoneId zone) { + throw new UnsupportedOperationException("TestClock.withZone()"); + } + + @Override + public Instant instant() { + Instant ret = now; + now = next.apply(now); + return ret; + } + + public static TestClock tickBy(Instant begin, Duration d) { + return of(begin, i -> i.plus(d)); + } + + public static TestClock of(Instant begin, UnaryOperator next) { + return new TestClock(begin, next); + } +} diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/ThroughputMovingWindowPropertyTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/ThroughputMovingWindowPropertyTest.java new file mode 100644 index 0000000000..181f51aedf --- /dev/null +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/ThroughputMovingWindowPropertyTest.java @@ -0,0 +1,422 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.storage; + +import static com.google.cloud.storage.ByteSizeConstants._5TiB; +import static com.google.common.truth.Truth.assertWithMessage; +import static java.time.Instant.EPOCH; +import static java.time.Instant.ofEpochSecond; + +import com.google.common.base.MoreObjects; +import com.google.common.collect.ImmutableList; +import java.time.Duration; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.stream.Collectors; +import net.jqwik.api.Arbitraries; +import net.jqwik.api.Arbitrary; +import net.jqwik.api.Combinators; +import net.jqwik.api.Example; +import net.jqwik.api.ForAll; +import net.jqwik.api.Property; +import net.jqwik.api.Provide; +import net.jqwik.api.Tuple; +import net.jqwik.api.Tuple.Tuple1; +import net.jqwik.time.api.DateTimes; +import net.jqwik.time.api.Times; + +final class ThroughputMovingWindowPropertyTest { + + private static final double TOLERANCE = 0.001; + + @Example + void canned() { + test(CANNED_SCENARIO); + } + + @Example + void twoEntriesSameTimeDifferentThroughput() { + Duration ms = Duration.ofMillis(1); + ScenarioTimeline scenario = + new ScenarioTimeline( + ms, + ImmutableList.of( + new TimelineEntry(EPOCH, Throughput.of(1, ms), 1000.0), + new TimelineEntry(EPOCH, Throughput.of(0, ms), 1000.0))); + test(scenario); + } + + @Property + void test(@ForAll("Scenarios") ScenarioTimeline scenario) { + ThroughputMovingWindow window = ThroughputMovingWindow.of(scenario.d); + for (TimelineEntry timelineEntry : scenario.timelineEntries) { + window.add(timelineEntry.i, timelineEntry.t); + Throughput throughput = window.avg(timelineEntry.i); + assertWithMessage(timelineEntry.toString()) + .that(throughput.toBps()) + .isWithin(TOLERANCE) + .of(timelineEntry.expectedMovingAvgBytesPerSecond); + } + } + + @Provide("Scenarios") + static Arbitrary scenarioTimeline() { + return Times.durations() + .ofPrecision(ChronoUnit.MILLIS) + .between(Duration.ofMillis(1), Duration.ofMinutes(10)) + .flatMap( + d -> + Combinators.combine( + Arbitraries.just(d), + // pick an instant, then generate 1 to 100 values between it and d * 3 + DateTimes.instants() + .ofPrecision(ChronoUnit.MILLIS) + .flatMap( + i -> + DateTimes.instants() + .ofPrecision(ChronoUnit.MILLIS) + .between(i, i.plus(d.multipliedBy(3))) + .flatMap( + ii -> + Combinators.combine( + Arbitraries.just(ii), throughput()) + .as(Tuple::of)) + .list() + .ofMinSize(1) + .ofMaxSize(100))) + .as(Tuple::of)) + .map(ScenarioTimeline::create); + } + + static Arbitrary throughput() { + return Times.durations() + .ofPrecision(ChronoUnit.MILLIS) + .between(Duration.ofMillis(1), Duration.ofMinutes(10)) + .flatMap(d -> Arbitraries.longs().between(0, _5TiB).map(n -> Throughput.of(n, d))); + } + + private static final class ScenarioTimeline { + + private static final Comparator> COMP = + Comparator.comparing(Tuple1::get1); + private final Duration d; + private final List timelineEntries; + + private ScenarioTimeline(Duration d, List timelineEntries) { + this.d = d; + this.timelineEntries = timelineEntries; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("d", d) + .add("timelineEntries", timelineEntries) + .toString(); + } + + static ScenarioTimeline create( + Tuple.Tuple2>> tuples) { + + Duration d = tuples.get1(); + List> pairs = tuples.get2(); + + List> tmp = + pairs.stream().sorted(COMP).collect(Collectors.toList()); + + List>> windows = new ArrayList<>(); + int last = tmp.size() - 1; + for (int i = last; i >= 0; i--) { + List> window = new ArrayList<>(); + Tuple.Tuple2 t = tmp.get(i); + window.add(t); + Instant min = t.get1().minus(d); + for (int j = i - 1; j >= 0; j--) { + Tuple.Tuple2 r = tmp.get(j); + if (r.get1().isAfter(min)) { + window.add(r); + } + } + windows.add(ImmutableList.copyOf(window)); + } + + ImmutableList timelineEntries = + windows.stream() + .map( + w -> { + Tuple.Tuple2 max = w.get(0); + Throughput reduce = + w.stream() + .map(Tuple.Tuple2::get2) + .reduce(Throughput.zero(), Throughput::plus); + return new TimelineEntry( + max.get1(), max.get2(), Throughput.of(reduce.getNumBytes(), d).toBps()); + }) + .collect(ImmutableList.toImmutableList()); + return new ScenarioTimeline(d, timelineEntries.reverse()); + } + } + + private static final class TimelineEntry { + private final Instant i; + private final Throughput t; + private final double expectedMovingAvgBytesPerSecond; + + private TimelineEntry(Instant i, Throughput t, double expectedMovingAvgBytesPerSecond) { + this.i = i; + this.t = t; + this.expectedMovingAvgBytesPerSecond = expectedMovingAvgBytesPerSecond; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("i", i) + .add("t", t) + .add("tenSecMovingAvg", String.format("%,.03f", expectedMovingAvgBytesPerSecond)) + .toString(); + } + } + + private static final ScenarioTimeline CANNED_SCENARIO = + new ScenarioTimeline( + Duration.ofSeconds(10), + ImmutableList.builder() + .add(new TimelineEntry(ofEpochSecond(1), Throughput.bytesPerSecond(192), 19.2)) + .add(new TimelineEntry(ofEpochSecond(2), Throughput.bytesPerSecond(1185), 137.7)) + .add(new TimelineEntry(ofEpochSecond(3), Throughput.bytesPerSecond(1363), 274.)) + .add(new TimelineEntry(ofEpochSecond(4), Throughput.bytesPerSecond(234), 297.4)) + .add(new TimelineEntry(ofEpochSecond(5), Throughput.bytesPerSecond(1439), 441.3)) + .add(new TimelineEntry(ofEpochSecond(6), Throughput.bytesPerSecond(1269), 568.2)) + .add(new TimelineEntry(ofEpochSecond(7), Throughput.bytesPerSecond(692), 637.4)) + .add(new TimelineEntry(ofEpochSecond(8), Throughput.bytesPerSecond(667), 704.1)) + .add(new TimelineEntry(ofEpochSecond(9), Throughput.bytesPerSecond(1318), 835.9)) + .add(new TimelineEntry(ofEpochSecond(10), Throughput.bytesPerSecond(1125), 948.4)) + .add(new TimelineEntry(ofEpochSecond(11), Throughput.bytesPerSecond(1124), 1041.6)) + .add(new TimelineEntry(ofEpochSecond(12), Throughput.bytesPerSecond(3), 923.4)) + .add(new TimelineEntry(ofEpochSecond(13), Throughput.bytesPerSecond(185), 805.6)) + .add(new TimelineEntry(ofEpochSecond(14), Throughput.bytesPerSecond(726), 854.8)) + .add(new TimelineEntry(ofEpochSecond(15), Throughput.bytesPerSecond(630), 773.9)) + .add(new TimelineEntry(ofEpochSecond(16), Throughput.bytesPerSecond(874), 734.4)) + .add(new TimelineEntry(ofEpochSecond(17), Throughput.bytesPerSecond(1401), 805.3)) + .add(new TimelineEntry(ofEpochSecond(18), Throughput.bytesPerSecond(533), 791.9)) + .add(new TimelineEntry(ofEpochSecond(19), Throughput.bytesPerSecond(446), 704.7)) + .add(new TimelineEntry(ofEpochSecond(20), Throughput.bytesPerSecond(801), 672.3)) + .add(new TimelineEntry(ofEpochSecond(21), Throughput.bytesPerSecond(61), 566.)) + .add(new TimelineEntry(ofEpochSecond(22), Throughput.bytesPerSecond(1104), 676.1)) + .add(new TimelineEntry(ofEpochSecond(23), Throughput.bytesPerSecond(972), 754.8)) + .add(new TimelineEntry(ofEpochSecond(24), Throughput.bytesPerSecond(1310), 813.2)) + .add(new TimelineEntry(ofEpochSecond(25), Throughput.bytesPerSecond(408), 791.)) + .add(new TimelineEntry(ofEpochSecond(26), Throughput.bytesPerSecond(759), 779.5)) + .add(new TimelineEntry(ofEpochSecond(27), Throughput.bytesPerSecond(674), 706.8)) + .add(new TimelineEntry(ofEpochSecond(28), Throughput.bytesPerSecond(314), 684.9)) + .add(new TimelineEntry(ofEpochSecond(29), Throughput.bytesPerSecond(1311), 771.4)) + .add(new TimelineEntry(ofEpochSecond(30), Throughput.bytesPerSecond(449), 736.2)) + .add(new TimelineEntry(ofEpochSecond(31), Throughput.bytesPerSecond(1273), 857.4)) + .add(new TimelineEntry(ofEpochSecond(32), Throughput.bytesPerSecond(228), 769.8)) + .add(new TimelineEntry(ofEpochSecond(33), Throughput.bytesPerSecond(605), 733.1)) + .add(new TimelineEntry(ofEpochSecond(34), Throughput.bytesPerSecond(537), 655.8)) + .add(new TimelineEntry(ofEpochSecond(35), Throughput.bytesPerSecond(1498), 764.8)) + .add(new TimelineEntry(ofEpochSecond(36), Throughput.bytesPerSecond(694), 758.3)) + .add(new TimelineEntry(ofEpochSecond(37), Throughput.bytesPerSecond(155), 706.4)) + .add(new TimelineEntry(ofEpochSecond(38), Throughput.bytesPerSecond(983), 773.3)) + .add(new TimelineEntry(ofEpochSecond(39), Throughput.bytesPerSecond(1359), 778.1)) + .add(new TimelineEntry(ofEpochSecond(40), Throughput.bytesPerSecond(832), 816.4)) + .add(new TimelineEntry(ofEpochSecond(41), Throughput.bytesPerSecond(1041), 793.2)) + .add(new TimelineEntry(ofEpochSecond(42), Throughput.bytesPerSecond(1459), 916.3)) + .add(new TimelineEntry(ofEpochSecond(43), Throughput.bytesPerSecond(1128), 968.6)) + .add(new TimelineEntry(ofEpochSecond(44), Throughput.bytesPerSecond(1318), 1046.7)) + .add(new TimelineEntry(ofEpochSecond(45), Throughput.bytesPerSecond(620), 958.9)) + .add(new TimelineEntry(ofEpochSecond(46), Throughput.bytesPerSecond(1133), 1002.8)) + .add(new TimelineEntry(ofEpochSecond(47), Throughput.bytesPerSecond(568), 1044.1)) + .add(new TimelineEntry(ofEpochSecond(48), Throughput.bytesPerSecond(561), 1001.9)) + .add(new TimelineEntry(ofEpochSecond(49), Throughput.bytesPerSecond(1483), 1014.3)) + .add(new TimelineEntry(ofEpochSecond(50), Throughput.bytesPerSecond(1405), 1071.6)) + .add(new TimelineEntry(ofEpochSecond(51), Throughput.bytesPerSecond(435), 1011.)) + .add(new TimelineEntry(ofEpochSecond(52), Throughput.bytesPerSecond(664), 931.5)) + .add(new TimelineEntry(ofEpochSecond(53), Throughput.bytesPerSecond(1330), 951.7)) + .add(new TimelineEntry(ofEpochSecond(54), Throughput.bytesPerSecond(540), 873.9)) + .add(new TimelineEntry(ofEpochSecond(55), Throughput.bytesPerSecond(847), 896.6)) + .add(new TimelineEntry(ofEpochSecond(56), Throughput.bytesPerSecond(1231), 906.4)) + .add(new TimelineEntry(ofEpochSecond(57), Throughput.bytesPerSecond(1331), 982.7)) + .add(new TimelineEntry(ofEpochSecond(58), Throughput.bytesPerSecond(154), 942.)) + .add(new TimelineEntry(ofEpochSecond(59), Throughput.bytesPerSecond(801), 873.8)) + .add(new TimelineEntry(ofEpochSecond(60), Throughput.bytesPerSecond(499), 783.2)) + .add(new TimelineEntry(ofEpochSecond(61), Throughput.bytesPerSecond(766), 816.3)) + .add(new TimelineEntry(ofEpochSecond(62), Throughput.bytesPerSecond(1166), 866.5)) + .add(new TimelineEntry(ofEpochSecond(63), Throughput.bytesPerSecond(1408), 874.3)) + .add(new TimelineEntry(ofEpochSecond(64), Throughput.bytesPerSecond(1145), 934.8)) + .add(new TimelineEntry(ofEpochSecond(65), Throughput.bytesPerSecond(433), 893.4)) + .add(new TimelineEntry(ofEpochSecond(66), Throughput.bytesPerSecond(1256), 895.9)) + .add(new TimelineEntry(ofEpochSecond(67), Throughput.bytesPerSecond(847), 847.5)) + .add(new TimelineEntry(ofEpochSecond(68), Throughput.bytesPerSecond(1421), 974.2)) + .add(new TimelineEntry(ofEpochSecond(69), Throughput.bytesPerSecond(347), 928.8)) + .add(new TimelineEntry(ofEpochSecond(70), Throughput.bytesPerSecond(52), 884.1)) + .add(new TimelineEntry(ofEpochSecond(71), Throughput.bytesPerSecond(19), 809.4)) + .add(new TimelineEntry(ofEpochSecond(72), Throughput.bytesPerSecond(1191), 811.9)) + .add(new TimelineEntry(ofEpochSecond(73), Throughput.bytesPerSecond(104), 681.5)) + .add(new TimelineEntry(ofEpochSecond(74), Throughput.bytesPerSecond(640), 631.)) + .add(new TimelineEntry(ofEpochSecond(75), Throughput.bytesPerSecond(535), 641.2)) + .add(new TimelineEntry(ofEpochSecond(76), Throughput.bytesPerSecond(203), 535.9)) + .add(new TimelineEntry(ofEpochSecond(77), Throughput.bytesPerSecond(51), 456.3)) + .add(new TimelineEntry(ofEpochSecond(78), Throughput.bytesPerSecond(1117), 425.9)) + .add(new TimelineEntry(ofEpochSecond(79), Throughput.bytesPerSecond(1390), 530.2)) + .add(new TimelineEntry(ofEpochSecond(80), Throughput.bytesPerSecond(262), 551.2)) + .add(new TimelineEntry(ofEpochSecond(81), Throughput.bytesPerSecond(5), 549.8)) + .add(new TimelineEntry(ofEpochSecond(82), Throughput.bytesPerSecond(802), 510.9)) + .add(new TimelineEntry(ofEpochSecond(83), Throughput.bytesPerSecond(529), 553.4)) + .add(new TimelineEntry(ofEpochSecond(84), Throughput.bytesPerSecond(1261), 615.5)) + .add(new TimelineEntry(ofEpochSecond(85), Throughput.bytesPerSecond(1192), 681.2)) + .add(new TimelineEntry(ofEpochSecond(86), Throughput.bytesPerSecond(276), 688.5)) + .add(new TimelineEntry(ofEpochSecond(87), Throughput.bytesPerSecond(457), 729.1)) + .add(new TimelineEntry(ofEpochSecond(88), Throughput.bytesPerSecond(799), 697.3)) + .add(new TimelineEntry(ofEpochSecond(89), Throughput.bytesPerSecond(443), 602.6)) + .add(new TimelineEntry(ofEpochSecond(90), Throughput.bytesPerSecond(1281), 704.5)) + .add(new TimelineEntry(ofEpochSecond(91), Throughput.bytesPerSecond(97), 713.7)) + .add(new TimelineEntry(ofEpochSecond(92), Throughput.bytesPerSecond(895), 723.)) + .add(new TimelineEntry(ofEpochSecond(93), Throughput.bytesPerSecond(1338), 803.9)) + .add(new TimelineEntry(ofEpochSecond(94), Throughput.bytesPerSecond(554), 733.2)) + .add(new TimelineEntry(ofEpochSecond(95), Throughput.bytesPerSecond(302), 644.2)) + .add(new TimelineEntry(ofEpochSecond(96), Throughput.bytesPerSecond(518), 668.4)) + .add(new TimelineEntry(ofEpochSecond(97), Throughput.bytesPerSecond(502), 672.9)) + .add(new TimelineEntry(ofEpochSecond(98), Throughput.bytesPerSecond(517), 644.7)) + .add(new TimelineEntry(ofEpochSecond(99), Throughput.bytesPerSecond(172), 617.6)) + .add(new TimelineEntry(ofEpochSecond(100), Throughput.bytesPerSecond(909), 580.4)) + .add(new TimelineEntry(ofEpochSecond(101), Throughput.bytesPerSecond(1233), 694.)) + .add(new TimelineEntry(ofEpochSecond(102), Throughput.bytesPerSecond(189), 623.4)) + .add(new TimelineEntry(ofEpochSecond(103), Throughput.bytesPerSecond(244), 514.)) + .add(new TimelineEntry(ofEpochSecond(104), Throughput.bytesPerSecond(886), 547.2)) + .add(new TimelineEntry(ofEpochSecond(105), Throughput.bytesPerSecond(796), 596.6)) + .add(new TimelineEntry(ofEpochSecond(106), Throughput.bytesPerSecond(1072), 652.)) + .add(new TimelineEntry(ofEpochSecond(107), Throughput.bytesPerSecond(602), 662.)) + .add(new TimelineEntry(ofEpochSecond(108), Throughput.bytesPerSecond(507), 661.)) + .add(new TimelineEntry(ofEpochSecond(109), Throughput.bytesPerSecond(432), 687.)) + .add(new TimelineEntry(ofEpochSecond(110), Throughput.bytesPerSecond(661), 662.2)) + .add(new TimelineEntry(ofEpochSecond(111), Throughput.bytesPerSecond(1085), 647.4)) + .add(new TimelineEntry(ofEpochSecond(112), Throughput.bytesPerSecond(157), 644.2)) + .add(new TimelineEntry(ofEpochSecond(113), Throughput.bytesPerSecond(529), 672.7)) + .add(new TimelineEntry(ofEpochSecond(114), Throughput.bytesPerSecond(31), 587.2)) + .add(new TimelineEntry(ofEpochSecond(115), Throughput.bytesPerSecond(464), 554.)) + .add(new TimelineEntry(ofEpochSecond(116), Throughput.bytesPerSecond(1301), 576.9)) + .add(new TimelineEntry(ofEpochSecond(117), Throughput.bytesPerSecond(787), 595.4)) + .add(new TimelineEntry(ofEpochSecond(118), Throughput.bytesPerSecond(908), 635.5)) + .add(new TimelineEntry(ofEpochSecond(119), Throughput.bytesPerSecond(1316), 723.9)) + .add(new TimelineEntry(ofEpochSecond(120), Throughput.bytesPerSecond(764), 734.2)) + .add(new TimelineEntry(ofEpochSecond(121), Throughput.bytesPerSecond(1391), 764.8)) + .add(new TimelineEntry(ofEpochSecond(122), Throughput.bytesPerSecond(819), 831.)) + .add(new TimelineEntry(ofEpochSecond(123), Throughput.bytesPerSecond(219), 800.)) + .add(new TimelineEntry(ofEpochSecond(124), Throughput.bytesPerSecond(601), 857.)) + .add(new TimelineEntry(ofEpochSecond(125), Throughput.bytesPerSecond(1238), 934.4)) + .add(new TimelineEntry(ofEpochSecond(126), Throughput.bytesPerSecond(1392), 943.5)) + .add(new TimelineEntry(ofEpochSecond(127), Throughput.bytesPerSecond(499), 914.7)) + .add(new TimelineEntry(ofEpochSecond(128), Throughput.bytesPerSecond(1153), 939.2)) + .add(new TimelineEntry(ofEpochSecond(129), Throughput.bytesPerSecond(1219), 929.5)) + .add(new TimelineEntry(ofEpochSecond(130), Throughput.bytesPerSecond(519), 905.)) + .add(new TimelineEntry(ofEpochSecond(131), Throughput.bytesPerSecond(337), 799.6)) + .add(new TimelineEntry(ofEpochSecond(132), Throughput.bytesPerSecond(1065), 824.2)) + .add(new TimelineEntry(ofEpochSecond(133), Throughput.bytesPerSecond(789), 881.2)) + .add(new TimelineEntry(ofEpochSecond(134), Throughput.bytesPerSecond(32), 824.3)) + .add(new TimelineEntry(ofEpochSecond(135), Throughput.bytesPerSecond(893), 789.8)) + .add(new TimelineEntry(ofEpochSecond(136), Throughput.bytesPerSecond(1093), 759.9)) + .add(new TimelineEntry(ofEpochSecond(137), Throughput.bytesPerSecond(1218), 831.8)) + .add(new TimelineEntry(ofEpochSecond(138), Throughput.bytesPerSecond(159), 732.4)) + .add(new TimelineEntry(ofEpochSecond(139), Throughput.bytesPerSecond(407), 651.2)) + .add(new TimelineEntry(ofEpochSecond(140), Throughput.bytesPerSecond(615), 660.8)) + .add(new TimelineEntry(ofEpochSecond(141), Throughput.bytesPerSecond(1392), 766.3)) + .add(new TimelineEntry(ofEpochSecond(142), Throughput.bytesPerSecond(1431), 802.9)) + .add(new TimelineEntry(ofEpochSecond(143), Throughput.bytesPerSecond(270), 751.)) + .add(new TimelineEntry(ofEpochSecond(144), Throughput.bytesPerSecond(300), 777.8)) + .add(new TimelineEntry(ofEpochSecond(145), Throughput.bytesPerSecond(1402), 828.7)) + .add(new TimelineEntry(ofEpochSecond(146), Throughput.bytesPerSecond(308), 750.2)) + .add(new TimelineEntry(ofEpochSecond(147), Throughput.bytesPerSecond(125), 640.9)) + .add(new TimelineEntry(ofEpochSecond(148), Throughput.bytesPerSecond(467), 671.7)) + .add(new TimelineEntry(ofEpochSecond(149), Throughput.bytesPerSecond(1339), 764.9)) + .add(new TimelineEntry(ofEpochSecond(150), Throughput.bytesPerSecond(1146), 818.)) + .add(new TimelineEntry(ofEpochSecond(151), Throughput.bytesPerSecond(765), 755.3)) + .add(new TimelineEntry(ofEpochSecond(152), Throughput.bytesPerSecond(649), 677.1)) + .add(new TimelineEntry(ofEpochSecond(153), Throughput.bytesPerSecond(1318), 781.9)) + .add(new TimelineEntry(ofEpochSecond(154), Throughput.bytesPerSecond(199), 771.8)) + .add(new TimelineEntry(ofEpochSecond(155), Throughput.bytesPerSecond(923), 723.9)) + .add(new TimelineEntry(ofEpochSecond(156), Throughput.bytesPerSecond(430), 736.1)) + .add(new TimelineEntry(ofEpochSecond(157), Throughput.bytesPerSecond(158), 739.4)) + .add(new TimelineEntry(ofEpochSecond(158), Throughput.bytesPerSecond(187), 711.4)) + .add(new TimelineEntry(ofEpochSecond(159), Throughput.bytesPerSecond(442), 621.7)) + .add(new TimelineEntry(ofEpochSecond(160), Throughput.bytesPerSecond(82), 515.3)) + .add(new TimelineEntry(ofEpochSecond(161), Throughput.bytesPerSecond(951), 533.9)) + .add(new TimelineEntry(ofEpochSecond(162), Throughput.bytesPerSecond(976), 566.6)) + .add(new TimelineEntry(ofEpochSecond(163), Throughput.bytesPerSecond(1371), 571.9)) + .add(new TimelineEntry(ofEpochSecond(164), Throughput.bytesPerSecond(547), 606.7)) + .add(new TimelineEntry(ofEpochSecond(165), Throughput.bytesPerSecond(370), 551.4)) + .add(new TimelineEntry(ofEpochSecond(166), Throughput.bytesPerSecond(247), 533.1)) + .add(new TimelineEntry(ofEpochSecond(167), Throughput.bytesPerSecond(660), 583.3)) + .add(new TimelineEntry(ofEpochSecond(168), Throughput.bytesPerSecond(1222), 686.8)) + .add(new TimelineEntry(ofEpochSecond(169), Throughput.bytesPerSecond(130), 655.6)) + .add(new TimelineEntry(ofEpochSecond(170), Throughput.bytesPerSecond(512), 698.6)) + .add(new TimelineEntry(ofEpochSecond(171), Throughput.bytesPerSecond(873), 690.8)) + .add(new TimelineEntry(ofEpochSecond(172), Throughput.bytesPerSecond(18), 595.)) + .add(new TimelineEntry(ofEpochSecond(173), Throughput.bytesPerSecond(817), 539.6)) + .add(new TimelineEntry(ofEpochSecond(174), Throughput.bytesPerSecond(1090), 593.9)) + .add(new TimelineEntry(ofEpochSecond(175), Throughput.bytesPerSecond(1201), 677.)) + .add(new TimelineEntry(ofEpochSecond(176), Throughput.bytesPerSecond(1046), 756.9)) + .add(new TimelineEntry(ofEpochSecond(177), Throughput.bytesPerSecond(1075), 798.4)) + .add(new TimelineEntry(ofEpochSecond(178), Throughput.bytesPerSecond(679), 744.1)) + .add(new TimelineEntry(ofEpochSecond(179), Throughput.bytesPerSecond(1043), 835.4)) + .add(new TimelineEntry(ofEpochSecond(180), Throughput.bytesPerSecond(1206), 904.8)) + .add(new TimelineEntry(ofEpochSecond(181), Throughput.bytesPerSecond(701), 887.6)) + .add(new TimelineEntry(ofEpochSecond(182), Throughput.bytesPerSecond(849), 970.7)) + .add(new TimelineEntry(ofEpochSecond(183), Throughput.bytesPerSecond(457), 934.7)) + .add(new TimelineEntry(ofEpochSecond(184), Throughput.bytesPerSecond(400), 865.7)) + .add(new TimelineEntry(ofEpochSecond(185), Throughput.bytesPerSecond(1157), 861.3)) + .add(new TimelineEntry(ofEpochSecond(186), Throughput.bytesPerSecond(235), 780.2)) + .add(new TimelineEntry(ofEpochSecond(187), Throughput.bytesPerSecond(525), 725.2)) + .add(new TimelineEntry(ofEpochSecond(188), Throughput.bytesPerSecond(1415), 798.8)) + .add(new TimelineEntry(ofEpochSecond(189), Throughput.bytesPerSecond(796), 774.1)) + .add(new TimelineEntry(ofEpochSecond(190), Throughput.bytesPerSecond(428), 696.3)) + .add(new TimelineEntry(ofEpochSecond(191), Throughput.bytesPerSecond(417), 667.9)) + .add(new TimelineEntry(ofEpochSecond(192), Throughput.bytesPerSecond(436), 626.6)) + .add(new TimelineEntry(ofEpochSecond(193), Throughput.bytesPerSecond(781), 659.)) + .add(new TimelineEntry(ofEpochSecond(194), Throughput.bytesPerSecond(967), 715.7)) + .add(new TimelineEntry(ofEpochSecond(195), Throughput.bytesPerSecond(398), 639.8)) + .add(new TimelineEntry(ofEpochSecond(196), Throughput.bytesPerSecond(501), 666.4)) + .add(new TimelineEntry(ofEpochSecond(197), Throughput.bytesPerSecond(691), 683.)) + .add(new TimelineEntry(ofEpochSecond(198), Throughput.bytesPerSecond(1492), 690.7)) + .add(new TimelineEntry(ofEpochSecond(199), Throughput.bytesPerSecond(1493), 760.4)) + .add(new TimelineEntry(ofEpochSecond(200), Throughput.bytesPerSecond(5), 718.1)) + .add(new TimelineEntry(ofEpochSecond(201), Throughput.bytesPerSecond(679), 744.3)) + .add(new TimelineEntry(ofEpochSecond(202), Throughput.bytesPerSecond(1027), 803.4)) + .add(new TimelineEntry(ofEpochSecond(203), Throughput.bytesPerSecond(170), 742.3)) + .add(new TimelineEntry(ofEpochSecond(204), Throughput.bytesPerSecond(261), 671.7)) + .add(new TimelineEntry(ofEpochSecond(205), Throughput.bytesPerSecond(309), 662.8)) + .add(new TimelineEntry(ofEpochSecond(206), Throughput.bytesPerSecond(1483), 761.)) + .add(new TimelineEntry(ofEpochSecond(207), Throughput.bytesPerSecond(1154), 807.3)) + .add(new TimelineEntry(ofEpochSecond(208), Throughput.bytesPerSecond(857), 743.8)) + .add(new TimelineEntry(ofEpochSecond(209), Throughput.bytesPerSecond(792), 673.7)) + .add(new TimelineEntry(ofEpochSecond(210), Throughput.bytesPerSecond(819), 755.1)) + .add(new TimelineEntry(ofEpochSecond(211), Throughput.bytesPerSecond(763), 763.5)) + .add(new TimelineEntry(ofEpochSecond(212), Throughput.bytesPerSecond(386), 699.4)) + .add(new TimelineEntry(ofEpochSecond(213), Throughput.bytesPerSecond(789), 761.3)) + .add(new TimelineEntry(ofEpochSecond(214), Throughput.bytesPerSecond(1432), 878.4)) + .add(new TimelineEntry(ofEpochSecond(215), Throughput.bytesPerSecond(205), 868.)) + .add(new TimelineEntry(ofEpochSecond(216), Throughput.bytesPerSecond(905), 810.2)) + .add(new TimelineEntry(ofEpochSecond(217), Throughput.bytesPerSecond(1290), 823.8)) + .add(new TimelineEntry(ofEpochSecond(218), Throughput.bytesPerSecond(639), 802.)) + .add(new TimelineEntry(ofEpochSecond(219), Throughput.bytesPerSecond(1246), 847.4)) + .build()); +} diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/ThroughputSinkTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/ThroughputSinkTest.java new file mode 100644 index 0000000000..a7840f45c7 --- /dev/null +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/ThroughputSinkTest.java @@ -0,0 +1,262 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.storage; + +import static com.google.cloud.storage.TestUtils.assertAll; +import static com.google.common.truth.Truth.assertThat; + +import com.google.cloud.storage.ThroughputSink.Record; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.WritableByteChannel; +import java.time.Duration; +import java.time.Instant; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.Assert; +import org.junit.Test; + +public final class ThroughputSinkTest { + + @Test + public void tee_record() { + AtomicReference r1 = new AtomicReference<>(null); + AtomicReference r2 = new AtomicReference<>(null); + ThroughputSink test = + ThroughputSink.tee( + new AbstractThroughputSink() { + @Override + public void recordThroughput(Record r) { + r1.compareAndSet(null, r); + } + }, + new AbstractThroughputSink() { + @Override + public void recordThroughput(Record r) { + r2.compareAndSet(null, r); + } + }); + + Record expected = Record.of(10, Instant.EPOCH, Instant.ofEpochSecond(1), false); + test.recordThroughput(expected); + + assertThat(r1.get()).isEqualTo(expected); + assertThat(r2.get()).isEqualTo(expected); + } + + @Test + public void tee_decorate() throws Exception { + AtomicReference b1 = new AtomicReference<>(null); + AtomicReference b2 = new AtomicReference<>(null); + AtomicReference b3 = new AtomicReference<>(null); + ThroughputSink test = + ThroughputSink.tee( + new AbstractThroughputSink() { + @Override + public WritableByteChannel decorate(WritableByteChannel wbc) { + return new WritableByteChannel() { + @Override + public int write(ByteBuffer src) throws IOException { + b1.compareAndSet(null, src.duplicate()); + return wbc.write(src); + } + + @Override + public boolean isOpen() { + return wbc.isOpen(); + } + + @Override + public void close() throws IOException { + wbc.close(); + } + }; + } + }, + new AbstractThroughputSink() { + @Override + public WritableByteChannel decorate(WritableByteChannel wbc) { + return new WritableByteChannel() { + @Override + public int write(ByteBuffer src) throws IOException { + ByteBuffer duplicate = src.duplicate(); + duplicate.position(src.limit()); + b2.compareAndSet(null, duplicate); + return wbc.write(src); + } + + @Override + public boolean isOpen() { + return wbc.isOpen(); + } + + @Override + public void close() throws IOException { + wbc.close(); + } + }; + } + }); + + AtomicBoolean callIsOpen = new AtomicBoolean(false); + AtomicBoolean callClose = new AtomicBoolean(false); + WritableByteChannel anon = + new WritableByteChannel() { + @Override + public int write(ByteBuffer src) { + int remaining = src.remaining(); + src.position(src.limit()); + b3.compareAndSet(null, src); + return remaining; + } + + @Override + public boolean isOpen() { + callIsOpen.compareAndSet(false, true); + return true; + } + + @Override + public void close() { + callClose.compareAndSet(false, true); + } + }; + + byte[] bytes = DataGenerator.base64Characters().genBytes(16); + + ByteBuffer expected1 = ByteBuffer.wrap(bytes); + ByteBuffer expected2 = ByteBuffer.wrap(bytes); + expected2.position(16); + + ByteBuffer buf = ByteBuffer.wrap(bytes); + try (WritableByteChannel decorated = test.decorate(anon)) { + if (decorated.isOpen()) { + decorated.write(buf); + } + } + + assertAll( + () -> assertThat(b1.get()).isEqualTo(expected1), + () -> assertThat(b2.get()).isEqualTo(expected2), + () -> assertThat(b3.get()).isSameInstanceAs(buf), + () -> assertThat(b3.get().hasRemaining()).isFalse(), + () -> assertThat(callIsOpen.get()).isTrue(), + () -> assertThat(callClose.get()).isTrue()); + } + + @Test + public void computeThroughput_noError() throws IOException { + // create a clock that will start at Epoch UTC, and will tick in one second increments + TestClock clock = TestClock.tickBy(Instant.EPOCH, Duration.ofSeconds(1)); + AtomicReference actual = new AtomicReference<>(null); + + ThroughputSink.computeThroughput( + clock, + new AbstractThroughputSink() { + @Override + public void recordThroughput(Record r) { + actual.compareAndSet(null, r); + } + }, + 300, + () -> {}); + + Record expected = Record.of(300, Instant.EPOCH, Instant.ofEpochSecond(1), false); + assertThat(actual.get()).isEqualTo(expected); + } + + @Test + public void computeThroughput_ioError() { + // create a clock that will start at Epoch UTC, and will tick in one second increments + TestClock clock = TestClock.tickBy(Instant.EPOCH, Duration.ofSeconds(1)); + AtomicReference actual = new AtomicReference<>(null); + + IOException ioException = + Assert.assertThrows( + IOException.class, + () -> + ThroughputSink.computeThroughput( + clock, + new AbstractThroughputSink() { + @Override + public void recordThroughput(Record r) { + actual.compareAndSet(null, r); + } + }, + 300, + () -> { + throw new IOException("kablamo!"); + })); + + Record expected = Record.of(300, Instant.EPOCH, Instant.ofEpochSecond(1), true); + assertThat(actual.get()).isEqualTo(expected); + + assertThat(ioException).hasMessageThat().isEqualTo("kablamo!"); + } + + @Test + public void windowed() throws IOException { + // create a clock that will start at Epoch UTC, and will tick in one second increments + TestClock clock = TestClock.tickBy(Instant.EPOCH, Duration.ofSeconds(1)); + + AtomicReference b3 = new AtomicReference<>(null); + WritableByteChannel anon = + new WritableByteChannel() { + @Override + public int write(ByteBuffer src) { + int remaining = src.remaining(); + src.position(src.limit()); + b3.compareAndSet(null, src); + return remaining; + } + + @Override + public boolean isOpen() { + return true; + } + + @Override + public void close() {} + }; + + Duration windowDuration = Duration.ofMinutes(1); + ThroughputMovingWindow window = ThroughputMovingWindow.of(windowDuration); + ThroughputSink sink = ThroughputSink.windowed(window, clock); + + int numBytes = 120; + ByteBuffer buf = DataGenerator.base64Characters().genByteBuffer(numBytes); + try (WritableByteChannel decorated = sink.decorate(anon)) { + decorated.write(buf); + } + + Throughput avg = window.avg(clock.instant()); + + assertThat(avg).isEqualTo(Throughput.of(numBytes, windowDuration)); + assertThat(avg).isEqualTo(Throughput.bytesPerSecond(2)); + } + + private abstract static class AbstractThroughputSink implements ThroughputSink { + + @Override + public void recordThroughput(Record r) {} + + @Override + public WritableByteChannel decorate(WritableByteChannel wbc) { + return null; + } + } +} diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/ThroughputTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/ThroughputTest.java new file mode 100644 index 0000000000..e44d4c1dd5 --- /dev/null +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/ThroughputTest.java @@ -0,0 +1,35 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.storage; + +import static com.google.common.truth.Truth.assertThat; +import static java.time.Duration.ofSeconds; + +import org.junit.Test; + +public final class ThroughputTest { + + @Test + public void a() { + assertThat(Throughput.bytesPerSecond(1).toBps()).isEqualTo(1); + } + + @Test + public void b() { + assertThat(Throughput.of(10, ofSeconds(10)).toBps()).isEqualTo(1); + } +}