Skip to content

Commit

Permalink
ADLS Gen 2 Append and Lease Operations (#31263)
Browse files Browse the repository at this point in the history
  • Loading branch information
ibrahimrabab authored Oct 18, 2022
1 parent aecbd27 commit 2622ec9
Show file tree
Hide file tree
Showing 17 changed files with 1,282 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import com.azure.storage.file.datalake.models.PathInfo;
import com.azure.storage.file.datalake.models.PathProperties;
import com.azure.storage.file.datalake.options.DataLakeFileAppendOptions;
import com.azure.storage.file.datalake.options.DataLakeFileFlushOptions;
import com.azure.storage.file.datalake.options.DataLakePathDeleteOptions;
import com.azure.storage.file.datalake.options.FileParallelUploadOptions;
import com.azure.storage.file.datalake.options.FileQueryOptions;
Expand Down Expand Up @@ -1170,9 +1171,12 @@ Mono<Response<Void>> appendWithResponse(Flux<ByteBuffer> data, long fileOffset,
LeaseAccessConditions leaseAccessConditions = new LeaseAccessConditions().setLeaseId(appendOptions.getLeaseId());
PathHttpHeaders headers = new PathHttpHeaders().setTransactionalContentHash(appendOptions.getContentMd5());
context = context == null ? Context.NONE : context;
Long leaseDuration = appendOptions.getLeaseDuration() != null ? Long.valueOf(appendOptions.getLeaseDuration()) : null;

return this.dataLakeStorage.getPaths().appendDataWithResponseAsync(
data, fileOffset, null, length, null, null, appendOptions.isFlush(), headers, leaseAccessConditions, getCpkInfo(), context)
data, fileOffset, null, length, null, appendOptions.getLeaseAction(), leaseDuration,
appendOptions.getProposedLeaseId(), null, appendOptions.isFlush(), headers, leaseAccessConditions,
getCpkInfo(), context)
.map(response -> new SimpleResponse<>(response, null));
}

Expand Down Expand Up @@ -1275,31 +1279,92 @@ public Mono<PathInfo> flush(long position, boolean overwrite) {
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Response<PathInfo>> flushWithResponse(long position, boolean retainUncommittedData, boolean close,
PathHttpHeaders httpHeaders, DataLakeRequestConditions requestConditions) {
DataLakeFileFlushOptions flushOptions = new DataLakeFileFlushOptions()
.setUncommittedDataRetained(retainUncommittedData)
.setClose(close)
.setPathHttpHeaders(httpHeaders)
.setRequestConditions(requestConditions);

try {
return withContext(context -> flushWithResponse(position, flushOptions, context));
} catch (RuntimeException ex) {
return monoError(LOGGER, ex);
}
}

/**
* Flushes (writes) data previously appended to the file through a call to append.
* The previously uploaded data must be contiguous.
*
* <p><strong>Code Samples</strong></p>
*
* <!-- src_embed com.azure.storage.file.datalake.DataLakeFileAsyncClient.flushWithResponse#long-DataLakeFileFlushOptions -->
* <pre>
* FileRange range = new FileRange&#40;1024, 2048L&#41;;
* DownloadRetryOptions options = new DownloadRetryOptions&#40;&#41;.setMaxRetryRequests&#40;5&#41;;
* byte[] contentMd5 = new byte[0]; &#47;&#47; Replace with valid md5
* boolean retainUncommittedData = false;
* boolean close = false;
* PathHttpHeaders httpHeaders = new PathHttpHeaders&#40;&#41;
* .setContentLanguage&#40;&quot;en-US&quot;&#41;
* .setContentType&#40;&quot;binary&quot;&#41;;
* DataLakeRequestConditions requestConditions = new DataLakeRequestConditions&#40;&#41;
* .setLeaseId&#40;leaseId&#41;;
* Integer leaseDuration = 15;
*
* DataLakeFileFlushOptions flushOptions = new DataLakeFileFlushOptions&#40;&#41;
* .setRetainUncommittedData&#40;retainUncommittedData&#41;
* .setClose&#40;close&#41;
* .setPathHttpHeaders&#40;httpHeaders&#41;
* .setRequestConditions&#40;requestConditions&#41;
* .setLeaseAction&#40;LeaseAction.ACQUIRE&#41;
* .setLeaseDuration&#40;leaseDuration&#41;
* .setProposedLeaseId&#40;leaseId&#41;;
*
* client.flushWithResponse&#40;position, flushOptions&#41;.subscribe&#40;response -&gt;
* System.out.printf&#40;&quot;Flush data completed with status %d%n&quot;, response.getStatusCode&#40;&#41;&#41;&#41;;
* </pre>
* <!-- end com.azure.storage.file.datalake.DataLakeFileAsyncClient.flushWithResponse#long-DataLakeFileFlushOptions -->
* <p>For more information, see the
* <a href="https://docs.microsoft.com/rest/api/storageservices/datalakestoragegen2/path/update">Azure
* Docs</a></p>
*
* @param position The length of the file after all data has been written.
* @param flushOptions {@link DataLakeFileFlushOptions}
*
* @return A reactive response containing the information of the created resource.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Response<PathInfo>> flushWithResponse(long position, DataLakeFileFlushOptions flushOptions) {
try {
return withContext(context -> flushWithResponse(position, retainUncommittedData,
close, httpHeaders, requestConditions, context));
return withContext(context -> flushWithResponse(position, flushOptions, context));
} catch (RuntimeException ex) {
return monoError(LOGGER, ex);
}
}

Mono<Response<PathInfo>> flushWithResponse(long position, boolean retainUncommittedData, boolean close,
PathHttpHeaders httpHeaders, DataLakeRequestConditions requestConditions, Context context) {
Mono<Response<PathInfo>> flushWithResponse(long position, DataLakeFileFlushOptions flushOptions, Context context) {

PathHttpHeaders httpHeaders = flushOptions.getPathHttpHeaders() == null
? new PathHttpHeaders() : flushOptions.getPathHttpHeaders();

httpHeaders = httpHeaders == null ? new PathHttpHeaders() : httpHeaders;
requestConditions = requestConditions == null ? new DataLakeRequestConditions() : requestConditions;
DataLakeRequestConditions requestConditions = flushOptions.getRequestConditions() == null
? new DataLakeRequestConditions() : flushOptions.getRequestConditions();

LeaseAccessConditions lac = new LeaseAccessConditions().setLeaseId(requestConditions.getLeaseId());
ModifiedAccessConditions mac = new ModifiedAccessConditions()
.setIfMatch(requestConditions.getIfMatch())
.setIfNoneMatch(requestConditions.getIfNoneMatch())
.setIfModifiedSince(requestConditions.getIfModifiedSince())
.setIfUnmodifiedSince(requestConditions.getIfUnmodifiedSince());

Long leaseDuration = flushOptions.getLeaseDuration() != null ? Long.valueOf(flushOptions.getLeaseDuration()) : null;

context = context == null ? Context.NONE : context;

return this.dataLakeStorage.getPaths().flushDataWithResponseAsync(null, position, retainUncommittedData, close,
(long) 0, null, httpHeaders, lac, mac, getCpkInfo(),
context.addData(AZ_TRACING_NAMESPACE_KEY, STORAGE_TRACING_NAMESPACE_VALUE))
return this.dataLakeStorage.getPaths().flushDataWithResponseAsync(null, position, flushOptions.isUncommittedDataRetained(),
flushOptions.isClose(), (long) 0, flushOptions.getLeaseAction(), leaseDuration, flushOptions.getProposedLeaseId(),
null, httpHeaders, lac, mac, getCpkInfo(), context.addData(AZ_TRACING_NAMESPACE_KEY, STORAGE_TRACING_NAMESPACE_VALUE))
.map(response -> new SimpleResponse<>(response, new PathInfo(response.getDeserializedHeaders().getETag(),
response.getDeserializedHeaders().getLastModified(),
response.getDeserializedHeaders().isXMsRequestServerEncrypted() != null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import com.azure.storage.file.datalake.models.DataLakeFileOpenInputStreamResult;
import com.azure.storage.file.datalake.models.FileQueryAsyncResponse;
import com.azure.storage.file.datalake.options.DataLakeFileAppendOptions;
import com.azure.storage.file.datalake.options.DataLakeFileFlushOptions;
import com.azure.storage.file.datalake.options.DataLakeFileInputStreamOptions;
import com.azure.storage.file.datalake.options.DataLakePathDeleteOptions;
import com.azure.storage.file.datalake.options.FileParallelUploadOptions;
Expand Down Expand Up @@ -950,8 +951,66 @@ public PathInfo flush(long position, boolean overwrite) {
@ServiceMethod(returns = ReturnType.SINGLE)
public Response<PathInfo> flushWithResponse(long position, boolean retainUncommittedData, boolean close,
PathHttpHeaders httpHeaders, DataLakeRequestConditions requestConditions, Duration timeout, Context context) {
Mono<Response<PathInfo>> response = dataLakeFileAsyncClient.flushWithResponse(position, retainUncommittedData,
close, httpHeaders, requestConditions, context);
DataLakeFileFlushOptions flushOptions = new DataLakeFileFlushOptions()
.setUncommittedDataRetained(retainUncommittedData)
.setClose(close)
.setPathHttpHeaders(httpHeaders)
.setRequestConditions(requestConditions);

return flushWithResponse(position, flushOptions, timeout, context);
}

/**
* Flushes (writes) data previously appended to the file through a call to append.
* The previously uploaded data must be contiguous.
*
* <p><strong>Code Samples</strong></p>
*
* <!-- src_embed com.azure.storage.file.datalake.DataLakeFileClient.flushWithResponse#long-DataLakeFileFlushOptions-Duration-Context -->
* <pre>
* FileRange range = new FileRange&#40;1024, 2048L&#41;;
* DownloadRetryOptions options = new DownloadRetryOptions&#40;&#41;.setMaxRetryRequests&#40;5&#41;;
* byte[] contentMd5 = new byte[0]; &#47;&#47; Replace with valid md5
* boolean retainUncommittedData = false;
* boolean close = false;
* PathHttpHeaders httpHeaders = new PathHttpHeaders&#40;&#41;
* .setContentLanguage&#40;&quot;en-US&quot;&#41;
* .setContentType&#40;&quot;binary&quot;&#41;;
* DataLakeRequestConditions requestConditions = new DataLakeRequestConditions&#40;&#41;
* .setLeaseId&#40;leaseId&#41;;
*
* Integer leaseDuration = 15;
*
* DataLakeFileFlushOptions flushOptions = new DataLakeFileFlushOptions&#40;&#41;
* .setRetainUncommittedData&#40;retainUncommittedData&#41;
* .setClose&#40;close&#41;
* .setPathHttpHeaders&#40;httpHeaders&#41;
* .setRequestConditions&#40;requestConditions&#41;
* .setLeaseAction&#40;LeaseAction.ACQUIRE&#41;
* .setLeaseDuration&#40;leaseDuration&#41;
* .setProposedLeaseId&#40;leaseId&#41;;
*
* Response&lt;PathInfo&gt; response = client.flushWithResponse&#40;position, flushOptions, timeout,
* new Context&#40;key1, value1&#41;&#41;;
* System.out.printf&#40;&quot;Flush data completed with status %d%n&quot;, response.getStatusCode&#40;&#41;&#41;;
* </pre>
* <!-- end com.azure.storage.file.datalake.DataLakeFileClient.flushWithResponse#long-DataLakeFileFlushOptions-Duration-Context -->
*
* <p>For more information, see the
* <a href="https://docs.microsoft.com/rest/api/storageservices/datalakestoragegen2/path/update">Azure
* Docs</a></p>
*
* @param position The length of the file after all data has been written.
* @param flushOptions {@link DataLakeFileFlushOptions}
* @param timeout An optional timeout value beyond which a {@link RuntimeException} will be raised.
* @param context Additional context that is passed through the Http pipeline during the service call.
*
* @return A response containing the information of the created resource.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Response<PathInfo> flushWithResponse(long position, DataLakeFileFlushOptions flushOptions, Duration timeout,
Context context) {
Mono<Response<PathInfo>> response = dataLakeFileAsyncClient.flushWithResponse(position, flushOptions, context);

return StorageImplUtils.blockWithOptionalTimeout(response, timeout);
}
Expand Down
Loading

0 comments on commit 2622ec9

Please sign in to comment.