From 9e7351d28c19914bce9cf2f465a8e97c6556e3e0 Mon Sep 17 00:00:00 2001
From: Yohan Legat
Date: Fri, 10 Jun 2016 15:24:21 +0200
Subject: [PATCH 1/3] Resolve buger/gor#300: respect timestamps when replaying requests

When Gor deals with multiple input files, it sorts them and replays their
requests file after file. This behavior is not convenient and should be
changed: we should respect timestamp order when replaying requests.
---
 input_file.go      | 205 ++++++++++++++++++++++++++-------------------
 input_file_test.go | 100 +++++++++++++++++++---
 2 files changed, 205 insertions(+), 100 deletions(-)

diff --git a/input_file.go b/input_file.go
index ba3f542a..81701793 100644
--- a/input_file.go
+++ b/input_file.go
@@ -9,31 +9,40 @@ import (
 	"log"
 	"os"
 	"path/filepath"
-	"sort"
-	"strconv"
 	"strings"
+	"strconv"
 	"time"
 )
 
 // FileInput can read requests generated by FileOutput
 type FileInput struct {
-	data          chan []byte
-	path          string
-	currentFile   *os.File
-	currentReader *bufio.Reader
-	speedFactor   float64
-	loop          bool
+	data             chan []byte
+	exit             chan bool
+	path             string
+	fileInputReaders []*fileInputReader
+	speedFactor      float64
+	loop             bool
+}
+
+
+type fileInputReader struct {
+	reader    *bufio.Reader
+	meta      [][]byte
+	data      []byte
+	file      *os.File
+	timestamp int64
 }
 
 // NewFileInput constructor for FileInput. Accepts file path as argument.
 func NewFileInput(path string, loop bool) (i *FileInput) {
 	i = new(FileInput)
 	i.data = make(chan []byte)
+	i.exit = make(chan bool)
 	i.path = path
 	i.speedFactor = 1
 	i.loop = loop
 
-	if err := i.updateFile(); err != nil {
+	if err := i.init(); err != nil {
 		return
 	}
 
@@ -48,9 +57,7 @@ func (_ *NextFileNotFound) Error() string {
 	return "There is no new files"
 }
 
-// path can be a pattern
-// It sort paths lexicographically and tries to choose next one
-func (i *FileInput) updateFile() (err error) {
+func (i *FileInput) init() (err error) {
 	var matches []string
 
 	if matches, err = filepath.Glob(i.path); err != nil {
@@ -63,40 +70,24 @@ func (i *FileInput) updateFile() (err error) {
 		return errors.New("No matching files")
 	}
 
-	sort.Sort(sortByFileIndex(matches))
-
-	// Just pick first file, if there is many, and we are just started
-	if i.currentFile == nil {
-		if i.currentFile, err = os.Open(matches[0]); err != nil {
-			log.Println("Can't read file ", matches[0], err)
-			return
-		}
-	} else {
-		found := false
-		for idx, p := range matches {
-			if p == i.currentFile.Name() && idx != len(matches)-1 {
-				if i.currentFile, err = os.Open(matches[idx+1]); err != nil {
-					log.Println("Can't read file ", matches[idx+1], err)
-					return
-				}
+	i.fileInputReaders = make([]*fileInputReader, len(matches))
 
-				found = true
+	for idx, p := range matches {
+		file, _ := os.Open(p)
+		fileInputReader := &fileInputReader{}
+		fileInputReader.file = file
+		if strings.HasSuffix(p, ".gz") {
+			gzReader, err := gzip.NewReader(file)
+			if err != nil {
+				log.Fatal(err)
 			}
+			fileInputReader.reader = bufio.NewReader(gzReader)
+		} else {
+			fileInputReader.reader = bufio.NewReader(file)
 		}
 
-		if !found {
-			return new(NextFileNotFound)
-		}
-	}
-
-	if strings.HasSuffix(i.currentFile.Name(), ".gz") {
-		gzReader, err := gzip.NewReader(i.currentFile)
-		if err != nil {
-			log.Fatal(err)
-		}
-		i.currentReader = bufio.NewReader(gzReader)
-	} else {
-		i.currentReader = bufio.NewReader(i.currentFile)
+		fileInputReader.readNextInput()
+		i.fileInputReaders[idx] = fileInputReader
 	}
 
 	return nil
@@ -113,83 +104,123 @@ func (i *FileInput) String() string {
 	return "File input: " + i.path
 }
 
-func (i *FileInput) emit() {
-	var lastTime int64
+func 
(f *fileInputReader) readNextInput() { + nextInput := f.nextInput() + f.parseNextInput(nextInput) +} - payloadSeparatorAsBytes := []byte(payloadSeparator) +func (f *fileInputReader) parseNextInput(input []byte) { + if (input != nil) { + f.meta = payloadMeta(input) + f.timestamp, _ = strconv.ParseInt(string(f.meta[2]), 10, 64) + f.data = input + } +} +func (f *fileInputReader) nextInput() []byte { + payloadSeparatorAsBytes := []byte(payloadSeparator) var buffer bytes.Buffer - if i.currentReader == nil { - return - } - for { - line, err := i.currentReader.ReadBytes('\n') + line, err := f.reader.ReadBytes('\n') if err != nil { if err != io.EOF { log.Fatal(err) } - // If our path pattern match multiple files, try to find them if err == io.EOF { - if e := i.updateFile(); e != nil { - if _, ok := e.(*NextFileNotFound); ok && i.loop { - // Start from the first file - i.Close() - i.currentFile = nil - i.currentReader = nil - lastTime = 0 - i.updateFile() - - continue - } else { - break - } - } - - continue + f.file.Close() + f.file = nil + return nil } } if bytes.Equal(payloadSeparatorAsBytes[1:], line) { asBytes := buffer.Bytes() - buffer.Reset() - meta := payloadMeta(asBytes) + // Bytes() returns only pointer, so to remove data-race copy the data to an array + newBuf := make([]byte, len(asBytes) - 1) + copy(newBuf, asBytes) + return newBuf + } + + buffer.Write(line) + } +} - if len(meta) > 2 && meta[0][0] == RequestPayload { - ts, _ := strconv.ParseInt(string(meta[2]), 10, 64) +func (i *FileInput) nextInputReader() *fileInputReader { + var nextFileInputReader *fileInputReader + for _, fileInputReader := range i.fileInputReaders { + if fileInputReader.file == nil { + continue + } - if lastTime != 0 { - timeDiff := ts - lastTime + if fileInputReader.meta[0][0] == ResponsePayload { + return fileInputReader + } - if i.speedFactor != 1 { - timeDiff = int64(float64(timeDiff) / i.speedFactor) - } + if nextFileInputReader == nil || nextFileInputReader.timestamp > fileInputReader.timestamp { + nextFileInputReader = fileInputReader + continue + } + } - time.Sleep(time.Duration(timeDiff)) - } + return nextFileInputReader; +} - lastTime = ts - } +func (i *FileInput) emit() { + var lastTime int64 = -1 - // Bytes() returns only pointer, so to remove data-race copy the data to an array - newBuf := make([]byte, len(asBytes)-1) - copy(newBuf, asBytes) + for { + fileInputReader := i.nextInputReader() - i.data <- newBuf - } else { - buffer.Write(line) + if fileInputReader == nil { + if i.loop { + i.init() + lastTime = -1 + continue + } else { + break; + } + } + + if fileInputReader.meta[0][0] == RequestPayload { + lastTime = i.simulateRequestDelay(fileInputReader, lastTime) } + select { + case <-i.exit: + for _, fileInputReader := range i.fileInputReaders { + if fileInputReader.file != nil { + fileInputReader.file.Close() + } + } + break + case i.data <- fileInputReader.data: + fileInputReader.readNextInput() + } } log.Printf("FileInput: end of file '%s'\n", i.path) } +func (i*FileInput) simulateRequestDelay(fileInputReader *fileInputReader, lastTime int64) int64 { + if lastTime != -1 { + timeDiff := fileInputReader.timestamp - lastTime + + if i.speedFactor != 1 { + timeDiff = int64(float64(timeDiff) / i.speedFactor) + } + + time.Sleep(time.Duration(timeDiff)) + } + + return fileInputReader.timestamp +} + func (i *FileInput) Close() error { - i.currentFile.Close() + i.exit <- true return nil } + diff --git a/input_file_test.go b/input_file_test.go index e5857cfd..61c18cdc 100644 --- a/input_file_test.go +++ 
b/input_file_test.go @@ -97,35 +97,109 @@ func TestInputFileWithGETAndPOST(t *testing.T) { } -func TestInputFileMultipleFiles(t *testing.T) { +func TestInputFileMultipleFilesWithRequestsOnly(t *testing.T) { rnd := rand.Int63() file1, _ := os.OpenFile(fmt.Sprintf("/tmp/%d_0", rnd), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660) file1.Write([]byte("1 1 1\ntest1")) file1.Write([]byte(payloadSeparator)) - file1.Write([]byte("1 1 2\ntest2")) + file1.Write([]byte("1 1 3\ntest2")) file1.Write([]byte(payloadSeparator)) file1.Close() file2, _ := os.OpenFile(fmt.Sprintf("/tmp/%d_1", rnd), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660) - file2.Write([]byte("1 1 3\ntest3")) + file2.Write([]byte("1 1 2\ntest3")) + file2.Write([]byte(payloadSeparator)) + file2.Write([]byte("1 1 4\ntest4")) file2.Write([]byte(payloadSeparator)) file2.Close() input := NewFileInput(fmt.Sprintf("/tmp/%d*", rnd), false) buf := make([]byte, 1000) - n, _ := input.Read(buf) - if buf[10] != '1' { - t.Error("Shound emit requests in right order", string(buf[:n])) + + for i := '1'; i <= '4'; i++ { + n, _ := input.Read(buf) + if buf[4] != byte(i) { + t.Error("Should emit requests in right order", string(buf[:n])) + } + } + + os.Remove(file1.Name()) + os.Remove(file2.Name()) +} + +func TestInputFileRequestsWithLatency(t *testing.T) { + rnd := rand.Int63() + + file, _ := os.OpenFile(fmt.Sprintf("/tmp/%d", rnd), os.O_WRONLY | os.O_CREATE | os.O_TRUNC, 0660) + defer file.Close() + + file.Write([]byte("1 1 100000000\nrequest1")) + file.Write([]byte(payloadSeparator)) + file.Write([]byte("1 2 150000000\nrequest2")) + file.Write([]byte(payloadSeparator)) + file.Write([]byte("1 3 250000000\nrequest3")) + file.Write([]byte(payloadSeparator)) + + input := NewFileInput(fmt.Sprintf("/tmp/%d", rnd), false) + buf := make([]byte, 1000) + + start := time.Now().UnixNano() + for i := 0; i < 3; i++ { + input.Read(buf) } - input.Read(buf) - if buf[10] != '2' { - t.Error("Shound emit requests in right order", string(buf[:n])) + end := time.Now().UnixNano() + + var expectedLatency int64 = 250000000 - 100000000 + realLatency := end - start + if realLatency < expectedLatency { + t.Errorf("Should emit requests respecting latency. Expected: %v, real: %v", expectedLatency, realLatency) } - input.Read(buf) - if buf[10] != '3' { - t.Error("Shound emit requests from second file", string(buf[:n])) + if realLatency > expectedLatency + 10000000 { + t.Errorf("Should emit requests respecting latency. 
Expected: %v, real: %v", expectedLatency, realLatency) + + } +} + +func TestInputFileMultipleFilesWithRequestsAndResponses(t *testing.T) { + rnd := rand.Int63() + + file1, _ := os.OpenFile(fmt.Sprintf("/tmp/%d_0", rnd), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660) + file1.Write([]byte("1 1 1\nrequest1")) + file1.Write([]byte(payloadSeparator)) + file1.Write([]byte("2 1 1\nresponse1")) + file1.Write([]byte(payloadSeparator)) + file1.Write([]byte("1 2 3\nrequest2")) + file1.Write([]byte(payloadSeparator)) + file1.Write([]byte("2 2 3\nresponse2")) + file1.Write([]byte(payloadSeparator)) + file1.Close() + + file2, _ := os.OpenFile(fmt.Sprintf("/tmp/%d_1", rnd), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660) + file2.Write([]byte("1 3 2\nrequest3")) + file2.Write([]byte(payloadSeparator)) + file2.Write([]byte("2 3 2\nresponse3")) + file2.Write([]byte(payloadSeparator)) + file2.Write([]byte("1 4 4\nrequest4")) + file2.Write([]byte(payloadSeparator)) + file2.Write([]byte("2 4 4\nresponse4")) + file2.Write([]byte(payloadSeparator)) + file2.Close() + + input := NewFileInput(fmt.Sprintf("/tmp/%d*", rnd), false) + buf := make([]byte, 1000) + + for i := '1'; i <= '4'; i++ { + n, _ := input.Read(buf) + if buf[0] != '1' && buf[4] != byte(i) { + t.Error("Shound emit requests in right order", string(buf[:n])) + } + + n, _ = input.Read(buf) + if buf[0] != '2' && buf[4] != byte(i) { + t.Error("Shound emit responses in right order", string(buf[:n])) + } } os.Remove(file1.Name()) @@ -149,8 +223,8 @@ func TestInputFileLoop(t *testing.T) { for i := 0; i < 1000; i++ { input.Read(buf) } - input.Close() + input.Close() os.Remove(file.Name()) } From 37650df2c86e7b2ae752c46b3ff03bf81f58436e Mon Sep 17 00:00:00 2001 From: Leonid Bugaev Date: Wed, 29 Jun 2016 17:29:09 +0300 Subject: [PATCH 2/3] Refactor #308 --- input_dummy.go | 4 +- input_file.go | 241 ++++++++++++++------------- input_file_test.go | 8 +- input_http.go | 2 +- input_raw.go | 4 +- output_http.go | 5 +- proto/proto.go | 14 +- protocol.go | 21 ++- raw_socket_listener/listener.go | 42 ++--- raw_socket_listener/listener_test.go | 2 - raw_socket_listener/tcp_message.go | 4 +- test_input.go | 2 +- 12 files changed, 186 insertions(+), 163 deletions(-) diff --git a/input_dummy.go b/input_dummy.go index a2b46740..beb1f9cc 100644 --- a/input_dummy.go +++ b/input_dummy.go @@ -34,10 +34,10 @@ func (i *DummyInput) emit() { select { case <-ticker.C: uuid := uuid() - reqh := payloadHeader(RequestPayload, uuid, time.Now().UnixNano()) + reqh := payloadHeader(RequestPayload, uuid, time.Now().UnixNano(), -1) i.data <- append(reqh, []byte("GET / HTTP/1.1\r\nHost: www.w3.org\r\nUser-Agent: Go 1.1 package http\r\nAccept-Encoding: gzip\r\n\r\n")...) - resh := payloadHeader(ResponsePayload, uuid, 1) + resh := payloadHeader(ResponsePayload, uuid, time.Now().UnixNano()+1, 1) i.data <- append(resh, []byte("HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n")...) 
} } diff --git a/input_file.go b/input_file.go index 81701793..37e2b6a3 100644 --- a/input_file.go +++ b/input_file.go @@ -9,35 +9,109 @@ import ( "log" "os" "path/filepath" - "strings" "strconv" + "strings" + "sync" "time" ) -// FileInput can read requests generated by FileOutput -type FileInput struct { - data chan []byte - exit chan bool - path string - fileInputReaders []*fileInputReader - speedFactor float64 - loop bool -} - - type fileInputReader struct { reader *bufio.Reader - meta [][]byte data []byte file *os.File timestamp int64 } +func (f *fileInputReader) parseNext() error { + payloadSeparatorAsBytes := []byte(payloadSeparator) + var buffer bytes.Buffer + + for { + line, err := f.reader.ReadBytes('\n') + + if err != nil { + if err != io.EOF { + log.Println(err) + return err + } + + if err == io.EOF { + f.file.Close() + f.file = nil + return err + } + } + + if bytes.Equal(payloadSeparatorAsBytes[1:], line) { + asBytes := buffer.Bytes() + meta := payloadMeta(asBytes) + + f.timestamp, _ = strconv.ParseInt(string(meta[2]), 10, 64) + f.data = asBytes[:len(asBytes)-1] + + return nil + } + + buffer.Write(line) + } + + return nil +} + +func (f *fileInputReader) ReadPayload() []byte { + defer f.parseNext() + + return f.data +} +func (f *fileInputReader) Close() error { + if f.file != nil { + f.file.Close() + } + + return nil +} + +func NewFileInputReader(path string) *fileInputReader { + file, err := os.Open(path) + + if err != nil { + log.Println(err) + return nil + } + + r := &fileInputReader{file: file} + if strings.HasSuffix(path, ".gz") { + gzReader, err := gzip.NewReader(file) + if err != nil { + log.Println(err) + return nil + } + r.reader = bufio.NewReader(gzReader) + } else { + r.reader = bufio.NewReader(file) + } + + r.parseNext() + + return r +} + +// FileInput can read requests generated by FileOutput +type FileInput struct { + mu sync.Mutex + data chan []byte + exit chan bool + path string + readers []*fileInputReader + speedFactor float64 + loop bool +} + // NewFileInput constructor for FileInput. Accepts file path as argument. 
func NewFileInput(path string, loop bool) (i *FileInput) { i = new(FileInput) - i.data = make(chan []byte) - i.exit = make(chan bool) + i.data = make(chan []byte, 1000) + i.exit = make(chan bool, 1) i.path = path i.speedFactor = 1 i.loop = loop @@ -58,6 +132,9 @@ func (_ *NextFileNotFound) Error() string { } func (i *FileInput) init() (err error) { + defer i.mu.Unlock() + i.mu.Lock() + var matches []string if matches, err = filepath.Glob(i.path); err != nil { @@ -70,24 +147,10 @@ func (i *FileInput) init() (err error) { return errors.New("No matching files") } - i.fileInputReaders = make([]*fileInputReader, len(matches)) + i.readers = make([]*fileInputReader, len(matches)) for idx, p := range matches { - file, _ := os.Open(p) - fileInputReader := &fileInputReader{} - fileInputReader.file = file - if strings.HasSuffix(p, ".gz") { - gzReader, err := gzip.NewReader(file) - if err != nil { - log.Fatal(err) - } - fileInputReader.reader = bufio.NewReader(gzReader) - } else { - fileInputReader.reader = bufio.NewReader(file) - } - - fileInputReader.readNextInput() - i.fileInputReaders[idx] = fileInputReader + i.readers[idx] = NewFileInputReader(p) } return nil @@ -104,123 +167,71 @@ func (i *FileInput) String() string { return "File input: " + i.path } -func (f *fileInputReader) readNextInput() { - nextInput := f.nextInput() - f.parseNextInput(nextInput) -} - -func (f *fileInputReader) parseNextInput(input []byte) { - if (input != nil) { - f.meta = payloadMeta(input) - f.timestamp, _ = strconv.ParseInt(string(f.meta[2]), 10, 64) - f.data = input - } -} - -func (f *fileInputReader) nextInput() []byte { - payloadSeparatorAsBytes := []byte(payloadSeparator) - var buffer bytes.Buffer - - for { - line, err := f.reader.ReadBytes('\n') - - if err != nil { - if err != io.EOF { - log.Fatal(err) - } - - if err == io.EOF { - f.file.Close() - f.file = nil - return nil - } - } - - if bytes.Equal(payloadSeparatorAsBytes[1:], line) { - asBytes := buffer.Bytes() - - // Bytes() returns only pointer, so to remove data-race copy the data to an array - newBuf := make([]byte, len(asBytes) - 1) - copy(newBuf, asBytes) - return newBuf - } - - buffer.Write(line) - } -} - -func (i *FileInput) nextInputReader() *fileInputReader { - var nextFileInputReader *fileInputReader - for _, fileInputReader := range i.fileInputReaders { - if fileInputReader.file == nil { +// Find reader with smallest timestamp e.g next payload in row +func (i *FileInput) nextReader() (next *fileInputReader) { + for _, r := range i.readers { + if r == nil || r.file == nil { continue } - if fileInputReader.meta[0][0] == ResponsePayload { - return fileInputReader - } - - if nextFileInputReader == nil || nextFileInputReader.timestamp > fileInputReader.timestamp { - nextFileInputReader = fileInputReader + if next == nil || r.timestamp < next.timestamp { + next = r continue } } - return nextFileInputReader; + return } func (i *FileInput) emit() { var lastTime int64 = -1 for { - fileInputReader := i.nextInputReader() + select { + case <-i.exit: + return + default: + } + + reader := i.nextReader() - if fileInputReader == nil { + if reader == nil { if i.loop { i.init() lastTime = -1 continue } else { - break; + break } } - if fileInputReader.meta[0][0] == RequestPayload { - lastTime = i.simulateRequestDelay(fileInputReader, lastTime) - } + if lastTime != -1 { + diff := reader.timestamp - lastTime + lastTime = reader.timestamp - select { - case <-i.exit: - for _, fileInputReader := range i.fileInputReaders { - if fileInputReader.file != nil { - 
fileInputReader.file.Close() - } + if i.speedFactor != 1 { + diff = int64(float64(diff) / i.speedFactor) } - break - case i.data <- fileInputReader.data: - fileInputReader.readNextInput() + + time.Sleep(time.Duration(diff)) + } else { + lastTime = reader.timestamp } + + i.data <- reader.ReadPayload() } log.Printf("FileInput: end of file '%s'\n", i.path) } -func (i*FileInput) simulateRequestDelay(fileInputReader *fileInputReader, lastTime int64) int64 { - if lastTime != -1 { - timeDiff := fileInputReader.timestamp - lastTime +func (i *FileInput) Close() error { + defer i.mu.Unlock() + i.mu.Lock() - if i.speedFactor != 1 { - timeDiff = int64(float64(timeDiff) / i.speedFactor) - } + i.exit <- true - time.Sleep(time.Duration(timeDiff)) + r.Close() } - return fileInputReader.timestamp -} - -func (i *FileInput) Close() error { - i.exit <- true return nil } - diff --git a/input_file_test.go b/input_file_test.go index 61c18cdc..90296117 100644 --- a/input_file_test.go +++ b/input_file_test.go @@ -131,7 +131,7 @@ func TestInputFileMultipleFilesWithRequestsOnly(t *testing.T) { func TestInputFileRequestsWithLatency(t *testing.T) { rnd := rand.Int63() - file, _ := os.OpenFile(fmt.Sprintf("/tmp/%d", rnd), os.O_WRONLY | os.O_CREATE | os.O_TRUNC, 0660) + file, _ := os.OpenFile(fmt.Sprintf("/tmp/%d", rnd), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660) defer file.Close() file.Write([]byte("1 1 100000000\nrequest1")) @@ -156,7 +156,7 @@ func TestInputFileRequestsWithLatency(t *testing.T) { t.Errorf("Should emit requests respecting latency. Expected: %v, real: %v", expectedLatency, realLatency) } - if realLatency > expectedLatency + 10000000 { + if realLatency > expectedLatency+10000000 { t.Errorf("Should emit requests respecting latency. Expected: %v, real: %v", expectedLatency, realLatency) } @@ -180,7 +180,7 @@ func TestInputFileMultipleFilesWithRequestsAndResponses(t *testing.T) { file2.Write([]byte("1 3 2\nrequest3")) file2.Write([]byte(payloadSeparator)) file2.Write([]byte("2 3 2\nresponse3")) - file2.Write([]byte(payloadSeparator)) + file2.Write([]byte(payloadSeparator)) file2.Write([]byte("1 4 4\nrequest4")) file2.Write([]byte(payloadSeparator)) file2.Write([]byte("2 4 4\nresponse4")) @@ -295,11 +295,9 @@ func (expectedCaptureFile *CaptureFile) PayloadsEqual(other [][]byte) bool { } for i, payload := range other { - if !bytes.Equal(expectedCaptureFile.data[i], payload) { return false } - } return true diff --git a/input_http.go b/input_http.go index 33263b92..f3a6419f 100644 --- a/input_http.go +++ b/input_http.go @@ -29,7 +29,7 @@ func NewHTTPInput(address string) (i *HTTPInput) { func (i *HTTPInput) Read(data []byte) (int, error) { buf := <-i.data - header := payloadHeader(RequestPayload, uuid(), time.Now().UnixNano()) + header := payloadHeader(RequestPayload, uuid(), time.Now().UnixNano(), -1) copy(data[0:len(header)], header) copy(data[len(header):], buf) diff --git a/input_raw.go b/input_raw.go index 7b920966..26b3dd03 100644 --- a/input_raw.go +++ b/input_raw.go @@ -50,12 +50,12 @@ func (i *RAWInput) Read(data []byte) (int, error) { var header []byte if msg.IsIncoming { - header = payloadHeader(RequestPayload, msg.UUID(), msg.Start.UnixNano()) + header = payloadHeader(RequestPayload, msg.UUID(), msg.Start.UnixNano(), -1) if len(i.realIPHeader) > 0 { buf = proto.SetHeader(buf, i.realIPHeader, []byte(msg.IP().String())) } } else { - header = payloadHeader(ResponsePayload, msg.UUID(), msg.End.UnixNano()-msg.AssocMessage.Start.UnixNano()) + header = payloadHeader(ResponsePayload, msg.UUID(), 
msg.AssocMessage.Start.UnixNano(), msg.End.UnixNano()-msg.AssocMessage.Start.UnixNano()) } copy(data[0:len(header)], header) diff --git a/output_http.go b/output_http.go index 97423a86..d442b832 100644 --- a/output_http.go +++ b/output_http.go @@ -14,6 +14,7 @@ type response struct { payload []byte uuid []byte roundTripTime int64 + startedAt int64 } // HTTPOutputConfig struct for holding http output configuration @@ -178,7 +179,7 @@ func (o *HTTPOutput) Read(data []byte) (int, error) { Debug("[OUTPUT-HTTP] Received response:", string(resp.payload)) - header := payloadHeader(ReplayedResponsePayload, resp.uuid, resp.roundTripTime) + header := payloadHeader(ReplayedResponsePayload, resp.uuid, resp.startedAt, resp.roundTripTime) copy(data[0:len(header)], header) copy(data[len(header):], resp.payload) @@ -206,7 +207,7 @@ func (o *HTTPOutput) sendRequest(client *HTTPClient, request []byte) { } if o.config.TrackResponses { - o.responses <- response{resp, uuid, stop.UnixNano() - start.UnixNano()} + o.responses <- response{resp, uuid, start.UnixNano(), stop.UnixNano() - start.UnixNano()} } if o.elasticSearch != nil { diff --git a/proto/proto.go b/proto/proto.go index 40e59b14..186926fa 100644 --- a/proto/proto.go +++ b/proto/proto.go @@ -133,7 +133,7 @@ func header(payload []byte, name []byte) (value []byte, headerStart, headerEnd, valueStart = headerStart + len(name) + 1 // Skip ":" after header name headerEnd = valueStart + bytes.IndexByte(payload[valueStart:], '\n') - for valueStart < headerEnd { // Ignore empty space after ':' + for valueStart < headerEnd { // Ignore empty space after ':' if payload[valueStart] == ' ' { valueStart++ } else { @@ -148,7 +148,7 @@ func header(payload []byte, name []byte) (value []byte, headerStart, headerEnd, } // ignore empty space at end of header value - for valueStart < valueEnd { + for valueStart < valueEnd { if payload[valueEnd-1] == ' ' { valueEnd-- } else { @@ -170,7 +170,7 @@ func Header(payload, name []byte) []byte { // SetHeader sets header value. If header not found it creates new one. 
// Returns modified request payload func SetHeader(payload, name, value []byte) []byte { - _, hs, _, vs, ve := header(payload, name) + _, hs, _, vs, ve := header(payload, name) if hs != -1 { // If header found we just replace its value @@ -196,12 +196,12 @@ func AddHeader(payload, name, value []byte) []byte { // DelHeader takes http payload and removes header name from headers section // Returns modified request payload -func DeleteHeader(payload, name[]byte) []byte { +func DeleteHeader(payload, name []byte) []byte { _, hs, he, _, _ := header(payload, name) if hs != -1 { - newHeader := make([]byte, len(payload) - (he - hs) - 1) + newHeader := make([]byte, len(payload)-(he-hs)-1) copy(newHeader[:hs], payload[:hs]) - copy(newHeader[hs:], payload[he + 1:]) + copy(newHeader[hs:], payload[he+1:]) return newHeader } return payload @@ -322,7 +322,7 @@ func Status(payload []byte) []byte { } var httpMethods []string = []string{ - "GET ", "OPTI", "HEAD", "POST", "PUT ", "DELE", "TRAC", "CONN", /* custom methods */"BAN", "PURG", + "GET ", "OPTI", "HEAD", "POST", "PUT ", "DELE", "TRAC", "CONN" /* custom methods */, "BAN", "PURG", } func IsHTTPPayload(payload []byte) bool { diff --git a/protocol.go b/protocol.go index 1b4c4844..50d6943b 100644 --- a/protocol.go +++ b/protocol.go @@ -42,13 +42,24 @@ func payloadScanner(data []byte, atEOF bool) (advance int, token []byte, err err } // Timing is request start or round-trip time, depending on payloadType -func payloadHeader(payloadType byte, uuid []byte, timing int64) (header []byte) { - sTime := strconv.FormatInt(timing, 10) +func payloadHeader(payloadType byte, uuid []byte, timing int64, latency int64) (header []byte) { + var sTime, sLatency string + + sTime = strconv.FormatInt(timing, 10) + if latency != -1 { + sLatency = strconv.FormatInt(latency, 10) + } //Example: // 3 f45590522cd1838b4a0d5c5aab80b77929dea3b3 1231\n // `+ 1` indicates space characters or end of line - header = make([]byte, 1+1+len(uuid)+1+len(sTime)+1) + headerLen := 1 + 1 + len(uuid) + 1 + len(sTime) + 1 + + if latency != -1 { + headerLen += len(sLatency) + 1 + } + + header = make([]byte, headerLen) header[0] = payloadType header[1] = ' ' header[2+len(uuid)] = ' ' @@ -57,6 +68,10 @@ func payloadHeader(payloadType byte, uuid []byte, timing int64) (header []byte) copy(header[2:], uuid) copy(header[3+len(uuid):], sTime) + if latency != -1 { + copy(header[4+len(uuid)+len(sTime):], sLatency) + } + return header } diff --git a/raw_socket_listener/listener.go b/raw_socket_listener/listener.go index 3cd5e707..5a492d83 100644 --- a/raw_socket_listener/listener.go +++ b/raw_socket_listener/listener.go @@ -16,6 +16,7 @@ import ( "bytes" "encoding/binary" "fmt" + "github.com/buger/gor/proto" "github.com/google/gopacket" "github.com/google/gopacket/layers" "github.com/google/gopacket/pcap" @@ -28,7 +29,6 @@ import ( "strings" "sync" "time" - "github.com/buger/gor/proto" ) var _ = fmt.Println @@ -303,7 +303,7 @@ func (t *Listener) readPcap() { for i, addr := range device.Addresses { bpfDstHost += "dst host " + addr.IP.String() bpfSrcHost += "src host " + addr.IP.String() - if i != len(device.Addresses) - 1 { + if i != len(device.Addresses)-1 { bpfDstHost += " or " bpfSrcHost += " or " } @@ -330,9 +330,9 @@ func (t *Listener) readPcap() { // Special case for tunnel interface https://github.com/google/gopacket/issues/99 if handle.LinkType() == 12 { - decoder = layers.LayerTypeIPv4 + decoder = layers.LayerTypeIPv4 } else { - decoder = handle.LinkType() 
+ decoder = handle.LinkType() } source := gopacket.NewPacketSource(handle, decoder) @@ -355,23 +355,23 @@ func (t *Listener) readPcap() { // We should remove network layer before parsing TCP/IP data var of int switch decoder { - case layers.LinkTypeEthernet: - of = 14 - case layers.LinkTypePPP: - of = 1 - case layers.LinkTypeFDDI: - of = 13 - case layers.LinkTypeNull: - of = 4 - case layers.LinkTypeLoop: - of = 4 - case layers.LinkTypeRaw: - of = 0 - case layers.LinkTypeLinuxSLL: - of = 16 - default: - log.Println("Unknown packet layer", packet) - break + case layers.LinkTypeEthernet: + of = 14 + case layers.LinkTypePPP: + of = 1 + case layers.LinkTypeFDDI: + of = 13 + case layers.LinkTypeNull: + of = 4 + case layers.LinkTypeLoop: + of = 4 + case layers.LinkTypeRaw: + of = 0 + case layers.LinkTypeLinuxSLL: + of = 16 + default: + log.Println("Unknown packet layer", packet) + break } data = packet.Data()[of:] diff --git a/raw_socket_listener/listener_test.go b/raw_socket_listener/listener_test.go index 3f3b250e..19afb74e 100644 --- a/raw_socket_listener/listener_test.go +++ b/raw_socket_listener/listener_test.go @@ -166,8 +166,6 @@ func TestAlt100ContinueHeaderOrder(t *testing.T) { testRawListener100Continue(t, listener, result, reqPacket1, reqPacket2, reqPacket3, respPacket1, respPacket2) } - - func testRawListener100Continue(t *testing.T, listener *Listener, result []byte, packets ...*TCPPacket) { var req, resp *TCPMessage for _, p := range packets { diff --git a/raw_socket_listener/tcp_message.go b/raw_socket_listener/tcp_message.go index 5045f4db..43002333 100644 --- a/raw_socket_listener/tcp_message.go +++ b/raw_socket_listener/tcp_message.go @@ -7,9 +7,9 @@ import ( "encoding/hex" "github.com/buger/gor/proto" "log" + "net" "strconv" "time" - "net" ) var _ = log.Println @@ -300,4 +300,4 @@ func (t *TCPMessage) ID() tcpID { func (t *TCPMessage) IP() net.IP { return net.IP(t.packets[0].Addr) -} \ No newline at end of file +} diff --git a/test_input.go b/test_input.go index cd146c56..0f990e72 100644 --- a/test_input.go +++ b/test_input.go @@ -22,7 +22,7 @@ func NewTestInput() (i *TestInput) { func (i *TestInput) Read(data []byte) (int, error) { buf := <-i.data - header := payloadHeader(RequestPayload, uuid(), time.Now().UnixNano()) + header := payloadHeader(RequestPayload, uuid(), time.Now().UnixNano(), -1) copy(data[0:len(header)], header) copy(data[len(header):], buf) From 7c3333643362e702eb61a0be47dc0b07805d0bb2 Mon Sep 17 00:00:00 2001 From: Leonid Bugaev Date: Wed, 29 Jun 2016 17:29:36 +0300 Subject: [PATCH 3/3] Add missing change --- input_file.go | 1 + 1 file changed, 1 insertion(+) diff --git a/input_file.go b/input_file.go index 37e2b6a3..6dfffc44 100644 --- a/input_file.go +++ b/input_file.go @@ -230,6 +230,7 @@ func (i *FileInput) Close() error { i.exit <- true + for _, r := range i.readers { r.Close() }
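
Note (not part of the patches above): the core mechanism this series introduces is a k-way merge of the input files by payload timestamp — on each iteration the reader with the smallest pending timestamp is emitted next, after sleeping for the speed-scaled gap since the previous payload. The following self-contained Go sketch illustrates that idea only; the payload and reader types and the emit callback are simplified stand-ins for the patches' fileInputReader and data channel, not code from the repository.

package main

import (
	"fmt"
	"time"
)

// payload is a simplified stand-in for one record parsed from a gor file:
// the capture timestamp (in nanoseconds) and the raw bytes to replay.
type payload struct {
	timestamp int64
	data      []byte
}

// reader is a simplified stand-in for fileInputReader: the records of one
// input file, already parsed and kept in file order.
type reader struct {
	payloads []payload
}

// head returns the next pending payload of a reader, or nil when exhausted.
func (r *reader) head() *payload {
	if len(r.payloads) == 0 {
		return nil
	}
	return &r.payloads[0]
}

// replay merges several readers by always emitting the pending payload with
// the smallest timestamp, sleeping for the (speed-scaled) gap between records.
func replay(readers []*reader, speedFactor float64, emit func([]byte)) {
	lastTime := int64(-1)

	for {
		// Pick the reader whose pending payload has the smallest timestamp.
		var next *reader
		for _, r := range readers {
			if r.head() == nil {
				continue
			}
			if next == nil || r.head().timestamp < next.head().timestamp {
				next = r
			}
		}
		if next == nil {
			return // every file is exhausted
		}

		p := next.head()
		if lastTime != -1 {
			diff := p.timestamp - lastTime
			if speedFactor != 1 {
				diff = int64(float64(diff) / speedFactor)
			}
			time.Sleep(time.Duration(diff))
		}
		lastTime = p.timestamp

		emit(p.data)
		next.payloads = next.payloads[1:]
	}
}

func main() {
	// Two "files" whose records interleave in time, like the test fixtures above.
	a := &reader{payloads: []payload{{1, []byte("request1")}, {3, []byte("request2")}}}
	b := &reader{payloads: []payload{{2, []byte("request3")}, {4, []byte("request4")}}}

	replay([]*reader{a, b}, 1, func(data []byte) { fmt.Println(string(data)) })
	// Prints: request1 request3 request2 request4
}

With fixtures like the tests above (timestamps 1 and 3 in one file, 2 and 4 in the other) this prints request1, request3, request2, request4 — the ordering TestInputFileMultipleFilesWithRequestsOnly asserts. Patch 2 builds on the same idea by giving payloadHeader a separate latency argument, so a response payload records the original request timestamp as well as its round-trip time and can be merged onto the same timeline as the requests.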