Skip to content

Commit

Permalink
Merge pull request #10 from DanielFillol/daniel
Browse files Browse the repository at this point in the history
ParallelRequests
  • Loading branch information
DanielFillol authored Jun 3, 2024
2 parents 5e9b7df + 5f929d3 commit b8c94a6
Showing 1 changed file with 25 additions and 44 deletions.
69 changes: 25 additions & 44 deletions goSpider.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,7 @@ func (nav *Navigator) SelectDropdown(selector, value string) error {
return nil
}

// ParallelRequests performs web scraping tasks concurrently in batches with a specified number of workers and a delay between requests.
// ParallelRequests performs web scraping tasks concurrently with a specified number of workers and a delay between requests.
// The crawlerFunc parameter allows for flexibility in defining the web scraping logic.
//
// Parameters:
Expand All @@ -569,46 +569,19 @@ func (nav *Navigator) SelectDropdown(selector, value string) error {
//
// Example Usage:
//
// results, err := goSpider.ParallelRequests(requests, 3, 1*time.Second, crawlerFunc)
// results, err := asyncRequest(requests, numberOfWorkers, duration, crawlerFunc)
func ParallelRequests(requests []Requests, numberOfWorkers int, duration time.Duration, crawlerFunc func(string) (map[string]string, []map[int]map[string]interface{}, []map[int]map[string]interface{}, error)) ([]ResponseBody, error) {
var resultsSaved int
var batchResults []ResponseBody
batchSize := len(requests)
for i := 0; i < len(requests); i += batchSize {
end := i + batchSize
if end > len(requests) {
end = len(requests)
}

batchRequests := requests[i:end]

br, err := asyncRequest(batchRequests, numberOfWorkers, duration, crawlerFunc)
if err != nil {
return nil, fmt.Errorf("failed to make asyncRequest: %v", err)
}

resultsSaved += len(batchResults)
batchResults = br
}

return batchResults, nil
}

// asyncRequest performs web scraping tasks concurrently with a specified number of workers and a delay between requests.
// The crawlerFunc parameter allows for flexibility in defining the web scraping logic.
func asyncRequest(requests []Requests, numberOfWorkers int, duration time.Duration, crawlerFunc func(string) (map[string]string, []map[int]map[string]interface{}, []map[int]map[string]interface{}, error)) ([]ResponseBody, error) {
done := make(chan struct{})
defer close(done)

inputCh := streamInputs(done, requests)
var wg sync.WaitGroup
wg.Add(numberOfWorkers)

resultCh := make(chan ResponseBody)
var errorOnApiRequests error
resultCh := make(chan ResponseBody, len(requests)) // Buffered channel to hold all results

for i := 0; i < numberOfWorkers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for input := range inputCh {
time.Sleep(duration)
cover, movements, people, err := crawlerFunc(input.ProcessNumber)
Expand All @@ -620,36 +593,44 @@ func asyncRequest(requests []Requests, numberOfWorkers int, duration time.Durati
}
if err != nil {
log.Println(err)
errorOnApiRequests = err
continue
}
}
wg.Done()
}()
}

// Close the result channel once all workers are done
go func() {
wg.Wait()
close(resultCh)
}()

if errorOnApiRequests != nil {
var results []ResponseBody
for result := range resultCh {
results = append(results, result)
}
return results, errorOnApiRequests
}

var results []ResponseBody
var errorOnApiRequests error

// Collect results from the result channel
for result := range resultCh {
if result.Error != nil {
errorOnApiRequests = result.Error
}
results = append(results, result)
}

return results, nil
return results, errorOnApiRequests
}

// streamInputs streams the input requests into a channel.
//
// Parameters:
// - done: A channel to signal when to stop processing inputs.
// - inputs: A slice of Requests structures containing the data needed for each request.
//
// Returns:
// - A channel that streams the input requests.
//
// Example Usage:
//
// inputCh := streamInputs(done, inputs)
func streamInputs(done <-chan struct{}, inputs []Requests) <-chan Requests {
inputCh := make(chan Requests)
go func() {
Expand All @@ -658,7 +639,7 @@ func streamInputs(done <-chan struct{}, inputs []Requests) <-chan Requests {
select {
case inputCh <- input:
case <-done:
break
return
}
}
}()
Expand Down

0 comments on commit b8c94a6

Please sign in to comment.