Skip to content

Commit

Permalink
[MINOR] Improve code style of CLI Command classes (apache#6427)
Browse files Browse the repository at this point in the history
  • Loading branch information
yihua authored and fengjian committed Apr 5, 2023
1 parent 6cdb139 commit a7e14d3
Show file tree
Hide file tree
Showing 25 changed files with 219 additions and 212 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -200,12 +200,12 @@ private Comparable[] readCommit(GenericRecord record, boolean skipMetadata) {
case HoodieTimeline.COMPACTION_ACTION:
return commitDetail(record, "hoodieCompactionMetadata", skipMetadata);
default: {
return new Comparable[]{};
return new Comparable[] {};
}
}
} catch (Exception e) {
e.printStackTrace();
return new Comparable[]{};
return new Comparable[] {};
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@

import java.io.IOException;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -113,8 +113,7 @@ public String showBootstrapIndexMapping(
@CliOption(key = {"limit"}, unspecifiedDefaultValue = "-1", help = "Limit rows to be displayed") Integer limit,
@CliOption(key = {"sortBy"}, unspecifiedDefaultValue = "", help = "Sorting Field") final String sortByField,
@CliOption(key = {"desc"}, unspecifiedDefaultValue = "false", help = "Ordering") final boolean descending,
@CliOption(key = {"headeronly"}, unspecifiedDefaultValue = "false", help = "Print Header Only")
final boolean headerOnly) {
@CliOption(key = {"headeronly"}, unspecifiedDefaultValue = "false", help = "Print Header Only") final boolean headerOnly) {

if (partitionPath.isEmpty() && !fileIds.isEmpty()) {
throw new IllegalStateException("PartitionPath is mandatory when passing fileIds.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ public String showCleans(
List<Comparable[]> rows = new ArrayList<>();
for (HoodieInstant clean : cleans) {
HoodieCleanMetadata cleanMetadata =
TimelineMetadataUtils.deserializeHoodieCleanMetadata(timeline.getInstantDetails(clean).get());
rows.add(new Comparable[]{clean.getTimestamp(), cleanMetadata.getEarliestCommitToRetain(),
cleanMetadata.getTotalFilesDeleted(), cleanMetadata.getTimeTakenInMillis()});
TimelineMetadataUtils.deserializeHoodieCleanMetadata(timeline.getInstantDetails(clean).get());
rows.add(new Comparable[] {clean.getTimestamp(), cleanMetadata.getEarliestCommitToRetain(),
cleanMetadata.getTotalFilesDeleted(), cleanMetadata.getTimeTakenInMillis()});
}

TableHeader header =
Expand All @@ -85,7 +85,8 @@ public String showCleans(
}

@CliCommand(value = "clean showpartitions", help = "Show partition level details of a clean")
public String showCleanPartitions(@CliOption(key = {"clean"}, help = "clean to show") final String instantTime,
public String showCleanPartitions(
@CliOption(key = {"clean"}, help = "clean to show") final String instantTime,
@CliOption(key = {"limit"}, help = "Limit commits", unspecifiedDefaultValue = "-1") final Integer limit,
@CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField,
@CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending,
Expand Down Expand Up @@ -122,13 +123,14 @@ public String showCleanPartitions(@CliOption(key = {"clean"}, help = "clean to s
}

@CliCommand(value = "cleans run", help = "run clean")
public String runClean(@CliOption(key = "sparkMemory", unspecifiedDefaultValue = "4G",
help = "Spark executor memory") final String sparkMemory,
@CliOption(key = "propsFilePath", help = "path to properties file on localfs or dfs with configurations for hoodie client for cleaning",
unspecifiedDefaultValue = "") final String propsFilePath,
@CliOption(key = "hoodieConfigs", help = "Any configuration that can be set in the properties file can be passed here in the form of an array",
unspecifiedDefaultValue = "") final String[] configs,
@CliOption(key = "sparkMaster", unspecifiedDefaultValue = "", help = "Spark Master ") String master) throws IOException, InterruptedException, URISyntaxException {
public String runClean(
@CliOption(key = "sparkMemory", unspecifiedDefaultValue = "4G",
help = "Spark executor memory") final String sparkMemory,
@CliOption(key = "propsFilePath", help = "path to properties file on localfs or dfs with configurations for hoodie client for cleaning",
unspecifiedDefaultValue = "") final String propsFilePath,
@CliOption(key = "hoodieConfigs", help = "Any configuration that can be set in the properties file can be passed here in the form of an array",
unspecifiedDefaultValue = "") final String[] configs,
@CliOption(key = "sparkMaster", unspecifiedDefaultValue = "", help = "Spark Master ") String master) throws IOException, InterruptedException, URISyntaxException {
boolean initialized = HoodieCLI.initConf();
HoodieCLI.initFS(initialized);
HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.utilities.UtilHelpers;

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.launcher.SparkLauncher;
Expand All @@ -33,6 +34,7 @@
import org.springframework.shell.core.annotation.CliCommand;
import org.springframework.shell.core.annotation.CliOption;
import org.springframework.stereotype.Component;

import scala.collection.JavaConverters;

@Component
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,23 +68,23 @@ private String printCommits(HoodieDefaultTimeline timeline,
final List<Comparable[]> rows = new ArrayList<>();

final List<HoodieInstant> commits = timeline.getCommitsTimeline().filterCompletedInstants()
.getInstants().collect(Collectors.toList());
.getInstants().collect(Collectors.toList());
// timeline can be read from multiple files. So sort is needed instead of reversing the collection
Collections.sort(commits, HoodieInstant.COMPARATOR.reversed());

for (int i = 0; i < commits.size(); i++) {
final HoodieInstant commit = commits.get(i);
final HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
timeline.getInstantDetails(commit).get(),
HoodieCommitMetadata.class);
rows.add(new Comparable[]{commit.getTimestamp(),
commitMetadata.fetchTotalBytesWritten(),
commitMetadata.fetchTotalFilesInsert(),
commitMetadata.fetchTotalFilesUpdated(),
commitMetadata.fetchTotalPartitionsWritten(),
commitMetadata.fetchTotalRecordsWritten(),
commitMetadata.fetchTotalUpdateRecordsWritten(),
commitMetadata.fetchTotalWriteErrors()});
timeline.getInstantDetails(commit).get(),
HoodieCommitMetadata.class);
rows.add(new Comparable[] {commit.getTimestamp(),
commitMetadata.fetchTotalBytesWritten(),
commitMetadata.fetchTotalFilesInsert(),
commitMetadata.fetchTotalFilesUpdated(),
commitMetadata.fetchTotalPartitionsWritten(),
commitMetadata.fetchTotalRecordsWritten(),
commitMetadata.fetchTotalUpdateRecordsWritten(),
commitMetadata.fetchTotalWriteErrors()});
}

final Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
Expand All @@ -93,47 +93,47 @@ private String printCommits(HoodieDefaultTimeline timeline,
});

final TableHeader header = new TableHeader()
.addTableHeaderField(HoodieTableHeaderFields.HEADER_COMMIT_TIME)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_FILES_ADDED)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_FILES_UPDATED)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_PARTITIONS_WRITTEN)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_RECORDS_WRITTEN)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_UPDATE_RECORDS_WRITTEN)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_ERRORS);
.addTableHeaderField(HoodieTableHeaderFields.HEADER_COMMIT_TIME)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_FILES_ADDED)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_FILES_UPDATED)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_PARTITIONS_WRITTEN)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_RECORDS_WRITTEN)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_UPDATE_RECORDS_WRITTEN)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_ERRORS);

return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending,
limit, headerOnly, rows, tempTableName);
limit, headerOnly, rows, tempTableName);
}

private String printCommitsWithMetadata(HoodieDefaultTimeline timeline,
final Integer limit, final String sortByField,
final boolean descending,
final boolean headerOnly,
final String tempTableName) throws IOException {
final Integer limit, final String sortByField,
final boolean descending,
final boolean headerOnly,
final String tempTableName) throws IOException {
final List<Comparable[]> rows = new ArrayList<>();

final List<HoodieInstant> commits = timeline.getCommitsTimeline().filterCompletedInstants()
.getInstants().collect(Collectors.toList());
.getInstants().collect(Collectors.toList());
// timeline can be read from multiple files. So sort is needed instead of reversing the collection
Collections.sort(commits, HoodieInstant.COMPARATOR.reversed());

for (int i = 0; i < commits.size(); i++) {
final HoodieInstant commit = commits.get(i);
final HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
timeline.getInstantDetails(commit).get(),
HoodieCommitMetadata.class);
timeline.getInstantDetails(commit).get(),
HoodieCommitMetadata.class);

for (Map.Entry<String, List<HoodieWriteStat>> partitionWriteStat :
commitMetadata.getPartitionToWriteStats().entrySet()) {
commitMetadata.getPartitionToWriteStats().entrySet()) {
for (HoodieWriteStat hoodieWriteStat : partitionWriteStat.getValue()) {
rows.add(new Comparable[]{ commit.getAction(), commit.getTimestamp(), hoodieWriteStat.getPartitionPath(),
hoodieWriteStat.getFileId(), hoodieWriteStat.getPrevCommit(), hoodieWriteStat.getNumWrites(),
hoodieWriteStat.getNumInserts(), hoodieWriteStat.getNumDeletes(),
hoodieWriteStat.getNumUpdateWrites(), hoodieWriteStat.getTotalWriteErrors(),
hoodieWriteStat.getTotalLogBlocks(), hoodieWriteStat.getTotalCorruptLogBlock(),
hoodieWriteStat.getTotalRollbackBlocks(), hoodieWriteStat.getTotalLogRecords(),
hoodieWriteStat.getTotalUpdatedRecordsCompacted(), hoodieWriteStat.getTotalWriteBytes()
rows.add(new Comparable[] {commit.getAction(), commit.getTimestamp(), hoodieWriteStat.getPartitionPath(),
hoodieWriteStat.getFileId(), hoodieWriteStat.getPrevCommit(), hoodieWriteStat.getNumWrites(),
hoodieWriteStat.getNumInserts(), hoodieWriteStat.getNumDeletes(),
hoodieWriteStat.getNumUpdateWrites(), hoodieWriteStat.getTotalWriteErrors(),
hoodieWriteStat.getTotalLogBlocks(), hoodieWriteStat.getTotalCorruptLogBlock(),
hoodieWriteStat.getTotalRollbackBlocks(), hoodieWriteStat.getTotalLogRecords(),
hoodieWriteStat.getTotalUpdatedRecordsCompacted(), hoodieWriteStat.getTotalWriteBytes()
});
}
}
Expand Down Expand Up @@ -162,7 +162,7 @@ private String printCommitsWithMetadata(HoodieDefaultTimeline timeline,
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN);

return HoodiePrintHelper.print(header, new HashMap<>(), sortByField, descending,
limit, headerOnly, rows, tempTableName);
limit, headerOnly, rows, tempTableName);
}

@CliCommand(value = "commits show", help = "Show the commits")
Expand All @@ -182,30 +182,26 @@ public String showCommits(
HoodieActiveTimeline activeTimeline = HoodieCLI.getTableMetaClient().getActiveTimeline();
if (includeExtraMetadata) {
return printCommitsWithMetadata(activeTimeline, limit, sortByField, descending, headerOnly, exportTableName);
} else {
} else {
return printCommits(activeTimeline, limit, sortByField, descending, headerOnly, exportTableName);
}
}

@CliCommand(value = "commits showarchived", help = "Show the archived commits")
public String showArchivedCommits(
@CliOption(key = {"includeExtraMetadata"}, help = "Include extra metadata",
unspecifiedDefaultValue = "false") final boolean includeExtraMetadata,
@CliOption(key = {"createView"}, mandatory = false, help = "view name to store output table",
unspecifiedDefaultValue = "") final String exportTableName,
@CliOption(key = {"startTs"}, mandatory = false, help = "start time for commits, default: now - 10 days")
@CliOption(key = {"includeExtraMetadata"}, help = "Include extra metadata",
unspecifiedDefaultValue = "false") final boolean includeExtraMetadata,
@CliOption(key = {"createView"}, mandatory = false, help = "view name to store output table",
unspecifiedDefaultValue = "") final String exportTableName,
@CliOption(key = {"startTs"}, mandatory = false, help = "start time for commits, default: now - 10 days")
String startTs,
@CliOption(key = {"endTs"}, mandatory = false, help = "end time for commits, default: now - 1 day")
@CliOption(key = {"endTs"}, mandatory = false, help = "end time for commits, default: now - 1 day")
String endTs,
@CliOption(key = {"limit"}, mandatory = false, help = "Limit commits", unspecifiedDefaultValue = "-1")
final Integer limit,
@CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "")
final String sortByField,
@CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false")
final boolean descending,
@CliOption(key = {"headeronly"}, help = "Print Header Only", unspecifiedDefaultValue = "false")
final boolean headerOnly)
throws IOException {
@CliOption(key = {"limit"}, mandatory = false, help = "Limit commits", unspecifiedDefaultValue = "-1") final Integer limit,
@CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField,
@CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending,
@CliOption(key = {"headeronly"}, help = "Print Header Only", unspecifiedDefaultValue = "false") final boolean headerOnly)
throws IOException {
if (StringUtils.isNullOrEmpty(startTs)) {
startTs = CommitUtil.getTimeDaysAgo(10);
}
Expand All @@ -218,7 +214,7 @@ public String showArchivedCommits(
HoodieDefaultTimeline timelineRange = archivedTimeline.findInstantsInRange(startTs, endTs);
if (includeExtraMetadata) {
return printCommitsWithMetadata(timelineRange, limit, sortByField, descending, headerOnly, exportTableName);
} else {
} else {
return printCommits(timelineRange, limit, sortByField, descending, headerOnly, exportTableName);
}
} finally {
Expand All @@ -228,13 +224,14 @@ public String showArchivedCommits(
}

@CliCommand(value = "commit rollback", help = "Rollback a commit")
public String rollbackCommit(@CliOption(key = {"commit"}, help = "Commit to rollback") final String instantTime,
public String rollbackCommit(
@CliOption(key = {"commit"}, help = "Commit to rollback") final String instantTime,
@CliOption(key = {"sparkProperties"}, help = "Spark Properties File Path") final String sparkPropertiesPath,
@CliOption(key = "sparkMaster", unspecifiedDefaultValue = "", help = "Spark Master") String master,
@CliOption(key = "sparkMemory", unspecifiedDefaultValue = "4G",
help = "Spark executor memory") final String sparkMemory,
help = "Spark executor memory") final String sparkMemory,
@CliOption(key = "rollbackUsingMarkers", unspecifiedDefaultValue = "false",
help = "Enabling marker based rollback") final String rollbackUsingMarkers)
help = "Enabling marker based rollback") final String rollbackUsingMarkers)
throws Exception {
HoodieActiveTimeline activeTimeline = HoodieCLI.getTableMetaClient().getActiveTimeline();
HoodieTimeline completedTimeline = activeTimeline.getCommitsTimeline().filterCompletedInstants();
Expand Down Expand Up @@ -455,9 +452,9 @@ public String syncCommits(@CliOption(key = {"path"}, help = "Path of the table t
* */
private Option<HoodieInstant> getCommitForInstant(HoodieTimeline timeline, String instantTime) throws IOException {
List<HoodieInstant> instants = Arrays.asList(
new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, instantTime),
new HoodieInstant(false, HoodieTimeline.REPLACE_COMMIT_ACTION, instantTime),
new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, instantTime));
new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, instantTime),
new HoodieInstant(false, HoodieTimeline.REPLACE_COMMIT_ACTION, instantTime),
new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, instantTime));

Option<HoodieInstant> hoodieInstant = Option.fromJavaOptional(instants.stream().filter(timeline::containsInstant).findAny());

Expand All @@ -471,7 +468,7 @@ private Option<HoodieCommitMetadata> getHoodieCommitMetadata(HoodieTimeline time
HoodieReplaceCommitMetadata.class));
}
return Option.of(HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(hoodieInstant.get()).get(),
HoodieCommitMetadata.class));
HoodieCommitMetadata.class));
}

return Option.empty();
Expand Down
Loading

0 comments on commit a7e14d3

Please sign in to comment.