Skip to content

Commit

Permalink
chore: introduce Throughput and ThroughputSink
Browse files Browse the repository at this point in the history
_Pre-work_

In some upcoming work we want to be able to keep a running window of throughput performance in order to provide improved throughput. This PR introduces a new utility class to model and compute throughput, and the concept of a ThroughputSink which values can be appended to.
  • Loading branch information
BenWhitehead committed Jul 25, 2023
1 parent e0191b5 commit d277957
Show file tree
Hide file tree
Showing 7 changed files with 1,224 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.storage;

import com.google.common.base.MoreObjects;
import java.time.Duration;
import java.util.Objects;

/**
* Convenience class to encapsulate the concept of a throughput value.
*
* <p>Given a number of bytes and a duration compute the number of bytes per second.
*/
final class Throughput {

private static final double NANOS_PER_SECOND = 1_000_000_000d;
private final long numBytes;
private final Duration duration;

// TODO: is there a efficient way we can limit precision without having to use BigDecimal?
// Realistically, we don't need precision smaller than 1 byte per microsecond, leading to
// 6 digits past the decimal of needed precision.
private final double bytesPerSecond;

private Throughput(long numBytes, Duration duration) {
this.numBytes = numBytes;
this.duration = duration;
this.bytesPerSecond = numBytes / (duration.toNanos() / NANOS_PER_SECOND);
}

public long getNumBytes() {
return numBytes;
}

public Duration getDuration() {
return duration;
}

public double toBps() {
return bytesPerSecond;
}

public Throughput plus(Throughput other) {
return new Throughput(this.numBytes + other.numBytes, this.duration.plus(other.duration));
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof Throughput)) {
return false;
}
Throughput that = (Throughput) o;
return Double.compare(that.bytesPerSecond, bytesPerSecond) == 0;
}

@Override
public int hashCode() {
return Objects.hash(bytesPerSecond);
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this).add("bytesPerSecond", bytesPerSecond).toString();
}

public static Throughput zero() {
return new Throughput(0, Duration.ZERO);
}

public static Throughput of(long numBytes, Duration duration) {
return new Throughput(numBytes, duration);
}

public static Throughput bytesPerSecond(long numBytes) {
return new Throughput(numBytes, Duration.ofSeconds(1));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.storage;

import com.google.common.base.MoreObjects;
import java.time.Duration;
import java.time.Instant;
import java.util.Comparator;
import java.util.PriorityQueue;

/**
* A simple moving window implementation which will keep a {@code window}s worth of Throughput
* values and allow querying for the aggregate avg over that time window.
*/
final class ThroughputMovingWindow {

private final Duration window;

private final PriorityQueue<Entry> values;

private ThroughputMovingWindow(Duration window) {
this.window = window;
this.values = new PriorityQueue<>(Entry.COMP);
}

void add(Instant now, Throughput value) {
houseKeeping(now);
values.add(new Entry(now, value));
}

Throughput avg(Instant now) {
houseKeeping(now);
return values.stream()
.map(Entry::getValue)
.reduce(
Throughput.zero(),
(tp1, tp2) -> Throughput.of(tp1.getNumBytes() + tp2.getNumBytes(), window));
}

private void houseKeeping(Instant now) {
Instant newMin = now.minus(window);
values.removeIf(e -> lteq(e.getAt(), newMin));
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("window", window)
.add("values.size()", values.size())
.toString();
}

static ThroughputMovingWindow of(Duration window) {
return new ThroughputMovingWindow(window);
}

private static boolean lteq(Instant a, Instant b) {
return a.equals(b) || a.isBefore(b);
}

private static final class Entry {
private static final Comparator<Entry> COMP = Comparator.comparing(e -> e.at);
private final Instant at;
private final Throughput value;

private Entry(Instant at, Throughput value) {
this.at = at;
this.value = value;
}

public Instant getAt() {
return at;
}

public Throughput getValue() {
return value;
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this).add("at", at).add("value", value).toString();
}
}
}
Loading

0 comments on commit d277957

Please sign in to comment.