[GOBBLIN-1741] Create manifest based dataset finder #3598

Merged: 9 commits merged into apache:master on Nov 30, 2022

Conversation

ZihanLi58
Contributor

Dear Gobblin maintainers,

Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!

JIRA

Description

  • Here are some details about my PR, including screenshots (if applicable):
    Create manifest based dataset finder

Tests

  • My PR adds the following unit tests OR does not need testing for this extremely good reason:
    Added a unit test; also tested on Azkaban to confirm it can copy data

Commits

  • My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "How to write a good git commit message":
    1. Subject is separated from body by a blank line
    2. Subject is limited to 50 characters
    3. Subject does not end with a period
    4. Subject uses the imperative mood ("add", not "adding")
    5. Body wraps at 72 characters
    6. Body explains "what" and "why", not "how"

@codecov-commenter

codecov-commenter commented Nov 11, 2022

Codecov Report

Merging #3598 (7c4c1dd) into master (c3d1ba8) will decrease coverage by 0.00%.
The diff coverage is 61.53%.

@@             Coverage Diff              @@
##             master    #3598      +/-   ##
============================================
- Coverage     46.89%   46.89%   -0.01%     
- Complexity    10673    10694      +21     
============================================
  Files          2120     2127       +7     
  Lines         83080    83222     +142     
  Branches       9254     9274      +20     
============================================
+ Hits          38962    39023      +61     
- Misses        40548    40615      +67     
- Partials       3570     3584      +14     
Impacted Files Coverage Δ
...c/main/java/org/apache/gobblin/util/PathUtils.java 11.76% <ø> (-1.57%) ⬇️
...lin/data/management/copy/ManifestBasedDataset.java 57.40% <57.40%> (ø)
...ta/management/copy/ManifestBasedDatasetFinder.java 80.00% <80.00%> (ø)
...che/gobblin/data/management/copy/CopyableFile.java 75.17% <100.00%> (ø)
...modules/flowgraph/datanodes/hive/HiveDataNode.java 40.00% <0.00%> (-23.16%) ⬇️
...a/org/apache/gobblin/util/limiter/NoopLimiter.java 40.00% <0.00%> (-20.00%) ⬇️
...a/org/apache/gobblin/cluster/GobblinHelixTask.java 64.51% <0.00%> (-19.36%) ⬇️
...lin/util/filesystem/FileSystemInstrumentation.java 92.85% <0.00%> (-7.15%) ⬇️
.../org/apache/gobblin/cluster/GobblinTaskRunner.java 62.22% <0.00%> (-1.41%) ⬇️
... and 21 more


}
if (Boolean.parseBoolean(this.properties.getProperty(DELETE_FILE_NOT_EXIST_ON_SOURCE, "false"))) {
CommitStep step = new DeleteFileCommitStep(targetFs, toDelete, this.properties,
this.deleteEmptyDirectories ? Optional.of(new Path(this.properties.getProperty(DELETE_EMPTY_DIRECTORIES_UPTO))) : Optional.<Path>absent());
Contributor

is this to delete directories that are emptied after we remove the files that do not exist on source?

Contributor Author

yup, after removing files on dst that do not exist on source, we use this config to determine whether we need to delete the newly empty dirs as well

//todo: update error message to point to a sample json file instead of schema which is hard to understand
log.warn(String.format("Failed to read Manifest path %s on filesystem %s, please make sure it's in correct json format with schema"
+ " {type:array, items:{type: object, properties:{id:{type:String}, fileName:{type:String}, fileGroup:{type:String}, fileSizeInBytes: {type:Long}}}}",
manifestPath.toString(), fs.getUri().toString()), e);
Contributor

missing a %s for the exception e

Contributor Author

log(String, Exception) is the function I'm trying to call
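
The exchange above turns on which logging overload is called: with a `(String, Throwable)` overload the exception travels as a separate argument, so the format string needs no extra `%s` for it. A minimal sketch of that pattern follows, using `java.util.logging` from the JDK as a stand-in for the SLF4J logger in the PR. The class `LogWithThrowableSketch`, the `CapturingHandler` helper, and the sample path are hypothetical and exist only for illustration.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

final class LogWithThrowableSketch {
  /** Collects log records in memory so the call can be inspected afterwards. */
  static final class CapturingHandler extends Handler {
    final List<LogRecord> records = new ArrayList<>();
    @Override public void publish(LogRecord record) { records.add(record); }
    @Override public void flush() { }
    @Override public void close() { }
  }

  /** Logs a formatted message and passes the exception as a separate argument. */
  static LogRecord warnWithCause(String manifestPath, String fsUri, Exception e) {
    Logger logger = Logger.getAnonymousLogger();
    logger.setUseParentHandlers(false); // keep the sketch quiet on the console
    CapturingHandler handler = new CapturingHandler();
    logger.addHandler(handler);
    // Two %s placeholders, two format arguments; the exception is NOT a format argument.
    logger.log(Level.WARNING,
        String.format("Failed to read Manifest path %s on filesystem %s", manifestPath, fsUri), e);
    return handler.records.get(0);
  }

  public static void main(String[] args) {
    LogRecord r = warnWithCause("/tmp/manifest.json", "file:///", new IOException("bad json"));
    System.out.println(r.getMessage() + " | cause: " + r.getThrown());
  }
}
```

The record keeps the message and the throwable apart, which is why the logging backend can print the stack trace without it being formatted into the string.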

private final List<Path> manifestLocations;
private final Properties properties;
public ManifestBasedDatasetFinder(final FileSystem fs, Properties properties) {
Preconditions.checkArgument(properties.containsKey(MANIFEST_LOCATION), "Please config " + MANIFEST_LOCATION);
Contributor

improve the message, e.g. "manifest location key required in config. Please set"...

Contributor

@phet phet left a comment

nice work, zihan--excited about this new capability!

Comment on lines 54 to 56
/** If true, will delete newly empty directories up to the config set in DELETE_EMPTY_DIRECTORIES_UPTO. */
public static final String DELETE_EMPTY_DIRECTORIES_KEY = CONFIG_PREFIX + ".deleteEmptyDirectories";
public static final String DELETE_EMPTY_DIRECTORIES_UPTO = CONFIG_PREFIX + ".deleteEmptyDirectoriesUpTo";
Contributor

deleting dirs seems orthogonal to the enumerated list of file paths to copy. can you explain the motivation to delete in the first place, then why it belongs with copying?

I'm not clear: is this a true delete or just inhibiting replication from source to dest when the dir is empty? and BTW, are we deleting on the destination--or the actual source dirs?

probably just reflects the need for more detailed source comment

this.manifestPath = manifestPath;
this.properties = properties;
this.deleteEmptyDirectories = Boolean.parseBoolean(properties.getProperty(DELETE_EMPTY_DIRECTORIES_KEY, "false"))
&& properties.containsKey(DELETE_EMPTY_DIRECTORIES_UPTO);
Contributor

shouldn't we validate this is a valid value, not merely that it's present?

Contributor Author

I added a check to make sure it's not empty as well. Other than that, if this is not a valid path, we will throw an exception directly when we try to construct the Path. If it's a wrong path, we will end up not deleting any newly empty folders. Given that we have no idea which files are in the manifest, it's not possible for us to determine whether the value is "correct" or not

Contributor

could validation check whether the path exists on either the source OR the destination FileSystem?

I'm actually not familiar enough w/ the impl. to grasp the consequences of a bad path value here... if the path is not found would it lead to deletion not stopping and going as far as root (whatever dirs are empty along the way)? or would it mean that nothing is deleted, not even a terminal empty dir?

Contributor Author

In that case, files that do not exist on source are deleted, but no empty dir will be deleted

Comment on lines 79 to 82
if (!fs.exists(manifestPath)) {
log.warn(String.format("Manifest path %s does not exist on filesystem %s, will not copy data in this manifest"
+ ", probably due to wrong configuration of %s", manifestPath.toString(), fs.getUri().toString(), ManifestBasedDatasetFinder.MANIFEST_LOCATION));
return Iterators.emptyIterator();
Contributor

this is a silent failure. I'm not sure that's what we want, but I agree it could be argued either way. since it's a complex decision, let's document the rationale w/ a code comment.

my thinking: if the caller submits a bad path, not found, an empty iterator here means the job still succeeds, giving no feedback. if I were the caller, however I'd appreciate a job failure to get my attention. hence I suggest either an IOException here or in the ctor.
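
The trade-off described above can be sketched side by side. This is a minimal illustration, assuming `java.nio.file` as a stand-in for the Hadoop `FileSystem` used in the PR; the class `MissingManifestSketch` and its `lenient`/`strict` methods are hypothetical names, and reading the manifest as plain lines is a simplification.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.Iterator;

final class MissingManifestSketch {
  /** Lenient variant: a missing manifest yields no work, so the job "succeeds" silently. */
  static Iterator<String> lenient(Path manifest) throws IOException {
    if (!Files.exists(manifest)) {
      return Collections.emptyIterator();
    }
    return Files.readAllLines(manifest).iterator();
  }

  /** Strict variant: a missing manifest fails the caller so the misconfiguration is noticed. */
  static Iterator<String> strict(Path manifest) throws IOException {
    if (!Files.exists(manifest)) {
      throw new IOException("Manifest path " + manifest + " does not exist, unable to load");
    }
    return Files.readAllLines(manifest).iterator();
  }

  public static void main(String[] args) throws IOException {
    Path missing = Files.createTempDirectory("manifest-sketch").resolve("no-such-manifest.json");
    System.out.println("lenient has entries: " + lenient(missing).hasNext());
    try {
      strict(missing);
    } catch (IOException e) {
      System.out.println("strict failed: " + e.getMessage());
    }
  }
}
```

With the lenient variant the caller cannot tell "nothing to copy" apart from "wrong path", which is the feedback gap the comment points at.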

toDelete.add(targetFs.getFileStatus(fileToCopy));
}
}
if (Boolean.parseBoolean(this.properties.getProperty(DELETE_FILE_NOT_EXIST_ON_SOURCE, "false"))) {
Contributor

I'd encourage reading all config props in the ctor rather than burying deep in the impl. keeps it cleaner ITO how the class can be configured
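
One way to read this suggestion: resolve every property once in the constructor, validate it there, and keep only typed fields for the rest of the class. The sketch below assumes plain `java.util.Properties`; the class `ManifestCopyOptionsSketch` and both key strings are illustrative placeholders, not the keys defined in the PR. Treating a blank "up to" value as "feature disabled" is likewise an assumption of this sketch.

```java
import java.util.Optional;
import java.util.Properties;

final class ManifestCopyOptionsSketch {
  // Placeholder keys for illustration only; the real keys are built from CONFIG_PREFIX in the PR.
  static final String DELETE_MISSING_KEY = "sketch.deleteFileNotExistOnSource";
  static final String DELETE_EMPTY_DIRS_UPTO_KEY = "sketch.deleteEmptyDirectoriesUpTo";

  final boolean deleteFilesMissingOnSource;
  final Optional<String> deleteEmptyDirsUpTo;

  /** Reads and validates all options up front, so later code never touches Properties. */
  ManifestCopyOptionsSketch(Properties props) {
    this.deleteFilesMissingOnSource =
        Boolean.parseBoolean(props.getProperty(DELETE_MISSING_KEY, "false"));
    // Absent or blank means "do not delete empty directories"; anything else is kept trimmed.
    String upTo = props.getProperty(DELETE_EMPTY_DIRS_UPTO_KEY, "").trim();
    this.deleteEmptyDirsUpTo = upTo.isEmpty() ? Optional.empty() : Optional.of(upTo);
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty(DELETE_MISSING_KEY, "true");
    props.setProperty(DELETE_EMPTY_DIRS_UPTO_KEY, "/data/root");
    ManifestCopyOptionsSketch options = new ManifestCopyOptionsSketch(props);
    System.out.println(options.deleteFilesMissingOnSource + " " + options.deleteEmptyDirsUpTo);
  }
}
```

Keeping the parsed values as final fields makes the configurable surface of the class visible in one place.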

}
if (Boolean.parseBoolean(this.properties.getProperty(DELETE_FILE_NOT_EXIST_ON_SOURCE, "false"))) {
CommitStep step = new DeleteFileCommitStep(targetFs, toDelete, this.properties,
this.deleteEmptyDirectories ? Optional.of(new Path(this.properties.getProperty(DELETE_EMPTY_DIRECTORIES_UPTO))) : Optional.<Path>absent());
Contributor

as mentioned above, shouldn't we validate this takes a meaningful value?

throw e;
} catch (Exception e ) {
log.warn(String.format("Failed to process Manifest path %s on filesystem %s, due to", manifestPath.toString(), fs.getUri().toString()), e);
throw e;
Contributor

don't you need to wrap to rethrow? I thought the exception signature was for IOException, not Exception
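
On the rethrow question: since Java 7, rethrowing an unmodified catch parameter is checked against the exceptions the `try` body can actually throw, so `catch (Exception e) { throw e; }` can compile inside a method that declares only `throws IOException`. Whether that applies to the PR depends on what its `try` block throws, which is not fully visible in this thread. A minimal sketch with hypothetical names (`PreciseRethrowSketch`, `firstLine`):

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;

final class PreciseRethrowSketch {
  /** Declares only IOException, yet catches and rethrows a parameter typed Exception. */
  static String firstLine(String content) throws IOException {
    try (BufferedReader reader = new BufferedReader(new StringReader(content))) {
      String line = reader.readLine();
      if (line == null) {
        throw new IOException("empty manifest");
      }
      return line;
    } catch (Exception e) {
      // 'e' is never reassigned, so the compiler treats the rethrow as
      // "IOException or an unchecked exception", not as a bare Exception.
      System.err.println("Failed to process manifest, due to " + e);
      throw e;
    }
  }

  public static void main(String[] args) {
    try {
      System.out.println(firstLine("first\nsecond"));
      firstLine("");
    } catch (IOException e) {
      System.out.println("caught: " + e.getMessage());
    }
  }
}
```

If the `try` body could throw some other checked exception, the same rethrow would stop compiling and wrapping in an `IOException` would be needed.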

log.warn(String.format("Failed to read Manifest path %s on filesystem %s, please make sure it's in correct json format with schema"
+ " {type:array, items:{type: object, properties:{id:{type:String}, fileName:{type:String}, fileGroup:{type:String}, fileSizeInBytes: {type:Long}}}}",
manifestPath.toString(), fs.getUri().toString()), e);
throw e;
Contributor

BTW, I like that parse errors would likely lead to overall job failure... I just believe a manifest path not found should as well

JsonObject file = GSON.fromJson(reader, JsonObject.class);
Path fileToCopy = new Path(file.get("fileName").getAsString());
if (this.fs.exists(fileToCopy)) {
if (!targetFs.exists(fileToCopy) || needCopy(this.fs.getFileStatus(fileToCopy), targetFs.getFileStatus(fileToCopy))) {
Contributor

nit: prefer the name shouldCopy.

also let's use the .getFileStatus to check existence. that avoids a separate call to .exists. since that method may not have caching, may be better to structure so as to avoid the repeated call on line 113.

overall, I would suggest to encapsulate the check for existence on target inside the shouldCopy(FileStatus src, FileStatus dest) call.

Contributor Author

If we call getFileStatus directly without first calling .exists, it will throw FileNotFoundException when the file does not exist. I think we should avoid that situation? Unless we catch that exception and use it to determine whether the file exists, which does not look like a correct approach to me.

Contributor

ok, I agree that creating an exception stacktrace is too costly. (I thought it might return null.) seems you'll need to call both. I might abstract into a utility:

Optional<FileStatus> getOptFileStatus(FileSystem fs, Path p)

your choice
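
A rough shape for such a utility, together with a `shouldCopy` that takes the optional destination status, might look like the sketch below. It assumes `java.nio.file` and `BasicFileAttributes` as stand-ins for Hadoop's `FileSystem` and `FileStatus`; the class name `OptFileStatusSketch` is hypothetical, and comparing only file sizes is an assumption, since the PR's actual copy criteria are not shown in this thread.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.Optional;

final class OptFileStatusSketch {
  /** Returns the file's attributes, or empty if the path does not exist. */
  static Optional<BasicFileAttributes> getOptFileStatus(Path p) throws IOException {
    if (!Files.exists(p)) {
      return Optional.empty();
    }
    return Optional.of(Files.readAttributes(p, BasicFileAttributes.class));
  }

  /** Copy when the destination is missing or its size differs from the source (assumed rule). */
  static boolean shouldCopy(BasicFileAttributes src, Optional<BasicFileAttributes> dest) {
    return !dest.isPresent() || dest.get().size() != src.size();
  }

  public static void main(String[] args) throws IOException {
    Path dir = Files.createTempDirectory("opt-status-sketch");
    Path src = Files.write(dir.resolve("src.txt"), "hello".getBytes(StandardCharsets.UTF_8));
    Path dest = dir.resolve("dest.txt"); // not created yet
    BasicFileAttributes srcStatus = getOptFileStatus(src).get();
    System.out.println("copy when dest missing: " + shouldCopy(srcStatus, getOptFileStatus(dest)));
  }
}
```

Folding the existence check into the helper keeps the call site to a single expression, at the cost of still making two filesystem calls per path.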

@Slf4j
public class ManifestBasedDataset implements IterableCopyableDataset {

public static final String CONFIG_PREFIX = CopyConfiguration.COPY_PREFIX + ".manifestBased";
Contributor

minor, but in the hive and iceberg case, it's the XDatasetFinder class that defines the config keys, even when then used by the XDataset--not the other way around

Comment on lines +1 to +7
[
{
"id":"1",
"fileName":"/tmp/dataset/test1.txt",
"fileGroup":"/tmp/dataset",
"fileSizeInBytes":"1024"
},
Contributor

I'm glad you created this, so we have something to verify the parsing against, but I don't believe it is actually exercised in the included unit test. that only constructs the ManifestBasedDataset, but I believe we must call .getFilesetIterator for parsing to kick in, no? if that's true, let's add such a test

Contributor

@phet phet left a comment

interested in your thoughts... but otherwise looks good

@@ -45,27 +44,31 @@
/**
* A dataset that based on Manifest. We expect the Manifest contains the list of all the files for this dataset.
* At first phase, we only support copy across different clusters to the same location. (We can add more feature to support rename in the future)
* We will delete the file on target if it's listed in the manifest and not exist on source
Contributor

excellent--I get it now... helpful comment!

Comment on lines 53 to 55
/** If true, after delete the file that not exist on source, will delete newly empty directories up to the config set in DELETE_EMPTY_DIRECTORIES_UPTO. */
public static final String DELETE_EMPTY_DIRECTORIES_KEY = ManifestBasedDatasetFinder.CONFIG_PREFIX + ".deleteEmptyDirectories";
public static final String DELETE_EMPTY_DIRECTORIES_UPTO = ManifestBasedDatasetFinder.CONFIG_PREFIX + ".deleteEmptyDirectoriesUpTo";
Contributor

having this config does make intuitive sense, yet seems somewhat complex and also gives but one shot, so the user must choose the least-common root path, rather than the specificity of multiple ones.

the complexity comes from two separate configs. instead, couldn't a non-blank value of the "..._UPTO" prop be enough to know that the feature in general should be enabled? (w/o needing a separate prop to say so.)

better still, to explore alternatives, how about either:

  1. rather than config, could we treat the source FS as the SoT? that way we would simply delete up to whichever dir is missing on the source, w/o requiring any extra config specified.

  2. also not requiring config, would be for us to require every parent dir for deletion to be individually specified in the manifest. e.g. perhaps the file would contain:
    /a/b/c/d/e.f
    /a/b/c/d
    /a/b/c

(this means don't delete /a/b, since it wasn't specified.)

as it's least presumptuous, I prefer the last. there's a bit of state to manage during reading the manifest (to keep track of which dirs will become empty following other deletes), but that's not terribly difficult. we could even require forward declarations of children before their parents.

Contributor Author

I prefer the latter approach as well, since we need to add support for dir sync in the future. So it makes sense to put the dirs that they want to delete in the manifest. I don't even care about the order, as we can sort them all together.
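
The approach agreed on here (directories to delete are listed explicitly in the manifest, in any order) could be sketched roughly as below. This is an illustration under assumptions, not the follow-up implementation: it uses `java.nio.file` in place of Hadoop's `FileSystem`, the class `ManifestDeleteOrderSketch` is hypothetical, and it sorts entries deepest-first so that children are handled before their parents. Entries are assumed to be absolute paths so that their depths are comparable.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ManifestDeleteOrderSketch {
  /** Orders paths deepest-first so children are handled before their parents. */
  static List<Path> deepestFirst(List<Path> manifestEntries) {
    List<Path> ordered = new ArrayList<>(manifestEntries);
    ordered.sort((a, b) -> Integer.compare(b.getNameCount(), a.getNameCount()));
    return ordered;
  }

  /** Deletes listed files, and listed directories only if they are empty by then. */
  static void deleteListed(List<Path> manifestEntries) throws IOException {
    for (Path p : deepestFirst(manifestEntries)) {
      if (!Files.exists(p)) {
        continue; // already gone, nothing to do
      }
      if (Files.isDirectory(p)) {
        try (DirectoryStream<Path> children = Files.newDirectoryStream(p)) {
          if (children.iterator().hasNext()) {
            continue; // still has unlisted content: leave it alone
          }
        }
      }
      Files.delete(p);
    }
  }

  public static void main(String[] args) throws IOException {
    Path root = Files.createTempDirectory("delete-order-sketch");
    Path d = Files.createDirectories(root.resolve("a/b/c/d"));
    Path file = Files.createFile(d.resolve("e.f"));
    // Parent listed before its children on purpose; /a/b is not listed, so it must survive.
    deleteListed(Arrays.asList(d.getParent(), file, d));
    System.out.println("a/b exists: " + Files.exists(root.resolve("a/b")));
    System.out.println("a/b/c exists: " + Files.exists(root.resolve("a/b/c")));
  }
}
```

Because only listed paths are ever touched, an unlisted parent such as `/a/b` is never removed, which matches the "least presumptuous" property argued for above.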

@@ -77,13 +80,11 @@ public String datasetURN() {
public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, CopyConfiguration configuration)
throws IOException {
if (!fs.exists(manifestPath)) {
- log.warn(String.format("Manifest path %s does not exist on filesystem %s, will not copy data in this manifest"
+ throw new IOException(String.format("Manifest path %s does not exist on filesystem %s, will not copy data in this manifest"
Contributor

nit: rather than, "will not copy data...", either "skipping" or "unable to load"

Comment on lines +78 to +80
Mockito.when(sourceFs.getFileStatus(any(Path.class))).thenReturn(localFs.getFileStatus(manifestPath));
Mockito.when(sourceFs.exists(any(Path.class))).thenReturn(true);
Mockito.when(sourceFs.open(manifestPath)).thenReturn(localFs.open(manifestPath));
Contributor

would it work to pass in localFs directly rather than the mocked sourceFs?

Contributor Author

No, as I don't have the files that are defined in the manifest, so it would always return 0 files

new ManifestBasedDataset(sourceFs, manifestPath, props).getFileSetIterator(destFs, CopyConfiguration.builder(destFs, props).build());
Assert.assertTrue(fileSets.hasNext());
FileSet<CopyEntity> fileSet = fileSets.next();
Assert.assertEquals(fileSet.getFiles().size(), 2);
Contributor

test verification w/ a CopyEntity is challenging, so I see you merely counting them. perhaps that's appropriate here, since you're more exercising the manifest format parser, than proving semantics of particular manifest entries...

nonetheless, if you do expand to that, just reminding you of the abstractions created during the IcebergDataset testing -

Contributor Author

Yep, I just want to make sure the parser works for the manifest and that we are able to get the correct number of files. But the copy entity is pretty corrupted, as I give it the mocked file status here.
It will make more sense for us to test the copy entity when I'm testing writing the manifest.

Contributor

@phet phet left a comment

nice tests; clean impl--looks good, zihan!

@ZihanLi58 ZihanLi58 merged commit 712770a into apache:master Nov 30, 2022
phet pushed a commit to phet/gobblin that referenced this pull request Dec 3, 2022
* address comments

* use connectionmanager when httpclient is not cloesable

* [GOBBLIN-1741] Create manifest based dataset finder

* imporve logging

* add unit test

* Add unit test for manifest dataset to test jason parser

* address comments

* update log info

* remove unused property and change comments

Co-authored-by: Zihan Li <[email protected]>
phet added a commit to phet/gobblin that referenced this pull request Dec 5, 2022
* upstream/master:
  [GOBBLIN-1741] Create manifest based dataset finder (apache#3598)