From 5622332a15ff41b39c02dc88c5a5969e922944a2 Mon Sep 17 00:00:00 2001 From: Cristian Astorino Date: Thu, 9 Nov 2023 16:37:34 +0000 Subject: [PATCH] [WIT-682] MWAA SP does not validate provisioning request # New features and improvements - Test coverage for MRs is now exported - Test coverage artifacts are now saved in the pipeline - CI optimizations # Bug fixes - Implemented proper validation for v1/validate endpoint - open-generator-cli uses a fixed version - Enabled akka.loglevel to INFO # Related issue Closes WIT-682 # Definition of Done for Feature/Hotfixes ## All Developments - [x] Feature was implemented as per the requirements - [x] If some code parts are complex they must be commented with code documentation - [x] CI/CD is successful - [x] Code coverage is not reduced, any new code is covered - [x] E2E/integration tests are successful (whether run locally or in CI/CD) - [x] If dependencies were changed, be sure that they will not impact the project, that their license is compatible, and that they introduce no vulnerabilities - [x] Documentation have been updated * Documentation has been updated with explanation of the new feature if it is user-facing (eg component now has additional setting) or it impacts him in some other way (eg optional field that becomes mandatory) * If it is a breaking change, we have documented it as such in the MR description in a "Breaking Changes" section - [x] Check that you are not affecting any existing environments with these changes, especially the Sandbox/Playground. This means that merging it to master and deploying it to these environments will not break them and **no manual operations that are not reported in the documentation will be needed** - [x] Check that nothing is out of order and nothing problematic is included in the changes (sensitive information, credentials, customer information or other intellectual property) as they could end up being public (we have Open Source SP already published and automatically mirrored) - [x] Security, Authentication and Authorization have been considered. No SQL injection, tokens handling, RBAC integration. Common security vulnerabilities identified and resolved ## API Development - [x] Semantic of API has been checked, it is comprehensible, meaningful, with no redundant information and user oriented - [x] Meaningful unit and integration tests are present - [x] API Parameters are checked and errors are handled - [x] Returned errors are meaningful to the user - [x] API contract has been defined and documented. Documentation means explaining the meaning of all fields and including at least one example - [x] Exceptions and errors are handled, without letting the underlying framework to respond with a generic Internal Server Error - [x] API Performance has been assessed and is good for real world use cases. Database accesses have been optimized. - [x] API is logging in compliance with audit standards, presence of sensitive information for GDPR has been assessed and removed or managed in case is needed ## DB Development - [x] The database schema is designed to accurately represent the data model and meet the requirements - [x] Tables, relationships, and constraints (e.g. primary keys, foreign keys, unique constraints) are defined appropriately and following a common convention - [x] Normalization principles are applied to eliminate data redundancy and ensure data integrity - [x] Schema semantic is meaningful - [x] Fields naming are following naming convention ( Ex. camelCase or snake_case) - [x] No fields with mixed behaviors and meaning. If a field is representing an enum, enum values are strongly mutually exclusive - [x] Data Types have been reviewed and they are a good fit for each field - [x] Indexes are defined to optimize query performance for frequently accessed data, paying attention to do not affect too much write path and the overall complexity - [x] Sensitive data is stored securely, encrypted if required, and access is restricted to authorized users - [x] Backup and restore procedures have been updated to introduce new or changed tables - [x] Migration scripts to upgrade and downgrade the database have been implemented and tested --- .gitlab-ci.yml | 53 ++++----- README.md | 2 +- .../s3/gateway/S3GatewayError.scala | 40 +++++-- build.sbt | 7 +- helm/files/application.conf | 2 +- project/plugins.sbt | 2 - src/main/resources/reference.conf | 2 +- .../ProvisionerApiMarshallerImpl.scala | 3 + .../ProvisionerApiServiceImpl.scala | 78 ++++++------ .../common/Constants.scala | 1 + .../common/StringUtils.scala | 8 ++ .../error/ErrorType.scala | 5 + .../error/ProvisionErrorType.scala | 9 ++ .../error/ValidationErrorType.scala | 47 ++++++++ .../model/MwaaFields.scala | 10 ++ .../model/ProvisioningRequestDescriptor.scala | 2 +- .../mwaa/MwaaManager.scala | 110 ++++++----------- .../mwaa/MwaaManagerError.scala | 42 ------- .../server/impl/Main.scala | 6 +- .../validation/MwaaValidator.scala | 103 ++++++++++++++++ .../validation/Validator.scala | 9 ++ src/test/resources/application.conf | 3 + .../descriptor_ko_missing_component.yml | 24 ++++ .../descriptor_ko_missing_specifics.yml | 40 +++++++ ...escriptor_ko_wrong_component_id_format.yml | 45 +++++++ .../resources/validation/descriptor_ok.yml | 45 +++++++ .../ProvisionerApiServiceImplSpec.scala | 98 +++++++++++++-- .../model/DescriptorParserSpec.scala | 35 ++++++ .../mwaa/MwaaManagerSpec.scala | 96 +++++++++++++++ .../validation/MwaaValidatorSpec.scala | 112 ++++++++++++++++++ 30 files changed, 821 insertions(+), 218 deletions(-) create mode 100644 src/main/scala/it/agilelab/datamesh/mwaaspecificprovisioner/common/StringUtils.scala create mode 100644 src/main/scala/it/agilelab/datamesh/mwaaspecificprovisioner/error/ErrorType.scala create mode 100644 src/main/scala/it/agilelab/datamesh/mwaaspecificprovisioner/error/ProvisionErrorType.scala create mode 100644 src/main/scala/it/agilelab/datamesh/mwaaspecificprovisioner/error/ValidationErrorType.scala create mode 100644 src/main/scala/it/agilelab/datamesh/mwaaspecificprovisioner/model/MwaaFields.scala delete mode 100644 src/main/scala/it/agilelab/datamesh/mwaaspecificprovisioner/mwaa/MwaaManagerError.scala create mode 100644 src/main/scala/it/agilelab/datamesh/mwaaspecificprovisioner/validation/MwaaValidator.scala create mode 100644 src/main/scala/it/agilelab/datamesh/mwaaspecificprovisioner/validation/Validator.scala create mode 100644 src/test/resources/application.conf create mode 100644 src/test/resources/validation/descriptor_ko_missing_component.yml create mode 100644 src/test/resources/validation/descriptor_ko_missing_specifics.yml create mode 100644 src/test/resources/validation/descriptor_ko_wrong_component_id_format.yml create mode 100644 src/test/resources/validation/descriptor_ok.yml create mode 100644 src/test/scala/it/agilelab/datamesh/mwaaspecificprovisioner/mwaa/MwaaManagerSpec.scala create mode 100644 src/test/scala/it/agilelab/datamesh/mwaaspecificprovisioner/validation/MwaaValidatorSpec.scala diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 2a11d96..71eef5b 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -6,19 +6,7 @@ include: ref: 'main' file: 'common/witboost.downstream.gitlab-ci.yml' -image: ubuntu:20.04 - -before_script: - - apt-get update -yqq - - DEBIAN_FRONTEND=noninteractive TZ=Etc/UTC apt-get -y install tzdata - - apt-get install -yqq openjdk-17-jdk-headless - - apt-get install -yqq gpg - - echo "deb https://repo.scala-sbt.org/scalasbt/debian /" | tee -a /etc/apt/sources.list.d/sbt.list - - mkdir -p /root/.gnupg - - gpg --recv-keys --no-default-keyring --keyring gnupg-ring:/etc/apt/trusted.gpg.d/scalasbt-release.gpg --keyserver hkp://keyserver.ubuntu.com:80 2EE0EA64E40A89B84B2DF73499E82A75642AC823 - - chmod 644 /etc/apt/trusted.gpg.d/scalasbt-release.gpg - - apt-get update -yqq - - apt-get install -yqq sbt +image: sbtscala/scala-sbt:eclipse-temurin-jammy-17.0.9_9_1.9.7_2.13.12 variables: SBT_OPTS: "-Dsbt.global.base=sbt-cache/sbtboot -Dsbt.boot.directory=sbt-cache/boot -Dsbt.ivy.home=sbt-cache/ivy -Dsbt.ci=true" @@ -33,7 +21,7 @@ cache: stages: - setup - - checkFormatting + - check - test - build - package @@ -52,12 +40,12 @@ setup: dotenv: vars.env checkFormatting: - stage: checkFormatting + stage: check script: - 'sbt scalafmtSbtCheck scalafmtCheckAll' witboost.helm.checks: - stage: checkFormatting + stage: check extends: .witboost.helm.base-job before_script: [] cache: [] @@ -70,30 +58,35 @@ witboost.helm.checks: test: stage: test script: - - apt-get install -yqq npm - - npm install @openapitools/openapi-generator-cli -g - - 'sbt clean generateCode coverage test multi-jvm:test coverageReport' + - apt-get update -yqq && apt-get install -yqq npm + - npm install @openapitools/openapi-generator-cli@2.7.0 -g + - 'sbt clean generateCode coverage test coverageReport' + coverage: '/Statement coverage[A-Za-z\.*]\s*:\s*([^%]+)/' + artifacts: + paths: + - target/scala-2.13/scoverage-report/* + - target/scala-2.13/coverage-report/* + reports: + coverage_report: + coverage_format: cobertura + path: 'target/scala-2.13/coverage-report/cobertura.xml' build: services: - - docker:19.03.12-dind + - docker:24.0.5-dind stage: build variables: DOCKER_HOST: tcp://docker:2375 script: | - apt-get install -yqq curl - curl -fsSL https://download.docker.com/linux/ubuntu/gpg | gpg --dearmor -o /usr/share/keyrings/docker-archive-keyring.gpg - echo "deb [arch=$(dpkg --print-architecture) signed-by=/usr/share/keyrings/docker-archive-keyring.gpg] https://download.docker.com/linux/ubuntu focal stable" | tee /etc/apt/sources.list.d/docker.list > /dev/null - apt-get update -yqq - apt-get install -yqq docker-ce-cli - apt-get install -yqq npm - npm install @openapitools/openapi-generator-cli -g + apt-get update -yqq && apt-get install -yqq ca-certificates curl gnupg npm + install -m 0755 -d /etc/apt/keyrings + curl -fsSL https://download.docker.com/linux/ubuntu/gpg | gpg --dearmor -o /etc/apt/keyrings/docker.gpg && chmod a+r /etc/apt/keyrings/docker.gpg + echo "deb [arch="$(dpkg --print-architecture)" signed-by=/etc/apt/keyrings/docker.gpg] https://download.docker.com/linux/ubuntu "$(. /etc/os-release && echo "$VERSION_CODENAME")" stable" | tee /etc/apt/sources.list.d/docker.list > /dev/null + apt-get update -yqq && apt-get install -yqq docker-ce-cli + npm install @openapitools/openapi-generator-cli@2.7.0 -g echo $VERSION docker login -u $CI_REGISTRY_USER -p $CI_REGISTRY_PASSWORD $CI_REGISTRY sbt clean generateCode compile k8tyGitlabCIPublish docker:publish - artifacts: - reports: - dotenv: vars.env witboost.helm.deploy: stage: package diff --git a/README.md b/README.md index 39c279c..a0ad577 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,7 @@ Designed by [Agile Lab](https://www.agilelab.it/), witboost is a versatile platform that addresses a wide range of sophisticated data engineering challenges. It enables businesses to discover, enhance, and productize their data, fostering the creation of automated data platforms that adhere to the highest standards of data governance. Want to know more about witboost? Check it out [here](https://www.agilelab.it/witboost) or [contact us!](https://www.agilelab.it/contacts). -This repository is part of our Open Source projects meant to showcase witboost's integration capabilities and provide a "batteries-included" product. +This repository is part of our [Starter Kit](https://github.com/agile-lab-dev/witboost-starter-kit) meant to showcase witboost's integration capabilities and provide a "batteries-included" product. # MWAA Specific Provisioner diff --git a/aws-integration/src/main/scala/it/agilelab/datamesh/mwaaspecificprovisioner/s3/gateway/S3GatewayError.scala b/aws-integration/src/main/scala/it/agilelab/datamesh/mwaaspecificprovisioner/s3/gateway/S3GatewayError.scala index 2b5941f..0a3df3a 100644 --- a/aws-integration/src/main/scala/it/agilelab/datamesh/mwaaspecificprovisioner/s3/gateway/S3GatewayError.scala +++ b/aws-integration/src/main/scala/it/agilelab/datamesh/mwaaspecificprovisioner/s3/gateway/S3GatewayError.scala @@ -7,16 +7,36 @@ import it.agilelab.datamesh.mwaaspecificprovisioner.s3.common.ShowableOps.showTh trait S3GatewayError extends Exception with Product with Serializable object S3GatewayError { - final case class S3GatewayInitError(error: Throwable) extends S3GatewayError - final case class ObjectExistsErr(bucket: String, key: String, error: Throwable) extends S3GatewayError - final case class CreateFolderErr(bucket: String, folder: String, error: Throwable) extends S3GatewayError - final case class CreateFileErr(bucket: String, key: String, error: Throwable) extends S3GatewayError - final case class GetObjectContentErr(bucket: String, key: String, error: Throwable) extends S3GatewayError - final case class ListObjectsErr(bucket: String, prefix: String, error: Throwable) extends S3GatewayError - final case class ListVersionsErr(bucket: String, prefix: String, error: Throwable) extends S3GatewayError - final case class ListDeleteMarkersErr(bucket: String, prefix: String, error: Throwable) extends S3GatewayError - final case class CopyObjectErr(source: String, dest: String, error: Throwable) extends S3GatewayError - final case class DeleteObjectErr(bucket: String, obj: String, error: Throwable) extends S3GatewayError + final case class S3GatewayInitError(error: Throwable) extends S3GatewayError { + override def getMessage: String = error.getMessage + } + final case class ObjectExistsErr(bucket: String, key: String, error: Throwable) extends S3GatewayError { + override def getMessage: String = error.getMessage + } + final case class CreateFolderErr(bucket: String, folder: String, error: Throwable) extends S3GatewayError { + override def getMessage: String = error.getMessage + } + final case class CreateFileErr(bucket: String, key: String, error: Throwable) extends S3GatewayError { + override def getMessage: String = error.getMessage + } + final case class GetObjectContentErr(bucket: String, key: String, error: Throwable) extends S3GatewayError { + override def getMessage: String = error.getMessage + } + final case class ListObjectsErr(bucket: String, prefix: String, error: Throwable) extends S3GatewayError { + override def getMessage: String = error.getMessage + } + final case class ListVersionsErr(bucket: String, prefix: String, error: Throwable) extends S3GatewayError { + override def getMessage: String = error.getMessage + } + final case class ListDeleteMarkersErr(bucket: String, prefix: String, error: Throwable) extends S3GatewayError { + override def getMessage: String = error.getMessage + } + final case class CopyObjectErr(source: String, dest: String, error: Throwable) extends S3GatewayError { + override def getMessage: String = error.getMessage + } + final case class DeleteObjectErr(bucket: String, obj: String, error: Throwable) extends S3GatewayError { + override def getMessage: String = error.getMessage + } implicit val showS3GatewayError: Show[S3GatewayError] = Show.show { case e: S3GatewayInitError => show"S3GatewayInitError(${e.error})" diff --git a/build.sbt b/build.sbt index 8213137..4e10266 100644 --- a/build.sbt +++ b/build.sbt @@ -114,16 +114,15 @@ lazy val root = (project in file(".")).settings( name := "datamesh.mwaaspecificprovisioner", Test / parallelExecution := false, dockerBuildOptions ++= Seq("--network=host"), - dockerBaseImage := "adoptopenjdk:11-jdk-hotspot", + dockerBaseImage := "eclipse-temurin:17-jre-jammy", dockerUpdateLatest := true, daemonUser := "daemon", Docker / version := (ThisBuild / version).value, Docker / packageName := s"registry.gitlab.com/agilefactory/witboost.mesh/provisioning/sandbox/witboost.mesh.provisioning.sandbox.mwaaspecificprovisioner", - Docker / dockerExposedPorts := Seq(8080), + Docker / dockerExposedPorts := Seq(8093), onChangedBuildSource := ReloadOnSourceChanges, scalafixOnCompile := true, semanticdbEnabled := true, semanticdbVersion := scalafixSemanticdb.revision -).aggregate(clientGenerated).dependsOn(serverGenerated, awsIntegration).enablePlugins(JavaAppPackaging, MultiJvmPlugin) - .configs(MultiJvm).setupBuildInfo +).aggregate(clientGenerated).dependsOn(serverGenerated, awsIntegration).enablePlugins(JavaAppPackaging).setupBuildInfo diff --git a/helm/files/application.conf b/helm/files/application.conf index f4ec1a8..8fe573e 100644 --- a/helm/files/application.conf +++ b/helm/files/application.conf @@ -1,5 +1,5 @@ akka { - loglevel = "OFF" + loglevel = "INFO" actor.warn-about-java-serializer-usage = on actor.allow-java-serialization = off coordinated-shutdown.exit-jvm = on diff --git a/project/plugins.sbt b/project/plugins.sbt index 26568d9..dc2c422 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -8,8 +8,6 @@ addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "1.2.0") addSbtPlugin("com.github.sbt" % "sbt-native-packager" % "1.9.9") -addSbtPlugin("com.typesafe.sbt" % "sbt-multi-jvm" % "0.4.0") - addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.6") addSbtPlugin("ch.epfl.scala" % "sbt-scalafix" % "0.9.34") diff --git a/src/main/resources/reference.conf b/src/main/resources/reference.conf index 9c096e3..cd74fcb 100644 --- a/src/main/resources/reference.conf +++ b/src/main/resources/reference.conf @@ -1,5 +1,5 @@ akka { - loglevel = "OFF" + loglevel = "INFO" actor.warn-about-java-serializer-usage = on actor.allow-java-serialization = off coordinated-shutdown.exit-jvm = on diff --git a/src/main/scala/it/agilelab/datamesh/mwaaspecificprovisioner/api/intepreter/ProvisionerApiMarshallerImpl.scala b/src/main/scala/it/agilelab/datamesh/mwaaspecificprovisioner/api/intepreter/ProvisionerApiMarshallerImpl.scala index 0c668bf..3028c6a 100644 --- a/src/main/scala/it/agilelab/datamesh/mwaaspecificprovisioner/api/intepreter/ProvisionerApiMarshallerImpl.scala +++ b/src/main/scala/it/agilelab/datamesh/mwaaspecificprovisioner/api/intepreter/ProvisionerApiMarshallerImpl.scala @@ -133,6 +133,9 @@ class ProvisionerApiMarshallerImpl extends SpecificProvisionerApiMarshaller { )) } + implicit def toEntityMarshallerReverseProvisioningRequest: ToEntityMarshaller[ReverseProvisioningRequest] = + marshaller[ReverseProvisioningRequest] + implicit def fromEntityUnmarshallerReverseProvisioningRequest: FromEntityUnmarshaller[ReverseProvisioningRequest] = unmarshaller[ReverseProvisioningRequest] diff --git a/src/main/scala/it/agilelab/datamesh/mwaaspecificprovisioner/api/intepreter/ProvisionerApiServiceImpl.scala b/src/main/scala/it/agilelab/datamesh/mwaaspecificprovisioner/api/intepreter/ProvisionerApiServiceImpl.scala index cee5fd8..c799d8e 100644 --- a/src/main/scala/it/agilelab/datamesh/mwaaspecificprovisioner/api/intepreter/ProvisionerApiServiceImpl.scala +++ b/src/main/scala/it/agilelab/datamesh/mwaaspecificprovisioner/api/intepreter/ProvisionerApiServiceImpl.scala @@ -3,14 +3,12 @@ package it.agilelab.datamesh.mwaaspecificprovisioner.api.intepreter import akka.http.scaladsl.marshalling.ToEntityMarshaller import akka.http.scaladsl.server.Route import akka.http.scaladsl.unmarshalling.FromEntityUnmarshaller -import cats.data.NonEmptyList -import cats.implicits.toShow +import cats.data.Validated.{Invalid, Valid} import com.typesafe.scalalogging.LazyLogging import de.heikoseeberger.akkahttpcirce.FailFastCirceSupport.{marshaller, unmarshaller} import it.agilelab.datamesh.mwaaspecificprovisioner.api.SpecificProvisionerApiService import it.agilelab.datamesh.mwaaspecificprovisioner.model._ -import it.agilelab.datamesh.mwaaspecificprovisioner.mwaa.{MwaaManager, MwaaManagerError} -import it.agilelab.datamesh.mwaaspecificprovisioner.s3.gateway.S3GatewayError +import it.agilelab.datamesh.mwaaspecificprovisioner.mwaa.MwaaManager class ProvisionerApiServiceImpl(mwaaManager: MwaaManager) extends SpecificProvisionerApiService with LazyLogging { @@ -59,23 +57,18 @@ class ProvisionerApiServiceImpl(mwaaManager: MwaaManager) extends SpecificProvis toEntityMarshallerRequestValidationError: ToEntityMarshaller[RequestValidationError], toEntityMarshallerSystemError: ToEntityMarshaller[SystemError], toEntityMarshallerProvisioningStatus: ToEntityMarshaller[ProvisioningStatus] - ): Route = ProvisioningRequestDescriptor(provisioningRequest.descriptor).flatMap(mwaaManager.executeProvision) match { - case Left(e: S3GatewayError) => - logger.error(e.show) - provision500(SystemError(e.show)) - case Left(e: MwaaManagerError) => - logger.error(e.errorMessage) - provision500(SystemError(e.errorMessage)) - case Left(e: NonEmptyList[_]) => - logger.error(e.head.toString) - provision400(RequestValidationError(e.toList.map(_.toString))) - case Right(_) => - logger.info("OK") - provision200(ProvisioningStatus(ProvisioningStatusEnums.StatusEnum.COMPLETED, "OK")) - case other => - logger.error("Generic Error. Received {}", other) - provision500(SystemError("Generic Error")) - } + ): Route = + try mwaaManager.executeProvision(provisioningRequest.descriptor) match { + case Valid(_) => provision200(ProvisioningStatus(ProvisioningStatusEnums.StatusEnum.COMPLETED, "OK")) + case Invalid(e) => provision400(RequestValidationError(e.toList.map(_.errorMessage))) + } + catch { + case t: Throwable => + logger.error(s"Exception in provision", t) + provision500(SystemError( + s"An unexpected error occurred while processing the request. Please try again and if the problem persists contact the platform team. Details: ${t.getMessage}" + )) + } /** Code: 200, Message: It synchronously returns the request result, DataType: String * Code: 400, Message: Invalid input, DataType: RequestValidationError @@ -85,7 +78,20 @@ class ProvisionerApiServiceImpl(mwaaManager: MwaaManager) extends SpecificProvis contexts: Seq[(String, String)], toEntityMarshallerSystemError: ToEntityMarshaller[SystemError], toEntityMarshallerValidationResult: ToEntityMarshaller[ValidationResult] - ): Route = validate200(ValidationResult(valid = true)) + ): Route = + try mwaaManager.executeValidation(provisioningRequest.descriptor) match { + case Valid(_) => validate200(ValidationResult(valid = true)) + case Invalid(e) => + val errors = e.map(_.errorMessage).toList + validate200(ValidationResult(valid = false, error = Some(ValidationError(errors)))) + } + catch { + case t: Throwable => + logger.error(s"Exception in validate", t) + validate500(SystemError( + s"An unexpected error occurred while processing the request. Please try again and if the problem persists contact the platform team. Details: ${t.getMessage}" + )) + } /** Code: 200, Message: It synchronously returns the request result, DataType: ProvisioningStatus * Code: 202, Message: If successful returns a provisioning deployment task token that can be used for polling the request status, DataType: String @@ -98,22 +104,16 @@ class ProvisionerApiServiceImpl(mwaaManager: MwaaManager) extends SpecificProvis toEntityMarshallerSystemError: ToEntityMarshaller[SystemError], toEntityMarshallerProvisioningStatus: ToEntityMarshaller[ProvisioningStatus] ): Route = - ProvisioningRequestDescriptor(provisioningRequest.descriptor).flatMap(mwaaManager.executeUnprovision) match { - case Left(e: S3GatewayError) => - logger.error(e.show) - provision500(SystemError(e.show)) - case Left(e: MwaaManagerError) => - logger.error(e.errorMessage) - provision500(SystemError(e.errorMessage)) - case Left(e: NonEmptyList[_]) => - logger.error(e.head.toString) - provision400(RequestValidationError(e.toList.map(_.toString))) - case Right(_) => - logger.info("OK") - provision200(ProvisioningStatus(ProvisioningStatusEnums.StatusEnum.COMPLETED, "OK")) - case other => - logger.error("Generic Error. Received {}", other) - provision500(SystemError("Generic Error")) + try mwaaManager.executeUnprovision(provisioningRequest.descriptor) match { + case Valid(_) => unprovision200(ProvisioningStatus(ProvisioningStatusEnums.StatusEnum.COMPLETED, "OK")) + case Invalid(e) => unprovision400(RequestValidationError(e.toList.map(_.errorMessage))) + } + catch { + case t: Throwable => + logger.error(s"Exception in unprovision", t) + unprovision500(SystemError( + s"An unexpected error occurred while processing the request. Please try again and if the problem persists contact the platform team. Details: ${t.getMessage}" + )) } /** Code: 200, Message: It synchronously returns the access request response, DataType: ProvisioningStatus @@ -126,7 +126,7 @@ class ProvisionerApiServiceImpl(mwaaManager: MwaaManager) extends SpecificProvis toEntityMarshallerRequestValidationError: ToEntityMarshaller[RequestValidationError], toEntityMarshallerSystemError: ToEntityMarshaller[SystemError], toEntityMarshallerProvisioningStatus: ToEntityMarshaller[ProvisioningStatus] - ): Route = updateacl200(ProvisioningStatus(ProvisioningStatusEnums.StatusEnum.COMPLETED, "OK")) + ): Route = updateacl500(NotImplementedError) /** Code: 202, Message: It returns a token that can be used for polling the async validation operation status and results, DataType: String * Code: 400, Message: Invalid input, DataType: RequestValidationError diff --git a/src/main/scala/it/agilelab/datamesh/mwaaspecificprovisioner/common/Constants.scala b/src/main/scala/it/agilelab/datamesh/mwaaspecificprovisioner/common/Constants.scala index 33840bc..eb8109a 100644 --- a/src/main/scala/it/agilelab/datamesh/mwaaspecificprovisioner/common/Constants.scala +++ b/src/main/scala/it/agilelab/datamesh/mwaaspecificprovisioner/common/Constants.scala @@ -15,4 +15,5 @@ object Constants { val SOURCE_DAG_PATH_FIELD = "sourcePath" val BUCKET_NAME_FIELD = "bucketName" val DAG_NAME_FIELD = "dagName" + val SCHEDULE_CRON_FIELD = "scheduleCron" } diff --git a/src/main/scala/it/agilelab/datamesh/mwaaspecificprovisioner/common/StringUtils.scala b/src/main/scala/it/agilelab/datamesh/mwaaspecificprovisioner/common/StringUtils.scala new file mode 100644 index 0000000..3ceec7e --- /dev/null +++ b/src/main/scala/it/agilelab/datamesh/mwaaspecificprovisioner/common/StringUtils.scala @@ -0,0 +1,8 @@ +package it.agilelab.datamesh.mwaaspecificprovisioner.common + +object StringUtils { + + implicit class StringImplicits(val s: String) { + def ensureTrailingSlash: String = if (s.endsWith("/")) s else s"$s/" + } +} diff --git a/src/main/scala/it/agilelab/datamesh/mwaaspecificprovisioner/error/ErrorType.scala b/src/main/scala/it/agilelab/datamesh/mwaaspecificprovisioner/error/ErrorType.scala new file mode 100644 index 0000000..7b971bb --- /dev/null +++ b/src/main/scala/it/agilelab/datamesh/mwaaspecificprovisioner/error/ErrorType.scala @@ -0,0 +1,5 @@ +package it.agilelab.datamesh.mwaaspecificprovisioner.error + +trait ErrorType { + def errorMessage: String +} diff --git a/src/main/scala/it/agilelab/datamesh/mwaaspecificprovisioner/error/ProvisionErrorType.scala b/src/main/scala/it/agilelab/datamesh/mwaaspecificprovisioner/error/ProvisionErrorType.scala new file mode 100644 index 0000000..818420d --- /dev/null +++ b/src/main/scala/it/agilelab/datamesh/mwaaspecificprovisioner/error/ProvisionErrorType.scala @@ -0,0 +1,9 @@ +package it.agilelab.datamesh.mwaaspecificprovisioner.error + +import it.agilelab.datamesh.mwaaspecificprovisioner.s3.gateway.S3GatewayError + +case class ProvisionErrorType(error: S3GatewayError) extends ErrorType { + + override def errorMessage: String = + s"An error occurred while provisioning/unprovisioning the component. Details: ${error.getMessage}" +} diff --git a/src/main/scala/it/agilelab/datamesh/mwaaspecificprovisioner/error/ValidationErrorType.scala b/src/main/scala/it/agilelab/datamesh/mwaaspecificprovisioner/error/ValidationErrorType.scala new file mode 100644 index 0000000..f7d72b0 --- /dev/null +++ b/src/main/scala/it/agilelab/datamesh/mwaaspecificprovisioner/error/ValidationErrorType.scala @@ -0,0 +1,47 @@ +package it.agilelab.datamesh.mwaaspecificprovisioner.error + +import cats.data.NonEmptyList + +trait ValidationErrorType extends ErrorType + +case class InvalidDescriptor(errors: NonEmptyList[String]) extends ValidationErrorType { + override def errorMessage: String = s"Descriptor is not valid. Details: ${errors.toList.mkString(",")}" +} + +case class InvalidComponent(componentId: String) extends ValidationErrorType { + override def errorMessage: String = s"The component '$componentId' to provision is not present" +} + +case class InvalidComponentId(componentId: String) extends ValidationErrorType { + override def errorMessage: String = s"The componentId '$componentId' is not valid" +} + +case class InvalidDagName(fieldName: String, error: Throwable) extends ValidationErrorType { + override def errorMessage: String = s"The $fieldName field is not present or is invalid. Details: ${error.getMessage}" +} + +case class InvalidDestinationPath(fieldName: String, error: Throwable) extends ValidationErrorType { + override def errorMessage: String = s"The $fieldName field is not present or is invalid. Details: ${error.getMessage}" +} + +case class InvalidSourcePath(fieldName: String, error: Throwable) extends ValidationErrorType { + override def errorMessage: String = s"The $fieldName field is not present or is invalid. Details: ${error.getMessage}" +} + +case class InvalidBucketName(fieldName: String, error: Throwable) extends ValidationErrorType { + override def errorMessage: String = s"The $fieldName field is not present or is invalid. Details: ${error.getMessage}" +} + +case class InvalidScheduleCron(fieldName: String, error: Throwable) extends ValidationErrorType { + override def errorMessage: String = s"The $fieldName field is not present or is invalid. Details: ${error.getMessage}" +} + +case class ErrorSourceFile(bucket: String, key: String, error: Throwable) extends ValidationErrorType { + + override def errorMessage: String = + s"An error occurred while verifying existence of the file $key in bucket $bucket: ${error.getMessage}" +} + +case class MissingSourceFile(bucket: String, key: String) extends ValidationErrorType { + override def errorMessage: String = s"File $key not found in bucket $bucket" +} diff --git a/src/main/scala/it/agilelab/datamesh/mwaaspecificprovisioner/model/MwaaFields.scala b/src/main/scala/it/agilelab/datamesh/mwaaspecificprovisioner/model/MwaaFields.scala new file mode 100644 index 0000000..eac89f6 --- /dev/null +++ b/src/main/scala/it/agilelab/datamesh/mwaaspecificprovisioner/model/MwaaFields.scala @@ -0,0 +1,10 @@ +package it.agilelab.datamesh.mwaaspecificprovisioner.model + +case class MwaaFields( + dagName: String, + component: ComponentDescriptor, + destinationPath: String, + sourcePath: String, + bucketName: String, + prefix: String +) diff --git a/src/main/scala/it/agilelab/datamesh/mwaaspecificprovisioner/model/ProvisioningRequestDescriptor.scala b/src/main/scala/it/agilelab/datamesh/mwaaspecificprovisioner/model/ProvisioningRequestDescriptor.scala index bcc4f76..d271f51 100644 --- a/src/main/scala/it/agilelab/datamesh/mwaaspecificprovisioner/model/ProvisioningRequestDescriptor.scala +++ b/src/main/scala/it/agilelab/datamesh/mwaaspecificprovisioner/model/ProvisioningRequestDescriptor.scala @@ -19,7 +19,7 @@ object ProvisioningRequestDescriptor { } def apply(yaml: String): EitherNel[String, ProvisioningRequestDescriptor] = { - val maybePr: Either[Serializable, ProvisioningRequestDescriptor] = for { + val maybePr: Either[String, ProvisioningRequestDescriptor] = for { dataProduct <- DataProductDescriptor(yaml) componentIdToProvision <- getComponentIdToProvision(yaml) } yield ProvisioningRequestDescriptor(dataProduct, componentIdToProvision) diff --git a/src/main/scala/it/agilelab/datamesh/mwaaspecificprovisioner/mwaa/MwaaManager.scala b/src/main/scala/it/agilelab/datamesh/mwaaspecificprovisioner/mwaa/MwaaManager.scala index 0fdd960..0d46af6 100644 --- a/src/main/scala/it/agilelab/datamesh/mwaaspecificprovisioner/mwaa/MwaaManager.scala +++ b/src/main/scala/it/agilelab/datamesh/mwaaspecificprovisioner/mwaa/MwaaManager.scala @@ -1,81 +1,41 @@ package it.agilelab.datamesh.mwaaspecificprovisioner.mwaa -import cats.implicits.toBifunctorOps +import cats.data.ValidatedNel +import cats.implicits._ import com.typesafe.scalalogging.LazyLogging +import it.agilelab.datamesh.mwaaspecificprovisioner.common.StringUtils.StringImplicits +import it.agilelab.datamesh.mwaaspecificprovisioner.error.{ErrorType, ProvisionErrorType, ValidationErrorType} +import it.agilelab.datamesh.mwaaspecificprovisioner.model.MwaaFields import it.agilelab.datamesh.mwaaspecificprovisioner.s3.gateway.S3Gateway -import it.agilelab.datamesh.mwaaspecificprovisioner.common.Constants -import it.agilelab.datamesh.mwaaspecificprovisioner.model.{ComponentDescriptor, ProvisioningRequestDescriptor} +import it.agilelab.datamesh.mwaaspecificprovisioner.validation.Validator + +class MwaaManager(s3Client: S3Gateway, mwaaValidator: Validator) extends LazyLogging { + + def executeValidation(descriptor: String): ValidatedNel[ValidationErrorType, MwaaFields] = mwaaValidator + .validate(descriptor) + + def executeProvision(descriptor: String): ValidatedNel[ErrorType, Unit] = mwaaValidator.validate(descriptor) + .andThen { mwaaFields => + s3Client.copyObject( + mwaaFields.bucketName, + s"${mwaaFields.destinationPath.ensureTrailingSlash}${mwaaFields.prefix}.${mwaaFields.dagName}", + mwaaFields.bucketName, + s"${mwaaFields.sourcePath.ensureTrailingSlash}${mwaaFields.prefix}.${mwaaFields.dagName}" + ).leftMap { e => + logger.error(s"Error in executeProvision: ${e.show}") + ProvisionErrorType(e) + }.toValidatedNel + } + + def executeUnprovision(descriptor: String): ValidatedNel[ErrorType, Unit] = mwaaValidator.validate(descriptor) + .andThen { mwaaFields => + s3Client.deleteObject( + mwaaFields.bucketName, + s"${mwaaFields.destinationPath.ensureTrailingSlash}${mwaaFields.prefix}.${mwaaFields.dagName}" + ).leftMap { e => + logger.error(s"Error in executeUnprovision: ${e.show}") + ProvisionErrorType(e) + }.toValidatedNel + } -class MwaaManager(s3Client: S3Gateway) extends LazyLogging { - - def executeProvision(descriptor: ProvisioningRequestDescriptor): Either[Product, Unit] = { - logger.info("Starting executing executeProvision method") - for { - dagName <- getDagName(descriptor) - component <- getComponent(descriptor) - destinationPath <- getDestinationPath(descriptor) - sourcePath <- getSourcePath(descriptor) - bucketName <- getBucketName(descriptor) - urnArray = component.id.split(":") - prefix = s"${urnArray(3)}.${urnArray(4)}.${urnArray(5)}." - _ <- s3Client - .copyObject(bucketName, s"$destinationPath$prefix$dagName", bucketName, s"$sourcePath$prefix$dagName") - } yield () - } - - def executeUnprovision(descriptor: ProvisioningRequestDescriptor): Either[Product, Unit] = { - logger.info("Starting executing executeUnprovision method") - for { - dagName <- getDagName(descriptor) - bucketName <- getBucketName(descriptor) - destinationPath <- getDestinationPath(descriptor) - component <- getComponent(descriptor) - urnArray = component.id.split(":") - prefix = s"${urnArray(3)}.${urnArray(4)}.${urnArray(5)}." - _ <- s3Client.deleteObject(bucketName, s"$destinationPath$prefix$dagName") - } yield () - } - - def getDagName(descriptor: ProvisioningRequestDescriptor): Either[MwaaManagerError with Product, String] = { - logger.info("Starting executing getDagName method") - for { - component <- descriptor.getComponentToProvision - .toRight(GetDagName(descriptor, "Unable to find the component to provision")) - dagName <- component.specific.hcursor.downField(Constants.DAG_NAME_FIELD).as[String] - .leftMap(error => GetDagName(descriptor, error.getMessage)) - } yield dagName - } - - def getBucketName(descriptor: ProvisioningRequestDescriptor): Either[MwaaManagerError with Product, String] = { - logger.info("Starting executing getBucketName method") - for { - component <- getComponent(descriptor) - sourceBucket <- component.specific.hcursor.downField(Constants.BUCKET_NAME_FIELD).as[String] - .leftMap(error => GetBucketNameError(descriptor, error.getMessage)) - } yield sourceBucket - } - - def getDestinationPath(descriptor: ProvisioningRequestDescriptor): Either[MwaaManagerError with Product, String] = { - logger.info("Starting executing getDestinationPath method") - for { - component <- descriptor.getComponentToProvision - .toRight(GetDestinationPathError(descriptor, "Unable to find the component to provision")) - dagName <- component.specific.hcursor.downField(Constants.DESTINATION_DAG_PATH_FIELD).as[String] - .leftMap(error => GetDestinationPathError(descriptor, error.getMessage)) - } yield dagName - } - - def getSourcePath(descriptor: ProvisioningRequestDescriptor): Either[MwaaManagerError with Product, String] = { - logger.info("Starting executing getSourcePath method") - for { - component <- getComponent(descriptor) - sourceBucket <- component.specific.hcursor.downField(Constants.SOURCE_DAG_PATH_FIELD).as[String] - .leftMap(error => GetSourcePathError(descriptor, error.getMessage)) - } yield sourceBucket - } - - private def getComponent( - descriptor: ProvisioningRequestDescriptor - ): Either[MwaaManagerError with Product, ComponentDescriptor] = descriptor.getComponentToProvision - .toRight(GetComponentError(descriptor, "Unable to find the component to provision")) } diff --git a/src/main/scala/it/agilelab/datamesh/mwaaspecificprovisioner/mwaa/MwaaManagerError.scala b/src/main/scala/it/agilelab/datamesh/mwaaspecificprovisioner/mwaa/MwaaManagerError.scala deleted file mode 100644 index acf663a..0000000 --- a/src/main/scala/it/agilelab/datamesh/mwaaspecificprovisioner/mwaa/MwaaManagerError.scala +++ /dev/null @@ -1,42 +0,0 @@ -package it.agilelab.datamesh.mwaaspecificprovisioner.mwaa - -import it.agilelab.datamesh.mwaaspecificprovisioner.model.ProvisioningRequestDescriptor - -sealed trait MwaaManagerError { - def errorMessage: String -} - -final case class SendRequestError(dagName: String, error: String) extends MwaaManagerError { - override def errorMessage: String = s"SendRequestError($dagName, $error)" -} - -final case class GetDagName(descriptor: ProvisioningRequestDescriptor, error: String) extends MwaaManagerError { - override def errorMessage: String = s"GetDagName($descriptor, $error)" -} - -final case class GetBucketNameError(descriptor: ProvisioningRequestDescriptor, error: String) extends MwaaManagerError { - override def errorMessage: String = s"GetBucketNameError($descriptor, $error)" -} - -final case class GetDestinationPathError(descriptor: ProvisioningRequestDescriptor, error: String) - extends MwaaManagerError { - override def errorMessage: String = s"GetDestinationPathError($descriptor, $error)" -} - -final case class GetSourcePathError(descriptor: ProvisioningRequestDescriptor, error: String) extends MwaaManagerError { - override def errorMessage: String = s"GetSourcePathError($descriptor, $error)" -} - -final case class GetComponentError(descriptor: ProvisioningRequestDescriptor, error: String) extends MwaaManagerError { - override def errorMessage: String = s"GetComponentError($descriptor, $error)" -} - -final case class GetComponentNameError(descriptor: ProvisioningRequestDescriptor, error: String) - extends MwaaManagerError { - override def errorMessage: String = s"GetComponentNameError($descriptor, $error)" -} - -final case class ExecuteProvisionError(descriptor: ProvisioningRequestDescriptor, error: String) - extends MwaaManagerError { - override def errorMessage: String = s"ExecuteProvisionError($descriptor, $error)" -} diff --git a/src/main/scala/it/agilelab/datamesh/mwaaspecificprovisioner/server/impl/Main.scala b/src/main/scala/it/agilelab/datamesh/mwaaspecificprovisioner/server/impl/Main.scala index bffa302..590751d 100644 --- a/src/main/scala/it/agilelab/datamesh/mwaaspecificprovisioner/server/impl/Main.scala +++ b/src/main/scala/it/agilelab/datamesh/mwaaspecificprovisioner/server/impl/Main.scala @@ -17,6 +17,7 @@ import it.agilelab.datamesh.mwaaspecificprovisioner.mwaa.MwaaManager import it.agilelab.datamesh.mwaaspecificprovisioner.s3.gateway.{S3Gateway, S3GatewayMock} import it.agilelab.datamesh.mwaaspecificprovisioner.server.Controller import it.agilelab.datamesh.mwaaspecificprovisioner.system.ApplicationConfiguration.{httpPort, isMock} +import it.agilelab.datamesh.mwaaspecificprovisioner.validation.MwaaValidator import scala.jdk.CollectionConverters._ @@ -27,8 +28,9 @@ object Main extends LazyLogging { import akka.actor.typed.scaladsl.adapter._ implicit val classicSystem: actor.ActorSystem = context.system.toClassic - val manager = new MwaaManager(clientAws) - val impl = new ProvisionerApiServiceImpl(manager) + val validator = new MwaaValidator(clientAws) + val manager = new MwaaManager(clientAws, validator) + val impl = new ProvisionerApiServiceImpl(manager) val api = new SpecificProvisionerApi( impl, diff --git a/src/main/scala/it/agilelab/datamesh/mwaaspecificprovisioner/validation/MwaaValidator.scala b/src/main/scala/it/agilelab/datamesh/mwaaspecificprovisioner/validation/MwaaValidator.scala new file mode 100644 index 0000000..9106fca --- /dev/null +++ b/src/main/scala/it/agilelab/datamesh/mwaaspecificprovisioner/validation/MwaaValidator.scala @@ -0,0 +1,103 @@ +package it.agilelab.datamesh.mwaaspecificprovisioner.validation + +import cats.data.{Validated, ValidatedNel} +import it.agilelab.datamesh.mwaaspecificprovisioner.error.{ + ErrorSourceFile, + InvalidBucketName, + InvalidComponent, + InvalidComponentId, + InvalidDagName, + InvalidDescriptor, + InvalidDestinationPath, + InvalidScheduleCron, + InvalidSourcePath, + MissingSourceFile, + ValidationErrorType +} +import it.agilelab.datamesh.mwaaspecificprovisioner.model.{ + ComponentDescriptor, + MwaaFields, + ProvisioningRequestDescriptor +} +import cats.implicits._ +import com.typesafe.scalalogging.LazyLogging +import it.agilelab.datamesh.mwaaspecificprovisioner.common.Constants +import it.agilelab.datamesh.mwaaspecificprovisioner.s3.gateway.S3Gateway + +class MwaaValidator(s3Gateway: S3Gateway) extends Validator with LazyLogging { + + private def validateComponent( + provisioningRequestDescriptor: ProvisioningRequestDescriptor + ): ValidatedNel[ValidationErrorType, ComponentDescriptor] = provisioningRequestDescriptor.getComponentToProvision + .toRight(InvalidComponent(provisioningRequestDescriptor.componentIdToProvision)).toValidatedNel + + private def validateComponentId(c: ComponentDescriptor): ValidatedNel[ValidationErrorType, String] = { + val urnArray = c.id.split(":") + Validated.condNel(urnArray.length >= 6, s"${urnArray(3)}.${urnArray(4)}.${urnArray(5)}", InvalidComponentId(c.id)) + } + + private def validateDagName(c: ComponentDescriptor): ValidatedNel[ValidationErrorType, String] = c.specific.hcursor + .downField(Constants.DAG_NAME_FIELD).as[String].leftMap { error => + logger.error("Error in validateDagName", error) + InvalidDagName(Constants.DAG_NAME_FIELD, error) + }.toValidatedNel + + private def validateDestinationPath(c: ComponentDescriptor): ValidatedNel[ValidationErrorType, String] = c.specific + .hcursor.downField(Constants.DESTINATION_DAG_PATH_FIELD).as[String].leftMap { error => + logger.error("Error in validateDestinationPath", error) + InvalidDestinationPath(Constants.DESTINATION_DAG_PATH_FIELD, error) + }.toValidatedNel + + private def validateSourcePath(c: ComponentDescriptor): ValidatedNel[ValidationErrorType, String] = c.specific.hcursor + .downField(Constants.SOURCE_DAG_PATH_FIELD).as[String].leftMap { error => + logger.error("Error in validateSourcePath", error) + InvalidSourcePath(Constants.SOURCE_DAG_PATH_FIELD, error) + }.toValidatedNel + + private def validateBucketName(c: ComponentDescriptor): ValidatedNel[ValidationErrorType, String] = c.specific.hcursor + .downField(Constants.BUCKET_NAME_FIELD).as[String].leftMap { error => + logger.error("Error in validateBucketName", error) + InvalidBucketName(Constants.BUCKET_NAME_FIELD, error) + }.toValidatedNel + + private def validateScheduleCron(c: ComponentDescriptor): ValidatedNel[ValidationErrorType, String] = c.specific + .hcursor.downField(Constants.SCHEDULE_CRON_FIELD).as[String].leftMap { error => + logger.error("Error in validateScheduleCron", error) + InvalidScheduleCron(Constants.SCHEDULE_CRON_FIELD, error) + }.toValidatedNel + + private def sourceFileExists(mwaaFields: MwaaFields): ValidatedNel[ValidationErrorType, MwaaFields] = s3Gateway + .objectExists(mwaaFields.bucketName, s"${mwaaFields.sourcePath}${mwaaFields.prefix}${mwaaFields.dagName}") + .leftMap { error => + logger.error(s"Error in sourceFileExists ${error.show}") + ErrorSourceFile( + mwaaFields.bucketName, + s"${mwaaFields.sourcePath}${mwaaFields.prefix}${mwaaFields.dagName}", + error + ) + }.toValidatedNel.andThen(exists => + Validated.condNel( + exists, + mwaaFields, + MissingSourceFile(mwaaFields.bucketName, s"${mwaaFields.sourcePath}${mwaaFields.prefix}${mwaaFields.dagName}") + ) + ) + + override def validate(descriptor: String): ValidatedNel[ValidationErrorType, MwaaFields] = + ProvisioningRequestDescriptor(descriptor).leftMap(errors => InvalidDescriptor(errors)).toValidatedNel + .andThen(provisioningRequestDescriptor => + validateComponent(provisioningRequestDescriptor).andThen { c => + ( + validateDagName(c), + validateDestinationPath(c), + validateSourcePath(c), + validateBucketName(c), + validateComponentId(c), + validateScheduleCron(c) + ).mapN((dagName, destinationPath, sourcePath, bucketName, prefix, _) => + MwaaFields(dagName, c, destinationPath, sourcePath, bucketName, prefix) + ).andThen(mwaaFields => sourceFileExists(mwaaFields)) + } + ) + +} diff --git a/src/main/scala/it/agilelab/datamesh/mwaaspecificprovisioner/validation/Validator.scala b/src/main/scala/it/agilelab/datamesh/mwaaspecificprovisioner/validation/Validator.scala new file mode 100644 index 0000000..7a32da4 --- /dev/null +++ b/src/main/scala/it/agilelab/datamesh/mwaaspecificprovisioner/validation/Validator.scala @@ -0,0 +1,9 @@ +package it.agilelab.datamesh.mwaaspecificprovisioner.validation + +import cats.data.ValidatedNel +import it.agilelab.datamesh.mwaaspecificprovisioner.error.ValidationErrorType +import it.agilelab.datamesh.mwaaspecificprovisioner.model.MwaaFields + +trait Validator { + def validate(descriptor: String): ValidatedNel[ValidationErrorType, MwaaFields] +} diff --git a/src/test/resources/application.conf b/src/test/resources/application.conf new file mode 100644 index 0000000..80245d0 --- /dev/null +++ b/src/test/resources/application.conf @@ -0,0 +1,3 @@ +akka { + coordinated-shutdown.exit-jvm = off +} \ No newline at end of file diff --git a/src/test/resources/validation/descriptor_ko_missing_component.yml b/src/test/resources/validation/descriptor_ko_missing_component.yml new file mode 100644 index 0000000..71cdbff --- /dev/null +++ b/src/test/resources/validation/descriptor_ko_missing_component.yml @@ -0,0 +1,24 @@ +dataProduct: + dataProductOwnerDisplayName: Cristian Astorino + environment: development + domain: demographic + kind: dataproduct + domainId: urn:dmb:dmn:demographic + id: urn:dmb:dp:demographic:dp-to-test-mwaa-fix:0 + description: DP to test MWAA fix WIT-567 + devGroup: agile_lab + ownerGroup: cristian.astorino_agilelab.it + dataProductOwner: user:cristian.astorino_agilelab.it + email: cristian.astorino@agilelab.it + version: 0.1.0-SNAPSHOT-1 + fullyQualifiedName: null + name: DP to test MWAA fix + informationSLA: null + maturity: null + useCaseTemplateId: urn:dmb:utm:dataproduct-template:0.0.0 + infrastructureTemplateId: urn:dmb:itm:dataproduct-provisioner:1 + billing: {} + tags: [] + specific: {} + components: [] +componentIdToProvision: urn:dmb:cmp:demographic:dp-to-test-mwaa-fix:0:airflow-workload-test-fix \ No newline at end of file diff --git a/src/test/resources/validation/descriptor_ko_missing_specifics.yml b/src/test/resources/validation/descriptor_ko_missing_specifics.yml new file mode 100644 index 0000000..048b59d --- /dev/null +++ b/src/test/resources/validation/descriptor_ko_missing_specifics.yml @@ -0,0 +1,40 @@ +dataProduct: + dataProductOwnerDisplayName: Cristian Astorino + environment: development + domain: demographic + kind: dataproduct + domainId: urn:dmb:dmn:demographic + id: urn:dmb:dp:demographic:dp-to-test-mwaa-fix:0 + description: DP to test MWAA fix WIT-567 + devGroup: agile_lab + ownerGroup: cristian.astorino_agilelab.it + dataProductOwner: user:cristian.astorino_agilelab.it + email: cristian.astorino@agilelab.it + version: 0.1.0-SNAPSHOT-1 + fullyQualifiedName: null + name: DP to test MWAA fix + informationSLA: null + maturity: null + useCaseTemplateId: urn:dmb:utm:dataproduct-template:0.0.0 + infrastructureTemplateId: urn:dmb:itm:dataproduct-provisioner:1 + billing: {} + tags: [] + specific: {} + components: + - kind: workload + id: urn:dmb:cmp:demographic:dp-to-test-mwaa-fix:0:airflow-workload-test-fix + description: Airflow Workload Test Fix WIT-567 + name: Airflow Workload Test Fix + fullyQualifiedName: null + version: 0.0.0 + infrastructureTemplateId: urn:dmb:itm:aws-workload-airflow-provisioner:0 + useCaseTemplateId: urn:dmb:utm:aws-airflow-workload-template:0.0.0 + dependsOn: [] + platform: AWS + technology: airflow + workloadType: batch + connectionType: DataPipeline + tags: [] + readsFrom: [] + specific: {} +componentIdToProvision: urn:dmb:cmp:demographic:dp-to-test-mwaa-fix:0:airflow-workload-test-fix \ No newline at end of file diff --git a/src/test/resources/validation/descriptor_ko_wrong_component_id_format.yml b/src/test/resources/validation/descriptor_ko_wrong_component_id_format.yml new file mode 100644 index 0000000..aaaf10c --- /dev/null +++ b/src/test/resources/validation/descriptor_ko_wrong_component_id_format.yml @@ -0,0 +1,45 @@ +dataProduct: + dataProductOwnerDisplayName: Cristian Astorino + environment: development + domain: demographic + kind: dataproduct + domainId: urn:dmb:dmn:demographic + id: urn:dmb:dp:demographic:dp-to-test-mwaa-fix:0 + description: DP to test MWAA fix WIT-567 + devGroup: agile_lab + ownerGroup: cristian.astorino_agilelab.it + dataProductOwner: user:cristian.astorino_agilelab.it + email: cristian.astorino@agilelab.it + version: 0.1.0-SNAPSHOT-1 + fullyQualifiedName: null + name: DP to test MWAA fix + informationSLA: null + maturity: null + useCaseTemplateId: urn:dmb:utm:dataproduct-template:0.0.0 + infrastructureTemplateId: urn:dmb:itm:dataproduct-provisioner:1 + billing: {} + tags: [] + specific: {} + components: + - kind: workload + id: urn:dmb:cmp:demographic:dp-to-test-mwaa-fix + description: Airflow Workload Test Fix WIT-567 + name: Airflow Workload Test Fix + fullyQualifiedName: null + version: 0.0.0 + infrastructureTemplateId: urn:dmb:itm:aws-workload-airflow-provisioner:0 + useCaseTemplateId: urn:dmb:utm:aws-airflow-workload-template:0.0.0 + dependsOn: [] + platform: AWS + technology: airflow + workloadType: batch + connectionType: DataPipeline + tags: [] + readsFrom: [] + specific: + scheduleCron: 5 5 * * * + dagName: airbyte_snowflake_dag_custom_wit_567.py + destinationPath: dags/ + sourcePath: source/ + bucketName: sandbox-qa-mwaa-eu-west-1-278435202378 +componentIdToProvision: urn:dmb:cmp:demographic:dp-to-test-mwaa-fix \ No newline at end of file diff --git a/src/test/resources/validation/descriptor_ok.yml b/src/test/resources/validation/descriptor_ok.yml new file mode 100644 index 0000000..cdff0ab --- /dev/null +++ b/src/test/resources/validation/descriptor_ok.yml @@ -0,0 +1,45 @@ +dataProduct: + dataProductOwnerDisplayName: Cristian Astorino + environment: development + domain: demographic + kind: dataproduct + domainId: urn:dmb:dmn:demographic + id: urn:dmb:dp:demographic:dp-to-test-mwaa-fix:0 + description: DP to test MWAA fix WIT-567 + devGroup: agile_lab + ownerGroup: cristian.astorino_agilelab.it + dataProductOwner: user:cristian.astorino_agilelab.it + email: cristian.astorino@agilelab.it + version: 0.1.0-SNAPSHOT-1 + fullyQualifiedName: null + name: DP to test MWAA fix + informationSLA: null + maturity: null + useCaseTemplateId: urn:dmb:utm:dataproduct-template:0.0.0 + infrastructureTemplateId: urn:dmb:itm:dataproduct-provisioner:1 + billing: {} + tags: [] + specific: {} + components: + - kind: workload + id: urn:dmb:cmp:demographic:dp-to-test-mwaa-fix:0:airflow-workload-test-fix + description: Airflow Workload Test Fix WIT-567 + name: Airflow Workload Test Fix + fullyQualifiedName: null + version: 0.0.0 + infrastructureTemplateId: urn:dmb:itm:aws-workload-airflow-provisioner:0 + useCaseTemplateId: urn:dmb:utm:aws-airflow-workload-template:0.0.0 + dependsOn: [] + platform: AWS + technology: airflow + workloadType: batch + connectionType: DataPipeline + tags: [] + readsFrom: [] + specific: + scheduleCron: 5 5 * * * + dagName: airbyte_snowflake_dag_custom_wit_567.py + destinationPath: dags/ + sourcePath: source/ + bucketName: sandbox-qa-mwaa-eu-west-1-278435202378 +componentIdToProvision: urn:dmb:cmp:demographic:dp-to-test-mwaa-fix:0:airflow-workload-test-fix \ No newline at end of file diff --git a/src/test/scala/it/agilelab/datamesh/mwaaspecificprovisioner/api/interpreter/ProvisionerApiServiceImplSpec.scala b/src/test/scala/it/agilelab/datamesh/mwaaspecificprovisioner/api/interpreter/ProvisionerApiServiceImplSpec.scala index 99be0ed..1e77fff 100644 --- a/src/test/scala/it/agilelab/datamesh/mwaaspecificprovisioner/api/interpreter/ProvisionerApiServiceImplSpec.scala +++ b/src/test/scala/it/agilelab/datamesh/mwaaspecificprovisioner/api/interpreter/ProvisionerApiServiceImplSpec.scala @@ -6,16 +6,21 @@ import akka.http.scaladsl.model.StatusCodes import akka.http.scaladsl.server.{Directive1, RequestContext, Route} import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest} import akka.testkit.TestDuration +import cats.data.{NonEmptyList, Validated} +import cats.data.Validated.Valid import com.typesafe.scalalogging.StrictLogging import de.heikoseeberger.akkahttpcirce.FailFastCirceSupport +import io.circe.Json import it.agilelab.datamesh.mwaaspecificprovisioner.api.SpecificProvisionerApi import it.agilelab.datamesh.mwaaspecificprovisioner.api.intepreter.{ ProvisionerApiMarshallerImpl, ProvisionerApiServiceImpl } import it.agilelab.datamesh.mwaaspecificprovisioner.common.test.getTestResourceAsString +import it.agilelab.datamesh.mwaaspecificprovisioner.error.{InvalidDagName, InvalidDescriptor, ProvisionErrorType} import it.agilelab.datamesh.mwaaspecificprovisioner.model._ -import it.agilelab.datamesh.mwaaspecificprovisioner.mwaa.{MwaaManager, SendRequestError} +import it.agilelab.datamesh.mwaaspecificprovisioner.mwaa.MwaaManager +import it.agilelab.datamesh.mwaaspecificprovisioner.s3.gateway.S3GatewayError.S3GatewayInitError import it.agilelab.datamesh.mwaaspecificprovisioner.server.Controller import org.scalamock.scalatest.MockFactory import org.scalatest.flatspec.AnyFlatSpec @@ -65,7 +70,8 @@ class ProvisionerApiServiceImplSpec "synchronously validate with no errors when a valid descriptor is passed as input" in { val request = ProvisioningRequest(DATAPRODUCT_DESCRIPTOR, descriptor = "valid", removeData = false) - /* TODO when validation is done */ + val _ = (mwaaManager.executeValidation _).expects(*) + .returns(Valid(MwaaFields("", ComponentDescriptor("", "", Json.Null, Json.Null), "", "", "", ""))) Post("/v1/validate", request) ~> api.route ~> check { val response = responseAs[ValidationResult] @@ -74,11 +80,33 @@ class ProvisionerApiServiceImplSpec } } + it should "return a validation error if validation fails" in { + val request = ProvisioningRequest(DATAPRODUCT_DESCRIPTOR, descriptor = "invalid", removeData = false) + + val _ = (mwaaManager.executeValidation _).expects(*) + .returns(Validated.invalidNel(InvalidDagName("", new Throwable("")))) + + Post("/v1/validate", request) ~> api.route ~> check { + val response = responseAs[ValidationResult] + response.valid shouldEqual false + response.error.isDefined shouldBe true + } + } + + it should "raise an error if there's an uncaught exception while validating" in { + val yaml = getTestResourceAsString("pr_descriptors/pr_descriptor_1.yml") + val request = ProvisioningRequest(DATAPRODUCT_DESCRIPTOR, descriptor = yaml, removeData = false) + + val _ = (mwaaManager.executeValidation _).expects(*).throws(new Throwable("")) + + Post("/v1/validate", request) ~> api.route ~> check(response.status shouldEqual StatusCodes.InternalServerError) + } + it should "synchronously provision when a valid descriptor is passed as input" in { val yaml = getTestResourceAsString("pr_descriptors/pr_descriptor_1.yml") val request = ProvisioningRequest(DATAPRODUCT_DESCRIPTOR, descriptor = yaml, removeData = false) - val _ = (mwaaManager.executeProvision _).expects(*).returns(Right(())) + val _ = (mwaaManager.executeProvision _).expects(*).returns(Valid(())) Post("/v1/provision", request) ~> api.route ~> check { val response = responseAs[ProvisioningStatus] @@ -89,6 +117,9 @@ class ProvisionerApiServiceImplSpec it should "raise an error if provision received descriptor is not valid" in { val request = ProvisioningRequest(DATAPRODUCT_DESCRIPTOR, descriptor = "invalid", removeData = false) + val _ = (mwaaManager.executeProvision _).expects(*) + .returns(Validated.invalidNel(InvalidDescriptor(NonEmptyList.one("")))) + Post("/v1/provision", request) ~> api.route ~> check(response.status shouldEqual StatusCodes.BadRequest) } @@ -96,7 +127,17 @@ class ProvisionerApiServiceImplSpec val yaml = getTestResourceAsString("pr_descriptors/pr_descriptor_1.yml") val request = ProvisioningRequest(DATAPRODUCT_DESCRIPTOR, descriptor = yaml, removeData = false) - val _ = (mwaaManager.executeProvision _).expects(*).returns(Left(SendRequestError("dagName", "an error"))) + val _ = (mwaaManager.executeProvision _).expects(*) + .returns(Validated.invalidNel(ProvisionErrorType(S3GatewayInitError(new Throwable(""))))) + + Post("/v1/provision", request) ~> api.route ~> check(response.status shouldEqual StatusCodes.BadRequest) + } + + it should "raise an error if there's an uncaught exception while provisioning" in { + val yaml = getTestResourceAsString("pr_descriptors/pr_descriptor_1.yml") + val request = ProvisioningRequest(DATAPRODUCT_DESCRIPTOR, descriptor = yaml, removeData = false) + + val _ = (mwaaManager.executeProvision _).expects(*).throws(new Throwable("")) Post("/v1/provision", request) ~> api.route ~> check(response.status shouldEqual StatusCodes.InternalServerError) } @@ -105,7 +146,7 @@ class ProvisionerApiServiceImplSpec val yaml = getTestResourceAsString("pr_descriptors/pr_descriptor_1.yml") val request = ProvisioningRequest(DATAPRODUCT_DESCRIPTOR, descriptor = yaml, removeData = false) - val _ = (mwaaManager.executeUnprovision _).expects(*).returns(Right(())) + val _ = (mwaaManager.executeUnprovision _).expects(*).returns(Valid(())) Post("/v1/unprovision", request) ~> api.route ~> check { val response = responseAs[ProvisioningStatus] @@ -116,6 +157,9 @@ class ProvisionerApiServiceImplSpec it should "raise an error if unprovision received descriptor is not valid" in { val request = ProvisioningRequest(DATAPRODUCT_DESCRIPTOR, descriptor = "invalid", removeData = false) + val _ = (mwaaManager.executeUnprovision _).expects(*) + .returns(Validated.invalidNel(InvalidDescriptor(NonEmptyList.one("")))) + Post("/v1/unprovision", request) ~> api.route ~> check(response.status shouldEqual StatusCodes.BadRequest) } @@ -123,20 +167,54 @@ class ProvisionerApiServiceImplSpec val yaml = getTestResourceAsString("pr_descriptors/pr_descriptor_1.yml") val request = ProvisioningRequest(DATAPRODUCT_DESCRIPTOR, descriptor = yaml, removeData = false) - val _ = (mwaaManager.executeUnprovision _).expects(*).returns(Left(SendRequestError("dagName", "an error"))) + val _ = (mwaaManager.executeUnprovision _).expects(*) + .returns(Validated.invalidNel(ProvisionErrorType(S3GatewayInitError(new Throwable(""))))) + + Post("/v1/unprovision", request) ~> api.route ~> check(response.status shouldEqual StatusCodes.BadRequest) + } + + it should "raise an error if there's an uncaught exception while unprovisioning" in { + val yaml = getTestResourceAsString("pr_descriptors/pr_descriptor_1.yml") + val request = ProvisioningRequest(DATAPRODUCT_DESCRIPTOR, descriptor = yaml, removeData = false) + + val _ = (mwaaManager.executeUnprovision _).expects(*).throws(new Throwable("")) Post("/v1/unprovision", request) ~> api.route ~> check(response.status shouldEqual StatusCodes.InternalServerError) } - it should "synchronously updateacl when a valid descriptor is passed as input" in { + it should "raise an error for an updateAcl request" in { val request = UpdateAclRequest(List("sergio.mejia_agilelab.it"), ProvisionInfo("req", "res")) - Post("/v1/updateacl", request) ~> api.route ~> check { - val response = responseAs[ProvisioningStatus] - response.status shouldEqual ProvisioningStatusEnums.StatusEnum.COMPLETED + Post("/v1/updateacl", request) ~> api.route ~> check(response.status shouldEqual StatusCodes.InternalServerError) + } + + it should "raise an error for an async getStatus request" in Get("/v1/provision/token/status") ~> api.route ~> check { + response.status shouldEqual StatusCodes.BadRequest + } + + it should "raise an error for an async validate request" in { + val request = ProvisioningRequest(DATAPRODUCT_DESCRIPTOR, descriptor = "", removeData = false) + + Post("/v2/validate", request) ~> api.route ~> check(response.status shouldEqual StatusCodes.InternalServerError) + } + + it should "raise an error for a validate status request" in Get("/v2/validate/token/status") ~> api.route ~> check { + response.status shouldEqual StatusCodes.InternalServerError + } + + it should "raise an error for a reverse provisioning request" in { + val request = ReverseProvisioningRequest("", "") + + Post("/v1/reverse-provisioning", request) ~> api.route ~> check { + response.status shouldEqual StatusCodes.InternalServerError } } + it should "raise an error for a reverse provisioning status request" in + Get("/v1/reverse-provisioning/token/status") ~> api.route ~> check { + response.status shouldEqual StatusCodes.InternalServerError + } + } abstract class ExtractContexts extends Directive1[Seq[(String, String)]] diff --git a/src/test/scala/it/agilelab/datamesh/mwaaspecificprovisioner/model/DescriptorParserSpec.scala b/src/test/scala/it/agilelab/datamesh/mwaaspecificprovisioner/model/DescriptorParserSpec.scala index 654e296..90c8fe8 100644 --- a/src/test/scala/it/agilelab/datamesh/mwaaspecificprovisioner/model/DescriptorParserSpec.scala +++ b/src/test/scala/it/agilelab/datamesh/mwaaspecificprovisioner/model/DescriptorParserSpec.scala @@ -77,4 +77,39 @@ class DescriptorParserSpec extends AnyFlatSpec { dp.left.value should be(a[NonEmptyList[_]]) } + "Parsing a well formed json component" should "return a correct ComponentDescriptor" in { + import io.circe._, io.circe.parser._ + val rawHeader: String = """ + { + "name": "Airflow Workload Test Fix", + "kind": "workload", + "infrastructureTemplateId": "urn:dmb:itm:aws-workload-airflow-provisioner:0", + "useCaseTemplateId": "urn:dmb:utm:aws-airflow-workload-template:0.0.0" + } + """ + val rawSpecific = "{}" + + val component = + new ComponentDescriptor("", "", parse(rawHeader).getOrElse(Json.Null), parse(rawSpecific).getOrElse(Json.Null)) + + component.getName should be(Right("Airflow Workload Test Fix")) + component.getKind should be(Right("workload")) + component.getInfrastructureTemplateId should be(Right("urn:dmb:itm:aws-workload-airflow-provisioner:0")) + component.getUseCaseTemplateId should be(Right(Some("urn:dmb:utm:aws-airflow-workload-template:0.0.0"))) + } + + "Parsing a wrong json component" should "fail" in { + import io.circe._, io.circe.parser._ + val rawHeader = "{}" + val rawSpecific = "{}" + + val component = + new ComponentDescriptor("", "", parse(rawHeader).getOrElse(Json.Null), parse(rawSpecific).getOrElse(Json.Null)) + + component.getName.isLeft should be(true) + component.getKind.isLeft should be(true) + component.getInfrastructureTemplateId.isLeft should be(true) + component.getUseCaseTemplateId.getOrElse(None).isDefined should be(false) + } + } diff --git a/src/test/scala/it/agilelab/datamesh/mwaaspecificprovisioner/mwaa/MwaaManagerSpec.scala b/src/test/scala/it/agilelab/datamesh/mwaaspecificprovisioner/mwaa/MwaaManagerSpec.scala new file mode 100644 index 0000000..9713a30 --- /dev/null +++ b/src/test/scala/it/agilelab/datamesh/mwaaspecificprovisioner/mwaa/MwaaManagerSpec.scala @@ -0,0 +1,96 @@ +package it.agilelab.datamesh.mwaaspecificprovisioner.mwaa + +import cats.data.Validated +import cats.data.Validated.Valid +import io.circe.Json +import it.agilelab.datamesh.mwaaspecificprovisioner.error.InvalidDagName +import it.agilelab.datamesh.mwaaspecificprovisioner.model.{ComponentDescriptor, MwaaFields} +import it.agilelab.datamesh.mwaaspecificprovisioner.s3.gateway.S3Gateway +import it.agilelab.datamesh.mwaaspecificprovisioner.s3.gateway.S3GatewayError.S3GatewayInitError +import it.agilelab.datamesh.mwaaspecificprovisioner.validation.Validator +import org.scalamock.scalatest.MockFactory +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class MwaaManagerSpec extends AnyFlatSpec with MockFactory with Matchers { + + private val s3Client = mock[S3Gateway] + private val validator = mock[Validator] + + "Validation" should "succeed" in { + val mwaaManager = new MwaaManager(s3Client, validator) + val _ = (validator.validate _).expects(*) + .returns(Valid(MwaaFields("", ComponentDescriptor("", "", Json.Null, Json.Null), "", "", "", ""))) + val _ = (s3Client.copyObject _).expects(*, *, *, *).never() + val _ = (s3Client.deleteObject _).expects(*, *).never() + + val res = mwaaManager.executeValidation("valid") + + res.isValid should be(true) + } + + "Provision" should "succeed" in { + val mwaaManager = new MwaaManager(s3Client, validator) + val _ = (validator.validate _).expects(*) + .returns(Valid(MwaaFields("", ComponentDescriptor("", "", Json.Null, Json.Null), "", "", "", ""))) + val _ = (s3Client.copyObject _).expects(*, *, *, *).returns(Right(())) + + val res = mwaaManager.executeProvision("valid") + + res.isValid should be(true) + } + + it should "fail if validation fails" in { + val mwaaManager = new MwaaManager(s3Client, validator) + val _ = (validator.validate _).expects(*).returns(Validated.invalidNel(InvalidDagName("", new Throwable("")))) + val _ = (s3Client.copyObject _).expects(*, *, *, *).never() + + val res = mwaaManager.executeProvision("invalid") + + res.isValid should be(false) + } + + it should "fail if copy fails" in { + val mwaaManager = new MwaaManager(s3Client, validator) + val _ = (validator.validate _).expects(*) + .returns(Valid(MwaaFields("", ComponentDescriptor("", "", Json.Null, Json.Null), "", "", "", ""))) + val _ = (s3Client.copyObject _).expects(*, *, *, *).returns(Left(S3GatewayInitError(new Throwable("")))) + + val res = mwaaManager.executeProvision("valid") + + res.isValid should be(false) + } + + "Unprovision" should "succeed" in { + val mwaaManager = new MwaaManager(s3Client, validator) + val _ = (validator.validate _).expects(*) + .returns(Valid(MwaaFields("", ComponentDescriptor("", "", Json.Null, Json.Null), "", "", "", ""))) + val _ = (s3Client.deleteObject _).expects(*, *).returns(Right(())) + + val res = mwaaManager.executeUnprovision("valid") + + res.isValid should be(true) + } + + it should "fail if validation fails" in { + val mwaaManager = new MwaaManager(s3Client, validator) + val _ = (validator.validate _).expects(*).returns(Validated.invalidNel(InvalidDagName("", new Throwable("")))) + val _ = (s3Client.deleteObject _).expects(*, *).never() + + val res = mwaaManager.executeUnprovision("invalid") + + res.isValid should be(false) + } + + it should "fail if delete fails" in { + val mwaaManager = new MwaaManager(s3Client, validator) + val _ = (validator.validate _).expects(*) + .returns(Valid(MwaaFields("", ComponentDescriptor("", "", Json.Null, Json.Null), "", "", "", ""))) + val _ = (s3Client.deleteObject _).expects(*, *).returns(Left(S3GatewayInitError(new Throwable("")))) + + val res = mwaaManager.executeUnprovision("valid") + + res.isValid should be(false) + } + +} diff --git a/src/test/scala/it/agilelab/datamesh/mwaaspecificprovisioner/validation/MwaaValidatorSpec.scala b/src/test/scala/it/agilelab/datamesh/mwaaspecificprovisioner/validation/MwaaValidatorSpec.scala new file mode 100644 index 0000000..ce146a7 --- /dev/null +++ b/src/test/scala/it/agilelab/datamesh/mwaaspecificprovisioner/validation/MwaaValidatorSpec.scala @@ -0,0 +1,112 @@ +package it.agilelab.datamesh.mwaaspecificprovisioner.validation + +import cats.data.Validated +import it.agilelab.datamesh.mwaaspecificprovisioner.common.test.getTestResourceAsString +import it.agilelab.datamesh.mwaaspecificprovisioner.s3.gateway.S3GatewayError.ObjectExistsErr +import it.agilelab.datamesh.mwaaspecificprovisioner.s3.gateway.{S3Gateway, S3GatewayMock} +import org.scalamock.scalatest.MockFactory +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class MwaaValidatorSpec extends AnyFlatSpec with Matchers with MockFactory { + + "Validation" should "succeed" in { + val yaml = getTestResourceAsString("validation/descriptor_ok.yml") + val s3Gateway = new S3GatewayMock() + val validator = new MwaaValidator(s3Gateway) + + val res = validator.validate(yaml) + + res match { + case Validated.Valid(mf) => + mf.dagName should be("airbyte_snowflake_dag_custom_wit_567.py") + mf.component.id should be("urn:dmb:cmp:demographic:dp-to-test-mwaa-fix:0:airflow-workload-test-fix") + mf.bucketName should be("sandbox-qa-mwaa-eu-west-1-278435202378") + mf.sourcePath should be("source/") + mf.destinationPath should be("dags/") + mf.prefix should be("demographic.dp-to-test-mwaa-fix.0") + case Validated.Invalid(_) => fail("Expected valid") + } + } + + it should "fail if DP is empty" in { + val yaml = "" + val s3Gateway = new S3GatewayMock() + val validator = new MwaaValidator(s3Gateway) + + val res = validator.validate(yaml) + + res match { + case Validated.Valid(_) => fail("Expected invalid") + case Validated.Invalid(e) => e.length should be(1) + } + } + + it should "fail if specific fields are missing" in { + val yaml = getTestResourceAsString("validation/descriptor_ko_missing_specifics.yml") + val s3Gateway = new S3GatewayMock() + val validator = new MwaaValidator(s3Gateway) + + val res = validator.validate(yaml) + + res match { + case Validated.Valid(_) => fail("Expected invalid") + case Validated.Invalid(e) => e.length should be(5) + } + } + + it should "fail if the component is missing" in { + val yaml = getTestResourceAsString("validation/descriptor_ko_missing_component.yml") + val s3Gateway = new S3GatewayMock() + val validator = new MwaaValidator(s3Gateway) + + val res = validator.validate(yaml) + + res match { + case Validated.Valid(_) => fail("Expected invalid") + case Validated.Invalid(e) => e.length should be(1) + } + } + + it should "fail if the componentId hasn't the expected format" in { + val yaml = getTestResourceAsString("validation/descriptor_ko_wrong_component_id_format.yml") + val s3Gateway = new S3GatewayMock() + val validator = new MwaaValidator(s3Gateway) + + val res = validator.validate(yaml) + + res match { + case Validated.Valid(_) => fail("Expected invalid") + case Validated.Invalid(e) => e.length should be(1) + } + } + + it should "fail if the source file is not existent" in { + val yaml = getTestResourceAsString("validation/descriptor_ok.yml") + val s3Gateway = mock[S3Gateway] + val validator = new MwaaValidator(s3Gateway) + + val _ = (s3Gateway.objectExists _).expects(*, *).returns(Right(false)) + val res = validator.validate(yaml) + + res match { + case Validated.Valid(_) => fail("Expected invalid") + case Validated.Invalid(e) => e.length should be(1) + } + } + + it should "fail if s3Gateway.objectExists returns a Left" in { + val yaml = getTestResourceAsString("validation/descriptor_ok.yml") + val s3Gateway = mock[S3Gateway] + val validator = new MwaaValidator(s3Gateway) + + val _ = (s3Gateway.objectExists _).expects(*, *).returns(Left(ObjectExistsErr("", "", new Throwable("")))) + val res = validator.validate(yaml) + + res match { + case Validated.Valid(_) => fail("Expected invalid") + case Validated.Invalid(e) => e.length should be(1) + } + } + +}