diff --git a/transportable-udfs-hive/src/main/java/com/linkedin/transport/hive/StdUdfWrapper.java b/transportable-udfs-hive/src/main/java/com/linkedin/transport/hive/StdUdfWrapper.java index 3b3da9ab..bfa5cb6d 100644 --- a/transportable-udfs-hive/src/main/java/com/linkedin/transport/hive/StdUdfWrapper.java +++ b/transportable-udfs-hive/src/main/java/com/linkedin/transport/hive/StdUdfWrapper.java @@ -231,7 +231,7 @@ public String[] getRequiredFiles() { } _distributedCacheFiles = Arrays.stream(requiredFiles).map(requiredFile -> { try { - return FileSystemUtils.resolveLatest(requiredFile, FileSystemUtils.getHDFSFileSystem()); + return FileSystemUtils.resolveLatest(requiredFile); } catch (IOException e) { throw new RuntimeException("Failed to resolve path: [" + requiredFile + "].", e); } diff --git a/transportable-udfs-presto/src/main/java/com/linkedin/transport/presto/FileSystemClient.java b/transportable-udfs-presto/src/main/java/com/linkedin/transport/presto/FileSystemClient.java index 850ef304..69622a50 100644 --- a/transportable-udfs-presto/src/main/java/com/linkedin/transport/presto/FileSystemClient.java +++ b/transportable-udfs-presto/src/main/java/com/linkedin/transport/presto/FileSystemClient.java @@ -53,7 +53,7 @@ public String copyToLocalFile(String remoteFilename) { Path remotePath = new Path(remoteFilename); Path localPath = new Path(Paths.get(getAndCreateLocalDir(), new File(remoteFilename).getName()).toString()); FileSystem fs = remotePath.getFileSystem(conf); - String resolvedRemoteFilename = FileSystemUtils.resolveLatest(remoteFilename, fs); + String resolvedRemoteFilename = FileSystemUtils.resolveLatest(remoteFilename); Path resolvedRemotePath = new Path(resolvedRemoteFilename); fs.copyToLocalFile(resolvedRemotePath, localPath); return localPath.toString(); diff --git a/transportable-udfs-spark/src/main/scala/com/linkedin/transport/spark/StdUdfWrapper.scala b/transportable-udfs-spark/src/main/scala/com/linkedin/transport/spark/StdUdfWrapper.scala index c87f3bfc..f1cca7d4 100644 --- a/transportable-udfs-spark/src/main/scala/com/linkedin/transport/spark/StdUdfWrapper.scala +++ b/transportable-udfs-spark/src/main/scala/com/linkedin/transport/spark/StdUdfWrapper.scala @@ -95,7 +95,7 @@ abstract class StdUdfWrapper(_expressions: Seq[Expression]) extends Expression lazy val sparkContext = SparkSession.builder().getOrCreate().sparkContext _distributedCacheFiles = requiredFiles.map(file => { try { - val resolvedFile = FileSystemUtils.resolveLatest(file, FileSystemUtils.getHDFSFileSystem) + val resolvedFile = FileSystemUtils.resolveLatest(file) // TODO: Currently does not support adding of files with same file name. E.g dirA/file.txt dirB/file.txt sparkContext.addFile(resolvedFile) resolvedFile diff --git a/transportable-udfs-test/transportable-udfs-test-generic/src/main/java/com/linkedin/transport/test/generic/GenericStdUDFWrapper.java b/transportable-udfs-test/transportable-udfs-test-generic/src/main/java/com/linkedin/transport/test/generic/GenericStdUDFWrapper.java index c610f984..2a345e3c 100644 --- a/transportable-udfs-test/transportable-udfs-test-generic/src/main/java/com/linkedin/transport/test/generic/GenericStdUDFWrapper.java +++ b/transportable-udfs-test/transportable-udfs-test-generic/src/main/java/com/linkedin/transport/test/generic/GenericStdUDFWrapper.java @@ -196,7 +196,7 @@ public String[] getRequiredFiles(StdData[] args) { } _localFiles = Arrays.stream(requiredFiles).map(requiredFile -> { try { - return FileSystemUtils.resolveLatest(requiredFile, FileSystemUtils.getLocalFileSystem()); + return FileSystemUtils.resolveLatest(requiredFile); } catch (IOException e) { throw new RuntimeException("Failed to resolve path: [" + requiredFile + "].", e); } diff --git a/transportable-udfs-utils/src/main/java/com/linkedin/transport/utils/FileSystemUtils.java b/transportable-udfs-utils/src/main/java/com/linkedin/transport/utils/FileSystemUtils.java index 6f56b898..2d59429b 100644 --- a/transportable-udfs-utils/src/main/java/com/linkedin/transport/utils/FileSystemUtils.java +++ b/transportable-udfs-utils/src/main/java/com/linkedin/transport/utils/FileSystemUtils.java @@ -6,6 +6,7 @@ package com.linkedin.transport.utils; import java.io.IOException; +import java.net.URISyntaxException; import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; @@ -44,7 +45,7 @@ public static boolean isLocalEnvironment(Configuration conf) { * * @return the HDFS FileSystem if we are not in local mode, local FileSystem if we are. */ - public static FileSystem getHDFSFileSystem() { + public static FileSystem getHDFSFileSystem(String filePath) { FileSystem fs; JobConf conf = new JobConf(); try { @@ -52,9 +53,9 @@ public static FileSystem getHDFSFileSystem() { if (isLocalEnvironment(conf)) { fs = FileSystem.getLocal(conf); } else { - fs = FileSystem.get(conf); + fs = FileSystem.get(new java.net.URI(filePath), conf); } - } catch (IOException e) { + } catch (IOException | URISyntaxException e) { throw new RuntimeException("Failed to load the HDFS file system.", e); } @@ -87,16 +88,16 @@ public static FileSystem getLocalFileSystem() { * the same path. * * @param path the path to resolve - * @param fs the filesystem used to resolve the path * @return the resolved path * @throws IOException when the filesystem could not resolve the path */ - public static String resolveLatest(String path, FileSystem fs) throws IOException { + public static String resolveLatest(String path) throws IOException { if (!StringUtils.isBlank(path)) { path = path.trim(); String[] split = path.split("#LATEST"); String retval = split[0]; - + FileSystem fs = null; + fs = getHDFSFileSystem(path); for (int i = 1; i < split.length; ++i) { retval = resolveLatestHelper(retval, fs, true) + split[i]; } diff --git a/transportable-udfs-utils/src/test/java/com/linkedin/transport/utils/FileSystemUtilsTest.java b/transportable-udfs-utils/src/test/java/com/linkedin/transport/utils/FileSystemUtilsTest.java index e8c37f95..bce276eb 100644 --- a/transportable-udfs-utils/src/test/java/com/linkedin/transport/utils/FileSystemUtilsTest.java +++ b/transportable-udfs-utils/src/test/java/com/linkedin/transport/utils/FileSystemUtilsTest.java @@ -18,20 +18,18 @@ public class FileSystemUtilsTest { @Test public void testResolveLatest() throws IOException, URISyntaxException { - FileSystem fs = FileSystemUtils.getLocalFileSystem(); - - String resourcePath = getPathForResource("root"); + String resourcePath = "file://" + getPathForResource("root"); // Test cases to resolve #LATEST - String filePath = FileSystemUtils.resolveLatest(resourcePath + "/2018/11/02.dat", fs); + String filePath = FileSystemUtils.resolveLatest(resourcePath + "/2018/11/02.dat"); Assert.assertTrue( - FileSystemUtils.resolveLatest(resourcePath + "/2018/11/02.dat", fs).endsWith("/root/2018/11/02.dat")); + FileSystemUtils.resolveLatest(resourcePath + "/2018/11/02.dat").endsWith("/root/2018/11/02.dat")); Assert.assertTrue( - FileSystemUtils.resolveLatest(resourcePath + "/#LATEST/11/#LATEST", fs).endsWith("/root/2019/11/02.dat")); + FileSystemUtils.resolveLatest(resourcePath + "/#LATEST/11/#LATEST").endsWith("/root/2019/11/02.dat")); Assert.assertTrue( - FileSystemUtils.resolveLatest(resourcePath + "/#LATEST/#LATEST/#LATEST", fs).endsWith("/root/2019/12/02.dat")); + FileSystemUtils.resolveLatest(resourcePath + "/#LATEST/#LATEST/#LATEST").endsWith("/root/2019/12/02.dat")); Assert.assertTrue( - FileSystemUtils.resolveLatest(resourcePath + "/#LATEST/#LATEST", fs).endsWith("/root/2019/13.dat")); + FileSystemUtils.resolveLatest(resourcePath + "/#LATEST/#LATEST").endsWith("/root/2019/13.dat")); } private String getPathForResource(String resource) throws URISyntaxException {