-
Notifications
You must be signed in to change notification settings - Fork 78
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
chore: introduce Throughput and ThroughputSink
_Pre-work_ In some upcoming work we want to be able to keep a running window of throughput performance in order to provide improved throughput. This PR introduces a new utility class to model and compute throughput, and the concept of a ThroughputSink which values can be appended to.
- Loading branch information
1 parent
48d96c5
commit c0e9cd3
Showing
7 changed files
with
1,224 additions
and
0 deletions.
There are no files selected for viewing
94 changes: 94 additions & 0 deletions
94
google-cloud-storage/src/main/java/com/google/cloud/storage/Throughput.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,94 @@ | ||
/* | ||
* Copyright 2023 Google LLC | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.google.cloud.storage; | ||
|
||
import com.google.common.base.MoreObjects; | ||
import java.time.Duration; | ||
import java.util.Objects; | ||
|
||
/** | ||
* Convenience class to encapsulate the concept of a throughput value. | ||
* | ||
* <p>Given a number of bytes and a duration compute the number of bytes per second. | ||
*/ | ||
final class Throughput { | ||
|
||
private static final double NANOS_PER_SECOND = 1_000_000_000d; | ||
private final long numBytes; | ||
private final Duration duration; | ||
|
||
// TODO: is there a efficient way we can limit precision without having to use BigDecimal? | ||
// Realistically, we don't need precision smaller than 1 byte per microsecond, leading to | ||
// 6 digits past the decimal of needed precision. | ||
private final double bytesPerSecond; | ||
|
||
private Throughput(long numBytes, Duration duration) { | ||
this.numBytes = numBytes; | ||
this.duration = duration; | ||
this.bytesPerSecond = numBytes / (duration.toNanos() / NANOS_PER_SECOND); | ||
} | ||
|
||
public long getNumBytes() { | ||
return numBytes; | ||
} | ||
|
||
public Duration getDuration() { | ||
return duration; | ||
} | ||
|
||
public double toBps() { | ||
return bytesPerSecond; | ||
} | ||
|
||
public Throughput plus(Throughput other) { | ||
return new Throughput(this.numBytes + other.numBytes, this.duration.plus(other.duration)); | ||
} | ||
|
||
@Override | ||
public boolean equals(Object o) { | ||
if (this == o) { | ||
return true; | ||
} | ||
if (!(o instanceof Throughput)) { | ||
return false; | ||
} | ||
Throughput that = (Throughput) o; | ||
return Double.compare(that.bytesPerSecond, bytesPerSecond) == 0; | ||
} | ||
|
||
@Override | ||
public int hashCode() { | ||
return Objects.hash(bytesPerSecond); | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return MoreObjects.toStringHelper(this).add("bytesPerSecond", bytesPerSecond).toString(); | ||
} | ||
|
||
public static Throughput zero() { | ||
return new Throughput(0, Duration.ZERO); | ||
} | ||
|
||
public static Throughput of(long numBytes, Duration duration) { | ||
return new Throughput(numBytes, duration); | ||
} | ||
|
||
public static Throughput bytesPerSecond(long numBytes) { | ||
return new Throughput(numBytes, Duration.ofSeconds(1)); | ||
} | ||
} |
98 changes: 98 additions & 0 deletions
98
google-cloud-storage/src/main/java/com/google/cloud/storage/ThroughputMovingWindow.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,98 @@ | ||
/* | ||
* Copyright 2023 Google LLC | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.google.cloud.storage; | ||
|
||
import com.google.common.base.MoreObjects; | ||
import java.time.Duration; | ||
import java.time.Instant; | ||
import java.util.Comparator; | ||
import java.util.PriorityQueue; | ||
|
||
/** | ||
* A simple moving window implementation which will keep a {@code window}s worth of Throughput | ||
* values and allow querying for the aggregate avg over that time window. | ||
*/ | ||
final class ThroughputMovingWindow { | ||
|
||
private final Duration window; | ||
|
||
private final PriorityQueue<Entry> values; | ||
|
||
private ThroughputMovingWindow(Duration window) { | ||
this.window = window; | ||
this.values = new PriorityQueue<>(Entry.COMP); | ||
} | ||
|
||
void add(Instant now, Throughput value) { | ||
houseKeeping(now); | ||
values.add(new Entry(now, value)); | ||
} | ||
|
||
Throughput avg(Instant now) { | ||
houseKeeping(now); | ||
return values.stream() | ||
.map(Entry::getValue) | ||
.reduce( | ||
Throughput.zero(), | ||
(tp1, tp2) -> Throughput.of(tp1.getNumBytes() + tp2.getNumBytes(), window)); | ||
} | ||
|
||
private void houseKeeping(Instant now) { | ||
Instant newMin = now.minus(window); | ||
values.removeIf(e -> lteq(e.getAt(), newMin)); | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return MoreObjects.toStringHelper(this) | ||
.add("window", window) | ||
.add("values.size()", values.size()) | ||
.toString(); | ||
} | ||
|
||
static ThroughputMovingWindow of(Duration window) { | ||
return new ThroughputMovingWindow(window); | ||
} | ||
|
||
private static boolean lteq(Instant a, Instant b) { | ||
return a.equals(b) || a.isBefore(b); | ||
} | ||
|
||
private static final class Entry { | ||
private static final Comparator<Entry> COMP = Comparator.comparing(e -> e.at); | ||
private final Instant at; | ||
private final Throughput value; | ||
|
||
private Entry(Instant at, Throughput value) { | ||
this.at = at; | ||
this.value = value; | ||
} | ||
|
||
public Instant getAt() { | ||
return at; | ||
} | ||
|
||
public Throughput getValue() { | ||
return value; | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return MoreObjects.toStringHelper(this).add("at", at).add("value", value).toString(); | ||
} | ||
} | ||
} |
Oops, something went wrong.