Skip to content

Commit

Permalink
Merge pull request #5 from vadiminshakov/voteslog
Browse files Browse the repository at this point in the history
add durable log
  • Loading branch information
vadiminshakov authored Sep 16, 2023
2 parents 6823555 + a3546c7 commit e717c49
Show file tree
Hide file tree
Showing 13 changed files with 661 additions and 88 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
# This workflow contains a single job called "build"
run-tests:
# The type of runner that the job will run on
runs-on: ubuntu-latest
runs-on: golang-latest

# Steps represent a sequence of tasks that will be executed as part of the job
steps:
Expand Down
19 changes: 5 additions & 14 deletions core/cohort/commitalgo/commitalgo.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type Committer struct {
commitHook func(req *entity.CommitRequest) bool
height uint64
db db.Repository
nodeCache *voteslog.VotesLog
vlog voteslog.Log
noAutoCommit map[uint64]struct{}
timeout uint64
precommitDone pendingPrecommit
Expand Down Expand Up @@ -75,7 +75,7 @@ func (p *pendingPrecommit) signalToChan(height uint64) {
}
}

func NewCommitter(d db.Repository, nodeCache *voteslog.VotesLog,
func NewCommitter(d db.Repository, vlog voteslog.Log,
proposeHook func(req *entity.ProposeRequest) bool,
commitHook func(req *entity.CommitRequest) bool,
timeout uint64) *Committer {
Expand All @@ -84,7 +84,7 @@ func NewCommitter(d db.Repository, nodeCache *voteslog.VotesLog,
precommitHook: nil,
commitHook: commitHook,
db: d,
nodeCache: nodeCache,
vlog: vlog,
noAutoCommit: make(map[uint64]struct{}),
timeout: timeout,
precommitDone: newPendingPrecommit(),
Expand All @@ -103,7 +103,7 @@ func (c *Committer) Propose(_ context.Context, req *entity.ProposeRequest) (*ent

if c.proposeHook(req) {
log.Infof("received: %s=%s\n", req.Key, string(req.Value))
c.nodeCache.Set(req.Height, req.Key, req.Value)
c.vlog.Set(req.Height, req.Key, req.Value)
response = &entity.CohortResponse{ResponseType: entity.ResponseTypeAck, Height: req.Height}
} else {
response = &entity.CohortResponse{ResponseType: entity.ResponseTypeNack, Height: req.Height}
Expand All @@ -130,7 +130,6 @@ func (c *Committer) Precommit(ctx context.Context, index uint64, votes []*entity
md := metadata.Pairs("mode", "autocommit")
ctx := metadata.NewOutgoingContext(context.Background(), md)
if !isAllNodesAccepted(votes) {
c.rollback()
break ForLoop
}
c.Commit(ctx, &entity.CommitRequest{Height: index})
Expand Down Expand Up @@ -163,17 +162,14 @@ func (c *Committer) Commit(ctx context.Context, req *entity.CommitRequest) (*ent
c.precommitDone.signalToChan(atomic.LoadUint64(&c.height))

c.noAutoCommit[req.Height] = struct{}{}
if req.IsRollback {
c.rollback()
}

var response *entity.CohortResponse
if req.Height < atomic.LoadUint64(&c.height) {
return nil, status.Errorf(codes.AlreadyExists, "stale commit proposed by coordinator (got %d, but actual height is %d)", req.Height, c.height)
}
if c.commitHook(req) {
log.Printf("Committing on height: %d\n", req.Height)
key, value, ok := c.nodeCache.Get(req.Height)
key, value, ok := c.vlog.Get(req.Height)
if !ok {
return &entity.CohortResponse{ResponseType: entity.ResponseTypeNack}, fmt.Errorf("no value in node cache on the index %d", req.Height)
}
Expand All @@ -183,7 +179,6 @@ func (c *Committer) Commit(ctx context.Context, req *entity.CommitRequest) (*ent
}
response = &entity.CohortResponse{ResponseType: entity.ResponseTypeAck}
} else {
c.nodeCache.Delete(req.Height)
response = &entity.CohortResponse{ResponseType: entity.ResponseTypeNack}
}

Expand All @@ -193,7 +188,3 @@ func (c *Committer) Commit(ctx context.Context, req *entity.CommitRequest) (*ent
}
return response, nil
}

func (c *Committer) rollback() {
c.nodeCache.Delete(c.height)
}
17 changes: 10 additions & 7 deletions core/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"github.com/openzipkin/zipkin-go"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/vadiminshakov/committer/config"
"github.com/vadiminshakov/committer/core/entity"
Expand All @@ -24,11 +25,11 @@ type coordinatorImpl struct {
tracer *zipkin.Tracer
config *config.Config
height uint64
nodeCache *voteslog.VotesLog
vlog voteslog.Log
database db.Repository
}

func New(conf *config.Config, vlog *voteslog.VotesLog, database db.Repository) (*coordinatorImpl, error) {
func New(conf *config.Config, vlog voteslog.Log, database db.Repository) (*coordinatorImpl, error) {
var tracer *zipkin.Tracer
var err error
if conf.WithTrace {
Expand All @@ -52,7 +53,7 @@ func New(conf *config.Config, vlog *voteslog.VotesLog, database db.Repository) (
followers: flwrs,
tracer: tracer,
height: 0,
nodeCache: vlog,
vlog: vlog,
database: database,
config: conf,
}, nil
Expand Down Expand Up @@ -117,8 +118,10 @@ func (c *coordinatorImpl) Broadcast(ctx context.Context, req entity.BroadcastReq
}
}
}
c.nodeCache.Set(c.height, req.Key, req.Value)
c.nodeCache.SetVotes(c.height, votes)
if err := c.vlog.Set(c.height, req.Key, req.Value); err != nil {
return nil, errors.Wrap(err, "failed to save msg in the coordinator's log")
}
c.vlog.SetVotes(c.height, votes)

// precommit phase only for three-phase mode
if c.config.CommitType == server.THREE_PHASE {
Expand All @@ -128,7 +131,7 @@ func (c *coordinatorImpl) Broadcast(ctx context.Context, req entity.BroadcastReq
span, ctx = c.tracer.StartSpanFromContext(ctx, "Precommit")
}

votes := votesToProto(c.nodeCache.GetVotes(c.height))
votes := votesToProto(c.vlog.GetVotes(c.height))
resp, err := follower.Precommit(ctx, &pb.PrecommitRequest{Index: c.height, Votes: votes})
if c.tracer != nil && span != nil {
span.Finish()
Expand Down Expand Up @@ -179,7 +182,7 @@ func (c *coordinatorImpl) Broadcast(ctx context.Context, req entity.BroadcastReq
log.Infof("coordinator got ack from all cohorts, committed key %s", req.Key)

// the coordinator got all the answers, so it's time to persist msg and send commit command to followers
key, value, ok := c.nodeCache.Get(c.height)
key, value, ok := c.vlog.Get(c.height)
if !ok {
return nil, status.Error(codes.Internal, "can't to find msg in the coordinator's cache")
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/vadiminshakov/committer

go 1.20
go 1.21

require (
github.com/dgraph-io/badger/v2 v2.0.3
Expand Down
7 changes: 7 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
Expand All @@ -32,6 +33,7 @@ github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg=
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
Expand All @@ -41,12 +43,16 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU=
github.com/onsi/gomega v1.16.0 h1:6gjqkI8iiRHMvdccRJM8rVKjCWk6ZIm6FTm3ddIe4/c=
github.com/onsi/gomega v1.16.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY=
github.com/openzipkin/zipkin-go v0.4.1 h1:kNd/ST2yLLWhaWrkgchya40TJabe8Hioj9udfPcEO5A=
github.com/openzipkin/zipkin-go v0.4.1/go.mod h1:qY0VqDSN1pOBN94dBc6w2GJlWLiovAyg7Qt6/I9HecM=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
Expand Down Expand Up @@ -107,6 +113,7 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
15 changes: 12 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,17 @@ func main() {
panic(err)
}

c := voteslog.New()
coordImpl, err := coordinator.New(conf, c, database)
var l voteslog.Log
if t := os.Getenv("LOG"); t == "disk" {
l, err = voteslog.NewOnDiskLog("./logdata")
if err != nil {
panic(err)
}
} else {
l = voteslog.NewInmemLog()
}

coordImpl, err := coordinator.New(conf, l, database)
if err != nil {
panic(err)
}
Expand All @@ -42,7 +51,7 @@ func main() {
}
}

committer := commitalgo.NewCommitter(database, c, hooks.Propose, hooks.Commit, conf.Timeout)
committer := commitalgo.NewCommitter(database, l, hooks.Propose, hooks.Commit, conf.Timeout)
cohortImpl := cohort.NewCohort(tracer, committer, cohort.Mode(conf.CommitType))

s, err := server.New(conf, tracer, cohortImpl, coordImpl, database)
Expand Down
23 changes: 21 additions & 2 deletions main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,9 +285,19 @@ func startnodes(block int, commitType pb.CommitType) func() error {
panic(err)
}
}
if _, err := os.Stat("./tmp"); !os.IsNotExist(err) {
// del dir
err := os.RemoveAll("./tmp")
if err != nil {
panic(err)
}
}

os.Mkdir(COORDINATOR_BADGER, os.FileMode(0777))
os.Mkdir(FOLLOWER_BADGER, os.FileMode(0777))
os.Mkdir("./tmp", os.FileMode(0777))
os.Mkdir("./tmp/cohort", os.FileMode(0777))
os.Mkdir("./tmp/coord", os.FileMode(0777))

var blocking grpc.UnaryServerInterceptor
switch block {
Expand Down Expand Up @@ -323,7 +333,10 @@ func startnodes(block int, commitType pb.CommitType) func() error {
}
}

c := voteslog.New()
c, err := voteslog.NewOnDiskLog("./tmp/cohort/" + strconv.Itoa(i))
if err != nil {
panic(err)
}
committer := commitalgo.NewCommitter(database, c, hooks.Propose, hooks.Commit, node.Timeout)
cohortImpl := cohort.NewCohort(tracer, committer, cohort.Mode(node.CommitType))

Expand Down Expand Up @@ -352,7 +365,10 @@ func startnodes(block int, commitType pb.CommitType) func() error {
panic(err)
}

c := voteslog.New()
c, err := voteslog.NewOnDiskLog("./tmp/coord/" + strconv.Itoa(i))
if err != nil {
panic(err)
}
coord, err := coordinator.New(coordConfig, c, database)
if err != nil {
panic(err)
Expand Down Expand Up @@ -385,6 +401,9 @@ func startnodes(block int, commitType pb.CommitType) func() error {
for _, f := range stopfuncs {
f()
}
if err := os.RemoveAll("./tmp"); err != nil {
return err
}
return os.RemoveAll(BADGER_DIR)
}
}
Loading

0 comments on commit e717c49

Please sign in to comment.