Skip to content

Commit

Permalink
Client API changes for secondary idx (#545)
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat authored Oct 8, 2024
1 parent 0b405d7 commit 7b7f322
Show file tree
Hide file tree
Showing 7 changed files with 174 additions and 13 deletions.
29 changes: 17 additions & 12 deletions oxia/async_client_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ func (c *clientImpl) Put(key string, value []byte, options ...PutOption) <-chan
SequenceKeysDeltas: opts.sequenceKeysDeltas,
PartitionKey: opts.partitionKey,
Callback: callback,
SecondaryIndexes: toSecondaryIndexes(opts.secondaryIndexes),
}
if opts.ephemeral {
putCall.ClientIdentity = &c.options.identity
Expand Down Expand Up @@ -319,11 +320,13 @@ func processAllGetResponses(key string, results []*proto.GetResponse, comparison
close(ch)
}

func (c *clientImpl) listFromShard(ctx context.Context, minKeyInclusive string, maxKeyExclusive string, shardId int64, ch chan<- ListResult) {
func (c *clientImpl) listFromShard(ctx context.Context, minKeyInclusive string, maxKeyExclusive string, shardId int64, secondaryIndexName *string,
ch chan<- ListResult) {
request := &proto.ListRequest{
Shard: &shardId,
StartInclusive: minKeyInclusive,
EndExclusive: maxKeyExclusive,
Shard: &shardId,
StartInclusive: minKeyInclusive,
EndExclusive: maxKeyExclusive,
SecondaryIndexName: secondaryIndexName,
}

client, err := c.executor.ExecuteList(ctx, request)
Expand Down Expand Up @@ -355,7 +358,7 @@ func (c *clientImpl) List(ctx context.Context, minKeyInclusive string, maxKeyExc
// If the partition key is specified, we only need to make the request to one shard
shardId := c.getShardForKey("", opts)
go func() {
c.listFromShard(ctx, minKeyInclusive, maxKeyExclusive, shardId, ch)
c.listFromShard(ctx, minKeyInclusive, maxKeyExclusive, shardId, opts.secondaryIndexName, ch)
close(ch)
}()
} else {
Expand All @@ -368,7 +371,7 @@ func (c *clientImpl) List(ctx context.Context, minKeyInclusive string, maxKeyExc
go func() {
defer wg.Done()

c.listFromShard(ctx, minKeyInclusive, maxKeyExclusive, shardIdPtr, ch)
c.listFromShard(ctx, minKeyInclusive, maxKeyExclusive, shardIdPtr, opts.secondaryIndexName, ch)
}()
}

Expand All @@ -381,11 +384,13 @@ func (c *clientImpl) List(ctx context.Context, minKeyInclusive string, maxKeyExc
return ch
}

func (c *clientImpl) rangeScanFromShard(ctx context.Context, minKeyInclusive string, maxKeyExclusive string, shardId int64, ch chan<- GetResult) {
func (c *clientImpl) rangeScanFromShard(ctx context.Context, minKeyInclusive string, maxKeyExclusive string, shardId int64, secondaryIndexName *string,
ch chan<- GetResult) {
request := &proto.RangeScanRequest{
Shard: &shardId,
StartInclusive: minKeyInclusive,
EndExclusive: maxKeyExclusive,
Shard: &shardId,
StartInclusive: minKeyInclusive,
EndExclusive: maxKeyExclusive,
SecondaryIndexName: secondaryIndexName,
}

client, err := c.executor.ExecuteRangeScan(ctx, request)
Expand Down Expand Up @@ -421,7 +426,7 @@ func (c *clientImpl) RangeScan(ctx context.Context, minKeyInclusive string, maxK
// If the partition key is specified, we only need to make the request to one shard
shardId := c.getShardForKey("", opts)
go func() {
c.rangeScanFromShard(ctx, minKeyInclusive, maxKeyExclusive, shardId, outCh)
c.rangeScanFromShard(ctx, minKeyInclusive, maxKeyExclusive, shardId, opts.secondaryIndexName, outCh)
}()
} else {
// Do the list on all shards and aggregate the responses
Expand All @@ -433,7 +438,7 @@ func (c *clientImpl) RangeScan(ctx context.Context, minKeyInclusive string, maxK
ch := make(chan GetResult)
channels[i] = ch
go func() {
c.rangeScanFromShard(ctx, minKeyInclusive, maxKeyExclusive, shardIdPtr, ch)
c.rangeScanFromShard(ctx, minKeyInclusive, maxKeyExclusive, shardIdPtr, opts.secondaryIndexName, ch)
}()
}

Expand Down
2 changes: 2 additions & 0 deletions oxia/internal/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type PutCall struct {
SessionId *int64
ClientIdentity *string
PartitionKey *string
SecondaryIndexes []*proto.SecondaryIndex
Callback func(*proto.PutResponse, error)
}

Expand Down Expand Up @@ -56,6 +57,7 @@ func (r PutCall) ToProto() *proto.PutRequest {
ClientIdentity: r.ClientIdentity,
PartitionKey: r.PartitionKey,
SequenceKeyDelta: r.SequenceKeysDeltas,
SecondaryIndexes: r.SecondaryIndexes,
}
}

Expand Down
21 changes: 21 additions & 0 deletions oxia/options_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@ package oxia

type listOptions struct {
baseOptions

secondaryIndexName *string
}

// ListOption represents an option for the [SyncClient.List] operation.
type ListOption interface {
applyList(opts *listOptions)
applyRangeScan(opts *rangeScanOptions)
}

func newListOptions(opts []ListOption) *listOptions {
Expand All @@ -30,3 +33,21 @@ func newListOptions(opts []ListOption) *listOptions {
}
return listOpts
}

type useIndex struct {
indexName string
}

func (u *useIndex) applyList(opts *listOptions) {
opts.secondaryIndexName = &u.indexName
}

func (u *useIndex) applyRangeScan(opts *rangeScanOptions) {
opts.secondaryIndexName = &u.indexName
}

// UseIndex let the users specify a different index to follow for the
// Note: The returned list will contain they primary keys of the records
func UseIndex(indexName string) ListOption {
return &useIndex{indexName}
}
20 changes: 20 additions & 0 deletions oxia/options_put.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type putOptions struct {
expectedVersion *int64
ephemeral bool
sequenceKeysDeltas []uint64
secondaryIndexes []*secondaryIdxOption
}

// PutOption represents an option for the [SyncClient.Put] operation.
Expand Down Expand Up @@ -95,3 +96,22 @@ func (s *sequenceKeysDeltas) applyPut(opts *putOptions) {
func SequenceKeysDeltas(delta ...uint64) PutOption {
return &sequenceKeysDeltas{delta}
}

type secondaryIdxOption struct {
indexName string
secondaryKey string
}

func (s *secondaryIdxOption) applyPut(opts *putOptions) {
opts.secondaryIndexes = append(opts.secondaryIndexes, s)
}

// SecondaryIndex let the users specify additional keys to index the record
// Index names are arbitrary strings and can be used in `List` and
// `RangeScan` requests.
// Secondary keys are not required to be unique.
// Multiple secondary indexes can be passed on the same record, even
// reusing multiple times the same indexName.
func SecondaryIndex(indexName string, secondaryKey string) PutOption {
return &secondaryIdxOption{indexName, secondaryKey}
}
2 changes: 1 addition & 1 deletion oxia/options_range_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
package oxia

type rangeScanOptions struct {
baseOptions
listOptions
}

// RangeScanOption represents an option for the [SyncClient.RangeScan] operation.
Expand Down
11 changes: 11 additions & 0 deletions oxia/proto_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,3 +97,14 @@ func toError(status proto.Status) error {
return ErrUnknownStatus
}
}

func toSecondaryIndexes(secondaryIndexes []*secondaryIdxOption) (res []*proto.SecondaryIndex) {
for _, si := range secondaryIndexes {
res = append(res, &proto.SecondaryIndex{
IndexName: si.indexName,
SecondaryKey: si.secondaryKey,
})
}

return res
}
102 changes: 102 additions & 0 deletions oxia/sync_client_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ package oxia

import (
"context"
"fmt"
"github.com/streamnative/oxia/server"
"log/slog"
"strings"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -91,3 +95,101 @@ func assertCancellable(t *testing.T, operationFunc func(context.Context) error)

assert.ErrorIs(t, <-errCh, context.Canceled)
}

func TestSyncClientImpl_SecondaryIndexes(t *testing.T) {
config := server.NewTestConfig(t.TempDir())
// Test with multiple shards to ensure correctness across shards
config.NumShards = 1
standaloneServer, err := server.NewStandalone(config)
assert.NoError(t, err)

serviceAddress := fmt.Sprintf("localhost:%d", standaloneServer.RpcPort())
client, err := NewSyncClient(serviceAddress)
assert.NoError(t, err)

// ////////////////////////////////////////////////////////////////////////

ctx := context.Background()
for i := 0; i < 10; i++ {
primKey := fmt.Sprintf("%c", 'a'+i)
val := fmt.Sprintf("%d", i)
slog.Info("Adding record",
slog.String("key", primKey),
slog.String("value", val),
)
_, _, _ = client.Put(ctx, primKey, []byte(val), SecondaryIndex("val-idx", val))
}

// ////////////////////////////////////////////////////////////////////////

l, err := client.List(ctx, "1", "4", UseIndex("val-idx"))
assert.NoError(t, err)
assert.Equal(t, []string{"b", "c", "d"}, l)

// ////////////////////////////////////////////////////////////////////////

resCh := client.RangeScan(ctx, "1", "4", UseIndex("val-idx"))
i := 1
for res := range resCh {
assert.NoError(t, res.Err)

primKey := fmt.Sprintf("%c", 'a'+i)
val := fmt.Sprintf("%d", i)

slog.Info("Expected record",
slog.String("expected-key", primKey),
slog.String("expected-value", val),
slog.String("received-key", res.Key),
slog.String("received-value", string(res.Value)),
)
assert.Equal(t, primKey, res.Key)
assert.Equal(t, val, string(res.Value))
i++
}

assert.Equal(t, 4, i)

assert.NoError(t, client.Close())
assert.NoError(t, standaloneServer.Close())
}

func TestSyncClientImpl_SecondaryIndexesRepeated(t *testing.T) {
config := server.NewTestConfig(t.TempDir())
// Test with multiple shards to ensure correctness across shards
config.NumShards = 1
standaloneServer, err := server.NewStandalone(config)
assert.NoError(t, err)

serviceAddress := fmt.Sprintf("localhost:%d", standaloneServer.RpcPort())
client, err := NewSyncClient(serviceAddress)
assert.NoError(t, err)

// ////////////////////////////////////////////////////////////////////////

ctx := context.Background()
for i := 0; i < 10; i++ {
primKey := fmt.Sprintf("/%c", 'a'+i)
val := fmt.Sprintf("%c", 'a'+i)
slog.Info("Adding record",
slog.String("key", primKey),
slog.String("value", val),
)
_, _, _ = client.Put(ctx, primKey, []byte(val),
SecondaryIndex("val-idx", val),
SecondaryIndex("val-idx", strings.ToUpper(val)),
)
}

// ////////////////////////////////////////////////////////////////////////

l, err := client.List(ctx, "b", "e", UseIndex("val-idx"))
assert.NoError(t, err)
assert.Equal(t, []string{"/b", "/c", "/d"}, l)

l, err = client.List(ctx, "I", "d", UseIndex("val-idx"))
assert.NoError(t, err)
assert.Equal(t, []string{"/i", "/j", "/a", "/b", "/c"}, l)

assert.NoError(t, client.Close())
assert.NoError(t, standaloneServer.Close())
}

0 comments on commit 7b7f322

Please sign in to comment.