Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support individual health check endpoints #173

Merged
merged 22 commits into from
Jan 29, 2021
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,10 @@ RabbitMQ HTTP API has to be [configured to use TLS](http://www.rabbitmq.com/mana
### Getting Overview

``` go
res, err := rmqc.Overview()
resp, err := rmqc.Overview()
```


### Node and Cluster Status

``` go
Expand Down
201 changes: 201 additions & 0 deletions health_checks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
package rabbithole

import "strconv"

type TimeUnit string

const (
SECONDS TimeUnit = "seconds"
DAYS TimeUnit = "days"
MONTHS TimeUnit = "months"
YEARS TimeUnit = "years"
)

type Protocol string

const (
AMQP091 Protocol = "amqp091"
AMQP10 Protocol = "amqp10"
MQTT Protocol = "mqtt"
STOMP Protocol = "stomp"
WEB_MQTT Protocol = "web-mqtt"
WEB_STOMP Protocol = "web-stomp"
)

// Health represents response from healthchecks endpoint
type Health struct {
michaelklishin marked this conversation as resolved.
Show resolved Hide resolved
Status string `json:"status"`
Reason string `json:"reason"`
Missing string `json:"missing"`
Ports []string `json:"ports"`
Protocols []string `json:"protocols"`
}

// Responds a 200 OK if there are no alarms in effect in the cluster, otherwise responds with a 503 Service Unavailable.
michaelklishin marked this conversation as resolved.
Show resolved Hide resolved
func (c *Client) HealthCheckAlarms() (rec *Health, err error) {
req, err := newGETRequest(c, "health/checks/alarms")
if err != nil {
return nil, err
}

if err = executeAndParseRequest(c, req, &rec); err != nil {
return nil, err
}

return rec, nil
}

// Responds a 200 OK if there are no local alarms in effect on the target node, otherwise responds with a 503 Service Unavailable.
func (c *Client) HealthCheckLocalAlarms() (rec *Health, err error) {
req, err := newGETRequest(c, "health/checks/local-alarms")
if err != nil {
return nil, err
}

if err = executeAndParseRequest(c, req, &rec); err != nil {
return nil, err
}

return rec, nil
}

// Checks the expiration date on the certificates for every listener configured to use TLS.
// Responds a 200 OK if all certificates are valid (have not expired), otherwise responds with a 503 Service Unavailable.
// Valid units: days, weeks, months, years. The value of the within argument is the number of units.
// So, when within is 2 and unit is "months", the expiration period used by the check will be the next two months.
func (c *Client) HealthCheckCertificateExpiration(within uint, unit TimeUnit) (rec *Health, err error) {
req, err := newGETRequest(c, "health/checks/certificate-expiration/"+strconv.Itoa(int(within))+"/"+string(unit))
if err != nil {
return nil, err
}

if err = executeAndParseRequest(c, req, &rec); err != nil {
return nil, err
}

return rec, nil
}

// Responds a 200 OK if there is an active listener on the give port, otherwise responds with a 503 Service Unavailable.
func (c *Client) HealthCheckPortListenerListener(port uint) (rec *Health, err error) {
req, err := newGETRequest(c, "health/checks/port-listener/"+strconv.Itoa(int(port)))
if err != nil {
return nil, err
}

if err = executeAndParseRequest(c, req, &rec); err != nil {
return nil, err
}

return rec, nil
}

// Responds a 200 OK if there is an active listener for the given protocol, otherwise responds with a 503 Service Unavailable.
// Valid protocol names are: amqp091, amqp10, mqtt, stomp, web-mqtt, web-stomp.
func (c *Client) HealthCheckProtocolListener(protocol Protocol) (rec *Health, err error) {
req, err := newGETRequest(c, "health/checks/protocol-listener/"+string(protocol))
if err != nil {
return nil, err
}

if err = executeAndParseRequest(c, req, &rec); err != nil {
return nil, err
}

return rec, nil
}

// Responds a 200 OK if all virtual hosts and running on the target node, otherwise responds with a 503 Service Unavailable.
func (c *Client) HealthCheckVirtualHosts() (rec *Health, err error) {
req, err := newGETRequest(c, "health/checks/virtual-hosts")
if err != nil {
return nil, err
}

if err = executeAndParseRequest(c, req, &rec); err != nil {
return nil, err
}

return rec, nil
}

// Checks if there are classic mirrored queues without synchronised mirrors online (queues that would potentially lose data if the target node is shut down).
// Responds a 200 OK if there are no such classic mirrored queues, otherwise responds with a 503 Service Unavailable.
func (c *Client) HealthCheckNodeIsMirrorSyncCritical() (rec *Health, err error) {
req, err := newGETRequest(c, "health/checks/node-is-mirror-sync-critical")
if err != nil {
return nil, err
}

if err = executeAndParseRequest(c, req, &rec); err != nil {
return nil, err
}

return rec, nil
}

// Checks if there are quorum queues with minimum online quorum (queues that would lose their quorum and availability if the target node is shut down).
// Responds a 200 OK if there are no such quorum queues, otherwise responds with a 503 Service Unavailable.
func (c *Client) HealthCheckNodeIsQuorumCritical() (rec *Health, err error) {
req, err := newGETRequest(c, "health/checks/node-is-quorum-critical")
if err != nil {
return nil, err
}

if err = executeAndParseRequest(c, req, &rec); err != nil {
return nil, err
}

return rec, nil
}

// Deprecated health check api
michaelklishin marked this conversation as resolved.
Show resolved Hide resolved

// HealthChecks endpoint checks if the application is running,
// channels and queues can be listed, and that no alarms are raised
func (c *Client) HealthCheck() (rec *Health, err error) {
req, err := newGETRequest(c, "healthchecks/node")
if err != nil {
return nil, err
}

if err = executeAndParseRequest(c, req, &rec); err != nil {
return nil, err
}

return rec, nil
}

// HealthChecks endpoint checks for a given node if the application is running,
// channels and queues can be listed, and that no alarms are raised
func (c *Client) HealthCheckFor(node string) (rec *Health, err error) {
req, err := newGETRequest(c, "healthchecks/node/"+node)
if err != nil {
return nil, err
}

if err = executeAndParseRequest(c, req, &rec); err != nil {
return nil, err
}

return rec, nil
}

// Aliveness represents response from aliveness-test endpoint
type Aliveness struct {
Status string `json:"status"`
}

// Aliveness endpoint declares a test queue, then publishes a message and consumes a message
func (c *Client) Aliveness(vhost string) (rec *Aliveness, err error) {
req, err := newGETRequest(c, "aliveness-test/"+vhost)
if err != nil {
return nil, err
}

if err = executeAndParseRequest(c, req, &rec); err != nil {
return nil, err
}

return rec, nil
}
166 changes: 166 additions & 0 deletions rabbithole_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,172 @@ var _ = Describe("Rabbithole", func() {
rmqc, _ = NewClient("http://127.0.0.1:15672", "guest", "guest")
})

Context("GET /health/checks/alarms", func() {
It("returns decoded response", func() {
conn := openConnection("/")
defer conn.Close()

ch, err := conn.Channel()
Ω(err).Should(BeNil())
defer ch.Close()

res, err := rmqc.HealthCheckAlarms()
Ω(err).Should(BeNil())

Ω(res.Status).Should(Equal("ok"))
})
})

Context("GET /health/checks/local-alarms", func() {
It("returns decoded response", func() {
conn := openConnection("/")
defer conn.Close()

ch, err := conn.Channel()
Ω(err).Should(BeNil())
defer ch.Close()

res, err := rmqc.HealthCheckLocalAlarms()
Ω(err).Should(BeNil())

Ω(res.Status).Should(Equal("ok"))
})
})

Context("GET /health/checks/certificate-expiration/1/days", func() {
It("returns decoded response", func() {
conn := openConnection("/")
defer conn.Close()

ch, err := conn.Channel()
Ω(err).Should(BeNil())
defer ch.Close()

res, err := rmqc.HealthCheckCertificateExpiration(1, DAYS)
Ω(err).Should(BeNil())

Ω(res.Status).Should(Equal("ok"))
})
})

Context("GET /health/checks/port-listener/5672", func() {
It("returns decoded response", func() {
conn := openConnection("/")
defer conn.Close()

ch, err := conn.Channel()
Ω(err).Should(BeNil())
defer ch.Close()

res, err := rmqc.HealthCheckPortListenerListener(5672)
Ω(err).Should(BeNil())

Ω(res.Status).Should(Equal("ok"))
})
})

Context("GET /health/checks/protocol-listener/amqp091", func() {
It("returns decoded response", func() {
conn := openConnection("/")
defer conn.Close()

ch, err := conn.Channel()
Ω(err).Should(BeNil())
defer ch.Close()

res, err := rmqc.HealthCheckProtocolListener(AMQP091)
Ω(err).Should(BeNil())

Ω(res.Status).Should(Equal("ok"))
})
})

Context("GET /health/checks/virtual-hosts", func() {
It("returns decoded response", func() {
conn := openConnection("/")
defer conn.Close()

ch, err := conn.Channel()
Ω(err).Should(BeNil())
defer ch.Close()

res, err := rmqc.HealthCheckVirtualHosts()
Ω(err).Should(BeNil())

Ω(res.Status).Should(Equal("ok"))
})
})

Context("GET /health/checks/node-is-mirror-sync-critical", func() {
It("returns decoded response", func() {
conn := openConnection("/")
defer conn.Close()

ch, err := conn.Channel()
Ω(err).Should(BeNil())
defer ch.Close()

res, err := rmqc.HealthCheckNodeIsMirrorSyncCritical()
Ω(err).Should(BeNil())

Ω(res.Status).Should(Equal("ok"))
})
})

Context("GET /health/checks/node-is-quorum-critical", func() {
It("returns decoded response", func() {
conn := openConnection("/")
defer conn.Close()

ch, err := conn.Channel()
Ω(err).Should(BeNil())
defer ch.Close()

res, err := rmqc.HealthCheckNodeIsMirrorSyncCritical()
Ω(err).Should(BeNil())

Ω(res.Status).Should(Equal("ok"))
})
})

Context("GET /aliveness-test/%2F", func() {
It("returns decoded response", func() {
conn := openConnection("/")
defer conn.Close()

ch, err := conn.Channel()
Ω(err).Should(BeNil())

ensureNonZeroMessageRate(ch)

res, err := rmqc.Aliveness("%2F")
Ω(err).Should(BeNil())

Ω(res.Status).Should(Equal("ok"))

ch.Close()
})
})

Context("GET /healthchecks/nodes", func() {
It("returns decoded response", func() {
conn := openConnection("/")
defer conn.Close()

ch, err := conn.Channel()
Ω(err).Should(BeNil())

ensureNonZeroMessageRate(ch)

res, err := rmqc.HealthCheck()
Ω(err).Should(BeNil())

Ω(res.Status).Should(Equal("ok"))

ch.Close()
})
})

Context("GET /overview", func() {
It("returns decoded response", func() {
conn := openConnection("/")
Expand Down