
Commit

Merge branch 'feature/batch-processing' into main
DavidSubiros committed Jan 12, 2021
2 parents 677fe49 + 12dc3d2 commit 4ab1915
Showing 8 changed files with 941 additions and 55 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
@@ -1,6 +1,6 @@
language: go
go:
- 1.13
- 1.15
- tip
script:
- export GO111MODULE="on"
59 changes: 52 additions & 7 deletions README.md
@@ -17,12 +17,12 @@ Common client code - in go - for ONS APIs:
* search


#### Usage
## Usage

(WIP) Each client defines two constructor functions: one that creates a new healthcheck client (with a new dp-net/http Clienter), and the other that allows you to provide it externally, so that you can reuse it among different clients.
Each client defines two constructor functions: one that creates a new healthcheck client (with a new dp-net/http Clienter), and the other that allows you to provide it externally, so that you can reuse it among different clients.

For example, you may create a new image API client like so:
```
```go
import "github.com/ONSdigital/dp-api-clients-go/image"

...
@@ -31,7 +31,7 @@ For example, you may create a new image API client like so:
```

Or you may create it providing a Healthcheck client:
```
```go
import "github.com/ONSdigital/dp-api-clients-go/image"
import "github.com/ONSdigital/dp-api-clients-go/health"

@@ -41,15 +41,60 @@ Or you may create it providing a Healthcheck client:
...
```

#### Package docs
### Batch processing

Each method in each client corresponds to a single call against one endpoint of an API, except for the batch processing methods, which may trigger multiple concurrent calls.

The batch processing logic is implemented in the batch package as a generic method (`ProcessInConcurrentBatches`) that can be used by multiple client implementations to handle the processing of paginated responses.

For each batch, a parallel go-routine will trigger the provided getter method (`GenericBatchGetter`). Once the getter method returns, the resulting batch is provided to the processor method (`GenericBatchProcessor`) after acquiring a lock to guarantee mutually exclusive execution of processors.

The algorithm can be configured with a maximum number of items per batch (which determines the offset of each getter call) and a maximum number of workers, which limits the number of go-routines that run concurrently.

If any getter or processor returns an error, the algorithm is aborted and that error is returned. The processor may also return a boolean value of `true` to abort the algorithm even when there is no error.

So far, batch processing has been implemented by the `filter API` and `dataset API` clients in order to obtain dimension options.
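
For illustration, the sketch below wires the `batch` package up directly with an in-memory data source; `allItems`, `results` and the chosen `batchSize`/`maxWorkers` values are assumptions made for the example, not part of any client.

```go
package main

import (
	"fmt"

	"github.com/ONSdigital/dp-api-clients-go/batch"
)

func main() {
	// a fake paginated data source, standing in for an API that returns pages of items
	allItems := []string{"a", "b", "c", "d", "e", "f", "g"}
	batchSize := 3
	maxWorkers := 2

	// getter: returns the batch of items starting at the provided offset, plus the total count
	var getBatch batch.GenericBatchGetter = func(offset int) (interface{}, int, error) {
		end := batch.Min(offset+batchSize, len(allItems))
		return allItems[offset:end], len(allItems), nil
	}

	// processor: executed while the package holds its internal lock, so results can be aggregated safely
	var results []string
	var processBatch batch.GenericBatchProcessor = func(b interface{}) (abort bool, err error) {
		results = append(results, b.([]string)...)
		return false, nil
	}

	if err := batch.ProcessInConcurrentBatches(getBatch, processBatch, batchSize, maxWorkers); err != nil {
		fmt.Println("batch processing failed:", err)
		return
	}
	fmt.Println(results) // all items; the order of batches after the first may vary
}
```

Because each processor call runs while the internal lock is held, appending to `results` needs no extra synchronisation.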

#### Get in batches

Assuming you have a dataset client called `datasetClient`, then you can get all the options in batches like so:

```go
// obtain all options after aggregating paginated GetOption responses
allValues, err := datasetClient.GetOptionsInBatches(ctx, userToken, serviceToken, collectionID, datasetID, edition, version, dimensionName, batchSize, maxWorkers)
```

where `batchSize` is the maximum number of items requested in each batch, and `maxWorkers` is the maximum number of concurrent go-routines.
This method calls `GET options` for each batch and aggregates the results until all the options have been obtained.

Instead of aggregating the results, you may want to perform different logic for each batch. In this case, you may use `GetOptionsBatchProcess` with your own batch processor, like so:

```go

// processBatch is a function that performs some logic for each batch, and can abort execution by returning forceAbort=true or an error.
var processBatch dataset.OptionsBatchProcessor = func(batch dataset.Options) (forceAbort bool, err error) {
	// <Do something with batch>
	return false, nil
}

// list of option IDs to obtain (if nil, all options will be provided)
optionIDs := []string{"option1", "option2", "option3"}

// call dataset API GetOptionsBatchProcess with the batch processor
err := datasetClient.GetOptionsBatchProcess(ctx, userToken, serviceToken, collectionID, datasetID, edition, version, dimensionName, &optionIDs, processBatch, batchSize, maxWorkers)
```


## Package docs

* [health](https://github.com/ONSdigital/dp-api-clients-go/tree/feature/client-checker/health)

### Tests
## Tests

Run tests using `make test`

### Licence
## Licence

Copyright © 2019, Crown Copyright (Office for National Statistics) (https://www.ons.gov.uk)

134 changes: 134 additions & 0 deletions batch/batch.go
@@ -0,0 +1,134 @@
package batch

import (
	"sync"
	"time"
)

// GenericBatchGetter defines the method signature for a batch getter to obtain a batch of some generic resource
type GenericBatchGetter func(offset int) (batch interface{}, totalCount int, err error)

// GenericBatchProcessor defines the method signature for a batch processor to process a batch of some generic resource
type GenericBatchProcessor func(batch interface{}) (abort bool, err error)

// ProcessInConcurrentBatches is a generic method to concurrently obtain some resource in batches and then process each batch
func ProcessInConcurrentBatches(getBatch GenericBatchGetter, processBatch GenericBatchProcessor, batchSize, maxWorkers int) (err error) {
	wg := sync.WaitGroup{}
	chWait := make(chan struct{})
	chErr := make(chan error, maxWorkers)
	chAbort := make(chan struct{})
	chSemaphore := make(chan struct{}, maxWorkers)

	lockResult := sync.Mutex{}

	// worker add delta to workers WaitGroup and acquire semaphore
	acquire := func() {
		wg.Add(1)
		chSemaphore <- struct{}{}
	}

	// worker release semaphore and workers WaitGroup delta
	release := func() {
		<-chSemaphore
		wg.Done()
	}

	// abort closes the abort channel if it's not already closed
	abort := func() {
		select {
		case <-chAbort:
		default:
			close(chAbort)
		}
	}

	// isAborting returns true if the abort channel is closed
	isAborting := func() bool {
		select {
		case <-chAbort:
			return true
		default:
			return false
		}
	}

	// func executed in each go-routine to process the batch and send errors to the error channel
	doProcessBatch := func(offset int) {
		defer release()

		// Abort if needed
		if isAborting() {
			return
		}

		// get batch
		batch, _, err := getBatch(offset)
		if err != nil {
			chErr <- err
			abort()
			return
		}

		// lock to prevent concurrent result manipulation
		lockResult.Lock()
		defer lockResult.Unlock()

		// process batch by calling the provided function
		forceAbort, err := processBatch(batch)
		if err != nil {
			chErr <- err
			abort()
		}
		if forceAbort {
			abort()
		}
	}

	// get first batch sequentially, so that we know the total count before triggering any further go-routine
	batch, totalCount, err := getBatch(0)
	if err != nil {
		return err
	}

	// process first batch by calling the provided function
	forceAbort, err := processBatch(batch)
	if forceAbort || err != nil {
		return err
	}

	// determine the total number of remaining calls, considering that we have already performed the first one
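	// e.g. totalCount=250, batchSize=100 -> numCalls=2 (offsets 100 and 200); totalCount=200, batchSize=100 -> numCalls=1 (offset 100)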
	numCalls := totalCount / batchSize
	if (totalCount % batchSize) == 0 {
		numCalls--
	}

	// process remaining batches concurrently
	for i := 0; i < numCalls; i++ {
		acquire()
		go doProcessBatch((i + 1) * batchSize)
	}

	// func that will close wait channel when all go-routines complete their execution
	go func() {
		wg.Wait()
		time.Sleep(time.Millisecond)
		close(chWait)
	}()

	// Block until all workers finish their work, keeping track of errors
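	// note: if several workers report errors, only the most recently received error is returned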
	for {
		select {
		case err = <-chErr:
		case <-chWait:
			return err
		}
	}
}

// Min returns the lowest value
func Min(x, y int) int {
	if x < y {
		return x
	}
	return y
}
