Skip to content

Commit

Permalink
feat(transformer): new reordering transform to sort logs by timestamp (
Browse files Browse the repository at this point in the history
…#910)

* feat(transformer): new reordering transform to sort logs by timestamp
  • Loading branch information
dmachard authored Dec 21, 2024
1 parent 6c1ecca commit c4569bf
Show file tree
Hide file tree
Showing 6 changed files with 224 additions and 2 deletions.
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
<p align="center">
<img src="https://goreportcard.com/badge/github.com/dmachard/go-dns-collector" alt="Go Report"/>
<img src="https://img.shields.io/badge/go%20version-min%201.21-green" alt="Go version"/>
<img src="https://img.shields.io/badge/go%20tests-518-green" alt="Go tests"/>
<img src="https://img.shields.io/badge/go%20tests-521-green" alt="Go tests"/>
<img src="https://img.shields.io/badge/go%20bench-21-green" alt="Go bench"/>
<img src="https://img.shields.io/badge/go%20lines-32573-green" alt="Go lines"/>
<img src="https://img.shields.io/badge/go%20lines-32844-green" alt="Go lines"/>
</p>

<p align="center">
Expand Down Expand Up @@ -86,6 +86,7 @@
- Add [Geographical](docs/transformers/transform_geoip.md) metadata
- Various data [Extractor](docs/transformers/transform_dataextractor.md)
- Suspicious traffic [Detector](docs/transformers/transform_suspiciousdetector.md) and [Prediction](docs/transformers/transform_trafficprediction.md)
- [Reordering](docs/transformers/transform_reordering.md) DNS messages based on timestamps

## Get Started

Expand Down
20 changes: 20 additions & 0 deletions docs/transformers/transform_reordering.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Transformer: Reordering

Use this transformer to reorder DNS messages based on their timestamp. This can be useful when processing logs that may not be ordered correctly, ensuring they are sorted before further processing.

The transformer buffers DNS messages and periodically flushes them based on a configurable interval. The messages are sorted by timestamp before being passed to the next workers.

Options:

* `flush-interval` (int)
> Defines the interval (in seconds) at which the buffer will be flushed automatically. A smaller value will lead to more frequent flushing.
* `max-buffer-size` (int)
> Defines the maximum number of messages that can be buffered before the transformer triggers a flush. Once this limit is reached, the buffer will be flushed regardless of the flush interval.
```yaml
transforms:
reordering:
flush-interval: 30
max-buffer-size: 100
```
5 changes: 5 additions & 0 deletions pkgconfig/transformers.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,11 @@ type ConfigTransformers struct {
WhiteDomainsFile string `yaml:"white-domains-file" default:""`
PersistenceFile string `yaml:"persistence-file" default:""`
} `yaml:"new-domain-tracker"`
Reordering struct {
Enable bool `yaml:"enable" default:"false"`
FlushInterval int `yaml:"flush-interval" default:"30"`
MaxBufferSize int `yaml:"max-buffer-size" default:"100"`
} `yaml:"reordering"`
}

func (c *ConfigTransformers) SetDefault() {
Expand Down
125 changes: 125 additions & 0 deletions transformers/reordering.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package transformers

import (
"sort"
"sync"
"time"

"github.com/dmachard/go-dnscollector/dnsutils"
"github.com/dmachard/go-dnscollector/pkgconfig"
"github.com/dmachard/go-logger"
)

type ReorderingTransform struct {
GenericTransformer
buffer []dnsutils.DNSMessage
mutex sync.Mutex
flushTicker *time.Ticker
flushSignal chan struct{}
stopChan chan struct{}
nextWorkers []chan dnsutils.DNSMessage
}

// NewLogReorderTransform creates an instance of the transformer.
func NewReorderingTransform(config *pkgconfig.ConfigTransformers, logger *logger.Logger, name string, instance int, nextWorkers []chan dnsutils.DNSMessage) *ReorderingTransform {
t := &ReorderingTransform{
GenericTransformer: NewTransformer(config, logger, "reordering", name, instance, nextWorkers),
stopChan: make(chan struct{}),
flushSignal: make(chan struct{}),
nextWorkers: nextWorkers,
}

return t
}

// GetTransforms returns the available subtransformations.
func (t *ReorderingTransform) GetTransforms() ([]Subtransform, error) {
subtransforms := []Subtransform{}
if t.config.Reordering.Enable {
subtransforms = append(subtransforms, Subtransform{name: "reordering:sort-by-timestamp", processFunc: t.ReorderLogs})
// Start a goroutine to handle periodic flushing.
t.flushTicker = time.NewTicker(time.Duration(t.config.Reordering.FlushInterval) * time.Second)
t.buffer = make([]dnsutils.DNSMessage, 0)
go t.flushPeriodically()

}
return subtransforms, nil
}

// ReorderLogs adds a log to the buffer and flushes if the buffer is full.
func (t *ReorderingTransform) ReorderLogs(dm *dnsutils.DNSMessage) (int, error) {
// Add the log to the buffer.
t.mutex.Lock()
t.buffer = append(t.buffer, *dm)
t.mutex.Unlock()
// If the buffer exceeds a certain size, flush it.
if len(t.buffer) >= t.config.Reordering.MaxBufferSize {
select {
case t.flushSignal <- struct{}{}:
default:
}
}

return ReturnDrop, nil
}

// Close stops the periodic flushing.
func (t *ReorderingTransform) Reset() {
select {
case <-t.stopChan:
default:
close(t.stopChan)
}
}

// flushPeriodically periodically flushes the buffer based on a timer.
func (t *ReorderingTransform) flushPeriodically() {
for {
select {
case <-t.flushTicker.C:
t.flushBuffer()
case <-t.flushSignal:
t.flushBuffer()
case <-t.stopChan:
t.flushTicker.Stop()
return
}
}
}

// flushBuffer sorts and sends the logs in the buffer to the next workers.
func (t *ReorderingTransform) flushBuffer() {
t.mutex.Lock()
defer t.mutex.Unlock()

if len(t.buffer) == 0 {
return
}

// Sort the buffer by timestamp.
sort.SliceStable(t.buffer, func(i, j int) bool {
ti, err1 := time.Parse(time.RFC3339Nano, t.buffer[i].DNSTap.TimestampRFC3339)
tj, err2 := time.Parse(time.RFC3339Nano, t.buffer[j].DNSTap.TimestampRFC3339)
if err1 != nil || err2 != nil {
// If timestamps are invalid, maintain the original order.
return false
}
return ti.Before(tj)
})

// Send sorted logs to the next workers.
for _, sortedMsg := range t.buffer {
for _, worker := range t.nextWorkers {
// Non-blocking send to avoid worker congestion.
select {
case worker <- sortedMsg:
default:
// Log or handle if the worker channel is full.
t.logger.Info("Worker channel is full, dropping message")
}
}
}

// Clear the buffer.
t.buffer = t.buffer[:0]
}
70 changes: 70 additions & 0 deletions transformers/reordering_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package transformers

import (
"sort"
"testing"

"github.com/dmachard/go-dnscollector/dnsutils"
"github.com/dmachard/go-dnscollector/pkgconfig"
"github.com/dmachard/go-logger"
)

func TestReorderingTransform_SortByTimestamp(t *testing.T) {
// enable feature
config := pkgconfig.GetFakeConfigTransformers()
config.Reordering.Enable = true

// initialize logger
log := logger.New(false)

// create output channels
outChans := []chan dnsutils.DNSMessage{
make(chan dnsutils.DNSMessage, 10),
}

// initialize transformer
reorder := NewReorderingTransform(config, log, "test", 0, outChans)

dm1 := dnsutils.GetFakeDNSMessage()
dm1.DNSTap.TimestampRFC3339 = "2024-12-20T21:12:14.786109Z"

dm2 := dnsutils.GetFakeDNSMessage()
dm2.DNSTap.TimestampRFC3339 = "2024-12-20T21:12:14.766361Z"

dm3 := dnsutils.GetFakeDNSMessage()
dm3.DNSTap.TimestampRFC3339 = "2024-12-20T21:12:14.803447Z"

reorder.ReorderLogs(&dm1)
reorder.ReorderLogs(&dm2)
reorder.ReorderLogs(&dm3)

// manually trigger a buffer flush
reorder.flushBuffer()

// collect results from the output channel
var results []dnsutils.DNSMessage
done := false
for !done {
select {
case msg := <-outChans[0]:
results = append(results, msg)
default:
done = true
}
}

// validate order
if len(results) != 3 {
t.Fatalf("Expected 3 messages, got %d", len(results))
}

timestamps := []string{
results[0].DNSTap.TimestampRFC3339,
results[1].DNSTap.TimestampRFC3339,
results[2].DNSTap.TimestampRFC3339,
}

if !sort.StringsAreSorted(timestamps) {
t.Errorf("Timestamps are not sorted: %v", timestamps)
}
}
1 change: 1 addition & 0 deletions transformers/transformers.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ func NewTransforms(config *pkgconfig.ConfigTransformers, logger *logger.Logger,
d.availableTransforms = append(d.availableTransforms, TransformEntry{NewDNSGeoIPTransform(config, logger, name, instance, nextWorkers)})
d.availableTransforms = append(d.availableTransforms, TransformEntry{NewRewriteTransform(config, logger, name, instance, nextWorkers)})
d.availableTransforms = append(d.availableTransforms, TransformEntry{NewNewDomainTrackerTransform(config, logger, name, instance, nextWorkers)})
d.availableTransforms = append(d.availableTransforms, TransformEntry{NewReorderingTransform(config, logger, name, instance, nextWorkers)})

d.Prepare()
return d
Expand Down

0 comments on commit c4569bf

Please sign in to comment.