[HUDI-4482][HUDI-4483] fix checkstyle error and remove guava
KnightChess committed Jul 30, 2022
1 parent 0a5ce00 commit 15cbbea
Showing 24 changed files with 134 additions and 161 deletions.
1 change: 0 additions & 1 deletion hudi-integ-test/pom.xml
@@ -454,7 +454,6 @@
<dockerCompose.envFile>${project.basedir}/compose_env</dockerCompose.envFile>
<dockerCompose.file>${project.basedir}/../docker/compose/docker-compose_hadoop284_hive233_spark244.yml</dockerCompose.file>
<docker.compose.skip>${skipITs}</docker.compose.skip>
<checkstyle.skip>true</checkstyle.skip>
<main.basedir>${project.parent.basedir}</main.basedir>
</properties>
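
Note: dropping the checkstyle.skip=true property turns the checkstyle check back on for the hudi-integ-test module, which is what makes the whitespace and operator-wrap fixes in the Java sources below necessary.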

@@ -22,8 +22,6 @@
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.integ.testsuite.HoodieTestSuiteWriter;
import org.apache.hudi.integ.testsuite.writer.DeltaWriteStats;
import org.apache.hudi.utilities.schema.SchemaProvider;

@@ -19,10 +19,8 @@
package org.apache.hudi.integ.testsuite;

import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.HoodieReadClient;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieRecord;
@@ -31,12 +29,7 @@
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodiePayloadConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.integ.testsuite.HoodieTestSuiteJob.HoodieTestSuiteConfig;
import org.apache.hudi.integ.testsuite.writer.DeltaWriteStats;
import org.apache.hudi.table.HoodieSparkTable;
@@ -46,7 +39,6 @@

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.rdd.RDD;
@@ -34,7 +34,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

@@ -134,7 +133,7 @@ public static void main(String[] args) throws Exception {
AtomicBoolean jobFailed = new AtomicBoolean(false);
AtomicInteger counter = new AtomicInteger(0);
List<Long> waitTimes = new ArrayList<>();
for (int i = 0;i < jobIndex ;i++) {
for (int i = 0; i < jobIndex; i++) {
if (i == 0) {
waitTimes.add(0L);
} else {
@@ -116,7 +116,7 @@ private boolean allowWriteClientAccess(DagNode dagNode) {

public abstract RDD<GenericRecord> getNextBatch() throws Exception;

public abstract Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> fetchSource() throws Exception ;
public abstract Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> fetchSource() throws Exception;

public abstract Option<String> startCommit();

@@ -132,7 +132,7 @@ private boolean allowWriteClientAccess(DagNode dagNode) {

public abstract JavaRDD<WriteStatus> compact(Option<String> instantTime) throws Exception;

public abstract void inlineClustering() throws Exception ;
public abstract void inlineClustering() throws Exception;

public abstract Option<String> scheduleCompaction(Option<Map<String, String>> previousCommitExtraMetadata) throws Exception;

@@ -22,7 +22,6 @@
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.integ.testsuite.SparkDataSourceContinuousIngest;
import org.apache.hudi.utilities.HoodieRepairTool;
import org.apache.hudi.utilities.IdentitySplitter;
import org.apache.hudi.utilities.UtilHelpers;
@@ -46,7 +46,7 @@ public DFSDeltaConfig(DeltaOutputMode deltaOutputMode, DeltaInputType deltaInput
SerializableConfiguration configuration,
String deltaBasePath, String targetBasePath, String schemaStr, Long maxFileSize,
int inputParallelism, boolean deleteOldInputData, boolean useHudiToGenerateUpdates) {
super(deltaOutputMode, deltaInputType, configuration);
super(deltaOutputMode, deltaInputType, configuration);
this.deltaBasePath = deltaBasePath;
this.schemaStr = schemaStr;
this.maxFileSize = maxFileSize;
@@ -88,8 +88,8 @@ public abstract Dataset<Row> getDatasetToValidate(SparkSession session, Executio
public void execute(ExecutionContext context, int curItrCount) throws Exception {
int validateOnceEveryItr = config.validateOnceEveryIteration();
int itrCountToExecute = config.getIterationCountToExecute();
if ((itrCountToExecute != -1 && itrCountToExecute == curItrCount) ||
(itrCountToExecute == -1 && ((curItrCount % validateOnceEveryItr) == 0))) {
if ((itrCountToExecute != -1 && itrCountToExecute == curItrCount)
|| (itrCountToExecute == -1 && ((curItrCount % validateOnceEveryItr) == 0))) {
FileSystem fs = new Path(context.getHoodieTestSuiteWriter().getCfg().inputBasePath)
.getFileSystem(context.getHoodieTestSuiteWriter().getConfiguration());
if (context.getHoodieTestSuiteWriter().getCfg().testContinousMode) {
@@ -142,8 +142,8 @@ public void execute(ExecutionContext context, int curItrCount) throws Exception
String tableName = context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_TABLE().key());
log.warn("Validating hive table with db : " + database + " and table : " + tableName);
session.sql("REFRESH TABLE " + database + "." + tableName);
Dataset<Row> cowDf = session.sql("SELECT _row_key, rider, driver, begin_lat, begin_lon, end_lat, end_lon, fare, _hoodie_is_deleted, " +
"test_suite_source_ordering_field FROM " + database + "." + tableName);
Dataset<Row> cowDf = session.sql("SELECT _row_key, rider, driver, begin_lat, begin_lon, end_lat, end_lon, fare, _hoodie_is_deleted, "
+ "test_suite_source_ordering_field FROM " + database + "." + tableName);
Dataset<Row> reorderedInputDf = inputSnapshotDf.select("_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon", "fare",
"_hoodie_is_deleted", "test_suite_source_ordering_field");

@@ -178,9 +178,9 @@ private void awaitUntilDeltaStreamerCaughtUp(ExecutionContext context, String hu
FileStatus[] subDirs = fs.listStatus(new Path(inputPath));
List<FileStatus> subDirList = Arrays.asList(subDirs);
subDirList.sort(Comparator.comparingLong(entry -> Long.parseLong(entry.getPath().getName())));
String latestSubDir = subDirList.get(subDirList.size() -1).getPath().getName();
log.info("Latest sub directory in input path " + latestSubDir + ", latest checkpoint from deltastreamer " +
(latestCheckpoint.isPresent() ? latestCheckpoint.get() : "none"));
String latestSubDir = subDirList.get(subDirList.size() - 1).getPath().getName();
log.info("Latest sub directory in input path " + latestSubDir + ", latest checkpoint from deltastreamer "
+ (latestCheckpoint.isPresent() ? latestCheckpoint.get() : "none"));
long maxWaitTime = config.maxWaitTimeForDeltastreamerToCatchupMs();
long waitedSoFar = 0;
while (!(latestCheckpoint.isPresent() && latestCheckpoint.get().equals(latestSubDir))) {
Expand All @@ -191,11 +191,11 @@ private void awaitUntilDeltaStreamerCaughtUp(ExecutionContext context, String hu
latestCheckpoint = getLatestCheckpoint(commitTimeline);
waitedSoFar += 20000;
if (waitedSoFar >= maxWaitTime) {
throw new AssertionError("DeltaStreamer has not caught up after 5 mins of wait time. Last known checkpoint " +
(latestCheckpoint.isPresent() ? latestCheckpoint.get() : "none") + ", expected checkpoint to have caugth up " + latestSubDir);
throw new AssertionError("DeltaStreamer has not caught up after 5 mins of wait time. Last known checkpoint "
+ (latestCheckpoint.isPresent() ? latestCheckpoint.get() : "none") + ", expected checkpoint to have caugth up " + latestSubDir);
}
log.info("Latest sub directory in input path " + latestSubDir + ", latest checkpoint from deltastreamer " +
(latestCheckpoint.isPresent() ? latestCheckpoint.get() : "none"));
log.info("Latest sub directory in input path " + latestSubDir + ", latest checkpoint from deltastreamer "
+ (latestCheckpoint.isPresent() ? latestCheckpoint.get() : "none"));
}
}

@@ -223,7 +223,7 @@ private Dataset<Row> getInputDf(ExecutionContext context, SparkSession session,
Dataset<Row> inputDf = session.read().format("avro").load(inputPath);
Dataset<Row> trimmedDf = inputDf;
if (!config.inputPartitonsToSkipWithValidate().isEmpty()) {
trimmedDf = inputDf.filter("instr("+partitionPathField+", \'"+ config.inputPartitonsToSkipWithValidate() +"\') != 1");
trimmedDf = inputDf.filter("instr(" + partitionPathField + ", \'" + config.inputPartitonsToSkipWithValidate() + "\') != 1");
}

ExpressionEncoder encoder = getEncoder(inputDf.schema());
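
For reference, a minimal sketch of the two checkstyle conventions the hunks above apply: operators such as + and || start the continuation line rather than ending the previous one, and for-loop punctuation gets a space after each semicolon and none before. The class and values below are hypothetical, not Hudi code, and the rule names (OperatorWrap, WhitespaceAfter, NoWhitespaceBefore) are the usual checkstyle modules for these patterns rather than a quote from Hudi's ruleset.

```java
// Hypothetical illustration of the style the diff enforces; not part of the Hudi codebase.
public class CheckstyleWrapExample {

  static String status(boolean caughtUp, String checkpoint) {
    // OperatorWrap: '+' begins the continuation line instead of ending the previous one.
    return "caught up: " + caughtUp
        + ", last checkpoint: " + (checkpoint != null ? checkpoint : "none");
  }

  public static void main(String[] args) {
    // Whitespace: "int i = 0; i < 3; i++", not "int i = 0;i < 3 ;i++".
    for (int i = 0; i < 3; i++) {
      boolean caughtUp = (i % 2 == 0)
          || (i > 1);
      System.out.println(status(caughtUp, "000" + i));
    }
  }
}
```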
@@ -31,7 +31,6 @@
*/
public class HiveSyncNode extends DagNode<Boolean> {


public HiveSyncNode(Config config) {
this.config = config;
}
@@ -25,7 +25,6 @@
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config;
import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
@@ -36,14 +35,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/**
* Node to validate data set sanity like total file versions retained, has cleaning happened, has archival happened, etc.
@@ -77,8 +70,8 @@ public void execute(ExecutionContext executionContext, int curItrCount) throws E
log.warn("Earliest commit to retain : " + earliestCommitToRetain);
long unCleanedInstants = metaClient.getActiveTimeline().filterCompletedInstants().filter(instant ->
HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.GREATER_THAN_OR_EQUALS, earliestCommitToRetain)).getInstants().count();
ValidationUtils.checkArgument(unCleanedInstants >= (maxCommitsRetained + 1), "Total uncleaned instants " + unCleanedInstants +
" mismatched with max commits retained " + (maxCommitsRetained + 1));
ValidationUtils.checkArgument(unCleanedInstants >= (maxCommitsRetained + 1), "Total uncleaned instants " + unCleanedInstants
+ " mismatched with max commits retained " + (maxCommitsRetained + 1));
}

if (config.validateArchival() || config.validateClean()) {
@@ -24,7 +24,6 @@
import org.apache.hudi.integ.testsuite.dag.WriterContext;
import org.apache.hudi.integ.testsuite.dag.nodes.DagNode;
import org.apache.hudi.integ.testsuite.dag.nodes.DelayNode;
import org.apache.hudi.metrics.Metrics;

import org.apache.spark.api.java.JavaSparkContext;
import org.slf4j.Logger;
@@ -18,7 +18,6 @@

package org.apache.hudi.integ.testsuite.generator;

import com.google.common.annotations.VisibleForTesting;
import org.apache.hudi.common.util.collection.Pair;

import org.apache.avro.Schema;
@@ -336,7 +335,6 @@ public boolean validate(GenericRecord record) {
* Note: When generating records, number of records to be generated must be more than numDatePartitions * parallelism,
* to guarantee that at least numDatePartitions are created.
*/
@VisibleForTesting
public GenericRecord updateTimestamp(GenericRecord record, String fieldName) {
long delta = TimeUnit.SECONDS.convert((partitionIndex++ % numDatePartitions) + startPartition, TimeUnit.DAYS);
record.put(fieldName, delta);
@@ -17,7 +17,7 @@

package org.apache.spark.sql.hudi.command.payload

import com.google.common.cache.{Cache, CacheBuilder}
import com.github.benmanes.caffeine.cache.{Cache, Caffeine}
import org.apache.avro.Schema
import org.apache.avro.generic.IndexedRecord
import org.apache.hudi.HoodieSparkUtils.sparkAdapter
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.hudi.command.payload.SqlTypedRecord.{getAvroDeserializer, getSqlType}
import org.apache.spark.sql.types.StructType

import java.util.concurrent.Callable
import java.util.function.Function

/**
* A sql typed record which will convert the avro field to sql typed value.
@@ -49,30 +49,24 @@ class SqlTypedRecord(val record: IndexedRecord) extends IndexedRecord {

object SqlTypedRecord {

private val sqlTypeCache = CacheBuilder.newBuilder().build[Schema, StructType]()
private val sqlTypeCache = Caffeine.newBuilder()
.maximumSize(16).build[Schema, StructType]

private val avroDeserializerCacheLocal = new ThreadLocal[Cache[Schema, HoodieAvroDeserializer]] {
override def initialValue(): Cache[Schema, HoodieAvroDeserializer] =
CacheBuilder.newBuilder().build[Schema, HoodieAvroDeserializer]()
override def initialValue(): Cache[Schema, HoodieAvroDeserializer] = {
Caffeine.newBuilder().maximumSize(16).build[Schema, HoodieAvroDeserializer]
}
}

def getSqlType(schema: Schema): StructType = {
sqlTypeCache.get(schema, new Callable[StructType] {
override def call(): StructType = {
val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
sqlTypeCache.put(schema, structType)
structType
}
sqlTypeCache.get(schema, new Function[Schema, StructType] {
override def apply(t: Schema): StructType = AvroConversionUtils.convertAvroSchemaToStructType(t)
})
}

def getAvroDeserializer(schema: Schema): HoodieAvroDeserializer= {
avroDeserializerCacheLocal.get().get(schema, new Callable[HoodieAvroDeserializer] {
override def call(): HoodieAvroDeserializer = {
val deserializer = sparkAdapter.createAvroDeserializer(schema, getSqlType(schema))
avroDeserializerCacheLocal.get().put(schema, deserializer)
deserializer
}
avroDeserializerCacheLocal.get().get(schema, new Function[Schema, HoodieAvroDeserializer] {
override def apply(t: Schema): HoodieAvroDeserializer = sparkAdapter.createAvroDeserializer(t, getSqlType(t))
})
}
}
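
A minimal Java sketch of the Guava-to-Caffeine switch applied above, under the assumption that the only API difference that matters to this code is the loader type: Guava's Cache.get(key, Callable) becomes Caffeine's Cache.get(key, Function), and the explicit put inside the loader goes away because the computed value is cached automatically. The class name and key/value types below are placeholders, not Hudi's.

```java
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

public class CaffeineCacheSketch {

  // Bounded cache, mirroring the maximumSize(16) used in the patch.
  private static final Cache<String, Integer> CACHE =
      Caffeine.newBuilder().maximumSize(16).build();

  static int cachedLength(String key) {
    // get(key, mappingFunction) computes and stores the value on a miss,
    // so no separate put() is needed as in the old Guava Callable-based code.
    return CACHE.get(key, k -> k.length());
  }

  public static void main(String[] args) {
    System.out.println(cachedLength("schema-a")); // computed, then cached
    System.out.println(cachedLength("schema-a")); // served from the cache
  }
}
```

One behavioral difference worth noting: Guava's Callable overload can throw a checked ExecutionException and ignores the key, while Caffeine's Function overload receives the key directly, which is why the Scala overrides above change from call() to apply(t).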
@@ -17,7 +17,7 @@

package org.apache.spark.sql.hudi.command.payload

import com.google.common.cache.CacheBuilder
import com.github.benmanes.caffeine.cache.Caffeine
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord, IndexedRecord}
import org.apache.hudi.AvroConversionUtils
@@ -32,11 +32,11 @@ import org.apache.hudi.sql.IExpressionEvaluator
import org.apache.spark.sql.avro.{AvroSerializer, SchemaConverters}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.hudi.SerDeUtils
import org.apache.spark.sql.hudi.command.payload.ExpressionPayload.{getEvaluator, setWriteSchema, getMergedSchema}
import org.apache.spark.sql.hudi.command.payload.ExpressionPayload.{getEvaluator, getMergedSchema, setWriteSchema}
import org.apache.spark.sql.types.{StructField, StructType}

import java.util.concurrent.Callable
import java.util.{Base64, Properties}
import java.util.function.Function
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

@@ -270,19 +270,19 @@ object ExpressionPayload {
* The Map[IExpressionEvaluator, IExpressionEvaluator] is the map of the condition expression
* to the assignments expression.
*/
private val cache = CacheBuilder.newBuilder()
private val cache = Caffeine.newBuilder()
.maximumSize(1024)
.build[String, Map[IExpressionEvaluator, IExpressionEvaluator]]()

private val writeSchemaCache = CacheBuilder.newBuilder().build[String, Schema]()
private val writeSchemaCache = Caffeine.newBuilder()
.maximumSize(16).build[String, Schema]()

def setWriteSchema(properties: Properties): Schema = {
ValidationUtils.checkArgument(properties.containsKey(HoodieWriteConfig.WRITE_SCHEMA.key),
s"Missing ${HoodieWriteConfig.WRITE_SCHEMA.key}")
writeSchemaCache.get(properties.getProperty(HoodieWriteConfig.WRITE_SCHEMA.key),
new Callable[Schema] {
override def call(): Schema =
new Schema.Parser().parse(properties.getProperty(HoodieWriteConfig.WRITE_SCHEMA.key))
new Function[String, Schema] {
override def apply(t: String): Schema = new Schema.Parser().parse(t)
})
}

@@ -293,10 +293,9 @@
def getEvaluator(
serializedConditionAssignments: String, writeSchema: Schema): Map[IExpressionEvaluator, IExpressionEvaluator] = {
cache.get(serializedConditionAssignments,
new Callable[Map[IExpressionEvaluator, IExpressionEvaluator]] {

override def call(): Map[IExpressionEvaluator, IExpressionEvaluator] = {
val serializedBytes = Base64.getDecoder.decode(serializedConditionAssignments)
new Function[String, Map[IExpressionEvaluator, IExpressionEvaluator]] {
override def apply(t: String): Map[IExpressionEvaluator, IExpressionEvaluator] = {
val serializedBytes = Base64.getDecoder.decode(t)
val conditionAssignments = SerDeUtils.toObject(serializedBytes)
.asInstanceOf[Map[Expression, Seq[Expression]]]
// Do the CodeGen for condition expression and assignment expression
Expand All @@ -316,14 +315,14 @@ object ExpressionPayload {
})
}

private val mergedSchemaCache = CacheBuilder.newBuilder().build[TupleSchema, Schema]()
private val mergedSchemaCache = Caffeine.newBuilder().maximumSize(16).build[TupleSchema, Schema]()

def getMergedSchema(source: Schema, target: Schema): Schema = {

mergedSchemaCache.get(TupleSchema(source, target), new Callable[Schema] {
override def call(): Schema = {
val rightSchema = HoodieAvroUtils.removeMetadataFields(target)
mergeSchema(source, rightSchema)
mergedSchemaCache.get(TupleSchema(source, target), new Function[TupleSchema, Schema] {
override def apply(t: TupleSchema): Schema = {
val rightSchema = HoodieAvroUtils.removeMetadataFields(t.second)
mergeSchema(t.first, rightSchema)
}
})
}
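
The same substitution is applied in ExpressionPayload: each Callable-based Guava lookup becomes a Caffeine get with a Function keyed on the cached value's inputs (the serialized condition string, the write-schema string, the schema pair), and the caches are now bounded (maximumSize 1024 for compiled evaluators, 16 for schemas), so cold entries can be evicted and recomputed on demand.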