HADOOP-17139 Re-enable optimized copyFromLocal implementation in S3AFileSystem (#3101)


This work
* Defines the behavior of FileSystem.copyFromLocal in filesystem.md
* Implements a high performance implementation of copyFromLocalOperation
  for S3 
* Adds a contract test for the operation: AbstractContractCopyFromLocalTest
* Implements the contract tests for Local and S3A FileSystems

Contributed by: Bogdan Stolojan
bogthe authored Jul 30, 2021
1 parent 6d77f3b commit a218038
Showing 7 changed files with 1,171 additions and 180 deletions.
@@ -524,6 +524,9 @@ private static Path checkDest(String srcName, FileSystem dstFS, Path dst,
    if (null != sdst) {
      if (sdst.isDirectory()) {
        if (null == srcName) {
          if (overwrite) {
            return dst;
          }
          throw new PathIsDirectoryException(dst.toString());
        }
        return checkDest(null, dstFS, new Path(dst, srcName), overwrite);
@@ -1419,6 +1419,112 @@ operations related to the part of the file being truncated is undefined.



### `boolean copyFromLocalFile(boolean delSrc, boolean overwrite, Path src, Path dst)`

The source file or directory at `src` is on the local disk and is copied into the file system at
destination `dst`. If the source must be deleted after the copy, the `delSrc` flag must be
set to TRUE. If the destination already exists and its contents must be overwritten, the
`overwrite` flag must be set to TRUE.

#### Preconditions
Source and destination must be different:
```python
if src = dest : raise FileExistsException
```

Destination and source must not be descendants of one another:
```python
if isDescendant(src, dest) or isDescendant(dest, src) : raise IOException
```

The source file or directory must exist locally:
```python
if not exists(LocalFS, src) : raise FileNotFoundException
```

Directories cannot be copied into files, regardless of what the overwrite flag is set to:

```python
if isDir(LocalFS, src) and isFile(FS, dst) : raise PathExistsException
```

In every other case where the destination exists, the `overwrite` flag must be set to TRUE for
the operation to succeed. Any files or directories already at the destination are then
overwritten:

```python
if exists(FS, dst) and not overwrite : raise PathExistsException
```
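
As a rough illustration, the preconditions above could be checked in the order they are listed.
This is a minimal sketch, not the Hadoop implementation: it models both file systems with
`java.nio.file` paths on the local disk, the class and method names are hypothetical, and
standard JDK exceptions stand in for the Hadoop exceptions named above
(`java.nio.file.FileAlreadyExistsException` replaces both `FileExistsException` and
`PathExistsException`).

```java
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical helper that mirrors the precondition order in the text. */
final class CopyPreconditions {

  private CopyPreconditions() {
  }

  /** Throws if copying src to dst would violate one of the preconditions. */
  static void check(Path src, Path dst, boolean overwrite) throws IOException {
    Path s = src.toAbsolutePath().normalize();
    Path d = dst.toAbsolutePath().normalize();

    // Source and destination must be different.
    if (s.equals(d)) {
      throw new FileAlreadyExistsException(d.toString(), null,
          "source and destination are the same path");
    }
    // Neither path may be a descendant of the other.
    if (s.startsWith(d) || d.startsWith(s)) {
      throw new IOException("source and destination are nested: " + s + ", " + d);
    }
    // The source must exist.
    if (!Files.exists(s)) {
      throw new FileNotFoundException(s.toString());
    }
    // A directory can never be copied onto a file, whatever overwrite says.
    if (Files.isDirectory(s) && Files.isRegularFile(d)) {
      throw new FileAlreadyExistsException(d.toString(), null,
          "cannot copy a directory onto a file");
    }
    // Any other existing destination requires overwrite = true.
    if (Files.exists(d) && !overwrite) {
      throw new FileAlreadyExistsException(d.toString(), null,
          "destination exists and overwrite is false");
    }
  }
}
```

The same-path check comes first because `Path.startsWith` also returns true for equal paths, so
reversing the first two checks would report the wrong failure.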

#### Determining the final name of the copy
Given a base path on the source `base` and a child path `child` where `base` is in
`ancestors(child) + child`:

```python
def final_name(base, child, dest):
    if base = child:
        return dest
    else:
        return dest + childElements(base, child)
```
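
The name mapping above can be sketched with `java.nio.file.Path`, where `relativize` plays the
role of `childElements(base, child)`. The class and method names here are hypothetical, and the
sketch assumes all three paths belong to the same `java.nio` file system; it is an illustration
of the rule, not code from the Hadoop source.

```java
import java.nio.file.Path;

/** Hypothetical helper illustrating the final-name rule. */
final class FinalName {

  private FinalName() {
  }

  /**
   * @param base  root of the source tree being copied
   * @param child base itself, or a path somewhere beneath base
   * @param dest  root of the destination
   * @return the path the copy of child ends up at
   */
  static Path finalName(Path base, Path child, Path dest) {
    if (!child.startsWith(base)) {
      throw new IllegalArgumentException(child + " is not under " + base);
    }
    if (base.equals(child)) {
      // The source root maps directly onto the destination root.
      return dest;
    }
    // relativize() yields the path elements of child below base.
    return dest.resolve(base.relativize(child));
  }
}
```

For example, with Unix-style paths, a base of `/data/in`, a child of `/data/in/logs/a.txt` and
a destination of `/bucket/out` map to `/bucket/out/logs/a.txt`.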

#### Outcome where source is a file `isFile(LocalFS, src)`
For a file, data at destination becomes that of the source. All ancestors are directories.
```python
if isFile(LocalFS, src) and (not exists(FS, dest) or (exists(FS, dest) and overwrite)):
    FS' = FS where:
        FS'.Files[dest] = LocalFS.Files[src]
        FS'.Directories = FS.Directories + ancestors(FS, dest)
    LocalFS' = LocalFS where:
        not delSrc or (delSrc = true and delete(LocalFS, src, false))
else if isFile(LocalFS, src) and isDir(FS, dest):
    FS' = FS where:
        let d = final_name(src, dest)
        FS'.Files[d] = LocalFS.Files[src]
    LocalFS' = LocalFS where:
        not delSrc or (delSrc = true and delete(LocalFS, src, false))
```
The file changes are not expected to be atomic on either the local `LocalFS` or the remote `FS`.

#### Outcome where source is a directory `isDir(LocalFS, src)`
```python
if isDir(LocalFS, src) and (isFile(FS, dest) or isFile(FS, dest + childElements(src))):
    raise FileAlreadyExistsException
else if isDir(LocalFS, src):
    if exists(FS, dest):
        dest' = dest + childElements(src)
        if exists(FS, dest') and not overwrite:
            raise PathExistsException
    else:
        dest' = dest

    FS' = FS where:
        forall c in descendants(LocalFS, src):
            not exists(FS', final_name(c)) or overwrite
        and forall c in descendants(LocalFS, src) where isDir(LocalFS, c):
            FS'.Directories = FS'.Directories + (dest' + childElements(src, c))
        and forall c in descendants(LocalFS, src) where isFile(LocalFS, c):
            FS'.Files[final_name(c, dest')] = LocalFS.Files[c]
    LocalFS' = LocalFS where:
        not delSrc or (delSrc = true and delete(LocalFS, src, true))
```
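
The choice of `dest'` in the first half of this outcome can be sketched as follows. This is a
minimal illustration under stated assumptions: `childElements(src)` is read as the last path
element of `src`, the destination is modelled with `java.nio.file` on the local disk, the class
and method names are hypothetical, and `java.nio.file.FileAlreadyExistsException` stands in for
both Hadoop exceptions.

```java
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical helper that picks the root a source directory is copied to. */
final class DirectoryDestination {

  private DirectoryDestination() {
  }

  static Path resolve(Path src, Path dest, boolean overwrite) throws IOException {
    // A directory can never replace a file at the destination.
    if (Files.isRegularFile(dest)) {
      throw new FileAlreadyExistsException(dest.toString());
    }
    // A missing destination becomes the copy of the source directory itself.
    if (!Files.exists(dest)) {
      return dest;
    }
    Path name = src.getFileName();
    if (name == null) {
      throw new IllegalArgumentException("source has no name: " + src);
    }
    // An existing destination directory receives the source as a child.
    Path nested = dest.resolve(name.toString());
    if (Files.isRegularFile(nested)) {
      throw new FileAlreadyExistsException(nested.toString());
    }
    if (Files.exists(nested) && !overwrite) {
      throw new FileAlreadyExistsException(nested.toString());
    }
    return nested;
  }
}
```
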
There are no expectations of operation isolation or atomicity: files can change in the source or
the destination while the operation is executing. No guarantees are made about the final state of
a file or directory after a copy, other than that the copy is best effort. For example, when
copying a directory, one file can be copied from source to destination, but nothing stops the new
file at the destination from being updated while the copy operation is still in progress.

#### Implementation

The default HDFS implementation recurses through each file and folder found at `src` and
copies them sequentially to their final destination (relative to `dst`).

Object store based file systems should be mindful of what limitations arise from the above
implementation and could take advantage of parallel uploads and possible re-ordering of files copied
into the store to maximize throughput.
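
To illustrate the parallel alternative, the sketch below walks a source directory, creates
directories up front, and hands each file copy to a fixed thread pool. It is not the S3A
implementation: local `java.nio.file` copies stand in for object-store uploads, the class name,
method name and `threads` parameter are hypothetical, and the source is assumed to be a
directory.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical sketch of a directory copy that copies files in parallel. */
final class ParallelTreeCopy {

  private ParallelTreeCopy() {
  }

  static void copyTree(Path src, Path dest, int threads)
      throws IOException, InterruptedException {
    // Files.walk visits a directory before its children.
    List<Path> entries;
    try (Stream<Path> walk = Files.walk(src)) {
      entries = walk.collect(Collectors.toList());
    }
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    try {
      List<Future<?>> pending = new ArrayList<>();
      for (Path entry : entries) {
        Path target = dest.resolve(src.relativize(entry));
        if (Files.isDirectory(entry)) {
          // Directories are created sequentially so file tasks find their parent.
          Files.createDirectories(target);
        } else {
          // Each file copy runs as an independent task; order is not preserved.
          pending.add(pool.submit(() -> {
            Files.copy(entry, target, StandardCopyOption.REPLACE_EXISTING);
            return null;
          }));
        }
      }
      // Wait for every copy and surface the first failure.
      for (Future<?> task : pending) {
        try {
          task.get();
        } catch (ExecutionException e) {
          throw new IOException("copy failed", e.getCause());
        }
      }
    } finally {
      pool.shutdownNow();
    }
  }
}
```

Because the file tasks complete in no particular order, a failure can leave the destination
partially populated, which is consistent with the best-effort outcome described above.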


## <a name="RemoteIterator"></a> interface `RemoteIterator`

The `RemoteIterator` interface is used as a remote-access equivalent
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs;

import java.io.File;

import org.junit.Test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractContractCopyFromLocalTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.contract.localfs.LocalFSContract;

import static org.apache.hadoop.test.LambdaTestUtils.intercept;

public class TestLocalFSCopyFromLocal extends AbstractContractCopyFromLocalTest {
  @Override
  protected AbstractFSContract createContract(Configuration conf) {
    return new LocalFSContract(conf);
  }

  @Test
  public void testDestinationFileIsToParentDirectory() throws Throwable {
    describe("Source is a file and destination is its own parent directory");

    File file = createTempFile("local");
    Path dest = new Path(file.getParentFile().toURI());
    Path src = new Path(file.toURI());

    intercept(PathOperationException.class,
        () -> getFileSystem().copyFromLocalFile(true, true, src, dest));
  }

  @Test
  public void testDestinationDirectoryToSelf() throws Throwable {
    describe("Source is a directory and it is copied into itself with " +
        "delSrc flag set, destination must not exist");

    File source = createTempDirectory("srcDir");
    Path dest = new Path(source.toURI());
    getFileSystem().copyFromLocalFile(true, true, dest, dest);

    assertPathDoesNotExist("Source found", dest);
  }

  @Test
  public void testSourceIntoDestinationSubDirectoryWithDelSrc() throws Throwable {
    describe("Copying a parent folder inside a child folder with" +
        " delSrc=TRUE");
    File parent = createTempDirectory("parent");
    File child = createTempDirectory(parent, "child");

    Path src = new Path(parent.toURI());
    Path dest = new Path(child.toURI());
    getFileSystem().copyFromLocalFile(true, true, src, dest);

    assertPathDoesNotExist("Source found", src);
    assertPathDoesNotExist("Destination found", dest);
  }

  @Test
  public void testSourceIntoDestinationSubDirectory() throws Throwable {
    describe("Copying a parent folder inside a child folder with" +
        " delSrc=FALSE");
    File parent = createTempDirectory("parent");
    File child = createTempDirectory(parent, "child");

    Path src = new Path(parent.toURI());
    Path dest = new Path(child.toURI());
    getFileSystem().copyFromLocalFile(false, true, src, dest);

    Path recursiveParent = new Path(dest, parent.getName());
    Path recursiveChild = new Path(recursiveParent, child.getName());

    // This definitely counts as interesting behaviour which needs to be documented.
    // Depending on the underlying system this can recurse 15+ times.
    recursiveParent = new Path(recursiveChild, parent.getName());
    recursiveChild = new Path(recursiveParent, child.getName());
    assertPathExists("Recursive parent not found", recursiveParent);
    assertPathExists("Recursive child not found", recursiveChild);
  }
}