Skip to content

Commit

Permalink
Merge pull request #424 from atlanhq/DVX-169
Browse files Browse the repository at this point in the history
Changes delta calculation for improved speed
  • Loading branch information
cmgrote authored Jan 12, 2024
2 parents 864d053 + 34597e2 commit 5bb65ee
Show file tree
Hide file tree
Showing 8 changed files with 340 additions and 198 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,12 @@
import com.atlan.Atlan
import com.atlan.pkg.CustomPackage
import com.atlan.pkg.config.model.ui.UIConfig
import com.atlan.pkg.config.model.ui.UIRule
import com.atlan.pkg.config.model.ui.UIStep
import com.atlan.pkg.config.model.workflow.WorkflowOutputs
import com.atlan.pkg.config.widgets.BooleanInput
import com.atlan.pkg.config.widgets.DropDown
import com.atlan.pkg.config.widgets.FileUploader
import com.atlan.pkg.config.widgets.Radio
import com.atlan.pkg.config.widgets.TextInput
import com.atlan.pkg.rab.Importer

/**
Expand Down Expand Up @@ -71,54 +69,6 @@ object PackageConfig : CustomPackage(
),
),
),
UIStep(
title = "Delta",
description = "Removal details",
inputs = mapOf(
"delete_assets" to Radio(
label = "Delete untouched assets?",
required = true,
possibleValues = mapOf(
"NONE" to "No",
"SOFT" to "Archive (recoverable)",
"PURGE" to "Purge (permanent)",
),
default = "NONE",
help = "If this is a complete upload, use this to delete any assets not touched by the upload.",
),
"deletion_prefix" to TextInput(
label = "Prefix",
required = true,
help = "Prefix for qualifiedNames to consider for deletion, if deleting untouched assets.",
grid = 3,
),
"asset_types" to DropDown(
label = "Delete assets of type",
required = true,
possibleValues = mapOf(
"Database" to "Database",
"Schema" to "Schema",
"Table" to "Table",
"View" to "View",
"MaterialisedView" to "Materialized View",
"Column" to "Column",
),
multiSelect = true,
help = "Only remove assets of these types, if deleting untouched assets.",
grid = 5,
),
),
),
),
rules = listOf(
UIRule(
whenInputs = mapOf("delete_assets" to "SOFT"),
required = listOf("deletion_prefix", "asset_types"),
),
UIRule(
whenInputs = mapOf("delete_assets" to "PURGE"),
required = listOf("deletion_prefix", "asset_types"),
),
),
),
containerImage = "ghcr.io/atlanhq/csa-relational-assets-builder:${Atlan.VERSION}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,4 @@ data class RelationalAssetsBuilderCfg(
@JsonProperty("assets_attr_to_overwrite") val assetsAttrToOverwrite: List<String>? = null,
@JsonProperty("assets_upsert_semantic") val assetsUpsertSemantic: String? = null,
@JsonProperty("assets_fail_on_errors") val assetsFailOnErrors: Boolean? = null,
@JsonProperty("delete_assets") val deleteAssets: String? = null,
@JsonProperty("deletion_prefix") val deletionPrefix: String? = null,
@JsonDeserialize(using = WidgetSerde.MultiSelectDeserializer::class)
@JsonSerialize(using = WidgetSerde.MultiSelectSerializer::class)
@JsonProperty("asset_types") val assetTypes: List<String>? = null,
) : CustomConfig()
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,21 @@ abstract class AssetImporter(
companion object {
const val ENTITY_NAME = "entityName"

/**
* Build a connection identity from an asset's tenant-agnostic qualifiedName.
*
* @param agnosticQualifiedName the tenant-agnostic qualifiedName of an asset
* @return connection identity used for that asset
*/
fun getConnectionIdentityFromQN(agnosticQualifiedName: String): ConnectionIdentity? {
val tokens = agnosticQualifiedName.split("/")
return if (tokens.size > 1) {
ConnectionIdentity(tokens[0], tokens[1])
} else {
null
}
}

/**
* Calculate the qualifiedName components from a row of data, completely in-memory (no calls to Atlan).
*
Expand All @@ -79,7 +94,7 @@ abstract class AssetImporter(
Connection.TYPE_NAME -> {
val connection = row[header.indexOf(Asset.CONNECTION_NAME.atlanFieldName)]
val connector = row[header.indexOf(ConnectionImporter.CONNECTOR_TYPE)]
current = "$connection/$connector"
current = ConnectionIdentity(connection, connector).toString()
parent = null
}
Database.TYPE_NAME -> {
Expand Down Expand Up @@ -121,4 +136,10 @@ abstract class AssetImporter(
val parentUniqueQN: String,
val parentPartialQN: String,
)

data class ConnectionIdentity(val name: String, val type: String) {
override fun toString(): String {
return "$name/$type"
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,72 +6,204 @@ import com.atlan.Atlan
import com.atlan.model.assets.Asset
import com.atlan.model.enums.AtlanDeleteType
import com.atlan.model.search.FluentSearch
import com.atlan.pkg.serde.RowDeserialization.AssetIdentity
import de.siegmar.fastcsv.reader.CsvReader
import de.siegmar.fastcsv.reader.CsvRow
import mu.KLogger
import java.io.IOException
import java.nio.file.Paths
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentHashMap.KeySetView
import kotlin.math.min
import kotlin.math.round

/**
* Remove assets from Atlan if they exist in Atlan but were entirely absent from an import.
* Utility class to help pre-calculate a delta between full-load files.
* When we receive files that contain a full load (refresh) of an entire set of assets, it can
* be very time-consuming to figure out which assets to remove. This utility class tackles the
* problem by comparing the latest input file with a previously-loaded input file in order to
* calculate which assets appeared in the previously-loaded input file which no longer appear in
* the latest input file. This is done entirely independently of any calls to Atlan itself,
* thereby reducing time to calculate which assets should be removed.
*
* @param touchedGuids GUIDs of assets that were present in the import (any matches to this set will NOT be removed)
* @param removeTypes names of asset types that should be deleted, if untouched
* @param removalPrefix qualifiedName prefix that must match an untouched asset for it to be deleted
* @param purge if true, any untouched asset that matches will be permanently deleted (otherwise only archived)
* @param connectionsMap a mapping from tenant-agnostic connection identity to tenant-specific qualifiedName for the connection
* @param logger for tracking status and documenting any errors
* @param removeTypes names of asset types that should be considered for deleted (default: all)
* @param removalPrefix qualifiedName prefix that must match an asset for it to be deleted (default: all)
* @param purge if true, any asset that matches will be permanently deleted (otherwise, default: only archived)
*/
class AssetRemover(
private val touchedGuids: Set<String>,
private val removeTypes: List<String>,
private val removalPrefix: String,
private val purge: Boolean,
private val connectionsMap: Map<AssetImporter.ConnectionIdentity, String>,
private val logger: KLogger,
private val removeTypes: List<String> = listOf(),
private val removalPrefix: String = "",
private val purge: Boolean = false,
) {
private val untouched = ConcurrentHashMap.newKeySet<String>()
private val client = Atlan.getDefaultClient()
val assetsToDelete: KeySetView<AssetIdentity, Boolean> = ConcurrentHashMap.newKeySet()
private val guidsToDelete = ConcurrentHashMap.newKeySet<String>()

companion object {
private const val QUERY_BATCH = 50
private const val DELETION_BATCH = 20
}

/**
* Actually run the removal of any untouched assets.
* Calculate which qualifiedNames should be deleted, by determining which qualifiedNames
* appear in the previousFile that no longer appear in the currentFile.
*
* @param currentFile the latest file that should be loaded
* @param previousFile the previous file that was loaded for the same assets
* @throws IOException if there is no typeName column in the CSV file
*/
fun run() {
findUntouched()
deleteUntouched()
@Throws(IOException::class)
fun calculateDeletions(currentFile: String, previousFile: String) {
logger.info { " --- Calculating delta... ---" }
logger.info { " ... latest file: $currentFile" }
logger.info { " ... previous file: $previousFile" }
val currentIdentities = getAssetIdentities(currentFile)
val previousIdentities = getAssetIdentities(previousFile)
previousIdentities.forEach {
if (!currentIdentities.contains(it)) {
assetsToDelete.add(it)
}
}
}

/**
* Find all untouched assets within the parameters provided (prefix and asset types)
* Indicates whether any assets have been identified to delete.
*
* @return true if there is at least one asset to be deleted, otherwise false
*/
private fun findUntouched() {
logger.info { " --- Determining which assets to delete... ---" }
// Include archived in case we are rerunning with a PURGE semantic
client.assets.select(true)
.where(FluentSearch.assetTypes(removeTypes))
.where(Asset.QUALIFIED_NAME.startsWith(removalPrefix))
.stream(true)
.forEach { asset ->
val candidate = asset.guid
if (!touchedGuids.contains(candidate)) {
untouched.add(candidate)
}
fun hasAnythingToDelete(): Boolean {
return assetsToDelete.isNotEmpty()
}

/**
* Actually run the removal of any assets identified for deletion.
*/
fun deleteAssets() {
translateToGuids()
deleteAssetsByGuid()
}

/**
* Create a set of unique asset identities that appear in the provided file.
*
* @param filename to a CSV file having at least a typeName field
* @return set of unique asset identities present in the file
* @throws IOException if there is no typeName column in the CSV file
*/
@Throws(IOException::class)
private fun getAssetIdentities(filename: String): Set<AssetIdentity> {
val inputFile = Paths.get(filename)
val builder = CsvReader.builder()
.fieldSeparator(',')
.quoteCharacter('"')
.skipEmptyRows(true)
.errorOnDifferentFieldCount(true)
val header = builder.build(inputFile).use { tmp ->
tmp.stream().findFirst().map { obj: CsvRow -> obj.fields }
.orElse(emptyList())
}
val typeIdx = header.indexOf(Asset.TYPE_NAME.atlanFieldName)
if (typeIdx < 0) {
throw IOException(
"Unable to find the column 'typeName'. This is a mandatory column in the input CSV.",
)
}
val reader = builder.build(inputFile)
val set = mutableSetOf<AssetIdentity>()
reader.stream().skip(1).parallel().forEach { r: CsvRow ->
val values = r.fields
val typeName = values[typeIdx]!!
val qnDetails = AssetImporter.getQualifiedNameDetails(values, header, typeName)
val agnosticQN = qnDetails.uniqueQN
val connectionIdentity = AssetImporter.getConnectionIdentityFromQN(agnosticQN)
if (connectionIdentity != null && connectionsMap.containsKey(connectionIdentity)) {
val qualifiedName =
agnosticQN.replaceFirst(connectionIdentity.toString(), connectionsMap[connectionIdentity]!!)
set.add(AssetIdentity(typeName, qualifiedName))
} else {
logger.warn { "Unknown connection used in asset -- skipping: $agnosticQN" }
}
}
return set
}

/**
* Translate all assets to delete to their GUIDs
*/
private fun translateToGuids() {
logger.info { " --- Translating qualifiedNames to GUIDs... ---" }
// Skip archived assets, as they're already deleted (leave it to separate process to
// purge them, if desired)
val totalToTranslate = assetsToDelete.size
val qualifiedNamesToDelete = assetsToDelete.map { it.qualifiedName }.toList()
if (totalToTranslate < QUERY_BATCH) {
translate(qualifiedNamesToDelete)
} else {
for (i in 0..totalToTranslate step QUERY_BATCH) {
logger.info { " ... next batch of $QUERY_BATCH (${round((i.toDouble() / totalToTranslate) * 100)}%)" }
val sublist = qualifiedNamesToDelete.subList(i, min(i + QUERY_BATCH, totalToTranslate))
translate(sublist)
}
}
}

/**
* Translate a specific list of qualifiedNames to GUIDs.
*
* @param qualifiedNamesToDelete the list of qualifiedNames to query all at the same time
*/
private fun translate(qualifiedNamesToDelete: List<String>) {
val builder = client.assets.select()
.pageSize(QUERY_BATCH)
.where(Asset.QUALIFIED_NAME.`in`(qualifiedNamesToDelete))
if (removeTypes.isNotEmpty()) {
builder.where(FluentSearch.assetTypes(removeTypes))
}
if (removalPrefix.isNotBlank()) {
builder.where(Asset.QUALIFIED_NAME.startsWith(removalPrefix))
}
val response = builder
.toRequestBuilder()
.excludeMeanings(true)
.excludeAtlanTags(true)
.build()
.search()
response.forEach { validateResult(it) }
}

/**
* Ensure the asset provided is one that we can delete (checking BOTH
* typeName and qualifiedName match expectations), and if so, track its
* GUID for deletion.
*
* @param asset to validate can be deleted, and if so, to track its GUID for deletion
*/
private fun validateResult(asset: Asset) {
val candidate = AssetIdentity(asset.typeName, asset.qualifiedName)
if (assetsToDelete.contains(candidate)) {
guidsToDelete.add(asset.guid)
}
}

/**
* Delete all untouched assets we have identified, in batches of 20 at a time.
* Delete all assets we have identified for deletion, in batches of 20 at a time.
*/
private fun deleteUntouched() {
if (untouched.isNotEmpty()) {
private fun deleteAssetsByGuid() {
if (guidsToDelete.isNotEmpty()) {
val deletionType = if (purge) AtlanDeleteType.PURGE else AtlanDeleteType.SOFT
val guidList = untouched.toList()
val guidList = guidsToDelete.toList()
val totalToDelete = guidList.size
logger.info { " --- Deleting ($deletionType) $totalToDelete assets across $removeTypes... ---" }
val batchSize = 20
if (totalToDelete < batchSize) {
if (totalToDelete < DELETION_BATCH) {
client.assets.delete(guidList, deletionType)
} else {
for (i in 0..totalToDelete step batchSize) {
logger.info { " ... next batch of $batchSize (${round((i.toDouble() / totalToDelete) * 100)}%)" }
val sublist = guidList.subList(i, min(i + batchSize, totalToDelete))
for (i in 0..totalToDelete step DELETION_BATCH) {
logger.info { " ... next batch of $DELETION_BATCH (${round((i.toDouble() / totalToDelete) * 100)}%)" }
val sublist = guidList.subList(i, min(i + DELETION_BATCH, totalToDelete))
client.assets.delete(sublist, deletionType)
}
}
Expand Down
Loading

0 comments on commit 5bb65ee

Please sign in to comment.