Skip to content

Commit

Permalink
Move StreamingBuffer to TableInfo, remove streamingBuffer from Extern…
Browse files Browse the repository at this point in the history
…alTableInfo
  • Loading branch information
mziccard committed Dec 17, 2015
1 parent e91f47b commit 3c2efe6
Show file tree
Hide file tree
Showing 8 changed files with 92 additions and 116 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import static com.google.common.base.Preconditions.checkNotNull;

import com.google.api.client.util.Data;
import com.google.api.services.bigquery.model.Streamingbuffer;
import com.google.api.services.bigquery.model.Table;
import com.google.common.base.Function;
import com.google.common.base.MoreObjects;
Expand Down Expand Up @@ -81,79 +80,6 @@ public enum Type {
EXTERNAL
}

/**
* Google BigQuery Table's Streaming Buffer information. This class contains information on a
* table's streaming buffer as the estimated size in number of rows/bytes.
*/
public static class StreamingBuffer implements Serializable {

private static final long serialVersionUID = -6713971364725267597L;
private final long estimatedRows;
private final long estimatedBytes;
private final long oldestEntryTime;

StreamingBuffer(long estimatedRows, long estimatedBytes, long oldestEntryTime) {
this.estimatedRows = estimatedRows;
this.estimatedBytes = estimatedBytes;
this.oldestEntryTime = oldestEntryTime;
}

/**
* Returns a lower-bound estimate of the number of rows currently in the streaming buffer.
*/
public long estimatedRows() {
return estimatedRows;
}

/**
* Returns a lower-bound estimate of the number of bytes currently in the streaming buffer.
*/
public long estimatedBytes() {
return estimatedBytes;
}

/**
* Returns the timestamp of the oldest entry in the streaming buffer, in milliseconds since
* epoch.
*/
public long oldestEntryTime() {
return oldestEntryTime;
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("estimatedRows", estimatedRows)
.add("estimatedBytes", estimatedBytes)
.add("oldestEntryTime", oldestEntryTime)
.toString();
}

@Override
public int hashCode() {
return Objects.hash(estimatedRows, estimatedBytes, oldestEntryTime);
}

@Override
public boolean equals(Object obj) {
return obj instanceof StreamingBuffer
&& Objects.equals(toPb(), ((StreamingBuffer) obj).toPb());
}

Streamingbuffer toPb() {
return new Streamingbuffer()
.setEstimatedBytes(BigInteger.valueOf(estimatedBytes))
.setEstimatedRows(BigInteger.valueOf(estimatedRows))
.setOldestEntryTime(BigInteger.valueOf(oldestEntryTime));
}

static StreamingBuffer fromPb(Streamingbuffer streamingBufferPb) {
return new StreamingBuffer(streamingBufferPb.getEstimatedRows().longValue(),
streamingBufferPb.getEstimatedBytes().longValue(),
streamingBufferPb.getOldestEntryTime().longValue());
}
}

private final String etag;
private final String id;
private final String selfLink;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,27 +26,24 @@
* reside outside of BigQuery but can be queried as normal BigQuery tables. External tables are
* experimental and might be subject to change or removed.
*
* @see <a href="https://cloud.google.com/bigquery/federated-data-sources">Federated Data
* Sources</a>
* @see <a href="https://cloud.google.com/bigquery/federated-data-sources">Federated Data Sources
* </a>
*/
public class ExternalTableInfo extends BaseTableInfo {

private static final long serialVersionUID = -5893406738246214865L;

private final ExternalDataConfiguration configuration;
private final StreamingBuffer streamingBuffer;

public static final class Builder extends BaseTableInfo.Builder<ExternalTableInfo, Builder> {

private ExternalDataConfiguration configuration;
private StreamingBuffer streamingBuffer;

private Builder() {}

private Builder(ExternalTableInfo tableInfo) {
super(tableInfo);
this.configuration = tableInfo.configuration;
this.streamingBuffer = tableInfo.streamingBuffer;
}

protected Builder(Table tablePb) {
Expand All @@ -55,9 +52,6 @@ protected Builder(Table tablePb) {
this.configuration =
ExternalDataConfiguration.fromPb(tablePb.getExternalDataConfiguration());
}
if (tablePb.getStreamingBuffer() != null) {
this.streamingBuffer = StreamingBuffer.fromPb(tablePb.getStreamingBuffer());
}
}

/**
Expand All @@ -71,11 +65,6 @@ public Builder configuration(ExternalDataConfiguration configuration) {
return self();
}

Builder streamingBuffer(StreamingBuffer streamingBuffer) {
this.streamingBuffer = streamingBuffer;
return self();
}

/**
* Creates a {@code ExternalTableInfo} object.
*/
Expand All @@ -88,28 +77,19 @@ public ExternalTableInfo build() {
private ExternalTableInfo(Builder builder) {
super(builder);
this.configuration = builder.configuration;
this.streamingBuffer = builder.streamingBuffer;
}

/**
* Returns the data format, location and other properties of a table stored outside of BigQuery.
* This property is experimental and might be subject to change or removed.
*
* @see <a href="https://cloud.google.com/bigquery/federated-data-sources">Federated Data
* Sources</a>
* @see <a href="https://cloud.google.com/bigquery/federated-data-sources">Federated Data Sources
* </a>
*/
public ExternalDataConfiguration configuration() {
return configuration;
}

/**
* Returns information on the table's streaming buffer if any exists. Returns {@code null} if no
* streaming buffer exists.
*/
public StreamingBuffer streamingBuffer() {
return streamingBuffer;
}

/**
* Returns a builder for the {@code ExternalTableInfo} object.
*/
Expand All @@ -120,18 +100,13 @@ public Builder toBuilder() {

@Override
ToStringHelper toStringHelper() {
return super.toStringHelper()
.add("configuration", configuration)
.add("streamingBuffer", streamingBuffer);
return super.toStringHelper().add("configuration", configuration);
}

@Override
Table toPb() {
Table tablePb = super.toPb();
tablePb.setExternalDataConfiguration(configuration.toPb());
if (streamingBuffer != null) {
tablePb.setStreamingBuffer(streamingBuffer.toPb());
}
return tablePb;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,15 @@
* return results if the query completes within a specified timeout. The query results are saved to
* a temporary table that is deleted approximately 24 hours after the query is run. The query is run
* through a BigQuery Job whose identity can be accessed via {@link QueryResponse#jobId()}. If the
* query does not complete within the provided {@link Builder#maxWaitTime(Long)} the response
* query does not complete within the provided {@link Builder#maxWaitTime(Long)}, the response
* returned by {@link BigQuery#query(QueryRequest)} will have {@link QueryResponse#jobComplete()}
* set to {@code false} and {@link QueryResponse#result()} set to {@code null}. To obtain query
* results you can use {@link BigQuery#getQueryResults(JobId, BigQuery.QueryResultsOption...)} until
* {@link QueryResponse#jobComplete()} returns {@code true}.
*
* <p>Example usage of a query request:
* <pre> {@code
* // Substitute "field", "table" and "dataset" with real field, table and dataset identifiers
* QueryRequest request = QueryRequest.builder("SELECT field FROM table")
* .defaultDataset(DatasetId.of("dataset"))
* .maxWaitTime(60000L)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,15 @@

package com.google.gcloud.bigquery;

import com.google.api.services.bigquery.model.Streamingbuffer;
import com.google.api.services.bigquery.model.Table;
import com.google.common.base.MoreObjects;
import com.google.common.base.MoreObjects.ToStringHelper;

import java.io.Serializable;
import java.math.BigInteger;
import java.util.Objects;

/**
* A Google BigQuery Table information. A BigQuery table is a standard, two-dimensional table with
* individual records organized in rows, and a data type assigned to each column (also called a
Expand All @@ -34,6 +40,79 @@ public class TableInfo extends BaseTableInfo {
private final String location;
private final StreamingBuffer streamingBuffer;

/**
* Google BigQuery Table's Streaming Buffer information. This class contains information on a
* table's streaming buffer as the estimated size in number of rows/bytes.
*/
public static class StreamingBuffer implements Serializable {

private static final long serialVersionUID = -6713971364725267597L;
private final long estimatedRows;
private final long estimatedBytes;
private final long oldestEntryTime;

StreamingBuffer(long estimatedRows, long estimatedBytes, long oldestEntryTime) {
this.estimatedRows = estimatedRows;
this.estimatedBytes = estimatedBytes;
this.oldestEntryTime = oldestEntryTime;
}

/**
* Returns a lower-bound estimate of the number of rows currently in the streaming buffer.
*/
public long estimatedRows() {
return estimatedRows;
}

/**
* Returns a lower-bound estimate of the number of bytes currently in the streaming buffer.
*/
public long estimatedBytes() {
return estimatedBytes;
}

/**
* Returns the timestamp of the oldest entry in the streaming buffer, in milliseconds since
* epoch.
*/
public long oldestEntryTime() {
return oldestEntryTime;
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("estimatedRows", estimatedRows)
.add("estimatedBytes", estimatedBytes)
.add("oldestEntryTime", oldestEntryTime)
.toString();
}

@Override
public int hashCode() {
return Objects.hash(estimatedRows, estimatedBytes, oldestEntryTime);
}

@Override
public boolean equals(Object obj) {
return obj instanceof StreamingBuffer
&& Objects.equals(toPb(), ((StreamingBuffer) obj).toPb());
}

Streamingbuffer toPb() {
return new Streamingbuffer()
.setEstimatedBytes(BigInteger.valueOf(estimatedBytes))
.setEstimatedRows(BigInteger.valueOf(estimatedRows))
.setOldestEntryTime(BigInteger.valueOf(oldestEntryTime));
}

static StreamingBuffer fromPb(Streamingbuffer streamingBufferPb) {
return new StreamingBuffer(streamingBufferPb.getEstimatedRows().longValue(),
streamingBufferPb.getEstimatedBytes().longValue(),
streamingBufferPb.getOldestEntryTime().longValue());
}
}

public static final class Builder extends BaseTableInfo.Builder<TableInfo, Builder> {

private String location;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,8 @@ public Tuple<String, Iterable<Dataset>> listDatasets(Map<Option, ?> options)
.execute();
Iterable<DatasetList.Datasets> datasets = datasetsList.getDatasets();
return Tuple.of(datasetsList.getNextPageToken(),
Iterables.transform(datasets != null ? datasets :
ImmutableList.<DatasetList.Datasets>of(),
Iterables.transform(datasets != null ? datasets
: ImmutableList.<DatasetList.Datasets>of(),
new Function<DatasetList.Datasets, Dataset>() {
@Override
public Dataset apply(DatasetList.Datasets datasetPb) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public void testOption() {
assertEquals("token", option.value());
}

@Test(expected=NullPointerException.class)
@Test(expected = NullPointerException.class)
public void testNullRpcOption() {
new Option(null, "token");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.gcloud.AuthCredentials;
import com.google.gcloud.RetryParams;
import com.google.gcloud.bigquery.TableInfo.StreamingBuffer;

import org.junit.Test;

Expand Down Expand Up @@ -94,8 +95,7 @@ public class SerializationTest {
.description("FieldDescription3")
.build();
private static final Schema TABLE_SCHEMA = Schema.of(FIELD_SCHEMA1, FIELD_SCHEMA2, FIELD_SCHEMA3);
private static final BaseTableInfo.StreamingBuffer STREAMING_BUFFER =
new BaseTableInfo.StreamingBuffer(1L, 2L, 3L);
private static final StreamingBuffer STREAMING_BUFFER = new StreamingBuffer(1L, 2L, 3L);
private static final List<String> SOURCE_URIS = ImmutableList.of("uri1", "uri2");
private static final ExternalDataConfiguration EXTERNAL_DATA_CONFIGURATION =
ExternalDataConfiguration.builder(SOURCE_URIS, TABLE_SCHEMA, CSV_OPTIONS)
Expand Down Expand Up @@ -128,7 +128,6 @@ public class SerializationTest {
.description(DESCRIPTION)
.etag(ETAG)
.id(ID)
.streamingBuffer(STREAMING_BUFFER)
.build();
private static final JobStatistics JOB_STATISTICS = JobStatistics.builder()
.creationTime(1L)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import static org.junit.Assert.assertTrue;

import com.google.common.collect.ImmutableList;
import com.google.gcloud.bigquery.BaseTableInfo.StreamingBuffer;
import com.google.gcloud.bigquery.TableInfo.StreamingBuffer;

import org.junit.Test;

Expand All @@ -46,7 +46,6 @@ public class TableInfoTest {
private static final Schema TABLE_SCHEMA = Schema.of(FIELD_SCHEMA1, FIELD_SCHEMA2, FIELD_SCHEMA3);
private static final String VIEW_QUERY = "VIEW QUERY";
private static final List<String> SOURCE_URIS = ImmutableList.of("uri1", "uri2");
private static final String SOURCE_FORMAT = "CSV";
private static final Integer MAX_BAD_RECORDS = 42;
private static final Boolean IGNORE_UNKNOWN_VALUES = true;
private static final String COMPRESSION = "GZIP";
Expand Down Expand Up @@ -96,7 +95,6 @@ public class TableInfoTest {
.numBytes(NUM_BYTES)
.numRows(NUM_ROWS)
.selfLink(SELF_LINK)
.streamingBuffer(STREAMING_BUFFER)
.build();
private static final List<UserDefinedFunction> USER_DEFINED_FUNCTIONS =
ImmutableList.of(UserDefinedFunction.inline("Function"), UserDefinedFunction.fromUri("URI"));
Expand Down Expand Up @@ -184,7 +182,6 @@ public void testBuilder() {
assertEquals(NUM_BYTES, EXTERNAL_TABLE_INFO.numBytes());
assertEquals(NUM_ROWS, EXTERNAL_TABLE_INFO.numRows());
assertEquals(SELF_LINK, EXTERNAL_TABLE_INFO.selfLink());
assertEquals(STREAMING_BUFFER, EXTERNAL_TABLE_INFO.streamingBuffer());
assertEquals(BaseTableInfo.Type.EXTERNAL, EXTERNAL_TABLE_INFO.type());
}

Expand Down Expand Up @@ -235,6 +232,5 @@ private void compareExternalTableInfo(ExternalTableInfo expected, ExternalTableI
compareBaseTableInfo(expected, value);
assertEquals(expected, value);
assertEquals(expected.configuration(), value.configuration());
assertEquals(expected.streamingBuffer(), value.streamingBuffer());
}
}

0 comments on commit 3c2efe6

Please sign in to comment.