From 93aa3882ef36c0b86c3f6c81d34cf918c62b7c44 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Mon, 2 Jul 2018 14:21:54 +0200 Subject: [PATCH 1/3] Add write*Blob option for replacing existing blob --- .../blobstore/url/URLBlobContainer.java | 2 +- .../azure/AzureStorageFixture.java | 11 ++++-- .../azure/AzureBlobContainer.java | 4 +-- .../repositories/azure/AzureBlobStore.java | 6 ++-- .../azure/AzureStorageService.java | 9 +++-- .../azure/AzureStorageServiceMock.java | 5 +-- .../gcs/GoogleCloudStorageFixture.java | 19 +++++----- .../gcs/GoogleCloudStorageBlobContainer.java | 4 +-- .../gcs/GoogleCloudStorageBlobStore.java | 35 +++++++++++-------- .../repositories/hdfs/HdfsBlobContainer.java | 5 +-- .../hdfs/HdfsBlobStoreContainerTests.java | 2 +- .../repositories/s3/S3BlobContainer.java | 2 +- .../common/blobstore/BlobContainer.java | 17 +++++---- .../common/blobstore/fs/FsBlobContainer.java | 19 +++++++--- .../blobstore/BlobStoreRepository.java | 18 +++++----- .../blobstore/ChecksumBlobStoreFormat.java | 4 +-- .../snapshots/BlobStoreFormatIT.java | 6 ++-- .../mockstore/BlobContainerWrapper.java | 9 ++--- .../snapshots/mockstore/MockRepository.java | 15 ++++---- .../ESBlobStoreContainerTestCase.java | 22 +++++++----- .../repositories/ESBlobStoreTestCase.java | 2 +- 21 files changed, 128 insertions(+), 88 deletions(-) diff --git a/modules/repository-url/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobContainer.java b/modules/repository-url/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobContainer.java index fb20b73b61c00..7b72871f4f78d 100644 --- a/modules/repository-url/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobContainer.java +++ b/modules/repository-url/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobContainer.java @@ -108,7 +108,7 @@ public InputStream readBlob(String name) throws IOException { } @Override - public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException { + public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException { throw new UnsupportedOperationException("URL repository doesn't support this operation"); } diff --git a/plugins/repository-azure/qa/microsoft-azure-storage/src/test/java/org/elasticsearch/repositories/azure/AzureStorageFixture.java b/plugins/repository-azure/qa/microsoft-azure-storage/src/test/java/org/elasticsearch/repositories/azure/AzureStorageFixture.java index f906b9fa9a913..0bd9503f43dac 100644 --- a/plugins/repository-azure/qa/microsoft-azure-storage/src/test/java/org/elasticsearch/repositories/azure/AzureStorageFixture.java +++ b/plugins/repository-azure/qa/microsoft-azure-storage/src/test/java/org/elasticsearch/repositories/azure/AzureStorageFixture.java @@ -122,15 +122,20 @@ private static PathTrie defaultHandlers(final Map { final String destContainerName = request.getParam("container"); final String destBlobName = objectName(request.getParameters()); + final String ifNoneMatch = request.getHeader("If-None-Match"); final Container destContainer = containers.get(destContainerName); if (destContainer == null) { return newContainerNotFoundError(request.getId()); } - byte[] existingBytes = destContainer.objects.putIfAbsent(destBlobName, request.getBody()); - if (existingBytes != null) { - return newBlobAlreadyExistsError(request.getId()); + if ("*".equals(ifNoneMatch)) { + byte[] existingBytes = destContainer.objects.putIfAbsent(destBlobName, request.getBody()); + if (existingBytes != null) { + return newBlobAlreadyExistsError(request.getId()); + } + } else { + destContainer.objects.put(destBlobName, request.getBody()); } return new Response(RestStatus.CREATED.getStatus(), TEXT_PLAIN_CONTENT_TYPE, EMPTY_BYTE); }) diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java index 751e00f06adbb..5d5330e8cb563 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java @@ -86,11 +86,11 @@ public InputStream readBlob(String blobName) throws IOException { } @Override - public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException { + public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException { logger.trace("writeBlob({}, stream, {})", buildKey(blobName), blobSize); try { - blobStore.writeBlob(buildKey(blobName), inputStream, blobSize); + blobStore.writeBlob(buildKey(blobName), inputStream, blobSize, failIfAlreadyExists); } catch (URISyntaxException|StorageException e) { throw new IOException("Can not write blob " + blobName, e); } diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java index bcd6b936af1aa..f4bc362e53602 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java @@ -117,8 +117,8 @@ public Map listBlobsByPrefix(String keyPath, String prefix return service.listBlobsByPrefix(clientName, container, keyPath, prefix); } - public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws URISyntaxException, StorageException, - FileAlreadyExistsException { - service.writeBlob(this.clientName, container, blobName, inputStream, blobSize); + public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) + throws URISyntaxException, StorageException, FileAlreadyExistsException { + service.writeBlob(this.clientName, container, blobName, inputStream, blobSize, failIfAlreadyExists); } } diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java index 73dd68f4b5f57..9482182b02d28 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java @@ -236,17 +236,20 @@ public Map listBlobsByPrefix(String account, String contai return blobsBuilder.immutableMap(); } - public void writeBlob(String account, String container, String blobName, InputStream inputStream, long blobSize) + public void writeBlob(String account, String container, String blobName, InputStream inputStream, long blobSize, + boolean failIfAlreadyExists) throws URISyntaxException, StorageException, FileAlreadyExistsException { logger.trace(() -> new ParameterizedMessage("writeBlob({}, stream, {})", blobName, blobSize)); final Tuple> client = client(account); final CloudBlobContainer blobContainer = client.v1().getContainerReference(container); final CloudBlockBlob blob = blobContainer.getBlockBlobReference(blobName); try { + final AccessCondition accessCondition = + failIfAlreadyExists ? AccessCondition.generateIfNotExistsCondition() : AccessCondition.generateEmptyCondition(); SocketAccess.doPrivilegedVoidException(() -> - blob.upload(inputStream, blobSize, AccessCondition.generateIfNotExistsCondition(), null, client.v2().get())); + blob.upload(inputStream, blobSize, accessCondition, null, client.v2().get())); } catch (final StorageException se) { - if (se.getHttpStatusCode() == HttpURLConnection.HTTP_CONFLICT && + if (failIfAlreadyExists && se.getHttpStatusCode() == HttpURLConnection.HTTP_CONFLICT && StorageErrorCodeStrings.BLOB_ALREADY_EXISTS.equals(se.getErrorCode())) { throw new FileAlreadyExistsException(blobName, null, se.getMessage()); } diff --git a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureStorageServiceMock.java b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureStorageServiceMock.java index 264cb90378529..18eb529c0eebe 100644 --- a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureStorageServiceMock.java +++ b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureStorageServiceMock.java @@ -108,9 +108,10 @@ public Map listBlobsByPrefix(String account, String contai } @Override - public void writeBlob(String account, String container, String blobName, InputStream inputStream, long blobSize) + public void writeBlob(String account, String container, String blobName, InputStream inputStream, long blobSize, + boolean failIfAlreadyExists) throws URISyntaxException, StorageException, FileAlreadyExistsException { - if (blobs.containsKey(blobName)) { + if (failIfAlreadyExists && blobs.containsKey(blobName)) { throw new FileAlreadyExistsException(blobName); } try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) { diff --git a/plugins/repository-gcs/qa/google-cloud-storage/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageFixture.java b/plugins/repository-gcs/qa/google-cloud-storage/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageFixture.java index b1a185c9c08c9..b37b89b243ba7 100644 --- a/plugins/repository-gcs/qa/google-cloud-storage/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageFixture.java +++ b/plugins/repository-gcs/qa/google-cloud-storage/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageFixture.java @@ -158,10 +158,6 @@ private static PathTrie defaultHandlers(final Map { final String ifGenerationMatch = request.getParam("ifGenerationMatch"); - if ("0".equals(ifGenerationMatch) == false) { - return newError(RestStatus.PRECONDITION_FAILED, "object already exist"); - } - final String uploadType = request.getParam("uploadType"); if ("resumable".equals(uploadType)) { final String objectName = request.getParam("name"); @@ -172,12 +168,19 @@ private static PathTrie defaultHandlers(final Map LARGE_BLOB_THRESHOLD_BYTE_SIZE) { - writeBlobResumable(blobInfo, inputStream); + writeBlobResumable(blobInfo, inputStream, failIfAlreadyExists); } else { - writeBlobMultipart(blobInfo, inputStream, blobSize); + writeBlobMultipart(blobInfo, inputStream, blobSize, failIfAlreadyExists); } } @@ -210,14 +210,17 @@ void writeBlob(String blobName, InputStream inputStream, long blobSize) throws I * Uploads a blob using the "resumable upload" method (multiple requests, which * can be independently retried in case of failure, see * https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload - * * @param blobInfo the info for the blob to be uploaded * @param inputStream the stream containing the blob data + * @param failIfAlreadyExists whether to throw a FileAlreadyExistsException if the given blob already exists */ - private void writeBlobResumable(BlobInfo blobInfo, InputStream inputStream) throws IOException { + private void writeBlobResumable(BlobInfo blobInfo, InputStream inputStream, boolean failIfAlreadyExists) throws IOException { try { + final Storage.BlobWriteOption[] writeOptions = failIfAlreadyExists ? + new Storage.BlobWriteOption[] { Storage.BlobWriteOption.doesNotExist() } : + new Storage.BlobWriteOption[0]; final WriteChannel writeChannel = SocketAccess - .doPrivilegedIOException(() -> client().writer(blobInfo, Storage.BlobWriteOption.doesNotExist())); + .doPrivilegedIOException(() -> client().writer(blobInfo, writeOptions)); Streams.copy(inputStream, Channels.newOutputStream(new WritableByteChannel() { @Override public boolean isOpen() { @@ -236,7 +239,7 @@ public int write(ByteBuffer src) throws IOException { } })); } catch (final StorageException se) { - if (se.getCode() == HTTP_PRECON_FAILED) { + if (failIfAlreadyExists && se.getCode() == HTTP_PRECON_FAILED) { throw new FileAlreadyExistsException(blobInfo.getBlobId().getName(), null, se.getMessage()); } throw se; @@ -248,20 +251,24 @@ public int write(ByteBuffer src) throws IOException { * 'multipart/related' request containing both data and metadata. The request is * gziped), see: * https://cloud.google.com/storage/docs/json_api/v1/how-tos/multipart-upload - * - * @param blobInfo the info for the blob to be uploaded + * @param blobInfo the info for the blob to be uploaded * @param inputStream the stream containing the blob data * @param blobSize the size + * @param failIfAlreadyExists whether to throw a FileAlreadyExistsException if the given blob already exists */ - private void writeBlobMultipart(BlobInfo blobInfo, InputStream inputStream, long blobSize) throws IOException { + private void writeBlobMultipart(BlobInfo blobInfo, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) + throws IOException { assert blobSize <= LARGE_BLOB_THRESHOLD_BYTE_SIZE : "large blob uploads should use the resumable upload method"; final ByteArrayOutputStream baos = new ByteArrayOutputStream(Math.toIntExact(blobSize)); Streams.copy(inputStream, baos); try { + final Storage.BlobTargetOption[] targetOptions = failIfAlreadyExists ? + new Storage.BlobTargetOption[] { Storage.BlobTargetOption.doesNotExist() } : + new Storage.BlobTargetOption[0]; SocketAccess.doPrivilegedVoidIOException( - () -> client().create(blobInfo, baos.toByteArray(), Storage.BlobTargetOption.doesNotExist())); + () -> client().create(blobInfo, baos.toByteArray(), targetOptions)); } catch (final StorageException se) { - if (se.getCode() == HTTP_PRECON_FAILED) { + if (failIfAlreadyExists && se.getCode() == HTTP_PRECON_FAILED) { throw new FileAlreadyExistsException(blobInfo.getBlobId().getName(), null, se.getMessage()); } throw se; diff --git a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java index 23557ae6cf84a..af12e44a9f7b9 100644 --- a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java +++ b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java @@ -91,11 +91,12 @@ public InputStream readBlob(String blobName) throws IOException { } @Override - public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException { + public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException { store.execute((Operation) fileContext -> { Path blob = new Path(path, blobName); // we pass CREATE, which means it fails if a blob already exists. - EnumSet flags = EnumSet.of(CreateFlag.CREATE, CreateFlag.SYNC_BLOCK); + EnumSet flags = failIfAlreadyExists ? EnumSet.of(CreateFlag.CREATE, CreateFlag.SYNC_BLOCK) : + EnumSet.of(CreateFlag.SYNC_BLOCK); CreateOpts[] opts = {CreateOpts.bufferSize(bufferSize)}; try (FSDataOutputStream stream = fileContext.create(blob, flags, opts)) { int bytesRead; diff --git a/plugins/repository-hdfs/src/test/java/org/elasticsearch/repositories/hdfs/HdfsBlobStoreContainerTests.java b/plugins/repository-hdfs/src/test/java/org/elasticsearch/repositories/hdfs/HdfsBlobStoreContainerTests.java index a5d68331db78e..ba00862e93848 100644 --- a/plugins/repository-hdfs/src/test/java/org/elasticsearch/repositories/hdfs/HdfsBlobStoreContainerTests.java +++ b/plugins/repository-hdfs/src/test/java/org/elasticsearch/repositories/hdfs/HdfsBlobStoreContainerTests.java @@ -135,7 +135,7 @@ public void testReadOnly() throws Exception { assertTrue(util.exists(hdfsPath)); byte[] data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16))); - writeBlob(container, "foo", new BytesArray(data)); + writeBlob(container, "foo", new BytesArray(data), randomBoolean()); assertArrayEquals(readBlobFully(container, "foo", data.length), data); assertTrue(container.blobExists("foo")); } diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java index 86b01a3e79cdd..9d650b098a077 100644 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java +++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java @@ -91,7 +91,7 @@ public InputStream readBlob(String blobName) throws IOException { } @Override - public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException { + public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException { SocketAccess.doPrivilegedIOException(() -> { if (blobSize <= blobStore.bufferSizeInBytes()) { executeSingleUpload(blobStore, buildKey(blobName), inputStream, blobSize); diff --git a/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java b/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java index db185f1e8c11c..2ecce44b55c1e 100644 --- a/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java +++ b/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java @@ -69,16 +69,18 @@ public interface BlobContainer { * @param blobSize * The size of the blob to be written, in bytes. It is implementation dependent whether * this value is used in writing the blob to the repository. - * @throws FileAlreadyExistsException if a blob by the same name already exists + * @param failIfAlreadyExists + * whether to throw a FileAlreadyExistsException if the given blob already exists + * @throws FileAlreadyExistsException if failIfAlreadyExists is true and a blob by the same name already exists * @throws IOException if the input stream could not be read, or the target blob could not be written to. */ - void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException; + void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException; /** * Reads blob content from the input stream and writes it to the container in a new blob with the given name, * using an atomic write operation if the implementation supports it. When the BlobContainer implementation * does not provide a specific implementation of writeBlobAtomic(String, InputStream, long), then - * the {@link #writeBlob(String, InputStream, long)} method is used. + * the {@link #writeBlob(String, InputStream, long, boolean)} method is used. * * This method assumes the container does not already contain a blob of the same blobName. If a blob by the * same name already exists, the operation will fail and an {@link IOException} will be thrown. @@ -90,11 +92,14 @@ public interface BlobContainer { * @param blobSize * The size of the blob to be written, in bytes. It is implementation dependent whether * this value is used in writing the blob to the repository. - * @throws FileAlreadyExistsException if a blob by the same name already exists + * @param failIfAlreadyExists + * whether to throw a FileAlreadyExistsException if the given blob already exists + * @throws FileAlreadyExistsException if failIfAlreadyExists is true and a blob by the same name already exists * @throws IOException if the input stream could not be read, or the target blob could not be written to. */ - default void writeBlobAtomic(final String blobName, final InputStream inputStream, final long blobSize) throws IOException { - writeBlob(blobName, inputStream, blobSize); + default void writeBlobAtomic(final String blobName, final InputStream inputStream, final long blobSize, boolean failIfAlreadyExists) + throws IOException { + writeBlob(blobName, inputStream, blobSize, failIfAlreadyExists); } /** diff --git a/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java b/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java index a58802ecd1828..bab984bd85c74 100644 --- a/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java +++ b/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java @@ -124,7 +124,10 @@ public InputStream readBlob(String name) throws IOException { } @Override - public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException { + public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException { + if (failIfAlreadyExists == false) { + deleteBlobIgnoringIfNotExists(blobName); + } final Path file = path.resolve(blobName); try (OutputStream outputStream = Files.newOutputStream(file, StandardOpenOption.CREATE_NEW)) { Streams.copy(inputStream, outputStream); @@ -134,7 +137,8 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize) t } @Override - public void writeBlobAtomic(final String blobName, final InputStream inputStream, final long blobSize) throws IOException { + public void writeBlobAtomic(final String blobName, final InputStream inputStream, final long blobSize, boolean failIfAlreadyExists) + throws IOException { final String tempBlob = tempBlobName(blobName); final Path tempBlobPath = path.resolve(tempBlob); try { @@ -142,7 +146,7 @@ public void writeBlobAtomic(final String blobName, final InputStream inputStream Streams.copy(inputStream, outputStream); } IOUtils.fsync(tempBlobPath, false); - moveBlobAtomic(tempBlob, blobName); + moveBlobAtomic(tempBlob, blobName, failIfAlreadyExists); } catch (IOException ex) { try { deleteBlobIgnoringIfNotExists(tempBlob); @@ -155,13 +159,18 @@ public void writeBlobAtomic(final String blobName, final InputStream inputStream } } - public void moveBlobAtomic(final String sourceBlobName, final String targetBlobName) throws IOException { + public void moveBlobAtomic(final String sourceBlobName, final String targetBlobName, final boolean failIfAlreadyExists) + throws IOException { final Path sourceBlobPath = path.resolve(sourceBlobName); final Path targetBlobPath = path.resolve(targetBlobName); // If the target file exists then Files.move() behaviour is implementation specific // the existing file might be replaced or this method fails by throwing an IOException. if (Files.exists(targetBlobPath)) { - throw new FileAlreadyExistsException("blob [" + targetBlobPath + "] already exists, cannot overwrite"); + if (failIfAlreadyExists) { + throw new FileAlreadyExistsException("blob [" + targetBlobPath + "] already exists, cannot overwrite"); + } else { + deleteBlobIgnoringIfNotExists(targetBlobName); + } } Files.move(sourceBlobPath, targetBlobPath, StandardCopyOption.ATOMIC_MOVE); } diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 0151e4e7322d5..86131fe468d28 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -556,7 +556,7 @@ public String startVerification() { String blobName = "master.dat"; BytesArray bytes = new BytesArray(testBytes); try (InputStream stream = bytes.streamInput()) { - testContainer.writeBlobAtomic(blobName, stream, bytes.length()); + testContainer.writeBlobAtomic(blobName, stream, bytes.length(), true); } return seed; } @@ -664,7 +664,7 @@ protected void writeIndexGen(final RepositoryData repositoryData, final long rep // write the index file final String indexBlob = INDEX_FILE_PREFIX + Long.toString(newGen); logger.debug("Repository [{}] writing new index generational blob [{}]", metadata.name(), indexBlob); - writeAtomic(indexBlob, snapshotsBytes); + writeAtomic(indexBlob, snapshotsBytes, true); // delete the N-2 index file if it exists, keep the previous one around as a backup if (isReadOnly() == false && newGen - 2 >= 0) { final String oldSnapshotIndexFile = INDEX_FILE_PREFIX + Long.toString(newGen - 2); @@ -677,9 +677,8 @@ protected void writeIndexGen(final RepositoryData repositoryData, final long rep bStream.writeLong(newGen); genBytes = bStream.bytes(); } - snapshotsBlobContainer.deleteBlobIgnoringIfNotExists(INDEX_LATEST_BLOB); logger.debug("Repository [{}] updating index.latest with generation [{}]", metadata.name(), newGen); - writeAtomic(INDEX_LATEST_BLOB, genBytes); + writeAtomic(INDEX_LATEST_BLOB, genBytes, false); } /** @@ -698,9 +697,8 @@ void writeIncompatibleSnapshots(RepositoryData repositoryData) throws IOExceptio } bytes = bStream.bytes(); } - snapshotsBlobContainer.deleteBlobIgnoringIfNotExists(INCOMPATIBLE_SNAPSHOTS_BLOB); // write the incompatible snapshots blob - writeAtomic(INCOMPATIBLE_SNAPSHOTS_BLOB, bytes); + writeAtomic(INCOMPATIBLE_SNAPSHOTS_BLOB, bytes, false); } /** @@ -766,9 +764,9 @@ private long listBlobsToGetLatestIndexId() throws IOException { return latest; } - private void writeAtomic(final String blobName, final BytesReference bytesRef) throws IOException { + private void writeAtomic(final String blobName, final BytesReference bytesRef, boolean failIfAlreadyExists) throws IOException { try (InputStream stream = bytesRef.streamInput()) { - snapshotsBlobContainer.writeBlobAtomic(blobName, stream, bytesRef.length()); + snapshotsBlobContainer.writeBlobAtomic(blobName, stream, bytesRef.length(), failIfAlreadyExists); } } @@ -813,7 +811,7 @@ public void verify(String seed, DiscoveryNode localNode) { try { BytesArray bytes = new BytesArray(seed); try (InputStream stream = bytes.streamInput()) { - testBlobContainer.writeBlob("data-" + localNode.getId() + ".dat", stream, bytes.length()); + testBlobContainer.writeBlob("data-" + localNode.getId() + ".dat", stream, bytes.length(), true); } } catch (IOException exp) { throw new RepositoryVerificationException(metadata.name(), "store location [" + blobStore() + "] is not accessible on the node [" + localNode + "]", exp); @@ -1252,7 +1250,7 @@ private void snapshotFile(final BlobStoreIndexShardSnapshot.FileInfo fileInfo) t snapshotRateLimitingTimeInNanos::inc); } inputStream = new AbortableInputStream(inputStream, fileInfo.physicalName()); - blobContainer.writeBlob(fileInfo.partName(i), inputStream, partBytes); + blobContainer.writeBlob(fileInfo.partName(i), inputStream, partBytes, true); } Store.verify(indexInput); snapshotStatus.addProcessedFile(fileInfo.length()); diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java index b974be2b869ab..ca6ec74dc2ce2 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java @@ -132,7 +132,7 @@ public void writeAtomic(T obj, BlobContainer blobContainer, String name) throws final String blobName = blobName(name); writeTo(obj, blobName, bytesArray -> { try (InputStream stream = bytesArray.streamInput()) { - blobContainer.writeBlobAtomic(blobName, stream, bytesArray.length()); + blobContainer.writeBlobAtomic(blobName, stream, bytesArray.length(), true); } }); } @@ -150,7 +150,7 @@ public void write(T obj, BlobContainer blobContainer, String name) throws IOExce final String blobName = blobName(name); writeTo(obj, blobName, bytesArray -> { try (InputStream stream = bytesArray.streamInput()) { - blobContainer.writeBlob(blobName, stream, bytesArray.length()); + blobContainer.writeBlob(blobName, stream, bytesArray.length(), true); } }); } diff --git a/server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatIT.java b/server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatIT.java index 70be72989cf95..6f4f69ad67e88 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatIT.java +++ b/server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatIT.java @@ -224,7 +224,8 @@ public void testAtomicWriteFailures() throws Exception { IOException writeBlobException = expectThrows(IOException.class, () -> { BlobContainer wrapper = new BlobContainerWrapper(blobContainer) { @Override - public void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize) throws IOException { + public void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) + throws IOException { throw new IOException("Exception thrown in writeBlobAtomic() for " + blobName); } }; @@ -251,10 +252,9 @@ protected void randomCorruption(BlobContainer blobContainer, String blobName) th int location = randomIntBetween(0, buffer.length - 1); buffer[location] = (byte) (buffer[location] ^ 42); } while (originalChecksum == checksum(buffer)); - blobContainer.deleteBlob(blobName); // delete original before writing new blob BytesArray bytesArray = new BytesArray(buffer); try (StreamInput stream = bytesArray.streamInput()) { - blobContainer.writeBlob(blobName, stream, bytesArray.length()); + blobContainer.writeBlob(blobName, stream, bytesArray.length(), false); } } diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/BlobContainerWrapper.java b/server/src/test/java/org/elasticsearch/snapshots/mockstore/BlobContainerWrapper.java index b5c6339724123..5666869a1aa0b 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/mockstore/BlobContainerWrapper.java +++ b/server/src/test/java/org/elasticsearch/snapshots/mockstore/BlobContainerWrapper.java @@ -49,13 +49,14 @@ public InputStream readBlob(String name) throws IOException { } @Override - public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException { - delegate.writeBlob(blobName, inputStream, blobSize); + public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException { + delegate.writeBlob(blobName, inputStream, blobSize, failIfAlreadyExists); } @Override - public void writeBlobAtomic(final String blobName, final InputStream inputStream, final long blobSize) throws IOException { - delegate.writeBlobAtomic(blobName, inputStream, blobSize); + public void writeBlobAtomic(final String blobName, final InputStream inputStream, final long blobSize, + boolean failIfAlreadyExists) throws IOException { + delegate.writeBlobAtomic(blobName, inputStream, blobSize, failIfAlreadyExists); } @Override diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java index d0702acf10373..d05a10905d858 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java +++ b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java @@ -346,9 +346,9 @@ public Map listBlobsByPrefix(String blobNamePrefix) throws } @Override - public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException { + public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException { maybeIOExceptionOrBlock(blobName); - super.writeBlob(blobName, inputStream, blobSize); + super.writeBlob(blobName, inputStream, blobSize, failIfAlreadyExists); if (RandomizedContext.current().getRandom().nextBoolean()) { // for network based repositories, the blob may have been written but we may still // get an error with the client connection, so an IOException here simulates this @@ -357,27 +357,28 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize) t } @Override - public void writeBlobAtomic(final String blobName, final InputStream inputStream, final long blobSize) throws IOException { + public void writeBlobAtomic(final String blobName, final InputStream inputStream, final long blobSize, + final boolean failIfAlreadyExists) throws IOException { final Random random = RandomizedContext.current().getRandom(); if (allowAtomicOperations && random.nextBoolean()) { if ((delegate() instanceof FsBlobContainer) && (random.nextBoolean())) { // Simulate a failure between the write and move operation in FsBlobContainer final String tempBlobName = FsBlobContainer.tempBlobName(blobName); - super.writeBlob(tempBlobName, inputStream, blobSize); + super.writeBlob(tempBlobName, inputStream, blobSize, failIfAlreadyExists); maybeIOExceptionOrBlock(blobName); final FsBlobContainer fsBlobContainer = (FsBlobContainer) delegate(); - fsBlobContainer.moveBlobAtomic(tempBlobName, blobName); + fsBlobContainer.moveBlobAtomic(tempBlobName, blobName, failIfAlreadyExists); } else { // Atomic write since it is potentially supported // by the delegating blob container maybeIOExceptionOrBlock(blobName); - super.writeBlobAtomic(blobName, inputStream, blobSize); + super.writeBlobAtomic(blobName, inputStream, blobSize, failIfAlreadyExists); } } else { // Simulate a non-atomic write since many blob container // implementations does not support atomic write maybeIOExceptionOrBlock(blobName); - super.writeBlob(blobName, inputStream, blobSize); + super.writeBlob(blobName, inputStream, blobSize, failIfAlreadyExists); } } } diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/ESBlobStoreContainerTestCase.java b/test/framework/src/main/java/org/elasticsearch/repositories/ESBlobStoreContainerTestCase.java index 43a62bbe662cc..9f12c36999145 100644 --- a/test/framework/src/main/java/org/elasticsearch/repositories/ESBlobStoreContainerTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/repositories/ESBlobStoreContainerTestCase.java @@ -61,7 +61,12 @@ public void testWriteRead() throws IOException { try(BlobStore store = newBlobStore()) { final BlobContainer container = store.blobContainer(new BlobPath()); byte[] data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16))); - writeBlob(container, "foobar", new BytesArray(data)); + writeBlob(container, "foobar", new BytesArray(data), randomBoolean()); + if (randomBoolean()) { + // override file, to check if we get latest contents + data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16))); + writeBlob(container, "foobar", new BytesArray(data), false); + } try (InputStream stream = container.readBlob("foobar")) { BytesRefBuilder target = new BytesRefBuilder(); while (target.length() < data.length) { @@ -123,7 +128,7 @@ public void testDeleteBlob() throws IOException { byte[] data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16))); final BytesArray bytesArray = new BytesArray(data); - writeBlob(container, blobName, bytesArray); + writeBlob(container, blobName, bytesArray, randomBoolean()); container.deleteBlob(blobName); // should not raise // blob deleted, so should raise again @@ -149,20 +154,21 @@ public void testVerifyOverwriteFails() throws IOException { final BlobContainer container = store.blobContainer(new BlobPath()); byte[] data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16))); final BytesArray bytesArray = new BytesArray(data); - writeBlob(container, blobName, bytesArray); + writeBlob(container, blobName, bytesArray, true); // should not be able to overwrite existing blob - expectThrows(FileAlreadyExistsException.class, () -> writeBlob(container, blobName, bytesArray)); + expectThrows(FileAlreadyExistsException.class, () -> writeBlob(container, blobName, bytesArray, true)); container.deleteBlob(blobName); - writeBlob(container, blobName, bytesArray); // after deleting the previous blob, we should be able to write to it again + writeBlob(container, blobName, bytesArray, true); // after deleting the previous blob, we should be able to write to it again } } - protected void writeBlob(final BlobContainer container, final String blobName, final BytesArray bytesArray) throws IOException { + protected void writeBlob(final BlobContainer container, final String blobName, final BytesArray bytesArray, + boolean failIfAlreadyExists) throws IOException { try (InputStream stream = bytesArray.streamInput()) { if (randomBoolean()) { - container.writeBlob(blobName, stream, bytesArray.length()); + container.writeBlob(blobName, stream, bytesArray.length(), failIfAlreadyExists); } else { - container.writeBlobAtomic(blobName, stream, bytesArray.length()); + container.writeBlobAtomic(blobName, stream, bytesArray.length(), failIfAlreadyExists); } } } diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/ESBlobStoreTestCase.java b/test/framework/src/main/java/org/elasticsearch/repositories/ESBlobStoreTestCase.java index 35a17c2a8dd83..ccc38ae362991 100644 --- a/test/framework/src/main/java/org/elasticsearch/repositories/ESBlobStoreTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/repositories/ESBlobStoreTestCase.java @@ -80,7 +80,7 @@ public static byte[] randomBytes(int length) { protected static void writeBlob(BlobContainer container, String blobName, BytesArray bytesArray) throws IOException { try (InputStream stream = bytesArray.streamInput()) { - container.writeBlob(blobName, stream, bytesArray.length()); + container.writeBlob(blobName, stream, bytesArray.length(), true); } } From 80057ea0f258e7f12f69541abe473c7df28585e9 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Mon, 2 Jul 2018 15:47:01 +0200 Subject: [PATCH 2/3] fix HDFS CI failure --- .../org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java index af12e44a9f7b9..580d033354e58 100644 --- a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java +++ b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java @@ -96,7 +96,7 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b Path blob = new Path(path, blobName); // we pass CREATE, which means it fails if a blob already exists. EnumSet flags = failIfAlreadyExists ? EnumSet.of(CreateFlag.CREATE, CreateFlag.SYNC_BLOCK) : - EnumSet.of(CreateFlag.SYNC_BLOCK); + EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE, CreateFlag.SYNC_BLOCK); CreateOpts[] opts = {CreateOpts.bufferSize(bufferSize)}; try (FSDataOutputStream stream = fileContext.create(blob, flags, opts)) { int bytesRead; From f31171db8992d8bf2f4c0e1ee3baf5a13707dc48 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Mon, 2 Jul 2018 15:51:06 +0200 Subject: [PATCH 3/3] document that S3BlobContainer does not respect BlobContainer API --- .../org/elasticsearch/repositories/s3/S3BlobContainer.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java index 9d650b098a077..b7cc2b89605d3 100644 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java +++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java @@ -90,6 +90,9 @@ public InputStream readBlob(String blobName) throws IOException { } } + /** + * This implementation ignores the failIfAlreadyExists flag as the S3 API has no way to enforce this due to its weak consistency model. + */ @Override public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException { SocketAccess.doPrivilegedIOException(() -> {