Skip to content

Commit

Permalink
feat: adds support for kvm entries upsert #488 (#489)
Browse files Browse the repository at this point in the history
* feat: adds support for kvm entries upsert #488

* feat: adds support to handle json entries #488

* chore: disable prints for entries #488

* chore: add print statements #488
  • Loading branch information
srinandan authored Jun 27, 2024
1 parent 531ccab commit 6d7d437
Show file tree
Hide file tree
Showing 7 changed files with 116 additions and 66 deletions.
2 changes: 1 addition & 1 deletion internal/client/apidocs/apidocs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func TestUpdateDocumentation(t *testing.T) {
if openAPIDoc, err = utils.ReadFile(path.Join(cliPath, "test", "openapi.json")); err != nil {
t.Fatalf("%v", err)
}
if _, err = UpdateDocumentation(siteID, aPIDocID, displayName, string(openAPIDoc), "", ""); err != nil {
if _, err = UpdateDocumentation(siteID, aPIDocID, displayName, openAPIDoc, nil, ""); err != nil {
t.Fatalf("%v", err)
}
}
Expand Down
146 changes: 91 additions & 55 deletions internal/client/kvm/entries.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package kvm

import (
"encoding/json"
"errors"
"io"
"net/url"
"os"
Expand All @@ -32,6 +33,11 @@ import (
)

type keyvalueentry struct {
Name string `json:"name,omitempty"`
Value json.RawMessage `json:"value,omitempty"`
}

type kventry struct {
Name string `json:"name,omitempty"`
Value string `json:"value,omitempty"`
}
Expand All @@ -42,7 +48,7 @@ type keyvalueentries struct {
}

// CreateEntry
func CreateEntry(proxyName string, mapName string, keyName string, value string) (respBody []byte, err error) {
func CreateEntry(proxyName string, mapName string, keyName string, value []byte) (respBody []byte, err error) {
u, _ := url.Parse(apiclient.GetApigeeBaseURL())
if apiclient.GetApigeeEnv() != "" {
u.Path = path.Join(u.Path, apiclient.GetApigeeOrg(), "environments", apiclient.GetApigeeEnv(), "keyvaluemaps", mapName, "entries")
Expand All @@ -51,7 +57,10 @@ func CreateEntry(proxyName string, mapName string, keyName string, value string)
} else {
u.Path = path.Join(u.Path, apiclient.GetApigeeOrg(), "keyvaluemaps", mapName, "entries")
}
payload := "{\"name\":\"" + keyName + "\",\"value\":\"" + value + "\"}"
payload, err := getKVPayload(keyName, value)
if err != nil {
return nil, err
}
respBody, err = apiclient.HttpClient(u.String(), payload)
return respBody, err
}
Expand Down Expand Up @@ -85,7 +94,7 @@ func GetEntry(proxyName string, mapName string, keyName string) (respBody []byte
}

// UpdateEntry
func UpdateEntry(proxyName string, mapName string, keyName string, value string) (respBody []byte, err error) {
func UpdateEntry(proxyName string, mapName string, keyName string, value []byte) (respBody []byte, err error) {
u, _ := url.Parse(apiclient.GetApigeeBaseURL())
if apiclient.GetApigeeEnv() != "" {
u.Path = path.Join(u.Path, apiclient.GetApigeeOrg(), "environments", apiclient.GetApigeeEnv(), "keyvaluemaps", mapName, "entries", keyName)
Expand All @@ -94,11 +103,47 @@ func UpdateEntry(proxyName string, mapName string, keyName string, value string)
} else {
u.Path = path.Join(u.Path, apiclient.GetApigeeOrg(), "keyvaluemaps", mapName, "entries", keyName)
}
payload := "{\"name\":\"" + keyName + "\",\"value\":\"" + value + "\"}"
respBody, err = apiclient.HttpClient(u.String(), payload, "PUT")
payload, err := getKVPayload(keyName, value)
if err != nil {
return nil, err
}
respBody, err = apiclient.HttpClient(u.String(), string(payload), "PUT")
return respBody, err
}

func getKVPayload(keyName string, value []byte) (payload string, err error) {

var p []byte

// attempt as json
k1 := keyvalueentry{Name: keyName, Value: value}
p, err = json.Marshal(k1)
if err != nil {
k2 := kventry{Name: keyName, Value: string(value)}
p, err = json.Marshal(k2)
if err != nil {
return "", err
}
}
return string(p), nil
}

func upsertEntry(proxyName string, mapName string, keyName string, value []byte) (respBody []byte, err error) {
update := false
apiclient.ClientPrintHttpResponse.Set(false)
_, err = GetEntry(proxyName, mapName, keyName)
if err == nil {
update = true
}
apiclient.ClientPrintHttpResponse.Set(apiclient.GetCmdPrintHttpResponseSetting())
if update {
clilog.Info.Printf("Updating entry in map [%s] with key [%s]\n", mapName, keyName)
return UpdateEntry(proxyName, mapName, keyName, value)
}
clilog.Info.Printf("Creating a new entry in map [%s] with key [%s]\n", mapName, keyName)
return CreateEntry(proxyName, mapName, keyName, value)
}

// ListEntries
func ListEntries(proxyName string, mapName string, pageSize int, pageToken string) (respBody []byte, err error) {
u, _ := url.Parse(apiclient.GetApigeeBaseURL())
Expand Down Expand Up @@ -223,16 +268,6 @@ func ExportEntries(proxyName string, mapName string) (payload [][]byte, err erro

// ImportEntries
func ImportEntries(proxyName string, mapName string, conn int, filePath string) (err error) {
var pwg sync.WaitGroup

u, _ := url.Parse(apiclient.GetApigeeBaseURL())
if apiclient.GetApigeeEnv() != "" {
u.Path = path.Join(u.Path, apiclient.GetApigeeOrg(), "environments", apiclient.GetApigeeEnv(), "keyvaluemaps", mapName, "entries")
} else if proxyName != "" {
u.Path = path.Join(u.Path, apiclient.GetApigeeOrg(), "apis", proxyName, "keyvaluemaps", mapName, "entries")
} else {
u.Path = path.Join(u.Path, apiclient.GetApigeeOrg(), "keyvaluemaps", mapName, "entries")
}

kvmEntries, err := readKVMfile(filePath)
if err != nil {
Expand All @@ -244,58 +279,59 @@ func ImportEntries(proxyName string, mapName string, conn int, filePath string)
clilog.Debug.Printf("Found %d entries in the file\n", numEntities)
clilog.Debug.Printf("Create KVM entries with %d connections\n", conn)

numOfLoops, remaining := numEntities/conn, numEntities%conn
jobChan := make(chan keyvalueentry)
errChan := make(chan error)

// ensure connections aren't greater than entities
if conn > numEntities {
conn = numEntities
}
fanOutWg := sync.WaitGroup{}
fanInWg := sync.WaitGroup{}

start := 0
errs := []string{}
fanInWg.Add(1)
go func() {
defer fanInWg.Done()
for {
newErr, ok := <-errChan
if !ok {
return
}
errs = append(errs, newErr.Error())
}
}()

for i, end := 0, 0; i < numOfLoops; i++ {
pwg.Add(1)
end = (i * conn) + conn
clilog.Debug.Printf("Creating batch %d of entries\n", (i + 1))
go batchImport(u.String(), kvmEntries.KeyValueEntries[start:end], &pwg)
start = end
pwg.Wait()
for i := 0; i < conn; i++ {
fanOutWg.Add(1)
go batchImport(&fanOutWg, proxyName, mapName, jobChan, errChan)
}

if remaining > 0 {
pwg.Add(1)
clilog.Debug.Printf("Creating remaining %d entries\n", remaining)
go batchImport(u.String(), kvmEntries.KeyValueEntries[start:numEntities], &pwg)
pwg.Wait()
for _, entity := range kvmEntries.KeyValueEntries {
jobChan <- entity
}
close(jobChan)
fanOutWg.Wait()
close(errChan)
fanInWg.Wait()

if len(errs) > 0 {
return errors.New(strings.Join(errs, "\n"))
}
return nil
}

func batchImport(url string, entities []keyvalueentry, pwg *sync.WaitGroup) {
defer pwg.Done()
// batch workgroup
var bwg sync.WaitGroup

bwg.Add(len(entities))
func batchImport(wg *sync.WaitGroup, proxyName string, mapName string, jobs <-chan keyvalueentry, errs chan<- error) {
var err error
defer wg.Done()
for {
job, ok := <-jobs
if !ok {
return
}

for _, entity := range entities {
go createAsyncEntry(url, entity, &bwg)
}
bwg.Wait()
}
_, err = upsertEntry(proxyName, mapName, job.Name, job.Value)

func createAsyncEntry(url string, kvEntry keyvalueentry, wg *sync.WaitGroup) {
defer wg.Done()
out, err := json.Marshal(kvEntry)
if err != nil {
clilog.Error.Println(err)
return
}
_, err = apiclient.HttpClient(url, string(out))
if err != nil {
clilog.Error.Println(err)
return
if err != nil {
errs <- err
continue
}
}
}

Expand Down
8 changes: 4 additions & 4 deletions internal/client/kvm/entries_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,18 @@ func TestCreateEntry(t *testing.T) {
apiclient.SetApigeeEnv("")

// add entry to org kvm
if _, err := CreateEntry("", kvmName, keyName, value); err != nil {
if _, err := CreateEntry("", kvmName, keyName, []byte(value)); err != nil {
t.Fatalf("%v", err)
}

// add entry to proxy kvm
if _, err := CreateEntry(proxyName, kvmName, keyName, value); err != nil {
if _, err := CreateEntry(proxyName, kvmName, keyName, []byte(value)); err != nil {
t.Fatalf("%v", err)
}

// add entry to env kvm
apiclient.SetApigeeEnv(env)
if _, err := CreateEntry("", kvmName, keyName, value); err != nil {
if _, err := CreateEntry("", kvmName, keyName, []byte(value)); err != nil {
t.Fatalf("%v", err)
}
}
Expand Down Expand Up @@ -81,7 +81,7 @@ func TestUpdateEntry(t *testing.T) {
env := apiclient.GetApigeeEnv()
apiclient.SetApigeeEnv("")

updatedValue := "updatedTest"
updatedValue := []byte("updatedTest")

// add entry to org kvm
if _, err := UpdateEntry("", kvmName, keyName, updatedValue); err != nil {
Expand Down
4 changes: 1 addition & 3 deletions internal/cmd/kvm/crtentry.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"fmt"

"internal/apiclient"

"internal/client/kvm"

"github.com/spf13/cobra"
Expand All @@ -41,8 +40,7 @@ var CreateEntryCmd = &cobra.Command{
},
RunE: func(cmd *cobra.Command, args []string) (err error) {
cmd.SilenceUsage = true

_, err = kvm.CreateEntry(proxyName, mapName, keyName, value)
_, err = kvm.CreateEntry(proxyName, mapName, keyName, []byte(value))
return
},
}
Expand Down
1 change: 1 addition & 0 deletions internal/cmd/kvm/impentries.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ var ImpEntryCmd = &cobra.Command{
return apiclient.SetApigeeOrg(org)
},
RunE: func(cmd *cobra.Command, args []string) error {
apiclient.DisableCmdPrintHttpResponse()
return kvm.ImportEntries(proxyName, mapName, conn, filePath)
},
}
Expand Down
4 changes: 1 addition & 3 deletions internal/cmd/kvm/updentry.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"fmt"

"internal/apiclient"

"internal/client/kvm"

"github.com/spf13/cobra"
Expand All @@ -41,8 +40,7 @@ var UpdateEntryCmd = &cobra.Command{
},
RunE: func(cmd *cobra.Command, args []string) (err error) {
cmd.SilenceUsage = true

_, err = kvm.UpdateEntry(proxyName, mapName, keyName, value)
_, err = kvm.UpdateEntry(proxyName, mapName, keyName, []byte(value))
return
},
}
Expand Down
17 changes: 17 additions & 0 deletions test/kvm-json.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{
"keyValueEntries": [
{
"name": "json1",
"value": "{\"message\":\"hello\"}"
},
{
"name": "basepath",
"value": "/chris/duncan"
},
{
"name": "pathsuffix",
"value": "/is/great"
}
],
"nextPageToken": ""
}

0 comments on commit 6d7d437

Please sign in to comment.