From 548dac1a11b789743104ce5d64a15c098c6f7628 Mon Sep 17 00:00:00 2001 From: bemasher Date: Sat, 11 Jul 2015 03:45:27 -0600 Subject: [PATCH] Refactor message filtering. --- flags.go | 34 ++++++++++++++++++++++++++++++++++ parse/parse.go | 24 ++++++++++++++++++++++++ recv.go | 31 ++++++++++++------------------- 3 files changed, 70 insertions(+), 19 deletions(-) diff --git a/flags.go b/flags.go index 08242740b..09e3e7fb3 100644 --- a/flags.go +++ b/flags.go @@ -17,6 +17,7 @@ package main import ( + "bytes" "encoding/gob" "encoding/json" "encoding/xml" @@ -28,6 +29,7 @@ import ( "strings" "github.com/bemasher/rtlamr/csv" + "github.com/bemasher/rtlamr/parse" ) var logFilename = flag.String("logfile", "/dev/stdout", "log statement dump file") @@ -45,6 +47,7 @@ var decimation = flag.Int("decimation", 1, "integer decimation factor, keep ever var timeLimit = flag.Duration("duration", 0, "time to run for, 0 for infinite, ex. 1h5m10s") var meterID UintMap var meterType UintMap + var unique = flag.Bool("unique", false, "suppress duplicate messages from each meter") var encoder Encoder @@ -167,3 +170,34 @@ func (m UintMap) Set(value string) error { return nil } + +type MeterIDFilter UintMap + +func (m MeterIDFilter) Filter(msg parse.Message) bool { + return m[uint(msg.MeterID())] +} + +type MeterTypeFilter UintMap + +func (m MeterTypeFilter) Filter(msg parse.Message) bool { + return m[uint(msg.MeterType())] +} + +type UniqueFilter map[uint][]byte + +func NewUniqueFilter() UniqueFilter { + return make(UniqueFilter) +} + +func (uf UniqueFilter) Filter(msg parse.Message) bool { + checksum := msg.Checksum() + mid := uint(msg.MeterID()) + + if val, ok := uf[mid]; ok && bytes.Compare(val, checksum) == 0 { + return false + } + + uf[mid] = make([]byte, len(checksum)) + copy(uf[mid], checksum) + return true +} diff --git a/parse/parse.go b/parse/parse.go index 7aa43cb37..565585fb8 100644 --- a/parse/parse.go +++ b/parse/parse.go @@ -77,3 +77,27 @@ func (msg LogMessage) Record() (r []string) { r = append(r, msg.Message.Record()...) return r } + +type FilterChain []Filter + +func (fc *FilterChain) Add(filter Filter) { + *fc = append(*fc, filter) +} + +func (fc FilterChain) Match(msg Message) bool { + if len(fc) == 0 { + return true + } + + for _, filter := range fc { + if !filter.Filter(msg) { + return false + } + } + + return true +} + +type Filter interface { + Filter(Message) bool +} diff --git a/recv.go b/recv.go index 8f9045edb..d99ac189c 100644 --- a/recv.go +++ b/recv.go @@ -17,7 +17,6 @@ package main import ( - "bytes" "encoding/xml" "flag" "fmt" @@ -40,7 +39,8 @@ var rcvr Receiver type Receiver struct { rtltcp.SDR - p parse.Parser + p parse.Parser + fc parse.FilterChain } func (rcvr *Receiver) NewReceiver() { @@ -82,8 +82,17 @@ func (rcvr *Receiver) NewReceiver() { sampleRateFlagSet = true case "gainbyindex", "tunergainmode", "tunergain", "agcmode": gainFlagSet = true + case "unique": + rcvr.fc.Add(NewUniqueFilter()) + case "filterid": + rcvr.fc.Add(MeterIDFilter(meterID)) + case "filtertype": + rcvr.fc.Add(MeterTypeFilter(meterType)) + default: + fmt.Println(f.Name) } }) + fmt.Printf("%+v\n", rcvr.fc) // Set some parameters for listening. if centerfreqFlagSet { @@ -127,7 +136,6 @@ func (rcvr *Receiver) Run() { }() block := make([]byte, rcvr.p.Cfg().BlockSize2) - checksumHistory := make(map[uint][]byte) start := time.Now() for { @@ -149,24 +157,9 @@ func (rcvr *Receiver) Run() { indices := rcvr.p.Dec().Decode(block) for _, pkt := range rcvr.p.Parse(indices) { - if len(meterID) > 0 && !meterID[uint(pkt.MeterID())] { - continue - } - - if len(meterType) > 0 && !meterType[uint(pkt.MeterType())] { + if !rcvr.fc.Match(pkt) { continue } - if *unique { - checksum := pkt.Checksum() - mid := uint(pkt.MeterID()) - - if val, ok := checksumHistory[mid]; ok && bytes.Compare(val, checksum) == 0 { - continue - } - - checksumHistory[mid] = make([]byte, len(checksum)) - copy(checksumHistory[mid], checksum) - } var msg parse.LogMessage msg.Time = time.Now()