Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add write*Blob option to replace existing blob #31729

Merged
merged 3 commits into from
Jul 3, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public InputStream readBlob(String name) throws IOException {
}

@Override
public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException {
public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
throw new UnsupportedOperationException("URL repository doesn't support this operation");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,15 +122,20 @@ private static PathTrie<RequestHandler> defaultHandlers(final Map<String, Contai
handlers.insert(path, (request) -> {
final String destContainerName = request.getParam("container");
final String destBlobName = objectName(request.getParameters());
final String ifNoneMatch = request.getHeader("If-None-Match");

final Container destContainer = containers.get(destContainerName);
if (destContainer == null) {
return newContainerNotFoundError(request.getId());
}

byte[] existingBytes = destContainer.objects.putIfAbsent(destBlobName, request.getBody());
if (existingBytes != null) {
return newBlobAlreadyExistsError(request.getId());
if ("*".equals(ifNoneMatch)) {
byte[] existingBytes = destContainer.objects.putIfAbsent(destBlobName, request.getBody());
if (existingBytes != null) {
return newBlobAlreadyExistsError(request.getId());
}
} else {
destContainer.objects.put(destBlobName, request.getBody());
}

return new Response(RestStatus.CREATED.getStatus(), TEXT_PLAIN_CONTENT_TYPE, EMPTY_BYTE); })
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,11 @@ public InputStream readBlob(String blobName) throws IOException {
}

@Override
public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException {
public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
logger.trace("writeBlob({}, stream, {})", buildKey(blobName), blobSize);

try {
blobStore.writeBlob(buildKey(blobName), inputStream, blobSize);
blobStore.writeBlob(buildKey(blobName), inputStream, blobSize, failIfAlreadyExists);
} catch (URISyntaxException|StorageException e) {
throw new IOException("Can not write blob " + blobName, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,8 @@ public Map<String, BlobMetaData> listBlobsByPrefix(String keyPath, String prefix
return service.listBlobsByPrefix(clientName, container, keyPath, prefix);
}

public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws URISyntaxException, StorageException,
FileAlreadyExistsException {
service.writeBlob(this.clientName, container, blobName, inputStream, blobSize);
public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists)
throws URISyntaxException, StorageException, FileAlreadyExistsException {
service.writeBlob(this.clientName, container, blobName, inputStream, blobSize, failIfAlreadyExists);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -236,17 +236,20 @@ public Map<String, BlobMetaData> listBlobsByPrefix(String account, String contai
return blobsBuilder.immutableMap();
}

public void writeBlob(String account, String container, String blobName, InputStream inputStream, long blobSize)
public void writeBlob(String account, String container, String blobName, InputStream inputStream, long blobSize,
boolean failIfAlreadyExists)
throws URISyntaxException, StorageException, FileAlreadyExistsException {
logger.trace(() -> new ParameterizedMessage("writeBlob({}, stream, {})", blobName, blobSize));
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account);
final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
final CloudBlockBlob blob = blobContainer.getBlockBlobReference(blobName);
try {
final AccessCondition accessCondition =
failIfAlreadyExists ? AccessCondition.generateIfNotExistsCondition() : AccessCondition.generateEmptyCondition();
SocketAccess.doPrivilegedVoidException(() ->
blob.upload(inputStream, blobSize, AccessCondition.generateIfNotExistsCondition(), null, client.v2().get()));
blob.upload(inputStream, blobSize, accessCondition, null, client.v2().get()));
} catch (final StorageException se) {
if (se.getHttpStatusCode() == HttpURLConnection.HTTP_CONFLICT &&
if (failIfAlreadyExists && se.getHttpStatusCode() == HttpURLConnection.HTTP_CONFLICT &&
StorageErrorCodeStrings.BLOB_ALREADY_EXISTS.equals(se.getErrorCode())) {
throw new FileAlreadyExistsException(blobName, null, se.getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,10 @@ public Map<String, BlobMetaData> listBlobsByPrefix(String account, String contai
}

@Override
public void writeBlob(String account, String container, String blobName, InputStream inputStream, long blobSize)
public void writeBlob(String account, String container, String blobName, InputStream inputStream, long blobSize,
boolean failIfAlreadyExists)
throws URISyntaxException, StorageException, FileAlreadyExistsException {
if (blobs.containsKey(blobName)) {
if (failIfAlreadyExists && blobs.containsKey(blobName)) {
throw new FileAlreadyExistsException(blobName);
}
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,6 @@ private static PathTrie<RequestHandler> defaultHandlers(final Map<String, Bucket
// https://cloud.google.com/storage/docs/json_api/v1/objects/insert
handlers.insert("POST /upload/storage/v1/b/{bucket}/o", (request) -> {
final String ifGenerationMatch = request.getParam("ifGenerationMatch");
if ("0".equals(ifGenerationMatch) == false) {
return newError(RestStatus.PRECONDITION_FAILED, "object already exist");
}

final String uploadType = request.getParam("uploadType");
if ("resumable".equals(uploadType)) {
final String objectName = request.getParam("name");
Expand All @@ -172,12 +168,19 @@ private static PathTrie<RequestHandler> defaultHandlers(final Map<String, Bucket
if (bucket == null) {
return newError(RestStatus.NOT_FOUND, "bucket not found");
}
if (bucket.objects.putIfAbsent(objectName, EMPTY_BYTE) == null) {
final String location = /*endpoint +*/ "/upload/storage/v1/b/" + bucket.name + "/o?uploadType=resumable&upload_id="
if ("0".equals(ifGenerationMatch)) {
if (bucket.objects.putIfAbsent(objectName, EMPTY_BYTE) == null) {
final String location = /*endpoint +*/ "/upload/storage/v1/b/" + bucket.name + "/o?uploadType=resumable&upload_id="
+ objectName;
return newResponse(RestStatus.CREATED, singletonMap("Location", location), jsonBuilder());
return newResponse(RestStatus.CREATED, singletonMap("Location", location), jsonBuilder());
} else {
return newError(RestStatus.PRECONDITION_FAILED, "object already exist");
}
} else {
return newError(RestStatus.CONFLICT, "object already exist");
bucket.objects.put(objectName, EMPTY_BYTE);
final String location = /*endpoint +*/ "/upload/storage/v1/b/" + bucket.name + "/o?uploadType=resumable&upload_id="
+ objectName;
return newResponse(RestStatus.CREATED, singletonMap("Location", location), jsonBuilder());
}
} else if ("multipart".equals(uploadType)) {
/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ public InputStream readBlob(String blobName) throws IOException {
}

@Override
public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException {
blobStore.writeBlob(buildKey(blobName), inputStream, blobSize);
public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
blobStore.writeBlob(buildKey(blobName), inputStream, blobSize, failIfAlreadyExists);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,31 +193,34 @@ public void close() throws IOException {

/**
* Writes a blob in the specific bucket
*
* @param inputStream content of the blob to be written
* @param inputStream content of the blob to be written
* @param blobSize expected size of the blob to be written
* @param failIfAlreadyExists whether to throw a FileAlreadyExistsException if the given blob already exists
*/
void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException {
void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
final BlobInfo blobInfo = BlobInfo.newBuilder(bucketName, blobName).build();
if (blobSize > LARGE_BLOB_THRESHOLD_BYTE_SIZE) {
writeBlobResumable(blobInfo, inputStream);
writeBlobResumable(blobInfo, inputStream, failIfAlreadyExists);
} else {
writeBlobMultipart(blobInfo, inputStream, blobSize);
writeBlobMultipart(blobInfo, inputStream, blobSize, failIfAlreadyExists);
}
}

/**
* Uploads a blob using the "resumable upload" method (multiple requests, which
* can be independently retried in case of failure, see
* https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload
*
* @param blobInfo the info for the blob to be uploaded
* @param inputStream the stream containing the blob data
* @param failIfAlreadyExists whether to throw a FileAlreadyExistsException if the given blob already exists
*/
private void writeBlobResumable(BlobInfo blobInfo, InputStream inputStream) throws IOException {
private void writeBlobResumable(BlobInfo blobInfo, InputStream inputStream, boolean failIfAlreadyExists) throws IOException {
try {
final Storage.BlobWriteOption[] writeOptions = failIfAlreadyExists ?
new Storage.BlobWriteOption[] { Storage.BlobWriteOption.doesNotExist() } :
new Storage.BlobWriteOption[0];
final WriteChannel writeChannel = SocketAccess
.doPrivilegedIOException(() -> client().writer(blobInfo, Storage.BlobWriteOption.doesNotExist()));
.doPrivilegedIOException(() -> client().writer(blobInfo, writeOptions));
Streams.copy(inputStream, Channels.newOutputStream(new WritableByteChannel() {
@Override
public boolean isOpen() {
Expand All @@ -236,7 +239,7 @@ public int write(ByteBuffer src) throws IOException {
}
}));
} catch (final StorageException se) {
if (se.getCode() == HTTP_PRECON_FAILED) {
if (failIfAlreadyExists && se.getCode() == HTTP_PRECON_FAILED) {
throw new FileAlreadyExistsException(blobInfo.getBlobId().getName(), null, se.getMessage());
}
throw se;
Expand All @@ -248,20 +251,24 @@ public int write(ByteBuffer src) throws IOException {
* 'multipart/related' request containing both data and metadata. The request is
* gziped), see:
* https://cloud.google.com/storage/docs/json_api/v1/how-tos/multipart-upload
*
* @param blobInfo the info for the blob to be uploaded
* @param blobInfo the info for the blob to be uploaded
* @param inputStream the stream containing the blob data
* @param blobSize the size
* @param failIfAlreadyExists whether to throw a FileAlreadyExistsException if the given blob already exists
*/
private void writeBlobMultipart(BlobInfo blobInfo, InputStream inputStream, long blobSize) throws IOException {
private void writeBlobMultipart(BlobInfo blobInfo, InputStream inputStream, long blobSize, boolean failIfAlreadyExists)
throws IOException {
assert blobSize <= LARGE_BLOB_THRESHOLD_BYTE_SIZE : "large blob uploads should use the resumable upload method";
final ByteArrayOutputStream baos = new ByteArrayOutputStream(Math.toIntExact(blobSize));
Streams.copy(inputStream, baos);
try {
final Storage.BlobTargetOption[] targetOptions = failIfAlreadyExists ?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a static method that returns the appropriate options depending on failIfAlreadyExists?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can do so, but that method cannot be shared between writeBlobResumable and writeBlobMultipart (as in one case these are of type BlobWriteOption and in the other case these are BlobTargetOption). Do you still think these should get a static method (just to have one caller)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I missed that they didn't have the same return types. Let's keep what you already did. Sory for the noise.

new Storage.BlobTargetOption[] { Storage.BlobTargetOption.doesNotExist() } :
new Storage.BlobTargetOption[0];
SocketAccess.doPrivilegedVoidIOException(
() -> client().create(blobInfo, baos.toByteArray(), Storage.BlobTargetOption.doesNotExist()));
() -> client().create(blobInfo, baos.toByteArray(), targetOptions));
} catch (final StorageException se) {
if (se.getCode() == HTTP_PRECON_FAILED) {
if (failIfAlreadyExists && se.getCode() == HTTP_PRECON_FAILED) {
throw new FileAlreadyExistsException(blobInfo.getBlobId().getName(), null, se.getMessage());
}
throw se;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,12 @@ public InputStream readBlob(String blobName) throws IOException {
}

@Override
public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException {
public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
store.execute((Operation<Void>) fileContext -> {
Path blob = new Path(path, blobName);
// we pass CREATE, which means it fails if a blob already exists.
EnumSet<CreateFlag> flags = EnumSet.of(CreateFlag.CREATE, CreateFlag.SYNC_BLOCK);
EnumSet<CreateFlag> flags = failIfAlreadyExists ? EnumSet.of(CreateFlag.CREATE, CreateFlag.SYNC_BLOCK) :
EnumSet.of(CreateFlag.SYNC_BLOCK);
CreateOpts[] opts = {CreateOpts.bufferSize(bufferSize)};
try (FSDataOutputStream stream = fileContext.create(blob, flags, opts)) {
int bytesRead;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public void testReadOnly() throws Exception {
assertTrue(util.exists(hdfsPath));

byte[] data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16)));
writeBlob(container, "foo", new BytesArray(data));
writeBlob(container, "foo", new BytesArray(data), randomBoolean());
assertArrayEquals(readBlobFully(container, "foo", data.length), data);
assertTrue(container.blobExists("foo"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public InputStream readBlob(String blobName) throws IOException {
}

@Override
public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException {
public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
SocketAccess.doPrivilegedIOException(() -> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

S3BlobContainer does not respect the BlobContainer API?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, but that's not a surprise? It can't respect it because of its weak consistency guarantees

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we should document this here

if (blobSize <= blobStore.bufferSizeInBytes()) {
executeSingleUpload(blobStore, buildKey(blobName), inputStream, blobSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,16 +69,18 @@ public interface BlobContainer {
* @param blobSize
* The size of the blob to be written, in bytes. It is implementation dependent whether
* this value is used in writing the blob to the repository.
* @throws FileAlreadyExistsException if a blob by the same name already exists
* @param failIfAlreadyExists
* whether to throw a FileAlreadyExistsException if the given blob already exists
* @throws FileAlreadyExistsException if failIfAlreadyExists is true and a blob by the same name already exists
* @throws IOException if the input stream could not be read, or the target blob could not be written to.
*/
void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException;
void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException;

/**
* Reads blob content from the input stream and writes it to the container in a new blob with the given name,
* using an atomic write operation if the implementation supports it. When the BlobContainer implementation
* does not provide a specific implementation of writeBlobAtomic(String, InputStream, long), then
* the {@link #writeBlob(String, InputStream, long)} method is used.
* the {@link #writeBlob(String, InputStream, long, boolean)} method is used.
*
* This method assumes the container does not already contain a blob of the same blobName. If a blob by the
* same name already exists, the operation will fail and an {@link IOException} will be thrown.
Expand All @@ -90,11 +92,14 @@ public interface BlobContainer {
* @param blobSize
* The size of the blob to be written, in bytes. It is implementation dependent whether
* this value is used in writing the blob to the repository.
* @throws FileAlreadyExistsException if a blob by the same name already exists
* @param failIfAlreadyExists
* whether to throw a FileAlreadyExistsException if the given blob already exists
* @throws FileAlreadyExistsException if failIfAlreadyExists is true and a blob by the same name already exists
* @throws IOException if the input stream could not be read, or the target blob could not be written to.
*/
default void writeBlobAtomic(final String blobName, final InputStream inputStream, final long blobSize) throws IOException {
writeBlob(blobName, inputStream, blobSize);
default void writeBlobAtomic(final String blobName, final InputStream inputStream, final long blobSize, boolean failIfAlreadyExists)
throws IOException {
writeBlob(blobName, inputStream, blobSize, failIfAlreadyExists);
}

/**
Expand Down
Loading