Skip to content

Commit

Permalink
For Loki Sources position files: Use a best effort atomic rename (#5772)
Browse files Browse the repository at this point in the history
* Use a best-effort atomic rename; this should work in all but the rarest edge cases.

* Add changelog

* Add changelog and fix write positions file for loki sources.

* Add error handling and details on promtail version.
  • Loading branch information
mattdurham authored Nov 16, 2023
1 parent 461a4b2 commit a608dd6
Show file tree
Hide file tree
Showing 6 changed files with 448 additions and 17 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,9 @@ Main (unreleased)

- Change User-Agent header for outbound requests to include agent-mode, goos, and deployment mode. Example `GrafanaAgent/v0.38.0 (flow; linux; docker)` (@captncraig)

- `loki.source.windowsevent` and `loki.source.*` now write their positions files with a best-effort atomic rename, which prevents
the positions file from being corrupted if the host reboots mid-write. (@mattdurham)

v0.37.4 (2023-11-06)
-----------------

Expand Down
17 changes: 3 additions & 14 deletions component/common/loki/positions/write_positions_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,29 +7,18 @@ package positions
// same place in case of a restart.

import (
"os"
"path/filepath"

"bytes"
"github.com/natefinch/atomic"
yaml "gopkg.in/yaml.v2"
)

// writePositionFile is a fallback for Windows because renameio does not support Windows.
// See https://github.com/google/renameio#windows-support
//
// It marshals positions to YAML (wrapped in a File value) and passes the
// resulting bytes to atomic.WriteFile for filename. A marshal error is
// returned unchanged; otherwise the result of atomic.WriteFile is returned.
func writePositionFile(filename string, positions map[Entry]string) error {
buf, err := yaml.Marshal(File{
Positions: positions,
})
if err != nil {
return err
}
return atomic.WriteFile(filename, bytes.NewReader(buf))

// NOTE(review): everything below is unreachable because of the unconditional
// return above. It looks like the previous write-temp-then-rename
// implementation; confirm and remove it together with the os and
// path/filepath imports, which are only referenced from this dead code.
target := filepath.Clean(filename)
temp := target + "-new"

err = os.WriteFile(temp, buf, os.FileMode(positionFileMode))
if err != nil {
return err
}

return os.Rename(temp, target)
}
89 changes: 89 additions & 0 deletions component/loki/source/windowsevent/bookmark.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
//go:build windows
// +build windows

// This code is copied from Promtail v1.6.2-0.20231004111112-07cbef92268a with minor changes.

package windowsevent

import (
"bytes"
"errors"
"github.com/natefinch/atomic"
"io"
"io/fs"
"os"

"github.com/grafana/loki/clients/pkg/promtail/targets/windows/win_eventlog"
)

// bookMark pairs a Windows event log bookmark handle with the file it is
// persisted to.
type bookMark struct {
// handle is the bookmark handle obtained from win_eventlog.CreateBookmark;
// save passes it to win_eventlog.UpdateBookmark.
handle win_eventlog.EvtHandle
// isNew is true when no usable bookmark content was loaded, i.e. the file
// was missing, empty, or could not be turned into a bookmark.
isNew bool
// path is the file that save writes the rendered bookmark to.
path string
// buf is the scratch buffer handed to win_eventlog.UpdateBookmark.
buf []byte
}

// newBookMark creates a new windows event bookmark.
// The bookmark will be saved at the given path. Use save to save the current position for a given event.
//
// If nothing exists at path, an empty file is created and a blank bookmark is
// returned with isNew set. If the file exists, its content is loaded; content
// that cannot be turned into a bookmark is discarded and a blank bookmark is
// used instead (isNew is then true as well).
//
// Every file handle opened here is closed before returning, so no handle to
// the bookmark file outlives this call.
func newBookMark(path string) (*bookMark, error) {
	// 16kb buffer for rendering bookmark
	buf := make([]byte, 16<<10)

	_, err := os.Stat(path)
	// creates a new bookmark file if none exists.
	if errors.Is(err, fs.ErrNotExist) {
		created, err := os.Create(path)
		if err != nil {
			return nil, err
		}
		// Only the file's existence matters here. Close the handle right away
		// instead of leaking it; a lingering handle may also interfere with
		// save replacing the file later on.
		if err := created.Close(); err != nil {
			return nil, err
		}
		bm, err := win_eventlog.CreateBookmark("")
		if err != nil {
			return nil, err
		}
		return &bookMark{
			handle: bm,
			path:   path,
			isNew:  true,
			buf:    buf,
		}, nil
	}
	if err != nil {
		return nil, err
	}
	// otherwise open the current one. O_RDWR is kept so that a file we cannot
	// write to is still reported up front.
	file, err := os.OpenFile(path, os.O_RDWR, 0666)
	if err != nil {
		return nil, err
	}
	// The file is only read from, so a Close error carries no data-loss risk
	// and is intentionally ignored.
	defer file.Close()

	fileContent, err := io.ReadAll(file)
	if err != nil {
		return nil, err
	}
	fileString := string(fileContent)
	// load the current bookmark.
	bm, err := win_eventlog.CreateBookmark(fileString)
	if err != nil {
		// If we errored likely due to incorrect data then create a blank one
		bm, err = win_eventlog.CreateBookmark("")
		fileString = ""
		// This should never fail but just in case.
		if err != nil {
			return nil, err
		}
	}
	return &bookMark{
		handle: bm,
		path:   path,
		isNew:  fileString == "",
		buf:    buf,
	}, nil
}

// save advances the bookmark to the given event and persists the rendered
// bookmark to b.path through atomic.WriteFile. An error from updating the
// bookmark is returned as-is and nothing is written in that case.
func (b *bookMark) save(event win_eventlog.EvtHandle) error {
	rendered, err := win_eventlog.UpdateBookmark(b.handle, event, b.buf)
	if err != nil {
		return err
	}
	payload := bytes.NewReader([]byte(rendered))
	return atomic.WriteFile(b.path, payload)
}
5 changes: 2 additions & 3 deletions component/loki/source/windowsevent/component_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/grafana/agent/component/common/loki/utils"
"github.com/grafana/loki/clients/pkg/promtail/api"
"github.com/grafana/loki/clients/pkg/promtail/scrapeconfig"
"github.com/grafana/loki/clients/pkg/promtail/targets/windows"
)

func init() {
Expand All @@ -35,7 +34,7 @@ type Component struct {

mut sync.RWMutex
args Arguments
target *windows.Target
target *Target
handle *handler
receivers []loki.LogsReceiver
}
Expand Down Expand Up @@ -123,7 +122,7 @@ func (c *Component) Update(args component.Arguments) error {
_ = f.Close()
}

winTarget, err := windows.New(c.opts.Logger, c.handle, nil, convertConfig(newArgs))
winTarget, err := NewTarget(c.opts.Logger, c.handle, nil, convertConfig(newArgs))
if err != nil {
return err
}
Expand Down
121 changes: 121 additions & 0 deletions component/loki/source/windowsevent/format.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
//go:build windows
// +build windows

// This code is copied from Promtail v1.6.2-0.20231004111112-07cbef92268a with minor changes.

package windowsevent

import (
"fmt"
"syscall"

jsoniter "github.com/json-iterator/go"

"github.com/grafana/loki/clients/pkg/promtail/scrapeconfig"
"github.com/grafana/loki/clients/pkg/promtail/targets/windows/win_eventlog"
)

// Event is the structure that formatLine fills from a win_eventlog.Event and
// serializes to JSON to produce a log line. Every field is tagged omitempty,
// so zero-valued fields are left out of the output.
type Event struct {
Source string `json:"source,omitempty"`
Channel string `json:"channel,omitempty"`
Computer string `json:"computer,omitempty"`
EventID int `json:"event_id,omitempty"`
Version int `json:"version,omitempty"`

Level int `json:"level,omitempty"`
Task int `json:"task,omitempty"`
Opcode int `json:"opCode,omitempty"`

LevelText string `json:"levelText,omitempty"`
TaskText string `json:"taskText,omitempty"`
OpcodeText string `json:"opCodeText,omitempty"`

Keywords string `json:"keywords,omitempty"`
TimeCreated string `json:"timeCreated,omitempty"`
EventRecordID int `json:"eventRecordID,omitempty"`
// Correlation is only set when the event carries an activity ID or a
// related activity ID.
Correlation *Correlation `json:"correlation,omitempty"`
// Execution is only set when the event has a non-zero process ID.
Execution *Execution `json:"execution,omitempty"`

// Security is only set when the event has a non-empty user ID.
Security *Security `json:"security,omitempty"`
// UserData, EventData and Message are copied from the event unless the
// corresponding Exclude* option is set in the target config.
UserData string `json:"user_data,omitempty"`
EventData string `json:"event_data,omitempty"`
Message string `json:"message,omitempty"`
}

// Security holds the user ID attached to an event and, when the account
// lookup in formatLine succeeds, the resolved user name.
type Security struct {
UserID string `json:"userId,omitempty"`
// UserName is formatted as domain\username; it stays empty when the
// lookup fails.
UserName string `json:"userName,omitempty"`
}

// Execution holds the process and thread IDs recorded on an event.
type Execution struct {
ProcessID uint32 `json:"processId,omitempty"`
ThreadID uint32 `json:"threadId,omitempty"`
// ProcessName is filled by formatLine from win_eventlog.GetFromSnapProcess
// and stays empty when that call returns an error.
ProcessName string `json:"processName,omitempty"`
}

// Correlation holds the activity identifiers copied from an event's
// correlation data.
type Correlation struct {
ActivityID string `json:"activityID,omitempty"`
RelatedActivityID string `json:"relatedActivityID,omitempty"`
}

// formatLine renders a windows event as a JSON-encoded Loki log line.
//
// The always-present event fields are copied first. Event data, user data and
// the message are included unless the matching Exclude* option is set on cfg.
// Correlation, security and execution details are attached only when the
// event carries them; user-name and process-name resolution are best effort
// and silently skipped when the lookup fails.
func formatLine(cfg *scrapeconfig.WindowsEventsTargetConfig, event win_eventlog.Event) (string, error) {
	out := Event{
		Source:        event.Source.Name,
		Channel:       event.Channel,
		Computer:      event.Computer,
		EventID:       event.EventID,
		Version:       event.Version,
		Level:         event.Level,
		Task:          event.Task,
		Opcode:        event.Opcode,
		LevelText:     event.LevelText,
		TaskText:      event.TaskText,
		OpcodeText:    event.OpcodeText,
		Keywords:      event.Keywords,
		TimeCreated:   event.TimeCreated.SystemTime,
		EventRecordID: event.EventRecordID,
	}

	// Optional payloads, each guarded by its own exclusion switch.
	if !cfg.ExcludeEventData {
		out.EventData = string(event.EventData.InnerXML)
	}
	if !cfg.ExcludeUserData {
		out.UserData = string(event.UserData.InnerXML)
	}
	if !cfg.ExcludeEventMessage {
		out.Message = event.Message
	}

	activityID := event.Correlation.ActivityID
	relatedID := event.Correlation.RelatedActivityID
	if activityID != "" || relatedID != "" {
		out.Correlation = &Correlation{
			ActivityID:        activityID,
			RelatedActivityID: relatedID,
		}
	}

	// best effort to get the username of the event.
	if sidText := event.Security.UserID; sidText != "" {
		sec := &Security{UserID: sidText}
		if sid, err := syscall.StringToSid(sidText); err == nil {
			if account, domain, _, err := sid.LookupAccount(""); err == nil {
				sec.UserName = fmt.Sprint(domain, "\\", account)
			}
		}
		out.Security = sec
	}

	if pid := event.Execution.ProcessID; pid != 0 {
		execInfo := &Execution{
			ProcessID: pid,
			ThreadID:  event.Execution.ThreadID,
		}
		// The process name is optional: leave it empty if it cannot be resolved.
		if _, _, name, err := win_eventlog.GetFromSnapProcess(pid); err == nil {
			execInfo.ProcessName = name
		}
		out.Execution = execInfo
	}

	return jsoniter.MarshalToString(out)
}
Loading

0 comments on commit a608dd6

Please sign in to comment.