From 9e7351d28c19914bce9cf2f465a8e97c6556e3e0 Mon Sep 17 00:00:00 2001 From: Yohan Legat Date: Fri, 10 Jun 2016 15:24:21 +0200 Subject: [PATCH] Resolve buger/gor#300 : respect timestamps when replaying requests When Gor deals with multiple input files, it sorts them and replay their requests file after file. This behavior is not convenient and should be changed. We should respect timestamps order when replaying requests. --- input_file.go | 205 ++++++++++++++++++++++++++------------------- input_file_test.go | 100 +++++++++++++++++++--- 2 files changed, 205 insertions(+), 100 deletions(-) diff --git a/input_file.go b/input_file.go index ba3f542a..81701793 100644 --- a/input_file.go +++ b/input_file.go @@ -9,31 +9,40 @@ import ( "log" "os" "path/filepath" - "sort" - "strconv" "strings" + "strconv" "time" ) // FileInput can read requests generated by FileOutput type FileInput struct { - data chan []byte - path string - currentFile *os.File - currentReader *bufio.Reader - speedFactor float64 - loop bool + data chan []byte + exit chan bool + path string + fileInputReaders []*fileInputReader + speedFactor float64 + loop bool +} + + +type fileInputReader struct { + reader *bufio.Reader + meta [][]byte + data []byte + file *os.File + timestamp int64 } // NewFileInput constructor for FileInput. Accepts file path as argument. func NewFileInput(path string, loop bool) (i *FileInput) { i = new(FileInput) i.data = make(chan []byte) + i.exit = make(chan bool) i.path = path i.speedFactor = 1 i.loop = loop - if err := i.updateFile(); err != nil { + if err := i.init(); err != nil { return } @@ -48,9 +57,7 @@ func (_ *NextFileNotFound) Error() string { return "There is no new files" } -// path can be a pattern -// It sort paths lexicographically and tries to choose next one -func (i *FileInput) updateFile() (err error) { +func (i *FileInput) init() (err error) { var matches []string if matches, err = filepath.Glob(i.path); err != nil { @@ -63,40 +70,24 @@ func (i *FileInput) updateFile() (err error) { return errors.New("No matching files") } - sort.Sort(sortByFileIndex(matches)) - - // Just pick first file, if there is many, and we are just started - if i.currentFile == nil { - if i.currentFile, err = os.Open(matches[0]); err != nil { - log.Println("Can't read file ", matches[0], err) - return - } - } else { - found := false - for idx, p := range matches { - if p == i.currentFile.Name() && idx != len(matches)-1 { - if i.currentFile, err = os.Open(matches[idx+1]); err != nil { - log.Println("Can't read file ", matches[idx+1], err) - return - } + i.fileInputReaders = make([]*fileInputReader, len(matches)) - found = true + for idx, p := range matches { + file, _ := os.Open(p) + fileInputReader := &fileInputReader{} + fileInputReader.file = file + if strings.HasSuffix(p, ".gz") { + gzReader, err := gzip.NewReader(file) + if err != nil { + log.Fatal(err) } + fileInputReader.reader = bufio.NewReader(gzReader) + } else { + fileInputReader.reader = bufio.NewReader(file) } - if !found { - return new(NextFileNotFound) - } - } - - if strings.HasSuffix(i.currentFile.Name(), ".gz") { - gzReader, err := gzip.NewReader(i.currentFile) - if err != nil { - log.Fatal(err) - } - i.currentReader = bufio.NewReader(gzReader) - } else { - i.currentReader = bufio.NewReader(i.currentFile) + fileInputReader.readNextInput() + i.fileInputReaders[idx] = fileInputReader } return nil @@ -113,83 +104,123 @@ func (i *FileInput) String() string { return "File input: " + i.path } -func (i *FileInput) emit() { - var lastTime int64 +func (f *fileInputReader) readNextInput() { + nextInput := f.nextInput() + f.parseNextInput(nextInput) +} - payloadSeparatorAsBytes := []byte(payloadSeparator) +func (f *fileInputReader) parseNextInput(input []byte) { + if (input != nil) { + f.meta = payloadMeta(input) + f.timestamp, _ = strconv.ParseInt(string(f.meta[2]), 10, 64) + f.data = input + } +} +func (f *fileInputReader) nextInput() []byte { + payloadSeparatorAsBytes := []byte(payloadSeparator) var buffer bytes.Buffer - if i.currentReader == nil { - return - } - for { - line, err := i.currentReader.ReadBytes('\n') + line, err := f.reader.ReadBytes('\n') if err != nil { if err != io.EOF { log.Fatal(err) } - // If our path pattern match multiple files, try to find them if err == io.EOF { - if e := i.updateFile(); e != nil { - if _, ok := e.(*NextFileNotFound); ok && i.loop { - // Start from the first file - i.Close() - i.currentFile = nil - i.currentReader = nil - lastTime = 0 - i.updateFile() - - continue - } else { - break - } - } - - continue + f.file.Close() + f.file = nil + return nil } } if bytes.Equal(payloadSeparatorAsBytes[1:], line) { asBytes := buffer.Bytes() - buffer.Reset() - meta := payloadMeta(asBytes) + // Bytes() returns only pointer, so to remove data-race copy the data to an array + newBuf := make([]byte, len(asBytes) - 1) + copy(newBuf, asBytes) + return newBuf + } + + buffer.Write(line) + } +} - if len(meta) > 2 && meta[0][0] == RequestPayload { - ts, _ := strconv.ParseInt(string(meta[2]), 10, 64) +func (i *FileInput) nextInputReader() *fileInputReader { + var nextFileInputReader *fileInputReader + for _, fileInputReader := range i.fileInputReaders { + if fileInputReader.file == nil { + continue + } - if lastTime != 0 { - timeDiff := ts - lastTime + if fileInputReader.meta[0][0] == ResponsePayload { + return fileInputReader + } - if i.speedFactor != 1 { - timeDiff = int64(float64(timeDiff) / i.speedFactor) - } + if nextFileInputReader == nil || nextFileInputReader.timestamp > fileInputReader.timestamp { + nextFileInputReader = fileInputReader + continue + } + } - time.Sleep(time.Duration(timeDiff)) - } + return nextFileInputReader; +} - lastTime = ts - } +func (i *FileInput) emit() { + var lastTime int64 = -1 - // Bytes() returns only pointer, so to remove data-race copy the data to an array - newBuf := make([]byte, len(asBytes)-1) - copy(newBuf, asBytes) + for { + fileInputReader := i.nextInputReader() - i.data <- newBuf - } else { - buffer.Write(line) + if fileInputReader == nil { + if i.loop { + i.init() + lastTime = -1 + continue + } else { + break; + } + } + + if fileInputReader.meta[0][0] == RequestPayload { + lastTime = i.simulateRequestDelay(fileInputReader, lastTime) } + select { + case <-i.exit: + for _, fileInputReader := range i.fileInputReaders { + if fileInputReader.file != nil { + fileInputReader.file.Close() + } + } + break + case i.data <- fileInputReader.data: + fileInputReader.readNextInput() + } } log.Printf("FileInput: end of file '%s'\n", i.path) } +func (i*FileInput) simulateRequestDelay(fileInputReader *fileInputReader, lastTime int64) int64 { + if lastTime != -1 { + timeDiff := fileInputReader.timestamp - lastTime + + if i.speedFactor != 1 { + timeDiff = int64(float64(timeDiff) / i.speedFactor) + } + + time.Sleep(time.Duration(timeDiff)) + } + + return fileInputReader.timestamp +} + func (i *FileInput) Close() error { - i.currentFile.Close() + i.exit <- true return nil } + diff --git a/input_file_test.go b/input_file_test.go index e5857cfd..61c18cdc 100644 --- a/input_file_test.go +++ b/input_file_test.go @@ -97,35 +97,109 @@ func TestInputFileWithGETAndPOST(t *testing.T) { } -func TestInputFileMultipleFiles(t *testing.T) { +func TestInputFileMultipleFilesWithRequestsOnly(t *testing.T) { rnd := rand.Int63() file1, _ := os.OpenFile(fmt.Sprintf("/tmp/%d_0", rnd), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660) file1.Write([]byte("1 1 1\ntest1")) file1.Write([]byte(payloadSeparator)) - file1.Write([]byte("1 1 2\ntest2")) + file1.Write([]byte("1 1 3\ntest2")) file1.Write([]byte(payloadSeparator)) file1.Close() file2, _ := os.OpenFile(fmt.Sprintf("/tmp/%d_1", rnd), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660) - file2.Write([]byte("1 1 3\ntest3")) + file2.Write([]byte("1 1 2\ntest3")) + file2.Write([]byte(payloadSeparator)) + file2.Write([]byte("1 1 4\ntest4")) file2.Write([]byte(payloadSeparator)) file2.Close() input := NewFileInput(fmt.Sprintf("/tmp/%d*", rnd), false) buf := make([]byte, 1000) - n, _ := input.Read(buf) - if buf[10] != '1' { - t.Error("Shound emit requests in right order", string(buf[:n])) + + for i := '1'; i <= '4'; i++ { + n, _ := input.Read(buf) + if buf[4] != byte(i) { + t.Error("Should emit requests in right order", string(buf[:n])) + } + } + + os.Remove(file1.Name()) + os.Remove(file2.Name()) +} + +func TestInputFileRequestsWithLatency(t *testing.T) { + rnd := rand.Int63() + + file, _ := os.OpenFile(fmt.Sprintf("/tmp/%d", rnd), os.O_WRONLY | os.O_CREATE | os.O_TRUNC, 0660) + defer file.Close() + + file.Write([]byte("1 1 100000000\nrequest1")) + file.Write([]byte(payloadSeparator)) + file.Write([]byte("1 2 150000000\nrequest2")) + file.Write([]byte(payloadSeparator)) + file.Write([]byte("1 3 250000000\nrequest3")) + file.Write([]byte(payloadSeparator)) + + input := NewFileInput(fmt.Sprintf("/tmp/%d", rnd), false) + buf := make([]byte, 1000) + + start := time.Now().UnixNano() + for i := 0; i < 3; i++ { + input.Read(buf) } - input.Read(buf) - if buf[10] != '2' { - t.Error("Shound emit requests in right order", string(buf[:n])) + end := time.Now().UnixNano() + + var expectedLatency int64 = 250000000 - 100000000 + realLatency := end - start + if realLatency < expectedLatency { + t.Errorf("Should emit requests respecting latency. Expected: %v, real: %v", expectedLatency, realLatency) } - input.Read(buf) - if buf[10] != '3' { - t.Error("Shound emit requests from second file", string(buf[:n])) + if realLatency > expectedLatency + 10000000 { + t.Errorf("Should emit requests respecting latency. Expected: %v, real: %v", expectedLatency, realLatency) + + } +} + +func TestInputFileMultipleFilesWithRequestsAndResponses(t *testing.T) { + rnd := rand.Int63() + + file1, _ := os.OpenFile(fmt.Sprintf("/tmp/%d_0", rnd), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660) + file1.Write([]byte("1 1 1\nrequest1")) + file1.Write([]byte(payloadSeparator)) + file1.Write([]byte("2 1 1\nresponse1")) + file1.Write([]byte(payloadSeparator)) + file1.Write([]byte("1 2 3\nrequest2")) + file1.Write([]byte(payloadSeparator)) + file1.Write([]byte("2 2 3\nresponse2")) + file1.Write([]byte(payloadSeparator)) + file1.Close() + + file2, _ := os.OpenFile(fmt.Sprintf("/tmp/%d_1", rnd), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660) + file2.Write([]byte("1 3 2\nrequest3")) + file2.Write([]byte(payloadSeparator)) + file2.Write([]byte("2 3 2\nresponse3")) + file2.Write([]byte(payloadSeparator)) + file2.Write([]byte("1 4 4\nrequest4")) + file2.Write([]byte(payloadSeparator)) + file2.Write([]byte("2 4 4\nresponse4")) + file2.Write([]byte(payloadSeparator)) + file2.Close() + + input := NewFileInput(fmt.Sprintf("/tmp/%d*", rnd), false) + buf := make([]byte, 1000) + + for i := '1'; i <= '4'; i++ { + n, _ := input.Read(buf) + if buf[0] != '1' && buf[4] != byte(i) { + t.Error("Shound emit requests in right order", string(buf[:n])) + } + + n, _ = input.Read(buf) + if buf[0] != '2' && buf[4] != byte(i) { + t.Error("Shound emit responses in right order", string(buf[:n])) + } } os.Remove(file1.Name()) @@ -149,8 +223,8 @@ func TestInputFileLoop(t *testing.T) { for i := 0; i < 1000; i++ { input.Read(buf) } - input.Close() + input.Close() os.Remove(file.Name()) }