Skip to content

Commit

Permalink
feat(action): added Action support (#4)
Browse files Browse the repository at this point in the history
  • Loading branch information
cnlangzi authored Mar 18, 2024
1 parent c749421 commit 74db4a9
Show file tree
Hide file tree
Showing 8 changed files with 813 additions and 262 deletions.
8 changes: 7 additions & 1 deletion .deepsource.toml
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
version = 1

test_patterns = ["*_test.go"]

exclude_patterns = ["*_test.go"]

[[analyzers]]
name = "go"

[analyzers.meta]
import_root = "github.com/yaitoo/async"

[[transformers]]
name = "gofmt"
name = "gofmt"


2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).


## [1.0.4] - 2024-03-18
- added `Action` support (#4)

## [1.0.3] - 2024-03-12
- added `WaitN` (#1)
Expand Down
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Async
Async is an asynchronous task package for Go.
Async is an async/await like task package for Go

![License](https://img.shields.io/badge/license-MIT-green.svg)
[![Tests](https://github.com/yaitoo/async/actions/workflows/tests.yml/badge.svg)](https://github.com/yaitoo/async/actions/workflows/tests.yml)
Expand All @@ -10,12 +10,12 @@ Async is an asynchronous task package for Go.


## Features
- Wait/WaitAny/WaitN
- Wait/WaitAny/WaitN for `Task` and `Action`
- `context.Context` with `timeout`, `cancel` support
- Works with generic instead of `interface{}`

## Tutorials
see more examples on [tests](./awaiter_test.go) or [go.dev](https://go.dev/play/p/7jgcRltbwts)
see more examples on [tasks](./waiter_test.go), [actions](./awaiter_test.go) or [go.dev](https://go.dev/play/p/7jgcRltbwts)

### Install async
- install latest commit from `main` branch
Expand Down
20 changes: 17 additions & 3 deletions async.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,25 @@ import (
)

var (
ErrTooLessDone = errors.New("async: too less tasks to completed without error")
ErrTooLessDone = errors.New("async: too less tasks/actions to completed without error")
)

func New[T any](tasks ...func(ctx context.Context) (T, error)) Awaiter[T] {
return &awaiter[T]{
// Task a task with result T
type Task[T any] func(ctx context.Context) (T, error)

// New create a task waiter
func New[T any](tasks ...Task[T]) Waiter[T] {
return &waiter[T]{
tasks: tasks,
}
}

// Action a task without result
type Action func(ctx context.Context) error

// NewA create an action awaiter
func NewA(actions ...Action) Awaiter {
return &awaiter{
actions: actions,
}
}
110 changes: 46 additions & 64 deletions awaiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,112 +4,94 @@ import (
"context"
)

type Awaiter[T any] interface {
// Add add a task
Add(task func(context.Context) (T, error))
// Wait wail for all tasks to completed
Wait(context.Context) ([]T, []error, error)
// WaitAny wait for any task to completed without error, can cancel other tasks
WaitAny(context.Context) (T, []error, error)
// WaitN wait for N tasks to completed without error
WaitN(context.Context, int) ([]T, []error, error)
type Awaiter interface {
// Add add an action
Add(action Action)
// Wait wail for all actions to completed
Wait(context.Context) ([]error, error)
// WaitAny wait for any action to completed without error, can cancel other tasks
WaitAny(context.Context) ([]error, error)
// WaitN wait for N actions to completed without error
WaitN(context.Context, int) ([]error, error)
}

type awaiter[T any] struct {
tasks []func(context.Context) (T, error)
type awaiter struct {
actions []Action
}

func (a *awaiter[T]) Add(task func(ctx context.Context) (T, error)) {
a.tasks = append(a.tasks, task)
func (a *awaiter) Add(action Action) {
a.actions = append(a.actions, action)
}

func (a *awaiter[T]) Wait(ctx context.Context) ([]T, []error, error) {
wait := make(chan Result[T])
func (a *awaiter) Wait(ctx context.Context) ([]error, error) {
wait := make(chan error)

for _, task := range a.tasks {
go func(task func(context.Context) (T, error)) {
r, err := task(ctx)
wait <- Result[T]{
Data: r,
Error: err,
}
}(task)
for _, action := range a.actions {
go func(action Action) {

wait <- action(ctx)
}(action)
}

var r Result[T]
var taskErrs []error
var items []T

tt := len(a.tasks)
tt := len(a.actions)
for i := 0; i < tt; i++ {
select {
case r = <-wait:
if r.Error != nil {
taskErrs = append(taskErrs, r.Error)
} else {
items = append(items, r.Data)
case err := <-wait:
if err != nil {
taskErrs = append(taskErrs, err)
}
case <-ctx.Done():
return items, taskErrs, ctx.Err()
return taskErrs, ctx.Err()
}
}

if len(items) == tt {
return items, taskErrs, nil
if len(taskErrs) > 0 {
return taskErrs, ErrTooLessDone
}

return items, taskErrs, ErrTooLessDone
return taskErrs, nil
}

func (a *awaiter[T]) WaitN(ctx context.Context, n int) ([]T, []error, error) {
wait := make(chan Result[T])
func (a *awaiter) WaitN(ctx context.Context, n int) ([]error, error) {
wait := make(chan error)

cancelCtx, cancel := context.WithCancel(ctx)
defer cancel()

for _, task := range a.tasks {
go func(task func(context.Context) (T, error)) {
r, err := task(cancelCtx)
wait <- Result[T]{
Data: r,
Error: err,
}
}(task)
for _, action := range a.actions {
go func(action Action) {
wait <- action(cancelCtx)

}(action)
}

var r Result[T]
var taskErrs []error
var items []T
tt := len(a.tasks)
tt := len(a.actions)

var done int
for i := 0; i < tt; i++ {
select {
case r = <-wait:
if r.Error != nil {
taskErrs = append(taskErrs, r.Error)
case err := <-wait:
if err != nil {
taskErrs = append(taskErrs, err)
} else {
items = append(items, r.Data)

done++
if done == n {
return items, taskErrs, nil
return taskErrs, nil
}
}
case <-ctx.Done():
return items, taskErrs, ctx.Err()
return taskErrs, ctx.Err()
}

}

return items, taskErrs, ErrTooLessDone
return taskErrs, ErrTooLessDone
}

func (a *awaiter[T]) WaitAny(ctx context.Context) (T, []error, error) {
var t T
result, err, taskErrs := a.WaitN(ctx, 1)

if len(result) == 1 {
t = result[0]
}

return t, err, taskErrs
func (a *awaiter) WaitAny(ctx context.Context) ([]error, error) {
return a.WaitN(ctx, 1)
}
Loading

0 comments on commit 74db4a9

Please sign in to comment.