Skip to content

Commit

Permalink
Merge pull request #150 from niclic/federation-additions
Browse files Browse the repository at this point in the history
API for runtime parameters, e.g. for configuring federation upstream components.
  • Loading branch information
michaelklishin authored May 6, 2020
2 parents fb19cc2 + 0259cac commit f408e4a
Show file tree
Hide file tree
Showing 7 changed files with 793 additions and 61 deletions.
58 changes: 58 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,64 @@ resp, err := rmqc.DeleteShovel("/", "a.shovel")

```

### Operations on Runtime (vhost-scoped) Parameters

```golang
// list all runtime parameters
params, err := rmqc.ListRuntimeParameters()
// => []RuntimeParameter, error

// list all runtime parameters for a component
params, err := rmqc.ListRuntimeParametersFor("federation-upstream")
// => []RuntimeParameter, error

// list runtime parameters in a vhost
params, err := rmqc.ListRuntimeParametersIn("federation-upstream", "/")
// => []RuntimeParameter, error

// information about a runtime parameter
p, err := rmqc.GetRuntimeParameter("federation-upstream", "/", "name")
// => *RuntimeParameter, error

// declare or update a runtime parameter
resp, err := rmqc.PutRuntimeParameter("federation-upstream", "/", "name", FederationDefinition{
Uri: "amqp://server-name",
})
// => *http.Response, error

// remove a runtime parameter
resp, err := rmqc.DeleteRuntimeParameter("federation-upstream", "/", "name")
// => *http.Response, error

```

### Operations on Federation Upstreams

```golang
// list all federation upstreams
ups, err := rmqc.ListFederationUpstreams()
// => []FederationUpstream, error

// list federation upstreams in a vhost
ups, err := rmqc.ListFederationUpstreamsIn("/")
// => []FederationUpstream, error

// information about a federated upstream
up, err := rmqc.GetFederationUpstream("/", "name")
// => *FederationUpstream, error

// declare or update a federation upstream
resp, err := rmqc.PutFederationUpstream("/", "name", FederationDefinition{
Uri: "amqp://server-name",
})
// => *http.Response, error

// delete an upstream
resp, err := rmqc.DeleteFederationUpstream("/", "name")
// => *http.Response, error

```

### Operations on cluster name
``` go
// Get cluster name
Expand Down
4 changes: 4 additions & 0 deletions bin/ci/before_build.bat
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,7 @@ call %RABBITHOLE_RABBITMQCTL% set_permissions -p "rabbit/hole" guest ".*" ".*" "
REM Enable shovel plugin
call %RABBITHOLE_RABBITMQ_PLUGINS% enable rabbitmq_shovel
call %RABBITHOLE_RABBITMQ_PLUGINS% enable rabbitmq_shovel_management

REM Enable shovel plugin
call %RABBITHOLE_RABBITMQ_PLUGINS% enable rabbitmq_federation
call %RABBITHOLE_RABBITMQ_PLUGINS% enable rabbitmq_federation_management
4 changes: 4 additions & 0 deletions bin/ci/before_build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,7 @@ $CTL set_cluster_name rabbitmq@localhost
# Enable shovel plugin
$PLUGINS enable rabbitmq_shovel
$PLUGINS enable rabbitmq_shovel_management

# Enable federation plugin
$PLUGINS enable rabbitmq_federation
$PLUGINS enable rabbitmq_federation_management
52 changes: 52 additions & 0 deletions doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,58 @@ Managing Topic Permissions
resp, err := rmqc.DeleteTopicPermissionsIn("/", "my.user", "exchange")
// => *http.Response, err
Managing Runtime Parameters
// list all runtime parameters
params, err := rmqc.ListRuntimeParameters()
// => []RuntimeParameter, error
// list all runtime parameters for a component
params, err := rmqc.ListRuntimeParametersFor("federation-upstream")
// => []RuntimeParameter, error
// list runtime parameters in a vhost
params, err := rmqc.ListRuntimeParametersIn("federation-upstream", "/")
// => []RuntimeParameter, error
// information about a runtime parameter
p, err := rmqc.GetRuntimeParameter("federation-upstream", "/", "name")
// => *RuntimeParameter, error
// declare or update a runtime parameter
resp, err := rmqc.PutRuntimeParameter("federation-upstream", "/", "name", FederationDefinition{
Uri: "amqp://server-name",
})
// => *http.Response, error
// remove a runtime parameter
resp, err := rmqc.DeleteRuntimeParameter("federation-upstream", "/", "name")
// => *http.Response, error
Managing Federation Upstreams
// list all federation upstreams
ups, err := rmqc.ListFederationUpstreams()
// => []FederationUpstream, error
// list federation upstreams in a vhost
ups, err := rmqc.ListFederationUpstreamsIn("/")
// => []FederationUpstream, error
// information about a federated upstream
up, err := rmqc.GetFederationUpstream("/", "upstream-name")
// => *FederationUpstream, error
// declare or update a federation upstream
resp, err := rmqc.PutFederationUpstream("/", "upstream-name", FederationDefinition{
Uri: "amqp://server-name",
})
// => *http.Response, error
// delete an upstream
resp, err := rmqc.DeleteFederationUpstream("/", "upstream-name")
// => *http.Response, error
Operations on cluster name
// Get cluster name
cn, err := rmqc.GetClusterName()
Expand Down
121 changes: 101 additions & 20 deletions federation.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package rabbithole

import (
"encoding/json"
"net/http"
"net/url"
)

// Federation definition: additional arguments
Expand All @@ -24,49 +22,132 @@ type FederationDefinition struct {

// Represents a configured Federation upstream.
type FederationUpstream struct {
Name string `json:"name"`
Vhost string `json:"vhost"`
Component string `json:"component"`
Definition FederationDefinition `json:"value"`
}

const FederationUpstreamComponent string = "federation-upstream"

//
// PUT /api/parameters/federation-upstream/{vhost}/{upstream}
// GET /api/parameters/federation-upstream
//

// Updates a federation upstream
func (c *Client) PutFederationUpstream(vhost string, upstreamName string, fDef FederationDefinition) (res *http.Response, err error) {
fedUp := FederationUpstream{
Definition: fDef,
}
body, err := json.Marshal(fedUp)
// ListFederationUpstreams returns a list of all federation upstreams.
func (c *Client) ListFederationUpstreams() (ups []FederationUpstream, err error) {
params, err := c.ListRuntimeParametersFor(FederationUpstreamComponent)
if err != nil {
return nil, err
}

req, err := newRequestWithBody(c, "PUT", "parameters/federation-upstream/"+url.PathEscape(vhost)+"/"+url.PathEscape(upstreamName), body)
for _, p := range params {
up := paramToUpstream(&p)
ups = append(ups, *up)
}
return ups, nil
}

//
// GET /api/parameters/federation-upstream/{vhost}
//

// ListFederationUpstreamsIn returns a list of all federation upstreams in a vhost.
func (c *Client) ListFederationUpstreamsIn(vhost string) (ups []FederationUpstream, err error) {
params, err := c.ListRuntimeParametersIn(FederationUpstreamComponent, vhost)
if err != nil {
return nil, err
}

if res, err = executeRequest(c, req); err != nil {
for _, p := range params {
up := paramToUpstream(&p)
ups = append(ups, *up)
}
return ups, nil
}

//
// GET /api/parameters/federation-upstream/{vhost}/{upstream}
//

// GetFederationUpstream returns information about a federation upstream.
func (c *Client) GetFederationUpstream(vhost, name string) (up *FederationUpstream, err error) {
p, err := c.GetRuntimeParameter(FederationUpstreamComponent, vhost, name)
if err != nil {
return nil, err
}
return paramToUpstream(p), nil
}

return res, nil
//
// PUT /api/parameters/federation-upstream/{vhost}/{upstream}
//

// PutFederationUpstream creates or updates a federation upstream configuration.
func (c *Client) PutFederationUpstream(vhost, name string, def FederationDefinition) (res *http.Response, err error) {
return c.PutRuntimeParameter(FederationUpstreamComponent, vhost, name, def)
}

//
// DELETE /api/parameters/federation-upstream/{vhost}/{name}
//

// Deletes a federation upstream.
func (c *Client) DeleteFederationUpstream(vhost, upstreamName string) (res *http.Response, err error) {
req, err := newRequestWithBody(c, "DELETE", "parameters/federation-upstream/"+url.PathEscape(vhost)+"/"+url.PathEscape(upstreamName), nil)
if err != nil {
return nil, err
// DeleteFederationUpstream removes a federation upstream.
func (c *Client) DeleteFederationUpstream(vhost, name string) (res *http.Response, err error) {
return c.DeleteRuntimeParameter(FederationUpstreamComponent, vhost, name)
}

// paramToUpstream maps from a RuntimeParameter structure to a FederationUpstream structure.
func paramToUpstream(p *RuntimeParameter) (up *FederationUpstream) {
up = &FederationUpstream{
Name: p.Name,
Vhost: p.Vhost,
Component: p.Component,
}

if res, err = executeRequest(c, req); err != nil {
return nil, err
def := FederationDefinition{}
m := p.Value.(map[string]interface{})

if v, ok := m["uri"].(string); ok {
def.Uri = v
}

if v, ok := m["expires"].(float64); ok {
def.Expires = int(v)
}

if v, ok := m["message-ttl"].(float64); ok {
def.MessageTTL = int32(v)
}

if v, ok := m["max-hops"].(float64); ok {
def.MaxHops = int(v)
}

if v, ok := m["prefetch-count"].(float64); ok {
def.PrefetchCount = int(v)
}

if v, ok := m["reconnect-delay"].(float64); ok {
def.ReconnectDelay = int(v)
}

if v, ok := m["ack-mode"].(string); ok {
def.AckMode = v
}

if v, ok := m["trust-user-id"].(bool); ok {
def.TrustUserId = v
}

if v, ok := m["exchange"].(string); ok {
def.Exchange = v
}

if v, ok := m["queue"].(string); ok {
def.Queue = v
}

return res, nil
up.Definition = def
return up
}
Loading

0 comments on commit f408e4a

Please sign in to comment.