Skip to content

Commit

Permalink
HDFS-16791 WIP - client protocol and Filesystem apis implemented and …
Browse files Browse the repository at this point in the history
…building.

Tests for Distributed Filesystem, view filesystem and RBF
  • Loading branch information
Tom McCormick committed Oct 4, 2022
1 parent 4891bf5 commit 8f87758
Show file tree
Hide file tree
Showing 19 changed files with 709 additions and 371 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4876,6 +4876,16 @@ public CompletableFuture<FSDataInputStream> build() throws IOException {

}

/**
* Return root path
* @param path
* @return
* @throws IOException
*/
public Path getEnclosingRoot(Path path) throws IOException {
return this.makeQualified(new Path("/"));
}

/**
* Create a multipart uploader.
* @param basePath file path under which all files are uploaded
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,23 +61,23 @@
@InterfaceAudience.Public
@InterfaceStability.Stable
public class FilterFileSystem extends FileSystem {

protected FileSystem fs;
protected String swapScheme;

/*
* so that extending classes can define it
*/
public FilterFileSystem() {
}

public FilterFileSystem(FileSystem fs) {
this.fs = fs;
this.statistics = fs.statistics;
}

/**
* Get the raw file system
* Get the raw file system
* @return FileSystem being filtered
*/
public FileSystem getRawFileSystem() {
Expand Down Expand Up @@ -108,8 +108,8 @@ public void initialize(URI name, Configuration conf) throws IOException {
public URI getUri() {
return fs.getUri();
}


@Override
protected URI getCanonicalUri() {
return fs.getCanonicalUri();
Expand All @@ -127,7 +127,7 @@ public Path makeQualified(Path path) {
// swap in our scheme if the filtered fs is using a different scheme
if (swapScheme != null) {
try {
// NOTE: should deal with authority, but too much other stuff is broken
// NOTE: should deal with authority, but too much other stuff is broken
fqPath = new Path(
new URI(swapScheme, fqPath.toUri().getSchemeSpecificPart(), null)
);
Expand All @@ -137,7 +137,7 @@ public Path makeQualified(Path path) {
}
return fqPath;
}

///////////////////////////////////////////////////////////////
// FileSystem
///////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -223,14 +223,14 @@ protected RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f,
public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
EnumSet<CreateFlag> flags, int bufferSize, short replication, long blockSize,
Progressable progress) throws IOException {

return fs.createNonRecursive(f, permission, flags, bufferSize, replication, blockSize,
progress);
}

/**
* Set replication for an existing file.
*
*
* @param src file name
* @param replication new replication
* @throws IOException raised on errors performing I/O.
Expand All @@ -241,7 +241,7 @@ public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
public boolean setReplication(Path src, short replication) throws IOException {
return fs.setReplication(src, replication);
}

/**
* Renames Path src to Path dst. Can take place on local fs
* or remote DFS.
Expand All @@ -261,13 +261,13 @@ protected void rename(Path src, Path dst, Rename... options)
public boolean truncate(Path f, final long newLength) throws IOException {
return fs.truncate(f, newLength);
}

/** Delete a file */
@Override
public boolean delete(Path f, boolean recursive) throws IOException {
return fs.delete(f, recursive);
}

/** List files in a directory. */
@Override
public FileStatus[] listStatus(Path f) throws IOException {
Expand All @@ -286,7 +286,7 @@ public RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f)
throws IOException {
return fs.listLocatedStatus(f);
}

/** Return a remote iterator for listing in a directory */
@Override
public RemoteIterator<FileStatus> listStatusIterator(Path f)
Expand All @@ -303,34 +303,34 @@ public Path getHomeDirectory() {
/**
* Set the current working directory for the given file system. All relative
* paths will be resolved relative to it.
*
*
* @param newDir new dir.
*/
@Override
public void setWorkingDirectory(Path newDir) {
fs.setWorkingDirectory(newDir);
}

/**
* Get the current working directory for the given file system
*
*
* @return the directory pathname
*/
@Override
public Path getWorkingDirectory() {
return fs.getWorkingDirectory();
}

@Override
protected Path getInitialWorkingDirectory() {
return fs.getInitialWorkingDirectory();
}

@Override
public FsStatus getStatus(Path p) throws IOException {
return fs.getStatus(p);
}

@Override
public boolean mkdirs(Path f, FsPermission permission) throws IOException {
return fs.mkdirs(f, permission);
Expand All @@ -351,26 +351,26 @@ public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
throws IOException {
fs.copyFromLocalFile(delSrc, src, dst);
}

/**
* The src files are on the local disk. Add it to FS at
* the given dst name.
* delSrc indicates if the source should be removed
*/
@Override
public void copyFromLocalFile(boolean delSrc, boolean overwrite,
public void copyFromLocalFile(boolean delSrc, boolean overwrite,
Path[] srcs, Path dst)
throws IOException {
fs.copyFromLocalFile(delSrc, overwrite, srcs, dst);
}

/**
* The src file is on the local disk. Add it to FS at
* the given dst name.
* delSrc indicates if the source should be removed
*/
@Override
public void copyFromLocalFile(boolean delSrc, boolean overwrite,
public void copyFromLocalFile(boolean delSrc, boolean overwrite,
Path src, Path dst)
throws IOException {
fs.copyFromLocalFile(delSrc, overwrite, src, dst);
Expand All @@ -380,13 +380,13 @@ public void copyFromLocalFile(boolean delSrc, boolean overwrite,
* The src file is under FS, and the dst is on the local disk.
* Copy it from FS control to the local dst name.
* delSrc indicates if the src will be removed or not.
*/
*/
@Override
public void copyToLocalFile(boolean delSrc, Path src, Path dst)
throws IOException {
fs.copyToLocalFile(delSrc, src, dst);
}

/**
* Returns a local File that the user can write output to. The caller
* provides both the eventual FS target name and the local working
Expand Down Expand Up @@ -427,7 +427,7 @@ public long getUsed(Path path) throws IOException {
public long getDefaultBlockSize() {
return fs.getDefaultBlockSize();
}

@Override
public short getDefaultReplication() {
return fs.getDefaultReplication();
Expand All @@ -438,7 +438,7 @@ public FsServerDefaults getServerDefaults() throws IOException {
return fs.getServerDefaults();
}

// path variants delegate to underlying filesystem
// path variants delegate to underlying filesystem
@Override
public long getDefaultBlockSize(Path f) {
return fs.getDefaultBlockSize(f);
Expand Down Expand Up @@ -476,7 +476,7 @@ public void access(Path path, FsAction mode) throws AccessControlException,
public void createSymlink(final Path target, final Path link,
final boolean createParent) throws AccessControlException,
FileAlreadyExistsException, FileNotFoundException,
ParentNotDirectoryException, UnsupportedFileSystemException,
ParentNotDirectoryException, UnsupportedFileSystemException,
IOException {
fs.createSymlink(target, link, createParent);
}
Expand Down Expand Up @@ -513,7 +513,7 @@ public FileChecksum getFileChecksum(Path f, long length) throws IOException {
public void setVerifyChecksum(boolean verifyChecksum) {
fs.setVerifyChecksum(verifyChecksum);
}

@Override
public void setWriteChecksum(boolean writeChecksum) {
fs.setWriteChecksum(writeChecksum);
Expand All @@ -523,7 +523,7 @@ public void setWriteChecksum(boolean writeChecksum) {
public Configuration getConf() {
return fs.getConf();
}

@Override
public void close() throws IOException {
super.close();
Expand Down Expand Up @@ -564,7 +564,7 @@ protected boolean primitiveMkdir(Path f, FsPermission abdolutePermission)
throws IOException {
return fs.primitiveMkdir(f, abdolutePermission);
}

@Override // FileSystem
public FileSystem[] getChildFileSystems() {
return new FileSystem[]{fs};
Expand All @@ -575,13 +575,13 @@ public Path createSnapshot(Path path, String snapshotName)
throws IOException {
return fs.createSnapshot(path, snapshotName);
}

@Override // FileSystem
public void renameSnapshot(Path path, String snapshotOldName,
String snapshotNewName) throws IOException {
fs.renameSnapshot(path, snapshotOldName, snapshotNewName);
}

@Override // FileSystem
public void deleteSnapshot(Path path, String snapshotName)
throws IOException {
Expand Down Expand Up @@ -732,6 +732,11 @@ protected CompletableFuture<FSDataInputStream> openFileWithOptions(
return fs.openFileWithOptions(pathHandle, parameters);
}

@Override
public Path getEnclosingRoot(Path path) throws IOException {
return fs.getEnclosingRoot(path);
}

@Override
public boolean hasPathCapability(final Path path, final String capability)
throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1370,6 +1370,14 @@ public boolean hasPathCapability(Path path, String capability)
}
}

@Override
public Path getEnclosingRoot(Path path) throws IOException {
InodeTree.ResolveResult<FileSystem> res = fsState.resolve(getUriPath(path), true);
Path fullPath = new Path(res.resolvedPath);
Path enclosingPath = res.targetFileSystem.getEnclosingRoot(path);
return enclosingPath.depth() > fullPath.depth() ? enclosingPath : fullPath;
}

/**
* An instance of this class represents an internal dir of the viewFs
* that is internal dir of the mount table.
Expand Down Expand Up @@ -1919,6 +1927,14 @@ public Collection<? extends BlockStoragePolicySpi> getAllStoragePolicies()
}
return allPolicies;
}

@Override
public Path getEnclosingRoot(Path path) throws IOException {
InodeTree.ResolveResult<FileSystem> res = fsState.resolve(path.toString(), true);
Path fullPath = new Path(res.resolvedPath);
Path enclosingPath = res.targetFileSystem.getEnclosingRoot(path);
return enclosingPath.depth() > fullPath.depth() ? enclosingPath : fullPath;
}
}

enum RenameStrategy {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3528,4 +3528,14 @@ public DatanodeInfo[] slowDatanodeReport() throws IOException {
}
}

public Path getEnclosingRoot(String src) throws IOException {
checkOpen();
try (TraceScope ignored = newPathTraceScope("getEnclosingRoot", src)) {
return new Path(namenode.getEnclosingRoot(src));
} catch (RemoteException re) {
throw re.unwrapRemoteException(AccessControlException.class,
UnresolvedPathException.class);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public enum OpType {
GET_EC_CODECS("op_get_ec_codecs"),
GET_EC_POLICY("op_get_ec_policy"),
GET_EC_POLICIES("op_get_ec_policies"),
GET_ENCLOSING_ROOT("op_get_enclosing_root"),
GET_ENCRYPTION_ZONE("op_get_encryption_zone"),
GET_FILE_BLOCK_LOCATIONS("op_get_file_block_locations"),
GET_FILE_CHECKSUM(CommonStatisticNames.OP_GET_FILE_CHECKSUM),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3898,4 +3898,31 @@ public DatanodeInfo[] getSlowDatanodeStats() throws IOException {
return dfs.slowDatanodeReport();
}

/* HDFS only */
public Path getEnclosingRoot(final Path path) throws IOException {
statistics.incrementReadOps(1);
storageStatistics.incrementOpCounter(OpType.GET_ENCLOSING_ROOT);
Preconditions.checkNotNull(path);
Path absF = fixRelativePart(path);
return new FileSystemLinkResolver<Path>() {
@Override
public Path doCall(final Path p) throws IOException {
return dfs.getEnclosingRoot(getPathName(p));
}

@Override
public Path next(final FileSystem fs, final Path p)
throws IOException {
if (fs instanceof DistributedFileSystem) {
DistributedFileSystem myDfs = (DistributedFileSystem) fs;
return myDfs.getEnclosingRoot(p);
} else {
throw new UnsupportedOperationException(
"Cannot call getEZForPath"
+ " on a symlink to a non-DistributedFileSystem: " + path
+ " -> " + p);
}
}
}.resolve(this, absF);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.hdfs.AddBlockFlag;
Expand Down Expand Up @@ -1888,4 +1889,11 @@ BatchedEntries<OpenFileEntry> listOpenFiles(long prevId,
@ReadOnly
DatanodeInfo[] getSlowDatanodeReport() throws IOException;

/**
* Get the enclosing root for a path.
*/
@Idempotent
@ReadOnly(isCoordinated = true)
String getEnclosingRoot(String src) throws IOException;

}
Loading

0 comments on commit 8f87758

Please sign in to comment.