Skip to content
This repository has been archived by the owner on Nov 14, 2020. It is now read-only.

Commit

Permalink
Add federation_upstream resource (#55)
Browse files Browse the repository at this point in the history
* Add Federation Upstream resource.

* Add Component attribute to Federation Upstream.

  -- attribute is computed (i.e. read only)

* Parse Federation Upstream id.

* Import Federation Upstream.

* Upgrade module replace to use remote branch.

  -- this is temporary.

* Create a generic parseId function

  - move to util.go and refactor code to use this.

* Move parseId functions to util.

* Create federation-upstream.html.markdown

  -- first draft
  -- update with discussion of default values

* Defaults for some federation attributes.

* Include default values in Federation docs.

* Remove replace from go.mod

* Don't cache tests by default.

* Ensure federation plugins are installed during build.

  -- after #48

* Tests should assert resource attributes.

* Refactor tests.

* Add a more complete federation example.
  • Loading branch information
niclic authored Jul 3, 2020
1 parent e5321a9 commit 329d2b6
Show file tree
Hide file tree
Showing 11 changed files with 767 additions and 9 deletions.
3 changes: 2 additions & 1 deletion GNUmakefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
TEST?=$$(go list ./... |grep -v 'vendor')
TEST_COUNT?=1
GOFMT_FILES?=$$(find . -name '*.go' |grep -v vendor)
WEBSITE_REPO=github.com/hashicorp/terraform-website
PKG_NAME=rabbitmq
Expand All @@ -14,7 +15,7 @@ test: fmtcheck
xargs -t -n4 go test $(TESTARGS) -timeout=30s -parallel=4

testacc: fmtcheck
TF_ACC=1 go test $(TEST) -v $(TESTARGS) -timeout 120m
TF_ACC=1 go test $(TEST) -count $(TEST_COUNT) -v $(TESTARGS) -timeout 120m

vet:
@echo "go vet ."
Expand Down
34 changes: 34 additions & 0 deletions rabbitmq/import_federation_upstream_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package rabbitmq

import (
"testing"

rabbithole "github.com/michaelklishin/rabbit-hole/v2"

"github.com/hashicorp/terraform-plugin-sdk/helper/resource"
)

func TestAccFederationUpstream_importBasic(t *testing.T) {
var upstream rabbithole.FederationUpstream
resourceName := "rabbitmq_federation_upstream.foo"

resource.Test(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Providers: testAccProviders,
CheckDestroy: testAccFederationUpstreamCheckDestroy(&upstream),
Steps: []resource.TestStep{
{
Config: testAccFederationUpstream_create(),
Check: testAccFederationUpstreamCheck(
resourceName, &upstream,
),
},

{
ResourceName: resourceName,
ImportState: true,
ImportStateVerify: true,
},
},
})
}
17 changes: 9 additions & 8 deletions rabbitmq/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,15 @@ func Provider() terraform.ResourceProvider {
},

ResourcesMap: map[string]*schema.Resource{
"rabbitmq_binding": resourceBinding(),
"rabbitmq_exchange": resourceExchange(),
"rabbitmq_permissions": resourcePermissions(),
"rabbitmq_topic_permissions": resourceTopicPermissions(),
"rabbitmq_policy": resourcePolicy(),
"rabbitmq_queue": resourceQueue(),
"rabbitmq_user": resourceUser(),
"rabbitmq_vhost": resourceVhost(),
"rabbitmq_binding": resourceBinding(),
"rabbitmq_exchange": resourceExchange(),
"rabbitmq_permissions": resourcePermissions(),
"rabbitmq_topic_permissions": resourceTopicPermissions(),
"rabbitmq_federation_upstream": resourceFederationUpstream(),
"rabbitmq_policy": resourcePolicy(),
"rabbitmq_queue": resourceQueue(),
"rabbitmq_user": resourceUser(),
"rabbitmq_vhost": resourceVhost(),
},

ConfigureFunc: providerConfigure,
Expand Down
286 changes: 286 additions & 0 deletions rabbitmq/resource_federation_upstream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,286 @@
package rabbitmq

import (
"fmt"
"log"

"github.com/hashicorp/terraform-plugin-sdk/helper/schema"
"github.com/hashicorp/terraform-plugin-sdk/helper/validation"
rabbithole "github.com/michaelklishin/rabbit-hole/v2"
)

func resourceFederationUpstream() *schema.Resource {
return &schema.Resource{
Create: CreateFederationUpstream,
Read: ReadFederationUpstream,
Update: UpdateFederationUpstream,
Delete: DeleteFederationUpstream,
Importer: &schema.ResourceImporter{
State: schema.ImportStatePassthrough,
},

Schema: map[string]*schema.Schema{
"name": {
Type: schema.TypeString,
Required: true,
ForceNew: true,
},

"vhost": {
Type: schema.TypeString,
Required: true,
ForceNew: true,
},

// "federation-upstream"
"component": {
Type: schema.TypeString,
Computed: true,
},

"definition": {
Type: schema.TypeList,
Required: true,
MaxItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
// applicable to both federated exchanges and queues
"uri": {
Type: schema.TypeString,
Required: true,
Sensitive: true,
},

"prefetch_count": {
Type: schema.TypeInt,
Optional: true,
Default: 1000,
},

"reconnect_delay": {
Type: schema.TypeInt,
Optional: true,
Default: 5,
},

"ack_mode": {
Type: schema.TypeString,
Optional: true,
Default: "on-confirm",
ValidateFunc: validation.StringInSlice([]string{
"on-confirm",
"on-publish",
"no-ack",
}, false),
},

"trust_user_id": {
Type: schema.TypeBool,
Optional: true,
Default: false,
},
// applicable to federated exchanges only
"exchange": {
Type: schema.TypeString,
Optional: true,
},

"max_hops": {
Type: schema.TypeInt,
Optional: true,
Default: 1,
},

"expires": {
Type: schema.TypeInt,
Optional: true,
},

"message_ttl": {
Type: schema.TypeInt,
Optional: true,
},
// applicable to federated queues only
"queue": {
Type: schema.TypeString,
Optional: true,
},
},
},
},
},
}
}

func CreateFederationUpstream(d *schema.ResourceData, meta interface{}) error {
rmqc := meta.(*rabbithole.Client)

name := d.Get("name").(string)
vhost := d.Get("vhost").(string)
defList := d.Get("definition").([]interface{})

defMap, ok := defList[0].(map[string]interface{})
if !ok {
return fmt.Errorf("Unable to parse federation upstream definition")
}

if err := putFederationUpstream(rmqc, vhost, name, defMap); err != nil {
return err
}

id := fmt.Sprintf("%s@%s", name, vhost)
d.SetId(id)

return ReadFederationUpstream(d, meta)
}

func ReadFederationUpstream(d *schema.ResourceData, meta interface{}) error {
rmqc := meta.(*rabbithole.Client)

name, vhost, err := parseResourceId(d)
if err != nil {
return err
}

upstream, err := rmqc.GetFederationUpstream(vhost, name)
if err != nil {
return checkDeleted(d, err)
}

log.Printf("[DEBUG] RabbitMQ: Federation upstream retrieved for %s: %#v", d.Id(), upstream)

d.Set("name", upstream.Name)
d.Set("vhost", upstream.Vhost)
d.Set("component", upstream.Component)

defMap := map[string]interface{}{
"uri": upstream.Definition.Uri,
"prefetch_count": upstream.Definition.PrefetchCount,
"reconnect_delay": upstream.Definition.ReconnectDelay,
"ack_mode": upstream.Definition.AckMode,
"trust_user_id": upstream.Definition.TrustUserId,
"exchange": upstream.Definition.Exchange,
"max_hops": upstream.Definition.MaxHops,
"expires": upstream.Definition.Expires,
"message_ttl": upstream.Definition.MessageTTL,
"queue": upstream.Definition.Queue,
}

defList := [1]map[string]interface{}{defMap}
d.Set("definition", defList)

return nil
}

func UpdateFederationUpstream(d *schema.ResourceData, meta interface{}) error {
rmqc := meta.(*rabbithole.Client)

name, vhost, err := parseResourceId(d)
if err != nil {
return err
}

if d.HasChange("definition") {
_, newDef := d.GetChange("definition")

defList := newDef.([]interface{})
defMap, ok := defList[0].(map[string]interface{})
if !ok {
return fmt.Errorf("Unable to parse federation definition")
}

if err := putFederationUpstream(rmqc, vhost, name, defMap); err != nil {
return err
}
}

return ReadFederationUpstream(d, meta)
}

func DeleteFederationUpstream(d *schema.ResourceData, meta interface{}) error {
rmqc := meta.(*rabbithole.Client)

name, vhost, err := parseResourceId(d)
if err != nil {
return err
}

log.Printf("[DEBUG] RabbitMQ: Attempting to delete federation upstream for %s", d.Id())

resp, err := rmqc.DeleteFederationUpstream(vhost, name)
log.Printf("[DEBUG] RabbitMQ: Federation upstream delete response: %#v", resp)
if err != nil {
return err
}

if resp.StatusCode == 404 {
// the upstream was automatically deleted
return nil
}

if resp.StatusCode >= 400 {
return fmt.Errorf("Error deleting RabbitMQ federation upstream: %s", resp.Status)
}

return nil
}

func putFederationUpstream(rmqc *rabbithole.Client, vhost string, name string, defMap map[string]interface{}) error {
definition := rabbithole.FederationDefinition{}

log.Printf("[DEBUG] RabbitMQ: Attempting to put federation definition for %s@%s: %#v", name, vhost, defMap)

if v, ok := defMap["uri"].(string); ok {
definition.Uri = v
}

if v, ok := defMap["expires"].(int); ok {
definition.Expires = v
}

if v, ok := defMap["message_ttl"].(int); ok {
definition.MessageTTL = int32(v)
}

if v, ok := defMap["max_hops"].(int); ok {
definition.MaxHops = v
}

if v, ok := defMap["prefetch_count"].(int); ok {
definition.PrefetchCount = v
}

if v, ok := defMap["reconnect_delay"].(int); ok {
definition.ReconnectDelay = v
}

if v, ok := defMap["ack_mode"].(string); ok {
definition.AckMode = v
}

if v, ok := defMap["trust_user_id"].(bool); ok {
definition.TrustUserId = v
}

if v, ok := defMap["exchange"].(string); ok {
definition.Exchange = v
}

if v, ok := defMap["queue"].(string); ok {
definition.Queue = v
}

log.Printf("[DEBUG] RabbitMQ: Attempting to declare federation upstream for %s@%s: %#v", name, vhost, definition)

resp, err := rmqc.PutFederationUpstream(vhost, name, definition)
log.Printf("[DEBUG] RabbitMQ: Federation upstream declare response: %#v", resp)
if err != nil {
return err
}

if resp.StatusCode >= 400 {
return fmt.Errorf("Error creating RabbitMQ federation upstream: %s", resp.Status)
}

return nil
}
Loading

0 comments on commit 329d2b6

Please sign in to comment.