Skip to content
This repository has been archived by the owner on Jan 23, 2025. It is now read-only.

Add and update GCP VPC peering with retries #43

Merged
merged 5 commits into from
Dec 18, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
175 changes: 127 additions & 48 deletions api/vpc_gcp_peering.go
Original file line number Diff line number Diff line change
@@ -7,13 +7,26 @@ import (
"time"
)

func (api *API) waitForGcpPeeringStatus(instanceID int, peerID string) error {
// waitForGcpPeeringStatus: waits for the VPC peering status to be ACTIVE or until timed out
func (api *API) waitForGcpPeeringStatus(path, peerID string,
attempt, sleep, timeout int) error {

var (
data map[string]interface{}
err error
)

for {
time.Sleep(10 * time.Second)
data, err := api.ReadVpcGcpPeering(instanceID, peerID)
time.Sleep(time.Duration(sleep) * time.Second)
if attempt*sleep > timeout {
return fmt.Errorf("wait until GCP VPC peering status reached timeout of %d seconds", timeout)
}

attempt, data, err = api.readVpcGcpPeeringWithRetry(path, attempt, sleep, timeout)
if err != nil {
return err
}

rows := data["rows"].([]interface{})
if len(rows) > 0 {
for _, row := range rows {
@@ -26,57 +39,117 @@ func (api *API) waitForGcpPeeringStatus(instanceID int, peerID string) error {
}
}
}
log.Printf("[INFO] go-api::vpc_gcp_peering::waitForGcpPeeringStatus Waiting for state = ACTIVE "+
"attempt %d until timeout: %d", attempt, (timeout - (attempt * sleep)))
attempt++
}
}

// RequestVpcGcpPeering: requests a VPC peering from an instance.
func (api *API) RequestVpcGcpPeering(instanceID int, params map[string]interface{},
waitOnStatus bool) (map[string]interface{}, error) {
waitOnStatus bool, sleep, timeout int) (map[string]interface{}, error) {

path := fmt.Sprintf("api/instances/%v/vpc-peering", instanceID)
attempt, data, err := api.requestVpcGcpPeeringWithRetry(path, params, waitOnStatus, 1, sleep, timeout)
if err != nil {
return nil, err
}

if waitOnStatus {
log.Printf("[DEBUG] go-api::vpc_gcp_peering_withvpcid::request waiting for active state")
err = api.waitForGcpPeeringStatus(path, data["peering"].(string), attempt, sleep, timeout)
if err != nil {
return nil, err
}
}

return data, nil
}

// requestVpcGcpPeeringWithRetry: requests a VPC peering from a path with retry logic
func (api *API) requestVpcGcpPeeringWithRetry(path string, params map[string]interface{},
waitOnStatus bool, attempt, sleep, timeout int) (int, map[string]interface{}, error) {
var (
data map[string]interface{}
failed map[string]interface{}
path = fmt.Sprintf("api/instances/%v/vpc-peering", instanceID)
)

log.Printf("[DEBUG] go-api::vpc_gcp_peering::request params: %v", params)
log.Printf("[DEBUG] go-api::vpc_gcp_peering::request path: %s, params: %v", path, params)
response, err := api.sling.New().Post(path).BodyJSON(params).Receive(&data, &failed)
if err != nil {
return nil, err
}
if response.StatusCode != 200 {
return nil, fmt.Errorf("request VPC peering failed, status: %v, message: %s", response.StatusCode, failed)
return attempt, nil, err
} else if attempt*sleep > timeout {
return attempt, nil,
fmt.Errorf("request VPC peering failed, reached timeout of %d seconds", timeout)
}

if waitOnStatus {
log.Printf("[DEBUG] go-api::vpc_gcp_peering::request waiting for active state")
api.waitForGcpPeeringStatus(instanceID, data["peering"].(string))
switch response.StatusCode {
case 200:
return attempt, data, nil
case 400:
if strings.Compare(failed["error"].(string), "Timeout talking to backend") == 0 {
log.Printf("[INFO] go-api::vpc_gcp_peering::request Timeout talking to backend "+
"attempt %d until timeout: %d", attempt, (timeout - (attempt * sleep)))
attempt++
time.Sleep(time.Duration(sleep) * time.Second)
return api.requestVpcGcpPeeringWithRetry(path, params, waitOnStatus, attempt, sleep, timeout)
}
}
return data, nil
return attempt, nil, fmt.Errorf("request VPC peering failed, status: %v, message: %s",
response.StatusCode, failed)
}

func (api *API) ReadVpcGcpPeering(instanceID int, peerID string) (map[string]interface{}, error) {
// ReadVpcGcpPeering: reads the VPC peering from the API
func (api *API) ReadVpcGcpPeering(instanceID, sleep, timeout int) (
map[string]interface{}, error) {

path := fmt.Sprintf("/api/instances/%v/vpc-peering", instanceID)
_, data, err := api.readVpcGcpPeeringWithRetry(path, 1, sleep, timeout)
return data, err
}

// readVpcGcpPeeringWithRetry: reads the VPC peering from the API with retry logic
func (api *API) readVpcGcpPeeringWithRetry(path string, attempt, sleep, timeout int) (
int, map[string]interface{}, error) {

var (
data map[string]interface{}
failed map[string]interface{}
path = fmt.Sprintf("/api/instances/%v/vpc-peering", instanceID)
)

log.Printf("[DEBUG] go-api::vpc_gcp_peering::read instance_id: %v, peer_id: %v", instanceID, peerID)
log.Printf("[DEBUG] go-api::vpc_gcp_peering::read path: %s", path)
response, err := api.sling.New().Get(path).Receive(&data, &failed)
log.Printf("[DEBUG] go-api::vpc_gcp_peering::read data: %v", data)
if err != nil {
return nil, err
}
if response.StatusCode != 200 {
return nil, fmt.Errorf("ReadRequest failed, status: %v, message: %s", response.StatusCode, failed)
return attempt, nil, err
} else if attempt*sleep > timeout {
return attempt, nil, fmt.Errorf("read plugins reached timeout of %d seconds", timeout)
}

return data, nil
switch response.StatusCode {
case 200:
return attempt, data, nil
case 400:
if strings.Compare(failed["error"].(string), "Timeout talking to backend") == 0 {
log.Printf("[INFO] go-api::vpc_gcp_peering::read Timeout talking to backend "+
"attempt %d until timeout: %d", attempt, (timeout - (attempt * sleep)))
attempt++
time.Sleep(time.Duration(sleep) * time.Second)
return api.readVpcGcpPeeringWithRetry(path, attempt, sleep, timeout)
}
}
return attempt, nil, fmt.Errorf("read plugin with retry failed, status: %v, message: %s",
response.StatusCode, failed)
}

func (api *API) UpdateVpcGcpPeering(instanceID int, peerID string) (map[string]interface{}, error) {
return api.ReadVpcGcpPeering(instanceID, peerID)
// UpdateVpcGcpPeering: updates a VPC peering from an instance.
func (api *API) UpdateVpcGcpPeering(instanceID int, sleep, timeout int) (
map[string]interface{}, error) {

// NOP just read out the VPC peering
return api.ReadVpcGcpPeering(instanceID, sleep, timeout)
}

// RemoveVpcGcpPeering: removes a VPC peering from an instance.
func (api *API) RemoveVpcGcpPeering(instanceID int, peerID string) error {
var (
failed map[string]interface{}
@@ -88,45 +161,51 @@ func (api *API) RemoveVpcGcpPeering(instanceID int, peerID string) error {
if err != nil {
return err
}
if response.StatusCode != 204 {
return fmt.Errorf("RemoveVpcPeering failed, status: %v, message: %s", response.StatusCode, failed)

switch response.StatusCode {
case 204:
return nil
default:
return fmt.Errorf("remove VPC peering failed, status: %v, message: %s",
response.StatusCode, failed)
}
return nil
}

func (api *API) ReadVpcGcpInfo(instanceID int) (map[string]interface{}, error) {
// Initiale values, 5 attempts and 20 second sleep
return api.readVpcGcpInfoWithRetry(instanceID, 5, 20)
// ReadVpcGcpInfo: reads the VPC info from the API
func (api *API) ReadVpcGcpInfo(instanceID, sleep, timeout int) (map[string]interface{}, error) {
path := fmt.Sprintf("/api/instances/%v/vpc-peering/info", instanceID)
return api.readVpcGcpInfoWithRetry(path, 1, sleep, timeout)
}

func (api *API) readVpcGcpInfoWithRetry(instanceID, attempts, sleep int) (map[string]interface{}, error) {
// readVpcGcpInfoWithRetry: reads the VPC info from the API with retry logic
func (api *API) readVpcGcpInfoWithRetry(path string, attempt, sleep, timeout int) (
map[string]interface{}, error) {

var (
data map[string]interface{}
failed map[string]interface{}
path = fmt.Sprintf("/api/instances/%v/vpc-peering/info", instanceID)
)

log.Printf("[DEBUG] go-api::vpc_gcp_peering::info instance id: %v", instanceID)
log.Printf("[DEBUG] go-api::vpc_gcp_peering::info path: %s", path)
response, err := api.sling.New().Get(path).Receive(&data, &failed)
log.Printf("[DEBUG] go-api::vpc_gcp_peering::info data: %v", data)
if err != nil {
return nil, err
} else if attempt*sleep > timeout {
return nil, fmt.Errorf("read VPC info, reached timeout of %d seconds", timeout)
}

statusCode := response.StatusCode
log.Printf("[DEBUG] go-api::vpc_gcp_peering::info statusCode: %d", statusCode)
switch {
case statusCode == 400:
switch response.StatusCode {
case 200:
return data, nil
case 400:
if strings.Compare(failed["error"].(string), "Timeout talking to backend") == 0 {
if attempts--; attempts > 0 {
log.Printf("[INFO] go-api::vpc_gcp_peering::info Timeout talking to backend "+
"attempts left %d and retry in %d seconds", attempts, sleep)
time.Sleep(time.Duration(sleep) * time.Second)
return api.readVpcGcpInfoWithRetry(instanceID, attempts, 2*sleep)
} else {
return nil, fmt.Errorf("ReadInfo failed, status: %v, message: %s", response.StatusCode, failed)
}
log.Printf("[INFO] go-api::vpc_gcp_peering::info Timeout talking to backend "+
"attempt %d until timeout: %d", attempt, (timeout - (attempt * sleep)))
attempt++
time.Sleep(time.Duration(sleep) * time.Second)
return api.readVpcGcpInfoWithRetry(path, attempt, sleep, timeout)
}
}
return data, nil
return nil, fmt.Errorf("read VPC info failed, status: %v, message: %s",
response.StatusCode, failed)
}
131 changes: 37 additions & 94 deletions api/vpc_gcp_peering_withvpcid.go
Original file line number Diff line number Diff line change
@@ -5,131 +5,74 @@ package api
import (
"fmt"
"log"
"strings"
"time"
)

func (api *API) waitForGcpPeeringStatusWithVpcId(vpcID, peerID string) error {
for {
time.Sleep(10 * time.Second)
data, err := api.ReadVpcGcpPeeringWithVpcId(vpcID, peerID)
if err != nil {
return err
}
rows := data["rows"].([]interface{})
if len(rows) > 0 {
for _, row := range rows {
tempRow := row.(map[string]interface{})
if tempRow["name"] != peerID {
continue
}
if tempRow["state"] == "ACTIVE" {
return nil
}
}
}
}
}

// RequestVpcGcpPeeringWithVpcId: requests a VPC peering from an instance.
func (api *API) RequestVpcGcpPeeringWithVpcId(vpcID string, params map[string]interface{},
waitOnStatus bool) (map[string]interface{}, error) {
var (
data map[string]interface{}
failed map[string]interface{}
path = fmt.Sprintf("api/vpcs/%s/vpc-peering", vpcID)
)
waitOnStatus bool, sleep, timeout int) (map[string]interface{}, error) {

log.Printf("[DEBUG] go-api::vpc_gcp_peering_withvpcid::request params: %v", params)
response, err := api.sling.New().Post(path).BodyJSON(params).Receive(&data, &failed)
path := fmt.Sprintf("api/vpcs/%s/vpc-peering", vpcID)
attempt, data, err := api.requestVpcGcpPeeringWithRetry(path, params, waitOnStatus, 1, sleep, timeout)
if err != nil {
return nil, err
}
if response.StatusCode != 200 {
return nil, fmt.Errorf("request VPC peering failed, status: %v, message: %s", response.StatusCode, failed)
}

if waitOnStatus {
log.Printf("[DEBUG] go-api::vpc_gcp_peering_withvpcid::request waiting for active state")
api.waitForGcpPeeringStatusWithVpcId(vpcID, data["peering"].(string))
err = api.waitForGcpPeeringStatus(path, data["peering"].(string), attempt, sleep, timeout)
if err != nil {
return nil, err
}
}

return data, nil
}

func (api *API) ReadVpcGcpPeeringWithVpcId(vpcID, peerID string) (map[string]interface{}, error) {
var (
data map[string]interface{}
failed map[string]interface{}
path = fmt.Sprintf("/api/vpcs/%s/vpc-peering", vpcID)
)

log.Printf("[DEBUG] go-api::vpc_gcp_peering_withvpcid::read instance_id: %s, peer_id: %s", vpcID, peerID)
response, err := api.sling.New().Get(path).Receive(&data, &failed)
log.Printf("[DEBUG] go-api::vpc_gcp_peering_withvpcid::read data: %v", data)
if err != nil {
return nil, err
}
if response.StatusCode != 200 {
return nil, fmt.Errorf("ReadRequest failed, status: %v, message: %s", response.StatusCode, failed)
}
// func (api *API) ReadVpcGcpPeering(instanceID, sleep, timeout int) (
func (api *API) ReadVpcGcpPeeringWithVpcId(vpcID string, sleep, timeout int) (
map[string]interface{}, error) {

return data, nil
path := fmt.Sprintf("/api/vpcs/%s/vpc-peering", vpcID)
_, data, err := api.readVpcGcpPeeringWithRetry(path, 1, sleep, timeout)
return data, err
}

func (api *API) UpdateVpcGcpPeeringWithVpcId(vpcID, peerID string) (map[string]interface{}, error) {
return api.ReadVpcGcpPeeringWithVpcId(vpcID, peerID)
// UpdateVpcGcpPeeringWithVpcId: updates the VPC peering from the API
func (api *API) UpdateVpcGcpPeeringWithVpcId(vpcID string, sleep, timeout int) (
map[string]interface{}, error) {

// NOP just read out the VPC peering
return api.ReadVpcGcpPeeringWithVpcId(vpcID, sleep, timeout)
}

// RemoveVpcGcpPeeringWithVpcId: removes the VPC peering from the API
func (api *API) RemoveVpcGcpPeeringWithVpcId(vpcID, peerID string) error {
var (
failed map[string]interface{}
path = fmt.Sprintf("/api/vpcs/%s/vpc-peering/%s", vpcID, peerID)
)

log.Printf("[DEBUG] go-api::vpc_gcp_peering_withvpcid::remove vpc id: %s, peering id: %s", vpcID, peerID)
log.Printf("[DEBUG] go-api::vpc_gcp_peering_withvpcid::remove vpc id: %s, peering id: %s",
vpcID, peerID)
response, err := api.sling.New().Delete(path).Receive(nil, &failed)
if err != nil {
return err
}
if response.StatusCode != 204 {
return fmt.Errorf("RemoveVpcPeering failed, status: %v, message: %s", response.StatusCode, failed)
}
return nil
}

func (api *API) ReadVpcGcpInfoWithVpcId(vpcID string) (map[string]interface{}, error) {
// Initiale values, 5 attempts and 20 second sleep
return api.readVpcGcpInfoWithRetryWithVpcId(vpcID, 5, 20)
switch response.StatusCode {
case 204:
return nil
default:
return fmt.Errorf("remove VPC peering failed, status: %v, message: %s",
response.StatusCode, failed)
}
}

func (api *API) readVpcGcpInfoWithRetryWithVpcId(vpcID string, attempts, sleep int) (map[string]interface{}, error) {
var (
data map[string]interface{}
failed map[string]interface{}
path = fmt.Sprintf("/api/vpcs/%s/vpc-peering/info", vpcID)
)

log.Printf("[DEBUG] go-api::vpc_gcp_peering_withvpcid::info vpc id: %s", vpcID)
response, err := api.sling.New().Get(path).Receive(&data, &failed)
log.Printf("[DEBUG] go-api::vpc_gcp_peering_withvpcid::info data: %v", data)
if err != nil {
return nil, err
}
// ReadVpcGcpInfoWithVpcId: reads the VPC info from the API
func (api *API) ReadVpcGcpInfoWithVpcId(vpcID string, sleep, timeout int) (
map[string]interface{}, error) {

statusCode := response.StatusCode
log.Printf("[DEBUG] go-api::vpc_gcp_peering_withvpcid::info statusCode: %d", statusCode)
switch {
case statusCode == 400:
// Todo: Add error code to avoid using string comparison
if strings.Compare(failed["error"].(string), "Timeout talking to backend") == 0 {
if attempts--; attempts > 0 {
log.Printf("[INFO] go-api::vpc_gcp_peering_withvpcid::info Timeout talking to backend "+
"attempts left %d and retry in %d seconds", attempts, sleep)
time.Sleep(time.Duration(sleep) * time.Second)
return api.readVpcGcpInfoWithRetryWithVpcId(vpcID, attempts, 2*sleep)
} else {
return nil, fmt.Errorf("ReadInfo failed, status: %v, message: %s", response.StatusCode, failed)
}
}
}
return data, nil
path := fmt.Sprintf("/api/vpcs/%s/vpc-peering/info", vpcID)
_, data, err := api.readVpcGcpPeeringWithRetry(path, 1, sleep, timeout)
return data, err
}