From b012021c405ca89bd4160d182007e8941b978b97 Mon Sep 17 00:00:00 2001 From: Justin Date: Fri, 17 Jul 2020 16:37:41 +0200 Subject: [PATCH] New resource: `rabbimtq_shovel`. --- rabbitmq/provider.go | 1 + rabbitmq/resource_shovel.go | 240 ++++++++++++++++++++++++++++ rabbitmq/resource_shovel_test.go | 124 ++++++++++++++ scripts/enabled_plugins | 2 +- website/docs/r/shovel.html.markdown | 111 +++++++++++++ website/rabbitmq.erb | 5 +- 6 files changed, 481 insertions(+), 2 deletions(-) create mode 100644 rabbitmq/resource_shovel.go create mode 100644 rabbitmq/resource_shovel_test.go create mode 100644 website/docs/r/shovel.html.markdown diff --git a/rabbitmq/provider.go b/rabbitmq/provider.go index 3345f461..6dca7b24 100644 --- a/rabbitmq/provider.go +++ b/rabbitmq/provider.go @@ -92,6 +92,7 @@ func Provider() terraform.ResourceProvider { "rabbitmq_queue": resourceQueue(), "rabbitmq_user": resourceUser(), "rabbitmq_vhost": resourceVhost(), + "rabbitmq_shovel": resourceShovel(), }, ConfigureFunc: providerConfigure, diff --git a/rabbitmq/resource_shovel.go b/rabbitmq/resource_shovel.go new file mode 100644 index 00000000..00441262 --- /dev/null +++ b/rabbitmq/resource_shovel.go @@ -0,0 +1,240 @@ +package rabbitmq + +import ( + "fmt" + "log" + "strings" + + rabbithole "github.com/michaelklishin/rabbit-hole/v2" + + "github.com/hashicorp/terraform-plugin-sdk/helper/schema" +) + +func resourceShovel() *schema.Resource { + return &schema.Resource{ + Create: CreateShovel, + Read: ReadShovel, + Delete: DeleteShovel, + Importer: &schema.ResourceImporter{ + State: schema.ImportStatePassthrough, + }, + + Schema: map[string]*schema.Schema{ + "name": { + Type: schema.TypeString, + Required: true, + ForceNew: true, + }, + "vhost": { + Type: schema.TypeString, + Required: true, + ForceNew: true, + }, + "info": { + Type: schema.TypeList, + Required: true, + ForceNew: true, + MaxItems: 1, + Elem: &schema.Resource{ + Schema: map[string]*schema.Schema{ + "source_uri": { + Type: schema.TypeString, + Required: true, + Sensitive: true, + }, + "source_exchange": { + Type: schema.TypeString, + Optional: true, + Default: nil, + }, + "source_exchange_key": { + Type: schema.TypeString, + Optional: true, + Default: nil, + }, + "source_queue": { + Type: schema.TypeString, + Optional: true, + Default: nil, + }, + "destination_uri": { + Type: schema.TypeString, + Required: true, + Sensitive: true, + }, + "destination_exchange": { + Type: schema.TypeString, + Optional: true, + Default: nil, + }, + "destination_exchange_key": { + Type: schema.TypeString, + Optional: true, + Default: nil, + }, + "destination_queue": { + Type: schema.TypeString, + Optional: true, + Default: nil, + }, + "prefetch_count": { + Type: schema.TypeInt, + Optional: true, + Default: 1000, + }, + "reconnect_delay": { + Type: schema.TypeInt, + Optional: true, + Default: 1, + }, + "add_forward_headers": { + Type: schema.TypeBool, + Optional: true, + Default: false, + }, + "ack_mode": { + Type: schema.TypeString, + Optional: true, + Default: "on-confirm", + }, + "delete_after": { + Type: schema.TypeString, + Optional: true, + Default: "never", + }, + }, + }, + }, + }, + } +} + +func CreateShovel(d *schema.ResourceData, meta interface{}) error { + rmqc := meta.(*rabbithole.Client) + + vhost := d.Get("vhost").(string) + shovelName := d.Get("name").(string) + shovelInfo := d.Get("info").([]interface{}) + + shovelMap, ok := shovelInfo[0].(map[string]interface{}) + if !ok { + return fmt.Errorf("Unable to parse shovel info") + } + + shovelDefinition := setShovelDefinition(shovelMap).(rabbithole.ShovelDefinition) + + log.Printf("[DEBUG] RabbitMQ: Attempting to declare shovel %s in vhost %s", shovelName, vhost) + resp, err := rmqc.DeclareShovel(vhost, shovelName, shovelDefinition) + log.Printf("[DEBUG] RabbitMQ: shovel declartion response: %#v", resp) + if err != nil { + return err + } + + shovelId := fmt.Sprintf("%s@%s", shovelName, vhost) + + d.SetId(shovelId) + + return ReadShovel(d, meta) +} + +func ReadShovel(d *schema.ResourceData, meta interface{}) error { + rmqc := meta.(*rabbithole.Client) + + shovelId := strings.Split(d.Id(), "@") + + name := shovelId[0] + vhost := shovelId[1] + + shovelInfo, err := rmqc.GetShovel(vhost, name) + if err != nil { + return checkDeleted(d, err) + } + + log.Printf("[DEBUG] RabbitMQ: Shovel retrieved: Vhost: %#v, Name: %#v", vhost, name) + + d.Set("name", shovelInfo.Name) + d.Set("vhost", shovelInfo.Vhost) + + return nil +} + +func DeleteShovel(d *schema.ResourceData, meta interface{}) error { + rmqc := meta.(*rabbithole.Client) + + shovelId := strings.Split(d.Id(), "@") + + name := shovelId[0] + vhost := shovelId[1] + + log.Printf("[DEBUG] RabbitMQ: Attempting to delete shovel %s", d.Id()) + + resp, err := rmqc.DeleteShovel(vhost, name) + log.Printf("[DEBUG] RabbitMQ: shovel deletion response: %#v", resp) + if err != nil { + return err + } + + if resp.StatusCode >= 400 { + return fmt.Errorf("Error deleting RabbitMQ shovel: %s", resp.Status) + } + + return nil +} + +func setShovelDefinition(shovelMap map[string]interface{}) interface{} { + shovelDefinition := &rabbithole.ShovelDefinition{} + + if v, ok := shovelMap["source_uri"].(string); ok { + shovelDefinition.SourceURI = v + } + + if v, ok := shovelMap["source_exchange"].(string); ok { + shovelDefinition.SourceExchange = v + } + + if v, ok := shovelMap["source_exchange_key"].(string); ok { + shovelDefinition.SourceExchangeKey = v + } + + if v, ok := shovelMap["source_queue"].(string); ok { + shovelDefinition.SourceQueue = v + } + + if v, ok := shovelMap["destination_uri"].(string); ok { + shovelDefinition.DestinationURI = v + } + + if v, ok := shovelMap["destination_exchange"].(string); ok { + shovelDefinition.DestinationExchange = v + } + + if v, ok := shovelMap["destination_exchange_key"].(string); ok { + shovelDefinition.DestinationExchangeKey = v + } + + if v, ok := shovelMap["destination_queue"].(string); ok { + shovelDefinition.DestinationQueue = v + } + + if v, ok := shovelMap["prefetch_count"].(int); ok { + shovelDefinition.PrefetchCount = v + } + + if v, ok := shovelMap["reconnect_delay"].(int); ok { + shovelDefinition.ReconnectDelay = v + } + + if v, ok := shovelMap["add_forward_headers"].(bool); ok { + shovelDefinition.AddForwardHeaders = v + } + + if v, ok := shovelMap["ack_mode"].(string); ok { + shovelDefinition.AckMode = v + } + + if v, ok := shovelMap["delete_after"].(string); ok { + shovelDefinition.DeleteAfter = v + } + + return *shovelDefinition +} diff --git a/rabbitmq/resource_shovel_test.go b/rabbitmq/resource_shovel_test.go new file mode 100644 index 00000000..f10222e7 --- /dev/null +++ b/rabbitmq/resource_shovel_test.go @@ -0,0 +1,124 @@ +package rabbitmq + +import ( + "fmt" + "strings" + "testing" + + "github.com/hashicorp/terraform-plugin-sdk/helper/resource" + "github.com/hashicorp/terraform-plugin-sdk/terraform" + rabbithole "github.com/michaelklishin/rabbit-hole/v2" +) + +func TestAccShovel(t *testing.T) { + var shovelInfo rabbithole.ShovelInfo + + resource.Test(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t) }, + Providers: testAccProviders, + CheckDestroy: testAccShovelCheckDestroy(&shovelInfo), + Steps: []resource.TestStep{ + { + Config: testAccShovelConfig_basic, + Check: testAccShovelCheck( + "rabbitmq_shovel.shovelTest", &shovelInfo, + ), + }, + }, + }) +} + +func testAccShovelCheck(rn string, shovelInfo *rabbithole.ShovelInfo) resource.TestCheckFunc { + return func(s *terraform.State) error { + rs, ok := s.RootModule().Resources[rn] + if !ok { + return fmt.Errorf("resource not found: %s", rn) + } + + if rs.Primary.ID == "" { + return fmt.Errorf("shovel id not set") + } + + rmqc := testAccProvider.Meta().(*rabbithole.Client) + shovelParts := strings.Split(rs.Primary.ID, "@") + + shovelInfos, err := rmqc.ListShovels() + if err != nil { + return fmt.Errorf("Error retrieving shovels: %s", err) + } + + for _, info := range shovelInfos { + if info.Name == shovelParts[0] && info.Vhost == shovelParts[1] { + shovelInfo = &info + return nil + } + } + + return fmt.Errorf("Unable to find shovel %s", rn) + } +} + +func testAccShovelCheckDestroy(shovelInfo *rabbithole.ShovelInfo) resource.TestCheckFunc { + return func(s *terraform.State) error { + rmqc := testAccProvider.Meta().(*rabbithole.Client) + + shovelInfos, err := rmqc.ListShovels() + if err != nil { + return fmt.Errorf("Error retrieving shovels: %s", err) + } + + for _, info := range shovelInfos { + if info.Name == shovelInfo.Name && info.Vhost == shovelInfo.Vhost { + return fmt.Errorf("shovel still exists: %v", info) + } + } + + return nil + } +} + +const testAccShovelConfig_basic = ` +resource "rabbitmq_vhost" "test" { + name = "test" +} + +resource "rabbitmq_permissions" "guest" { + user = "guest" + vhost = "${rabbitmq_vhost.test.name}" + permissions { + configure = ".*" + write = ".*" + read = ".*" + } +} + +resource "rabbitmq_exchange" "test" { + name = "test_exchange" + vhost = "${rabbitmq_permissions.guest.vhost}" + settings { + type = "fanout" + durable = false + auto_delete = true + } +} + +resource "rabbitmq_queue" "test" { + name = "test_queue" + vhost = "${rabbitmq_exchange.test.vhost}" + settings { + durable = false + auto_delete = true + } +} + +resource "rabbitmq_shovel" "shovelTest" { + name = "shovelTest" + vhost = "${rabbitmq_queue.test.vhost}" + info { + source_uri = "amqp:///test" + source_exchange = "${rabbitmq_exchange.test.name}" + source_exchange_key = "test" + destination_uri = "amqp:///test" + destination_queue = "${rabbitmq_queue.test.name}" + } +}` diff --git a/scripts/enabled_plugins b/scripts/enabled_plugins index b6d6a6d0..edf251f1 100644 --- a/scripts/enabled_plugins +++ b/scripts/enabled_plugins @@ -1 +1 @@ -[rabbitmq_management,rabbitmq_federation,rabbitmq_federation_management]. \ No newline at end of file +[rabbitmq_management,rabbitmq_federation,rabbitmq_federation_management,rabbitmq_shovel,rabbitmq_shovel_management]. diff --git a/website/docs/r/shovel.html.markdown b/website/docs/r/shovel.html.markdown new file mode 100644 index 00000000..e63be20a --- /dev/null +++ b/website/docs/r/shovel.html.markdown @@ -0,0 +1,111 @@ +--- +layout: "rabbitmq" +page_title: "RabbitMQ: rabbitmq_shovel" +sidebar_current: "docs-rabbitmq-resource-shovel" +description: |- + Creates and manages a shovel on a RabbitMQ server. +--- + +# rabbitmq\_shovel + +The ``rabbitmq_shovel`` resource creates and manages a shovel. + +## Example Usage + +```hcl +resource "rabbitmq_vhost" "test" { + name = "test" +} + +resource "rabbitmq_exchange" "test" { + name = "test_exchange" + vhost = "${rabbitmq_vhost.test.name}" + settings { + type = "fanout" + durable = false + auto_delete = true + } +} + +resource "rabbitmq_queue" "test" { + name = "test_queue" + vhost = "${rabbitmq_vhost.test.name}" + settings { + durable = false + auto_delete = true + } +} + +resource "rabbitmq_shovel" "shovelTest" { + name = "shovelTest" + vhost = "${rabbitmq_vhost.test.name}" + info { + source_uri = "amqp:///test" + source_exchange = "${rabbitmq_exchange.test.name}" + source_exchange_key = "test" + destination_uri = "amqp:///test" + destination_queue = "${rabbitmq_queue.test.name}" + } +} +``` + +## Argument Reference + +The following arguments are supported: + +* `name` - (Required) The shovel name. + +* `vhost` - (Required) The vhost to create the resource in. + +* `info` - (Required) The settings of the shovel. The structure is + described below. + +The `info` block supports: + +* `source_uri` - (Required) The amqp uri for the source. + +* `source_exchange` - (Optional) The exchange from which to consume. +Either this or source_queue must be specified but not both. + +* `source_exchange_key` - (Optional) The routing key when using source_exchange. + +* `source_queue` - (Optional) The queue from which to consume. +Either this or source_exchange must be specified but not both. + +* `destination_uri` - (Required) The amqp uri for the destination . + +* `destination_exchange` - (Optional) The exchange to which messages should be published. +Either this or destination_queue must be specified but not both. + +* `destination_exchange_key` - (Optional) The routing key when using destination_exchange. + +* `destination_queue` - (Optional) The queue to which messages should be published. +Either this or destination_exchange must be specified but not both. + +* `prefetch_count` - (Optional) The maximum number of unacknowledged messages copied over a shovel at any one time. +Defaults to `1000`. + +* `reconnect_delay` - (Optional) The duration in seconds to reconnect to a broker after disconnected. +Defaults to `1`. + +* `add_forward_headers` - (Optional) Whether to amqp shovel headers. +Defaults to `false`. + +* `ack_mode` - (Optional) Determines how the shovel should acknowledge messages. +Defaults to `on-confirm`. + +* `delete_after` - (Optional) Determines when (if ever) the shovel should delete itself . +Defaults to `never`. + +## Attributes Reference + +No further attributes are exported. + +## Import + +Shovels can be imported using the `name` and `vhost` +E.g. + +``` +terraform import rabbitmq_shovel.test shovelTest@test +``` diff --git a/website/rabbitmq.erb b/website/rabbitmq.erb index c8ac7690..088d57bd 100644 --- a/website/rabbitmq.erb +++ b/website/rabbitmq.erb @@ -21,7 +21,7 @@ > rabbitmq_federation_upstream - + > rabbitmq_permissions @@ -40,6 +40,9 @@ > rabbitmq_vhost + > + rabbitmq_shovel +