[HUDI-4482] remove guava and use caffeine instead for cache #6240

Merged: 1 commit, Aug 29, 2022
@@ -17,7 +17,7 @@

package org.apache.spark.sql.hudi.command.payload

import com.google.common.cache.{Cache, CacheBuilder}
import com.github.benmanes.caffeine.cache.{Cache, Caffeine}
import org.apache.avro.Schema
import org.apache.avro.generic.IndexedRecord
import org.apache.hudi.HoodieSparkUtils.sparkAdapter
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.hudi.command.payload.SqlTypedRecord.{getAvroDeserializer, getSqlType}
import org.apache.spark.sql.types.StructType

import java.util.concurrent.Callable
import java.util.function.Function

/**
* A sql typed record which will convert the avro field to sql typed value.
@@ -49,30 +49,24 @@ class SqlTypedRecord(val record: IndexedRecord) extends IndexedRecord {

object SqlTypedRecord {

private val sqlTypeCache = CacheBuilder.newBuilder().build[Schema, StructType]()
private val sqlTypeCache = Caffeine.newBuilder()
.maximumSize(16).build[Schema, StructType]

private val avroDeserializerCacheLocal = new ThreadLocal[Cache[Schema, HoodieAvroDeserializer]] {
override def initialValue(): Cache[Schema, HoodieAvroDeserializer] =
CacheBuilder.newBuilder().build[Schema, HoodieAvroDeserializer]()
override def initialValue(): Cache[Schema, HoodieAvroDeserializer] = {
Caffeine.newBuilder().maximumSize(16).build[Schema, HoodieAvroDeserializer]
}
}

def getSqlType(schema: Schema): StructType = {
sqlTypeCache.get(schema, new Callable[StructType] {
override def call(): StructType = {
val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
sqlTypeCache.put(schema, structType)
structType
}
sqlTypeCache.get(schema, new Function[Schema, StructType] {
override def apply(t: Schema): StructType = AvroConversionUtils.convertAvroSchemaToStructType(t)
})
}

def getAvroDeserializer(schema: Schema): HoodieAvroDeserializer= {
avroDeserializerCacheLocal.get().get(schema, new Callable[HoodieAvroDeserializer] {
override def call(): HoodieAvroDeserializer = {
val deserializer = sparkAdapter.createAvroDeserializer(schema, getSqlType(schema))
avroDeserializerCacheLocal.get().put(schema, deserializer)
deserializer
}
avroDeserializerCacheLocal.get().get(schema, new Function[Schema, HoodieAvroDeserializer] {
override def apply(t: Schema): HoodieAvroDeserializer = sparkAdapter.createAvroDeserializer(t, getSqlType(t))
})
}
}
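For readers comparing the two APIs, here is a minimal, self-contained sketch of the Guava-to-Caffeine pattern this file adopts (the `PatternCache` name and the regex use case are illustrative only, not part of the PR). Caffeine's `get(key, mappingFunction)` takes a `java.util.function.Function`, computes the value on a miss, and stores it itself, so the explicit `put` done inside the old Guava `Callable` blocks is no longer needed; `maximumSize` also bounds caches that `CacheBuilder.newBuilder().build()` previously left unbounded.

```scala
import java.util.function.Function
import java.util.regex.Pattern
import com.github.benmanes.caffeine.cache.{Cache, Caffeine}

object PatternCache {
  // Bounded cache: Caffeine evicts entries once maximumSize is exceeded.
  private val cache: Cache[String, Pattern] =
    Caffeine.newBuilder().maximumSize(16).build[String, Pattern]()

  def compile(regex: String): Pattern =
    // get(key, mappingFunction) computes the value on a miss and caches it,
    // so no separate put() call is required.
    cache.get(regex, new Function[String, Pattern] {
      override def apply(r: String): Pattern = Pattern.compile(r)
    })
}
```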
@@ -17,7 +17,7 @@

package org.apache.spark.sql.hudi.command.payload

import com.google.common.cache.CacheBuilder
import com.github.benmanes.caffeine.cache.Caffeine
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord, IndexedRecord}
import org.apache.hudi.AvroConversionUtils
@@ -32,11 +32,11 @@ import org.apache.hudi.sql.IExpressionEvaluator
import org.apache.spark.sql.avro.{AvroSerializer, SchemaConverters}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.hudi.SerDeUtils
import org.apache.spark.sql.hudi.command.payload.ExpressionPayload.{getEvaluator, setWriteSchema, getMergedSchema}
import org.apache.spark.sql.hudi.command.payload.ExpressionPayload.{getEvaluator, getMergedSchema, setWriteSchema}
import org.apache.spark.sql.types.{StructField, StructType}

import java.util.concurrent.Callable
import java.util.{Base64, Properties}
import java.util.function.Function
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

@@ -270,19 +270,19 @@ object ExpressionPayload {
* The Map[IExpressionEvaluator, IExpressionEvaluator] is the map of the condition expression
* to the assignments expression.
*/
private val cache = CacheBuilder.newBuilder()
private val cache = Caffeine.newBuilder()
.maximumSize(1024)
.build[String, Map[IExpressionEvaluator, IExpressionEvaluator]]()

private val writeSchemaCache = CacheBuilder.newBuilder().build[String, Schema]()
private val writeSchemaCache = Caffeine.newBuilder()
.maximumSize(16).build[String, Schema]()

def setWriteSchema(properties: Properties): Schema = {
ValidationUtils.checkArgument(properties.containsKey(HoodieWriteConfig.WRITE_SCHEMA.key),
s"Missing ${HoodieWriteConfig.WRITE_SCHEMA.key}")
writeSchemaCache.get(properties.getProperty(HoodieWriteConfig.WRITE_SCHEMA.key),
new Callable[Schema] {
override def call(): Schema =
new Schema.Parser().parse(properties.getProperty(HoodieWriteConfig.WRITE_SCHEMA.key))
new Function[String, Schema] {
override def apply(t: String): Schema = new Schema.Parser().parse(t)
})
}

@@ -293,10 +293,9 @@
def getEvaluator(
serializedConditionAssignments: String, writeSchema: Schema): Map[IExpressionEvaluator, IExpressionEvaluator] = {
cache.get(serializedConditionAssignments,
new Callable[Map[IExpressionEvaluator, IExpressionEvaluator]] {

override def call(): Map[IExpressionEvaluator, IExpressionEvaluator] = {
val serializedBytes = Base64.getDecoder.decode(serializedConditionAssignments)
new Function[String, Map[IExpressionEvaluator, IExpressionEvaluator]] {
override def apply(t: String): Map[IExpressionEvaluator, IExpressionEvaluator] = {
val serializedBytes = Base64.getDecoder.decode(t)
val conditionAssignments = SerDeUtils.toObject(serializedBytes)
.asInstanceOf[Map[Expression, Seq[Expression]]]
// Do the CodeGen for condition expression and assignment expression
Expand All @@ -316,14 +315,14 @@ object ExpressionPayload {
})
}

private val mergedSchemaCache = CacheBuilder.newBuilder().build[TupleSchema, Schema]()
private val mergedSchemaCache = Caffeine.newBuilder().maximumSize(16).build[TupleSchema, Schema]()

def getMergedSchema(source: Schema, target: Schema): Schema = {

mergedSchemaCache.get(TupleSchema(source, target), new Callable[Schema] {
override def call(): Schema = {
val rightSchema = HoodieAvroUtils.removeMetadataFields(target)
mergeSchema(source, rightSchema)
mergedSchemaCache.get(TupleSchema(source, target), new Function[TupleSchema, Schema] {
override def apply(t: TupleSchema): Schema = {
val rightSchema = HoodieAvroUtils.removeMetadataFields(t.second)
mergeSchema(t.first, rightSchema)
}
})
}
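A note on the shape of the new mapping functions above: because Caffeine passes the cache key into the function, the computation reads its inputs from the key (`t`, `t.first`, `t.second`) instead of capturing them from the enclosing scope, which is why `getMergedSchema` keys its cache on `TupleSchema`. Below is a small sketch of that composite-key pattern; `SchemaPair` and the string concatenation are placeholders for `TupleSchema` and the real schema merge, not project code.

```scala
import java.util.function.Function
import com.github.benmanes.caffeine.cache.{Cache, Caffeine}

// Stand-in for the PR's TupleSchema composite key.
case class SchemaPair(first: String, second: String)

object MergedValueCache {
  private val cache: Cache[SchemaPair, String] =
    Caffeine.newBuilder().maximumSize(16).build[SchemaPair, String]()

  def merge(source: String, target: String): String =
    // The whole pair is the cache key; Caffeine hands it back to the mapping
    // function, so the computation uses t.first and t.second rather than
    // closing over source and target.
    cache.get(SchemaPair(source, target), new Function[SchemaPair, String] {
      override def apply(t: SchemaPair): String = s"${t.first} merged with ${t.second}"
    })
}
```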
@@ -17,72 +17,68 @@

package org.apache.spark.sql.hudi.command.procedures

import com.google.common.collect.ImmutableMap

import java.util
import java.util.Locale
import java.util.function.Supplier

object HoodieProcedures {
private val BUILDERS: util.Map[String, Supplier[ProcedureBuilder]] = initProcedureBuilders
private val BUILDERS: Map[String, Supplier[ProcedureBuilder]] = initProcedureBuilders

def newBuilder(name: String): ProcedureBuilder = {
val builderSupplier: Supplier[ProcedureBuilder] = BUILDERS.get(name.toLowerCase(Locale.ROOT))
if (builderSupplier != null) builderSupplier.get else null
val builderSupplier = BUILDERS.get(name.toLowerCase(Locale.ROOT))
if (builderSupplier.isDefined) builderSupplier.get.get() else null
}

private def initProcedureBuilders: util.Map[String, Supplier[ProcedureBuilder]] = {
val mapBuilder: ImmutableMap.Builder[String, Supplier[ProcedureBuilder]] = ImmutableMap.builder()
mapBuilder.put(RunCompactionProcedure.NAME, RunCompactionProcedure.builder)
mapBuilder.put(ShowCompactionProcedure.NAME, ShowCompactionProcedure.builder)
mapBuilder.put(CreateSavepointProcedure.NAME, CreateSavepointProcedure.builder)
mapBuilder.put(DeleteSavepointProcedure.NAME, DeleteSavepointProcedure.builder)
mapBuilder.put(RollbackToSavepointProcedure.NAME, RollbackToSavepointProcedure.builder)
mapBuilder.put(RollbackToInstantTimeProcedure.NAME, RollbackToInstantTimeProcedure.builder)
mapBuilder.put(RunClusteringProcedure.NAME, RunClusteringProcedure.builder)
mapBuilder.put(ShowClusteringProcedure.NAME, ShowClusteringProcedure.builder)
mapBuilder.put(ShowCommitsProcedure.NAME, ShowCommitsProcedure.builder)
mapBuilder.put(ShowCommitsMetadataProcedure.NAME, ShowCommitsMetadataProcedure.builder)
mapBuilder.put(ShowArchivedCommitsProcedure.NAME, ShowArchivedCommitsProcedure.builder)
mapBuilder.put(ShowArchivedCommitsMetadataProcedure.NAME, ShowArchivedCommitsMetadataProcedure.builder)
mapBuilder.put(ShowCommitFilesProcedure.NAME, ShowCommitFilesProcedure.builder)
mapBuilder.put(ShowCommitPartitionsProcedure.NAME, ShowCommitPartitionsProcedure.builder)
mapBuilder.put(ShowCommitWriteStatsProcedure.NAME, ShowCommitWriteStatsProcedure.builder)
mapBuilder.put(CommitsCompareProcedure.NAME, CommitsCompareProcedure.builder)
mapBuilder.put(ShowSavepointsProcedure.NAME, ShowSavepointsProcedure.builder)
mapBuilder.put(DeleteMarkerProcedure.NAME, DeleteMarkerProcedure.builder)
mapBuilder.put(ShowRollbacksProcedure.NAME, ShowRollbacksProcedure.builder)
mapBuilder.put(ShowRollbackDetailProcedure.NAME, ShowRollbackDetailProcedure.builder)
mapBuilder.put(ExportInstantsProcedure.NAME, ExportInstantsProcedure.builder)
mapBuilder.put(ShowAllFileSystemViewProcedure.NAME, ShowAllFileSystemViewProcedure.builder)
mapBuilder.put(ShowLatestFileSystemViewProcedure.NAME, ShowLatestFileSystemViewProcedure.builder)
mapBuilder.put(ShowHoodieLogFileMetadataProcedure.NAME, ShowHoodieLogFileMetadataProcedure.builder)
mapBuilder.put(ShowHoodieLogFileRecordsProcedure.NAME, ShowHoodieLogFileRecordsProcedure.builder)
mapBuilder.put(StatsWriteAmplificationProcedure.NAME, StatsWriteAmplificationProcedure.builder)
mapBuilder.put(StatsFileSizeProcedure.NAME, StatsFileSizeProcedure.builder)
mapBuilder.put(HdfsParquetImportProcedure.NAME, HdfsParquetImportProcedure.builder)
mapBuilder.put(RunBootstrapProcedure.NAME, RunBootstrapProcedure.builder)
mapBuilder.put(ShowBootstrapMappingProcedure.NAME, ShowBootstrapMappingProcedure.builder)
mapBuilder.put(ShowBootstrapPartitionsProcedure.NAME, ShowBootstrapPartitionsProcedure.builder)
mapBuilder.put(UpgradeTableProcedure.NAME, UpgradeTableProcedure.builder)
mapBuilder.put(DowngradeTableProcedure.NAME, DowngradeTableProcedure.builder)
mapBuilder.put(ShowMetadataTableFilesProcedure.NAME, ShowMetadataTableFilesProcedure.builder)
mapBuilder.put(ShowMetadataTablePartitionsProcedure.NAME, ShowMetadataTablePartitionsProcedure.builder)
mapBuilder.put(CreateMetadataTableProcedure.NAME, CreateMetadataTableProcedure.builder)
mapBuilder.put(DeleteMetadataTableProcedure.NAME, DeleteMetadataTableProcedure.builder)
mapBuilder.put(InitMetadataTableProcedure.NAME, InitMetadataTableProcedure.builder)
mapBuilder.put(ShowMetadataTableStatsProcedure.NAME, ShowMetadataTableStatsProcedure.builder)
mapBuilder.put(ValidateMetadataTableFilesProcedure.NAME, ValidateMetadataTableFilesProcedure.builder)
mapBuilder.put(ShowFsPathDetailProcedure.NAME, ShowFsPathDetailProcedure.builder)
mapBuilder.put(CopyToTableProcedure.NAME, CopyToTableProcedure.builder)
mapBuilder.put(RepairAddpartitionmetaProcedure.NAME, RepairAddpartitionmetaProcedure.builder)
mapBuilder.put(RepairCorruptedCleanFilesProcedure.NAME, RepairCorruptedCleanFilesProcedure.builder)
mapBuilder.put(RepairDeduplicateProcedure.NAME, RepairDeduplicateProcedure.builder)
mapBuilder.put(RepairMigratePartitionMetaProcedure.NAME, RepairMigratePartitionMetaProcedure.builder)
mapBuilder.put(RepairOverwriteHoodiePropsProcedure.NAME, RepairOverwriteHoodiePropsProcedure.builder)
mapBuilder.put(RunCleanProcedure.NAME, RunCleanProcedure.builder)
mapBuilder.put(ValidateHoodieSyncProcedure.NAME, ValidateHoodieSyncProcedure.builder)
mapBuilder.put(ShowInvalidParquetProcedure.NAME, ShowInvalidParquetProcedure.builder)
mapBuilder.build
private def initProcedureBuilders: Map[String, Supplier[ProcedureBuilder]] = {
Map((RunCompactionProcedure.NAME, RunCompactionProcedure.builder)
,(ShowCompactionProcedure.NAME, ShowCompactionProcedure.builder)
,(CreateSavepointProcedure.NAME, CreateSavepointProcedure.builder)
,(DeleteSavepointProcedure.NAME, DeleteSavepointProcedure.builder)
,(RollbackToSavepointProcedure.NAME, RollbackToSavepointProcedure.builder)
,(RollbackToInstantTimeProcedure.NAME, RollbackToInstantTimeProcedure.builder)
,(RunClusteringProcedure.NAME, RunClusteringProcedure.builder)
,(ShowClusteringProcedure.NAME, ShowClusteringProcedure.builder)
,(ShowCommitsProcedure.NAME, ShowCommitsProcedure.builder)
,(ShowCommitsMetadataProcedure.NAME, ShowCommitsMetadataProcedure.builder)
,(ShowArchivedCommitsProcedure.NAME, ShowArchivedCommitsProcedure.builder)
,(ShowArchivedCommitsMetadataProcedure.NAME, ShowArchivedCommitsMetadataProcedure.builder)
,(ShowCommitFilesProcedure.NAME, ShowCommitFilesProcedure.builder)
,(ShowCommitPartitionsProcedure.NAME, ShowCommitPartitionsProcedure.builder)
,(ShowCommitWriteStatsProcedure.NAME, ShowCommitWriteStatsProcedure.builder)
,(CommitsCompareProcedure.NAME, CommitsCompareProcedure.builder)
,(ShowSavepointsProcedure.NAME, ShowSavepointsProcedure.builder)
,(DeleteMarkerProcedure.NAME, DeleteMarkerProcedure.builder)
,(ShowRollbacksProcedure.NAME, ShowRollbacksProcedure.builder)
,(ShowRollbackDetailProcedure.NAME, ShowRollbackDetailProcedure.builder)
,(ExportInstantsProcedure.NAME, ExportInstantsProcedure.builder)
,(ShowAllFileSystemViewProcedure.NAME, ShowAllFileSystemViewProcedure.builder)
,(ShowLatestFileSystemViewProcedure.NAME, ShowLatestFileSystemViewProcedure.builder)
,(ShowHoodieLogFileMetadataProcedure.NAME, ShowHoodieLogFileMetadataProcedure.builder)
,(ShowHoodieLogFileRecordsProcedure.NAME, ShowHoodieLogFileRecordsProcedure.builder)
,(StatsWriteAmplificationProcedure.NAME, StatsWriteAmplificationProcedure.builder)
,(StatsFileSizeProcedure.NAME, StatsFileSizeProcedure.builder)
,(HdfsParquetImportProcedure.NAME, HdfsParquetImportProcedure.builder)
,(RunBootstrapProcedure.NAME, RunBootstrapProcedure.builder)
,(ShowBootstrapMappingProcedure.NAME, ShowBootstrapMappingProcedure.builder)
,(ShowBootstrapPartitionsProcedure.NAME, ShowBootstrapPartitionsProcedure.builder)
,(UpgradeTableProcedure.NAME, UpgradeTableProcedure.builder)
,(DowngradeTableProcedure.NAME, DowngradeTableProcedure.builder)
,(ShowMetadataTableFilesProcedure.NAME, ShowMetadataTableFilesProcedure.builder)
,(ShowMetadataTablePartitionsProcedure.NAME, ShowMetadataTablePartitionsProcedure.builder)
,(CreateMetadataTableProcedure.NAME, CreateMetadataTableProcedure.builder)
,(DeleteMetadataTableProcedure.NAME, DeleteMetadataTableProcedure.builder)
,(InitMetadataTableProcedure.NAME, InitMetadataTableProcedure.builder)
,(ShowMetadataTableStatsProcedure.NAME, ShowMetadataTableStatsProcedure.builder)
,(ValidateMetadataTableFilesProcedure.NAME, ValidateMetadataTableFilesProcedure.builder)
,(ShowFsPathDetailProcedure.NAME, ShowFsPathDetailProcedure.builder)
,(CopyToTableProcedure.NAME, CopyToTableProcedure.builder)
,(RepairAddpartitionmetaProcedure.NAME, RepairAddpartitionmetaProcedure.builder)
,(RepairCorruptedCleanFilesProcedure.NAME, RepairCorruptedCleanFilesProcedure.builder)
,(RepairDeduplicateProcedure.NAME, RepairDeduplicateProcedure.builder)
,(RepairMigratePartitionMetaProcedure.NAME, RepairMigratePartitionMetaProcedure.builder)
,(RepairOverwriteHoodiePropsProcedure.NAME, RepairOverwriteHoodiePropsProcedure.builder)
,(RunCleanProcedure.NAME, RunCleanProcedure.builder)
,(ValidateHoodieSyncProcedure.NAME, ValidateHoodieSyncProcedure.builder)
,(ShowInvalidParquetProcedure.NAME, ShowInvalidParquetProcedure.builder)
)
}
}
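The registry change above swaps Guava's `ImmutableMap.Builder` for an immutable Scala `Map`, whose `get` returns an `Option` instead of `null`, hence the `isDefined` check in `newBuilder`. A small sketch of the same lookup pattern with placeholder procedure names and suppliers (none of these names are from the PR):

```scala
import java.util.Locale
import java.util.function.Supplier

object ProcedureRegistrySketch {
  // An immutable Scala Map replaces Guava's ImmutableMap.Builder.
  private val builders: Map[String, Supplier[String]] = Map(
    "run_compaction" -> new Supplier[String] { override def get(): String = "RunCompactionBuilder" },
    "show_compaction" -> new Supplier[String] { override def get(): String = "ShowCompactionBuilder" }
  )

  def newBuilder(name: String): String = {
    // Scala's Map.get returns Option, so the old null check becomes isDefined;
    // note the two gets: Option.get unwraps the supplier, Supplier.get() invokes it.
    val supplier = builders.get(name.toLowerCase(Locale.ROOT))
    if (supplier.isDefined) supplier.get.get() else null
  }
}
```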
@@ -17,7 +17,6 @@

package org.apache.spark.sql.hudi.command.procedures

import com.google.common.collect.Lists
import org.apache.hudi.common.bootstrap.index.BootstrapIndex
import org.apache.hudi.common.model.{BootstrapFileMapping, HoodieFileGroupId}
import org.apache.hudi.common.table.HoodieTableMetaClient
@@ -17,7 +17,6 @@

package org.apache.spark.sql.hudi.command.procedures

import com.google.common.collect.Lists
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.{FileSlice, HoodieLogFile}
@@ -30,7 +29,8 @@ import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}

import java.util.function.{Function, Supplier}
import java.util.stream.Collectors
import scala.collection.JavaConverters.{asJavaIteratorConverter, asScalaIteratorConverter}
import scala.collection.JavaConversions
import scala.collection.JavaConverters.{asJavaIterableConverter, asJavaIteratorConverter, asScalaIteratorConverter}

class ShowFileSystemViewProcedure(showLatest: Boolean) extends BaseProcedure with ProcedureBuilder {
private val PARAMETERS_ALL: Array[ProcedureParameter] = Array[ProcedureParameter](
@@ -118,12 +118,14 @@ class ShowFileSystemViewProcedure(showLatest: Boolean) extends BaseProcedure with ProcedureBuilder {
metaClient.getActiveTimeline.getInstantDetails(instant)
}
}
val filteredTimeline = new HoodieDefaultTimeline(Lists.newArrayList(instants.asJava).stream(), details)

val filteredTimeline = new HoodieDefaultTimeline(
new java.util.ArrayList[HoodieInstant](JavaConversions.asJavaCollection(instants.toList)).stream(), details)
new HoodieTableFileSystemView(metaClient, filteredTimeline, statuses.toArray(new Array[FileStatus](0)))
}

private def showAllFileSlices(fsView: HoodieTableFileSystemView): java.util.List[Row] = {
val rows: java.util.List[Row] = Lists.newArrayList()
val rows: java.util.List[Row] = new java.util.ArrayList[Row]
fsView.getAllFileGroups.iterator().asScala.foreach(fg => {
fg.getAllFileSlices.iterator().asScala.foreach(fs => {
val fileId = fg.getFileGroupId.getFileId
@@ -161,7 +163,7 @@ class ShowFileSystemViewProcedure(showLatest: Boolean) extends BaseProcedure with ProcedureBuilder {
maxInstant
})
}
val rows: java.util.List[Row] = Lists.newArrayList()
val rows: java.util.List[Row] = new java.util.ArrayList[Row]
fileSliceStream.iterator().asScala.foreach {
fs => {
val fileId = fs.getFileId
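The collection changes in this file replace Guava's `Lists.newArrayList` with plain `java.util.ArrayList` and bridge Scala and Java collections explicitly. A small sketch of one equivalent way to build the Java stream (the PR itself goes through `scala.collection.JavaConversions.asJavaCollection`; this sketch uses the `JavaConverters` `asJava` decorator instead):

```scala
import java.util.stream.Stream
import scala.collection.JavaConverters._

object CollectionConversionSketch {
  // Materialize a Scala Seq into a java.util.ArrayList and expose its stream,
  // mirroring how the filtered timeline's instant stream is built without Guava.
  def toJavaStream(names: Seq[String]): Stream[String] =
    new java.util.ArrayList[String](names.asJava).stream()

  // Guava's Lists.newArrayList() becomes a direct ArrayList constructor call.
  def emptyRows(): java.util.List[String] = new java.util.ArrayList[String]()
}
```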
@@ -17,7 +17,6 @@

package org.apache.spark.sql.hudi.command.procedures

import com.google.common.collect.Lists
import org.apache.hadoop.fs.{ContentSummary, FileStatus, Path}
import org.apache.hudi.common.fs.FSUtils
import org.apache.spark.sql.Row
@@ -58,7 +57,7 @@ class ShowFsPathDetailProcedure extends BaseProcedure with ProcedureBuilder {
val path: Path = new Path(srcPath)
val fs = FSUtils.getFs(path, jsc.hadoopConfiguration())
val status: Array[FileStatus] = if (isSub) fs.listStatus(path) else fs.globStatus(path)
val rows: java.util.List[Row] = Lists.newArrayList()
val rows: java.util.List[Row] = new java.util.ArrayList[Row]()

if (status.nonEmpty) {
for (i <- status.indices) {