Skip to content

Commit

Permalink
Merge pull request #418 from atlanhq/DVX-169
Browse files Browse the repository at this point in the history
Experiment to allow delta detection and asset removal
  • Loading branch information
cmgrote authored Jan 10, 2024
2 parents f0bcf34 + 7730f3e commit 7488711
Show file tree
Hide file tree
Showing 14 changed files with 385 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ interface AssetGenerator {
* This will be called with the results of any created assets after processing
* a pass.
*
* @param map from GUID to asset that was created
* @param list of assets that were created (minimal info for each)
*/
fun cacheCreated(map: Map<String, Asset>) {
fun cacheCreated(list: List<Asset>) {
// Do nothing, by default
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,19 @@ abstract class CSVImporter(
* Actually run the import.
*
* @param columnsToSkip (optional) columns in the CSV file to skip when loading (primarily useful for multi-pass loads)
* @return details about the results of the import
*/
open fun import(columnsToSkip: Set<String> = setOf()) {
open fun import(columnsToSkip: Set<String> = setOf()): ImportResults? {
CSVReader(filename, updateOnly).use { csv ->
val start = System.currentTimeMillis()
val anyFailures = csv.streamRows(this, batchSize, logger, columnsToSkip)
val results = csv.streamRows(this, batchSize, logger, columnsToSkip)
logger.info { "Total time taken: ${System.currentTimeMillis() - start} ms" }
if (anyFailures) {
if (results.anyFailures) {
logger.error { "Some errors detected, failing the workflow." }
exitProcess(1)
}
cacheCreated(csv.created)
cacheCreated(results.primary.created)
return results
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@ class CSVReader @JvmOverloads constructor(
private val typeIdx: Int
private val qualifiedNameIdx: Int

val created: ConcurrentHashMap<String, Asset>

init {
val inputFile = Paths.get(path)
val builder = CsvReader.builder()
Expand All @@ -62,7 +60,6 @@ class CSVReader @JvmOverloads constructor(
"Unable to find the column 'typeName'. This is a mandatory column in the input CSV.",
)
}
created = ConcurrentHashMap()
reader = builder.build(inputFile)
counter = builder.build(inputFile)
}
Expand All @@ -77,9 +74,9 @@ class CSVReader @JvmOverloads constructor(
* @param batchSize maximum number of Assets to bulk-save in Atlan per API request
* @param logger through which to report the overall progress
* @param skipColumns columns to skip during the processing (i.e. where they need to be processed in a later pass)
* @return true if all rows were processed successfully, or false if there were any failures
* @return details of the results of the import
*/
fun streamRows(rowToAsset: AssetGenerator, batchSize: Int, logger: KLogger, skipColumns: Set<String> = setOf()): Boolean {
fun streamRows(rowToAsset: AssetGenerator, batchSize: Int, logger: KLogger, skipColumns: Set<String> = setOf()): ImportResults {
val client = Atlan.getDefaultClient()
val primaryBatch = ParallelBatch(
client,
Expand Down Expand Up @@ -129,9 +126,6 @@ class CSVReader @JvmOverloads constructor(
}
}
primaryBatch.flush()
primaryBatch.created.forEach { asset ->
created[asset.guid] = asset
}
val totalCreates = primaryBatch.created.size
val totalUpdates = primaryBatch.updated.size
val totalSkipped = primaryBatch.skipped.size
Expand All @@ -146,20 +140,36 @@ class CSVReader @JvmOverloads constructor(

// Step 2: load the deferred related assets (and final-flush the main asset batches, too)
val totalRelated = AtomicLong(0)
val relatedCount = AtomicLong(0)
val searchAndDelete = mutableMapOf<String, Set<AtlanField>>()
relatedHolds.values.forEach { b -> totalRelated.getAndAdd(b.relatedMap.size.toLong()) }
logger.info { "Processing $totalRelated total related assets in a second pass." }
relatedHolds.entries.parallelStream().forEach { hold: MutableMap.MutableEntry<String, RelatedAssetHold> ->
val placeholderGuid = hold.key
val relatedAssetHold = hold.value
val resolvedGuid = primaryBatch.resolvedGuids[placeholderGuid]
val resolvedAsset = relatedAssetHold.fromAsset.toBuilder().guid(resolvedGuid).build() as Asset
AssetRefXformer.buildRelated(resolvedAsset, relatedAssetHold.relatedMap, relatedBatch, count, totalRelated, logger, batchSize)
if (!resolvedGuid.isNullOrBlank()) {
val resolvedAsset = relatedAssetHold.fromAsset.toBuilder().guid(resolvedGuid).build() as Asset
AssetRefXformer.buildRelated(
resolvedAsset,
relatedAssetHold.relatedMap,
relatedBatch,
relatedCount,
totalRelated,
logger,
batchSize,
)
} else {
logger.info { " ... skipped related asset as primary asset was skipped (above)." }
relatedCount.getAndIncrement()
}
}
deferDeletes.entries.parallelStream().forEach { delete: MutableMap.MutableEntry<String, Set<AtlanField>> ->
val placeholderGuid = delete.key
val resolvedGuid = primaryBatch.resolvedGuids[placeholderGuid]!!
searchAndDelete[resolvedGuid] = delete.value
val resolvedGuid = primaryBatch.resolvedGuids[placeholderGuid]
if (!resolvedGuid.isNullOrBlank()) {
searchAndDelete[resolvedGuid] = delete.value
}
}

// Step 3: final-flush the deferred related assets
Expand Down Expand Up @@ -208,7 +218,21 @@ class CSVReader @JvmOverloads constructor(
Utils.logProgress(totalScanned, totalToScan, logger, batchSize)
}
logger.info { "Total READMEs deleted: $totalDeleted" }
return someFailure
return ImportResults(
someFailure,
ImportResults.Details(
primaryBatch.resolvedGuids,
primaryBatch.created,
primaryBatch.updated,
primaryBatch.skipped,
),
ImportResults.Details(
relatedBatch.resolvedGuids,
relatedBatch.created,
relatedBatch.updated,
relatedBatch.skipped,
),
)
}

private fun logFailures(b: ParallelBatch, logger: KLogger, totalFailures: AtomicLong) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/* SPDX-License-Identifier: Apache-2.0
Copyright 2022 Atlan Pte. Ltd. */
package com.atlan.pkg.serde.csv

import com.atlan.model.assets.Asset

/**
* Class to capture details about the results of an import.
*
* @param anyFailures if there were 0 failures in the import this will be false, otherwise true
* @param primary details about the primary assets
* @param related details about the related assets (READMEs, links, etc)
*/
data class ImportResults(
val anyFailures: Boolean,
val primary: Details,
val related: Details,
) {
/**
* Combine this set of import results details with another.
*
* @param other the other import results to combine with this one
* @return the combined set of results, as a single set of results
*/
fun combinedWith(other: ImportResults): ImportResults {
return ImportResults(
this.anyFailures || other.anyFailures,
this.primary.combinedWith(other.primary),
this.related.combinedWith(other.related),
)
}

/**
* Details about the import results.
*
* @param guidAssignments mapping from placeholder to actual (resolved) GUIDs, even if no change was made to an asset
* @param created list of (minimal) assets that were created
* @param updated list of (minimal) assets that were updated
* @param skipped list of (minimal) assets that were skipped
*/
data class Details(
val guidAssignments: Map<String, String>,
val created: List<Asset>,
val updated: List<Asset>,
val skipped: List<Asset>,
) {
/**
* Combine this set of details with another.
*
* @param other the other details to combine with this one
* @return the combined set of details, as a single set of details
*/
fun combinedWith(other: Details): Details {
return Details(
this.guidAssignments.plus(other.guidAssignments),
this.created.plus(other.created),
this.updated.plus(other.updated),
this.skipped.plus(other.skipped),
)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import com.atlan.pkg.cache.CategoryCache
import com.atlan.pkg.serde.RowDeserializer
import com.atlan.pkg.serde.cell.GlossaryCategoryXformer.CATEGORY_DELIMITER
import com.atlan.pkg.serde.cell.GlossaryXformer.GLOSSARY_DELIMITER
import com.atlan.pkg.serde.csv.ImportResults
import mu.KotlinLogging
import java.util.concurrent.atomic.AtomicInteger
import kotlin.math.max
Expand Down Expand Up @@ -46,15 +47,22 @@ class CategoryImporter(
private val maxCategoryDepth = AtomicInteger(1)

/** {@inheritDoc} */
override fun import(columnsToSkip: Set<String>) {
override fun import(columnsToSkip: Set<String>): ImportResults? {
cache.preload()
// Import categories by level, top-to-bottom, and stop when we hit a level with no categories
logger.info { "Loading categories in multiple passes, by level..." }
var combinedResults: ImportResults? = null
while (levelToProcess < maxCategoryDepth.get()) {
levelToProcess += 1
logger.info { "--- Loading level $levelToProcess categories... ---" }
super.import(columnsToSkip)
val results = super.import(columnsToSkip)
if (combinedResults == null) {
combinedResults = results
} else if (results != null) {
combinedResults = combinedResults.combinedWith(results)
}
}
return combinedResults
}

/** {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,20 +42,16 @@ abstract class GTCImporter(
updateOnly = updateOnly,
batchSize = batchSize,
) {
/**
* Cache any created assets.
*
* @param map from GUID to asset that was created
*/
override fun cacheCreated(map: Map<String, Asset>) {
/** {@inheritDoc} */
override fun cacheCreated(list: List<Asset>) {
// Cache any assets that were created by processing
map.keys.forEach { k ->
list.forEach { asset ->
// We must look up the asset and then cache to ensure we have the necessary identity
// characteristics and status
val result = cache.lookupAssetByGuid(k, maxRetries = 5)
val result = cache.lookupAssetByGuid(asset.guid, maxRetries = 5)
result?.let {
cache.addByGuid(k, result)
} ?: throw IllegalStateException("Result of searching by GUID for $k was null.")
cache.addByGuid(asset.guid, result)
} ?: throw IllegalStateException("Result of searching by GUID for ${asset.guid} was null.")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import com.atlan.pkg.cache.TermCache
import com.atlan.pkg.serde.RowDeserializer
import com.atlan.pkg.serde.cell.GlossaryTermXformer
import com.atlan.pkg.serde.cell.GlossaryXformer
import com.atlan.pkg.serde.csv.ImportResults
import mu.KotlinLogging

/**
Expand Down Expand Up @@ -45,15 +46,24 @@ class TermImporter(
)

/** {@inheritDoc} */
override fun import(columnsToSkip: Set<String>) {
override fun import(columnsToSkip: Set<String>): ImportResults? {
cache.preload()
// Import categories by level, top-to-bottom, and stop when we hit a level with no categories
logger.info { "--- Loading terms in first pass, without term-to-term relationships... ---" }
super.import(GlossaryTermXformer.TERM_TO_TERM_FIELDS)
// In this second pass we need to ignore fields that were loaded in the first pass,
// or we will end up with duplicates (links) or extra audit log messages (tags, README)
logger.info { "--- Loading term-to-term relationships (second pass)... ---" }
super.import(secondPassIgnore)
val firstPassResults = super.import(GlossaryTermXformer.TERM_TO_TERM_FIELDS)
return if (firstPassResults != null) {
// In this second pass we need to ignore fields that were loaded in the first pass,
// or we will end up with duplicates (links) or extra audit log messages (tags, README)
logger.info { "--- Loading term-to-term relationships (second pass)... ---" }
val secondPassResults = super.import(secondPassIgnore)
return if (secondPassResults != null) {
firstPassResults.combinedWith(secondPassResults)
} else {
firstPassResults
}
} else {
null
}
}

/** {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@
import com.atlan.Atlan
import com.atlan.pkg.CustomPackage
import com.atlan.pkg.config.model.ui.UIConfig
import com.atlan.pkg.config.model.ui.UIRule
import com.atlan.pkg.config.model.ui.UIStep
import com.atlan.pkg.config.model.workflow.WorkflowOutputs
import com.atlan.pkg.config.widgets.BooleanInput
import com.atlan.pkg.config.widgets.DropDown
import com.atlan.pkg.config.widgets.FileUploader
import com.atlan.pkg.config.widgets.Radio
import com.atlan.pkg.config.widgets.TextInput
import com.atlan.pkg.rab.Importer

/**
Expand Down Expand Up @@ -69,6 +71,54 @@ object PackageConfig : CustomPackage(
),
),
),
UIStep(
title = "Delta",
description = "Removal details",
inputs = mapOf(
"delete_assets" to Radio(
label = "Delete untouched assets?",
required = true,
possibleValues = mapOf(
"NONE" to "No",
"SOFT" to "Archive (recoverable)",
"PURGE" to "Purge (permanent)",
),
default = "NONE",
help = "If this is a complete upload, use this to delete any assets not touched by the upload.",
),
"deletion_prefix" to TextInput(
label = "Prefix",
required = true,
help = "Prefix for qualifiedNames to consider for deletion, if deleting untouched assets.",
grid = 3,
),
"asset_types" to DropDown(
label = "Delete assets of type",
required = true,
possibleValues = mapOf(
"Database" to "Database",
"Schema" to "Schema",
"Table" to "Table",
"View" to "View",
"MaterialisedView" to "Materialized View",
"Column" to "Column",
),
multiSelect = true,
help = "Only remove assets of these types, if deleting untouched assets.",
grid = 5,
),
),
),
),
rules = listOf(
UIRule(
whenInputs = mapOf("delete_assets" to "SOFT"),
required = listOf("deletion_prefix", "asset_types"),
),
UIRule(
whenInputs = mapOf("delete_assets" to "PURGE"),
required = listOf("deletion_prefix", "asset_types"),
),
),
),
containerImage = "ghcr.io/atlanhq/csa-relational-assets-builder:${Atlan.VERSION}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,9 @@ data class RelationalAssetsBuilderCfg(
@JsonProperty("assets_attr_to_overwrite") val assetsAttrToOverwrite: List<String>?,
@JsonProperty("assets_upsert_semantic") val assetsUpsertSemantic: String?,
@JsonProperty("assets_fail_on_errors") val assetsFailOnErrors: Boolean?,
@JsonProperty("delete_assets") val deleteAssets: String?,
@JsonProperty("deletion_prefix") val deletionPrefix: String?,
@JsonDeserialize(using = WidgetSerde.MultiSelectDeserializer::class)
@JsonSerialize(using = WidgetSerde.MultiSelectSerializer::class)
@JsonProperty("asset_types") val assetTypes: List<String>?,
) : CustomConfig()
Loading

0 comments on commit 7488711

Please sign in to comment.