Skip to content

Commit

Permalink
Merge pull request #434 from atlanhq/DVX-179
Browse files Browse the repository at this point in the history
Adds option to batch load case-insensitively
  • Loading branch information
cmgrote authored Jan 16, 2024
2 parents ea94772 + 7222375 commit 4f97e92
Show file tree
Hide file tree
Showing 8 changed files with 87 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import kotlin.system.exitProcess
* @param updateOnly if true, only update an asset (first check it exists), if false allow upserts (create if it does not exist)
* @param batchSize maximum number of records to save per API request
* @param trackBatches if true, minimal details about every asset created or updated are tracked (if false, only counts of each are tracked)
* @param caseSensitive (only applies when updateOnly is true) attempt to match assets case-sensitively (true) or case-insensitively (false)
*/
abstract class CSVImporter(
private val filename: String,
Expand All @@ -37,6 +38,7 @@ abstract class CSVImporter(
private val updateOnly: Boolean = false,
private val batchSize: Int = 20,
private val trackBatches: Boolean = true,
private val caseSensitive: Boolean = true,
) : AssetGenerator {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@ import java.util.concurrent.atomic.AtomicLong
* @param updateOnly when true, the reader will first look up assets to ensure they exist (and only update them, never create)
* @param fieldSeparator character to use to separate fields (for example ',' or ';')
* @param trackBatches if true, minimal details about every asset created or updated are tracked (if false, only counts of each are tracked)
* @param caseSensitive (only applies when updateOnly is true) attempt to match assets case-sensitively (true) or case-insensitively (false)
*/
class CSVReader @JvmOverloads constructor(
path: String,
private val updateOnly: Boolean,
private val trackBatches: Boolean = true,
private val caseSensitive: Boolean = true,
fieldSeparator: Char = ',',
) : Closeable {

Expand Down Expand Up @@ -88,6 +90,7 @@ class CSVReader @JvmOverloads constructor(
true,
updateOnly,
trackBatches,
caseSensitive,
)
val relatedBatch = ParallelBatch(
client,
Expand All @@ -97,6 +100,7 @@ class CSVReader @JvmOverloads constructor(
true,
false,
trackBatches,
caseSensitive,
)
val relatedHolds: MutableMap<String, RelatedAssetHold> = ConcurrentHashMap()
val deferDeletes: MutableMap<String, Set<AtlanField>> = ConcurrentHashMap()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ data class AssetImportCfg(
@JsonProperty("assets_attr_to_overwrite") val assetsAttrToOverwrite: List<String>? = null,
@JsonProperty("assets_upsert_semantic") val assetsUpsertSemantic: String? = null,
@JsonProperty("assets_fail_on_errors") val assetsFailOnErrors: Boolean? = null,
@JsonProperty("assets_case_sensitive") val assetsCaseSensitive: Boolean? = null,
@JsonProperty("glossaries_file") val glossariesFile: String? = null,
@JsonDeserialize(using = WidgetSerde.MultiSelectDeserializer::class)
@JsonSerialize(using = WidgetSerde.MultiSelectSerializer::class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ object PackageConfig : CustomPackage(
required = false,
help = "Whether an invalid value in a field should cause the import to fail (Yes) or log a warning, skip that value, and proceed (No).",
),
"assets_case_sensitive" to BooleanInput(
label = "Case-sensitive match for updates",
required = false,
help = "Whether to use case-sensitive matching when running in update-only mode (Yes) or try case-insensitive matching (No).",
),
),
),
UIStep(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,21 @@ import mu.KotlinLogging
* @param attrsToOverwrite list of fields that should be overwritten in Atlan, if their value is empty in the CSV
* @param updateOnly if true, only update an asset (first check it exists), if false allow upserts (create if it does not exist)
* @param batchSize maximum number of records to save per API request
* @param caseSensitive (only applies when updateOnly is true) attempt to match assets case-sensitively (true) or case-insensitively (false)
*/
class AssetImporter(
private val filename: String,
private val attrsToOverwrite: List<AtlanField>,
private val updateOnly: Boolean,
private val batchSize: Int,
private val caseSensitive: Boolean = true,
) : CSVImporter(
filename,
logger = KotlinLogging.logger {},
attrsToOverwrite = attrsToOverwrite,
updateOnly = updateOnly,
batchSize = batchSize,
caseSensitive = caseSensitive,
) {
/** {@inheritDoc} */
override fun getBuilder(deserializer: RowDeserializer): Asset.AssetBuilder<*, *> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ object Importer {
val glossaryAttrsToOverwrite =
attributesToClear(Utils.getOrDefault(config.glossariesAttrToOverwrite, listOf()).toMutableList(), "glossaries", logger)
val assetsUpdateOnly = Utils.getOrDefault(config.assetsUpsertSemantic, "update") == "update"
val assetsCaseSensitive = Utils.getOrDefault(config.assetsCaseSensitive, true)
val glossariesUpdateOnly = Utils.getOrDefault(config.glossariesUpsertSemantic, "update") == "update"
val glossariesFailOnErrors = Utils.getOrDefault(config.glossariesFailOnErrors, true)

Expand Down Expand Up @@ -61,7 +62,7 @@ object Importer {
if (assetsFilename.isNotBlank()) {
FieldSerde.FAIL_ON_ERRORS.set(assetsFailOnErrors)
logger.info { "=== Importing assets... ===" }
val assetImporter = AssetImporter(assetsFilename, assetAttrsToOverwrite, assetsUpdateOnly, batchSize)
val assetImporter = AssetImporter(assetsFilename, assetAttrsToOverwrite, assetsUpdateOnly, batchSize, assetsCaseSensitive)
assetImporter.import()
}
}
Expand Down
45 changes: 42 additions & 3 deletions sdk/src/main/java/com/atlan/util/AssetBatch.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.atlan.model.assets.IndistinctAsset;
import com.atlan.model.core.AssetMutationResponse;
import com.atlan.model.core.ConnectionCreationResponse;
import com.atlan.model.search.FluentSearch;
import com.atlan.serde.Serde;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
Expand Down Expand Up @@ -37,6 +38,7 @@ public enum CustomMetadataHandling {
private final boolean captureFailures;
private final boolean updateOnly;
private final boolean track;
private final boolean caseInsensitive;

/** Number of assets that were created (no details, just a count). */
@Getter
Expand Down Expand Up @@ -201,6 +203,30 @@ public AssetBatch(
boolean captureFailures,
boolean updateOnly,
boolean track) {
this(client, maxSize, replaceAtlanTags, customMetadataHandling, captureFailures, updateOnly, track, false);
}

/**
* Create a new batch of assets to be bulk-saved.
*
* @param client connectivity to Atlan
* @param maxSize maximum size of each batch that should be processed (per API call)
* @param replaceAtlanTags if true, all Atlan tags on an existing asset will be overwritten; if false, all Atlan tags will be ignored
* @param customMetadataHandling how to handle custom metadata (ignore it, replace it (wiping out anything pre-existing), or merge it)
* @param captureFailures when true, any failed batches will be captured and retained rather than exceptions being raised (for large amounts of processing this could cause memory issues!)
* @param updateOnly when true, only attempt to update existing assets and do not create any assets (note: this will incur a performance penalty)
* @param track when false, details about each created and updated asset will no longer be tracked (only an overall count of each) -- useful if you intend to send close to (or more than) 1 million assets through a batch
* @param caseInsensitive (only applies when updateOnly is true) when matching assets, search for their qualifiedName in a case-insensitive way
*/
public AssetBatch(
AtlanClient client,
int maxSize,
boolean replaceAtlanTags,
CustomMetadataHandling customMetadataHandling,
boolean captureFailures,
boolean updateOnly,
boolean track,
boolean caseInsensitive) {
this.client = client;
_batch = Collections.synchronizedList(new ArrayList<>());
this.maxSize = maxSize;
Expand All @@ -225,6 +251,7 @@ public AssetBatch(
this.failures = null;
}
this.updateOnly = updateOnly;
this.caseInsensitive = caseInsensitive;
}

/**
Expand Down Expand Up @@ -269,11 +296,23 @@ public AssetMutationResponse flush() throws AtlanException {
Set<String> found = new HashSet<>();
List<String> qualifiedNames =
_batch.stream().map(Asset::getQualifiedName).collect(Collectors.toList());
client.assets.select().where(Asset.QUALIFIED_NAME.in(qualifiedNames)).pageSize(maxSize).stream()
.forEach(asset -> found.add(asset.getTypeName() + "::" + asset.getQualifiedName()));
FluentSearch.FluentSearchBuilder<?, ?> builder;
if (caseInsensitive) {
builder = client.assets.select().minSomes(1);
for (String qn : qualifiedNames) {
builder.whereSome(Asset.QUALIFIED_NAME.eq(qn, true));
}
} else {
builder = client.assets.select().where(Asset.QUALIFIED_NAME.in(qualifiedNames));
}
builder.pageSize(maxSize).stream().forEach(asset -> {
String assetId = asset.getTypeName() + "::" + asset.getQualifiedName();
found.add(assetId.toLowerCase(Locale.ROOT));
});
revised = new ArrayList<>();
_batch.forEach(asset -> {
if (found.contains(asset.getTypeName() + "::" + asset.getQualifiedName())) {
String assetId = asset.getTypeName() + "::" + asset.getQualifiedName();
if (found.contains(assetId.toLowerCase(Locale.ROOT))) {
revised.add(asset);
} else {
track(skipped, asset);
Expand Down
29 changes: 28 additions & 1 deletion sdk/src/main/java/com/atlan/util/ParallelBatch.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public class ParallelBatch {
private final boolean captureFailures;
private final boolean track;
private final boolean updateOnly;
private final boolean caseSensitive;
private final Map<Long, AssetBatch> batchMap;

private List<Asset> created = null;
Expand Down Expand Up @@ -113,13 +114,38 @@ public ParallelBatch(
boolean captureFailures,
boolean updateOnly,
boolean track) {
this(client, maxSize, replaceAtlanTags, customMetadataHandling, captureFailures, updateOnly, track, true);
}

/**
 * Set up a batch that bulk-saves assets in parallel (across threads).
 * Every argument is simply retained on the instance, and the map that holds
 * the underlying {@link AssetBatch} instances starts out empty.
 *
 * @param client connectivity to Atlan
 * @param maxSize upper bound on the number of assets processed per batch (that is, per API call)
 * @param replaceAtlanTags true to overwrite all Atlan tags on an existing asset; false to ignore all Atlan tags
 * @param customMetadataHandling whether custom metadata should be ignored, replaced (wiping out anything pre-existing), or merged
 * @param captureFailures true to capture and retain failed batches instead of raising exceptions (for large amounts of processing this could cause memory issues!)
 * @param updateOnly true to only update assets that already exist and never create any (note: this incurs a performance penalty)
 * @param track false to stop tracking details of each created and updated asset and keep only an overall count of each -- useful when sending close to (or more than) 1 million assets through a batch
 * @param caseSensitive (only applies when updateOnly is true) true to match assets case-sensitively, false to match them case-insensitively
 */
public ParallelBatch(
        AtlanClient client,
        int maxSize,
        boolean replaceAtlanTags,
        AssetBatch.CustomMetadataHandling customMetadataHandling,
        boolean captureFailures,
        boolean updateOnly,
        boolean track,
        boolean caseSensitive) {
    // Storage for the underlying batches; nothing is created until assets are added.
    this.batchMap = new ConcurrentHashMap<>();

    // Connectivity and sizing.
    this.client = client;
    this.maxSize = maxSize;

    // How existing tags and custom metadata are treated on save.
    this.replaceAtlanTags = replaceAtlanTags;
    this.customMetadataHandling = customMetadataHandling;

    // Matching semantics for update-only processing.
    this.updateOnly = updateOnly;
    this.caseSensitive = caseSensitive;

    // Bookkeeping of failures and of created / updated assets.
    this.captureFailures = captureFailures;
    this.track = track;
}

Expand All @@ -142,7 +168,8 @@ public AssetMutationResponse add(Asset single) throws AtlanException {
customMetadataHandling,
captureFailures,
updateOnly,
track));
track,
!caseSensitive));
}
return batchMap.get(id).add(single);
}
Expand Down

0 comments on commit 4f97e92

Please sign in to comment.