diff --git a/input_file.go b/input_file.go index ba3f542a..81701793 100644 --- a/input_file.go +++ b/input_file.go @@ -9,31 +9,40 @@ import ( "log" "os" "path/filepath" - "sort" - "strconv" "strings" + "strconv" "time" ) // FileInput can read requests generated by FileOutput type FileInput struct { - data chan []byte - path string - currentFile *os.File - currentReader *bufio.Reader - speedFactor float64 - loop bool + data chan []byte + exit chan bool + path string + fileInputReaders []*fileInputReader + speedFactor float64 + loop bool +} + + +type fileInputReader struct { + reader *bufio.Reader + meta [][]byte + data []byte + file *os.File + timestamp int64 } // NewFileInput constructor for FileInput. Accepts file path as argument. func NewFileInput(path string, loop bool) (i *FileInput) { i = new(FileInput) i.data = make(chan []byte) + i.exit = make(chan bool) i.path = path i.speedFactor = 1 i.loop = loop - if err := i.updateFile(); err != nil { + if err := i.init(); err != nil { return } @@ -48,9 +57,7 @@ func (_ *NextFileNotFound) Error() string { return "There is no new files" } -// path can be a pattern -// It sort paths lexicographically and tries to choose next one -func (i *FileInput) updateFile() (err error) { +func (i *FileInput) init() (err error) { var matches []string if matches, err = filepath.Glob(i.path); err != nil { @@ -63,40 +70,24 @@ func (i *FileInput) updateFile() (err error) { return errors.New("No matching files") } - sort.Sort(sortByFileIndex(matches)) - - // Just pick first file, if there is many, and we are just started - if i.currentFile == nil { - if i.currentFile, err = os.Open(matches[0]); err != nil { - log.Println("Can't read file ", matches[0], err) - return - } - } else { - found := false - for idx, p := range matches { - if p == i.currentFile.Name() && idx != len(matches)-1 { - if i.currentFile, err = os.Open(matches[idx+1]); err != nil { - log.Println("Can't read file ", matches[idx+1], err) - return - } + i.fileInputReaders = make([]*fileInputReader, len(matches)) - found = true + for idx, p := range matches { + file, _ := os.Open(p) + fileInputReader := &fileInputReader{} + fileInputReader.file = file + if strings.HasSuffix(p, ".gz") { + gzReader, err := gzip.NewReader(file) + if err != nil { + log.Fatal(err) } + fileInputReader.reader = bufio.NewReader(gzReader) + } else { + fileInputReader.reader = bufio.NewReader(file) } - if !found { - return new(NextFileNotFound) - } - } - - if strings.HasSuffix(i.currentFile.Name(), ".gz") { - gzReader, err := gzip.NewReader(i.currentFile) - if err != nil { - log.Fatal(err) - } - i.currentReader = bufio.NewReader(gzReader) - } else { - i.currentReader = bufio.NewReader(i.currentFile) + fileInputReader.readNextInput() + i.fileInputReaders[idx] = fileInputReader } return nil @@ -113,83 +104,123 @@ func (i *FileInput) String() string { return "File input: " + i.path } -func (i *FileInput) emit() { - var lastTime int64 +func (f *fileInputReader) readNextInput() { + nextInput := f.nextInput() + f.parseNextInput(nextInput) +} - payloadSeparatorAsBytes := []byte(payloadSeparator) +func (f *fileInputReader) parseNextInput(input []byte) { + if (input != nil) { + f.meta = payloadMeta(input) + f.timestamp, _ = strconv.ParseInt(string(f.meta[2]), 10, 64) + f.data = input + } +} +func (f *fileInputReader) nextInput() []byte { + payloadSeparatorAsBytes := []byte(payloadSeparator) var buffer bytes.Buffer - if i.currentReader == nil { - return - } - for { - line, err := i.currentReader.ReadBytes('\n') + line, err := f.reader.ReadBytes('\n') if err != nil { if err != io.EOF { log.Fatal(err) } - // If our path pattern match multiple files, try to find them if err == io.EOF { - if e := i.updateFile(); e != nil { - if _, ok := e.(*NextFileNotFound); ok && i.loop { - // Start from the first file - i.Close() - i.currentFile = nil - i.currentReader = nil - lastTime = 0 - i.updateFile() - - continue - } else { - break - } - } - - continue + f.file.Close() + f.file = nil + return nil } } if bytes.Equal(payloadSeparatorAsBytes[1:], line) { asBytes := buffer.Bytes() - buffer.Reset() - meta := payloadMeta(asBytes) + // Bytes() returns only pointer, so to remove data-race copy the data to an array + newBuf := make([]byte, len(asBytes) - 1) + copy(newBuf, asBytes) + return newBuf + } + + buffer.Write(line) + } +} - if len(meta) > 2 && meta[0][0] == RequestPayload { - ts, _ := strconv.ParseInt(string(meta[2]), 10, 64) +func (i *FileInput) nextInputReader() *fileInputReader { + var nextFileInputReader *fileInputReader + for _, fileInputReader := range i.fileInputReaders { + if fileInputReader.file == nil { + continue + } - if lastTime != 0 { - timeDiff := ts - lastTime + if fileInputReader.meta[0][0] == ResponsePayload { + return fileInputReader + } - if i.speedFactor != 1 { - timeDiff = int64(float64(timeDiff) / i.speedFactor) - } + if nextFileInputReader == nil || nextFileInputReader.timestamp > fileInputReader.timestamp { + nextFileInputReader = fileInputReader + continue + } + } - time.Sleep(time.Duration(timeDiff)) - } + return nextFileInputReader; +} - lastTime = ts - } +func (i *FileInput) emit() { + var lastTime int64 = -1 - // Bytes() returns only pointer, so to remove data-race copy the data to an array - newBuf := make([]byte, len(asBytes)-1) - copy(newBuf, asBytes) + for { + fileInputReader := i.nextInputReader() - i.data <- newBuf - } else { - buffer.Write(line) + if fileInputReader == nil { + if i.loop { + i.init() + lastTime = -1 + continue + } else { + break; + } + } + + if fileInputReader.meta[0][0] == RequestPayload { + lastTime = i.simulateRequestDelay(fileInputReader, lastTime) } + select { + case <-i.exit: + for _, fileInputReader := range i.fileInputReaders { + if fileInputReader.file != nil { + fileInputReader.file.Close() + } + } + break + case i.data <- fileInputReader.data: + fileInputReader.readNextInput() + } } log.Printf("FileInput: end of file '%s'\n", i.path) } +func (i*FileInput) simulateRequestDelay(fileInputReader *fileInputReader, lastTime int64) int64 { + if lastTime != -1 { + timeDiff := fileInputReader.timestamp - lastTime + + if i.speedFactor != 1 { + timeDiff = int64(float64(timeDiff) / i.speedFactor) + } + + time.Sleep(time.Duration(timeDiff)) + } + + return fileInputReader.timestamp +} + func (i *FileInput) Close() error { - i.currentFile.Close() + i.exit <- true return nil } + diff --git a/input_file_test.go b/input_file_test.go index e5857cfd..61c18cdc 100644 --- a/input_file_test.go +++ b/input_file_test.go @@ -97,35 +97,109 @@ func TestInputFileWithGETAndPOST(t *testing.T) { } -func TestInputFileMultipleFiles(t *testing.T) { +func TestInputFileMultipleFilesWithRequestsOnly(t *testing.T) { rnd := rand.Int63() file1, _ := os.OpenFile(fmt.Sprintf("/tmp/%d_0", rnd), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660) file1.Write([]byte("1 1 1\ntest1")) file1.Write([]byte(payloadSeparator)) - file1.Write([]byte("1 1 2\ntest2")) + file1.Write([]byte("1 1 3\ntest2")) file1.Write([]byte(payloadSeparator)) file1.Close() file2, _ := os.OpenFile(fmt.Sprintf("/tmp/%d_1", rnd), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660) - file2.Write([]byte("1 1 3\ntest3")) + file2.Write([]byte("1 1 2\ntest3")) + file2.Write([]byte(payloadSeparator)) + file2.Write([]byte("1 1 4\ntest4")) file2.Write([]byte(payloadSeparator)) file2.Close() input := NewFileInput(fmt.Sprintf("/tmp/%d*", rnd), false) buf := make([]byte, 1000) - n, _ := input.Read(buf) - if buf[10] != '1' { - t.Error("Shound emit requests in right order", string(buf[:n])) + + for i := '1'; i <= '4'; i++ { + n, _ := input.Read(buf) + if buf[4] != byte(i) { + t.Error("Should emit requests in right order", string(buf[:n])) + } + } + + os.Remove(file1.Name()) + os.Remove(file2.Name()) +} + +func TestInputFileRequestsWithLatency(t *testing.T) { + rnd := rand.Int63() + + file, _ := os.OpenFile(fmt.Sprintf("/tmp/%d", rnd), os.O_WRONLY | os.O_CREATE | os.O_TRUNC, 0660) + defer file.Close() + + file.Write([]byte("1 1 100000000\nrequest1")) + file.Write([]byte(payloadSeparator)) + file.Write([]byte("1 2 150000000\nrequest2")) + file.Write([]byte(payloadSeparator)) + file.Write([]byte("1 3 250000000\nrequest3")) + file.Write([]byte(payloadSeparator)) + + input := NewFileInput(fmt.Sprintf("/tmp/%d", rnd), false) + buf := make([]byte, 1000) + + start := time.Now().UnixNano() + for i := 0; i < 3; i++ { + input.Read(buf) } - input.Read(buf) - if buf[10] != '2' { - t.Error("Shound emit requests in right order", string(buf[:n])) + end := time.Now().UnixNano() + + var expectedLatency int64 = 250000000 - 100000000 + realLatency := end - start + if realLatency < expectedLatency { + t.Errorf("Should emit requests respecting latency. Expected: %v, real: %v", expectedLatency, realLatency) } - input.Read(buf) - if buf[10] != '3' { - t.Error("Shound emit requests from second file", string(buf[:n])) + if realLatency > expectedLatency + 10000000 { + t.Errorf("Should emit requests respecting latency. Expected: %v, real: %v", expectedLatency, realLatency) + + } +} + +func TestInputFileMultipleFilesWithRequestsAndResponses(t *testing.T) { + rnd := rand.Int63() + + file1, _ := os.OpenFile(fmt.Sprintf("/tmp/%d_0", rnd), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660) + file1.Write([]byte("1 1 1\nrequest1")) + file1.Write([]byte(payloadSeparator)) + file1.Write([]byte("2 1 1\nresponse1")) + file1.Write([]byte(payloadSeparator)) + file1.Write([]byte("1 2 3\nrequest2")) + file1.Write([]byte(payloadSeparator)) + file1.Write([]byte("2 2 3\nresponse2")) + file1.Write([]byte(payloadSeparator)) + file1.Close() + + file2, _ := os.OpenFile(fmt.Sprintf("/tmp/%d_1", rnd), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660) + file2.Write([]byte("1 3 2\nrequest3")) + file2.Write([]byte(payloadSeparator)) + file2.Write([]byte("2 3 2\nresponse3")) + file2.Write([]byte(payloadSeparator)) + file2.Write([]byte("1 4 4\nrequest4")) + file2.Write([]byte(payloadSeparator)) + file2.Write([]byte("2 4 4\nresponse4")) + file2.Write([]byte(payloadSeparator)) + file2.Close() + + input := NewFileInput(fmt.Sprintf("/tmp/%d*", rnd), false) + buf := make([]byte, 1000) + + for i := '1'; i <= '4'; i++ { + n, _ := input.Read(buf) + if buf[0] != '1' && buf[4] != byte(i) { + t.Error("Shound emit requests in right order", string(buf[:n])) + } + + n, _ = input.Read(buf) + if buf[0] != '2' && buf[4] != byte(i) { + t.Error("Shound emit responses in right order", string(buf[:n])) + } } os.Remove(file1.Name()) @@ -149,8 +223,8 @@ func TestInputFileLoop(t *testing.T) { for i := 0; i < 1000; i++ { input.Read(buf) } - input.Close() + input.Close() os.Remove(file.Name()) }