Skip to content

Commit

Permalink
UDF's files: allow for absolute paths instead of assuming defaultFS
Browse files Browse the repository at this point in the history
  • Loading branch information
SurenNihalani committed Dec 17, 2019
1 parent 44fc3b5 commit cabaf16
Show file tree
Hide file tree
Showing 6 changed files with 17 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ public String[] getRequiredFiles() {
}
_distributedCacheFiles = Arrays.stream(requiredFiles).map(requiredFile -> {
try {
return FileSystemUtils.resolveLatest(requiredFile, FileSystemUtils.getHDFSFileSystem());
return FileSystemUtils.resolveLatest(requiredFile);
} catch (IOException e) {
throw new RuntimeException("Failed to resolve path: [" + requiredFile + "].", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public String copyToLocalFile(String remoteFilename) {
Path remotePath = new Path(remoteFilename);
Path localPath = new Path(Paths.get(getAndCreateLocalDir(), new File(remoteFilename).getName()).toString());
FileSystem fs = remotePath.getFileSystem(conf);
String resolvedRemoteFilename = FileSystemUtils.resolveLatest(remoteFilename, fs);
String resolvedRemoteFilename = FileSystemUtils.resolveLatest(remoteFilename);
Path resolvedRemotePath = new Path(resolvedRemoteFilename);
fs.copyToLocalFile(resolvedRemotePath, localPath);
return localPath.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ abstract class StdUdfWrapper(_expressions: Seq[Expression]) extends Expression
lazy val sparkContext = SparkSession.builder().getOrCreate().sparkContext
_distributedCacheFiles = requiredFiles.map(file => {
try {
val resolvedFile = FileSystemUtils.resolveLatest(file, FileSystemUtils.getHDFSFileSystem)
val resolvedFile = FileSystemUtils.resolveLatest(file)
// TODO: Currently does not support adding of files with same file name. E.g dirA/file.txt dirB/file.txt
sparkContext.addFile(resolvedFile)
resolvedFile
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ public String[] getRequiredFiles(StdData[] args) {
}
_localFiles = Arrays.stream(requiredFiles).map(requiredFile -> {
try {
return FileSystemUtils.resolveLatest(requiredFile, FileSystemUtils.getLocalFileSystem());
return FileSystemUtils.resolveLatest(requiredFile);
} catch (IOException e) {
throw new RuntimeException("Failed to resolve path: [" + requiredFile + "].", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package com.linkedin.transport.utils;

import java.io.IOException;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -44,17 +45,17 @@ public static boolean isLocalEnvironment(Configuration conf) {
*
* @return the HDFS FileSystem if we are not in local mode, local FileSystem if we are.
*/
public static FileSystem getHDFSFileSystem() {
public static FileSystem getHDFSFileSystem(String filePath) {
FileSystem fs;
JobConf conf = new JobConf();
try {
// Checks if currently we are in local mode, which is basically when running unit tests
if (isLocalEnvironment(conf)) {
fs = FileSystem.getLocal(conf);
} else {
fs = FileSystem.get(conf);
fs = FileSystem.get(new java.net.URI(filePath), conf);
}
} catch (IOException e) {
} catch (IOException | URISyntaxException e) {
throw new RuntimeException("Failed to load the HDFS file system.", e);
}

Expand Down Expand Up @@ -87,16 +88,16 @@ public static FileSystem getLocalFileSystem() {
* the same path.
*
* @param path the path to resolve
* @param fs the filesystem used to resolve the path
* @return the resolved path
* @throws IOException when the filesystem could not resolve the path
*/
public static String resolveLatest(String path, FileSystem fs) throws IOException {
public static String resolveLatest(String path) throws IOException {
if (!StringUtils.isBlank(path)) {
path = path.trim();
String[] split = path.split("#LATEST");
String retval = split[0];

FileSystem fs = null;
fs = getHDFSFileSystem(path);
for (int i = 1; i < split.length; ++i) {
retval = resolveLatestHelper(retval, fs, true) + split[i];
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,18 @@ public class FileSystemUtilsTest {

@Test
public void testResolveLatest() throws IOException, URISyntaxException {
FileSystem fs = FileSystemUtils.getLocalFileSystem();

String resourcePath = getPathForResource("root");
String resourcePath = "file://" + getPathForResource("root");

// Test cases to resolve #LATEST
String filePath = FileSystemUtils.resolveLatest(resourcePath + "/2018/11/02.dat", fs);
String filePath = FileSystemUtils.resolveLatest(resourcePath + "/2018/11/02.dat");
Assert.assertTrue(
FileSystemUtils.resolveLatest(resourcePath + "/2018/11/02.dat", fs).endsWith("/root/2018/11/02.dat"));
FileSystemUtils.resolveLatest(resourcePath + "/2018/11/02.dat").endsWith("/root/2018/11/02.dat"));
Assert.assertTrue(
FileSystemUtils.resolveLatest(resourcePath + "/#LATEST/11/#LATEST", fs).endsWith("/root/2019/11/02.dat"));
FileSystemUtils.resolveLatest(resourcePath + "/#LATEST/11/#LATEST").endsWith("/root/2019/11/02.dat"));
Assert.assertTrue(
FileSystemUtils.resolveLatest(resourcePath + "/#LATEST/#LATEST/#LATEST", fs).endsWith("/root/2019/12/02.dat"));
FileSystemUtils.resolveLatest(resourcePath + "/#LATEST/#LATEST/#LATEST").endsWith("/root/2019/12/02.dat"));
Assert.assertTrue(
FileSystemUtils.resolveLatest(resourcePath + "/#LATEST/#LATEST", fs).endsWith("/root/2019/13.dat"));
FileSystemUtils.resolveLatest(resourcePath + "/#LATEST/#LATEST").endsWith("/root/2019/13.dat"));
}

private String getPathForResource(String resource) throws URISyntaxException {
Expand Down

0 comments on commit cabaf16

Please sign in to comment.