HADOOP-19047: Support InMemory Tracking Of S3A Magic Commits #6468
Conversation
@steveloughran - I have converted the draft PR to a final one. Could you please review the changes?
💔 -1 overall
💔 -1 overall
🎊 +1 overall
@steveloughran - Could you please review the changes?
I don't like the InMemoryMagicCommitTracker.
It is using a static map of path to metadata. This will grow without constraint in a long-lived process. And you are left with the question "what if two jobs ever use the same path?"
I would be happier if the static structures (which I can see why are needed) mapped job id -> task attempt id to something which tracks all pending files for that TA... the static map would be a weak ref to something held strongly by the actual committer (see WeakReferenceMap). Once the actual task attempt is gc'd, there will be an automatic cleanup. Oh, and the static structures should be per fs instance, so when an fs is cleaned up everything goes; this allows things like Hive to call .closeAllForUGI() to get rid of all filesystems for a given user in a long-lived process.
I'm also worried about how a job could abort a task attempt on a different process which has failed. Before worrying about that too much, why don't you look in Spark to see how it calls abort. I'm not worried about MapReduce except for testing, so how it itself calls the committer isn't so important. For example: we don't care about recovery from a failed attempt as Spark itself cannot do this.
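The weak-reference layout suggested above can be sketched in plain Java. This is a hedged illustration, not the PR's code: the class and method names are hypothetical, and it uses the JDK's `WeakReference` directly rather than Hadoop's `WeakReferenceMap` utility mentioned in the review. The point is that the static registry holds only weak references, while the committer owns the strong reference for the lifetime of the task attempt, so a gc'd task attempt leaves nothing pinned in the static map.

```java
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class WeakTrackerRegistrySketch {

  /** Per-task-attempt state, held strongly by the committer itself. */
  static final class TaskAttemptCommits {
    final List<String> pendingUploads = new ArrayList<>();
  }

  /** Static map holds only weak refs; entries become collectable once the
   *  committer drops its strong reference to TaskAttemptCommits. */
  private static final Map<String, WeakReference<TaskAttemptCommits>> REGISTRY =
      new ConcurrentHashMap<>();

  static TaskAttemptCommits register(String taskAttemptId) {
    TaskAttemptCommits commits = new TaskAttemptCommits();
    REGISTRY.put(taskAttemptId, new WeakReference<>(commits));
    return commits;  // caller (the committer) must keep this reference alive
  }

  static List<String> pendingUploads(String taskAttemptId) {
    WeakReference<TaskAttemptCommits> ref = REGISTRY.get(taskAttemptId);
    TaskAttemptCommits commits = ref == null ? null : ref.get();
    return commits == null ? List.of() : commits.pendingUploads;
  }

  public static void main(String[] args) {
    TaskAttemptCommits held = register("attempt_001");
    held.pendingUploads.add("s3a://bucket/__magic/job1/attempt_001/part-0000");
    System.out.println(pendingUploads("attempt_001").size());  // 1 while held
  }
}
```

A real version would additionally key the outer map per filesystem instance, as the review asks, so a closed fs drops its whole registry at once.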
It seems to me that the key costs of using S3 as the store are:
- file write: extra overhead of probes, need to always use MPU and write of two files
- task commit: scan and read of all .pending files, write of .pendingset.
- job commit: scan and read of .pendingset files
How important are the operations of phase #1? Writing the .pending file as today would allow for task abort in a different process; task commit (the normal path) doesn't need it, though there will be extra list and delete overhead in job commit.
Why don't you look into the Spark code and see how it does abort, and therefore how important being able to support task abort from a separate process is. I think it probably is part of the cleanup.
@@ -52,6 +52,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;

import org.apache.hadoop.fs.s3a.commit.magic.InMemoryMagicCommitTracker;
move to same group as rest of apache imports
Ack.
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
import software.amazon.awssdk.services.s3.model.CompletedPart;
review import ordering and grouping.
Ack. The code formatter xml is present here: https://github.com/apache/hadoop/tree/trunk/dev-support/code-formatter . IntelliJ users can directly import hadoop_idea_formatter.xml.
return pendingSet;
}

private List<SinglePendingCommit> loadPendingCommitsFromMemory(TaskAttemptContext context)
nit, javadocs
ack.
@@ -3906,6 +3908,21 @@ public void access(final Path f, final FsAction mode)
@Retries.RetryTranslated
public FileStatus getFileStatus(final Path f) throws IOException {
Path path = qualify(f);
if (isTrackMagicCommitsInMemoryEnabled(getConf()) && isMagicCommitPath(path)) {
this is a bit of a hack. not saying that's bad, just wondering if there is a more elegant solution.
I agree this is an ugly hack. I couldn't find a better alternative. This will be used by downstream applications like Spark which want to get the size of the file written by the task. It is supposed to be used in the same process which writes the file and initiates/uploads the MPU.
I've thought about this some more. Here are some things which I believe we need
Ignoring item #3 for now, remember that we have #1 solved by adding a 0-byte marker with a header of "final length"; Spark has some special handling of zero-byte files to use getXAttr() and fall back to the probe for this, at the expense of a second HEAD request. Generating a modified FileStatus response from a single HEAD/getObjectMetadata() call would actually eliminate the need for that; I wish I'd thought of it myself. Yes, we do break the guarantee that files listed are the same size as the files opened... but magic paths are, well, magic. We break a lot of guarantees there already.

The existing design should be retained even in memory; the calculation of final length is something which can be done for all. But: we do not need to save the .pending files just for task abort. All we need to do is be able to enumerate the upload IDs of all the files from that task attempt and cancel them. We can do that just by adding another header to the marker file. Task commit uses the in-memory data; task abort will need a deep scan of the task attempt directory, with all zero-byte files carrying the proposed new header used to initiate the abort operations. This is only for task abort, an outlier case.

For normal task commit there is no need to scan the directory, parse the .pending files, then generate a new .pendingset file for the later job commit. It is probably the JSON (un)marshalling which is as much a performance killer here as the listing operation. What do you think?
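The header-based scheme described above can be sketched without any AWS dependencies by modeling the marker object's metadata as a plain map. This is an illustrative sketch only: S3A attaches a final-length header to the zero-byte magic marker (the header name below is an assumption of its exact form), and the upload-id header is the review's *proposal*, not an existing feature. One HEAD request yields both the synthetic file length and, for abort, the MPU to cancel.

```java
import java.util.HashMap;
import java.util.Map;

public class MagicMarkerHeadersSketch {

  // Assumed header name for the final data length on the zero-byte marker.
  static final String LEN_HEADER = "x-hadoop-s3a-magic-data-length";
  // Hypothetical header carrying the MPU id, as proposed in the review.
  static final String UPLOAD_ID_HEADER = "x-hadoop-s3a-magic-upload-id";

  /** Derive the real file length from a single HEAD's metadata,
   *  falling back to the object's own (zero-byte) size. */
  static long finalLength(Map<String, String> headMetadata, long markerObjectSize) {
    String v = headMetadata.get(LEN_HEADER);
    return v != null ? Long.parseLong(v) : markerObjectSize;
  }

  /** Task abort: read the upload id off the marker so the MPU can be cancelled
   *  without ever loading a .pending file. */
  static String uploadIdToAbort(Map<String, String> headMetadata) {
    return headMetadata.get(UPLOAD_ID_HEADER);
  }

  public static void main(String[] args) {
    Map<String, String> meta = new HashMap<>();
    meta.put(LEN_HEADER, "1048576");
    meta.put(UPLOAD_ID_HEADER, "mpu-0001");
    System.out.println(finalLength(meta, 0));   // 1048576
    System.out.println(uploadIdToAbort(meta));  // mpu-0001
  }
}
```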
@steveloughran - Thanks a lot for the detailed review and some great questions. The following are my thoughts on the different asks.
As far as I know (please correct me if I am wrong)
That being said, since the same process is calling BasicWriteStatsTracker#getFileSize, is it still required to have the 0-byte marker file? I have solved this by adding a check in the getFileStatus method, returning the file size corresponding to the magic path/file.
Thinking from Spark's perspective,
That being said, I am not sure if there is any such use case of aborting a task from another process. In such cases, abortJob will handle it, I guess.
Does it make sense? Or am I missing anything?
The entries in the map are removed during the commitTask or abortTask operation to keep memory under control.
No, the (complete) path is guaranteed to be unique. The paths stored here as part of
Since the entries in the HashMap are removed during the commitTask or abortTask operation, is a WeakHashMap still required?
I am not sure why it should be scoped under the fs object. For behaviour similar to storing in S3, shouldn't the static structure be available to the whole JVM? I mean, shouldn't we be able to access the static structure irrespective of the fs object?
I have covered this as part of the comment here.
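The explicit-cleanup scheme described in this reply can be sketched as follows. This is a hedged illustration with hypothetical names, not the PR's actual code: a static, concurrent taskAttemptId-to-pending-commits map whose entry is drained in both commitTask and abortTask, so memory stays bounded without weak references as long as every task attempt terminates through one of those two paths.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

public class ExplicitCleanupSketch {

  /** Static registry: taskAttemptId -> pending commits written by that attempt. */
  private static final Map<String, List<String>> TASK_ATTEMPT_TO_COMMITS =
      new ConcurrentHashMap<>();

  /** Called once per file as its MPU completes (conceptually, in the tracker). */
  static void trackCommit(String taskAttemptId, String pendingCommit) {
    TASK_ATTEMPT_TO_COMMITS
        .computeIfAbsent(taskAttemptId, k -> new CopyOnWriteArrayList<>())
        .add(pendingCommit);
  }

  /** Both commitTask and abortTask drain the entry, removing it from the map. */
  static List<String> drain(String taskAttemptId) {
    List<String> commits = TASK_ATTEMPT_TO_COMMITS.remove(taskAttemptId);
    return commits == null ? List.of() : commits;
  }

  static int trackedAttempts() {
    return TASK_ATTEMPT_TO_COMMITS.size();
  }

  public static void main(String[] args) {
    trackCommit("attempt_01", "part-0000");
    trackCommit("attempt_01", "part-0001");
    System.out.println(drain("attempt_01").size());  // 2
    System.out.println(trackedAttempts());           // 0 after drain
  }
}
```

The review's counterpoint stands: if a process dies between trackCommit and drain, the entry leaks, which is why weak references scoped to the committer were suggested as a belt-and-braces measure.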
@steveloughran - Thanks a lot for the detailed review as well as the great follow-up questions. I have addressed your comments; please let me know your thoughts.
🎊 +1 overall
@steveloughran - Gentle reminder for review
ok, minor comments, and I've looked at where Hadoop MR and Spark both look at progress; it does seem to be in the same process.
Now, one more test: ITestTerasortOnS3A
it'd be great to add a new parameter of magic + in memory; this test actually uses a yarn minicluster and so really does run across processes. only runs with -Dscale, but it is the real test. will even let us compare the two options
@@ -20,17 +20,19 @@

import java.util.List;

import org.apache.hadoop.fs.s3a.commit.magic.InMemoryMagicCommitTracker;
goes into the hadoop block
ack
*/
public class InMemoryMagicCommitTracker extends MagicCommitTracker {

// stores taskAttemptId to commit data mapping
make javadocs
ack
public class InMemoryMagicCommitTracker extends MagicCommitTracker {

// stores taskAttemptId to commit data mapping
private static Map<String, List<SinglePendingCommit>>
and make these all final. I do think they should use weak/soft references, somehow
ack for final.
I do think they should use weak/soft references,
Is this required? Given that we proactively remove the entries from the HashMap when the task commits or aborts, and the data is not referenced anywhere else, gc will reclaim the memory.
* @param bytesWritten bytes written
* @param iostatistics nullable IO statistics
* @return false, indicating that the commit must fail.
* @throws IOException any IO problem.
* @throws IOException any IO problem.
revert
ack
@steveloughran - Thanks a lot for the review comments. I have addressed the comments with the new commit.
🎊 +1 overall
@steveloughran - Gentle reminder to review the changes.
@shameersss1 I'm working on getting the 3.4.0 release out right now. Anything you can do to assist testing would be wonderful, as I'm only worrying about release blockers.
@steveloughran - I am glad to assist with the testing. Is there any release candidate branch for the same? Could you please share the wiki on what tests need to be done?
@steveloughran - Gentle reminder for review |
looks good, just a typo in a method name.
+1 pending that.
note, you will need to do a followup in the docs, but we can get this in and tested while you do that...
}

public static Map<String, List<SinglePendingCommit>> getTaskAttemptIdToMpuMetdadata() {
nit: typo in method name
ack.
@steveloughran - Thanks a lot for the detailed review. I have addressed your comments.
- typo to fix on CommitOperations
- sorry, I also missed the docs; I was only looking at the more recent changes.
@@ -584,7 +584,7 @@ public SinglePendingCommit uploadFileToPendingCommit(File localFile,
destKey,
uploadId,
partNumber,
size).build();
size).build();x
typo
ack
💔 -1 overall
@steveloughran - I have addressed your comments.
LGTM
+1
🎊 +1 overall
…6468) If the option fs.s3a.committer.magic.track.commits.in.memory.enabled is set to true, then rather than save data about in-progress uploads to S3, this information is cached in memory. If the number of files being committed is low, this will save network IO in both the generation of .pending and marker files, and in the scanning of task attempt directory trees during task commit. Contributed by Syed Shameerur Rahman
Description
The following are the operations which happen within a Task when it uses the S3A Magic Committer:
During closing of stream
A 0-byte file with the same name as the original file is uploaded to S3 using a PUT operation. Refer here for more information. This is done so that a downstream application like Spark can get the size of the file being written.
MultiPartUpload(MPU) metadata is uploaded to S3. Refer here for more information.
During TaskCommit
Since these operations happen within the Task JVM, we could optimize as well as save cost by storing this information in memory when Task memory usage is not a constraint. Hence the proposal here is to introduce a new magic commit tracker called "InMemoryMagicCommitTracker" which will store the taskAttemptId to commit metadata mapping in memory.
This optimization will save 2 PUT S3 calls, 1 LIST S3 call, and 1 GET S3 call given a Task writes only 1 file.
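The feature is opt-in. Per the merged commit message, it is enabled through the following configuration property (shown here as a standard Hadoop configuration fragment, e.g. in core-site.xml or the job configuration):

```xml
<!-- Enable in-memory tracking of S3A magic commits (default: false).
     Only safe when task commit runs in the same process that wrote the files. -->
<property>
  <name>fs.s3a.committer.magic.track.commits.in.memory.enabled</name>
  <value>true</value>
</property>
```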
Testing
Ran the S3A integration tests in the us-west-1 region using the following command:
mvn -Dparallel-tests clean verify -Dit.test=ITestMagicCommitProtocol,ITestS3ACommitterMRJob,ITestMagicCommitProtocolFailure,ITestS3AHugeMagicCommits,ITestCommitOperationCost,ITestCommitOperations -Dtest=none -DtestsThreadCount=7
Manual verification
Added Parameterized UnitTest in ITestMagicCommitProtocol