
HADOOP-17139 Re-enable optimized copyFromLocal implementation in S3AFileSystem #3101

Merged

Conversation

@bogthe bogthe commented Jun 11, 2021

HADOOP-17139

S3a `copyFromLocalFile` was not using the optimized `innerCopyFromLocalFile` because the optimized version was not handling directory copies appropriately.

This change fixes `innerCopyFromLocalFile` to handle directories.

Tested in eu-west-1 with `mvn -Dparallel-tests -DtestsThreadCount=32 clean verify`

[INFO] Results:
[INFO]
[WARNING] Tests run: 151, Failures: 0, Errors: 0, Skipped: 87

Bogdan Stolojan added 2 commits June 11, 2021 10:41
…ileSystem

S3a `copyFromLocalFile` was not using the optimized
`innerCopyFromLocalFile` because the optimized version was not handling
directory copies appropriately.

This change fixes `innerCopyFromLocalFile` to handle directories.
Turns out `testCostOfCopyFromLocalFile` tests were failing in
`ITestS3AFileOperationCost` because OBJECT_PUT_REQUESTS statistic wasn't
getting incremented properly. This should keep track of that stat.

@steveloughran steveloughran left a comment


(I'm doing this review from VS Code; apologies if it comes out mixed up.)

Production code

  • I think it needs to create the base dir in the tree, then recurse down without creating fake markers underneath if there are any child dirs/files (i.e. only create the marker if it's an empty dir)

Tests: there's more complexity in the source, which means more tests

What are the failure states of:

  • source = dir, dest=file
  • source = dir, dest has a parent of a file

What happens when dest is a dir, or when a file overwrites a file? I don't remember the details, but something needs to check these too.

And empty dirs need handling too.

  • source is empty dir -> dest is an empty dir
  • source is a dir with an empty dir underneath -> dest has same structure

I can't think of any other corner cases

throw new FileAlreadyExistsException(dst + " exists and is not a file");
S3AFileStatus dstStatus = innerGetFileStatus(dst, false, StatusProbeEnum.ALL);
if (srcFile.isFile() && dstStatus.isDirectory()) {
  throw new PathExistsException("Source is file and destination '" + dst + "' is directory");
Contributor


should include source too; and I suspect the line will need cutting down to ~80 chars wide. (Yes, that's very dated; it was chosen for ease of side-by-side review, and all discussion about changing it never ends well.)

@steveloughran

BTW

  1. My high performance copy from local, which parallelises the upload, kicks off the big few first, and then does the rest in a shuffled list to avoid hotspots: https://github.com/steveloughran/cloudstore/blob/trunk/src/main/java/org/apache/hadoop/fs/tools/cloudup/Cloudup.java
  2. My PoC of reimplementing DistCp as a Spark job. It ignores failures of the job, but otherwise is very good for a small amount of Scala code.
    https://github.com/hortonworks-spark/cloud-integration/blob/master/spark-cloud-integration/src/main/scala/com/cloudera/spark/cloud/applications/CloudCp.scala
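The scheduling trick described in point 1 can be sketched in Python. This is only an illustration of the ordering idea, not the actual Cloudup code; `upload_file` and the `size` attribute are hypothetical stand-ins:

```python
import random
from concurrent.futures import ThreadPoolExecutor

def upload_all(files, upload_file, big_first=2, workers=8):
    """Upload the biggest few files first, then the rest in shuffled
    order, to avoid hotspotting a single key range in the store.
    `upload_file` is a hypothetical callable taking one file entry."""
    ordered = sorted(files, key=lambda f: f.size, reverse=True)
    head, tail = ordered[:big_first], ordered[big_first:]
    random.shuffle(tail)  # spread the small uploads across the keyspace
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = [pool.submit(upload_file, f) for f in head + tail]
        return [f.result() for f in futures]
```

Starting the large uploads first keeps the pool busy for the whole run instead of leaving one big file as the long tail at the end.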

@steveloughran


Hi,

I've been thinking a bit more about this.

It's a good place to get familiar with the code, as it's a low-risk path (we've got a fallback).
It can also give you experience in some of the challenges of evolving the S3A code:

  • The S3AFileSystem class is over-large/complicated
  • The file system API calls are under-documented and don't have enough test coverage. This is the reason why we managed to ship a version of copyFromLocalFile where the author (that's me) hadn't realised that it had to support directories.

Because of this, I'm going to make two recommendations. This will make the PR a bit more complex, but it will pay off in the long term.

  1. The copy operation is pulled out into its own isolated ExecutingStoreOperation. Have a look at GetContentSummaryOperation for the design: a self-contained single-use function with a callback interface for when it needs to use bits of S3AFS. My hope is that if/when we refactor the layers of S3AFS into a two-tier view/model design, with the public FS API being just the view, all these operations' interface implementations can be targeted at the model layer (which we effectively have in the innerXXX methods).
  2. The proposed tests are added to a new class AbstractContractCopyFromLocalTest in hadoop-common/test, alongside the existing contract test suites. This should be implemented by the local FS (to provide the reference behaviour) and by S3AFS for the new implementation. Both should work/fail the same way. And if anyone else implements their own copyFromLocal() method, they can pick up the same contract test to verify consistency.

I'm not going to be really mean and insist on changes to filesystem.md as well. But if you want to, that'd be welcome.

I know this will have you looking across more of the code, but it's good to be familiar with this stuff, and not afraid of going near hadoop-common, especially its test suites. Those test suites are how we define what filesystems are meant to do, so when we break those assumptions (e.g. HADOOP-16271) we can add the tests and declarative opt-out options, and know when we break the FS APIs/metaphor, and when other stores do too.

@bogthe

bogthe commented Jun 15, 2021

Thank you for having a look, I really like those suggestions, I'll update this PR and I'll throw in the changes to filesystem.md too!

@bogthe bogthe changed the title S3/hadoop 17139 enable copy from local HADOOP-17139 Re-enable optimized copyFromLocal implementation in S3AFileSystem Jun 16, 2021
Bogdan Stolojan added 5 commits June 17, 2021 21:21
Have to admit I've spent more time than I would've liked trying to make sure all operations have valid audit spans. `trackDurationAndSpan` on an operation doesn't do what I think it does with regard to the validity of the span. That's something to look at.
@bogthe

bogthe commented Jun 25, 2021

This PR is still in development

Alright, finally got some time to implement some of the suggested changes.

What's new up until now:

  • `listFilesAndDirs`: a new RemoteIterator similar to `listFiles` that includes LocatedFileStatus for directories too. It's handy when we want to detect empty directories;
  • The new `CopyFromLocalOperation` in S3a, which "borrows" ideas from the Cloudup project;

What's left:

  • Write up the test cases for an AbstractContractCopyFromLocalTest class as described above;
  • Update / Add documentation;
  • Do one final "polish" of rough edges;

What was surprising:

  • trackDurationAndSpan(stat, path, new CopyFromLocalOperation(...)) did create a valid span; however, the operation class "didn't have access to it" (i.e. any span from inside CopyFromLocalOperation was inactive), hence the () -> new CopyFromLocalOperation(...).execute() call.

@steveloughran

> listFilesAndDirs a new RemoteIterator similar to listFiles that includes LocatedFileStatus for directories too. It's handy when we want to detect empty directories;

-1 to that change.

Making FS changes is a big thing, involving more trauma and planning. See the comments at the top of FileSystem.java.
Any new list operation should:

  • support multiple dirs (for faster partition scanning)
  • builder API for any specific options
  • return a list of Future<>s to make clear that list can be slow & return dirs out of order
  • have a high performance impl for HDFS/webHDFS as well as the S3A and ABFS object stores (could just relay to BatchListingOperations & reuse existing results)
  • Plus all the spec/contract work.

See HADOOP-16898 for discussion there.

It's not trivial; we need to think about "what is the best list model for the future?".

@apache apache deleted a comment from hadoop-yetus Jun 25, 2021

@steveloughran steveloughran left a comment


ok, commented.

I think this has to be done without going near the FileSystem APIs... adding something there is making a commitment to maintain forever, and we need to make sure that the different implementors of stores (HDFS, object store clients) are happy. And when we do a better listing, I have bigger goals.

List<UploadEntry> uploaded = new ArrayList<>();

// Take only top most X entries and upload
for (int uploadNo = 0; uploadNo < sortedUploadsCount; uploadNo++) {
Contributor


Is this parallelized? I know that the xfer manager does it across threads, but this is still uploading one by one. This is probably simplest to start with...

Contributor Author


Not parallelized yet; I wasn't clear on the best way to do it in the code base, so I left it simple for the first pass. The reason I wasn't clear: as you said, the xfer manager does it across threads depending on file size, so it wasn't obvious what capacity the executor should work with.

Is there any executor other than the one available from StoreContext I should be aware of? And do you have any thoughts on its capacity?

Contributor Author


Using an executor for them now, let me know what you think.

Contributor


If we use the xfer manager (which is the simplest), it gets one of the thread pools we create. It has limitations, but we haven't sat down to replace it (yet) as it mostly works OK.

For other uses, the StoreContext executors should be the executor pool. And it's possible to create a restricted subpool of that, which is what S3ABlockOutputStream does.

FWIW, I am not sure that (a) the default size of that pool is right and (b) whether we should just go for an unlimited pool. Big changes, but as most of the threads are blocked on remote IO, pool size is probably a needless limitation.
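The "restricted subpool" idea can be sketched in Python. This is a loose analogue of what a semaphored delegating executor does, not Hadoop's actual code; the class name and API here are illustrative:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

class RestrictedExecutor:
    """Cap how many tasks one client may have queued or running in a
    shared pool, so one operation cannot starve everything else.
    (Illustrative sketch of the restricted-subpool pattern.)"""

    def __init__(self, shared_pool, permits):
        self._pool = shared_pool
        self._sem = threading.Semaphore(permits)

    def submit(self, fn, *args):
        self._sem.acquire()  # blocks when this client is at capacity
        future = self._pool.submit(fn, *args)
        # release the permit whenever the task finishes, pass or fail
        future.add_done_callback(lambda _: self._sem.release())
        return future
```

The shared pool stays a single resource; each operation just gets a bounded slice of it.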

@bogthe

bogthe commented Jun 25, 2021

Thanks for the comments, finding them very helpful!

The FS change was a long shot, seemed too simple and convenient for my use case (listing of directories) to pass up. Will change it + address your comments and keep the changes coming.

Bogdan Stolojan added 3 commits June 28, 2021 09:15
A couple of things happening in this commit:
- Added comments to CopyFromLocalOperation since I'm kind of done with
  experimenting with it
- Added a thread executor that takes care of the uploads in the
  operation (uploading files + creating empty dirs)
- Added `AbstractContractCopyFromLocalTest` test class with all the
  tests inside the ITestS3ACopyFromLocalFile file with implementation
  for S3a and Local FS
- Turns out maybe some cases weren't handled properly in S3a when
  compared to local: copy from file to dir, which will get fixed in the
  next commit
Went through tests and polished some rough bits here and there by
abstracting duplicate code and moving a test case to be handled
differently for the 2 file systems, check:
`testSourceIsDirectoryAndDestinationIsFile` for both S3a and LocalFS.

New bug fix:
- Empty source directory is being handled for s3a;
@bogthe
Copy link
Contributor Author

bogthe commented Jun 30, 2021

New changes:

  • Reverted previous change to FS interface;
  • New abstract test class AbstractContractCopyFromLocalTest with the old tests from ITestS3ACopyFromLocalFile and more to handle edge cases, implemented for S3a in ITestS3ACopyFromLocalFile and LocalFS in TestLocalFSCopyFromLocal;
  • copyFromLocal from S3a now handles copies in the same way as the LocalFS implementation with 1 minor difference when handling source directory and destination file:
    • S3a: throws PathExistsException;
    • LocalFS: throws FileAlreadyExistsException;
    • Not clear if there are dependencies of S3a expecting PathExistsException, or whether we can move to using FileAlreadyExistsException for both. @steveloughran
  • New thread executor in CopyFromLocalOperation to handle uploads;

What's left:

  • updates to filesystem.md;

Changes:
- LocalFS directory to directory copy with the overwrite flag is now
enabled. Conceptually it should be; let's see if it's right or wrong;
- S3a directory to directory should behave in the following way now
(consistent with LocalFS):
If destination exists:
- bar/ -> foo/ => foo/bar/
- bar/ -> foo/bar/ => foo/bar/bar/
If destination doesn't exist:
- bar/ -> foo/ => foo/
- bar/ -> foo/bar/ => foo/bar
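The directory naming rules in this commit message can be illustrated with a small Python sketch; the helper name `copy_destination` is made up for illustration and only models where the source directory lands:

```python
import posixpath

def copy_destination(src_dir, dest, dest_exists):
    # If the destination directory already exists, the source directory
    # is copied *under* it (bar/ -> foo/ => foo/bar/); if it doesn't
    # exist, the source's contents become the destination
    # (bar/ -> foo/ => foo/).
    if dest_exists:
        return posixpath.join(dest, posixpath.basename(src_dir.rstrip("/")))
    return dest
```

This matches the four cases listed above, including the perhaps-surprising foo/bar/bar when copying bar/ into an existing foo/bar/.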
@bogthe

bogthe commented Jul 6, 2021

Alright this commit should be the last one introducing any changes.

  • S3a copyFromLocal should behave in the same way as HDFS when handling directories and files;
  • HDFS now overwrites directories for a copyFromLocal operation;
  • filesystem.md changes coming;

PR ready for review.

Bogdan Stolojan added 3 commits July 8, 2021 09:47
- Updates to `filesystem.md`;
- Cleanup code;
- Add missed test case;
@bogthe

bogthe commented Jul 9, 2021

@steveloughran PR is ready to review, added changes to filesystem.md too.


@steveloughran steveloughran left a comment


  • All the production code changes are good, some minor import/javadoc nits and that's it.
  • Only thing I'm wondering about is tests & spec, and there it's about getting the specification rigorous enough.

The PR currently uses English to describe the outcome, but what we need is some python-syntax declaration of the updated behaviour.

Proposed

  • Define a LocalFS which is the source filesystem and which MUST be a localFS.
  • Use RFC 2119 MUST/SHOULD/MAY/SHALL; I think the current revision uses "should" when really you mean "MUST".
  • Need to cover copying when dest is in the same filesystem, something like:

The outcome of copyFromLocal() where the dest path is under file:// depends on whether the destination and source are mutually exclusive:

not isDescendant(src, dest) and not isDescendant(dest, src) and src != dest

This gives a definition (alongside all your text) which tries to declare
the changes to FS and LocalFS:

Preconditions

# local FS special cases
if src = dest: raise FileExistsException
if isDescendant(src, dest) or isDescendant(dest, src): undefined

exists(LocalFS, src) else raise FileNotFoundException

+ what you've done for the other checks

Determining the final name of the copy

Given a base path on the source base and a child path child where
base is in ancestors(child) + child:

def final_name(base, child, dest):
  if base = child:
    return dest
  else:
    return dest + childElements(base, child)

Outcome where isFile(LocalFS, src)

For a file: dest data becomes that of the source data; all
ancestors are directories.

if isFile(LocalFS, src) and not exists(FS, dest):
  FS' = FS where:
    FS'.Files[dest] = LocalFS.Files[src]
    FS'.Directories = FS.Directories + ancestors(FS, dest)
  LocalFS' = LocalFS where:
    not delSrc or (delSrc = true and delete(LocalFS, src, false))
else if isFile(LocalFS, src) and isDir(FS, dest):
  FS' = FS where:
    let d = final_name(src, dest) in:
      FS'.Files[d] = LocalFS.Files[src]
  LocalFS' = LocalFS where:
    not delSrc or (delSrc = true and delete(LocalFS, src, false))

+commentary that there is no expectation on atomic combination of local FS and remote FS changes.

Outcome where isDir(LocalFS, src)

(This is your homework, sorry)

For a source dir, same for all paths under the dest, with statement that there's no
expectation of atomicity/isolation of update. (ooh, that means you can have race conditions...)

(See "Renaming a directory onto a directory" for an example of how to declare the naming
policy)
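The final_name sketch above can be made executable. Here childElements is interpreted as the path of child relative to base, which is an assumption about the intended semantics:

```python
import posixpath

def child_elements(base, child):
    # The path elements of `child` below `base`,
    # e.g. child_elements("a", "a/b/c") -> "b/c".
    return posixpath.relpath(child, base)

def final_name(base, child, dest):
    # The copy of `base` itself lands at `dest`; anything beneath it
    # lands under `dest` at its path relative to `base`.
    if base == child:
        return dest
    return posixpath.join(dest, child_elements(base, child))
```

So copying base a/ to dest d/ sends a/b/c to d/b/c, and a itself to d.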


@mehakmeet mehakmeet left a comment


looks good, just some test comments.
Import re-ordering required. You can set it to be automatic in your IDE (if you use IntelliJ): Preferences > Editor > Code Style > Java > Import Layout:

import javax.*
import java.*
<blank line>
import all other imports
<blank line>
import org.apache.*
<blank line>
import static all other imports

Mostly changes to documentation and test cases. Next PR will be more
focused on updating the `filesystem.md` spec and adding the remaining
test cases

@steveloughran steveloughran left a comment


I've somehow managed to split my review in two. Here are the comments purely on the diff of this commit. Big issue: safe use of Optional<>.

updateDestStatus(destination);

// Handles bar/ -> foo/ => foo/bar and bar/ -> foo/bar/ => foo/bar/bar
if (getDestStatus().isPresent() && getDestStatus().get().isDirectory()
Contributor


may trigger an NPE

Contributor Author


Hopefully not, because of the Optional.ofNullable.

/**
* Does a {@link CopyFromLocalOperationCallbacks#getFileStatus(Path)}
* operation on the provided destination and updates the internal status of
* destPath property
Contributor


destStatus field.


@steveloughran steveloughran left a comment


And a review of the full patch; it's big enough that this is the best way to review it. Production code all good, tests fine. Do need to fix the "." at the end of all javadocs though; Yetus will be complaining there.

It will be interesting to see how much difference this makes; I know our QE test scripts often upload test results to S3; it was their scripts which occasionally triggered the 404 caching problem....

Alright, this should be the last major commit.
- `filesystem.md` has been updated appropriately;
- java docs updated;
- crazy fun test cases added;
@bogthe

bogthe commented Jul 22, 2021

Thanks everyone for the reviews! All of them should've been addressed. Let me know if all looks good now!

@steveloughran any idea why the build is failing? The changes yetus is complaining about haven't been made on this PR 🤔
Edit: Looks like just pushing another commit solved it!


@steveloughran steveloughran left a comment


looks great! few minor details and it'll be done.

thanks for this ... that python code for the naming really is nice

@bogthe

bogthe commented Jul 29, 2021

Great! Thanks for catching the above changes. Final push incoming!

@steveloughran steveloughran merged commit a218038 into apache:trunk Jul 30, 2021
@steveloughran

Merged to trunk. Thank you for this @bogthe

It started off small, but you've now been sucked into the world of trying to strictly define the behavior of filesystems based on what HDFS does, and then trying to reimplement a radically different version for cloud storage performance. Yes, it's hard, but it means that compatibility is almost always guaranteed!

If you cherrypick to branch-3.3, do a test run, and push up the new PR, I'll merge that without doing any more code reviews. The code is done; all we need is a retest.

now, take the rest of the month off!

asfgit pushed a commit that referenced this pull request Aug 2, 2021
…ileSystem (#3101)

This work
* Defines the behavior of FileSystem.copyFromLocal in filesystem.md
* Implements a high performance implementation of copyFromLocalOperation
  for S3
* Adds a contract test for the operation: AbstractContractCopyFromLocalTest
* Implements the contract tests for Local and S3A FileSystems

Contributed by: Bogdan Stolojan

Change-Id: I25d502102775c3626c4264e5a14c649879730050
kiran-maturi pushed a commit to kiran-maturi/hadoop that referenced this pull request Nov 24, 2021
…ileSystem (apache#3101)


deepakdamri pushed a commit to acceldata-io/hadoop that referenced this pull request Jan 21, 2025
…ileSystem (apache#3101)
