Skip to content

Commit

Permalink
Merge pull request #8184 from hashicorp/f-multiregion_oss
Browse files Browse the repository at this point in the history
Multiregion Deployments (OSS integration)
  • Loading branch information
tgross authored Jun 17, 2020
2 parents 2cbfc83 + 968713b commit d49eab8
Show file tree
Hide file tree
Showing 39 changed files with 2,021 additions and 56 deletions.
22 changes: 22 additions & 0 deletions api/deployments.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,19 @@ func (d *Deployments) PromoteGroups(deploymentID string, groups []string, q *Wri
return &resp, wm, nil
}

// Unblock is used to unblock the given deployment.
func (d *Deployments) Unblock(deploymentID string, q *WriteOptions) (*DeploymentUpdateResponse, *WriteMeta, error) {
var resp DeploymentUpdateResponse
req := &DeploymentUnblockRequest{
DeploymentID: deploymentID,
}
wm, err := d.client.write("/v1/deployment/unblock/"+deploymentID, req, &resp, q)
if err != nil {
return nil, nil, err
}
return &resp, wm, nil
}

// SetAllocHealth is used to set allocation health for allocs that are part of
// the given deployment
func (d *Deployments) SetAllocHealth(deploymentID string, healthy, unhealthy []string, q *WriteOptions) (*DeploymentUpdateResponse, *WriteMeta, error) {
Expand Down Expand Up @@ -150,6 +163,9 @@ type Deployment struct {
// present the correct list of deployments for the job and not old ones.
JobCreateIndex uint64

// IsMultiregion specifies if this deployment is part of a multi-region deployment
IsMultiregion bool

// TaskGroups is the set of task groups effected by the deployment and their
// current deployment status.
TaskGroups map[string]*DeploymentState
Expand Down Expand Up @@ -257,6 +273,12 @@ type DeploymentFailRequest struct {
WriteRequest
}

// DeploymentUnblockRequest is used to unblock a particular deployment
type DeploymentUnblockRequest struct {
DeploymentID string
WriteRequest
}

// SingleDeploymentResponse is used to respond with a single deployment
type SingleDeploymentResponse struct {
Deployment *Deployment
Expand Down
81 changes: 81 additions & 0 deletions api/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -622,6 +622,78 @@ func (u *UpdateStrategy) Empty() bool {
return true
}

type Multiregion struct {
Strategy *MultiregionStrategy
Regions []*MultiregionRegion
}

func (m *Multiregion) Canonicalize() {
if m.Strategy == nil {
m.Strategy = &MultiregionStrategy{
MaxParallel: intToPtr(0),
OnFailure: stringToPtr(""),
}
} else {
if m.Strategy.MaxParallel == nil {
m.Strategy.MaxParallel = intToPtr(0)
}
if m.Strategy.OnFailure == nil {
m.Strategy.OnFailure = stringToPtr("")
}
}
if m.Regions == nil {
m.Regions = []*MultiregionRegion{}
}
for _, region := range m.Regions {
if region.Count == nil {
region.Count = intToPtr(1)
}
if region.Datacenters == nil {
region.Datacenters = []string{}
}
if region.Meta == nil {
region.Meta = map[string]string{}
}
}
}

func (m *Multiregion) Copy() *Multiregion {
if m == nil {
return nil
}
copy := new(Multiregion)
if m.Strategy != nil {
copy.Strategy = new(MultiregionStrategy)
copy.Strategy.MaxParallel = intToPtr(*m.Strategy.MaxParallel)
copy.Strategy.OnFailure = stringToPtr(*m.Strategy.OnFailure)
}
for _, region := range m.Regions {
copyRegion := new(MultiregionRegion)
copyRegion.Name = region.Name
copyRegion.Count = intToPtr(*region.Count)
for _, dc := range region.Datacenters {
copyRegion.Datacenters = append(copyRegion.Datacenters, dc)
}
for k, v := range region.Meta {
copyRegion.Meta[k] = v
}
copy.Regions = append(copy.Regions, copyRegion)
}
return copy
}

type MultiregionStrategy struct {
MaxParallel *int `mapstructure:"max_parallel"`
OnFailure *string `mapstructure:"on_failure"`
}

type MultiregionRegion struct {
Name string
Count *int
Datacenters []string
Meta map[string]string
}

// PeriodicConfig is for serializing periodic config for a job.
type PeriodicConfig struct {
Enabled *bool
Expand Down Expand Up @@ -711,6 +783,7 @@ type Job struct {
Affinities []*Affinity
TaskGroups []*TaskGroup
Update *UpdateStrategy
Multiregion *Multiregion
Spreads []*Spread
Periodic *PeriodicConfig
ParameterizedJob *ParameterizedJobConfig
Expand Down Expand Up @@ -741,6 +814,11 @@ func (j *Job) IsParameterized() bool {
return j.ParameterizedJob != nil && !j.Dispatched
}

// IsMultiregion returns whether a job is a multiregion job
func (j *Job) IsMultiregion() bool {
return j.Multiregion != nil && j.Multiregion.Regions != nil && len(j.Multiregion.Regions) > 0
}

func (j *Job) Canonicalize() {
if j.ID == nil {
j.ID = stringToPtr("")
Expand Down Expand Up @@ -807,6 +885,9 @@ func (j *Job) Canonicalize() {
} else if *j.Type == JobTypeService {
j.Update = DefaultUpdateStrategy()
}
if j.Multiregion != nil {
j.Multiregion.Canonicalize()
}

for _, tg := range j.TaskGroups {
tg.Canonicalize(j)
Expand Down
62 changes: 62 additions & 0 deletions api/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1096,6 +1096,68 @@ func TestJobs_Canonicalize(t *testing.T) {
},
},
},

{
name: "multiregion",
input: &Job{
Name: stringToPtr("foo"),
ID: stringToPtr("bar"),
ParentID: stringToPtr("lol"),
Multiregion: &Multiregion{
Regions: []*MultiregionRegion{
{
Name: "west",
Count: intToPtr(1),
},
},
},
},
expected: &Job{
Multiregion: &Multiregion{
Strategy: &MultiregionStrategy{
MaxParallel: intToPtr(0),
OnFailure: stringToPtr(""),
},
Regions: []*MultiregionRegion{
{
Name: "west",
Count: intToPtr(1),
Datacenters: []string{},
Meta: map[string]string{},
},
},
},
Namespace: stringToPtr(DefaultNamespace),
ID: stringToPtr("bar"),
Name: stringToPtr("foo"),
Region: stringToPtr("global"),
Type: stringToPtr("service"),
ParentID: stringToPtr("lol"),
Priority: intToPtr(50),
AllAtOnce: boolToPtr(false),
ConsulToken: stringToPtr(""),
VaultToken: stringToPtr(""),
Stop: boolToPtr(false),
Stable: boolToPtr(false),
Version: uint64ToPtr(0),
Status: stringToPtr(""),
StatusDescription: stringToPtr(""),
CreateIndex: uint64ToPtr(0),
ModifyIndex: uint64ToPtr(0),
JobModifyIndex: uint64ToPtr(0),
Update: &UpdateStrategy{
Stagger: timeToPtr(30 * time.Second),
MaxParallel: intToPtr(1),
HealthCheck: stringToPtr("checks"),
MinHealthyTime: timeToPtr(10 * time.Second),
HealthyDeadline: timeToPtr(5 * time.Minute),
ProgressDeadline: timeToPtr(10 * time.Minute),
AutoRevert: boolToPtr(false),
Canary: intToPtr(0),
AutoPromote: boolToPtr(false),
},
},
},
}

for _, tc := range testCases {
Expand Down
76 changes: 52 additions & 24 deletions command/agent/deployment_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (
)

func (s *HTTPServer) DeploymentsRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
if req.Method != "GET" {
return nil, CodedError(405, ErrInvalidMethod)
if req.Method != http.MethodGet {
return nil, CodedError(http.StatusMethodNotAllowed, ErrInvalidMethod)
}

args := structs.DeploymentListRequest{}
Expand Down Expand Up @@ -47,15 +47,18 @@ func (s *HTTPServer) DeploymentSpecificRequest(resp http.ResponseWriter, req *ht
case strings.HasPrefix(path, "allocation-health/"):
deploymentID := strings.TrimPrefix(path, "allocation-health/")
return s.deploymentSetAllocHealth(resp, req, deploymentID)
case strings.HasPrefix(path, "unblock/"):
deploymentID := strings.TrimPrefix(path, "unblock/")
return s.deploymentUnblock(resp, req, deploymentID)
default:
return s.deploymentQuery(resp, req, path)
}
}

// TODO test and api
func (s *HTTPServer) deploymentFail(resp http.ResponseWriter, req *http.Request, deploymentID string) (interface{}, error) {
if req.Method != "PUT" && req.Method != "POST" {
return nil, CodedError(405, ErrInvalidMethod)
if req.Method != http.MethodPut && req.Method != http.MethodPost {
return nil, CodedError(http.StatusMethodNotAllowed, ErrInvalidMethod)
}
args := structs.DeploymentFailRequest{
DeploymentID: deploymentID,
Expand All @@ -71,19 +74,19 @@ func (s *HTTPServer) deploymentFail(resp http.ResponseWriter, req *http.Request,
}

func (s *HTTPServer) deploymentPause(resp http.ResponseWriter, req *http.Request, deploymentID string) (interface{}, error) {
if req.Method != "PUT" && req.Method != "POST" {
return nil, CodedError(405, ErrInvalidMethod)
if req.Method != http.MethodPut && req.Method != http.MethodPost {
return nil, CodedError(http.StatusMethodNotAllowed, ErrInvalidMethod)
}

var pauseRequest structs.DeploymentPauseRequest
if err := decodeBody(req, &pauseRequest); err != nil {
return nil, CodedError(400, err.Error())
return nil, CodedError(http.StatusBadRequest, err.Error())
}
if pauseRequest.DeploymentID == "" {
return nil, CodedError(400, "DeploymentID must be specified")
return nil, CodedError(http.StatusBadRequest, "DeploymentID must be specified")
}
if pauseRequest.DeploymentID != deploymentID {
return nil, CodedError(400, "Deployment ID does not match")
return nil, CodedError(http.StatusBadRequest, "Deployment ID does not match")
}
s.parseWriteRequest(req, &pauseRequest.WriteRequest)

Expand All @@ -96,19 +99,19 @@ func (s *HTTPServer) deploymentPause(resp http.ResponseWriter, req *http.Request
}

func (s *HTTPServer) deploymentPromote(resp http.ResponseWriter, req *http.Request, deploymentID string) (interface{}, error) {
if req.Method != "PUT" && req.Method != "POST" {
return nil, CodedError(405, ErrInvalidMethod)
if req.Method != http.MethodPut && req.Method != http.MethodPost {
return nil, CodedError(http.StatusMethodNotAllowed, ErrInvalidMethod)
}

var promoteRequest structs.DeploymentPromoteRequest
if err := decodeBody(req, &promoteRequest); err != nil {
return nil, CodedError(400, err.Error())
return nil, CodedError(http.StatusBadRequest, err.Error())
}
if promoteRequest.DeploymentID == "" {
return nil, CodedError(400, "DeploymentID must be specified")
return nil, CodedError(http.StatusBadRequest, "DeploymentID must be specified")
}
if promoteRequest.DeploymentID != deploymentID {
return nil, CodedError(400, "Deployment ID does not match")
return nil, CodedError(http.StatusBadRequest, "Deployment ID does not match")
}
s.parseWriteRequest(req, &promoteRequest.WriteRequest)

Expand All @@ -120,20 +123,45 @@ func (s *HTTPServer) deploymentPromote(resp http.ResponseWriter, req *http.Reque
return out, nil
}

func (s *HTTPServer) deploymentUnblock(resp http.ResponseWriter, req *http.Request, deploymentID string) (interface{}, error) {
if req.Method != http.MethodPut && req.Method != http.MethodPost {
return nil, CodedError(http.StatusMethodNotAllowed, ErrInvalidMethod)
}

var unblockRequest structs.DeploymentUnblockRequest
if err := decodeBody(req, &unblockRequest); err != nil {
return nil, CodedError(http.StatusBadRequest, err.Error())
}
if unblockRequest.DeploymentID == "" {
return nil, CodedError(http.StatusBadRequest, "DeploymentID must be specified")
}
if unblockRequest.DeploymentID != deploymentID {
return nil, CodedError(http.StatusBadRequest, "Deployment ID does not match")
}
s.parseWriteRequest(req, &unblockRequest.WriteRequest)

var out structs.DeploymentUpdateResponse
if err := s.agent.RPC("Deployment.Unblock", &unblockRequest, &out); err != nil {
return nil, err
}
setIndex(resp, out.Index)
return out, nil
}

func (s *HTTPServer) deploymentSetAllocHealth(resp http.ResponseWriter, req *http.Request, deploymentID string) (interface{}, error) {
if req.Method != "PUT" && req.Method != "POST" {
return nil, CodedError(405, ErrInvalidMethod)
if req.Method != http.MethodPut && req.Method != http.MethodPost {
return nil, CodedError(http.StatusMethodNotAllowed, ErrInvalidMethod)
}

var healthRequest structs.DeploymentAllocHealthRequest
if err := decodeBody(req, &healthRequest); err != nil {
return nil, CodedError(400, err.Error())
return nil, CodedError(http.StatusBadRequest, err.Error())
}
if healthRequest.DeploymentID == "" {
return nil, CodedError(400, "DeploymentID must be specified")
return nil, CodedError(http.StatusBadRequest, "DeploymentID must be specified")
}
if healthRequest.DeploymentID != deploymentID {
return nil, CodedError(400, "Deployment ID does not match")
return nil, CodedError(http.StatusBadRequest, "Deployment ID does not match")
}
s.parseWriteRequest(req, &healthRequest.WriteRequest)

Expand All @@ -146,8 +174,8 @@ func (s *HTTPServer) deploymentSetAllocHealth(resp http.ResponseWriter, req *htt
}

func (s *HTTPServer) deploymentAllocations(resp http.ResponseWriter, req *http.Request, deploymentID string) (interface{}, error) {
if req.Method != "GET" {
return nil, CodedError(405, ErrInvalidMethod)
if req.Method != http.MethodGet {
return nil, CodedError(http.StatusMethodNotAllowed, ErrInvalidMethod)
}

args := structs.DeploymentSpecificRequest{
Expand All @@ -173,8 +201,8 @@ func (s *HTTPServer) deploymentAllocations(resp http.ResponseWriter, req *http.R
}

func (s *HTTPServer) deploymentQuery(resp http.ResponseWriter, req *http.Request, deploymentID string) (interface{}, error) {
if req.Method != "GET" {
return nil, CodedError(405, ErrInvalidMethod)
if req.Method != http.MethodGet {
return nil, CodedError(http.StatusMethodNotAllowed, ErrInvalidMethod)
}

args := structs.DeploymentSpecificRequest{
Expand All @@ -191,7 +219,7 @@ func (s *HTTPServer) deploymentQuery(resp http.ResponseWriter, req *http.Request

setMeta(resp, &out.QueryMeta)
if out.Deployment == nil {
return nil, CodedError(404, "deployment not found")
return nil, CodedError(http.StatusNotFound, "deployment not found")
}
return out.Deployment, nil
}
Loading

0 comments on commit d49eab8

Please sign in to comment.