From 5f929d3fc7db5fdf464d2c602e9c717c16fa934a Mon Sep 17 00:00:00 2001
From: "daniel_fillol@hotmail.com" <55287657+DanielFillol@users.noreply.github.com>
Date: Mon, 3 Jun 2024 12:37:54 -0300
Subject: [PATCH] feat: Last fix was useless, try again

---
 goSpider.go | 69 +++++++++++++++++++----------------------------------
 1 file changed, 25 insertions(+), 44 deletions(-)

diff --git a/goSpider.go b/goSpider.go
index 9e22238..c5480f7 100644
--- a/goSpider.go
+++ b/goSpider.go
@@ -554,7 +554,7 @@ func (nav *Navigator) SelectDropdown(selector, value string) error {
 	return nil
 }
 
-// ParallelRequests performs web scraping tasks concurrently in batches with a specified number of workers and a delay between requests.
+// ParallelRequests performs web scraping tasks concurrently with a specified number of workers and a delay between requests.
 // The crawlerFunc parameter allows for flexibility in defining the web scraping logic.
 //
 // Parameters:
@@ -569,46 +569,19 @@ func (nav *Navigator) SelectDropdown(selector, value string) error {
 //
 // Example Usage:
 //
-//	results, err := goSpider.ParallelRequests(requests, 3, 1*time.Second, crawlerFunc)
+//	results, err := goSpider.ParallelRequests(requests, numberOfWorkers, duration, crawlerFunc)
 func ParallelRequests(requests []Requests, numberOfWorkers int, duration time.Duration, crawlerFunc func(string) (map[string]string, []map[int]map[string]interface{}, []map[int]map[string]interface{}, error)) ([]ResponseBody, error) {
-	var resultsSaved int
-	var batchResults []ResponseBody
-	batchSize := len(requests)
-	for i := 0; i < len(requests); i += batchSize {
-		end := i + batchSize
-		if end > len(requests) {
-			end = len(requests)
-		}
-
-		batchRequests := requests[i:end]
-
-		br, err := asyncRequest(batchRequests, numberOfWorkers, duration, crawlerFunc)
-		if err != nil {
-			return nil, fmt.Errorf("failed to make asyncRequest: %v", err)
-		}
-
-		resultsSaved += len(batchResults)
-		batchResults = br
-	}
-
-	return batchResults, nil
-}
-
-// asyncRequest performs web scraping tasks concurrently with a specified number of workers and a delay between requests.
-// The crawlerFunc parameter allows for flexibility in defining the web scraping logic.
-func asyncRequest(requests []Requests, numberOfWorkers int, duration time.Duration, crawlerFunc func(string) (map[string]string, []map[int]map[string]interface{}, []map[int]map[string]interface{}, error)) ([]ResponseBody, error) {
 	done := make(chan struct{})
 	defer close(done)
 
 	inputCh := streamInputs(done, requests)
 	var wg sync.WaitGroup
-	wg.Add(numberOfWorkers)
-
-	resultCh := make(chan ResponseBody)
-	var errorOnApiRequests error
+	resultCh := make(chan ResponseBody, len(requests)) // Buffered channel to hold all results
 
 	for i := 0; i < numberOfWorkers; i++ {
+		wg.Add(1)
 		go func() {
+			defer wg.Done()
 			for input := range inputCh {
 				time.Sleep(duration)
 				cover, movements, people, err := crawlerFunc(input.ProcessNumber)
@@ -620,36 +593,44 @@ func asyncRequest(requests []Requests, numberOfWorkers int, duration time.Durati
 				}
 				if err != nil {
 					log.Println(err)
-					errorOnApiRequests = err
 					continue
 				}
 			}
-			wg.Done()
 		}()
 	}
 
+	// Close the result channel once all workers are done
 	go func() {
 		wg.Wait()
 		close(resultCh)
 	}()
 
-	if errorOnApiRequests != nil {
-		var results []ResponseBody
-		for result := range resultCh {
-			results = append(results, result)
-		}
-		return results, errorOnApiRequests
-	}
-
 	var results []ResponseBody
+	var errorOnApiRequests error
+
+	// Collect results from the result channel
 	for result := range resultCh {
+		if result.Error != nil {
+			errorOnApiRequests = result.Error
+		}
 		results = append(results, result)
 	}
 
-	return results, nil
+	return results, errorOnApiRequests
 }
 
 // streamInputs streams the input requests into a channel.
+//
+// Parameters:
+// - done: A channel to signal when to stop processing inputs.
+// - inputs: A slice of Requests structures containing the data needed for each request.
+//
+// Returns:
+// - A channel that streams the input requests.
+//
+// Example Usage:
+//
+//	inputCh := streamInputs(done, inputs)
 func streamInputs(done <-chan struct{}, inputs []Requests) <-chan Requests {
 	inputCh := make(chan Requests)
 	go func() {
@@ -658,7 +639,7 @@ func streamInputs(done <-chan struct{}, inputs []Requests) <-chan Requests {
 			select {
 			case inputCh <- input:
 			case <-done:
-				break
+				return
 			}
 		}
 	}()
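
Below is a minimal usage sketch of the refactored ParallelRequests. The import
path, the stub crawlerFunc, and the ProcessNumber values are assumptions made
for illustration only; the Requests, ResponseBody, and ParallelRequests
identifiers come from goSpider.go itself.

package main

import (
	"fmt"
	"log"
	"time"

	goSpider "github.com/DanielFillol/goSpider" // assumed import path
)

func main() {
	// Placeholder inputs; real ProcessNumber values would identify actual cases.
	requests := []goSpider.Requests{
		{ProcessNumber: "example-0001"},
		{ProcessNumber: "example-0002"},
	}

	// Hypothetical crawlerFunc stub: a real implementation would drive a
	// Navigator and return cover data, movements, and people.
	crawlerFunc := func(processNumber string) (map[string]string, []map[int]map[string]interface{}, []map[int]map[string]interface{}, error) {
		return map[string]string{"process": processNumber}, nil, nil, nil
	}

	// Two workers, each sleeping one second before every request.
	results, err := goSpider.ParallelRequests(requests, 2, 1*time.Second, crawlerFunc)
	if err != nil {
		// With this patch, err is the last error observed while draining
		// resultCh; per-request failures also surface on ResponseBody.Error.
		log.Println("at least one request failed:", err)
	}
	fmt.Printf("collected %d results\n", len(results))
}

Note on the buffered channel: sizing resultCh to len(requests) lets every
worker complete its sends without waiting on the collector, so wg.Wait and
close(resultCh) proceed regardless of how quickly results are drained.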