diff --git a/GNUmakefile b/GNUmakefile index 81d2e5b3..75f4eda9 100644 --- a/GNUmakefile +++ b/GNUmakefile @@ -1,4 +1,5 @@ TEST?=$$(go list ./... |grep -v 'vendor') +TEST_COUNT?=1 GOFMT_FILES?=$$(find . -name '*.go' |grep -v vendor) WEBSITE_REPO=github.com/hashicorp/terraform-website PKG_NAME=rabbitmq @@ -14,7 +15,7 @@ test: fmtcheck xargs -t -n4 go test $(TESTARGS) -timeout=30s -parallel=4 testacc: fmtcheck - TF_ACC=1 go test $(TEST) -v $(TESTARGS) -timeout 120m + TF_ACC=1 go test $(TEST) -count $(TEST_COUNT) -v $(TESTARGS) -timeout 120m vet: @echo "go vet ." diff --git a/rabbitmq/import_federation_upstream_test.go b/rabbitmq/import_federation_upstream_test.go new file mode 100644 index 00000000..5f1c8534 --- /dev/null +++ b/rabbitmq/import_federation_upstream_test.go @@ -0,0 +1,34 @@ +package rabbitmq + +import ( + "testing" + + rabbithole "github.com/michaelklishin/rabbit-hole/v2" + + "github.com/hashicorp/terraform-plugin-sdk/helper/resource" +) + +func TestAccFederationUpstream_importBasic(t *testing.T) { + var upstream rabbithole.FederationUpstream + resourceName := "rabbitmq_federation_upstream.foo" + + resource.Test(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t) }, + Providers: testAccProviders, + CheckDestroy: testAccFederationUpstreamCheckDestroy(&upstream), + Steps: []resource.TestStep{ + { + Config: testAccFederationUpstream_create(), + Check: testAccFederationUpstreamCheck( + resourceName, &upstream, + ), + }, + + { + ResourceName: resourceName, + ImportState: true, + ImportStateVerify: true, + }, + }, + }) +} diff --git a/rabbitmq/provider.go b/rabbitmq/provider.go index 56e6df27..3345f461 100644 --- a/rabbitmq/provider.go +++ b/rabbitmq/provider.go @@ -83,14 +83,15 @@ func Provider() terraform.ResourceProvider { }, ResourcesMap: map[string]*schema.Resource{ - "rabbitmq_binding": resourceBinding(), - "rabbitmq_exchange": resourceExchange(), - "rabbitmq_permissions": resourcePermissions(), - "rabbitmq_topic_permissions": resourceTopicPermissions(), - "rabbitmq_policy": resourcePolicy(), - "rabbitmq_queue": resourceQueue(), - "rabbitmq_user": resourceUser(), - "rabbitmq_vhost": resourceVhost(), + "rabbitmq_binding": resourceBinding(), + "rabbitmq_exchange": resourceExchange(), + "rabbitmq_permissions": resourcePermissions(), + "rabbitmq_topic_permissions": resourceTopicPermissions(), + "rabbitmq_federation_upstream": resourceFederationUpstream(), + "rabbitmq_policy": resourcePolicy(), + "rabbitmq_queue": resourceQueue(), + "rabbitmq_user": resourceUser(), + "rabbitmq_vhost": resourceVhost(), }, ConfigureFunc: providerConfigure, diff --git a/rabbitmq/resource_federation_upstream.go b/rabbitmq/resource_federation_upstream.go new file mode 100644 index 00000000..35e75871 --- /dev/null +++ b/rabbitmq/resource_federation_upstream.go @@ -0,0 +1,286 @@ +package rabbitmq + +import ( + "fmt" + "log" + + "github.com/hashicorp/terraform-plugin-sdk/helper/schema" + "github.com/hashicorp/terraform-plugin-sdk/helper/validation" + rabbithole "github.com/michaelklishin/rabbit-hole/v2" +) + +func resourceFederationUpstream() *schema.Resource { + return &schema.Resource{ + Create: CreateFederationUpstream, + Read: ReadFederationUpstream, + Update: UpdateFederationUpstream, + Delete: DeleteFederationUpstream, + Importer: &schema.ResourceImporter{ + State: schema.ImportStatePassthrough, + }, + + Schema: map[string]*schema.Schema{ + "name": { + Type: schema.TypeString, + Required: true, + ForceNew: true, + }, + + "vhost": { + Type: schema.TypeString, + Required: true, + ForceNew: true, + }, + + // "federation-upstream" + "component": { + Type: schema.TypeString, + Computed: true, + }, + + "definition": { + Type: schema.TypeList, + Required: true, + MaxItems: 1, + Elem: &schema.Resource{ + Schema: map[string]*schema.Schema{ + // applicable to both federated exchanges and queues + "uri": { + Type: schema.TypeString, + Required: true, + Sensitive: true, + }, + + "prefetch_count": { + Type: schema.TypeInt, + Optional: true, + Default: 1000, + }, + + "reconnect_delay": { + Type: schema.TypeInt, + Optional: true, + Default: 5, + }, + + "ack_mode": { + Type: schema.TypeString, + Optional: true, + Default: "on-confirm", + ValidateFunc: validation.StringInSlice([]string{ + "on-confirm", + "on-publish", + "no-ack", + }, false), + }, + + "trust_user_id": { + Type: schema.TypeBool, + Optional: true, + Default: false, + }, + // applicable to federated exchanges only + "exchange": { + Type: schema.TypeString, + Optional: true, + }, + + "max_hops": { + Type: schema.TypeInt, + Optional: true, + Default: 1, + }, + + "expires": { + Type: schema.TypeInt, + Optional: true, + }, + + "message_ttl": { + Type: schema.TypeInt, + Optional: true, + }, + // applicable to federated queues only + "queue": { + Type: schema.TypeString, + Optional: true, + }, + }, + }, + }, + }, + } +} + +func CreateFederationUpstream(d *schema.ResourceData, meta interface{}) error { + rmqc := meta.(*rabbithole.Client) + + name := d.Get("name").(string) + vhost := d.Get("vhost").(string) + defList := d.Get("definition").([]interface{}) + + defMap, ok := defList[0].(map[string]interface{}) + if !ok { + return fmt.Errorf("Unable to parse federation upstream definition") + } + + if err := putFederationUpstream(rmqc, vhost, name, defMap); err != nil { + return err + } + + id := fmt.Sprintf("%s@%s", name, vhost) + d.SetId(id) + + return ReadFederationUpstream(d, meta) +} + +func ReadFederationUpstream(d *schema.ResourceData, meta interface{}) error { + rmqc := meta.(*rabbithole.Client) + + name, vhost, err := parseResourceId(d) + if err != nil { + return err + } + + upstream, err := rmqc.GetFederationUpstream(vhost, name) + if err != nil { + return checkDeleted(d, err) + } + + log.Printf("[DEBUG] RabbitMQ: Federation upstream retrieved for %s: %#v", d.Id(), upstream) + + d.Set("name", upstream.Name) + d.Set("vhost", upstream.Vhost) + d.Set("component", upstream.Component) + + defMap := map[string]interface{}{ + "uri": upstream.Definition.Uri, + "prefetch_count": upstream.Definition.PrefetchCount, + "reconnect_delay": upstream.Definition.ReconnectDelay, + "ack_mode": upstream.Definition.AckMode, + "trust_user_id": upstream.Definition.TrustUserId, + "exchange": upstream.Definition.Exchange, + "max_hops": upstream.Definition.MaxHops, + "expires": upstream.Definition.Expires, + "message_ttl": upstream.Definition.MessageTTL, + "queue": upstream.Definition.Queue, + } + + defList := [1]map[string]interface{}{defMap} + d.Set("definition", defList) + + return nil +} + +func UpdateFederationUpstream(d *schema.ResourceData, meta interface{}) error { + rmqc := meta.(*rabbithole.Client) + + name, vhost, err := parseResourceId(d) + if err != nil { + return err + } + + if d.HasChange("definition") { + _, newDef := d.GetChange("definition") + + defList := newDef.([]interface{}) + defMap, ok := defList[0].(map[string]interface{}) + if !ok { + return fmt.Errorf("Unable to parse federation definition") + } + + if err := putFederationUpstream(rmqc, vhost, name, defMap); err != nil { + return err + } + } + + return ReadFederationUpstream(d, meta) +} + +func DeleteFederationUpstream(d *schema.ResourceData, meta interface{}) error { + rmqc := meta.(*rabbithole.Client) + + name, vhost, err := parseResourceId(d) + if err != nil { + return err + } + + log.Printf("[DEBUG] RabbitMQ: Attempting to delete federation upstream for %s", d.Id()) + + resp, err := rmqc.DeleteFederationUpstream(vhost, name) + log.Printf("[DEBUG] RabbitMQ: Federation upstream delete response: %#v", resp) + if err != nil { + return err + } + + if resp.StatusCode == 404 { + // the upstream was automatically deleted + return nil + } + + if resp.StatusCode >= 400 { + return fmt.Errorf("Error deleting RabbitMQ federation upstream: %s", resp.Status) + } + + return nil +} + +func putFederationUpstream(rmqc *rabbithole.Client, vhost string, name string, defMap map[string]interface{}) error { + definition := rabbithole.FederationDefinition{} + + log.Printf("[DEBUG] RabbitMQ: Attempting to put federation definition for %s@%s: %#v", name, vhost, defMap) + + if v, ok := defMap["uri"].(string); ok { + definition.Uri = v + } + + if v, ok := defMap["expires"].(int); ok { + definition.Expires = v + } + + if v, ok := defMap["message_ttl"].(int); ok { + definition.MessageTTL = int32(v) + } + + if v, ok := defMap["max_hops"].(int); ok { + definition.MaxHops = v + } + + if v, ok := defMap["prefetch_count"].(int); ok { + definition.PrefetchCount = v + } + + if v, ok := defMap["reconnect_delay"].(int); ok { + definition.ReconnectDelay = v + } + + if v, ok := defMap["ack_mode"].(string); ok { + definition.AckMode = v + } + + if v, ok := defMap["trust_user_id"].(bool); ok { + definition.TrustUserId = v + } + + if v, ok := defMap["exchange"].(string); ok { + definition.Exchange = v + } + + if v, ok := defMap["queue"].(string); ok { + definition.Queue = v + } + + log.Printf("[DEBUG] RabbitMQ: Attempting to declare federation upstream for %s@%s: %#v", name, vhost, definition) + + resp, err := rmqc.PutFederationUpstream(vhost, name, definition) + log.Printf("[DEBUG] RabbitMQ: Federation upstream declare response: %#v", resp) + if err != nil { + return err + } + + if resp.StatusCode >= 400 { + return fmt.Errorf("Error creating RabbitMQ federation upstream: %s", resp.Status) + } + + return nil +} diff --git a/rabbitmq/resource_federation_upstream_test.go b/rabbitmq/resource_federation_upstream_test.go new file mode 100644 index 00000000..53b91475 --- /dev/null +++ b/rabbitmq/resource_federation_upstream_test.go @@ -0,0 +1,258 @@ +package rabbitmq + +import ( + "fmt" + "regexp" + "strings" + "testing" + + "github.com/hashicorp/terraform-plugin-sdk/helper/resource" + "github.com/hashicorp/terraform-plugin-sdk/terraform" + rabbithole "github.com/michaelklishin/rabbit-hole/v2" +) + +func TestAccFederationUpstream(t *testing.T) { + var upstream rabbithole.FederationUpstream + resourceName := "rabbitmq_federation_upstream.foo" + + resource.Test(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t) }, + Providers: testAccProviders, + CheckDestroy: testAccFederationUpstreamCheckDestroy(&upstream), + Steps: []resource.TestStep{ + { + Config: testAccFederationUpstream_create(), + Check: resource.ComposeTestCheckFunc( + testAccFederationUpstreamCheck(resourceName, &upstream), + resource.TestCheckResourceAttr(resourceName, "definition.#", "1"), + resource.TestCheckResourceAttr(resourceName, "definition.0.uri", "amqp://server-name"), + resource.TestCheckResourceAttr(resourceName, "definition.0.prefetch_count", "1000"), + resource.TestCheckResourceAttr(resourceName, "definition.0.reconnect_delay", "1"), + resource.TestCheckResourceAttr(resourceName, "definition.0.ack_mode", "on-confirm"), + resource.TestCheckResourceAttr(resourceName, "definition.0.trust_user_id", "false"), + resource.TestCheckResourceAttr(resourceName, "definition.0.max_hops", "1"), + resource.TestCheckResourceAttr(resourceName, "definition.0.expires", "0"), + resource.TestCheckResourceAttr(resourceName, "definition.0.message_ttl", "0"), + )}, + { + Config: testAccFederationUpstream_update(), + Check: resource.ComposeTestCheckFunc( + testAccFederationUpstreamCheck(resourceName, &upstream), + resource.TestCheckResourceAttr(resourceName, "definition.#", "1"), + resource.TestCheckResourceAttr(resourceName, "definition.0.prefetch_count", "500"), + resource.TestCheckResourceAttr(resourceName, "definition.0.reconnect_delay", "10"), + resource.TestCheckResourceAttr(resourceName, "definition.0.ack_mode", "on-publish"), + resource.TestCheckResourceAttr(resourceName, "definition.0.trust_user_id", "true"), + resource.TestCheckResourceAttr(resourceName, "definition.0.max_hops", "2"), + resource.TestCheckResourceAttr(resourceName, "definition.0.expires", "1800000"), + resource.TestCheckResourceAttr(resourceName, "definition.0.message_ttl", "60000"), + )}, + }, + }) +} + +func TestAccFederationUpstream_hasComponent(t *testing.T) { + var upstream rabbithole.FederationUpstream + resourceName := "rabbitmq_federation_upstream.foo" + + resource.Test(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t) }, + Providers: testAccProviders, + CheckDestroy: testAccFederationUpstreamCheckDestroy(&upstream), + Steps: []resource.TestStep{ + { + Config: testAccFederationUpstream_basic(), + Check: resource.ComposeTestCheckFunc( + testAccFederationUpstreamCheck(resourceName, &upstream), + resource.TestCheckResourceAttr(resourceName, "component", "federation-upstream"), + ), + }, + }, + }) +} + +func TestAccFederationUpstream_defaults(t *testing.T) { + var upstream rabbithole.FederationUpstream + resourceName := "rabbitmq_federation_upstream.foo" + + resource.Test(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t) }, + Providers: testAccProviders, + CheckDestroy: testAccFederationUpstreamCheckDestroy(&upstream), + Steps: []resource.TestStep{ + { + Config: testAccFederationUpstream_basic(), + Check: resource.ComposeTestCheckFunc( + testAccFederationUpstreamCheck(resourceName, &upstream), + resource.TestCheckResourceAttr(resourceName, "definition.#", "1"), + resource.TestCheckResourceAttr(resourceName, "definition.0.prefetch_count", "1000"), + resource.TestCheckResourceAttr(resourceName, "definition.0.reconnect_delay", "5"), + resource.TestCheckResourceAttr(resourceName, "definition.0.ack_mode", "on-confirm"), + resource.TestCheckResourceAttr(resourceName, "definition.0.trust_user_id", "false"), + resource.TestCheckResourceAttr(resourceName, "definition.0.max_hops", "1"), + ), + }, + }, + }) +} + +func TestAccFederationUpstream_validation(t *testing.T) { + var upstream rabbithole.FederationUpstream + + resource.Test(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t) }, + Providers: testAccProviders, + CheckDestroy: testAccFederationUpstreamCheckDestroy(&upstream), + Steps: []resource.TestStep{ + { + Config: testAccFederationUpstream_validation(), + ExpectError: regexp.MustCompile("^config is invalid"), + }, + }, + }) +} + +func testAccFederationUpstreamCheck(rn string, upstream *rabbithole.FederationUpstream) resource.TestCheckFunc { + return func(s *terraform.State) error { + rs, ok := s.RootModule().Resources[rn] + if !ok { + return fmt.Errorf("resource not found: %s", rn) + } + + if rs.Primary.ID == "" { + return fmt.Errorf("federation upstream id not set") + } + + id := strings.Split(rs.Primary.ID, "@") + name := id[0] + vhost := id[1] + + rmqc := testAccProvider.Meta().(*rabbithole.Client) + upstreams, err := rmqc.ListFederationUpstreamsIn(vhost) + if err != nil { + return fmt.Errorf("Error retrieving federation upstreams: %s", err) + } + + for _, up := range upstreams { + if up.Name == name && up.Vhost == vhost { + upstream = &up + return nil + } + } + + return fmt.Errorf("Unable to find federation upstream %s", rn) + } +} + +func testAccFederationUpstreamCheckDestroy(upstream *rabbithole.FederationUpstream) resource.TestCheckFunc { + return func(s *terraform.State) error { + rmqc := testAccProvider.Meta().(*rabbithole.Client) + + upstreams, err := rmqc.ListFederationUpstreamsIn(upstream.Vhost) + if err != nil { + return fmt.Errorf("Error retrieving federation upstreams: %s", err) + } + + for _, up := range upstreams { + if up.Name == upstream.Name && up.Vhost == upstream.Vhost { + return fmt.Errorf("Federation upstream %s@%s still exists", upstream.Name, upstream.Vhost) + } + } + + return nil + } +} + +func testAccFederationUpstream_baseConfig() string { + return fmt.Sprintf(` +resource "rabbitmq_vhost" "test" { + name = "test" +} + +resource "rabbitmq_permissions" "guest" { + user = "guest" + vhost = rabbitmq_vhost.test.name + permissions { + configure = ".*" + write = ".*" + read = ".*" + } +} + +`) +} + +func testAccFederationUpstream_create() string { + return testAccFederationUpstream_baseConfig() + fmt.Sprintf(` +resource "rabbitmq_federation_upstream" "foo" { + name = "foo" + vhost = rabbitmq_permissions.guest.vhost + + definition { + uri = "amqp://server-name" + prefetch_count = 1000 + reconnect_delay = 1 + ack_mode = "on-confirm" + trust_user_id = false + + exchange = "" + max_hops = 1 + expires = 0 + message_ttl = 0 + + queue = "" + } +} +`) +} + +func testAccFederationUpstream_update() string { + return testAccFederationUpstream_baseConfig() + fmt.Sprintf(` +resource "rabbitmq_federation_upstream" "foo" { + name = "foo" + vhost = rabbitmq_permissions.guest.vhost + + definition { + uri = "amqp://server-name" + prefetch_count = 500 + reconnect_delay = 10 + ack_mode = "on-publish" + trust_user_id = true + + exchange = "" + max_hops = 2 + expires = 1800000 + message_ttl = 60000 + + queue = "" + } +} +`) +} + +func testAccFederationUpstream_basic() string { + return testAccFederationUpstream_baseConfig() + fmt.Sprintf(` +resource "rabbitmq_federation_upstream" "foo" { + name = "foo" + vhost = rabbitmq_permissions.guest.vhost + + definition { + uri = "amqp://server-name" + } +} +`) +} + +func testAccFederationUpstream_validation() string { + return testAccFederationUpstream_baseConfig() + fmt.Sprintf(` +resource "rabbitmq_federation_upstream" "foo" { + name = "foo" + vhost = rabbitmq_permissions.guest.vhost + + definition { + uri = "amqp://server-name" + ack_mode = "not-valid" + } +} +`) +} diff --git a/rabbitmq/util.go b/rabbitmq/util.go index b1feca6b..70732099 100644 --- a/rabbitmq/util.go +++ b/rabbitmq/util.go @@ -1,6 +1,7 @@ package rabbitmq import ( + "fmt" "strings" "github.com/hashicorp/terraform-plugin-sdk/helper/schema" @@ -33,3 +34,20 @@ func percentDecodeSlashes(s string) string { // Decode any forward slashes, then decode any percent signs. return strings.Replace(strings.Replace(s, "%2F", "/", -1), "%25", "%", -1) } + +// get the id of the resource from the ResourceData +func parseResourceId(d *schema.ResourceData) (name, vhost string, err error) { + return parseId(d.Id()) +} + +// get the resource name and rabbitmq vhost from the resource id +func parseId(resourceId string) (name, vhost string, err error) { + parts := strings.Split(resourceId, "@") + if len(parts) != 2 { + err = fmt.Errorf("Unable to parse resource id: %s", resourceId) + return + } + name = parts[0] + vhost = parts[1] + return +} diff --git a/rabbitmq/util_test.go b/rabbitmq/util_test.go new file mode 100644 index 00000000..81c269f2 --- /dev/null +++ b/rabbitmq/util_test.go @@ -0,0 +1,36 @@ +package rabbitmq + +import "testing" + +func TestParseId(t *testing.T) { + var badInputs = []string{ + "", + "foo/test", + "footest", + "foo@bar@test", + } + + for _, input := range badInputs { + _, _, err := parseId(input) + if err == nil { + t.Errorf("parseId failed for: %s.", input) + } + } + + var goodInputs = []struct { + input string + name string + vhost string + }{ + {"foo@test", "foo", "test"}, + {"foo@/", "foo", "/"}, + {"foo/bar/baz@/", "foo/bar/baz", "/"}, + } + + for _, test := range goodInputs { + name, vhost, err := parseId(test.input) + if err != nil || name != test.name || vhost != test.vhost { + t.Errorf("parseId failed for: %s.", test.input) + } + } +} diff --git a/scripts/docker-compose.yml b/scripts/docker-compose.yml index 7504a17d..4390d215 100644 --- a/scripts/docker-compose.yml +++ b/scripts/docker-compose.yml @@ -8,3 +8,5 @@ services: RABBITMQ_DEFAULT_PASS: ${RABBITMQ_PASSWORD:-guest} ports: - 15672:15672 + volumes: + - ./enabled_plugins:/etc/rabbitmq/enabled_plugins \ No newline at end of file diff --git a/scripts/enabled_plugins b/scripts/enabled_plugins new file mode 100644 index 00000000..b6d6a6d0 --- /dev/null +++ b/scripts/enabled_plugins @@ -0,0 +1 @@ +[rabbitmq_management,rabbitmq_federation,rabbitmq_federation_management]. \ No newline at end of file diff --git a/website/docs/r/federation-upstream.html.markdown b/website/docs/r/federation-upstream.html.markdown new file mode 100644 index 00000000..c810818a --- /dev/null +++ b/website/docs/r/federation-upstream.html.markdown @@ -0,0 +1,118 @@ +--- +layout: "rabbitmq" +page_title: "RabbitMQ: rabbitmq_federation_upstream" +sidebar_current: "docs-rabbitmq-resource-federation-upstream" +description: |- + Creates and manages a federation upstream on a RabbitMQ server. +--- + +# rabbitmq\_federation\_upstream + +The ``rabbitmq_federation_upstream`` resource creates and manages a federation upstream parameter. + +## Example Usage + +```hcl +resource "rabbitmq_vhost" "test" { + name = "test" +} + +resource "rabbitmq_permissions" "guest" { + user = "guest" + vhost = rabbitmq_vhost.test.name + permissions { + configure = ".*" + write = ".*" + read = ".*" + } +} + +// downstream exchange +resource "rabbitmq_exchange" "foo" { + name = "foo" + vhost = rabbitmq_permissions.guest.vhost + + settings { + type = "topic" + durable = "true" + } +} + +// upstream broker +resource "rabbitmq_federation_upstream" "foo" { + name = "foo" + vhost = rabbitmq_permissions.guest.vhost + + definition { + uri = "amqp://guest:guest@upstream-server-name:5672/%2f" + prefetch_count = 1000 + reconnect_delay = 5 + ack_mode = "on-confirm" + trust_user_id = false + max_hops = 1 + } +} + +resource "rabbitmq_policy" "foo" { + name = "foo" + vhost = rabbitmq_permissions.guest.vhost + + policy { + pattern = "(^${rabbitmq_exchange.foo.name}$)" + priority = 1 + apply_to = "exchanges" + + definition = { + federation-upstream = rabbitmq_federation_upstream.foo.name + } + } +} + +``` + +## Argument Reference + +The following arguments are supported: + +* `name` - (Required) The name of the federation upstream. + +* `vhost` - (Required) The vhost to create the resource in. + +* `component` - (Computed) Set to `federation-upstream` by the underlying RabbitMQ provider. You do not set this attribute but will see it in state and plan output. + +* `definition` - (Required) The configuration of the federation upstream. The structure is described below. + +The `definition` block supports the following arguments: + +Applicable to Both Federated Exchanges and Queues + +* `uri` - (Required) The AMQP URI(s) for the upstream. Note that the URI may contain sensitive information, such as a password. +* `prefetch_count` - (Optional) Maximum number of unacknowledged messages that may be in flight over a federation link at one time. Default is `1000`. +* `reconnect_delay` - (Optional) Time in seconds to wait after a network link goes down before attempting reconnection. Default is `5`. +* `ack_mode` - (Optional) Determines how the link should acknowledge messages. Valid values are `on-confirm`, `on-publish`, and `no-ack`. Default is `on-confirm`. +* `trust_user_id` - (Optional) Determines how federation should interact with the validated user-id feature. Default is `false`. + +Applicable to Federated Exchanges Only + +* `exchange` - (Optional) The name of the upstream exchange. +* `max_hops` - (Optional) Maximum number of federation links that messages can traverse before being dropped. Default is `1`. +* `expires` - (Optional) The expiry time (in milliseconds) after which an upstream queue for a federated exchange may be deleted if a connection to the upstream is lost. +* `message_ttl` - (Optional) The expiry time (in milliseconds) for messages in the upstream queue for a federated exchange (see expires). + +Applicable to Federated Queues Only + +* `queue` - (Optional) The name of the upstream queue. + +Consult the RabbitMQ [Federation Reference](https://www.rabbitmq.com/federation-reference.html) documentation for detailed information and guidance on setting these values. + +## Attributes Reference + +No further attributes are exported. + +## Import + +A Federation upstream can be imported using the resource `id` which is composed of `name@vhost`, e.g. + +```sh +terraform import rabbitmq_federation_upstream.foo foo@test +``` diff --git a/website/rabbitmq.erb b/website/rabbitmq.erb index c48bf1d6..c8ac7690 100644 --- a/website/rabbitmq.erb +++ b/website/rabbitmq.erb @@ -19,6 +19,9 @@ > rabbitmq_exchange + > + rabbitmq_federation_upstream + > rabbitmq_permissions