[HUDI-4482] remove guava in spark moudle
KnightChess committed Aug 25, 2022
1 parent c188852 commit dc41a68
Showing 12 changed files with 112 additions and 120 deletions.
@@ -17,7 +17,7 @@

package org.apache.spark.sql.hudi.command.payload

import com.google.common.cache.{Cache, CacheBuilder}
import com.github.benmanes.caffeine.cache.{Cache, Caffeine}
import org.apache.avro.Schema
import org.apache.avro.generic.IndexedRecord
import org.apache.hudi.HoodieSparkUtils.sparkAdapter
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.hudi.command.payload.SqlTypedRecord.{getAvroDeserializer, getSqlType}
import org.apache.spark.sql.types.StructType

import java.util.concurrent.Callable
import java.util.function.Function

/**
* A sql typed record which will convert the avro field to sql typed value.
@@ -49,30 +49,24 @@ class SqlTypedRecord(val record: IndexedRecord) extends IndexedRecord {

object SqlTypedRecord {

private val sqlTypeCache = CacheBuilder.newBuilder().build[Schema, StructType]()
private val sqlTypeCache = Caffeine.newBuilder()
.maximumSize(16).build[Schema, StructType]

private val avroDeserializerCacheLocal = new ThreadLocal[Cache[Schema, HoodieAvroDeserializer]] {
override def initialValue(): Cache[Schema, HoodieAvroDeserializer] =
CacheBuilder.newBuilder().build[Schema, HoodieAvroDeserializer]()
override def initialValue(): Cache[Schema, HoodieAvroDeserializer] = {
Caffeine.newBuilder().maximumSize(16).build[Schema, HoodieAvroDeserializer]
}
}

def getSqlType(schema: Schema): StructType = {
sqlTypeCache.get(schema, new Callable[StructType] {
override def call(): StructType = {
val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
sqlTypeCache.put(schema, structType)
structType
}
sqlTypeCache.get(schema, new Function[Schema, StructType] {
override def apply(t: Schema): StructType = AvroConversionUtils.convertAvroSchemaToStructType(t)
})
}

def getAvroDeserializer(schema: Schema): HoodieAvroDeserializer= {
avroDeserializerCacheLocal.get().get(schema, new Callable[HoodieAvroDeserializer] {
override def call(): HoodieAvroDeserializer = {
val deserializer = sparkAdapter.createAvroDeserializer(schema, getSqlType(schema))
avroDeserializerCacheLocal.get().put(schema, deserializer)
deserializer
}
avroDeserializerCacheLocal.get().get(schema, new Function[Schema, HoodieAvroDeserializer] {
override def apply(t: Schema): HoodieAvroDeserializer = sparkAdapter.createAvroDeserializer(t, getSqlType(t))
})
}
}
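
A minimal, self-contained sketch of the lookup pattern these hunks switch to (assuming Caffeine 2.x on the classpath; the object, key and value types below are illustrative, not Hudi classes). On a miss, Caffeine applies the java.util.function.Function to the key and stores the result itself, so no explicit put() is needed in the mapping body:

import com.github.benmanes.caffeine.cache.{Cache, Caffeine}
import java.util.function.Function

object CaffeineLookupSketch {
  // Size-bounded cache: Caffeine evicts entries once more than 16 keys are present.
  private val lengths: Cache[String, Integer] = Caffeine.newBuilder()
    .maximumSize(16)
    .build[String, Integer]()

  // On a miss, Caffeine applies the Function to the key and caches the result,
  // so the mapping body only computes the value.
  def lengthOf(key: String): Integer =
    lengths.get(key, new Function[String, Integer] {
      override def apply(k: String): Integer = Integer.valueOf(k.length)
    })
}
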
@@ -17,7 +17,7 @@

package org.apache.spark.sql.hudi.command.payload

import com.google.common.cache.CacheBuilder
import com.github.benmanes.caffeine.cache.Caffeine
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord, IndexedRecord}
import org.apache.hudi.AvroConversionUtils
@@ -32,11 +32,11 @@ import org.apache.hudi.sql.IExpressionEvaluator
import org.apache.spark.sql.avro.{AvroSerializer, SchemaConverters}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.hudi.SerDeUtils
import org.apache.spark.sql.hudi.command.payload.ExpressionPayload.{getEvaluator, setWriteSchema, getMergedSchema}
import org.apache.spark.sql.hudi.command.payload.ExpressionPayload.{getEvaluator, getMergedSchema, setWriteSchema}
import org.apache.spark.sql.types.{StructField, StructType}

import java.util.concurrent.Callable
import java.util.{Base64, Properties}
import java.util.function.Function
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

@@ -270,19 +270,19 @@ object ExpressionPayload {
* The Map[IExpressionEvaluator, IExpressionEvaluator] is the map of the condition expression
* to the assignments expression.
*/
private val cache = CacheBuilder.newBuilder()
private val cache = Caffeine.newBuilder()
.maximumSize(1024)
.build[String, Map[IExpressionEvaluator, IExpressionEvaluator]]()

private val writeSchemaCache = CacheBuilder.newBuilder().build[String, Schema]()
private val writeSchemaCache = Caffeine.newBuilder()
.maximumSize(16).build[String, Schema]()

def setWriteSchema(properties: Properties): Schema = {
ValidationUtils.checkArgument(properties.containsKey(HoodieWriteConfig.WRITE_SCHEMA.key),
s"Missing ${HoodieWriteConfig.WRITE_SCHEMA.key}")
writeSchemaCache.get(properties.getProperty(HoodieWriteConfig.WRITE_SCHEMA.key),
new Callable[Schema] {
override def call(): Schema =
new Schema.Parser().parse(properties.getProperty(HoodieWriteConfig.WRITE_SCHEMA.key))
new Function[String, Schema] {
override def apply(t: String): Schema = new Schema.Parser().parse(t)
})
}

@@ -293,10 +293,9 @@
def getEvaluator(
serializedConditionAssignments: String, writeSchema: Schema): Map[IExpressionEvaluator, IExpressionEvaluator] = {
cache.get(serializedConditionAssignments,
new Callable[Map[IExpressionEvaluator, IExpressionEvaluator]] {

override def call(): Map[IExpressionEvaluator, IExpressionEvaluator] = {
val serializedBytes = Base64.getDecoder.decode(serializedConditionAssignments)
new Function[String, Map[IExpressionEvaluator, IExpressionEvaluator]] {
override def apply(t: String): Map[IExpressionEvaluator, IExpressionEvaluator] = {
val serializedBytes = Base64.getDecoder.decode(t)
val conditionAssignments = SerDeUtils.toObject(serializedBytes)
.asInstanceOf[Map[Expression, Seq[Expression]]]
// Do the CodeGen for condition expression and assignment expression
@@ -316,14 +315,14 @@
})
}

private val mergedSchemaCache = CacheBuilder.newBuilder().build[TupleSchema, Schema]()
private val mergedSchemaCache = Caffeine.newBuilder().maximumSize(16).build[TupleSchema, Schema]()

def getMergedSchema(source: Schema, target: Schema): Schema = {

mergedSchemaCache.get(TupleSchema(source, target), new Callable[Schema] {
override def call(): Schema = {
val rightSchema = HoodieAvroUtils.removeMetadataFields(target)
mergeSchema(source, rightSchema)
mergedSchemaCache.get(TupleSchema(source, target), new Function[TupleSchema, Schema] {
override def apply(t: TupleSchema): Schema = {
val rightSchema = HoodieAvroUtils.removeMetadataFields(t.second)
mergeSchema(t.first, rightSchema)
}
})
}
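
All three caches above pair a fixed computation with the cache and now carry an explicit maximumSize bound. Where the computation never varies per call site, Caffeine also offers a LoadingCache that binds the loader at build time; the following is only a sketch of that alternative shape with placeholder types (SchemaPair stands in for TupleSchema), not what the commit does:

import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine, LoadingCache}

object LoadingCacheSketch {
  // Hypothetical composite key; case-class equality makes it usable as a cache key,
  // mirroring the role TupleSchema plays in the real code.
  final case class SchemaPair(source: String, target: String)

  private val merged: LoadingCache[SchemaPair, String] = Caffeine.newBuilder()
    .maximumSize(16)
    .build[SchemaPair, String](new CacheLoader[SchemaPair, String] {
      // Runs once per missing key; later get() calls are served from the cache.
      override def load(key: SchemaPair): String = key.source + "|" + key.target
    })

  def mergedOf(source: String, target: String): String =
    merged.get(SchemaPair(source, target))
}
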
@@ -17,72 +17,68 @@

package org.apache.spark.sql.hudi.command.procedures

import com.google.common.collect.ImmutableMap

import java.util
import java.util.Locale
import java.util.function.Supplier

object HoodieProcedures {
private val BUILDERS: util.Map[String, Supplier[ProcedureBuilder]] = initProcedureBuilders
private val BUILDERS: Map[String, Supplier[ProcedureBuilder]] = initProcedureBuilders

def newBuilder(name: String): ProcedureBuilder = {
val builderSupplier: Supplier[ProcedureBuilder] = BUILDERS.get(name.toLowerCase(Locale.ROOT))
if (builderSupplier != null) builderSupplier.get else null
val builderSupplier = BUILDERS.get(name.toLowerCase(Locale.ROOT))
if (builderSupplier.isDefined) builderSupplier.get.get() else null
}

private def initProcedureBuilders: util.Map[String, Supplier[ProcedureBuilder]] = {
val mapBuilder: ImmutableMap.Builder[String, Supplier[ProcedureBuilder]] = ImmutableMap.builder()
mapBuilder.put(RunCompactionProcedure.NAME, RunCompactionProcedure.builder)
mapBuilder.put(ShowCompactionProcedure.NAME, ShowCompactionProcedure.builder)
mapBuilder.put(CreateSavepointProcedure.NAME, CreateSavepointProcedure.builder)
mapBuilder.put(DeleteSavepointProcedure.NAME, DeleteSavepointProcedure.builder)
mapBuilder.put(RollbackToSavepointProcedure.NAME, RollbackToSavepointProcedure.builder)
mapBuilder.put(RollbackToInstantTimeProcedure.NAME, RollbackToInstantTimeProcedure.builder)
mapBuilder.put(RunClusteringProcedure.NAME, RunClusteringProcedure.builder)
mapBuilder.put(ShowClusteringProcedure.NAME, ShowClusteringProcedure.builder)
mapBuilder.put(ShowCommitsProcedure.NAME, ShowCommitsProcedure.builder)
mapBuilder.put(ShowCommitsMetadataProcedure.NAME, ShowCommitsMetadataProcedure.builder)
mapBuilder.put(ShowArchivedCommitsProcedure.NAME, ShowArchivedCommitsProcedure.builder)
mapBuilder.put(ShowArchivedCommitsMetadataProcedure.NAME, ShowArchivedCommitsMetadataProcedure.builder)
mapBuilder.put(ShowCommitFilesProcedure.NAME, ShowCommitFilesProcedure.builder)
mapBuilder.put(ShowCommitPartitionsProcedure.NAME, ShowCommitPartitionsProcedure.builder)
mapBuilder.put(ShowCommitWriteStatsProcedure.NAME, ShowCommitWriteStatsProcedure.builder)
mapBuilder.put(CommitsCompareProcedure.NAME, CommitsCompareProcedure.builder)
mapBuilder.put(ShowSavepointsProcedure.NAME, ShowSavepointsProcedure.builder)
mapBuilder.put(DeleteMarkerProcedure.NAME, DeleteMarkerProcedure.builder)
mapBuilder.put(ShowRollbacksProcedure.NAME, ShowRollbacksProcedure.builder)
mapBuilder.put(ShowRollbackDetailProcedure.NAME, ShowRollbackDetailProcedure.builder)
mapBuilder.put(ExportInstantsProcedure.NAME, ExportInstantsProcedure.builder)
mapBuilder.put(ShowAllFileSystemViewProcedure.NAME, ShowAllFileSystemViewProcedure.builder)
mapBuilder.put(ShowLatestFileSystemViewProcedure.NAME, ShowLatestFileSystemViewProcedure.builder)
mapBuilder.put(ShowHoodieLogFileMetadataProcedure.NAME, ShowHoodieLogFileMetadataProcedure.builder)
mapBuilder.put(ShowHoodieLogFileRecordsProcedure.NAME, ShowHoodieLogFileRecordsProcedure.builder)
mapBuilder.put(StatsWriteAmplificationProcedure.NAME, StatsWriteAmplificationProcedure.builder)
mapBuilder.put(StatsFileSizeProcedure.NAME, StatsFileSizeProcedure.builder)
mapBuilder.put(HdfsParquetImportProcedure.NAME, HdfsParquetImportProcedure.builder)
mapBuilder.put(RunBootstrapProcedure.NAME, RunBootstrapProcedure.builder)
mapBuilder.put(ShowBootstrapMappingProcedure.NAME, ShowBootstrapMappingProcedure.builder)
mapBuilder.put(ShowBootstrapPartitionsProcedure.NAME, ShowBootstrapPartitionsProcedure.builder)
mapBuilder.put(UpgradeTableProcedure.NAME, UpgradeTableProcedure.builder)
mapBuilder.put(DowngradeTableProcedure.NAME, DowngradeTableProcedure.builder)
mapBuilder.put(ShowMetadataTableFilesProcedure.NAME, ShowMetadataTableFilesProcedure.builder)
mapBuilder.put(ShowMetadataTablePartitionsProcedure.NAME, ShowMetadataTablePartitionsProcedure.builder)
mapBuilder.put(CreateMetadataTableProcedure.NAME, CreateMetadataTableProcedure.builder)
mapBuilder.put(DeleteMetadataTableProcedure.NAME, DeleteMetadataTableProcedure.builder)
mapBuilder.put(InitMetadataTableProcedure.NAME, InitMetadataTableProcedure.builder)
mapBuilder.put(ShowMetadataTableStatsProcedure.NAME, ShowMetadataTableStatsProcedure.builder)
mapBuilder.put(ValidateMetadataTableFilesProcedure.NAME, ValidateMetadataTableFilesProcedure.builder)
mapBuilder.put(ShowFsPathDetailProcedure.NAME, ShowFsPathDetailProcedure.builder)
mapBuilder.put(CopyToTableProcedure.NAME, CopyToTableProcedure.builder)
mapBuilder.put(RepairAddpartitionmetaProcedure.NAME, RepairAddpartitionmetaProcedure.builder)
mapBuilder.put(RepairCorruptedCleanFilesProcedure.NAME, RepairCorruptedCleanFilesProcedure.builder)
mapBuilder.put(RepairDeduplicateProcedure.NAME, RepairDeduplicateProcedure.builder)
mapBuilder.put(RepairMigratePartitionMetaProcedure.NAME, RepairMigratePartitionMetaProcedure.builder)
mapBuilder.put(RepairOverwriteHoodiePropsProcedure.NAME, RepairOverwriteHoodiePropsProcedure.builder)
mapBuilder.put(RunCleanProcedure.NAME, RunCleanProcedure.builder)
mapBuilder.put(ValidateHoodieSyncProcedure.NAME, ValidateHoodieSyncProcedure.builder)
mapBuilder.put(ShowInvalidParquetProcedure.NAME, ShowInvalidParquetProcedure.builder)
mapBuilder.build
private def initProcedureBuilders: Map[String, Supplier[ProcedureBuilder]] = {
Map((RunCompactionProcedure.NAME, RunCompactionProcedure.builder)
,(ShowCompactionProcedure.NAME, ShowCompactionProcedure.builder)
,(CreateSavepointProcedure.NAME, CreateSavepointProcedure.builder)
,(DeleteSavepointProcedure.NAME, DeleteSavepointProcedure.builder)
,(RollbackToSavepointProcedure.NAME, RollbackToSavepointProcedure.builder)
,(RollbackToInstantTimeProcedure.NAME, RollbackToInstantTimeProcedure.builder)
,(RunClusteringProcedure.NAME, RunClusteringProcedure.builder)
,(ShowClusteringProcedure.NAME, ShowClusteringProcedure.builder)
,(ShowCommitsProcedure.NAME, ShowCommitsProcedure.builder)
,(ShowCommitsMetadataProcedure.NAME, ShowCommitsMetadataProcedure.builder)
,(ShowArchivedCommitsProcedure.NAME, ShowArchivedCommitsProcedure.builder)
,(ShowArchivedCommitsMetadataProcedure.NAME, ShowArchivedCommitsMetadataProcedure.builder)
,(ShowCommitFilesProcedure.NAME, ShowCommitFilesProcedure.builder)
,(ShowCommitPartitionsProcedure.NAME, ShowCommitPartitionsProcedure.builder)
,(ShowCommitWriteStatsProcedure.NAME, ShowCommitWriteStatsProcedure.builder)
,(CommitsCompareProcedure.NAME, CommitsCompareProcedure.builder)
,(ShowSavepointsProcedure.NAME, ShowSavepointsProcedure.builder)
,(DeleteMarkerProcedure.NAME, DeleteMarkerProcedure.builder)
,(ShowRollbacksProcedure.NAME, ShowRollbacksProcedure.builder)
,(ShowRollbackDetailProcedure.NAME, ShowRollbackDetailProcedure.builder)
,(ExportInstantsProcedure.NAME, ExportInstantsProcedure.builder)
,(ShowAllFileSystemViewProcedure.NAME, ShowAllFileSystemViewProcedure.builder)
,(ShowLatestFileSystemViewProcedure.NAME, ShowLatestFileSystemViewProcedure.builder)
,(ShowHoodieLogFileMetadataProcedure.NAME, ShowHoodieLogFileMetadataProcedure.builder)
,(ShowHoodieLogFileRecordsProcedure.NAME, ShowHoodieLogFileRecordsProcedure.builder)
,(StatsWriteAmplificationProcedure.NAME, StatsWriteAmplificationProcedure.builder)
,(StatsFileSizeProcedure.NAME, StatsFileSizeProcedure.builder)
,(HdfsParquetImportProcedure.NAME, HdfsParquetImportProcedure.builder)
,(RunBootstrapProcedure.NAME, RunBootstrapProcedure.builder)
,(ShowBootstrapMappingProcedure.NAME, ShowBootstrapMappingProcedure.builder)
,(ShowBootstrapPartitionsProcedure.NAME, ShowBootstrapPartitionsProcedure.builder)
,(UpgradeTableProcedure.NAME, UpgradeTableProcedure.builder)
,(DowngradeTableProcedure.NAME, DowngradeTableProcedure.builder)
,(ShowMetadataTableFilesProcedure.NAME, ShowMetadataTableFilesProcedure.builder)
,(ShowMetadataTablePartitionsProcedure.NAME, ShowMetadataTablePartitionsProcedure.builder)
,(CreateMetadataTableProcedure.NAME, CreateMetadataTableProcedure.builder)
,(DeleteMetadataTableProcedure.NAME, DeleteMetadataTableProcedure.builder)
,(InitMetadataTableProcedure.NAME, InitMetadataTableProcedure.builder)
,(ShowMetadataTableStatsProcedure.NAME, ShowMetadataTableStatsProcedure.builder)
,(ValidateMetadataTableFilesProcedure.NAME, ValidateMetadataTableFilesProcedure.builder)
,(ShowFsPathDetailProcedure.NAME, ShowFsPathDetailProcedure.builder)
,(CopyToTableProcedure.NAME, CopyToTableProcedure.builder)
,(RepairAddpartitionmetaProcedure.NAME, RepairAddpartitionmetaProcedure.builder)
,(RepairCorruptedCleanFilesProcedure.NAME, RepairCorruptedCleanFilesProcedure.builder)
,(RepairDeduplicateProcedure.NAME, RepairDeduplicateProcedure.builder)
,(RepairMigratePartitionMetaProcedure.NAME, RepairMigratePartitionMetaProcedure.builder)
,(RepairOverwriteHoodiePropsProcedure.NAME, RepairOverwriteHoodiePropsProcedure.builder)
,(RunCleanProcedure.NAME, RunCleanProcedure.builder)
,(ValidateHoodieSyncProcedure.NAME, ValidateHoodieSyncProcedure.builder)
,(ShowInvalidParquetProcedure.NAME, ShowInvalidParquetProcedure.builder)
)
}
}
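
With the Guava ImmutableMap.Builder gone, the registry above is a plain Scala immutable Map, and lookups return an Option instead of a nullable reference. A hedged sketch of that shape with placeholder names (not Hudi classes), assuming Scala 2.12's SAM conversion for java.util.function.Supplier; Option.map(...).orNull preserves the old null-returning contract:

import java.util.Locale
import java.util.function.Supplier

object RegistrySketch {
  trait Builder { def build(): String }

  // SAM conversion turns the Scala lambda into a java.util.function.Supplier.
  private val exampleSupplier: Supplier[Builder] =
    () => new Builder { def build(): String = "example" }

  // A Scala immutable Map gives the same read-only guarantee as ImmutableMap
  // without the Guava dependency.
  private val builders: Map[String, Supplier[Builder]] = Map(
    "example" -> exampleSupplier
  )

  // Map#get returns an Option; .map/.orNull keeps the old null-returning behavior.
  def newBuilder(name: String): Builder =
    builders.get(name.toLowerCase(Locale.ROOT)).map(_.get()).orNull
}
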
@@ -17,7 +17,6 @@

package org.apache.spark.sql.hudi.command.procedures

import com.google.common.collect.Lists
import org.apache.hudi.common.bootstrap.index.BootstrapIndex
import org.apache.hudi.common.model.{BootstrapFileMapping, HoodieFileGroupId}
import org.apache.hudi.common.table.HoodieTableMetaClient
@@ -17,7 +17,6 @@

package org.apache.spark.sql.hudi.command.procedures

import com.google.common.collect.Lists
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.{FileSlice, HoodieLogFile}
@@ -30,7 +29,8 @@ import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}

import java.util.function.{Function, Supplier}
import java.util.stream.Collectors
import scala.collection.JavaConverters.{asJavaIteratorConverter, asScalaIteratorConverter}
import scala.collection.JavaConversions
import scala.collection.JavaConverters.{asJavaIterableConverter, asJavaIteratorConverter, asScalaIteratorConverter}

class ShowFileSystemViewProcedure(showLatest: Boolean) extends BaseProcedure with ProcedureBuilder {
private val PARAMETERS_ALL: Array[ProcedureParameter] = Array[ProcedureParameter](
@@ -118,12 +118,14 @@ class ShowFileSystemViewProcedure(showLatest: Boolean) extends BaseProcedure wit
metaClient.getActiveTimeline.getInstantDetails(instant)
}
}
val filteredTimeline = new HoodieDefaultTimeline(Lists.newArrayList(instants.asJava).stream(), details)

val filteredTimeline = new HoodieDefaultTimeline(
new java.util.ArrayList[HoodieInstant](JavaConversions.asJavaCollection(instants.toList)).stream(), details)
new HoodieTableFileSystemView(metaClient, filteredTimeline, statuses.toArray(new Array[FileStatus](0)))
}

private def showAllFileSlices(fsView: HoodieTableFileSystemView): java.util.List[Row] = {
val rows: java.util.List[Row] = Lists.newArrayList()
val rows: java.util.List[Row] = new java.util.ArrayList[Row]
fsView.getAllFileGroups.iterator().asScala.foreach(fg => {
fg.getAllFileSlices.iterator().asScala.foreach(fs => {
val fileId = fg.getFileGroupId.getFileId
@@ -161,7 +163,7 @@ class ShowFileSystemViewProcedure(showLatest: Boolean) extends BaseProcedure wit
maxInstant
})
}
val rows: java.util.List[Row] = Lists.newArrayList()
val rows: java.util.List[Row] = new java.util.ArrayList[Row]
fileSliceStream.iterator().asScala.foreach {
fs => {
val fileId = fs.getFileId
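
In the procedures above, Lists.newArrayList() becomes a plain new java.util.ArrayList, and the instant list is bridged into a java.util.stream via JavaConversions. A JavaConverters-only spelling of the same bridge is sketched below with placeholder element types — an assumed equivalent formulation, not the code the commit uses. Since scala.collection.JavaConversions is deprecated in Scala 2.12, the explicit-converter form may be preferable:

import java.util.stream.Stream
import scala.collection.JavaConverters.seqAsJavaListConverter

object CollectionBridgeSketch {
  // Copy a Scala Seq into a mutable java.util.List, then expose it as a Stream,
  // mirroring what Lists.newArrayList(...).stream() used to provide.
  def toJavaStream(instants: Seq[String]): Stream[String] =
    new java.util.ArrayList[String](instants.asJava).stream()
}
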
@@ -17,7 +17,6 @@

package org.apache.spark.sql.hudi.command.procedures

import com.google.common.collect.Lists
import org.apache.hadoop.fs.{ContentSummary, FileStatus, Path}
import org.apache.hudi.common.fs.FSUtils
import org.apache.spark.sql.Row
@@ -58,7 +57,7 @@ class ShowFsPathDetailProcedure extends BaseProcedure with ProcedureBuilder {
val path: Path = new Path(srcPath)
val fs = FSUtils.getFs(path, jsc.hadoopConfiguration())
val status: Array[FileStatus] = if (isSub) fs.listStatus(path) else fs.globStatus(path)
val rows: java.util.List[Row] = Lists.newArrayList()
val rows: java.util.List[Row] = new java.util.ArrayList[Row]()

if (status.nonEmpty) {
for (i <- status.indices) {