Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement WAL replay and markers for loki.write #5590

Merged
merged 31 commits into from
Nov 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
e84ca0f
marker changes copied
thepalbi Oct 24, 2023
f07aeae
using markers from paulin PR
thepalbi Oct 24, 2023
4ad23eb
adding markers in queue client and batches
thepalbi Oct 24, 2023
7dc9859
marker handler hooked up
thepalbi Oct 24, 2023
7bbf1ba
clean up test tests
thepalbi Oct 24, 2023
828d689
hook up send data call
thepalbi Oct 24, 2023
97a560f
added markers tests
thepalbi Oct 24, 2023
08bca2d
post rebase fixes
thepalbi Oct 26, 2023
fba1acc
make marker file and folder name consts
thepalbi Oct 26, 2023
2814b86
marker handler comments and graceful stop
thepalbi Oct 26, 2023
f3cde09
address watcher comments
thepalbi Oct 26, 2023
1f83f50
some comments
thepalbi Oct 30, 2023
9c24f38
metric and some comment
thepalbi Oct 30, 2023
d464e10
small refactor in queue client
thepalbi Oct 30, 2023
a728d35
redone last markable segment algo
thepalbi Oct 31, 2023
a0efc6b
add comment explaining algo
thepalbi Oct 31, 2023
a96f05c
filepath for windows tests
thepalbi Oct 31, 2023
97e87de
Logs WAL marker encoding and atomic swap (#5655)
thepalbi Nov 1, 2023
8588219
limiter to markup routine
thepalbi Nov 1, 2023
102cc1e
fix tests
thepalbi Nov 1, 2023
43e7f54
limit find executions
thepalbi Nov 1, 2023
a6d24d5
wip pr comments
thepalbi Nov 1, 2023
2b80969
fixed file marker tests
thepalbi Nov 1, 2023
69d0d42
fix linter
thepalbi Nov 1, 2023
00e19ec
comments
thepalbi Nov 1, 2023
f37a0b8
fix race in mock marker file handler
thepalbi Nov 1, 2023
f730369
added bench
thepalbi Nov 1, 2023
0fee315
added no replay test
thepalbi Nov 1, 2023
9224ccc
marker metrics, test, lint and final comments
thepalbi Nov 2, 2023
c6f2a64
remove correct perms logic
thepalbi Nov 2, 2023
08b291f
missing arg in constructor
thepalbi Nov 2, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 36 additions & 6 deletions component/common/loki/client/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ const (
errMaxStreamsLimitExceeded = "streams limit exceeded, streams: %d exceeds limit: %d, stream: '%s'"
)

// SentDataMarkerHandler is the subset of the MarkerHandler interface that a batch interacts with, in order to report
// that all the data in the batch has either been delivered, or that the client failed to deliver it.
type SentDataMarkerHandler interface {
	// UpdateSentData reports that dataCount data items read from the WAL segment segmentID have been handled.
	UpdateSentData(segmentID, dataCount int)
}

// batch holds pending log streams waiting to be sent to Loki, and it's used
// to reduce the number of push requests to Loki aggregating multiple log streams
// and entries in a single batch request. In case of multi-tenant Promtail, log
Expand All @@ -30,14 +36,18 @@ type batch struct {
createdAt time.Time

maxStreams int

// segmentCounter tracks the amount of entries for each segment present in this batch.
segmentCounter map[int]int
}

func newBatch(maxStreams int, entries ...loki.Entry) *batch {
b := &batch{
streams: map[string]*logproto.Stream{},
totalBytes: 0,
createdAt: time.Now(),
maxStreams: maxStreams,
streams: map[string]*logproto.Stream{},
totalBytes: 0,
createdAt: time.Now(),
maxStreams: maxStreams,
segmentCounter: map[int]int{},
}

// Add entries to the batch
Expand Down Expand Up @@ -72,14 +82,16 @@ func (b *batch) add(entry loki.Entry) error {
return nil
}

// add an entry to the batch
func (b *batch) addFromWAL(lbs model.LabelSet, entry logproto.Entry) error {
// addFromWAL adds an entry to the batch, tracking that the data being added comes from segment segmentNum read from the
// WAL.
func (b *batch) addFromWAL(lbs model.LabelSet, entry logproto.Entry, segmentNum int) error {
b.totalBytes += len(entry.Line)

// Append the entry to an already existing stream (if any)
labels := labelsMapToString(lbs, ReservedLabelTenantID)
if stream, ok := b.streams[labels]; ok {
stream.Entries = append(stream.Entries, entry)
b.countForSegment(segmentNum)
return nil
}

Expand All @@ -93,6 +105,7 @@ func (b *batch) addFromWAL(lbs model.LabelSet, entry logproto.Entry) error {
Labels: labels,
Entries: []logproto.Entry{entry},
}
b.countForSegment(segmentNum)

return nil
}
Expand Down Expand Up @@ -171,3 +184,20 @@ func (b *batch) createPushRequest() (*logproto.PushRequest, int) {
}
return &req, entriesCount
}

// countForSegment tracks that one more data item in this batch has been read from the WAL segment segmentNum.
func (b *batch) countForSegment(segmentNum int) {
	// a segment that has not been seen yet reads as zero, so a plain increment covers both the first and the
	// following items of a segment
	b.segmentCounter[segmentNum]++
}

// reportAsSentData walks every WAL segment that contributed data to this batch and reports, to the provided
// SentDataMarkerHandler, the amount of data from that segment as sent.
func (b *batch) reportAsSentData(h SentDataMarkerHandler) {
	for segmentNum := range b.segmentCounter {
		h.UpdateSentData(segmentNum, b.segmentCounter[segmentNum])
	}
}
55 changes: 55 additions & 0 deletions component/common/loki/client/internal/marker_encoding.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package internal

import (
"encoding/binary"
"fmt"
"hash/crc32"
)

// markerHeaderV1 is the two-byte header written at the start of a v1 marker, used to track the marker format version.
var (
markerHeaderV1 = []byte{'0', '1'}
)

// EncodeMarkerV1 encodes the segment number, from whom we need to create a marker, in the marker file format,
// which in v1 includes the segment number and a trailing CRC code of the first 10 bytes.
func EncodeMarkerV1(segment uint64) ([]byte, error) {
// marker format v1
// marker [ 0 , 1 ] - HEADER, which is used to track version
// marker [ 2 , 9 ] - encoded unit 64 which is the content of the marker, the last "consumed" segment
thepalbi marked this conversation as resolved.
Show resolved Hide resolved
// marker [ 10, 13 ] - CRC32 of the first 10 bytes of the marker, using IEEE polynomial
bs := make([]byte, 14)
// write header with marker format version
bs[0] = markerHeaderV1[0]
bs[1] = markerHeaderV1[1]
// write actual marked segment number
binary.BigEndian.PutUint64(bs[2:10], segment)
// checksum is the IEEE CRC32 checksum of the first 10 bytes of the marker record
checksum := crc32.ChecksumIEEE(bs[0:10])
binary.BigEndian.PutUint32(bs[10:], checksum)

return bs, nil
}

// DecodeMarkerV1 reads back the segment number stored in a segment marker that was encoded with EncodeMarkerV1.
// It returns an error when the input is not exactly 14 bytes long, when the trailing CRC32 does not match the
// first 10 bytes, or when the version header is not the v1 one.
func DecodeMarkerV1(bs []byte) (uint64, error) {
	// a v1 marker always has the same size, so anything else is rejected up front
	if size := len(bs); size != 14 {
		return 0, fmt.Errorf("bad length %d", size)
	}

	header, checksummed, trailer := bs[:2], bs[:10], bs[10:]

	// verify integrity before interpreting any field
	computed := crc32.ChecksumIEEE(checksummed)
	if stored := binary.BigEndian.Uint32(trailer); stored != computed {
		return 0, fmt.Errorf("corrupted WAL marker")
	}

	// then make sure the record carries the expected version header
	if header[0] != markerHeaderV1[0] || header[1] != markerHeaderV1[1] {
		return 0, fmt.Errorf("wrong WAL marker header")
	}

	// finally, the marked segment number sits between the header and the checksum
	return binary.BigEndian.Uint64(bs[2:10]), nil
}
50 changes: 50 additions & 0 deletions component/common/loki/client/internal/marker_encoding_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package internal

import (
"testing"

"github.com/stretchr/testify/require"
)

// TestMarkerEncodingV1 checks that a v1 marker survives an encode/decode round trip, and that DecodeMarkerV1
// rejects markers with a bad checksum, a bad length or a bad header.
func TestMarkerEncodingV1(t *testing.T) {
	const segment = uint64(123)

	t.Run("encode and decode", func(t *testing.T) {
		encoded, err := EncodeMarkerV1(segment)
		require.NoError(t, err)

		decoded, err := DecodeMarkerV1(encoded)
		require.NoError(t, err)
		require.Equal(t, segment, decoded)
	})

	t.Run("decoding errors", func(t *testing.T) {
		// corruptedAt returns an input builder that encodes a valid marker and then overwrites the byte at idx.
		corruptedAt := func(idx int) func(t *testing.T) []byte {
			return func(t *testing.T) []byte {
				encoded, err := EncodeMarkerV1(segment)
				require.NoError(t, err)
				encoded[idx] = '5'
				return encoded
			}
		}

		cases := []struct {
			name  string
			input func(t *testing.T) []byte
		}{
			// change last byte
			{name: "bad checksum", input: corruptedAt(13)},
			{name: "bad length", input: func(*testing.T) []byte { return make([]byte, 15) }},
			// change first header byte
			{name: "bad header", input: corruptedAt(0)},
		}

		for _, tc := range cases {
			t.Run(tc.name, func(t *testing.T) {
				_, err := DecodeMarkerV1(tc.input(t))
				require.Error(t, err)
			})
		}
	})
}
101 changes: 101 additions & 0 deletions component/common/loki/client/internal/marker_file_handler.go
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If someone deletes the WAL, but does not delete the marker file, would the WAL replay code work ok? I suppose in that case the code should know to ignore the segment ID in the marker file?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The marking logic is not connected to the WAL, so it can't tell whether the WAL was deleted or not. I guess for WAL deletions we should advise that the marker has to be deleted as well.

On the other hand, the watcher, once it reads from the marker, tries to find the following segment by looking on disk, so since the WAL doesn't exist it will enter a retry loop until the WAL gets re-created.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we could add additional logic to the marker interface, so the WAL can force a write to the marker file. For example, if it detects a WAL deletion, it could force-mark a segment.

Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package internal

import (
"bytes"
"fmt"
"os"
"path/filepath"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/agent/component/common/loki/wal"
"github.com/natefinch/atomic"
)

const (
// MarkerFolderName is the name of the folder, created inside the WAL directory, that holds the marker file.
MarkerFolderName = "remote"
// MarkerFileName is the name of the marker file inside the marker folder.
MarkerFileName = "segment_marker"

// MarkerFolderMode is the permission mode the marker folder is created with.
MarkerFolderMode os.FileMode = 0o700
// MarkerFileMode is the permission mode intended for the marker file.
// NOTE(review): the handler code visible here never passes this mode when writing the marker - confirm where
// it is applied.
MarkerFileMode os.FileMode = 0o600
)

// MarkerFileHandler is a file-backed wal.Marker, that also allows one to write to the backing store a particular
// segment number as the last one marked.
type MarkerFileHandler interface {
wal.Marker

// MarkSegment writes in the backing file-store that a particular segment is the last one marked.
MarkSegment(segment int)
}

// markerFileHandler is the MarkerFileHandler implementation that keeps the last marked segment in a file on disk.
type markerFileHandler struct {
// logger receives the warnings, errors and debug messages emitted while reading or writing the marker file.
logger log.Logger
// lastMarkedSegmentDir is the folder that contains the marker file.
lastMarkedSegmentDir string
// lastMarkedSegmentFilePath is the full path of the marker file.
lastMarkedSegmentFilePath string
}

// compile-time check that markerFileHandler satisfies the MarkerFileHandler interface
var (
_ MarkerFileHandler = (*markerFileHandler)(nil)
)

// NewMarkerFileHandler builds a markerFileHandler whose marker file lives in the MarkerFolderName folder inside
// walDir. The folder is created if it does not exist yet; an error is returned when that creation fails.
func NewMarkerFileHandler(logger log.Logger, walDir string) (MarkerFileHandler, error) {
	markerDir := filepath.Join(walDir, MarkerFolderName)
	markerFile := filepath.Join(markerDir, MarkerFileName)

	// make sure the marker folder is in place before handing out the handler
	err := os.MkdirAll(markerDir, MarkerFolderMode)
	if err != nil {
		return nil, fmt.Errorf("error creating segment marker folder %q: %w", markerDir, err)
	}

	return &markerFileHandler{
		logger:                    logger,
		lastMarkedSegmentDir:      markerDir,
		lastMarkedSegmentFilePath: markerFile,
	}, nil
}

// LastMarkedSegment implements wlog.Marker.
func (mfh *markerFileHandler) LastMarkedSegment() int {
thepalbi marked this conversation as resolved.
Show resolved Hide resolved
bs, err := os.ReadFile(mfh.lastMarkedSegmentFilePath)
if os.IsNotExist(err) {
level.Warn(mfh.logger).Log("msg", "marker segment file does not exist", "file", mfh.lastMarkedSegmentFilePath)
return -1
} else if err != nil {
level.Error(mfh.logger).Log("msg", "could not access segment marker file", "file", mfh.lastMarkedSegmentFilePath, "err", err)
return -1
}

savedSegment, err := DecodeMarkerV1(bs)
if err != nil {
level.Error(mfh.logger).Log("msg", "could not decode segment marker file", "file", mfh.lastMarkedSegmentFilePath, "err", err)
return -1
}

return int(savedSegment)
}

// MarkSegment implements MarkerFileHandler. It encodes segment as a v1 marker and atomically replaces the marker
// file with it. Failures are logged, not returned.
func (mfh *markerFileHandler) MarkSegment(segment int) {
	payload, encodeErr := EncodeMarkerV1(uint64(segment))
	if encodeErr != nil {
		level.Error(mfh.logger).Log("msg", "failed to encode marker when marking segment", "err", encodeErr)
		return
	}

	writeErr := mfh.atomicallyWriteMarker(payload)
	if writeErr != nil {
		level.Error(mfh.logger).Log("msg", "could not replace segment marker file", "file", mfh.lastMarkedSegmentFilePath, "err", writeErr)
		return
	}

	level.Debug(mfh.logger).Log("msg", "updated segment marker file", "file", mfh.lastMarkedSegmentFilePath, "segment", segment)
}

// atomicallyWriteMarker attempts to replace the marker file contents with bs in a single atomic step. The work is
// delegated to atomic.WriteFile (https://github.com/natefinch/atomic/blob/master/atomic.go), which handles atomic
// file renaming on both UNIX and Windows systems: the contents are first written to a temporary file, which is then
// atomically renamed over the marker, so the marker is either swapped completely or not at all.
func (mfh *markerFileHandler) atomicallyWriteMarker(bs []byte) error {
	contents := bytes.NewReader(bs)
	return atomic.WriteFile(mfh.lastMarkedSegmentFilePath, contents)
}
66 changes: 66 additions & 0 deletions component/common/loki/client/internal/marker_file_handler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package internal

import (
"os"
"path/filepath"
"testing"

"github.com/go-kit/log"
"github.com/stretchr/testify/require"
)

// TestMarkerFileHandler exercises reading and writing the segment marker file through a MarkerFileHandler that is
// rooted in a fresh temporary directory for each subtest.
func TestMarkerFileHandler(t *testing.T) {
	logger := log.NewLogfmtLogger(os.Stdout)

	// newHandler creates a handler over a new temporary WAL directory, returning both.
	newHandler := func(t *testing.T) (MarkerFileHandler, string) {
		walDir := t.TempDir()
		handler, err := NewMarkerFileHandler(logger, walDir)
		require.NoError(t, err)
		return handler, walDir
	}

	t.Run("invalid last marked segment when there's no marker file", func(t *testing.T) {
		handler, _ := newHandler(t)

		require.Equal(t, -1, handler.LastMarkedSegment())
	})

	t.Run("reads the last segment from existing marker file", func(t *testing.T) {
		handler, walDir := newHandler(t)

		// seed the marker file by hand, bypassing the handler
		encoded, err := EncodeMarkerV1(10)
		require.NoError(t, err)
		markerPath := filepath.Join(walDir, MarkerFolderName, MarkerFileName)
		require.NoError(t, os.WriteFile(markerPath, encoded, MarkerFileMode))

		require.Equal(t, 10, handler.LastMarkedSegment())
	})

	t.Run("marks segment, and then reads value from it", func(t *testing.T) {
		handler, _ := newHandler(t)

		handler.MarkSegment(12)
		require.Equal(t, 12, handler.LastMarkedSegment())
	})

	t.Run("marker file and directory is created with correct permissions", func(t *testing.T) {
		handler, walDir := newHandler(t)
		handler.MarkSegment(12)

		// the folder is checked first
		folderPath := filepath.Join(walDir, MarkerFolderName)
		folderInfo, err := os.Stat(folderPath)
		require.NoError(t, err)
		require.Equal(t, MarkerFolderMode, folderInfo.Mode().Perm())

		// and then the marker file inside it
		fileInfo, err := os.Stat(filepath.Join(folderPath, MarkerFileName))
		require.NoError(t, err)
		require.Equal(t, MarkerFileMode, fileInfo.Mode().Perm())
	})
}
Loading