Fix crash when parameters.enableStreamingEngine is set on `google_dataflow_flex_template_job`, update resource's docs (#10303)

SarahFrench authored Mar 27, 2024
1 parent 28bed1d commit 561b6a3
Showing 3 changed files with 193 additions and 7 deletions.
@@ -364,7 +364,15 @@ func resourceDataflowFlexJobSetupEnv(d *schema.ResourceData, config *transport_t
 	var enableStreamingEngine bool
 	if p, ok := d.GetOk("parameters.enableStreamingEngine"); ok {
 		delete(updatedParameters, "enableStreamingEngine")
-		enableStreamingEngine = p.(bool)
+		e := strings.ToLower(p.(string))
+		switch e {
+		case "true":
+			enableStreamingEngine = true
+		case "false":
+			enableStreamingEngine = false
+		default:
+			return dataflow.FlexTemplateRuntimeEnvironment{}, nil, fmt.Errorf("error when handling parameters.enableStreamingEngine value: expected value to be true or false but got value `%s`", e)
+		}
 	} else {
 		if v, ok := d.GetOk("enable_streaming_engine"); ok {
 			enableStreamingEngine = v.(bool)
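
The crash being fixed came from the removed line's `p.(bool)` assertion: a value set inside the `parameters` map reaches the provider as a string, so asserting it to `bool` panics at runtime; the replacement parses the string instead. A minimal, self-contained sketch of the same parsing approach (the `parseStreamingEngineFlag` helper and `main` driver are illustrative, not part of this commit):

package main

import (
	"fmt"
	"strings"
)

// parseStreamingEngineFlag mirrors the fixed logic above: treat the raw
// parameter value as a string and compare it case-insensitively, instead
// of type-asserting it to bool (the assertion that caused the crash).
func parseStreamingEngineFlag(p interface{}) (bool, error) {
	s, ok := p.(string)
	if !ok {
		return false, fmt.Errorf("expected a string value, got %T", p)
	}
	switch strings.ToLower(s) {
	case "true":
		return true, nil
	case "false":
		return false, nil
	default:
		return false, fmt.Errorf("expected value to be true or false but got value `%s`", s)
	}
}

func main() {
	for _, v := range []interface{}{"true", "False", "yes", true} {
		enabled, err := parseStreamingEngineFlag(v)
		fmt.Printf("%v -> enabled=%t err=%v\n", v, enabled, err)
	}
}
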
@@ -555,6 +555,45 @@ func TestAccDataflowJob_withAttributionLabelProactive(t *testing.T) {
 	})
 }
 
+// Test implementation of enabling streaming engine via parameters or via argument in resource block
+// NOTE: these fields are immutable, so the resource is being recreated between both test steps.
+func TestAccDataflowFlexTemplateJob_enableStreamingEngine(t *testing.T) {
+	// Dataflow responses include serialized java classes and bash commands
+	// This makes body comparison infeasible
+	acctest.SkipIfVcr(t)
+	t.Parallel()
+
+	randStr := acctest.RandString(t, 10)
+	job := "tf-test-dataflow-job-" + randStr
+	bucket := "tf-test-dataflow-bucket-" + randStr
+	topic := "tf-test-topic" + randStr
+
+	acctest.VcrTest(t, resource.TestCase{
+		PreCheck:                 func() { acctest.AccTestPreCheck(t) },
+		ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories(t),
+		CheckDestroy:             testAccCheckDataflowJobDestroyProducer(t),
+		Steps: []resource.TestStep{
+			{
+				Config: testAccDataflowFlexTemplateJob_enableStreamingEngine_param(job, bucket, topic),
+				Check: resource.ComposeTestCheckFunc(
+					// Is set
+					resource.TestCheckResourceAttr("google_dataflow_flex_template_job.flex_job", "parameters.enableStreamingEngine", "true"),
+					// Is not set
+					resource.TestCheckNoResourceAttr("google_dataflow_flex_template_job.flex_job", "enable_streaming_engine"),
+				),
+			},
+			{
+				Config: testAccDataflowFlexTemplateJob_enableStreamingEngine_field(job, bucket, topic),
+				Check: resource.ComposeTestCheckFunc(
+					// Now is unset
+					resource.TestCheckNoResourceAttr("google_dataflow_flex_template_job.flex_job", "parameters.enableStreamingEngine"),
+					// Now is set
+					resource.TestCheckResourceAttr("google_dataflow_flex_template_job.flex_job", "enable_streaming_engine", "true"),
+				),
+			},
+		},
+	})
+}
 
 func testAccDataflowFlexTemplateJobHasNetwork(t *testing.T, res, expected string, wait bool) resource.TestCheckFunc {
 	return func(s *terraform.State) error {
@@ -1757,4 +1796,115 @@ func testAccDataflowFlexJobExists(t *testing.T, resource string, wait bool) reso
 	}
 }
 
+// Set parameters.enableStreamingEngine value in parameters map to control feature enablement (versus using enable_streaming_engine field)
+func testAccDataflowFlexTemplateJob_enableStreamingEngine_param(job, bucket, topicName string) string {
+	return fmt.Sprintf(`
+
+resource "google_pubsub_topic" "example" {
+  name = "%s"
+}
+
+data "google_storage_bucket_object" "flex_template" {
+  name   = "latest/flex/Streaming_Data_Generator"
+  bucket = "dataflow-templates"
+}
+
+resource "google_storage_bucket" "bucket" {
+  name                        = "%s"
+  location                    = "US-CENTRAL1"
+  force_destroy               = true
+  uniform_bucket_level_access = true
+}
+
+resource "google_storage_bucket_object" "schema" {
+  name    = "schema.json"
+  bucket  = google_storage_bucket.bucket.name
+  content = <<EOF
+{
+  "eventId": "{{uuid()}}",
+  "eventTimestamp": {{timestamp()}},
+  "ipv4": "{{ipv4()}}",
+  "ipv6": "{{ipv6()}}",
+  "country": "{{country()}}",
+  "username": "{{username()}}",
+  "quest": "{{random("A Break In the Ice", "Ghosts of Perdition", "Survive the Low Road")}}",
+  "score": {{integer(100, 10000)}},
+  "completed": {{bool()}}
+}
+EOF
+}
+
+resource "google_dataflow_flex_template_job" "flex_job" {
+  name                    = "%s"
+  container_spec_gcs_path = "gs://${data.google_storage_bucket_object.flex_template.bucket}/${data.google_storage_bucket_object.flex_template.name}"
+  on_delete               = "cancel"
+  parameters = {
+    schemaLocation        = "gs://${google_storage_bucket_object.schema.bucket}/schema.json"
+    qps                   = "1"
+    topic                 = google_pubsub_topic.example.id
+    enableStreamingEngine = true
+  }
+  labels = {
+    "my_labels" = "value"
+  }
+}
+`, topicName, bucket, job)
+}
+
+// Set enable_streaming_engine field to control feature enablement (versus using parameters.enableStreamingEngine)
+func testAccDataflowFlexTemplateJob_enableStreamingEngine_field(job, bucket, topicName string) string {
+	return fmt.Sprintf(`
+
+resource "google_pubsub_topic" "example" {
+  name = "%s"
+}
+
+data "google_storage_bucket_object" "flex_template" {
+  name   = "latest/flex/Streaming_Data_Generator"
+  bucket = "dataflow-templates"
+}
+
+resource "google_storage_bucket" "bucket" {
+  name                        = "%s"
+  location                    = "US-CENTRAL1"
+  force_destroy               = true
+  uniform_bucket_level_access = true
+}
+
+resource "google_storage_bucket_object" "schema" {
+  name    = "schema.json"
+  bucket  = google_storage_bucket.bucket.name
+  content = <<EOF
+{
+  "eventId": "{{uuid()}}",
+  "eventTimestamp": {{timestamp()}},
+  "ipv4": "{{ipv4()}}",
+  "ipv6": "{{ipv6()}}",
+  "country": "{{country()}}",
+  "username": "{{username()}}",
+  "quest": "{{random("A Break In the Ice", "Ghosts of Perdition", "Survive the Low Road")}}",
+  "score": {{integer(100, 10000)}},
+  "completed": {{bool()}}
+}
+EOF
+}
+
+resource "google_dataflow_flex_template_job" "flex_job" {
+  name                    = "%s"
+  container_spec_gcs_path = "gs://${data.google_storage_bucket_object.flex_template.bucket}/${data.google_storage_bucket_object.flex_template.name}"
+  on_delete               = "cancel"
+
+  enable_streaming_engine = true
+
+  parameters = {
+    schemaLocation = "gs://${google_storage_bucket_object.schema.bucket}/schema.json"
+    qps            = "1"
+    topic          = google_pubsub_topic.example.id
+  }
+  labels = {
+    "my_labels" = "value"
+  }
+}
+`, topicName, bucket, job)
+}
 <% end -%>
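
A unit-level companion to the acceptance test above could pin down the new parsing behavior without calling the Dataflow API. This table-driven test is a sketch against the hypothetical `parseStreamingEngineFlag` helper shown earlier, not part of the commit:

package main

import "testing"

// TestParseStreamingEngineFlag exercises valid, mixed-case, and invalid
// inputs for the parsing helper sketched after the first hunk.
func TestParseStreamingEngineFlag(t *testing.T) {
	cases := []struct {
		name    string
		in      interface{}
		want    bool
		wantErr bool
	}{
		{"lowercase true", "true", true, false},
		{"uppercase TRUE", "TRUE", true, false},
		{"lowercase false", "false", false, false},
		{"unrecognized word", "yes", false, true},
		{"non-string value", true, false, true},
	}
	for _, c := range cases {
		got, err := parseStreamingEngineFlag(c.in)
		if (err != nil) != c.wantErr {
			t.Errorf("%s: unexpected error state: %v", c.name, err)
			continue
		}
		if got != c.want {
			t.Errorf("%s: got %t, want %t", c.name, got, c.want)
		}
	}
}
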
@@ -84,17 +84,27 @@ resource "google_dataflow_flex_template_job" "big_data_job" {

 The following arguments are supported:
 
-* `name` - (Required) A unique name for the resource, required by Dataflow.
+* `name` - (Required) Immutable. A unique name for the resource, required by Dataflow.
 
 * `container_spec_gcs_path` - (Required) The GCS path to the Dataflow job Flex
 Template.
 
 - - -
 
+* `additional_experiments` - (Optional) List of experiments that should be used by the job. An example value is `["enable_stackdriver_agent_metrics"]`.
+
+* `autoscaling_algorithm` - (Optional) The algorithm to use for autoscaling.
+
 * `parameters` - (Optional) Key/Value pairs to be passed to the Dataflow job (as
 used in the template). Additional [pipeline options](https://cloud.google.com/dataflow/docs/guides/specifying-exec-params#setting-other-cloud-dataflow-pipeline-options)
 such as `serviceAccount`, `workerMachineType`, etc can be specified here.
 
+* `enable_streaming_engine` - (Optional) Immutable. Indicates if the job should use the streaming engine feature.
+
+* `ip_configuration` - (Optional) The configuration for VM IPs. Options are `"WORKER_IP_PUBLIC"` or `"WORKER_IP_PRIVATE"`.
+
+* `kms_key_name` - (Optional) The name for the Cloud KMS key for the job. Key format is: `projects/PROJECT_ID/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY`
+
 * `labels` - (Optional) User labels to be specified for the job. Keys and values
 should follow the restrictions specified in the [labeling restrictions](https://cloud.google.com/compute/docs/labeling-resources#restrictions)
 page.
@@ -106,21 +116,39 @@ page.
 * `effective_labels` -
 All of labels (key/value pairs) present on the resource in GCP, including the labels configured through Terraform, other clients and services.
 
+* `launcher_machine_type` - (Optional) The machine type to use for launching the job. The default is n1-standard-1.
+
+* `machine_type` - (Optional) The machine type to use for the job.
+
+* `max_workers` - (Optional) Immutable. The maximum number of Google Compute Engine instances to be made available to your pipeline during execution, from 1 to 1000.
+
+* `network` - (Optional) The network to which VMs will be assigned. If it is not provided, "default" will be used.
+
+* `num_workers` - (Optional) Immutable. The initial number of Google Compute Engine instances for the job.
+
 * `on_delete` - (Optional) One of "drain" or "cancel". Specifies behavior of
 deletion during `terraform destroy`. See above note.
 
+* `project` - (Optional) The project in which the resource belongs. If it is not
+provided, the provider project is used.
+
+* `region` - (Optional) Immutable. The region in which the created job should run.
+
+* `sdk_container_image` - (Optional) Docker registry location of container image to use for the worker harness. Default is the container for the version of the SDK. Note this field is only valid for portable pipelines.
+
+* `service_account_email` - (Optional) Service account email to run the workers as.
+
 * `skip_wait_on_job_termination` - (Optional) If set to `true`, terraform will
 treat `DRAINING` and `CANCELLING` as terminal states when deleting the resource,
 and will remove the resource from terraform state and move on. See above note.
 
-* `project` - (Optional) The project in which the resource belongs. If it is not
-provided, the provider project is used.
+* `staging_location` - (Optional) The Cloud Storage path to use for staging files. Must be a valid Cloud Storage URL, beginning with gs://.
 
-* `region` - (Optional) The region in which the created job should run.
+* `subnetwork` - (Optional) The subnetwork to which VMs will be assigned. Should be of the form "regions/REGION/subnetworks/SUBNETWORK".
 
-* `service_account_email` - (Optional) Service account email to run the workers as.
+* `temp_location` - (Optional) The Cloud Storage path to use for temporary files. Must be a valid Cloud Storage URL, beginning with gs://.
 
-* `subnetwork` - (Optional) Compute Engine subnetwork for launching instances to run your pipeline.
+* `transform_name_mapping` - (Optional) Only applicable when updating a pipeline. Map of transform name prefixes of the job to be replaced with the corresponding name prefixes of the new job.
 
 ## Attributes Reference
 In addition to the arguments listed above, the following computed attributes are exported:
