From 01285c1c9b10ef55644bb0f271c84072dbab3e55 Mon Sep 17 00:00:00 2001
From: KnightChess <981159963@qq.com>
Date: Tue, 30 Aug 2022 10:23:47 +0800
Subject: [PATCH] [HUDI-4483] Fix checkstyle in integ-test module (#6523)

(cherry picked from commit ac9ce8533472418101c0cdadf442aa58d2ed159a)
---
 hudi-integ-test/pom.xml                            |  1 -
 .../HoodieContinousTestSuiteWriter.java            |  2 --
 .../HoodieInlineTestSuiteWriter.java               |  8 ------
 .../HoodieMultiWriterTestSuiteJob.java             |  3 +-
 .../testsuite/HoodieTestSuiteWriter.java           |  4 +--
 .../SparkDataSourceContinuousIngestTool.java       |  1 -
 .../configuration/DFSDeltaConfig.java              |  2 +-
 .../hudi/integ/testsuite/dag/DagUtils.java         | 28 ++++++++------------
 .../testsuite/dag/nodes/BaseQueryNode.java         |  3 +-
 .../dag/nodes/BaseValidateDatasetNode.java         | 24 ++++++++--------
 .../testsuite/dag/nodes/HiveQueryNode.java         |  3 +-
 .../testsuite/dag/nodes/HiveSyncNode.java          |  1 -
 .../testsuite/dag/nodes/PrestoQueryNode.java       |  3 +-
 .../testsuite/dag/nodes/TrinoQueryNode.java        |  5 ++--
 .../dag/nodes/ValidateAsyncOperations.java         | 11 ++------
 .../testsuite/dag/scheduler/DagScheduler.java      |  1 -
 .../GenericRecordFullPayloadGenerator.java         |  6 ++--
 .../reader/DFSAvroDeltaInputReader.java            |  8 ++++--
 18 files changed, 41 insertions(+), 73 deletions(-)
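Notes: nearly every hunk below is a mechanical checkstyle fix — unused imports dropped, "} catch" and "} else if" pulled up onto the closing brace, binary operators moved to the head of wrapped lines, and spacing in for-loop headers normalized. The one behavioral change is in DagUtils.createJsonNode, where a missing "break" let the HIVE_PROPERTIES case fall through into PRESTO_QUERIES (see the sketch after the last hunk). As a rough illustration of the target style, here is a minimal hypothetical class, not code from this patch:

    // Hypothetical example showing the conventions the hunks below converge on.
    public class CheckstyleExample {
      public Object describe(int value) {
        // Wrapped expressions put the binary operator at the start of the next line.
        String label = "value is " + value + ", parity "
            + (value % 2 == 0 ? "even" : "odd");
        try {
          // for-loop headers use a single space after each semicolon.
          for (int i = 0; i < value; i++) {
            label = label + ".";
          }
        } catch (RuntimeException e) { // catch sits on the closing-brace line
          throw new IllegalStateException(label, e);
        }
        if (value > 0) {
          return value;
        } else if (value < 0) { // else-if joined to the brace, not on its own line
          return -value;
        }
        return label;
      }
    }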
diff --git a/hudi-integ-test/pom.xml b/hudi-integ-test/pom.xml
index 963f699c4341e..7b211d9669b53 100644
--- a/hudi-integ-test/pom.xml
+++ b/hudi-integ-test/pom.xml
@@ -445,7 +445,6 @@
         ${project.basedir}/compose_env
         ${project.basedir}/../docker/compose/docker-compose_hadoop284_hive233_spark244.yml
         ${skipITs}
-        true
         ${project.parent.basedir}
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieContinousTestSuiteWriter.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieContinousTestSuiteWriter.java
index 1bf69aaf836cc..76f9d7424ac90 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieContinousTestSuiteWriter.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieContinousTestSuiteWriter.java
@@ -22,8 +22,6 @@
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.integ.testsuite.HoodieTestSuiteWriter;
 import org.apache.hudi.integ.testsuite.writer.DeltaWriteStats;
 import org.apache.hudi.utilities.schema.SchemaProvider;
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieInlineTestSuiteWriter.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieInlineTestSuiteWriter.java
index 63805e71a5645..91a7cf358c011 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieInlineTestSuiteWriter.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieInlineTestSuiteWriter.java
@@ -19,10 +19,8 @@
 package org.apache.hudi.integ.testsuite;
 
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
-import org.apache.hudi.client.HoodieReadClient;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -31,12 +29,7 @@
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.config.HoodieCompactionConfig;
-import org.apache.hudi.config.HoodieIndexConfig;
-import org.apache.hudi.config.HoodiePayloadConfig;
-import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.data.HoodieJavaRDD;
-import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.integ.testsuite.HoodieTestSuiteJob.HoodieTestSuiteConfig;
 import org.apache.hudi.integ.testsuite.writer.DeltaWriteStats;
 import org.apache.hudi.table.HoodieSparkTable;
@@ -46,7 +39,6 @@
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.rdd.RDD;
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieMultiWriterTestSuiteJob.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieMultiWriterTestSuiteJob.java
index 87d2f587597a0..dea16aef5fa4b 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieMultiWriterTestSuiteJob.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieMultiWriterTestSuiteJob.java
@@ -34,7 +34,6 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -134,7 +133,7 @@ public static void main(String[] args) throws Exception {
     AtomicBoolean jobFailed = new AtomicBoolean(false);
     AtomicInteger counter = new AtomicInteger(0);
     List waitTimes = new ArrayList<>();
-    for (int i = 0;i < jobIndex ;i++) {
+    for (int i = 0; i < jobIndex; i++) {
       if (i == 0) {
         waitTimes.add(0L);
       } else {
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java
index dc711818d710d..d763115281ce6 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java
@@ -116,7 +116,7 @@ private boolean allowWriteClientAccess(DagNode dagNode) {
 
   public abstract RDD getNextBatch() throws Exception;
 
-  public abstract Pair>> fetchSource() throws Exception ;
+  public abstract Pair>> fetchSource() throws Exception;
 
   public abstract Option startCommit();
@@ -132,7 +132,7 @@ private boolean allowWriteClientAccess(DagNode dagNode) {
 
   public abstract JavaRDD compact(Option instantTime) throws Exception;
 
-  public abstract void inlineClustering() throws Exception ;
+  public abstract void inlineClustering() throws Exception;
 
   public abstract Option scheduleCompaction(Option> previousCommitExtraMetadata) throws Exception;
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/SparkDataSourceContinuousIngestTool.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/SparkDataSourceContinuousIngestTool.java
index c4f782fe40864..d8a4bbe7dac61 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/SparkDataSourceContinuousIngestTool.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/SparkDataSourceContinuousIngestTool.java
@@ -22,7 +22,6 @@
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.integ.testsuite.SparkDataSourceContinuousIngest;
 import org.apache.hudi.utilities.HoodieRepairTool;
 import org.apache.hudi.utilities.IdentitySplitter;
 import org.apache.hudi.utilities.UtilHelpers;
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DFSDeltaConfig.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DFSDeltaConfig.java
index f6c8c8fc306b0..771127c623eaa 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DFSDeltaConfig.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DFSDeltaConfig.java
@@ -46,7 +46,7 @@ public DFSDeltaConfig(DeltaOutputMode deltaOutputMode, DeltaInputType deltaInput
                         SerializableConfiguration configuration, String deltaBasePath, String targetBasePath,
                         String schemaStr, Long maxFileSize, int inputParallelism, boolean deleteOldInputData,
                         boolean useHudiToGenerateUpdates) {
-        super(deltaOutputMode, deltaInputType, configuration);
+    super(deltaOutputMode, deltaInputType, configuration);
     this.deltaBasePath = deltaBasePath;
     this.schemaStr = schemaStr;
     this.maxFileSize = maxFileSize;
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/DagUtils.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/DagUtils.java
index 1d78d0fdbba8a..b5b2aaa51aa21 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/DagUtils.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/DagUtils.java
@@ -178,8 +178,7 @@ private static DagNode convertJsonToDagNode(JsonNode node, String type, String n
       DeltaConfig.Config config = DeltaConfig.Config.newBuilder().withConfigsMap(convertJsonNodeToMap(node))
           .withName(name).build();
       return (DagNode) ReflectionUtils.loadClass(generateFQN(type), config);
-    }
-    catch (ClassNotFoundException e) {
+    } catch (ClassNotFoundException e) {
       throw new RuntimeException(e);
     }
   }
@@ -231,8 +230,7 @@ private static List> getQueries(Entry en
       List flattened = new ArrayList<>();
       flattened.add(entry.getValue());
       queries = (List>) getQueryMapper().readValue(flattened.toString(), List.class);
-    }
-    catch (Exception e) {
+    } catch (Exception e) {
       e.printStackTrace();
     }
     return queries;
@@ -244,8 +242,7 @@ private static List getQuerySessionProperties(Entry en
       List flattened = new ArrayList<>();
       flattened.add(entry.getValue());
       properties = (List) getQueryEnginePropertyMapper().readValue(flattened.toString(), List.class);
-    }
-    catch (Exception e) {
+    } catch (Exception e) {
       e.printStackTrace();
     }
     return properties;
@@ -254,20 +251,15 @@ private static List getQuerySessionProperties(Entry en
   private static Object getValue(JsonNode node) {
     if (node.isInt()) {
       return node.asInt();
-    }
-    else if (node.isLong()) {
+    } else if (node.isLong()) {
       return node.asLong();
-    }
-    else if (node.isShort()) {
+    } else if (node.isShort()) {
       return node.asInt();
-    }
-    else if (node.isBoolean()) {
+    } else if (node.isBoolean()) {
       return node.asBoolean();
-    }
-    else if (node.isDouble()) {
+    } else if (node.isDouble()) {
       return node.asDouble();
-    }
-    else if (node.isFloat()) {
+    } else if (node.isFloat()) {
       return node.asDouble();
     }
     return node.textValue();
@@ -287,6 +279,7 @@ private static JsonNode createJsonNode(DagNode node, String type) throws IOExcep
       case HIVE_PROPERTIES:
         ((ObjectNode) configNode).put(HIVE_PROPERTIES,
             MAPPER.readTree(getQueryEnginePropertyMapper().writeValueAsString(node.getConfig().getHiveProperties())));
+        break;
       case PRESTO_QUERIES:
         ((ObjectNode) configNode).put(PRESTO_QUERIES,
             MAPPER.readTree(getQueryMapper().writeValueAsString(node.getConfig().getHiveQueries())));
@@ -376,8 +369,7 @@ public List deserialize(JsonParser parser, DeserializationContext context) throw
 
         if (fieldName.contains("query")) {
           query = parser.getValueAsString();
-        }
-        else if (fieldName.contains("result")) {
+        } else if (fieldName.contains("result")) {
           result = parser.getValueAsInt();
           pairs.add(Pair.of(query, result));
         }
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/BaseQueryNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/BaseQueryNode.java
index c1a3d23791bd3..3870dce9dd9cd 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/BaseQueryNode.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/BaseQueryNode.java
@@ -40,8 +40,7 @@ public void executeAndValidateQueries(List> queriesWithRes
       if (!res.next()) {
         log.info("res.next() was False - typically this means the query returned no rows.");
         assert 0 == queryAndResult.getRight();
-      }
-      else {
+      } else {
         Integer result = res.getInt(1);
         if (!queryAndResult.getRight().equals(result)) {
           throw new AssertionError(
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/BaseValidateDatasetNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/BaseValidateDatasetNode.java
index 15c209e4752b8..5f659c879850d 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/BaseValidateDatasetNode.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/BaseValidateDatasetNode.java
@@ -88,8 +88,8 @@ public abstract Dataset getDatasetToValidate(SparkSession session, Executio
   public void execute(ExecutionContext context, int curItrCount) throws Exception {
     int validateOnceEveryItr = config.validateOnceEveryIteration();
     int itrCountToExecute = config.getIterationCountToExecute();
-    if ((itrCountToExecute != -1 && itrCountToExecute == curItrCount) ||
-        (itrCountToExecute == -1 && ((curItrCount % validateOnceEveryItr) == 0))) {
+    if ((itrCountToExecute != -1 && itrCountToExecute == curItrCount)
+        || (itrCountToExecute == -1 && ((curItrCount % validateOnceEveryItr) == 0))) {
       FileSystem fs = new Path(context.getHoodieTestSuiteWriter().getCfg().inputBasePath)
           .getFileSystem(context.getHoodieTestSuiteWriter().getConfiguration());
       if (context.getHoodieTestSuiteWriter().getCfg().testContinousMode) {
@@ -142,8 +142,8 @@ public void execute(ExecutionContext context, int curItrCount) throws Exception
       String tableName = context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_TABLE().key());
       log.warn("Validating hive table with db : " + database + " and table : " + tableName);
       session.sql("REFRESH TABLE " + database + "." + tableName);
-      Dataset cowDf = session.sql("SELECT _row_key, rider, driver, begin_lat, begin_lon, end_lat, end_lon, fare, _hoodie_is_deleted, " +
-          "test_suite_source_ordering_field FROM " + database + "." + tableName);
+      Dataset cowDf = session.sql("SELECT _row_key, rider, driver, begin_lat, begin_lon, end_lat, end_lon, fare, _hoodie_is_deleted, "
+          + "test_suite_source_ordering_field FROM " + database + "." + tableName);
       Dataset reorderedInputDf = inputSnapshotDf.select("_row_key", "rider", "driver", "begin_lat", "begin_lon",
           "end_lat", "end_lon", "fare", "_hoodie_is_deleted", "test_suite_source_ordering_field");
@@ -178,9 +178,9 @@ private void awaitUntilDeltaStreamerCaughtUp(ExecutionContext context, String hu
     FileStatus[] subDirs = fs.listStatus(new Path(inputPath));
     List subDirList = Arrays.asList(subDirs);
     subDirList.sort(Comparator.comparingLong(entry -> Long.parseLong(entry.getPath().getName())));
-    String latestSubDir = subDirList.get(subDirList.size() -1).getPath().getName();
-    log.info("Latest sub directory in input path " + latestSubDir + ", latest checkpoint from deltastreamer " +
-        (latestCheckpoint.isPresent() ? latestCheckpoint.get() : "none"));
+    String latestSubDir = subDirList.get(subDirList.size() - 1).getPath().getName();
+    log.info("Latest sub directory in input path " + latestSubDir + ", latest checkpoint from deltastreamer "
+        + (latestCheckpoint.isPresent() ? latestCheckpoint.get() : "none"));
     long maxWaitTime = config.maxWaitTimeForDeltastreamerToCatchupMs();
     long waitedSoFar = 0;
     while (!(latestCheckpoint.isPresent() && latestCheckpoint.get().equals(latestSubDir))) {
@@ -191,11 +191,11 @@ private void awaitUntilDeltaStreamerCaughtUp(ExecutionContext context, String hu
       latestCheckpoint = getLatestCheckpoint(commitTimeline);
       waitedSoFar += 20000;
       if (waitedSoFar >= maxWaitTime) {
-        throw new AssertionError("DeltaStreamer has not caught up after 5 mins of wait time. Last known checkpoint " +
-            (latestCheckpoint.isPresent() ? latestCheckpoint.get() : "none") + ", expected checkpoint to have caugth up " + latestSubDir);
+        throw new AssertionError("DeltaStreamer has not caught up after 5 mins of wait time. Last known checkpoint "
+            + (latestCheckpoint.isPresent() ? latestCheckpoint.get() : "none") + ", expected checkpoint to have caugth up " + latestSubDir);
       }
-      log.info("Latest sub directory in input path " + latestSubDir + ", latest checkpoint from deltastreamer " +
-          (latestCheckpoint.isPresent() ? latestCheckpoint.get() : "none"));
+      log.info("Latest sub directory in input path " + latestSubDir + ", latest checkpoint from deltastreamer "
+          + (latestCheckpoint.isPresent() ? latestCheckpoint.get() : "none"));
     }
   }
@@ -223,7 +223,7 @@ private Dataset getInputDf(ExecutionContext context, SparkSession session,
     Dataset inputDf = session.read().format("avro").load(inputPath);
     Dataset trimmedDf = inputDf;
     if (!config.inputPartitonsToSkipWithValidate().isEmpty()) {
-      trimmedDf = inputDf.filter("instr("+partitionPathField+", \'"+ config.inputPartitonsToSkipWithValidate() +"\') != 1");
+      trimmedDf = inputDf.filter("instr(" + partitionPathField + ", \'" + config.inputPartitonsToSkipWithValidate() + "\') != 1");
     }
     ExpressionEncoder encoder = getEncoder(inputDf.schema());
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/HiveQueryNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/HiveQueryNode.java
index fd04cc34c6866..0b9149f928f95 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/HiveQueryNode.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/HiveQueryNode.java
@@ -67,8 +67,7 @@ public void execute(ExecutionContext executionContext, int curItrCount) throws E
       executeAndValidateQueries(this.config.getHiveQueries(), stmt);
       stmt.close();
       this.hiveServiceProvider.stopLocalHiveServiceIfNeeded();
-    }
-    catch (Exception e) {
+    } catch (Exception e) {
       throw new HoodieValidationException("Hive query validation failed due to " + e.getMessage(), e);
     }
   }
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/HiveSyncNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/HiveSyncNode.java
index 3f0c90f03067c..7415880a83616 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/HiveSyncNode.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/HiveSyncNode.java
@@ -31,7 +31,6 @@
  */
 public class HiveSyncNode extends DagNode {
 
-
   public HiveSyncNode(Config config) {
     this.config = config;
   }
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/PrestoQueryNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/PrestoQueryNode.java
index 9a9bafcf6fb89..94f0a51a4dba9 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/PrestoQueryNode.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/PrestoQueryNode.java
@@ -52,8 +52,7 @@ public void execute(ExecutionContext context, int curItrCount) throws Exception
       setSessionProperties(this.config.getPrestoProperties(), stmt);
       executeAndValidateQueries(this.config.getPrestoQueries(), stmt);
       stmt.close();
-    }
-    catch (Exception e) {
+    } catch (Exception e) {
       throw new HoodieValidationException("Presto query validation failed due to " + e.getMessage(), e);
     }
   }
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/TrinoQueryNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/TrinoQueryNode.java
index ffcc901f67e09..1a53e29fa0987 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/TrinoQueryNode.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/TrinoQueryNode.java
@@ -27,7 +27,7 @@
 import java.sql.DriverManager;
 import java.sql.Statement;
 
-public class TrinoQueryNode extends BaseQueryNode{
+public class TrinoQueryNode extends BaseQueryNode {
 
   public TrinoQueryNode(DeltaConfig.Config config) {
     this.config = config;
@@ -52,8 +52,7 @@ public void execute(ExecutionContext context, int curItrCount) throws Exception
       setSessionProperties(this.config.getTrinoProperties(), stmt);
       executeAndValidateQueries(this.config.getTrinoQueries(), stmt);
       stmt.close();
-    }
-    catch (Exception e) {
+    } catch (Exception e) {
       throw new HoodieValidationException("Trino query validation failed due to " + e.getMessage(), e);
     }
   }
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateAsyncOperations.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateAsyncOperations.java
index 7b5d4dd41881d..0835ec97226e4 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateAsyncOperations.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateAsyncOperations.java
@@ -25,7 +25,6 @@
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.CleanerUtils;
-import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config;
 import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
@@ -36,14 +35,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
-import java.util.stream.Collectors;
 
 /**
  * Node to validate data set sanity like total file versions retained, has cleaning happened, has archival happened, etc.
@@ -77,8 +70,8 @@ public void execute(ExecutionContext executionContext, int curItrCount) throws E
         log.warn("Earliest commit to retain : " + earliestCommitToRetain);
         long unCleanedInstants = metaClient.getActiveTimeline().filterCompletedInstants().filter(instant ->
            HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.GREATER_THAN_OR_EQUALS, earliestCommitToRetain)).getInstants().count();
-        ValidationUtils.checkArgument(unCleanedInstants >= (maxCommitsRetained + 1), "Total uncleaned instants " + unCleanedInstants +
-            " mismatched with max commits retained " + (maxCommitsRetained + 1));
+        ValidationUtils.checkArgument(unCleanedInstants >= (maxCommitsRetained + 1), "Total uncleaned instants " + unCleanedInstants
+            + " mismatched with max commits retained " + (maxCommitsRetained + 1));
       }
 
       if (config.validateArchival() || config.validateClean()) {
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/DagScheduler.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/DagScheduler.java
index ab80df0d6a1db..847f0a43c511c 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/DagScheduler.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/DagScheduler.java
@@ -24,7 +24,6 @@
 import org.apache.hudi.integ.testsuite.dag.WriterContext;
 import org.apache.hudi.integ.testsuite.dag.nodes.DagNode;
 import org.apache.hudi.integ.testsuite.dag.nodes.DelayNode;
-import org.apache.hudi.metrics.Metrics;
 
 import org.apache.spark.api.java.JavaSparkContext;
 import org.slf4j.Logger;
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordFullPayloadGenerator.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordFullPayloadGenerator.java
index a936a81665116..9089f751b955a 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordFullPayloadGenerator.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordFullPayloadGenerator.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.integ.testsuite.generator;
 
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.hudi.common.util.collection.Pair;
 
 import org.apache.avro.Schema;
@@ -331,12 +330,13 @@ public boolean validate(GenericRecord record) {
     return genericData.validate(baseSchema, record);
   }
 
-  /*
+  /**
   * Generates a sequential timestamp (daily increment), and updates the timestamp field of the record.
   * Note: When generating records, number of records to be generated must be more than numDatePartitions * parallelism,
   * to guarantee that at least numDatePartitions are created.
+  *
+  * @VisibleForTesting
   */
-  @VisibleForTesting
   public GenericRecord updateTimestamp(GenericRecord record, String fieldName) {
     long delta = TimeUnit.SECONDS.convert((partitionIndex++ % numDatePartitions) + startPartition, TimeUnit.DAYS);
     record.put(fieldName, delta);
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSAvroDeltaInputReader.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSAvroDeltaInputReader.java
index f1bb02a6998e8..3fa4d375a6fc3 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSAvroDeltaInputReader.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSAvroDeltaInputReader.java
@@ -22,11 +22,12 @@
 import java.util.Arrays;
 
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.SparkSession;
+
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.integ.testsuite.writer.AvroFileDeltaInputWriter;
 import org.apache.hudi.integ.testsuite.writer.DeltaOutputMode;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.sql.SparkSession;
 
 /**
  * A reader of {@link DeltaOutputMode#DFS} and {@link DeltaInputType#AVRO}.
@@ -46,7 +47,8 @@ public class DFSAvroDeltaInputReader extends DFSDeltaInputReader {
         }
       };
 
-  public DFSAvroDeltaInputReader(SparkSession sparkSession, String schemaStr, String basePath,
+  public DFSAvroDeltaInputReader(
+      SparkSession sparkSession, String schemaStr, String basePath,
       Option structName, Option nameSpace) {
     this.sparkSession = sparkSession;
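A closing note on the one semantic fix in this patch: Java switch cases fall through without a break, so before the added "break" in DagUtils.createJsonNode, the HIVE_PROPERTIES case also executed the PRESTO_QUERIES branch. A minimal, self-contained illustration of that fall-through, using hypothetical string keys rather than the Hudi config constants:

    import java.util.ArrayList;
    import java.util.List;

    public class FallThroughDemo {
      public static void main(String[] args) {
        List<String> written = new ArrayList<>();
        String key = "HIVE_PROPERTIES";
        switch (key) {
          case "HIVE_PROPERTIES":
            written.add("hive_properties");
            break; // without this break, execution falls through and also adds "presto_queries"
          case "PRESTO_QUERIES":
            written.add("presto_queries");
            break;
          default:
            break;
        }
        System.out.println(written); // prints [hive_properties]
      }
    }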