Skip to content

Commit

Permalink
Merge pull request #193 from michaelklishin/federation-uri-list
Browse files Browse the repository at this point in the history
Refactor FederationDefinition.Uri to []string
  • Loading branch information
michaelklishin authored May 28, 2021
2 parents 6c3e263 + 2c22e58 commit f0e3fbc
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 113 deletions.
91 changes: 19 additions & 72 deletions federation.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ package rabbithole

import (
"net/http"
"net/url"
)

// FederationDefinition represents settings
// that will be used by federation links.
type FederationDefinition struct {
Uri string `json:"uri"`
Uri []string `json:"uri"`
Expires int `json:"expires,omitempty"`
MessageTTL int32 `json:"message-ttl"`
MaxHops int `json:"max-hops,omitempty"`
Expand Down Expand Up @@ -37,15 +38,15 @@ const FederationUpstreamComponent string = "federation-upstream"

// ListFederationUpstreams returns a list of all federation upstreams.
func (c *Client) ListFederationUpstreams() (ups []FederationUpstream, err error) {
params, err := c.ListRuntimeParametersFor(FederationUpstreamComponent)
req, err := newGETRequest(c, "parameters/" + FederationUpstreamComponent)
if err != nil {
return nil, err
return []FederationUpstream{}, err
}

for _, p := range params {
up := paramToUpstream(&p)
ups = append(ups, *up)
if err = executeAndParseRequest(c, req, &ups); err != nil {
return []FederationUpstream{}, err
}

return ups, nil
}

Expand All @@ -55,15 +56,15 @@ func (c *Client) ListFederationUpstreams() (ups []FederationUpstream, err error)

// ListFederationUpstreamsIn returns a list of all federation upstreams in a vhost.
func (c *Client) ListFederationUpstreamsIn(vhost string) (ups []FederationUpstream, err error) {
params, err := c.ListRuntimeParametersIn(FederationUpstreamComponent, vhost)
req, err := newGETRequest(c, "parameters/" + FederationUpstreamComponent + "/" + url.PathEscape(vhost))
if err != nil {
return nil, err
return []FederationUpstream{}, err
}

for _, p := range params {
up := paramToUpstream(&p)
ups = append(ups, *up)
if err = executeAndParseRequest(c, req, &ups); err != nil {
return []FederationUpstream{}, err
}

return ups, nil
}

Expand All @@ -73,11 +74,16 @@ func (c *Client) ListFederationUpstreamsIn(vhost string) (ups []FederationUpstre

// GetFederationUpstream returns information about a federation upstream.
func (c *Client) GetFederationUpstream(vhost, name string) (up *FederationUpstream, err error) {
p, err := c.GetRuntimeParameter(FederationUpstreamComponent, vhost, name)
req, err := newGETRequest(c, "parameters/" + FederationUpstreamComponent + "/" +url.PathEscape(vhost)+ "/" +url.PathEscape(name))
if err != nil {
return nil, err
}
return paramToUpstream(p), nil

if err = executeAndParseRequest(c, req, &up); err != nil {
return nil, err
}

return up, nil
}

//
Expand All @@ -97,62 +103,3 @@ func (c *Client) PutFederationUpstream(vhost, name string, def FederationDefinit
func (c *Client) DeleteFederationUpstream(vhost, name string) (res *http.Response, err error) {
return c.DeleteRuntimeParameter(FederationUpstreamComponent, vhost, name)
}

// paramToUpstream maps from a RuntimeParameter structure to a FederationUpstream structure.
func paramToUpstream(p *RuntimeParameter) (up *FederationUpstream) {
up = &FederationUpstream{
Name: p.Name,
Vhost: p.Vhost,
Component: p.Component,
}

m, ok := p.Value.(map[string]interface{})
if !ok {
return up
}

def := FederationDefinition{}

if v, ok := m["uri"].(string); ok {
def.Uri = v
}

if v, ok := m["expires"].(float64); ok {
def.Expires = int(v)
}

if v, ok := m["message-ttl"].(float64); ok {
def.MessageTTL = int32(v)
}

if v, ok := m["max-hops"].(float64); ok {
def.MaxHops = int(v)
}

if v, ok := m["prefetch-count"].(float64); ok {
def.PrefetchCount = int(v)
}

if v, ok := m["reconnect-delay"].(float64); ok {
def.ReconnectDelay = int(v)
}

if v, ok := m["ack-mode"].(string); ok {
def.AckMode = v
}

if v, ok := m["trust-user-id"].(bool); ok {
def.TrustUserId = v
}

if v, ok := m["exchange"].(string); ok {
def.Exchange = v
}

if v, ok := m["queue"].(string); ok {
def.Queue = v
}

up.Definition = def
return up
}
69 changes: 28 additions & 41 deletions rabbithole_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2260,13 +2260,13 @@ var _ = Describe("RabbitMQ HTTP API client", func() {
Context("when there are upstreams", func() {
It("returns the list of upstreams", func() {
def1 := FederationDefinition{
Uri: "amqp://server-name/%2f",
Uri: []string{"amqp://server-name/%2f"},
}
_, err := rmqc.PutFederationUpstream("rabbit/hole", "upstream1", def1)
Ω(err).Should(BeNil())

def2 := FederationDefinition{
Uri: "amqp://example.com/%2f",
Uri: []string{"amqp://example.com/%2f"},
}
_, err = rmqc.PutFederationUpstream("/", "upstream2", def2)
Ω(err).Should(BeNil())
Expand Down Expand Up @@ -2306,17 +2306,17 @@ var _ = Describe("RabbitMQ HTTP API client", func() {
vh := "rabbit/hole"

def1 := FederationDefinition{
Uri: "amqp://server-name/%2f",
Uri: []string{"amqp://server-name/%2f"},
}

_, err := rmqc.PutFederationUpstream(vh, "upstream1", def1)
_, err := rmqc.PutFederationUpstream(vh, "vhost-upstream1", def1)
Ω(err).Should(BeNil())

def2 := FederationDefinition{
Uri: "amqp://example.com/%2f",
Uri: []string{"amqp://example.com/%2f"},
}

_, err = rmqc.PutFederationUpstream(vh, "upstream2", def2)
_, err = rmqc.PutFederationUpstream(vh, "vhost-upstream2", def2)
Ω(err).Should(BeNil())

awaitEventPropagation()
Expand All @@ -2325,8 +2325,8 @@ var _ = Describe("RabbitMQ HTTP API client", func() {
Ω(err).Should(BeNil())
Ω(len(list)).Should(Equal(2))

// delete upstream1
_, err = rmqc.DeleteFederationUpstream(vh, "upstream1")
// delete vhost-upstream1
_, err = rmqc.DeleteFederationUpstream(vh, "vhost-upstream1")
Ω(err).Should(BeNil())

awaitEventPropagation()
Expand All @@ -2335,8 +2335,8 @@ var _ = Describe("RabbitMQ HTTP API client", func() {
Ω(err).Should(BeNil())
Ω(len(list)).Should(Equal(1))

// delete upstream2
_, err = rmqc.DeleteFederationUpstream(vh, "upstream2")
// delete vhost-upstream2
_, err = rmqc.DeleteFederationUpstream(vh, "vhost-upstream2")
Ω(err).Should(BeNil())

awaitEventPropagation()
Expand All @@ -2352,7 +2352,7 @@ var _ = Describe("RabbitMQ HTTP API client", func() {
Context("when the upstream does not exist", func() {
It("returns a 404 error", func() {
vh := "rabbit/hole"
name := "temporary"
name := "non-existing-upstream"

up, err := rmqc.GetFederationUpstream(vh, name)
Ω(up).Should(BeNil())
Expand All @@ -2363,10 +2363,10 @@ var _ = Describe("RabbitMQ HTTP API client", func() {
Context("when the upstream exists", func() {
It("returns the upstream", func() {
vh := "rabbit/hole"
name := "temporary"
name := "valid-upstream"

def := FederationDefinition{
Uri: "amqp://127.0.0.1/%2f",
Uri: []string{"amqp://127.0.0.1/%2f"},
PrefetchCount: 1000,
ReconnectDelay: 1,
AckMode: "on-confirm",
Expand All @@ -2383,7 +2383,7 @@ var _ = Describe("RabbitMQ HTTP API client", func() {
Ω(up.Vhost).Should(Equal(vh))
Ω(up.Name).Should(Equal(name))
Ω(up.Component).Should(Equal(FederationUpstreamComponent))
Ω(up.Definition.Uri).Should(Equal(def.Uri))
Ω(up.Definition.Uri).Should(ConsistOf(def.Uri))
Ω(up.Definition.PrefetchCount).Should(Equal(def.PrefetchCount))
Ω(up.Definition.ReconnectDelay).Should(Equal(def.ReconnectDelay))
Ω(up.Definition.AckMode).Should(Equal(def.AckMode))
Expand All @@ -2399,14 +2399,14 @@ var _ = Describe("RabbitMQ HTTP API client", func() {
Context("when the upstream does not exist", func() {
It("creates the upstream", func() {
vh := "rabbit/hole"
name := "temporary"
name := "create-upstream"

up, err := rmqc.GetFederationUpstream(vh, name)
Ω(up).Should(BeNil())
Ω(err).Should(Equal(ErrorResponse{404, "Object Not Found", "Not Found"}))

def := FederationDefinition{
Uri: "amqp://127.0.0.1/%2f",
Uri: []string{"amqp://127.0.0.1/%2f"},
Expires: 1800000,
MessageTTL: 360000,
MaxHops: 1,
Expand All @@ -2428,7 +2428,7 @@ var _ = Describe("RabbitMQ HTTP API client", func() {
Ω(up.Vhost).Should(Equal(vh))
Ω(up.Name).Should(Equal(name))
Ω(up.Component).Should(Equal(FederationUpstreamComponent))
Ω(up.Definition.Uri).Should(Equal(def.Uri))
Ω(up.Definition.Uri).Should(ConsistOf(def.Uri))
Ω(up.Definition.Expires).Should(Equal(def.Expires))
Ω(up.Definition.MessageTTL).Should(Equal(def.MessageTTL))
Ω(up.Definition.MaxHops).Should(Equal(def.MaxHops))
Expand All @@ -2447,11 +2447,11 @@ var _ = Describe("RabbitMQ HTTP API client", func() {
Context("when the upstream exists", func() {
It("updates the upstream", func() {
vh := "rabbit/hole"
name := "temporary"
name := "update-upstream"

// create the upstream
def := FederationDefinition{
Uri: "amqp://127.0.0.1/%2f",
Uri: []string{"amqp://127.0.0.1/%2f"},
PrefetchCount: 1000,
ReconnectDelay: 1,
AckMode: "on-confirm",
Expand All @@ -2468,15 +2468,15 @@ var _ = Describe("RabbitMQ HTTP API client", func() {
Ω(up.Vhost).Should(Equal(vh))
Ω(up.Name).Should(Equal(name))
Ω(up.Component).Should(Equal(FederationUpstreamComponent))
Ω(up.Definition.Uri).Should(Equal(def.Uri))
Ω(up.Definition.Uri).Should(ConsistOf(def.Uri))
Ω(up.Definition.PrefetchCount).Should(Equal(def.PrefetchCount))
Ω(up.Definition.ReconnectDelay).Should(Equal(def.ReconnectDelay))
Ω(up.Definition.AckMode).Should(Equal(def.AckMode))
Ω(up.Definition.TrustUserId).Should(Equal(def.TrustUserId))

// update the upstream
def2 := FederationDefinition{
Uri: "amqp://127.0.0.1/%2f",
Uri: []string{"amqp://128.0.0.1/%2f", "amqp://128.0.0.7/%2f"},
PrefetchCount: 500,
ReconnectDelay: 10,
AckMode: "no-ack",
Expand All @@ -2493,7 +2493,7 @@ var _ = Describe("RabbitMQ HTTP API client", func() {
Ω(up.Vhost).Should(Equal(vh))
Ω(up.Name).Should(Equal(name))
Ω(up.Component).Should(Equal(FederationUpstreamComponent))
Ω(up.Definition.Uri).Should(Equal(def2.Uri))
Ω(up.Definition.Uri).Should(ConsistOf(def2.Uri))
Ω(up.Definition.PrefetchCount).Should(Equal(def2.PrefetchCount))
Ω(up.Definition.ReconnectDelay).Should(Equal(def2.ReconnectDelay))
Ω(up.Definition.AckMode).Should(Equal(def2.AckMode))
Expand Down Expand Up @@ -2523,7 +2523,7 @@ var _ = Describe("RabbitMQ HTTP API client", func() {
Context("when the upstream does not exist", func() {
It("returns a 404 error response", func() {
vh := "rabbit/hole"
name := "temporary"
name := "non-existing-upstream"

// an error is not returned by design
resp, err := rmqc.DeleteFederationUpstream(vh, name)
Expand All @@ -2535,10 +2535,10 @@ var _ = Describe("RabbitMQ HTTP API client", func() {
Context("when the upstream exists", func() {
It("deletes the upstream", func() {
vh := "rabbit/hole"
name := "temporary"
name := "delete-upstream"

def := FederationDefinition{
Uri: "amqp://127.0.0.1/%2f",
Uri: []string{"amqp://127.0.0.1/%2f"},
}

_, err := rmqc.PutFederationUpstream(vh, name, def)
Expand Down Expand Up @@ -2591,7 +2591,7 @@ var _ = Describe("RabbitMQ HTTP API client", func() {
// create upstream
upstreamName := "myUpsteam"
def := FederationDefinition{
Uri: "amqp://localhost/%2f",
Uri: []string{"amqp://localhost/%2f"},
Expires: 1800000,
}

Expand Down Expand Up @@ -2623,7 +2623,7 @@ var _ = Describe("RabbitMQ HTTP API client", func() {
Ω(link["upstream"]).Should(Equal(upstreamName))
Ω(link["type"]).Should(Equal("exchange"))
Ω(link["exchange"]).Should(Equal("amq.topic"))
Ω(link["uri"]).Should(Equal(def.Uri))
Ω(link["uri"]).Should(Equal(def.Uri[0]))
Ω(link["status"]).Should(Equal("running"))

// cleanup
Expand Down Expand Up @@ -2889,7 +2889,7 @@ var _ = Describe("RabbitMQ HTTP API client", func() {
Ω(err).Should(BeNil())

fDef := FederationDefinition{
Uri: "amqp://server-name/%2f",
Uri: []string{"amqp://server-name/%2f"},
}
_, err = rmqc.PutFederationUpstream("rabbit/hole", "upstream1", fDef)
Ω(err).Should(BeNil())
Expand Down Expand Up @@ -2934,19 +2934,6 @@ var _ = Describe("RabbitMQ HTTP API client", func() {
})
})

Context("paramToUpstream", func() {
Context("when the parameter value is not initialized", func() {
It("returns an empty FederationUpstream", func() {
p := RuntimeParameter{} // p.Value is interface{}
up := paramToUpstream(&p)
Ω(up.Name).Should(BeEmpty())
Ω(up.Vhost).Should(BeEmpty())
Ω(up.Component).Should(BeEmpty())
Ω(up.Definition).Should(Equal(FederationDefinition{}))
})
})
})

Context("feature flags", func() {
It("lists and enables feature flags", func() {
By("GET /feature-flags")
Expand Down

0 comments on commit f0e3fbc

Please sign in to comment.