From 72223757a1624b7106d641e8669b3db23fc38b48 Mon Sep 17 00:00:00 2001 From: Christopher Grote Date: Tue, 16 Jan 2024 15:53:12 +0000 Subject: [PATCH] Adds option to batch load case-insensitively Signed-off-by: Christopher Grote --- .../com/atlan/pkg/serde/csv/CSVImporter.kt | 2 + .../com/atlan/pkg/serde/csv/CSVReader.kt | 4 ++ .../src/main/kotlin/AssetImportCfg.kt | 1 + .../src/main/kotlin/PackageConfig.kt | 5 +++ .../kotlin/com/atlan/pkg/aim/AssetImporter.kt | 3 ++ .../main/kotlin/com/atlan/pkg/aim/Importer.kt | 3 +- .../main/java/com/atlan/util/AssetBatch.java | 45 +++++++++++++++++-- .../java/com/atlan/util/ParallelBatch.java | 29 +++++++++++- 8 files changed, 87 insertions(+), 5 deletions(-) diff --git a/package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/serde/csv/CSVImporter.kt b/package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/serde/csv/CSVImporter.kt index 5abd6b18fa..f885217a07 100644 --- a/package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/serde/csv/CSVImporter.kt +++ b/package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/serde/csv/CSVImporter.kt @@ -28,6 +28,7 @@ import kotlin.system.exitProcess * @param updateOnly if true, only update an asset (first check it exists), if false allow upserts (create if it does not exist) * @param batchSize maximum number of records to save per API request * @param trackBatches if true, minimal details about every asset created or updated is tracked (if false, only counts of each are tracked) + * @param caseSensitive (only applies when updateOnly is true) attempt to match assets case-sensitively (true) or case-insensitively (false) */ abstract class CSVImporter( private val filename: String, @@ -37,6 +38,7 @@ abstract class CSVImporter( private val updateOnly: Boolean = false, private val batchSize: Int = 20, private val trackBatches: Boolean = true, + private val caseSensitive: Boolean = true, ) : AssetGenerator { /** diff --git a/package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/serde/csv/CSVReader.kt b/package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/serde/csv/CSVReader.kt index 3c91a0d597..bef5a045d4 100644 --- a/package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/serde/csv/CSVReader.kt +++ b/package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/serde/csv/CSVReader.kt @@ -28,11 +28,13 @@ import java.util.concurrent.atomic.AtomicLong * @param updateOnly when true, the reader will first look up assets to ensure they exist (and only update them, never create) * @param fieldSeparator character to use to separate fields (for example ',' or ';') * @param trackBatches if true, minimal details about every asset created or updated is tracked (if false, only counts of each are tracked) + * @param caseSensitive (only applies when updateOnly is true) attempt to match assets case-sensitively (true) or case-insensitively (false) */ class CSVReader @JvmOverloads constructor( path: String, private val updateOnly: Boolean, private val trackBatches: Boolean = true, + private val caseSensitive: Boolean = true, fieldSeparator: Char = ',', ) : Closeable { @@ -88,6 +90,7 @@ class CSVReader @JvmOverloads constructor( true, updateOnly, trackBatches, + caseSensitive, ) val relatedBatch = ParallelBatch( client, @@ -97,6 +100,7 @@ class CSVReader @JvmOverloads constructor( true, false, trackBatches, + caseSensitive, ) val relatedHolds: MutableMap = ConcurrentHashMap() val deferDeletes: MutableMap> = ConcurrentHashMap() diff --git a/samples/packages/asset-import/src/main/kotlin/AssetImportCfg.kt b/samples/packages/asset-import/src/main/kotlin/AssetImportCfg.kt index f9a2b43728..137cd2967f 100644 --- a/samples/packages/asset-import/src/main/kotlin/AssetImportCfg.kt +++ b/samples/packages/asset-import/src/main/kotlin/AssetImportCfg.kt @@ -20,6 +20,7 @@ data class AssetImportCfg( @JsonProperty("assets_attr_to_overwrite") val assetsAttrToOverwrite: List? = null, @JsonProperty("assets_upsert_semantic") val assetsUpsertSemantic: String? = null, @JsonProperty("assets_fail_on_errors") val assetsFailOnErrors: Boolean? = null, + @JsonProperty("assets_case_sensitive") val assetsCaseSensitive: Boolean? = null, @JsonProperty("glossaries_file") val glossariesFile: String? = null, @JsonDeserialize(using = WidgetSerde.MultiSelectDeserializer::class) @JsonSerialize(using = WidgetSerde.MultiSelectSerializer::class) diff --git a/samples/packages/asset-import/src/main/kotlin/PackageConfig.kt b/samples/packages/asset-import/src/main/kotlin/PackageConfig.kt index bc9b446b3f..a65d7ead74 100644 --- a/samples/packages/asset-import/src/main/kotlin/PackageConfig.kt +++ b/samples/packages/asset-import/src/main/kotlin/PackageConfig.kt @@ -67,6 +67,11 @@ object PackageConfig : CustomPackage( required = false, help = "Whether an invalid value in a field should cause the import to fail (Yes) or log a warning, skip that value, and proceed (No).", ), + "assets_case_sensitive" to BooleanInput( + label = "Case-sensitive match for updates", + required = false, + help = "Whether to use case-sensitive matching when running in update-only mode (Yes) or try case-insensitive matching (No).", + ), ), ), UIStep( diff --git a/samples/packages/asset-import/src/main/kotlin/com/atlan/pkg/aim/AssetImporter.kt b/samples/packages/asset-import/src/main/kotlin/com/atlan/pkg/aim/AssetImporter.kt index 7d25a9969a..3791a51e52 100644 --- a/samples/packages/asset-import/src/main/kotlin/com/atlan/pkg/aim/AssetImporter.kt +++ b/samples/packages/asset-import/src/main/kotlin/com/atlan/pkg/aim/AssetImporter.kt @@ -21,18 +21,21 @@ import mu.KotlinLogging * @param attrsToOverwrite list of fields that should be overwritten in Atlan, if their value is empty in the CSV * @param updateOnly if true, only update an asset (first check it exists), if false allow upserts (create if it does not exist) * @param batchSize maximum number of records to save per API request + * @param caseSensitive (only applies when updateOnly is true) attempt to match assets case-sensitively (true) or case-insensitively (false) */ class AssetImporter( private val filename: String, private val attrsToOverwrite: List, private val updateOnly: Boolean, private val batchSize: Int, + private val caseSensitive: Boolean = true, ) : CSVImporter( filename, logger = KotlinLogging.logger {}, attrsToOverwrite = attrsToOverwrite, updateOnly = updateOnly, batchSize = batchSize, + caseSensitive = caseSensitive, ) { /** {@inheritDoc} */ override fun getBuilder(deserializer: RowDeserializer): Asset.AssetBuilder<*, *> { diff --git a/samples/packages/asset-import/src/main/kotlin/com/atlan/pkg/aim/Importer.kt b/samples/packages/asset-import/src/main/kotlin/com/atlan/pkg/aim/Importer.kt index 0370094512..68af96a975 100644 --- a/samples/packages/asset-import/src/main/kotlin/com/atlan/pkg/aim/Importer.kt +++ b/samples/packages/asset-import/src/main/kotlin/com/atlan/pkg/aim/Importer.kt @@ -33,6 +33,7 @@ object Importer { val glossaryAttrsToOverwrite = attributesToClear(Utils.getOrDefault(config.glossariesAttrToOverwrite, listOf()).toMutableList(), "glossaries", logger) val assetsUpdateOnly = Utils.getOrDefault(config.assetsUpsertSemantic, "update") == "update" + val assetsCaseSensitive = Utils.getOrDefault(config.assetsCaseSensitive, true) val glossariesUpdateOnly = Utils.getOrDefault(config.glossariesUpsertSemantic, "update") == "update" val glossariesFailOnErrors = Utils.getOrDefault(config.glossariesFailOnErrors, true) @@ -61,7 +62,7 @@ object Importer { if (assetsFilename.isNotBlank()) { FieldSerde.FAIL_ON_ERRORS.set(assetsFailOnErrors) logger.info { "=== Importing assets... ===" } - val assetImporter = AssetImporter(assetsFilename, assetAttrsToOverwrite, assetsUpdateOnly, batchSize) + val assetImporter = AssetImporter(assetsFilename, assetAttrsToOverwrite, assetsUpdateOnly, batchSize, assetsCaseSensitive) assetImporter.import() } } diff --git a/sdk/src/main/java/com/atlan/util/AssetBatch.java b/sdk/src/main/java/com/atlan/util/AssetBatch.java index 8afd485d8d..03652cc443 100644 --- a/sdk/src/main/java/com/atlan/util/AssetBatch.java +++ b/sdk/src/main/java/com/atlan/util/AssetBatch.java @@ -9,6 +9,7 @@ import com.atlan.model.assets.IndistinctAsset; import com.atlan.model.core.AssetMutationResponse; import com.atlan.model.core.ConnectionCreationResponse; +import com.atlan.model.search.FluentSearch; import com.atlan.serde.Serde; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; @@ -37,6 +38,7 @@ public enum CustomMetadataHandling { private final boolean captureFailures; private final boolean updateOnly; private final boolean track; + private final boolean caseInsensitive; /** Number of assets that were created (no details, just a count). */ @Getter @@ -201,6 +203,30 @@ public AssetBatch( boolean captureFailures, boolean updateOnly, boolean track) { + this(client, maxSize, replaceAtlanTags, customMetadataHandling, captureFailures, updateOnly, track, false); + } + + /** + * Create a new batch of assets to be bulk-saved. + * + * @param client connectivity to Atlan + * @param maxSize maximum size of each batch that should be processed (per API call) + * @param replaceAtlanTags if true, all Atlan tags on an existing asset will be overwritten; if false, all Atlan tags will be ignored + * @param customMetadataHandling how to handle custom metadata (ignore it, replace it (wiping out anything pre-existing), or merge it) + * @param captureFailures when true, any failed batches will be captured and retained rather than exceptions being raised (for large amounts of processing this could cause memory issues!) + * @param updateOnly when true, only attempt to update existing assets and do not create any assets (note: this will incur a performance penalty) + * @param track when false, details about each created and updated asset will no longer be tracked (only an overall count of each) -- useful if you intend to send close to (or more than) 1 million assets through a batch + * @param caseInsensitive (only applies when updateOnly is true) when matching assets, search for their qualifiedName in a case-insensitive way + */ + public AssetBatch( + AtlanClient client, + int maxSize, + boolean replaceAtlanTags, + CustomMetadataHandling customMetadataHandling, + boolean captureFailures, + boolean updateOnly, + boolean track, + boolean caseInsensitive) { this.client = client; _batch = Collections.synchronizedList(new ArrayList<>()); this.maxSize = maxSize; @@ -225,6 +251,7 @@ public AssetBatch( this.failures = null; } this.updateOnly = updateOnly; + this.caseInsensitive = caseInsensitive; } /** @@ -269,11 +296,23 @@ public AssetMutationResponse flush() throws AtlanException { Set found = new HashSet<>(); List qualifiedNames = _batch.stream().map(Asset::getQualifiedName).collect(Collectors.toList()); - client.assets.select().where(Asset.QUALIFIED_NAME.in(qualifiedNames)).pageSize(maxSize).stream() - .forEach(asset -> found.add(asset.getTypeName() + "::" + asset.getQualifiedName())); + FluentSearch.FluentSearchBuilder builder; + if (caseInsensitive) { + builder = client.assets.select().minSomes(1); + for (String qn : qualifiedNames) { + builder.whereSome(Asset.QUALIFIED_NAME.eq(qn, true)); + } + } else { + builder = client.assets.select().where(Asset.QUALIFIED_NAME.in(qualifiedNames)); + } + builder.pageSize(maxSize).stream().forEach(asset -> { + String assetId = asset.getTypeName() + "::" + asset.getQualifiedName(); + found.add(assetId.toLowerCase(Locale.ROOT)); + }); revised = new ArrayList<>(); _batch.forEach(asset -> { - if (found.contains(asset.getTypeName() + "::" + asset.getQualifiedName())) { + String assetId = asset.getTypeName() + "::" + asset.getQualifiedName(); + if (found.contains(assetId.toLowerCase(Locale.ROOT))) { revised.add(asset); } else { track(skipped, asset); diff --git a/sdk/src/main/java/com/atlan/util/ParallelBatch.java b/sdk/src/main/java/com/atlan/util/ParallelBatch.java index 4d80453d57..6890ac4caf 100644 --- a/sdk/src/main/java/com/atlan/util/ParallelBatch.java +++ b/sdk/src/main/java/com/atlan/util/ParallelBatch.java @@ -22,6 +22,7 @@ public class ParallelBatch { private final boolean captureFailures; private final boolean track; private final boolean updateOnly; + private final boolean caseSensitive; private final Map batchMap; private List created = null; @@ -113,6 +114,30 @@ public ParallelBatch( boolean captureFailures, boolean updateOnly, boolean track) { + this(client, maxSize, replaceAtlanTags, customMetadataHandling, captureFailures, updateOnly, track, true); + } + + /** + * Create a new batch of assets to be bulk-saved, in parallel (across threads). + * + * @param client connectivity to Atlan + * @param maxSize maximum size of each batch that should be processed (per API call) + * @param replaceAtlanTags if true, all Atlan tags on an existing asset will be overwritten; if false, all Atlan tags will be ignored + * @param customMetadataHandling how to handle custom metadata (ignore it, replace it (wiping out anything pre-existing), or merge it) + * @param captureFailures when true, any failed batches will be captured and retained rather than exceptions being raised (for large amounts of processing this could cause memory issues!) + * @param updateOnly when true, only attempt to update existing assets and do not create any assets (note: this will incur a performance penalty) + * @param track when false, details about each created and updated asset will no longer be tracked (only an overall count of each) -- useful if you intend to send close to (or more than) 1 million assets through a batch + * @param caseSensitive (only applies when updateOnly is true) attempt to match assets case-sensitively (true) or case-insensitively (false) + */ + public ParallelBatch( + AtlanClient client, + int maxSize, + boolean replaceAtlanTags, + AssetBatch.CustomMetadataHandling customMetadataHandling, + boolean captureFailures, + boolean updateOnly, + boolean track, + boolean caseSensitive) { this.client = client; this.maxSize = maxSize; this.replaceAtlanTags = replaceAtlanTags; @@ -120,6 +145,7 @@ public ParallelBatch( this.captureFailures = captureFailures; this.track = track; this.updateOnly = updateOnly; + this.caseSensitive = caseSensitive; this.batchMap = new ConcurrentHashMap<>(); } @@ -142,7 +168,8 @@ public AssetMutationResponse add(Asset single) throws AtlanException { customMetadataHandling, captureFailures, updateOnly, - track)); + track, + !caseSensitive)); } return batchMap.get(id).add(single); }