diff --git a/README.md b/README.md index b4be96d..bbb6888 100644 --- a/README.md +++ b/README.md @@ -326,6 +326,64 @@ resp, err := rmqc.DeleteShovel("/", "a.shovel") ``` +### Operations on Runtime (vhost-scoped) Parameters + +```golang +// list all runtime parameters +params, err := rmqc.ListRuntimeParameters() +// => []RuntimeParameter, error + +// list all runtime parameters for a component +params, err := rmqc.ListRuntimeParametersFor("federation-upstream") +// => []RuntimeParameter, error + +// list runtime parameters in a vhost +params, err := rmqc.ListRuntimeParametersIn("federation-upstream", "/") +// => []RuntimeParameter, error + +// information about a runtime parameter +p, err := rmqc.GetRuntimeParameter("federation-upstream", "/", "name") +// => *RuntimeParameter, error + +// declare or update a runtime parameter +resp, err := rmqc.PutRuntimeParameter("federation-upstream", "/", "name", FederationDefinition{ + Uri: "amqp://server-name", +}) +// => *http.Response, error + +// remove a runtime parameter +resp, err := rmqc.DeleteRuntimeParameter("federation-upstream", "/", "name") +// => *http.Response, error + +``` + +### Operations on Federation Upstreams + +```golang +// list all federation upstreams +ups, err := rmqc.ListFederationUpstreams() +// => []FederationUpstream, error + +// list federation upstreams in a vhost +ups, err := rmqc.ListFederationUpstreamsIn("/") +// => []FederationUpstream, error + +// information about a federated upstream +up, err := rmqc.GetFederationUpstream("/", "name") +// => *FederationUpstream, error + +// declare or update a federation upstream +resp, err := rmqc.PutFederationUpstream("/", "name", FederationDefinition{ + Uri: "amqp://server-name", +}) +// => *http.Response, error + +// delete an upstream +resp, err := rmqc.DeleteFederationUpstream("/", "name") +// => *http.Response, error + +``` + ### Operations on cluster name ``` go // Get cluster name diff --git a/bin/ci/before_build.bat b/bin/ci/before_build.bat index d9c4468..b9e9b95 100644 --- a/bin/ci/before_build.bat +++ b/bin/ci/before_build.bat @@ -25,3 +25,7 @@ call %RABBITHOLE_RABBITMQCTL% set_permissions -p "rabbit/hole" guest ".*" ".*" " REM Enable shovel plugin call %RABBITHOLE_RABBITMQ_PLUGINS% enable rabbitmq_shovel call %RABBITHOLE_RABBITMQ_PLUGINS% enable rabbitmq_shovel_management + +REM Enable shovel plugin +call %RABBITHOLE_RABBITMQ_PLUGINS% enable rabbitmq_federation +call %RABBITHOLE_RABBITMQ_PLUGINS% enable rabbitmq_federation_management diff --git a/bin/ci/before_build.sh b/bin/ci/before_build.sh index bd940c5..11e2e99 100755 --- a/bin/ci/before_build.sh +++ b/bin/ci/before_build.sh @@ -29,3 +29,7 @@ $CTL set_cluster_name rabbitmq@localhost # Enable shovel plugin $PLUGINS enable rabbitmq_shovel $PLUGINS enable rabbitmq_shovel_management + +# Enable federation plugin +$PLUGINS enable rabbitmq_federation +$PLUGINS enable rabbitmq_federation_management diff --git a/doc.go b/doc.go index ce279e3..b6738b9 100644 --- a/doc.go +++ b/doc.go @@ -199,6 +199,58 @@ Managing Topic Permissions resp, err := rmqc.DeleteTopicPermissionsIn("/", "my.user", "exchange") // => *http.Response, err +Managing Runtime Parameters + + // list all runtime parameters + params, err := rmqc.ListRuntimeParameters() + // => []RuntimeParameter, error + + // list all runtime parameters for a component + params, err := rmqc.ListRuntimeParametersFor("federation-upstream") + // => []RuntimeParameter, error + + // list runtime parameters in a vhost + params, err := rmqc.ListRuntimeParametersIn("federation-upstream", "/") + // => []RuntimeParameter, error + + // information about a runtime parameter + p, err := rmqc.GetRuntimeParameter("federation-upstream", "/", "name") + // => *RuntimeParameter, error + + // declare or update a runtime parameter + resp, err := rmqc.PutRuntimeParameter("federation-upstream", "/", "name", FederationDefinition{ + Uri: "amqp://server-name", + }) + // => *http.Response, error + + // remove a runtime parameter + resp, err := rmqc.DeleteRuntimeParameter("federation-upstream", "/", "name") + // => *http.Response, error + +Managing Federation Upstreams + + // list all federation upstreams + ups, err := rmqc.ListFederationUpstreams() + // => []FederationUpstream, error + + // list federation upstreams in a vhost + ups, err := rmqc.ListFederationUpstreamsIn("/") + // => []FederationUpstream, error + + // information about a federated upstream + up, err := rmqc.GetFederationUpstream("/", "upstream-name") + // => *FederationUpstream, error + + // declare or update a federation upstream + resp, err := rmqc.PutFederationUpstream("/", "upstream-name", FederationDefinition{ + Uri: "amqp://server-name", + }) + // => *http.Response, error + + // delete an upstream + resp, err := rmqc.DeleteFederationUpstream("/", "upstream-name") + // => *http.Response, error + Operations on cluster name // Get cluster name cn, err := rmqc.GetClusterName() diff --git a/federation.go b/federation.go index 887cf3b..761d060 100644 --- a/federation.go +++ b/federation.go @@ -1,9 +1,7 @@ package rabbithole import ( - "encoding/json" "net/http" - "net/url" ) // Federation definition: additional arguments @@ -24,49 +22,132 @@ type FederationDefinition struct { // Represents a configured Federation upstream. type FederationUpstream struct { + Name string `json:"name"` + Vhost string `json:"vhost"` + Component string `json:"component"` Definition FederationDefinition `json:"value"` } +const FederationUpstreamComponent string = "federation-upstream" + // -// PUT /api/parameters/federation-upstream/{vhost}/{upstream} +// GET /api/parameters/federation-upstream // -// Updates a federation upstream -func (c *Client) PutFederationUpstream(vhost string, upstreamName string, fDef FederationDefinition) (res *http.Response, err error) { - fedUp := FederationUpstream{ - Definition: fDef, - } - body, err := json.Marshal(fedUp) +// ListFederationUpstreams returns a list of all federation upstreams. +func (c *Client) ListFederationUpstreams() (ups []FederationUpstream, err error) { + params, err := c.ListRuntimeParametersFor(FederationUpstreamComponent) if err != nil { return nil, err } - req, err := newRequestWithBody(c, "PUT", "parameters/federation-upstream/"+url.PathEscape(vhost)+"/"+url.PathEscape(upstreamName), body) + for _, p := range params { + up := paramToUpstream(&p) + ups = append(ups, *up) + } + return ups, nil +} + +// +// GET /api/parameters/federation-upstream/{vhost} +// + +// ListFederationUpstreamsIn returns a list of all federation upstreams in a vhost. +func (c *Client) ListFederationUpstreamsIn(vhost string) (ups []FederationUpstream, err error) { + params, err := c.ListRuntimeParametersIn(FederationUpstreamComponent, vhost) if err != nil { return nil, err } - if res, err = executeRequest(c, req); err != nil { + for _, p := range params { + up := paramToUpstream(&p) + ups = append(ups, *up) + } + return ups, nil +} + +// +// GET /api/parameters/federation-upstream/{vhost}/{upstream} +// + +// GetFederationUpstream returns information about a federation upstream. +func (c *Client) GetFederationUpstream(vhost, name string) (up *FederationUpstream, err error) { + p, err := c.GetRuntimeParameter(FederationUpstreamComponent, vhost, name) + if err != nil { return nil, err } + return paramToUpstream(p), nil +} - return res, nil +// +// PUT /api/parameters/federation-upstream/{vhost}/{upstream} +// + +// PutFederationUpstream creates or updates a federation upstream configuration. +func (c *Client) PutFederationUpstream(vhost, name string, def FederationDefinition) (res *http.Response, err error) { + return c.PutRuntimeParameter(FederationUpstreamComponent, vhost, name, def) } // // DELETE /api/parameters/federation-upstream/{vhost}/{name} // -// Deletes a federation upstream. -func (c *Client) DeleteFederationUpstream(vhost, upstreamName string) (res *http.Response, err error) { - req, err := newRequestWithBody(c, "DELETE", "parameters/federation-upstream/"+url.PathEscape(vhost)+"/"+url.PathEscape(upstreamName), nil) - if err != nil { - return nil, err +// DeleteFederationUpstream removes a federation upstream. +func (c *Client) DeleteFederationUpstream(vhost, name string) (res *http.Response, err error) { + return c.DeleteRuntimeParameter(FederationUpstreamComponent, vhost, name) +} + +// paramToUpstream maps from a RuntimeParameter structure to a FederationUpstream structure. +func paramToUpstream(p *RuntimeParameter) (up *FederationUpstream) { + up = &FederationUpstream{ + Name: p.Name, + Vhost: p.Vhost, + Component: p.Component, } - if res, err = executeRequest(c, req); err != nil { - return nil, err + def := FederationDefinition{} + m := p.Value.(map[string]interface{}) + + if v, ok := m["uri"].(string); ok { + def.Uri = v + } + + if v, ok := m["expires"].(float64); ok { + def.Expires = int(v) + } + + if v, ok := m["message-ttl"].(float64); ok { + def.MessageTTL = int32(v) + } + + if v, ok := m["max-hops"].(float64); ok { + def.MaxHops = int(v) + } + + if v, ok := m["prefetch-count"].(float64); ok { + def.PrefetchCount = int(v) + } + + if v, ok := m["reconnect-delay"].(float64); ok { + def.ReconnectDelay = int(v) + } + + if v, ok := m["ack-mode"].(string); ok { + def.AckMode = v + } + + if v, ok := m["trust-user-id"].(bool); ok { + def.TrustUserId = v + } + + if v, ok := m["exchange"].(string); ok { + def.Exchange = v + } + + if v, ok := m["queue"].(string); ok { + def.Queue = v } - return res, nil + up.Definition = def + return up } diff --git a/rabbithole_test.go b/rabbithole_test.go index a918db0..506dee8 100644 --- a/rabbithole_test.go +++ b/rabbithole_test.go @@ -90,47 +90,6 @@ var _ = Describe("Rabbithole", func() { rmqc, _ = NewClient("http://127.0.0.1:15672", "guest", "guest") }) - Context("PUT /parameters/shovel/{vhost}/{name}", func() { - It("declares a shovel", func() { - vh := "rabbit/hole" - sn := "temporary" - - ssu := "amqp://127.0.0.1/%2f" - sdu := "amqp://127.0.0.1/%2f" - - shovelDefinition := ShovelDefinition{ - SourceURI: ssu, - SourceQueue: "mySourceQueue", - DestinationURI: sdu, - DestinationQueue: "myDestQueue", - AddForwardHeaders: true, - AckMode: "on-confirm", - DeleteAfter: "never"} - - _, err := rmqc.DeclareShovel(vh, sn, shovelDefinition) - Ω(err).Should(BeNil()) - - awaitEventPropagation() - x, err := rmqc.GetShovel(vh, sn) - Ω(err).Should(BeNil()) - Ω(x.Name).Should(Equal(sn)) - Ω(x.Vhost).Should(Equal(vh)) - Ω(x.Component).Should(Equal("shovel")) - Ω(x.Definition.SourceURI).Should(Equal(ssu)) - Ω(x.Definition.SourceQueue).Should(Equal("mySourceQueue")) - Ω(x.Definition.DestinationURI).Should(Equal(sdu)) - Ω(x.Definition.DestinationQueue).Should(Equal("myDestQueue")) - Ω(x.Definition.AddForwardHeaders).Should(Equal(true)) - Ω(x.Definition.AckMode).Should(Equal("on-confirm")) - Ω(x.Definition.DeleteAfter).Should(Equal("never")) - - rmqc.DeleteShovel(vh, sn) - awaitEventPropagation() - x, _ = rmqc.GetShovel(vh, sn) - Ω(x).Should(BeNil()) - }) - }) - Context("GET /overview", func() { It("returns decoded response", func() { conn := openConnection("/") @@ -1877,4 +1836,438 @@ var _ = Describe("Rabbithole", func() { Ω(err).Should(BeNil()) }) }) + + Context("GET /api/parameters/federation-upstream", func() { + Context("when there are no upstreams", func() { + It("returns an empty response", func() { + list, err := rmqc.ListFederationUpstreams() + Ω(err).Should(BeNil()) + Ω(list).Should(BeEmpty()) + }) + }) + + Context("when there are upstreams", func() { + It("returns the list of upstreams", func() { + def1 := FederationDefinition{ + Uri: "amqp://server-name/%2f", + } + _, err := rmqc.PutFederationUpstream("rabbit/hole", "upstream1", def1) + Ω(err).Should(BeNil()) + + def2 := FederationDefinition{ + Uri: "amqp://example.com/%2f", + } + _, err = rmqc.PutFederationUpstream("/", "upstream2", def2) + Ω(err).Should(BeNil()) + + awaitEventPropagation() + + list, err := rmqc.ListFederationUpstreams() + Ω(err).Should(BeNil()) + Ω(len(list)).Should(Equal(2)) + + _, err = rmqc.DeleteFederationUpstream("rabbit/hole", "upstream1") + Ω(err).Should(BeNil()) + + _, err = rmqc.DeleteFederationUpstream("/", "upstream2") + Ω(err).Should(BeNil()) + + awaitEventPropagation() + + list, err = rmqc.ListFederationUpstreams() + Ω(err).Should(BeNil()) + Ω(len(list)).Should(Equal(0)) + }) + }) + }) + + Context("GET /api/parameters/federation-upstream/{vhost}", func() { + Context("when there are no upstreams", func() { + It("returns an empty response", func() { + list, err := rmqc.ListFederationUpstreamsIn("rabbit/hole") + Ω(err).Should(BeNil()) + Ω(list).Should(BeEmpty()) + }) + }) + + Context("when there are upstreams", func() { + It("returns the list of upstreams", func() { + vh := "rabbit/hole" + + def1 := FederationDefinition{ + Uri: "amqp://server-name/%2f", + } + + _, err := rmqc.PutFederationUpstream(vh, "upstream1", def1) + Ω(err).Should(BeNil()) + + def2 := FederationDefinition{ + Uri: "amqp://example.com/%2f", + } + + _, err = rmqc.PutFederationUpstream(vh, "upstream2", def2) + Ω(err).Should(BeNil()) + + awaitEventPropagation() + + list, err := rmqc.ListFederationUpstreamsIn(vh) + Ω(err).Should(BeNil()) + Ω(len(list)).Should(Equal(2)) + + // delete upstream1 + _, err = rmqc.DeleteFederationUpstream(vh, "upstream1") + Ω(err).Should(BeNil()) + + awaitEventPropagation() + + list, err = rmqc.ListFederationUpstreamsIn(vh) + Ω(err).Should(BeNil()) + Ω(len(list)).Should(Equal(1)) + + // delete upstream2 + _, err = rmqc.DeleteFederationUpstream(vh, "upstream2") + Ω(err).Should(BeNil()) + + awaitEventPropagation() + + list, err = rmqc.ListFederationUpstreamsIn(vh) + Ω(err).Should(BeNil()) + Ω(len(list)).Should(Equal(0)) + }) + }) + }) + + Context("GET /api/parameters/federation-upstream/{vhost}/{upstream}", func() { + Context("when the upstream does not exist", func() { + It("returns a 404 error", func() { + vh := "rabbit/hole" + name := "temporary" + + up, err := rmqc.GetFederationUpstream(vh, name) + Ω(err).Should(Equal(ErrorResponse{404, "Object Not Found", "Not Found"})) + Ω(up).Should(BeNil()) + }) + }) + + Context("when the upstream exists", func() { + It("returns the upstream", func() { + vh := "rabbit/hole" + name := "temporary" + + def := FederationDefinition{ + Uri: "amqp://127.0.0.1/%2f", + PrefetchCount: 1000, + ReconnectDelay: 1, + AckMode: "on-confirm", + TrustUserId: false, + } + + _, err := rmqc.PutFederationUpstream(vh, name, def) + Ω(err).Should(BeNil()) + + awaitEventPropagation() + + up, err := rmqc.GetFederationUpstream(vh, name) + Ω(err).Should(BeNil()) + Ω(up.Vhost).Should(Equal(vh)) + Ω(up.Name).Should(Equal(name)) + Ω(up.Component).Should(Equal(FederationUpstreamComponent)) + Ω(up.Definition.Uri).Should(Equal(def.Uri)) + Ω(up.Definition.PrefetchCount).Should(Equal(def.PrefetchCount)) + Ω(up.Definition.ReconnectDelay).Should(Equal(def.ReconnectDelay)) + Ω(up.Definition.AckMode).Should(Equal(def.AckMode)) + Ω(up.Definition.TrustUserId).Should(Equal(def.TrustUserId)) + + _, err = rmqc.DeleteFederationUpstream(vh, name) + Ω(err).Should(BeNil()) + }) + }) + }) + + Context("PUT /api/parameters/federation-upstream/{vhost}/{upstream}", func() { + Context("when the upstream does not exist", func() { + It("creates the upstream", func() { + vh := "rabbit/hole" + name := "temporary" + + def := FederationDefinition{ + Uri: "amqp://127.0.0.1/%2f", + Expires: 1800000, + MessageTTL: 360000, + MaxHops: 1, + PrefetchCount: 500, + ReconnectDelay: 5, + AckMode: "on-publish", + TrustUserId: false, + Exchange: "", + Queue: "", + } + + _, err := rmqc.PutFederationUpstream(vh, name, def) + Ω(err).Should(BeNil()) + + awaitEventPropagation() + + up, err := rmqc.GetFederationUpstream(vh, name) + Ω(err).Should(BeNil()) + Ω(up.Vhost).Should(Equal(vh)) + Ω(up.Name).Should(Equal(name)) + Ω(up.Component).Should(Equal(FederationUpstreamComponent)) + Ω(up.Definition.Uri).Should(Equal(def.Uri)) + Ω(up.Definition.Expires).Should(Equal(def.Expires)) + Ω(up.Definition.MessageTTL).Should(Equal(def.MessageTTL)) + Ω(up.Definition.MaxHops).Should(Equal(def.MaxHops)) + Ω(up.Definition.PrefetchCount).Should(Equal(def.PrefetchCount)) + Ω(up.Definition.ReconnectDelay).Should(Equal(def.ReconnectDelay)) + Ω(up.Definition.AckMode).Should(Equal(def.AckMode)) + Ω(up.Definition.TrustUserId).Should(Equal(def.TrustUserId)) + Ω(up.Definition.Exchange).Should(Equal(def.Exchange)) + Ω(up.Definition.Queue).Should(Equal(def.Queue)) + + _, err = rmqc.DeleteFederationUpstream(vh, name) + Ω(err).Should(BeNil()) + }) + }) + + Context("when the upstream exists", func() { + It("updates the upstream", func() { + vh := "rabbit/hole" + name := "temporary" + + // create the upstream + def := FederationDefinition{ + Uri: "amqp://127.0.0.1/%2f", + PrefetchCount: 1000, + ReconnectDelay: 1, + AckMode: "on-confirm", + TrustUserId: false, + } + + _, err := rmqc.PutFederationUpstream(vh, name, def) + Ω(err).Should(BeNil()) + + awaitEventPropagation() + + up, err := rmqc.GetFederationUpstream(vh, name) + Ω(err).Should(BeNil()) + Ω(up.Vhost).Should(Equal(vh)) + Ω(up.Name).Should(Equal(name)) + Ω(up.Component).Should(Equal(FederationUpstreamComponent)) + Ω(up.Definition.Uri).Should(Equal(def.Uri)) + Ω(up.Definition.PrefetchCount).Should(Equal(def.PrefetchCount)) + Ω(up.Definition.ReconnectDelay).Should(Equal(def.ReconnectDelay)) + Ω(up.Definition.AckMode).Should(Equal(def.AckMode)) + Ω(up.Definition.TrustUserId).Should(Equal(def.TrustUserId)) + + // update the upstream + def2 := FederationDefinition{ + Uri: "amqp://127.0.0.1/%2f", + PrefetchCount: 500, + ReconnectDelay: 10, + AckMode: "no-ack", + TrustUserId: true, + } + + _, err = rmqc.PutFederationUpstream(vh, name, def2) + Ω(err).Should(BeNil()) + + awaitEventPropagation() + + up, err = rmqc.GetFederationUpstream(vh, name) + Ω(err).Should(BeNil()) + Ω(up.Vhost).Should(Equal(vh)) + Ω(up.Name).Should(Equal(name)) + Ω(up.Component).Should(Equal(FederationUpstreamComponent)) + Ω(up.Definition.Uri).Should(Equal(def2.Uri)) + Ω(up.Definition.PrefetchCount).Should(Equal(def2.PrefetchCount)) + Ω(up.Definition.ReconnectDelay).Should(Equal(def2.ReconnectDelay)) + Ω(up.Definition.AckMode).Should(Equal(def2.AckMode)) + Ω(up.Definition.TrustUserId).Should(Equal(def2.TrustUserId)) + + _, err = rmqc.DeleteFederationUpstream(vh, name) + Ω(err).Should(BeNil()) + }) + }) + + Context("when the upstream definition is bad", func() { + It("returns a 400 error response", func() { + // this is NOT an err, but a HTTP 400 response + resp, err := rmqc.PutFederationUpstream("rabbit/hole", "error", FederationDefinition{}) + Ω(err).Should(BeNil()) + Ω(resp.StatusCode).Should(Equal(400)) + Ω(resp.Status).Should(Equal("400 Bad Request")) + }) + }) + }) + + Context("DELETE /api/parameters/federation-upstream/{vhost}/{name}", func() { + Context("when the upstream does not exist", func() { + It("returns a 404 error response", func() { + vh := "rabbit/hole" + name := "temporary" + + // this is NOT an err, but a HTTP 404 response + resp, err := rmqc.DeleteFederationUpstream(vh, name) + Ω(err).Should(BeNil()) + Ω(resp.StatusCode).Should(Equal(404)) + Ω(resp.Status).Should(Equal("404 Not Found")) + }) + }) + + Context("when the upstream exists", func() { + It("deletes the upstream", func() { + vh := "rabbit/hole" + name := "temporary" + + def := FederationDefinition{ + Uri: "amqp://127.0.0.1/%2f", + } + + _, err := rmqc.PutFederationUpstream(vh, name, def) + Ω(err).Should(BeNil()) + + awaitEventPropagation() + + resp, err := rmqc.DeleteFederationUpstream(vh, name) + Ω(err).Should(BeNil()) + Ω(resp.Status).Should(HavePrefix("20")) + }) + }) + }) + + Context("PUT /parameters/shovel/{vhost}/{name}", func() { + It("declares a shovel", func() { + vh := "rabbit/hole" + sn := "temporary" + + ssu := "amqp://127.0.0.1/%2f" + sdu := "amqp://127.0.0.1/%2f" + + shovelDefinition := ShovelDefinition{ + SourceURI: ssu, + SourceQueue: "mySourceQueue", + DestinationURI: sdu, + DestinationQueue: "myDestQueue", + AddForwardHeaders: true, + AckMode: "on-confirm", + DeleteAfter: "never"} + + _, err := rmqc.DeclareShovel(vh, sn, shovelDefinition) + Ω(err).Should(BeNil()) + + awaitEventPropagation() + x, err := rmqc.GetShovel(vh, sn) + Ω(err).Should(BeNil()) + Ω(x.Name).Should(Equal(sn)) + Ω(x.Vhost).Should(Equal(vh)) + Ω(x.Component).Should(Equal("shovel")) + Ω(x.Definition.SourceURI).Should(Equal(ssu)) + Ω(x.Definition.SourceQueue).Should(Equal("mySourceQueue")) + Ω(x.Definition.DestinationURI).Should(Equal(sdu)) + Ω(x.Definition.DestinationQueue).Should(Equal("myDestQueue")) + Ω(x.Definition.AddForwardHeaders).Should(Equal(true)) + Ω(x.Definition.AckMode).Should(Equal("on-confirm")) + Ω(x.Definition.DeleteAfter).Should(Equal("never")) + + rmqc.DeleteShovel(vh, sn) + awaitEventPropagation() + x, _ = rmqc.GetShovel(vh, sn) + Ω(x).Should(BeNil()) + }) + }) + + Context("PUT /api/parameters/{component}/{vhost}/{name}", func() { + Context("when the parameter does not exist", func() { + It("creates the parameter", func() { + component := FederationUpstreamComponent + vhost := "rabbit/hole" + name := "temporary" + + pv := RuntimeParameterValue{ + "uri": "amqp://server-name", + "prefetch-count": 500, + "reconnect-delay": 5, + "ack-mode": "on-confirm", + "trust-user-id": false, + } + + _, err := rmqc.PutRuntimeParameter(component, vhost, name, pv) + Ω(err).Should(BeNil()) + + awaitEventPropagation() + + p, err := rmqc.GetRuntimeParameter(component, vhost, name) + + Ω(err).Should(BeNil()) + Ω(p.Component).Should(Equal(FederationUpstreamComponent)) + Ω(p.Vhost).Should(Equal(vhost)) + Ω(p.Name).Should(Equal(name)) + + // we need to convert from interface{} + v := p.Value.(map[string]interface{}) + + Ω(v["uri"]).Should(Equal(pv["uri"])) + + Ω(int(v["prefetch-count"].(float64))).Should(Equal(pv["prefetch-count"])) + Ω(int(v["reconnect-delay"].(float64))).Should(Equal(pv["reconnect-delay"])) + + Ω(v["ack-mode"]).Should(Equal(pv["ack-mode"])) + Ω(v["trust-user-id"]).Should(Equal(pv["trust-user-id"])) + + _, err = rmqc.DeleteRuntimeParameter(component, vhost, name) + Ω(err).Should(BeNil()) + }) + }) + }) + + Context("GET /api/parameters", func() { + Context("when there are no runtime parameters", func() { + It("returns an empty response", func() { + list, err := rmqc.ListRuntimeParameters() + Ω(err).Should(BeNil()) + Ω(list).Should(BeEmpty()) + }) + }) + + Context("when there are runtime parameters", func() { + It("returns the list of parameters", func() { + fDef := FederationDefinition{ + Uri: "amqp://server-name/%2f", + } + _, err := rmqc.PutFederationUpstream("rabbit/hole", "upstream1", fDef) + Ω(err).Should(BeNil()) + + sDef := ShovelDefinition{ + SourceURI: "amqp://127.0.0.1/%2f", + SourceQueue: "mySourceQueue", + DestinationURI: "amqp://127.0.0.1/%2f", + DestinationQueue: "myDestQueue", + AddForwardHeaders: true, + AckMode: "on-confirm", + DeleteAfter: "never", + } + + _, err = rmqc.DeclareShovel("/", "shovel1", sDef) + Ω(err).Should(BeNil()) + + awaitEventPropagation() + + list, err := rmqc.ListRuntimeParameters() + Ω(err).Should(BeNil()) + Ω(len(list)).Should(Equal(2)) + + _, err = rmqc.DeleteFederationUpstream("rabbit/hole", "upstream1") + Ω(err).Should(BeNil()) + + _, err = rmqc.DeleteShovel("/", "shovel1") + Ω(err).Should(BeNil()) + + awaitEventPropagation() + + list, err = rmqc.ListRuntimeParameters() + Ω(err).Should(BeNil()) + Ω(len(list)).Should(Equal(0)) + }) + }) + }) }) diff --git a/runtime_parameters.go b/runtime_parameters.go new file mode 100644 index 0000000..21f851c --- /dev/null +++ b/runtime_parameters.go @@ -0,0 +1,140 @@ +package rabbithole + +import ( + "encoding/json" + "net/http" + "net/url" +) + +// RuntimeParameter represents a vhost-scoped parameter. +// Value is interface{} to support creating parameters directly from types such as +// FederationUpstream and ShovelInfo. +type RuntimeParameter struct { + Name string `json:"name"` + Vhost string `json:"vhost"` + Component string `json:"component"` + Value interface{} `json:"value"` +} + +// RuntimeParameterValue represents arbitrary parameter data. +type RuntimeParameterValue map[string]interface{} + +// +// GET /api/parameters +// + +// ListRuntimeParameters returns a list of all runtime parameters. +func (c *Client) ListRuntimeParameters() (params []RuntimeParameter, err error) { + req, err := newGETRequest(c, "parameters") + if err != nil { + return []RuntimeParameter{}, err + } + + if err = executeAndParseRequest(c, req, ¶ms); err != nil { + return []RuntimeParameter{}, err + } + + return params, nil +} + +// +// GET /api/parameters/{component} +// + +// ListRuntimeParametersFor returns a list of all runtime parameters for a component in all vhosts. +func (c *Client) ListRuntimeParametersFor(component string) (params []RuntimeParameter, err error) { + req, err := newGETRequest(c, "parameters/"+url.PathEscape(component)) + if err != nil { + return []RuntimeParameter{}, err + } + + if err = executeAndParseRequest(c, req, ¶ms); err != nil { + return []RuntimeParameter{}, err + } + + return params, nil +} + +// +// GET /api/parameters/{component}/{vhost} +// + +// ListRuntimeParametersIn returns a list of all runtime parameters for a component in a vhost. +func (c *Client) ListRuntimeParametersIn(component, vhost string) (p []RuntimeParameter, err error) { + req, err := newGETRequest(c, "parameters/"+url.PathEscape(component)+"/"+url.PathEscape(vhost)) + if err != nil { + return []RuntimeParameter{}, err + } + + if err = executeAndParseRequest(c, req, &p); err != nil { + return []RuntimeParameter{}, err + } + + return p, nil +} + +// +// GET /api/parameters/{component}/{vhost}/{name} +// + +// GetRuntimeParameter returns information about a runtime parameter. +func (c *Client) GetRuntimeParameter(component, vhost, name string) (p *RuntimeParameter, err error) { + req, err := newGETRequest(c, "parameters/"+url.PathEscape(component)+"/"+url.PathEscape(vhost)+"/"+url.PathEscape(name)) + if err != nil { + return nil, err + } + + if err = executeAndParseRequest(c, req, &p); err != nil { + return nil, err + } + + return p, nil +} + +// +// PUT /api/parameters/{component}/{vhost}/{name} +// + +// PutRuntimeParameter creates or updates a runtime parameter. +func (c *Client) PutRuntimeParameter(component, vhost, name string, value interface{}) (res *http.Response, err error) { + p := RuntimeParameter{ + name, + vhost, + component, + value, + } + + body, err := json.Marshal(p) + if err != nil { + return nil, err + } + + req, err := newRequestWithBody(c, "PUT", "parameters/"+url.PathEscape(component)+"/"+url.PathEscape(vhost)+"/"+url.PathEscape(name), body) + if err != nil { + return nil, err + } + + if res, err = executeRequest(c, req); err != nil { + return nil, err + } + + return res, nil +} + +// +// DELETE /api/parameters/{component}/{vhost}/{name} +// + +// DeleteRuntimeParameter removes a runtime parameter. +func (c *Client) DeleteRuntimeParameter(component, vhost, name string) (res *http.Response, err error) { + req, err := newRequestWithBody(c, "DELETE", "parameters/"+url.PathEscape(component)+"/"+url.PathEscape(vhost)+"/"+url.PathEscape(name), nil) + if err != nil { + return nil, err + } + + if res, err = executeRequest(c, req); err != nil { + return nil, err + } + + return res, nil +}