Skip to content

Commit

Permalink
cleaner code
Browse files Browse the repository at this point in the history
  • Loading branch information
vadiminshakov committed Dec 18, 2023
1 parent 3d32f98 commit 915fa77
Showing 1 changed file with 79 additions and 60 deletions.
139 changes: 79 additions & 60 deletions core/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,75 +70,24 @@ func (c *coordinatorImpl) Broadcast(ctx context.Context, req entity.BroadcastReq
defer span.Finish()
}

var ctype pb.CommitType
if c.config.CommitType == server.THREE_PHASE {
ctype = pb.CommitType_THREE_PHASE_COMMIT
} else {
ctype = pb.CommitType_TWO_PHASE_COMMIT
}

// propose
log.Infof("propose key %s", req.Key)
if err := c.propose(ctx, req, ctype); err != nil {
return nil, errors.Wrap(err, "failed to send propose")
if err := c.propose(ctx, req); err != nil {
return &entity.BroadcastResponse{Type: int32(pb.Type_NACK)}, errors.Wrap(err, "failed to send propose")
}

// precommit phase only for three-phase mode
if c.config.CommitType == server.THREE_PHASE {
log.Infof("precommit key %s", req.Key)
for _, follower := range c.followers {
if c.tracer != nil {
span, ctx = c.tracer.StartSpanFromContext(ctx, "Precommit")
}

votes := votesToProto(c.vlog.GetVotes(c.height))
resp, err := follower.Precommit(ctx, &pb.PrecommitRequest{Index: c.height, Votes: votes})
if c.tracer != nil && span != nil {
span.Finish()
}
if err != nil {
log.Errorf(err.Error())
return &entity.BroadcastResponse{Type: int32(pb.Type_NACK)}, nil
}
if resp.Type != pb.Type_ACK {
return nil, status.Error(codes.Internal, "follower not acknowledged msg")
}
}

// block after precommit if needs
{
blockPrecommit, ok := ctx.Value("block").(string)
blockt, okblocktime := ctx.Value("blocktime").(string)
if ok && okblocktime {
if blockPrecommit == "precommit" {
dur, err := time.ParseDuration(blockt)
if err != nil {
return nil, err
}
time.Sleep(dur)
}
}
}
log.Infof("precommit key %s", req.Key)
if err := c.preCommit(ctx, req); err != nil {
return &entity.BroadcastResponse{Type: int32(pb.Type_NACK)}, errors.Wrap(err, "failed to send precommit")
}

// commit
log.Infof("commit %s", req.Key)
for _, follower := range c.followers {
if c.tracer != nil {
span, ctx = c.tracer.StartSpanFromContext(ctx, "Commit")
}
r, err := follower.Commit(ctx, &pb.CommitRequest{Index: c.height})
if c.tracer != nil && span != nil {
span.Finish()
}
if err != nil {
log.Errorf("coordinator failed to commit: %s", err.Error())
return &entity.BroadcastResponse{Type: entity.ResponseNack}, nil
}
if r.Type != pb.Type_ACK {
return nil, status.Error(codes.Internal, "follower not acknowledged msg")
}
if err := c.commit(ctx); err != nil {
return &entity.BroadcastResponse{Type: int32(pb.Type_NACK)}, errors.Wrap(err, "failed to send commit")
}

log.Infof("coordinator got ack from all cohorts, committed key %s", req.Key)

// the coordinator got all the answers, so it's time to persist msg and send commit command to followers
Expand All @@ -156,7 +105,14 @@ func (c *coordinatorImpl) Broadcast(ctx context.Context, req entity.BroadcastReq
return &entity.BroadcastResponse{entity.ResponseAck, c.height}, nil
}

func (c *coordinatorImpl) propose(ctx context.Context, req entity.BroadcastRequest, ctype pb.CommitType) error {
func (c *coordinatorImpl) propose(ctx context.Context, req entity.BroadcastRequest) error {
var ctype pb.CommitType
if c.config.CommitType == server.THREE_PHASE {
ctype = pb.CommitType_THREE_PHASE_COMMIT
} else {
ctype = pb.CommitType_TWO_PHASE_COMMIT
}

votes := make([]*entity.Vote, 0, len(c.followers))
var span zipkin.Span
for nodename, follower := range c.followers {
Expand Down Expand Up @@ -205,6 +161,69 @@ func (c *coordinatorImpl) propose(ctx context.Context, req entity.BroadcastReque
return nil
}

func (c *coordinatorImpl) preCommit(ctx context.Context, req entity.BroadcastRequest) error {
if c.config.CommitType != server.THREE_PHASE {
return nil
}

var span zipkin.Span
for _, follower := range c.followers {
if c.tracer != nil {
span, ctx = c.tracer.StartSpanFromContext(ctx, "Precommit")
}

votes := votesToProto(c.vlog.GetVotes(c.height))
resp, err := follower.Precommit(ctx, &pb.PrecommitRequest{Index: c.height, Votes: votes})
if c.tracer != nil && span != nil {
span.Finish()
}
if err != nil {
return err
}
if resp.Type != pb.Type_ACK {
return status.Error(codes.Internal, "follower not acknowledged msg")
}
}

// block after precommit if needs
{
blockPrecommit, ok := ctx.Value("block").(string)
blockt, okblocktime := ctx.Value("blocktime").(string)
if ok && okblocktime {
if blockPrecommit == "precommit" {
dur, err := time.ParseDuration(blockt)
if err != nil {
return err
}
time.Sleep(dur)
}
}
}

return nil
}

func (c *coordinatorImpl) commit(ctx context.Context) error {
var span zipkin.Span
for _, follower := range c.followers {
if c.tracer != nil {
span, ctx = c.tracer.StartSpanFromContext(ctx, "Commit")
}
r, err := follower.Commit(ctx, &pb.CommitRequest{Index: c.height})
if c.tracer != nil && span != nil {
span.Finish()
}
if err != nil {
return err
}
if r.Type != pb.Type_ACK {
return status.Error(codes.Internal, "follower not acknowledged msg")
}
}

return nil
}

func (c *coordinatorImpl) Height() uint64 {
return c.height
}
Expand Down

0 comments on commit 915fa77

Please sign in to comment.