Updates to Uploader #676

Merged · 2 commits · May 3, 2016
@@ -40,95 +40,97 @@ public class DataLakeAnalyticsCatalogOperationsTests extends DataLakeAnalyticsMa
private static String credentialName = generateName("testcred1");
private static String secretName = generateName("testsecret1");
private static String secretPwd = generateName("testsecretpwd1");

private static String catalogCreationScript = MessageFormat.format("DROP DATABASE IF EXISTS {0}; CREATE DATABASE {0}; " +
"//Create Table" +
"CREATE TABLE {0}.dbo.{1}" +
"(" +
" //Define schema of table" +
" UserId int, " +
" Start DateTime, " +
" Region string, " +
" Query string, " +
" Duration int, " +
" Urls string, " +
" ClickedUrls string," +
" INDEX idx1 //Name of index" +
" CLUSTERED (Region ASC) //Column to cluster by" +
" PARTITIONED BY HASH (Region) //Column to partition by" +
");" +
"DROP FUNCTION IF EXISTS {0}.dbo.{2};" +
"" +
"//create table weblogs on space-delimited website log data" +
"CREATE FUNCTION {0}.dbo.{2}()" +
"RETURNS @result TABLE" +
"(" +
" s_date DateTime," +
" s_time string," +
" s_sitename string," +
" cs_method string, " +
" cs_uristem string," +
" cs_uriquery string," +
" s_port int," +
" cs_username string, " +
" c_ip string," +
" cs_useragent string," +
" cs_cookie string," +
" cs_referer string, " +
" cs_host string," +
" sc_status int," +
" sc_substatus int," +
" sc_win32status int, " +
" sc_bytes int," +
" cs_bytes int," +
" s_timetaken int" +
")" +
"AS" +
"BEGIN" +
"" +
" @result = EXTRACT" +
" s_date DateTime," +
" s_time string," +
" s_sitename string," +
" cs_method string," +
" cs_uristem string," +
" cs_uriquery string," +
" s_port int," +
" cs_username string," +
" c_ip string," +
" cs_useragent string," +
" cs_cookie string," +
" cs_referer string," +
" cs_host string," +
" sc_status int," +
" sc_substatus int," +
" sc_win32status int," +
" sc_bytes int," +
" cs_bytes int," +
" s_timetaken int" +
" FROM @\"/Samples/Data/WebLog.log\"" +
" USING Extractors.Text(delimiter:' ');" +
"" +
"RETURN;" +
"END;" +
"CREATE VIEW {0}.dbo.{3} " +
"AS " +
" SELECT * FROM " +
" (" +
" VALUES(1,2),(2,4)" +
" ) " +
"AS " +
"T(a, b);" +
"CREATE PROCEDURE {0}.dbo.{4}()" +
"AS BEGIN" +
" CREATE VIEW {0}.dbo.{3} " +
" AS " +
" SELECT * FROM " +
" (" +
" VALUES(1,2),(2,4)" +
" ) " +
" AS " +
" T(a, b);" +
private static String catalogCreationScript = MessageFormat.format("DROP DATABASE IF EXISTS {0}; CREATE DATABASE {0}; \r\n" +
"//Create Table\r\n" +
"CREATE TABLE {0}.dbo.{1}\r\n" +
"(\r\n" +
" //Define schema of table\r\n" +
" UserId int, \r\n" +
" Start DateTime, \r\n" +
" Region string, \r\n" +
" Query string, \r\n" +
" Duration int, \r\n" +
" Urls string, \r\n" +
" ClickedUrls string,\r\n" +
" INDEX idx1 //Name of index\r\n" +
" CLUSTERED (Region ASC) //Column to cluster by\r\n" +
" PARTITIONED BY BUCKETS (UserId) HASH (Region) //Column to partition by\r\n" +
");\r\n" +
"\r\n" +
"ALTER TABLE {0}.dbo.{1} ADD IF NOT EXISTS PARTITION (1);\r\n" +
"\r\n" +
"DROP FUNCTION IF EXISTS {0}.dbo.{2};\r\n" +
"\r\n" +
"//create table weblogs on space-delimited website log data\r\n" +
"CREATE FUNCTION {0}.dbo.{2}()\r\n" +
"RETURNS @result TABLE\r\n" +
"(\r\n" +
" s_date DateTime,\r\n" +
" s_time string,\r\n" +
" s_sitename string,\r\n" +
" cs_method string, \r\n" +
" cs_uristem string,\r\n" +
" cs_uriquery string,\r\n" +
" s_port int,\r\n" +
" cs_username string, \r\n" +
" c_ip string,\r\n" +
" cs_useragent string,\r\n" +
" cs_cookie string,\r\n" +
" cs_referer string, \r\n" +
" cs_host string,\r\n" +
" sc_status int,\r\n" +
" sc_substatus int,\r\n" +
" sc_win32status int, \r\n" +
" sc_bytes int,\r\n" +
" cs_bytes int,\r\n" +
" s_timetaken int\r\n" +
")\r\n" +
"AS\r\n" +
"BEGIN\r\n" +
"\r\n" +
" @result = EXTRACT\r\n" +
" s_date DateTime,\r\n" +
" s_time string,\r\n" +
" s_sitename string,\r\n" +
" cs_method string,\r\n" +
" cs_uristem string,\r\n" +
" cs_uriquery string,\r\n" +
" s_port int,\r\n" +
" cs_username string,\r\n" +
" c_ip string,\r\n" +
" cs_useragent string,\r\n" +
" cs_cookie string,\r\n" +
" cs_referer string,\r\n" +
" cs_host string,\r\n" +
" sc_status int,\r\n" +
" sc_substatus int,\r\n" +
" sc_win32status int,\r\n" +
" sc_bytes int,\r\n" +
" cs_bytes int,\r\n" +
" s_timetaken int\r\n" +
" FROM @\"/Samples/Data/WebLog.log\"\r\n" +
" USING Extractors.Text(delimiter:''' ''');\r\n" +
"\r\n" +
"RETURN;\r\n" +
"END;\r\n" +
"CREATE VIEW {0}.dbo.{3} \r\n" +
"AS \r\n" +
" SELECT * FROM \r\n" +
" (\r\n" +
" VALUES(1,2),(2,4)\r\n" +
" ) \r\n" +
"AS \r\n" +
"T(a, b);\r\n" +
"CREATE PROCEDURE {0}.dbo.{4}()\r\n" +
"AS BEGIN\r\n" +
" CREATE VIEW {0}.dbo.{3} \r\n" +
" AS \r\n" +
" SELECT * FROM \r\n" +
" (\r\n" +
" VALUES(1,2),(2,4)\r\n" +
" ) \r\n" +
" AS \r\n" +
" T(a, b);\r\n" +
"END;", dbName, tableName, tvfName, viewName, procName);

@BeforeClass
@@ -318,12 +320,6 @@ public void canCreateUpdateDeleteSecretsAndCredentials() throws Exception {
USqlSecret secretCreateResponse = dataLakeAnalyticsCatalogManagementClient.getCatalogOperations().createSecret(
adlaAcct, dbName, secretName,
createParams).getBody();
try {

}
catch(Exception e) {

}

// Attempt to create the secret again, which should throw
try {
@@ -110,7 +110,7 @@ public static void runJobToCompletion(DataLakeAnalyticsJobManagementClient jobCl
JobInformation getJobResponse = jobClient.getJobOperations().get(adlaAcct, jobCreateResponse.getJobId()).getBody();
Assert.assertNotNull(getJobResponse);

int maxWaitInSeconds = 180; // 3 minutes should be long enough
int maxWaitInSeconds = 2700; // giving it 45 minutes for now.
int curWaitInSeconds = 0;

while (getJobResponse.getState() != JobState.ENDED && curWaitInSeconds < maxWaitInSeconds)
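// NOTE: the loop body is collapsed in this diff view. As a sketch, it polls
// and sleeps until the job ends or the wait budget is exhausted; the
// 5-second interval below is an assumption, not taken from this diff:
//
//     Thread.sleep(5 * 1000);
//     curWaitInSeconds += 5;
//     getJobResponse = jobClient.getJobOperations().get(adlaAcct, jobCreateResponse.getJobId()).getBody();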
@@ -9,7 +9,11 @@
import com.microsoft.azure.management.datalake.store.DataLakeStoreFileSystemManagementClient;
import com.microsoft.azure.management.datalake.store.models.FileStatusResult;

import org.apache.commons.lang3.StringUtils;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.text.MessageFormat;

/**
* A front end adapter that communicates with the DataLake Store.
@@ -52,7 +56,7 @@ public void CreateStream(String streamPath, boolean overwrite, byte[] data, int
toCreate = new byte[byteCount];
System.arraycopy(data, 0, toCreate, 0, byteCount);
}
_client.getFileSystemOperations().create(streamPath, _accountName, data, overwrite);
_client.getFileSystemOperations().create(_accountName, streamPath , data, overwrite);
}

/**
@@ -64,7 +68,7 @@ public void CreateStream(String streamPath, boolean overwrite, byte[] data, int
* @throws CloudException
*/
public void DeleteStream(String streamPath, boolean recurse) throws IOException, CloudException {
_client.getFileSystemOperations().delete(streamPath, _accountName, recurse);
_client.getFileSystemOperations().delete(_accountName, streamPath, recurse);
}

/**
@@ -80,7 +84,7 @@ public void DeleteStream(String streamPath, boolean recurse) throws IOException,
public void AppendToStream(String streamPath, byte[] data, long offset, int byteCount) throws IOException, CloudException {
byte[] toAppend = new byte[byteCount];
System.arraycopy(data, 0, toAppend, 0, byteCount);
_client.getFileSystemOperations().append(streamPath, _accountName, toAppend);
_client.getFileSystemOperations().append(_accountName, streamPath, toAppend);
}

/**
@@ -93,7 +97,7 @@ public void AppendToStream(String streamPath, byte[] data, long offset, int byte
*/
public boolean StreamExists(String streamPath) throws CloudException, IOException {
try {
_client.getFileSystemOperations().getFileStatus(streamPath, _accountName);
_client.getFileSystemOperations().getFileStatus(_accountName, streamPath);
} catch (CloudException cloudEx) {
if (cloudEx.getResponse().code() == 404) {
return false;
@@ -114,7 +118,7 @@ public boolean StreamExists(String streamPath) throws CloudException, IOExceptio
* @throws CloudException
*/
public long GetStreamLength(String streamPath) throws IOException, CloudException {
FileStatusResult fileInfoResponse = _client.getFileSystemOperations().getFileStatus(streamPath, _accountName).getBody();
FileStatusResult fileInfoResponse = _client.getFileSystemOperations().getFileStatus(_accountName, streamPath).getBody();
return fileInfoResponse.getFileStatus().getLength();
}

@@ -131,9 +135,9 @@ public void Concatenate(String targetStreamPath, String[] inputStreamPaths) thro
// this is required for the current version of the microsoft concatenate
// TODO: Improve WebHDFS concatenate to take in the list of paths to concatenate
// in the request body.
String paths = "sources=" + String.join(",", inputStreamPaths);
String paths = MessageFormat.format("sources={0}", StringUtils.join(inputStreamPaths, ','));

// For the current implementation, we require UTF8 encoding.
_client.getFileSystemOperations().msConcat(targetStreamPath, _accountName, paths.getBytes(), true);
_client.getFileSystemOperations().msConcat(_accountName, targetStreamPath, paths.getBytes(StandardCharsets.UTF_8), true);
}
}
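
Two portability details in this file are worth noting beyond the argument reorder (the file-system calls now pass the account name before the path, apparently to match regenerated client method signatures). StringUtils.join from Commons Lang 3 replaces String.join, which requires Java 8, and getBytes(StandardCharsets.UTF_8) replaces the no-argument getBytes(), which silently uses the platform-default charset. A small self-contained illustration, independent of the SDK:

import java.nio.charset.StandardCharsets;
import org.apache.commons.lang3.StringUtils;

public class JoinAndEncode {
    public static void main(String[] args) {
        String[] paths = { "/a.txt", "/b.txt" };
        // Commons Lang join runs on Java 7; String.join needs Java 8+.
        String sources = "sources=" + StringUtils.join(paths, ',');
        // An explicit charset guarantees UTF-8 regardless of platform default.
        byte[] payload = sources.getBytes(StandardCharsets.UTF_8);
        System.out.println(sources + " (" + payload.length + " bytes)");
    }
}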
@@ -17,6 +17,7 @@
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
* Represents a general purpose file uploader into DataLake. Supports the efficient upload of large files.
@@ -342,14 +343,27 @@ public void run() {
}

inputPaths[finalI] = remoteStreamPath;

} catch (Exception ex) {
//collect any exceptions, whether we just generated them above or whether they come from the Front End,
exceptions.add(ex);
synchronized (exceptions) {
exceptions.add(ex);
}
}
}
});
}

exec.shutdown();

try {
exec.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); // waits ~292 years for completion or interruption.
}
catch (InterruptedException e) {
// add the exception since it will indicate that it was cancelled.
exceptions.add(e);
}

if (exceptions.size() > 0) {
throw new AggregateUploadException("At least one concatenate test failed", exceptions.remove(0), exceptions);
}
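
The added block closes a real gap: shutdown() stops the pool from accepting new work, and awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS) blocks until every submitted task completes, so the method no longer proceeds while uploads are still in flight. The synchronized block guards the shared exception list, since ArrayList is not thread-safe. A self-contained sketch of the same pattern, with illustrative names only:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class AwaitAllTasks {
    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        final List<Exception> errors = new ArrayList<Exception>();
        for (int i = 0; i < 8; i++) {
            pool.execute(new Runnable() {
                public void run() {
                    try {
                        Thread.sleep(100); // stand-in for one segment upload
                    } catch (Exception ex) {
                        synchronized (errors) { // ArrayList needs external locking
                            errors.add(ex);
                        }
                    }
                }
            });
        }
        pool.shutdown(); // no new tasks accepted; queued ones still run
        try {
            // Effectively unbounded wait for all submitted tasks to finish.
            pool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
        } catch (InterruptedException e) {
            synchronized (errors) {
                errors.add(e);
            }
        }
        System.out.println("failures: " + errors.size());
    }
}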
@@ -0,0 +1,48 @@
/**
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See License.txt in the project root for
* license information.
*/
package com.microsoft.azure.management.datalake.store.uploader;

import com.microsoft.azure.credentials.AzureEnvironment;
import com.microsoft.azure.credentials.UserTokenCredentials;
import com.microsoft.azure.management.datalake.store.DataLakeStoreAccountManagementClient;
import com.microsoft.azure.management.datalake.store.DataLakeStoreAccountManagementClientImpl;
import com.microsoft.azure.management.datalake.store.DataLakeStoreFileSystemManagementClient;
import com.microsoft.azure.management.datalake.store.DataLakeStoreFileSystemManagementClientImpl;
import com.microsoft.azure.management.resources.ResourceManagementClient;
import com.microsoft.azure.management.resources.ResourceManagementClientImpl;
import okhttp3.logging.HttpLoggingInterceptor;

public abstract class DataLakeUploaderTestBase {
protected static ResourceManagementClient resourceManagementClient;
protected static DataLakeStoreAccountManagementClient dataLakeStoreAccountManagementClient;
protected static DataLakeStoreFileSystemManagementClient dataLakeStoreFileSystemManagementClient;

public static void createClients() {
UserTokenCredentials credentials = new UserTokenCredentials(
System.getenv("arm.clientid"),
System.getenv("arm.domain"),
System.getenv("arm.username"),
System.getenv("arm.password"),
null,
AzureEnvironment.AZURE);

resourceManagementClient = new ResourceManagementClientImpl(credentials);
resourceManagementClient.setSubscriptionId(System.getenv("arm.subscriptionid"));
resourceManagementClient.setLogLevel(HttpLoggingInterceptor.Level.BODY);

dataLakeStoreAccountManagementClient = new DataLakeStoreAccountManagementClientImpl(credentials);
dataLakeStoreAccountManagementClient.setLogLevel(HttpLoggingInterceptor.Level.BODY);
dataLakeStoreAccountManagementClient.setSubscriptionId(System.getenv("arm.subscriptionid"));

dataLakeStoreFileSystemManagementClient = new DataLakeStoreFileSystemManagementClientImpl(credentials);
dataLakeStoreFileSystemManagementClient.setLogLevel(HttpLoggingInterceptor.Level.NONE);
}

public static String generateName(String prefix) {
int randomSuffix = (int)(Math.random() * 1000);
return prefix + randomSuffix;
}
}
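
For reference, a hypothetical subclass showing how this new base class is consumed; the class name and test body are illustrative, not part of this PR:

import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

public class ExampleUploaderTests extends DataLakeUploaderTestBase {
    @BeforeClass
    public static void setup() {
        createClients(); // reads the arm.* values from environment variables
    }

    @Test
    public void generatesUniqueNames() {
        // generateName appends a random numeric suffix to keep runs isolated.
        String name = generateName("testadls");
        Assert.assertTrue(name.startsWith("testadls"));
    }
}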