Skip to content

Commit

Permalink
Merge pull request #440 from aerospike/stage
Browse files Browse the repository at this point in the history
Go Client v7.5
  • Loading branch information
khaf authored Jul 1, 2024
2 parents 9d59955 + ff1f225 commit cd4355c
Show file tree
Hide file tree
Showing 20 changed files with 1,204 additions and 644 deletions.
22 changes: 20 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,24 @@
# Change History

## May 16 2024: v7.4.0
## June 27 2024: v7.5.0

This a minor feature and fix release.

- **New Features**
- [CLIENT-2968] Support new v71 proxy features:
- Info command.
- `QueryPolicy.QueryDuration`
- [CLIENT-3012] Support new server 7.1 info command error response strings.

- **Improvements**
- [CLIENT-2997] Scans should work in a mixed cluster of v5.7 and v6.4 server nodes.
- [CLIENT-3012] Support new server 7.1 info command error response strings.
- [CLIENT-3020] Change ReadModeSC doc from server to client perspective.

- **Fixes**
- [CLIENT-3019] Prevent Goroutine leak in AuthInterceptor for the Proxy Client.

## May 20 2024: v7.4.0

This a minor fix release. We strongly suggest you upgrade to this version over the v7.3.0 if you use the `Client.BatchGetOperate` API.

Expand All @@ -16,7 +34,7 @@

## May 3 2024: v7.3.0

> [!WARNING]
> [!WARNING]
> Do not use this version if you are using the `Client.BatchGetOperate` API.
This is a major feature release of the Go client and touches some of the fundamental aspects of the inner workings of it.
Expand Down
1 change: 1 addition & 0 deletions aerospike_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func initTestVars() {
var buf bytes.Buffer
var err error

log.SetFlags(log.LstdFlags | log.Lshortfile)
logger := log.New(&buf, "", log.LstdFlags|log.Lshortfile)
logger.SetOutput(os.Stdout)
asl.Logger.SetLogger(logger)
Expand Down
2 changes: 1 addition & 1 deletion cdt_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ const (
// MapWriteFlagsNoFail means: Do not raise error if a map item is denied due to write flag constraints.
MapWriteFlagsNoFail = 4

// MapWriteFlagsNoFail means: Allow other valid map items to be committed if a map item is denied due to
// MapWriteFlagsPartial means: Allow other valid map items to be committed if a map item is denied due to
// write flag constraints.
MapWriteFlagsPartial = 8
)
Expand Down
46 changes: 27 additions & 19 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"encoding/json"
"fmt"
"os"
"regexp"
"runtime"
"strconv"
"strings"
Expand Down Expand Up @@ -1089,7 +1090,7 @@ func (clnt *Client) ExecuteUDFNode(policy *QueryPolicy,

// SetXDRFilter sets XDR filter for given datacenter name and namespace. The expression filter indicates
// which records XDR should ship to the datacenter.
// Pass nil as filter to remove the currentl filter on the server.
// Pass nil as filter to remove the current filter on the server.
func (clnt *Client) SetXDRFilter(policy *InfoPolicy, datacenter string, namespace string, filter *Expression) Error {
policy = clnt.getUsableInfoPolicy(policy)

Expand All @@ -1116,26 +1117,37 @@ func (clnt *Client) SetXDRFilter(policy *InfoPolicy, datacenter string, namespac
return nil
}

code := parseIndexErrorCode(response)
return newError(code, response)
return parseIndexErrorCode(response)
}

func parseIndexErrorCode(response string) types.ResultCode {
var code = types.OK
var indexErrRegexp = regexp.MustCompile(`(?i)(fail|error)(:[0-9]+)?(:.+)?`)

list := strings.Split(response, ":")
if len(list) >= 2 && list[0] == "FAIL" {
i, err := strconv.ParseInt(list[1], 10, 64)
func parseIndexErrorCode(response string) Error {
var code = types.SERVER_ERROR
var message = response

match := indexErrRegexp.FindStringSubmatch(response)

// invalid response
if len(match) != 4 {
return newError(types.PARSE_ERROR, response)
}

// error code
if len(match[2]) > 0 {
i, err := strconv.ParseInt(string(match[2][1:]), 10, 64)
if err == nil {
code = types.ResultCode(i)
message = types.ResultCodeToString(code)
}
}

if code == 0 {
code = types.SERVER_ERROR
// message
if len(match[3]) > 0 {
message = string(match[3][1:])
}

return code
return newError(code, message)
}

//--------------------------------------------------------
Expand Down Expand Up @@ -1296,12 +1308,7 @@ func (clnt *Client) CreateComplexIndex(
return NewIndexTask(clnt.cluster, namespace, indexName), nil
}

if strings.HasPrefix(response, "FAIL:200") {
// Index has already been created. Do not need to poll for completion.
return nil, newError(types.INDEX_FOUND)
}

return nil, newError(types.INDEX_GENERIC, "Create index failed: "+response)
return nil, parseIndexErrorCode(response)
}

// DropIndex deletes a secondary index. It will block until index is dropped on all nodes.
Expand Down Expand Up @@ -1339,12 +1346,13 @@ func (clnt *Client) DropIndex(
return <-task.OnComplete()
}

if strings.HasPrefix(response, "FAIL:201") {
err = parseIndexErrorCode(response)
if err.Matches(types.INDEX_NOTFOUND) {
// Index did not previously exist. Return without error.
return nil
}

return newError(types.INDEX_GENERIC, "Drop index failed: "+response)
return err
}

// Truncate removes records in specified namespace/set efficiently. This method is many orders of magnitude
Expand Down
47 changes: 47 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"time"

as "github.com/aerospike/aerospike-client-go/v7"
"github.com/aerospike/aerospike-client-go/v7/types"
ast "github.com/aerospike/aerospike-client-go/v7/types"
asub "github.com/aerospike/aerospike-client-go/v7/utils/buffer"

Expand Down Expand Up @@ -62,6 +63,39 @@ var _ = gg.Describe("Aerospike", func() {

var actualClusterName string

gg.Describe("Client IndexErrorParser", func() {

gg.It("must parse IndexError response strings", func() {
type t struct {
r string
code types.ResultCode
err string
}

responses := []t{
{"invalid", types.PARSE_ERROR, "invalid"},
{"FAIL", types.SERVER_ERROR, "FAIL"},
{"FAiL", types.SERVER_ERROR, "FAiL"},
{"Error", types.SERVER_ERROR, "Error"},
{"ERROR", types.SERVER_ERROR, "ERROR"},
{"ERROR:200", types.INDEX_FOUND, "Index already exists"},
{"FAIL:201", types.INDEX_NOTFOUND, "Index not found"},
{"ERROR:200", types.INDEX_FOUND, "Index already exists"},
{"FAIL:201", types.INDEX_NOTFOUND, "Index not found"},
{"FAIL:201:some message from the server", types.INDEX_NOTFOUND, "some message from the server"},
}

for _, r := range responses {
err := as.ParseIndexErrorCode(r.r)
gm.Expect(err).To(gm.HaveOccurred())
gm.Expect(err.(*as.AerospikeError).Msg()).To(gm.Equal(r.err))
gm.Expect(err.Matches(r.code)).To(gm.BeTrue())
}

})

})

gg.Describe("Client Management", func() {

gg.BeforeEach(func() {
Expand Down Expand Up @@ -229,6 +263,19 @@ var _ = gg.Describe("Aerospike", func() {
})
})

gg.Describe("Info operations on proxy client", func() {
gg.BeforeEach(func() {
if !*proxy {
gg.Skip("Only supported in grpc environment")
}
})

gg.It("must successfully call info command", func() {
_, err := client.(*as.ProxyClient).RequestInfo(nil)
gm.Expect(err).ToNot(gm.HaveOccurred())
})
})

gg.Describe("Data operations on native types", func() {
// connection data
var err error
Expand Down
9 changes: 2 additions & 7 deletions command.go
Original file line number Diff line number Diff line change
Expand Up @@ -1455,16 +1455,11 @@ func (cmd *baseCommand) setScan(policy *ScanPolicy, namespace *string, setName *
readAttr |= _INFO1_NOBINDATA
}

infoAttr := 0
if cmd.node == nil || cmd.node.cluster.supportsPartitionQuery.Get() {
infoAttr = _INFO3_PARTITION_DONE
}

operationCount := 0
if len(binNames) > 0 {
operationCount = len(binNames)
}
cmd.writeHeaderRead(&policy.BasePolicy, readAttr, 0, infoAttr, fieldCount, operationCount)
cmd.writeHeaderRead(&policy.BasePolicy, readAttr, 0, _INFO3_PARTITION_DONE, fieldCount, operationCount)

if namespace != nil {
cmd.writeFieldString(*namespace, NAMESPACE)
Expand Down Expand Up @@ -1735,7 +1730,7 @@ func (cmd *baseCommand) setQuery(policy *QueryPolicy, wpolicy *WritePolicy, stat
writeAttr |= _INFO2_RELAX_AP_LONG_QUERY
}
infoAttr := 0
if isNew {
if isNew || statement.Filter == nil {
infoAttr = _INFO3_PARTITION_DONE
}
cmd.writeHeaderRead(&policy.BasePolicy, readAttr, writeAttr, infoAttr, fieldCount, operationCount)
Expand Down
8 changes: 8 additions & 0 deletions helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,14 @@

package aerospike

func ParseIndexErrorCode(response string) Error {
return parseIndexErrorCode(response)
}

func (e *AerospikeError) Msg() string {
return e.msg
}

func (clstr *Cluster) GetMasterNode(partition *Partition) (*Node, Error) {
return partition.getMasterNode(clstr)
}
Expand Down
15 changes: 15 additions & 0 deletions info_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package aerospike
import (
"context"
"time"

kvs "github.com/aerospike/aerospike-client-go/v7/proto/kvs"
)

// InfoPolicy contains attributes used for info commands.
Expand Down Expand Up @@ -62,3 +64,16 @@ func (p *InfoPolicy) grpcDeadlineContext() (context.Context, context.CancelFunc)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
return ctx, cancel
}

func (p *InfoPolicy) grpc() *kvs.InfoPolicy {
if p == nil {
return nil
}

Timeout := uint32(p.Timeout / time.Millisecond)
res := &kvs.InfoPolicy{
Timeout: &Timeout,
}

return res
}
2 changes: 2 additions & 0 deletions partition_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ func newPartitionFilter(begin, count int) *PartitionFilter {

// IsDone returns - if using ScanPolicy.MaxRecords or QueryPolicy,MaxRecords -
// if the previous paginated scans with this partition filter instance return all records?
// This method is not synchronized and is not meant to be called while the Scan/Query is
// ongoing. It should be called after all the records are received from the recordset.
func (pf *PartitionFilter) IsDone() bool {
return pf.Done
}
Expand Down
1 change: 0 additions & 1 deletion policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,6 @@ func (p *BasePolicy) compress() bool {
}

func (p *BasePolicy) grpc() *kvs.ReadPolicy {
// TODO: support ReadTouchTTLPercent in the future for the proxy client
return &kvs.ReadPolicy{
Replica: p.ReplicaPolicy.grpc(),
ReadModeSC: p.ReadModeSC.grpc(),
Expand Down
2 changes: 1 addition & 1 deletion proto/auth/aerospike_proxy_auth.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion proto/auth/aerospike_proxy_auth_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit cd4355c

Please sign in to comment.