diff --git a/README.md b/README.md
index 2d1c1f01db..88cabeffaa 100644
--- a/README.md
+++ b/README.md
@@ -48,7 +48,7 @@ To add a dependency on Cloud Storage connector using Maven, use the following:
   <groupId>com.google.cloud.bigdataoss</groupId>
   <artifactId>gcs-connector</artifactId>
-  <version>3.0.0</version>
+  <version>3.0.1</version>
```
diff --git a/coverage/pom.xml b/coverage/pom.xml
index 6f436b709f..934e4f703c 100644
--- a/coverage/pom.xml
+++ b/coverage/pom.xml
@@ -21,11 +21,11 @@
    <groupId>com.google.cloud.bigdataoss</groupId>
    <artifactId>bigdataoss-parent</artifactId>
-   <version>3.0.1-SNAPSHOT</version>
+   <version>3.0.2-SNAPSHOT</version>
  <artifactId>coverage</artifactId>
- <version>3.0.1-SNAPSHOT</version>
+ <version>3.0.2-SNAPSHOT</version>
  <packaging>pom</packaging>
diff --git a/gcs/CHANGES.md b/gcs/CHANGES.md
index 18afd7f948..8dbb5de4d8 100644
--- a/gcs/CHANGES.md
+++ b/gcs/CHANGES.md
@@ -1,8 +1,12 @@
# Release Notes
## Next
+
+## 3.0.1 - 2024-07-10
1. Add readVectored API implementation.
+1. Add gRPC configuration documentation.
+
## 3.0.0 - 2023-12-03
diff --git a/gcs/CONFIGURATION.md b/gcs/CONFIGURATION.md
index 19ebbbbf10..02e674dc31 100644
--- a/gcs/CONFIGURATION.md
+++ b/gcs/CONFIGURATION.md
@@ -457,6 +457,33 @@ Knobs configure the vectoredRead API
Minimum size in bytes of the read range for Cloud Storage request when
opening a new stream to read an object.
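+
+As an illustration, here is a minimal sketch of issuing a vectored read through
+the Hadoop `readVectored` API (this assumes a Hadoop release that ships the
+`FileRange` API; the bucket, object, method name, and ranges are hypothetical):
+
+```java
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileRange;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+static void readTwoRanges(FileSystem fs) throws Exception {
+  try (FSDataInputStream in = fs.open(new Path("gs://my-bucket/my-object"))) {
+    List<FileRange> ranges =
+        Arrays.asList(
+            FileRange.createFileRange(0, 4096), // offset, length
+            FileRange.createFileRange(1 << 20, 4096));
+    // Nearby ranges may be merged internally before being fetched in parallel.
+    in.readVectored(ranges, ByteBuffer::allocate);
+    ByteBuffer first = ranges.get(0).getData().get(); // waits for completion
+  }
+}
+```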
+### gRPC configuration
+
+gRPC is an optimized transport for connecting to the GCS backend. It offers
+lower latency and higher bandwidth, and is currently supported only for read
+and write operations. A minimal configuration sketch appears at the end of
+this section.
+
+* `fs.gs.client.type` (default: `HTTP_API_CLIENT`)
+
+  Valid values:
+
+  * `HTTP_API_CLIENT` uses the JSON API to connect to the GCS backend, over
+    HTTP (CloudPath).
+
+  * `STORAGE_CLIENT` uses the java-storage client to connect to the GCS
+    backend, over gRPC.
+
+* `fs.gs.grpc.write.enable` (default: `false`)
+
+  Effective only if `STORAGE_CLIENT` is selected. Enables writes to go over
+  gRPC.
+
+* `fs.gs.client.upload.type` (default: `CHUNK_UPLOAD`)
+
+  Effective only if `STORAGE_CLIENT` is selected.
+
+  Valid values:
+
+  * `CHUNK_UPLOAD` uploads files in chunks; the chunk size is configurable via
+    `fs.gs.outputstream.upload.chunk.size`.
+
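+Putting these together, a minimal sketch using the standard Hadoop
+`Configuration` API (the class and method names are illustrative):
+
+```java
+import org.apache.hadoop.conf.Configuration;
+
+public class GrpcClientSetup {
+  public static Configuration grpcEnabledConf() {
+    Configuration conf = new Configuration();
+    // Use the java-storage client, which connects over gRPC.
+    conf.set("fs.gs.client.type", "STORAGE_CLIENT");
+    // Route writes over gRPC as well.
+    conf.setBoolean("fs.gs.grpc.write.enable", true);
+    // Keep the default chunked upload path.
+    conf.set("fs.gs.client.upload.type", "CHUNK_UPLOAD");
+    return conf;
+  }
+}
+```
+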
### Performance cache configuration
* `fs.gs.performance.cache.enable` (default: `false`)
diff --git a/gcs/pom.xml b/gcs/pom.xml
index 024faef895..d1fb9f6c7e 100644
--- a/gcs/pom.xml
+++ b/gcs/pom.xml
@@ -21,7 +21,7 @@
    <groupId>com.google.cloud.bigdataoss</groupId>
    <artifactId>bigdataoss-parent</artifactId>
-   <version>3.0.1-SNAPSHOT</version>
+   <version>3.0.2-SNAPSHOT</version>
    <relativePath>../pom.xml</relativePath>
@@ -31,7 +31,7 @@
  <artifactId>gcs-connector</artifactId>
- <version>3.0.1-SNAPSHOT</version>
+ <version>3.0.2-SNAPSHOT</version>
diff --git a/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GhfsGlobalStorageStatistics.java b/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GhfsGlobalStorageStatistics.java
index a8624ee0f1..6c02ca456b 100644
--- a/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GhfsGlobalStorageStatistics.java
+++ b/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GhfsGlobalStorageStatistics.java
@@ -20,24 +20,31 @@
import static com.google.cloud.hadoop.fs.gcs.GhfsStatistic.FILES_CREATED;
import static com.google.cloud.hadoop.fs.gcs.GhfsStatistic.FILES_DELETED;
import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics.EXCEPTION_COUNT;
-import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics.GCS_CLIENT_RATE_LIMIT_COUNT;
-import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics.GCS_CLIENT_SIDE_ERROR_COUNT;
-import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics.GCS_REQUEST_COUNT;
-import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics.GCS_SERVER_SIDE_ERROR_COUNT;
+import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics.GCS_API_CLIENT_BAD_REQUEST_COUNT;
+import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics.GCS_API_CLIENT_GONE_RESPONSE_COUNT;
+import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics.GCS_API_CLIENT_NOT_FOUND_RESPONSE_COUNT;
+import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics.GCS_API_CLIENT_PRECONDITION_FAILED_RESPONSE_COUNT;
+import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics.GCS_API_CLIENT_RATE_LIMIT_COUNT;
+import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics.GCS_API_CLIENT_REQUESTED_RANGE_NOT_SATISFIABLE_COUNT;
+import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics.GCS_API_CLIENT_REQUEST_TIMEOUT_COUNT;
+import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics.GCS_API_CLIENT_SIDE_ERROR_COUNT;
+import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics.GCS_API_CLIENT_UNAUTHORIZED_RESPONSE_COUNT;
+import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics.GCS_API_REQUEST_COUNT;
+import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics.GCS_API_SERVER_BAD_GATEWAY_COUNT;
+import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics.GCS_API_SERVER_INTERNAL_ERROR_COUNT;
+import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics.GCS_API_SERVER_SERVICE_UNAVAILABLE_COUNT;
+import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics.GCS_API_SERVER_SIDE_ERROR_COUNT;
+import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics.GCS_API_SERVER_TIMEOUT_COUNT;
import static com.google.cloud.hadoop.gcsio.StatisticTypeEnum.TYPE_DURATION;
import static com.google.common.base.Preconditions.checkArgument;
-import com.google.api.client.googleapis.json.GoogleJsonResponseException;
-import com.google.api.client.http.HttpResponseException;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics;
-import com.google.cloud.hadoop.util.GcsRequestExecutionEvent;
-import com.google.cloud.hadoop.util.GoogleCloudStorageEventBus.StatisticsType;
+import com.google.cloud.hadoop.gcsio.StatisticTypeEnum;
import com.google.cloud.hadoop.util.ITraceFactory;
import com.google.cloud.hadoop.util.ITraceOperation;
import com.google.common.base.Stopwatch;
-import com.google.common.eventbus.Subscribe;
import com.google.common.flogger.GoogleLogger;
-import io.grpc.Status;
+import com.google.common.util.concurrent.AtomicDouble;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
@@ -73,6 +80,7 @@ public class GhfsGlobalStorageStatistics extends StorageStatistics {
  private final Map<String, AtomicLong> minimums = new HashMap<>();
  private final Map<String, AtomicLong> maximums = new HashMap<>();
  private final Map<String, MeanStatistic> means = new HashMap<>();
+ private final Map<String, AtomicDouble> total = new HashMap<>();
public GhfsGlobalStorageStatistics() {
super(NAME);
@@ -86,10 +94,14 @@ public GhfsGlobalStorageStatistics() {
String symbol = opType.getSymbol();
opsCount.put(symbol, new AtomicLong(0));
- if (opType.getType() == TYPE_DURATION) {
+ if (opType.getType() == StatisticTypeEnum.TYPE_DURATION
+ || opType.getType() == StatisticTypeEnum.TYPE_DURATION_TOTAL) {
minimums.put(getMinKey(symbol), null);
maximums.put(getMaxKey(symbol), new AtomicLong(0));
means.put(getMeanKey(symbol), new MeanStatistic());
+ if (opType.getType() == StatisticTypeEnum.TYPE_DURATION_TOTAL) {
+ total.put(getTimeKey(symbol), new AtomicDouble(0.0));
+ }
}
}
}
@@ -144,8 +156,9 @@ void incrementCounter(GoogleCloudStorageStatistics op, long count) {
@Override
public void reset() {
- resetMetrics(opsCount);
- resetMetrics(maximums);
+ resetLongMetrics(opsCount);
+ resetLongMetrics(maximums);
+ resetDoubleMetrics(total);
for (String ms : means.keySet()) {
means.get(ms).reset();
@@ -156,12 +169,18 @@ public void reset() {
}
}
- private void resetMetrics(Map<String, AtomicLong> metrics) {
+ private void resetLongMetrics(Map<String, AtomicLong> metrics) {
for (AtomicLong value : metrics.values()) {
value.set(0);
}
}
+ private void resetDoubleMetrics(Map<String, AtomicDouble> metrics) {
+ for (AtomicDouble value : metrics.values()) {
+ value.set(0.0);
+ }
+ }
+
void updateStats(GhfsStatistic statistic, long durationMs, Object context) {
checkArgument(
statistic.getType() == TYPE_DURATION,
@@ -178,6 +197,18 @@ private void addMeanStatistic(GhfsStatistic statistic, long totalDurationMs, int
}
}
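+  /**
+   * Derives the aggregate time spent in an operation by multiplying its mean latency with its
+   * invocation count. For example, for "stream_read_operations" the derived key is
+   * "stream_read_operations_duration" and its value is stream_read_operations_mean *
+   * stream_read_operations.
+   */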
+ protected void addTotalTimeStatistic(String statistic) {
+ assert (statistic.contains("_duration"));
+ String parentCounterKey = statistic.replace("_duration", "");
+ String parentMeanKey = getMeanKey(parentCounterKey);
+
+ assert (means.containsKey(parentMeanKey) && opsCount.containsKey(parentCounterKey));
+ double meanValue = means.get(parentMeanKey).getValue();
+ long operationValue = opsCount.get(parentCounterKey).get();
+
+ total.get(statistic).set(1.0 * meanValue * operationValue);
+ }
+
void updateStats(
GhfsStatistic statistic,
long minLatency,
@@ -219,148 +250,68 @@ private void updateMinMaxStats(
}
}
- /**
- * Updating the required gcs specific statistics based on httpresponse.
- *
- * @param statusCode
- */
- private void updateGcsIOSpecificStatistics(int statusCode) {
-
- if (statusCode >= 400 && statusCode < 500) {
- incrementGcsClientSideCounter();
-
- if (statusCode == 429) {
- incrementRateLimitingCounter();
- }
- }
+ void incrementGcsExceptionCount() {
+ increment(EXCEPTION_COUNT);
+ }
- if (statusCode >= 500 && statusCode < 600) {
- incrementGcsServerSideCounter();
- }
+ void incrementGcsTotalRequestCount() {
+ increment(GCS_API_REQUEST_COUNT);
}
- private int grpcToHttpStatusCodeMapping(Status grpcStatusCode) {
- // using code.proto as reference
- // https://github.com/googleapis/googleapis/blob/master/google/rpc/code.proto
- switch (grpcStatusCode.getCode()) {
- case OK:
- return 200;
- case CANCELLED:
- return 499;
- case INVALID_ARGUMENT:
- case FAILED_PRECONDITION:
- case OUT_OF_RANGE:
- return 400;
- case DEADLINE_EXCEEDED:
- return 504;
- case NOT_FOUND:
- return 404;
- case ALREADY_EXISTS:
- case ABORTED:
- return 409;
- case PERMISSION_DENIED:
- return 403;
- case RESOURCE_EXHAUSTED:
- return 429;
- case UNIMPLEMENTED:
- return 501;
- case UNAVAILABLE:
- return 503;
- case UNAUTHENTICATED:
- return 401;
- case UNKNOWN:
- case INTERNAL:
- case DATA_LOSS:
- default:
- return 500;
- }
+ void incrementRateLimitingCounter() {
+ increment(GCS_API_CLIENT_RATE_LIMIT_COUNT);
}
- /**
- * Updating the required gcs specific statistics based on HttpResponseException.
- *
- * @param responseException contains statusCode based on which metrics are updated
- */
- @Subscribe
- private void subscriberOnHttpResponseException(@Nonnull HttpResponseException responseException) {
- updateGcsIOSpecificStatistics(responseException.getStatusCode());
+ void incrementGcsClientSideCounter() {
+ increment(GCS_API_CLIENT_SIDE_ERROR_COUNT);
}
- /**
- * Updating the required gcs specific statistics based on GoogleJsonResponseException.
- *
- * @param responseException contains statusCode based on which metrics are updated
- */
- @Subscribe
- private void subscriberOnGoogleJsonResponseException(
- @Nonnull GoogleJsonResponseException responseException) {
- updateGcsIOSpecificStatistics(responseException.getStatusCode());
+ void incrementGcsServerSideCounter() {
+ increment(GCS_API_SERVER_SIDE_ERROR_COUNT);
}
- /**
- * Updating the required gcs specific statistics based on HttpResponse.
- *
- * @param responseStatus responseStatus status code from HTTP response
- */
- @Subscribe
- private void subscriberOnHttpResponseStatus(@Nonnull Integer responseStatus) {
- updateGcsIOSpecificStatistics(responseStatus);
+ void incrementGcsClientBadRequestCount() {
+ increment(GCS_API_CLIENT_BAD_REQUEST_COUNT);
}
- @Subscribe
- private void subscriberOnGcsRequest(@Nonnull GcsRequestExecutionEvent event) {
- incrementGcsTotalRequestCount();
+ void incrementGcsClientUnauthorizedResponseCount() {
+ increment(GCS_API_CLIENT_UNAUTHORIZED_RESPONSE_COUNT);
}
- @Subscribe
- private void subscriberOnGrpcStatus(@Nonnull Status status) {
- updateGcsIOSpecificStatistics(grpcToHttpStatusCodeMapping(status));
+ void incrementGcsClientNotFoundResponseCount() {
+ increment(GCS_API_CLIENT_NOT_FOUND_RESPONSE_COUNT);
}
- /**
- * Updating the EXCEPTION_COUNT
- *
- * @param exception
- */
- @Subscribe
- private void subscriberOnException(IOException exception) {
- incrementGcsExceptionCount();
+ void incrementGcsClientRequestTimeoutCount() {
+ increment(GCS_API_CLIENT_REQUEST_TIMEOUT_COUNT);
}
- /**
- * Updating the corresponding statistics
- *
- * @param strType
- */
- @Subscribe
- private void subscriberOnStatisticsType(StatisticsType strType) {
- if (strType == StatisticsType.DIRECTORIES_DELETED) {
- incrementDirectoriesDeleted();
- }
+ void incrementGcsClientGoneResponseCount() {
+ increment(GCS_API_CLIENT_GONE_RESPONSE_COUNT);
}
- private void incrementDirectoriesDeleted() {
- increment(GhfsStatistic.DIRECTORIES_DELETED);
+ void incrementGcsClientPreconditionFailedResponseCount() {
+ increment(GCS_API_CLIENT_PRECONDITION_FAILED_RESPONSE_COUNT);
}
- private void incrementGcsExceptionCount() {
- increment(EXCEPTION_COUNT);
+ void incrementGcsClientRequestedRangeNotSatisfiableCount() {
+ increment(GCS_API_CLIENT_REQUESTED_RANGE_NOT_SATISFIABLE_COUNT);
}
- private void incrementGcsTotalRequestCount() {
- increment(GCS_REQUEST_COUNT);
+ void incrementGcsServerInternalErrorCount() {
+ increment(GCS_API_SERVER_INTERNAL_ERROR_COUNT);
}
- private void incrementRateLimitingCounter() {
- increment(GCS_CLIENT_RATE_LIMIT_COUNT);
+ void incrementGcsServerBadGatewayCount() {
+ increment(GCS_API_SERVER_BAD_GATEWAY_COUNT);
}
- private void incrementGcsClientSideCounter() {
- increment(GCS_CLIENT_SIDE_ERROR_COUNT);
+ void incrementGcsServerServiceUnavailableCount() {
+ increment(GCS_API_SERVER_SERVICE_UNAVAILABLE_COUNT);
}
- private void incrementGcsServerSideCounter() {
- increment(GCS_SERVER_SIDE_ERROR_COUNT);
+ void incrementGcsServerTimeoutCount() {
+ increment(GCS_API_SERVER_TIMEOUT_COUNT);
}
void streamReadBytes(int bytesRead) {
@@ -413,6 +364,11 @@ private Iterator<String> getMetricNames() {
metrics.addAll(minimums.keySet());
metrics.addAll(maximums.keySet());
metrics.addAll(means.keySet());
+ for (String statistic : total.keySet()) {
+ addTotalTimeStatistic(statistic);
+ }
+
+ metrics.addAll(total.keySet());
return metrics.iterator();
}
@@ -455,6 +411,10 @@ private long getValue(String key) {
return Math.round(means.get(key).getValue());
}
+ if (total.containsKey(key)) {
+ return total.get(key).longValue();
+ }
+
return 0L;
}
@@ -473,7 +433,8 @@ public boolean isTracked(String key) {
return opsCount.containsKey(key)
|| maximums.containsKey(key)
|| minimums.containsKey(key)
- || means.containsKey(key);
+ || means.containsKey(key)
+ || total.containsKey(key);
}
/**
@@ -503,6 +464,10 @@ private String getMeanKey(String symbol) {
return symbol + "_mean";
}
+ private String getTimeKey(String symbol) {
+ return symbol + "_duration";
+ }
+
/**
* To get the maximum value which is stored with MAXIMUM extension
*
diff --git a/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GhfsInputStreamStatistics.java b/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GhfsInputStreamStatistics.java
index 48cc6a7c80..4c181e6f26 100644
--- a/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GhfsInputStreamStatistics.java
+++ b/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GhfsInputStreamStatistics.java
@@ -55,6 +55,9 @@ interface GhfsInputStreamStatistics extends AutoCloseable, GhfsStatisticInterfac
*/
void readOperationCompleted(int requested, int actual);
+  /** Returns the aggregated bytes read across read and readVectored operations. */
+ long getBytesRead();
+
@Override
void close();
}
diff --git a/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GhfsInstrumentation.java b/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GhfsInstrumentation.java
index 20a34053f0..9343c82c9f 100644
--- a/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GhfsInstrumentation.java
+++ b/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GhfsInstrumentation.java
@@ -575,6 +575,11 @@ public void readOperationCompleted(int requested, int actual) {
}
}
+ @Override
+ public long getBytesRead() {
+ return bytesRead.get();
+ }
+
/**
* {@code close()} merges the stream statistics into the filesystem's instrumentation instance.
* The filesystem statistics of {@link #filesystemStatistics} updated with the bytes read
@@ -766,6 +771,9 @@ private IOStatisticsStoreBuilder createStoreBuilder() {
} else if (stat.getType() == StatisticTypeEnum.TYPE_DURATION) {
duration(stat);
storeBuilder.withDurationTracking(stat.getSymbol());
+ } else if (stat.getType() == StatisticTypeEnum.TYPE_DURATION_TOTAL) {
+ duration(stat);
+ storeBuilder.withDurationTracking(stat.getSymbol());
}
});
diff --git a/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GhfsStatistic.java b/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GhfsStatistic.java
index 6d09152155..9fe5fa85ec 100644
--- a/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GhfsStatistic.java
+++ b/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GhfsStatistic.java
@@ -18,6 +18,7 @@
import static com.google.cloud.hadoop.gcsio.StatisticTypeEnum.TYPE_COUNTER;
import static com.google.cloud.hadoop.gcsio.StatisticTypeEnum.TYPE_DURATION;
+import static com.google.cloud.hadoop.gcsio.StatisticTypeEnum.TYPE_DURATION_TOTAL;
import com.google.cloud.hadoop.gcsio.StatisticTypeEnum;
import com.google.common.collect.ImmutableMap;
@@ -54,11 +55,6 @@ public enum GhfsStatistic {
"Total number of directories created through the object store.",
TYPE_COUNTER),
- DIRECTORIES_DELETED(
- "directories_deleted",
- "Total number of directories deleted through the object store.",
- TYPE_COUNTER),
-
FILES_CREATED(
"files_created", "Total number of files created through the object store.", TYPE_COUNTER),
FILES_DELETED(
@@ -105,9 +101,6 @@ public enum GhfsStatistic {
"Calls of read stream close()",
TYPE_DURATION),
- STREAM_READ_OPERATIONS(
- StreamStatisticNames.STREAM_READ_OPERATIONS, "Calls of read()", TYPE_DURATION),
-
STREAM_READ_VECTORED_OPERATIONS(
StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
"Calls of readVectored()",
@@ -125,6 +118,7 @@ public enum GhfsStatistic {
StreamStatisticNames.STREAM_READ_VECTORED_INCOMING_RANGES,
"size of fileRanges requested in readVectoredRequest",
TYPE_COUNTER),
+ STREAM_READ_OPERATIONS("stream_read_operations", "Calls of read()", TYPE_DURATION_TOTAL),
STREAM_READ_VECTORED_READ_COMBINED_RANGES(
StreamStatisticNames.STREAM_READ_VECTORED_COMBINED_RANGES,
@@ -160,7 +154,7 @@ public enum GhfsStatistic {
TYPE_COUNTER),
STREAM_WRITE_CLOSE_OPERATIONS(
"stream_write_close_operations", "Calls of write stream close()", TYPE_DURATION),
- STREAM_WRITE_OPERATIONS("stream_write_operations", "Calls of write()", TYPE_DURATION),
+ STREAM_WRITE_OPERATIONS("stream_write_operations", "Calls of write()", TYPE_DURATION_TOTAL),
/** The XAttr API statistics */
INVOCATION_XATTR_GET_MAP(
diff --git a/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GoogleCloudStorageEventSubscriber.java b/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GoogleCloudStorageEventSubscriber.java
new file mode 100644
index 0000000000..55aafcf21e
--- /dev/null
+++ b/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GoogleCloudStorageEventSubscriber.java
@@ -0,0 +1,180 @@
+/*
+ * Copyright 2024 Google LLC. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.hadoop.fs.gcs;
+
+import com.google.api.client.googleapis.json.GoogleJsonResponseException;
+import com.google.api.client.http.HttpResponseException;
+import com.google.cloud.hadoop.util.GcsRequestExecutionEvent;
+import com.google.common.eventbus.Subscribe;
+import io.grpc.Status;
+import java.io.IOException;
+import javax.annotation.Nonnull;
+
+/** Stores the subscriber methods corresponding to {@code GoogleCloudStorageEventBus}. */
+public class GoogleCloudStorageEventSubscriber {
+ private static GhfsGlobalStorageStatistics storageStatistics;
+
+ public GoogleCloudStorageEventSubscriber(GhfsGlobalStorageStatistics storageStatistics) {
+ this.storageStatistics = storageStatistics;
+ }
+
+  /**
+   * Updates the required GCS-specific statistics based on an HttpResponseException.
+   *
+   * @param responseException contains the status code based on which metrics are updated
+   */
+ @Subscribe
+ private void subscriberOnHttpResponseException(@Nonnull HttpResponseException responseException) {
+ updateGcsIOSpecificStatistics(responseException.getStatusCode());
+ }
+
+  /**
+   * Updates the required GCS-specific statistics based on a GoogleJsonResponseException.
+   *
+   * @param responseException contains the status code based on which metrics are updated
+   */
+ @Subscribe
+ private void subscriberOnGoogleJsonResponseException(
+ @Nonnull GoogleJsonResponseException responseException) {
+ updateGcsIOSpecificStatistics(responseException.getStatusCode());
+ }
+
+  /**
+   * Updates the required GCS-specific statistics based on an HTTP response.
+   *
+   * @param responseStatus status code from the HTTP response
+   */
+ @Subscribe
+ private void subscriberOnHttpResponseStatus(@Nonnull Integer responseStatus) {
+ updateGcsIOSpecificStatistics(responseStatus);
+ incrementStatusCode(responseStatus);
+ }
+
+ @Subscribe
+ private void subscriberOnGcsRequest(@Nonnull GcsRequestExecutionEvent event) {
+ storageStatistics.incrementGcsTotalRequestCount();
+ }
+
+ @Subscribe
+ private void subscriberOnGrpcStatus(@Nonnull Status status) {
+ updateGcsIOSpecificStatistics(grpcToHttpStatusCodeMapping(status));
+ }
+
+  /**
+   * Updates the EXCEPTION_COUNT statistic.
+   *
+   * @param exception the IOException that triggered this event
+   */
+ @Subscribe
+ private void subscriberOnException(IOException exception) {
+ storageStatistics.incrementGcsExceptionCount();
+ }
+
+  /**
+   * Updates the required GCS-specific statistics based on the HTTP response status code.
+   *
+   * @param statusCode the status code of the HTTP response
+   */
+ protected void updateGcsIOSpecificStatistics(int statusCode) {
+
+ if (statusCode >= 400 && statusCode < 500) {
+ storageStatistics.incrementGcsClientSideCounter();
+
+ if (statusCode == 429) {
+ storageStatistics.incrementRateLimitingCounter();
+ }
+ }
+ if (statusCode >= 500 && statusCode < 600) {
+ storageStatistics.incrementGcsServerSideCounter();
+ }
+ }
+
+ private int grpcToHttpStatusCodeMapping(Status grpcStatusCode) {
+ // using code.proto as reference
+ // https://github.com/googleapis/googleapis/blob/master/google/rpc/code.proto
+ switch (grpcStatusCode.getCode()) {
+ case OK:
+ return 200;
+ case CANCELLED:
+ return 499;
+ case INVALID_ARGUMENT:
+ case FAILED_PRECONDITION:
+ case OUT_OF_RANGE:
+ return 400;
+ case DEADLINE_EXCEEDED:
+ return 504;
+ case NOT_FOUND:
+ return 404;
+ case ALREADY_EXISTS:
+ case ABORTED:
+ return 409;
+ case PERMISSION_DENIED:
+ return 403;
+ case RESOURCE_EXHAUSTED:
+ return 429;
+ case UNIMPLEMENTED:
+ return 501;
+ case UNAVAILABLE:
+ return 503;
+ case UNAUTHENTICATED:
+ return 401;
+ case UNKNOWN:
+ case INTERNAL:
+ case DATA_LOSS:
+ default:
+ return 500;
+ }
+ }
+
+ private void incrementStatusCode(int statusCode) {
+ switch (statusCode) {
+ case 400:
+ storageStatistics.incrementGcsClientBadRequestCount();
+ break;
+ case 401:
+ storageStatistics.incrementGcsClientUnauthorizedResponseCount();
+ break;
+ case 404:
+ storageStatistics.incrementGcsClientNotFoundResponseCount();
+ break;
+ case 408:
+ storageStatistics.incrementGcsClientRequestTimeoutCount();
+ break;
+ case 410:
+ storageStatistics.incrementGcsClientGoneResponseCount();
+ break;
+ case 412:
+ storageStatistics.incrementGcsClientPreconditionFailedResponseCount();
+ break;
+ case 416:
+ storageStatistics.incrementGcsClientRequestedRangeNotSatisfiableCount();
+ break;
+ case 500:
+ storageStatistics.incrementGcsServerInternalErrorCount();
+ break;
+ case 502:
+ storageStatistics.incrementGcsServerBadGatewayCount();
+ break;
+ case 503:
+ storageStatistics.incrementGcsServerServiceUnavailableCount();
+ break;
+ case 504:
+ storageStatistics.incrementGcsServerTimeoutCount();
+ break;
+ }
+ }
+}
diff --git a/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFSInputStream.java b/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFSInputStream.java
index c19cf1c727..f62f7e5e53 100644
--- a/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFSInputStream.java
+++ b/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFSInputStream.java
@@ -180,7 +180,9 @@ public void readVectored(List<? extends FileRange> ranges, IntFunction<ByteBuffer> allocate) {
long startTimeNs = System.nanoTime();
- vectoredIOSupplier.get().readVectored(ranges, allocate, gcsFs, fileInfo, gcsPath);
+ vectoredIOSupplier
+ .get()
+ .readVectored(ranges, allocate, gcsFs, fileInfo, gcsPath, streamStatistics);
statistics.incrementReadOps(1);
vectoredReadStats.updateVectoredReadStreamStats(startTimeNs);
return null;
@@ -222,7 +224,6 @@ public synchronized int read(@Nonnull byte[] buf, int offset, int length) throws
if (numRead > 0) {
// -1 means we actually read 0 bytes, but requested at least one byte.
totalBytesRead += numRead;
- statistics.incrementBytesRead(numRead);
statistics.incrementReadOps(1);
streamStats.updateReadStreamStats(numRead, startTimeNs);
}
@@ -295,6 +296,7 @@ public synchronized void close() throws IOException {
"Error while closing underneath read channel resources for path: %s", gcsPath);
}
} finally {
+ statistics.incrementBytesRead(streamStatistics.getBytesRead());
streamStats.close();
seekStreamStats.close();
vectoredReadStats.close();
diff --git a/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystem.java b/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystem.java
index 3b25b1c96d..6bf6ff6939 100644
--- a/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystem.java
+++ b/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystem.java
@@ -249,7 +249,8 @@ public GoogleHadoopFileSystem() {
globalStorageStatistics = GhfsGlobalStorageStatistics.DUMMY_INSTANCE;
}
- GoogleCloudStorageEventBus.register(globalStorageStatistics);
+ GoogleCloudStorageEventBus.register(
+ new GoogleCloudStorageEventSubscriber(globalStorageStatistics));
}
/**
@@ -387,8 +388,7 @@ private synchronized void initializeVectoredIO(
new VectoredIOImpl(
GoogleHadoopFileSystemConfiguration.getVectoredReadOptionBuilder(config)
.build(),
- globalStorageStatistics,
- statistics);
+ globalStorageStatistics);
vectoredIOInitialized = true;
return vectoredIO;
} catch (Exception e) {
diff --git a/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystemConfiguration.java b/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystemConfiguration.java
index 901350c50e..9d2c97f71e 100644
--- a/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystemConfiguration.java
+++ b/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystemConfiguration.java
@@ -35,12 +35,15 @@
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadOptions.Fadvise;
import com.google.cloud.hadoop.gcsio.PerformanceCachingGoogleCloudStorageOptions;
import com.google.cloud.hadoop.util.AsyncWriteChannelOptions;
+import com.google.cloud.hadoop.util.AsyncWriteChannelOptions.PartFileCleanupType;
import com.google.cloud.hadoop.util.AsyncWriteChannelOptions.PipeType;
+import com.google.cloud.hadoop.util.AsyncWriteChannelOptions.UploadType;
import com.google.cloud.hadoop.util.RedactedString;
import com.google.cloud.hadoop.util.RequesterPaysOptions;
import com.google.cloud.hadoop.util.RequesterPaysOptions.RequesterPaysMode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
import com.google.common.flogger.GoogleLogger;
import java.util.Collection;
import java.util.List;
@@ -494,6 +497,70 @@ public class GoogleHadoopFileSystemConfiguration {
new HadoopConfigurationProperty<>(
"fs.gs.client.type", GoogleCloudStorageFileSystemOptions.DEFAULT.getClientType());
+  /** Configuration key to enable write requests to go over gRPC. */
+  public static final HadoopConfigurationProperty<Boolean> GCS_GRPC_WRITE_ENABLE =
+      new HadoopConfigurationProperty<>(
+          "fs.gs.grpc.write.enable", GoogleCloudStorageOptions.DEFAULT.isGrpcWriteEnabled());
+
+  /**
+   * Configuration key to select the upload type used for GCS writes. Effective only if
+   * fs.gs.client.type is set to STORAGE_CLIENT.
+   */
+  public static final HadoopConfigurationProperty<UploadType> GCS_CLIENT_UPLOAD_TYPE =
+      new HadoopConfigurationProperty<>("fs.gs.client.upload.type", UploadType.CHUNK_UPLOAD);
+
+  /**
+   * Configuration key to configure the path where uploads are staged on disk. If not set,
+   * uploads are staged at the default location chosen by the java-storage client. Effective only
+   * if fs.gs.client.upload.type is set to a non-default value.
+   */
+  public static final HadoopConfigurationProperty<Collection<String>>
+      GCS_WRITE_TEMPORARY_FILES_PATH =
+          new HadoopConfigurationProperty<>("fs.gs.write.temporary.dirs", ImmutableSet.of());
+
+  /**
+   * Configuration key to configure the buffer count for UploadType.PARALLEL_COMPOSITE_UPLOAD. It
+   * mirrors the corresponding java-storage client configuration:
+   * https://cloud.google.com/java/docs/reference/google-cloud-storage/latest/com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.BufferAllocationStrategy#com_google_cloud_storage_ParallelCompositeUploadBlobWriteSessionConfig_BufferAllocationStrategy_fixedPool_int_int_
+   */
+  public static final HadoopConfigurationProperty<Integer> GCS_PCU_BUFFER_COUNT =
+      new HadoopConfigurationProperty<>(
+          "fs.gs.write.parallel.composite.upload.buffer.count",
+          AsyncWriteChannelOptions.DEFAULT.getPCUBufferCount());
+
+  /**
+   * Configuration key to configure the buffer capacity for UploadType.PARALLEL_COMPOSITE_UPLOAD.
+   * It mirrors the corresponding java-storage client configuration:
+   * https://cloud.google.com/java/docs/reference/google-cloud-storage/latest/com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.BufferAllocationStrategy#com_google_cloud_storage_ParallelCompositeUploadBlobWriteSessionConfig_BufferAllocationStrategy_fixedPool_int_int_
+   */
+  public static final HadoopConfigurationProperty<Long> GCS_PCU_BUFFER_CAPACITY =
+      new HadoopConfigurationProperty<>(
+          "fs.gs.write.parallel.composite.upload.buffer.capacity",
+          (long) AsyncWriteChannelOptions.DEFAULT.getPCUBufferCapacity());
+
+  /**
+   * Configuration key to select the cleanup strategy for part files created via
+   * UploadType.PARALLEL_COMPOSITE_UPLOAD. It mirrors the corresponding java-storage client
+   * configuration:
+   * https://cloud.google.com/java/docs/reference/google-cloud-storage/latest/com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.PartCleanupStrategy
+   */
+  public static final HadoopConfigurationProperty<PartFileCleanupType>
+      GCS_PCU_PART_FILE_CLEANUP_TYPE =
+          new HadoopConfigurationProperty<>(
+              "fs.gs.write.parallel.composite.upload.part.file.cleanup.type",
+              AsyncWriteChannelOptions.DEFAULT.getPartFileCleanupType());
+
+  /**
+   * Configuration key to set the naming strategy for part files created via
+   * UploadType.PARALLEL_COMPOSITE_UPLOAD. It mirrors the corresponding java-storage client
+   * configuration:
+   * https://cloud.google.com/java/docs/reference/google-cloud-storage/latest/com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.PartNamingStrategy
+   */
+  public static final HadoopConfigurationProperty<String> GCS_PCU_PART_FILE_NAME_PREFIX =
+      new HadoopConfigurationProperty<>(
+          "fs.gs.write.parallel.composite.upload.part.file.name.prefix",
+          AsyncWriteChannelOptions.DEFAULT.getPartFileNamePrefix());
+
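+  // Illustrative example (values are hypothetical): a job opting into parallel
+  // composite uploads could set, on its Hadoop Configuration:
+  //   conf.set("fs.gs.client.upload.type", "PARALLEL_COMPOSITE_UPLOAD");
+  //   conf.setInt("fs.gs.write.parallel.composite.upload.buffer.count", 4);
+  //   conf.set("fs.gs.write.parallel.composite.upload.part.file.name.prefix", "_gcs_parts");
+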
static GoogleCloudStorageFileSystemOptions.Builder getGcsFsOptionsBuilder(Configuration config) {
return GoogleCloudStorageFileSystemOptions.builder()
.setBucketDeleteEnabled(GCE_BUCKET_DELETE_ENABLE.get(config, config::getBoolean))
@@ -519,6 +586,7 @@ static GoogleCloudStorageOptions.Builder getGcsOptionsBuilder(Configuration conf
String projectId = GCS_PROJECT_ID.get(config, config::get);
return GoogleCloudStorageOptions.builder()
.setAppName(getApplicationName(config))
+ .setGrpcWriteEnabled(GCS_GRPC_WRITE_ENABLE.get(config, config::getBoolean))
.setAutoRepairImplicitDirectoriesEnabled(
GCS_REPAIR_IMPLICIT_DIRECTORIES_ENABLE.get(config, config::getBoolean))
.setBatchThreads(GCS_BATCH_THREADS.get(config, config::getInt))
@@ -607,6 +675,13 @@ private static AsyncWriteChannelOptions getWriteChannelOptions(Configuration con
toIntExact(GCS_OUTPUT_STREAM_UPLOAD_CACHE_SIZE.get(config, config::getLongBytes)))
.setUploadChunkSize(
toIntExact(GCS_OUTPUT_STREAM_UPLOAD_CHUNK_SIZE.get(config, config::getLongBytes)))
+ .setUploadType(GCS_CLIENT_UPLOAD_TYPE.get(config, config::getEnum))
+ .setTemporaryPaths(
+ ImmutableSet.copyOf(GCS_WRITE_TEMPORARY_FILES_PATH.getStringCollection(config)))
+ .setPCUBufferCount(GCS_PCU_BUFFER_COUNT.get(config, config::getInt))
+ .setPCUBufferCapacity(toIntExact(GCS_PCU_BUFFER_CAPACITY.get(config, config::getLongBytes)))
+ .setPartFileCleanupType(GCS_PCU_PART_FILE_CLEANUP_TYPE.get(config, config::getEnum))
+ .setPartFileNamePrefix(GCS_PCU_PART_FILE_NAME_PREFIX.get(config, config::get))
.build();
}
diff --git a/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/VectoredIOImpl.java b/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/VectoredIOImpl.java
index 582af0a034..69cd0a5ad8 100644
--- a/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/VectoredIOImpl.java
+++ b/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/VectoredIOImpl.java
@@ -44,23 +44,20 @@
import java.util.function.IntFunction;
import javax.annotation.Nonnull;
import org.apache.hadoop.fs.FileRange;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.VectoredReadUtils;
import org.apache.hadoop.fs.impl.CombinedFileRange;
@VisibleForTesting
public class VectoredIOImpl implements Closeable {
+
private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();
  private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
private final VectoredReadOptions vectoredReadOptions;
private final GhfsGlobalStorageStatistics storageStatistics;
- private final FileSystem.Statistics statistics;
private ExecutorService boundedThreadPool;
public VectoredIOImpl(
- VectoredReadOptions vectoredReadOptions,
- GhfsGlobalStorageStatistics storageStatistics,
- FileSystem.Statistics statistics) {
+ VectoredReadOptions vectoredReadOptions, GhfsGlobalStorageStatistics storageStatistics) {
this.vectoredReadOptions = vectoredReadOptions;
// same fixedThreadPool executor, but provided a way to query task queue
this.boundedThreadPool =
@@ -74,7 +71,6 @@ public VectoredIOImpl(
.setNameFormat("vectoredRead-range-pool-%d")
.setDaemon(true)
.build());
- this.statistics = statistics;
this.storageStatistics = storageStatistics;
}
@@ -101,202 +97,259 @@ public void readVectored(
      IntFunction<ByteBuffer> allocate,
GoogleCloudStorageFileSystem gcsFs,
FileInfo fileInfo,
- @Nonnull URI gcsPath)
+ @Nonnull URI gcsPath,
+ GhfsInputStreamStatistics streamStatistics)
throws IOException {
- ReadChannelProvider channelProvider = new ReadChannelProvider(gcsFs, fileInfo, gcsPath);
- readVectored(ranges, allocate, channelProvider);
+ VectoredReadChannel vectoredReadChannel =
+ new VectoredReadChannel(gcsFs, fileInfo, gcsPath, streamStatistics);
+ vectoredReadChannel.readVectored(ranges, allocate);
}
- private void readVectored(
-     List<? extends FileRange> ranges,
-     IntFunction<ByteBuffer> allocate,
-     ReadChannelProvider channelProvider)
-     throws IOException {
-   List<? extends FileRange> sortedRanges = validateNonOverlappingAndReturnSortedRanges(ranges);
-   for (FileRange range : ranges) {
-     // TODO(user): upgrade to use validateAndSortRanges
-     validateRangeRequest(range);
-     CompletableFuture<ByteBuffer> result = new CompletableFuture<>();
- range.setData(result);
+ class VectoredReadChannel {
+
+ private final GhfsInputStreamStatistics streamStatistics;
+ private final ReadChannelProvider channelProvider;
+
+ public VectoredReadChannel(
+ GoogleCloudStorageFileSystem gcsFs,
+ FileInfo fileInfo,
+ URI gcsPath,
+ GhfsInputStreamStatistics streamStatistics) {
+ this.channelProvider = new ReadChannelProvider(gcsFs, fileInfo, gcsPath);
+ this.streamStatistics = streamStatistics;
}
- if (shouldMergeRanges(ranges)) {
- updateRangeSizeCounters(sortedRanges.size(), sortedRanges.size());
- // case when ranges are not merged
- for (FileRange sortedRange : sortedRanges) {
- long startTimer = System.currentTimeMillis();
- boundedThreadPool.submit(
- () -> {
- logger.atFiner().log("Submitting range %s for execution.", sortedRange);
- readSingleRange(sortedRange, allocate, channelProvider);
- long endTimer = System.currentTimeMillis();
- storageStatistics.updateStats(
- GhfsStatistic.STREAM_READ_VECTORED_READ_RANGE_DURATION,
- endTimer - startTimer,
- channelProvider.gcsPath);
- });
- }
- } else {
-     List<CombinedFileRange> combinedFileRanges = getCombinedFileRange(sortedRanges);
- updateRangeSizeCounters(sortedRanges.size(), combinedFileRanges.size());
- // case where ranges can be merged
- for (CombinedFileRange combinedFileRange : combinedFileRanges) {
+
+    private void readVectored(List<? extends FileRange> ranges, IntFunction<ByteBuffer> allocate)
+        throws IOException {
+      List<? extends FileRange> sortedRanges = validateNonOverlappingAndReturnSortedRanges(ranges);
+ for (FileRange range : ranges) {
+ // TODO(user): upgrade to use validateAndSortRanges
+ validateRangeRequest(range);
        CompletableFuture<ByteBuffer> result = new CompletableFuture<>();
- combinedFileRange.setData(result);
- long startTimer = System.currentTimeMillis();
- boundedThreadPool.submit(
- () -> {
- logger.atFiner().log("Submitting combinedRange %s for execution.", combinedFileRange);
- readCombinedRange(combinedFileRange, allocate, channelProvider);
- long endTimer = System.currentTimeMillis();
- storageStatistics.updateStats(
- GhfsStatistic.STREAM_READ_VECTORED_READ_RANGE_DURATION,
- endTimer - startTimer,
- channelProvider.gcsPath);
- });
+ range.setData(result);
+ }
+ if (shouldMergeRanges(ranges)) {
+ updateRangeSizeCounters(sortedRanges.size(), sortedRanges.size());
+ // case when ranges are not merged
+ for (FileRange sortedRange : sortedRanges) {
+ long startTimer = System.currentTimeMillis();
+ boundedThreadPool.submit(
+ () -> {
+ logger.atFiner().log("Submitting range %s for execution.", sortedRange);
+ readSingleRange(sortedRange, allocate, channelProvider);
+ long endTimer = System.currentTimeMillis();
+ storageStatistics.updateStats(
+ GhfsStatistic.STREAM_READ_VECTORED_READ_RANGE_DURATION,
+ endTimer - startTimer,
+ channelProvider.gcsPath);
+ });
+ }
+ } else {
+        List<CombinedFileRange> combinedFileRanges = getCombinedFileRange(sortedRanges);
+ updateRangeSizeCounters(sortedRanges.size(), combinedFileRanges.size());
+ // case where ranges can be merged
+ for (CombinedFileRange combinedFileRange : combinedFileRanges) {
+          CompletableFuture<ByteBuffer> result = new CompletableFuture<>();
+ combinedFileRange.setData(result);
+ long startTimer = System.currentTimeMillis();
+ boundedThreadPool.submit(
+ () -> {
+ logger.atFiner().log(
+ "Submitting combinedRange %s for execution.", combinedFileRange);
+ readCombinedRange(combinedFileRange, allocate, channelProvider);
+ long endTimer = System.currentTimeMillis();
+ storageStatistics.updateStats(
+ GhfsStatistic.STREAM_READ_VECTORED_READ_RANGE_DURATION,
+ endTimer - startTimer,
+ channelProvider.gcsPath);
+ });
+ }
}
}
- }
- private void updateRangeSizeCounters(int incomingRangeSize, int combinedRangeSize) {
- storageStatistics.incrementCounter(
- GhfsStatistic.STREAM_READ_VECTORED_READ_INCOMING_RANGES, incomingRangeSize);
- storageStatistics.incrementCounter(
- GhfsStatistic.STREAM_READ_VECTORED_READ_COMBINED_RANGES, combinedRangeSize);
- }
+ private void updateRangeSizeCounters(int incomingRangeSize, int combinedRangeSize) {
+ storageStatistics.incrementCounter(
+ GhfsStatistic.STREAM_READ_VECTORED_READ_INCOMING_RANGES, incomingRangeSize);
+ storageStatistics.incrementCounter(
+ GhfsStatistic.STREAM_READ_VECTORED_READ_COMBINED_RANGES, combinedRangeSize);
+ }
- private void updateBytesRead(int readBytes) {
- statistics.incrementBytesRead(readBytes);
- storageStatistics.streamReadBytes(readBytes);
- }
+ private void updateBytesRead(int readBytes) {
+ streamStatistics.bytesRead(readBytes);
+ storageStatistics.streamReadBytes(readBytes);
+ }
- private List<CombinedFileRange> getCombinedFileRange(List<? extends FileRange> sortedRanges) {
- return mergeSortedRanges(
- sortedRanges,
- 1,
- vectoredReadOptions.getMinSeekVectoredReadSize(),
- vectoredReadOptions.getMergeRangeMaxSize());
- }
+    private List<CombinedFileRange> getCombinedFileRange(List<? extends FileRange> sortedRanges) {
+ return mergeSortedRanges(
+ sortedRanges,
+ 1,
+ vectoredReadOptions.getMinSeekVectoredReadSize(),
+ vectoredReadOptions.getMergeRangeMaxSize());
+ }
- /**
- * function for reading combined or merged FileRanges. It reads the range and update the child
- * fileRange's content.
- *
- * @param combinedFileRange merge file range, keeps track of source file ranges which were merged
- * @param allocate Byte buffer allocator
- */
- private void readCombinedRange(
- CombinedFileRange combinedFileRange,
-     IntFunction<ByteBuffer> allocate,
- ReadChannelProvider channelProvider) {
- try (SeekableByteChannel channel = channelProvider.getReadChannel()) {
- channel.position(combinedFileRange.getOffset());
- ByteBuffer readContent = allocate.apply(combinedFileRange.getLength());
- int numRead = channel.read(readContent);
+    /**
+     * Reads a combined (merged) FileRange and populates the content of each child fileRange.
+     *
+     * @param combinedFileRange merged file range; keeps track of the source file ranges that
+     *     were merged
+     * @param allocate byte buffer allocator
+     */
+ private void readCombinedRange(
+ CombinedFileRange combinedFileRange,
+        IntFunction<ByteBuffer> allocate,
+ ReadChannelProvider channelProvider) {
+ try (SeekableByteChannel channel = channelProvider.getReadChannel()) {
+ channel.position(combinedFileRange.getOffset());
+ ByteBuffer readContent = allocate.apply(combinedFileRange.getLength());
+ int numRead = channel.read(readContent);
- // making it ready for reading
- readContent.flip();
- logger.atFiner().log(
- "Read combinedFileRange completed from range: %s, path: %s, readBytes: %d",
- combinedFileRange, channelProvider.gcsPath, numRead);
- if (numRead < 0) {
- throw new EOFException(
- String.format(
- "EOF reached while reading combinedFileRange, range: %s, path: %s, numRead: %d",
- combinedFileRange, channelProvider.gcsPath, numRead));
+ // making it ready for reading
+ readContent.flip();
+ logger.atFiner().log(
+ "Read combinedFileRange completed from range: %s, path: %s, readBytes: %d",
+ combinedFileRange, channelProvider.gcsPath, numRead);
+ if (numRead < 0) {
+ throw new EOFException(
+ String.format(
+ "EOF reached while reading combinedFileRange, range: %s, path: %s, numRead: %d",
+ combinedFileRange, channelProvider.gcsPath, numRead));
+ }
+
+ // populate child ranges
+ long currentPosition = combinedFileRange.getOffset();
+ long totalBytesRead = 0;
+ // Note: child ranges will be sorted as well, given Range merge was called on sortedList
+ for (FileRange child : combinedFileRange.getUnderlying()) {
+ logger.atFiner().log(
+ "Populating childRange: %s from combinedRange:%s", child, combinedFileRange);
+ int discardedBytes = (int) (child.getOffset() - currentPosition);
+ logger.atFiner().log(
+ "Discarding %d bytes at offset: %d from read combinedRange %s while updating childRange: %s",
+ discardedBytes, currentPosition, combinedFileRange, child);
+ totalBytesRead += discardedBytes + child.getLength();
+ currentPosition = child.getOffset() + child.getLength();
+
+ storageStatistics.incrementCounter(
+ GhfsStatistic.STREAM_READ_VECTORED_EXTRA_READ_BYTES, discardedBytes);
+
+ if (numRead >= totalBytesRead) {
+ ByteBuffer childBuffer = sliceTo(readContent, combinedFileRange.getOffset(), child);
+ child.getData().complete(childBuffer);
+ updateBytesRead(child.getLength());
+ } else {
+ throw new EOFException(
+ String.format(
+ "EOF reached before all child ranges can be populated, combinedFileRange: %s, expected length: %s, readBytes: %s, path: %s",
+ combinedFileRange,
+ combinedFileRange.getLength(),
+ numRead,
+ channelProvider.gcsPath));
+ }
+ }
+ combinedFileRange.getData().complete(readContent);
+ } catch (Exception e) {
+ logger.atWarning().withCause(e).log(
+ "Exception while reading combinedFileRange:%s for path: %s",
+ combinedFileRange, channelProvider.gcsPath);
+ combinedFileRange.getData().completeExceptionally(e);
+ // complete exception all the underlying ranges which have not already
+ // finished.
+ completeExceptionally(combinedFileRange, e);
}
+ }
- // populate child ranges
- long currentPosition = combinedFileRange.getOffset();
- long totalBytesRead = 0;
- // Note: child ranges will be sorted as well, given Range merge was called on sortedList
+ private void completeExceptionally(CombinedFileRange combinedFileRange, Throwable e) {
for (FileRange child : combinedFileRange.getUnderlying()) {
- logger.atFiner().log(
- "Populating childRange: %s from combinedRange:%s", child, combinedFileRange);
- int discardedBytes = (int) (child.getOffset() - currentPosition);
- logger.atFiner().log(
- "Discarding %d bytes at offset: %d from read combinedRange %s while updating childRange: %s",
- discardedBytes, currentPosition, combinedFileRange, child);
- totalBytesRead += discardedBytes + child.getLength();
- currentPosition = child.getOffset() + child.getLength();
-
- storageStatistics.incrementCounter(
- GhfsStatistic.STREAM_READ_VECTORED_EXTRA_READ_BYTES, discardedBytes);
+ if (!child.getData().isDone()) {
+ logger.atFiner().withCause(e).log(
+ "Marking child:%s as `completeExceptionally` of combinedRange:%s",
+ child, combinedFileRange);
+ child
+ .getData()
+ .completeExceptionally(
+ new IOException(
+ String.format(
+ "Error while populating childRange: %s from combinedRange: %s",
+ child, combinedFileRange),
+ e));
+ }
+ }
+ }
- if (numRead >= totalBytesRead) {
- ByteBuffer childBuffer = sliceTo(readContent, combinedFileRange.getOffset(), child);
- child.getData().complete(childBuffer);
- updateBytesRead(child.getLength());
- } else {
+ /**
+ * Read data from GCS for this range and populate the buffer.
+ *
+ * @param range range of data to read.
+ * @param allocate lambda function to allocate byteBuffer.
+ */
+ private void readSingleRange(
+        FileRange range, IntFunction<ByteBuffer> allocate, ReadChannelProvider channelProvider) {
+ try (SeekableByteChannel channel = channelProvider.getReadChannel()) {
+ channel.position(range.getOffset());
+ ByteBuffer dst = allocate.apply(range.getLength());
+ int numRead = channel.read(dst.duplicate());
+ if (numRead < range.getLength()) {
throw new EOFException(
String.format(
- "EOF reached before all child ranges can be populated, combinedFileRange: %s, expected length: %s, readBytes: %s, path: %s",
- combinedFileRange,
- combinedFileRange.getLength(),
- numRead,
- channelProvider.gcsPath));
+ "EOF reached before whole range can be read, range: %s, path: %s",
+ range, channelProvider.gcsPath));
}
+ range.getData().complete(dst);
+ updateBytesRead(range.getLength());
+ logger.atFiner().log(
+ "Read single range completed from range: %s, path: %s", range, channelProvider.gcsPath);
+ } catch (Exception e) {
+ logger.atWarning().withCause(e).log(
+ "Exception while reading range:%s for path: %s", range, channelProvider.gcsPath);
+ range.getData().completeExceptionally(e);
}
- combinedFileRange.getData().complete(readContent);
- } catch (Exception e) {
- logger.atWarning().withCause(e).log(
- "Exception while reading combinedFileRange:%s for path: %s",
- combinedFileRange, channelProvider.gcsPath);
- combinedFileRange.getData().completeExceptionally(e);
- // complete exception all the underlying ranges which have not already
- // finished.
- completeExceptionally(combinedFileRange, e);
}
- }
- private void completeExceptionally(CombinedFileRange combinedFileRange, Throwable e) {
- for (FileRange child : combinedFileRange.getUnderlying()) {
- if (!child.getData().isDone()) {
- logger.atFiner().withCause(e).log(
- "Marking child:%s as `completeExceptionally` of combinedRange:%s",
- child, combinedFileRange);
- child
- .getData()
- .completeExceptionally(
- new IOException(
- String.format(
- "Error while populating childRange: %s from combinedRange: %s",
- child, combinedFileRange),
- e));
- }
+    private boolean shouldMergeRanges(List<? extends FileRange> ranges) {
+ return (isOrderedDisjoint(ranges, 1, vectoredReadOptions.getMinSeekVectoredReadSize()));
}
- }
- /**
- * Read data from GCS for this range and populate the buffer.
- *
- * @param range range of data to read.
- * @param allocate lambda function to allocate byteBuffer.
- */
- private void readSingleRange(
-     FileRange range, IntFunction<ByteBuffer> allocate, ReadChannelProvider channelProvider) {
- try (SeekableByteChannel channel = channelProvider.getReadChannel()) {
- channel.position(range.getOffset());
- ByteBuffer dst = allocate.apply(range.getLength());
- int numRead = channel.read(dst.duplicate());
- if (numRead < range.getLength()) {
- throw new EOFException(
- String.format(
- "EOF reached before whole range can be read, range: %s, path: %s",
- range, channelProvider.gcsPath));
+ private class ReadChannelProvider {
+ private final GoogleCloudStorageFileSystem gcsFs;
+ private final FileInfo fileInfo;
+ private final URI gcsPath;
+
+ public ReadChannelProvider(
+ GoogleCloudStorageFileSystem gcsFS, FileInfo fileInfo, URI gcsPath) {
+ this.gcsFs = gcsFS;
+ this.fileInfo = fileInfo;
+ this.gcsPath = gcsPath;
}
- range.getData().complete(dst);
- updateBytesRead(range.getLength());
- logger.atFiner().log(
- "Read single range completed from range: %s, path: %s", range, channelProvider.gcsPath);
- } catch (Exception e) {
- logger.atWarning().withCause(e).log(
- "Exception while reading range:%s for path: %s", range, channelProvider.gcsPath);
- range.getData().completeExceptionally(e);
- }
- }
- private boolean shouldMergeRanges(List<? extends FileRange> ranges) {
- return (isOrderedDisjoint(ranges, 1, vectoredReadOptions.getMinSeekVectoredReadSize()));
+ public SeekableByteChannel getReadChannel() throws IOException {
+ GoogleCloudStorageReadOptions options =
+ channelReadOptions(gcsFs.getOptions().getCloudStorageOptions().getReadChannelOptions());
+ if (fileInfo != null) {
+ return gcsFs.open(fileInfo, options);
+ }
+ return gcsFs.open(gcsPath, options);
+ }
+
+      /**
+       * Returns overridden GCS read options. These options are used when creating a channel per
+       * FileRange. By default, a channel is optimized to perform multiple read requests over the
+       * same channel; in readVectored, only one read is performed per channel, so some
+       * configuration is overridden to optimize for that.
+       *
+       * @param readOptions original read options extracted from the GCSFileSystem
+       * @return the modified read options
+       */
+ private GoogleCloudStorageReadOptions channelReadOptions(
+ GoogleCloudStorageReadOptions readOptions) {
+ GoogleCloudStorageReadOptions.Builder builder = readOptions.toBuilder();
+        // For a single range read we don't want the read channel to adjust around channel
+        // boundaries, as the channel is used for just one read request.
+ builder.setFadvise(GoogleCloudStorageReadOptions.Fadvise.SEQUENTIAL);
+ return builder.build();
+ }
+ }
}
/**
@@ -340,43 +393,4 @@ public void close() {
boundedThreadPool = null;
}
}
-
- private class ReadChannelProvider {
- private final GoogleCloudStorageFileSystem gcsFs;
- private final FileInfo fileInfo;
- private final URI gcsPath;
-
- public ReadChannelProvider(GoogleCloudStorageFileSystem gcsFS, FileInfo fileInfo, URI gcsPath) {
- this.gcsFs = gcsFS;
- this.fileInfo = fileInfo;
- this.gcsPath = gcsPath;
- }
-
- public SeekableByteChannel getReadChannel() throws IOException {
- GoogleCloudStorageReadOptions options =
- channelReadOptions(gcsFs.getOptions().getCloudStorageOptions().getReadChannelOptions());
- if (fileInfo != null) {
- return gcsFs.open(fileInfo, options);
- }
- return gcsFs.open(gcsPath, options);
- }
-
- /**
- * Returns Overriden GCS read options. These options will be used while creating channel per
- * FileRange. By default, channel is optimized to perform multiple read request from same
- * channel. Given in readVectored, only one read is performed per channel overriding some
- * configuration to optimize it.
- *
- * @param readOptions original read options extracted from GCSFileSystem
- * @return The modified read options.
- */
- private GoogleCloudStorageReadOptions channelReadOptions(
- GoogleCloudStorageReadOptions readOptions) {
- GoogleCloudStorageReadOptions.Builder builder = readOptions.toBuilder();
- // For single range read we don't want Read channel to adjust around on channel boundaries as
- // channel is used just for one read request.
- builder.setFadvise(GoogleCloudStorageReadOptions.Fadvise.SEQUENTIAL);
- return builder.build();
- }
- }
}
diff --git a/gcs/src/test/java/com/google/cloud/hadoop/fs/gcs/GoogleCloudStorageStatisticsTest.java b/gcs/src/test/java/com/google/cloud/hadoop/fs/gcs/GoogleCloudStorageStatisticsTest.java
index 7d01b01a0a..f5e02e59b9 100644
--- a/gcs/src/test/java/com/google/cloud/hadoop/fs/gcs/GoogleCloudStorageStatisticsTest.java
+++ b/gcs/src/test/java/com/google/cloud/hadoop/fs/gcs/GoogleCloudStorageStatisticsTest.java
@@ -15,10 +15,21 @@
package com.google.cloud.hadoop.fs.gcs;
import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics.EXCEPTION_COUNT;
-import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics.GCS_CLIENT_RATE_LIMIT_COUNT;
-import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics.GCS_CLIENT_SIDE_ERROR_COUNT;
-import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics.GCS_REQUEST_COUNT;
-import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics.GCS_SERVER_SIDE_ERROR_COUNT;
+import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics.GCS_API_CLIENT_BAD_REQUEST_COUNT;
+import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics.GCS_API_CLIENT_GONE_RESPONSE_COUNT;
+import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics.GCS_API_CLIENT_NOT_FOUND_RESPONSE_COUNT;
+import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics.GCS_API_CLIENT_PRECONDITION_FAILED_RESPONSE_COUNT;
+import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics.GCS_API_CLIENT_RATE_LIMIT_COUNT;
+import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics.GCS_API_CLIENT_REQUESTED_RANGE_NOT_SATISFIABLE_COUNT;
+import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics.GCS_API_CLIENT_REQUEST_TIMEOUT_COUNT;
+import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics.GCS_API_CLIENT_SIDE_ERROR_COUNT;
+import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics.GCS_API_CLIENT_UNAUTHORIZED_RESPONSE_COUNT;
+import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics.GCS_API_REQUEST_COUNT;
+import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics.GCS_API_SERVER_BAD_GATEWAY_COUNT;
+import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics.GCS_API_SERVER_INTERNAL_ERROR_COUNT;
+import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics.GCS_API_SERVER_SERVICE_UNAVAILABLE_COUNT;
+import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics.GCS_API_SERVER_SIDE_ERROR_COUNT;
+import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics.GCS_API_SERVER_TIMEOUT_COUNT;
import static com.google.common.truth.Truth.assertThat;
import com.google.cloud.hadoop.util.GcsRequestExecutionEvent;
@@ -36,17 +47,17 @@
@RunWith(JUnit4.class)
public class GoogleCloudStorageStatisticsTest {
private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();
- private GhfsGlobalStorageStatistics subscriber = new GhfsGlobalStorageStatistics();
+ private GhfsGlobalStorageStatistics storageStatistics = new GhfsGlobalStorageStatistics();
+ protected GoogleCloudStorageEventSubscriber subscriber =
+ new GoogleCloudStorageEventSubscriber(storageStatistics);
@Before
public void setUp() throws Exception {
-
GoogleCloudStorageEventBus.register(subscriber);
}
@After
public void cleanup() throws Exception {
-
GoogleCloudStorageEventBus.unregister(subscriber);
}
@@ -55,7 +66,7 @@ private void verifyStatistics(GhfsGlobalStorageStatistics expectedStats) {
boolean metricsVerified = true;
while (statsIterator.hasNext()) {
LongStatistic stats = statsIterator.next();
- Long value = subscriber.getLong(stats.getName());
+ Long value = storageStatistics.getLong(stats.getName());
if (stats.getValue() != value) {
logger.atWarning().log(
"Metric values not matching. for: %s, expected: %d, got: %d",
@@ -71,7 +82,7 @@ private void verifyStatistics(GhfsGlobalStorageStatistics expectedStats) {
public void gcs_requestCounter() throws Exception {
GoogleCloudStorageEventBus.onGcsRequest(new GcsRequestExecutionEvent());
GhfsGlobalStorageStatistics verifyCounterStats = new GhfsGlobalStorageStatistics();
- verifyCounterStats.incrementCounter(GCS_REQUEST_COUNT, 1);
+ verifyCounterStats.incrementCounter(GCS_API_REQUEST_COUNT, 1);
verifyStatistics(verifyCounterStats);
}
@@ -80,11 +91,11 @@ public void gcs_rateLimitCounter() {
// verify for http event i.e. via Apiary
GoogleCloudStorageEventBus.postOnHttpResponseStatus(429);
GhfsGlobalStorageStatistics verifyCounterStats = new GhfsGlobalStorageStatistics();
- verifyCounterStats.incrementCounter(GCS_CLIENT_RATE_LIMIT_COUNT, 1);
- verifyCounterStats.incrementCounter(GCS_CLIENT_SIDE_ERROR_COUNT, 1);
+ verifyCounterStats.incrementCounter(GCS_API_CLIENT_RATE_LIMIT_COUNT, 1);
+ verifyCounterStats.incrementCounter(GCS_API_CLIENT_SIDE_ERROR_COUNT, 1);
verifyStatistics(verifyCounterStats);
- subscriber.reset();
+ storageStatistics.reset();
// verify for gRPC event i.e. via java-storage
GoogleCloudStorageEventBus.onGrpcStatus(Status.RESOURCE_EXHAUSTED);
@@ -95,13 +106,17 @@ public void gcs_rateLimitCounter() {
public void gcs_clientSideErrorCounter() {
GoogleCloudStorageEventBus.postOnHttpResponseStatus(404);
GhfsGlobalStorageStatistics verifyCounterStats = new GhfsGlobalStorageStatistics();
- verifyCounterStats.incrementCounter(GCS_CLIENT_SIDE_ERROR_COUNT, 1);
+ verifyCounterStats.incrementCounter(GCS_API_CLIENT_SIDE_ERROR_COUNT, 1);
+ verifyCounterStats.incrementCounter(GCS_API_CLIENT_NOT_FOUND_RESPONSE_COUNT, 1);
verifyStatistics(verifyCounterStats);
+ }
- subscriber.reset();
-
- // verify for gRPC event i.e. via java-storage
+ @Test
+ public void gcs_grpcCancelledStatusCounter() {
GoogleCloudStorageEventBus.onGrpcStatus(Status.CANCELLED);
+ GhfsGlobalStorageStatistics verifyCounterStats = new GhfsGlobalStorageStatistics();
+ verifyCounterStats.incrementCounter(GCS_API_CLIENT_SIDE_ERROR_COUNT, 1);
+ // verify for gRPC event i.e. via java-storage
verifyStatistics(verifyCounterStats);
}
@@ -109,21 +124,124 @@ public void gcs_clientSideErrorCounter() {
public void gcs_serverSideErrorCounter() {
GoogleCloudStorageEventBus.postOnHttpResponseStatus(503);
GhfsGlobalStorageStatistics verifyCounterStats = new GhfsGlobalStorageStatistics();
- verifyCounterStats.incrementCounter(GCS_SERVER_SIDE_ERROR_COUNT, 1);
+ verifyCounterStats.incrementCounter(GCS_API_SERVER_SIDE_ERROR_COUNT, 1);
+ verifyCounterStats.incrementCounter(GCS_API_SERVER_SERVICE_UNAVAILABLE_COUNT, 1);
verifyStatistics(verifyCounterStats);
+ }
- subscriber.reset();
-
+ @Test
+ public void gcs_grpcInternalStatusCounter() {
// verify for gRPC event i.e. via java-storage
GoogleCloudStorageEventBus.onGrpcStatus(Status.INTERNAL);
+ GhfsGlobalStorageStatistics verifyCounterStats = new GhfsGlobalStorageStatistics();
+ verifyCounterStats.incrementCounter(GCS_API_SERVER_SIDE_ERROR_COUNT, 1);
verifyStatistics(verifyCounterStats);
}
@Test
- public void gcs_ExceptionCounter() {
+ public void gcs_exceptionCounter() {
GoogleCloudStorageEventBus.postOnException();
GhfsGlobalStorageStatistics verifyCounterStats = new GhfsGlobalStorageStatistics();
verifyCounterStats.incrementCounter(EXCEPTION_COUNT, 1);
verifyStatistics(verifyCounterStats);
}
+
+ @Test
+ public void gcs_clientBadRequestCount() {
+ GoogleCloudStorageEventBus.postOnHttpResponseStatus(400);
+ GhfsGlobalStorageStatistics verifyCounterStats = new GhfsGlobalStorageStatistics();
+ verifyCounterStats.incrementCounter(GCS_API_CLIENT_BAD_REQUEST_COUNT, 1);
+ verifyCounterStats.incrementCounter(GCS_API_CLIENT_SIDE_ERROR_COUNT, 1);
+ verifyStatistics(verifyCounterStats);
+ }
+
+ @Test
+ public void gcs_clientUnauthorizedResponseCount() {
+ GoogleCloudStorageEventBus.postOnHttpResponseStatus(401);
+ GhfsGlobalStorageStatistics verifyCounterStats = new GhfsGlobalStorageStatistics();
+ verifyCounterStats.incrementCounter(GCS_API_CLIENT_UNAUTHORIZED_RESPONSE_COUNT, 1);
+ verifyCounterStats.incrementCounter(GCS_API_CLIENT_SIDE_ERROR_COUNT, 1);
+ verifyStatistics(verifyCounterStats);
+ }
+
+ @Test
+ public void gcs_clientNotFoundResponseCount() {
+ GoogleCloudStorageEventBus.postOnHttpResponseStatus(404);
+ GhfsGlobalStorageStatistics verifyCounterStats = new GhfsGlobalStorageStatistics();
+ verifyCounterStats.incrementCounter(GCS_API_CLIENT_NOT_FOUND_RESPONSE_COUNT, 1);
+ verifyCounterStats.incrementCounter(GCS_API_CLIENT_SIDE_ERROR_COUNT, 1);
+ verifyStatistics(verifyCounterStats);
+ }
+
+ @Test
+ public void gcs_clientRequestTimeoutCount() {
+ GoogleCloudStorageEventBus.postOnHttpResponseStatus(408);
+ GhfsGlobalStorageStatistics verifyCounterStats = new GhfsGlobalStorageStatistics();
+ verifyCounterStats.incrementCounter(GCS_API_CLIENT_REQUEST_TIMEOUT_COUNT, 1);
+ verifyCounterStats.incrementCounter(GCS_API_CLIENT_SIDE_ERROR_COUNT, 1);
+ verifyStatistics(verifyCounterStats);
+ }
+
+ @Test
+ public void gcs_clientGoneResponseCount() {
+ GoogleCloudStorageEventBus.postOnHttpResponseStatus(410);
+ GhfsGlobalStorageStatistics verifyCounterStats = new GhfsGlobalStorageStatistics();
+ verifyCounterStats.incrementCounter(GCS_API_CLIENT_GONE_RESPONSE_COUNT, 1);
+ verifyCounterStats.incrementCounter(GCS_API_CLIENT_SIDE_ERROR_COUNT, 1);
+ verifyStatistics(verifyCounterStats);
+ }
+
+ @Test
+ public void gcs_clientPreconditionFailedResponseCount() {
+ GoogleCloudStorageEventBus.postOnHttpResponseStatus(412);
+ GhfsGlobalStorageStatistics verifyCounterStats = new GhfsGlobalStorageStatistics();
+ verifyCounterStats.incrementCounter(GCS_API_CLIENT_PRECONDITION_FAILED_RESPONSE_COUNT, 1);
+ verifyCounterStats.incrementCounter(GCS_API_CLIENT_SIDE_ERROR_COUNT, 1);
+ verifyStatistics(verifyCounterStats);
+ }
+
+ @Test
+ public void gcs_clientRequestedRangeNotSatisfiableCount() {
+ GoogleCloudStorageEventBus.postOnHttpResponseStatus(416);
+ GhfsGlobalStorageStatistics verifyCounterStats = new GhfsGlobalStorageStatistics();
+ verifyCounterStats.incrementCounter(GCS_API_CLIENT_REQUESTED_RANGE_NOT_SATISFIABLE_COUNT, 1);
+ verifyCounterStats.incrementCounter(GCS_API_CLIENT_SIDE_ERROR_COUNT, 1);
+ verifyStatistics(verifyCounterStats);
+ }
+
+ @Test
+ public void gcs_serverInternalErrorCount() {
+ GoogleCloudStorageEventBus.postOnHttpResponseStatus(500);
+ GhfsGlobalStorageStatistics verifyCounterStats = new GhfsGlobalStorageStatistics();
+ verifyCounterStats.incrementCounter(GCS_API_SERVER_INTERNAL_ERROR_COUNT, 1);
+ verifyCounterStats.incrementCounter(GCS_API_SERVER_SIDE_ERROR_COUNT, 1);
+ verifyStatistics(verifyCounterStats);
+ }
+
+ @Test
+ public void gcs_serverBadGatewayCount() {
+ GoogleCloudStorageEventBus.postOnHttpResponseStatus(502);
+ GhfsGlobalStorageStatistics verifyCounterStats = new GhfsGlobalStorageStatistics();
+ verifyCounterStats.incrementCounter(GCS_API_SERVER_BAD_GATEWAY_COUNT, 1);
+ verifyCounterStats.incrementCounter(GCS_API_SERVER_SIDE_ERROR_COUNT, 1);
+ verifyStatistics(verifyCounterStats);
+ }
+
+ @Test
+ public void gcs_serverServiceUnavailableCount() {
+ GoogleCloudStorageEventBus.postOnHttpResponseStatus(503);
+ GhfsGlobalStorageStatistics verifyCounterStats = new GhfsGlobalStorageStatistics();
+ verifyCounterStats.incrementCounter(GCS_API_SERVER_SERVICE_UNAVAILABLE_COUNT, 1);
+ verifyCounterStats.incrementCounter(GCS_API_SERVER_SIDE_ERROR_COUNT, 1);
+ verifyStatistics(verifyCounterStats);
+ }
+
+ @Test
+ public void gcs_serverTimeoutCount() {
+ GoogleCloudStorageEventBus.postOnHttpResponseStatus(504);
+ GhfsGlobalStorageStatistics verifyCounterStats = new GhfsGlobalStorageStatistics();
+ verifyCounterStats.incrementCounter(GCS_API_SERVER_TIMEOUT_COUNT, 1);
+ verifyCounterStats.incrementCounter(GCS_API_SERVER_SIDE_ERROR_COUNT, 1);
+ verifyStatistics(verifyCounterStats);
+ }
}
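
The tests above fix the dispatch contract for the new counters: every mapped 4xx response bumps both its dedicated counter and GCS_API_CLIENT_SIDE_ERROR_COUNT, while mapped 5xx responses pair with GCS_API_SERVER_SIDE_ERROR_COUNT. A minimal sketch of that mapping, assuming a hypothetical `onHttpStatus` helper (the real dispatch lives in `GoogleCloudStorageEventSubscriber`):

```java
package com.google.cloud.hadoop.fs.gcs;

import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics.*;

final class StatusCounterSketch {
  // Hypothetical helper, for illustration only: maps an HTTP status code to
  // the aggregate and per-code counters exercised by the tests above.
  static void onHttpStatus(GhfsGlobalStorageStatistics stats, int status) {
    if (status >= 400 && status < 500) {
      stats.incrementCounter(GCS_API_CLIENT_SIDE_ERROR_COUNT, 1);
      switch (status) {
        case 400: stats.incrementCounter(GCS_API_CLIENT_BAD_REQUEST_COUNT, 1); break;
        case 401: stats.incrementCounter(GCS_API_CLIENT_UNAUTHORIZED_RESPONSE_COUNT, 1); break;
        case 404: stats.incrementCounter(GCS_API_CLIENT_NOT_FOUND_RESPONSE_COUNT, 1); break;
        case 408: stats.incrementCounter(GCS_API_CLIENT_REQUEST_TIMEOUT_COUNT, 1); break;
        case 410: stats.incrementCounter(GCS_API_CLIENT_GONE_RESPONSE_COUNT, 1); break;
        case 412: stats.incrementCounter(GCS_API_CLIENT_PRECONDITION_FAILED_RESPONSE_COUNT, 1); break;
        case 416: stats.incrementCounter(GCS_API_CLIENT_REQUESTED_RANGE_NOT_SATISFIABLE_COUNT, 1); break;
        case 429: stats.incrementCounter(GCS_API_CLIENT_RATE_LIMIT_COUNT, 1); break;
        default: break; // other 4xx codes only hit the aggregate counter
      }
    } else if (status >= 500) {
      stats.incrementCounter(GCS_API_SERVER_SIDE_ERROR_COUNT, 1);
      switch (status) {
        case 500: stats.incrementCounter(GCS_API_SERVER_INTERNAL_ERROR_COUNT, 1); break;
        case 502: stats.incrementCounter(GCS_API_SERVER_BAD_GATEWAY_COUNT, 1); break;
        case 503: stats.incrementCounter(GCS_API_SERVER_SERVICE_UNAVAILABLE_COUNT, 1); break;
        case 504: stats.incrementCounter(GCS_API_SERVER_TIMEOUT_COUNT, 1); break;
        default: break;
      }
    }
  }
}
```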
diff --git a/gcs/src/test/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFSInputStreamIntegrationTest.java b/gcs/src/test/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFSInputStreamIntegrationTest.java
index ca9b1ad2d6..a939554557 100644
--- a/gcs/src/test/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFSInputStreamIntegrationTest.java
+++ b/gcs/src/test/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFSInputStreamIntegrationTest.java
@@ -110,6 +110,32 @@ public void read_singleBytes() throws Exception {
assertThat(value).isEqualTo(expected);
}
+ @Test
+ public void read_multiple() throws Exception {
+ URI path = gcsFsIHelper.getUniqueObjectUri(getClass(), "read_multiple");
+
+ GoogleHadoopFileSystem ghfs =
+ GoogleHadoopFileSystemIntegrationHelper.createGhfs(
+ path, GoogleHadoopFileSystemIntegrationHelper.getTestConfig());
+
+ String testContent = "test content";
+ gcsFsIHelper.writeTextFile(path, testContent);
+
+ byte[] value = new byte[11];
+ byte[] expected = Arrays.copyOf(testContent.getBytes(StandardCharsets.UTF_8), 11);
+
+ FileSystem.Statistics statistics = new FileSystem.Statistics(ghfs.getScheme());
+ try (GoogleHadoopFSInputStream in = GoogleHadoopFSInputStream.create(ghfs, path, statistics)) {
+ assertThat(in.read(value, 0, 1)).isEqualTo(1);
+ assertThat(statistics.getReadOps()).isEqualTo(1);
+ assertThat(in.read(1, value, 1, 10)).isEqualTo(10);
+ assertThat(statistics.getReadOps()).isEqualTo(2);
+ }
+
+ assertThat(statistics.getBytesRead()).isEqualTo(11);
+ assertThat(value).isEqualTo(expected);
+ }
+
@Test
public void testMergedRangeRequest() throws Exception {
URI path = gcsFsIHelper.getUniqueObjectUri(getClass(), "read_mergedRange");
@@ -131,17 +157,24 @@ public void testMergedRangeRequest() throws Exception {
GoogleHadoopFSInputStream in = GoogleHadoopFSInputStream.create(ghfs, path, statistics);
List<FileRange> fileRanges = new ArrayList<>();
+ int totalBytesRead = 0;
// below two ranges will be merged
fileRanges.add(FileRange.createFileRange(0, 5));
+ totalBytesRead += 5;
fileRanges.add(FileRange.createFileRange(6, 2)); // read till 8
+ totalBytesRead += 2;
fileRanges.add(FileRange.createFileRange(11, 4)); // read till 15
+ totalBytesRead += 4;
fileRanges.add(FileRange.createFileRange(20, 15));
+ totalBytesRead += 15;
try (GoogleHadoopFSInputStream ignore = in) {
in.readVectored(fileRanges, ByteBuffer::allocate);
validateVectoredReadResult(fileRanges, path);
}
+
+ assertThat(statistics.getBytesRead()).isEqualTo(totalBytesRead);
}
@Test
diff --git a/gcs/src/test/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystemConfigurationTest.java b/gcs/src/test/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystemConfigurationTest.java
index 342a6dd441..9012b5715f 100644
--- a/gcs/src/test/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystemConfigurationTest.java
+++ b/gcs/src/test/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystemConfigurationTest.java
@@ -32,10 +32,13 @@
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageOptions.MetricsSink;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadOptions.Fadvise;
import com.google.cloud.hadoop.gcsio.PerformanceCachingGoogleCloudStorageOptions;
+import com.google.cloud.hadoop.util.AsyncWriteChannelOptions.PartFileCleanupType;
import com.google.cloud.hadoop.util.AsyncWriteChannelOptions.PipeType;
+import com.google.cloud.hadoop.util.AsyncWriteChannelOptions.UploadType;
import com.google.cloud.hadoop.util.RequesterPaysOptions.RequesterPaysMode;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
@@ -78,6 +81,7 @@ public class GoogleHadoopFileSystemConfigurationTest {
put("fs.gs.grpc.trafficdirector.enable", true);
put("fs.gs.grpc.write.buffered.requests", 20);
put("fs.gs.grpc.write.message.timeout", 3_000L);
+ put("fs.gs.grpc.write.enable", false);
put("fs.gs.hierarchical.namespace.folders.enable", false);
put("fs.gs.grpc.write.timeout", 600_000L);
put("fs.gs.http.connect-timeout", 5_000L);
@@ -116,6 +120,14 @@ public class GoogleHadoopFileSystemConfigurationTest {
put("fs.gs.tracelog.enable", false);
put("fs.gs.operation.tracelog.enable", false);
put("fs.gs.working.dir", "/");
+ put("fs.gs.client.upload.type", UploadType.CHUNK_UPLOAD);
+ put("fs.gs.write.temporary.dirs", ImmutableSet.of());
+ put("fs.gs.write.parallel.composite.upload.buffer.count", 1);
+ put("fs.gs.write.parallel.composite.upload.buffer.capacity", 32 * 1024 * 1024L);
+ put(
+ "fs.gs.write.parallel.composite.upload.part.file.cleanup.type",
+ PartFileCleanupType.ALWAYS);
+ put("fs.gs.write.parallel.composite.upload.part.file.name.prefix", "");
}
};
diff --git a/gcs/src/test/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystemIntegrationTest.java b/gcs/src/test/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystemIntegrationTest.java
index 9cd3904c37..c4c38d714c 100644
--- a/gcs/src/test/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystemIntegrationTest.java
+++ b/gcs/src/test/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystemIntegrationTest.java
@@ -20,7 +20,6 @@
import static com.google.cloud.hadoop.fs.gcs.GhfsStatistic.ACTION_HTTP_GET_REQUEST;
import static com.google.cloud.hadoop.fs.gcs.GhfsStatistic.ACTION_HTTP_PATCH_REQUEST;
import static com.google.cloud.hadoop.fs.gcs.GhfsStatistic.ACTION_HTTP_PUT_REQUEST;
-import static com.google.cloud.hadoop.fs.gcs.GhfsStatistic.DIRECTORIES_DELETED;
import static com.google.cloud.hadoop.fs.gcs.GhfsStatistic.FILES_CREATED;
import static com.google.cloud.hadoop.fs.gcs.GhfsStatistic.INVOCATION_COPY_FROM_LOCAL_FILE;
import static com.google.cloud.hadoop.fs.gcs.GhfsStatistic.INVOCATION_CREATE;
@@ -551,23 +550,6 @@ public void delete_IOstatistics() throws IOException {
(GhfsGlobalStorageStatistics) stats, INVOCATION_DELETE.getSymbol(), 1);
}
- @Test
- public void statistics_check_directories_deleted() throws IOException {
-
- GoogleHadoopFileSystem myGhfs = createInMemoryGoogleHadoopFileSystem();
-
- StorageStatistics GlobalStorageStats = TestUtils.getStorageStatistics();
- Path testRoot = new Path("/directory1/");
- myGhfs.mkdirs(testRoot);
- FSDataOutputStream fout = myGhfs.create(new Path("/directory1/file1"));
- fout.writeBytes("Test Content");
- fout.close();
-
- assertThat(myGhfs.delete(testRoot, /* recursive= */ true)).isTrue();
- TestUtils.verifyCounter(
- (GhfsGlobalStorageStatistics) GlobalStorageStats, DIRECTORIES_DELETED, 1);
- }
-
@Test
public void statistics_check_get_list_status_result_size() throws IOException {
diff --git a/gcs/src/test/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystemTest.java b/gcs/src/test/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystemTest.java
index 2fdf12068b..f2c4af5b85 100644
--- a/gcs/src/test/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystemTest.java
+++ b/gcs/src/test/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystemTest.java
@@ -16,6 +16,8 @@
package com.google.cloud.hadoop.fs.gcs;
+import static com.google.cloud.hadoop.fs.gcs.GhfsStatistic.STREAM_READ_OPERATIONS;
+import static com.google.cloud.hadoop.fs.gcs.GhfsStatistic.STREAM_WRITE_OPERATIONS;
import static com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemConfiguration.GCS_CLIENT_TYPE;
import static com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemTestHelper.createInMemoryGoogleHadoopFileSystem;
import static com.google.cloud.hadoop.gcsio.testing.InMemoryGoogleCloudStorage.getInMemoryGoogleCloudStorageOptions;
@@ -255,6 +257,18 @@ public void testGetDefaultPortIndicatesPortsAreNotUsed() throws Exception {
assertThat(ghfs.getDefaultPort()).isEqualTo(-1);
}
+ @Test
+ public void testTotalTimeStatistics() throws IOException {
+ GhfsGlobalStorageStatistics stats = new GhfsGlobalStorageStatistics();
+ stats.updateStats(STREAM_READ_OPERATIONS, 10, 100, 200, 10, new Object());
+ stats.addTotalTimeStatistic(STREAM_READ_OPERATIONS.getSymbol() + "_duration");
+ assertThat(stats.getLong(STREAM_READ_OPERATIONS.getSymbol() + "_duration")).isEqualTo(200);
+
+ stats.updateStats(STREAM_WRITE_OPERATIONS, 10, 100, 200, 10, new Object());
+ stats.addTotalTimeStatistic(STREAM_WRITE_OPERATIONS.getSymbol() + "_duration");
+ assertThat(stats.getLong(STREAM_WRITE_OPERATIONS.getSymbol() + "_duration")).isEqualTo(200);
+ }
+
// -----------------------------------------------------------------
// Inherited tests that we suppress because their behavior differs
// from the base class.
diff --git a/gcs/src/test/java/com/google/cloud/hadoop/fs/gcs/VectoredIOImplTest.java b/gcs/src/test/java/com/google/cloud/hadoop/fs/gcs/VectoredIOImplTest.java
index b7f46e702d..dadf9a7506 100644
--- a/gcs/src/test/java/com/google/cloud/hadoop/fs/gcs/VectoredIOImplTest.java
+++ b/gcs/src/test/java/com/google/cloud/hadoop/fs/gcs/VectoredIOImplTest.java
@@ -74,6 +74,7 @@ public class VectoredIOImplTest {
// stores the path of default object
private Path path;
private GhfsGlobalStorageStatistics ghfsStorageStatistics;
+ private GhfsInputStreamStatistics streamStatistics;
private FileSystem.Statistics statistics;
@Before
@@ -92,7 +93,8 @@ public void before() throws Exception {
this.gcsFs = spy(ghfs.getGcsFs());
this.statistics = new FileSystem.Statistics(ghfs.getScheme());
this.ghfsStorageStatistics = new GhfsGlobalStorageStatistics();
- this.vectoredIO = new VectoredIOImpl(vectoredReadOptions, ghfsStorageStatistics, statistics);
+ this.vectoredIO = new VectoredIOImpl(vectoredReadOptions, ghfsStorageStatistics);
+ this.streamStatistics = ghfs.getInstrumentation().newInputStreamStatistics(statistics);
}
@After
@@ -101,6 +103,7 @@ public void cleanup() {
vectoredIO.close();
}
ghfsStorageStatistics.reset();
+ streamStatistics.close();
}
@Test
@@ -112,14 +115,18 @@ public void testInvalidRangeRequest() throws IOException {
fileRanges.add(FileRange.createFileRange(/* offset */ 11, /* length */ -50));
assertThrows(
IllegalArgumentException.class,
- () -> vectoredIO.readVectored(fileRanges, allocate, gcsFs, fileInfo, fileInfo.getPath()));
+ () ->
+ vectoredIO.readVectored(
+ fileRanges, allocate, gcsFs, fileInfo, fileInfo.getPath(), streamStatistics));
fileRanges.clear();
// invalid offset
fileRanges.add(FileRange.createFileRange(/* offset */ -1, /* length */ 50));
assertThrows(
EOFException.class,
- () -> vectoredIO.readVectored(fileRanges, allocate, gcsFs, fileInfo, fileInfo.getPath()));
+ () ->
+ vectoredIO.readVectored(
+ fileRanges, allocate, gcsFs, fileInfo, fileInfo.getPath(), streamStatistics));
}
@Test
@@ -131,7 +138,8 @@ public void testDisjointRangeReads() throws Exception {
fileRanges.add(
FileRange.createFileRange(
/* offset */ vectoredReadOptions.getMinSeekVectoredReadSize() + 10, rangeLength));
- vectoredIO.readVectored(fileRanges, allocate, gcsFs, fileInfo, fileInfo.getPath());
+ vectoredIO.readVectored(
+ fileRanges, allocate, gcsFs, fileInfo, fileInfo.getPath(), streamStatistics);
verifyRangeContent(fileRanges);
// callCount is equal to disjointRangeRequests
verifyGcsFsOpenCalls(/* callCount */ 2);
@@ -142,7 +150,7 @@ public void testDisjointRangeReads() throws Exception {
.isEqualTo(0);
assertThat(ghfsStorageStatistics.getLong(GhfsStatistic.STREAM_READ_BYTES.getSymbol()))
.isEqualTo(rangeLength * 2);
- assertThat(statistics.getBytesRead()).isEqualTo(rangeLength * 2);
+ assertThat(streamStatistics.getBytesRead()).isEqualTo(rangeLength * 2);
assertThat(
ghfsStorageStatistics.getLong(
@@ -165,7 +173,8 @@ public void testMergedRangeReads() throws Exception {
int discardedBytes = vectoredReadOptions.getMinSeekVectoredReadSize() - 1;
offset += discardedBytes;
fileRanges.add(FileRange.createFileRange(offset, rangeLength));
- vectoredIO.readVectored(fileRanges, allocate, gcsFs, fileInfo, fileInfo.getPath());
+ vectoredIO.readVectored(
+ fileRanges, allocate, gcsFs, fileInfo, fileInfo.getPath(), streamStatistics);
verifyRangeContent(fileRanges);
// Ranges are merged, data is read from single channel
verifyGcsFsOpenCalls(/* callCount */ 1);
@@ -176,7 +185,7 @@ public void testMergedRangeReads() throws Exception {
assertThat(ghfsStorageStatistics.getLong(GhfsStatistic.STREAM_READ_BYTES.getSymbol()))
.isEqualTo(rangeLength * 2);
- assertThat(statistics.getBytesRead()).isEqualTo(rangeLength * 2);
+ assertThat(streamStatistics.getBytesRead()).isEqualTo(rangeLength * 2);
assertThat(
ghfsStorageStatistics.getLong(
@@ -208,7 +217,8 @@ public void error_disjoint_range() throws Exception {
when(mockedGcsFs.open(fileInfoArgumentCaptor.capture(), readOptionsArgumentCaptor.capture()))
.thenReturn(new MockedReadChannel(), new MockedReadChannel());
- vectoredIO.readVectored(fileRanges, allocate, mockedGcsFs, fileInfo, fileInfo.getPath());
+ vectoredIO.readVectored(
+ fileRanges, allocate, mockedGcsFs, fileInfo, fileInfo.getPath(), streamStatistics);
verifyRangeException(fileRanges);
verify(mockedGcsFs, times(2)).open((FileInfo) any(), any());
@@ -236,7 +246,8 @@ public void error_merged_range() throws Exception {
when(mockedGcsFs.open(fileInfoArgumentCaptor.capture(), readOptionsArgumentCaptor.capture()))
.thenReturn(new MockedReadChannel());
- vectoredIO.readVectored(fileRanges, allocate, mockedGcsFs, fileInfo, fileInfo.getPath());
+ vectoredIO.readVectored(
+ fileRanges, allocate, mockedGcsFs, fileInfo, fileInfo.getPath(), streamStatistics);
verifyRangeException(fileRanges);
@@ -256,7 +267,8 @@ public void overlappingRangeTest() {
assertThrows(
IllegalArgumentException.class,
() ->
- vectoredIO.readVectored(fileRanges, allocate, gcsFs, fileInfo, fileInfo.getPath()));
+ vectoredIO.readVectored(
+ fileRanges, allocate, gcsFs, fileInfo, fileInfo.getPath(), streamStatistics));
assertThat((e.getMessage())).contains("overlapping");
}
@@ -294,10 +306,9 @@ public void range_merge_denied() throws Exception {
// verify mocks ( and also avoid flakiness of test)
VectoredIOImpl vectoredIO =
new VectoredIOImpl(
- vectoredReadOptions.toBuilder().setReadThreads(1).build(),
- ghfsStorageStatistics,
- statistics);
- vectoredIO.readVectored(fileRanges, allocate, mockedGcsFs, fileInfo, fileInfo.getPath());
+ vectoredReadOptions.toBuilder().setReadThreads(1).build(), ghfsStorageStatistics);
+ vectoredIO.readVectored(
+ fileRanges, allocate, mockedGcsFs, fileInfo, fileInfo.getPath(), streamStatistics);
verifyRangeException(fileRanges);
// open is called only as per combinedRange and not as per request FileRange
@@ -358,7 +369,8 @@ public void rangeOverFlowMergedRange() throws Exception {
FileRange overFlowRange = FileRange.createFileRange(offset, rangeLength);
fileRanges.add(overFlowRange);
- vectoredIO.readVectored(fileRanges, allocate, gcsFs, fileInfo, fileInfo.getPath());
+ vectoredIO.readVectored(
+ fileRanges, allocate, gcsFs, fileInfo, fileInfo.getPath(), streamStatistics);
verifyRangeContent(Arrays.asList(validRange));
verifyRangeException(Arrays.asList(overFlowRange));
assertThat(
@@ -368,7 +380,7 @@ public void rangeOverFlowMergedRange() throws Exception {
assertThat(ghfsStorageStatistics.getLong(GhfsStatistic.STREAM_READ_BYTES.getSymbol()))
.isEqualTo(validRange.getLength());
- assertThat(statistics.getBytesRead()).isEqualTo(validRange.getLength());
+ assertThat(streamStatistics.getBytesRead()).isEqualTo(validRange.getLength());
assertThat(
ghfsStorageStatistics.getLong(
@@ -388,13 +400,14 @@ public void rangeOverFlowSingleRange() throws Exception {
int offset = (int) fileInfo.getSize() - 1;
FileRange overFlowRange = FileRange.createFileRange(offset, rangeLength);
fileRanges.add(overFlowRange);
- vectoredIO.readVectored(fileRanges, allocate, gcsFs, fileInfo, fileInfo.getPath());
+ vectoredIO.readVectored(
+ fileRanges, allocate, gcsFs, fileInfo, fileInfo.getPath(), streamStatistics);
verifyRangeException(fileRanges);
assertThat(ghfsStorageStatistics.getLong(GhfsStatistic.STREAM_READ_BYTES.getSymbol()))
.isEqualTo(0);
- assertThat(statistics.getBytesRead()).isEqualTo(0);
+ assertThat(streamStatistics.getBytesRead()).isEqualTo(0);
assertThat(
ghfsStorageStatistics.getLong(
GhfsStatistic.STREAM_READ_VECTORED_READ_INCOMING_RANGES.getSymbol()))
diff --git a/gcsio/pom.xml b/gcsio/pom.xml
index 88bf94f81f..fbb7923620 100644
--- a/gcsio/pom.xml
+++ b/gcsio/pom.xml
@@ -21,7 +21,7 @@
com.google.cloud.bigdataoss
bigdataoss-parent
- 3.0.1-SNAPSHOT
+ 3.0.2-SNAPSHOT
../pom.xml
@@ -31,7 +31,7 @@
gcsio
- 3.0.1-SNAPSHOT
+ 3.0.2-SNAPSHOT
diff --git a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/DeleteFolderOperation.java b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/DeleteFolderOperation.java
index 8e348542a0..a8215f4d66 100644
--- a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/DeleteFolderOperation.java
+++ b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/DeleteFolderOperation.java
@@ -123,9 +123,8 @@ public FolderInfo getElementFromBlockingQueue() throws InterruptedException {
}
/** Adding to batch executor's queue */
- public void addToToBatchExecutorQueue(
- Callable<Void> callable, final FolderInfo folder, final int attempt) {
- batchExecutor.queue(callable, getDeletionCallback(folder, allExceptions, attempt));
+ public void addToToBatchExecutorQueue(Callable<Void> callable, FutureCallback<Void> callback) {
+ batchExecutor.queue(callable, callback);
}
/** Computes the number of children for each folder resource */
@@ -148,22 +147,13 @@ public void computeChildrenForFolderResource() {
}
}
- /**
- * Helper function to add folderResource to blocking queue
- *
- * @param folderResource
- */
- private void addFolderResourceInBlockingQueue(FolderInfo folderResource) {
- folderDeleteBlockingQueue.add(folderResource);
- }
-
/**
* Helper function to add the parent of successfully deleted folder resource into the blocking
* queue
*
* @param folderResource of the folder that is now deleted
*/
- private void successfullDeletionOfFolderResource(FolderInfo folderResource) {
+ protected synchronized void successfullDeletionOfFolderResource(FolderInfo folderResource) {
// remove the folderResource from list of map
countOfChildren.remove(folderResource.getFolderName());
@@ -183,7 +173,7 @@ private void successfullDeletionOfFolderResource(FolderInfo folderResource) {
}
/** Helper function to delete a single folder resource */
- private void queueSingleFolderDelete(@Nonnull final FolderInfo folder, final int attempt) {
+ protected void queueSingleFolderDelete(@Nonnull final FolderInfo folder, final int attempt) {
final String bucketName = folder.getBucket();
final String folderName = folder.getFolderName();
checkArgument(
@@ -194,7 +184,17 @@ private void queueSingleFolderDelete(@Nonnull final FolderInfo folder, final int
String.format("No folder path for folder resource %s", folderName));
addToToBatchExecutorQueue(
- new DeleteFolderRequestCallable(folder, storageControlClient), folder, attempt);
+ new DeleteFolderRequestCallable(folder, storageControlClient),
+ getDeletionCallback(folder, allExceptions, attempt));
+ }
+
+ /**
+ * Helper function to add folderResource to blocking queue
+ *
+ * @param folderResource
+ */
+ private void addFolderResourceInBlockingQueue(FolderInfo folderResource) {
+ folderDeleteBlockingQueue.add(folderResource);
}
/** Helper to create a callback for a particular deletion request for folder. */
diff --git a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageClientImpl.java b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageClientImpl.java
index 97ad3671ec..9833f752c6 100644
--- a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageClientImpl.java
+++ b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageClientImpl.java
@@ -27,14 +27,25 @@
import com.google.auth.oauth2.GoogleCredentials;
import com.google.auto.value.AutoBuilder;
import com.google.cloud.hadoop.util.AccessBoundary;
+import com.google.cloud.hadoop.util.AsyncWriteChannelOptions;
+import com.google.cloud.hadoop.util.AsyncWriteChannelOptions.PartFileCleanupType;
import com.google.cloud.hadoop.util.ErrorTypeExtractor;
import com.google.cloud.hadoop.util.GoogleCloudStorageEventBus;
import com.google.cloud.hadoop.util.GrpcErrorTypeExtractor;
+import com.google.cloud.storage.BlobWriteSessionConfig;
import com.google.cloud.storage.BlobWriteSessionConfigs;
+import com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.BufferAllocationStrategy;
+import com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.ExecutorSupplier;
+import com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.PartCleanupStrategy;
+import com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.PartNamingStrategy;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
import com.google.common.flogger.GoogleLogger;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.grpc.ClientInterceptor;
@@ -43,6 +54,7 @@
import java.nio.channels.SeekableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
@@ -58,6 +70,8 @@
*/
@VisibleForTesting
public class GoogleCloudStorageClientImpl extends ForwardingGoogleCloudStorage {
+
+ private static final String USER_AGENT = "user-agent";
private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();
private final GoogleCloudStorageOptions storageOptions;
@@ -84,7 +98,8 @@ public class GoogleCloudStorageClientImpl extends ForwardingGoogleCloudStorage {
@Nullable HttpTransport httpTransport,
@Nullable HttpRequestInitializer httpRequestInitializer,
@Nullable ImmutableList<ClientInterceptor> gRPCInterceptors,
- @Nullable Function<List<AccessBoundary>, String> downscopedAccessTokenFn)
+ @Nullable Function<List<AccessBoundary>, String> downscopedAccessTokenFn,
+ @Nullable ExecutorService pCUExecutorService)
throws IOException {
super(
GoogleCloudStorageImpl.builder()
@@ -97,13 +112,18 @@ public class GoogleCloudStorageClientImpl extends ForwardingGoogleCloudStorage {
this.storageOptions = options;
this.storage =
clientLibraryStorage == null
- ? createStorage(credentials, options, gRPCInterceptors, downscopedAccessTokenFn)
+ ? createStorage(
+ credentials, options, gRPCInterceptors, downscopedAccessTokenFn, pCUExecutorService)
: clientLibraryStorage;
}
@Override
public WritableByteChannel create(StorageResourceId resourceId, CreateObjectOptions options)
throws IOException {
+ if (!storageOptions.isGrpcWriteEnabled()) {
+ return super.create(resourceId, options);
+ }
+
logger.atFiner().log("create(%s)", resourceId);
checkArgument(
resourceId.isStorageObject(), "Expected full StorageObject id, got %s", resourceId);
@@ -207,10 +227,13 @@ private static Storage createStorage(
Credentials credentials,
GoogleCloudStorageOptions storageOptions,
List<ClientInterceptor> interceptors,
- Function<List<AccessBoundary>, String> downscopedAccessTokenFn) {
+ Function<List<AccessBoundary>, String> downscopedAccessTokenFn,
+ ExecutorService pCUExecutorService)
+ throws IOException {
+ final ImmutableMap<String, String> headers = getUpdatedHeadersWithUserAgent(storageOptions);
return StorageOptions.grpc()
.setAttemptDirectPath(storageOptions.isDirectPathPreferred())
- .setHeaderProvider(() -> storageOptions.getHttpRequestHeaders())
+ .setHeaderProvider(() -> headers)
.setGrpcInterceptorProvider(
() -> {
List<ClientInterceptor> list = new ArrayList<>();
@@ -238,8 +261,7 @@ private static Storage createStorage(
.setCredentials(
credentials != null ? credentials : getNoCredentials(downscopedAccessTokenFn))
.setBlobWriteSessionConfig(
- BlobWriteSessionConfigs.getDefault()
- .withChunkSize(storageOptions.getWriteChannelOptions().getUploadChunkSize()))
+ getSessionConfig(storageOptions.getWriteChannelOptions(), pCUExecutorService))
.build()
.getService();
}
@@ -255,6 +277,90 @@ private static Credentials getNoCredentials(
return GoogleCredentials.create(new AccessToken("", null));
}
+ private static ImmutableMap<String, String> getUpdatedHeadersWithUserAgent(
+ GoogleCloudStorageOptions storageOptions) {
+ ImmutableMap<String, String> httpRequestHeaders =
+ MoreObjects.firstNonNull(storageOptions.getHttpRequestHeaders(), ImmutableMap.of());
+ String appName = storageOptions.getAppName();
+ if (!httpRequestHeaders.containsKey(USER_AGENT) && !Strings.isNullOrEmpty(appName)) {
+ logger.atFiner().log("Setting useragent %s", appName);
+ return ImmutableMap.<String, String>builder()
+ .putAll(httpRequestHeaders)
+ .put(USER_AGENT, appName)
+ .build();
+ }
+
+ return httpRequestHeaders;
+ }
+
+ private static BlobWriteSessionConfig getSessionConfig(
+ AsyncWriteChannelOptions writeOptions, ExecutorService pCUExecutorService)
+ throws IOException {
+ logger.atFiner().log("Upload strategy in use: %s", writeOptions.getUploadType());
+ switch (writeOptions.getUploadType()) {
+ case CHUNK_UPLOAD:
+ return BlobWriteSessionConfigs.getDefault()
+ .withChunkSize(writeOptions.getUploadChunkSize());
+ case WRITE_TO_DISK_THEN_UPLOAD:
+ if (writeOptions.getTemporaryPaths() == null
+ || writeOptions.getTemporaryPaths().isEmpty()) {
+ return BlobWriteSessionConfigs.bufferToTempDirThenUpload();
+ }
+ return BlobWriteSessionConfigs.bufferToDiskThenUpload(
+ writeOptions.getTemporaryPaths().stream()
+ .map(x -> Paths.get(x))
+ .collect(ImmutableSet.toImmutableSet()));
+ case JOURNALING:
+ if (writeOptions.getTemporaryPaths() == null
+ || writeOptions.getTemporaryPaths().isEmpty()) {
+ throw new IllegalArgumentException(
+ "Upload using `Journaling` requires the property:fs.gs.write.temporary.dirs to be set.");
+ }
+ return BlobWriteSessionConfigs.journaling(
+ writeOptions.getTemporaryPaths().stream()
+ .map(x -> Paths.get(x))
+ .collect(ImmutableSet.toImmutableSet()));
+ case PARALLEL_COMPOSITE_UPLOAD:
+ return BlobWriteSessionConfigs.parallelCompositeUpload()
+ .withBufferAllocationStrategy(
+ BufferAllocationStrategy.fixedPool(
+ writeOptions.getPCUBufferCount(), writeOptions.getPCUBufferCapacity()))
+ .withPartCleanupStrategy(getPartCleanupStrategy(writeOptions.getPartFileCleanupType()))
+ .withExecutorSupplier(getPCUExecutorSupplier(pCUExecutorService))
+ .withPartNamingStrategy(getPartNamingStrategy(writeOptions.getPartFileNamePrefix()));
+ default:
+ throw new IllegalArgumentException(
+ String.format("Upload type:%s is not supported.", writeOptions.getUploadType()));
+ }
+ }
+
+ private static PartCleanupStrategy getPartCleanupStrategy(PartFileCleanupType cleanupType) {
+ switch (cleanupType) {
+ case NEVER:
+ return PartCleanupStrategy.never();
+ case ON_SUCCESS:
+ return PartCleanupStrategy.onlyOnSuccess();
+ case ALWAYS:
+ return PartCleanupStrategy.always();
+ default:
+ throw new IllegalArgumentException(
+ String.format("Cleanup type:%s is not handled.", cleanupType));
+ }
+ }
+
+ private static PartNamingStrategy getPartNamingStrategy(String partFilePrefix) {
+ if (Strings.isNullOrEmpty(partFilePrefix)) {
+ return PartNamingStrategy.useObjectNameAsPrefix();
+ }
+ return PartNamingStrategy.prefix(partFilePrefix);
+ }
+
+ private static ExecutorSupplier getPCUExecutorSupplier(ExecutorService pCUExecutorService) {
+ return pCUExecutorService == null
+ ? ExecutorSupplier.cachedPool()
+ : ExecutorSupplier.useExecutor(pCUExecutorService);
+ }
+
public static Builder builder() {
return new AutoBuilder_GoogleCloudStorageClientImpl_Builder();
}
@@ -281,6 +387,9 @@ public abstract Builder setGRPCInterceptors(
@VisibleForTesting
public abstract Builder setClientLibraryStorage(@Nullable Storage clientLibraryStorage);
+ @VisibleForTesting
+ public abstract Builder setPCUExecutorService(@Nullable ExecutorService pCUExecutorService);
+
public abstract GoogleCloudStorageClientImpl build() throws IOException;
}
}
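
For reference, a hedged sketch of write-channel options that would steer `getSessionConfig` above onto the parallel composite upload branch; the values are illustrative, and only builder methods that appear elsewhere in this patch are used:

```java
import com.google.cloud.hadoop.util.AsyncWriteChannelOptions;
import com.google.cloud.hadoop.util.AsyncWriteChannelOptions.PartFileCleanupType;
import com.google.cloud.hadoop.util.AsyncWriteChannelOptions.UploadType;

final class PcuOptionsSketch {
  // Illustrative values only: two 2 MiB part-file buffers, and part files
  // always cleaned up (even if the final compose step fails).
  static AsyncWriteChannelOptions pcuOptions() {
    return AsyncWriteChannelOptions.builder()
        .setUploadType(UploadType.PARALLEL_COMPOSITE_UPLOAD)
        .setPartFileCleanupType(PartFileCleanupType.ALWAYS)
        .setPCUBufferCount(2)
        .setPCUBufferCapacity(2 * 1024 * 1024)
        .build();
  }
}
```

On the Hadoop side these map to the `fs.gs.client.upload.type` and `fs.gs.write.parallel.composite.upload.*` properties covered in GoogleHadoopFileSystemConfigurationTest above.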
diff --git a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageFileSystemImpl.java b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageFileSystemImpl.java
index 2464b7b3da..02f66db41c 100644
--- a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageFileSystemImpl.java
+++ b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageFileSystemImpl.java
@@ -374,7 +374,6 @@ public void delete(URI path, boolean recursive) throws IOException {
GoogleCloudStorageEventBus.postOnException();
throw new DirectoryNotEmptyException("Cannot delete a non-empty directory.");
}
- GoogleCloudStorageEventBus.postOnStatisticsType();
} else {
itemsToDelete = new ArrayList<>();
}
diff --git a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageOptions.java b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageOptions.java
index f0d54ec8e1..a170dc0c6d 100644
--- a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageOptions.java
+++ b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageOptions.java
@@ -73,7 +73,8 @@ public static Builder builder() {
.setOperationTraceLogEnabled(false)
.setTrafficDirectorEnabled(true)
.setWriteChannelOptions(AsyncWriteChannelOptions.DEFAULT)
- .setHnBucketRenameEnabled(false);
+ .setHnBucketRenameEnabled(false)
+ .setGrpcWriteEnabled(false);
}
public abstract Builder toBuilder();
@@ -92,6 +93,8 @@ public static Builder builder() {
public abstract String getStorageServicePath();
+ public abstract boolean isGrpcWriteEnabled();
+
@Nullable
public abstract String getProjectId();
@@ -239,6 +242,8 @@ public abstract Builder setGrpcMessageTimeoutCheckInterval(
public abstract Builder setOperationTraceLogEnabled(Boolean enable);
+ public abstract Builder setGrpcWriteEnabled(boolean grpcWriteEnabled);
+
abstract GoogleCloudStorageOptions autoBuild();
public GoogleCloudStorageOptions build() {
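
The flag defaults to false, so writes keep taking the HTTP path until a caller opts in; `GoogleCloudStorageClientImpl.create()` checks it before switching to the java-storage channel. A minimal sketch using only the builder method added above:

```java
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageOptions;

final class GrpcWriteOptInSketch {
  static GoogleCloudStorageOptions grpcWriteOptions() {
    return GoogleCloudStorageOptions.builder()
        .setGrpcWriteEnabled(true) // surfaced to Hadoop as fs.gs.grpc.write.enable
        .build();
  }
}
```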
diff --git a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageStatistics.java b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageStatistics.java
index 48d74d9fb1..9e234bd7bb 100644
--- a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageStatistics.java
+++ b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageStatistics.java
@@ -23,27 +23,79 @@
import com.google.common.collect.Maps;
import java.util.EnumSet;
-/** Statistics which are collected in GCS Connector */
+/** Statistics which are collected in GCS Connector. */
public enum GoogleCloudStorageStatistics {
-
- /** GCS connector specific statistics */
- GCS_REQUEST_COUNT(
- "gcs_total_request_count", "Counts the total number of gcs requests made", TYPE_COUNTER),
-
EXCEPTION_COUNT("exception_count", "Counts the number of exceptions encountered", TYPE_COUNTER),
- GCS_CLIENT_SIDE_ERROR_COUNT(
- "gcs_client_side_error_count",
+ /** Status code counters for the JSON API path. */
+ GCS_API_REQUEST_COUNT(
+ "gcs_api_total_request_count", "Counts the total number of gcs requests made", TYPE_COUNTER),
+
+ GCS_API_CLIENT_SIDE_ERROR_COUNT(
+ "gcs_api_client_side_error_count",
"Counts the occurrence of client side error status code",
TYPE_COUNTER),
- GCS_SERVER_SIDE_ERROR_COUNT(
- "gcs_server_side_error_count",
+ GCS_API_SERVER_SIDE_ERROR_COUNT(
+ "gcs_api_server_side_error_count",
"Counts the occurrence of server side error status code",
TYPE_COUNTER),
- GCS_CLIENT_RATE_LIMIT_COUNT(
- "gcs_client_rate_limit_error_count", "Counts the occurence of 429 status code", TYPE_COUNTER);
+ GCS_API_CLIENT_RATE_LIMIT_COUNT(
+ "gcs_api_client_rate_limit_error_count", "Counts the occurence of rate limit", TYPE_COUNTER),
+
+ GCS_API_CLIENT_BAD_REQUEST_COUNT(
+ "gcs_api_client_bad_request_count", "Counts the occurrence of 400 status code", TYPE_COUNTER),
+
+ GCS_API_CLIENT_UNAUTHORIZED_RESPONSE_COUNT(
+ "gcs_api_client_unauthorized_response_count",
+ "Counts the occurrence of 401 status code",
+ TYPE_COUNTER),
+
+ GCS_API_CLIENT_NOT_FOUND_RESPONSE_COUNT(
+ "gcs_api_client_not_found_response_count",
+ "Counts the occurrence of 404 status code",
+ TYPE_COUNTER),
+
+ GCS_API_CLIENT_REQUEST_TIMEOUT_COUNT(
+ "gcs_api_client_request_timeout_count",
+ "Counts the occurrence of 408 status code",
+ TYPE_COUNTER),
+
+ GCS_API_CLIENT_GONE_RESPONSE_COUNT(
+ "gcs_api_client_gone_response_count",
+ "Counts the occurrence of 410 status code",
+ TYPE_COUNTER),
+
+ GCS_API_CLIENT_PRECONDITION_FAILED_RESPONSE_COUNT(
+ "gcs_api_client_precondition_failed_response_count",
+ "Counts the occurrence of 412 status code",
+ TYPE_COUNTER),
+
+ GCS_API_CLIENT_REQUESTED_RANGE_NOT_SATISFIABLE_COUNT(
+ "gcs_api_client_requested_range_not_satisfiable_count",
+ "Counts the occurrence of 416 status code",
+ TYPE_COUNTER),
+
+ GCS_API_SERVER_INTERNAL_ERROR_COUNT(
+ "gcs_api_server_internal_error_count",
+ "Counts the occurrence of server side 500 error status code",
+ TYPE_COUNTER),
+
+ GCS_API_SERVER_BAD_GATEWAY_COUNT(
+ "gcs_api_server_bad_gateway_count",
+ "Counts the occurrence of server side 502 error status code",
+ TYPE_COUNTER),
+
+ GCS_API_SERVER_SERVICE_UNAVAILABLE_COUNT(
+ "gcs_api_server_unavailable_count",
+ "Counts the occurrence of server side 503 error status code",
+ TYPE_COUNTER),
+
+ GCS_API_SERVER_TIMEOUT_COUNT(
+ "gcs_api_server_timeout_count",
+ "Counts the occurrence of server side 504 error status code",
+ TYPE_COUNTER);
public static final ImmutableSet<GoogleCloudStorageStatistics> VALUES =
ImmutableSet.copyOf(EnumSet.allOf(GoogleCloudStorageStatistics.class));
diff --git a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/StatisticTypeEnum.java b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/StatisticTypeEnum.java
index e5a9ca1f3c..2eed3a4faf 100644
--- a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/StatisticTypeEnum.java
+++ b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/StatisticTypeEnum.java
@@ -26,5 +26,10 @@ public enum StatisticTypeEnum {
TYPE_DURATION,
/** Gauge. */
- TYPE_GAUGE
+ TYPE_GAUGE,
+
+ /*
+ * Duration. Stores everything stored by TYPE_DURATION, plus the total time taken.
+ * This avoids storing and recomputing the total duration of an operation on every update;
+ * instead the total can be derived at the time the metric is queried, by multiplying the mean
+ * by the count.
+ */
+ TYPE_DURATION_TOTAL
}
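
A quick illustration of the query-time derivation described in the comment above, with hypothetical numbers:

```java
final class TotalDurationSketch {
  public static void main(String[] args) {
    long count = 50;          // hypothetical: recorded operation count
    double meanMillis = 12.0; // hypothetical: mean duration per operation
    // TYPE_DURATION_TOTAL avoids accumulating this sum on every update;
    // the total is derived only when the metric is queried:
    double totalMillis = meanMillis * count;
    System.out.println(totalMillis); // 600.0
  }
}
```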
diff --git a/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/DeleteFolderOperationTest.java b/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/DeleteFolderOperationTest.java
new file mode 100644
index 0000000000..dee9bc0677
--- /dev/null
+++ b/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/DeleteFolderOperationTest.java
@@ -0,0 +1,148 @@
+/*
+ * Copyright 2024 Google Inc. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.hadoop.gcsio;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import com.google.common.base.Strings;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.storage.control.v2.StorageControlClient;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.Random;
+import org.junit.Test;
+
+public class DeleteFolderOperationTest {
+
+ private static final String BUCKET_NAME = "foo-bucket";
+
+ @Test
+ public void checkDeletionOrderForHnBucketBalancedFolders() throws InterruptedException {
+ String folderString = "test-folder-start/";
+ List<FolderInfo> foldersToDelete = new LinkedList<>();
+
+ addFolders(foldersToDelete, folderString);
+ CustomDeleteFolderOperationTest deleteFolderOperation =
+ new CustomDeleteFolderOperationTest(
+ foldersToDelete, GoogleCloudStorageOptions.DEFAULT, null);
+
+ List<FolderInfo> orderOfDeletion = deleteFolderOperation.getOrderOfDeletion();
+ deleteFolderOperation.performDeleteOperation();
+ assertThat(orderOfDeletion.size()).isEqualTo(foldersToDelete.size());
+
+ // Map to store the index at which a folder was deleted
+ HashMap<String, Integer> deletionOrder = new HashMap<>();
+ for (int i = 0; i < orderOfDeletion.size(); i++) {
+ deletionOrder.put(orderOfDeletion.get(i).getFolderName(), i);
+ }
+
+ for (int i = 0; i < orderOfDeletion.size(); i++) {
+ FolderInfo curFolder = orderOfDeletion.get(i);
+ String curFolderName = curFolder.getFolderName();
+ String parentFolderName = curFolder.getParentFolderName();
+
+ if (!Strings.isNullOrEmpty(parentFolderName)) {
+ assertThat(deletionOrder.get(parentFolderName) > deletionOrder.get(curFolderName)).isTrue();
+ }
+ }
+ }
+
+ @Test
+ public void checkDeletionOrderForHnBucketSkewedFolders() throws InterruptedException {
+ String folderString = "test-folder-start/";
+ List<FolderInfo> foldersToDelete = new LinkedList<>();
+
+ for (int i = 0; i < 10; i++) {
+ foldersToDelete.add(
+ new FolderInfo(FolderInfo.createFolderInfoObject(BUCKET_NAME, folderString)));
+ folderString += ("test-folder-" + i + "/");
+ }
+
+ CustomDeleteFolderOperationTest deleteFolderOperation =
+ new CustomDeleteFolderOperationTest(
+ foldersToDelete, GoogleCloudStorageOptions.DEFAULT, null);
+
+ deleteFolderOperation.performDeleteOperation();
+ List<FolderInfo> orderOfDeletion = deleteFolderOperation.getOrderOfDeletion();
+ assertThat(orderOfDeletion.size()).isEqualTo(foldersToDelete.size());
+ for (int i = 1; i < orderOfDeletion.size(); i++) {
+ FolderInfo prev = orderOfDeletion.get(i - 1);
+ FolderInfo cur = orderOfDeletion.get(i);
+ assertThat(prev.getParentFolderName()).isEqualTo(cur.getFolderName());
+ }
+ }
+
+ private void addFolders(List<FolderInfo> foldersToDelete, String curFolderName) {
+ Random r = new Random();
+ Queue<String> q = new ArrayDeque<>();
+ q.add(curFolderName);
+
+ while (!q.isEmpty()) {
+ String top = q.poll();
+ foldersToDelete.add(new FolderInfo(FolderInfo.createFolderInfoObject(BUCKET_NAME, top)));
+ if (foldersToDelete.size() > 2000) return;
+
+ for (int i = 0; i < 3; i++) {
+ long nextFolderName = r.nextInt(100000);
+ q.add(top + nextFolderName + "/");
+ }
+ }
+ }
+
+ /** Custom DeleteFolderOperation class to store order of folder deletion */
+ private class CustomDeleteFolderOperationTest extends DeleteFolderOperation {
+
+ /* Stores the order of deletion of folder resources*/
+ private List<FolderInfo> orderOfDeletion;
+
+ CustomDeleteFolderOperationTest(
+ List<FolderInfo> folders,
+ GoogleCloudStorageOptions storageOptions,
+ StorageControlClient storageControlClient) {
+ super(folders, storageOptions, storageControlClient);
+ this.orderOfDeletion = new ArrayList<>(folders.size());
+ }
+
+ public List<FolderInfo> getOrderOfDeletion() {
+ return orderOfDeletion;
+ }
+
+ public void queueSingleFolderDelete(final FolderInfo folder, final int attempt) {
+ addToToBatchExecutorQueue(() -> null, getDeletionCallback(folder));
+ }
+
+ private synchronized void addToOrderOfDeletion(FolderInfo folderDeleted) {
+ orderOfDeletion.add(folderDeleted);
+ }
+
+ protected FutureCallback<Void> getDeletionCallback(final FolderInfo resourceId) {
+ return new FutureCallback<Void>() {
+ @Override
+ public synchronized void onSuccess(Void result) {
+ addToOrderOfDeletion(resourceId);
+ successfullDeletionOfFolderResource(resourceId);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ // do nothing
+ }
+ };
+ }
+ }
+}
diff --git a/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/integration/GoogleCloudStorageClientImplIntegrationTest.java b/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/integration/GoogleCloudStorageClientImplIntegrationTest.java
new file mode 100644
index 0000000000..85c646a780
--- /dev/null
+++ b/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/integration/GoogleCloudStorageClientImplIntegrationTest.java
@@ -0,0 +1,482 @@
+/*
+ * Copyright 2023 Google Inc. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.hadoop.gcsio.integration;
+
+import static com.google.cloud.hadoop.gcsio.integration.GoogleCloudStorageTestHelper.assertObjectContent;
+import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.assertThrows;
+
+import com.google.api.client.http.HttpStatusCodes;
+import com.google.auth.Credentials;
+import com.google.cloud.hadoop.gcsio.CreateObjectOptions;
+import com.google.cloud.hadoop.gcsio.GoogleCloudStorage;
+import com.google.cloud.hadoop.gcsio.GoogleCloudStorageClientImpl;
+import com.google.cloud.hadoop.gcsio.GoogleCloudStorageItemInfo;
+import com.google.cloud.hadoop.gcsio.GoogleCloudStorageOptions;
+import com.google.cloud.hadoop.gcsio.ListObjectOptions;
+import com.google.cloud.hadoop.gcsio.StorageResourceId;
+import com.google.cloud.hadoop.gcsio.integration.GoogleCloudStorageTestHelper.TestBucketHelper;
+import com.google.cloud.hadoop.util.AsyncWriteChannelOptions;
+import com.google.cloud.hadoop.util.AsyncWriteChannelOptions.PartFileCleanupType;
+import com.google.cloud.hadoop.util.AsyncWriteChannelOptions.UploadType;
+import com.google.cloud.storage.StorageException;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.io.Files;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+/**
+ * Tests that are very specific to the Java-storage client AND will not make sense for the Apiary
+ * client. Any generic test which is agnostic of client type should reside in
+ * GoogleCloudStorageImplTest.
+ */
+public class GoogleCloudStorageClientImplIntegrationTest {
+
+ private static final TestBucketHelper BUCKET_HELPER =
+ new TestBucketHelper("dataproc-gcs-client-impl");
+ private static final String TEST_BUCKET = BUCKET_HELPER.getUniqueBucketPrefix();
+ private static final String TEMP_DIR_PATH = Files.createTempDir().getAbsolutePath();
+
+ // Clean up the path after every test.
+ private static final String GCS_WRITE_TMP_DIR =
+ String.format("%s/%s", TEMP_DIR_PATH, "gcs-write-dir");
+ private static final String GCS_WRITE_TMP_DIR_1 =
+ String.format("%s/%s", TEMP_DIR_PATH, "gcs-write-dir-1");
+
+ private static final int ONE_MiB = 1024 * 1024;
+
+ private static GoogleCloudStorage helperGcs;
+
+ private static final int partFileCount = 2;
+ private static final int bufferCapacity = partFileCount * ONE_MiB;
+
+ private static final AsyncWriteChannelOptions pcuDefaultOptions =
+ AsyncWriteChannelOptions.builder()
+ .setUploadType(UploadType.PARALLEL_COMPOSITE_UPLOAD)
+ .setPartFileCleanupType(PartFileCleanupType.ALWAYS)
+ .setPCUBufferCount(partFileCount)
+ .setPCUBufferCapacity(bufferCapacity)
+ .build();
+
+ private static ImmutableSet<String> tempDirs =
+ ImmutableSet.of(GCS_WRITE_TMP_DIR_1, GCS_WRITE_TMP_DIR);
+ private static ImmutableSet<Path> tempDirsPath =
+ tempDirs.stream().map(x -> Paths.get(x)).collect(ImmutableSet.toImmutableSet());
+
+ @Rule public TestName name = new TestName();
+
+ private GoogleCloudStorage gcs;
+
+ @BeforeClass
+ public static void before() throws IOException {
+ helperGcs = GoogleCloudStorageTestHelper.createGcsClientImpl();
+ helperGcs.createBucket(TEST_BUCKET);
+ }
+
+ @AfterClass
+ public static void after() throws IOException {
+ try {
+ BUCKET_HELPER.cleanup(helperGcs);
+ } finally {
+ helperGcs.close();
+ }
+ }
+
+ @Before
+ public void setUp() {
+ System.setProperty("java.io.tmpdir", GCS_WRITE_TMP_DIR);
+ }
+
+ @After
+ public void cleanUp() {
+ // clean up any leaked files
+ ImmutableSet<String> tempDirs = ImmutableSet.of(GCS_WRITE_TMP_DIR_1, GCS_WRITE_TMP_DIR);
+ Iterator<String> iterator = tempDirs.stream().iterator();
+ while (iterator.hasNext()) {
+ String filePath = iterator.next();
+ File directory = new File(filePath);
+ if (directory.listFiles() != null) {
+ for (File file : new File(filePath).listFiles()) {
+ file.delete();
+ }
+ }
+ }
+
+ // close cloudStorage to free up resources
+ if (gcs != null) {
+ gcs.close();
+ }
+ }
+
+ @Test
+ public void writeToDiskDisabled() throws IOException {
+ GoogleCloudStorageOptions storageOptions =
+ GoogleCloudStorageTestHelper.getStandardOptionBuilder()
+ .setWriteChannelOptions(
+ AsyncWriteChannelOptions.builder().setUploadType(UploadType.CHUNK_UPLOAD).build())
+ .build();
+
+ gcs = getGCSImpl(storageOptions);
+ StorageResourceId resourceId = new StorageResourceId(TEST_BUCKET, name.getMethodName());
+
+ // validate that no temporary files were created
+ writeAndVerifyTemporaryFiles(resourceId, /* expectedPartFileCountAfterCleanup */ 0);
+ }
+
+ @Test
+ public void writeToDefaultPathThenUploadEnabled() throws IOException {
+ GoogleCloudStorageOptions storageOptions =
+ GoogleCloudStorageTestHelper.getStandardOptionBuilder()
+ .setWriteChannelOptions(
+ AsyncWriteChannelOptions.builder()
+ .setUploadType(UploadType.WRITE_TO_DISK_THEN_UPLOAD)
+ .build())
+ .build();
+
+ gcs = getGCSImpl(storageOptions);
+ StorageResourceId resourceId = new StorageResourceId(TEST_BUCKET, name.getMethodName());
+
+ writeAndVerifyTemporaryFiles(resourceId, /* expectedPartFileCountAfterCleanup */ 1);
+ }
+
+ @Test
+ public void writeToPathThenUploadEnabled() throws IOException {
+
+ GoogleCloudStorageOptions storageOptions =
+ GoogleCloudStorageTestHelper.getStandardOptionBuilder()
+ .setWriteChannelOptions(
+ AsyncWriteChannelOptions.builder()
+ .setUploadType(UploadType.WRITE_TO_DISK_THEN_UPLOAD)
+ .setTemporaryPaths(tempDirs)
+ .build())
+ .build();
+
+ gcs = getGCSImpl(storageOptions);
+ StorageResourceId resourceId = new StorageResourceId(TEST_BUCKET, name.getMethodName());
+
+ writeAndVerifyTemporaryFiles(resourceId, /* expectedPartFileCountAfterCleanup */ 1);
+ }
+
+ @Test
+ public void uploadViaJournalingThrowsIfTempDirNotProvided() {
+
+ GoogleCloudStorageOptions storageOptions =
+ GoogleCloudStorageTestHelper.getStandardOptionBuilder()
+ .setWriteChannelOptions(
+ AsyncWriteChannelOptions.builder().setUploadType(UploadType.JOURNALING).build())
+ .build();
+
+ assertThrows(IllegalArgumentException.class, () -> getGCSImpl(storageOptions));
+ }
+
+ @Test
+ public void uploadViaJournaling() throws IOException {
+ GoogleCloudStorageOptions storageOptions =
+ GoogleCloudStorageTestHelper.getStandardOptionBuilder()
+ .setWriteChannelOptions(
+ AsyncWriteChannelOptions.builder()
+ .setTemporaryPaths(tempDirs)
+ .setUploadType(UploadType.JOURNALING)
+ .build())
+ .build();
+
+ gcs = getGCSImpl(storageOptions);
+ StorageResourceId resourceId = new StorageResourceId(TEST_BUCKET, name.getMethodName());
+
+ writeAndVerifyTemporaryFiles(resourceId, 1);
+ }
+
+ @Test
+ public void uploadViaPCUVerifyPartFileCleanup() throws IOException, InterruptedException {
+ String partFilePrefix = name.getMethodName();
+ GoogleCloudStorageOptions storageOptions =
+ GoogleCloudStorageTestHelper.getStandardOptionBuilder()
+ .setWriteChannelOptions(
+ pcuDefaultOptions.toBuilder().setPartFileNamePrefix(partFilePrefix).build())
+ .build();
+
+ gcs = getGCSImpl(storageOptions);
+ StorageResourceId resourceId = new StorageResourceId(TEST_BUCKET, name.getMethodName());
+
+ writeAndVerifyPartFiles(
+ bufferCapacity, resourceId, /* expectedPartFileCountAfterCleanup */ 0, partFilePrefix);
+ }
+
+ @Test
+ public void uploadViaPCUVerifyPartFileNotCleanedUp() throws IOException {
+ String partFilePrefix = name.getMethodName();
+ GoogleCloudStorageOptions storageOptions =
+ GoogleCloudStorageTestHelper.getStandardOptionBuilder()
+ .setWriteChannelOptions(
+ pcuDefaultOptions.toBuilder()
+ .setPartFileNamePrefix(partFilePrefix)
+ .setPartFileCleanupType(PartFileCleanupType.NEVER)
+ .build())
+ .build();
+
+ gcs = getGCSImpl(storageOptions);
+ StorageResourceId resourceId = new StorageResourceId(TEST_BUCKET, name.getMethodName());
+ // part file not cleaned up because PartFileCleanupType.NEVER is used.
+ writeAndVerifyPartFiles(bufferCapacity, resourceId, partFileCount, partFilePrefix);
+ }
+
+ @Test
+ public void uploadViaPCUComposeFileMissingFailure() throws IOException {
+ String partFilePrefix = name.getMethodName();
+ GoogleCloudStorageOptions storageOptions =
+ GoogleCloudStorageTestHelper.getStandardOptionBuilder()
+ .setWriteChannelOptions(
+ pcuDefaultOptions.toBuilder().setPartFileNamePrefix(partFilePrefix).build())
+ .build();
+
+ gcs = getGCSImpl(storageOptions);
+ StorageResourceId resourceId = new StorageResourceId(TEST_BUCKET, name.getMethodName());
+
+ byte[] bytesToWrite = new byte[partFileCount * bufferCapacity];
+ GoogleCloudStorageTestHelper.fillBytes(bytesToWrite);
+ WritableByteChannel writeChannel = gcs.create(resourceId);
+ writeChannel.write(ByteBuffer.wrap(bytesToWrite));
+
+ List<GoogleCloudStorageItemInfo> partFiles = getPartFiles(partFilePrefix);
+
+ // delete one part file
+ StorageResourceId partFileToBeDeleted = partFiles.get(0).getResourceId();
+ gcs.deleteObjects(ImmutableList.of(partFileToBeDeleted));
+
+ Exception e = assertThrows(IOException.class, writeChannel::close);
+ verifyPartFileNotFound(e, partFileToBeDeleted.getObjectName());
+
+ partFiles = getPartFiles(partFilePrefix);
+ // Part files were cleaned up even after the failure because the default cleanup type is ALWAYS.
+ assertThat(partFiles.size()).isEqualTo(0);
+ }
+
+ @Test
+ public void uploadViaPCUComposeMissingObjectVersion() throws IOException {
+ String partFilePrefix = name.getMethodName();
+ GoogleCloudStorageOptions storageOptions =
+ GoogleCloudStorageTestHelper.getStandardOptionBuilder()
+ .setWriteChannelOptions(
+ pcuDefaultOptions.toBuilder()
+ .setPartFileNamePrefix(partFilePrefix)
+ .setPartFileCleanupType(PartFileCleanupType.ON_SUCCESS)
+ .build())
+ .build();
+
+ gcs = getGCSImpl(storageOptions);
+ StorageResourceId resourceId = new StorageResourceId(TEST_BUCKET, name.getMethodName());
+
+ byte[] bytesToWrite = new byte[partFileCount * bufferCapacity];
+ GoogleCloudStorageTestHelper.fillBytes(bytesToWrite);
+ WritableByteChannel writeChannel = gcs.create(resourceId);
+ writeChannel.write(ByteBuffer.wrap(bytesToWrite));
+
+ List<GoogleCloudStorageItemInfo> partFiles = getPartFiles(partFilePrefix);
+ // Take one part file and overwrite its content, which creates a new generation.
+ GoogleCloudStorageItemInfo itemInfoBeforeModification = partFiles.get(0);
+ gcs.create(itemInfoBeforeModification.getResourceId(), CreateObjectOptions.DEFAULT_OVERWRITE)
+ .close();
+
+ GoogleCloudStorageItemInfo itemInfoAfterModification =
+ gcs.getItemInfo(itemInfoBeforeModification.getResourceId());
+ List<GoogleCloudStorageItemInfo> updatedFiles = getPartFiles(partFilePrefix);
+ // An object with the same name is present, but its content generation is different.
+ assertThat(
+ updatedFiles.stream()
+ .anyMatch(
+ itemInfo ->
+ itemInfo.getObjectName().equals(itemInfoAfterModification.getObjectName())
+ && itemInfo.getContentGeneration()
+ != itemInfoBeforeModification.getContentGeneration()))
+ .isTrue();
+
+ Exception e = assertThrows(IOException.class, writeChannel::close);
+ verifyPartFileNotFound(e, itemInfoBeforeModification.getObjectName());
+ partFiles = getPartFiles(partFilePrefix);
+ // Part files were not cleaned up on failure because PartFileCleanupType.ON_SUCCESS was used.
+ assertThat(partFiles.size()).isEqualTo(partFileCount);
+ }
+
+ @Test
+ public void uploadViaPCUInvalidPartFileNamePrefix() throws IOException {
+ // "\n" is not a valid object name: https://cloud.google.com/storage/docs/objects#naming
+ String partFilePrefix = "\n";
+ GoogleCloudStorageOptions storageOptions =
+ GoogleCloudStorageTestHelper.getStandardOptionBuilder()
+ .setWriteChannelOptions(
+ pcuDefaultOptions.toBuilder()
+ .setPartFileNamePrefix(partFilePrefix)
+ .setPartFileCleanupType(PartFileCleanupType.NEVER)
+ .build())
+ .build();
+ gcs = getGCSImpl(storageOptions);
+ StorageResourceId resourceId = new StorageResourceId(TEST_BUCKET, name.getMethodName());
+ byte[] bytesToWrite = new byte[partFileCount * bufferCapacity];
+ GoogleCloudStorageTestHelper.fillBytes(bytesToWrite);
+ WritableByteChannel writeChannel = gcs.create(resourceId);
+ Exception e =
+ assertThrows(
+ StorageException.class, () -> writeChannel.write(ByteBuffer.wrap(bytesToWrite)));
+ verifyPartFileInvalidArgument(e);
+ }
+
+ @Test
+ public void uploadViaPCUPartFileCleanupOnSuccess() throws IOException, InterruptedException {
+ String partFilePrefix = name.getMethodName();
+ GoogleCloudStorageOptions storageOptions =
+ GoogleCloudStorageTestHelper.getStandardOptionBuilder()
+ .setWriteChannelOptions(
+ pcuDefaultOptions.toBuilder()
+ .setPartFileNamePrefix(partFilePrefix)
+ .setPartFileCleanupType(PartFileCleanupType.ON_SUCCESS)
+ .build())
+ .build();
+ gcs = getGCSImpl(storageOptions);
+ StorageResourceId resourceId = new StorageResourceId(TEST_BUCKET, name.getMethodName());
+ writeAndVerifyPartFiles(
+ bufferCapacity, resourceId, /* expectedPartFileCountAfterCleanup */ 0, partFilePrefix);
+ }
+
+ private void verifyPartFileNotFound(Throwable throwable, String partFileName) {
+ StorageException exception = getStorageException(throwable);
+ assertThat(exception.getMessage()).contains(partFileName);
+ assertThat(exception.getCode()).isEqualTo(HttpStatusCodes.STATUS_CODE_NOT_FOUND);
+ }
+
+ private void verifyPartFileInvalidArgument(Throwable throwable) {
+ StorageException exception = getStorageException(throwable);
+ assertThat(exception.getMessage()).contains("INVALID_ARGUMENT");
+ assertThat(exception.getCode()).isEqualTo(HttpStatusCodes.STATUS_CODE_BAD_REQUEST);
+ }
+
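+ /** Walks the cause chain and returns the first StorageException found, or null if none. */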
+ private StorageException getStorageException(Throwable throwable) {
+ Throwable cause = throwable;
+ while (cause != null) {
+ if (cause instanceof StorageException) {
+ return (StorageException) cause;
+ }
+ cause = cause.getCause();
+ }
+ return null;
+ }
+
+ private List<GoogleCloudStorageItemInfo> getPartFiles(String prefix) throws IOException {
+ // List all objects under the prefix, without a delimiter.
+ List<GoogleCloudStorageItemInfo> itemInfos =
+ gcs.listObjectInfo(
+ TEST_BUCKET, prefix, ListObjectOptions.builder().setDelimiter(null).build());
+ return itemInfos.stream()
+ .filter(x -> x.getObjectName().endsWith(".part"))
+ .collect(Collectors.toList());
+ }
+
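+ /** Writes partFileCount * bufferCapacity bytes, closes the channel, then verifies the remaining part-file count and the uploaded object's content. */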
+ private void writeAndVerifyPartFiles(
+ int bufferCapacity,
+ StorageResourceId resourceId,
+ int expectedPartFileCountAfterCleanup,
+ String partFilePrefix)
+ throws IOException {
+ byte[] bytesToWrite = new byte[partFileCount * bufferCapacity];
+ GoogleCloudStorageTestHelper.fillBytes(bytesToWrite);
+ WritableByteChannel writeChannel = gcs.create(resourceId);
+ writeChannel.write(ByteBuffer.wrap(bytesToWrite));
+
+ writeChannel.close();
+ List<GoogleCloudStorageItemInfo> partFiles = getPartFiles(partFilePrefix);
+ // Part files are cleaned up according to the configured cleanup type once the upload finishes.
+ assertThat(partFiles.size()).isEqualTo(expectedPartFileCountAfterCleanup);
+ // verify file content
+ verifyFileContent(resourceId, bytesToWrite);
+ }
+
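+ /** Verifies that temporary files appear on disk during the write and are removed after close. */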
+ private void writeAndVerifyTemporaryFiles(
+ StorageResourceId resourceId, int expectedTemporaryFileCount) throws IOException {
+ byte[] bytesToWrite = new byte[1024 * 1024 * 3];
+ GoogleCloudStorageTestHelper.fillBytes(bytesToWrite);
+
+ verifyTemporaryFileCount(tempDirsPath, 0);
+
+ WritableByteChannel writeChannel = gcs.create(resourceId);
+ writeChannel.write(ByteBuffer.wrap(bytesToWrite));
+ // temporary files created in disk.
+ verifyTemporaryFileCount(tempDirsPath, expectedTemporaryFileCount);
+
+ writeChannel.close();
+ // temporary files will be deleted from disk once upload is finished.
+ verifyTemporaryFileCount(tempDirsPath, 0);
+ }
+
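+ /** Builds a java-storage backed GoogleCloudStorage that runs part-file uploads on a direct executor. */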
+ private GoogleCloudStorage getGCSImpl(GoogleCloudStorageOptions storageOptions)
+ throws IOException {
+ Credentials credentials = GoogleCloudStorageTestHelper.getCredentials();
+ return GoogleCloudStorageClientImpl.builder()
+ .setOptions(storageOptions)
+ .setCredentials(credentials)
+ .setPCUExecutorService(MoreExecutors.newDirectExecutorService())
+ .build();
+ }
+
+ private void verifyTemporaryFileCount(ImmutableSet<Path> paths, int expectedCount) {
+ int fileCount = 0;
+ for (Path path : paths) {
+ fileCount += getFileCount(path.toFile());
+ }
+ assertThat(fileCount).isEqualTo(expectedCount);
+ }
+
+ private void verifyFileContent(StorageResourceId resourceId, byte[] bytesWritten)
+ throws IOException {
+ GoogleCloudStorageItemInfo fileInfo = gcs.getItemInfo(resourceId);
+ assertThat(fileInfo.exists()).isTrue();
+
+ assertObjectContent(gcs, resourceId, bytesWritten);
+ }
+
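+ /** Recursively counts the regular files under the given directory. */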
+ private int getFileCount(File file) {
+ File[] files = file.listFiles();
+ if (files == null) {
+ return 0;
+ }
+ int count = 0;
+ for (File f : files) {
+ if (f.isDirectory()) {
+ count += getFileCount(f);
+ } else {
+ count++;
+ }
+ }
+ return count;
+ }
+}
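The tests above exercise the new upload types end to end. As a reviewer's aid, here is a minimal sketch of how the same knobs compose outside the test harness; the bucket, object, prefix, project id, and the use of application-default credentials are placeholders, not part of this change:

```java
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorage;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageClientImpl;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageOptions;
import com.google.cloud.hadoop.gcsio.StorageResourceId;
import com.google.cloud.hadoop.util.AsyncWriteChannelOptions;
import com.google.cloud.hadoop.util.AsyncWriteChannelOptions.PartFileCleanupType;
import com.google.cloud.hadoop.util.AsyncWriteChannelOptions.UploadType;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;

public final class ParallelCompositeUploadSketch {
  public static void main(String[] args) throws Exception {
    // Route writes through the parallel composite upload strategy and keep
    // part files around only when the final compose step fails.
    AsyncWriteChannelOptions writeOptions =
        AsyncWriteChannelOptions.builder()
            .setUploadType(UploadType.PARALLEL_COMPOSITE_UPLOAD)
            .setPartFileCleanupType(PartFileCleanupType.ON_SUCCESS)
            .setPartFileNamePrefix("pcu-demo") // placeholder prefix
            .build();

    GoogleCloudStorageOptions storageOptions =
        GoogleCloudStorageOptions.builder()
            .setAppName("pcu-demo") // placeholder
            .setProjectId("my-project") // placeholder
            .setWriteChannelOptions(writeOptions)
            .build();

    GoogleCloudStorage gcs =
        GoogleCloudStorageClientImpl.builder()
            .setOptions(storageOptions)
            .setCredentials(GoogleCredentials.getApplicationDefault())
            .build();

    try (WritableByteChannel channel =
        gcs.create(new StorageResourceId("my-bucket", "pcu-object"))) {
      channel.write(ByteBuffer.wrap("hello pcu".getBytes(StandardCharsets.UTF_8)));
    }
    gcs.close();
  }
}
```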
diff --git a/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/integration/GoogleCloudStorageImplTest.java b/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/integration/GoogleCloudStorageImplTest.java
index 9ab68664ce..58b8ca8b9f 100644
--- a/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/integration/GoogleCloudStorageImplTest.java
+++ b/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/integration/GoogleCloudStorageImplTest.java
@@ -150,6 +150,58 @@ public void open_lazyInit_whenFastFailOnNotFound_isFalse() throws IOException {
trackingGcs.delegate.close();
}
+ /**
+ * Even when the java-storage client is in use, the write path can be short-circuited via
+ * {@link GoogleCloudStorageOptions} to use the HTTP implementation.
+ */
+ @Test
+ public void writeObject_withGrpcWriteDisabled() throws IOException {
+ StorageResourceId resourceId = new StorageResourceId(testBucket, name.getMethodName());
+
+ int uploadChunkSize = 2 * 1024 * 1024;
+ TrackingStorageWrapper trackingGcs =
+ newTrackingGoogleCloudStorage(
+ getOptionsWithUploadChunk(uploadChunkSize).toBuilder()
+ .setGrpcWriteEnabled(false)
+ .build());
+
+ int partitionsCount = 1;
+ byte[] partition =
+ writeObject(
+ trackingGcs.delegate,
+ resourceId,
+ /* partitionSize= */ uploadChunkSize,
+ partitionsCount);
+
+ assertObjectContent(helperGcs, resourceId, partition, partitionsCount);
+ assertThat(trackingGcs.requestsTracker.getAllRequestInvocationIds().size())
+ .isEqualTo(trackingGcs.requestsTracker.getAllRequests().size());
+
+ assertThat(trackingGcs.getAllRequestStrings())
+ .containsExactlyElementsIn(
+ ImmutableList.<String>builder()
+ .add(getRequestString(resourceId.getBucketName(), resourceId.getObjectName()))
+ .add(
+ TrackingHttpRequestInitializer.resumableUploadRequestString(
+ resourceId.getBucketName(),
+ resourceId.getObjectName(),
+ /* generationId= */ 1,
+ /* replaceGenerationId= */ true))
+ .addAll(
+ ImmutableList.of(
+ TrackingHttpRequestInitializer.resumableUploadChunkRequestString(
+ resourceId.getBucketName(),
+ resourceId.getObjectName(),
+ /* generationId= */ 2,
+ /* uploadId= */ 1)))
+ .build()
+ .toArray())
+ .inOrder();
+
+ assertThat(trackingGcs.grpcRequestInterceptor.getAllRequestStrings()).isEmpty();
+ trackingGcs.delegate.close();
+ }
+
@Test
public void open_withItemInfo() throws IOException {
int expectedSize = 5 * 1024 * 1024;
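The assertions above pin the exact JSON-API request sequence when gRPC writes are off. A compact sketch of the toggle itself follows; with the java-storage client selected, this single option decides whether writes ride gRPC or fall back to the JSON API (app name and project id are placeholders):

```java
// Sketch: flipping writes between gRPC and the HTTP/JSON path for the
// java-storage client.
GoogleCloudStorageOptions grpcWrites =
    GoogleCloudStorageOptions.builder()
        .setAppName("demo") // placeholder
        .setProjectId("my-project") // placeholder
        .setGrpcWriteEnabled(true) // writes go over gRPC
        .build();

// Same options, but writes are short-circuited to the HTTP implementation.
GoogleCloudStorageOptions jsonWrites =
    grpcWrites.toBuilder().setGrpcWriteEnabled(false).build();
```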
diff --git a/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/integration/GoogleCloudStorageTestHelper.java b/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/integration/GoogleCloudStorageTestHelper.java
index c06ada81af..e5296a05ed 100644
--- a/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/integration/GoogleCloudStorageTestHelper.java
+++ b/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/integration/GoogleCloudStorageTestHelper.java
@@ -112,6 +112,7 @@ public static GoogleCloudStorageOptions.Builder getStandardOptionBuilder() {
return GoogleCloudStorageOptions.builder()
.setAppName(GoogleCloudStorageTestHelper.APP_NAME)
.setDirectPathPreferred(TestConfiguration.getInstance().isDirectPathPreferred())
+ .setGrpcWriteEnabled(true)
.setProjectId(checkNotNull(TestConfiguration.getInstance().getProjectId()));
}
diff --git a/pom.xml b/pom.xml
index ae287fea8d..9c9fe3cc1e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -25,7 +25,7 @@
<groupId>com.google.cloud.bigdataoss</groupId>
<artifactId>bigdataoss-parent</artifactId>
- <version>3.0.1-SNAPSHOT</version>
+ <version>3.0.2-SNAPSHOT</version>
<packaging>pom</packaging>
<url>https://github.com/GoogleCloudDataproc/hadoop-connectors</url>
@@ -79,7 +79,7 @@
true
- 3.0.1-SNAPSHOT
+ 3.0.2-SNAPSHOT
2.1.1
2.0.0
diff --git a/util-hadoop/pom.xml b/util-hadoop/pom.xml
index 91b7048b09..7f351795e3 100644
--- a/util-hadoop/pom.xml
+++ b/util-hadoop/pom.xml
@@ -21,14 +21,14 @@
<groupId>com.google.cloud.bigdataoss</groupId>
<artifactId>bigdataoss-parent</artifactId>
- <version>3.0.1-SNAPSHOT</version>
+ <version>3.0.2-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
<artifactId>util-hadoop</artifactId>
<name>util-hadoop</name>
- <version>3.0.1-SNAPSHOT</version>
+ <version>3.0.2-SNAPSHOT</version>
diff --git a/util/pom.xml b/util/pom.xml
index b6eca61152..f8d3429a71 100644
--- a/util/pom.xml
+++ b/util/pom.xml
@@ -21,12 +21,12 @@
<groupId>com.google.cloud.bigdataoss</groupId>
<artifactId>bigdataoss-parent</artifactId>
- <version>3.0.1-SNAPSHOT</version>
+ <version>3.0.2-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
<artifactId>util</artifactId>
- <version>3.0.1-SNAPSHOT</version>
+ <version>3.0.2-SNAPSHOT</version>
diff --git a/util/src/main/java/com/google/cloud/hadoop/util/AsyncWriteChannelOptions.java b/util/src/main/java/com/google/cloud/hadoop/util/AsyncWriteChannelOptions.java
index 56ea2f061d..336346c2a1 100644
--- a/util/src/main/java/com/google/cloud/hadoop/util/AsyncWriteChannelOptions.java
+++ b/util/src/main/java/com/google/cloud/hadoop/util/AsyncWriteChannelOptions.java
@@ -20,6 +20,7 @@
import com.google.api.client.googleapis.media.MediaHttpUploader;
import com.google.auto.value.AutoValue;
+import com.google.common.collect.ImmutableSet;
import com.google.common.flogger.GoogleLogger;
import java.time.Duration;
@@ -35,9 +36,38 @@ public enum PipeType {
NIO_CHANNEL_PIPE,
}
+ /** Part file cleanup strategy for parallel composite upload. */
+ public enum PartFileCleanupType {
+ /* Part files are deleted even if the compose operation fails. */
+ ALWAYS,
+ /* Part files are never deleted; cleanup is left to the caller. */
+ NEVER,
+ /* Part files are deleted only if the upload succeeds. */
+ ON_SUCCESS
+ }
+
+ /**
+ * UploadType values are in parity with the upload configurations offered by the java-storage
+ * client. See:
+ * https://cloud.google.com/java/docs/reference/google-cloud-storage/latest/com.google.cloud.storage.BlobWriteSessionConfigs
+ */
+ public enum UploadType {
+ /* Uploads a chunk to gcs and waits for its acknowledgement before uploading the next chunk. */
+ CHUNK_UPLOAD,
+ /* Writes the whole file to disk and then uploads it. */
+ WRITE_TO_DISK_THEN_UPLOAD,
+ /* Writes chunks to a file on disk while uploading to gcs; on failure, the upload is retried from the data on disk. */
+ JOURNALING,
+ /* Writes are performed using the parallel composite upload strategy. */
+ PARALLEL_COMPOSITE_UPLOAD
+ }
+
+ // TODO: update these configs with better default values.
+ private static final int PARALLEL_COMPOSITE_UPLOAD_BUFFER_COUNT = 1;
+ private static final int PARALLEL_COMPOSITE_UPLOAD_BUFFER_CAPACITY = 32 * 1024 * 1024;
+
/** Upload chunk size granularity */
private static final int UPLOAD_CHUNK_SIZE_GRANULARITY = 8 * 1024 * 1024;
+ private static final String PART_FILE_PREFIX = "";
+
/** Default upload chunk size. */
private static final int DEFAULT_UPLOAD_CHUNK_SIZE =
Runtime.getRuntime().maxMemory() < 512 * 1024 * 1024
@@ -57,7 +87,13 @@ public static Builder builder() {
.setPipeBufferSize(1024 * 1024)
.setPipeType(PipeType.IO_STREAM_PIPE)
.setUploadCacheSize(0)
- .setUploadChunkSize(DEFAULT_UPLOAD_CHUNK_SIZE);
+ .setUploadChunkSize(DEFAULT_UPLOAD_CHUNK_SIZE)
+ .setUploadType(UploadType.CHUNK_UPLOAD)
+ .setTemporaryPaths(ImmutableSet.of())
+ .setPCUBufferCount(PARALLEL_COMPOSITE_UPLOAD_BUFFER_COUNT)
+ .setPCUBufferCapacity(PARALLEL_COMPOSITE_UPLOAD_BUFFER_CAPACITY)
+ .setPartFileCleanupType(PartFileCleanupType.ALWAYS)
+ .setPartFileNamePrefix(PART_FILE_PREFIX);
}
public abstract Builder toBuilder();
@@ -82,6 +118,18 @@ public static Builder builder() {
public abstract Duration getGrpcWriteMessageTimeout();
+ public abstract UploadType getUploadType();
+
+ public abstract PartFileCleanupType getPartFileCleanupType();
+
+ public abstract ImmutableSet<String> getTemporaryPaths();
+
+ public abstract int getPCUBufferCount();
+
+ public abstract int getPCUBufferCapacity();
+
+ public abstract String getPartFileNamePrefix();
+
/** Mutable builder for the GoogleCloudStorageWriteChannelOptions class. */
@AutoValue.Builder
public abstract static class Builder {
@@ -110,6 +158,18 @@ public abstract static class Builder {
public abstract Builder setGrpcWriteMessageTimeout(Duration grpcWriteMessageTimeout);
+ public abstract Builder setUploadType(UploadType uploadType);
+
+ public abstract Builder setPartFileCleanupType(PartFileCleanupType partFileCleanupType);
+
+ public abstract Builder setTemporaryPaths(ImmutableSet<String> temporaryPaths);
+
+ public abstract Builder setPCUBufferCount(int bufferCount);
+
+ public abstract Builder setPCUBufferCapacity(int bufferCapacity);
+
+ public abstract Builder setPartFileNamePrefix(String prefix);
+
abstract AsyncWriteChannelOptions autoBuild();
public AsyncWriteChannelOptions build() {
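A small sketch of the new options in use: JOURNALING stages chunks on local disk while uploading, so it needs a temporary path, and the integration test above shows that omitting one fails with IllegalArgumentException when the storage client is built. The directory value is a placeholder, and the element type of setTemporaryPaths is assumed to be String here:

```java
import com.google.cloud.hadoop.util.AsyncWriteChannelOptions;
import com.google.cloud.hadoop.util.AsyncWriteChannelOptions.UploadType;
import com.google.common.collect.ImmutableSet;

class WriteOptionsSketch {
  // Defaults resolve to CHUNK_UPLOAD with PartFileCleanupType.ALWAYS and no temporary paths.
  static final AsyncWriteChannelOptions DEFAULTS = AsyncWriteChannelOptions.builder().build();

  // JOURNALING requires at least one temporary path for the on-disk journal
  // ("/tmp/gcs-journal" is a placeholder directory).
  static final AsyncWriteChannelOptions JOURNALING =
      AsyncWriteChannelOptions.builder()
          .setUploadType(UploadType.JOURNALING)
          .setTemporaryPaths(ImmutableSet.of("/tmp/gcs-journal"))
          .build();
}
```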
diff --git a/util/src/main/java/com/google/cloud/hadoop/util/GoogleCloudStorageEventBus.java b/util/src/main/java/com/google/cloud/hadoop/util/GoogleCloudStorageEventBus.java
index 31dfb58052..973b0a7561 100644
--- a/util/src/main/java/com/google/cloud/hadoop/util/GoogleCloudStorageEventBus.java
+++ b/util/src/main/java/com/google/cloud/hadoop/util/GoogleCloudStorageEventBus.java
@@ -29,11 +29,6 @@ public static void postGcsJsonApiEvent(GcsJsonApiEvent gcsJsonApiEvent) {
eventBus.post(gcsJsonApiEvent);
}
- /** Translates increment of statistics from API calls into StatisticsType */
- public enum StatisticsType {
- DIRECTORIES_DELETED
- }
-
/** Hold the instance of the event bus here */
private static EventBus eventBus = new EventBus();
@@ -102,13 +97,6 @@ public static void postOnException() {
eventBus.post(exception);
}
- /**
- * Posting StatisticsType to invoke corresponding Subscriber method. Passing an Object as EventBus
- * has @ElementTypesAreNonnullByDefault annotation.
- */
- public static void postOnStatisticsType() {
- eventBus.post(StatisticsType.DIRECTORIES_DELETED);
- }
/**
* Posting grpc Status to invoke the corresponding Subscriber method.
*