diff --git a/input_file.go b/input_file.go index 8f45c993..d1e42308 100644 --- a/input_file.go +++ b/input_file.go @@ -196,12 +196,13 @@ type FileInput struct { loop bool readDepth int dryRun bool + maxWait time.Duration stats *expvar.Map } // NewFileInput constructor for FileInput. Accepts file path as argument. -func NewFileInput(path string, loop bool, readDepth int, dryRun bool) (i *FileInput) { +func NewFileInput(path string, loop bool, readDepth int, maxWait time.Duration, dryRun bool) (i *FileInput) { i = new(FileInput) i.data = make(chan []byte, 1000) i.exit = make(chan bool) @@ -211,6 +212,7 @@ func NewFileInput(path string, loop bool, readDepth int, dryRun bool) (i *FileIn i.readDepth = readDepth i.stats = expvar.NewMap("file-" + path) i.dryRun = dryRun + i.maxWait = maxWait if err := i.init(); err != nil { return @@ -351,6 +353,10 @@ func (i *FileInput) emit() { diff = int64(float64(diff) / i.speedFactor) } + if i.maxWait > 0 && diff > int64(i.maxWait) { + diff = int64(i.maxWait) + } + if diff >= 0 { lastTime = payload.timestamp diff --git a/input_file_test.go b/input_file_test.go index 7fef0f82..0538e62b 100644 --- a/input_file_test.go +++ b/input_file_test.go @@ -104,7 +104,7 @@ func TestInputFileMultipleFilesWithRequestsOnly(t *testing.T) { file2.Write([]byte(payloadSeparator)) file2.Close() - input := NewFileInput(fmt.Sprintf("/tmp/%d*", rnd), false, 100, false) + input := NewFileInput(fmt.Sprintf("/tmp/%d*", rnd), false, 100, 0, false) for i := '1'; i <= '4'; i++ { msg, _ := input.PluginRead() @@ -130,7 +130,7 @@ func TestInputFileRequestsWithLatency(t *testing.T) { file.Write([]byte("1 3 250000000\nrequest3")) file.Write([]byte(payloadSeparator)) - input := NewFileInput(fmt.Sprintf("/tmp/%d", rnd), false, 100, false) + input := NewFileInput(fmt.Sprintf("/tmp/%d", rnd), false, 100, 0, false) start := time.Now().UnixNano() for i := 0; i < 3; i++ { @@ -170,7 +170,7 @@ func TestInputFileMultipleFilesWithRequestsAndResponses(t *testing.T) { file2.Write([]byte(payloadSeparator)) file2.Close() - input := NewFileInput(fmt.Sprintf("/tmp/%d*", rnd), false, 100, false) + input := NewFileInput(fmt.Sprintf("/tmp/%d*", rnd), false, 100, 0, false) for i := '1'; i <= '4'; i++ { msg, _ := input.PluginRead() @@ -198,7 +198,7 @@ func TestInputFileLoop(t *testing.T) { file.Write([]byte(payloadSeparator)) file.Close() - input := NewFileInput(fmt.Sprintf("/tmp/%d", rnd), true, 100, false) + input := NewFileInput(fmt.Sprintf("/tmp/%d", rnd), true, 100, 0, false) // Even if we have just 2 requests in file, it should indifinitly loop for i := 0; i < 1000; i++ { @@ -226,7 +226,7 @@ func TestInputFileCompressed(t *testing.T) { name2 := output2.file.Name() output2.Close() - input := NewFileInput(fmt.Sprintf("/tmp/%d*", rnd), false, 100, false) + input := NewFileInput(fmt.Sprintf("/tmp/%d*", rnd), false, 100, 0, false) for i := 0; i < 2000; i++ { input.PluginRead() } @@ -326,7 +326,7 @@ func CreateCaptureFile(requestGenerator *RequestGenerator) *CaptureFile { func ReadFromCaptureFile(captureFile *os.File, count int, callback writeCallback) (err error) { wg := new(sync.WaitGroup) - input := NewFileInput(captureFile.Name(), false, 100, false) + input := NewFileInput(captureFile.Name(), false, 100, 0, false) output := NewTestOutput(func(msg *Message) { callback(msg) wg.Done() diff --git a/output_file_test.go b/output_file_test.go index 4190df87..3fd00d62 100644 --- a/output_file_test.go +++ b/output_file_test.go @@ -39,7 +39,7 @@ func TestFileOutput(t *testing.T) { emitter.Close() var counter int64 - input2 := NewFileInput("/tmp/test_requests.gor", false, 100, false) + input2 := NewFileInput("/tmp/test_requests.gor", false, 100, 0, false) output2 := NewTestOutput(func(*Message) { atomic.AddInt64(&counter, 1) wg.Done() diff --git a/plugins.go b/plugins.go index 54d4e7c1..e6b1a1e1 100644 --- a/plugins.go +++ b/plugins.go @@ -118,7 +118,7 @@ func NewPlugins() *InOutPlugins { } for _, options := range Settings.InputFile { - plugins.registerPlugin(NewFileInput, options, Settings.InputFileLoop, Settings.InputFileReadDepth, Settings.InputFileDryRun) + plugins.registerPlugin(NewFileInput, options, Settings.InputFileLoop, Settings.InputFileReadDepth, Settings.InputFileMaxWait, Settings.InputFileDryRun) } for _, path := range Settings.OutputFile { diff --git a/s3_test.go b/s3_test.go index 65453d35..a6b12f9a 100644 --- a/s3_test.go +++ b/s3_test.go @@ -127,7 +127,7 @@ func TestInputFileFromS3(t *testing.T) { <-output.closeCh } - input := NewFileInput(fmt.Sprintf("s3://test-gor-eu/%d", rnd), false, 100, false) + input := NewFileInput(fmt.Sprintf("s3://test-gor-eu/%d", rnd), false, 100, 0, false) buf := make([]byte, 1000) for i := 0; i <= 19999; i++ { diff --git a/settings.go b/settings.go index c64c7cb9..b9927311 100644 --- a/settings.go +++ b/settings.go @@ -45,11 +45,12 @@ type AppSettings struct { OutputTCPConfig TCPOutputConfig OutputTCPStats bool `json:"output-tcp-stats"` - InputFile MultiOption `json:"input-file"` - InputFileLoop bool `json:"input-file-loop"` - InputFileReadDepth int `json:"input-file-read-depth"` - InputFileDryRun bool `json:"input-file-dry-run"` - OutputFile MultiOption `json:"output-file"` + InputFile MultiOption `json:"input-file"` + InputFileLoop bool `json:"input-file-loop"` + InputFileReadDepth int `json:"input-file-read-depth"` + InputFileDryRun bool `json:"input-file-dry-run"` + InputFileMaxWait time.Duration `json:"input-file-max-wait"` + OutputFile MultiOption `json:"output-file"` OutputFileConfig FileOutputConfig InputRAW MultiOption `json:"input_raw"` @@ -117,6 +118,7 @@ func init() { flag.BoolVar(&Settings.InputFileLoop, "input-file-loop", false, "Loop input files, useful for performance testing.") flag.IntVar(&Settings.InputFileReadDepth, "input-file-read-depth", 100, "GoReplay tries to read and cache multiple records, in advance. In parallel it also perform sorting of requests, if they came out of order. Since it needs hold this buffer in memory, bigger values can cause worse performance") flag.BoolVar(&Settings.InputFileDryRun, "input-file-dry-run", false, "Simulate reading from the data source without replaying it. You will get information about expected replay time, number of found records etc.") + flag.DurationVar(&Settings.InputFileMaxWait, "input-file-max-wait", 0, "Set the maximum time between requests. Can help in situations when you have too long periods between request, and you want to skip them. Example: --input-raw-max-wait 1s") flag.Var(&Settings.OutputFile, "output-file", "Write incoming requests to file: \n\tgor --input-raw :80 --output-file ./requests.gor") flag.DurationVar(&Settings.OutputFileConfig.FlushInterval, "output-file-flush-interval", time.Second, "Interval for forcing buffer flush to the file, default: 1s.")