From 6bfcafd3d3a327055b2468f628da8fb599e08d7a Mon Sep 17 00:00:00 2001 From: bemasher Date: Mon, 28 May 2018 00:32:57 -0600 Subject: [PATCH] Add multi-protocol decoding support. --- decode/decode.go | 296 ------------------------------------------ decode/decode_test.go | 65 ---------- flags.go | 47 ++++++- idm/idm.go | 70 +++++----- main.go | 72 ++++++---- netidm/netidm.go | 61 +++++---- parse/parse.go | 135 ------------------- r900/r900.go | 79 ++++++----- r900bcd/r900bcd.go | 33 +++-- scm/scm.go | 70 +++++----- scmplus/scmplus.go | 65 +++++----- 11 files changed, 284 insertions(+), 709 deletions(-) delete mode 100644 decode/decode.go delete mode 100644 decode/decode_test.go delete mode 100644 parse/parse.go diff --git a/decode/decode.go b/decode/decode.go deleted file mode 100644 index 7451940cf..000000000 --- a/decode/decode.go +++ /dev/null @@ -1,296 +0,0 @@ -// RTLAMR - An rtl-sdr receiver for smart meters operating in the 900MHz ISM band. -// Copyright (C) 2015 Douglas Hall -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published -// by the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package decode - -import ( - "fmt" - "log" - "math" -) - -// PacketConfig specifies packet-specific radio configuration. -type PacketConfig struct { - DataRate int - - BlockSize, BlockSize2 int - ChipLength, SymbolLength int - SampleRate int - - PreambleSymbols, PacketSymbols int - PreambleLength, PacketLength int - Preamble string - - BufferLength int - - CenterFreq uint32 -} - -func (d Decoder) Log() { - log.Println("CenterFreq:", d.Cfg.CenterFreq) - log.Println("SampleRate:", d.Cfg.SampleRate) - log.Println("DataRate:", d.Cfg.DataRate) - log.Println("ChipLength:", d.Cfg.ChipLength) - log.Println("PreambleSymbols:", d.Cfg.PreambleSymbols) - log.Println("PreambleLength:", d.Cfg.PreambleLength) - log.Println("PacketSymbols:", d.Cfg.PacketSymbols) - log.Println("PacketLength:", d.Cfg.PacketLength) - log.Println("Preamble:", d.Cfg.Preamble) -} - -// Decoder contains buffers and radio configuration. -type Decoder struct { - Cfg PacketConfig - - Signal []float64 - Quantized []byte - - csum []float64 - demod Demodulator - - preamble []byte - - pkt []byte - - packed []byte - sIdxA, sIdxB []int -} - -// Create a new decoder with the given packet configuration. -func NewDecoder(cfg PacketConfig) (d Decoder) { - d.Cfg = cfg - - d.Cfg.SymbolLength = d.Cfg.ChipLength << 1 - d.Cfg.SampleRate = d.Cfg.DataRate * d.Cfg.ChipLength - - d.Cfg.PreambleLength = d.Cfg.PreambleSymbols * d.Cfg.SymbolLength - d.Cfg.PacketLength = d.Cfg.PacketSymbols * d.Cfg.SymbolLength - - d.Cfg.BlockSize = NextPowerOf2(d.Cfg.PreambleLength) - d.Cfg.BlockSize2 = d.Cfg.BlockSize << 1 - - d.Cfg.BufferLength = d.Cfg.PacketLength + d.Cfg.BlockSize - - // Allocate necessary buffers. - d.Signal = make([]float64, d.Cfg.BlockSize+d.Cfg.SymbolLength) - d.Quantized = make([]byte, d.Cfg.BufferLength) - - d.csum = make([]float64, len(d.Signal)+1) - - // Calculate magnitude lookup table specified by -fastmag flag. - d.demod = NewMagLUT() - - // Pre-calculate a byte-slice version of the preamble for searching. - d.preamble = make([]byte, d.Cfg.PreambleSymbols) - for idx := range d.Cfg.Preamble { - if d.Cfg.Preamble[idx] == '1' { - d.preamble[idx] = 1 - } - } - - // Signal up to the final stage is 1-bit per byte. Allocate a buffer to - // store packed version 8-bits per byte. - d.pkt = make([]byte, (d.Cfg.PacketSymbols+7)>>3) - - d.sIdxA = make([]int, 0, d.Cfg.BlockSize) - d.sIdxB = make([]int, 0, d.Cfg.BlockSize) - - d.packed = make([]byte, (d.Cfg.BlockSize+d.Cfg.PreambleLength+7)>>3) - - return -} - -// Decode accepts a sample block and performs various DSP techniques to extract a packet. -func (d Decoder) Decode(input []byte) []int { - // Shift buffers to append new block. - copy(d.Signal, d.Signal[d.Cfg.BlockSize:]) - copy(d.Quantized, d.Quantized[d.Cfg.BlockSize:]) - - // Compute the magnitude of the new block. - d.demod.Execute(input, d.Signal[d.Cfg.SymbolLength:]) - - // Perform matched filter on new block. - d.Filter(d.Signal, d.Quantized[d.Cfg.PacketLength:]) - - // Return a list of indices the preamble exists at. - return d.Search() -} - -// A Demodulator knows how to demodulate an array of uint8 IQ samples into an -// array of float64 samples. -type Demodulator interface { - Execute([]byte, []float64) -} - -// Default Magnitude Lookup Table -type MagLUT []float64 - -// Pre-computes normalized squares with most common DC offset for rtl-sdr dongles. -func NewMagLUT() (lut MagLUT) { - lut = make([]float64, 0x100) - for idx := range lut { - lut[idx] = (127.5 - float64(idx)) / 127.5 - lut[idx] *= lut[idx] - } - return -} - -// Calculates complex magnitude on given IQ stream writing result to output. -func (lut MagLUT) Execute(input []byte, output []float64) { - i := 0 - for idx := range output { - output[idx] = lut[input[i]] + lut[input[i+1]] - i += 2 - } -} - -// Matched filter for Manchester coded signals. Output signal's sign at each -// sample determines the bit-value due to Manchester symbol odd symmetry. -func (d Decoder) Filter(input []float64, output []byte) { - // Computing the cumulative summation over the signal simplifies - // filtering to the difference of a pair of subtractions. - var sum float64 - for idx, v := range input { - sum += v - d.csum[idx+1] = sum - } - - // Filter result is difference of summation of lower and upper chips. - lower := d.csum[d.Cfg.ChipLength:] - upper := d.csum[d.Cfg.SymbolLength:] - for idx, l := range lower[:len(output)] { - f := (l - d.csum[idx]) - (upper[idx] - l) - output[idx] = 1 - byte(math.Float64bits(f)>>63) - } - - return -} - -// Return a list of indices into the quantized signal at which a valid preamble exists. -func (d *Decoder) Search() []int { - symLenByte := d.Cfg.SymbolLength >> 3 - - // Pack the bit-wise quantized signal into bytes. - for bIdx := range d.packed { - var b byte - for _, qBit := range d.Quantized[bIdx<<3 : (bIdx+1)<<3] { - b = (b << 1) | qBit - } - d.packed[bIdx] = b - } - - // Filter out indices at which the preamble cannot exist. - for pIdx, pBit := range d.preamble { - pBit = (pBit ^ 1) * 0xFF - offset := pIdx * symLenByte - if pIdx == 0 { - d.sIdxA = d.sIdxA[:0] - for qIdx, b := range d.packed[:d.Cfg.BlockSize>>3] { - if b != pBit { - d.sIdxA = append(d.sIdxA, qIdx) - } - } - } else { - d.sIdxB, d.sIdxA = searchPassByte(pBit, d.packed[offset:], d.sIdxA, d.sIdxB[:0]) - - if len(d.sIdxA) == 0 { - return nil - } - } - } - - symLen := d.Cfg.SymbolLength - - // Unpack the indices from bytes to bits. - d.sIdxB = d.sIdxB[:0] - for _, qIdx := range d.sIdxA { - for idx := 0; idx < 8; idx++ { - d.sIdxB = append(d.sIdxB, (qIdx<<3)+idx) - } - } - d.sIdxA, d.sIdxB = d.sIdxB, d.sIdxA - - // Filter out indices at which the preamble does not exist. - for pIdx, pBit := range d.preamble { - offset := pIdx * symLen - offsetQuantized := d.Quantized[offset : offset+d.Cfg.BlockSize] - d.sIdxB, d.sIdxA = searchPass(pBit, offsetQuantized, d.sIdxA, d.sIdxB[:0]) - - if len(d.sIdxA) == 0 { - return nil - } - } - - return d.sIdxA -} - -func searchPassByte(pBit byte, sig []byte, a, b []int) ([]int, []int) { - for _, qIdx := range a { - if sig[qIdx] != pBit { - b = append(b, qIdx) - } - } - - return a, b -} - -func searchPass(pBit byte, sig []byte, a, b []int) ([]int, []int) { - for _, qIdx := range a { - if sig[qIdx] == pBit { - b = append(b, qIdx) - } - } - - return a, b -} - -// Given a list of indices the preamble exists at, sample the appropriate bits -// of the signal's bit-decision. Pack bits of each index into an array of byte -// arrays and return. -func (d Decoder) Slice(indices []int) (pkts [][]byte) { - // It is likely that a message will be successfully decoded at multiple indices, - // only keep track of unique instances. - seen := make(map[string]bool) - - // For each of the indices the preamble exists at. - for _, qIdx := range indices { - // Check that we're still within the first sample block. We'll catch - // the message on the next sample block otherwise. - if qIdx > d.Cfg.BlockSize { - continue - } - - // Packet is 1 bit per byte, pack to 8-bits per byte. - for pIdx := 0; pIdx < d.Cfg.PacketSymbols; pIdx++ { - d.pkt[pIdx>>3] <<= 1 - d.pkt[pIdx>>3] |= d.Quantized[qIdx+(pIdx*d.Cfg.SymbolLength)] - } - - // Store the packet in the seen map and append to the packet list. - pktStr := fmt.Sprintf("%02X", d.pkt) - if !seen[pktStr] { - seen[pktStr] = true - pkts = append(pkts, make([]byte, len(d.pkt))) - copy(pkts[len(pkts)-1], d.pkt) - } - } - - return -} - -func NextPowerOf2(v int) int { - return 1 << uint(math.Ceil(math.Log2(float64(v)))) -} diff --git a/decode/decode_test.go b/decode/decode_test.go deleted file mode 100644 index 5071a5eee..000000000 --- a/decode/decode_test.go +++ /dev/null @@ -1,65 +0,0 @@ -package decode - -import ( - "testing" -) - -func NewPacketConfig(chipLength int) (cfg PacketConfig) { - cfg.CenterFreq = 912600155 - cfg.DataRate = 32768 - cfg.ChipLength = chipLength - cfg.PreambleSymbols = 21 - cfg.PacketSymbols = 96 - - cfg.Preamble = "111110010101001100000" - - return -} - -func BenchmarkMagLUT(b *testing.B) { - d := NewDecoder(NewPacketConfig(72)) - - input := make([]byte, d.Cfg.BlockSize2) - - b.SetBytes(int64(d.Cfg.BlockSize)) - b.ReportAllocs() - b.ResetTimer() - for n := 0; n < b.N; n++ { - d.demod.Execute(input, d.Signal[d.Cfg.SymbolLength:]) - } -} - -func BenchmarkFilter(b *testing.B) { - d := NewDecoder(NewPacketConfig(72)) - - b.SetBytes(int64(d.Cfg.BlockSize)) - b.ReportAllocs() - b.ResetTimer() - for n := 0; n < b.N; n++ { - d.Filter(d.Signal, d.Quantized[d.Cfg.PacketLength:]) - } -} - -func BenchmarkSearch(b *testing.B) { - d := NewDecoder(NewPacketConfig(72)) - - b.SetBytes(int64(d.Cfg.BlockSize)) - b.ReportAllocs() - b.ResetTimer() - for n := 0; n < b.N; n++ { - _ = d.Search() - } -} - -func BenchmarkDecode(b *testing.B) { - d := NewDecoder(NewPacketConfig(72)) - - block := make([]byte, d.Cfg.BlockSize2) - - b.SetBytes(int64(d.Cfg.BlockSize)) - b.ReportAllocs() - b.ResetTimer() - for n := 0; n < b.N; n++ { - _ = d.Decode(block) - } -} diff --git a/flags.go b/flags.go index f99221295..476daee75 100644 --- a/flags.go +++ b/flags.go @@ -24,17 +24,18 @@ import ( "fmt" "log" "os" + "path/filepath" "strconv" "strings" "github.com/bemasher/rtlamr/csv" - "github.com/bemasher/rtlamr/parse" + "github.com/bemasher/rtlamr/protocol" ) var sampleFilename = flag.String("samplefile", os.DevNull, "raw signal dump file") var sampleFile *os.File -var msgType = flag.String("msgtype", "scm", "message type to receive: scm, scm+, idm, netidm, r900 and r900bcd") +var msgType StringMap var symbolLength = flag.Int("symbollength", 72, "symbol length in samples (8, 32, 40, 48, 56, 64, 72, 80, 88, 96)") @@ -52,6 +53,9 @@ var single = flag.Bool("single", false, "one shot execution, if used with -filte var version = flag.Bool("version", false, "display build date and commit hash") func RegisterFlags() { + msgType = StringMap{"scm": true} + flag.Var(msgType, "msgtype", "comma-separated list of message types to receive: all, scm, scm+, idm, netidm, r900 and r900bcd") + meterID = MeterIDFilter{make(UintMap)} meterType = MeterTypeFilter{make(UintMap)} @@ -84,7 +88,7 @@ func RegisterFlags() { } flag.Usage = func() { - fmt.Fprintf(os.Stderr, "Usage of %s:\n", os.Args[0]) + fmt.Fprintf(os.Stderr, "Usage of %s:\n", filepath.Base(os.Args[0])) printDefaults(rtlamrFlags, true) fmt.Fprintln(os.Stderr) @@ -144,6 +148,35 @@ type Encoder interface { Encode(interface{}) error } +// A Flag value that populates a map of string to bool from a comma-separated list. +type StringMap map[string]bool + +func (m StringMap) String() (s string) { + var keys []string + for key, _ := range m { + keys = append(keys, key) + } + return strings.Join(keys, ",") +} + +func (m StringMap) Set(value string) error { + // Delete any default keys. + var keys []string + for key := range m { + keys = append(keys, key) + } + for _, key := range keys { + delete(m, key) + } + + // Set keys from value. + for _, val := range strings.Split(value, ",") { + m[strings.ToLower(val)] = true + } + + return nil +} + type UintMap map[uint]bool func (m UintMap) String() (s string) { @@ -173,7 +206,7 @@ type MeterIDFilter struct { UintMap } -func (m MeterIDFilter) Filter(msg parse.Message) bool { +func (m MeterIDFilter) Filter(msg protocol.Message) bool { return m.UintMap[uint(msg.MeterID())] } @@ -181,7 +214,7 @@ type MeterTypeFilter struct { UintMap } -func (m MeterTypeFilter) Filter(msg parse.Message) bool { +func (m MeterTypeFilter) Filter(msg protocol.Message) bool { return m.UintMap[uint(msg.MeterType())] } @@ -191,7 +224,7 @@ func NewUniqueFilter() UniqueFilter { return make(UniqueFilter) } -func (uf UniqueFilter) Filter(msg parse.Message) bool { +func (uf UniqueFilter) Filter(msg protocol.Message) bool { checksum := msg.Checksum() mid := uint(msg.MeterID()) @@ -209,7 +242,7 @@ type PlainEncoder struct { } func (pe PlainEncoder) Encode(msg interface{}) (err error) { - if m, ok := msg.(parse.LogMessage); ok && pe.sampleFilename == os.DevNull { + if m, ok := msg.(protocol.LogMessage); ok && pe.sampleFilename == os.DevNull { _, err = fmt.Println(m.StringNoOffset()) } else { _, err = fmt.Println(m) diff --git a/idm/idm.go b/idm/idm.go index 4cc47d2f2..4b5b995ae 100644 --- a/idm/idm.go +++ b/idm/idm.go @@ -21,80 +21,74 @@ import ( "fmt" "strconv" "strings" + "sync" "github.com/bemasher/rtlamr/crc" - "github.com/bemasher/rtlamr/decode" - "github.com/bemasher/rtlamr/parse" + "github.com/bemasher/rtlamr/protocol" ) func init() { - parse.Register("idm", NewParser) -} - -func NewPacketConfig(chipLength int) (cfg decode.PacketConfig) { - cfg.CenterFreq = 912600155 - cfg.DataRate = 32768 - cfg.ChipLength = chipLength - cfg.PreambleSymbols = 32 - cfg.PacketSymbols = 92 * 8 - cfg.Preamble = "01010101010101010001011010100011" - - return + protocol.RegisterParser("idm", NewParser) } type Parser struct { - decode.Decoder crc.CRC + cfg protocol.PacketConfig + data protocol.Data } -func (p Parser) Dec() decode.Decoder { - return p.Decoder -} +func (p Parser) SetDecoder(*protocol.Decoder) {} -func (p *Parser) Cfg() *decode.PacketConfig { - return &p.Decoder.Cfg +func (p Parser) Cfg() protocol.PacketConfig { + return p.cfg } -func NewParser(chipLength int) (p parse.Parser) { +func NewParser(chipLength int) (p protocol.Parser) { return &Parser{ - decode.NewDecoder(NewPacketConfig(chipLength)), - crc.NewCRC("CCITT", 0xFFFF, 0x1021, 0x1D0F), + CRC: crc.NewCRC("CCITT", 0xFFFF, 0x1021, 0x1D0F), + cfg: protocol.PacketConfig{ + Protocol: "idm", + CenterFreq: 912600155, + DataRate: 32768, + ChipLength: chipLength, + PreambleSymbols: 32, + PacketSymbols: 92 * 8, + Preamble: "01010101010101010001011010100011", + }, + data: protocol.Data{Bytes: make([]byte, 92)}, } } -func (p Parser) Parse(indices []int) (msgs []parse.Message) { +func (p Parser) Parse(pkts []protocol.Data, msgCh chan protocol.Message, wg *sync.WaitGroup) { seen := make(map[string]bool) - for _, pkt := range p.Decoder.Slice(indices) { - s := string(pkt) + for _, pkt := range pkts { + p.data.Idx = pkt.Idx + p.data.Bits = pkt.Bits[0:p.cfg.PacketSymbols] + copy(p.data.Bytes, pkt.Bytes) + + s := string(p.data.Bytes) if seen[s] { continue } seen[s] = true - data := parse.NewDataFromBytes(pkt) - - // If the packet is too short, bail. - if l := len(data.Bytes); l != 92 { - continue - } - // If the checksum fails, bail. - if residue := p.Checksum(data.Bytes[4:92]); residue != p.Residue { + if residue := p.Checksum(p.data.Bytes[4:92]); residue != p.Residue { continue } - idm := NewIDM(data) + idm := NewIDM(p.data) // If the meter id is 0, bail. if idm.ERTSerialNumber == 0 { continue } - msgs = append(msgs, idm) + msgCh <- idm } - return + wg.Done() } // Standard Consumption Message @@ -118,7 +112,7 @@ type IDM struct { PacketCRC uint16 } -func NewIDM(data parse.Data) (idm IDM) { +func NewIDM(data protocol.Data) (idm IDM) { idm.Preamble = binary.BigEndian.Uint32(data.Bytes[0:4]) idm.PacketTypeID = data.Bytes[4] idm.PacketLength = data.Bytes[5] diff --git a/main.go b/main.go index 060899d2e..3788f5375 100644 --- a/main.go +++ b/main.go @@ -26,11 +26,10 @@ import ( "log" "os" "os/signal" - "strings" "sync" "time" - "github.com/bemasher/rtlamr/parse" + "github.com/bemasher/rtlamr/protocol" "github.com/bemasher/rtltcp" _ "github.com/bemasher/rtlamr/idm" @@ -45,24 +44,41 @@ var rcvr Receiver type Receiver struct { rtltcp.SDR - p parse.Parser - fc parse.FilterChain + d protocol.Decoder + fc protocol.FilterChain } func (rcvr *Receiver) NewReceiver() { - var err error - if rcvr.p, err = parse.NewParser(strings.ToLower(*msgType), *symbolLength); err != nil { - log.Fatal(err) + rcvr.d = protocol.NewDecoder() + + // If the msgtype "all" is given alone, register and use scm, scm+, idm and r900. + if _, all := msgType["all"]; all && len(msgType) == 1 { + delete(msgType, "all") + msgType["scm"] = true + msgType["scm+"] = true + msgType["idm"] = true + msgType["r900"] = true + } + + // For each given msgType, register it with the decoder. + for name := range msgType { + p, err := protocol.NewParser(name, *symbolLength) + if err != nil { + log.Fatal(err) + } + + rcvr.d.RegisterProtocol(p) } + // Allocate the internal buffers of the decoder. + rcvr.d.Allocate() + // Connect to rtl_tcp server. if err := rcvr.Connect(nil); err != nil { log.Fatal(err) } - rcvr.HandleFlags() - - cfg := rcvr.p.Cfg() + cfg := rcvr.d.Cfg gainFlagSet := false flag.Visit(func(f *flag.Flag) { @@ -89,7 +105,7 @@ func (rcvr *Receiver) NewReceiver() { rcvr.SetGainMode(true) } - rcvr.p.Log() + rcvr.d.Log() // Tell the user how many gain settings were reported by rtl_tcp. log.Println("GainCount:", rcvr.SDR.Info.GainCount) @@ -110,6 +126,7 @@ func (rcvr *Receiver) Run() { in, out := io.Pipe() + // Read blocks of samples from the receiver and write them to a pipe. go func() { tcpBlock := make([]byte, 16384) for { @@ -124,13 +141,16 @@ func (rcvr *Receiver) Run() { sampleBuf := new(bytes.Buffer) start := time.Now() + // Allocate a channel of blocks and a sync.Pool for allocating/reusing + // sample blocks. blockCh := make(chan []byte, 128) blockPool := sync.Pool{ New: func() interface{} { - return make([]byte, rcvr.p.Cfg().BlockSize2) + return make([]byte, rcvr.d.Cfg.BlockSize2) }, } + // Read sample blocks from the pipe created and fed above. go func() { for { block := blockPool.Get().([]byte) @@ -157,27 +177,30 @@ func (rcvr *Receiver) Run() { // If dumping samples, discard the oldest block from the buffer if // it's full and write the new block to it. if *sampleFilename != os.DevNull { - if sampleBuf.Len() > rcvr.p.Cfg().BufferLength<<1 { + if sampleBuf.Len() > rcvr.d.Cfg.BufferLength<<1 { io.CopyN(ioutil.Discard, sampleBuf, int64(len(block))) } sampleBuf.Write(block) } pktFound := false - indices := rcvr.p.Dec().Decode(block) - for _, pkt := range rcvr.p.Parse(indices) { - if !rcvr.fc.Match(pkt) { + // For each message returned + for msg := range rcvr.d.Decode(block) { + // If the filterchain rejects the message, skip it. + if !rcvr.fc.Match(msg) { continue } - var msg parse.LogMessage - msg.Time = time.Now() - msg.Offset, _ = sampleFile.Seek(0, os.SEEK_CUR) - msg.Length = sampleBuf.Len() - msg.Message = pkt + // Make a new LogMessage + var logMsg protocol.LogMessage + logMsg.Time = time.Now() + logMsg.Offset, _ = sampleFile.Seek(0, os.SEEK_CUR) + logMsg.Length = sampleBuf.Len() + logMsg.Message = msg - err := encoder.Encode(msg) + // Encode the message + err := encoder.Encode(logMsg) if err != nil { log.Fatal("Error encoding message: ", err) } @@ -192,7 +215,7 @@ func (rcvr *Receiver) Run() { if len(meterID.UintMap) == 0 { break } else { - delete(meterID.UintMap, uint(pkt.MeterID())) + delete(meterID.UintMap, uint(msg.MeterID())) } } } @@ -225,8 +248,9 @@ func main() { rcvr.RegisterFlags() RegisterFlags() EnvOverride() - flag.Parse() + rcvr.HandleFlags() + if *version { if buildDate == "" || commitHash == "" { fmt.Println("Built from source.") diff --git a/netidm/netidm.go b/netidm/netidm.go index fcecc87e4..51f1a3511 100644 --- a/netidm/netidm.go +++ b/netidm/netidm.go @@ -21,17 +21,17 @@ import ( "fmt" "strconv" "strings" + "sync" "github.com/bemasher/rtlamr/crc" - "github.com/bemasher/rtlamr/decode" - "github.com/bemasher/rtlamr/parse" + "github.com/bemasher/rtlamr/protocol" ) func init() { - parse.Register("netidm", NewParser) + protocol.RegisterParser("netidm", NewParser) } -func NewPacketConfig(chipLength int) (cfg decode.PacketConfig) { +func NewPacketConfig(chipLength int) (cfg protocol.PacketConfig) { cfg.CenterFreq = 912600155 cfg.DataRate = 32768 cfg.ChipLength = chipLength @@ -43,58 +43,63 @@ func NewPacketConfig(chipLength int) (cfg decode.PacketConfig) { } type Parser struct { - decode.Decoder crc.CRC + cfg protocol.PacketConfig + data protocol.Data } -func (p Parser) Dec() decode.Decoder { - return p.Decoder -} +func (p Parser) SetDecoder(d *protocol.Decoder) {} -func (p *Parser) Cfg() *decode.PacketConfig { - return &p.Decoder.Cfg +func (p *Parser) Cfg() protocol.PacketConfig { + return p.cfg } -func NewParser(chipLength int) (p parse.Parser) { +func NewParser(chipLength int) (p protocol.Parser) { return &Parser{ - decode.NewDecoder(NewPacketConfig(chipLength)), - crc.NewCRC("CCITT", 0xFFFF, 0x1021, 0x1D0F), + CRC: crc.NewCRC("CCITT", 0xFFFF, 0x1021, 0x1D0F), + cfg: protocol.PacketConfig{ + Protocol: "netidm", + CenterFreq: 912600155, + DataRate: 32768, + ChipLength: chipLength, + PreambleSymbols: 32, + PacketSymbols: 92 * 8, + Preamble: "01010101010101010001011010100011", + }, + data: protocol.Data{Bytes: make([]byte, 92)}, } } -func (p Parser) Parse(indices []int) (msgs []parse.Message) { +func (p Parser) Parse(pkts []protocol.Data, msgCh chan protocol.Message, wg *sync.WaitGroup) { seen := make(map[string]bool) - for _, pkt := range p.Decoder.Slice(indices) { - s := string(pkt) + for _, pkt := range pkts { + p.data.Idx = pkt.Idx + p.data.Bits = pkt.Bits[0:p.cfg.PacketSymbols] + copy(p.data.Bytes, pkt.Bytes) + + s := string(p.data.Bytes) if seen[s] { continue } seen[s] = true - data := parse.NewDataFromBytes(pkt) - - // If the packet is too short, bail. - if l := len(data.Bytes); l != 92 { - continue - } - // If the checksum fails, bail. - if residue := p.Checksum(data.Bytes[4:92]); residue != p.Residue { + if residue := p.Checksum(p.data.Bytes[4:92]); residue != p.Residue { continue } - netidm := NewIDM(data) + netidm := NewNetIDM(p.data) // If the meter id is 0, bail. if netidm.ERTSerialNumber == 0 { continue } - msgs = append(msgs, netidm) + msgCh <- netidm } - return + wg.Done() } // Net Meter Interval Data Message @@ -116,7 +121,7 @@ type NetIDM struct { PacketCRC uint16 } -func NewIDM(data parse.Data) (netidm NetIDM) { +func NewNetIDM(data protocol.Data) (netidm NetIDM) { netidm.Preamble = binary.BigEndian.Uint32(data.Bytes[0:4]) netidm.ProtocolID = data.Bytes[4] netidm.PacketLength = data.Bytes[5] diff --git a/parse/parse.go b/parse/parse.go deleted file mode 100644 index 989086da2..000000000 --- a/parse/parse.go +++ /dev/null @@ -1,135 +0,0 @@ -package parse - -import ( - "fmt" - "strconv" - "sync" - "time" - - "github.com/bemasher/rtlamr/decode" - - "github.com/bemasher/rtlamr/csv" -) - -const ( - TimeFormat = "2006-01-02T15:04:05.000" -) - -var ( - parserMutex sync.Mutex - parsers = make(map[string]NewParserFunc) -) - -type NewParserFunc func(symbolLength int) Parser - -func Register(name string, parserFn NewParserFunc) { - parserMutex.Lock() - defer parserMutex.Unlock() - - if parserFn == nil { - panic("parser: new parser func is nil") - } - if _, dup := parsers[name]; dup { - panic(fmt.Sprintf("parser: parser already registered (%s)", name)) - } - parsers[name] = parserFn -} - -func NewParser(name string, symbolLength int) (Parser, error) { - parserMutex.Lock() - defer parserMutex.Unlock() - - if parserFn, exists := parsers[name]; exists { - return parserFn(symbolLength), nil - } else { - return nil, fmt.Errorf("invalid message type: %q\n", name) - } -} - -type Data struct { - Bits string - Bytes []byte -} - -func NewDataFromBytes(data []byte) (d Data) { - d.Bytes = data - for _, b := range data { - d.Bits += fmt.Sprintf("%08b", b) - } - - return -} - -func NewDataFromBits(data string) (d Data) { - d.Bits = data - d.Bytes = make([]byte, (len(data)+7)>>3) - for idx := 0; idx < len(data); idx += 8 { - b, _ := strconv.ParseUint(d.Bits[idx:idx+8], 2, 8) - d.Bytes[idx>>3] = uint8(b) - } - return -} - -type Parser interface { - Parse([]int) []Message - Dec() decode.Decoder - Cfg() *decode.PacketConfig - Log() -} - -type Message interface { - csv.Recorder - MsgType() string - MeterID() uint32 - MeterType() uint8 - Checksum() []byte -} - -type LogMessage struct { - Time time.Time - Offset int64 - Length int - Message -} - -func (msg LogMessage) String() string { - return fmt.Sprintf("{Time:%s Offset:%d Length:%d %s:%s}", - msg.Time.Format(TimeFormat), msg.Offset, msg.Length, msg.MsgType(), msg.Message, - ) -} - -func (msg LogMessage) StringNoOffset() string { - return fmt.Sprintf("{Time:%s %s:%s}", msg.Time.Format(TimeFormat), msg.MsgType(), msg.Message) -} - -func (msg LogMessage) Record() (r []string) { - r = append(r, msg.Time.Format(time.RFC3339Nano)) - r = append(r, strconv.FormatInt(msg.Offset, 10)) - r = append(r, strconv.FormatInt(int64(msg.Length), 10)) - r = append(r, msg.Message.Record()...) - return r -} - -type FilterChain []MessageFilter - -func (fc *FilterChain) Add(filter MessageFilter) { - *fc = append(*fc, filter) -} - -func (fc FilterChain) Match(msg Message) bool { - if len(fc) == 0 { - return true - } - - for _, filter := range fc { - if !filter.Filter(msg) { - return false - } - } - - return true -} - -type MessageFilter interface { - Filter(Message) bool -} diff --git a/r900/r900.go b/r900/r900.go index 51152bf1b..19c0a451d 100644 --- a/r900/r900.go +++ b/r900/r900.go @@ -21,9 +21,9 @@ import ( "fmt" "math" "strconv" + "sync" - "github.com/bemasher/rtlamr/decode" - "github.com/bemasher/rtlamr/parse" + "github.com/bemasher/rtlamr/protocol" "github.com/bemasher/rtlamr/r900/gf" ) @@ -32,22 +32,17 @@ const ( ) func init() { - parse.Register("r900", NewParser) + protocol.RegisterParser("r900", NewParser) } -func NewPacketConfig(chipLength int) (cfg decode.PacketConfig) { - cfg.CenterFreq = 912380000 - cfg.DataRate = 32768 - cfg.ChipLength = chipLength - cfg.PreambleSymbols = 32 - cfg.PacketSymbols = 116 - cfg.Preamble = "00000000000000001110010101100100" +func NewPacketConfig(chipLength int) (cfg protocol.PacketConfig) { return } type Parser struct { - decode.Decoder + *protocol.Decoder + cfg protocol.PacketConfig field *gf.Field rsBuf [31]byte @@ -55,34 +50,39 @@ type Parser struct { csum []float64 filtered [][3]float64 quantized []byte -} -func NewParser(chipLength int) parse.Parser { - p := new(Parser) + once sync.Once +} - p.Decoder = decode.NewDecoder(NewPacketConfig(chipLength)) +func NewParser(chipLength int) protocol.Parser { + var p Parser + + p.cfg = protocol.PacketConfig{ + Protocol: "r900", + CenterFreq: 912380000, + DataRate: 32768, + ChipLength: chipLength, + PreambleSymbols: 32, + PacketSymbols: 116, + Preamble: "00000000000000001110010101100100", + } // GF of order 32, polynomial 37, generator 2. p.field = gf.NewField(32, 37, 2) - p.signal = make([]float64, p.Decoder.Cfg.BufferLength) - p.csum = make([]float64, p.Decoder.Cfg.BufferLength+1) - p.filtered = make([][3]float64, p.Decoder.Cfg.BufferLength) - p.quantized = make([]byte, p.Decoder.Cfg.BufferLength) - - return p + return &p } -func (p Parser) Dec() decode.Decoder { - return p.Decoder +func (p *Parser) SetDecoder(d *protocol.Decoder) { + p.Decoder = d } -func (p *Parser) Cfg() *decode.PacketConfig { - return &p.Decoder.Cfg +func (p Parser) Cfg() protocol.PacketConfig { + return p.cfg } // Perform matched filtering. -func (p Parser) Filter() { +func (p Parser) filter() { // This function computes the convolution of each symbol kernel with the // signal. The naive approach requires for each symbol to calculate the // summation of samples between a pair of indices. @@ -128,7 +128,7 @@ func (p Parser) Filter() { } // Determine the symbol that exists at each sample of the signal. -func (p Parser) Quantize() { +func (p Parser) quantize() { // 0 0011, 3 1100 // 1 0101, 4 1010 // 2 0110, 5 1001 @@ -160,13 +160,20 @@ func (p Parser) Quantize() { } // Given a list of indices the preamble exists at, decode and parse a message. -func (p Parser) Parse(indices []int) (msgs []parse.Message) { - cfg := p.Decoder.Cfg +func (p *Parser) Parse(pkts []protocol.Data, msgCh chan protocol.Message, wg *sync.WaitGroup) { + p.once.Do(func() { + p.signal = make([]float64, p.Decoder.Cfg.BufferLength) + p.csum = make([]float64, p.Decoder.Cfg.BufferLength+1) + p.filtered = make([][3]float64, p.Decoder.Cfg.BufferLength) + p.quantized = make([]byte, p.Decoder.Cfg.BufferLength) + }) + + cfg := p.cfg copy(p.signal, p.signal[cfg.BlockSize:]) copy(p.signal[cfg.PacketLength:], p.Decoder.Signal[cfg.SymbolLength:]) - p.Filter() - p.Quantize() + p.filter() + p.quantize() preambleLength := cfg.PreambleLength chipLength := cfg.ChipLength @@ -176,12 +183,12 @@ func (p Parser) Parse(indices []int) (msgs []parse.Message) { seen := make(map[string]bool) - for _, preambleIdx := range indices { - if preambleIdx > cfg.BlockSize { + for _, pkt := range pkts { + if pkt.Idx > cfg.BlockSize { break } - payloadIdx := preambleIdx + preambleLength - p.Dec().Cfg.SymbolLength + payloadIdx := pkt.Idx + preambleLength - p.cfg.SymbolLength var digits string for idx := 0; idx < PayloadSymbols*4*cfg.ChipLength; idx += chipLength * 4 { qIdx := payloadIdx + idx @@ -238,10 +245,10 @@ func (p Parser) Parse(indices []int) (msgs []parse.Message) { r900.LeakNow = uint8(leaknow) copy(r900.checksum[:], symbols[16:]) - msgs = append(msgs, r900) + msgCh <- r900 } - return + wg.Done() } type R900 struct { diff --git a/r900bcd/r900bcd.go b/r900bcd/r900bcd.go index e1726ce52..91d0de1fd 100644 --- a/r900bcd/r900bcd.go +++ b/r900bcd/r900bcd.go @@ -17,33 +17,48 @@ package r900bcd import ( + "log" "strconv" + "sync" - "github.com/bemasher/rtlamr/parse" + "github.com/bemasher/rtlamr/protocol" "github.com/bemasher/rtlamr/r900" ) func init() { - parse.Register("r900bcd", NewParser) + protocol.RegisterParser("r900bcd", NewParser) } type Parser struct { - parse.Parser + protocol.Parser } -func NewParser(ChipLength int) parse.Parser { +func NewParser(ChipLength int) protocol.Parser { return Parser{r900.NewParser(ChipLength)} } // Parse messages using r900 parser and convert consumption from BCD to int. -func (p Parser) Parse(indices []int) (msgs []parse.Message) { - msgs = p.Parser.Parse(indices) - for idx, msg := range msgs { +func (p Parser) Parse(pkts []protocol.Data, msgCh chan protocol.Message, wg *sync.WaitGroup) { + localWg := new(sync.WaitGroup) + + localMsgCh := make(chan protocol.Message) + localWg.Add(1) + + go func() { + localWg.Wait() + close(localMsgCh) + }() + + go p.Parser.Parse(pkts, localMsgCh, localWg) + + for msg := range localMsgCh { r900msg := msg.(r900.R900) + log.Printf("%+v\n", r900msg) hex := strconv.FormatUint(uint64(r900msg.Consumption), 16) consumption, _ := strconv.ParseUint(hex, 10, 32) r900msg.Consumption = uint32(consumption) - msgs[idx] = r900msg + msgCh <- r900msg } - return + + wg.Done() } diff --git a/scm/scm.go b/scm/scm.go index 9cd62f781..9bce0d8e1 100644 --- a/scm/scm.go +++ b/scm/scm.go @@ -20,80 +20,74 @@ import ( "encoding/binary" "fmt" "strconv" + "sync" "github.com/bemasher/rtlamr/crc" - "github.com/bemasher/rtlamr/decode" - "github.com/bemasher/rtlamr/parse" + "github.com/bemasher/rtlamr/protocol" ) func init() { - parse.Register("scm", NewParser) -} - -func NewPacketConfig(chipLength int) (cfg decode.PacketConfig) { - cfg.CenterFreq = 912600155 - cfg.DataRate = 32768 - cfg.ChipLength = chipLength - cfg.PreambleSymbols = 21 - cfg.PacketSymbols = 96 - cfg.Preamble = "111110010101001100000" - - return + protocol.RegisterParser("scm", NewParser) } type Parser struct { - decode.Decoder crc.CRC + cfg protocol.PacketConfig + data protocol.Data } -func NewParser(chipLength int) (p parse.Parser) { +func NewParser(chipLength int) (p protocol.Parser) { return &Parser{ - decode.NewDecoder(NewPacketConfig(chipLength)), - crc.NewCRC("BCH", 0, 0x6F63, 0), + CRC: crc.NewCRC("BCH", 0, 0x6F63, 0), + cfg: protocol.PacketConfig{ + Protocol: "scm", + CenterFreq: 912600155, + DataRate: 32768, + ChipLength: chipLength, + PreambleSymbols: 21, + PacketSymbols: 96, + Preamble: "111110010101001100000", + }, + data: protocol.Data{Bytes: make([]byte, 96>>3)}, } } -func (p Parser) Dec() decode.Decoder { - return p.Decoder -} +func (p Parser) SetDecoder(d *protocol.Decoder) {} -func (p *Parser) Cfg() *decode.PacketConfig { - return &p.Decoder.Cfg +func (p *Parser) Cfg() protocol.PacketConfig { + return p.cfg } -func (p Parser) Parse(indices []int) (msgs []parse.Message) { +func (p Parser) Parse(pkts []protocol.Data, msgCh chan protocol.Message, wg *sync.WaitGroup) { seen := make(map[string]bool) - for _, pkt := range p.Decoder.Slice(indices) { - s := string(pkt) + for _, pkt := range pkts { + p.data.Idx = pkt.Idx + p.data.Bits = pkt.Bits[0:p.cfg.PacketSymbols] + copy(p.data.Bytes, pkt.Bytes) + + s := string(p.data.Bytes) if seen[s] { continue } seen[s] = true - data := parse.NewDataFromBytes(pkt) - - // If the packet is too short, bail. - if l := len(data.Bytes); l != 12 { - continue - } - // If the checksum fails, bail. - if p.Checksum(data.Bytes[2:12]) != 0 { + if p.Checksum(p.data.Bytes[2:12]) != 0 { continue } - scm := NewSCM(data) + scm := NewSCM(p.data) // If the meter id is 0, bail. if scm.ID == 0 { continue } - msgs = append(msgs, scm) + msgCh <- scm } - return + wg.Done() } // Standard Consumption Message @@ -106,7 +100,7 @@ type SCM struct { ChecksumVal uint16 `xml:"Checksum,attr"` } -func NewSCM(data parse.Data) (scm SCM) { +func NewSCM(data protocol.Data) (scm SCM) { ertid, _ := strconv.ParseUint(data.Bits[21:23]+data.Bits[56:80], 2, 26) erttype, _ := strconv.ParseUint(data.Bits[26:30], 2, 4) tamperphy, _ := strconv.ParseUint(data.Bits[24:26], 2, 2) diff --git a/scmplus/scmplus.go b/scmplus/scmplus.go index 057efbd5b..5364feb80 100644 --- a/scmplus/scmplus.go +++ b/scmplus/scmplus.go @@ -21,75 +21,74 @@ import ( "encoding/binary" "fmt" "strconv" + "sync" "github.com/bemasher/rtlamr/crc" - "github.com/bemasher/rtlamr/decode" - "github.com/bemasher/rtlamr/parse" + "github.com/bemasher/rtlamr/protocol" ) func init() { - parse.Register("scm+", NewParser) -} - -func NewPacketConfig(chipLength int) (cfg decode.PacketConfig) { - cfg.CenterFreq = 912600155 - cfg.DataRate = 32768 - cfg.ChipLength = chipLength - cfg.PreambleSymbols = 16 - cfg.PacketSymbols = 16 * 8 - cfg.Preamble = "0001011010100011" - - return + protocol.RegisterParser("scm+", NewParser) } type Parser struct { - decode.Decoder crc.CRC + cfg protocol.PacketConfig + data protocol.Data } -func (p Parser) Dec() decode.Decoder { - return p.Decoder -} +func (p Parser) SetDecoder(d *protocol.Decoder) {} -func (p *Parser) Cfg() *decode.PacketConfig { - return &p.Decoder.Cfg +func (p *Parser) Cfg() protocol.PacketConfig { + return p.cfg } -func NewParser(chipLength int) (p parse.Parser) { +func NewParser(chipLength int) (p protocol.Parser) { return &Parser{ - decode.NewDecoder(NewPacketConfig(chipLength)), - crc.NewCRC("CCITT", 0xFFFF, 0x1021, 0x1D0F), + CRC: crc.NewCRC("CCITT", 0xFFFF, 0x1021, 0x1D0F), + cfg: protocol.PacketConfig{ + Protocol: "scm+", + CenterFreq: 912600155, + DataRate: 32768, + ChipLength: chipLength, + PreambleSymbols: 16, + PacketSymbols: 16 * 8, + Preamble: "0001011010100011", + }, + data: protocol.Data{Bytes: make([]byte, 16)}, } } -func (p Parser) Parse(indices []int) (msgs []parse.Message) { +func (p Parser) Parse(pkts []protocol.Data, msgCh chan protocol.Message, wg *sync.WaitGroup) { seen := make(map[string]bool) - for _, pkt := range p.Decoder.Slice(indices) { - s := string(pkt) + for _, pkt := range pkts { + p.data.Idx = pkt.Idx + p.data.Bits = pkt.Bits[0:p.cfg.PacketSymbols] + copy(p.data.Bytes, pkt.Bytes) + + s := string(p.data.Bytes) if seen[s] { continue } seen[s] = true - data := parse.NewDataFromBytes(pkt) - // If the checksum fails, bail. - if residue := p.Checksum(data.Bytes[2:]); residue != p.Residue { + if residue := p.Checksum(p.data.Bytes[2:]); residue != p.Residue { continue } - scm := NewSCM(data) + scm := NewSCM(p.data) // If the EndpointID is 0 or ProtocolID is invalid, bail. if scm.EndpointID == 0 || scm.ProtocolID != 0x1E { continue } - msgs = append(msgs, scm) + msgCh <- scm } - return + wg.Done() } // Standard Consumption Message Plus @@ -103,7 +102,7 @@ type SCM struct { PacketCRC uint16 `xml:"Checksum,attr",json:"Checksum"` } -func NewSCM(data parse.Data) (scm SCM) { +func NewSCM(data protocol.Data) (scm SCM) { binary.Read(bytes.NewReader(data.Bytes), binary.BigEndian, &scm) return