Skip to content

Commit

Permalink
Added: Deduplication feature with bloom filters
Browse files Browse the repository at this point in the history
  • Loading branch information
danielunderwood committed Mar 8, 2022
1 parent 52c927d commit 64f8eec
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 6 deletions.
101 changes: 101 additions & 0 deletions dedupe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package main

import (
"errors"
"fmt"
"io"
"os"

"github.com/bits-and-blooms/bloom/v3"
)

type Deduplicator interface {
Exists([]byte) (bool, error)
Add([]byte) error
}

// The null deduplicator doesn't do any deduplication and always returns
// false for existence
type NullDeduplicator struct{}

func NewNullDeduplicator() *NullDeduplicator {
return &NullDeduplicator{}
}

func (*NullDeduplicator) Exists([]byte) (bool, error) {
return false, nil
}

func (*NullDeduplicator) Add([]byte) error {
return nil
}

// The bloom filter deduplicator does deduplication based on a bloom filter
type BloomFilterDeduplicator struct {
path string
bf *bloom.BloomFilter
}

// Capacity for size n with a given false positive chance
func NewBloomFilterDeduplicator(path string, n uint, errorRate float64) *BloomFilterDeduplicator {
if _, err := os.Stat(path); err == nil {
// File exists
// Note that in this case, the arguments are ignored
file, err := os.Open(path)
if err != nil {
fmt.Printf("ERROR: Could not open bloom file: %s\n", err)
return nil
}
defer file.Close()
// Note that the parameters won't be preserved
bf := bloom.NewWithEstimates(n, errorRate)
_, err = bf.ReadFrom(file)
if err == io.EOF {
// If we get an EOF, it means the old file was corrupted, so we can just create a new one
fmt.Printf("WARNING: Bloom filter file is corrupted. Creating a new one\n")
bf := bloom.NewWithEstimates(n, errorRate)
return &BloomFilterDeduplicator{path: path, bf: bf}
} else if err != nil {
fmt.Printf("ERROR: Could not read bloom file: %s\n", err)
return nil
}
return &BloomFilterDeduplicator{path: path, bf: bf}
} else if errors.Is(err, os.ErrNotExist) {
// Doesn't exist, need to create new filter
bf := bloom.NewWithEstimates(n, errorRate)
return &BloomFilterDeduplicator{path: path, bf: bf}
} else {
// Didn't do what we expected
fmt.Printf("ERROR: Could not process bloom path: %s\n", err)
return nil
}
}

func (d *BloomFilterDeduplicator) Exists(b []byte) (bool, error) {
return d.bf.Test(b), nil
}

func (d *BloomFilterDeduplicator) Add(b []byte) error {
d.bf = d.bf.Add(b)
// Save the filter
// This probably isn't the best way to do this and could lead to a number of problems,
// but it will work for now
// It would probably be better to do this on a separate thread at some fixed interval

// Use a temporary file and then move it to the chosen path to do an atomic write
// This prevents corruption of the file in the case that the program dies while writing the file
file, err := os.CreateTemp("", "*")
if err != nil {
fmt.Printf("WARNING: Could not open bloom file for saving: %s\n", err)
}
defer file.Close()

_, err = d.bf.WriteTo(file)
if err != nil {
fmt.Printf("WARNING: Unable to write bloom file for saving: %s\n", err)
} else {
os.Rename(file.Name(), d.path)
}

return nil
}
2 changes: 1 addition & 1 deletion flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
# remeber to bump this hash when your dependencies change.
# vendorSha256 = pkgs.lib.fakeSha256;

vendorSha256 = "sha256-K/kZHbW26W7+9N2xRrm3tY6linMnkSG2mlN7MnxLfNk=";
vendorSha256 = "sha256-CWJ9fBvDU1RUkvGr6E46FuU6jNdvc5kci3Qk1uRjS6U=";
};
});

Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ module github.com/danielunderwood/log2http
go 1.16

require (
github.com/bits-and-blooms/bitset v1.2.1 // indirect
github.com/bits-and-blooms/bloom/v3 v3.1.0
github.com/nxadm/tail v1.4.8
golang.org/x/sys v0.0.0-20220224120231-95c6836cb0e7 // indirect
)
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
github.com/bits-and-blooms/bitset v1.2.0/go.mod h1:gIdJ4wp64HaoK2YrL1Q5/N7Y16edYb8uY+O0FJTyyDA=
github.com/bits-and-blooms/bitset v1.2.1 h1:M+/hrU9xlMp7t4TyTDQW97d3tRPVuKFC6zBEK16QnXY=
github.com/bits-and-blooms/bitset v1.2.1/go.mod h1:gIdJ4wp64HaoK2YrL1Q5/N7Y16edYb8uY+O0FJTyyDA=
github.com/bits-and-blooms/bloom/v3 v3.1.0 h1:o3Adl6bGuD9eZzMiLDepS5jqmoEAv/ZH+fFe/MH1quA=
github.com/bits-and-blooms/bloom/v3 v3.1.0/go.mod h1:MC8muvBzzPOFsrcdND/A7kU7kMhkqb9KI70JlZCP+C8=
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20220224120231-95c6836cb0e7 h1:BXxu8t6QN0G1uff4bzZzSkpsax8+ALqTGUtz08QrV00=
golang.org/x/sys v0.0.0-20220224120231-95c6836cb0e7/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
Expand Down
36 changes: 31 additions & 5 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,25 @@ import (
"flag"
"fmt"
"os"
"reflect"
"regexp"
"strings"

"github.com/nxadm/tail"
)

func main() {
var file, expression, url, sourceName string
var file, expression, url, sourceName, dedupeUri string
flag.StringVar(&file, "file", "", "File to read. Required")
flag.StringVar(&url, "url", "", "URL to POST to. Must be supplied or in environment")
flag.StringVar(&expression, "regexp", ".*", "Expression to match, defaults to `.*`")
flag.StringVar(&sourceName, "sourceName", "", "Name of source. Defaults to hostname")
flag.StringVar(&dedupeUri, "dedupe", "", "URI for deduplication, such as bloom:///path/to/filter.bin. Defaults to no deduplication")

flag.Parse()

if len(file) == 0 {
fmt.Println("Usage: -file FILENAME -regexp EXPRESSION")
fmt.Println("Usage: -file FILENAME -regexp EXPRESSION -dedupe DEDUPE_URI")
os.Exit(1)
}
if len(url) == 0 {
Expand All @@ -33,6 +36,23 @@ func main() {
sourceName, _ = os.Hostname()
}

var dedupe Deduplicator
if len(dedupeUri) == 0 {
dedupe = NewNullDeduplicator()
} else if strings.HasPrefix(dedupeUri, "bloom://") {
// TODO These parameters should probably be tunable by the user
dedupe = NewBloomFilterDeduplicator(dedupeUri[len("bloom://"):], 1e9, 1e-4)
} else {
fmt.Println("ERROR: Unable to parse deduplication URI")
os.Exit(1)
}
// This is some weird go thing about how interfaces are stored in memory
// Just comparing to nil probably won't work (it still has a type), so you have to use reflection to check the value
if dedupe == nil || reflect.ValueOf(dedupe).IsNil() {
fmt.Println("ERROR: Unable to create deduplicator")
os.Exit(1)
}

re := regexp.MustCompile(expression)

client := NewDiscordWebhook(url)
Expand Down Expand Up @@ -60,22 +80,28 @@ func main() {
fields = append(fields, Field{Name: name, Value: match[i], Inline: true})
}
}

dedupeKey := []byte(line.Text)
exists, err := dedupe.Exists(dedupeKey)
if err != nil {
fmt.Println("ERROR", err)
continue
}
if len(match) > 0 {
if exists {
fmt.Println("Already exists!", line.Text)
}
if len(match) > 0 && !exists {
fmt.Println("Matched", line.Text)
message := DiscordMessage{
Embeds: []Embed{
Embed{
{
Author: Author{Name: fmt.Sprintf("%s on %s", file, sourceName)},
Description: fmt.Sprintf("```\n%s\n```", line.Text),
Fields: fields,
},
}}

client.MessageQueue <- message
dedupe.Add(dedupeKey)
}
}
}

0 comments on commit 64f8eec

Please sign in to comment.