Skip to content

Commit

Permalink
Merge pull request #436 from atlanhq/DVX-170
Browse files Browse the repository at this point in the history
Adds ability to take file input from either direct upload or S3
  • Loading branch information
cmgrote authored Jan 17, 2024
2 parents 48b9c5b + b7da787 commit 388ac6a
Show file tree
Hide file tree
Showing 9 changed files with 305 additions and 37 deletions.
31 changes: 31 additions & 0 deletions package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/Utils.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import com.atlan.Atlan
import com.atlan.exception.AtlanException
import com.atlan.exception.NotFoundException
import com.atlan.model.assets.Connection
import com.atlan.pkg.s3.S3Sync
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import jakarta.activation.FileDataSource
import jakarta.mail.Message
Expand Down Expand Up @@ -379,4 +380,34 @@ object Utils {
fun getAssetLink(guid: String): String = "${Atlan.getBaseUrl()}/assets/$guid/overview"

/**
 * Resolve the (container-)local file to read when the user may supply an input file
 * either by direct upload or by pointing at an object in S3.
 * When the S3 route is taken and an object key is given, the object is downloaded into
 * [outputDirectory] as part of this call.
 *
 * @param uploadResult filename from a direct upload
 * @param s3Region name of the S3 region for an S3 download
 * @param s3Bucket name of the S3 bucket for an S3 download
 * @param s3ObjectKey full path to the S3 object within the bucket
 * @param outputDirectory local directory where any S3-downloaded file should be placed
 * @param preferUpload if true, take the directly-uploaded file; otherwise use the S3 details to download the file
 * @return the local file name to read from, or an empty string when S3 is selected but the object key is blank
 */
fun getInputFile(uploadResult: String, s3Region: String, s3Bucket: String, s3ObjectKey: String, outputDirectory: String, preferUpload: Boolean = true): String {
    // Direct upload wins outright: nothing to fetch
    if (preferUpload) {
        return uploadResult
    }
    // S3 was selected but no object was named: there is no file to hand back
    if (s3ObjectKey.isBlank()) {
        return ""
    }
    val s3 = S3Sync(s3Bucket, s3Region, logger)
    // Keep only the object's own name (dropping its prefix) for the local copy
    val objectName = File(s3ObjectKey).name
    val localPath = "$outputDirectory${File.separator}$objectName"
    s3.downloadFromS3(s3ObjectKey, localPath)
    return localPath
}
}
100 changes: 77 additions & 23 deletions package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/s3/S3Sync.kt
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class S3Sync(
* @return true if any files were copied, otherwise false
*/
fun copyFromS3(s3Prefix: String, localDirectory: String): Boolean {
logger.info("Syncing files from s3://$bucketName/$s3Prefix to $localDirectory")
logger.info { "Syncing files from s3://$bucketName/$s3Prefix to $localDirectory" }

val s3Client = S3Client.builder().region(Region.of(region)).build()
val request = ListObjectsV2Request.builder()
Expand All @@ -59,24 +59,53 @@ class S3Sync(
var anySynced = false

s3FilesToDownload.forEach {
val localFile = File(localDirectory, it)
if (localFile.exists()) {
localFile.delete()
}
if (!localFile.parentFile.exists()) {
localFile.parentFile.mkdirs()
}
val prefix = File(s3Prefix, it).path
logger.info("Downloading s3://$bucketName/$prefix to ${localFile.path}")
s3Client.getObject(
GetObjectRequest.builder().bucket(bucketName).key(prefix).build(),
localFile.toPath(),
downloadFromS3(
s3Client,
File(s3Prefix, it).path,
File(localDirectory, it).path,
)
anySynced = true
}
return anySynced
}

/**
 * Download a single file from the provided S3 object key to the specified local file.
 * A dedicated S3 client is created for this one download and closed again once the
 * download has finished (or failed).
 *
 * @param s3ObjectKey from which to download the file
 * @param localFile into which to download the file
 */
fun downloadFromS3(s3ObjectKey: String, localFile: String) {
    // The client is built only for this call, so release its resources when done
    S3Client.builder().region(Region.of(region)).build().use { s3Client ->
        downloadFromS3(
            s3Client,
            s3ObjectKey,
            localFile,
        )
    }
}

/**
 * Download a single file from the provided S3 object key to the specified local file.
 * Any file already present at the local path is deleted first, and any missing parent
 * directories of the local path are created.
 *
 * @param s3Client connectivity to S3
 * @param s3ObjectKey from which to download the file
 * @param localFile into which to download the file
 */
private fun downloadFromS3(s3Client: S3Client, s3ObjectKey: String, localFile: String) {
    logger.info { " ... downloading s3://$bucketName/$s3ObjectKey to $localFile" }
    val local = File(localFile)
    // Remove any earlier copy at the target path before downloading
    if (local.exists()) {
        local.delete()
    }
    // parentFile is null when the path has no directory component (a bare file name),
    // in which case there is no directory to create
    local.parentFile?.let { parent ->
        if (!parent.exists()) {
            parent.mkdirs()
        }
    }
    // The key is passed through File, which applies java.io.File's pathname normalization
    val objectKey = File(s3ObjectKey).path
    s3Client.getObject(
        GetObjectRequest.builder().bucket(bucketName).key(objectKey).build(),
        local.toPath(),
    )
}

/**
* Copy all files from the provided local directory to the specified S3 prefix.
* Note: files that already exist in S3 will be replaced by files from the local directory,
Expand All @@ -87,7 +116,7 @@ class S3Sync(
* @return true if any files were copied, otherwise false
*/
fun copyToS3(localDirectory: String, s3Prefix: String): Boolean {
logger.info("Syncing files from $localDirectory to s3://$bucketName/$s3Prefix")
logger.info { "Syncing files from $localDirectory to s3://$bucketName/$s3Prefix" }

val s3Client = S3Client.builder().region(Region.of(region)).build()
val request = ListObjectsV2Request.builder()
Expand All @@ -113,17 +142,42 @@ class S3Sync(
var anySynced = false

localFilesToUpload.forEach {
// Note: no need to delete files first (putObject overwrites, including auto-versioning
// if enabled on the bucket), and no need to create parent prefixes in S3
val localFile = File(localDirectory, it)
val prefix = File(s3Prefix, it).path
logger.info("Uploading ${localFile.path} to s3://$bucketName/$prefix")
s3Client.putObject(
PutObjectRequest.builder().bucket(bucketName).key(prefix).build(),
localFile.toPath(),
)
uploadToS3(s3Client, File(localDirectory, it).path, File(s3Prefix, it).path)
anySynced = true
}
return anySynced
}

/**
 * Upload a single file from the specified local file to the provided S3 object key.
 * A dedicated S3 client is created for this one upload and closed again once the
 * upload has finished (or failed).
 *
 * @param localFile from which to upload the file
 * @param s3ObjectKey into which to upload the file
 */
fun uploadToS3(localFile: String, s3ObjectKey: String) {
    // The client is built only for this call, so release its resources when done
    S3Client.builder().region(Region.of(region)).build().use { s3Client ->
        uploadToS3(
            s3Client,
            localFile,
            s3ObjectKey,
        )
    }
}

/**
 * Push one local file to S3, storing it under the given object key.
 *
 * @param s3Client connectivity to S3
 * @param localFile from which to upload the file
 * @param s3ObjectKey into which to upload the file
 */
private fun uploadToS3(s3Client: S3Client, localFile: String, s3ObjectKey: String) {
    logger.info { " ... uploading $localFile to s3://$bucketName/$s3ObjectKey" }
    // Nothing to clean up beforehand: putObject overwrites (including auto-versioning
    // if enabled on the bucket), and S3 needs no parent prefixes created up-front
    val request = PutObjectRequest.builder()
        .bucket(bucketName)
        .key(File(s3ObjectKey).path)
        .build()
    s3Client.putObject(request, File(localFile).toPath())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,22 @@ import javax.annotation.processing.Generated
@Generated("com.atlan.pkg.CustomPackage")
@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.ANY)
data class AssetImportCfg(
@JsonProperty("assets_import_type") val assetsImportType: String? = null,
@JsonProperty("assets_file") val assetsFile: String? = null,
@JsonProperty("assets_s3_region") val assetsS3Region: String? = null,
@JsonProperty("assets_s3_bucket") val assetsS3Bucket: String? = null,
@JsonProperty("assets_s3_object_key") val assetsS3ObjectKey: String? = null,
@JsonDeserialize(using = WidgetSerde.MultiSelectDeserializer::class)
@JsonSerialize(using = WidgetSerde.MultiSelectSerializer::class)
@JsonProperty("assets_attr_to_overwrite") val assetsAttrToOverwrite: List<String>? = null,
@JsonProperty("assets_upsert_semantic") val assetsUpsertSemantic: String? = null,
@JsonProperty("assets_fail_on_errors") val assetsFailOnErrors: Boolean? = null,
@JsonProperty("assets_case_sensitive") val assetsCaseSensitive: Boolean? = null,
@JsonProperty("glossaries_import_type") val glossariesImportType: String? = null,
@JsonProperty("glossaries_file") val glossariesFile: String? = null,
@JsonProperty("glossaries_s3_region") val glossariesS3Region: String? = null,
@JsonProperty("glossaries_s3_bucket") val glossariesS3Bucket: String? = null,
@JsonProperty("glossaries_s3_object_key") val glossariesS3ObjectKey: String? = null,
@JsonDeserialize(using = WidgetSerde.MultiSelectDeserializer::class)
@JsonSerialize(using = WidgetSerde.MultiSelectSerializer::class)
@JsonProperty("glossaries_attr_to_overwrite") val glossariesAttrToOverwrite: List<String>? = null,
Expand Down
82 changes: 82 additions & 0 deletions samples/packages/asset-import/src/main/kotlin/PackageConfig.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ import com.atlan.Atlan
import com.atlan.pkg.CustomPackage
import com.atlan.pkg.aim.Importer
import com.atlan.pkg.config.model.ui.UIConfig
import com.atlan.pkg.config.model.ui.UIRule
import com.atlan.pkg.config.model.ui.UIStep
import com.atlan.pkg.config.model.workflow.WorkflowOutputs
import com.atlan.pkg.config.widgets.BooleanInput
import com.atlan.pkg.config.widgets.DropDown
import com.atlan.pkg.config.widgets.FileUploader
import com.atlan.pkg.config.widgets.Radio
import com.atlan.pkg.config.widgets.TextInput

/**
* Definition for the Asset Import custom package.
Expand All @@ -27,13 +29,44 @@ object PackageConfig : CustomPackage(
title = "Assets",
description = "Assets to import",
inputs = mapOf(
"assets_import_type" to Radio(
label = "Import assets from",
required = true,
help = "Select how you want to provide the file containing asset details to be imported.",
possibleValues = mapOf(
"UPLOAD" to "Direct upload",
"S3" to "S3 object",
),
default = "UPLOAD",
),
"assets_file" to FileUploader(
label = "Assets file",
fileTypes = listOf("text/csv"),
required = false,
help = "Select the file containing assets to import, produced by one of the Asset Export packages.",
placeholder = "Select assets CSV file",
),
"assets_s3_region" to TextInput(
label = "S3 region",
required = false,
help = "Enter the S3 region from which to retrieve the S3 object. If empty, will use the region of Atlan's own back-end storage.",
placeholder = "ap-south-1",
grid = 4,
),
// Optional free-text input naming the S3 bucket that holds the assets file
"assets_s3_bucket" to TextInput(
    label = "S3 bucket",
    required = false,
    help = "Enter the S3 bucket from which to retrieve the S3 object. If empty, will use the bucket of Atlan's own back-end storage.",
    placeholder = "bucket-name",
    grid = 4,
),
"assets_s3_object_key" to TextInput(
label = "S3 object key",
required = false,
help = "Enter the S3 object key, including the name of the object and its prefix (path) in the S3 bucket.",
placeholder = "some/where/file.csv",
grid = 8,
),
"assets_attr_to_overwrite" to DropDown(
label = "Remove attributes, if empty",
required = false,
Expand Down Expand Up @@ -78,13 +111,44 @@ object PackageConfig : CustomPackage(
title = "Glossaries",
description = "Glossaries to import",
inputs = mapOf(
"glossaries_import_type" to Radio(
label = "Import glossaries, categories and terms from",
required = true,
help = "Select how you want to provide the file containing glossaries, categories and terms to be imported.",
possibleValues = mapOf(
"UPLOAD" to "Direct upload",
"S3" to "S3 object",
),
default = "UPLOAD",
),
"glossaries_file" to FileUploader(
label = "Glossaries file",
fileTypes = listOf("text/csv"),
required = false,
help = "Select the file containing glossaries, categories and terms to import, produced by one of the Asset Export packages.",
placeholder = "Select glossaries CSV file",
),
"glossaries_s3_region" to TextInput(
label = "S3 region",
required = false,
help = "Enter the S3 region from which to retrieve the S3 object. If empty, will use the region of Atlan's own back-end storage.",
placeholder = "ap-south-1",
grid = 4,
),
// Optional free-text input naming the S3 bucket that holds the glossaries file
"glossaries_s3_bucket" to TextInput(
    label = "S3 bucket",
    required = false,
    help = "Enter the S3 bucket from which to retrieve the S3 object. If empty, will use the bucket of Atlan's own back-end storage.",
    placeholder = "bucket-name",
    grid = 4,
),
"glossaries_s3_object_key" to TextInput(
label = "S3 object key",
required = false,
help = "Enter the S3 object key, including the name of the object and its prefix (path) in the S3 bucket.",
placeholder = "some/where/file.csv",
grid = 8,
),
"glossaries_attr_to_overwrite" to DropDown(
label = "Remove attributes, if empty",
required = false,
Expand Down Expand Up @@ -121,6 +185,24 @@ object PackageConfig : CustomPackage(
),
),
),
rules = listOf(
UIRule(
whenInputs = mapOf("assets_import_type" to "UPLOAD"),
required = listOf("assets_file"),
),
UIRule(
whenInputs = mapOf("assets_import_type" to "S3"),
required = listOf("assets_s3_region", "assets_s3_bucket", "assets_s3_object_key"),
),
UIRule(
whenInputs = mapOf("glossaries_import_type" to "UPLOAD"),
required = listOf("glossaries_file"),
),
UIRule(
whenInputs = mapOf("glossaries_import_type" to "S3"),
required = listOf("glossaries_s3_region", "glossaries_s3_bucket", "glossaries_s3_object_key"),
),
),
),
containerImage = "ghcr.io/atlanhq/csa-asset-import:${Atlan.VERSION}",
classToRun = Importer::class.java,
Expand Down
Loading

0 comments on commit 388ac6a

Please sign in to comment.