[HUDI-4483] Fix checkstyle in integ-test module (apache#6523)
KnightChess authored Aug 30, 2022
1 parent c50b634 commit ac9ce85
Showing 18 changed files with 41 additions and 73 deletions.
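
The changes below are mechanical style fixes in a few recurring categories: dropping unused imports, putting "} else {" and "} catch (...)" on the same line as the closing brace, moving wrapped operators ("+", "||") to the start of the continuation line, removing whitespace before semicolons, adding the missing space before "{" in a class declaration, and fixing continuation indentation. As an illustration only (the project's real rules live in its checkstyle configuration file, which is not part of this diff), violations of this kind are typically caught by standard Checkstyle modules along these lines:

<!-- Illustrative Checkstyle modules only, not Hudi's actual style.xml -->
<module name="Checker">
  <module name="TreeWalker">
    <!-- flags imports that are never referenced -->
    <module name="UnusedImports"/>
    <!-- requires "} else {" / "} catch (...)" on the line of the closing brace -->
    <module name="RightCurly">
      <property name="option" value="same"/>
    </module>
    <!-- requires wrapped operators such as "+" and "||" at the start of the next line -->
    <module name="OperatorWrap">
      <property name="option" value="nl"/>
    </module>
    <!-- forbids whitespace before ";" -->
    <module name="NoWhitespaceBefore"/>
    <!-- requires whitespace around "{" and binary operators -->
    <module name="WhitespaceAround"/>
  </module>
</module>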
1 change: 0 additions & 1 deletion hudi-integ-test/pom.xml
@@ -445,7 +445,6 @@
<dockerCompose.envFile>${project.basedir}/compose_env</dockerCompose.envFile>
<dockerCompose.file>${project.basedir}/../docker/compose/docker-compose_hadoop284_hive233_spark244.yml</dockerCompose.file>
<docker.compose.skip>${skipITs}</docker.compose.skip>
<checkstyle.skip>true</checkstyle.skip>
<main.basedir>${project.parent.basedir}</main.basedir>
</properties>
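
The only pom.xml change is the removal of <checkstyle.skip>true</checkstyle.skip>. checkstyle.skip is maven-checkstyle-plugin's built-in user property for its skip parameter, so defining it as true in this module's <properties> quietly disabled the check here while the plugin itself (presumably declared in a parent POM) still ran for the rest of the build. A minimal sketch of that mechanism, with placeholder values rather than Hudi's actual plugin configuration:

<!-- Sketch only: a typical parent-POM declaration; the configLocation path is a placeholder -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-checkstyle-plugin</artifactId>
  <executions>
    <execution>
      <id>checkstyle-check</id>
      <goals>
        <goal>check</goal>  <!-- fails the build on violations by default -->
      </goals>
    </execution>
  </executions>
  <configuration>
    <configLocation>style/checkstyle.xml</configLocation>
    <!-- no explicit <skip> wiring is needed: the check goal already honors
         -Dcheckstyle.skip=true or a <checkstyle.skip> property in a module's <properties>,
         which is exactly what the line removed above relied on -->
  </configuration>
</plugin>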

---- next changed file ----
@@ -22,8 +22,6 @@
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.integ.testsuite.HoodieTestSuiteWriter;
import org.apache.hudi.integ.testsuite.writer.DeltaWriteStats;
import org.apache.hudi.utilities.schema.SchemaProvider;

---- next changed file ----
@@ -19,10 +19,8 @@
package org.apache.hudi.integ.testsuite;

import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.HoodieReadClient;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieRecord;
@@ -31,12 +29,7 @@
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodiePayloadConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.integ.testsuite.HoodieTestSuiteJob.HoodieTestSuiteConfig;
import org.apache.hudi.integ.testsuite.writer.DeltaWriteStats;
import org.apache.hudi.table.HoodieSparkTable;
@@ -46,7 +39,6 @@

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.rdd.RDD;
---- next changed file ----
@@ -34,7 +34,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

@@ -134,7 +133,7 @@ public static void main(String[] args) throws Exception {
AtomicBoolean jobFailed = new AtomicBoolean(false);
AtomicInteger counter = new AtomicInteger(0);
List<Long> waitTimes = new ArrayList<>();
for (int i = 0;i < jobIndex ;i++) {
for (int i = 0; i < jobIndex; i++) {
if (i == 0) {
waitTimes.add(0L);
} else {
---- next changed file ----
@@ -116,7 +116,7 @@ private boolean allowWriteClientAccess(DagNode dagNode) {

public abstract RDD<GenericRecord> getNextBatch() throws Exception;

public abstract Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> fetchSource() throws Exception ;
public abstract Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> fetchSource() throws Exception;

public abstract Option<String> startCommit();

@@ -132,7 +132,7 @@ private boolean allowWriteClientAccess(DagNode dagNode) {

public abstract JavaRDD<WriteStatus> compact(Option<String> instantTime) throws Exception;

public abstract void inlineClustering() throws Exception ;
public abstract void inlineClustering() throws Exception;

public abstract Option<String> scheduleCompaction(Option<Map<String, String>> previousCommitExtraMetadata) throws Exception;

---- next changed file ----
@@ -22,7 +22,6 @@
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.integ.testsuite.SparkDataSourceContinuousIngest;
import org.apache.hudi.utilities.HoodieRepairTool;
import org.apache.hudi.utilities.IdentitySplitter;
import org.apache.hudi.utilities.UtilHelpers;
---- next changed file ----
@@ -46,7 +46,7 @@ public DFSDeltaConfig(DeltaOutputMode deltaOutputMode, DeltaInputType deltaInput
SerializableConfiguration configuration,
String deltaBasePath, String targetBasePath, String schemaStr, Long maxFileSize,
int inputParallelism, boolean deleteOldInputData, boolean useHudiToGenerateUpdates) {
super(deltaOutputMode, deltaInputType, configuration);
super(deltaOutputMode, deltaInputType, configuration);
this.deltaBasePath = deltaBasePath;
this.schemaStr = schemaStr;
this.maxFileSize = maxFileSize;
---- next changed file ----
@@ -178,8 +178,7 @@ private static DagNode convertJsonToDagNode(JsonNode node, String type, String n
DeltaConfig.Config config = DeltaConfig.Config.newBuilder().withConfigsMap(convertJsonNodeToMap(node))
.withName(name).build();
return (DagNode) ReflectionUtils.loadClass(generateFQN(type), config);
}
catch (ClassNotFoundException e) {
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
}
@@ -231,8 +230,7 @@ private static List<Pair<String, Integer>> getQueries(Entry<String, JsonNode> en
List<JsonNode> flattened = new ArrayList<>();
flattened.add(entry.getValue());
queries = (List<Pair<String, Integer>>) getQueryMapper().readValue(flattened.toString(), List.class);
}
catch (Exception e) {
} catch (Exception e) {
e.printStackTrace();
}
return queries;
@@ -244,8 +242,7 @@ private static List<String> getQuerySessionProperties(Entry<String, JsonNode> en
List<JsonNode> flattened = new ArrayList<>();
flattened.add(entry.getValue());
properties = (List<String>) getQueryEnginePropertyMapper().readValue(flattened.toString(), List.class);
}
catch (Exception e) {
} catch (Exception e) {
e.printStackTrace();
}
return properties;
@@ -254,20 +251,15 @@ private static List<String> getQuerySessionProperties(Entry<String, JsonNode> en
private static Object getValue(JsonNode node) {
if (node.isInt()) {
return node.asInt();
}
else if (node.isLong()) {
} else if (node.isLong()) {
return node.asLong();
}
else if (node.isShort()) {
} else if (node.isShort()) {
return node.asInt();
}
else if (node.isBoolean()) {
} else if (node.isBoolean()) {
return node.asBoolean();
}
else if (node.isDouble()) {
} else if (node.isDouble()) {
return node.asDouble();
}
else if (node.isFloat()) {
} else if (node.isFloat()) {
return node.asDouble();
}
return node.textValue();
@@ -287,6 +279,7 @@ private static JsonNode createJsonNode(DagNode node, String type) throws IOExcep
case HIVE_PROPERTIES:
((ObjectNode) configNode).put(HIVE_PROPERTIES,
MAPPER.readTree(getQueryEnginePropertyMapper().writeValueAsString(node.getConfig().getHiveProperties())));
break;
case PRESTO_QUERIES:
((ObjectNode) configNode).put(PRESTO_QUERIES,
MAPPER.readTree(getQueryMapper().writeValueAsString(node.getConfig().getHiveQueries())));
@@ -376,8 +369,7 @@ public List deserialize(JsonParser parser, DeserializationContext context) throw

if (fieldName.contains("query")) {
query = parser.getValueAsString();
}
else if (fieldName.contains("result")) {
} else if (fieldName.contains("result")) {
result = parser.getValueAsInt();
pairs.add(Pair.of(query, result));
}
---- next changed file ----
@@ -40,8 +40,7 @@ public void executeAndValidateQueries(List<Pair<String, Integer>> queriesWithRes
if (!res.next()) {
log.info("res.next() was False - typically this means the query returned no rows.");
assert 0 == queryAndResult.getRight();
}
else {
} else {
Integer result = res.getInt(1);
if (!queryAndResult.getRight().equals(result)) {
throw new AssertionError(
---- next changed file ----
@@ -88,8 +88,8 @@ public abstract Dataset<Row> getDatasetToValidate(SparkSession session, Executio
public void execute(ExecutionContext context, int curItrCount) throws Exception {
int validateOnceEveryItr = config.validateOnceEveryIteration();
int itrCountToExecute = config.getIterationCountToExecute();
if ((itrCountToExecute != -1 && itrCountToExecute == curItrCount) ||
(itrCountToExecute == -1 && ((curItrCount % validateOnceEveryItr) == 0))) {
if ((itrCountToExecute != -1 && itrCountToExecute == curItrCount)
|| (itrCountToExecute == -1 && ((curItrCount % validateOnceEveryItr) == 0))) {
FileSystem fs = new Path(context.getHoodieTestSuiteWriter().getCfg().inputBasePath)
.getFileSystem(context.getHoodieTestSuiteWriter().getConfiguration());
if (context.getHoodieTestSuiteWriter().getCfg().testContinousMode) {
@@ -142,8 +142,8 @@ public void execute(ExecutionContext context, int curItrCount) throws Exception
String tableName = context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_TABLE().key());
log.warn("Validating hive table with db : " + database + " and table : " + tableName);
session.sql("REFRESH TABLE " + database + "." + tableName);
Dataset<Row> cowDf = session.sql("SELECT _row_key, rider, driver, begin_lat, begin_lon, end_lat, end_lon, fare, _hoodie_is_deleted, " +
"test_suite_source_ordering_field FROM " + database + "." + tableName);
Dataset<Row> cowDf = session.sql("SELECT _row_key, rider, driver, begin_lat, begin_lon, end_lat, end_lon, fare, _hoodie_is_deleted, "
+ "test_suite_source_ordering_field FROM " + database + "." + tableName);
Dataset<Row> reorderedInputDf = inputSnapshotDf.select("_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon", "fare",
"_hoodie_is_deleted", "test_suite_source_ordering_field");

@@ -178,9 +178,9 @@ private void awaitUntilDeltaStreamerCaughtUp(ExecutionContext context, String hu
FileStatus[] subDirs = fs.listStatus(new Path(inputPath));
List<FileStatus> subDirList = Arrays.asList(subDirs);
subDirList.sort(Comparator.comparingLong(entry -> Long.parseLong(entry.getPath().getName())));
String latestSubDir = subDirList.get(subDirList.size() -1).getPath().getName();
log.info("Latest sub directory in input path " + latestSubDir + ", latest checkpoint from deltastreamer " +
(latestCheckpoint.isPresent() ? latestCheckpoint.get() : "none"));
String latestSubDir = subDirList.get(subDirList.size() - 1).getPath().getName();
log.info("Latest sub directory in input path " + latestSubDir + ", latest checkpoint from deltastreamer "
+ (latestCheckpoint.isPresent() ? latestCheckpoint.get() : "none"));
long maxWaitTime = config.maxWaitTimeForDeltastreamerToCatchupMs();
long waitedSoFar = 0;
while (!(latestCheckpoint.isPresent() && latestCheckpoint.get().equals(latestSubDir))) {
@@ -191,11 +191,11 @@ private void awaitUntilDeltaStreamerCaughtUp(ExecutionContext context, String hu
latestCheckpoint = getLatestCheckpoint(commitTimeline);
waitedSoFar += 20000;
if (waitedSoFar >= maxWaitTime) {
throw new AssertionError("DeltaStreamer has not caught up after 5 mins of wait time. Last known checkpoint " +
(latestCheckpoint.isPresent() ? latestCheckpoint.get() : "none") + ", expected checkpoint to have caugth up " + latestSubDir);
throw new AssertionError("DeltaStreamer has not caught up after 5 mins of wait time. Last known checkpoint "
+ (latestCheckpoint.isPresent() ? latestCheckpoint.get() : "none") + ", expected checkpoint to have caugth up " + latestSubDir);
}
log.info("Latest sub directory in input path " + latestSubDir + ", latest checkpoint from deltastreamer " +
(latestCheckpoint.isPresent() ? latestCheckpoint.get() : "none"));
log.info("Latest sub directory in input path " + latestSubDir + ", latest checkpoint from deltastreamer "
+ (latestCheckpoint.isPresent() ? latestCheckpoint.get() : "none"));
}
}

@@ -223,7 +223,7 @@ private Dataset<Row> getInputDf(ExecutionContext context, SparkSession session,
Dataset<Row> inputDf = session.read().format("avro").load(inputPath);
Dataset<Row> trimmedDf = inputDf;
if (!config.inputPartitonsToSkipWithValidate().isEmpty()) {
trimmedDf = inputDf.filter("instr("+partitionPathField+", \'"+ config.inputPartitonsToSkipWithValidate() +"\') != 1");
trimmedDf = inputDf.filter("instr(" + partitionPathField + ", \'" + config.inputPartitonsToSkipWithValidate() + "\') != 1");
}

ExpressionEncoder encoder = getEncoder(inputDf.schema());
---- next changed file ----
@@ -67,8 +67,7 @@ public void execute(ExecutionContext executionContext, int curItrCount) throws E
executeAndValidateQueries(this.config.getHiveQueries(), stmt);
stmt.close();
this.hiveServiceProvider.stopLocalHiveServiceIfNeeded();
}
catch (Exception e) {
} catch (Exception e) {
throw new HoodieValidationException("Hive query validation failed due to " + e.getMessage(), e);
}
}
---- next changed file ----
@@ -31,7 +31,6 @@
*/
public class HiveSyncNode extends DagNode<Boolean> {


public HiveSyncNode(Config config) {
this.config = config;
}
---- next changed file ----
@@ -52,8 +52,7 @@ public void execute(ExecutionContext context, int curItrCount) throws Exception
setSessionProperties(this.config.getPrestoProperties(), stmt);
executeAndValidateQueries(this.config.getPrestoQueries(), stmt);
stmt.close();
}
catch (Exception e) {
} catch (Exception e) {
throw new HoodieValidationException("Presto query validation failed due to " + e.getMessage(), e);
}
}
---- next changed file ----
@@ -27,7 +27,7 @@
import java.sql.DriverManager;
import java.sql.Statement;

public class TrinoQueryNode extends BaseQueryNode{
public class TrinoQueryNode extends BaseQueryNode {

public TrinoQueryNode(DeltaConfig.Config config) {
this.config = config;
@@ -52,8 +52,7 @@ public void execute(ExecutionContext context, int curItrCount) throws Exception
setSessionProperties(this.config.getTrinoProperties(), stmt);
executeAndValidateQueries(this.config.getTrinoQueries(), stmt);
stmt.close();
}
catch (Exception e) {
} catch (Exception e) {
throw new HoodieValidationException("Trino query validation failed due to " + e.getMessage(), e);
}
}
---- next changed file ----
@@ -25,7 +25,6 @@
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config;
import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
@@ -36,14 +35,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/**
* Node to validate data set sanity like total file versions retained, has cleaning happened, has archival happened, etc.
@@ -77,8 +70,8 @@ public void execute(ExecutionContext executionContext, int curItrCount) throws E
log.warn("Earliest commit to retain : " + earliestCommitToRetain);
long unCleanedInstants = metaClient.getActiveTimeline().filterCompletedInstants().filter(instant ->
HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.GREATER_THAN_OR_EQUALS, earliestCommitToRetain)).getInstants().count();
ValidationUtils.checkArgument(unCleanedInstants >= (maxCommitsRetained + 1), "Total uncleaned instants " + unCleanedInstants +
" mismatched with max commits retained " + (maxCommitsRetained + 1));
ValidationUtils.checkArgument(unCleanedInstants >= (maxCommitsRetained + 1), "Total uncleaned instants " + unCleanedInstants
+ " mismatched with max commits retained " + (maxCommitsRetained + 1));
}

if (config.validateArchival() || config.validateClean()) {
---- next changed file ----
@@ -24,7 +24,6 @@
import org.apache.hudi.integ.testsuite.dag.WriterContext;
import org.apache.hudi.integ.testsuite.dag.nodes.DagNode;
import org.apache.hudi.integ.testsuite.dag.nodes.DelayNode;
import org.apache.hudi.metrics.Metrics;

import org.apache.spark.api.java.JavaSparkContext;
import org.slf4j.Logger;
---- next changed file ----
@@ -18,7 +18,6 @@

package org.apache.hudi.integ.testsuite.generator;

import com.google.common.annotations.VisibleForTesting;
import org.apache.hudi.common.util.collection.Pair;

import org.apache.avro.Schema;
@@ -331,12 +330,13 @@ public boolean validate(GenericRecord record) {
return genericData.validate(baseSchema, record);
}

/*
/**
* Generates a sequential timestamp (daily increment), and updates the timestamp field of the record.
* Note: When generating records, number of records to be generated must be more than numDatePartitions * parallelism,
* to guarantee that at least numDatePartitions are created.
*
* @VisibleForTesting
*/
@VisibleForTesting
public GenericRecord updateTimestamp(GenericRecord record, String fieldName) {
long delta = TimeUnit.SECONDS.convert((partitionIndex++ % numDatePartitions) + startPartition, TimeUnit.DAYS);
record.put(fieldName, delta);
---- next changed file ----
@@ -22,11 +22,12 @@
import java.util.Arrays;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.PathFilter;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.SparkSession;

import org.apache.hudi.common.util.Option;
import org.apache.hudi.integ.testsuite.writer.AvroFileDeltaInputWriter;
import org.apache.hudi.integ.testsuite.writer.DeltaOutputMode;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.SparkSession;

/**
* A reader of {@link DeltaOutputMode#DFS} and {@link DeltaInputType#AVRO}.
Expand All @@ -46,7 +47,8 @@ public class DFSAvroDeltaInputReader extends DFSDeltaInputReader {
}
};

public DFSAvroDeltaInputReader(SparkSession sparkSession, String schemaStr, String basePath,
public DFSAvroDeltaInputReader(
SparkSession sparkSession, String schemaStr, String basePath,
Option<String> structName,
Option<String> nameSpace) {
this.sparkSession = sparkSession;
Expand Down
