From 54de61cc6b6eca3ccc68b6d904aca2f06266ef2e Mon Sep 17 00:00:00 2001 From: "daniel_fillol@hotmail.com" <55287657+DanielFillol@users.noreply.github.com> Date: Sun, 2 Jun 2024 13:25:45 -0300 Subject: [PATCH] feat: implementing AsyncRequest. It does require some improvements, but for now it's ok --- goSpider.go | 85 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 85 insertions(+) diff --git a/goSpider.go b/goSpider.go index b2597da..eb77c99 100644 --- a/goSpider.go +++ b/goSpider.go @@ -10,6 +10,7 @@ import ( "log" "os" "strings" + "sync" "time" ) @@ -20,6 +21,19 @@ type Navigator struct { Logger *log.Logger } +// Requests structure to hold user data +type Requests struct { + ProcessNumber string +} + +// ResponseBody structure to hold response data +type ResponseBody struct { + Cover map[string]string + Movements []map[int]map[string]interface{} + People []map[int]map[string]interface{} + Error error +} + // NewNavigator creates a new Navigator instance. // Example: // @@ -539,3 +553,74 @@ func (nav *Navigator) SelectDropdown(selector, value string) error { // nav.Logger.Println("Dropdown option selected successfully") return nil } + +// AsyncRequest performs web scraping tasks concurrently with a specified number of workers and a delay between requests. +// The crawlerFunc parameter allows for flexibility in defining the web scraping logic. +func AsyncRequest(requests []Requests, numberOfWorkers int, duration time.Duration, crawlerFunc func(string) (map[string]string, []map[int]map[string]interface{}, []map[int]map[string]interface{}, error)) ([]ResponseBody, error) { + done := make(chan struct{}) + defer close(done) + + inputCh := streamInputs(done, requests) + var wg sync.WaitGroup + wg.Add(numberOfWorkers) + + resultCh := make(chan ResponseBody) + var errorOnApiRequests error + + for i := 0; i < numberOfWorkers; i++ { + go func() { + for input := range inputCh { + time.Sleep(duration) + cover, movements, people, err := crawlerFunc(input.ProcessNumber) + resultCh <- ResponseBody{ + Cover: cover, + Movements: movements, + People: people, + Error: err, + } + if err != nil { + log.Println(err) + errorOnApiRequests = err + continue + } + } + wg.Done() + }() + } + + go func() { + wg.Wait() + close(resultCh) + }() + + if errorOnApiRequests != nil { + var results []ResponseBody + for result := range resultCh { + results = append(results, result) + } + return results, errorOnApiRequests + } + + var results []ResponseBody + for result := range resultCh { + results = append(results, result) + } + + return results, nil +} + +// streamInputs streams the input requests into a channel. +func streamInputs(done <-chan struct{}, inputs []Requests) <-chan Requests { + inputCh := make(chan Requests) + go func() { + defer close(inputCh) + for _, input := range inputs { + select { + case inputCh <- input: + case <-done: + break + } + } + }() + return inputCh +}