From 3f404506673f11e93f29e2448f5ed6dc05cb9463 Mon Sep 17 00:00:00 2001 From: Kevin Park Date: Thu, 23 Mar 2023 18:29:06 +0900 Subject: [PATCH 1/3] Add shard key in context --- api/types/metadata.go | 3 ++ client/client.go | 11 +++++ client/client_test.go | 99 ++++++++++++++++++++++++++++++++++++++++++- go.mod | 2 +- 4 files changed, 113 insertions(+), 2 deletions(-) diff --git a/api/types/metadata.go b/api/types/metadata.go index 0c0757cf9..219ff9e30 100644 --- a/api/types/metadata.go +++ b/api/types/metadata.go @@ -26,5 +26,8 @@ const UserAgentKey = "x-yorkie-user-agent" // APIKeyKey is the key of the api key header. const APIKeyKey = "x-api-key" +// ShardKey is the key of the shard header. +const ShardKey = "x-shard-key" + // GoSDKType is the type part of Go SDK in value of UserAgent. const GoSDKType = "yorkie-go-sdk" diff --git a/client/client.go b/client/client.go index fd0ab865a..dbfd9b4ac 100644 --- a/client/client.go +++ b/client/client.go @@ -23,6 +23,8 @@ import ( "errors" "fmt" + "google.golang.org/grpc/metadata" + "github.com/rs/xid" "go.uber.org/zap" "google.golang.org/grpc" @@ -75,6 +77,7 @@ type Attachment struct { type Client struct { conn *grpc.ClientConn client api.YorkieServiceClient + options Options dialOptions []grpc.DialOption logger *zap.Logger @@ -150,6 +153,7 @@ func New(opts ...Option) (*Client, error) { return &Client{ dialOptions: dialOptions, + options: options, logger: logger, key: k, @@ -207,6 +211,7 @@ func (c *Client) Activate(ctx context.Context) error { return nil } + ctx = metadata.AppendToOutgoingContext(ctx, types.ShardKey, c.options.APIKey) response, err := c.client.ActivateClient(ctx, &api.ActivateClientRequest{ ClientKey: c.key, }) @@ -231,6 +236,7 @@ func (c *Client) Deactivate(ctx context.Context) error { return nil } + ctx = metadata.AppendToOutgoingContext(ctx, types.ShardKey, c.options.APIKey) _, err := c.client.DeactivateClient(ctx, &api.DeactivateClientRequest{ ClientId: c.id.Bytes(), }) @@ -261,6 +267,7 @@ func (c *Client) Attach(ctx context.Context, doc *document.Document) error { return err } + ctx = metadata.AppendToOutgoingContext(ctx, types.ShardKey, fmt.Sprintf("%s/%s", c.options.APIKey, doc.Key().String())) res, err := c.client.AttachDocument(ctx, &api.AttachDocumentRequest{ ClientId: c.id.Bytes(), ChangePack: pbChangePack, @@ -320,6 +327,7 @@ func (c *Client) Detach(ctx context.Context, doc *document.Document) error { return err } + ctx = metadata.AppendToOutgoingContext(ctx, types.ShardKey, fmt.Sprintf("%s/%s", c.options.APIKey, doc.Key().String())) res, err := c.client.DetachDocument(ctx, &api.DetachDocumentRequest{ ClientId: c.id.Bytes(), DocumentId: attachment.docID.String(), @@ -378,6 +386,7 @@ func (c *Client) Watch( return nil, ErrDocumentNotAttached } + ctx = metadata.AppendToOutgoingContext(ctx, types.ShardKey, fmt.Sprintf("%s/%s", c.options.APIKey, doc.Key().String())) rch := make(chan WatchResponse) stream, err := c.client.WatchDocument(ctx, &api.WatchDocumentRequest{ Client: converter.ToClient(types.Client{ @@ -572,6 +581,7 @@ func (c *Client) pushPull(ctx context.Context, key key.Key) error { return err } + ctx = metadata.AppendToOutgoingContext(ctx, types.ShardKey, fmt.Sprintf("%s/%s", c.options.APIKey, key.String())) res, err := c.client.PushPullChanges(ctx, &api.PushPullChangesRequest{ ClientId: c.id.Bytes(), DocumentId: attachment.docID.String(), @@ -613,6 +623,7 @@ func (c *Client) Remove(ctx context.Context, doc *document.Document) error { } pbChangePack.IsRemoved = true + ctx = metadata.AppendToOutgoingContext(ctx, types.ShardKey, fmt.Sprintf("%s/%s", c.options.APIKey, doc.Key().String())) res, err := c.client.RemoveDocument(ctx, &api.RemoveDocumentRequest{ ClientId: c.id.Bytes(), DocumentId: attachment.docID.String(), diff --git a/client/client_test.go b/client/client_test.go index ec4f74d73..e477dfbe4 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -1,3 +1,5 @@ +//go:build amd64 + /* * Copyright 2021 The Yorkie Authors. All rights reserved. * @@ -17,15 +19,89 @@ package client_test import ( + "context" + "reflect" "testing" "github.com/rs/xid" "github.com/stretchr/testify/assert" + monkey "github.com/undefinedlabs/go-mpatch" + "golang.org/x/net/nettest" + "google.golang.org/grpc" "github.com/yorkie-team/yorkie/api/types" + api "github.com/yorkie-team/yorkie/api/yorkie/v1" "github.com/yorkie-team/yorkie/client" ) +type testYorkieServer struct { + grpcServer *grpc.Server +} + +func (t *testYorkieServer) ActivateClient(ctx context.Context, request *api.ActivateClientRequest) (*api.ActivateClientResponse, error) { + panic("implement me") +} + +func (t *testYorkieServer) DeactivateClient(ctx context.Context, request *api.DeactivateClientRequest) (*api.DeactivateClientResponse, error) { + panic("implement me") +} + +func (t *testYorkieServer) UpdatePresence(ctx context.Context, request *api.UpdatePresenceRequest) (*api.UpdatePresenceResponse, error) { + panic("implement me") +} + +func (t *testYorkieServer) AttachDocument(ctx context.Context, request *api.AttachDocumentRequest) (*api.AttachDocumentResponse, error) { + panic("implement me") +} + +func (t *testYorkieServer) DetachDocument(ctx context.Context, request *api.DetachDocumentRequest) (*api.DetachDocumentResponse, error) { + panic("implement me") +} + +func (t *testYorkieServer) RemoveDocument(ctx context.Context, request *api.RemoveDocumentRequest) (*api.RemoveDocumentResponse, error) { + panic("implement me") +} + +func (t *testYorkieServer) PushPullChanges(ctx context.Context, request *api.PushPullChangesRequest) (*api.PushPullChangesResponse, error) { + panic("implement me") +} + +func (t *testYorkieServer) WatchDocument(request *api.WatchDocumentRequest, server api.YorkieService_WatchDocumentServer) error { + panic("implement me") +} + +// newYorkieServer creates a new instance of yorkieServer. +func dialTestYorkieServer(t *testing.T) (*testYorkieServer, string) { + testYorkieServer := &testYorkieServer{} + grpcServer := grpc.NewServer() + api.RegisterYorkieServiceServer(grpcServer, testYorkieServer) + testYorkieServer.grpcServer = grpcServer + + addr := testYorkieServer.listenAndServe(t) + return testYorkieServer, addr +} + +func (s *testYorkieServer) listenAndServe(t *testing.T) string { + lis, err := nettest.NewLocalListener("tcp") + if err != nil { + t.Fatalf("failed to listen: %v", err) + } + + go func() { + if err := s.grpcServer.Serve(lis); err != nil { + if err != grpc.ErrServerStopped { + t.Error(err) + } + } + }() + + return lis.Addr().String() +} + +func (s *testYorkieServer) Stop() { + s.grpcServer.Stop() +} + func TestClient(t *testing.T) { t.Run("create instance test", func(t *testing.T) { presence := types.Presence{"Name": "ClientName"} @@ -34,7 +110,28 @@ func TestClient(t *testing.T) { client.WithPresence(presence), ) assert.NoError(t, err) - assert.Equal(t, presence, cli.Presence()) }) + + // t.Run("x-shard-key test", func(t *testing.T) { + // testServer, addr := dialTestYorkieServer(t) + // defer testServer.Stop() + + // cli, err := client.Dial(addr, client.WithAPIKey("dummy-api-key")) + // assert.NoError(t, err) + + // var patch *monkey.Patch + // patch, err = monkey.PatchInstanceMethodByName(reflect.TypeOf(testServer), "ActivateClient", func(m *testYorkieServer, ctx context.Context, req *api.ActivateClientRequest) (*api.ActivateClientResponse, error) { + // patch.Unpatch() + // defer patch.Patch() + // t.Log("ActivateClient called") + // return &api.ActivateClientResponse{}, nil + // }) + // if err != nil { + // t.Fatal(err) + // } + + // ctx := context.Background() + // err = cli.Activate(ctx) + // }) } diff --git a/go.mod b/go.mod index 5935b319b..76361dc40 100644 --- a/go.mod +++ b/go.mod @@ -23,6 +23,7 @@ require ( go.mongodb.org/mongo-driver v1.10.3 go.uber.org/zap v1.23.0 golang.org/x/crypto v0.0.0-20221005025214-4161e89ecf1b + golang.org/x/net v0.7.0 google.golang.org/genproto v0.0.0-20220930163606-c98284e70a91 google.golang.org/grpc v1.50.0 google.golang.org/protobuf v1.28.1 @@ -58,7 +59,6 @@ require ( go.etcd.io/etcd/client/pkg/v3 v3.5.5 // indirect go.uber.org/atomic v1.10.0 // indirect go.uber.org/multierr v1.8.0 // indirect - golang.org/x/net v0.7.0 // indirect golang.org/x/sync v0.0.0-20220929204114-8fcdb60fdcc0 // indirect golang.org/x/sys v0.5.0 // indirect golang.org/x/text v0.7.0 // indirect From a38802ad41186600b9480de21355fa0caa55a6c6 Mon Sep 17 00:00:00 2001 From: hackerwins Date: Thu, 23 Mar 2023 19:16:14 +0900 Subject: [PATCH 2/3] Add the rest of the test code for x-api-key --- client/auth.go | 8 ++-- client/client_test.go | 109 +++++++++++++++++++----------------------- 2 files changed, 53 insertions(+), 64 deletions(-) diff --git a/client/auth.go b/client/auth.go index 948790520..55d41392b 100644 --- a/client/auth.go +++ b/client/auth.go @@ -51,11 +51,11 @@ func (i *AuthInterceptor) Unary() grpc.UnaryClientInterceptor { invoker grpc.UnaryInvoker, opts ...grpc.CallOption, ) error { - ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs( + ctx = metadata.AppendToOutgoingContext(ctx, types.APIKeyKey, i.apiKey, types.AuthorizationKey, i.token, types.UserAgentKey, types.GoSDKType+"/"+version.Version, - )) + ) return invoker(ctx, method, req, reply, cc, opts...) } } @@ -70,11 +70,11 @@ func (i *AuthInterceptor) Stream() grpc.StreamClientInterceptor { streamer grpc.Streamer, opts ...grpc.CallOption, ) (grpc.ClientStream, error) { - ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs( + ctx = metadata.AppendToOutgoingContext(ctx, types.APIKeyKey, i.apiKey, types.AuthorizationKey, i.token, types.UserAgentKey, types.GoSDKType+"/"+version.Version, - )) + ) return streamer(ctx, desc, cc, method, opts...) } } diff --git a/client/client_test.go b/client/client_test.go index e477dfbe4..0b1539af0 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -28,6 +28,7 @@ import ( monkey "github.com/undefinedlabs/go-mpatch" "golang.org/x/net/nettest" "google.golang.org/grpc" + grpcmetadata "google.golang.org/grpc/metadata" "github.com/yorkie-team/yorkie/api/types" api "github.com/yorkie-team/yorkie/api/yorkie/v1" @@ -35,50 +36,23 @@ import ( ) type testYorkieServer struct { - grpcServer *grpc.Server + grpcServer *grpc.Server + yorkieServer *api.UnimplementedYorkieServiceServer } -func (t *testYorkieServer) ActivateClient(ctx context.Context, request *api.ActivateClientRequest) (*api.ActivateClientResponse, error) { - panic("implement me") -} - -func (t *testYorkieServer) DeactivateClient(ctx context.Context, request *api.DeactivateClientRequest) (*api.DeactivateClientResponse, error) { - panic("implement me") -} - -func (t *testYorkieServer) UpdatePresence(ctx context.Context, request *api.UpdatePresenceRequest) (*api.UpdatePresenceResponse, error) { - panic("implement me") -} - -func (t *testYorkieServer) AttachDocument(ctx context.Context, request *api.AttachDocumentRequest) (*api.AttachDocumentResponse, error) { - panic("implement me") -} - -func (t *testYorkieServer) DetachDocument(ctx context.Context, request *api.DetachDocumentRequest) (*api.DetachDocumentResponse, error) { - panic("implement me") -} - -func (t *testYorkieServer) RemoveDocument(ctx context.Context, request *api.RemoveDocumentRequest) (*api.RemoveDocumentResponse, error) { - panic("implement me") -} - -func (t *testYorkieServer) PushPullChanges(ctx context.Context, request *api.PushPullChangesRequest) (*api.PushPullChangesResponse, error) { - panic("implement me") -} - -func (t *testYorkieServer) WatchDocument(request *api.WatchDocumentRequest, server api.YorkieService_WatchDocumentServer) error { - panic("implement me") -} - -// newYorkieServer creates a new instance of yorkieServer. +// dialTestYorkieServer creates a new instance of testYorkieServer and +// dials it with LocalListener. func dialTestYorkieServer(t *testing.T) (*testYorkieServer, string) { - testYorkieServer := &testYorkieServer{} + yorkieServer := &api.UnimplementedYorkieServiceServer{} grpcServer := grpc.NewServer() - api.RegisterYorkieServiceServer(grpcServer, testYorkieServer) - testYorkieServer.grpcServer = grpcServer + api.RegisterYorkieServiceServer(grpcServer, yorkieServer) - addr := testYorkieServer.listenAndServe(t) - return testYorkieServer, addr + testYorkieServer := &testYorkieServer{ + grpcServer: grpcServer, + yorkieServer: yorkieServer, + } + + return testYorkieServer, testYorkieServer.listenAndServe(t) } func (s *testYorkieServer) listenAndServe(t *testing.T) string { @@ -113,25 +87,40 @@ func TestClient(t *testing.T) { assert.Equal(t, presence, cli.Presence()) }) - // t.Run("x-shard-key test", func(t *testing.T) { - // testServer, addr := dialTestYorkieServer(t) - // defer testServer.Stop() - - // cli, err := client.Dial(addr, client.WithAPIKey("dummy-api-key")) - // assert.NoError(t, err) - - // var patch *monkey.Patch - // patch, err = monkey.PatchInstanceMethodByName(reflect.TypeOf(testServer), "ActivateClient", func(m *testYorkieServer, ctx context.Context, req *api.ActivateClientRequest) (*api.ActivateClientResponse, error) { - // patch.Unpatch() - // defer patch.Patch() - // t.Log("ActivateClient called") - // return &api.ActivateClientResponse{}, nil - // }) - // if err != nil { - // t.Fatal(err) - // } - - // ctx := context.Background() - // err = cli.Activate(ctx) - // }) + t.Run("x-shard-key test", func(t *testing.T) { + dummyID := types.ID("000000000000000000000000") + dummyActorID, err := dummyID.Bytes() + assert.NoError(t, err) + + testServer, addr := dialTestYorkieServer(t) + defer testServer.Stop() + + cli, err := client.Dial(addr, client.WithAPIKey("dummy-api-key")) + assert.NoError(t, err) + + var patch *monkey.Patch + patch, err = monkey.PatchInstanceMethodByName( + reflect.TypeOf(testServer.yorkieServer), + "ActivateClient", + func( + m *api.UnimplementedYorkieServiceServer, + ctx context.Context, + req *api.ActivateClientRequest, + ) (*api.ActivateClientResponse, error) { + assert.NoError(t, patch.Unpatch()) + defer func() { + assert.NoError(t, patch.Patch()) + }() + + data, _ := grpcmetadata.FromIncomingContext(ctx) + assert.Equal(t, "dummy-api-key", data[types.ShardKey][0]) + + return &api.ActivateClientResponse{ + ClientId: dummyActorID, + }, nil + }, + ) + assert.NoError(t, err) + assert.NoError(t, cli.Activate(context.Background())) + }) } From 370a18e03bb1d5bec62e8e385394b6082e960026 Mon Sep 17 00:00:00 2001 From: Youngteac Hong Date: Thu, 23 Mar 2023 19:27:14 +0900 Subject: [PATCH 3/3] Extract withShardKey from naive metadata.AppendToOutgoingContext --- client/client.go | 106 ++++++++++++++++++++++++++++------------------- 1 file changed, 64 insertions(+), 42 deletions(-) diff --git a/client/client.go b/client/client.go index dbfd9b4ac..fcede4d8f 100644 --- a/client/client.go +++ b/client/client.go @@ -22,6 +22,7 @@ import ( "context" "errors" "fmt" + "strings" "google.golang.org/grpc/metadata" @@ -211,8 +212,7 @@ func (c *Client) Activate(ctx context.Context) error { return nil } - ctx = metadata.AppendToOutgoingContext(ctx, types.ShardKey, c.options.APIKey) - response, err := c.client.ActivateClient(ctx, &api.ActivateClientRequest{ + response, err := c.client.ActivateClient(withShardKey(ctx, c.options.APIKey), &api.ActivateClientRequest{ ClientKey: c.key, }) if err != nil { @@ -236,8 +236,7 @@ func (c *Client) Deactivate(ctx context.Context) error { return nil } - ctx = metadata.AppendToOutgoingContext(ctx, types.ShardKey, c.options.APIKey) - _, err := c.client.DeactivateClient(ctx, &api.DeactivateClientRequest{ + _, err := c.client.DeactivateClient(withShardKey(ctx, c.options.APIKey), &api.DeactivateClientRequest{ ClientId: c.id.Bytes(), }) if err != nil { @@ -267,11 +266,13 @@ func (c *Client) Attach(ctx context.Context, doc *document.Document) error { return err } - ctx = metadata.AppendToOutgoingContext(ctx, types.ShardKey, fmt.Sprintf("%s/%s", c.options.APIKey, doc.Key().String())) - res, err := c.client.AttachDocument(ctx, &api.AttachDocumentRequest{ - ClientId: c.id.Bytes(), - ChangePack: pbChangePack, - }) + res, err := c.client.AttachDocument( + withShardKey(ctx, c.options.APIKey, doc.Key().String()), + &api.AttachDocumentRequest{ + ClientId: c.id.Bytes(), + ChangePack: pbChangePack, + }, + ) if err != nil { return err } @@ -327,12 +328,14 @@ func (c *Client) Detach(ctx context.Context, doc *document.Document) error { return err } - ctx = metadata.AppendToOutgoingContext(ctx, types.ShardKey, fmt.Sprintf("%s/%s", c.options.APIKey, doc.Key().String())) - res, err := c.client.DetachDocument(ctx, &api.DetachDocumentRequest{ - ClientId: c.id.Bytes(), - DocumentId: attachment.docID.String(), - ChangePack: pbChangePack, - }) + res, err := c.client.DetachDocument( + withShardKey(ctx, c.options.APIKey, doc.Key().String()), + &api.DetachDocumentRequest{ + ClientId: c.id.Bytes(), + DocumentId: attachment.docID.String(), + ChangePack: pbChangePack, + }, + ) if err != nil { return err } @@ -386,15 +389,17 @@ func (c *Client) Watch( return nil, ErrDocumentNotAttached } - ctx = metadata.AppendToOutgoingContext(ctx, types.ShardKey, fmt.Sprintf("%s/%s", c.options.APIKey, doc.Key().String())) rch := make(chan WatchResponse) - stream, err := c.client.WatchDocument(ctx, &api.WatchDocumentRequest{ - Client: converter.ToClient(types.Client{ - ID: c.id, - PresenceInfo: c.presenceInfo, - }), - DocumentId: attachment.docID.String(), - }) + stream, err := c.client.WatchDocument( + withShardKey(ctx, c.options.APIKey, doc.Key().String()), + &api.WatchDocumentRequest{ + Client: converter.ToClient(types.Client{ + ID: c.id, + PresenceInfo: c.presenceInfo, + }), + DocumentId: attachment.docID.String(), + }, + ) if err != nil { return nil, err } @@ -514,13 +519,15 @@ func (c *Client) UpdatePresence(ctx context.Context, k, v string) error { // following. // TODO(hackerwins): We will move Presence from client-level to document-level. for _, attachment := range c.attachments { - if _, err := c.client.UpdatePresence(ctx, &api.UpdatePresenceRequest{ - Client: converter.ToClient(types.Client{ - ID: c.id, - PresenceInfo: c.presenceInfo, - }), - DocumentId: attachment.docID.String(), - }); err != nil { + if _, err := c.client.UpdatePresence( + withShardKey(ctx, c.options.APIKey, attachment.doc.Key().String()), + &api.UpdatePresenceRequest{ + Client: converter.ToClient(types.Client{ + ID: c.id, + PresenceInfo: c.presenceInfo, + }), + DocumentId: attachment.docID.String(), + }); err != nil { return err } } @@ -581,12 +588,14 @@ func (c *Client) pushPull(ctx context.Context, key key.Key) error { return err } - ctx = metadata.AppendToOutgoingContext(ctx, types.ShardKey, fmt.Sprintf("%s/%s", c.options.APIKey, key.String())) - res, err := c.client.PushPullChanges(ctx, &api.PushPullChangesRequest{ - ClientId: c.id.Bytes(), - DocumentId: attachment.docID.String(), - ChangePack: pbChangePack, - }) + res, err := c.client.PushPullChanges( + withShardKey(ctx, c.options.APIKey, key.String()), + &api.PushPullChangesRequest{ + ClientId: c.id.Bytes(), + DocumentId: attachment.docID.String(), + ChangePack: pbChangePack, + }, + ) if err != nil { return err } @@ -623,12 +632,14 @@ func (c *Client) Remove(ctx context.Context, doc *document.Document) error { } pbChangePack.IsRemoved = true - ctx = metadata.AppendToOutgoingContext(ctx, types.ShardKey, fmt.Sprintf("%s/%s", c.options.APIKey, doc.Key().String())) - res, err := c.client.RemoveDocument(ctx, &api.RemoveDocumentRequest{ - ClientId: c.id.Bytes(), - DocumentId: attachment.docID.String(), - ChangePack: pbChangePack, - }) + res, err := c.client.RemoveDocument( + withShardKey(ctx, c.options.APIKey, doc.Key().String()), + &api.RemoveDocumentRequest{ + ClientId: c.id.Bytes(), + DocumentId: attachment.docID.String(), + ChangePack: pbChangePack, + }, + ) if err != nil { return err } @@ -647,3 +658,14 @@ func (c *Client) Remove(ctx context.Context, doc *document.Document) error { return nil } + +/** + * withShardKey returns a context with the given shard key in metadata. + */ +func withShardKey(ctx context.Context, keys ...string) context.Context { + return metadata.AppendToOutgoingContext( + ctx, + types.ShardKey, + strings.Join(keys, "/"), + ) +}