diff --git a/input_dummy.go b/input_dummy.go index a2b46740..beb1f9cc 100644 --- a/input_dummy.go +++ b/input_dummy.go @@ -34,10 +34,10 @@ func (i *DummyInput) emit() { select { case <-ticker.C: uuid := uuid() - reqh := payloadHeader(RequestPayload, uuid, time.Now().UnixNano()) + reqh := payloadHeader(RequestPayload, uuid, time.Now().UnixNano(), -1) i.data <- append(reqh, []byte("GET / HTTP/1.1\r\nHost: www.w3.org\r\nUser-Agent: Go 1.1 package http\r\nAccept-Encoding: gzip\r\n\r\n")...) - resh := payloadHeader(ResponsePayload, uuid, 1) + resh := payloadHeader(ResponsePayload, uuid, time.Now().UnixNano()+1, 1) i.data <- append(resh, []byte("HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n")...) } } diff --git a/input_file.go b/input_file.go index ba3f542a..6dfffc44 100644 --- a/input_file.go +++ b/input_file.go @@ -9,31 +9,114 @@ import ( "log" "os" "path/filepath" - "sort" "strconv" "strings" + "sync" "time" ) +type fileInputReader struct { + reader *bufio.Reader + data []byte + file *os.File + timestamp int64 +} + +func (f *fileInputReader) parseNext() error { + payloadSeparatorAsBytes := []byte(payloadSeparator) + var buffer bytes.Buffer + + for { + line, err := f.reader.ReadBytes('\n') + + if err != nil { + if err != io.EOF { + log.Println(err) + return err + } + + if err == io.EOF { + f.file.Close() + f.file = nil + return err + } + } + + if bytes.Equal(payloadSeparatorAsBytes[1:], line) { + asBytes := buffer.Bytes() + meta := payloadMeta(asBytes) + + f.timestamp, _ = strconv.ParseInt(string(meta[2]), 10, 64) + f.data = asBytes[:len(asBytes)-1] + + return nil + } + + buffer.Write(line) + } + + return nil +} + +func (f *fileInputReader) ReadPayload() []byte { + defer f.parseNext() + + return f.data +} +func (f *fileInputReader) Close() error { + if f.file != nil { + f.file.Close() + } + + return nil +} + +func NewFileInputReader(path string) *fileInputReader { + file, err := os.Open(path) + + if err != nil { + log.Println(err) + return nil + } + + r := &fileInputReader{file: file} + if strings.HasSuffix(path, ".gz") { + gzReader, err := gzip.NewReader(file) + if err != nil { + log.Println(err) + return nil + } + r.reader = bufio.NewReader(gzReader) + } else { + r.reader = bufio.NewReader(file) + } + + r.parseNext() + + return r +} + // FileInput can read requests generated by FileOutput type FileInput struct { - data chan []byte - path string - currentFile *os.File - currentReader *bufio.Reader - speedFactor float64 - loop bool + mu sync.Mutex + data chan []byte + exit chan bool + path string + readers []*fileInputReader + speedFactor float64 + loop bool } // NewFileInput constructor for FileInput. Accepts file path as argument. func NewFileInput(path string, loop bool) (i *FileInput) { i = new(FileInput) - i.data = make(chan []byte) + i.data = make(chan []byte, 1000) + i.exit = make(chan bool, 1) i.path = path i.speedFactor = 1 i.loop = loop - if err := i.updateFile(); err != nil { + if err := i.init(); err != nil { return } @@ -48,9 +131,10 @@ func (_ *NextFileNotFound) Error() string { return "There is no new files" } -// path can be a pattern -// It sort paths lexicographically and tries to choose next one -func (i *FileInput) updateFile() (err error) { +func (i *FileInput) init() (err error) { + defer i.mu.Unlock() + i.mu.Lock() + var matches []string if matches, err = filepath.Glob(i.path); err != nil { @@ -63,40 +147,10 @@ func (i *FileInput) updateFile() (err error) { return errors.New("No matching files") } - sort.Sort(sortByFileIndex(matches)) + i.readers = make([]*fileInputReader, len(matches)) - // Just pick first file, if there is many, and we are just started - if i.currentFile == nil { - if i.currentFile, err = os.Open(matches[0]); err != nil { - log.Println("Can't read file ", matches[0], err) - return - } - } else { - found := false - for idx, p := range matches { - if p == i.currentFile.Name() && idx != len(matches)-1 { - if i.currentFile, err = os.Open(matches[idx+1]); err != nil { - log.Println("Can't read file ", matches[idx+1], err) - return - } - - found = true - } - } - - if !found { - return new(NextFileNotFound) - } - } - - if strings.HasSuffix(i.currentFile.Name(), ".gz") { - gzReader, err := gzip.NewReader(i.currentFile) - if err != nil { - log.Fatal(err) - } - i.currentReader = bufio.NewReader(gzReader) - } else { - i.currentReader = bufio.NewReader(i.currentFile) + for idx, p := range matches { + i.readers[idx] = NewFileInputReader(p) } return nil @@ -113,83 +167,72 @@ func (i *FileInput) String() string { return "File input: " + i.path } -func (i *FileInput) emit() { - var lastTime int64 +// Find reader with smallest timestamp e.g next payload in row +func (i *FileInput) nextReader() (next *fileInputReader) { + for _, r := range i.readers { + if r == nil || r.file == nil { + continue + } - payloadSeparatorAsBytes := []byte(payloadSeparator) + if next == nil || r.timestamp < next.timestamp { + next = r + continue + } + } - var buffer bytes.Buffer + return +} - if i.currentReader == nil { - return - } +func (i *FileInput) emit() { + var lastTime int64 = -1 for { - line, err := i.currentReader.ReadBytes('\n') - - if err != nil { - if err != io.EOF { - log.Fatal(err) - } + select { + case <-i.exit: + return + default: + } - // If our path pattern match multiple files, try to find them - if err == io.EOF { - if e := i.updateFile(); e != nil { - if _, ok := e.(*NextFileNotFound); ok && i.loop { - // Start from the first file - i.Close() - i.currentFile = nil - i.currentReader = nil - lastTime = 0 - i.updateFile() - - continue - } else { - break - } - } + reader := i.nextReader() + if reader == nil { + if i.loop { + i.init() + lastTime = -1 continue + } else { + break } } - if bytes.Equal(payloadSeparatorAsBytes[1:], line) { - asBytes := buffer.Bytes() - buffer.Reset() - - meta := payloadMeta(asBytes) - - if len(meta) > 2 && meta[0][0] == RequestPayload { - ts, _ := strconv.ParseInt(string(meta[2]), 10, 64) - - if lastTime != 0 { - timeDiff := ts - lastTime - - if i.speedFactor != 1 { - timeDiff = int64(float64(timeDiff) / i.speedFactor) - } - - time.Sleep(time.Duration(timeDiff)) - } + if lastTime != -1 { + diff := reader.timestamp - lastTime + lastTime = reader.timestamp - lastTime = ts + if i.speedFactor != 1 { + diff = int64(float64(diff) / i.speedFactor) } - // Bytes() returns only pointer, so to remove data-race copy the data to an array - newBuf := make([]byte, len(asBytes)-1) - copy(newBuf, asBytes) - - i.data <- newBuf + time.Sleep(time.Duration(diff)) } else { - buffer.Write(line) + lastTime = reader.timestamp } + i.data <- reader.ReadPayload() } log.Printf("FileInput: end of file '%s'\n", i.path) } func (i *FileInput) Close() error { - i.currentFile.Close() + defer i.mu.Unlock() + i.mu.Lock() + + i.exit <- true + + for _, r := range i.readers { + r.Close() + } + return nil } diff --git a/input_file_test.go b/input_file_test.go index e5857cfd..90296117 100644 --- a/input_file_test.go +++ b/input_file_test.go @@ -97,35 +97,109 @@ func TestInputFileWithGETAndPOST(t *testing.T) { } -func TestInputFileMultipleFiles(t *testing.T) { +func TestInputFileMultipleFilesWithRequestsOnly(t *testing.T) { rnd := rand.Int63() file1, _ := os.OpenFile(fmt.Sprintf("/tmp/%d_0", rnd), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660) file1.Write([]byte("1 1 1\ntest1")) file1.Write([]byte(payloadSeparator)) - file1.Write([]byte("1 1 2\ntest2")) + file1.Write([]byte("1 1 3\ntest2")) file1.Write([]byte(payloadSeparator)) file1.Close() file2, _ := os.OpenFile(fmt.Sprintf("/tmp/%d_1", rnd), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660) - file2.Write([]byte("1 1 3\ntest3")) + file2.Write([]byte("1 1 2\ntest3")) + file2.Write([]byte(payloadSeparator)) + file2.Write([]byte("1 1 4\ntest4")) file2.Write([]byte(payloadSeparator)) file2.Close() input := NewFileInput(fmt.Sprintf("/tmp/%d*", rnd), false) buf := make([]byte, 1000) - n, _ := input.Read(buf) - if buf[10] != '1' { - t.Error("Shound emit requests in right order", string(buf[:n])) + + for i := '1'; i <= '4'; i++ { + n, _ := input.Read(buf) + if buf[4] != byte(i) { + t.Error("Should emit requests in right order", string(buf[:n])) + } + } + + os.Remove(file1.Name()) + os.Remove(file2.Name()) +} + +func TestInputFileRequestsWithLatency(t *testing.T) { + rnd := rand.Int63() + + file, _ := os.OpenFile(fmt.Sprintf("/tmp/%d", rnd), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660) + defer file.Close() + + file.Write([]byte("1 1 100000000\nrequest1")) + file.Write([]byte(payloadSeparator)) + file.Write([]byte("1 2 150000000\nrequest2")) + file.Write([]byte(payloadSeparator)) + file.Write([]byte("1 3 250000000\nrequest3")) + file.Write([]byte(payloadSeparator)) + + input := NewFileInput(fmt.Sprintf("/tmp/%d", rnd), false) + buf := make([]byte, 1000) + + start := time.Now().UnixNano() + for i := 0; i < 3; i++ { + input.Read(buf) } - input.Read(buf) - if buf[10] != '2' { - t.Error("Shound emit requests in right order", string(buf[:n])) + end := time.Now().UnixNano() + + var expectedLatency int64 = 250000000 - 100000000 + realLatency := end - start + if realLatency < expectedLatency { + t.Errorf("Should emit requests respecting latency. Expected: %v, real: %v", expectedLatency, realLatency) } - input.Read(buf) - if buf[10] != '3' { - t.Error("Shound emit requests from second file", string(buf[:n])) + if realLatency > expectedLatency+10000000 { + t.Errorf("Should emit requests respecting latency. Expected: %v, real: %v", expectedLatency, realLatency) + + } +} + +func TestInputFileMultipleFilesWithRequestsAndResponses(t *testing.T) { + rnd := rand.Int63() + + file1, _ := os.OpenFile(fmt.Sprintf("/tmp/%d_0", rnd), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660) + file1.Write([]byte("1 1 1\nrequest1")) + file1.Write([]byte(payloadSeparator)) + file1.Write([]byte("2 1 1\nresponse1")) + file1.Write([]byte(payloadSeparator)) + file1.Write([]byte("1 2 3\nrequest2")) + file1.Write([]byte(payloadSeparator)) + file1.Write([]byte("2 2 3\nresponse2")) + file1.Write([]byte(payloadSeparator)) + file1.Close() + + file2, _ := os.OpenFile(fmt.Sprintf("/tmp/%d_1", rnd), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660) + file2.Write([]byte("1 3 2\nrequest3")) + file2.Write([]byte(payloadSeparator)) + file2.Write([]byte("2 3 2\nresponse3")) + file2.Write([]byte(payloadSeparator)) + file2.Write([]byte("1 4 4\nrequest4")) + file2.Write([]byte(payloadSeparator)) + file2.Write([]byte("2 4 4\nresponse4")) + file2.Write([]byte(payloadSeparator)) + file2.Close() + + input := NewFileInput(fmt.Sprintf("/tmp/%d*", rnd), false) + buf := make([]byte, 1000) + + for i := '1'; i <= '4'; i++ { + n, _ := input.Read(buf) + if buf[0] != '1' && buf[4] != byte(i) { + t.Error("Shound emit requests in right order", string(buf[:n])) + } + + n, _ = input.Read(buf) + if buf[0] != '2' && buf[4] != byte(i) { + t.Error("Shound emit responses in right order", string(buf[:n])) + } } os.Remove(file1.Name()) @@ -149,8 +223,8 @@ func TestInputFileLoop(t *testing.T) { for i := 0; i < 1000; i++ { input.Read(buf) } - input.Close() + input.Close() os.Remove(file.Name()) } @@ -221,11 +295,9 @@ func (expectedCaptureFile *CaptureFile) PayloadsEqual(other [][]byte) bool { } for i, payload := range other { - if !bytes.Equal(expectedCaptureFile.data[i], payload) { return false } - } return true diff --git a/input_http.go b/input_http.go index 33263b92..f3a6419f 100644 --- a/input_http.go +++ b/input_http.go @@ -29,7 +29,7 @@ func NewHTTPInput(address string) (i *HTTPInput) { func (i *HTTPInput) Read(data []byte) (int, error) { buf := <-i.data - header := payloadHeader(RequestPayload, uuid(), time.Now().UnixNano()) + header := payloadHeader(RequestPayload, uuid(), time.Now().UnixNano(), -1) copy(data[0:len(header)], header) copy(data[len(header):], buf) diff --git a/input_raw.go b/input_raw.go index 7b920966..26b3dd03 100644 --- a/input_raw.go +++ b/input_raw.go @@ -50,12 +50,12 @@ func (i *RAWInput) Read(data []byte) (int, error) { var header []byte if msg.IsIncoming { - header = payloadHeader(RequestPayload, msg.UUID(), msg.Start.UnixNano()) + header = payloadHeader(RequestPayload, msg.UUID(), msg.Start.UnixNano(), -1) if len(i.realIPHeader) > 0 { buf = proto.SetHeader(buf, i.realIPHeader, []byte(msg.IP().String())) } } else { - header = payloadHeader(ResponsePayload, msg.UUID(), msg.End.UnixNano()-msg.AssocMessage.Start.UnixNano()) + header = payloadHeader(ResponsePayload, msg.UUID(), msg.AssocMessage.Start.UnixNano(), msg.End.UnixNano()-msg.AssocMessage.Start.UnixNano()) } copy(data[0:len(header)], header) diff --git a/output_http.go b/output_http.go index 97423a86..d442b832 100644 --- a/output_http.go +++ b/output_http.go @@ -14,6 +14,7 @@ type response struct { payload []byte uuid []byte roundTripTime int64 + startedAt int64 } // HTTPOutputConfig struct for holding http output configuration @@ -178,7 +179,7 @@ func (o *HTTPOutput) Read(data []byte) (int, error) { Debug("[OUTPUT-HTTP] Received response:", string(resp.payload)) - header := payloadHeader(ReplayedResponsePayload, resp.uuid, resp.roundTripTime) + header := payloadHeader(ReplayedResponsePayload, resp.uuid, resp.startedAt, resp.roundTripTime) copy(data[0:len(header)], header) copy(data[len(header):], resp.payload) @@ -206,7 +207,7 @@ func (o *HTTPOutput) sendRequest(client *HTTPClient, request []byte) { } if o.config.TrackResponses { - o.responses <- response{resp, uuid, stop.UnixNano() - start.UnixNano()} + o.responses <- response{resp, uuid, start.UnixNano(), stop.UnixNano() - start.UnixNano()} } if o.elasticSearch != nil { diff --git a/protocol.go b/protocol.go index 1b4c4844..50d6943b 100644 --- a/protocol.go +++ b/protocol.go @@ -42,13 +42,24 @@ func payloadScanner(data []byte, atEOF bool) (advance int, token []byte, err err } // Timing is request start or round-trip time, depending on payloadType -func payloadHeader(payloadType byte, uuid []byte, timing int64) (header []byte) { - sTime := strconv.FormatInt(timing, 10) +func payloadHeader(payloadType byte, uuid []byte, timing int64, latency int64) (header []byte) { + var sTime, sLatency string + + sTime = strconv.FormatInt(timing, 10) + if latency != -1 { + sLatency = strconv.FormatInt(latency, 10) + } //Example: // 3 f45590522cd1838b4a0d5c5aab80b77929dea3b3 1231\n // `+ 1` indicates space characters or end of line - header = make([]byte, 1+1+len(uuid)+1+len(sTime)+1) + headerLen := 1 + 1 + len(uuid) + 1 + len(sTime) + 1 + + if latency != -1 { + headerLen += len(sLatency) + 1 + } + + header = make([]byte, headerLen) header[0] = payloadType header[1] = ' ' header[2+len(uuid)] = ' ' @@ -57,6 +68,10 @@ func payloadHeader(payloadType byte, uuid []byte, timing int64) (header []byte) copy(header[2:], uuid) copy(header[3+len(uuid):], sTime) + if latency != -1 { + copy(header[4+len(uuid)+len(sTime):], sLatency) + } + return header } diff --git a/test_input.go b/test_input.go index cd146c56..0f990e72 100644 --- a/test_input.go +++ b/test_input.go @@ -22,7 +22,7 @@ func NewTestInput() (i *TestInput) { func (i *TestInput) Read(data []byte) (int, error) { buf := <-i.data - header := payloadHeader(RequestPayload, uuid(), time.Now().UnixNano()) + header := payloadHeader(RequestPayload, uuid(), time.Now().UnixNano(), -1) copy(data[0:len(header)], header) copy(data[len(header):], buf)