Merge pull request #271 from mziccard/add-blob-write-options
Add BlobWriteOption to support MD5 and CRC32C checks on create/write
aozarov committed Oct 20, 2015
2 parents 2665ab5 + 354d047 commit 5595770
Showing 5 changed files with 227 additions and 36 deletions.
@@ -24,6 +24,7 @@
import com.google.common.collect.Lists;
import com.google.gcloud.spi.StorageRpc;
import com.google.gcloud.storage.Storage.BlobTargetOption;
import com.google.gcloud.storage.Storage.BlobWriteOption;
import com.google.gcloud.storage.Storage.CopyRequest;
import com.google.gcloud.storage.Storage.SignUrlOption;

@@ -269,12 +270,14 @@ public BlobReadChannel reader(BlobSourceOption... options) {
}

/**
* Returns a {@code BlobWriteChannel} object for writing to this blob.
* Returns a {@code BlobWriteChannel} object for writing to this blob. By default any MD5 and
* CRC32C values in the current blob are ignored unless verification against them is requested
* via the {@code BlobWriteOption.md5Match} and {@code BlobWriteOption.crc32cMatch} options.
*
* @param options target blob options
* @throws StorageException upon failure
*/
public BlobWriteChannel writer(BlobTargetOption... options) {
public BlobWriteChannel writer(BlobWriteOption... options) {
return storage.writer(info, options);
}
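A minimal usage sketch of the new overload (illustrative only, not part of this commit's diff). It assumes an existing Blob instance named blob whose info already carries a base64-encoded MD5 of the bytes about to be written, a byte array named bytes, and an enclosing method that may throw IOException:

  // Sketch: "blob" and "bytes" are assumed to exist in the caller's scope.
  try (BlobWriteChannel channel = blob.writer(Storage.BlobWriteOption.md5Match())) {
    // The MD5 stored in the blob's info is sent with the upload; the service fails the
    // write if the received bytes do not hash to that value.
    channel.write(ByteBuffer.wrap(bytes));
  }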

@@ -21,9 +21,11 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.gcloud.AuthCredentials.ServiceAccountAuthCredentials;
import com.google.gcloud.Service;
import com.google.gcloud.spi.StorageRpc;
import com.google.gcloud.spi.StorageRpc.Tuple;

import java.io.InputStream;
import java.io.Serializable;
@@ -33,6 +35,7 @@
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;

@@ -145,6 +148,105 @@ public static BlobTargetOption metagenerationMatch() {
public static BlobTargetOption metagenerationNotMatch() {
return new BlobTargetOption(StorageRpc.Option.IF_METAGENERATION_NOT_MATCH);
}

static Tuple<BlobInfo, BlobTargetOption[]> convert(BlobInfo info, BlobWriteOption... options) {
BlobInfo.Builder infoBuilder = info.toBuilder().crc32c(null).md5(null);
List<BlobTargetOption> targetOptions = Lists.newArrayListWithCapacity(options.length);
for (BlobWriteOption option : options) {
switch (option.option) {
case IF_CRC32C_MATCH:
infoBuilder.crc32c(info.crc32c());
break;
case IF_MD5_MATCH:
infoBuilder.md5(info.md5());
break;
default:
targetOptions.add(option.toTargetOption());
break;
}
}
return Tuple.of(infoBuilder.build(),
targetOptions.toArray(new BlobTargetOption[targetOptions.size()]));
}
}

class BlobWriteOption implements Serializable {

private static final long serialVersionUID = -3880421670966224580L;

private final Option option;
private final Object value;

enum Option {
PREDEFINED_ACL, IF_GENERATION_MATCH, IF_GENERATION_NOT_MATCH, IF_METAGENERATION_MATCH,
IF_METAGENERATION_NOT_MATCH, IF_MD5_MATCH, IF_CRC32C_MATCH;

StorageRpc.Option toRpcOption() {
return StorageRpc.Option.valueOf(this.name());
}
}

BlobTargetOption toTargetOption() {
return new BlobTargetOption(this.option.toRpcOption(), this.value);
}

private BlobWriteOption(Option option, Object value) {
this.option = option;
this.value = value;
}

private BlobWriteOption(Option option) {
this(option, null);
}

@Override
public int hashCode() {
return Objects.hash(option, value);
}

@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (!(obj instanceof BlobWriteOption)) {
return false;
}
final BlobWriteOption other = (BlobWriteOption) obj;
return this.option == other.option && Objects.equals(this.value, other.value);
}

public static BlobWriteOption predefinedAcl(PredefinedAcl acl) {
return new BlobWriteOption(Option.PREDEFINED_ACL, acl.entry());
}

public static BlobWriteOption doesNotExist() {
return new BlobWriteOption(Option.IF_GENERATION_MATCH, 0L);
}

public static BlobWriteOption generationMatch() {
return new BlobWriteOption(Option.IF_GENERATION_MATCH);
}

public static BlobWriteOption generationNotMatch() {
return new BlobWriteOption(Option.IF_GENERATION_NOT_MATCH);
}

public static BlobWriteOption metagenerationMatch() {
return new BlobWriteOption(Option.IF_METAGENERATION_MATCH);
}

public static BlobWriteOption metagenerationNotMatch() {
return new BlobWriteOption(Option.IF_METAGENERATION_NOT_MATCH);
}

public static BlobWriteOption md5Match() {
return new BlobWriteOption(Option.IF_MD5_MATCH, true);
}

public static BlobWriteOption crc32cMatch() {
return new BlobWriteOption(Option.IF_CRC32C_MATCH, true);
}
}
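An illustrative combination of these factory methods (a sketch, not part of the diff). It assumes a Storage instance named storage, an InputStream named content, and a String named crc32cOfContent holding the base64-encoded, big-endian CRC32C of the bytes that content will yield:

  // Sketch: create the object only if it does not already exist, and have the service
  // verify the client-computed CRC32C of the uploaded bytes.
  BlobInfo info = BlobInfo.builder("my-bucket", "my-object")
      .crc32c(crc32cOfContent)
      .build();
  storage.create(info, content,
      Storage.BlobWriteOption.doesNotExist(),
      Storage.BlobWriteOption.crc32cMatch());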

class BlobSourceOption extends Option {
@@ -510,21 +612,25 @@ public static Builder builder() {

/**
* Create a new blob. Direct upload is used to upload {@code content}. For large content,
* {@link #writer} is recommended as it uses resumable upload.
* {@link #writer} is recommended as it uses resumable upload. MD5 and CRC32C hashes of
* {@code content} are computed and used for validating transferred data.
*
* @return the created blob's complete information.
* @throws StorageException upon failure
* @see <a href="https://cloud.google.com/storage/docs/hashes-etags">Hashes and ETags</a>
*/
BlobInfo create(BlobInfo blobInfo, byte[] content, BlobTargetOption... options);
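For example (a sketch, assuming a Storage instance named storage), the byte-array overload needs no checksum options because the hashes are computed for the caller:

  // Sketch: MD5 and CRC32C of the payload are computed by the implementation and
  // validated by the service on upload (see the StorageImpl changes in this commit).
  // StandardCharsets is java.nio.charset.StandardCharsets.
  BlobInfo created = storage.create(
      BlobInfo.builder("my-bucket", "hello.txt").contentType("text/plain").build(),
      "Hello, Storage!".getBytes(StandardCharsets.UTF_8));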

/**
* Create a new blob. Direct upload is used to upload {@code content}. For large content,
* {@link #writer} is recommended as it uses resumable upload.
* {@link #writer} is recommended as it uses resumable upload. By default any MD5 and CRC32C
* values in the given {@code blobInfo} are ignored unless verification against them is
* requested via the {@code BlobWriteOption.md5Match} and {@code BlobWriteOption.crc32cMatch}
* options.
*
* @return the created blob's complete information.
* @throws StorageException upon failure
*/
BlobInfo create(BlobInfo blobInfo, InputStream content, BlobTargetOption... options);
BlobInfo create(BlobInfo blobInfo, InputStream content, BlobWriteOption... options);
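To make the default explicit (a sketch, not part of the diff; storage, someMd5 and content are assumed to exist in the caller's scope):

  // Sketch: the info carries an MD5, but no md5Match option is passed, so the value is
  // dropped before upload and the service performs no MD5 check.
  BlobInfo info = BlobInfo.builder("my-bucket", "my-object").md5(someMd5).build();
  storage.create(info, content);
  // Contrast with:
  // storage.create(info, content, Storage.BlobWriteOption.md5Match());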

/**
* Return the requested bucket or {@code null} if not found.
@@ -679,11 +785,13 @@ public static Builder builder() {
BlobReadChannel reader(BlobId blob, BlobSourceOption... options);

/**
* Create a blob and return a channel for writing its content.
* Create a blob and return a channel for writing its content. By default any MD5 and CRC32C
* values in the given {@code blobInfo} are ignored unless verification against them is
* requested via the {@code BlobWriteOption.md5Match} and {@code BlobWriteOption.crc32cMatch}
* options.
*
* @throws StorageException upon failure
*/
BlobWriteChannel writer(BlobInfo blobInfo, BlobTargetOption... options);
BlobWriteChannel writer(BlobInfo blobInfo, BlobWriteOption... options);

/**
* Generates a signed URL for a blob.
@@ -40,6 +40,7 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.hash.Hashing;
import com.google.common.io.BaseEncoding;
import com.google.common.primitives.Ints;
import com.google.gcloud.AuthCredentials.ServiceAccountAuthCredentials;
@@ -93,13 +94,14 @@ public RetryResult beforeEval(Exception exception) {
static final ExceptionHandler EXCEPTION_HANDLER = ExceptionHandler.builder()
.abortOn(RuntimeException.class).interceptor(EXCEPTION_HANDLER_INTERCEPTOR).build();
private static final byte[] EMPTY_BYTE_ARRAY = {};
private static final String EMPTY_BYTE_ARRAY_MD5 = "1B2M2Y8AsgTpgAmY7PhCfg==";
private static final String EMPTY_BYTE_ARRAY_CRC32C = "AAAAAA==";

private final StorageRpc storageRpc;

StorageImpl(StorageOptions options) {
super(options);
storageRpc = options.storageRpc();
// todo: configure timeouts - https://developers.google.com/api-client-library/java/google-api-java-client/errors
// todo: provide rewrite - https://cloud.google.com/storage/docs/json_api/v1/objects/rewrite
// todo: check if we need to expose https://cloud.google.com/storage/docs/json_api/v1/bucketAccessControls/insert vs using bucket update/patch
}
@@ -123,20 +125,33 @@ public com.google.api.services.storage.model.Bucket call() {

@Override
public BlobInfo create(BlobInfo blobInfo, BlobTargetOption... options) {
return create(blobInfo, new ByteArrayInputStream(EMPTY_BYTE_ARRAY), options);
BlobInfo updatedInfo = blobInfo.toBuilder()
.md5(EMPTY_BYTE_ARRAY_MD5)
.crc32c(EMPTY_BYTE_ARRAY_CRC32C)
.build();
return create(updatedInfo, new ByteArrayInputStream(EMPTY_BYTE_ARRAY), options);
}

@Override
public BlobInfo create(BlobInfo blobInfo, final byte[] content, BlobTargetOption... options) {
return create(blobInfo,
new ByteArrayInputStream(firstNonNull(content, EMPTY_BYTE_ARRAY)), options);
public BlobInfo create(BlobInfo blobInfo, byte[] content, BlobTargetOption... options) {
content = firstNonNull(content, EMPTY_BYTE_ARRAY);
BlobInfo updatedInfo = blobInfo.toBuilder()
.md5(BaseEncoding.base64().encode(Hashing.md5().hashBytes(content).asBytes()))
.crc32c(BaseEncoding.base64().encode(
Ints.toByteArray(Hashing.crc32c().hashBytes(content).asInt())))
.build();
return create(updatedInfo, new ByteArrayInputStream(content), options);
}
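As an aside (illustration only), the two constants introduced above are exactly what this hashing code produces for zero bytes, which is why the no-content overload can hard-code them:

  // Sketch: recomputing the empty-content hashes; both values reproduce the constants above.
  String md5OfNothing = BaseEncoding.base64().encode(
      Hashing.md5().hashBytes(new byte[0]).asBytes());                     // "1B2M2Y8AsgTpgAmY7PhCfg=="
  String crc32cOfNothing = BaseEncoding.base64().encode(
      Ints.toByteArray(Hashing.crc32c().hashBytes(new byte[0]).asInt()));  // "AAAAAA=="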

@Override
public BlobInfo create(BlobInfo blobInfo, final InputStream content,
BlobTargetOption... options) {
final StorageObject blobPb = blobInfo.toPb();
final Map<StorageRpc.Option, ?> optionsMap = optionMap(blobInfo, options);
public BlobInfo create(BlobInfo blobInfo, InputStream content, BlobWriteOption... options) {
Tuple<BlobInfo, BlobTargetOption[]> targetOptions = BlobTargetOption.convert(blobInfo, options);
return create(targetOptions.x(), content, targetOptions.y());
}

private BlobInfo create(BlobInfo info, final InputStream content, BlobTargetOption... options) {
final StorageObject blobPb = info.toPb();
final Map<StorageRpc.Option, ?> optionsMap = optionMap(info, options);
try {
return BlobInfo.fromPb(runWithRetries(new Callable<StorageObject>() {
@Override
@@ -544,7 +559,12 @@ public BlobReadChannel reader(BlobId blob, BlobSourceOption... options) {
}

@Override
public BlobWriteChannel writer(BlobInfo blobInfo, BlobTargetOption... options) {
public BlobWriteChannel writer(BlobInfo blobInfo, BlobWriteOption... options) {
Tuple<BlobInfo, BlobTargetOption[]> targetOptions = BlobTargetOption.convert(blobInfo, options);
return writer(targetOptions.x(), targetOptions.y());
}

private BlobWriteChannel writer(BlobInfo blobInfo, BlobTargetOption... options) {
final Map<StorageRpc.Option, ?> optionsMap = optionMap(blobInfo, options);
return new BlobWriteChannelImpl(options(), blobInfo, optionsMap);
}
@@ -145,6 +145,22 @@ public void testCreateBlobFail() {
assertTrue(storage.delete(bucket, blobName));
}

@Test
public void testCreateBlobMd5Fail() throws UnsupportedEncodingException {
String blobName = "test-create-blob-md5-fail";
BlobInfo blob = BlobInfo.builder(bucket, blobName)
.contentType(CONTENT_TYPE)
.md5("O1R4G1HJSDUISJjoIYmVhQ==")
.build();
ByteArrayInputStream stream = new ByteArrayInputStream(BLOB_STRING_CONTENT.getBytes(UTF_8));
try {
storage.create(blob, stream, Storage.BlobWriteOption.md5Match());
fail("StorageException was expected");
} catch (StorageException ex) {
// expected
}
}
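A CRC32C counterpart would look much the same (a sketch, not part of the shown diff; the crc32c value below is deliberately wrong so the service rejects the upload):

  @Test
  public void testCreateBlobCrc32cFail() throws UnsupportedEncodingException {
    String blobName = "test-create-blob-crc32c-fail";
    BlobInfo blob = BlobInfo.builder(bucket, blobName)
        .contentType(CONTENT_TYPE)
        .crc32c("aaaaaa==")  // valid base64 for 4 bytes, but not the CRC32C of the content
        .build();
    ByteArrayInputStream stream = new ByteArrayInputStream(BLOB_STRING_CONTENT.getBytes(UTF_8));
    try {
      storage.create(blob, stream, Storage.BlobWriteOption.crc32cMatch());
      fail("StorageException was expected");
    } catch (StorageException ex) {
      // expected
    }
  }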

@Test
public void testUpdateBlob() {
String blobName = "test-update-blob";
@@ -449,7 +465,7 @@ public void testWriteChannelFail() throws UnsupportedEncodingException, IOExcept
BlobInfo blob = BlobInfo.builder(bucket, blobName).generation(-1L).build();
try {
try (BlobWriteChannel writer =
storage.writer(blob, Storage.BlobTargetOption.generationMatch())) {
storage.writer(blob, Storage.BlobWriteOption.generationMatch())) {
writer.write(ByteBuffer.allocate(42));
}
fail("StorageException was expected");
@@ -458,6 +474,20 @@ public void testWriteChannelFail() throws UnsupportedEncodingException, IOExcept
}
}

@Test
public void testWriteChannelExistingBlob() throws UnsupportedEncodingException, IOException {
String blobName = "test-write-channel-existing-blob";
BlobInfo blob = BlobInfo.builder(bucket, blobName).build();
BlobInfo remoteBlob = storage.create(blob);
byte[] stringBytes;
try (BlobWriteChannel writer = storage.writer(remoteBlob)) {
stringBytes = BLOB_STRING_CONTENT.getBytes(UTF_8);
writer.write(ByteBuffer.wrap(stringBytes));
}
assertArrayEquals(stringBytes, storage.readAllBytes(blob.blobId()));
assertTrue(storage.delete(bucket, blobName));
}

@Test
public void testGetSignedUrl() throws IOException {
String blobName = "test-get-signed-url-blob";
(The diff for the fifth changed file did not load and is not shown here.)
