Skip to content

Commit

Permalink
Merge branch 'ylegat-feature/respect_timestamps_when_replaying_requests'
Browse files Browse the repository at this point in the history
  • Loading branch information
buger committed Jun 29, 2016
2 parents c756a11 + 7c33336 commit 803d53a
Show file tree
Hide file tree
Showing 8 changed files with 257 additions and 126 deletions.
4 changes: 2 additions & 2 deletions input_dummy.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ func (i *DummyInput) emit() {
select {
case <-ticker.C:
uuid := uuid()
reqh := payloadHeader(RequestPayload, uuid, time.Now().UnixNano())
reqh := payloadHeader(RequestPayload, uuid, time.Now().UnixNano(), -1)
i.data <- append(reqh, []byte("GET / HTTP/1.1\r\nHost: www.w3.org\r\nUser-Agent: Go 1.1 package http\r\nAccept-Encoding: gzip\r\n\r\n")...)

resh := payloadHeader(ResponsePayload, uuid, 1)
resh := payloadHeader(ResponsePayload, uuid, time.Now().UnixNano()+1, 1)
i.data <- append(resh, []byte("HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n")...)
}
}
Expand Down
243 changes: 143 additions & 100 deletions input_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,31 +9,114 @@ import (
"log"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"sync"
"time"
)

type fileInputReader struct {
reader *bufio.Reader
data []byte
file *os.File
timestamp int64
}

func (f *fileInputReader) parseNext() error {
payloadSeparatorAsBytes := []byte(payloadSeparator)
var buffer bytes.Buffer

for {
line, err := f.reader.ReadBytes('\n')

if err != nil {
if err != io.EOF {
log.Println(err)
return err
}

if err == io.EOF {
f.file.Close()
f.file = nil
return err
}
}

if bytes.Equal(payloadSeparatorAsBytes[1:], line) {
asBytes := buffer.Bytes()
meta := payloadMeta(asBytes)

f.timestamp, _ = strconv.ParseInt(string(meta[2]), 10, 64)
f.data = asBytes[:len(asBytes)-1]

return nil
}

buffer.Write(line)
}

return nil
}

func (f *fileInputReader) ReadPayload() []byte {
defer f.parseNext()

return f.data
}
func (f *fileInputReader) Close() error {
if f.file != nil {
f.file.Close()
}

return nil
}

func NewFileInputReader(path string) *fileInputReader {
file, err := os.Open(path)

if err != nil {
log.Println(err)
return nil
}

r := &fileInputReader{file: file}
if strings.HasSuffix(path, ".gz") {
gzReader, err := gzip.NewReader(file)
if err != nil {
log.Println(err)
return nil
}
r.reader = bufio.NewReader(gzReader)
} else {
r.reader = bufio.NewReader(file)
}

r.parseNext()

return r
}

// FileInput can read requests generated by FileOutput
type FileInput struct {
data chan []byte
path string
currentFile *os.File
currentReader *bufio.Reader
speedFactor float64
loop bool
mu sync.Mutex
data chan []byte
exit chan bool
path string
readers []*fileInputReader
speedFactor float64
loop bool
}

// NewFileInput constructor for FileInput. Accepts file path as argument.
func NewFileInput(path string, loop bool) (i *FileInput) {
i = new(FileInput)
i.data = make(chan []byte)
i.data = make(chan []byte, 1000)
i.exit = make(chan bool, 1)
i.path = path
i.speedFactor = 1
i.loop = loop

if err := i.updateFile(); err != nil {
if err := i.init(); err != nil {
return
}

Expand All @@ -48,9 +131,10 @@ func (_ *NextFileNotFound) Error() string {
return "There is no new files"
}

// path can be a pattern
// It sort paths lexicographically and tries to choose next one
func (i *FileInput) updateFile() (err error) {
func (i *FileInput) init() (err error) {
defer i.mu.Unlock()
i.mu.Lock()

var matches []string

if matches, err = filepath.Glob(i.path); err != nil {
Expand All @@ -63,40 +147,10 @@ func (i *FileInput) updateFile() (err error) {
return errors.New("No matching files")
}

sort.Sort(sortByFileIndex(matches))
i.readers = make([]*fileInputReader, len(matches))

// Just pick first file, if there is many, and we are just started
if i.currentFile == nil {
if i.currentFile, err = os.Open(matches[0]); err != nil {
log.Println("Can't read file ", matches[0], err)
return
}
} else {
found := false
for idx, p := range matches {
if p == i.currentFile.Name() && idx != len(matches)-1 {
if i.currentFile, err = os.Open(matches[idx+1]); err != nil {
log.Println("Can't read file ", matches[idx+1], err)
return
}

found = true
}
}

if !found {
return new(NextFileNotFound)
}
}

if strings.HasSuffix(i.currentFile.Name(), ".gz") {
gzReader, err := gzip.NewReader(i.currentFile)
if err != nil {
log.Fatal(err)
}
i.currentReader = bufio.NewReader(gzReader)
} else {
i.currentReader = bufio.NewReader(i.currentFile)
for idx, p := range matches {
i.readers[idx] = NewFileInputReader(p)
}

return nil
Expand All @@ -113,83 +167,72 @@ func (i *FileInput) String() string {
return "File input: " + i.path
}

func (i *FileInput) emit() {
var lastTime int64
// Find reader with smallest timestamp e.g next payload in row
func (i *FileInput) nextReader() (next *fileInputReader) {
for _, r := range i.readers {
if r == nil || r.file == nil {
continue
}

payloadSeparatorAsBytes := []byte(payloadSeparator)
if next == nil || r.timestamp < next.timestamp {
next = r
continue
}
}

var buffer bytes.Buffer
return
}

if i.currentReader == nil {
return
}
func (i *FileInput) emit() {
var lastTime int64 = -1

for {
line, err := i.currentReader.ReadBytes('\n')

if err != nil {
if err != io.EOF {
log.Fatal(err)
}
select {
case <-i.exit:
return
default:
}

// If our path pattern match multiple files, try to find them
if err == io.EOF {
if e := i.updateFile(); e != nil {
if _, ok := e.(*NextFileNotFound); ok && i.loop {
// Start from the first file
i.Close()
i.currentFile = nil
i.currentReader = nil
lastTime = 0
i.updateFile()

continue
} else {
break
}
}
reader := i.nextReader()

if reader == nil {
if i.loop {
i.init()
lastTime = -1
continue
} else {
break
}
}

if bytes.Equal(payloadSeparatorAsBytes[1:], line) {
asBytes := buffer.Bytes()
buffer.Reset()

meta := payloadMeta(asBytes)

if len(meta) > 2 && meta[0][0] == RequestPayload {
ts, _ := strconv.ParseInt(string(meta[2]), 10, 64)

if lastTime != 0 {
timeDiff := ts - lastTime

if i.speedFactor != 1 {
timeDiff = int64(float64(timeDiff) / i.speedFactor)
}

time.Sleep(time.Duration(timeDiff))
}
if lastTime != -1 {
diff := reader.timestamp - lastTime
lastTime = reader.timestamp

lastTime = ts
if i.speedFactor != 1 {
diff = int64(float64(diff) / i.speedFactor)
}

// Bytes() returns only pointer, so to remove data-race copy the data to an array
newBuf := make([]byte, len(asBytes)-1)
copy(newBuf, asBytes)

i.data <- newBuf
time.Sleep(time.Duration(diff))
} else {
buffer.Write(line)
lastTime = reader.timestamp
}

i.data <- reader.ReadPayload()
}

log.Printf("FileInput: end of file '%s'\n", i.path)
}

func (i *FileInput) Close() error {
i.currentFile.Close()
defer i.mu.Unlock()
i.mu.Lock()

i.exit <- true

for _, r := range i.readers {
r.Close()
}

return nil
}
Loading

0 comments on commit 803d53a

Please sign in to comment.