Skip to content

Commit

Permalink
feat: implementing AsyncRequest. It does require some improvements, b…
Browse files Browse the repository at this point in the history
…ut for now it's ok
  • Loading branch information
DanielFillol committed Jun 2, 2024
1 parent b3e63e2 commit 54de61c
Showing 1 changed file with 85 additions and 0 deletions.
85 changes: 85 additions & 0 deletions goSpider.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"log"
"os"
"strings"
"sync"
"time"
)

Expand All @@ -20,6 +21,19 @@ type Navigator struct {
Logger *log.Logger
}

// Requests structure to hold user data
type Requests struct {
ProcessNumber string
}

// ResponseBody structure to hold response data
type ResponseBody struct {
Cover map[string]string
Movements []map[int]map[string]interface{}
People []map[int]map[string]interface{}
Error error
}

// NewNavigator creates a new Navigator instance.
// Example:
//
Expand Down Expand Up @@ -539,3 +553,74 @@ func (nav *Navigator) SelectDropdown(selector, value string) error {
// nav.Logger.Println("Dropdown option selected successfully")
return nil
}

// AsyncRequest performs web scraping tasks concurrently with a specified number of workers and a delay between requests.
// The crawlerFunc parameter allows for flexibility in defining the web scraping logic.
func AsyncRequest(requests []Requests, numberOfWorkers int, duration time.Duration, crawlerFunc func(string) (map[string]string, []map[int]map[string]interface{}, []map[int]map[string]interface{}, error)) ([]ResponseBody, error) {
done := make(chan struct{})
defer close(done)

inputCh := streamInputs(done, requests)
var wg sync.WaitGroup
wg.Add(numberOfWorkers)

resultCh := make(chan ResponseBody)
var errorOnApiRequests error

for i := 0; i < numberOfWorkers; i++ {
go func() {
for input := range inputCh {
time.Sleep(duration)
cover, movements, people, err := crawlerFunc(input.ProcessNumber)
resultCh <- ResponseBody{
Cover: cover,
Movements: movements,
People: people,
Error: err,
}
if err != nil {
log.Println(err)
errorOnApiRequests = err
continue
}
}
wg.Done()
}()
}

go func() {
wg.Wait()
close(resultCh)
}()

if errorOnApiRequests != nil {
var results []ResponseBody
for result := range resultCh {
results = append(results, result)
}
return results, errorOnApiRequests
}

var results []ResponseBody
for result := range resultCh {
results = append(results, result)
}

return results, nil
}

// streamInputs streams the input requests into a channel.
func streamInputs(done <-chan struct{}, inputs []Requests) <-chan Requests {
inputCh := make(chan Requests)
go func() {
defer close(inputCh)
for _, input := range inputs {
select {
case inputCh <- input:
case <-done:
break
}
}
}()
return inputCh
}

0 comments on commit 54de61c

Please sign in to comment.