
Commit

Fix tests
zli82016 committed Sep 23, 2024
1 parent 3c7b7f1 commit 48cafb0
Showing 6 changed files with 175 additions and 1 deletion.
34 changes: 33 additions & 1 deletion mmv1/products/dataproc/Batch.yaml
@@ -51,6 +51,24 @@ examples:
      prevent_destroy: 'false'
    ignore_read_extra:
      - 'runtime_config.0.properties'
  - !ruby/object:Provider::Terraform::Examples
    name: 'dataproc_batch_spark_full'
    primary_resource_id: 'example_batch_spark'
    primary_resource_name:
      'fmt.Sprintf("tf-test-spark-batch%s", context["random_suffix"])'
    test_env_vars:
      project_name: :PROJECT_NAME
    vars:
      network_name: 'default'
      prevent_destroy: 'true'
      key_name: 'example-key'
      keyring_name: 'example-keyring'
      bucket_name: 'dataproc-bucket'
    test_vars_overrides:
      network_name: 'acctest.BootstrapNetWithFirewallForDataprocBatches(t, "dataproc-spark-test-network", "dataproc-spark-test-subnetwork")'
      prevent_destroy: 'false'
    ignore_read_extra:
      - 'runtime_config.0.properties'
  - !ruby/object:Provider::Terraform::Examples
    name: 'dataproc_batch_sparksql'
    primary_resource_id: 'example_batch_sparsql'
@@ -304,12 +322,26 @@ properties:
        The Cloud KMS key to use for encryption.
    - !ruby/object:Api::Type::String
      name: 'idleTtl'
      default_from_api: true
      description: |
        Applies to sessions only. The duration to keep the session alive while it's idling.
        Exceeding this threshold causes the session to terminate. This field cannot be set on a batch workload.
        Minimum value is 10 minutes; maximum value is 14 days (see JSON representation of Duration).
        Defaults to 1 hour if not set. If both ttl and idleTtl are specified for an interactive session,
        the conditions are treated as OR conditions: the workload will be terminated when it has been idle
        for idleTtl or when ttl has been exceeded, whichever occurs first.
    - !ruby/object:Api::Type::String
      name: 'ttl'
      default_from_api: true
      description: |
        The duration after which the workload will be terminated.
        The duration after which the workload will be terminated, specified as the JSON representation for Duration.
        When the workload exceeds this duration, it will be unconditionally terminated without waiting for ongoing
        work to finish. If ttl is not specified for a batch workload, the workload will be allowed to run until it
        exits naturally (or run forever without exiting). If ttl is not specified for an interactive session,
        it defaults to 24 hours. If ttl is not specified for a batch that uses 2.1+ runtime version, it defaults to 4 hours.
        Minimum value is 10 minutes; maximum value is 14 days. If both ttl and idleTtl are specified (for an interactive session),
        the conditions are treated as OR conditions: the workload will be terminated when it has been idle for idleTtl or
        when ttl has been exceeded, whichever occurs first.
    - !ruby/object:Api::Type::String
      name: 'stagingBucket'
      description: |
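For context, the idleTtl and ttl fields documented above surface in the provider as the idle_ttl and ttl arguments under environment_config.execution_config of google_dataproc_batch. A minimal sketch of setting the hard TTL, with an illustrative resource name and duration value that are not part of this commit:

resource "google_dataproc_batch" "ttl_demo" {
  batch_id = "ttl-demo-batch"
  location = "us-central1"

  environment_config {
    execution_config {
      # Durations use the JSON Duration string form: decimal seconds with an "s" suffix.
      ttl = "14400s" # terminate the workload unconditionally after 4 hours
    }
  }

  spark_batch {
    main_class    = "org.apache.spark.examples.SparkPi"
    jar_file_uris = ["file:///usr/lib/spark/examples/jars/spark-examples.jar"]
  }
}

The dataproc_batch_spark_full template added later in this commit sets both ttl and idle_ttl on the same execution_config block.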
@@ -15,6 +15,11 @@ resource "google_dataproc_batch" "<%= ctx[:primary_resource_id] %>" {

  pyspark_batch {
    main_python_file_uri = "gs://dataproc-examples/pyspark/hello-world/hello-world.py"
    archive_uris = ["archive-uri-1", "archive-uri-2"]
    args = ["10"]
    file_uris = ["file-uri-1", "file-uri-2"]
    jar_file_uris = ["file:///usr/lib/spark/examples/jars/spark-examples.jar"]
    python_file_uris = ["pipelineparam--common_utils_py_fqn"]
  }
}

128 changes: 128 additions & 0 deletions mmv1/templates/terraform/examples/dataproc_batch_spark_full.tf.erb
@@ -0,0 +1,128 @@
data "google_project" "test_project" {
}

resource "google_dataproc_batch" "<%= ctx[:primary_resource_id] %>" {
batch_id = "<%= ctx[:vars]['dataproc_batch'] %>"
location = "us-central1"
labels = {"batch_test": "terraform"}

runtime_config {
properties = { "spark.dynamicAllocation.enabled": "false", "spark.executor.instances": "2" }
container_image = "gcr.io/my-project-id/my-spark-image:latest"
repository_config {
pypi_repository_config {
pypi_repository = "PYPI"
}
}
version = "2.2"
}

environment_config {
execution_config {
ttl = "3600s"
idle_ttl = "3600s"
network_tags = ["tag1"]
kms_key = google_kms_crypto_key.crypto_key.id
network_uri = "<%= ctx[:vars]['network_name'] %>"
service_account = "${data.google_project.test_project.number}[email protected]"
staging_bucket = google_storage_bucket.bucket.name
}
peripherals_config {
metastore_service = google_dataproc_metastore_service.ms.name
spark_history_server_config {
dataproc_cluster = google_dataproc_cluster.basic.id
}
}
}

spark_batch {
main_class = "org.apache.spark.examples.SparkPi"
args = [
"wordcount",
"file:///usr/lib/spark/NOTICE",
"gs://${google_dataproc_cluster.basic.cluster_config[0].bucket}/hadoopjob_output_%s",
]
jar_file_uris = ["file:///usr/lib/spark/examples/jars/spark-examples.jar"]
archive_uris = ["archive-uri-1", "archive-uri-2"]
file_uris = ["file-uri-1", "file-uri-2"]
main_jar_file_uri = "file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar"
}

depends_on = [
google_kms_crypto_key_iam_member.crypto_key_member_1,
google_kms_crypto_key_iam_member.crypto_key_member_2,
]
}

resource "google_storage_bucket" "bucket" {
name = "<%= ctx[:vars]['bucket_name'] %>"
location = "US"
}

resource "google_kms_crypto_key" "crypto_key" {
name = "<%= ctx[:vars]['key_name'] %>"
key_ring = google_kms_key_ring.key_ring.id

purpose = "ENCRYPT_DECRYPT"
}

resource "google_kms_key_ring" "key_ring" {
name = "<%= ctx[:vars]['keyring_name'] %>"
location = "us-central1"
}

resource "google_kms_crypto_key_iam_member" "crypto_key_member_1" {
crypto_key_id = google_kms_crypto_key.crypto_key.id
role = "roles/cloudkms.cryptoKeyEncrypterDecrypter"

member = "serviceAccount:service-${data.google_project.project.number}@gcp-sa-metastore.iam.gserviceaccount.com"
}

resource "google_kms_crypto_key_iam_member" "crypto_key_member_2" {
crypto_key_id = google_kms_crypto_key.crypto_key.id
role = "roles/cloudkms.cryptoKeyEncrypterDecrypter"

member = "serviceAccount:${data.google_storage_project_service_account.gcs_account.email_address}"
}

resource "google_dataproc_cluster" "basic" {
name = "<%= ctx[:vars]['dataproc_batch'] %>"
region = "us-central1"

cluster_config {
# Keep the costs down with smallest config we can get away with
software_config {
override_properties = {
"dataproc:dataproc.allow.zero.workers" = "true"
}
}

master_config {
num_instances = 1
machine_type = "e2-standard-2"
disk_config {
boot_disk_size_gb = 35
}
}

metastore_config {
dataproc_metastore_service = google_dataproc_metastore_service.ms.name
}
}
}

resource "google_dataproc_metastore_service" "ms" {
service_id = "<%= ctx[:vars]['dataproc_batch'] %>"
location = "us-central1"
port = 9080
tier = "DEVELOPER"

maintenance_window {
hour_of_day = 2
day_of_week = "SUNDAY"
}

hive_metastore_config {
version = "3.1.2"
}
}
@@ -19,6 +19,8 @@ resource "google_dataproc_batch" "<%= ctx[:primary_resource_id] %>" {
  spark_r_batch {
    main_r_file_uri = "https://storage.googleapis.com/terraform-batches/spark-r-flights.r"
    args = ["https://storage.googleapis.com/terraform-batches/flights.csv"]
    archive_uris = ["archive-uri-1", "archive-uri-2"]
    file_uris = ["file-uri-1", "file-uri-2"]
  }
}

@@ -15,6 +15,7 @@ resource "google_dataproc_batch" "<%= ctx[:primary_resource_id] %>" {

  spark_sql_batch {
    query_file_uri = "gs://dataproc-examples/spark-sql/natality/cigarette_correlations.sql"
    jar_file_uris = ["file:///usr/lib/spark/examples/jars/spark-examples.jar"]
  }
}

6 changes: 6 additions & 0 deletions mmv1/third_party/terraform/acctest/bootstrap_test_utils.go
@@ -929,6 +929,12 @@ func BootstrapSubnetWithFirewallForDataprocBatches(t *testing.T, testId string,
    return subnetworkName
}

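// BootstrapNetWithFirewallForDataprocBatches sets up the shared test network named after testId,
// configures the firewall Dataproc batch tests need via BootstrapFirewallForDataprocSharedNetwork,
// and returns the network name (its counterpart above returns the subnetwork name instead).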
func BootstrapNetWithFirewallForDataprocBatches(t *testing.T, testId string, subnetName string) string {
    networkName := BootstrapSharedTestNetwork(t, testId)
    BootstrapFirewallForDataprocSharedNetwork(t, subnetName, networkName)
    return networkName
}

func BootstrapSubnetWithOverrides(t *testing.T, subnetName string, networkName string, subnetOptions map[string]interface{}) string {
    projectID := envvar.GetTestProjectFromEnv()
    region := envvar.GetTestRegionFromEnv()
