Skip to content
This repository has been archived by the owner on May 25, 2022. It is now read-only.

Commit

Permalink
Add ObservedTimestamp to entry (#370)
Browse files Browse the repository at this point in the history
  • Loading branch information
djaglowski authored Mar 24, 2022
1 parent d3b42ff commit fc93e37
Show file tree
Hide file tree
Showing 14 changed files with 89 additions and 41 deletions.
42 changes: 22 additions & 20 deletions entry/entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,26 @@ import (
"time"
)

var now = time.Now
var timeNow = time.Now

// Entry is a flexible representation of log data associated with a timestamp.
type Entry struct {
Timestamp time.Time `json:"timestamp" yaml:"timestamp"`
Body interface{} `json:"body" yaml:"body"`
Attributes map[string]interface{} `json:"attributes,omitempty" yaml:"attributes,omitempty"`
Resource map[string]interface{} `json:"resource,omitempty" yaml:"resource,omitempty"`
SeverityText string `json:"severity_text,omitempty" yaml:"severity_text,omitempty"`
SpanId []byte `json:"span_id,omitempty" yaml:"span_id,omitempty"`
TraceId []byte `json:"trace_id,omitempty" yaml:"trace_id,omitempty"`
TraceFlags []byte `json:"trace_flags,omitempty" yaml:"trace_flags,omitempty"`
Severity Severity `json:"severity" yaml:"severity"`
ObservedTimestamp time.Time `json:"observed_timestamp" yaml:"observed_timestamp"`
Timestamp time.Time `json:"timestamp" yaml:"timestamp"`
Body interface{} `json:"body" yaml:"body"`
Attributes map[string]interface{} `json:"attributes,omitempty" yaml:"attributes,omitempty"`
Resource map[string]interface{} `json:"resource,omitempty" yaml:"resource,omitempty"`
SeverityText string `json:"severity_text,omitempty" yaml:"severity_text,omitempty"`
SpanId []byte `json:"span_id,omitempty" yaml:"span_id,omitempty"`
TraceId []byte `json:"trace_id,omitempty" yaml:"trace_id,omitempty"`
TraceFlags []byte `json:"trace_flags,omitempty" yaml:"trace_flags,omitempty"`
Severity Severity `json:"severity" yaml:"severity"`
}

// New will create a new log entry with current timestamp and an empty body.
func New() *Entry {
return &Entry{
Timestamp: now(),
ObservedTimestamp: timeNow(),
}
}

Expand Down Expand Up @@ -174,14 +175,15 @@ func (entry *Entry) readToStringMap(field FieldInterface, dest *map[string]strin
// Copy will return a deep copy of the entry.
func (entry *Entry) Copy() *Entry {
return &Entry{
Timestamp: entry.Timestamp,
Severity: entry.Severity,
SeverityText: entry.SeverityText,
Attributes: copyInterfaceMap(entry.Attributes),
Resource: copyInterfaceMap(entry.Resource),
Body: copyValue(entry.Body),
TraceId: copyByteArray(entry.TraceId),
SpanId: copyByteArray(entry.SpanId),
TraceFlags: copyByteArray(entry.TraceFlags),
ObservedTimestamp: entry.ObservedTimestamp,
Timestamp: entry.Timestamp,
Severity: entry.Severity,
SeverityText: entry.SeverityText,
Attributes: copyInterfaceMap(entry.Attributes),
Resource: copyInterfaceMap(entry.Resource),
Body: copyValue(entry.Body),
TraceId: copyByteArray(entry.TraceId),
SpanId: copyByteArray(entry.SpanId),
TraceFlags: copyByteArray(entry.TraceFlags),
}
}
18 changes: 17 additions & 1 deletion entry/entry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,12 @@ func TestRead(t *testing.T) {
}

func TestCopy(t *testing.T) {
now := time.Now()

entry := New()
entry.Severity = Severity(0)
entry.SeverityText = "ok"
entry.ObservedTimestamp = now
entry.Timestamp = time.Time{}
entry.Body = "test"
entry.Attributes = map[string]interface{}{"label": "value"}
Expand All @@ -155,6 +158,7 @@ func TestCopy(t *testing.T) {
entry.SpanId[0] = 0xff
entry.TraceFlags[0] = 0xff

require.Equal(t, now, copy.ObservedTimestamp)
require.Equal(t, time.Time{}, copy.Timestamp)
require.Equal(t, Severity(0), copy.Severity)
require.Equal(t, "ok", copy.SeverityText)
Expand All @@ -167,8 +171,9 @@ func TestCopy(t *testing.T) {
}

func TestCopyNil(t *testing.T) {
now := time.Now()
entry := New()
entry.Timestamp = time.Time{}
entry.ObservedTimestamp = now
copy := entry.Copy()

entry.Severity = Severity(1)
Expand All @@ -181,6 +186,7 @@ func TestCopyNil(t *testing.T) {
entry.SpanId = []byte{0x04, 0x05, 0x06, 0x07, 0x08, 0x00, 0x01, 0x02, 0x03}
entry.TraceFlags = []byte{0x01}

require.Equal(t, now, copy.ObservedTimestamp)
require.Equal(t, time.Time{}, copy.Timestamp)
require.Equal(t, Severity(0), copy.Severity)
require.Equal(t, "", copy.SeverityText)
Expand Down Expand Up @@ -296,3 +302,13 @@ func TestReadToInterfaceMissingField(t *testing.T) {
require.Error(t, err)
require.Contains(t, err.Error(), "can not be read as a interface{}")
}

func TestDefaultTimestamps(t *testing.T) {
now := time.Now()
timeNow = func() time.Time { return now }
defer func() { timeNow = time.Now }()

e := New()
require.Equal(t, now, e.ObservedTimestamp)
require.True(t, e.Timestamp.IsZero())
}
10 changes: 5 additions & 5 deletions operator/input/k8sevent/k8s_event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,11 @@ func TestWatchNamespace(t *testing.T) {
defer op.Stop()

select {
case entry := <-fake.Received:
require.Equal(t, entry.Timestamp, fakeTime)
require.Equal(t, entry.Resource["k8s.namespace.name"], "testnamespace")
require.Equal(t, entry.Resource["k8s.pod.uid"], "testuid")
require.Equal(t, entry.Resource["k8s.pod.name"], "testpodname")
case e := <-fake.Received:
require.Equal(t, e.Timestamp, fakeTime)
require.Equal(t, e.Resource["k8s.namespace.name"], "testnamespace")
require.Equal(t, e.Resource["k8s.pod.uid"], "testuid")
require.Equal(t, e.Resource["k8s.pod.name"], "testpodname")
case <-time.After(time.Second):
require.FailNow(t, "Timed out waiting for entry")
}
Expand Down
13 changes: 9 additions & 4 deletions operator/output/stdout/stdout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,17 +44,22 @@ func TestStdoutOperator(t *testing.T) {
var buf bytes.Buffer
op.(*StdoutOperator).encoder = json.NewEncoder(&buf)

ots := time.Unix(1591043864, 0)
ts := time.Unix(1591042864, 0)
e := &entry.Entry{
Timestamp: ts,
Body: "test body",
ObservedTimestamp: ots,
Timestamp: ts,
Body: "test body",
}
err = op.Process(context.Background(), e)
require.NoError(t, err)

marshalledTimestamp, err := json.Marshal(ts)
marshalledOTS, err := json.Marshal(ots)
require.NoError(t, err)

expected := `{"timestamp":` + string(marshalledTimestamp) + `,"body":"test body","severity":0}` + "\n"
marshalledTs, err := json.Marshal(ts)
require.NoError(t, err)

expected := `{"observed_timestamp":` + string(marshalledOTS) + `,"timestamp":` + string(marshalledTs) + `,"body":"test body","severity":0}` + "\n"
require.Equal(t, expected, buf.String())
}
2 changes: 2 additions & 0 deletions operator/parser/syslog/syslog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,14 @@ func TestSyslogParser(t *testing.T) {
require.NoError(t, err)

newEntry := entry.New()
ots := newEntry.ObservedTimestamp
newEntry.Body = tc.InputBody
err = op.Process(context.Background(), newEntry)
require.NoError(t, err)

select {
case e := <-fake.Received:
require.Equal(t, ots, e.ObservedTimestamp)
require.Equal(t, tc.ExpectedBody, e.Body)
require.Equal(t, tc.ExpectedTimestamp, e.Timestamp)
require.Equal(t, tc.ExpectedSeverity, e.Severity)
Expand Down
30 changes: 19 additions & 11 deletions operator/parser/time/time_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,15 +87,17 @@ func TestBuild(t *testing.T) {
}

func TestProcess(t *testing.T) {
now := time.Now()

testCases := []struct {
name string
config func() (*TimeParserConfig, error)
input *entry.Entry
expect *entry.Entry
}{
{
"promote",
func() (*TimeParserConfig, error) {
name: "promote",
config: func() (*TimeParserConfig, error) {
cfg := NewTimeParserConfig("test_id")
parseFrom, err := entry.NewField("body.app_time")
if err != nil {
Expand All @@ -106,21 +108,23 @@ func TestProcess(t *testing.T) {
cfg.Layout = "Mon Jan 2 15:04:05 MST 2006"
return cfg, nil
},
func() *entry.Entry {
input: func() *entry.Entry {
e := entry.New()
e.ObservedTimestamp = now
e.Body = map[string]interface{}{
"app_time": "Mon Jan 2 15:04:05 UTC 2006",
}
return e
}(),
&entry.Entry{
Timestamp: time.Date(2006, time.January, 2, 15, 4, 5, 0, time.UTC),
Body: map[string]interface{}{},
expect: &entry.Entry{
ObservedTimestamp: now,
Timestamp: time.Date(2006, time.January, 2, 15, 4, 5, 0, time.UTC),
Body: map[string]interface{}{},
},
},
{
"promote-and-preserve",
func() (*TimeParserConfig, error) {
name: "promote-and-preserve",
config: func() (*TimeParserConfig, error) {
cfg := NewTimeParserConfig("test_id")
parseFrom, err := entry.NewField("body.app_time")
if err != nil {
Expand All @@ -132,15 +136,17 @@ func TestProcess(t *testing.T) {
cfg.Layout = "Mon Jan 2 15:04:05 MST 2006"
return cfg, nil
},
func() *entry.Entry {
input: func() *entry.Entry {
e := entry.New()
e.ObservedTimestamp = now
e.Body = map[string]interface{}{
"app_time": "Mon Jan 2 15:04:05 UTC 2006",
}
return e
}(),
&entry.Entry{
Timestamp: time.Date(2006, time.January, 2, 15, 4, 5, 0, time.UTC),
expect: &entry.Entry{
ObservedTimestamp: now,
Timestamp: time.Date(2006, time.January, 2, 15, 4, 5, 0, time.UTC),
Body: map[string]interface{}{
"app_time": "Mon Jan 2 15:04:05 UTC 2006",
},
Expand Down Expand Up @@ -551,12 +557,14 @@ func runLossyTimeParseTest(_ *testing.T, cfg *TimeParserConfig, ent *entry.Entry
timeParser := op.(*TimeParserOperator)
timeParser.OutputOperators = []operator.Operator{mockOutput}

ots := ent.ObservedTimestamp
err = timeParser.Parse(ent)
if parseErr {
require.Error(t, err, "expected error when configuring operator")
return
}
require.NoError(t, err)
require.Equal(t, ots, ent.ObservedTimestamp, "time parsing should not change observed timestamp")

diff := time.Duration(math.Abs(float64(expected.Sub(ent.Timestamp))))
require.True(t, diff <= maxLoss)
Expand Down
2 changes: 2 additions & 0 deletions operator/transformer/add/add_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@ type testCase struct {
}

func TestProcessAndBuild(t *testing.T) {
now := time.Now()
newTestEntry := func() *entry.Entry {
e := entry.New()
e.ObservedTimestamp = now
e.Timestamp = time.Unix(1586632809, 0)
e.Body = map[string]interface{}{
"key": "val",
Expand Down
2 changes: 2 additions & 0 deletions operator/transformer/copy/copy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@ type testCase struct {

// Test building and processing a CopyOperatorConfig
func TestBuildAndProcess(t *testing.T) {
now := time.Now()
newTestEntry := func() *entry.Entry {
e := entry.New()
e.ObservedTimestamp = now
e.Timestamp = time.Unix(1586632809, 0)
e.Body = map[string]interface{}{
"key": "val",
Expand Down
2 changes: 2 additions & 0 deletions operator/transformer/flatten/flatten_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@ type testCase struct {

// test building and processing a given config.
func TestBuildAndProcess(t *testing.T) {
now := time.Now()
newTestEntry := func() *entry.Entry {
e := entry.New()
e.ObservedTimestamp = now
e.Timestamp = time.Unix(1586632809, 0)
e.Body = map[string]interface{}{
"key": "val",
Expand Down
2 changes: 2 additions & 0 deletions operator/transformer/move/move_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@ type processTestCase struct {
}

func TestProcessAndBuild(t *testing.T) {
now := time.Now()
newTestEntry := func() *entry.Entry {
e := entry.New()
e.ObservedTimestamp = now
e.Timestamp = time.Unix(1586632809, 0)
e.Body = map[string]interface{}{
"key": "val",
Expand Down
2 changes: 2 additions & 0 deletions operator/transformer/recombine/recombine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,13 @@ const (
)

func TestRecombineOperator(t *testing.T) {
now := time.Now()
t1 := time.Date(2020, time.April, 11, 21, 34, 01, 0, time.UTC)
t2 := time.Date(2020, time.April, 11, 21, 34, 02, 0, time.UTC)

entryWithBody := func(ts time.Time, body interface{}) *entry.Entry {
e := entry.New()
e.ObservedTimestamp = now
e.Timestamp = ts
e.Body = body
return e
Expand Down
2 changes: 2 additions & 0 deletions operator/transformer/remove/remove_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@ type testCase struct {

// Test building and processing a given remove config
func TestProcessAndBuild(t *testing.T) {
now := time.Now()
newTestEntry := func() *entry.Entry {
e := entry.New()
e.ObservedTimestamp = now
e.Timestamp = time.Unix(1586632809, 0)
e.Body = map[string]interface{}{
"key": "val",
Expand Down
1 change: 1 addition & 0 deletions operator/transformer/retain/retain.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ func (p *RetainOperator) Process(ctx context.Context, entry *entry.Entry) error
// Transform will apply the retain operation to an entry
func (p *RetainOperator) Transform(e *entry.Entry) error {
newEntry := entry.New()
newEntry.ObservedTimestamp = e.ObservedTimestamp
newEntry.Timestamp = e.Timestamp

if !p.AllResourceFields {
Expand Down
2 changes: 2 additions & 0 deletions operator/transformer/retain/retain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@ type testCase struct {
}

func TestBuildAndProcess(t *testing.T) {
now := time.Now()
newTestEntry := func() *entry.Entry {
e := entry.New()
e.ObservedTimestamp = now
e.Timestamp = time.Unix(1586632809, 0)
e.Body = map[string]interface{}{
"key": "val",
Expand Down

0 comments on commit fc93e37

Please sign in to comment.