Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…java into davidtorres-master
  • Loading branch information
pongad committed Dec 9, 2016
2 parents a39a038 + 7c1c98e commit 57ea1f2
Show file tree
Hide file tree
Showing 20 changed files with 4,827 additions and 105 deletions.
56 changes: 56 additions & 0 deletions google-cloud-pubsub/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,31 @@
<artifactId>grpc-auth</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>20.0</version>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>2.9.4</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.21</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.21</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>google-cloud-core</artifactId>
Expand All @@ -72,6 +97,12 @@
<version>3.4</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>1.9.5</version>
<scope>test</scope>
</dependency>
</dependencies>
<profiles>
<profile>
Expand All @@ -86,7 +117,32 @@
</profile>
</profiles>
<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.4.0.Final</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.5.0</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:3.0.2:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
/*
* Copyright 2016 Google Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsub;

import com.google.common.base.MoreObjects;
import com.google.common.base.MoreObjects.ToStringHelper;
import com.google.common.base.Preconditions;
import java.util.concurrent.atomic.AtomicLong;

/**
* Takes measurements and stores them in linear buckets from 0 to totalBuckets - 1, along with
* utilities to calculate percentiles for analysis of results.
*/
public class Distribution {

private final AtomicLong[] bucketCounts;
private long count;
private double mean;
private double sumOfSquaredDeviation;

public Distribution(int totalBuckets) {
Preconditions.checkArgument(totalBuckets > 0);
bucketCounts = new AtomicLong[totalBuckets];
for (int i = 0; i < totalBuckets; ++i) {
bucketCounts[i] = new AtomicLong();
}
}

/**
* Get the bucket that records values up to the given percentile.
*/
public long getNthPercentile(double percentile) {
Preconditions.checkArgument(percentile > 0.0);
Preconditions.checkArgument(percentile <= 100.0);

long[] bucketCounts = getBucketCounts();
long total = 0;
for (long count : bucketCounts) {
total += count;
}

if (total == 0) {
return 0;
}
long count = (long) Math.ceil(total * percentile / 100.0);
for (int i = 0; i < bucketCounts.length; i++) {
count -= bucketCounts[i];
if (count <= 0) {
return i;
}
}
return 0;
}

/**
* Resets (sets to 0) the recorded values.
*/
public synchronized void reset() {
for (AtomicLong element : bucketCounts) {
element.set(0);
}
count = 0;
mean = 0;
sumOfSquaredDeviation = 0;
}

/**
* Numbers of values recorded.
*/
public long getCount() {
return count;
}

/**
* Square deviations of the recorded values.
*/
public double getSumOfSquareDeviations() {
return sumOfSquaredDeviation;
}

/**
* Mean of the recorded values.
*/
public double getMean() {
return mean;
}

/**
* Gets the accumulated count of every bucket of the distribution.
*/
public long[] getBucketCounts() {
long[] counts = new long[bucketCounts.length];
for (int i = 0; i < counts.length; i++) {
counts[i] = bucketCounts[i].longValue();
}
return counts;
}

/**
* Make a copy of the distribution.
*/
public synchronized Distribution copy() {
Distribution distributionCopy = new Distribution(bucketCounts.length);
distributionCopy.count = count;
distributionCopy.mean = mean;
distributionCopy.sumOfSquaredDeviation = sumOfSquaredDeviation;
System.arraycopy(bucketCounts, 0, distributionCopy.bucketCounts, 0, bucketCounts.length);
return distributionCopy;
}

/**
* Record a new value.
*/
public void record(int bucket) {
Preconditions.checkArgument(bucket >= 0);

synchronized (this) {
count++;
double dev = bucket - mean;
mean += dev / count;
sumOfSquaredDeviation += dev * (bucket - mean);
}

if (bucket >= bucketCounts.length) {
// Account for bucket overflow, records everything that is equals or greater of the last
// bucket.
bucketCounts[bucketCounts.length - 1].incrementAndGet();
return;
}

bucketCounts[bucket].incrementAndGet();
}

@Override
public String toString() {
ToStringHelper helper = MoreObjects.toStringHelper(Distribution.class);
helper.add("bucketCounts", bucketCounts);
helper.add("count", count);
helper.add("mean", mean);
helper.add("sumOfSquaredDeviation", sumOfSquaredDeviation);
return helper.toString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Copyright 2016 Google Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsub;

import com.google.cloud.pubsub.Publisher.CloudPubsubFlowControlException;
import com.google.cloud.pubsub.Publisher.MaxOutstandingBytesReachedException;
import com.google.cloud.pubsub.Publisher.MaxOutstandingMessagesReachedException;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import java.util.concurrent.Semaphore;
import javax.annotation.Nullable;

/** Provides flow control capability for Pub/Sub client classes. */
class FlowController {
@Nullable private final Semaphore outstandingMessageCount;
@Nullable private final Semaphore outstandingByteCount;
private final boolean failOnLimits;
private final Optional<Integer> maxOutstandingMessages;
private final Optional<Integer> maxOutstandingBytes;

FlowController(
Optional<Integer> maxOutstandingMessages,
Optional<Integer> maxOutstandingBytes,
boolean failOnFlowControlLimits) {
this.maxOutstandingMessages = Preconditions.checkNotNull(maxOutstandingMessages);
this.maxOutstandingBytes = Preconditions.checkNotNull(maxOutstandingBytes);
outstandingMessageCount =
maxOutstandingMessages.isPresent() ? new Semaphore(maxOutstandingMessages.get()) : null;
outstandingByteCount =
maxOutstandingBytes.isPresent() ? new Semaphore(maxOutstandingBytes.get()) : null;
this.failOnLimits = failOnFlowControlLimits;
}

void reserve(int messages, int bytes) throws CloudPubsubFlowControlException {
Preconditions.checkArgument(messages > 0);

if (outstandingMessageCount != null) {
if (!failOnLimits) {
outstandingMessageCount.acquireUninterruptibly(messages);
} else if (!outstandingMessageCount.tryAcquire(messages)) {
throw new MaxOutstandingMessagesReachedException(maxOutstandingMessages.get());
}
}

// Will always allow to send a message even if it is larger than the flow control limit,
// if it doesn't then it will deadlock the thread.
if (outstandingByteCount != null) {
int permitsToDraw = maxOutstandingBytes.get() > bytes ? bytes : maxOutstandingBytes.get();
if (!failOnLimits) {
outstandingByteCount.acquireUninterruptibly(permitsToDraw);
} else if (!outstandingByteCount.tryAcquire(permitsToDraw)) {
throw new MaxOutstandingBytesReachedException(maxOutstandingBytes.get());
}
}
}

void release(int messages, int bytes) {
Preconditions.checkArgument(messages > 0);

if (outstandingMessageCount != null) {
outstandingMessageCount.release(messages);
}
if (outstandingByteCount != null) {
// Need to return at most as much bytes as it can be drawn.
int permitsToReturn = maxOutstandingBytes.get() > bytes ? bytes : maxOutstandingBytes.get();
outstandingByteCount.release(permitsToReturn);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright 2016 Google Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsub;

import com.google.common.annotations.VisibleForTesting;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* A barrier kind of object that helps to keep track and synchronously wait on pending messages.
*/
class MessagesWaiter {
private int pendingMessages;

MessagesWaiter() {
pendingMessages = 0;
}

public synchronized void incrementPendingMessages(int messages) {
this.pendingMessages += messages;
if (pendingMessages == 0) {
notifyAll();
}
}

public synchronized void waitNoMessages() {
waitNoMessages(new AtomicBoolean());
}

@VisibleForTesting
synchronized void waitNoMessages(AtomicBoolean waitReached) {
boolean interrupted = false;
try {
while (pendingMessages > 0) {
try {
waitReached.set(true);
wait();
} catch (InterruptedException e) {
// Ignored, uninterruptibly.
interrupted = true;
}
}
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}

@VisibleForTesting
public int pendingMessages() {
return pendingMessages;
}
}
Loading

0 comments on commit 57ea1f2

Please sign in to comment.