Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ADLS Gen 2 Append and Lease Operations #31263

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import com.azure.storage.file.datalake.models.PathInfo;
import com.azure.storage.file.datalake.models.PathProperties;
import com.azure.storage.file.datalake.options.DataLakeFileAppendOptions;
import com.azure.storage.file.datalake.options.DataLakeFileFlushOptions;
import com.azure.storage.file.datalake.options.DataLakePathDeleteOptions;
import com.azure.storage.file.datalake.options.FileParallelUploadOptions;
import com.azure.storage.file.datalake.options.FileQueryOptions;
Expand Down Expand Up @@ -1170,9 +1171,12 @@ Mono<Response<Void>> appendWithResponse(Flux<ByteBuffer> data, long fileOffset,
LeaseAccessConditions leaseAccessConditions = new LeaseAccessConditions().setLeaseId(appendOptions.getLeaseId());
PathHttpHeaders headers = new PathHttpHeaders().setTransactionalContentHash(appendOptions.getContentMd5());
context = context == null ? Context.NONE : context;
Long leaseDuration = appendOptions.getLeaseDuration() != null ? Long.valueOf(appendOptions.getLeaseDuration()) : null;

return this.dataLakeStorage.getPaths().appendDataWithResponseAsync(
data, fileOffset, null, length, null, null, appendOptions.isFlush(), headers, leaseAccessConditions, getCpkInfo(), context)
data, fileOffset, null, length, null, appendOptions.getLeaseAction(), leaseDuration,
appendOptions.getProposedLeaseId(), null, appendOptions.isFlush(), headers, leaseAccessConditions,
getCpkInfo(), context)
.map(response -> new SimpleResponse<>(response, null));
}

Expand Down Expand Up @@ -1275,31 +1279,92 @@ public Mono<PathInfo> flush(long position, boolean overwrite) {
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Response<PathInfo>> flushWithResponse(long position, boolean retainUncommittedData, boolean close,
    PathHttpHeaders httpHeaders, DataLakeRequestConditions requestConditions) {
    // Translate the legacy parameter list into the options bag consumed by the newer overload.
    DataLakeFileFlushOptions options = new DataLakeFileFlushOptions();
    options.setUncommittedDataRetained(retainUncommittedData);
    options.setClose(close);
    options.setPathHttpHeaders(httpHeaders);
    options.setRequestConditions(requestConditions);

    try {
        return withContext(ctx -> flushWithResponse(position, options, ctx));
    } catch (RuntimeException ex) {
        // Surface synchronous failures through the returned Mono rather than throwing.
        return monoError(LOGGER, ex);
    }
}

/**
* Flushes (writes) data previously appended to the file through a call to append.
* The previously uploaded data must be contiguous.
*
* <p><strong>Code Samples</strong></p>
*
* <!-- src_embed com.azure.storage.file.datalake.DataLakeFileAsyncClient.flushWithResponse#long-DataLakeFileFlushOptions -->
* <pre>
* FileRange range = new FileRange&#40;1024, 2048L&#41;;
* DownloadRetryOptions options = new DownloadRetryOptions&#40;&#41;.setMaxRetryRequests&#40;5&#41;;
* byte[] contentMd5 = new byte[0]; &#47;&#47; Replace with valid md5
* boolean retainUncommittedData = false;
* boolean close = false;
* PathHttpHeaders httpHeaders = new PathHttpHeaders&#40;&#41;
* .setContentLanguage&#40;&quot;en-US&quot;&#41;
* .setContentType&#40;&quot;binary&quot;&#41;;
* DataLakeRequestConditions requestConditions = new DataLakeRequestConditions&#40;&#41;
* .setLeaseId&#40;leaseId&#41;;
* Integer leaseDuration = 15;
*
* DataLakeFileFlushOptions flushOptions = new DataLakeFileFlushOptions&#40;&#41;
* .setUncommittedDataRetained&#40;retainUncommittedData&#41;
* .setClose&#40;close&#41;
* .setPathHttpHeaders&#40;httpHeaders&#41;
* .setRequestConditions&#40;requestConditions&#41;
* .setLeaseAction&#40;LeaseAction.ACQUIRE&#41;
* .setLeaseDuration&#40;leaseDuration&#41;
* .setProposedLeaseId&#40;leaseId&#41;;
*
* client.flushWithResponse&#40;position, flushOptions&#41;.subscribe&#40;response -&gt;
* System.out.printf&#40;&quot;Flush data completed with status %d%n&quot;, response.getStatusCode&#40;&#41;&#41;&#41;;
* </pre>
* <!-- end com.azure.storage.file.datalake.DataLakeFileAsyncClient.flushWithResponse#long-DataLakeFileFlushOptions -->
* <p>For more information, see the
* <a href="https://docs.microsoft.com/rest/api/storageservices/datalakestoragegen2/path/update">Azure
* Docs</a></p>
*
* @param position The length of the file after all data has been written.
* @param flushOptions {@link DataLakeFileFlushOptions}
*
* @return A reactive response containing the information of the created resource.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Response<PathInfo>> flushWithResponse(long position, DataLakeFileFlushOptions flushOptions) {
    try {
        // Capture the caller's reactor Context and delegate to the Context-accepting overload.
        return withContext(context -> flushWithResponse(position, flushOptions, context));
    } catch (RuntimeException ex) {
        // Surface synchronous failures through the returned Mono instead of throwing to the caller.
        return monoError(LOGGER, ex);
    }
}

Mono<Response<PathInfo>> flushWithResponse(long position, boolean retainUncommittedData, boolean close,
PathHttpHeaders httpHeaders, DataLakeRequestConditions requestConditions, Context context) {
Mono<Response<PathInfo>> flushWithResponse(long position, DataLakeFileFlushOptions flushOptions, Context context) {

PathHttpHeaders httpHeaders = flushOptions.getPathHttpHeaders() == null
? new PathHttpHeaders() : flushOptions.getPathHttpHeaders();

httpHeaders = httpHeaders == null ? new PathHttpHeaders() : httpHeaders;
requestConditions = requestConditions == null ? new DataLakeRequestConditions() : requestConditions;
DataLakeRequestConditions requestConditions = flushOptions.getRequestConditions() == null
? new DataLakeRequestConditions() : flushOptions.getRequestConditions();

LeaseAccessConditions lac = new LeaseAccessConditions().setLeaseId(requestConditions.getLeaseId());
ModifiedAccessConditions mac = new ModifiedAccessConditions()
.setIfMatch(requestConditions.getIfMatch())
.setIfNoneMatch(requestConditions.getIfNoneMatch())
.setIfModifiedSince(requestConditions.getIfModifiedSince())
.setIfUnmodifiedSince(requestConditions.getIfUnmodifiedSince());

Long leaseDuration = flushOptions.getLeaseDuration() != null ? Long.valueOf(flushOptions.getLeaseDuration()) : null;

context = context == null ? Context.NONE : context;

return this.dataLakeStorage.getPaths().flushDataWithResponseAsync(null, position, retainUncommittedData, close,
(long) 0, null, httpHeaders, lac, mac, getCpkInfo(),
context.addData(AZ_TRACING_NAMESPACE_KEY, STORAGE_TRACING_NAMESPACE_VALUE))
return this.dataLakeStorage.getPaths().flushDataWithResponseAsync(null, position, flushOptions.isUncommittedDataRetained(),
flushOptions.isClose(), (long) 0, flushOptions.getLeaseAction(), leaseDuration, flushOptions.getProposedLeaseId(),
null, httpHeaders, lac, mac, getCpkInfo(), context.addData(AZ_TRACING_NAMESPACE_KEY, STORAGE_TRACING_NAMESPACE_VALUE))
.map(response -> new SimpleResponse<>(response, new PathInfo(response.getDeserializedHeaders().getETag(),
response.getDeserializedHeaders().getLastModified(),
response.getDeserializedHeaders().isXMsRequestServerEncrypted() != null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import com.azure.storage.file.datalake.models.DataLakeFileOpenInputStreamResult;
import com.azure.storage.file.datalake.models.FileQueryAsyncResponse;
import com.azure.storage.file.datalake.options.DataLakeFileAppendOptions;
import com.azure.storage.file.datalake.options.DataLakeFileFlushOptions;
import com.azure.storage.file.datalake.options.DataLakeFileInputStreamOptions;
import com.azure.storage.file.datalake.options.DataLakePathDeleteOptions;
import com.azure.storage.file.datalake.options.FileParallelUploadOptions;
Expand Down Expand Up @@ -950,8 +951,66 @@ public PathInfo flush(long position, boolean overwrite) {
@ServiceMethod(returns = ReturnType.SINGLE)
public Response<PathInfo> flushWithResponse(long position, boolean retainUncommittedData, boolean close,
    PathHttpHeaders httpHeaders, DataLakeRequestConditions requestConditions, Duration timeout, Context context) {
    // Pack the individual parameters into the options bag and delegate to the options-based overload,
    // keeping this legacy signature backward-compatible.
    DataLakeFileFlushOptions flushOptions = new DataLakeFileFlushOptions()
        .setUncommittedDataRetained(retainUncommittedData)
        .setClose(close)
        .setPathHttpHeaders(httpHeaders)
        .setRequestConditions(requestConditions);

    return flushWithResponse(position, flushOptions, timeout, context);
}

/**
* Flushes (writes) data previously appended to the file through a call to append.
* The previously uploaded data must be contiguous.
*
* <p><strong>Code Samples</strong></p>
*
* <!-- src_embed com.azure.storage.file.datalake.DataLakeFileClient.flushWithResponse#long-DataLakeFileFlushOptions-Duration-Context -->
* <pre>
* FileRange range = new FileRange&#40;1024, 2048L&#41;;
* DownloadRetryOptions options = new DownloadRetryOptions&#40;&#41;.setMaxRetryRequests&#40;5&#41;;
* byte[] contentMd5 = new byte[0]; &#47;&#47; Replace with valid md5
* boolean retainUncommittedData = false;
* boolean close = false;
* PathHttpHeaders httpHeaders = new PathHttpHeaders&#40;&#41;
* .setContentLanguage&#40;&quot;en-US&quot;&#41;
* .setContentType&#40;&quot;binary&quot;&#41;;
* DataLakeRequestConditions requestConditions = new DataLakeRequestConditions&#40;&#41;
* .setLeaseId&#40;leaseId&#41;;
*
* Integer leaseDuration = 15;
*
* DataLakeFileFlushOptions flushOptions = new DataLakeFileFlushOptions&#40;&#41;
* .setUncommittedDataRetained&#40;retainUncommittedData&#41;
* .setClose&#40;close&#41;
* .setPathHttpHeaders&#40;httpHeaders&#41;
* .setRequestConditions&#40;requestConditions&#41;
* .setLeaseAction&#40;LeaseAction.ACQUIRE&#41;
* .setLeaseDuration&#40;leaseDuration&#41;
* .setProposedLeaseId&#40;leaseId&#41;;
*
* Response&lt;PathInfo&gt; response = client.flushWithResponse&#40;position, flushOptions, timeout,
* new Context&#40;key1, value1&#41;&#41;;
* System.out.printf&#40;&quot;Flush data completed with status %d%n&quot;, response.getStatusCode&#40;&#41;&#41;;
* </pre>
* <!-- end com.azure.storage.file.datalake.DataLakeFileClient.flushWithResponse#long-DataLakeFileFlushOptions-Duration-Context -->
*
* <p>For more information, see the
* <a href="https://docs.microsoft.com/rest/api/storageservices/datalakestoragegen2/path/update">Azure
* Docs</a></p>
*
* @param position The length of the file after all data has been written.
* @param flushOptions {@link DataLakeFileFlushOptions}
* @param timeout An optional timeout value beyond which a {@link RuntimeException} will be raised.
* @param context Additional context that is passed through the Http pipeline during the service call.
*
* @return A response containing the information of the created resource.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Response<PathInfo> flushWithResponse(long position, DataLakeFileFlushOptions flushOptions, Duration timeout,
    Context context) {
    // Delegate to the async client and block, honoring the optional timeout.
    return StorageImplUtils.blockWithOptionalTimeout(
        dataLakeFileAsyncClient.flushWithResponse(position, flushOptions, context), timeout);
}
Expand Down
Loading