Skip to content

Commit

Permalink
[CLIENT-2020] Support QueryPartitions with non-nil filter (secondar…
Browse files Browse the repository at this point in the history
…y index query)
  • Loading branch information
khaf committed Dec 5, 2022
1 parent 022d55b commit a5b0ed4
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 15 deletions.
4 changes: 0 additions & 4 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1024,10 +1024,6 @@ func parseIndexErrorCode(response string) types.ResultCode {
// This method is only supported by Aerospike 4.9+ servers.
// If the policy is nil, the default relevant policy will be used.
func (clnt *Client) QueryPartitions(policy *QueryPolicy, statement *Statement, partitionFilter *PartitionFilter) (*Recordset, Error) {
if statement.Filter != nil {
return nil, ErrPartitionScanQueryNotSupported.err()
}

policy = clnt.getUsableQueryPolicy(policy)
nodes := clnt.cluster.GetNodes()
if len(nodes) == 0 {
Expand Down
2 changes: 1 addition & 1 deletion multi_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func (cmd *baseMultiCommand) parseKey(fieldCount int, bval *int64) (*Key, Error)
return nil, err
}
case BVAL_ARRAY:
*bval = Buffer.LittleBytesToInt64(cmd.dataBuffer, cmd.dataOffset)
*bval = Buffer.LittleBytesToInt64(cmd.dataBuffer, 1)
}
}

Expand Down
7 changes: 3 additions & 4 deletions query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,13 @@ func (clnt *Client) queryPartitions(policy *QueryPolicy, tracker *partitionTrack
cmd := newQueryPartitionCommand(policy, tracker, nodePartition, statement, recordset)
weg.execute(cmd)
}
// no need to manage the errors; they are send back via the recordset
weg.wait()
errs = chainErrors(weg.wait(), errs)

done, err := tracker.isComplete(clnt.Cluster(), &policy.BasePolicy)
if done || err != nil {
errs = chainErrors(err, errs)
// Query is complete.
if err != nil {
errs = chainErrors(err, errs)
if errs != nil {
recordset.sendError(errs)
}
return
Expand Down
85 changes: 83 additions & 2 deletions query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,11 @@ var _ = gg.Describe("Query operations", func() {
bin4 := as.NewBin("Aerospike4", "constValue")
bin5 := as.NewBin("Aerospike5", -1)
bin6 := as.NewBin("Aerospike6", 1)
bin7 := as.NewBin("Aerospike7", nil)
var keys map[string]*as.Key
var indexName string
var indexName2 string
var indexName3 string

// read all records from the channel and make sure all of them are returned
var checkResults = func(recordset *as.Recordset, cancelCnt int) {
Expand Down Expand Up @@ -118,7 +120,8 @@ var _ = gg.Describe("Query operations", func() {

keys[string(key.Digest())] = key
bin3 = as.NewBin("Aerospike3", rand.Intn(math.MaxInt16))
err = client.PutBins(wpolicy, key, bin1, bin2, bin3, bin4, bin5, bin6)
bin7 = as.NewBin("Aerospike7", i%3)
err = client.PutBins(wpolicy, key, bin1, bin2, bin3, bin4, bin5, bin6, bin7)
gm.Expect(err).ToNot(gm.HaveOccurred())
}

Expand All @@ -129,6 +132,10 @@ var _ = gg.Describe("Query operations", func() {
// queries only work on indices
indexName2 = set + bin6.Name
createIndex(wpolicy, ns, set, indexName2, bin6.Name, as.NUMERIC)

// queries only work on indices
indexName3 = set + bin7.Name
createIndex(wpolicy, ns, set, indexName3, bin7.Name, as.NUMERIC)
})

gg.AfterEach(func() {
Expand All @@ -137,6 +144,9 @@ var _ = gg.Describe("Query operations", func() {

indexName = set + bin6.Name
gm.Expect(client.DropIndex(nil, ns, set, indexName)).ToNot(gm.HaveOccurred())

indexName = set + bin7.Name
gm.Expect(client.DropIndex(nil, ns, set, indexName)).ToNot(gm.HaveOccurred())
})

var queryPolicy = as.NewQueryPolicy()
Expand Down Expand Up @@ -170,7 +180,7 @@ var _ = gg.Describe("Query operations", func() {
gm.Expect(counter).To(gm.Equal(keyCount))
})

gg.It("must Query and get all partition records back for a specified key", func() {
gg.It("must Scan and get all partition records back for a specified key", func() {
gm.Expect(len(keys)).To(gm.Equal(keyCount))

counter := 0
Expand Down Expand Up @@ -203,6 +213,77 @@ var _ = gg.Describe("Query operations", func() {
gm.Expect(counter).To(gm.BeNumerically("<", keyCount))
})

gg.It("must Query per key partition and get all partition records back for a specified key and filter", func() {
gm.Expect(len(keys)).To(gm.Equal(keyCount))

counter := 0

var rkey *as.Key
for _, k := range keys {
rkey = k

pf := as.NewPartitionFilterByKey(rkey)
stm := as.NewStatement(ns, set)
stm.SetFilter(as.NewRangeFilter(bin7.Name, 1, 2))
recordset, err := client.QueryPartitions(queryPolicy, stm, pf)
gm.Expect(err).ToNot(gm.HaveOccurred())

for res := range recordset.Results() {
gm.Expect(res.Err).NotTo(gm.HaveOccurred())
gm.Expect(res.Record.Bins[bin1.Name]).To(gm.Equal(bin1.Value.GetObject()))
gm.Expect(res.Record.Bins[bin2.Name]).To(gm.Equal(bin2.Value.GetObject()))

delete(keys, string(res.Record.Key.Digest()))

counter++
}
}

gm.Expect(len(keys)).To(gm.Equal(334))
// This depends on how many keys end up in the same partition.
// Since keys are statistically distributed randomly and uniformly,
// we expect that there aren't many partitions that share more than one key.
gm.Expect(counter).To(gm.BeNumerically("~", keyCount - 334, 50))
})

gg.It("must Query and get all partition records back for a specified key and filter", func() {
gm.Expect(len(keys)).To(gm.Equal(keyCount))

counter := 0

pf := as.NewPartitionFilterAll()
stm := as.NewStatement(ns, set)
stm.SetFilter(as.NewRangeFilter(bin7.Name, 1, 2))
recordset, err := client.QueryPartitions(queryPolicy, stm, pf)
gm.Expect(err).ToNot(gm.HaveOccurred())

for res := range recordset.Results() {
gm.Expect(res.Err).NotTo(gm.HaveOccurred())
gm.Expect(res.Record.Bins[bin1.Name]).To(gm.Equal(bin1.Value.GetObject()))
gm.Expect(res.Record.Bins[bin2.Name]).To(gm.Equal(bin2.Value.GetObject()))

delete(keys, string(res.Record.Key.Digest()))

counter++
}

gm.Expect(len(keys)).To(gm.Equal(334))
gm.Expect(counter).To(gm.Equal(keyCount - 334))
})

gg.It("must return error on a Query when index is not found", func() {
pf := as.NewPartitionFilterAll()
stm := as.NewStatement(ns, set)
stm.SetFilter(as.NewRangeFilter(randString(10), 1, 2))
recordset, err := client.QueryPartitions(queryPolicy, stm, pf)
gm.Expect(err).ToNot(gm.HaveOccurred())

for res := range recordset.Results() {
gm.Expect(res.Err).To(gm.HaveOccurred())
gm.Expect(res.Err.Matches(ast.INDEX_NOTFOUND)).To(gm.BeTrue())
}
})

gg.It("must Query and get all partition records back for a specified partition range", func() {
gm.Expect(len(keys)).To(gm.Equal(keyCount))

Expand Down
7 changes: 3 additions & 4 deletions scan_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,12 @@ func (clnt *Client) scanPartitions(policy *ScanPolicy, tracker *partitionTracker
cmd := newScanPartitionCommand(policy, tracker, nodePartition, namespace, setName, binNames, recordset)
weg.execute(cmd)
}
// no need to manage the errors; they are send back via the recordset
weg.wait()
errs = chainErrors(weg.wait(), errs)

if done, err := tracker.isComplete(clnt.Cluster(), &policy.BasePolicy); done || err != nil {
errs = chainErrors(err, errs)
// Scan is complete.
if err != nil {
errs = chainErrors(err, errs)
if errs != nil {
recordset.sendError(errs)
}
return
Expand Down

0 comments on commit a5b0ed4

Please sign in to comment.