From e453517f4dd1699263384f827421f0b02f769b85 Mon Sep 17 00:00:00 2001 From: Raymond Fallon Date: Sat, 25 Apr 2020 18:54:50 -0400 Subject: [PATCH 01/19] GET a federation-upstream parameter. --- bin/ci/before_build.sh | 3 +++ federation.go | 19 ++++++++++++++++++ rabbithole_test.go | 44 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 66 insertions(+) diff --git a/bin/ci/before_build.sh b/bin/ci/before_build.sh index bd940c5..7bd6c9d 100755 --- a/bin/ci/before_build.sh +++ b/bin/ci/before_build.sh @@ -29,3 +29,6 @@ $CTL set_cluster_name rabbitmq@localhost # Enable shovel plugin $PLUGINS enable rabbitmq_shovel $PLUGINS enable rabbitmq_shovel_management + +$PLUGINS enable rabbitmq_federation +$PLUGINS enable rabbitmq_federation_management diff --git a/federation.go b/federation.go index 887cf3b..5de97c9 100644 --- a/federation.go +++ b/federation.go @@ -27,6 +27,25 @@ type FederationUpstream struct { Definition FederationDefinition `json:"value"` } +// +// GET /api/parameters/federation-upstream/{vhost}/{upstream} +// + +// GetFederationUpstream returns a federation upstream +func (c *Client) GetFederationUpstream(vhost, upstreamName string) (rec *FederationUpstream, err error) { + req, err := newGETRequest(c, "parameters/federation-upstream/"+url.PathEscape(vhost)+"/"+url.PathEscape(upstreamName)) + + if err != nil { + return nil, err + } + + if err = executeAndParseRequest(c, req, &rec); err != nil { + return nil, err + } + + return rec, nil +} + // // PUT /api/parameters/federation-upstream/{vhost}/{upstream} // diff --git a/rabbithole_test.go b/rabbithole_test.go index a918db0..b367824 100644 --- a/rabbithole_test.go +++ b/rabbithole_test.go @@ -90,6 +90,50 @@ var _ = Describe("Rabbithole", func() { rmqc, _ = NewClient("http://127.0.0.1:15672", "guest", "guest") }) + Context("GET /api/parameters/federation-upstream/{vhost}/{upstream}", func() { + Context("when the upstream does not exist", func() { + It("returns a 404 error response", func() { + vh := "rabbit/hole" + name := "temporary" + + fu, err := rmqc.GetFederationUpstream(vh, name) + Ω(err).Should(Equal(ErrorResponse{404, "Object Not Found", "Not Found"})) + Ω(fu).Should(BeNil()) + }) + }) + + Context("when the upstream exists", func() { + It("returns the upstream", func() { + vh := "rabbit/hole" + name := "temporary" + + fd := FederationDefinition{ + Uri: "amqp://127.0.0.1/%2f", + PrefetchCount: 1000, + ReconnectDelay: 1, + AckMode: "on-confirm", + TrustUserId: false, + } + + _, err := rmqc.PutFederationUpstream(vh, name, fd) + Ω(err).Should(BeNil()) + + awaitEventPropagation() + + fu, err := rmqc.GetFederationUpstream(vh, name) + Ω(err).Should(BeNil()) + Ω(fu.Definition.Uri).Should(Equal(fd.Uri)) + Ω(fu.Definition.PrefetchCount).Should(Equal(fd.PrefetchCount)) + Ω(fu.Definition.ReconnectDelay).Should(Equal(fd.ReconnectDelay)) + Ω(fu.Definition.AckMode).Should(Equal(fd.AckMode)) + Ω(fu.Definition.TrustUserId).Should(Equal(fd.TrustUserId)) + + _, err = rmqc.DeleteFederationUpstream(vh, name) + Ω(err).Should(BeNil()) + }) + }) + }) + Context("PUT /parameters/shovel/{vhost}/{name}", func() { It("declares a shovel", func() { vh := "rabbit/hole" From 5b4314a26bfbfb919fa27056d0dec23d06774058 Mon Sep 17 00:00:00 2001 From: Raymond Fallon Date: Sat, 25 Apr 2020 18:56:13 -0400 Subject: [PATCH 02/19] Add tests for PutFederationUpstream method. --- rabbithole_test.go | 87 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 87 insertions(+) diff --git a/rabbithole_test.go b/rabbithole_test.go index b367824..fceede9 100644 --- a/rabbithole_test.go +++ b/rabbithole_test.go @@ -134,6 +134,93 @@ var _ = Describe("Rabbithole", func() { }) }) + Context("PUT /api/parameters/federation-upstream/{vhost}/{upstream}", func() { + Context("when the upstream does not exist", func() { + It("creates the upstream", func() { + vh := "rabbit/hole" + name := "temporary" + + fd := FederationDefinition{ + Uri: "amqp://127.0.0.1/%2f", + PrefetchCount: 500, + ReconnectDelay: 5, + AckMode: "on-publish", + TrustUserId: false, + } + + _, err := rmqc.PutFederationUpstream(vh, name, fd) + Ω(err).Should(BeNil()) + + awaitEventPropagation() + + fu, err := rmqc.GetFederationUpstream(vh, name) + Ω(err).Should(BeNil()) + Ω(fu.Definition.Uri).Should(Equal(fd.Uri)) + Ω(fu.Definition.PrefetchCount).Should(Equal(fd.PrefetchCount)) + Ω(fu.Definition.ReconnectDelay).Should(Equal(fd.ReconnectDelay)) + Ω(fu.Definition.AckMode).Should(Equal(fd.AckMode)) + Ω(fu.Definition.TrustUserId).Should(Equal(fd.TrustUserId)) + + _, err = rmqc.DeleteFederationUpstream(vh, name) + Ω(err).Should(BeNil()) + }) + }) + + Context("when the upstream exists", func() { + It("updates the upstream", func() { + vh := "rabbit/hole" + name := "temporary" + + // create the initial upstream + fd := FederationDefinition{ + Uri: "amqp://127.0.0.1/%2f", + PrefetchCount: 1000, + ReconnectDelay: 1, + AckMode: "on-confirm", + TrustUserId: false, + } + + _, err := rmqc.PutFederationUpstream(vh, name, fd) + Ω(err).Should(BeNil()) + + awaitEventPropagation() + + fu, err := rmqc.GetFederationUpstream(vh, name) + Ω(err).Should(BeNil()) + Ω(fu.Definition.Uri).Should(Equal(fd.Uri)) + Ω(fu.Definition.PrefetchCount).Should(Equal(fd.PrefetchCount)) + Ω(fu.Definition.ReconnectDelay).Should(Equal(fd.ReconnectDelay)) + Ω(fu.Definition.AckMode).Should(Equal(fd.AckMode)) + Ω(fu.Definition.TrustUserId).Should(Equal(fd.TrustUserId)) + + // update the upstream + fd2 := FederationDefinition{ + Uri: "amqp://127.0.0.1/%2f", + PrefetchCount: 500, + ReconnectDelay: 10, + AckMode: "no-ack", + TrustUserId: true, + } + + _, err = rmqc.PutFederationUpstream(vh, name, fd2) + Ω(err).Should(BeNil()) + + awaitEventPropagation() + + fu2, err := rmqc.GetFederationUpstream(vh, name) + Ω(err).Should(BeNil()) + Ω(fu2.Definition.Uri).Should(Equal(fd2.Uri)) + Ω(fu2.Definition.PrefetchCount).Should(Equal(fd2.PrefetchCount)) + Ω(fu2.Definition.ReconnectDelay).Should(Equal(fd2.ReconnectDelay)) + Ω(fu2.Definition.AckMode).Should(Equal(fd2.AckMode)) + Ω(fu2.Definition.TrustUserId).Should(Equal(fd2.TrustUserId)) + + _, err = rmqc.DeleteFederationUpstream(vh, name) + Ω(err).Should(BeNil()) + }) + }) + }) + Context("PUT /parameters/shovel/{vhost}/{name}", func() { It("declares a shovel", func() { vh := "rabbit/hole" From fc8b221d35a63762472449e77331bd14d4804b0b Mon Sep 17 00:00:00 2001 From: Raymond Fallon Date: Sat, 25 Apr 2020 19:35:16 -0400 Subject: [PATCH 03/19] Add tests for DeleteFederationUpstream. --- rabbithole_test.go | 37 ++++++++++++++++++++++++++++++++++++- 1 file changed, 36 insertions(+), 1 deletion(-) diff --git a/rabbithole_test.go b/rabbithole_test.go index fceede9..36bf815 100644 --- a/rabbithole_test.go +++ b/rabbithole_test.go @@ -92,7 +92,7 @@ var _ = Describe("Rabbithole", func() { Context("GET /api/parameters/federation-upstream/{vhost}/{upstream}", func() { Context("when the upstream does not exist", func() { - It("returns a 404 error response", func() { + It("returns a 404 error", func() { vh := "rabbit/hole" name := "temporary" @@ -221,6 +221,41 @@ var _ = Describe("Rabbithole", func() { }) }) + Context("DELETE /api/parameters/federation-upstream/{vhost}/{name}", func() { + Context("when the upstream does not exist", func() { + It("returns a 404 error response", func() { + vh := "rabbit/hole" + name := "temporary" + + // this is NOT an error, but a HTTP 404 response + resp, err := rmqc.DeleteFederationUpstream(vh, name) + Ω(err).Should(BeNil()) + Ω(resp.StatusCode).Should(Equal(404)) + Ω(resp.Status).Should(Equal("404 Not Found")) + }) + }) + + Context("when the upstream exists", func() { + It("deletes the upstream", func() { + vh := "rabbit/hole" + name := "temporary" + + fd := FederationDefinition{ + Uri: "amqp://127.0.0.1/%2f", + } + + _, err := rmqc.PutFederationUpstream(vh, name, fd) + Ω(err).Should(BeNil()) + + awaitEventPropagation() + + resp, err := rmqc.DeleteFederationUpstream(vh, name) + Ω(err).Should(BeNil()) + Ω(resp.Status).Should(HavePrefix("20")) + }) + }) + }) + Context("PUT /parameters/shovel/{vhost}/{name}", func() { It("declares a shovel", func() { vh := "rabbit/hole" From 16223445e3caefcf47eed13d380873577d9b9b8d Mon Sep 17 00:00:00 2001 From: Raymond Fallon Date: Sun, 26 Apr 2020 11:51:27 -0400 Subject: [PATCH 04/19] Add ListFederationUpstreams* methods. --- federation.go | 36 ++++++++++++++++ rabbithole_test.go | 102 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 138 insertions(+) diff --git a/federation.go b/federation.go index 5de97c9..5083ed5 100644 --- a/federation.go +++ b/federation.go @@ -27,6 +27,42 @@ type FederationUpstream struct { Definition FederationDefinition `json:"value"` } +// +// GET /api/parameters/federation-upstream +// + +// ListFederationUpstreams returns all federation upstreams +func (c *Client) ListFederationUpstreams() (rec []FederationUpstream, err error) { + req, err := newGETRequest(c, "parameters/federation-upstream") + if err != nil { + return []FederationUpstream{}, err + } + + if err = executeAndParseRequest(c, req, &rec); err != nil { + return []FederationUpstream{}, err + } + + return rec, nil +} + +// +// GET /api/parameters/federation-upstream/{vhost} +// + +// ListFederationUpstreamsIn returns all federation upstreams in a vhost +func (c *Client) ListFederationUpstreamsIn(vhost string) (rec []FederationUpstream, err error) { + req, err := newGETRequest(c, "parameters/federation-upstream/"+url.PathEscape(vhost)) + if err != nil { + return []FederationUpstream{}, err + } + + if err = executeAndParseRequest(c, req, &rec); err != nil { + return []FederationUpstream{}, err + } + + return rec, nil +} + // // GET /api/parameters/federation-upstream/{vhost}/{upstream} // diff --git a/rabbithole_test.go b/rabbithole_test.go index 36bf815..d07c176 100644 --- a/rabbithole_test.go +++ b/rabbithole_test.go @@ -90,6 +90,108 @@ var _ = Describe("Rabbithole", func() { rmqc, _ = NewClient("http://127.0.0.1:15672", "guest", "guest") }) + Context("GET /api/parameters/federation-upstream", func() { + Context("when there are no upstreams", func() { + It("returns an empty response", func() { + list, err := rmqc.ListFederationUpstreams() + Ω(err).Should(BeNil()) + Ω(list).Should(BeEmpty()) + }) + }) + + Context("when there are upstreams", func() { + It("returns the list of upstreams", func() { + fd1 := FederationDefinition{ + Uri: "amqp://server-name/%2f", + } + _, err := rmqc.PutFederationUpstream("rabbit/hole", "upstream1", fd1) + Ω(err).Should(BeNil()) + + fd2 := FederationDefinition{ + Uri: "amqp://example.com/%2f", + } + _, err = rmqc.PutFederationUpstream("/", "upstream2", fd2) + Ω(err).Should(BeNil()) + + awaitEventPropagation() + + list, err := rmqc.ListFederationUpstreams() + Ω(err).Should(BeNil()) + Ω(len(list)).Should(Equal(2)) + + _, err = rmqc.DeleteFederationUpstream("rabbit/hole", "upstream1") + Ω(err).Should(BeNil()) + + _, err = rmqc.DeleteFederationUpstream("/", "upstream2") + Ω(err).Should(BeNil()) + + awaitEventPropagation() + + list, err = rmqc.ListFederationUpstreams() + Ω(err).Should(BeNil()) + Ω(len(list)).Should(Equal(0)) + }) + }) + }) + + Context("GET /api/parameters/federation-upstream/{vhost}", func() { + Context("when there are no upstreams", func() { + It("returns an empty response", func() { + vh := "rabbit/hole" + + list, err := rmqc.ListFederationUpstreamsIn(vh) + Ω(err).Should(BeNil()) + Ω(list).Should(BeEmpty()) + }) + }) + + Context("when there are upstreams", func() { + It("returns the list of upstreams", func() { + vh := "rabbit/hole" + + fd1 := FederationDefinition{ + Uri: "amqp://server-name/%2f", + } + + _, err := rmqc.PutFederationUpstream(vh, "upstream1", fd1) + Ω(err).Should(BeNil()) + + fd2 := FederationDefinition{ + Uri: "amqp://example.com/%2f", + } + + _, err = rmqc.PutFederationUpstream(vh, "upstream2", fd2) + Ω(err).Should(BeNil()) + + awaitEventPropagation() + + list, err := rmqc.ListFederationUpstreamsIn(vh) + Ω(err).Should(BeNil()) + Ω(len(list)).Should(Equal(2)) + + // delete upstream1 + _, err = rmqc.DeleteFederationUpstream(vh, "upstream1") + Ω(err).Should(BeNil()) + + awaitEventPropagation() + + list, err = rmqc.ListFederationUpstreamsIn(vh) + Ω(err).Should(BeNil()) + Ω(len(list)).Should(Equal(1)) + + // delete upstream2 + _, err = rmqc.DeleteFederationUpstream(vh, "upstream2") + Ω(err).Should(BeNil()) + + awaitEventPropagation() + + list, err = rmqc.ListFederationUpstreamsIn(vh) + Ω(err).Should(BeNil()) + Ω(len(list)).Should(Equal(0)) + }) + }) + }) + Context("GET /api/parameters/federation-upstream/{vhost}/{upstream}", func() { Context("when the upstream does not exist", func() { It("returns a 404 error", func() { From 2c38d11b25b03c8bcbe6a954428d37c2f5e4b18b Mon Sep 17 00:00:00 2001 From: Raymond Fallon Date: Sun, 26 Apr 2020 13:14:39 -0400 Subject: [PATCH 05/19] Add additional fields to FederationUpstream. - to support responses from GET requests. --- federation.go | 13 +++++++++++-- rabbithole_test.go | 12 ++++++++++++ 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/federation.go b/federation.go index 5083ed5..0866154 100644 --- a/federation.go +++ b/federation.go @@ -24,6 +24,14 @@ type FederationDefinition struct { // Represents a configured Federation upstream. type FederationUpstream struct { + Name string `json:"name"` + Vhost string `json:"vhost"` + Component string `json:"component"` + Definition FederationDefinition `json:"value"` +} + +// FederationDefinitionDTO provides a data transfer object for a FederationDefinition. +type FederationDefinitionDTO struct { Definition FederationDefinition `json:"value"` } @@ -88,10 +96,11 @@ func (c *Client) GetFederationUpstream(vhost, upstreamName string) (rec *Federat // Updates a federation upstream func (c *Client) PutFederationUpstream(vhost string, upstreamName string, fDef FederationDefinition) (res *http.Response, err error) { - fedUp := FederationUpstream{ + fedDTO := FederationDefinitionDTO{ Definition: fDef, } - body, err := json.Marshal(fedUp) + + body, err := json.Marshal(fedDTO) if err != nil { return nil, err } diff --git a/rabbithole_test.go b/rabbithole_test.go index d07c176..d9b0c76 100644 --- a/rabbithole_test.go +++ b/rabbithole_test.go @@ -224,6 +224,9 @@ var _ = Describe("Rabbithole", func() { fu, err := rmqc.GetFederationUpstream(vh, name) Ω(err).Should(BeNil()) + Ω(fu.Vhost).Should(Equal(vh)) + Ω(fu.Name).Should(Equal(name)) + Ω(fu.Component).Should(Equal("federation-upstream")) Ω(fu.Definition.Uri).Should(Equal(fd.Uri)) Ω(fu.Definition.PrefetchCount).Should(Equal(fd.PrefetchCount)) Ω(fu.Definition.ReconnectDelay).Should(Equal(fd.ReconnectDelay)) @@ -257,6 +260,9 @@ var _ = Describe("Rabbithole", func() { fu, err := rmqc.GetFederationUpstream(vh, name) Ω(err).Should(BeNil()) + Ω(fu.Vhost).Should(Equal(vh)) + Ω(fu.Name).Should(Equal(name)) + Ω(fu.Component).Should(Equal("federation-upstream")) Ω(fu.Definition.Uri).Should(Equal(fd.Uri)) Ω(fu.Definition.PrefetchCount).Should(Equal(fd.PrefetchCount)) Ω(fu.Definition.ReconnectDelay).Should(Equal(fd.ReconnectDelay)) @@ -289,6 +295,9 @@ var _ = Describe("Rabbithole", func() { fu, err := rmqc.GetFederationUpstream(vh, name) Ω(err).Should(BeNil()) + Ω(fu.Vhost).Should(Equal(vh)) + Ω(fu.Name).Should(Equal(name)) + Ω(fu.Component).Should(Equal("federation-upstream")) Ω(fu.Definition.Uri).Should(Equal(fd.Uri)) Ω(fu.Definition.PrefetchCount).Should(Equal(fd.PrefetchCount)) Ω(fu.Definition.ReconnectDelay).Should(Equal(fd.ReconnectDelay)) @@ -311,6 +320,9 @@ var _ = Describe("Rabbithole", func() { fu2, err := rmqc.GetFederationUpstream(vh, name) Ω(err).Should(BeNil()) + Ω(fu2.Vhost).Should(Equal(vh)) + Ω(fu2.Name).Should(Equal(name)) + Ω(fu2.Component).Should(Equal("federation-upstream")) Ω(fu2.Definition.Uri).Should(Equal(fd2.Uri)) Ω(fu2.Definition.PrefetchCount).Should(Equal(fd2.PrefetchCount)) Ω(fu2.Definition.ReconnectDelay).Should(Equal(fd2.ReconnectDelay)) From d187d008a6f2bca6fb38717e6e613c9fb935e193 Mon Sep 17 00:00:00 2001 From: Raymond Fallon Date: Sun, 26 Apr 2020 14:46:24 -0400 Subject: [PATCH 06/19] Test for case for bad PutFederationUpstream request. --- rabbithole_test.go | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/rabbithole_test.go b/rabbithole_test.go index d9b0c76..4976acd 100644 --- a/rabbithole_test.go +++ b/rabbithole_test.go @@ -333,6 +333,16 @@ var _ = Describe("Rabbithole", func() { Ω(err).Should(BeNil()) }) }) + + Context("when the upstream definition is bad", func() { + It("returns a 400 error response", func() { + // this is NOT an err, but a HTTP 400 response + resp, err := rmqc.PutFederationUpstream("rabbit/hole", "error", FederationDefinition{}) + Ω(err).Should(BeNil()) + Ω(resp.StatusCode).Should(Equal(400)) + Ω(resp.Status).Should(Equal("400 Bad Request")) + }) + }) }) Context("DELETE /api/parameters/federation-upstream/{vhost}/{name}", func() { @@ -341,7 +351,7 @@ var _ = Describe("Rabbithole", func() { vh := "rabbit/hole" name := "temporary" - // this is NOT an error, but a HTTP 404 response + // this is NOT an err, but a HTTP 404 response resp, err := rmqc.DeleteFederationUpstream(vh, name) Ω(err).Should(BeNil()) Ω(resp.StatusCode).Should(Equal(404)) From def98df17c5b1366493052e2248b73d3687781f3 Mon Sep 17 00:00:00 2001 From: Raymond Fallon Date: Sun, 26 Apr 2020 15:05:18 -0400 Subject: [PATCH 07/19] Minor tweaks to tests. --- rabbithole_test.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/rabbithole_test.go b/rabbithole_test.go index 4976acd..4353059 100644 --- a/rabbithole_test.go +++ b/rabbithole_test.go @@ -137,9 +137,7 @@ var _ = Describe("Rabbithole", func() { Context("GET /api/parameters/federation-upstream/{vhost}", func() { Context("when there are no upstreams", func() { It("returns an empty response", func() { - vh := "rabbit/hole" - - list, err := rmqc.ListFederationUpstreamsIn(vh) + list, err := rmqc.ListFederationUpstreamsIn("rabbit/hole") Ω(err).Should(BeNil()) Ω(list).Should(BeEmpty()) }) @@ -279,7 +277,7 @@ var _ = Describe("Rabbithole", func() { vh := "rabbit/hole" name := "temporary" - // create the initial upstream + // create the upstream fd := FederationDefinition{ Uri: "amqp://127.0.0.1/%2f", PrefetchCount: 1000, From 125f378ef3d4cda6021c859f7ccde623fa1a39ab Mon Sep 17 00:00:00 2001 From: Raymond Fallon Date: Sun, 26 Apr 2020 15:07:28 -0400 Subject: [PATCH 08/19] Move parameter tests to the bottom of the suite. --- rabbithole_test.go | 1346 ++++++++++++++++++++++---------------------- 1 file changed, 673 insertions(+), 673 deletions(-) diff --git a/rabbithole_test.go b/rabbithole_test.go index 4353059..6832f79 100644 --- a/rabbithole_test.go +++ b/rabbithole_test.go @@ -90,335 +90,6 @@ var _ = Describe("Rabbithole", func() { rmqc, _ = NewClient("http://127.0.0.1:15672", "guest", "guest") }) - Context("GET /api/parameters/federation-upstream", func() { - Context("when there are no upstreams", func() { - It("returns an empty response", func() { - list, err := rmqc.ListFederationUpstreams() - Ω(err).Should(BeNil()) - Ω(list).Should(BeEmpty()) - }) - }) - - Context("when there are upstreams", func() { - It("returns the list of upstreams", func() { - fd1 := FederationDefinition{ - Uri: "amqp://server-name/%2f", - } - _, err := rmqc.PutFederationUpstream("rabbit/hole", "upstream1", fd1) - Ω(err).Should(BeNil()) - - fd2 := FederationDefinition{ - Uri: "amqp://example.com/%2f", - } - _, err = rmqc.PutFederationUpstream("/", "upstream2", fd2) - Ω(err).Should(BeNil()) - - awaitEventPropagation() - - list, err := rmqc.ListFederationUpstreams() - Ω(err).Should(BeNil()) - Ω(len(list)).Should(Equal(2)) - - _, err = rmqc.DeleteFederationUpstream("rabbit/hole", "upstream1") - Ω(err).Should(BeNil()) - - _, err = rmqc.DeleteFederationUpstream("/", "upstream2") - Ω(err).Should(BeNil()) - - awaitEventPropagation() - - list, err = rmqc.ListFederationUpstreams() - Ω(err).Should(BeNil()) - Ω(len(list)).Should(Equal(0)) - }) - }) - }) - - Context("GET /api/parameters/federation-upstream/{vhost}", func() { - Context("when there are no upstreams", func() { - It("returns an empty response", func() { - list, err := rmqc.ListFederationUpstreamsIn("rabbit/hole") - Ω(err).Should(BeNil()) - Ω(list).Should(BeEmpty()) - }) - }) - - Context("when there are upstreams", func() { - It("returns the list of upstreams", func() { - vh := "rabbit/hole" - - fd1 := FederationDefinition{ - Uri: "amqp://server-name/%2f", - } - - _, err := rmqc.PutFederationUpstream(vh, "upstream1", fd1) - Ω(err).Should(BeNil()) - - fd2 := FederationDefinition{ - Uri: "amqp://example.com/%2f", - } - - _, err = rmqc.PutFederationUpstream(vh, "upstream2", fd2) - Ω(err).Should(BeNil()) - - awaitEventPropagation() - - list, err := rmqc.ListFederationUpstreamsIn(vh) - Ω(err).Should(BeNil()) - Ω(len(list)).Should(Equal(2)) - - // delete upstream1 - _, err = rmqc.DeleteFederationUpstream(vh, "upstream1") - Ω(err).Should(BeNil()) - - awaitEventPropagation() - - list, err = rmqc.ListFederationUpstreamsIn(vh) - Ω(err).Should(BeNil()) - Ω(len(list)).Should(Equal(1)) - - // delete upstream2 - _, err = rmqc.DeleteFederationUpstream(vh, "upstream2") - Ω(err).Should(BeNil()) - - awaitEventPropagation() - - list, err = rmqc.ListFederationUpstreamsIn(vh) - Ω(err).Should(BeNil()) - Ω(len(list)).Should(Equal(0)) - }) - }) - }) - - Context("GET /api/parameters/federation-upstream/{vhost}/{upstream}", func() { - Context("when the upstream does not exist", func() { - It("returns a 404 error", func() { - vh := "rabbit/hole" - name := "temporary" - - fu, err := rmqc.GetFederationUpstream(vh, name) - Ω(err).Should(Equal(ErrorResponse{404, "Object Not Found", "Not Found"})) - Ω(fu).Should(BeNil()) - }) - }) - - Context("when the upstream exists", func() { - It("returns the upstream", func() { - vh := "rabbit/hole" - name := "temporary" - - fd := FederationDefinition{ - Uri: "amqp://127.0.0.1/%2f", - PrefetchCount: 1000, - ReconnectDelay: 1, - AckMode: "on-confirm", - TrustUserId: false, - } - - _, err := rmqc.PutFederationUpstream(vh, name, fd) - Ω(err).Should(BeNil()) - - awaitEventPropagation() - - fu, err := rmqc.GetFederationUpstream(vh, name) - Ω(err).Should(BeNil()) - Ω(fu.Vhost).Should(Equal(vh)) - Ω(fu.Name).Should(Equal(name)) - Ω(fu.Component).Should(Equal("federation-upstream")) - Ω(fu.Definition.Uri).Should(Equal(fd.Uri)) - Ω(fu.Definition.PrefetchCount).Should(Equal(fd.PrefetchCount)) - Ω(fu.Definition.ReconnectDelay).Should(Equal(fd.ReconnectDelay)) - Ω(fu.Definition.AckMode).Should(Equal(fd.AckMode)) - Ω(fu.Definition.TrustUserId).Should(Equal(fd.TrustUserId)) - - _, err = rmqc.DeleteFederationUpstream(vh, name) - Ω(err).Should(BeNil()) - }) - }) - }) - - Context("PUT /api/parameters/federation-upstream/{vhost}/{upstream}", func() { - Context("when the upstream does not exist", func() { - It("creates the upstream", func() { - vh := "rabbit/hole" - name := "temporary" - - fd := FederationDefinition{ - Uri: "amqp://127.0.0.1/%2f", - PrefetchCount: 500, - ReconnectDelay: 5, - AckMode: "on-publish", - TrustUserId: false, - } - - _, err := rmqc.PutFederationUpstream(vh, name, fd) - Ω(err).Should(BeNil()) - - awaitEventPropagation() - - fu, err := rmqc.GetFederationUpstream(vh, name) - Ω(err).Should(BeNil()) - Ω(fu.Vhost).Should(Equal(vh)) - Ω(fu.Name).Should(Equal(name)) - Ω(fu.Component).Should(Equal("federation-upstream")) - Ω(fu.Definition.Uri).Should(Equal(fd.Uri)) - Ω(fu.Definition.PrefetchCount).Should(Equal(fd.PrefetchCount)) - Ω(fu.Definition.ReconnectDelay).Should(Equal(fd.ReconnectDelay)) - Ω(fu.Definition.AckMode).Should(Equal(fd.AckMode)) - Ω(fu.Definition.TrustUserId).Should(Equal(fd.TrustUserId)) - - _, err = rmqc.DeleteFederationUpstream(vh, name) - Ω(err).Should(BeNil()) - }) - }) - - Context("when the upstream exists", func() { - It("updates the upstream", func() { - vh := "rabbit/hole" - name := "temporary" - - // create the upstream - fd := FederationDefinition{ - Uri: "amqp://127.0.0.1/%2f", - PrefetchCount: 1000, - ReconnectDelay: 1, - AckMode: "on-confirm", - TrustUserId: false, - } - - _, err := rmqc.PutFederationUpstream(vh, name, fd) - Ω(err).Should(BeNil()) - - awaitEventPropagation() - - fu, err := rmqc.GetFederationUpstream(vh, name) - Ω(err).Should(BeNil()) - Ω(fu.Vhost).Should(Equal(vh)) - Ω(fu.Name).Should(Equal(name)) - Ω(fu.Component).Should(Equal("federation-upstream")) - Ω(fu.Definition.Uri).Should(Equal(fd.Uri)) - Ω(fu.Definition.PrefetchCount).Should(Equal(fd.PrefetchCount)) - Ω(fu.Definition.ReconnectDelay).Should(Equal(fd.ReconnectDelay)) - Ω(fu.Definition.AckMode).Should(Equal(fd.AckMode)) - Ω(fu.Definition.TrustUserId).Should(Equal(fd.TrustUserId)) - - // update the upstream - fd2 := FederationDefinition{ - Uri: "amqp://127.0.0.1/%2f", - PrefetchCount: 500, - ReconnectDelay: 10, - AckMode: "no-ack", - TrustUserId: true, - } - - _, err = rmqc.PutFederationUpstream(vh, name, fd2) - Ω(err).Should(BeNil()) - - awaitEventPropagation() - - fu2, err := rmqc.GetFederationUpstream(vh, name) - Ω(err).Should(BeNil()) - Ω(fu2.Vhost).Should(Equal(vh)) - Ω(fu2.Name).Should(Equal(name)) - Ω(fu2.Component).Should(Equal("federation-upstream")) - Ω(fu2.Definition.Uri).Should(Equal(fd2.Uri)) - Ω(fu2.Definition.PrefetchCount).Should(Equal(fd2.PrefetchCount)) - Ω(fu2.Definition.ReconnectDelay).Should(Equal(fd2.ReconnectDelay)) - Ω(fu2.Definition.AckMode).Should(Equal(fd2.AckMode)) - Ω(fu2.Definition.TrustUserId).Should(Equal(fd2.TrustUserId)) - - _, err = rmqc.DeleteFederationUpstream(vh, name) - Ω(err).Should(BeNil()) - }) - }) - - Context("when the upstream definition is bad", func() { - It("returns a 400 error response", func() { - // this is NOT an err, but a HTTP 400 response - resp, err := rmqc.PutFederationUpstream("rabbit/hole", "error", FederationDefinition{}) - Ω(err).Should(BeNil()) - Ω(resp.StatusCode).Should(Equal(400)) - Ω(resp.Status).Should(Equal("400 Bad Request")) - }) - }) - }) - - Context("DELETE /api/parameters/federation-upstream/{vhost}/{name}", func() { - Context("when the upstream does not exist", func() { - It("returns a 404 error response", func() { - vh := "rabbit/hole" - name := "temporary" - - // this is NOT an err, but a HTTP 404 response - resp, err := rmqc.DeleteFederationUpstream(vh, name) - Ω(err).Should(BeNil()) - Ω(resp.StatusCode).Should(Equal(404)) - Ω(resp.Status).Should(Equal("404 Not Found")) - }) - }) - - Context("when the upstream exists", func() { - It("deletes the upstream", func() { - vh := "rabbit/hole" - name := "temporary" - - fd := FederationDefinition{ - Uri: "amqp://127.0.0.1/%2f", - } - - _, err := rmqc.PutFederationUpstream(vh, name, fd) - Ω(err).Should(BeNil()) - - awaitEventPropagation() - - resp, err := rmqc.DeleteFederationUpstream(vh, name) - Ω(err).Should(BeNil()) - Ω(resp.Status).Should(HavePrefix("20")) - }) - }) - }) - - Context("PUT /parameters/shovel/{vhost}/{name}", func() { - It("declares a shovel", func() { - vh := "rabbit/hole" - sn := "temporary" - - ssu := "amqp://127.0.0.1/%2f" - sdu := "amqp://127.0.0.1/%2f" - - shovelDefinition := ShovelDefinition{ - SourceURI: ssu, - SourceQueue: "mySourceQueue", - DestinationURI: sdu, - DestinationQueue: "myDestQueue", - AddForwardHeaders: true, - AckMode: "on-confirm", - DeleteAfter: "never"} - - _, err := rmqc.DeclareShovel(vh, sn, shovelDefinition) - Ω(err).Should(BeNil()) - - awaitEventPropagation() - x, err := rmqc.GetShovel(vh, sn) - Ω(err).Should(BeNil()) - Ω(x.Name).Should(Equal(sn)) - Ω(x.Vhost).Should(Equal(vh)) - Ω(x.Component).Should(Equal("shovel")) - Ω(x.Definition.SourceURI).Should(Equal(ssu)) - Ω(x.Definition.SourceQueue).Should(Equal("mySourceQueue")) - Ω(x.Definition.DestinationURI).Should(Equal(sdu)) - Ω(x.Definition.DestinationQueue).Should(Equal("myDestQueue")) - Ω(x.Definition.AddForwardHeaders).Should(Equal(true)) - Ω(x.Definition.AckMode).Should(Equal("on-confirm")) - Ω(x.Definition.DeleteAfter).Should(Equal("never")) - - rmqc.DeleteShovel(vh, sn) - awaitEventPropagation() - x, _ = rmqc.GetShovel(vh, sn) - Ω(x).Should(BeNil()) - }) - }) - Context("GET /overview", func() { It("returns decoded response", func() { conn := openConnection("/") @@ -1683,486 +1354,815 @@ var _ = Describe("Rabbithole", func() { It("returns decoded response", func() { u := "temporary" - _, err := rmqc.PutUser(u, UserSettings{Password: "s3krE7"}) - Ω(err).Should(BeNil()) + _, err := rmqc.PutUser(u, UserSettings{Password: "s3krE7"}) + Ω(err).Should(BeNil()) + + permissions := TopicPermissions{Exchange: "amq.topic", Write: "log.*", Read: "log.*"} + _, err = rmqc.UpdateTopicPermissionsIn("/", u, permissions) + Ω(err).Should(BeNil()) + + awaitEventPropagation() + + xs, err := rmqc.ListTopicPermissions() + Ω(err).Should(BeNil()) + + x := xs[0] + Ω(x.User).ShouldNot(BeNil()) + Ω(x.Vhost).ShouldNot(BeNil()) + + Ω(x.Exchange).ShouldNot(BeNil()) + Ω(x.Write).ShouldNot(BeNil()) + Ω(x.Read).ShouldNot(BeNil()) + + rmqc.DeleteUser(u) + }) + }) + + Context("GET /users/{name}/topic-permissions", func() { + It("returns decoded response", func() { + u := "temporary" + + _, err := rmqc.PutUser(u, UserSettings{Password: "s3krE7"}) + Ω(err).Should(BeNil()) + + permissions := TopicPermissions{Exchange: "amq.topic", Write: "log.*", Read: "log.*"} + _, err = rmqc.UpdateTopicPermissionsIn("/", u, permissions) + Ω(err).Should(BeNil()) + + awaitEventPropagation() + + xs, err := rmqc.ListTopicPermissionsOf("temporary") + Ω(err).Should(BeNil()) + + x := xs[0] + Ω(x.User).ShouldNot(BeNil()) + Ω(x.Vhost).ShouldNot(BeNil()) + + Ω(x.Exchange).ShouldNot(BeNil()) + Ω(x.Write).ShouldNot(BeNil()) + Ω(x.Read).ShouldNot(BeNil()) + + rmqc.DeleteUser(u) + }) + }) + + Context("PUT /topic-permissions/{vhost}/{user}", func() { + It("updates topic permissions", func() { + u := "temporary" + + _, err := rmqc.PutUser(u, UserSettings{Password: "s3krE7"}) + Ω(err).Should(BeNil()) + + permissions := TopicPermissions{Exchange: "amq.topic", Write: "log.*", Read: "log.*"} + _, err = rmqc.UpdateTopicPermissionsIn("/", u, permissions) + Ω(err).Should(BeNil()) + + awaitEventPropagation() + fetched, err := rmqc.GetTopicPermissionsIn("/", u) + Ω(err).Should(BeNil()) + x := fetched[0] + Ω(x.Exchange).Should(Equal(permissions.Exchange)) + Ω(x.Write).Should(Equal(permissions.Write)) + Ω(x.Read).Should(Equal(permissions.Read)) + + rmqc.DeleteUser(u) + }) + }) + + Context("DELETE /topic-permissions/{vhost}/{user}", func() { + It("clears topic permissions", func() { + u := "temporary" + + _, err := rmqc.PutUser(u, UserSettings{Password: "s3krE7"}) + Ω(err).Should(BeNil()) + + awaitEventPropagation() + _, err = rmqc.ClearTopicPermissionsIn("/", u) + Ω(err).Should(BeNil()) + awaitEventPropagation() + _, err = rmqc.GetTopicPermissionsIn("/", u) + Ω(err).Should(Equal(ErrorResponse{404, "Object Not Found", "Not Found"})) + + rmqc.DeleteUser(u) + }) + }) + + Context("DELETE /topic-permissions/{vhost}/{user}/{exchange}", func() { + It("deletes one topic permissions", func() { + u := "temporary" + + _, err := rmqc.PutUser(u, UserSettings{Password: "s3krE7"}) + Ω(err).Should(BeNil()) + + awaitEventPropagation() + permissions := TopicPermissions{Exchange: "amq.topic", Write: "log.*", Read: "log.*"} + _, err = rmqc.UpdateTopicPermissionsIn("/", u, permissions) + Ω(err).Should(BeNil()) + permissions = TopicPermissions{Exchange: "foobar", Write: "log.*", Read: "log.*"} + _, err = rmqc.UpdateTopicPermissionsIn("/", u, permissions) + Ω(err).Should(BeNil()) + + awaitEventPropagation() + _, err = rmqc.DeleteTopicPermissionsIn("/", u, "foobar") + Ω(err).Should(BeNil()) + + awaitEventPropagation() + xs, err := rmqc.ListTopicPermissionsOf(u) + Ω(err).Should(BeNil()) + + Ω(len(xs)).Should(BeEquivalentTo(1)) + + rmqc.DeleteUser(u) + }) + }) + + Context("PUT /exchanges/{vhost}/{exchange}", func() { + It("declares an exchange", func() { + vh := "rabbit/hole" + xn := "temporary" + + _, err := rmqc.DeclareExchange(vh, xn, ExchangeSettings{Type: "fanout", Durable: false}) + Ω(err).Should(BeNil()) + + awaitEventPropagation() + x, err := rmqc.GetExchange(vh, xn) + Ω(err).Should(BeNil()) + Ω(x.Name).Should(Equal(xn)) + Ω(x.Durable).Should(Equal(false)) + Ω(x.AutoDelete).Should(Equal(false)) + Ω(x.Type).Should(Equal("fanout")) + Ω(x.Vhost).Should(Equal(vh)) + + rmqc.DeleteExchange(vh, xn) + }) + }) + + Context("DELETE /exchanges/{vhost}/{exchange}", func() { + It("deletes an exchange", func() { + vh := "rabbit/hole" + xn := "temporary" + + _, err := rmqc.DeclareExchange(vh, xn, ExchangeSettings{Type: "fanout", Durable: false}) + Ω(err).Should(BeNil()) + + rmqc.DeleteExchange(vh, xn) + awaitEventPropagation() + x, err := rmqc.GetExchange(vh, xn) + Ω(x).Should(BeNil()) + Ω(err).Should(Equal(ErrorResponse{404, "Object Not Found", "Not Found"})) + }) + }) + + Context("PUT /queues/{vhost}/{queue}", func() { + It("declares a queue", func() { + vh := "rabbit/hole" + qn := "temporary" + + _, err := rmqc.DeclareQueue(vh, qn, QueueSettings{Durable: false}) + Ω(err).Should(BeNil()) + + awaitEventPropagation() + x, err := rmqc.GetQueue(vh, qn) + Ω(err).Should(BeNil()) + Ω(x.Name).Should(Equal(qn)) + Ω(x.Durable).Should(Equal(false)) + Ω(x.AutoDelete).Should(Equal(false)) + Ω(x.Vhost).Should(Equal(vh)) + + rmqc.DeleteQueue(vh, qn) + }) + }) + + Context("DELETE /queues/{vhost}/{queue}", func() { + It("deletes a queue", func() { + vh := "rabbit/hole" + qn := "temporary" + + _, err := rmqc.DeclareQueue(vh, qn, QueueSettings{Durable: false}) + Ω(err).Should(BeNil()) + + awaitEventPropagation() + rmqc.DeleteQueue(vh, qn) + awaitEventPropagation() + x, err := rmqc.GetQueue(vh, qn) + Ω(x).Should(BeNil()) + Ω(err).Should(Equal(ErrorResponse{404, "Object Not Found", "Not Found"})) + }) + }) + + Context("DELETE /queues/{vhost}/{queue}/contents", func() { + It("purges a queue", func() { + vh := "rabbit/hole" + qn := "temporary" + + _, err := rmqc.DeclareQueue(vh, qn, QueueSettings{Durable: false}) + Ω(err).Should(BeNil()) + + awaitEventPropagation() + _, err = rmqc.PurgeQueue(vh, qn) + awaitEventPropagation() + Ω(err).Should(BeNil()) + rmqc.DeleteQueue(vh, qn) + awaitEventPropagation() + x, err := rmqc.GetQueue(vh, qn) + awaitEventPropagation() + Ω(x).Should(BeNil()) + Ω(err).Should(Equal(ErrorResponse{404, "Object Not Found", "Not Found"})) + }) + }) + + Context("POST /queues/{vhost}/{queue}/actions", func() { + It("synchronises queue", func() { + vh := "rabbit/hole" + qn := "temporary" + + _, err := rmqc.DeclareQueue(vh, qn, QueueSettings{Durable: false}) + Ω(err).Should(BeNil()) + awaitEventPropagation() + + // it would be better to test this in a cluster configuration + x, err := rmqc.SyncQueue(vh, qn) + Ω(err).Should(BeNil()) + Ω(x.StatusCode).Should(Equal(204)) + rmqc.DeleteQueue(vh, qn) + }) + + It("cancels queue synchronisation", func() { + vh := "rabbit/hole" + qn := "temporary" + + _, err := rmqc.DeclareQueue(vh, qn, QueueSettings{Durable: false}) + Ω(err).Should(BeNil()) + awaitEventPropagation() + + // it would be better to test this in a cluster configuration + x, err := rmqc.CancelSyncQueue(vh, qn) + Ω(err).Should(BeNil()) + Ω(x.StatusCode).Should(Equal(204)) + rmqc.DeleteQueue(vh, qn) + }) + }) + + Context("GET /policies", func() { + Context("when policy exists", func() { + It("returns decoded response", func() { + policy1 := Policy{ + Pattern: "abc", + ApplyTo: "all", + Definition: PolicyDefinition{"expires": 100, "ha-mode": "all"}, + Priority: 0, + } - permissions := TopicPermissions{Exchange: "amq.topic", Write: "log.*", Read: "log.*"} - _, err = rmqc.UpdateTopicPermissionsIn("/", u, permissions) - Ω(err).Should(BeNil()) + policy2 := Policy{ + Pattern: ".*", + ApplyTo: "all", + Definition: PolicyDefinition{"expires": 100, "ha-mode": "all"}, + Priority: 0, + } - awaitEventPropagation() + // prepare policies + _, err := rmqc.PutPolicy("rabbit/hole", "woot1", policy1) + Ω(err).Should(BeNil()) - xs, err := rmqc.ListTopicPermissions() - Ω(err).Should(BeNil()) + _, err = rmqc.PutPolicy("rabbit/hole", "woot2", policy2) + Ω(err).Should(BeNil()) - x := xs[0] - Ω(x.User).ShouldNot(BeNil()) - Ω(x.Vhost).ShouldNot(BeNil()) + awaitEventPropagation() - Ω(x.Exchange).ShouldNot(BeNil()) - Ω(x.Write).ShouldNot(BeNil()) - Ω(x.Read).ShouldNot(BeNil()) + // test + pols, err := rmqc.ListPolicies() + Ω(err).Should(BeNil()) + Ω(pols).ShouldNot(BeEmpty()) + Ω(len(pols)).Should(BeNumerically(">=", 2)) + Ω(pols[0].Name).ShouldNot(BeNil()) + Ω(pols[1].Name).ShouldNot(BeNil()) - rmqc.DeleteUser(u) + // cleanup + _, err = rmqc.DeletePolicy("rabbit/hole", "woot1") + Ω(err).Should(BeNil()) + + _, err = rmqc.DeletePolicy("rabbit/hole", "woot2") + Ω(err).Should(BeNil()) + }) }) }) - Context("GET /users/{name}/topic-permissions", func() { - It("returns decoded response", func() { - u := "temporary" + Context("GET /polices/{vhost}", func() { + Context("when policy exists", func() { + It("returns decoded response", func() { + policy1 := Policy{ + Pattern: "abc", + ApplyTo: "all", + Definition: PolicyDefinition{"expires": 100, "ha-mode": "all"}, + Priority: 0, + } - _, err := rmqc.PutUser(u, UserSettings{Password: "s3krE7"}) - Ω(err).Should(BeNil()) + policy2 := Policy{ + Pattern: ".*", + ApplyTo: "all", + Definition: PolicyDefinition{"expires": 100, "ha-mode": "all"}, + Priority: 0, + } - permissions := TopicPermissions{Exchange: "amq.topic", Write: "log.*", Read: "log.*"} - _, err = rmqc.UpdateTopicPermissionsIn("/", u, permissions) - Ω(err).Should(BeNil()) + // prepare policies + _, err := rmqc.PutPolicy("rabbit/hole", "woot1", policy1) + Ω(err).Should(BeNil()) - awaitEventPropagation() + _, err = rmqc.PutPolicy("/", "woot2", policy2) + Ω(err).Should(BeNil()) - xs, err := rmqc.ListTopicPermissionsOf("temporary") - Ω(err).Should(BeNil()) + awaitEventPropagation() - x := xs[0] - Ω(x.User).ShouldNot(BeNil()) - Ω(x.Vhost).ShouldNot(BeNil()) + // test + pols, err := rmqc.ListPoliciesIn("rabbit/hole") + Ω(err).Should(BeNil()) + Ω(pols).ShouldNot(BeEmpty()) + Ω(len(pols)).Should(Equal(1)) + Ω(pols[0].Name).Should(Equal("woot1")) - Ω(x.Exchange).ShouldNot(BeNil()) - Ω(x.Write).ShouldNot(BeNil()) - Ω(x.Read).ShouldNot(BeNil()) + // cleanup + _, err = rmqc.DeletePolicy("rabbit/hole", "woot1") + Ω(err).Should(BeNil()) - rmqc.DeleteUser(u) + _, err = rmqc.DeletePolicy("/", "woot2") + Ω(err).Should(BeNil()) + }) + }) + + Context("when no policies exist", func() { + It("returns decoded response", func() { + pols, err := rmqc.ListPoliciesIn("rabbit/hole") + Ω(err).Should(BeNil()) + Ω(pols).Should(BeEmpty()) + }) }) }) - Context("PUT /topic-permissions/{vhost}/{user}", func() { - It("updates topic permissions", func() { - u := "temporary" + Context("GET /policies/{vhost}/{name}", func() { + Context("when policy exists", func() { + It("returns decoded response", func() { + policy := Policy{ + Pattern: ".*", + ApplyTo: "all", + Definition: PolicyDefinition{"expires": 100, "ha-mode": "all"}, + Priority: 0, + } - _, err := rmqc.PutUser(u, UserSettings{Password: "s3krE7"}) - Ω(err).Should(BeNil()) + _, err := rmqc.PutPolicy("rabbit/hole", "woot", policy) + Ω(err).Should(BeNil()) - permissions := TopicPermissions{Exchange: "amq.topic", Write: "log.*", Read: "log.*"} - _, err = rmqc.UpdateTopicPermissionsIn("/", u, permissions) - Ω(err).Should(BeNil()) + awaitEventPropagation() - awaitEventPropagation() - fetched, err := rmqc.GetTopicPermissionsIn("/", u) - Ω(err).Should(BeNil()) - x := fetched[0] - Ω(x.Exchange).Should(Equal(permissions.Exchange)) - Ω(x.Write).Should(Equal(permissions.Write)) - Ω(x.Read).Should(Equal(permissions.Read)) + pol, err := rmqc.GetPolicy("rabbit/hole", "woot") + Ω(err).Should(BeNil()) + Ω(pol.Vhost).Should(Equal("rabbit/hole")) + Ω(pol.Name).Should(Equal("woot")) + Ω(pol.ApplyTo).Should(Equal("all")) + Ω(pol.Pattern).Should(Equal(".*")) + Ω(pol.Priority).Should(BeEquivalentTo(0)) + Ω(pol.Definition).Should(BeAssignableToTypeOf(PolicyDefinition{})) + Ω(pol.Definition["expires"]).Should(BeEquivalentTo(100)) + Ω(pol.Definition["ha-mode"]).Should(Equal("all")) - rmqc.DeleteUser(u) + _, err = rmqc.DeletePolicy("rabbit/hole", "woot") + Ω(err).Should(BeNil()) + }) }) - }) - Context("DELETE /topic-permissions/{vhost}/{user}", func() { - It("clears topic permissions", func() { - u := "temporary" + Context("when policy not found", func() { + It("returns decoded response", func() { + pol, err := rmqc.GetPolicy("rabbit/hole", "woot") + Ω(err).Should(Equal(ErrorResponse{404, "Object Not Found", "Not Found"})) + Ω(pol).Should(BeNil()) + }) + }) + }) - _, err := rmqc.PutUser(u, UserSettings{Password: "s3krE7"}) - Ω(err).Should(BeNil()) + Context("DELETE /polices/{vhost}/{name}", func() { + It("deletes the policy", func() { + policy := Policy{ + Pattern: ".*", + ApplyTo: "all", + Definition: PolicyDefinition{"expires": 100, "ha-mode": "all"}, + Priority: 0, + } - awaitEventPropagation() - _, err = rmqc.ClearTopicPermissionsIn("/", u) + _, err := rmqc.PutPolicy("rabbit/hole", "woot", policy) Ω(err).Should(BeNil()) awaitEventPropagation() - _, err = rmqc.GetTopicPermissionsIn("/", u) - Ω(err).Should(Equal(ErrorResponse{404, "Object Not Found", "Not Found"})) - rmqc.DeleteUser(u) + resp, err := rmqc.DeletePolicy("rabbit/hole", "woot") + Ω(err).Should(BeNil()) + Ω(resp.Status).Should(HavePrefix("20")) }) }) - Context("DELETE /topic-permissions/{vhost}/{user}/{exchange}", func() { - It("deletes one topic permissions", func() { - u := "temporary" - - _, err := rmqc.PutUser(u, UserSettings{Password: "s3krE7"}) - Ω(err).Should(BeNil()) + Context("PUT /policies/{vhost}/{name}", func() { + It("creates the policy", func() { + policy := Policy{ + Pattern: ".*", + ApplyTo: "all", + Definition: PolicyDefinition{ + "expires": 100, + "ha-mode": "nodes", + "ha-params": NodeNames{"a", "b", "c"}, + }, + Priority: 0, + } - awaitEventPropagation() - permissions := TopicPermissions{Exchange: "amq.topic", Write: "log.*", Read: "log.*"} - _, err = rmqc.UpdateTopicPermissionsIn("/", u, permissions) - Ω(err).Should(BeNil()) - permissions = TopicPermissions{Exchange: "foobar", Write: "log.*", Read: "log.*"} - _, err = rmqc.UpdateTopicPermissionsIn("/", u, permissions) + resp, err := rmqc.PutPolicy("rabbit/hole", "woot", policy) Ω(err).Should(BeNil()) + Ω(resp.Status).Should(HavePrefix("20")) awaitEventPropagation() - _, err = rmqc.DeleteTopicPermissionsIn("/", u, "foobar") + _, err = rmqc.GetPolicy("rabbit/hole", "woot") Ω(err).Should(BeNil()) - awaitEventPropagation() - xs, err := rmqc.ListTopicPermissionsOf(u) + _, err = rmqc.DeletePolicy("rabbit/hole", "woot") Ω(err).Should(BeNil()) - - Ω(len(xs)).Should(BeEquivalentTo(1)) - - rmqc.DeleteUser(u) }) - }) - - Context("PUT /exchanges/{vhost}/{exchange}", func() { - It("declares an exchange", func() { - vh := "rabbit/hole" - xn := "temporary" - _, err := rmqc.DeclareExchange(vh, xn, ExchangeSettings{Type: "fanout", Durable: false}) - Ω(err).Should(BeNil()) + It("updates the policy", func() { + policy := Policy{ + Pattern: ".*", + Definition: PolicyDefinition{"expires": 100, "ha-mode": "all"}, + } - awaitEventPropagation() - x, err := rmqc.GetExchange(vh, xn) + // create Policy + _, err := rmqc.PutPolicy("rabbit/hole", "woot", policy) Ω(err).Should(BeNil()) - Ω(x.Name).Should(Equal(xn)) - Ω(x.Durable).Should(Equal(false)) - Ω(x.AutoDelete).Should(Equal(false)) - Ω(x.Type).Should(Equal("fanout")) - Ω(x.Vhost).Should(Equal(vh)) - - rmqc.DeleteExchange(vh, xn) - }) - }) - Context("DELETE /exchanges/{vhost}/{exchange}", func() { - It("deletes an exchange", func() { - vh := "rabbit/hole" - xn := "temporary" + // create new Policy + newPolicy := Policy{ + Pattern: "\\d+", + ApplyTo: "all", + Definition: PolicyDefinition{ + "max-length": 100, + "ha-mode": "nodes", + "ha-params": NodeNames{"a", "b", "c"}, + }, + Priority: 1, + } - _, err := rmqc.DeclareExchange(vh, xn, ExchangeSettings{Type: "fanout", Durable: false}) + // update old Policy + resp, err := rmqc.PutPolicy("/", "woot2", newPolicy) Ω(err).Should(BeNil()) + Ω(resp.Status).Should(HavePrefix("20")) - rmqc.DeleteExchange(vh, xn) awaitEventPropagation() - x, err := rmqc.GetExchange(vh, xn) - Ω(x).Should(BeNil()) - Ω(err).Should(Equal(ErrorResponse{404, "Object Not Found", "Not Found"})) - }) - }) - Context("PUT /queues/{vhost}/{queue}", func() { - It("declares a queue", func() { - vh := "rabbit/hole" - qn := "temporary" + // old policy should not exist already + _, err = rmqc.GetPolicy("rabbit/hole", "woot") + Ω(err).Should(Equal(ErrorResponse{404, "Object Not Found", "Not Found"})) - _, err := rmqc.DeclareQueue(vh, qn, QueueSettings{Durable: false}) + // but new (updated) policy is here + pol, err := rmqc.GetPolicy("/", "woot2") Ω(err).Should(BeNil()) + Ω(pol.Vhost).Should(Equal("/")) + Ω(pol.Name).Should(Equal("woot2")) + Ω(pol.Pattern).Should(Equal("\\d+")) + Ω(pol.ApplyTo).Should(Equal("all")) + Ω(pol.Priority).Should(BeEquivalentTo(1)) + Ω(pol.Definition).Should(BeAssignableToTypeOf(PolicyDefinition{})) + Ω(pol.Definition["max-length"]).Should(BeEquivalentTo(100)) + Ω(pol.Definition["ha-mode"]).Should(Equal("nodes")) + Ω(pol.Definition["ha-params"]).Should(HaveLen(3)) + Ω(pol.Definition["ha-params"]).Should(ContainElement("a")) + Ω(pol.Definition["ha-params"]).Should(ContainElement("c")) + Ω(pol.Definition["expires"]).Should(BeNil()) - awaitEventPropagation() - x, err := rmqc.GetQueue(vh, qn) + // cleanup + _, err = rmqc.DeletePolicy("/", "woot2") Ω(err).Should(BeNil()) - Ω(x.Name).Should(Equal(qn)) - Ω(x.Durable).Should(Equal(false)) - Ω(x.AutoDelete).Should(Equal(false)) - Ω(x.Vhost).Should(Equal(vh)) - - rmqc.DeleteQueue(vh, qn) }) }) - Context("DELETE /queues/{vhost}/{queue}", func() { - It("deletes a queue", func() { - vh := "rabbit/hole" - qn := "temporary" - - _, err := rmqc.DeclareQueue(vh, qn, QueueSettings{Durable: false}) - Ω(err).Should(BeNil()) - - awaitEventPropagation() - rmqc.DeleteQueue(vh, qn) - awaitEventPropagation() - x, err := rmqc.GetQueue(vh, qn) - Ω(x).Should(BeNil()) - Ω(err).Should(Equal(ErrorResponse{404, "Object Not Found", "Not Found"})) + Context("GET /api/parameters/federation-upstream", func() { + Context("when there are no upstreams", func() { + It("returns an empty response", func() { + list, err := rmqc.ListFederationUpstreams() + Ω(err).Should(BeNil()) + Ω(list).Should(BeEmpty()) + }) }) - }) - - Context("DELETE /queues/{vhost}/{queue}/contents", func() { - It("purges a queue", func() { - vh := "rabbit/hole" - qn := "temporary" - _, err := rmqc.DeclareQueue(vh, qn, QueueSettings{Durable: false}) - Ω(err).Should(BeNil()) + Context("when there are upstreams", func() { + It("returns the list of upstreams", func() { + fd1 := FederationDefinition{ + Uri: "amqp://server-name/%2f", + } + _, err := rmqc.PutFederationUpstream("rabbit/hole", "upstream1", fd1) + Ω(err).Should(BeNil()) - awaitEventPropagation() - _, err = rmqc.PurgeQueue(vh, qn) - awaitEventPropagation() - Ω(err).Should(BeNil()) - rmqc.DeleteQueue(vh, qn) - awaitEventPropagation() - x, err := rmqc.GetQueue(vh, qn) - awaitEventPropagation() - Ω(x).Should(BeNil()) - Ω(err).Should(Equal(ErrorResponse{404, "Object Not Found", "Not Found"})) - }) - }) + fd2 := FederationDefinition{ + Uri: "amqp://example.com/%2f", + } + _, err = rmqc.PutFederationUpstream("/", "upstream2", fd2) + Ω(err).Should(BeNil()) - Context("POST /queues/{vhost}/{queue}/actions", func() { - It("synchronises queue", func() { - vh := "rabbit/hole" - qn := "temporary" + awaitEventPropagation() - _, err := rmqc.DeclareQueue(vh, qn, QueueSettings{Durable: false}) - Ω(err).Should(BeNil()) - awaitEventPropagation() + list, err := rmqc.ListFederationUpstreams() + Ω(err).Should(BeNil()) + Ω(len(list)).Should(Equal(2)) - // it would be better to test this in a cluster configuration - x, err := rmqc.SyncQueue(vh, qn) - Ω(err).Should(BeNil()) - Ω(x.StatusCode).Should(Equal(204)) - rmqc.DeleteQueue(vh, qn) - }) + _, err = rmqc.DeleteFederationUpstream("rabbit/hole", "upstream1") + Ω(err).Should(BeNil()) - It("cancels queue synchronisation", func() { - vh := "rabbit/hole" - qn := "temporary" + _, err = rmqc.DeleteFederationUpstream("/", "upstream2") + Ω(err).Should(BeNil()) - _, err := rmqc.DeclareQueue(vh, qn, QueueSettings{Durable: false}) - Ω(err).Should(BeNil()) - awaitEventPropagation() + awaitEventPropagation() - // it would be better to test this in a cluster configuration - x, err := rmqc.CancelSyncQueue(vh, qn) - Ω(err).Should(BeNil()) - Ω(x.StatusCode).Should(Equal(204)) - rmqc.DeleteQueue(vh, qn) + list, err = rmqc.ListFederationUpstreams() + Ω(err).Should(BeNil()) + Ω(len(list)).Should(Equal(0)) + }) }) }) - Context("GET /policies", func() { - Context("when policy exists", func() { - It("returns decoded response", func() { - policy1 := Policy{ - Pattern: "abc", - ApplyTo: "all", - Definition: PolicyDefinition{"expires": 100, "ha-mode": "all"}, - Priority: 0, + Context("GET /api/parameters/federation-upstream/{vhost}", func() { + Context("when there are no upstreams", func() { + It("returns an empty response", func() { + list, err := rmqc.ListFederationUpstreamsIn("rabbit/hole") + Ω(err).Should(BeNil()) + Ω(list).Should(BeEmpty()) + }) + }) + + Context("when there are upstreams", func() { + It("returns the list of upstreams", func() { + vh := "rabbit/hole" + + fd1 := FederationDefinition{ + Uri: "amqp://server-name/%2f", } - policy2 := Policy{ - Pattern: ".*", - ApplyTo: "all", - Definition: PolicyDefinition{"expires": 100, "ha-mode": "all"}, - Priority: 0, + _, err := rmqc.PutFederationUpstream(vh, "upstream1", fd1) + Ω(err).Should(BeNil()) + + fd2 := FederationDefinition{ + Uri: "amqp://example.com/%2f", } - // prepare policies - _, err := rmqc.PutPolicy("rabbit/hole", "woot1", policy1) + _, err = rmqc.PutFederationUpstream(vh, "upstream2", fd2) Ω(err).Should(BeNil()) - _, err = rmqc.PutPolicy("rabbit/hole", "woot2", policy2) + awaitEventPropagation() + + list, err := rmqc.ListFederationUpstreamsIn(vh) + Ω(err).Should(BeNil()) + Ω(len(list)).Should(Equal(2)) + + // delete upstream1 + _, err = rmqc.DeleteFederationUpstream(vh, "upstream1") Ω(err).Should(BeNil()) awaitEventPropagation() - // test - pols, err := rmqc.ListPolicies() + list, err = rmqc.ListFederationUpstreamsIn(vh) Ω(err).Should(BeNil()) - Ω(pols).ShouldNot(BeEmpty()) - Ω(len(pols)).Should(BeNumerically(">=", 2)) - Ω(pols[0].Name).ShouldNot(BeNil()) - Ω(pols[1].Name).ShouldNot(BeNil()) + Ω(len(list)).Should(Equal(1)) - // cleanup - _, err = rmqc.DeletePolicy("rabbit/hole", "woot1") + // delete upstream2 + _, err = rmqc.DeleteFederationUpstream(vh, "upstream2") Ω(err).Should(BeNil()) - _, err = rmqc.DeletePolicy("rabbit/hole", "woot2") + awaitEventPropagation() + + list, err = rmqc.ListFederationUpstreamsIn(vh) Ω(err).Should(BeNil()) + Ω(len(list)).Should(Equal(0)) }) }) }) - Context("GET /polices/{vhost}", func() { - Context("when policy exists", func() { - It("returns decoded response", func() { - policy1 := Policy{ - Pattern: "abc", - ApplyTo: "all", - Definition: PolicyDefinition{"expires": 100, "ha-mode": "all"}, - Priority: 0, - } + Context("GET /api/parameters/federation-upstream/{vhost}/{upstream}", func() { + Context("when the upstream does not exist", func() { + It("returns a 404 error", func() { + vh := "rabbit/hole" + name := "temporary" - policy2 := Policy{ - Pattern: ".*", - ApplyTo: "all", - Definition: PolicyDefinition{"expires": 100, "ha-mode": "all"}, - Priority: 0, + fu, err := rmqc.GetFederationUpstream(vh, name) + Ω(err).Should(Equal(ErrorResponse{404, "Object Not Found", "Not Found"})) + Ω(fu).Should(BeNil()) + }) + }) + + Context("when the upstream exists", func() { + It("returns the upstream", func() { + vh := "rabbit/hole" + name := "temporary" + + fd := FederationDefinition{ + Uri: "amqp://127.0.0.1/%2f", + PrefetchCount: 1000, + ReconnectDelay: 1, + AckMode: "on-confirm", + TrustUserId: false, } - // prepare policies - _, err := rmqc.PutPolicy("rabbit/hole", "woot1", policy1) - Ω(err).Should(BeNil()) + _, err := rmqc.PutFederationUpstream(vh, name, fd) + Ω(err).Should(BeNil()) + + awaitEventPropagation() + + fu, err := rmqc.GetFederationUpstream(vh, name) + Ω(err).Should(BeNil()) + Ω(fu.Vhost).Should(Equal(vh)) + Ω(fu.Name).Should(Equal(name)) + Ω(fu.Component).Should(Equal("federation-upstream")) + Ω(fu.Definition.Uri).Should(Equal(fd.Uri)) + Ω(fu.Definition.PrefetchCount).Should(Equal(fd.PrefetchCount)) + Ω(fu.Definition.ReconnectDelay).Should(Equal(fd.ReconnectDelay)) + Ω(fu.Definition.AckMode).Should(Equal(fd.AckMode)) + Ω(fu.Definition.TrustUserId).Should(Equal(fd.TrustUserId)) + + _, err = rmqc.DeleteFederationUpstream(vh, name) + Ω(err).Should(BeNil()) + }) + }) + }) + + Context("PUT /api/parameters/federation-upstream/{vhost}/{upstream}", func() { + Context("when the upstream does not exist", func() { + It("creates the upstream", func() { + vh := "rabbit/hole" + name := "temporary" + + fd := FederationDefinition{ + Uri: "amqp://127.0.0.1/%2f", + PrefetchCount: 500, + ReconnectDelay: 5, + AckMode: "on-publish", + TrustUserId: false, + } - _, err = rmqc.PutPolicy("/", "woot2", policy2) + _, err := rmqc.PutFederationUpstream(vh, name, fd) Ω(err).Should(BeNil()) awaitEventPropagation() - // test - pols, err := rmqc.ListPoliciesIn("rabbit/hole") - Ω(err).Should(BeNil()) - Ω(pols).ShouldNot(BeEmpty()) - Ω(len(pols)).Should(Equal(1)) - Ω(pols[0].Name).Should(Equal("woot1")) - - // cleanup - _, err = rmqc.DeletePolicy("rabbit/hole", "woot1") + fu, err := rmqc.GetFederationUpstream(vh, name) Ω(err).Should(BeNil()) + Ω(fu.Vhost).Should(Equal(vh)) + Ω(fu.Name).Should(Equal(name)) + Ω(fu.Component).Should(Equal("federation-upstream")) + Ω(fu.Definition.Uri).Should(Equal(fd.Uri)) + Ω(fu.Definition.PrefetchCount).Should(Equal(fd.PrefetchCount)) + Ω(fu.Definition.ReconnectDelay).Should(Equal(fd.ReconnectDelay)) + Ω(fu.Definition.AckMode).Should(Equal(fd.AckMode)) + Ω(fu.Definition.TrustUserId).Should(Equal(fd.TrustUserId)) - _, err = rmqc.DeletePolicy("/", "woot2") + _, err = rmqc.DeleteFederationUpstream(vh, name) Ω(err).Should(BeNil()) }) }) - Context("when no policies exist", func() { - It("returns decoded response", func() { - pols, err := rmqc.ListPoliciesIn("rabbit/hole") + Context("when the upstream exists", func() { + It("updates the upstream", func() { + vh := "rabbit/hole" + name := "temporary" + + // create the upstream + fd := FederationDefinition{ + Uri: "amqp://127.0.0.1/%2f", + PrefetchCount: 1000, + ReconnectDelay: 1, + AckMode: "on-confirm", + TrustUserId: false, + } + + _, err := rmqc.PutFederationUpstream(vh, name, fd) Ω(err).Should(BeNil()) - Ω(pols).Should(BeEmpty()) - }) - }) - }) - Context("GET /policies/{vhost}/{name}", func() { - Context("when policy exists", func() { - It("returns decoded response", func() { - policy := Policy{ - Pattern: ".*", - ApplyTo: "all", - Definition: PolicyDefinition{"expires": 100, "ha-mode": "all"}, - Priority: 0, + awaitEventPropagation() + + fu, err := rmqc.GetFederationUpstream(vh, name) + Ω(err).Should(BeNil()) + Ω(fu.Vhost).Should(Equal(vh)) + Ω(fu.Name).Should(Equal(name)) + Ω(fu.Component).Should(Equal("federation-upstream")) + Ω(fu.Definition.Uri).Should(Equal(fd.Uri)) + Ω(fu.Definition.PrefetchCount).Should(Equal(fd.PrefetchCount)) + Ω(fu.Definition.ReconnectDelay).Should(Equal(fd.ReconnectDelay)) + Ω(fu.Definition.AckMode).Should(Equal(fd.AckMode)) + Ω(fu.Definition.TrustUserId).Should(Equal(fd.TrustUserId)) + + // update the upstream + fd2 := FederationDefinition{ + Uri: "amqp://127.0.0.1/%2f", + PrefetchCount: 500, + ReconnectDelay: 10, + AckMode: "no-ack", + TrustUserId: true, } - _, err := rmqc.PutPolicy("rabbit/hole", "woot", policy) + _, err = rmqc.PutFederationUpstream(vh, name, fd2) Ω(err).Should(BeNil()) awaitEventPropagation() - pol, err := rmqc.GetPolicy("rabbit/hole", "woot") + fu2, err := rmqc.GetFederationUpstream(vh, name) Ω(err).Should(BeNil()) - Ω(pol.Vhost).Should(Equal("rabbit/hole")) - Ω(pol.Name).Should(Equal("woot")) - Ω(pol.ApplyTo).Should(Equal("all")) - Ω(pol.Pattern).Should(Equal(".*")) - Ω(pol.Priority).Should(BeEquivalentTo(0)) - Ω(pol.Definition).Should(BeAssignableToTypeOf(PolicyDefinition{})) - Ω(pol.Definition["expires"]).Should(BeEquivalentTo(100)) - Ω(pol.Definition["ha-mode"]).Should(Equal("all")) + Ω(fu2.Vhost).Should(Equal(vh)) + Ω(fu2.Name).Should(Equal(name)) + Ω(fu2.Component).Should(Equal("federation-upstream")) + Ω(fu2.Definition.Uri).Should(Equal(fd2.Uri)) + Ω(fu2.Definition.PrefetchCount).Should(Equal(fd2.PrefetchCount)) + Ω(fu2.Definition.ReconnectDelay).Should(Equal(fd2.ReconnectDelay)) + Ω(fu2.Definition.AckMode).Should(Equal(fd2.AckMode)) + Ω(fu2.Definition.TrustUserId).Should(Equal(fd2.TrustUserId)) - _, err = rmqc.DeletePolicy("rabbit/hole", "woot") + _, err = rmqc.DeleteFederationUpstream(vh, name) Ω(err).Should(BeNil()) }) }) - Context("when policy not found", func() { - It("returns decoded response", func() { - pol, err := rmqc.GetPolicy("rabbit/hole", "woot") - Ω(err).Should(Equal(ErrorResponse{404, "Object Not Found", "Not Found"})) - Ω(pol).Should(BeNil()) + Context("when the upstream definition is bad", func() { + It("returns a 400 error response", func() { + // this is NOT an err, but a HTTP 400 response + resp, err := rmqc.PutFederationUpstream("rabbit/hole", "error", FederationDefinition{}) + Ω(err).Should(BeNil()) + Ω(resp.StatusCode).Should(Equal(400)) + Ω(resp.Status).Should(Equal("400 Bad Request")) }) }) }) - Context("DELETE /polices/{vhost}/{name}", func() { - It("deletes the policy", func() { - policy := Policy{ - Pattern: ".*", - ApplyTo: "all", - Definition: PolicyDefinition{"expires": 100, "ha-mode": "all"}, - Priority: 0, - } - - _, err := rmqc.PutPolicy("rabbit/hole", "woot", policy) - Ω(err).Should(BeNil()) - awaitEventPropagation() + Context("DELETE /api/parameters/federation-upstream/{vhost}/{name}", func() { + Context("when the upstream does not exist", func() { + It("returns a 404 error response", func() { + vh := "rabbit/hole" + name := "temporary" - resp, err := rmqc.DeletePolicy("rabbit/hole", "woot") - Ω(err).Should(BeNil()) - Ω(resp.Status).Should(HavePrefix("20")) + // this is NOT an err, but a HTTP 404 response + resp, err := rmqc.DeleteFederationUpstream(vh, name) + Ω(err).Should(BeNil()) + Ω(resp.StatusCode).Should(Equal(404)) + Ω(resp.Status).Should(Equal("404 Not Found")) + }) }) - }) - Context("PUT /policies/{vhost}/{name}", func() { - It("creates the policy", func() { - policy := Policy{ - Pattern: ".*", - ApplyTo: "all", - Definition: PolicyDefinition{ - "expires": 100, - "ha-mode": "nodes", - "ha-params": NodeNames{"a", "b", "c"}, - }, - Priority: 0, - } + Context("when the upstream exists", func() { + It("deletes the upstream", func() { + vh := "rabbit/hole" + name := "temporary" - resp, err := rmqc.PutPolicy("rabbit/hole", "woot", policy) - Ω(err).Should(BeNil()) - Ω(resp.Status).Should(HavePrefix("20")) + fd := FederationDefinition{ + Uri: "amqp://127.0.0.1/%2f", + } - awaitEventPropagation() - _, err = rmqc.GetPolicy("rabbit/hole", "woot") - Ω(err).Should(BeNil()) + _, err := rmqc.PutFederationUpstream(vh, name, fd) + Ω(err).Should(BeNil()) - _, err = rmqc.DeletePolicy("rabbit/hole", "woot") - Ω(err).Should(BeNil()) + awaitEventPropagation() + + resp, err := rmqc.DeleteFederationUpstream(vh, name) + Ω(err).Should(BeNil()) + Ω(resp.Status).Should(HavePrefix("20")) + }) }) + }) - It("updates the policy", func() { - policy := Policy{ - Pattern: ".*", - Definition: PolicyDefinition{"expires": 100, "ha-mode": "all"}, - } + Context("PUT /parameters/shovel/{vhost}/{name}", func() { + It("declares a shovel", func() { + vh := "rabbit/hole" + sn := "temporary" - // create Policy - _, err := rmqc.PutPolicy("rabbit/hole", "woot", policy) - Ω(err).Should(BeNil()) + ssu := "amqp://127.0.0.1/%2f" + sdu := "amqp://127.0.0.1/%2f" - // create new Policy - newPolicy := Policy{ - Pattern: "\\d+", - ApplyTo: "all", - Definition: PolicyDefinition{ - "max-length": 100, - "ha-mode": "nodes", - "ha-params": NodeNames{"a", "b", "c"}, - }, - Priority: 1, - } + shovelDefinition := ShovelDefinition{ + SourceURI: ssu, + SourceQueue: "mySourceQueue", + DestinationURI: sdu, + DestinationQueue: "myDestQueue", + AddForwardHeaders: true, + AckMode: "on-confirm", + DeleteAfter: "never"} - // update old Policy - resp, err := rmqc.PutPolicy("/", "woot2", newPolicy) + _, err := rmqc.DeclareShovel(vh, sn, shovelDefinition) Ω(err).Should(BeNil()) - Ω(resp.Status).Should(HavePrefix("20")) awaitEventPropagation() - - // old policy should not exist already - _, err = rmqc.GetPolicy("rabbit/hole", "woot") - Ω(err).Should(Equal(ErrorResponse{404, "Object Not Found", "Not Found"})) - - // but new (updated) policy is here - pol, err := rmqc.GetPolicy("/", "woot2") + x, err := rmqc.GetShovel(vh, sn) Ω(err).Should(BeNil()) - Ω(pol.Vhost).Should(Equal("/")) - Ω(pol.Name).Should(Equal("woot2")) - Ω(pol.Pattern).Should(Equal("\\d+")) - Ω(pol.ApplyTo).Should(Equal("all")) - Ω(pol.Priority).Should(BeEquivalentTo(1)) - Ω(pol.Definition).Should(BeAssignableToTypeOf(PolicyDefinition{})) - Ω(pol.Definition["max-length"]).Should(BeEquivalentTo(100)) - Ω(pol.Definition["ha-mode"]).Should(Equal("nodes")) - Ω(pol.Definition["ha-params"]).Should(HaveLen(3)) - Ω(pol.Definition["ha-params"]).Should(ContainElement("a")) - Ω(pol.Definition["ha-params"]).Should(ContainElement("c")) - Ω(pol.Definition["expires"]).Should(BeNil()) + Ω(x.Name).Should(Equal(sn)) + Ω(x.Vhost).Should(Equal(vh)) + Ω(x.Component).Should(Equal("shovel")) + Ω(x.Definition.SourceURI).Should(Equal(ssu)) + Ω(x.Definition.SourceQueue).Should(Equal("mySourceQueue")) + Ω(x.Definition.DestinationURI).Should(Equal(sdu)) + Ω(x.Definition.DestinationQueue).Should(Equal("myDestQueue")) + Ω(x.Definition.AddForwardHeaders).Should(Equal(true)) + Ω(x.Definition.AckMode).Should(Equal("on-confirm")) + Ω(x.Definition.DeleteAfter).Should(Equal("never")) - // cleanup - _, err = rmqc.DeletePolicy("/", "woot2") - Ω(err).Should(BeNil()) + rmqc.DeleteShovel(vh, sn) + awaitEventPropagation() + x, _ = rmqc.GetShovel(vh, sn) + Ω(x).Should(BeNil()) }) }) }) From fccfa4cf97f4444574e498196fafdd60e444459e Mon Sep 17 00:00:00 2001 From: Raymond Fallon Date: Sun, 26 Apr 2020 15:42:52 -0400 Subject: [PATCH 09/19] Enable federation plugins for windows builds. --- bin/ci/before_build.bat | 4 ++++ bin/ci/before_build.sh | 1 + 2 files changed, 5 insertions(+) diff --git a/bin/ci/before_build.bat b/bin/ci/before_build.bat index d9c4468..b9e9b95 100644 --- a/bin/ci/before_build.bat +++ b/bin/ci/before_build.bat @@ -25,3 +25,7 @@ call %RABBITHOLE_RABBITMQCTL% set_permissions -p "rabbit/hole" guest ".*" ".*" " REM Enable shovel plugin call %RABBITHOLE_RABBITMQ_PLUGINS% enable rabbitmq_shovel call %RABBITHOLE_RABBITMQ_PLUGINS% enable rabbitmq_shovel_management + +REM Enable shovel plugin +call %RABBITHOLE_RABBITMQ_PLUGINS% enable rabbitmq_federation +call %RABBITHOLE_RABBITMQ_PLUGINS% enable rabbitmq_federation_management diff --git a/bin/ci/before_build.sh b/bin/ci/before_build.sh index 7bd6c9d..11e2e99 100755 --- a/bin/ci/before_build.sh +++ b/bin/ci/before_build.sh @@ -30,5 +30,6 @@ $CTL set_cluster_name rabbitmq@localhost $PLUGINS enable rabbitmq_shovel $PLUGINS enable rabbitmq_shovel_management +# Enable federation plugin $PLUGINS enable rabbitmq_federation $PLUGINS enable rabbitmq_federation_management From 72923a223722935b6ca4d249bbc06c6276a07099 Mon Sep 17 00:00:00 2001 From: Raymond Fallon Date: Sat, 2 May 2020 14:54:00 -0400 Subject: [PATCH 10/19] Document the Federation API. -- note that doc.go is out of sync. --- README.md | 27 +++++++++++++++++++++++++++ doc.go | 24 ++++++++++++++++++++++++ 2 files changed, 51 insertions(+) diff --git a/README.md b/README.md index b4be96d..d1fbdc5 100644 --- a/README.md +++ b/README.md @@ -326,6 +326,33 @@ resp, err := rmqc.DeleteShovel("/", "a.shovel") ``` +### Operations on Federation Upstreams + +```go +// list all upstreams +ups, err := rmqc.ListFederationUpstreams() +// => []FederationUpstream, error + +// list upstreams in a vhost +ups, err := rmqc.ListFederationUpstreamsIn("/") +// => []FederationUpstream, error + +// information about an upstream +up, err := rmqc.GetFederationUpstream("/", "upstream-name") +// => *FederationUpstream, error + +// declare an upstream +resp, err := rmqc.PutFederationUpstream("/", "upstream-name", FederationDefinition{ + Uri: "amqp://server-name", +}) +// => *http.Response, error + +// delete an upstream +resp, err := rmqc.DeleteFederationUpstream("/", "upstream-name") +// => *http.Response, error + +``` + ### Operations on cluster name ``` go // Get cluster name diff --git a/doc.go b/doc.go index ce279e3..01d437a 100644 --- a/doc.go +++ b/doc.go @@ -199,6 +199,30 @@ Managing Topic Permissions resp, err := rmqc.DeleteTopicPermissionsIn("/", "my.user", "exchange") // => *http.Response, err +Managing Federation Upstreams + + // list all upstreams + ups, err := rmqc.ListFederationUpstreams() + // => []FederationUpstream, error + + // list upstreams in a vhost + ups, err := rmqc.ListFederationUpstreamsIn("/") + // => []FederationUpstream, error + + // information about an upstream + up, err := rmqc.GetFederationUpstream("/", "upstream-name") + // => *FederationUpstream, error + + // declare an upstream + resp, err := rmqc.PutFederationUpstream("/", "upstream-name", FederationDefinition{ + Uri: "amqp://server-name", + }) + // => *http.Response, error + + // delete an upstream + resp, err := rmqc.DeleteFederationUpstream("/", "upstream-name") + // => *http.Response, error + Operations on cluster name // Get cluster name cn, err := rmqc.GetClusterName() From bbad1c4f88f00a26350d3d868e3688b0be1933ce Mon Sep 17 00:00:00 2001 From: Raymond Fallon Date: Tue, 5 May 2020 11:21:03 -0400 Subject: [PATCH 11/19] API for managing runtime parameters. -- just one large test for the time being --- rabbithole_test.go | 42 +++++++++++++ runtime_parameters.go | 138 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 180 insertions(+) create mode 100644 runtime_parameters.go diff --git a/rabbithole_test.go b/rabbithole_test.go index 6832f79..a30c2a1 100644 --- a/rabbithole_test.go +++ b/rabbithole_test.go @@ -2165,4 +2165,46 @@ var _ = Describe("Rabbithole", func() { Ω(x).Should(BeNil()) }) }) + + Context("PUT /api/parameters/{component}/{vhost}/{name}", func() { + Context("when the parameter does not exist", func() { + It("creates the parameter", func() { + component := "federation-upstream" + vhost := "rabbit/hole" + name := "temporary" + + pv := RuntimeParameterValue{ + "uri": "amqp://server-name", + "prefetch-count": 500, + "reconnect-delay": 5, + "ack-mode": "on-confirm", + "trust-user-id": false, + } + + _, err := rmqc.PutRuntimeParameter(component, vhost, name, pv) + Ω(err).Should(BeNil()) + + awaitEventPropagation() + + p, err := rmqc.GetRuntimeParameter(component, vhost, name) + + Ω(err).Should(BeNil()) + Ω(p.Component).Should(Equal("federation-upstream")) + Ω(p.Vhost).Should(Equal(vhost)) + Ω(p.Name).Should(Equal(name)) + + // could use deep reflect or a better assertion here. + Ω(p.Value["uri"]).Should(Equal(pv["uri"])) + + Ω(int(p.Value["prefetch-count"].(float64))).Should(Equal(pv["prefetch-count"])) + Ω(int(p.Value["reconnect-delay"].(float64))).Should(Equal(pv["reconnect-delay"])) + + Ω(p.Value["ack-mode"]).Should(Equal(pv["ack-mode"])) + Ω(p.Value["trust-user-id"]).Should(Equal(pv["trust-user-id"])) + + _, err = rmqc.DeleteRuntimeParameter(component, vhost, name) + Ω(err).Should(BeNil()) + }) + }) + }) }) diff --git a/runtime_parameters.go b/runtime_parameters.go new file mode 100644 index 0000000..2a57830 --- /dev/null +++ b/runtime_parameters.go @@ -0,0 +1,138 @@ +package rabbithole + +import ( + "encoding/json" + "net/http" + "net/url" +) + +// RuntimeParameter represents a vhost-scoped parameter. +type RuntimeParameter struct { + Name string `json:"name"` + Vhost string `json:"vhost"` + Component string `json:"component"` + Value RuntimeParameterValue `json:"value"` +} + +// RuntimeParameterValue represents arbitrary parameter data. +type RuntimeParameterValue map[string]interface{} + +// +// GET /api/parameters +// + +// ListRuntimeParameters returns all runtime parameters. +func (c *Client) ListRuntimeParameters() (params []RuntimeParameter, err error) { + req, err := newGETRequest(c, "parameters") + if err != nil { + return []RuntimeParameter{}, err + } + + if err = executeAndParseRequest(c, req, ¶ms); err != nil { + return []RuntimeParameter{}, err + } + + return params, nil +} + +// +// GET /api/parameters/{component} +// + +// ListRuntimeParametersFor returns all runtime parameters for a component in all vhosts. +func (c *Client) ListRuntimeParametersFor(component string) (params []RuntimeParameter, err error) { + req, err := newGETRequest(c, "parameters/"+url.PathEscape(component)) + if err != nil { + return []RuntimeParameter{}, err + } + + if err = executeAndParseRequest(c, req, ¶ms); err != nil { + return []RuntimeParameter{}, err + } + + return params, nil +} + +// +// GET /api/parameters/{component}/{vhost} +// + +// ListRuntimeParametersIn returns all runtime parameters for a component in a vhost. +func (c *Client) ListRuntimeParametersIn(component, vhost string) (p []RuntimeParameter, err error) { + req, err := newGETRequest(c, "parameters/"+url.PathEscape(component)+"/"+url.PathEscape(vhost)) + if err != nil { + return []RuntimeParameter{}, err + } + + if err = executeAndParseRequest(c, req, &p); err != nil { + return []RuntimeParameter{}, err + } + + return p, nil +} + +// +// GET /api/parameters/{component}/{vhost}/{name} +// + +// GetRuntimeParameter returns a runtime parameter. +func (c *Client) GetRuntimeParameter(component, vhost, name string) (p *RuntimeParameter, err error) { + req, err := newGETRequest(c, "parameters/"+url.PathEscape(component)+"/"+url.PathEscape(vhost)+"/"+url.PathEscape(name)) + if err != nil { + return nil, err + } + + if err = executeAndParseRequest(c, req, &p); err != nil { + return nil, err + } + + return p, nil +} + +// +// PUT /api/parameters/{component}/{vhost}/{name} +// + +// PutRuntimeParameter creates a runtime parameter. +func (c *Client) PutRuntimeParameter(component, vhost, name string, value RuntimeParameterValue) (res *http.Response, err error) { + param := RuntimeParameter{ + name, + vhost, + component, + value, + } + + body, err := json.Marshal(param) + if err != nil { + return nil, err + } + + req, err := newRequestWithBody(c, "PUT", "parameters/"+url.PathEscape(component)+"/"+url.PathEscape(vhost)+"/"+url.PathEscape(name), body) + if err != nil { + return nil, err + } + + if res, err = executeRequest(c, req); err != nil { + return nil, err + } + + return res, nil +} + +// +// DELETE /api/parameters/{component}/{vhost}/{name} +// + +// DeleteRuntimeParameter removes a runtime parameter. +func (c *Client) DeleteRuntimeParameter(component, vhost, name string) (res *http.Response, err error) { + req, err := newRequestWithBody(c, "DELETE", "parameters/"+url.PathEscape(component)+"/"+url.PathEscape(vhost)+"/"+url.PathEscape(name), nil) + if err != nil { + return nil, err + } + + if res, err = executeRequest(c, req); err != nil { + return nil, err + } + + return res, nil +} From 5f31f0ce25990d382cf4b9c84a76c9938095937b Mon Sep 17 00:00:00 2001 From: Raymond Fallon Date: Tue, 5 May 2020 11:43:12 -0400 Subject: [PATCH 12/19] Use the parameter API to create federation parameters. --- federation.go | 4 +++ rabbithole_test.go | 62 ++++++++++++++++++++++++++++++++++++++----- runtime_parameters.go | 12 +++++---- 3 files changed, 67 insertions(+), 11 deletions(-) diff --git a/federation.go b/federation.go index 0866154..bac29bb 100644 --- a/federation.go +++ b/federation.go @@ -117,6 +117,10 @@ func (c *Client) PutFederationUpstream(vhost string, upstreamName string, fDef F return res, nil } +func (c *Client) PutFederationUpstreamV2(vhost string, name string, def FederationDefinition) (res *http.Response, err error) { + return c.PutRuntimeParameter("federation-upstream", vhost, name, def) +} + // // DELETE /api/parameters/federation-upstream/{vhost}/{name} // diff --git a/rabbithole_test.go b/rabbithole_test.go index a30c2a1..b26be9d 100644 --- a/rabbithole_test.go +++ b/rabbithole_test.go @@ -2193,14 +2193,64 @@ var _ = Describe("Rabbithole", func() { Ω(p.Vhost).Should(Equal(vhost)) Ω(p.Name).Should(Equal(name)) - // could use deep reflect or a better assertion here. - Ω(p.Value["uri"]).Should(Equal(pv["uri"])) + // we need to convert from interface{} + v := p.Value.(map[string]interface{}) - Ω(int(p.Value["prefetch-count"].(float64))).Should(Equal(pv["prefetch-count"])) - Ω(int(p.Value["reconnect-delay"].(float64))).Should(Equal(pv["reconnect-delay"])) + // could use reflect or a better assertion here. + Ω(v["uri"]).Should(Equal(pv["uri"])) - Ω(p.Value["ack-mode"]).Should(Equal(pv["ack-mode"])) - Ω(p.Value["trust-user-id"]).Should(Equal(pv["trust-user-id"])) + Ω(int(v["prefetch-count"].(float64))).Should(Equal(pv["prefetch-count"])) + Ω(int(v["reconnect-delay"].(float64))).Should(Equal(pv["reconnect-delay"])) + + Ω(v["ack-mode"]).Should(Equal(pv["ack-mode"])) + Ω(v["trust-user-id"]).Should(Equal(pv["trust-user-id"])) + + _, err = rmqc.DeleteRuntimeParameter(component, vhost, name) + Ω(err).Should(BeNil()) + }) + }) + }) + + Context("V2 PUT /api/parameters/federation-upstream/{vhost}/{name}", func() { + Context("when the parameter does not exist", func() { + It("creates the parameter", func() { + component := "federation-upstream" + vhost := "rabbit/hole" + name := "temporary" + + def := FederationDefinition{ + Uri: "amqp://127.0.0.1/%2f", + PrefetchCount: 1000, + ReconnectDelay: 1, + AckMode: "on-confirm", + TrustUserId: false, + } + + // use the federation API to create the parameter + _, err := rmqc.PutFederationUpstreamV2(vhost, name, def) + Ω(err).Should(BeNil()) + + awaitEventPropagation() + + // use the runtime parameter API to read the federation info + // param is RuntimeParameter + param, err := rmqc.GetRuntimeParameter(component, vhost, name) + + Ω(err).Should(BeNil()) + Ω(param.Component).Should(Equal("federation-upstream")) + Ω(param.Vhost).Should(Equal(vhost)) + Ω(param.Name).Should(Equal(name)) + + // the federation defintion is contained in a map + v := param.Value.(map[string]interface{}) + Ω(v["uri"]).Should(Equal(def.Uri)) + + // this could be avoided by using decoder.UserNumber() + Ω(int(v["prefetch-count"].(float64))).Should(Equal(def.PrefetchCount)) + Ω(int(v["reconnect-delay"].(float64))).Should(Equal(def.ReconnectDelay)) + + Ω(v["ack-mode"]).Should(Equal(def.AckMode)) + Ω(v["trust-user-id"]).Should(Equal(def.TrustUserId)) _, err = rmqc.DeleteRuntimeParameter(component, vhost, name) Ω(err).Should(BeNil()) diff --git a/runtime_parameters.go b/runtime_parameters.go index 2a57830..47fc38c 100644 --- a/runtime_parameters.go +++ b/runtime_parameters.go @@ -7,11 +7,13 @@ import ( ) // RuntimeParameter represents a vhost-scoped parameter. +// Value is interface{} to support creating parameters directly from types such as +// FederationInfo and ShovelInfo. type RuntimeParameter struct { - Name string `json:"name"` - Vhost string `json:"vhost"` - Component string `json:"component"` - Value RuntimeParameterValue `json:"value"` + Name string `json:"name"` + Vhost string `json:"vhost"` + Component string `json:"component"` + Value interface{} `json:"value"` } // RuntimeParameterValue represents arbitrary parameter data. @@ -94,7 +96,7 @@ func (c *Client) GetRuntimeParameter(component, vhost, name string) (p *RuntimeP // // PutRuntimeParameter creates a runtime parameter. -func (c *Client) PutRuntimeParameter(component, vhost, name string, value RuntimeParameterValue) (res *http.Response, err error) { +func (c *Client) PutRuntimeParameter(component, vhost, name string, value interface{}) (res *http.Response, err error) { param := RuntimeParameter{ name, vhost, From 84c7b101eb6107f5dfa037bbfb6bcf0f4294ac81 Mon Sep 17 00:00:00 2001 From: Raymond Fallon Date: Tue, 5 May 2020 12:56:26 -0400 Subject: [PATCH 13/19] Use the parameter API to read federation info. --- federation.go | 16 ++++++++++++++++ rabbithole_test.go | 15 +++++++++++++++ runtime_parameters.go | 14 +++++++++++--- 3 files changed, 42 insertions(+), 3 deletions(-) diff --git a/federation.go b/federation.go index bac29bb..1b84a63 100644 --- a/federation.go +++ b/federation.go @@ -30,6 +30,15 @@ type FederationUpstream struct { Definition FederationDefinition `json:"value"` } +// FederationInfo represents a runtime parameter that targets a "federation-upstream" component. +// Replaces FederationUpstream and FederationDefinitionDTO +type FederationInfo struct { + Name string `json:"name"` + Vhost string `json:"vhost"` + Component string `json:"component"` + Definition FederationDefinition `json:"value"` +} + // FederationDefinitionDTO provides a data transfer object for a FederationDefinition. type FederationDefinitionDTO struct { Definition FederationDefinition `json:"value"` @@ -90,6 +99,13 @@ func (c *Client) GetFederationUpstream(vhost, upstreamName string) (rec *Federat return rec, nil } +func (c *Client) GetFederationUpstreamV2(vhost, name string) (info *FederationInfo, err error) { + if err = c.PopulateRuntimeParameter("federation-upstream", vhost, name, &info); err != nil { + return nil, err + } + return info, nil +} + // // PUT /api/parameters/federation-upstream/{vhost}/{upstream} // diff --git a/rabbithole_test.go b/rabbithole_test.go index b26be9d..482db5b 100644 --- a/rabbithole_test.go +++ b/rabbithole_test.go @@ -2252,6 +2252,21 @@ var _ = Describe("Rabbithole", func() { Ω(v["ack-mode"]).Should(Equal(def.AckMode)) Ω(v["trust-user-id"]).Should(Equal(def.TrustUserId)) + // use the federation API to read the federation info + // info is FederationInfo + info, err := rmqc.GetFederationUpstreamV2(vhost, name) + + Ω(err).Should(BeNil()) + Ω(info.Component).Should(Equal(component)) + Ω(info.Vhost).Should(Equal(vhost)) + Ω(info.Name).Should(Equal(name)) + + Ω(info.Definition.Uri).Should(Equal(def.Uri)) + Ω(info.Definition.PrefetchCount).Should(Equal(def.PrefetchCount)) + Ω(info.Definition.ReconnectDelay).Should(Equal(def.ReconnectDelay)) + Ω(info.Definition.AckMode).Should(Equal(def.AckMode)) + Ω(info.Definition.TrustUserId).Should(Equal(def.TrustUserId)) + _, err = rmqc.DeleteRuntimeParameter(component, vhost, name) Ω(err).Should(BeNil()) }) diff --git a/runtime_parameters.go b/runtime_parameters.go index 47fc38c..49912d1 100644 --- a/runtime_parameters.go +++ b/runtime_parameters.go @@ -79,16 +79,24 @@ func (c *Client) ListRuntimeParametersIn(component, vhost string) (p []RuntimePa // GetRuntimeParameter returns a runtime parameter. func (c *Client) GetRuntimeParameter(component, vhost, name string) (p *RuntimeParameter, err error) { + if err = c.PopulateRuntimeParameter(component, vhost, name, &p); err != nil { + return nil, err + } + return p, nil +} + +// PopulateRuntimeParameter hydrates a runtime parameter using the provided interface. +func (c *Client) PopulateRuntimeParameter(component, vhost, name string, p interface{}) error { req, err := newGETRequest(c, "parameters/"+url.PathEscape(component)+"/"+url.PathEscape(vhost)+"/"+url.PathEscape(name)) if err != nil { - return nil, err + return err } if err = executeAndParseRequest(c, req, &p); err != nil { - return nil, err + return err } - return p, nil + return nil } // From ce80a8c053797b21a07bc28e8124b3e71d9a334b Mon Sep 17 00:00:00 2001 From: Raymond Fallon Date: Tue, 5 May 2020 18:34:56 -0400 Subject: [PATCH 14/19] Map from runtime parameter to federation upstream. - this removes the need for PopulateRuntimeParameter --- federation.go | 104 +++++++++++------------- rabbithole_test.go | 184 +++++++++++++++--------------------------- runtime_parameters.go | 14 +--- 3 files changed, 118 insertions(+), 184 deletions(-) diff --git a/federation.go b/federation.go index 1b84a63..48ab130 100644 --- a/federation.go +++ b/federation.go @@ -1,7 +1,6 @@ package rabbithole import ( - "encoding/json" "net/http" "net/url" ) @@ -30,20 +29,6 @@ type FederationUpstream struct { Definition FederationDefinition `json:"value"` } -// FederationInfo represents a runtime parameter that targets a "federation-upstream" component. -// Replaces FederationUpstream and FederationDefinitionDTO -type FederationInfo struct { - Name string `json:"name"` - Vhost string `json:"vhost"` - Component string `json:"component"` - Definition FederationDefinition `json:"value"` -} - -// FederationDefinitionDTO provides a data transfer object for a FederationDefinition. -type FederationDefinitionDTO struct { - Definition FederationDefinition `json:"value"` -} - // // GET /api/parameters/federation-upstream // @@ -85,55 +70,73 @@ func (c *Client) ListFederationUpstreamsIn(vhost string) (rec []FederationUpstre // // GetFederationUpstream returns a federation upstream -func (c *Client) GetFederationUpstream(vhost, upstreamName string) (rec *FederationUpstream, err error) { - req, err := newGETRequest(c, "parameters/federation-upstream/"+url.PathEscape(vhost)+"/"+url.PathEscape(upstreamName)) - +func (c *Client) GetFederationUpstream(vhost, name string) (up *FederationUpstream, err error) { + param, err := c.GetRuntimeParameter("federation-upstream", vhost, name) if err != nil { return nil, err } - if err = executeAndParseRequest(c, req, &rec); err != nil { - return nil, err + // TODO: extract to function when migrating List* methods. + up = &FederationUpstream{ + Name: param.Name, + Vhost: param.Vhost, + Component: param.Component, } - return rec, nil -} + def := FederationDefinition{} + m := param.Value.(map[string]interface{}) -func (c *Client) GetFederationUpstreamV2(vhost, name string) (info *FederationInfo, err error) { - if err = c.PopulateRuntimeParameter("federation-upstream", vhost, name, &info); err != nil { - return nil, err + if v, ok := m["uri"].(string); ok { + def.Uri = v } - return info, nil -} -// -// PUT /api/parameters/federation-upstream/{vhost}/{upstream} -// + if v, ok := m["expires"].(float64); ok { + def.Expires = int(v) + } -// Updates a federation upstream -func (c *Client) PutFederationUpstream(vhost string, upstreamName string, fDef FederationDefinition) (res *http.Response, err error) { - fedDTO := FederationDefinitionDTO{ - Definition: fDef, + if v, ok := m["message-ttl"].(float64); ok { + def.MessageTTL = int32(v) } - body, err := json.Marshal(fedDTO) - if err != nil { - return nil, err + if v, ok := m["max-hops"].(float64); ok { + def.MaxHops = int(v) } - req, err := newRequestWithBody(c, "PUT", "parameters/federation-upstream/"+url.PathEscape(vhost)+"/"+url.PathEscape(upstreamName), body) - if err != nil { - return nil, err + if v, ok := m["prefetch-count"].(float64); ok { + def.PrefetchCount = int(v) } - if res, err = executeRequest(c, req); err != nil { - return nil, err + if v, ok := m["reconnect-delay"].(float64); ok { + def.ReconnectDelay = int(v) + } + + if v, ok := m["ack-mode"].(string); ok { + def.AckMode = v + } + + if v, ok := m["trust-user-id"].(bool); ok { + def.TrustUserId = v } - return res, nil + if v, ok := m["exchange"].(string); ok { + def.Exchange = v + } + + if v, ok := m["queue"].(string); ok { + def.Queue = v + } + + up.Definition = def + + return up, nil } -func (c *Client) PutFederationUpstreamV2(vhost string, name string, def FederationDefinition) (res *http.Response, err error) { +// +// PUT /api/parameters/federation-upstream/{vhost}/{upstream} +// + +// Updates a federation upstream +func (c *Client) PutFederationUpstream(vhost string, name string, def FederationDefinition) (res *http.Response, err error) { return c.PutRuntimeParameter("federation-upstream", vhost, name, def) } @@ -142,15 +145,6 @@ func (c *Client) PutFederationUpstreamV2(vhost string, name string, def Federati // // Deletes a federation upstream. -func (c *Client) DeleteFederationUpstream(vhost, upstreamName string) (res *http.Response, err error) { - req, err := newRequestWithBody(c, "DELETE", "parameters/federation-upstream/"+url.PathEscape(vhost)+"/"+url.PathEscape(upstreamName), nil) - if err != nil { - return nil, err - } - - if res, err = executeRequest(c, req); err != nil { - return nil, err - } - - return res, nil +func (c *Client) DeleteFederationUpstream(vhost, name string) (res *http.Response, err error) { + return c.DeleteRuntimeParameter("federation-upstream", vhost, name) } diff --git a/rabbithole_test.go b/rabbithole_test.go index 482db5b..8152d6b 100644 --- a/rabbithole_test.go +++ b/rabbithole_test.go @@ -1848,16 +1848,16 @@ var _ = Describe("Rabbithole", func() { Context("when there are upstreams", func() { It("returns the list of upstreams", func() { - fd1 := FederationDefinition{ + def1 := FederationDefinition{ Uri: "amqp://server-name/%2f", } - _, err := rmqc.PutFederationUpstream("rabbit/hole", "upstream1", fd1) + _, err := rmqc.PutFederationUpstream("rabbit/hole", "upstream1", def1) Ω(err).Should(BeNil()) - fd2 := FederationDefinition{ + def2 := FederationDefinition{ Uri: "amqp://example.com/%2f", } - _, err = rmqc.PutFederationUpstream("/", "upstream2", fd2) + _, err = rmqc.PutFederationUpstream("/", "upstream2", def2) Ω(err).Should(BeNil()) awaitEventPropagation() @@ -1894,18 +1894,18 @@ var _ = Describe("Rabbithole", func() { It("returns the list of upstreams", func() { vh := "rabbit/hole" - fd1 := FederationDefinition{ + def1 := FederationDefinition{ Uri: "amqp://server-name/%2f", } - _, err := rmqc.PutFederationUpstream(vh, "upstream1", fd1) + _, err := rmqc.PutFederationUpstream(vh, "upstream1", def1) Ω(err).Should(BeNil()) - fd2 := FederationDefinition{ + def2 := FederationDefinition{ Uri: "amqp://example.com/%2f", } - _, err = rmqc.PutFederationUpstream(vh, "upstream2", fd2) + _, err = rmqc.PutFederationUpstream(vh, "upstream2", def2) Ω(err).Should(BeNil()) awaitEventPropagation() @@ -1943,9 +1943,9 @@ var _ = Describe("Rabbithole", func() { vh := "rabbit/hole" name := "temporary" - fu, err := rmqc.GetFederationUpstream(vh, name) + up, err := rmqc.GetFederationUpstream(vh, name) Ω(err).Should(Equal(ErrorResponse{404, "Object Not Found", "Not Found"})) - Ω(fu).Should(BeNil()) + Ω(up).Should(BeNil()) }) }) @@ -1954,7 +1954,7 @@ var _ = Describe("Rabbithole", func() { vh := "rabbit/hole" name := "temporary" - fd := FederationDefinition{ + def := FederationDefinition{ Uri: "amqp://127.0.0.1/%2f", PrefetchCount: 1000, ReconnectDelay: 1, @@ -1962,21 +1962,21 @@ var _ = Describe("Rabbithole", func() { TrustUserId: false, } - _, err := rmqc.PutFederationUpstream(vh, name, fd) + _, err := rmqc.PutFederationUpstream(vh, name, def) Ω(err).Should(BeNil()) awaitEventPropagation() - fu, err := rmqc.GetFederationUpstream(vh, name) + up, err := rmqc.GetFederationUpstream(vh, name) Ω(err).Should(BeNil()) - Ω(fu.Vhost).Should(Equal(vh)) - Ω(fu.Name).Should(Equal(name)) - Ω(fu.Component).Should(Equal("federation-upstream")) - Ω(fu.Definition.Uri).Should(Equal(fd.Uri)) - Ω(fu.Definition.PrefetchCount).Should(Equal(fd.PrefetchCount)) - Ω(fu.Definition.ReconnectDelay).Should(Equal(fd.ReconnectDelay)) - Ω(fu.Definition.AckMode).Should(Equal(fd.AckMode)) - Ω(fu.Definition.TrustUserId).Should(Equal(fd.TrustUserId)) + Ω(up.Vhost).Should(Equal(vh)) + Ω(up.Name).Should(Equal(name)) + Ω(up.Component).Should(Equal("federation-upstream")) + Ω(up.Definition.Uri).Should(Equal(def.Uri)) + Ω(up.Definition.PrefetchCount).Should(Equal(def.PrefetchCount)) + Ω(up.Definition.ReconnectDelay).Should(Equal(def.ReconnectDelay)) + Ω(up.Definition.AckMode).Should(Equal(def.AckMode)) + Ω(up.Definition.TrustUserId).Should(Equal(def.TrustUserId)) _, err = rmqc.DeleteFederationUpstream(vh, name) Ω(err).Should(BeNil()) @@ -1990,29 +1990,39 @@ var _ = Describe("Rabbithole", func() { vh := "rabbit/hole" name := "temporary" - fd := FederationDefinition{ + def := FederationDefinition{ Uri: "amqp://127.0.0.1/%2f", + Expires: 1800000, + MessageTTL: 360000, + MaxHops: 1, PrefetchCount: 500, ReconnectDelay: 5, AckMode: "on-publish", TrustUserId: false, + Exchange: "", + Queue: "", } - _, err := rmqc.PutFederationUpstream(vh, name, fd) + _, err := rmqc.PutFederationUpstream(vh, name, def) Ω(err).Should(BeNil()) awaitEventPropagation() - fu, err := rmqc.GetFederationUpstream(vh, name) + up, err := rmqc.GetFederationUpstream(vh, name) Ω(err).Should(BeNil()) - Ω(fu.Vhost).Should(Equal(vh)) - Ω(fu.Name).Should(Equal(name)) - Ω(fu.Component).Should(Equal("federation-upstream")) - Ω(fu.Definition.Uri).Should(Equal(fd.Uri)) - Ω(fu.Definition.PrefetchCount).Should(Equal(fd.PrefetchCount)) - Ω(fu.Definition.ReconnectDelay).Should(Equal(fd.ReconnectDelay)) - Ω(fu.Definition.AckMode).Should(Equal(fd.AckMode)) - Ω(fu.Definition.TrustUserId).Should(Equal(fd.TrustUserId)) + Ω(up.Vhost).Should(Equal(vh)) + Ω(up.Name).Should(Equal(name)) + Ω(up.Component).Should(Equal("federation-upstream")) + Ω(up.Definition.Uri).Should(Equal(def.Uri)) + Ω(up.Definition.Expires).Should(Equal(def.Expires)) + Ω(up.Definition.MessageTTL).Should(Equal(def.MessageTTL)) + Ω(up.Definition.MaxHops).Should(Equal(def.MaxHops)) + Ω(up.Definition.PrefetchCount).Should(Equal(def.PrefetchCount)) + Ω(up.Definition.ReconnectDelay).Should(Equal(def.ReconnectDelay)) + Ω(up.Definition.AckMode).Should(Equal(def.AckMode)) + Ω(up.Definition.TrustUserId).Should(Equal(def.TrustUserId)) + Ω(up.Definition.Exchange).Should(Equal(def.Exchange)) + Ω(up.Definition.Queue).Should(Equal(def.Queue)) _, err = rmqc.DeleteFederationUpstream(vh, name) Ω(err).Should(BeNil()) @@ -2025,7 +2035,7 @@ var _ = Describe("Rabbithole", func() { name := "temporary" // create the upstream - fd := FederationDefinition{ + def := FederationDefinition{ Uri: "amqp://127.0.0.1/%2f", PrefetchCount: 1000, ReconnectDelay: 1, @@ -2033,24 +2043,24 @@ var _ = Describe("Rabbithole", func() { TrustUserId: false, } - _, err := rmqc.PutFederationUpstream(vh, name, fd) + _, err := rmqc.PutFederationUpstream(vh, name, def) Ω(err).Should(BeNil()) awaitEventPropagation() - fu, err := rmqc.GetFederationUpstream(vh, name) + up, err := rmqc.GetFederationUpstream(vh, name) Ω(err).Should(BeNil()) - Ω(fu.Vhost).Should(Equal(vh)) - Ω(fu.Name).Should(Equal(name)) - Ω(fu.Component).Should(Equal("federation-upstream")) - Ω(fu.Definition.Uri).Should(Equal(fd.Uri)) - Ω(fu.Definition.PrefetchCount).Should(Equal(fd.PrefetchCount)) - Ω(fu.Definition.ReconnectDelay).Should(Equal(fd.ReconnectDelay)) - Ω(fu.Definition.AckMode).Should(Equal(fd.AckMode)) - Ω(fu.Definition.TrustUserId).Should(Equal(fd.TrustUserId)) + Ω(up.Vhost).Should(Equal(vh)) + Ω(up.Name).Should(Equal(name)) + Ω(up.Component).Should(Equal("federation-upstream")) + Ω(up.Definition.Uri).Should(Equal(def.Uri)) + Ω(up.Definition.PrefetchCount).Should(Equal(def.PrefetchCount)) + Ω(up.Definition.ReconnectDelay).Should(Equal(def.ReconnectDelay)) + Ω(up.Definition.AckMode).Should(Equal(def.AckMode)) + Ω(up.Definition.TrustUserId).Should(Equal(def.TrustUserId)) // update the upstream - fd2 := FederationDefinition{ + def2 := FederationDefinition{ Uri: "amqp://127.0.0.1/%2f", PrefetchCount: 500, ReconnectDelay: 10, @@ -2058,21 +2068,21 @@ var _ = Describe("Rabbithole", func() { TrustUserId: true, } - _, err = rmqc.PutFederationUpstream(vh, name, fd2) + _, err = rmqc.PutFederationUpstream(vh, name, def2) Ω(err).Should(BeNil()) awaitEventPropagation() - fu2, err := rmqc.GetFederationUpstream(vh, name) + up, err = rmqc.GetFederationUpstream(vh, name) Ω(err).Should(BeNil()) - Ω(fu2.Vhost).Should(Equal(vh)) - Ω(fu2.Name).Should(Equal(name)) - Ω(fu2.Component).Should(Equal("federation-upstream")) - Ω(fu2.Definition.Uri).Should(Equal(fd2.Uri)) - Ω(fu2.Definition.PrefetchCount).Should(Equal(fd2.PrefetchCount)) - Ω(fu2.Definition.ReconnectDelay).Should(Equal(fd2.ReconnectDelay)) - Ω(fu2.Definition.AckMode).Should(Equal(fd2.AckMode)) - Ω(fu2.Definition.TrustUserId).Should(Equal(fd2.TrustUserId)) + Ω(up.Vhost).Should(Equal(vh)) + Ω(up.Name).Should(Equal(name)) + Ω(up.Component).Should(Equal("federation-upstream")) + Ω(up.Definition.Uri).Should(Equal(def2.Uri)) + Ω(up.Definition.PrefetchCount).Should(Equal(def2.PrefetchCount)) + Ω(up.Definition.ReconnectDelay).Should(Equal(def2.ReconnectDelay)) + Ω(up.Definition.AckMode).Should(Equal(def2.AckMode)) + Ω(up.Definition.TrustUserId).Should(Equal(def2.TrustUserId)) _, err = rmqc.DeleteFederationUpstream(vh, name) Ω(err).Should(BeNil()) @@ -2109,11 +2119,11 @@ var _ = Describe("Rabbithole", func() { vh := "rabbit/hole" name := "temporary" - fd := FederationDefinition{ + def := FederationDefinition{ Uri: "amqp://127.0.0.1/%2f", } - _, err := rmqc.PutFederationUpstream(vh, name, fd) + _, err := rmqc.PutFederationUpstream(vh, name, def) Ω(err).Should(BeNil()) awaitEventPropagation() @@ -2210,66 +2220,4 @@ var _ = Describe("Rabbithole", func() { }) }) }) - - Context("V2 PUT /api/parameters/federation-upstream/{vhost}/{name}", func() { - Context("when the parameter does not exist", func() { - It("creates the parameter", func() { - component := "federation-upstream" - vhost := "rabbit/hole" - name := "temporary" - - def := FederationDefinition{ - Uri: "amqp://127.0.0.1/%2f", - PrefetchCount: 1000, - ReconnectDelay: 1, - AckMode: "on-confirm", - TrustUserId: false, - } - - // use the federation API to create the parameter - _, err := rmqc.PutFederationUpstreamV2(vhost, name, def) - Ω(err).Should(BeNil()) - - awaitEventPropagation() - - // use the runtime parameter API to read the federation info - // param is RuntimeParameter - param, err := rmqc.GetRuntimeParameter(component, vhost, name) - - Ω(err).Should(BeNil()) - Ω(param.Component).Should(Equal("federation-upstream")) - Ω(param.Vhost).Should(Equal(vhost)) - Ω(param.Name).Should(Equal(name)) - - // the federation defintion is contained in a map - v := param.Value.(map[string]interface{}) - Ω(v["uri"]).Should(Equal(def.Uri)) - - // this could be avoided by using decoder.UserNumber() - Ω(int(v["prefetch-count"].(float64))).Should(Equal(def.PrefetchCount)) - Ω(int(v["reconnect-delay"].(float64))).Should(Equal(def.ReconnectDelay)) - - Ω(v["ack-mode"]).Should(Equal(def.AckMode)) - Ω(v["trust-user-id"]).Should(Equal(def.TrustUserId)) - - // use the federation API to read the federation info - // info is FederationInfo - info, err := rmqc.GetFederationUpstreamV2(vhost, name) - - Ω(err).Should(BeNil()) - Ω(info.Component).Should(Equal(component)) - Ω(info.Vhost).Should(Equal(vhost)) - Ω(info.Name).Should(Equal(name)) - - Ω(info.Definition.Uri).Should(Equal(def.Uri)) - Ω(info.Definition.PrefetchCount).Should(Equal(def.PrefetchCount)) - Ω(info.Definition.ReconnectDelay).Should(Equal(def.ReconnectDelay)) - Ω(info.Definition.AckMode).Should(Equal(def.AckMode)) - Ω(info.Definition.TrustUserId).Should(Equal(def.TrustUserId)) - - _, err = rmqc.DeleteRuntimeParameter(component, vhost, name) - Ω(err).Should(BeNil()) - }) - }) - }) }) diff --git a/runtime_parameters.go b/runtime_parameters.go index 49912d1..47fc38c 100644 --- a/runtime_parameters.go +++ b/runtime_parameters.go @@ -79,24 +79,16 @@ func (c *Client) ListRuntimeParametersIn(component, vhost string) (p []RuntimePa // GetRuntimeParameter returns a runtime parameter. func (c *Client) GetRuntimeParameter(component, vhost, name string) (p *RuntimeParameter, err error) { - if err = c.PopulateRuntimeParameter(component, vhost, name, &p); err != nil { - return nil, err - } - return p, nil -} - -// PopulateRuntimeParameter hydrates a runtime parameter using the provided interface. -func (c *Client) PopulateRuntimeParameter(component, vhost, name string, p interface{}) error { req, err := newGETRequest(c, "parameters/"+url.PathEscape(component)+"/"+url.PathEscape(vhost)+"/"+url.PathEscape(name)) if err != nil { - return err + return nil, err } if err = executeAndParseRequest(c, req, &p); err != nil { - return err + return nil, err } - return nil + return p, nil } // From e90c68c87f4def9d931253a4acb8a45058369584 Mon Sep 17 00:00:00 2001 From: Raymond Fallon Date: Tue, 5 May 2020 20:04:45 -0400 Subject: [PATCH 15/19] List federation upstreams using parameter API. --- federation.go | 83 ++++++++++++++++++++++++++------------------------- 1 file changed, 43 insertions(+), 40 deletions(-) diff --git a/federation.go b/federation.go index 48ab130..3c5213a 100644 --- a/federation.go +++ b/federation.go @@ -2,7 +2,6 @@ package rabbithole import ( "net/http" - "net/url" ) // Federation definition: additional arguments @@ -34,17 +33,18 @@ type FederationUpstream struct { // // ListFederationUpstreams returns all federation upstreams -func (c *Client) ListFederationUpstreams() (rec []FederationUpstream, err error) { - req, err := newGETRequest(c, "parameters/federation-upstream") +func (c *Client) ListFederationUpstreams() (ups []FederationUpstream, err error) { + params, err := c.ListRuntimeParametersFor("federation-upstream") if err != nil { - return []FederationUpstream{}, err + return nil, err } - if err = executeAndParseRequest(c, req, &rec); err != nil { - return []FederationUpstream{}, err + ups = []FederationUpstream{} + for _, param := range params { + up := paramToUpstream(¶m) + ups = append(ups, *up) } - - return rec, nil + return ups, nil } // @@ -52,17 +52,18 @@ func (c *Client) ListFederationUpstreams() (rec []FederationUpstream, err error) // // ListFederationUpstreamsIn returns all federation upstreams in a vhost -func (c *Client) ListFederationUpstreamsIn(vhost string) (rec []FederationUpstream, err error) { - req, err := newGETRequest(c, "parameters/federation-upstream/"+url.PathEscape(vhost)) +func (c *Client) ListFederationUpstreamsIn(vhost string) (ups []FederationUpstream, err error) { + params, err := c.ListRuntimeParametersIn("federation-upstream", vhost) if err != nil { - return []FederationUpstream{}, err + return nil, err } - if err = executeAndParseRequest(c, req, &rec); err != nil { - return []FederationUpstream{}, err + ups = []FederationUpstream{} + for _, param := range params { + up := paramToUpstream(¶m) + ups = append(ups, *up) } - - return rec, nil + return ups, nil } // @@ -75,16 +76,37 @@ func (c *Client) GetFederationUpstream(vhost, name string) (up *FederationUpstre if err != nil { return nil, err } + return paramToUpstream(param), nil +} + +// +// PUT /api/parameters/federation-upstream/{vhost}/{upstream} +// + +// Updates a federation upstream +func (c *Client) PutFederationUpstream(vhost string, name string, def FederationDefinition) (res *http.Response, err error) { + return c.PutRuntimeParameter("federation-upstream", vhost, name, def) +} + +// +// DELETE /api/parameters/federation-upstream/{vhost}/{name} +// + +// Deletes a federation upstream. +func (c *Client) DeleteFederationUpstream(vhost, name string) (res *http.Response, err error) { + return c.DeleteRuntimeParameter("federation-upstream", vhost, name) +} - // TODO: extract to function when migrating List* methods. +// paramToUpstream maps from a RuntimeParameter structure to a FederationUpstream structure. +func paramToUpstream(p *RuntimeParameter) (up *FederationUpstream) { up = &FederationUpstream{ - Name: param.Name, - Vhost: param.Vhost, - Component: param.Component, + Name: p.Name, + Vhost: p.Vhost, + Component: p.Component, } def := FederationDefinition{} - m := param.Value.(map[string]interface{}) + m := p.Value.(map[string]interface{}) if v, ok := m["uri"].(string); ok { def.Uri = v @@ -127,24 +149,5 @@ func (c *Client) GetFederationUpstream(vhost, name string) (up *FederationUpstre } up.Definition = def - - return up, nil -} - -// -// PUT /api/parameters/federation-upstream/{vhost}/{upstream} -// - -// Updates a federation upstream -func (c *Client) PutFederationUpstream(vhost string, name string, def FederationDefinition) (res *http.Response, err error) { - return c.PutRuntimeParameter("federation-upstream", vhost, name, def) -} - -// -// DELETE /api/parameters/federation-upstream/{vhost}/{name} -// - -// Deletes a federation upstream. -func (c *Client) DeleteFederationUpstream(vhost, name string) (res *http.Response, err error) { - return c.DeleteRuntimeParameter("federation-upstream", vhost, name) + return up } From 583610e12e2e9f4e7d6f96c84b946362c3332c28 Mon Sep 17 00:00:00 2001 From: Raymond Fallon Date: Tue, 5 May 2020 20:10:48 -0400 Subject: [PATCH 16/19] Remove redundant initialization. --- federation.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/federation.go b/federation.go index 3c5213a..05d2a53 100644 --- a/federation.go +++ b/federation.go @@ -39,7 +39,6 @@ func (c *Client) ListFederationUpstreams() (ups []FederationUpstream, err error) return nil, err } - ups = []FederationUpstream{} for _, param := range params { up := paramToUpstream(¶m) ups = append(ups, *up) @@ -58,7 +57,6 @@ func (c *Client) ListFederationUpstreamsIn(vhost string) (ups []FederationUpstre return nil, err } - ups = []FederationUpstream{} for _, param := range params { up := paramToUpstream(¶m) ups = append(ups, *up) From bf145efaaf8d075f1d57ddb89bd586c07166c1d9 Mon Sep 17 00:00:00 2001 From: Raymond Fallon Date: Tue, 5 May 2020 20:16:24 -0400 Subject: [PATCH 17/19] Use const for "federation-upstream" component. --- federation.go | 12 +++++++----- rabbithole_test.go | 12 ++++++------ 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/federation.go b/federation.go index 05d2a53..874203f 100644 --- a/federation.go +++ b/federation.go @@ -28,13 +28,15 @@ type FederationUpstream struct { Definition FederationDefinition `json:"value"` } +const FederationUpstreamComponent string = "federation-upstream" + // // GET /api/parameters/federation-upstream // // ListFederationUpstreams returns all federation upstreams func (c *Client) ListFederationUpstreams() (ups []FederationUpstream, err error) { - params, err := c.ListRuntimeParametersFor("federation-upstream") + params, err := c.ListRuntimeParametersFor(FederationUpstreamComponent) if err != nil { return nil, err } @@ -52,7 +54,7 @@ func (c *Client) ListFederationUpstreams() (ups []FederationUpstream, err error) // ListFederationUpstreamsIn returns all federation upstreams in a vhost func (c *Client) ListFederationUpstreamsIn(vhost string) (ups []FederationUpstream, err error) { - params, err := c.ListRuntimeParametersIn("federation-upstream", vhost) + params, err := c.ListRuntimeParametersIn(FederationUpstreamComponent, vhost) if err != nil { return nil, err } @@ -70,7 +72,7 @@ func (c *Client) ListFederationUpstreamsIn(vhost string) (ups []FederationUpstre // GetFederationUpstream returns a federation upstream func (c *Client) GetFederationUpstream(vhost, name string) (up *FederationUpstream, err error) { - param, err := c.GetRuntimeParameter("federation-upstream", vhost, name) + param, err := c.GetRuntimeParameter(FederationUpstreamComponent, vhost, name) if err != nil { return nil, err } @@ -83,7 +85,7 @@ func (c *Client) GetFederationUpstream(vhost, name string) (up *FederationUpstre // Updates a federation upstream func (c *Client) PutFederationUpstream(vhost string, name string, def FederationDefinition) (res *http.Response, err error) { - return c.PutRuntimeParameter("federation-upstream", vhost, name, def) + return c.PutRuntimeParameter(FederationUpstreamComponent, vhost, name, def) } // @@ -92,7 +94,7 @@ func (c *Client) PutFederationUpstream(vhost string, name string, def Federation // Deletes a federation upstream. func (c *Client) DeleteFederationUpstream(vhost, name string) (res *http.Response, err error) { - return c.DeleteRuntimeParameter("federation-upstream", vhost, name) + return c.DeleteRuntimeParameter(FederationUpstreamComponent, vhost, name) } // paramToUpstream maps from a RuntimeParameter structure to a FederationUpstream structure. diff --git a/rabbithole_test.go b/rabbithole_test.go index 8152d6b..eb06673 100644 --- a/rabbithole_test.go +++ b/rabbithole_test.go @@ -1971,7 +1971,7 @@ var _ = Describe("Rabbithole", func() { Ω(err).Should(BeNil()) Ω(up.Vhost).Should(Equal(vh)) Ω(up.Name).Should(Equal(name)) - Ω(up.Component).Should(Equal("federation-upstream")) + Ω(up.Component).Should(Equal(FederationUpstreamComponent)) Ω(up.Definition.Uri).Should(Equal(def.Uri)) Ω(up.Definition.PrefetchCount).Should(Equal(def.PrefetchCount)) Ω(up.Definition.ReconnectDelay).Should(Equal(def.ReconnectDelay)) @@ -2012,7 +2012,7 @@ var _ = Describe("Rabbithole", func() { Ω(err).Should(BeNil()) Ω(up.Vhost).Should(Equal(vh)) Ω(up.Name).Should(Equal(name)) - Ω(up.Component).Should(Equal("federation-upstream")) + Ω(up.Component).Should(Equal(FederationUpstreamComponent)) Ω(up.Definition.Uri).Should(Equal(def.Uri)) Ω(up.Definition.Expires).Should(Equal(def.Expires)) Ω(up.Definition.MessageTTL).Should(Equal(def.MessageTTL)) @@ -2052,7 +2052,7 @@ var _ = Describe("Rabbithole", func() { Ω(err).Should(BeNil()) Ω(up.Vhost).Should(Equal(vh)) Ω(up.Name).Should(Equal(name)) - Ω(up.Component).Should(Equal("federation-upstream")) + Ω(up.Component).Should(Equal(FederationUpstreamComponent)) Ω(up.Definition.Uri).Should(Equal(def.Uri)) Ω(up.Definition.PrefetchCount).Should(Equal(def.PrefetchCount)) Ω(up.Definition.ReconnectDelay).Should(Equal(def.ReconnectDelay)) @@ -2077,7 +2077,7 @@ var _ = Describe("Rabbithole", func() { Ω(err).Should(BeNil()) Ω(up.Vhost).Should(Equal(vh)) Ω(up.Name).Should(Equal(name)) - Ω(up.Component).Should(Equal("federation-upstream")) + Ω(up.Component).Should(Equal(FederationUpstreamComponent)) Ω(up.Definition.Uri).Should(Equal(def2.Uri)) Ω(up.Definition.PrefetchCount).Should(Equal(def2.PrefetchCount)) Ω(up.Definition.ReconnectDelay).Should(Equal(def2.ReconnectDelay)) @@ -2179,7 +2179,7 @@ var _ = Describe("Rabbithole", func() { Context("PUT /api/parameters/{component}/{vhost}/{name}", func() { Context("when the parameter does not exist", func() { It("creates the parameter", func() { - component := "federation-upstream" + component := FederationUpstreamComponent vhost := "rabbit/hole" name := "temporary" @@ -2199,7 +2199,7 @@ var _ = Describe("Rabbithole", func() { p, err := rmqc.GetRuntimeParameter(component, vhost, name) Ω(err).Should(BeNil()) - Ω(p.Component).Should(Equal("federation-upstream")) + Ω(p.Component).Should(Equal(FederationUpstreamComponent)) Ω(p.Vhost).Should(Equal(vhost)) Ω(p.Name).Should(Equal(name)) From 9b6ed3bbe16b2ccee37e4e842410ca93ca11421d Mon Sep 17 00:00:00 2001 From: Raymond Fallon Date: Tue, 5 May 2020 22:50:53 -0400 Subject: [PATCH 18/19] Update docs and comments. - use consistent variable names --- README.md | 47 +++++++++++++++++++++++++++++++++++-------- doc.go | 36 +++++++++++++++++++++++++++++---- federation.go | 24 +++++++++++----------- rabbithole_test.go | 1 - runtime_parameters.go | 16 +++++++-------- 5 files changed, 91 insertions(+), 33 deletions(-) diff --git a/README.md b/README.md index d1fbdc5..bbb6888 100644 --- a/README.md +++ b/README.md @@ -326,29 +326,60 @@ resp, err := rmqc.DeleteShovel("/", "a.shovel") ``` +### Operations on Runtime (vhost-scoped) Parameters + +```golang +// list all runtime parameters +params, err := rmqc.ListRuntimeParameters() +// => []RuntimeParameter, error + +// list all runtime parameters for a component +params, err := rmqc.ListRuntimeParametersFor("federation-upstream") +// => []RuntimeParameter, error + +// list runtime parameters in a vhost +params, err := rmqc.ListRuntimeParametersIn("federation-upstream", "/") +// => []RuntimeParameter, error + +// information about a runtime parameter +p, err := rmqc.GetRuntimeParameter("federation-upstream", "/", "name") +// => *RuntimeParameter, error + +// declare or update a runtime parameter +resp, err := rmqc.PutRuntimeParameter("federation-upstream", "/", "name", FederationDefinition{ + Uri: "amqp://server-name", +}) +// => *http.Response, error + +// remove a runtime parameter +resp, err := rmqc.DeleteRuntimeParameter("federation-upstream", "/", "name") +// => *http.Response, error + +``` + ### Operations on Federation Upstreams -```go -// list all upstreams +```golang +// list all federation upstreams ups, err := rmqc.ListFederationUpstreams() // => []FederationUpstream, error -// list upstreams in a vhost +// list federation upstreams in a vhost ups, err := rmqc.ListFederationUpstreamsIn("/") // => []FederationUpstream, error -// information about an upstream -up, err := rmqc.GetFederationUpstream("/", "upstream-name") +// information about a federated upstream +up, err := rmqc.GetFederationUpstream("/", "name") // => *FederationUpstream, error -// declare an upstream -resp, err := rmqc.PutFederationUpstream("/", "upstream-name", FederationDefinition{ +// declare or update a federation upstream +resp, err := rmqc.PutFederationUpstream("/", "name", FederationDefinition{ Uri: "amqp://server-name", }) // => *http.Response, error // delete an upstream -resp, err := rmqc.DeleteFederationUpstream("/", "upstream-name") +resp, err := rmqc.DeleteFederationUpstream("/", "name") // => *http.Response, error ``` diff --git a/doc.go b/doc.go index 01d437a..b6738b9 100644 --- a/doc.go +++ b/doc.go @@ -199,21 +199,49 @@ Managing Topic Permissions resp, err := rmqc.DeleteTopicPermissionsIn("/", "my.user", "exchange") // => *http.Response, err +Managing Runtime Parameters + + // list all runtime parameters + params, err := rmqc.ListRuntimeParameters() + // => []RuntimeParameter, error + + // list all runtime parameters for a component + params, err := rmqc.ListRuntimeParametersFor("federation-upstream") + // => []RuntimeParameter, error + + // list runtime parameters in a vhost + params, err := rmqc.ListRuntimeParametersIn("federation-upstream", "/") + // => []RuntimeParameter, error + + // information about a runtime parameter + p, err := rmqc.GetRuntimeParameter("federation-upstream", "/", "name") + // => *RuntimeParameter, error + + // declare or update a runtime parameter + resp, err := rmqc.PutRuntimeParameter("federation-upstream", "/", "name", FederationDefinition{ + Uri: "amqp://server-name", + }) + // => *http.Response, error + + // remove a runtime parameter + resp, err := rmqc.DeleteRuntimeParameter("federation-upstream", "/", "name") + // => *http.Response, error + Managing Federation Upstreams - // list all upstreams + // list all federation upstreams ups, err := rmqc.ListFederationUpstreams() // => []FederationUpstream, error - // list upstreams in a vhost + // list federation upstreams in a vhost ups, err := rmqc.ListFederationUpstreamsIn("/") // => []FederationUpstream, error - // information about an upstream + // information about a federated upstream up, err := rmqc.GetFederationUpstream("/", "upstream-name") // => *FederationUpstream, error - // declare an upstream + // declare or update a federation upstream resp, err := rmqc.PutFederationUpstream("/", "upstream-name", FederationDefinition{ Uri: "amqp://server-name", }) diff --git a/federation.go b/federation.go index 874203f..761d060 100644 --- a/federation.go +++ b/federation.go @@ -34,15 +34,15 @@ const FederationUpstreamComponent string = "federation-upstream" // GET /api/parameters/federation-upstream // -// ListFederationUpstreams returns all federation upstreams +// ListFederationUpstreams returns a list of all federation upstreams. func (c *Client) ListFederationUpstreams() (ups []FederationUpstream, err error) { params, err := c.ListRuntimeParametersFor(FederationUpstreamComponent) if err != nil { return nil, err } - for _, param := range params { - up := paramToUpstream(¶m) + for _, p := range params { + up := paramToUpstream(&p) ups = append(ups, *up) } return ups, nil @@ -52,15 +52,15 @@ func (c *Client) ListFederationUpstreams() (ups []FederationUpstream, err error) // GET /api/parameters/federation-upstream/{vhost} // -// ListFederationUpstreamsIn returns all federation upstreams in a vhost +// ListFederationUpstreamsIn returns a list of all federation upstreams in a vhost. func (c *Client) ListFederationUpstreamsIn(vhost string) (ups []FederationUpstream, err error) { params, err := c.ListRuntimeParametersIn(FederationUpstreamComponent, vhost) if err != nil { return nil, err } - for _, param := range params { - up := paramToUpstream(¶m) + for _, p := range params { + up := paramToUpstream(&p) ups = append(ups, *up) } return ups, nil @@ -70,21 +70,21 @@ func (c *Client) ListFederationUpstreamsIn(vhost string) (ups []FederationUpstre // GET /api/parameters/federation-upstream/{vhost}/{upstream} // -// GetFederationUpstream returns a federation upstream +// GetFederationUpstream returns information about a federation upstream. func (c *Client) GetFederationUpstream(vhost, name string) (up *FederationUpstream, err error) { - param, err := c.GetRuntimeParameter(FederationUpstreamComponent, vhost, name) + p, err := c.GetRuntimeParameter(FederationUpstreamComponent, vhost, name) if err != nil { return nil, err } - return paramToUpstream(param), nil + return paramToUpstream(p), nil } // // PUT /api/parameters/federation-upstream/{vhost}/{upstream} // -// Updates a federation upstream -func (c *Client) PutFederationUpstream(vhost string, name string, def FederationDefinition) (res *http.Response, err error) { +// PutFederationUpstream creates or updates a federation upstream configuration. +func (c *Client) PutFederationUpstream(vhost, name string, def FederationDefinition) (res *http.Response, err error) { return c.PutRuntimeParameter(FederationUpstreamComponent, vhost, name, def) } @@ -92,7 +92,7 @@ func (c *Client) PutFederationUpstream(vhost string, name string, def Federation // DELETE /api/parameters/federation-upstream/{vhost}/{name} // -// Deletes a federation upstream. +// DeleteFederationUpstream removes a federation upstream. func (c *Client) DeleteFederationUpstream(vhost, name string) (res *http.Response, err error) { return c.DeleteRuntimeParameter(FederationUpstreamComponent, vhost, name) } diff --git a/rabbithole_test.go b/rabbithole_test.go index eb06673..2e08b48 100644 --- a/rabbithole_test.go +++ b/rabbithole_test.go @@ -2206,7 +2206,6 @@ var _ = Describe("Rabbithole", func() { // we need to convert from interface{} v := p.Value.(map[string]interface{}) - // could use reflect or a better assertion here. Ω(v["uri"]).Should(Equal(pv["uri"])) Ω(int(v["prefetch-count"].(float64))).Should(Equal(pv["prefetch-count"])) diff --git a/runtime_parameters.go b/runtime_parameters.go index 47fc38c..21f851c 100644 --- a/runtime_parameters.go +++ b/runtime_parameters.go @@ -8,7 +8,7 @@ import ( // RuntimeParameter represents a vhost-scoped parameter. // Value is interface{} to support creating parameters directly from types such as -// FederationInfo and ShovelInfo. +// FederationUpstream and ShovelInfo. type RuntimeParameter struct { Name string `json:"name"` Vhost string `json:"vhost"` @@ -23,7 +23,7 @@ type RuntimeParameterValue map[string]interface{} // GET /api/parameters // -// ListRuntimeParameters returns all runtime parameters. +// ListRuntimeParameters returns a list of all runtime parameters. func (c *Client) ListRuntimeParameters() (params []RuntimeParameter, err error) { req, err := newGETRequest(c, "parameters") if err != nil { @@ -41,7 +41,7 @@ func (c *Client) ListRuntimeParameters() (params []RuntimeParameter, err error) // GET /api/parameters/{component} // -// ListRuntimeParametersFor returns all runtime parameters for a component in all vhosts. +// ListRuntimeParametersFor returns a list of all runtime parameters for a component in all vhosts. func (c *Client) ListRuntimeParametersFor(component string) (params []RuntimeParameter, err error) { req, err := newGETRequest(c, "parameters/"+url.PathEscape(component)) if err != nil { @@ -59,7 +59,7 @@ func (c *Client) ListRuntimeParametersFor(component string) (params []RuntimePar // GET /api/parameters/{component}/{vhost} // -// ListRuntimeParametersIn returns all runtime parameters for a component in a vhost. +// ListRuntimeParametersIn returns a list of all runtime parameters for a component in a vhost. func (c *Client) ListRuntimeParametersIn(component, vhost string) (p []RuntimeParameter, err error) { req, err := newGETRequest(c, "parameters/"+url.PathEscape(component)+"/"+url.PathEscape(vhost)) if err != nil { @@ -77,7 +77,7 @@ func (c *Client) ListRuntimeParametersIn(component, vhost string) (p []RuntimePa // GET /api/parameters/{component}/{vhost}/{name} // -// GetRuntimeParameter returns a runtime parameter. +// GetRuntimeParameter returns information about a runtime parameter. func (c *Client) GetRuntimeParameter(component, vhost, name string) (p *RuntimeParameter, err error) { req, err := newGETRequest(c, "parameters/"+url.PathEscape(component)+"/"+url.PathEscape(vhost)+"/"+url.PathEscape(name)) if err != nil { @@ -95,16 +95,16 @@ func (c *Client) GetRuntimeParameter(component, vhost, name string) (p *RuntimeP // PUT /api/parameters/{component}/{vhost}/{name} // -// PutRuntimeParameter creates a runtime parameter. +// PutRuntimeParameter creates or updates a runtime parameter. func (c *Client) PutRuntimeParameter(component, vhost, name string, value interface{}) (res *http.Response, err error) { - param := RuntimeParameter{ + p := RuntimeParameter{ name, vhost, component, value, } - body, err := json.Marshal(param) + body, err := json.Marshal(p) if err != nil { return nil, err } From 0259cac3bde4aad15662606b676c37e2aaa8a16e Mon Sep 17 00:00:00 2001 From: Raymond Fallon Date: Tue, 5 May 2020 23:14:40 -0400 Subject: [PATCH 19/19] Tests for ListRuntimeParameters API. --- rabbithole_test.go | 51 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/rabbithole_test.go b/rabbithole_test.go index 2e08b48..506dee8 100644 --- a/rabbithole_test.go +++ b/rabbithole_test.go @@ -2219,4 +2219,55 @@ var _ = Describe("Rabbithole", func() { }) }) }) + + Context("GET /api/parameters", func() { + Context("when there are no runtime parameters", func() { + It("returns an empty response", func() { + list, err := rmqc.ListRuntimeParameters() + Ω(err).Should(BeNil()) + Ω(list).Should(BeEmpty()) + }) + }) + + Context("when there are runtime parameters", func() { + It("returns the list of parameters", func() { + fDef := FederationDefinition{ + Uri: "amqp://server-name/%2f", + } + _, err := rmqc.PutFederationUpstream("rabbit/hole", "upstream1", fDef) + Ω(err).Should(BeNil()) + + sDef := ShovelDefinition{ + SourceURI: "amqp://127.0.0.1/%2f", + SourceQueue: "mySourceQueue", + DestinationURI: "amqp://127.0.0.1/%2f", + DestinationQueue: "myDestQueue", + AddForwardHeaders: true, + AckMode: "on-confirm", + DeleteAfter: "never", + } + + _, err = rmqc.DeclareShovel("/", "shovel1", sDef) + Ω(err).Should(BeNil()) + + awaitEventPropagation() + + list, err := rmqc.ListRuntimeParameters() + Ω(err).Should(BeNil()) + Ω(len(list)).Should(Equal(2)) + + _, err = rmqc.DeleteFederationUpstream("rabbit/hole", "upstream1") + Ω(err).Should(BeNil()) + + _, err = rmqc.DeleteShovel("/", "shovel1") + Ω(err).Should(BeNil()) + + awaitEventPropagation() + + list, err = rmqc.ListRuntimeParameters() + Ω(err).Should(BeNil()) + Ω(len(list)).Should(Equal(0)) + }) + }) + }) })