Skip to content

Commit

Permalink
Resolve #300 : respect timestamps when replaying requests
Browse files Browse the repository at this point in the history
When Gor deals with multiple input files, it sorts them and replay their requests file after file.
This behavior is not convenient and should be changed. We should respect timestamps order when replaying requests.
  • Loading branch information
Yohan Legat committed Jun 17, 2016
1 parent 2c49556 commit 9e7351d
Show file tree
Hide file tree
Showing 2 changed files with 205 additions and 100 deletions.
205 changes: 118 additions & 87 deletions input_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,31 +9,40 @@ import (
"log"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"strconv"
"time"
)

// FileInput can read requests generated by FileOutput
type FileInput struct {
data chan []byte
path string
currentFile *os.File
currentReader *bufio.Reader
speedFactor float64
loop bool
data chan []byte
exit chan bool
path string
fileInputReaders []*fileInputReader
speedFactor float64
loop bool
}


type fileInputReader struct {
reader *bufio.Reader
meta [][]byte
data []byte
file *os.File
timestamp int64
}

// NewFileInput constructor for FileInput. Accepts file path as argument.
func NewFileInput(path string, loop bool) (i *FileInput) {
i = new(FileInput)
i.data = make(chan []byte)
i.exit = make(chan bool)
i.path = path
i.speedFactor = 1
i.loop = loop

if err := i.updateFile(); err != nil {
if err := i.init(); err != nil {
return
}

Expand All @@ -48,9 +57,7 @@ func (_ *NextFileNotFound) Error() string {
return "There is no new files"
}

// path can be a pattern
// It sort paths lexicographically and tries to choose next one
func (i *FileInput) updateFile() (err error) {
func (i *FileInput) init() (err error) {
var matches []string

if matches, err = filepath.Glob(i.path); err != nil {
Expand All @@ -63,40 +70,24 @@ func (i *FileInput) updateFile() (err error) {
return errors.New("No matching files")
}

sort.Sort(sortByFileIndex(matches))

// Just pick first file, if there is many, and we are just started
if i.currentFile == nil {
if i.currentFile, err = os.Open(matches[0]); err != nil {
log.Println("Can't read file ", matches[0], err)
return
}
} else {
found := false
for idx, p := range matches {
if p == i.currentFile.Name() && idx != len(matches)-1 {
if i.currentFile, err = os.Open(matches[idx+1]); err != nil {
log.Println("Can't read file ", matches[idx+1], err)
return
}
i.fileInputReaders = make([]*fileInputReader, len(matches))

found = true
for idx, p := range matches {
file, _ := os.Open(p)
fileInputReader := &fileInputReader{}
fileInputReader.file = file
if strings.HasSuffix(p, ".gz") {
gzReader, err := gzip.NewReader(file)
if err != nil {
log.Fatal(err)
}
fileInputReader.reader = bufio.NewReader(gzReader)
} else {
fileInputReader.reader = bufio.NewReader(file)
}

if !found {
return new(NextFileNotFound)
}
}

if strings.HasSuffix(i.currentFile.Name(), ".gz") {
gzReader, err := gzip.NewReader(i.currentFile)
if err != nil {
log.Fatal(err)
}
i.currentReader = bufio.NewReader(gzReader)
} else {
i.currentReader = bufio.NewReader(i.currentFile)
fileInputReader.readNextInput()
i.fileInputReaders[idx] = fileInputReader
}

return nil
Expand All @@ -113,83 +104,123 @@ func (i *FileInput) String() string {
return "File input: " + i.path
}

func (i *FileInput) emit() {
var lastTime int64
func (f *fileInputReader) readNextInput() {
nextInput := f.nextInput()
f.parseNextInput(nextInput)
}

payloadSeparatorAsBytes := []byte(payloadSeparator)
func (f *fileInputReader) parseNextInput(input []byte) {
if (input != nil) {
f.meta = payloadMeta(input)
f.timestamp, _ = strconv.ParseInt(string(f.meta[2]), 10, 64)
f.data = input
}
}

func (f *fileInputReader) nextInput() []byte {
payloadSeparatorAsBytes := []byte(payloadSeparator)
var buffer bytes.Buffer

if i.currentReader == nil {
return
}

for {
line, err := i.currentReader.ReadBytes('\n')
line, err := f.reader.ReadBytes('\n')

if err != nil {
if err != io.EOF {
log.Fatal(err)
}

// If our path pattern match multiple files, try to find them
if err == io.EOF {
if e := i.updateFile(); e != nil {
if _, ok := e.(*NextFileNotFound); ok && i.loop {
// Start from the first file
i.Close()
i.currentFile = nil
i.currentReader = nil
lastTime = 0
i.updateFile()

continue
} else {
break
}
}

continue
f.file.Close()
f.file = nil
return nil
}
}

if bytes.Equal(payloadSeparatorAsBytes[1:], line) {
asBytes := buffer.Bytes()
buffer.Reset()

meta := payloadMeta(asBytes)
// Bytes() returns only pointer, so to remove data-race copy the data to an array
newBuf := make([]byte, len(asBytes) - 1)
copy(newBuf, asBytes)
return newBuf
}

buffer.Write(line)
}
}

if len(meta) > 2 && meta[0][0] == RequestPayload {
ts, _ := strconv.ParseInt(string(meta[2]), 10, 64)
func (i *FileInput) nextInputReader() *fileInputReader {
var nextFileInputReader *fileInputReader
for _, fileInputReader := range i.fileInputReaders {
if fileInputReader.file == nil {
continue
}

if lastTime != 0 {
timeDiff := ts - lastTime
if fileInputReader.meta[0][0] == ResponsePayload {
return fileInputReader
}

if i.speedFactor != 1 {
timeDiff = int64(float64(timeDiff) / i.speedFactor)
}
if nextFileInputReader == nil || nextFileInputReader.timestamp > fileInputReader.timestamp {
nextFileInputReader = fileInputReader
continue
}
}

time.Sleep(time.Duration(timeDiff))
}
return nextFileInputReader;
}

lastTime = ts
}
func (i *FileInput) emit() {
var lastTime int64 = -1

// Bytes() returns only pointer, so to remove data-race copy the data to an array
newBuf := make([]byte, len(asBytes)-1)
copy(newBuf, asBytes)
for {
fileInputReader := i.nextInputReader()

i.data <- newBuf
} else {
buffer.Write(line)
if fileInputReader == nil {
if i.loop {
i.init()
lastTime = -1
continue
} else {
break;
}
}

if fileInputReader.meta[0][0] == RequestPayload {
lastTime = i.simulateRequestDelay(fileInputReader, lastTime)
}

select {
case <-i.exit:
for _, fileInputReader := range i.fileInputReaders {
if fileInputReader.file != nil {
fileInputReader.file.Close()
}
}
break
case i.data <- fileInputReader.data:
fileInputReader.readNextInput()
}
}

log.Printf("FileInput: end of file '%s'\n", i.path)
}

func (i*FileInput) simulateRequestDelay(fileInputReader *fileInputReader, lastTime int64) int64 {
if lastTime != -1 {
timeDiff := fileInputReader.timestamp - lastTime

if i.speedFactor != 1 {
timeDiff = int64(float64(timeDiff) / i.speedFactor)
}

time.Sleep(time.Duration(timeDiff))
}

return fileInputReader.timestamp
}

func (i *FileInput) Close() error {
i.currentFile.Close()
i.exit <- true
return nil
}

Loading

0 comments on commit 9e7351d

Please sign in to comment.