Skip to content

Commit

Permalink
Improve distributed load
Browse files Browse the repository at this point in the history
### What changes are proposed in this pull request?

Improve distributed load
1. Configurable job failure criteria 
2. Configuration to determine if the load job should be restored from journal or not
3. Add an option to skip existing fully loaded file 
4. Add retry count for failed files 
5. Bug fixing

### Why are the changes needed?

To enhance the distributed load tool

### Does this PR introduce any user facing changes?

Yes. The skip-if-exists option is added to the distributed load cli.
			pr-link: #18153
			change-id: cid-5644da1c09bd6ee48f628552f51cb570de581b93
  • Loading branch information
elega authored Sep 15, 2023
1 parent 84dc05f commit 6a9f5fd
Show file tree
Hide file tree
Showing 15 changed files with 264 additions and 86 deletions.
5 changes: 5 additions & 0 deletions cli/src/alluxio.org/cli/cmd/job/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type LoadCommand struct {
verify bool
partialListing bool
metadataOnly bool
skipIfExists bool
}

func (c *LoadCommand) Base() *env.BaseJavaCommand {
Expand All @@ -60,6 +61,7 @@ func (c *LoadCommand) ToCommand() *cobra.Command {
cmd.Flags().BoolVar(&c.verify, "verify", false, "[submit] Run verification when load finishes and load new files if any")
cmd.Flags().BoolVar(&c.partialListing, "partial-listing", false, "[submit] Use partial directory listing, initializing load before reading the entire directory but cannot report on certain progress details")
cmd.Flags().BoolVar(&c.metadataOnly, "metadata-only", false, "[submit] Only load file metadata")
cmd.Flags().BoolVar(&c.skipIfExists, "skip-if-exists", false, "[submit] Skip existing fullly cached files")
return cmd
}

Expand All @@ -79,5 +81,8 @@ func (c *LoadCommand) Run(_ []string) error {
if c.metadataOnly {
javaArgs = append(javaArgs, "--metadata-only")
}
if c.skipIfExists {
javaArgs = append(javaArgs, "--skip-if-exists")
}
return c.Base().Run(javaArgs)
}
3 changes: 3 additions & 0 deletions common/transport/src/main/proto/grpc/block_worker.proto
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ message LoadFileRequest {
repeated UfsStatus ufs_status = 1;
required UfsReadOptions options = 2;
optional bool load_metadata_only = 3; // If set false, only the metadata of file will be loaded.
optional bool skip_if_exists = 4;
}

message File{
Expand All @@ -255,6 +256,8 @@ message File{
message LoadFileResponse {
required TaskStatus status = 1;
repeated LoadFileFailure failures = 2;
optional int32 files_skipped = 3;
optional int64 bytes_skipped = 4;
}

message FreeWorkerRequest{}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -607,6 +607,7 @@ message LoadJobPOptions {
optional bool verify = 2;
optional bool partialListing = 3;
optional bool loadMetadataOnly = 4;
optional bool skipIfExists = 5;
}

message CopyJobPOptions {
Expand Down
1 change: 1 addition & 0 deletions common/transport/src/main/proto/proto/journal/job.proto
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ message LoadJobEntry {
required string job_id = 7;
optional int64 end_time = 8;
optional bool load_metadata_only = 9;
optional bool skip_if_exists = 10;
}

// next available id: 13
Expand Down
40 changes: 40 additions & 0 deletions dora/core/common/src/main/java/alluxio/conf/PropertyKey.java
Original file line number Diff line number Diff line change
Expand Up @@ -2291,6 +2291,38 @@ public String toString() {
.setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
.setScope(Scope.MASTER)
.build();
public static final PropertyKey MASTER_SCHEDULER_RESTORE_JOB_FROM_JOURNAL =
booleanBuilder(Name.MASTER_SCHEDULER_RESTORE_JOB_FROM_JOURNAL)
.setDefaultValue(true)
.setDescription("If set to true, master will restore incomplete jobs from journal. "
+ "Turn this off if you don't want the scheduler to automatically restore jobs.")
.setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
.setScope(Scope.MASTER)
.build();
public static final PropertyKey MASTER_DORA_LOAD_JOB_TOTAL_FAILURE_COUNT_THRESHOLD =
intBuilder(Name.MASTER_DORA_LOAD_JOB_TOTAL_FAILURE_COUNT_THRESHOLD)
.setDefaultValue(-1)
.setDescription("The load job total load failure count threshold. -1 means never fail.")
.setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
.setScope(Scope.SERVER)
.build();
public static final PropertyKey MASTER_DORA_LOAD_JOB_TOTAL_FAILURE_RATIO_THRESHOLD =
doubleBuilder(Name.MASTER_DORA_LOAD_JOB_TOTAL_FAILURE_RATIO_THRESHOLD)
.setDefaultValue(1.00)
.setDescription("The load job total load failure ratio threshold (0,1)."
+ " 1.00 means never fail.")
.setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
.setScope(Scope.SERVER)
.build();
public static final PropertyKey MASTER_DORA_LOAD_JOB_RETRIES =
intBuilder(Name.MASTER_DORA_LOAD_JOB_RETRIES)
.setDefaultValue(3)
.setDescription("The number of retry attempts before a load of file "
+ "is considered as failure")
.setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
.setScope(Scope.SERVER)
.build();

public static final PropertyKey MASTER_SHELL_BACKUP_STATE_LOCK_GRACE_MODE =
enumBuilder(Name.MASTER_SHELL_BACKUP_STATE_LOCK_GRACE_MODE, GraceMode.class)
.setDefaultValue(GraceMode.FORCED)
Expand Down Expand Up @@ -7396,6 +7428,8 @@ public static final class Name {
"alluxio.master.block.scan.invalid.batch.max.size";
public static final String MASTER_SCHEDULER_INITIAL_WAIT_TIME =
"alluxio.master.scheduler.initial.wait.time";
public static final String MASTER_SCHEDULER_RESTORE_JOB_FROM_JOURNAL =
"alluxio.master.scheduler.restore.job.from.journal";
public static final String MASTER_SHELL_BACKUP_STATE_LOCK_GRACE_MODE =
"alluxio.master.shell.backup.state.lock.grace.mode";
public static final String MASTER_SHELL_BACKUP_STATE_LOCK_TRY_DURATION =
Expand All @@ -7404,6 +7438,12 @@ public static final class Name {
"alluxio.master.shell.backup.state.lock.sleep.duration";
public static final String MASTER_SHELL_BACKUP_STATE_LOCK_TIMEOUT =
"alluxio.master.shell.backup.state.lock.timeout";
public static final String MASTER_DORA_LOAD_JOB_TOTAL_FAILURE_COUNT_THRESHOLD =
"alluxio.master.dora.load.job.total.failure.count.threshold";
public static final String MASTER_DORA_LOAD_JOB_TOTAL_FAILURE_RATIO_THRESHOLD =
"alluxio.master.dora.load.job.total.failure.ratio.threshold";
public static final String MASTER_DORA_LOAD_JOB_RETRIES =
"alluxio.master.dora.load.job.retries";
public static final String MASTER_DAILY_BACKUP_ENABLED =
"alluxio.master.daily.backup.enabled";
public static final String MASTER_DAILY_BACKUP_FILES_RETAINED =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import alluxio.grpc.ExistsPOptions;
import alluxio.grpc.GetStatusPOptions;
import alluxio.grpc.ListStatusPOptions;
import alluxio.grpc.LoadFileFailure;
import alluxio.grpc.LoadFileResponse;
import alluxio.grpc.RenamePOptions;
import alluxio.grpc.Route;
import alluxio.grpc.RouteFailure;
Expand Down Expand Up @@ -100,12 +100,13 @@ BlockWriter createFileWriter(String fileId, String ufsPath)
* Loads the metadata and data of files from UFS to Alluxio.
*
* @param loadData true if data should also be loaded, otherwise metadata only
* @param skipIfExists true if data loading should be skipped if it's already loaded
* @param ufsStatuses the files to load
* @param options
* @return a list of failed files
*/
ListenableFuture<List<LoadFileFailure>> load(
boolean loadData, List<UfsStatus> ufsStatuses, UfsReadOptions options)
ListenableFuture<LoadFileResponse> load(
boolean loadData, boolean skipIfExists, List<UfsStatus> ufsStatuses, UfsReadOptions options)
throws AccessControlException, IOException;

/**
Expand Down
Loading

0 comments on commit 6a9f5fd

Please sign in to comment.