Skip to content

Commit

Permalink
Merge pull request #676 from begoldsm/autorest
Browse files Browse the repository at this point in the history
Updates to Uploader
  • Loading branch information
jianghaolu committed May 3, 2016
2 parents 3e1439f + c82391a commit 96829b6
Show file tree
Hide file tree
Showing 6 changed files with 336 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,95 +40,97 @@ public class DataLakeAnalyticsCatalogOperationsTests extends DataLakeAnalyticsMa
private static String credentialName = generateName("testcred1");
private static String secretName = generateName("testsecret1");
private static String secretPwd = generateName("testsecretpwd1");

private static String catalogCreationScript = MessageFormat.format("DROP DATABASE IF EXISTS {0}; CREATE DATABASE {0}; " +
"//Create Table" +
"CREATE TABLE {0}.dbo.{1}" +
"(" +
" //Define schema of table" +
" UserId int, " +
" Start DateTime, " +
" Region string, " +
" Query string, " +
" Duration int, " +
" Urls string, " +
" ClickedUrls string," +
" INDEX idx1 //Name of index" +
" CLUSTERED (Region ASC) //Column to cluster by" +
" PARTITIONED BY HASH (Region) //Column to partition by" +
");" +
"DROP FUNCTION IF EXISTS {0}.dbo.{2};" +
"" +
"//create table weblogs on space-delimited website log data" +
"CREATE FUNCTION {0}.dbo.{2}()" +
"RETURNS @result TABLE" +
"(" +
" s_date DateTime," +
" s_time string," +
" s_sitename string," +
" cs_method string, " +
" cs_uristem string," +
" cs_uriquery string," +
" s_port int," +
" cs_username string, " +
" c_ip string," +
" cs_useragent string," +
" cs_cookie string," +
" cs_referer string, " +
" cs_host string," +
" sc_status int," +
" sc_substatus int," +
" sc_win32status int, " +
" sc_bytes int," +
" cs_bytes int," +
" s_timetaken int" +
")" +
"AS" +
"BEGIN" +
"" +
" @result = EXTRACT" +
" s_date DateTime," +
" s_time string," +
" s_sitename string," +
" cs_method string," +
" cs_uristem string," +
" cs_uriquery string," +
" s_port int," +
" cs_username string," +
" c_ip string," +
" cs_useragent string," +
" cs_cookie string," +
" cs_referer string," +
" cs_host string," +
" sc_status int," +
" sc_substatus int," +
" sc_win32status int," +
" sc_bytes int," +
" cs_bytes int," +
" s_timetaken int" +
" FROM @\"/Samples/Data/WebLog.log\"" +
" USING Extractors.Text(delimiter:' ');" +
"" +
"RETURN;" +
"END;" +
"CREATE VIEW {0}.dbo.{3} " +
"AS " +
" SELECT * FROM " +
" (" +
" VALUES(1,2),(2,4)" +
" ) " +
"AS " +
"T(a, b);" +
"CREATE PROCEDURE {0}.dbo.{4}()" +
"AS BEGIN" +
" CREATE VIEW {0}.dbo.{3} " +
" AS " +
" SELECT * FROM " +
" (" +
" VALUES(1,2),(2,4)" +
" ) " +
" AS " +
" T(a, b);" +
private static String catalogCreationScript = MessageFormat.format("DROP DATABASE IF EXISTS {0}; CREATE DATABASE {0}; \r\n" +
"//Create Table\r\n" +
"CREATE TABLE {0}.dbo.{1}\r\n" +
"(\r\n" +
" //Define schema of table\r\n" +
" UserId int, \r\n" +
" Start DateTime, \r\n" +
" Region string, \r\n" +
" Query string, \r\n" +
" Duration int, \r\n" +
" Urls string, \r\n" +
" ClickedUrls string,\r\n" +
" INDEX idx1 //Name of index\r\n" +
" CLUSTERED (Region ASC) //Column to cluster by\r\n" +
" PARTITIONED BY BUCKETS (UserId) HASH (Region) //Column to partition by\r\n" +
");\r\n" +
"\r\n" +
"ALTER TABLE {0}.dbo.{1} ADD IF NOT EXISTS PARTITION (1);\r\n" +
"\r\n" +
"DROP FUNCTION IF EXISTS {0}.dbo.{2};\r\n" +
"\r\n" +
"//create table weblogs on space-delimited website log data\r\n" +
"CREATE FUNCTION {0}.dbo.{2}()\r\n" +
"RETURNS @result TABLE\r\n" +
"(\r\n" +
" s_date DateTime,\r\n" +
" s_time string,\r\n" +
" s_sitename string,\r\n" +
" cs_method string, \r\n" +
" cs_uristem string,\r\n" +
" cs_uriquery string,\r\n" +
" s_port int,\r\n" +
" cs_username string, \r\n" +
" c_ip string,\r\n" +
" cs_useragent string,\r\n" +
" cs_cookie string,\r\n" +
" cs_referer string, \r\n" +
" cs_host string,\r\n" +
" sc_status int,\r\n" +
" sc_substatus int,\r\n" +
" sc_win32status int, \r\n" +
" sc_bytes int,\r\n" +
" cs_bytes int,\r\n" +
" s_timetaken int\r\n" +
")\r\n" +
"AS\r\n" +
"BEGIN\r\n" +
"\r\n" +
" @result = EXTRACT\r\n" +
" s_date DateTime,\r\n" +
" s_time string,\r\n" +
" s_sitename string,\r\n" +
" cs_method string,\r\n" +
" cs_uristem string,\r\n" +
" cs_uriquery string,\r\n" +
" s_port int,\r\n" +
" cs_username string,\r\n" +
" c_ip string,\r\n" +
" cs_useragent string,\r\n" +
" cs_cookie string,\r\n" +
" cs_referer string,\r\n" +
" cs_host string,\r\n" +
" sc_status int,\r\n" +
" sc_substatus int,\r\n" +
" sc_win32status int,\r\n" +
" sc_bytes int,\r\n" +
" cs_bytes int,\r\n" +
" s_timetaken int\r\n" +
" FROM @\"/Samples/Data/WebLog.log\"\r\n" +
" USING Extractors.Text(delimiter:''' ''');\r\n" +
"\r\n" +
"RETURN;\r\n" +
"END;\r\n" +
"CREATE VIEW {0}.dbo.{3} \r\n" +
"AS \r\n" +
" SELECT * FROM \r\n" +
" (\r\n" +
" VALUES(1,2),(2,4)\r\n" +
" ) \r\n" +
"AS \r\n" +
"T(a, b);\r\n" +
"CREATE PROCEDURE {0}.dbo.{4}()\r\n" +
"AS BEGIN\r\n" +
" CREATE VIEW {0}.dbo.{3} \r\n" +
" AS \r\n" +
" SELECT * FROM \r\n" +
" (\r\n" +
" VALUES(1,2),(2,4)\r\n" +
" ) \r\n" +
" AS \r\n" +
" T(a, b);\r\n" +
"END;", dbName, tableName, tvfName, viewName, procName);

@BeforeClass
Expand Down Expand Up @@ -318,12 +320,6 @@ public void canCreateUpdateDeleteSecretsAndCredentials() throws Exception {
USqlSecret secretCreateResponse = dataLakeAnalyticsCatalogManagementClient.getCatalogOperations().createSecret(
adlaAcct, dbName, secretName,
createParams).getBody();
try {

}
catch(Exception e) {

}

// Attempt to create the secret again, which should throw
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public static void runJobToCompletion(DataLakeAnalyticsJobManagementClient jobCl
JobInformation getJobResponse = jobClient.getJobOperations().get(adlaAcct, jobCreateResponse.getJobId()).getBody();
Assert.assertNotNull(getJobResponse);

int maxWaitInSeconds = 180; // 3 minutes should be long enough
int maxWaitInSeconds = 2700; // giving it 45 minutes for now.
int curWaitInSeconds = 0;

while (getJobResponse.getState() != JobState.ENDED && curWaitInSeconds < maxWaitInSeconds)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@
import com.microsoft.azure.management.datalake.store.DataLakeStoreFileSystemManagementClient;
import com.microsoft.azure.management.datalake.store.models.FileStatusResult;

import org.apache.commons.lang3.StringUtils;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.text.MessageFormat;

/**
* A front end adapter that communicates with the DataLake Store.
Expand Down Expand Up @@ -52,7 +56,7 @@ public void CreateStream(String streamPath, boolean overwrite, byte[] data, int
toCreate = new byte[byteCount];
System.arraycopy(data, 0, toCreate, 0, byteCount);
}
_client.getFileSystemOperations().create(streamPath, _accountName, data, overwrite);
_client.getFileSystemOperations().create(_accountName, streamPath , data, overwrite);
}

/**
Expand All @@ -64,7 +68,7 @@ public void CreateStream(String streamPath, boolean overwrite, byte[] data, int
* @throws CloudException
*/
/**
 * Deletes the given stream from the Data Lake Store account.
 * Fix: the diff left both the pre- and post-rename calls in place, so the
 * method deleted twice — once with (streamPath, accountName) swapped into the
 * wrong parameters. The generated SDK signature takes the account name first.
 *
 * @param streamPath the full path of the stream to delete
 * @param recurse whether to delete recursively when the path is a folder
 * @throws IOException on transport failure
 * @throws CloudException on a service-side error
 */
public void DeleteStream(String streamPath, boolean recurse) throws IOException, CloudException {
    _client.getFileSystemOperations().delete(_accountName, streamPath, recurse);
}

/**
Expand All @@ -80,7 +84,7 @@ public void DeleteStream(String streamPath, boolean recurse) throws IOException,
/**
 * Appends the first {@code byteCount} bytes of {@code data} to the given stream.
 * Fix: the diff left both the pre- and post-rename append calls in place, which
 * would append the payload twice (the first time with the account/path arguments
 * swapped). The generated SDK signature takes the account name first.
 *
 * @param streamPath the full path of the stream to append to
 * @param data the source buffer; only the first byteCount bytes are sent
 * @param offset unused by this adapter (the service appends at the end of the stream)
 * @param byteCount number of bytes from data to append
 * @throws IOException on transport failure
 * @throws CloudException on a service-side error
 */
public void AppendToStream(String streamPath, byte[] data, long offset, int byteCount) throws IOException, CloudException {
    // Trim the buffer to exactly the requested byte count before sending.
    byte[] toAppend = new byte[byteCount];
    System.arraycopy(data, 0, toAppend, 0, byteCount);
    _client.getFileSystemOperations().append(_accountName, streamPath, toAppend);
}

/**
Expand All @@ -93,7 +97,7 @@ public void AppendToStream(String streamPath, byte[] data, long offset, int byte
*/
public boolean StreamExists(String streamPath) throws CloudException, IOException {
try {
_client.getFileSystemOperations().getFileStatus(streamPath, _accountName);
_client.getFileSystemOperations().getFileStatus(_accountName, streamPath);
} catch (CloudException cloudEx) {
if (cloudEx.getResponse().code() == 404) {
return false;
Expand All @@ -114,7 +118,7 @@ public boolean StreamExists(String streamPath) throws CloudException, IOExceptio
* @throws CloudException
*/
/**
 * Returns the length, in bytes, of the given stream.
 * Fix: the diff left both the old and new getFileStatus calls in place, producing
 * a duplicate local variable declaration (compile error) and an extra service call
 * with the account/path arguments swapped. Keep only the corrected call.
 *
 * @param streamPath the full path of the stream to inspect
 * @return the stream length in bytes as reported by the service
 * @throws IOException on transport failure
 * @throws CloudException on a service-side error (e.g. stream not found)
 */
public long GetStreamLength(String streamPath) throws IOException, CloudException {
    FileStatusResult fileInfoResponse = _client.getFileSystemOperations().getFileStatus(_accountName, streamPath).getBody();
    return fileInfoResponse.getFileStatus().getLength();
}

Expand All @@ -131,9 +135,9 @@ public void Concatenate(String targetStreamPath, String[] inputStreamPaths) thro
// this is required for the current version of the microsoft concatenate
// TODO: Improve WebHDFS concatenate to take in the list of paths to concatenate
// in the request body.
String paths = "sources=" + String.join(",", inputStreamPaths);
String paths = MessageFormat.format("sources={0}", StringUtils.join(inputStreamPaths, ','));

// For the current implementation, we require UTF8 encoding.
_client.getFileSystemOperations().msConcat(targetStreamPath, _accountName, paths.getBytes(), true);
_client.getFileSystemOperations().msConcat(_accountName, targetStreamPath, paths.getBytes(StandardCharsets.UTF_8), true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
* Represents a general purpose file uploader into DataLake. Supports the efficient upload of large files.
Expand Down Expand Up @@ -342,14 +343,27 @@ public void run() {
}

inputPaths[finalI] = remoteStreamPath;

} catch (Exception ex) {
//collect any exceptions, whether we just generated them above or whether they come from the Front End,
exceptions.add(ex);
synchronized (exceptions) {
exceptions.add(ex);
}
}
}
});
}

exec.shutdown();

try {
exec.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); // waits ~292 years for completion or interruption.
}
catch (InterruptedException e) {
// add the exception since it will indicate that it was cancelled.
exceptions.add(e);
}

if (exceptions.size() > 0) {
throw new AggregateUploadException("At least one concatenate test failed", exceptions.remove(0), exceptions);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/**
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See License.txt in the project root for
* license information.
*/
package com.microsoft.azure.management.datalake.store.uploader;

import com.microsoft.azure.credentials.AzureEnvironment;
import com.microsoft.azure.credentials.UserTokenCredentials;
import com.microsoft.azure.management.datalake.store.DataLakeStoreAccountManagementClient;
import com.microsoft.azure.management.datalake.store.DataLakeStoreAccountManagementClientImpl;
import com.microsoft.azure.management.datalake.store.DataLakeStoreFileSystemManagementClient;
import com.microsoft.azure.management.datalake.store.DataLakeStoreFileSystemManagementClientImpl;
import com.microsoft.azure.management.resources.ResourceManagementClient;
import com.microsoft.azure.management.resources.ResourceManagementClientImpl;
import okhttp3.logging.HttpLoggingInterceptor;

/**
 * Shared base for the Data Lake uploader integration tests: builds the ARM and
 * Data Lake Store management/filesystem clients from environment-supplied
 * credentials, and provides a helper for generating randomized resource names.
 */
public abstract class DataLakeUploaderTestBase {
    protected static ResourceManagementClient resourceManagementClient;
    protected static DataLakeStoreAccountManagementClient dataLakeStoreAccountManagementClient;
    protected static DataLakeStoreFileSystemManagementClient dataLakeStoreFileSystemManagementClient;

    /**
     * Initializes the static clients used by the tests. Credentials and the
     * subscription id are read from the arm.* environment variables.
     */
    public static void createClients() {
        final String subscriptionId = System.getenv("arm.subscriptionid");

        UserTokenCredentials userCredentials = new UserTokenCredentials(
                System.getenv("arm.clientid"),
                System.getenv("arm.domain"),
                System.getenv("arm.username"),
                System.getenv("arm.password"),
                null,
                AzureEnvironment.AZURE);

        resourceManagementClient = new ResourceManagementClientImpl(userCredentials);
        resourceManagementClient.setLogLevel(HttpLoggingInterceptor.Level.BODY);
        resourceManagementClient.setSubscriptionId(subscriptionId);

        dataLakeStoreAccountManagementClient = new DataLakeStoreAccountManagementClientImpl(userCredentials);
        dataLakeStoreAccountManagementClient.setSubscriptionId(subscriptionId);
        dataLakeStoreAccountManagementClient.setLogLevel(HttpLoggingInterceptor.Level.BODY);

        // Filesystem traffic is high-volume (upload payloads), so keep its logging off.
        dataLakeStoreFileSystemManagementClient = new DataLakeStoreFileSystemManagementClientImpl(userCredentials);
        dataLakeStoreFileSystemManagementClient.setLogLevel(HttpLoggingInterceptor.Level.NONE);
    }

    /**
     * Appends a pseudo-random numeric suffix (0-999) to the given prefix so
     * concurrently running tests get distinct resource names.
     *
     * @param prefix the human-readable name stem
     * @return the prefix followed by a random integer suffix
     */
    public static String generateName(String prefix) {
        return prefix + (int) (Math.random() * 1000);
    }
}
Loading

0 comments on commit 96829b6

Please sign in to comment.