diff --git a/README.md b/README.md index f125b1f1..b2f5111a 100644 --- a/README.md +++ b/README.md @@ -1,9 +1,9 @@
- + - +
@@ -86,6 +86,7 @@ - Add [Geographical](docs/transformers/transform_geoip.md) metadata - Various data [Extractor](docs/transformers/transform_dataextractor.md) - Suspicious traffic [Detector](docs/transformers/transform_suspiciousdetector.md) and [Prediction](docs/transformers/transform_trafficprediction.md) + - [Reordering](docs/transformers/transform_reordering.md) DNS messages based on timestamps ## Get Started diff --git a/docs/transformers/transform_reordering.md b/docs/transformers/transform_reordering.md new file mode 100644 index 00000000..97104cf0 --- /dev/null +++ b/docs/transformers/transform_reordering.md @@ -0,0 +1,20 @@ +# Transformer: Reordering + +Use this transformer to reorder DNS messages based on their timestamp. This can be useful when processing logs that may not be ordered correctly, ensuring they are sorted before further processing. + +The transformer buffers DNS messages and periodically flushes them based on a configurable interval. The messages are sorted by timestamp before being passed to the next workers. + +Options: + +* `flush-interval` (int) + > Defines the interval (in seconds) at which the buffer will be flushed automatically. A smaller value will lead to more frequent flushing. + +* `max-buffer-size` (int) + > Defines the maximum number of messages that can be buffered before the transformer triggers a flush. Once this limit is reached, the buffer will be flushed regardless of the flush interval. 
+
+```yaml
+transforms:
+  reordering:
+    flush-interval: 30
+    max-buffer-size: 100
+```
\ No newline at end of file
diff --git a/pkgconfig/transformers.go b/pkgconfig/transformers.go
index 52823d80..4bdd04da 100644
--- a/pkgconfig/transformers.go
+++ b/pkgconfig/transformers.go
@@ -103,6 +103,11 @@ type ConfigTransformers struct {
 		WhiteDomainsFile string `yaml:"white-domains-file" default:""`
 		PersistenceFile  string `yaml:"persistence-file" default:""`
 	} `yaml:"new-domain-tracker"`
+	Reordering struct {
+		Enable        bool `yaml:"enable" default:"false"`
+		FlushInterval int  `yaml:"flush-interval" default:"30"`
+		MaxBufferSize int  `yaml:"max-buffer-size" default:"100"`
+	} `yaml:"reordering"`
 }
 
 func (c *ConfigTransformers) SetDefault() {
diff --git a/transformers/reordering.go b/transformers/reordering.go
new file mode 100644
index 00000000..a2fb8659
--- /dev/null
+++ b/transformers/reordering.go
@@ -0,0 +1,164 @@
+package transformers
+
+import (
+	"sort"
+	"sync"
+	"time"
+
+	"github.com/dmachard/go-dnscollector/dnsutils"
+	"github.com/dmachard/go-dnscollector/pkgconfig"
+	"github.com/dmachard/go-logger"
+)
+
+// defaultFlushInterval replaces a non-positive flush-interval setting, because
+// time.NewTicker panics on such durations.
+const defaultFlushInterval = 30 * time.Second
+
+// ReorderingTransform buffers DNS messages and forwards them to the next
+// workers sorted by their DNSTap timestamp.
+type ReorderingTransform struct {
+	GenericTransformer
+	buffer      []dnsutils.DNSMessage // pending messages, guarded by mutex
+	mutex       sync.Mutex
+	flushTicker *time.Ticker
+	flushSignal chan struct{} // requests an early flush when the buffer is full
+	stopChan    chan struct{} // closed by Reset to stop the flushing goroutine
+	nextWorkers []chan dnsutils.DNSMessage
+}
+
+// reorderKey holds the parsed timestamp of one buffered message so that each
+// timestamp is parsed once per flush instead of once per comparison.
+type reorderKey struct {
+	index int // position of the message in the flushed batch
+	ts    time.Time
+	valid bool // false when the timestamp could not be parsed
+}
+
+// NewReorderingTransform creates an instance of the transformer.
+func NewReorderingTransform(config *pkgconfig.ConfigTransformers, logger *logger.Logger, name string, instance int, nextWorkers []chan dnsutils.DNSMessage) *ReorderingTransform {
+	t := &ReorderingTransform{
+		GenericTransformer: NewTransformer(config, logger, "reordering", name, instance, nextWorkers),
+		stopChan:           make(chan struct{}),
+		flushSignal:        make(chan struct{}),
+		nextWorkers:        nextWorkers,
+	}
+
+	return t
+}
+
+// GetTransforms returns the available subtransformations. When reordering is
+// enabled it also starts the goroutine that flushes the buffer.
+func (t *ReorderingTransform) GetTransforms() ([]Subtransform, error) {
+	subtransforms := []Subtransform{}
+	if t.config.Reordering.Enable {
+		subtransforms = append(subtransforms, Subtransform{name: "reordering:sort-by-timestamp", processFunc: t.ReorderLogs})
+
+		interval := time.Duration(t.config.Reordering.FlushInterval) * time.Second
+		if interval <= 0 {
+			interval = defaultFlushInterval
+		}
+
+		t.mutex.Lock()
+		t.buffer = make([]dnsutils.DNSMessage, 0)
+		t.mutex.Unlock()
+
+		// Start a goroutine to handle periodic flushing.
+		t.flushTicker = time.NewTicker(interval)
+		go t.flushPeriodically()
+	}
+	return subtransforms, nil
+}
+
+// ReorderLogs adds a message to the buffer and requests a flush once the
+// buffer holds at least max-buffer-size messages. It always returns
+// ReturnDrop: buffered messages reach the next workers through flushBuffer.
+func (t *ReorderingTransform) ReorderLogs(dm *dnsutils.DNSMessage) (int, error) {
+	// Append and read the length under the same lock; flushBuffer swaps the
+	// buffer concurrently.
+	t.mutex.Lock()
+	t.buffer = append(t.buffer, *dm)
+	full := len(t.buffer) >= t.config.Reordering.MaxBufferSize
+	t.mutex.Unlock()
+
+	if full {
+		// Non-blocking: when the flusher is busy the signal is skipped and the
+		// buffer is drained by a later signal or the next tick.
+		select {
+		case t.flushSignal <- struct{}{}:
+		default:
+		}
+	}
+
+	return ReturnDrop, nil
+}
+
+// Reset stops the periodic flushing goroutine. It does not flush the buffer.
+func (t *ReorderingTransform) Reset() {
+	select {
+	case <-t.stopChan:
+		// Already closed by a previous call.
+	default:
+		close(t.stopChan)
+	}
+}
+
+// flushPeriodically flushes the buffer on every tick and on every flush
+// signal until stopChan is closed.
+func (t *ReorderingTransform) flushPeriodically() {
+	for {
+		select {
+		case <-t.flushTicker.C:
+			t.flushBuffer()
+		case <-t.flushSignal:
+			t.flushBuffer()
+		case <-t.stopChan:
+			t.flushTicker.Stop()
+			return
+		}
+	}
+}
+
+// flushBuffer sorts the buffered messages by timestamp and sends them to the
+// next workers. Messages whose timestamp cannot be parsed are sent after the
+// valid ones, in arrival order.
+func (t *ReorderingTransform) flushBuffer() {
+	// Detach the batch so that ReorderLogs is not blocked while sorting and
+	// forwarding.
+	t.mutex.Lock()
+	if len(t.buffer) == 0 {
+		t.mutex.Unlock()
+		return
+	}
+	batch := t.buffer
+	t.buffer = make([]dnsutils.DNSMessage, 0, len(batch))
+	t.mutex.Unlock()
+
+	// Parse every timestamp once.
+	keys := make([]reorderKey, len(batch))
+	for i := range batch {
+		ts, err := time.Parse(time.RFC3339Nano, batch[i].DNSTap.TimestampRFC3339)
+		keys[i] = reorderKey{index: i, ts: ts, valid: err == nil}
+	}
+
+	// Sort the keys by timestamp; the stable sort keeps arrival order for ties.
+	sort.SliceStable(keys, func(i, j int) bool {
+		a, b := keys[i], keys[j]
+		if a.valid != b.valid {
+			return a.valid
+		}
+		return a.valid && a.ts.Before(b.ts)
+	})
+
+	// Send sorted logs to the next workers.
+	for _, key := range keys {
+		for _, worker := range t.nextWorkers {
+			// Non-blocking send to avoid worker congestion.
+			select {
+			case worker <- batch[key.index]:
+			default:
+				// Log or handle if the worker channel is full.
+				t.logger.Info("Worker channel is full, dropping message")
+			}
+		}
+	}
+}
diff --git a/transformers/reordering_test.go b/transformers/reordering_test.go
new file mode 100644
index 00000000..87d7853f
--- /dev/null
+++ b/transformers/reordering_test.go
@@ -0,0 +1,98 @@
+package transformers
+
+import (
+	"reflect"
+	"testing"
+
+	"github.com/dmachard/go-dnscollector/dnsutils"
+	"github.com/dmachard/go-dnscollector/pkgconfig"
+	"github.com/dmachard/go-logger"
+)
+
+func TestReorderingTransform_SortByTimestamp(t *testing.T) {
+	testcases := []struct {
+		name  string
+		input []string
+		want  []string
+	}{
+		{
+			name: "out of order",
+			input: []string{
+				"2024-12-20T21:12:14.786109Z",
+				"2024-12-20T21:12:14.766361Z",
+				"2024-12-20T21:12:14.803447Z",
+			},
+			want: []string{
+				"2024-12-20T21:12:14.766361Z",
+				"2024-12-20T21:12:14.786109Z",
+				"2024-12-20T21:12:14.803447Z",
+			},
+		},
+		{
+			name: "invalid timestamps last",
+			input: []string{
+				"not-a-timestamp",
+				"2024-12-20T21:12:14.786109Z",
+				"",
+				"2024-12-20T21:12:14.766361Z",
+			},
+			want: []string{
+				"2024-12-20T21:12:14.766361Z",
+				"2024-12-20T21:12:14.786109Z",
+				"not-a-timestamp",
+				"",
+			},
+		},
+	}
+
+	for _, tc := range testcases {
+		t.Run(tc.name, func(t *testing.T) {
+			// enable feature
+			config := pkgconfig.GetFakeConfigTransformers()
+			config.Reordering.Enable = true
+
+			// initialize logger
+			log := logger.New(false)
+
+			// create output channels
+			outChans := []chan dnsutils.DNSMessage{
+				make(chan dnsutils.DNSMessage, 10),
+			}
+
+			// initialize transformer
+			reorder := NewReorderingTransform(config, log, "test", 0, outChans)
+
+			for _, ts := range tc.input {
+				dm := dnsutils.GetFakeDNSMessage()
+				dm.DNSTap.TimestampRFC3339 = ts
+				code, err := reorder.ReorderLogs(&dm)
+				if err != nil {
+					t.Fatalf("ReorderLogs returned an error: %v", err)
+				}
+				if code != ReturnDrop {
+					t.Fatalf("Expected ReturnDrop, got %d", code)
+				}
+			}
+
+			// manually trigger a buffer flush
+			reorder.flushBuffer()
+
+			// collect results from the output channel
+			var got []string
+			done := false
+			for !done {
+				select {
+				case msg := <-outChans[0]:
+					got = append(got, msg.DNSTap.TimestampRFC3339)
+				default:
+					done = true
+				}
+			}
+
+			// validate order
+			if !reflect.DeepEqual(got, tc.want) {
+				t.Errorf("Unexpected order: got %v, want %v", got, tc.want)
+			}
+		})
+	}
+}
diff --git a/transformers/transformers.go b/transformers/transformers.go
index 808bcc67..949851d5 100644
--- a/transformers/transformers.go
+++ b/transformers/transformers.go
@@ -87,6 +87,7 @@ func NewTransforms(config *pkgconfig.ConfigTransformers, logger *logger.Logger,
 	d.availableTransforms = append(d.availableTransforms, TransformEntry{NewDNSGeoIPTransform(config, logger, name, instance, nextWorkers)})
 	d.availableTransforms = append(d.availableTransforms, TransformEntry{NewRewriteTransform(config, logger, name, instance, nextWorkers)})
 	d.availableTransforms = append(d.availableTransforms, TransformEntry{NewNewDomainTrackerTransform(config, logger, name, instance, nextWorkers)})
+	d.availableTransforms = append(d.availableTransforms, TransformEntry{NewReorderingTransform(config, logger, name, instance, nextWorkers)})
 
 	d.Prepare()
 	return d