Added some datalake buffered upload samples (#8910)
gapra-msft authored Mar 11, 2020
1 parent ebe9908 commit 12ccb06
Showing 4 changed files with 270 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -125,7 +125,7 @@ private static File createTempEmptyFile(String fileName) throws IOException {

File dirPath = new File(pathName);

- if (dirPath.exists() || dirPath.mkdir()) {
+ if (dirPath.exists() || dirPath.mkdirs()) {
File f = new File(pathName + fileName);
if (f.exists() || f.createNewFile()) {
return f;
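The fix above swaps File#mkdir for File#mkdirs: mkdir creates only the final directory and returns false when a parent is missing, while mkdirs creates the whole chain. A minimal demo of the difference (the class wrapper is hypothetical; the path matches the sample's "./folderPath/test-large-files/"):

import java.io.File;

class MkdirsDemo {
    public static void main(String[] args) {
        File nested = new File("./folderPath/test-large-files/");
        // On a clean checkout "./folderPath" does not exist yet, so mkdir()
        // fails: it never creates missing parent directories.
        System.out.println(nested.mkdir());   // false
        // mkdirs() creates "./folderPath" and "test-large-files" together.
        System.out.println(nested.mkdirs());  // true
    }
}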
Original file line number Diff line number Diff line change
@@ -13,12 +13,12 @@
import java.util.Locale;

/**
- * This example shows how to upload and download using the Azure Storage Data Lake SDK for Java.
+ * This example shows how to append, flush and download using the Azure Storage Data Lake SDK for Java.
*/
-public class UploadDownloadExample {
+public class AppendFlushExample {

/**
- * Entry point into the upload download examples for Storage datalake.
+ * Entry point into the append, flush and download examples for Storage datalake.
*
* @param args Unused. Arguments to the program.
* @throws IOException If an I/O error occurs
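This hunk shows only the rename and updated Javadoc. For context, the append/flush pattern the new name refers to looks roughly like the sketch below; the fileClient variable and the payload are assumptions, while DataLakeFileClient#append (stage bytes at an offset) and DataLakeFileClient#flush (commit staged bytes) are the calls the example exercises:

import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical helper; fileClient is an already-constructed DataLakeFileClient.
static void appendAndFlush(DataLakeFileClient fileClient) {
    byte[] data = "Hello world".getBytes(StandardCharsets.UTF_8);
    // Stage the bytes at offset 0; appended data is not readable until flushed.
    fileClient.append(new ByteArrayInputStream(data), 0, data.length);
    // Commit everything staged so far by flushing up to the total length written.
    fileClient.flush(data.length);
}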
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.storage.file.datalake;

import com.azure.storage.common.ParallelTransferOptions;
import com.azure.storage.common.StorageSharedKeyCredential;
import reactor.core.publisher.Flux;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.Locale;
import java.util.Random;

/**
* This example shows how to use the buffered upload method on DataLakeFileAsyncClient.
*
* Note that .block() is used here only so the sample can run effectively in isolation. It is
* not recommended in async environments.
*/
public class AsyncBufferedUploadExample {
/**
* Entry point into the buffered upload examples for Storage datalake.
* @param args Unused. Arguments to the program.
* @throws IOException If an I/O error occurs
* @throws RuntimeException If the downloaded data doesn't match the uploaded data
*/
public static void main(String[] args) throws IOException {

/*
* For more information on this setup, please refer to the BasicExample.
*/
String accountName = SampleHelper.getAccountName();
String accountKey = SampleHelper.getAccountKey();
StorageSharedKeyCredential credential = new StorageSharedKeyCredential(accountName, accountKey);
String endpoint = String.format(Locale.ROOT, "https://%s.dfs.core.windows.net", accountName);
String fileSystemName = "myjavafilesystembufferedupload" + System.currentTimeMillis();
DataLakeServiceAsyncClient storageClient = new DataLakeServiceClientBuilder().endpoint(endpoint).credential(credential)
.buildAsyncClient();

DataLakeFileSystemAsyncClient fileSystemClient = storageClient.getFileSystemAsyncClient(fileSystemName);
fileSystemClient.create().block();

uploadSourceFile(endpoint, credential, fileSystemName);
DataLakeFileAsyncClient fileClient = fileSystemClient.getFileAsyncClient("HelloWorld.txt");


/*
sourceData has a network stream as its source and therefore likely does not support multiple subscribers. Even
if it did, it would not produce the same data on each subscription. While we could inspect the HTTP headers for
the Content-Length, let us suppose that this information is unavailable here. Each of these three factors alone
rules out the standard upload method: the first two because retries would not work, and the third because we
could not satisfy the argument list.
*/
Flux<ByteBuffer> sourceData = getSourceFileClient(endpoint, credential, fileSystemName).read()
// Perform some unpredictable transformation.
.map(AsyncBufferedUploadExample::randomTransformation);
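/*
Concretely, a retry in a non-buffered upload would have to re-subscribe to sourceData, and a second
subscription to a network-backed Flux either fails or replays different bytes. The buffered overload below
copies each chunk into its own buffers first, so a retry replays the buffer rather than the source.
*/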

/*
This upload overload permits the use of such unreliable data sources. The length need not be specified, but
the tradeoff is that the data must be buffered, so a buffer size and a number of buffers are required instead.
The Javadoc on the method gives more detail on the significance of these parameters; good values are likely
context dependent.
*/
int blockSize = 10 * 1024;
int numBuffers = 5;
ParallelTransferOptions parallelTransferOptions = new ParallelTransferOptions(blockSize, numBuffers, null, null);
fileClient.upload(sourceData, parallelTransferOptions).block();
}

@SuppressWarnings("cast")
private static ByteBuffer randomTransformation(ByteBuffer buffer) {
// The JDK changed the return type of ByteBuffer#limit between 8 and 9. In 8 and below it returns Buffer, whereas
// in JDK 9 and later, it returns ByteBuffer. To compile on both, we explicitly cast the returned value to
// ByteBuffer.
// See https://bugs-stage.openjdk.java.net/browse/JDK-8062376
return (ByteBuffer) buffer.limit(new Random().nextInt(buffer.limit()));
}

private static void uploadSourceFile(String endpoint, StorageSharedKeyCredential credential, String containerName) {
getSourceFileClient(endpoint, credential, containerName)
.upload(Flux.just(ByteBuffer.wrap("Hello world".getBytes(Charset.defaultCharset()))), null).block();
}

private static DataLakeFileAsyncClient getSourceFileClient(String endpoint, StorageSharedKeyCredential credential,
String containerName) {
return new DataLakeServiceClientBuilder().endpoint(endpoint).credential(credential).buildAsyncClient()
.getFileSystemAsyncClient(containerName).getFileAsyncClient("sourceFile");
}
}
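As the class Javadoc warns, block() is only there so the sample runs in isolation. In a genuinely async caller the same operations would be composed and subscribed instead; a minimal sketch under that assumption (the handlers are illustrative):

fileSystemClient.create()
    .then(fileClient.upload(sourceData, parallelTransferOptions))
    .subscribe(
        pathInfo -> System.out.println("Upload committed, eTag: " + pathInfo.getETag()),
        error -> System.err.println("Upload failed: " + error));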
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.storage.file.datalake;

import com.azure.storage.common.StorageSharedKeyCredential;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Locale;

/**
* This class shows how to upload a file as fast as possible in parallel using the optimized upload API.
*/
public class FileTransferExample {
private static final String LARGE_TEST_FOLDER = "test-large-files/";

/**
* Entry point into the file transfer examples for Storage datalake.
* @param args Unused. Arguments to the program.
* @throws IOException If an I/O error occurs
* @throws NoSuchAlgorithmException If {@code MD5} isn't supported
* @throws RuntimeException If the uploaded or downloaded file wasn't found
*/
public static void main(String[] args) throws IOException, NoSuchAlgorithmException {

/*
* From the Azure portal, get your Storage account's name and account key.
*/
String accountName = SampleHelper.getAccountName();
String accountKey = SampleHelper.getAccountKey();

/*
* Use your Storage account's name and key to create a credential object; this is used to access your account.
*/
StorageSharedKeyCredential credential = new StorageSharedKeyCredential(accountName, accountKey);

/*
* From the Azure portal, get your Storage account datalake service URL endpoint.
* The URL typically looks like this: https://{accountName}.dfs.core.windows.net
*/
String endPoint = String.format(Locale.ROOT, "https://%s.dfs.core.windows.net", accountName);

/*
* Create a DataLakeServiceClient object that wraps the service endpoint, credential and a request pipeline.
* Now you can use the storageClient to perform various file system and path operations.
*/
DataLakeServiceClient storageClient = new DataLakeServiceClientBuilder().endpoint(endPoint).credential(credential).buildClient();


/*
* This example shows several common operations just to get you started.
*/


/*
* Create a client that references a to-be-created file system in your Azure Storage account. This returns a
* DataLakeFileSystemClient that uses the same endpoint, credential and pipeline as storageClient.
* Note that file system names must be lowercase.
*/
DataLakeFileSystemClient fileSystemClient = storageClient.getFileSystemClient("myjavafilesystemparallelupload" + System.currentTimeMillis());

/*
* Create a file system in the Storage datalake account.
*/
fileSystemClient.create();

/*
* Create a DataLakeFileClient object that wraps a file's endpoint and a default pipeline; this client gives us access to upload the file.
*/
String filename = "BigFile.bin";
DataLakeFileClient fileClient = fileSystemClient.getFileClient(filename);

/*
* Create the empty uploadFile and downloadFile.
*/
File largeFile = createTempEmptyFile(filename);

File downloadFile = createTempEmptyFile("downloadFile.bin");

/*
* Grow uploadFile to a size of 100 MB. Note that setLength pads the file with zeros rather than random data.
*/
long fileSize = 100 * 1024 * 1024L;
createTempFileWithFileSize(largeFile, fileSize);

/*
* Upload the large file to the Data Lake file.
*/
fileClient.uploadFromFile(largeFile.getPath());

/*
* Download the large file from storage file to the local downloadFile path.
*/
fileClient.readToFile(downloadFile.getPath(), true);

/*
* Check that the files are the same after the round-trip.
*/
if (Files.exists(downloadFile.toPath()) && Files.exists(largeFile.toPath())) {
checkTwoFilesAreTheSame(largeFile, downloadFile);
System.out.println("The file we upload is the same as the one we download.");
} else {
throw new RuntimeException("Did not find the upload or download file.");
}

/*
* Clean up the local files and storage file system.
*/
fileSystemClient.delete();
Files.deleteIfExists(largeFile.toPath());
Files.deleteIfExists(downloadFile.toPath());
}

private static File createTempEmptyFile(String fileName) throws IOException {
String pathName = "./folderPath/" + LARGE_TEST_FOLDER;

File dirPath = new File(pathName);

if (dirPath.exists() || dirPath.mkdirs()) {
File f = new File(pathName + fileName);
if (f.exists() || f.createNewFile()) {
return f;
} else {
throw new RuntimeException("Failed to create the large file.");
}
} else {
throw new RuntimeException("Failed to create the large file dir.");
}
}

private static void createTempFileWithFileSize(File f, long size) throws IOException {
RandomAccessFile raf = new RandomAccessFile(f, "rw");
raf.setLength(size);
raf.close();
}

private static void checkTwoFilesAreTheSame(File f1, File f2) throws IOException, NoSuchAlgorithmException {
String checksumUpload = getFileChecksum(f1);
String checksumDownload = getFileChecksum(f2);
if (!checksumUpload.equals(checksumDownload)) {
throw new RuntimeException("The file upload does not match the file download.");
}
}

private static String getFileChecksum(File file) throws IOException, NoSuchAlgorithmException {
MessageDigest md = MessageDigest.getInstance("MD5");

try (FileInputStream fis = new FileInputStream(file); FileChannel ch = fis.getChannel()) {
final ByteBuffer buf = ByteBuffer.allocateDirect(8192);
int b = ch.read(buf);
while (b != -1 && b != 0) {
buf.flip();
final byte[] bytes = new byte[b];
buf.get(bytes);
md.update(bytes, 0, b);
buf.clear();
b = ch.read(buf);
}

// Hex-encode the digest. Decoding the raw digest bytes as UTF-8 is lossy (invalid sequences become
// replacement characters), which could make two different digests compare equal.
StringBuilder checksum = new StringBuilder();
for (byte digestByte : md.digest()) {
    checksum.append(String.format("%02x", digestByte));
}
return checksum.toString();
}
}
}
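FileTransferExample relies on uploadFromFile's default transfer tuning. If a 100 MB transfer needs tuning, uploadFromFile also has an overload taking ParallelTransferOptions; the sketch below assumes that overload mirrors the blob client's six-argument form, and the 4 MB block size is an arbitrary illustration:

// Assumed overload: uploadFromFile(filePath, parallelTransferOptions, headers, metadata, requestConditions, timeout).
ParallelTransferOptions options = new ParallelTransferOptions(4 * 1024 * 1024, null, null, null);
fileClient.uploadFromFile(largeFile.getPath(), options, null, null, null, null);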
