Skip to content

Commit

Permalink
rename
Browse files Browse the repository at this point in the history
  • Loading branch information
vadiminshakov committed Dec 18, 2023
1 parent 3dac017 commit a76507e
Show file tree
Hide file tree
Showing 13 changed files with 81 additions and 86 deletions.
14 changes: 7 additions & 7 deletions core/cohort/cohort.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ import (
"errors"
"github.com/openzipkin/zipkin-go"
"github.com/vadiminshakov/committer/core/cohort/commitalgo"
"github.com/vadiminshakov/committer/core/entity"
"github.com/vadiminshakov/committer/core/dto"
)

type Cohort interface {
Propose(ctx context.Context, req *entity.ProposeRequest) (*entity.CohortResponse, error)
Precommit(ctx context.Context, index uint64, votes []*entity.Vote) (*entity.CohortResponse, error)
Commit(ctx context.Context, in *entity.CommitRequest) (*entity.CohortResponse, error)
Propose(ctx context.Context, req *dto.ProposeRequest) (*dto.CohortResponse, error)
Precommit(ctx context.Context, index uint64, votes []*dto.Vote) (*dto.CohortResponse, error)
Commit(ctx context.Context, in *dto.CommitRequest) (*dto.CohortResponse, error)
Height() uint64
}

Expand All @@ -37,18 +37,18 @@ func (c *CohortImpl) Height() uint64 {
return c.committer.Height()
}

func (c *CohortImpl) Propose(ctx context.Context, req *entity.ProposeRequest) (*entity.CohortResponse, error) {
func (c *CohortImpl) Propose(ctx context.Context, req *dto.ProposeRequest) (*dto.CohortResponse, error) {
return c.committer.Propose(ctx, req)
}

func (s *CohortImpl) Precommit(ctx context.Context, index uint64, votes []*entity.Vote) (*entity.CohortResponse, error) {
func (s *CohortImpl) Precommit(ctx context.Context, index uint64, votes []*dto.Vote) (*dto.CohortResponse, error) {
if s.commitType != THREE_PHASE {
return nil, errors.New("precommit is allowed for 3PC mode only")
}

return s.committer.Precommit(ctx, index, votes)
}

func (c *CohortImpl) Commit(ctx context.Context, in *entity.CommitRequest) (resp *entity.CohortResponse, err error) {
func (c *CohortImpl) Commit(ctx context.Context, in *dto.CommitRequest) (resp *dto.CohortResponse, err error) {
return c.committer.Commit(ctx, in)
}
42 changes: 21 additions & 21 deletions core/cohort/commitalgo/commitalgo.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"fmt"
log "github.com/sirupsen/logrus"
"github.com/vadiminshakov/committer/core/entity"
"github.com/vadiminshakov/committer/core/dto"
"github.com/vadiminshakov/committer/io/db"
"github.com/vadiminshakov/committer/voteslog"
"google.golang.org/grpc/codes"
Expand All @@ -16,9 +16,9 @@ import (
)

type Committer struct {
proposeHook func(req *entity.ProposeRequest) bool
proposeHook func(req *dto.ProposeRequest) bool
precommitHook func(height uint64) bool
commitHook func(req *entity.CommitRequest) bool
commitHook func(req *dto.CommitRequest) bool
height uint64
db db.Repository
vlog voteslog.Log
Expand Down Expand Up @@ -76,8 +76,8 @@ func (p *pendingPrecommit) signalToChan(height uint64) {
}

func NewCommitter(d db.Repository, vlog voteslog.Log,
proposeHook func(req *entity.ProposeRequest) bool,
commitHook func(req *entity.CommitRequest) bool,
proposeHook func(req *dto.ProposeRequest) bool,
commitHook func(req *dto.CommitRequest) bool,
timeout uint64) *Committer {
return &Committer{
proposeHook: proposeHook,
Expand All @@ -95,24 +95,24 @@ func (c *Committer) Height() uint64 {
return c.height
}

func (c *Committer) Propose(_ context.Context, req *entity.ProposeRequest) (*entity.CohortResponse, error) {
var response *entity.CohortResponse
func (c *Committer) Propose(_ context.Context, req *dto.ProposeRequest) (*dto.CohortResponse, error) {
var response *dto.CohortResponse
if atomic.LoadUint64(&c.height) > req.Height {
return &entity.CohortResponse{ResponseType: entity.ResponseTypeNack, Height: atomic.LoadUint64(&c.height)}, nil
return &dto.CohortResponse{ResponseType: dto.ResponseTypeNack, Height: atomic.LoadUint64(&c.height)}, nil
}

if c.proposeHook(req) {
log.Infof("received: %s=%s\n", req.Key, string(req.Value))
c.vlog.Set(req.Height, req.Key, req.Value)
response = &entity.CohortResponse{ResponseType: entity.ResponseTypeAck, Height: req.Height}
response = &dto.CohortResponse{ResponseType: dto.ResponseTypeAck, Height: req.Height}
} else {
response = &entity.CohortResponse{ResponseType: entity.ResponseTypeNack, Height: req.Height}
response = &dto.CohortResponse{ResponseType: dto.ResponseTypeNack, Height: req.Height}
}

return response, nil
}

func (c *Committer) Precommit(ctx context.Context, index uint64, votes []*entity.Vote) (*entity.CohortResponse, error) {
func (c *Committer) Precommit(ctx context.Context, index uint64, votes []*dto.Vote) (*dto.CohortResponse, error) {
c.precommitDone.add(index)

go func(ctx context.Context) {
Expand All @@ -132,7 +132,7 @@ func (c *Committer) Precommit(ctx context.Context, index uint64, votes []*entity
if !isAllNodesAccepted(votes) {
break ForLoop
}
c.Commit(ctx, &entity.CommitRequest{Height: index})
c.Commit(ctx, &dto.CommitRequest{Height: index})
log.Warn("committed without coordinator after timeout")
break ForLoop
}
Expand All @@ -141,14 +141,14 @@ func (c *Committer) Precommit(ctx context.Context, index uint64, votes []*entity
for _, v := range votes {
if !v.IsAccepted {
log.Printf("Node %s is not accepted proposal with index %d\n", v.Node, index)
return &entity.CohortResponse{ResponseType: entity.ResponseTypeNack}, nil
return &dto.CohortResponse{ResponseType: dto.ResponseTypeNack}, nil
}
}

return &entity.CohortResponse{ResponseType: entity.ResponseTypeAck}, nil
return &dto.CohortResponse{ResponseType: dto.ResponseTypeAck}, nil
}

func isAllNodesAccepted(votes []*entity.Vote) bool {
func isAllNodesAccepted(votes []*dto.Vote) bool {
for _, v := range votes {
if !v.IsAccepted {
return false
Expand All @@ -158,31 +158,31 @@ func isAllNodesAccepted(votes []*entity.Vote) bool {
return true
}

func (c *Committer) Commit(ctx context.Context, req *entity.CommitRequest) (*entity.CohortResponse, error) {
func (c *Committer) Commit(ctx context.Context, req *dto.CommitRequest) (*dto.CohortResponse, error) {
c.precommitDone.signalToChan(atomic.LoadUint64(&c.height))

c.noAutoCommit[req.Height] = struct{}{}

var response *entity.CohortResponse
var response *dto.CohortResponse
if req.Height < atomic.LoadUint64(&c.height) {
return nil, status.Errorf(codes.AlreadyExists, "stale commit proposed by coordinator (got %d, but actual height is %d)", req.Height, c.height)
}
if c.commitHook(req) {
log.Printf("Committing on height: %d\n", req.Height)
key, value, ok := c.vlog.Get(req.Height)
if !ok {
return &entity.CohortResponse{ResponseType: entity.ResponseTypeNack}, fmt.Errorf("no value in node cache on the index %d", req.Height)
return &dto.CohortResponse{ResponseType: dto.ResponseTypeNack}, fmt.Errorf("no value in node cache on the index %d", req.Height)
}

if err := c.db.Put(key, value); err != nil {
return nil, err
}
response = &entity.CohortResponse{ResponseType: entity.ResponseTypeAck}
response = &dto.CohortResponse{ResponseType: dto.ResponseTypeAck}
} else {
response = &entity.CohortResponse{ResponseType: entity.ResponseTypeNack}
response = &dto.CohortResponse{ResponseType: dto.ResponseTypeNack}
}

if response.ResponseType == entity.ResponseTypeAck {
if response.ResponseType == dto.ResponseTypeAck {
fmt.Println("ack cohort", atomic.LoadUint64(&c.height))
atomic.AddUint64(&c.height, 1)
}
Expand Down
6 changes: 3 additions & 3 deletions core/cohort/commitalgo/hooks/hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@ package hooks

import (
log "github.com/sirupsen/logrus"
"github.com/vadiminshakov/committer/core/entity"
"github.com/vadiminshakov/committer/core/dto"
)

func Propose(req *entity.ProposeRequest) bool {
func Propose(req *dto.ProposeRequest) bool {
log.Infof("propose hook on height %d is OK", req.Height)
return true
}

func Commit(req *entity.CommitRequest) bool {
func Commit(req *dto.CommitRequest) bool {
log.Infof("commit hook on height %d is OK", req.Height)
return true
}
6 changes: 3 additions & 3 deletions core/cohort/entity.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package cohort

import (
"github.com/vadiminshakov/committer/core/entity"
"github.com/vadiminshakov/committer/core/dto"
)

type PrecommitRequest struct {
Index uint64 `protobuf:"varint,1,opt,name=index,proto3" json:"index,omitempty"`
Votes []*entity.Vote `protobuf:"bytes,2,rep,name=votes,proto3" json:"votes,omitempty"`
Index uint64 `protobuf:"varint,1,opt,name=index,proto3" json:"index,omitempty"`
Votes []*dto.Vote `protobuf:"bytes,2,rep,name=votes,proto3" json:"votes,omitempty"`
}

type Mode string
Expand Down
24 changes: 12 additions & 12 deletions core/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/vadiminshakov/committer/config"
"github.com/vadiminshakov/committer/core/entity"
"github.com/vadiminshakov/committer/core/dto"
"github.com/vadiminshakov/committer/io/db"
"github.com/vadiminshakov/committer/io/gateway/grpc/client"
pb "github.com/vadiminshakov/committer/io/gateway/grpc/proto"
Expand Down Expand Up @@ -59,7 +59,7 @@ func New(conf *config.Config, vlog voteslog.Log, database db.Repository) (*coord
}, nil
}

func (c *coordinatorImpl) Broadcast(ctx context.Context, req entity.BroadcastRequest) (*entity.BroadcastResponse, error) {
func (c *coordinatorImpl) Broadcast(ctx context.Context, req dto.BroadcastRequest) (*dto.BroadcastResponse, error) {
var (
err error
span zipkin.Span
Expand All @@ -73,19 +73,19 @@ func (c *coordinatorImpl) Broadcast(ctx context.Context, req entity.BroadcastReq
// propose
log.Infof("propose key %s", req.Key)
if err := c.propose(ctx, req); err != nil {
return &entity.BroadcastResponse{Type: int32(pb.Type_NACK)}, errors.Wrap(err, "failed to send propose")
return &dto.BroadcastResponse{Type: dto.ResponseTypeNack}, errors.Wrap(err, "failed to send propose")
}

// precommit phase only for three-phase mode
log.Infof("precommit key %s", req.Key)
if err := c.preCommit(ctx, req); err != nil {
return &entity.BroadcastResponse{Type: int32(pb.Type_NACK)}, errors.Wrap(err, "failed to send precommit")
return &dto.BroadcastResponse{Type: dto.ResponseTypeNack}, errors.Wrap(err, "failed to send precommit")
}

// commit
log.Infof("commit %s", req.Key)
if err := c.commit(ctx); err != nil {
return &entity.BroadcastResponse{Type: int32(pb.Type_NACK)}, errors.Wrap(err, "failed to send commit")
return &dto.BroadcastResponse{Type: dto.ResponseTypeNack}, errors.Wrap(err, "failed to send commit")
}

log.Infof("coordinator got ack from all cohorts, committed key %s", req.Key)
Expand All @@ -96,24 +96,24 @@ func (c *coordinatorImpl) Broadcast(ctx context.Context, req entity.BroadcastReq
return nil, status.Error(codes.Internal, "can't to find msg in the coordinator's cache")
}
if err = c.database.Put(key, value); err != nil {
return &entity.BroadcastResponse{Type: entity.ResponseNack}, status.Error(codes.Internal, "failed to save msg on coordinator")
return &dto.BroadcastResponse{Type: dto.ResponseTypeNack}, status.Error(codes.Internal, "failed to save msg on coordinator")
}

// increase height for next round
atomic.AddUint64(&c.height, 1)

return &entity.BroadcastResponse{entity.ResponseAck, c.height}, nil
return &dto.BroadcastResponse{Type: dto.ResponseTypeNack, Index: c.height}, nil
}

func (c *coordinatorImpl) propose(ctx context.Context, req entity.BroadcastRequest) error {
func (c *coordinatorImpl) propose(ctx context.Context, req dto.BroadcastRequest) error {
var ctype pb.CommitType
if c.config.CommitType == server.THREE_PHASE {
ctype = pb.CommitType_THREE_PHASE_COMMIT
} else {
ctype = pb.CommitType_TWO_PHASE_COMMIT
}

votes := make([]*entity.Vote, 0, len(c.followers))
votes := make([]*dto.Vote, 0, len(c.followers))
var span zipkin.Span
for nodename, follower := range c.followers {
if c.tracer != nil {
Expand All @@ -137,7 +137,7 @@ func (c *coordinatorImpl) propose(ctx context.Context, req entity.BroadcastReque
log.Errorf(err.Error())
isAccepted = false
}
votes = append(votes, &entity.Vote{
votes = append(votes, &dto.Vote{
Node: nodename,
IsAccepted: isAccepted,
})
Expand All @@ -161,7 +161,7 @@ func (c *coordinatorImpl) propose(ctx context.Context, req entity.BroadcastReque
return nil
}

func (c *coordinatorImpl) preCommit(ctx context.Context, req entity.BroadcastRequest) error {
func (c *coordinatorImpl) preCommit(ctx context.Context, req dto.BroadcastRequest) error {
if c.config.CommitType != server.THREE_PHASE {
return nil
}
Expand Down Expand Up @@ -228,7 +228,7 @@ func (c *coordinatorImpl) Height() uint64 {
return c.height
}

func votesToProto(votes []*entity.Vote) []*pb.Vote {
func votesToProto(votes []*dto.Vote) []*pb.Vote {
pbvotes := make([]*pb.Vote, 0, len(votes))
for _, v := range votes {
pbvotes = append(pbvotes, &pb.Vote{
Expand Down
9 changes: 2 additions & 7 deletions core/entity/entity.go → core/dto/dto.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package entity
package dto

type ProposeRequest struct {
Key string
Expand Down Expand Up @@ -28,17 +28,12 @@ type Vote struct {
IsAccepted bool
}

const (
ResponseAck = iota
ResponseNack
)

type BroadcastRequest struct {
Key string
Value []byte
}

type BroadcastResponse struct {
Type int32
Type ResponseType
Index uint64
}
18 changes: 9 additions & 9 deletions io/gateway/grpc/server/dto.go
Original file line number Diff line number Diff line change
@@ -1,34 +1,34 @@
package server

import (
"github.com/vadiminshakov/committer/core/entity"
"github.com/vadiminshakov/committer/core/dto"
"github.com/vadiminshakov/committer/io/gateway/grpc/proto"
)

func proposeRequestPbToEntity(request *proto.ProposeRequest) *entity.ProposeRequest {
func proposeRequestPbToEntity(request *proto.ProposeRequest) *dto.ProposeRequest {
if request == nil {
return nil
}

return &entity.ProposeRequest{
return &dto.ProposeRequest{
Key: request.Key,
Value: request.Value,
Height: request.Index,
}
}

func commitRequestPbToEntity(request *proto.CommitRequest) *entity.CommitRequest {
func commitRequestPbToEntity(request *proto.CommitRequest) *dto.CommitRequest {
if request == nil {
return nil
}

return &entity.CommitRequest{
return &dto.CommitRequest{
Height: request.Index,
IsRollback: request.IsRollback,
}
}

func cohortResponseToProto(e *entity.CohortResponse) *proto.Response {
func cohortResponseToProto(e *dto.CohortResponse) *proto.Response {
if e == nil {
return nil
}
Expand All @@ -38,14 +38,14 @@ func cohortResponseToProto(e *entity.CohortResponse) *proto.Response {
}
}

func votesPbToEntity(votes []*proto.Vote) []*entity.Vote {
func votesPbToEntity(votes []*proto.Vote) []*dto.Vote {
if votes == nil {
return nil
}

newVotes := make([]*entity.Vote, 0, len(votes))
newVotes := make([]*dto.Vote, 0, len(votes))
for _, v := range votes {
newVotes = append(newVotes, &entity.Vote{
newVotes = append(newVotes, &dto.Vote{
Node: v.Node,
IsAccepted: v.IsAccepted,
})
Expand Down
Loading

0 comments on commit a76507e

Please sign in to comment.