Skip to content

Commit

Permalink
improve logs
Browse files Browse the repository at this point in the history
  • Loading branch information
vadiminshakov committed Oct 2, 2023
1 parent 4e1d920 commit 22de5e4
Showing 1 changed file with 29 additions and 9 deletions.
38 changes: 29 additions & 9 deletions voteslog/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@ import (
)

const (
segmentThreshold = 1000
tmpIndexBufferThreshold = 500
segmentThreshold = 10
)

var ErrExists = errors.New("msg with such index already exists")

const maxSegments = 5

type FileVotesLog struct {
// append-only log with proposed messages that node consumed
msgs *os.File
Expand Down Expand Up @@ -53,6 +54,9 @@ type FileVotesLog struct {

// name of the old segment for msgs log
oldMsgsSegmentName string

segmentsNumberMsgs int
segmentsNumberVotes int
}

func NewOnDiskLog(dir string) (*FileVotesLog, error) {
Expand All @@ -66,19 +70,22 @@ func NewOnDiskLog(dir string) (*FileVotesLog, error) {
if err != nil {
return nil, errors.Wrap(err, "failed to load msgs segments")
}
numberOfMsgsSegments := len(msgsIndex) / segmentThreshold

votes, statVotes, votesIndex, err := segmentInfoAndIndexVotes(voteSegmentsNumbers, path.Join(dir, "votes_"))
if err != nil {
return nil, errors.Wrap(err, "failed to load votes segments")
}
numberOfVotesSegments := len(votesIndex) / segmentThreshold

var bufMsgs bytes.Buffer
encMsgs := gob.NewEncoder(&bufMsgs)
var bufVotes bytes.Buffer
encVotes := gob.NewEncoder(&bufVotes)

return &FileVotesLog{msgs: msgs, votes: votes, indexMsgs: msgsIndex, tmpIndexMsgs: make(map[uint64]msg), indexVotes: votesIndex, bufMsgs: &bufMsgs, bufVotes: &bufVotes,
encMsgs: encMsgs, encVotes: encVotes, lastOffsetMsgs: statMsgs.Size(), lastOffsetVotes: statVotes.Size(), pathToLogsDir: dir}, nil
encMsgs: encMsgs, encVotes: encVotes, lastOffsetMsgs: statMsgs.Size(), lastOffsetVotes: statVotes.Size(), pathToLogsDir: dir,
segmentsNumberMsgs: numberOfMsgsSegments, segmentsNumberVotes: numberOfVotesSegments}, nil
}

func segmentInfoAndIndexMsg(segNumbers []int, path string) (*os.File, os.FileInfo, map[uint64]msg, error) {
Expand Down Expand Up @@ -283,21 +290,30 @@ func (c *FileVotesLog) Set(index uint64, key string, value []byte) error {
}

// rotate segment if threshold is reached
// (close current segment, open new one with incremented suffix in name, remove old segment)
if len(c.indexMsgs) == segmentThreshold {
// (close current segment, open new one with incremented suffix in name)
itemsAddedTotal := len(c.indexMsgs) + (c.segmentsNumberMsgs-1)*segmentThreshold
if itemsAddedTotal == segmentThreshold*c.segmentsNumberMsgs {
c.bufMsgs.Reset()
c.encMsgs = gob.NewEncoder(c.bufMsgs)
if err := c.msgs.Close(); err != nil {
return errors.Wrap(err, "failed to close msgs log file")
}
c.oldMsgsSegmentName = c.msgs.Name()

latestSegmentIndex := 0
if c.segmentsNumberMsgs >= maxSegments {
latestSegmentIndex = c.segmentsNumberMsgs - maxSegments
}
c.oldMsgsSegmentName = path.Join(c.pathToLogsDir, "msgs_"+strconv.Itoa(latestSegmentIndex))

segmentIndex, err := extractSegmentNum(c.msgs.Name())
if err != nil {
return errors.Wrap(err, "failed to extract segment number from msgs log file name")
}

c.msgs, err = os.OpenFile(path.Join(c.pathToLogsDir, "msgs_"+strconv.Itoa(segmentIndex+1)), os.O_RDWR|os.O_CREATE, 0755)
segmentIndex++
c.msgs, err = os.OpenFile(path.Join(c.pathToLogsDir, "msgs_"+strconv.Itoa(segmentIndex)), os.O_RDWR|os.O_CREATE, 0755)

c.segmentsNumberMsgs = segmentIndex + 1

c.lastOffsetMsgs = 0
}
Expand All @@ -322,9 +338,13 @@ func (c *FileVotesLog) Set(index uint64, key string, value []byte) error {
c.indexMsgs[index] = msg{index, key, value}

// if threshold is reached, start writing to tmp index buffer
if len(c.indexMsgs) > segmentThreshold {
multiplier := 1
if c.segmentsNumberMsgs > 1 {
multiplier = c.segmentsNumberMsgs - 1
}
if len(c.indexMsgs) > segmentThreshold*multiplier {
// if tmp index buffer is full, flush it to index and rm old segment and associated index file
if len(c.tmpIndexMsgs) == tmpIndexBufferThreshold {
if c.segmentsNumberMsgs > maxSegments {
c.indexMsgs = c.tmpIndexMsgs
c.tmpIndexMsgs = make(map[uint64]msg)

Expand Down

0 comments on commit 22de5e4

Please sign in to comment.