From 137318dfd07b117ec59b96e92def6384ac085522 Mon Sep 17 00:00:00 2001 From: neil-yechenwei Date: Tue, 26 Apr 2022 16:23:28 +0800 Subject: [PATCH 1/2] azurerm_stream_analytics_job - support new property job_type --- .../stream_analytics_job_resource.go | 48 +++++++++-- .../stream_analytics_job_resource_test.go | 83 +++++++++++++++++++ .../docs/r/stream_analytics_job.html.markdown | 8 +- 3 files changed, 129 insertions(+), 10 deletions(-) diff --git a/internal/services/streamanalytics/stream_analytics_job_resource.go b/internal/services/streamanalytics/stream_analytics_job_resource.go index dcd6c7f9fcce..f5212c8bc36e 100644 --- a/internal/services/streamanalytics/stream_analytics_job_resource.go +++ b/internal/services/streamanalytics/stream_analytics_job_resource.go @@ -102,6 +102,17 @@ func resourceStreamAnalyticsJob() *pluginsdk.Resource { Default: string(streamanalytics.EventsOutOfOrderPolicyAdjust), }, + "job_type": { + Type: pluginsdk.TypeString, + Optional: true, + ForceNew: true, + ValidateFunc: validation.StringInSlice([]string{ + string(streamanalytics.JobTypeCloud), + string(streamanalytics.JobTypeEdge), + }, false), + Default: string(streamanalytics.JobTypeCloud), + }, + "output_error_policy": { Type: pluginsdk.TypeString, Optional: true, @@ -114,7 +125,7 @@ func resourceStreamAnalyticsJob() *pluginsdk.Resource { "streaming_units": { Type: pluginsdk.TypeInt, - Required: true, + Optional: true, ValidateFunc: validate.StreamAnalyticsJobStreamingUnits, }, @@ -167,9 +178,9 @@ func resourceStreamAnalyticsJobCreateUpdate(d *pluginsdk.ResourceData, meta inte eventsLateArrivalMaxDelayInSeconds := d.Get("events_late_arrival_max_delay_in_seconds").(int) eventsOutOfOrderMaxDelayInSeconds := d.Get("events_out_of_order_max_delay_in_seconds").(int) eventsOutOfOrderPolicy := d.Get("events_out_of_order_policy").(string) + jobType := d.Get("job_type").(string) location := azure.NormalizeLocation(d.Get("location").(string)) outputErrorPolicy := d.Get("output_error_policy").(string) - streamingUnits := d.Get("streaming_units").(int) transformationQuery := d.Get("transformation_query").(string) t := d.Get("tags").(map[string]interface{}) @@ -177,11 +188,22 @@ func resourceStreamAnalyticsJobCreateUpdate(d *pluginsdk.ResourceData, meta inte transformation := streamanalytics.Transformation{ Name: utils.String("main"), TransformationProperties: &streamanalytics.TransformationProperties{ - StreamingUnits: utils.Int32(int32(streamingUnits)), - Query: utils.String(transformationQuery), + Query: utils.String(transformationQuery), }, } + if jobType == string(streamanalytics.JobTypeEdge) { + if _, ok := d.GetOk("streaming_units"); ok { + return fmt.Errorf("the job type `Edge` doesn't support `streaming_units`") + } + } else { + if v, ok := d.GetOk("streaming_units"); ok { + transformation.TransformationProperties.StreamingUnits = utils.Int32(int32(v.(int))) + } else { + return fmt.Errorf("`streaming_units` must be set when `job_type` is `Cloud`") + } + } + expandedIdentity, err := expandStreamAnalyticsJobIdentity(d.Get("identity").([]interface{})) if err != nil { return fmt.Errorf("expanding `identity`: %+v", err) @@ -199,18 +221,25 @@ func resourceStreamAnalyticsJobCreateUpdate(d *pluginsdk.ResourceData, meta inte EventsOutOfOrderMaxDelayInSeconds: utils.Int32(int32(eventsOutOfOrderMaxDelayInSeconds)), EventsOutOfOrderPolicy: streamanalytics.EventsOutOfOrderPolicy(eventsOutOfOrderPolicy), OutputErrorPolicy: streamanalytics.OutputErrorPolicy(outputErrorPolicy), + JobType: streamanalytics.JobType(jobType), }, Identity: expandedIdentity, Tags: tags.Expand(t), } - if streamAnalyticsCluster := d.Get("stream_analytics_cluster_id"); streamAnalyticsCluster != "" { - props.StreamingJobProperties.Cluster = &streamanalytics.ClusterInfo{ - ID: utils.String(streamAnalyticsCluster.(string)), + if jobType == string(streamanalytics.JobTypeEdge) { + if _, ok := d.GetOk("stream_analytics_cluster_id"); ok { + return fmt.Errorf("the job type `Edge` doesn't support `stream_analytics_cluster_id`") } } else { - props.StreamingJobProperties.Cluster = &streamanalytics.ClusterInfo{ - ID: nil, + if streamAnalyticsCluster := d.Get("stream_analytics_cluster_id"); streamAnalyticsCluster != "" { + props.StreamingJobProperties.Cluster = &streamanalytics.ClusterInfo{ + ID: utils.String(streamAnalyticsCluster.(string)), + } + } else { + props.StreamingJobProperties.Cluster = &streamanalytics.ClusterInfo{ + ID: nil, + } } } @@ -297,6 +326,7 @@ func resourceStreamAnalyticsJobRead(d *pluginsdk.ResourceData, meta interface{}) } d.Set("events_out_of_order_policy", string(props.EventsOutOfOrderPolicy)) d.Set("output_error_policy", string(props.OutputErrorPolicy)) + d.Set("job_type", string(props.JobType)) // Computed d.Set("job_id", props.JobID) diff --git a/internal/services/streamanalytics/stream_analytics_job_resource_test.go b/internal/services/streamanalytics/stream_analytics_job_resource_test.go index 6a55e3cb402e..d57f9dc8f8f8 100644 --- a/internal/services/streamanalytics/stream_analytics_job_resource_test.go +++ b/internal/services/streamanalytics/stream_analytics_job_resource_test.go @@ -103,6 +103,36 @@ func TestAccStreamAnalyticsJob_identity(t *testing.T) { }) } +func TestAccStreamAnalyticsJob_jobTypeCloud(t *testing.T) { + data := acceptance.BuildTestData(t, "azurerm_stream_analytics_job", "test") + r := StreamAnalyticsJobResource{} + + data.ResourceTest(t, r, []acceptance.TestStep{ + { + Config: r.jobTypeCloud(data), + Check: acceptance.ComposeTestCheckFunc( + check.That(data.ResourceName).ExistsInAzure(r), + ), + }, + data.ImportStep(), + }) +} + +func TestAccStreamAnalyticsJob_jobTypeEdge(t *testing.T) { + data := acceptance.BuildTestData(t, "azurerm_stream_analytics_job", "test") + r := StreamAnalyticsJobResource{} + + data.ResourceTest(t, r, []acceptance.TestStep{ + { + Config: r.jobTypeEdge(data), + Check: acceptance.ComposeTestCheckFunc( + check.That(data.ResourceName).ExistsInAzure(r), + ), + }, + data.ImportStep(), + }) +} + func (r StreamAnalyticsJobResource) Exists(ctx context.Context, client *clients.Client, state *pluginsdk.InstanceState) (*bool, error) { id, err := parse.StreamingJobID(state.ID) if err != nil { @@ -278,3 +308,56 @@ QUERY } `, data.RandomInteger, data.Locations.Primary, data.RandomInteger) } + +func (r StreamAnalyticsJobResource) jobTypeCloud(data acceptance.TestData) string { + return fmt.Sprintf(` +provider "azurerm" { + features {} +} + +resource "azurerm_resource_group" "test" { + name = "acctestRG-%d" + location = "%s" +} + +resource "azurerm_stream_analytics_job" "test" { + name = "acctestjob-%d" + resource_group_name = azurerm_resource_group.test.name + location = azurerm_resource_group.test.location + streaming_units = 3 + job_type = "Cloud" + + transformation_query = < **NOTE:** `Edge` doesn't support `stream_analytics_cluster_id` and `streaming_units`. + * `identity` - (Optional) An `identity` block as defined below. * `output_error_policy` - (Optional) Specifies the policy which should be applied to events which arrive at the output and cannot be written to the external storage due to being malformed (such as missing column values, column values of wrong type or size). Possible values are `Drop` and `Stop`. Default is `Drop`. -* `streaming_units` - (Required) Specifies the number of streaming units that the streaming job uses. Supported values are `1`, `3`, `6` and multiples of `6` up to `120`. +* `streaming_units` - (Optional) Specifies the number of streaming units that the streaming job uses. Supported values are `1`, `3`, `6` and multiples of `6` up to `120`. + +-> **NOTE:** `streaming_units` must be set when `job_type` is `Cloud`. * `transformation_query` - (Required) Specifies the query that will be run in the streaming job, [written in Stream Analytics Query Language (SAQL)](https://msdn.microsoft.com/library/azure/dn834998). From d704ecd1fa0aa2fbb0ee052cc0ff2a63eb0f6b97 Mon Sep 17 00:00:00 2001 From: neil-yechenwei Date: Wed, 27 Apr 2022 09:21:14 +0800 Subject: [PATCH 2/2] update code --- .../streamanalytics/stream_analytics_job_resource.go | 8 ++++---- .../streamanalytics/stream_analytics_job_resource_test.go | 4 ++-- website/docs/r/stream_analytics_job.html.markdown | 4 ++-- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/internal/services/streamanalytics/stream_analytics_job_resource.go b/internal/services/streamanalytics/stream_analytics_job_resource.go index f5212c8bc36e..e5ff37548bbd 100644 --- a/internal/services/streamanalytics/stream_analytics_job_resource.go +++ b/internal/services/streamanalytics/stream_analytics_job_resource.go @@ -102,7 +102,7 @@ func resourceStreamAnalyticsJob() *pluginsdk.Resource { Default: string(streamanalytics.EventsOutOfOrderPolicyAdjust), }, - "job_type": { + "type": { Type: pluginsdk.TypeString, Optional: true, ForceNew: true, @@ -178,7 +178,7 @@ func resourceStreamAnalyticsJobCreateUpdate(d *pluginsdk.ResourceData, meta inte eventsLateArrivalMaxDelayInSeconds := d.Get("events_late_arrival_max_delay_in_seconds").(int) eventsOutOfOrderMaxDelayInSeconds := d.Get("events_out_of_order_max_delay_in_seconds").(int) eventsOutOfOrderPolicy := d.Get("events_out_of_order_policy").(string) - jobType := d.Get("job_type").(string) + jobType := d.Get("type").(string) location := azure.NormalizeLocation(d.Get("location").(string)) outputErrorPolicy := d.Get("output_error_policy").(string) transformationQuery := d.Get("transformation_query").(string) @@ -200,7 +200,7 @@ func resourceStreamAnalyticsJobCreateUpdate(d *pluginsdk.ResourceData, meta inte if v, ok := d.GetOk("streaming_units"); ok { transformation.TransformationProperties.StreamingUnits = utils.Int32(int32(v.(int))) } else { - return fmt.Errorf("`streaming_units` must be set when `job_type` is `Cloud`") + return fmt.Errorf("`streaming_units` must be set when `type` is `Cloud`") } } @@ -326,7 +326,7 @@ func resourceStreamAnalyticsJobRead(d *pluginsdk.ResourceData, meta interface{}) } d.Set("events_out_of_order_policy", string(props.EventsOutOfOrderPolicy)) d.Set("output_error_policy", string(props.OutputErrorPolicy)) - d.Set("job_type", string(props.JobType)) + d.Set("type", string(props.JobType)) // Computed d.Set("job_id", props.JobID) diff --git a/internal/services/streamanalytics/stream_analytics_job_resource_test.go b/internal/services/streamanalytics/stream_analytics_job_resource_test.go index d57f9dc8f8f8..bdd22ccca8b9 100644 --- a/internal/services/streamanalytics/stream_analytics_job_resource_test.go +++ b/internal/services/streamanalytics/stream_analytics_job_resource_test.go @@ -325,7 +325,7 @@ resource "azurerm_stream_analytics_job" "test" { resource_group_name = azurerm_resource_group.test.name location = azurerm_resource_group.test.location streaming_units = 3 - job_type = "Cloud" + type = "Cloud" transformation_query = < **NOTE:** `Edge` doesn't support `stream_analytics_cluster_id` and `streaming_units`. @@ -77,7 +77,7 @@ The following arguments are supported: * `streaming_units` - (Optional) Specifies the number of streaming units that the streaming job uses. Supported values are `1`, `3`, `6` and multiples of `6` up to `120`. --> **NOTE:** `streaming_units` must be set when `job_type` is `Cloud`. +-> **NOTE:** `streaming_units` must be set when `type` is `Cloud`. * `transformation_query` - (Required) Specifies the query that will be run in the streaming job, [written in Stream Analytics Query Language (SAQL)](https://msdn.microsoft.com/library/azure/dn834998).