Skip to content

Commit

Permalink
Modify shard keys based on level of collections (#774)
Browse files Browse the repository at this point in the history
  • Loading branch information
sejongk authored Jan 25, 2024
1 parent 18a460f commit 31c1023
Show file tree
Hide file tree
Showing 36 changed files with 658 additions and 1,001 deletions.
5 changes: 2 additions & 3 deletions admin/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ func (c *Client) UpdateProject(
func (c *Client) ListDocuments(
ctx context.Context,
projectName string,
previousRefKey types.DocRefKey,
previousID string,
pageSize int32,
isForward bool,
includeSnapshot bool,
Expand All @@ -247,8 +247,7 @@ func (c *Client) ListDocuments(
ctx,
connect.NewRequest(&api.ListDocumentsRequest{
ProjectName: projectName,
PreviousId: previousRefKey.ID.String(),
PreviousKey: previousRefKey.Key.String(),
PreviousId: previousID,
PageSize: pageSize,
IsForward: isForward,
IncludeSnapshot: includeSnapshot,
Expand Down
34 changes: 4 additions & 30 deletions api/types/resource_ref_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,47 +17,21 @@
package types

import (
"errors"
"fmt"
"strings"

"github.com/yorkie-team/yorkie/pkg/document/key"
)

// EmptyDocRefKey is an empty value of DocRefKey.
var EmptyDocRefKey = DocRefKey{"", ""}

// ErrInvalidDocRefKeyStringFormat is returned when the input of DocRefKey Set is invalid.
var ErrInvalidDocRefKeyStringFormat = errors.New("use the format 'docKey,docID' for the string of the docRefKey")

// DocRefKey represents an identifier used to reference a document.
type DocRefKey struct {
Key key.Key
ID ID
ProjectID ID
DocID ID
}

// String returns the string representation of the given DocRefKey.
func (r DocRefKey) String() string {
return fmt.Sprintf("Document (%s.%s)", r.Key, r.ID)
}

// Set parses the given string (format: `{docKey},{docID}`) and assigns the values
// to the given DocRefKey.
// NOTE(sejongk): This function is necessary for Viper, an external command-line module.
func (r *DocRefKey) Set(v string) error {
parsed := strings.Split(v, ",")
if len(parsed) != 2 {
return ErrInvalidDocRefKeyStringFormat
}
r.Key = key.Key(parsed[0])
r.ID = ID(parsed[1])
return nil
}

// Type returns the type string of the given DocRefKey, used in cli help text.
// NOTE(sejongk): This function is necessray for Viper, an external command-line module.
func (r DocRefKey) Type() string {
return "DocumentRefKey"
return fmt.Sprintf("Document (%s.%s)", r.ProjectID, r.DocID)
}

// SnapshotRefKey represents an identifier used to reference a snapshot.
Expand All @@ -68,5 +42,5 @@ type SnapshotRefKey struct {

// String returns the string representation of the given SnapshotRefKey.
func (r SnapshotRefKey) String() string {
return fmt.Sprintf("Snapshot (%s.%s.%d)", r.DocRefKey.Key, r.DocRefKey.ID, r.ServerSeq)
return fmt.Sprintf("Snapshot (%s.%s.%d)", r.ProjectID, r.DocID, r.ServerSeq)
}
45 changes: 0 additions & 45 deletions api/types/resource_ref_key_test.go

This file was deleted.

293 changes: 141 additions & 152 deletions api/yorkie/v1/admin.pb.go

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion api/yorkie/v1/admin.proto
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ message UpdateProjectResponse {

message ListDocumentsRequest {
string project_name = 1;
string previous_key = 6;
string previous_id = 2;
int32 page_size = 3;
bool is_forward = 4;
Expand Down
232 changes: 106 additions & 126 deletions api/yorkie/v1/yorkie.pb.go

Large diffs are not rendered by default.

2 changes: 0 additions & 2 deletions api/yorkie/v1/yorkie.proto
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ message DetachDocumentResponse {

message WatchDocumentRequest {
string client_id = 1;
string document_key = 3;
string document_id = 2;
}

Expand Down Expand Up @@ -115,7 +114,6 @@ message PushPullChangesResponse {

message BroadcastRequest {
string client_id = 1;
string document_key = 5;
string document_id = 2;
string topic = 3;
bytes payload = 4;
Expand Down
27 changes: 14 additions & 13 deletions build/docker/sharding/scripts/init-mongos1.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,27 +10,28 @@ function findAnotherShard(shard) {
}

function shardOfChunk(minKeyOfChunk) {
return db.getSiblingDB("config").chunks.findOne({ min: { key: minKeyOfChunk } }).shard
return db.getSiblingDB("config").chunks.findOne({ min: { project_id: minKeyOfChunk } }).shard
}

// Shard the database for the mongo client test
const mongoClientDB = "test-yorkie-meta-mongo-client"
sh.enableSharding(mongoClientDB)
sh.shardCollection(mongoClientDB + ".documents", { key: 1 })
sh.shardCollection(mongoClientDB + ".changes", { doc_key: 1 })
sh.shardCollection(mongoClientDB + ".snapshots", { doc_key: 1 })
sh.shardCollection(mongoClientDB + ".syncedseqs", { doc_key: 1 })
sh.shardCollection(mongoClientDB + ".documents", { project_id: 1 })
sh.shardCollection(mongoClientDB + ".changes", { doc_id: 1 })
sh.shardCollection(mongoClientDB + ".snapshots", { doc_id: 1 })
sh.shardCollection(mongoClientDB + ".syncedseqs", { doc_id: 1 })

// Split the inital range at "duplicateIDTestDocKey5" to allow doc_ids duplicate in different shards.
const docSplitKey = "duplicateIDTestDocKey5"
sh.splitAt(mongoClientDB + ".documents", { key: docSplitKey })
// Split the inital range at `splitPoint` to allow doc_ids duplicate in different shards.
const splitPoint = ObjectId("500000000000000000000000")
sh.splitAt(mongoClientDB + ".documents", { project_id: splitPoint })
// Move the chunk to another shard.
db.adminCommand({ moveChunk: mongoClientDB + ".documents", find: { key: docSplitKey }, to: findAnotherShard(shardOfChunk(docSplitKey)) })
db.adminCommand({ moveChunk: mongoClientDB + ".documents", find: { project_id: splitPoint }, to: findAnotherShard(shardOfChunk(splitPoint)) })

// Shard the database for the server test
const serverDB = "test-yorkie-meta-server"
sh.enableSharding(serverDB)
sh.shardCollection(serverDB + ".documents", { key: 1 })
sh.shardCollection(serverDB + ".changes", { doc_key: 1 })
sh.shardCollection(serverDB + ".snapshots", { doc_key: 1 })
sh.shardCollection(serverDB + ".syncedseqs", { doc_key: 1 })
sh.shardCollection(serverDB + ".documents", { project_id: 1 })
sh.shardCollection(serverDB + ".changes", { doc_id: 1 })
sh.shardCollection(serverDB + ".snapshots", { doc_id: 1 })
sh.shardCollection(serverDB + ".syncedseqs", { doc_id: 1 })

14 changes: 6 additions & 8 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,9 +424,8 @@ func (c *Client) Watch(
stream, err := c.client.WatchDocument(
ctx,
withShardKey(connect.NewRequest(&api.WatchDocumentRequest{
ClientId: c.id.String(),
DocumentKey: doc.Key().String(),
DocumentId: attachment.docID.String(),
ClientId: c.id.String(),
DocumentId: attachment.docID.String(),
},
), c.options.APIKey, doc.Key().String()))
if err != nil {
Expand Down Expand Up @@ -698,11 +697,10 @@ func (c *Client) broadcast(ctx context.Context, doc *document.Document, topic st
_, err := c.client.Broadcast(
ctx,
withShardKey(connect.NewRequest(&api.BroadcastRequest{
ClientId: c.id.String(),
DocumentKey: doc.Key().String(),
DocumentId: attachment.docID.String(),
Topic: topic,
Payload: payload,
ClientId: c.id.String(),
DocumentId: attachment.docID.String(),
Topic: topic,
Payload: payload,
},
), c.options.APIKey, doc.Key().String()))
if err != nil {
Expand Down
18 changes: 9 additions & 9 deletions cmd/yorkie/document/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,14 @@ import (
"github.com/spf13/viper"

"github.com/yorkie-team/yorkie/admin"
"github.com/yorkie-team/yorkie/api/types"
"github.com/yorkie-team/yorkie/cmd/yorkie/config"
"github.com/yorkie-team/yorkie/pkg/units"
)

var (
previousRefKey types.DocRefKey
pageSize int32
isForward bool
previousID string
pageSize int32
isForward bool
)

func newListCommand() *cobra.Command {
Expand Down Expand Up @@ -64,7 +63,7 @@ func newListCommand() *cobra.Command {
}()

ctx := context.Background()
documents, err := cli.ListDocuments(ctx, projectName, previousRefKey, pageSize, isForward, true)
documents, err := cli.ListDocuments(ctx, projectName, previousID, pageSize, isForward, true)
if err != nil {
return err
}
Expand Down Expand Up @@ -101,10 +100,11 @@ func newListCommand() *cobra.Command {

func init() {
cmd := newListCommand()
cmd.Flags().Var(
&previousRefKey,
"previous-refKey",
"The previous document refKey to start from. Use the format 'docKey,docID' for the input.",
cmd.Flags().StringVar(
&previousID,
"previous-id",
"",
"The previous document ID to start from",
)
cmd.Flags().Int32Var(
&pageSize,
Expand Down
3 changes: 1 addition & 2 deletions server/backend/database/change_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
api "github.com/yorkie-team/yorkie/api/yorkie/v1"
"github.com/yorkie-team/yorkie/pkg/document/change"
"github.com/yorkie-team/yorkie/pkg/document/innerpresence"
"github.com/yorkie-team/yorkie/pkg/document/key"
"github.com/yorkie-team/yorkie/pkg/document/operations"
"github.com/yorkie-team/yorkie/pkg/document/time"
)
Expand All @@ -42,7 +41,7 @@ var ErrDecodeOperationFailed = errors.New("decode operations failed")
// ChangeInfo is a structure representing information of a change.
type ChangeInfo struct {
ID types.ID `bson:"_id"`
DocKey key.Key `bson:"doc_key"`
ProjectID types.ID `bson:"project_id"`
DocID types.ID `bson:"doc_id"`
ServerSeq int64 `bson:"server_seq"`
ClientSeq uint32 `bson:"client_seq"`
Expand Down
Loading

0 comments on commit 31c1023

Please sign in to comment.