From 01a4ad695858bc1cf44eb3e5b9cc71d7e95ae7ba Mon Sep 17 00:00:00 2001 From: Vadim Inshakov Date: Sun, 8 Oct 2023 22:18:02 +0500 Subject: [PATCH] refactor --- voteslog/file.go | 315 ++++++++++++----------------------------------- 1 file changed, 78 insertions(+), 237 deletions(-) diff --git a/voteslog/file.go b/voteslog/file.go index 1777440..898cab7 100644 --- a/voteslog/file.go +++ b/voteslog/file.go @@ -6,17 +6,14 @@ import ( "fmt" "github.com/pkg/errors" "github.com/vadiminshakov/committer/core/entity" - "io" - "maps" "os" "path" - "sort" "strconv" "strings" ) const ( - segmentThreshold = 10 + segmentThreshold = 1000 ) var ErrExists = errors.New("msg with such index already exists") @@ -32,7 +29,8 @@ type FileVotesLog struct { indexMsgs map[uint64]msg tmpIndexMsgs map[uint64]msg // index that matches height of votes round record with offset in file - indexVotes map[uint64]votesMsg + indexVotes map[uint64]votesMsg + tmpIndexVotes map[uint64]votesMsg // gob encoder for proposed messages encMsgs *gob.Encoder @@ -53,9 +51,13 @@ type FileVotesLog struct { pathToLogsDir string // name of the old segment for msgs log - oldMsgsSegmentName string + oldestMsgsSegmentName string + // name of the old segment for votes log + oldestVotesSegmentName string - segmentsNumberMsgs int + // number of segments for msgs log + segmentsNumberMsgs int + // number of segments for votes log segmentsNumberVotes int } @@ -71,219 +73,31 @@ func NewOnDiskLog(dir string) (*FileVotesLog, error) { return nil, errors.Wrap(err, "failed to load msgs segments") } numberOfMsgsSegments := len(msgsIndex) / segmentThreshold + if numberOfMsgsSegments == 0 { + numberOfMsgsSegments = 1 + } votes, statVotes, votesIndex, err := segmentInfoAndIndexVotes(voteSegmentsNumbers, path.Join(dir, "votes_")) if err != nil { return nil, errors.Wrap(err, "failed to load votes segments") } numberOfVotesSegments := len(votesIndex) / segmentThreshold + if numberOfVotesSegments == 0 { + numberOfVotesSegments = 1 + } var bufMsgs bytes.Buffer encMsgs := gob.NewEncoder(&bufMsgs) var bufVotes bytes.Buffer encVotes := gob.NewEncoder(&bufVotes) - return &FileVotesLog{msgs: msgs, votes: votes, indexMsgs: msgsIndex, tmpIndexMsgs: make(map[uint64]msg), indexVotes: votesIndex, bufMsgs: &bufMsgs, bufVotes: &bufVotes, + return &FileVotesLog{msgs: msgs, votes: votes, indexMsgs: msgsIndex, tmpIndexMsgs: make(map[uint64]msg), + tmpIndexVotes: make(map[uint64]votesMsg), indexVotes: votesIndex, bufMsgs: &bufMsgs, bufVotes: &bufVotes, encMsgs: encMsgs, encVotes: encVotes, lastOffsetMsgs: statMsgs.Size(), lastOffsetVotes: statVotes.Size(), pathToLogsDir: dir, segmentsNumberMsgs: numberOfMsgsSegments, segmentsNumberVotes: numberOfVotesSegments}, nil } -func segmentInfoAndIndexMsg(segNumbers []int, path string) (*os.File, os.FileInfo, map[uint64]msg, error) { - index := make(map[uint64]msg) - var ( - logFileFD *os.File - logFileInfo os.FileInfo - idxFromSegment map[uint64]msg - err error - ) - for _, segindex := range segNumbers { - logFileFD, logFileInfo, idxFromSegment, err = loadSegmentMsg(path + strconv.Itoa(segindex)) - if err != nil { - return nil, nil, nil, errors.Wrap(err, "failed to load indexes from msg log file") - } - - maps.Copy(index, idxFromSegment) - } - - return logFileFD, logFileInfo, index, nil -} - -func segmentInfoAndIndexVotes(segNumbers []int, path string) (*os.File, os.FileInfo, map[uint64]votesMsg, error) { - index := make(map[uint64]votesMsg) - var ( - logFileFD *os.File - logFileInfo os.FileInfo - idxFromSegment map[uint64]votesMsg - err error - ) - for _, segindex := range segNumbers { - logFileFD, logFileInfo, idxFromSegment, err = loadSegmentVote(path + strconv.Itoa(segindex)) - if err != nil { - return nil, nil, nil, errors.Wrap(err, "failed to load indexes from msg log file") - } - - maps.Copy(index, idxFromSegment) - } - - return logFileFD, logFileInfo, index, nil -} - -func loadSegmentMsg(path string) (fd *os.File, fileinfo os.FileInfo, index map[uint64]msg, err error) { - fd, err = os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0755) - if err != nil { - return nil, nil, nil, errors.Wrap(err, "failed to open log segment file") - } - - fileinfo, err = fd.Stat() - if err != nil { - return nil, nil, nil, errors.Wrap(err, "failed to read log segment file stat") - } - - index, err = loadIndexesMsg(fd, fileinfo) - if err != nil { - return nil, nil, nil, errors.Wrap(err, "failed to build index from log segment") - } - - return fd, fileinfo, index, nil -} - -func loadSegmentVote(path string) (fd *os.File, fileinfo os.FileInfo, index map[uint64]votesMsg, err error) { - fd, err = os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0755) - if err != nil { - return nil, nil, nil, errors.Wrap(err, "failed to open log segment file") - } - - fileinfo, err = fd.Stat() - if err != nil { - return nil, nil, nil, errors.Wrap(err, "failed to read log segment file stat") - } - - index, err = loadIndexesVotes(fd, fileinfo) - if err != nil { - return nil, nil, nil, errors.Wrap(err, "failed to build index from log segment") - } - - return fd, fileinfo, index, nil -} - -func findSegmentNumbers(dir string) (msgSegmentsNumbers []int, voteSegmentsNumbers []int, err error) { - _, err = os.Stat(dir) - if os.IsNotExist(err) { - if err := os.Mkdir(dir, 0755); err != nil { - return nil, nil, errors.Wrap(err, "failed to create dir for votes log") - } - } - de, err := os.ReadDir(dir) - if err != nil { - return nil, nil, errors.Wrap(err, "failed to read dir for votes log") - } - - msgSegmentsNumbers = make([]int, 0) - voteSegmentsNumbers = make([]int, 0) - for _, d := range de { - if d.IsDir() { - continue - } - if strings.HasPrefix(d.Name(), "msgs_") { - i, err := extractSegmentNum(d.Name()) - if err != nil { - return nil, nil, errors.Wrap(err, "initialization failed: failed to extract segment number from msgs log file name") - } - - msgSegmentsNumbers = append(msgSegmentsNumbers, i) - } - - if strings.HasPrefix(d.Name(), "votes_") { - i, err := extractSegmentNum(d.Name()) - if err != nil { - return nil, nil, errors.Wrap(err, "initialization failed: failed to extract segment number from votes log file name") - } - - voteSegmentsNumbers = append(voteSegmentsNumbers, i) - } - } - - sort.Slice(msgSegmentsNumbers, func(i, j int) bool { - return msgSegmentsNumbers[i] < msgSegmentsNumbers[j] - }) - sort.Slice(voteSegmentsNumbers, func(i, j int) bool { - return voteSegmentsNumbers[i] < voteSegmentsNumbers[j] - }) - - if len(msgSegmentsNumbers) == 0 { - msgSegmentsNumbers = append(msgSegmentsNumbers, 0) - } - if len(voteSegmentsNumbers) == 0 { - voteSegmentsNumbers = append(voteSegmentsNumbers, 0) - } - - return msgSegmentsNumbers, voteSegmentsNumbers, nil -} - -func loadIndexesMsg(file *os.File, stat os.FileInfo) (map[uint64]msg, error) { - buf := make([]byte, stat.Size()) - if n, err := file.Read(buf); err != nil { - if len(buf) == 0 && n == 0 && err == io.EOF { - return make(map[uint64]msg), nil - } else if err != io.EOF { - return nil, errors.Wrap(err, "failed to read log file") - } - } - - var msgs []msg - dec := gob.NewDecoder(bytes.NewReader(buf)) - for { - var msgIndexed msg - if err := dec.Decode(&msgIndexed); err != nil { - if err == io.EOF { - break - } - return nil, errors.Wrap(err, "failed to decode indexed msg from log") - } - msgs = append(msgs, msgIndexed) - } - - index := make(map[uint64]msg, len(msgs)) - for _, idxMsg := range msgs { - index[idxMsg.Idx] = idxMsg - } - - return index, nil -} - -func loadIndexesVotes(file *os.File, stat os.FileInfo) (map[uint64]votesMsg, error) { - buf := make([]byte, stat.Size()) - if n, err := file.Read(buf); err != nil { - if len(buf) == 0 && n == 0 && err == io.EOF { - return make(map[uint64]votesMsg), nil - } else if err != io.EOF { - return nil, errors.Wrap(err, "failed to read log file") - } - } - - var msgs []votesMsg - dec := gob.NewDecoder(bytes.NewReader(buf)) - for { - var msgIndexed votesMsg - if err := dec.Decode(&msgIndexed); err != nil { - if err != nil { - if err == io.EOF { - break - } - return nil, errors.Wrap(err, "failed to decode indexed msg from log") - } - } - msgs = append(msgs, msgIndexed) - } - - index := make(map[uint64]votesMsg, len(msgs)) - for _, idxMsg := range msgs { - index[idxMsg.Idx] = idxMsg - } - - return index, nil -} - +// Set writes key/value pair to the msgs log. func (c *FileVotesLog) Set(index uint64, key string, value []byte) error { if _, ok := c.indexMsgs[index]; ok { return ErrExists @@ -291,7 +105,10 @@ func (c *FileVotesLog) Set(index uint64, key string, value []byte) error { // rotate segment if threshold is reached // (close current segment, open new one with incremented suffix in name) - itemsAddedTotal := len(c.indexMsgs) + (c.segmentsNumberMsgs-1)*segmentThreshold + itemsAddedTotal := len(c.indexMsgs) + if itemsAddedTotal > maxSegments*segmentThreshold { + itemsAddedTotal = itemsAddedTotal + (c.segmentsNumberMsgs-1)*segmentThreshold + } if itemsAddedTotal == segmentThreshold*c.segmentsNumberMsgs { c.bufMsgs.Reset() c.encMsgs = gob.NewEncoder(c.bufMsgs) @@ -299,11 +116,7 @@ func (c *FileVotesLog) Set(index uint64, key string, value []byte) error { return errors.Wrap(err, "failed to close msgs log file") } - latestSegmentIndex := 0 - if c.segmentsNumberMsgs >= maxSegments { - latestSegmentIndex = c.segmentsNumberMsgs - maxSegments - } - c.oldMsgsSegmentName = path.Join(c.pathToLogsDir, "msgs_"+strconv.Itoa(latestSegmentIndex)) + c.oldestMsgsSegmentName = c.oldestSegmentName(c.segmentsNumberMsgs) segmentIndex, err := extractSegmentNum(c.msgs.Name()) if err != nil { @@ -312,7 +125,6 @@ func (c *FileVotesLog) Set(index uint64, key string, value []byte) error { segmentIndex++ c.msgs, err = os.OpenFile(path.Join(c.pathToLogsDir, "msgs_"+strconv.Itoa(segmentIndex)), os.O_RDWR|os.O_CREATE, 0755) - c.segmentsNumberMsgs = segmentIndex + 1 c.lastOffsetMsgs = 0 @@ -327,9 +139,9 @@ func (c *FileVotesLog) Set(index uint64, key string, value []byte) error { if err != nil { return errors.Wrap(err, "failed to write msg to log") } - if err := c.msgs.Sync(); err != nil { - return errors.Wrap(err, "failed to sync msg log file") - } + //if err := c.msgs.Sync(); err != nil { + // return errors.Wrap(err, "failed to sync msg log file") + //} c.lastOffsetMsgs += int64(c.bufMsgs.Len()) c.bufMsgs.Reset() @@ -337,27 +149,20 @@ func (c *FileVotesLog) Set(index uint64, key string, value []byte) error { // update index c.indexMsgs[index] = msg{index, key, value} - // if threshold is reached, start writing to tmp index buffer - multiplier := 1 - if c.segmentsNumberMsgs > 1 { - multiplier = c.segmentsNumberMsgs - 1 - } - if len(c.indexMsgs) > segmentThreshold*multiplier { - // if tmp index buffer is full, flush it to index and rm old segment and associated index file - if c.segmentsNumberMsgs > maxSegments { - c.indexMsgs = c.tmpIndexMsgs - c.tmpIndexMsgs = make(map[uint64]msg) - - if err = os.Remove(c.oldMsgsSegmentName); err != nil { - return errors.Wrapf(err, "failed to remove old segment %s", c.msgs.Name()) - } - return nil - } - c.tmpIndexMsgs[index] = msg{index, key, value} - } + c.rotateSegmentMsgs(msg{index, key, value}) + return nil } +// oldestSegmentName returns name of the oldest segment in the directory. +func (c *FileVotesLog) oldestSegmentName(numberOfSegments int) string { + latestSegmentIndex := 0 + if numberOfSegments >= maxSegments { + latestSegmentIndex = numberOfSegments - maxSegments + } + return path.Join(c.pathToLogsDir, "msgs_"+strconv.Itoa(int(latestSegmentIndex))) +} + func extractSegmentNum(segmentName string) (int, error) { _, suffix, ok := strings.Cut(segmentName, "_") if !ok { @@ -371,6 +176,7 @@ func extractSegmentNum(segmentName string) (int, error) { return i, nil } +// Get queries value at specific index in the msgs log. func (c *FileVotesLog) Get(index uint64) (string, []byte, bool) { msg, ok := c.indexMsgs[index] if !ok { @@ -380,30 +186,64 @@ func (c *FileVotesLog) Get(index uint64) (string, []byte, bool) { return msg.Key, msg.Value, true } +// SetVotes writes cohort votes to the votes log. func (c *FileVotesLog) SetVotes(index uint64, votes []*entity.Vote) error { if _, ok := c.indexVotes[index]; ok { return ErrExists } + + // rotate segment if threshold is reached + // (close current segment, open new one with incremented suffix in name) + itemsAddedTotal := len(c.indexMsgs) + if itemsAddedTotal > maxSegments*segmentThreshold { + itemsAddedTotal = itemsAddedTotal + (c.segmentsNumberVotes-1)*segmentThreshold + } + if itemsAddedTotal == segmentThreshold*c.segmentsNumberVotes { + c.bufVotes.Reset() + c.encVotes = gob.NewEncoder(c.bufVotes) + if err := c.votes.Close(); err != nil { + return errors.Wrap(err, "failed to close votes log file") + } + + c.oldestVotesSegmentName = c.oldestSegmentName(c.segmentsNumberVotes) + + segmentIndex, err := extractSegmentNum(c.votes.Name()) + if err != nil { + return errors.Wrap(err, "failed to extract segment number from votes log file name") + } + + segmentIndex++ + c.votes, err = os.OpenFile(path.Join(c.pathToLogsDir, "votes_"+strconv.Itoa(segmentIndex)), os.O_RDWR|os.O_CREATE, 0755) + c.segmentsNumberVotes = segmentIndex + 1 + + c.lastOffsetVotes = 0 + } + // gob encode key and value if err := c.encVotes.Encode(votesMsg{index, votes}); err != nil { - return errors.Wrap(err, "failed to encode msg for log") + return errors.Wrap(err, "failed to encode msg for votes log") } // write to log at last offset _, err := c.votes.WriteAt(c.bufVotes.Bytes(), c.lastOffsetVotes) if err != nil { - return errors.Wrap(err, "failed to write msg to log") - } - if err := c.votes.Sync(); err != nil { - return errors.Wrap(err, "failed to sync votes log file") + return errors.Wrap(err, "failed to write msg to votes log") } + //if err := c.votes.Sync(); err != nil { + // return errors.Wrap(err, "failed to sync votes log file") + //} c.lastOffsetVotes += int64(c.bufVotes.Len()) c.bufVotes.Reset() + // update index c.indexVotes[index] = votesMsg{index, votes} + + c.rotateSegmentVotes(votesMsg{index, votes}) + return nil } +// GetVotes queries cohort votes at specific index in the votes log. func (c *FileVotesLog) GetVotes(index uint64) []*entity.Vote { msg, ok := c.indexVotes[index] if !ok { @@ -413,6 +253,7 @@ func (c *FileVotesLog) GetVotes(index uint64) []*entity.Vote { return msg.Votes } +// Close closes log files. func (c *FileVotesLog) Close() error { if err := c.msgs.Close(); err != nil { return errors.Wrap(err, "failed to close msgs log file")