[GOBBLIN-1667] Create new predicate - ExistingPartitionSkipPredicate
Currently hive.dataset.existing.entity.policy.ABORT does not
abort when a partition already exists. One way to resolve this
is to make the ABORT configuration take effect, but that might be
backwards incompatible, so this commit instead introduces a new skip
predicate, ExistingPartitionSkipPredicate, that skips any partition
that already exists in the target table.
cbrentharris committed Jun 29, 2022
1 parent 7aad1f9 commit 2d0c4eb
Showing 3 changed files with 50 additions and 2 deletions.
@@ -0,0 +1,21 @@
package org.apache.gobblin.data.management.copy.predicates;

import com.google.common.base.Predicate;
import javax.annotation.Nullable;
import org.apache.gobblin.data.management.copy.hive.HivePartitionFileSet;


/**
 * This skip predicate will skip any partition that's already registered in the destination
 * hive table.
 */
public class ExistingPartitionSkipPredicate implements Predicate<HivePartitionFileSet> {
  @Override
  public boolean apply(@Nullable HivePartitionFileSet input) {
    if (input == null) {
      return true;
    }

    return input.getExistingTargetPartition().isPresent();
  }
}
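
To illustrate the skip semantics of the predicate above (a result of `true` means "skip this partition"), here is a minimal, self-contained sketch. It does not use Gobblin or Guava: `FileSetStub` is a hypothetical stand-in for `HivePartitionFileSet`, `java.util.Optional` stands in for Guava's `Optional`, and the partition names are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class SkipPredicateSketch {

  // Hypothetical stand-in for HivePartitionFileSet: it only carries a name and
  // the partition (if any) already registered in the target table.
  static final class FileSetStub {
    final String name;
    final Optional<String> existingTargetPartition;

    FileSetStub(String name, Optional<String> existingTargetPartition) {
      this.name = name;
      this.existingTargetPartition = existingTargetPartition;
    }
  }

  // Same decision as ExistingPartitionSkipPredicate.apply: skip on null input
  // or when the target partition already exists.
  static final Predicate<FileSetStub> SKIP_IF_EXISTS =
      input -> input == null || input.existingTargetPartition.isPresent();

  // Keeps only the file sets the predicate does NOT ask to skip.
  static List<String> partitionsToCopy(List<FileSetStub> candidates) {
    return candidates.stream()
        .filter(SKIP_IF_EXISTS.negate())
        .map(fs -> fs.name)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<FileSetStub> candidates = Arrays.asList(
        new FileSetStub("dt=2022-06-28", Optional.of("dt=2022-06-28")),
        new FileSetStub("dt=2022-06-29", Optional.empty()),
        null);
    System.out.println(partitionsToCopy(candidates)); // prints [dt=2022-06-29]
  }
}
```

Only the partition that is absent from the target survives the filter; the existing partition and the null entry are both skipped, matching the two branches of `apply`.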
@@ -0,0 +1,25 @@
package org.apache.gobblin.data.management.copy.predicates;

import com.google.common.base.Optional;
import org.apache.gobblin.data.management.copy.hive.HivePartitionFileSet;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.testng.Assert;
import org.testng.annotations.Test;

import static org.mockito.Mockito.*;


public class ExistingPartitionSkipPredicateTest {
  ExistingPartitionSkipPredicate predicate = new ExistingPartitionSkipPredicate();

  @Test
  public void shouldSkipHiveDatasetWithExistingPartition() {
    HivePartitionFileSet fileSetWithExistingPartition = mock(HivePartitionFileSet.class);
    HivePartitionFileSet fileSetWithoutExistingPartition = mock(HivePartitionFileSet.class);
    Partition partition = mock(Partition.class);
    when(fileSetWithExistingPartition.getExistingTargetPartition()).thenReturn(Optional.of(partition));
    when(fileSetWithoutExistingPartition.getExistingTargetPartition()).thenReturn(Optional.absent());
    Assert.assertTrue(predicate.apply(fileSetWithExistingPartition));
    Assert.assertFalse(predicate.apply(fileSetWithoutExistingPartition));
  }
}
6 changes: 4 additions & 2 deletions gobblin-docs/case-studies/Hive-Distcp.md
@@ -101,7 +101,9 @@ A partition filter can be applied when copying partitioned tables. Filters can o

## Fast partition skip predicate

A predicate that operates on partitions can be provided to distcp-ng to allow it to quickly skip partitions without having to list all of the source and target files and do a diff on those sets (a costly operation). To set this predicate, provide the class name of the predicate with the key `hive.dataset.copy.fast.partition.skip.predicate`. Currently only one such predicate exists:
A predicate that operates on partitions can be provided to distcp-ng to allow it to quickly skip partitions without having to list all of the source and target files and do a diff on those sets (a costly operation). To set this predicate, provide the class name of the predicate with the key `hive.dataset.copy.fast.partition.skip.predicate`. Below are the following predicates that exist

* `RegistrationTimeSkipPredicate`: This predicate compares the Hive partition attribute `registrationGenerationTimeMillis` in the target with the modification time of the partition directory in the source. The partition is skipped unless the directory was modified more recently than the registrationGenerationTime. The attribute `registrationGenerationTimeMillis` is an attribute set by distcp-ng representing (for all practical purposes) the time at which the distcp-ng job that registered that table started.

* `NonPartitionTableRegistrationTimeSkipPredicate`: This predicate can be used on non partition tables and compares the `registrationGenerationTimeMillis` in the target.
* `ExistingPartitionSkipPredicate`: This predicate can be used to skip any partition that already exists in the target table.
* `RootDirectoryModtimeSkipPredicate`: This predicate can be used to skip any partition whose root directory modified time is later than the copy source.
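
As a rough illustration of the configuration described above, the sketch below sets the skip-predicate key on a plain `java.util.Properties` object. The key and the package of `ExistingPartitionSkipPredicate` come from this commit; the helper class `FastSkipPredicateConfig`, its method name, and the use of the fully qualified class name as the value are assumptions made for the example, not Gobblin API.

```java
import java.util.Properties;

public class FastSkipPredicateConfig {

  // Key named in the Hive-Distcp documentation.
  static final String SKIP_PREDICATE_KEY = "hive.dataset.copy.fast.partition.skip.predicate";

  // Assumption: the value is the fully qualified name of the class added in this commit.
  static final String EXISTING_PARTITION_PREDICATE =
      "org.apache.gobblin.data.management.copy.predicates.ExistingPartitionSkipPredicate";

  // Hypothetical helper: returns a copy of the given job properties with the
  // skip predicate set, leaving the input untouched.
  static Properties withExistingPartitionSkip(Properties base) {
    Properties props = new Properties();
    props.putAll(base);
    props.setProperty(SKIP_PREDICATE_KEY, EXISTING_PARTITION_PREDICATE);
    return props;
  }

  public static void main(String[] args) {
    Properties jobProps = withExistingPartitionSkip(new Properties());
    System.out.println(SKIP_PREDICATE_KEY + "=" + jobProps.getProperty(SKIP_PREDICATE_KEY));
  }
}
```

In a job configuration file the same setting would be a single `key=value` line using that key and class name.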
