Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add shard key in context #499

Merged
merged 3 commits into from
Mar 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions api/types/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,8 @@ const UserAgentKey = "x-yorkie-user-agent"
// APIKeyKey is the key of the api key header.
const APIKeyKey = "x-api-key"

// ShardKey is the key of the shard header.
const ShardKey = "x-shard-key"

// GoSDKType is the type part of Go SDK in value of UserAgent.
const GoSDKType = "yorkie-go-sdk"
8 changes: 4 additions & 4 deletions client/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@ func (i *AuthInterceptor) Unary() grpc.UnaryClientInterceptor {
invoker grpc.UnaryInvoker,
opts ...grpc.CallOption,
) error {
ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs(
ctx = metadata.AppendToOutgoingContext(ctx,
types.APIKeyKey, i.apiKey,
types.AuthorizationKey, i.token,
types.UserAgentKey, types.GoSDKType+"/"+version.Version,
))
)
return invoker(ctx, method, req, reply, cc, opts...)
}
}
Expand All @@ -70,11 +70,11 @@ func (i *AuthInterceptor) Stream() grpc.StreamClientInterceptor {
streamer grpc.Streamer,
opts ...grpc.CallOption,
) (grpc.ClientStream, error) {
ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs(
ctx = metadata.AppendToOutgoingContext(ctx,
types.APIKeyKey, i.apiKey,
types.AuthorizationKey, i.token,
types.UserAgentKey, types.GoSDKType+"/"+version.Version,
))
)
return streamer(ctx, desc, cc, method, opts...)
}
}
103 changes: 68 additions & 35 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ import (
"context"
"errors"
"fmt"
"strings"

"google.golang.org/grpc/metadata"

"github.com/rs/xid"
"go.uber.org/zap"
Expand Down Expand Up @@ -75,6 +78,7 @@ type Attachment struct {
type Client struct {
conn *grpc.ClientConn
client api.YorkieServiceClient
options Options
dialOptions []grpc.DialOption
logger *zap.Logger

Expand Down Expand Up @@ -150,6 +154,7 @@ func New(opts ...Option) (*Client, error) {

return &Client{
dialOptions: dialOptions,
options: options,
logger: logger,

key: k,
Expand Down Expand Up @@ -207,7 +212,7 @@ func (c *Client) Activate(ctx context.Context) error {
return nil
}

response, err := c.client.ActivateClient(ctx, &api.ActivateClientRequest{
response, err := c.client.ActivateClient(withShardKey(ctx, c.options.APIKey), &api.ActivateClientRequest{
ClientKey: c.key,
})
if err != nil {
Expand All @@ -231,7 +236,7 @@ func (c *Client) Deactivate(ctx context.Context) error {
return nil
}

_, err := c.client.DeactivateClient(ctx, &api.DeactivateClientRequest{
_, err := c.client.DeactivateClient(withShardKey(ctx, c.options.APIKey), &api.DeactivateClientRequest{
ClientId: c.id.Bytes(),
})
if err != nil {
Expand Down Expand Up @@ -261,10 +266,13 @@ func (c *Client) Attach(ctx context.Context, doc *document.Document) error {
return err
}

res, err := c.client.AttachDocument(ctx, &api.AttachDocumentRequest{
ClientId: c.id.Bytes(),
ChangePack: pbChangePack,
})
res, err := c.client.AttachDocument(
withShardKey(ctx, c.options.APIKey, doc.Key().String()),
&api.AttachDocumentRequest{
ClientId: c.id.Bytes(),
ChangePack: pbChangePack,
},
)
if err != nil {
return err
}
Expand Down Expand Up @@ -320,11 +328,14 @@ func (c *Client) Detach(ctx context.Context, doc *document.Document) error {
return err
}

res, err := c.client.DetachDocument(ctx, &api.DetachDocumentRequest{
ClientId: c.id.Bytes(),
DocumentId: attachment.docID.String(),
ChangePack: pbChangePack,
})
res, err := c.client.DetachDocument(
withShardKey(ctx, c.options.APIKey, doc.Key().String()),
&api.DetachDocumentRequest{
ClientId: c.id.Bytes(),
DocumentId: attachment.docID.String(),
ChangePack: pbChangePack,
},
)
if err != nil {
return err
}
Expand Down Expand Up @@ -379,13 +390,16 @@ func (c *Client) Watch(
}

rch := make(chan WatchResponse)
stream, err := c.client.WatchDocument(ctx, &api.WatchDocumentRequest{
Client: converter.ToClient(types.Client{
ID: c.id,
PresenceInfo: c.presenceInfo,
}),
DocumentId: attachment.docID.String(),
})
stream, err := c.client.WatchDocument(
withShardKey(ctx, c.options.APIKey, doc.Key().String()),
&api.WatchDocumentRequest{
Client: converter.ToClient(types.Client{
ID: c.id,
PresenceInfo: c.presenceInfo,
}),
DocumentId: attachment.docID.String(),
},
)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -505,13 +519,15 @@ func (c *Client) UpdatePresence(ctx context.Context, k, v string) error {
// following.
// TODO(hackerwins): We will move Presence from client-level to document-level.
for _, attachment := range c.attachments {
if _, err := c.client.UpdatePresence(ctx, &api.UpdatePresenceRequest{
Client: converter.ToClient(types.Client{
ID: c.id,
PresenceInfo: c.presenceInfo,
}),
DocumentId: attachment.docID.String(),
}); err != nil {
if _, err := c.client.UpdatePresence(
withShardKey(ctx, c.options.APIKey, attachment.doc.Key().String()),
&api.UpdatePresenceRequest{
Client: converter.ToClient(types.Client{
ID: c.id,
PresenceInfo: c.presenceInfo,
}),
DocumentId: attachment.docID.String(),
}); err != nil {
return err
}
}
Expand Down Expand Up @@ -572,11 +588,14 @@ func (c *Client) pushPull(ctx context.Context, key key.Key) error {
return err
}

res, err := c.client.PushPullChanges(ctx, &api.PushPullChangesRequest{
ClientId: c.id.Bytes(),
DocumentId: attachment.docID.String(),
ChangePack: pbChangePack,
})
res, err := c.client.PushPullChanges(
withShardKey(ctx, c.options.APIKey, key.String()),
&api.PushPullChangesRequest{
ClientId: c.id.Bytes(),
DocumentId: attachment.docID.String(),
ChangePack: pbChangePack,
},
)
if err != nil {
return err
}
Expand Down Expand Up @@ -613,11 +632,14 @@ func (c *Client) Remove(ctx context.Context, doc *document.Document) error {
}
pbChangePack.IsRemoved = true

res, err := c.client.RemoveDocument(ctx, &api.RemoveDocumentRequest{
ClientId: c.id.Bytes(),
DocumentId: attachment.docID.String(),
ChangePack: pbChangePack,
})
res, err := c.client.RemoveDocument(
withShardKey(ctx, c.options.APIKey, doc.Key().String()),
&api.RemoveDocumentRequest{
ClientId: c.id.Bytes(),
DocumentId: attachment.docID.String(),
ChangePack: pbChangePack,
},
)
if err != nil {
return err
}
Expand All @@ -636,3 +658,14 @@ func (c *Client) Remove(ctx context.Context, doc *document.Document) error {

return nil
}

/**
* withShardKey returns a context with the given shard key in metadata.
*/
func withShardKey(ctx context.Context, keys ...string) context.Context {
return metadata.AppendToOutgoingContext(
ctx,
types.ShardKey,
strings.Join(keys, "/"),
)
}
88 changes: 87 additions & 1 deletion client/client_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//go:build amd64

/*
* Copyright 2021 The Yorkie Authors. All rights reserved.
*
Expand All @@ -17,15 +19,63 @@
package client_test

import (
"context"
"reflect"
"testing"

"github.com/rs/xid"
"github.com/stretchr/testify/assert"
monkey "github.com/undefinedlabs/go-mpatch"
"golang.org/x/net/nettest"
"google.golang.org/grpc"
grpcmetadata "google.golang.org/grpc/metadata"

"github.com/yorkie-team/yorkie/api/types"
api "github.com/yorkie-team/yorkie/api/yorkie/v1"
"github.com/yorkie-team/yorkie/client"
)

type testYorkieServer struct {
grpcServer *grpc.Server
yorkieServer *api.UnimplementedYorkieServiceServer
}

// dialTestYorkieServer creates a new instance of testYorkieServer and
// dials it with LocalListener.
func dialTestYorkieServer(t *testing.T) (*testYorkieServer, string) {
yorkieServer := &api.UnimplementedYorkieServiceServer{}
grpcServer := grpc.NewServer()
api.RegisterYorkieServiceServer(grpcServer, yorkieServer)

testYorkieServer := &testYorkieServer{
grpcServer: grpcServer,
yorkieServer: yorkieServer,
}

return testYorkieServer, testYorkieServer.listenAndServe(t)
}

func (s *testYorkieServer) listenAndServe(t *testing.T) string {
lis, err := nettest.NewLocalListener("tcp")
if err != nil {
t.Fatalf("failed to listen: %v", err)
}

go func() {
if err := s.grpcServer.Serve(lis); err != nil {
if err != grpc.ErrServerStopped {
t.Error(err)
}
}
}()

return lis.Addr().String()
}

func (s *testYorkieServer) Stop() {
s.grpcServer.Stop()
}

func TestClient(t *testing.T) {
t.Run("create instance test", func(t *testing.T) {
presence := types.Presence{"Name": "ClientName"}
Expand All @@ -34,7 +84,43 @@ func TestClient(t *testing.T) {
client.WithPresence(presence),
)
assert.NoError(t, err)

assert.Equal(t, presence, cli.Presence())
})

t.Run("x-shard-key test", func(t *testing.T) {
dummyID := types.ID("000000000000000000000000")
dummyActorID, err := dummyID.Bytes()
assert.NoError(t, err)

testServer, addr := dialTestYorkieServer(t)
defer testServer.Stop()

cli, err := client.Dial(addr, client.WithAPIKey("dummy-api-key"))
assert.NoError(t, err)

var patch *monkey.Patch
patch, err = monkey.PatchInstanceMethodByName(
reflect.TypeOf(testServer.yorkieServer),
"ActivateClient",
func(
m *api.UnimplementedYorkieServiceServer,
ctx context.Context,
req *api.ActivateClientRequest,
) (*api.ActivateClientResponse, error) {
assert.NoError(t, patch.Unpatch())
defer func() {
assert.NoError(t, patch.Patch())
}()

data, _ := grpcmetadata.FromIncomingContext(ctx)
assert.Equal(t, "dummy-api-key", data[types.ShardKey][0])

return &api.ActivateClientResponse{
ClientId: dummyActorID,
}, nil
},
)
assert.NoError(t, err)
assert.NoError(t, cli.Activate(context.Background()))
})
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ require (
go.mongodb.org/mongo-driver v1.10.3
go.uber.org/zap v1.23.0
golang.org/x/crypto v0.0.0-20221005025214-4161e89ecf1b
golang.org/x/net v0.7.0
google.golang.org/genproto v0.0.0-20220930163606-c98284e70a91
google.golang.org/grpc v1.50.0
google.golang.org/protobuf v1.28.1
Expand Down Expand Up @@ -58,7 +59,6 @@ require (
go.etcd.io/etcd/client/pkg/v3 v3.5.5 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
golang.org/x/net v0.7.0 // indirect
golang.org/x/sync v0.0.0-20220929204114-8fcdb60fdcc0 // indirect
golang.org/x/sys v0.5.0 // indirect
golang.org/x/text v0.7.0 // indirect
Expand Down