Skip to content

Commit

Permalink
Merge pull request #428 from atlanhq/DVX-119
Browse files Browse the repository at this point in the history
Adds option to skip tracking of created and updated assets when batch processing
  • Loading branch information
cmgrote authored Jan 14, 2024
2 parents 9cc0fbe + 8f7d324 commit 4bc20aa
Show file tree
Hide file tree
Showing 17 changed files with 190 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import kotlin.system.exitProcess
* @param attrsToOverwrite list of fields that should be overwritten in Atlan, if their value is empty in the CSV
* @param updateOnly if true, only update an asset (first check it exists), if false allow upserts (create if it does not exist)
* @param batchSize maximum number of records to save per API request
 * @param trackBatches if true, minimal details about every asset created or updated are tracked (if false, only counts of each are tracked)
*/
abstract class CSVImporter(
private val filename: String,
Expand All @@ -35,6 +36,7 @@ abstract class CSVImporter(
private val attrsToOverwrite: List<AtlanField> = listOf(),
private val updateOnly: Boolean = false,
private val batchSize: Int = 20,
private val trackBatches: Boolean = true,
) : AssetGenerator {

/**
Expand All @@ -44,15 +46,15 @@ abstract class CSVImporter(
* @return details about the results of the import
*/
open fun import(columnsToSkip: Set<String> = setOf()): ImportResults? {
CSVReader(filename, updateOnly).use { csv ->
CSVReader(filename, updateOnly, trackBatches).use { csv ->
val start = System.currentTimeMillis()
val results = csv.streamRows(this, batchSize, logger, columnsToSkip)
logger.info { "Total time taken: ${System.currentTimeMillis() - start} ms" }
if (results.anyFailures) {
logger.error { "Some errors detected, failing the workflow." }
exitProcess(1)
}
cacheCreated(results.primary.created)
cacheCreated(results.primary.created ?: listOf())
return results
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@ import java.util.concurrent.atomic.AtomicLong
* @param path location and filename of the CSV file to read
* @param updateOnly when true, the reader will first look up assets to ensure they exist (and only update them, never create)
* @param fieldSeparator character to use to separate fields (for example ',' or ';')
 * @param trackBatches if true, minimal details about every asset created or updated are tracked (if false, only counts of each are tracked)
*/
class CSVReader @JvmOverloads constructor(
path: String,
private val updateOnly: Boolean,
private val trackBatches: Boolean = true,
fieldSeparator: Char = ',',
) : Closeable {

Expand Down Expand Up @@ -85,13 +87,16 @@ class CSVReader @JvmOverloads constructor(
AssetBatch.CustomMetadataHandling.MERGE,
true,
updateOnly,
trackBatches,
)
val relatedBatch = ParallelBatch(
client,
batchSize,
true,
AssetBatch.CustomMetadataHandling.MERGE,
true,
false,
trackBatches,
)
val relatedHolds: MutableMap<String, RelatedAssetHold> = ConcurrentHashMap()
val deferDeletes: MutableMap<String, Set<AtlanField>> = ConcurrentHashMap()
Expand Down Expand Up @@ -126,8 +131,8 @@ class CSVReader @JvmOverloads constructor(
}
}
primaryBatch.flush()
val totalCreates = primaryBatch.created.size
val totalUpdates = primaryBatch.updated.size
val totalCreates = primaryBatch.numCreated
val totalUpdates = primaryBatch.numUpdated
val totalSkipped = primaryBatch.skipped.size
val totalFailures = AtomicLong(0)
someFailure = someFailure || primaryBatch.failures.isNotEmpty()
Expand Down Expand Up @@ -174,8 +179,8 @@ class CSVReader @JvmOverloads constructor(

// Step 3: final-flush the deferred related assets
relatedBatch.flush()
val totalCreatesR = relatedBatch.created.size
val totalUpdatesR = relatedBatch.updated.size
val totalCreatesR = relatedBatch.numCreated
val totalUpdatesR = relatedBatch.numUpdated
val totalFailuresR = AtomicLong(0)
someFailure = someFailure || relatedBatch.failures.isNotEmpty()
logFailures(relatedBatch, logger, totalFailuresR)
Expand Down Expand Up @@ -225,12 +230,16 @@ class CSVReader @JvmOverloads constructor(
primaryBatch.created,
primaryBatch.updated,
primaryBatch.skipped,
primaryBatch.numCreated,
primaryBatch.numUpdated,
),
ImportResults.Details(
relatedBatch.resolvedGuids,
relatedBatch.created,
relatedBatch.updated,
relatedBatch.skipped,
relatedBatch.numCreated,
relatedBatch.numUpdated,
),
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,19 @@ data class ImportResults(
* Details about the import results.
*
* @param guidAssignments mapping from placeholder to actual (resolved) GUIDs, even if no change was made to an asset
* @param created list of (minimal) assets that were created
* @param updated list of (minimal) assets that were updated
* @param created list of (minimal) assets that were created (note: when tracking is turned off in batch-processing, this will be null)
* @param updated list of (minimal) assets that were updated (note: when tracking is turned off in batch-processing, this will be null)
* @param skipped list of (minimal) assets that were skipped
* @param numCreated number of assets that were created (count only)
* @param numUpdated number of assets that were updated (count only)
*/
data class Details(
val guidAssignments: Map<String, String>,
val created: List<Asset>,
val updated: List<Asset>,
val created: List<Asset>?,
val updated: List<Asset>?,
val skipped: List<Asset>,
val numCreated: Long,
val numUpdated: Long,
) {
/**
* Combine this set of details with another.
Expand All @@ -53,9 +57,11 @@ data class ImportResults(
fun combinedWith(other: Details): Details {
return Details(
this.guidAssignments.plus(other.guidAssignments),
this.created.plus(other.created),
this.updated.plus(other.updated),
this.created?.plus(other.created ?: listOf()),
this.updated?.plus(other.updated ?: listOf()),
this.skipped.plus(other.skipped),
this.numCreated.plus(other.numCreated),
this.numUpdated.plus(other.numUpdated),
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@ object PackageConfig : CustomPackage(
required = false,
help = "Whether an invalid value in a field should cause the import to fail (Yes) or log a warning, skip that value, and proceed (No).",
),
"track_batches" to BooleanInput(
label = "Track asset details",
required = false,
hidden = true,
help = "Whether to track details about every asset across batches (Yes) or only counts (No).",
),
),
),
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@ data class RelationalAssetsBuilderCfg(
@JsonProperty("assets_attr_to_overwrite") val assetsAttrToOverwrite: List<String>? = null,
@JsonProperty("assets_upsert_semantic") val assetsUpsertSemantic: String? = null,
@JsonProperty("assets_fail_on_errors") val assetsFailOnErrors: Boolean? = null,
@JsonProperty("track_batches") val trackBatches: Boolean? = null,
) : CustomConfig()
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import mu.KLogger
* @param attrsToOverwrite list of fields that should be overwritten in Atlan, if their value is empty in the CSV
* @param updateOnly if true, only update an asset (first check it exists), if false allow upserts (create if it does not exist)
* @param batchSize maximum number of records to save per API request
 * @param trackBatches if true, minimal details about every asset created or updated are tracked (if false, only counts of each are tracked)
*/
abstract class AssetImporter(
private val filename: String,
Expand All @@ -36,12 +37,14 @@ abstract class AssetImporter(
private val batchSize: Int,
typeNameFilter: String,
logger: KLogger,
trackBatches: Boolean,
) : CSVImporter(
filename,
logger,
typeNameFilter,
attrsToOverwrite,
batchSize = batchSize,
trackBatches = trackBatches,
) {

/** {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,23 @@ import mu.KotlinLogging
* @param updateOnly if true, only update an asset (first check it exists), if false allow upserts (create if it does not exist)
* @param batchSize maximum number of records to save per API request
* @param connectionImporter that was used to import connections
 * @param trackBatches if true, minimal details about every asset created or updated are tracked (if false, only counts of each are tracked)
*/
class ColumnImporter(
private val preprocessed: Importer.PreprocessedCsv,
private val attrsToOverwrite: List<AtlanField>,
private val updateOnly: Boolean,
private val batchSize: Int,
private val connectionImporter: ConnectionImporter,
trackBatches: Boolean,
) : AssetImporter(
preprocessed.preprocessedFile,
attrsToOverwrite,
updateOnly,
batchSize,
Column.TYPE_NAME,
KotlinLogging.logger {},
trackBatches,
) {
companion object {
const val COLUMN_PARENT_QN = "columnParentQualifiedName"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,22 @@ import mu.KotlinLogging
* @param attrsToOverwrite list of fields that should be overwritten in Atlan, if their value is empty in the CSV
* @param updateOnly if true, only update an asset (first check it exists), if false allow upserts (create if it does not exist)
* @param batchSize maximum number of records to save per API request
 * @param trackBatches if true, minimal details about every asset created or updated are tracked (if false, only counts of each are tracked)
*/
class ConnectionImporter(
private val preprocessed: Importer.PreprocessedCsv,
private val attrsToOverwrite: List<AtlanField>,
private val updateOnly: Boolean,
private val batchSize: Int,
trackBatches: Boolean,
) : AssetImporter(
preprocessed.preprocessedFile,
attrsToOverwrite,
updateOnly,
batchSize,
Connection.TYPE_NAME,
KotlinLogging.logger {},
trackBatches,
) {
companion object {
const val CONNECTOR_TYPE = "connectorType"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,23 @@ import mu.KotlinLogging
* @param updateOnly if true, only update an asset (first check it exists), if false allow upserts (create if it does not exist)
* @param batchSize maximum number of records to save per API request
* @param connectionImporter that was used to import connections
 * @param trackBatches if true, minimal details about every asset created or updated are tracked (if false, only counts of each are tracked)
*/
class DatabaseImporter(
private val preprocessed: Importer.PreprocessedCsv,
private val attrsToOverwrite: List<AtlanField>,
private val updateOnly: Boolean,
private val batchSize: Int,
private val connectionImporter: ConnectionImporter,
trackBatches: Boolean,
) : AssetImporter(
preprocessed.preprocessedFile,
attrsToOverwrite,
updateOnly,
batchSize,
Database.TYPE_NAME,
KotlinLogging.logger {},
trackBatches,
) {
/** {@inheritDoc} */
override fun getBuilder(deserializer: RowDeserializer): Asset.AssetBuilder<*, *> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ object Importer {
CSVImporter.attributesToClear(Utils.getOrDefault(config.assetsAttrToOverwrite, listOf()).toMutableList(), "assets", logger)
val assetsFailOnErrors = Utils.getOrDefault(config.assetsFailOnErrors, true)
val assetsUpdateOnly = Utils.getOrDefault(config.assetsUpsertSemantic, "update") == "update"
val trackBatches = Utils.getOrDefault(config.trackBatches, true)

if (assetsFilename.isBlank()) {
logger.error { "No input file was provided for assets." }
Expand All @@ -71,31 +72,34 @@ object Importer {
logger.info { "=== Importing assets... ===" }

logger.info { " --- Importing connections... ---" }
val connectionImporter = ConnectionImporter(preprocessedDetails, assetAttrsToOverwrite, assetsUpdateOnly, 1)
        // Note: we force-track the batches here to ensure any created connections are cached.
        // (Without tracking, newly created connections would NOT be cached, which would then
        // break the subsequent processing steps that rely on the connection cache.)
val connectionImporter = ConnectionImporter(preprocessedDetails, assetAttrsToOverwrite, assetsUpdateOnly, 1, true)
connectionImporter.import()

logger.info { " --- Importing databases... ---" }
val databaseImporter = DatabaseImporter(preprocessedDetails, assetAttrsToOverwrite, assetsUpdateOnly, batchSize, connectionImporter)
val databaseImporter = DatabaseImporter(preprocessedDetails, assetAttrsToOverwrite, assetsUpdateOnly, batchSize, connectionImporter, trackBatches)
databaseImporter.import()

logger.info { " --- Importing schemas... ---" }
val schemaImporter = SchemaImporter(preprocessedDetails, assetAttrsToOverwrite, assetsUpdateOnly, batchSize, connectionImporter)
val schemaImporter = SchemaImporter(preprocessedDetails, assetAttrsToOverwrite, assetsUpdateOnly, batchSize, connectionImporter, trackBatches)
schemaImporter.import()

logger.info { " --- Importing tables... ---" }
val tableImporter = TableImporter(preprocessedDetails, assetAttrsToOverwrite, assetsUpdateOnly, batchSize, connectionImporter)
val tableImporter = TableImporter(preprocessedDetails, assetAttrsToOverwrite, assetsUpdateOnly, batchSize, connectionImporter, trackBatches)
tableImporter.import()

logger.info { " --- Importing views... ---" }
val viewImporter = ViewImporter(preprocessedDetails, assetAttrsToOverwrite, assetsUpdateOnly, batchSize, connectionImporter)
val viewImporter = ViewImporter(preprocessedDetails, assetAttrsToOverwrite, assetsUpdateOnly, batchSize, connectionImporter, trackBatches)
viewImporter.import()

logger.info { " --- Importing materialized views... ---" }
val materializedViewImporter = MaterializedViewImporter(preprocessedDetails, assetAttrsToOverwrite, assetsUpdateOnly, batchSize, connectionImporter)
val materializedViewImporter = MaterializedViewImporter(preprocessedDetails, assetAttrsToOverwrite, assetsUpdateOnly, batchSize, connectionImporter, trackBatches)
materializedViewImporter.import()

logger.info { " --- Importing columns... ---" }
val columnImporter = ColumnImporter(preprocessedDetails, assetAttrsToOverwrite, assetsUpdateOnly, batchSize, connectionImporter)
val columnImporter = ColumnImporter(preprocessedDetails, assetAttrsToOverwrite, assetsUpdateOnly, batchSize, connectionImporter, trackBatches)
columnImporter.import()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,23 @@ import mu.KotlinLogging
* @param updateOnly if true, only update an asset (first check it exists), if false allow upserts (create if it does not exist)
* @param batchSize maximum number of records to save per API request
* @param connectionImporter that was used to import connections
 * @param trackBatches if true, minimal details about every asset created or updated are tracked (if false, only counts of each are tracked)
*/
class MaterializedViewImporter(
private val preprocessed: Importer.PreprocessedCsv,
private val attrsToOverwrite: List<AtlanField>,
private val updateOnly: Boolean,
private val batchSize: Int,
private val connectionImporter: ConnectionImporter,
trackBatches: Boolean,
) : AssetImporter(
preprocessed.preprocessedFile,
attrsToOverwrite,
updateOnly,
batchSize,
MaterializedView.TYPE_NAME,
KotlinLogging.logger {},
trackBatches,
) {
/** {@inheritDoc} */
override fun getBuilder(deserializer: RowDeserializer): Asset.AssetBuilder<*, *> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,23 @@ import mu.KotlinLogging
* @param updateOnly if true, only update an asset (first check it exists), if false allow upserts (create if it does not exist)
* @param batchSize maximum number of records to save per API request
* @param connectionImporter that was used to import connections
 * @param trackBatches if true, minimal details about every asset created or updated are tracked (if false, only counts of each are tracked)
*/
class SchemaImporter(
private val preprocessed: Importer.PreprocessedCsv,
private val attrsToOverwrite: List<AtlanField>,
private val updateOnly: Boolean,
private val batchSize: Int,
private val connectionImporter: ConnectionImporter,
trackBatches: Boolean,
) : AssetImporter(
preprocessed.preprocessedFile,
attrsToOverwrite,
updateOnly,
batchSize,
Schema.TYPE_NAME,
KotlinLogging.logger {},
trackBatches,
) {
/** {@inheritDoc} */
override fun getBuilder(deserializer: RowDeserializer): Asset.AssetBuilder<*, *> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,23 @@ import mu.KotlinLogging
* @param updateOnly if true, only update an asset (first check it exists), if false allow upserts (create if it does not exist)
* @param batchSize maximum number of records to save per API request
* @param connectionImporter that was used to import connections
 * @param trackBatches if true, minimal details about every asset created or updated are tracked (if false, only counts of each are tracked)
*/
class TableImporter(
private val preprocessed: Importer.PreprocessedCsv,
private val attrsToOverwrite: List<AtlanField>,
private val updateOnly: Boolean,
private val batchSize: Int,
private val connectionImporter: ConnectionImporter,
trackBatches: Boolean,
) : AssetImporter(
preprocessed.preprocessedFile,
attrsToOverwrite,
updateOnly,
batchSize,
Table.TYPE_NAME,
KotlinLogging.logger {},
trackBatches,
) {
/** {@inheritDoc} */
override fun getBuilder(deserializer: RowDeserializer): Asset.AssetBuilder<*, *> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,23 @@ import mu.KotlinLogging
* @param updateOnly if true, only update an asset (first check it exists), if false allow upserts (create if it does not exist)
* @param batchSize maximum number of records to save per API request
* @param connectionImporter that was used to import connections
 * @param trackBatches if true, minimal details about every asset created or updated are tracked (if false, only counts of each are tracked)
*/
class ViewImporter(
private val preprocessed: Importer.PreprocessedCsv,
private val attrsToOverwrite: List<AtlanField>,
private val updateOnly: Boolean,
private val batchSize: Int,
private val connectionImporter: ConnectionImporter,
trackBatches: Boolean,
) : AssetImporter(
preprocessed.preprocessedFile,
attrsToOverwrite,
updateOnly,
batchSize,
View.TYPE_NAME,
KotlinLogging.logger {},
trackBatches,
) {
/** {@inheritDoc} */
override fun getBuilder(deserializer: RowDeserializer): Asset.AssetBuilder<*, *> {
Expand Down
Loading

0 comments on commit 4bc20aa

Please sign in to comment.