-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
First pass at concurrency changes #4
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a bunch of comments but main point is:
- Probbably time to make []State into a container struct instead that keeps track of all of the various lookups and validation logic for the []State which is essentially your workflow DAG
- I'd move the current counts of what's running into StateExec and out of processor... lets us have better telemetry and reporting on what is actually gumming up the works. later.
ac AC | ||
oc OC |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One of my thoughts was that this struct was probbably a better place to keep track of in flight work for any given state... it knows when it's:
- Got a job
- Waiting for rate limiter
- Processing a job
- Waiting to return the job
Which is I think useful when optimizing the framework but also concurrency.
Also lets the processor not have to know as much details or maintain the central counts for the states
processor.go
Outdated
type state struct { | ||
} | ||
|
||
type stateThing[AC any, OC any, JC any] struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd pick a better name than state thing :)
|
||
func (s stateThing[AC, OC, JC]) processJob(job Job[JC]) { | ||
if s.isTerminal(job) { | ||
s.completeJob(job) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just return after this and de-nest
processor.go
Outdated
} | ||
|
||
// Exec this big work function, this does all the crunching | ||
func (p *Processor[AC, OC, JC]) Exec(ctx context.Context, r *Run[OC, JC]) error { | ||
if err := p.stateThing.validate(); err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we make NewProcessor return an error, it is a breaking change but that's the earliest time we would know the states are invalid
processor.go
Outdated
for _, c := range p.stateChan { | ||
close(c) | ||
for _, state := range p.stateThing.states { | ||
close(p.stateThing.getJobChannelForState(state.TriggerState)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shouldn't there just be a state thing method to do this for loop? "close"
No description provided.