Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[exporter/file] add append mode #31369

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/file-exporter_append_mode.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: fileexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: File write mode is configurable now (truncate or append)

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [31364]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
1 change: 1 addition & 0 deletions exporter/fileexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ The following settings are optional:
- localtime : [default: false (use UTC)] whether or not the timestamps in backup files is formatted according to the host's local time.

- `format`[default: json]: define the data format of encoded telemetry data. The setting can be overridden with `proto`.
- `append`[default: `false`] defines whether append to the file (`true`) or truncate (`false`). If `append: true` is set then setting `rotation` or `compression` is currently not supported.
- `compression`[no default]: the compression algorithm used when exporting telemetry data to file. Supported compression algorithms:`zstd`
- `flush_interval`[default: 1s]: `time.Duration` interval between flushes. See [time.ParseDuration](https://pkg.go.dev/time#ParseDuration) for valid formats.
NOTE: a value without unit is in nanoseconds and `flush_interval` is ignored and writes are not buffered if `rotation` is set.
Expand Down
13 changes: 13 additions & 0 deletions exporter/fileexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package fileexporter // import "github.com/open-telemetry/opentelemetry-collecto

import (
"errors"
"fmt"
"time"

"go.opentelemetry.io/collector/component"
Expand All @@ -22,6 +23,12 @@ type Config struct {
// Path of the file to write to. Path is relative to current directory.
Path string `mapstructure:"path"`

// Mode defines whether the exporter should append to the file
// Options:
// - false[default]: truncates the file
// - true: appends to the file.
Append bool `mapstructure:"append"`

// Rotation defines an option about rotation of telemetry files
Rotation *Rotation `mapstructure:"rotation"`

Expand Down Expand Up @@ -70,6 +77,12 @@ func (cfg *Config) Validate() error {
if cfg.Path == "" {
return errors.New("path must be non-empty")
}
if cfg.Append && cfg.Compression != "" {
return fmt.Errorf("append and compression enabled at the same time is not supported")
}
if cfg.Append && cfg.Rotation != nil {
return fmt.Errorf("append and rotation enabled at the same time is not supported")
}
if cfg.FormatType != formatTypeJSON && cfg.FormatType != formatTypeProto {
return errors.New("format type is not supported")
}
Expand Down
10 changes: 8 additions & 2 deletions exporter/fileexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,16 @@ func newFileExporter(conf *Config) FileExporter {
}
}

func newFileWriter(path string, rotation *Rotation, flushInterval time.Duration, export exportFunc) (*fileWriter, error) {
func newFileWriter(path string, shouldAppend bool, rotation *Rotation, flushInterval time.Duration, export exportFunc) (*fileWriter, error) {
var wc io.WriteCloser
if rotation == nil {
f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
fileFlags := os.O_RDWR | os.O_CREATE
OverOrion marked this conversation as resolved.
Show resolved Hide resolved
if shouldAppend {
fileFlags |= os.O_APPEND
} else {
fileFlags |= os.O_TRUNC
}
f, err := os.OpenFile(path, fileFlags, 0644)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion exporter/fileexporter/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func TestNewFileWriter(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := newFileWriter(tt.args.cfg.Path, tt.args.cfg.Rotation, tt.args.cfg.FlushInterval, nil)
got, err := newFileWriter(tt.args.cfg.Path, tt.args.cfg.Append, tt.args.cfg.Rotation, tt.args.cfg.FlushInterval, nil)
defer func() {
assert.NoError(t, got.file.Close())
}()
Expand Down
2 changes: 1 addition & 1 deletion exporter/fileexporter/file_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (e *fileExporter) Start(_ context.Context, _ component.Host) error {
export := buildExportFunc(e.conf)

var err error
e.writer, err = newFileWriter(e.conf.Path, e.conf.Rotation, e.conf.FlushInterval, export)
e.writer, err = newFileWriter(e.conf.Path, e.conf.Append, e.conf.Rotation, e.conf.FlushInterval, export)
if err != nil {
return err
}
Expand Down
86 changes: 85 additions & 1 deletion exporter/fileexporter/file_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"io"
"os"
"path/filepath"
"slices"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -648,7 +649,7 @@ func TestFlushing(t *testing.T) {
}
export := buildExportFunc(fe.conf)
var err error
fe.writer, err = newFileWriter(fe.conf.Path, fe.conf.Rotation, fe.conf.FlushInterval, export)
fe.writer, err = newFileWriter(fe.conf.Path, fe.conf.Append, fe.conf.Rotation, fe.conf.FlushInterval, export)
assert.NoError(t, err)
err = fe.writer.file.Close()
assert.NoError(t, err)
Expand All @@ -673,3 +674,86 @@ func TestFlushing(t *testing.T) {
assert.EqualValues(t, b, bbuf.Bytes())
assert.NoError(t, fe.Shutdown(ctx))
}

func TestAppend(t *testing.T) {
Copy link
Member

@andrzej-stencel andrzej-stencel Mar 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test is a bit too low level to my taste, using fileWriter directly. I think it's because it was based on the TestFlushing test. Perhaps better to write this test more like the TestFileTracesExporter and similar tests are written:

  1. create newFileExporter()
  2. generate traces
  3. write telemetry with consumeTraces
  4. check that traces written (check number of lines with JSON or sth)
  5. create a new instance of newFileExporter
  6. consumeTraces
  7. check the cumulative number of lines in file, or that the size of file has increased from point 4.

cfg := &Config{
Path: tempFileName(t),
FlushInterval: time.Second,
Append: true,
}

// Create a buffer to capture the output.
bbuf := &tsBuffer{b: &bytes.Buffer{}}
buf := &NopWriteCloser{bbuf}
// Wrap the buffer with the buffered writer closer that implements flush() method.
bwc := newBufferedWriteCloser(buf)
// Create a file exporter with flushing enabled.
feI := newFileExporter(cfg)
assert.IsType(t, &fileExporter{}, feI)
fe := feI.(*fileExporter)

// Start the flusher.
ctx := context.Background()
fe.marshaller = &marshaller{
formatType: fe.conf.FormatType,
tracesMarshaler: tracesMarshalers[fe.conf.FormatType],
metricsMarshaler: metricsMarshalers[fe.conf.FormatType],
logsMarshaler: logsMarshalers[fe.conf.FormatType],
compression: fe.conf.Compression,
compressor: buildCompressor(fe.conf.Compression),
}
export := buildExportFunc(fe.conf)
var err error
fe.writer, err = newFileWriter(fe.conf.Path, fe.conf.Append, fe.conf.Rotation, fe.conf.FlushInterval, export)
assert.NoError(t, err)
err = fe.writer.file.Close()
assert.NoError(t, err)
fe.writer.file = bwc
fe.writer.start()

// Write 10 bytes.
b1 := []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
i, err := safeFileExporterWrite(fe, b1)
assert.NoError(t, err)
assert.EqualValues(t, len(b1), i, "bytes written")

// Assert buf contains 0 bytes before flush is called.
assert.EqualValues(t, 0, bbuf.Len(), "before flush")

// Wait 1.5 sec
time.Sleep(1500 * time.Millisecond)

// Assert buf contains 10 bytes after flush is called.
assert.EqualValues(t, 10, bbuf.Len(), "after flush")
// Compare the content.
assert.EqualValues(t, b1, bbuf.Bytes())
assert.NoError(t, fe.Shutdown(ctx))

// Restart the exporter
fe.writer, err = newFileWriter(fe.conf.Path, fe.conf.Append, fe.conf.Rotation, fe.conf.FlushInterval, export)
assert.NoError(t, err)
err = fe.writer.file.Close()
assert.NoError(t, err)
fe.writer.file = bwc
fe.writer.start()

// Write 10 bytes - again
b2 := []byte{11, 12, 13, 14, 15, 16, 17, 18, 19, 20}
i, err = safeFileExporterWrite(fe, b2)
assert.NoError(t, err)
assert.EqualValues(t, len(b2), i, "bytes written")

// Assert buf contains 10 bytes before flush is called.
assert.EqualValues(t, 10, bbuf.Len(), "after restart - before flush")

// Wait 1.5 sec
time.Sleep(1500 * time.Millisecond)

// Assert buf contains 20 bytes after flush is called.
assert.EqualValues(t, 20, bbuf.Len(), "after restart - after flush")
// Compare the content.
bComplete := slices.Clone(b1)
bComplete = append(bComplete, b2...)
assert.EqualValues(t, bComplete, bbuf.Bytes())
assert.NoError(t, fe.Shutdown(ctx))
}
Loading