From b3159f2825df20b396c15ec49e8ea4452767bdc9 Mon Sep 17 00:00:00 2001 From: Sun Serega Date: Fri, 10 May 2024 15:20:19 +0200 Subject: [PATCH] `ATask`: Improve `CombineAsyncTask` and `AsyncTaskMlt` --- ATask.pas | 154 ++++++++++++++++++++++++++++++++++-------------------- 1 file changed, 97 insertions(+), 57 deletions(-) diff --git a/ATask.pas b/ATask.pas index 694a1cab..b1cfdf8a 100644 --- a/ATask.pas +++ b/ATask.pas @@ -9,8 +9,8 @@ AsyncTask = abstract class public own_otp: AsyncProcOtp; - private procedure Prepare(evs: Dictionary); abstract; private procedure SyncExecImpl; abstract; + private procedure Prepare(evs: Dictionary); abstract; private function CreateThread := new Thread(()-> try @@ -53,52 +53,92 @@ end; AsyncTaskSum = sealed class(AsyncTask) - private p1, p2: AsyncTask; + private ps: array of AsyncTask; - protected constructor(p1, p2: AsyncTask); + protected constructor(ps: array of AsyncTask); begin - self.p1 := p1; - self.p2 := p2; + self.ps := ps; + foreach var p in ps do + if p=nil then raise nil; end; - private procedure Prepare(evs: Dictionary); override; - begin - p1.Prepare(evs); - p2.Prepare(evs); - end; + private procedure Prepare(evs: Dictionary); override := + foreach var p in ps do + p.Prepare(evs); - private procedure SyncExecImpl; override; - begin - p1.SyncExecImpl; - p2.SyncExecImpl; - end; + private procedure SyncExecImpl; override := + foreach var p in ps do + p.SyncExecImpl; end; - AsyncTaskMlt = sealed class(AsyncTask) - private p1,p2: AsyncTask; + AsyncTaskMltExecuter = sealed class(AsyncTask) + private ps: array of AsyncTask; + private otps: array of AsyncProcOtp; + private next_ind: ()->integer; - protected constructor(p1,p2: AsyncTask); + protected constructor(ps: array of AsyncTask; otps: array of AsyncProcOtp; next_ind: ()->integer); begin - self.p1 := p1; - self.p2 := p2; + self.ps := ps; + self.otps := otps; + self.next_ind := next_ind; end; - private procedure Prepare(evs: Dictionary); override; - begin - p1.Prepare(evs); - p2.Prepare(evs); - end; + private procedure Prepare(evs: Dictionary); override := + raise new System.InvalidOperationException; - private procedure SyncExecImpl; override; + private procedure SyncExecImpl; override := + while true do + begin + var i := next_ind(); + if i>=ps.Length then break; + AsyncProcOtp.curr := otps[i]; + ps[i].SyncExecImpl(); + otps[i].Finish; + end; + + end; + AsyncTaskMlt = sealed class(AsyncTask) + private ps: array of AsyncTask; + private max_cores: integer; + + protected constructor(ps: array of AsyncTask; max_cores: integer?); begin - p1.StartExecImpl; - p2.StartExecImpl; - - foreach var l in p1.own_otp do Otp(l); - foreach var l in p2.own_otp do Otp(l); + self.ps := ps; + self.max_cores := (max_cores ?? System.Environment.ProcessorCount+1).Value.ClampTop(ps.Length); + foreach var p in ps do + if p=nil then raise nil; end; + private procedure Prepare(evs: Dictionary); override := + foreach var p in ps do + p.Prepare(evs); + + private procedure SyncExecImpl; override := + if ps.Length = max_cores then + begin + foreach var p in ps do + p.StartExecImpl; + + foreach var p in ps do + foreach var l in p.own_otp do + Otp(l); + end else + begin + var otps := ArrGen(ps.Length, i->new AsyncProcOtp(AsyncProcOtp.curr)); + + var next_p_ind := 0; + var next_ind := function: integer -> + Interlocked.Increment(next_p_ind) - 1; + + loop max_cores do + AsyncTaskMltExecuter.Create(ps, otps, next_ind).StartExecImpl; + + foreach var p_otp in otps do + foreach var l in p_otp do + AOtp.Otp(l); + end; + end; AsyncTaskProcessExec = sealed class(AsyncTask) @@ -180,12 +220,32 @@ function ProcTask(p: Action0): AsyncTask := new AsyncProc(p); function EmptyTask := ProcTask(()->exit()); -function operator+(p1,p2: AsyncTask): AsyncTask; extensionmethod := - new AsyncTaskSum(p1??EmptyTask, p2??EmptyTask); +function operator+(p1,p2: AsyncTask): AsyncTask; extensionmethod; +begin + var p1_arr := (p1 as AsyncTaskSum)?.ps; + var p2_arr := (p2 as AsyncTaskSum)?.ps; + var res := new List( + (p1_arr?.Length??1).Value + + (p2_arr?.Length??1).Value + ); + if p1_arr<>nil then res.AddRange(p1_arr) else if p1<>nil then res += p1; + if p2_arr<>nil then res.AddRange(p2_arr) else if p2<>nil then res += p2; + Result := new AsyncTaskSum(res.ToArray); +end; procedure operator+=(var p1: AsyncTask; p2: AsyncTask); extensionmethod := p1 := p1+p2; -function operator*(p1,p2: AsyncTask): AsyncTask; extensionmethod := - new AsyncTaskMlt(p1??EmptyTask, p2??EmptyTask); +function operator*(p1,p2: AsyncTask): AsyncTask; extensionmethod; +begin + var p1_arr := (p1 as AsyncTaskMlt)?.ps; + var p2_arr := (p2 as AsyncTaskMlt)?.ps; + var res := new List( + (p1_arr?.Length??1).Value + + (p2_arr?.Length??1).Value + ); + if p1_arr<>nil then res.AddRange(p1_arr) else if p1<>nil then res += p1; + if p2_arr<>nil then res.AddRange(p2_arr) else if p2<>nil then res += p2; + Result := new AsyncTaskMlt(res.ToArray, nil); +end; procedure operator*=(var p1: AsyncTask; p2: AsyncTask); extensionmethod := p1 := p1*p2; function CompTask(fname: string; kind: OtpKind? := nil; args: string := nil) := @@ -197,28 +257,8 @@ function ExecTask(fname, nick: string; params pars: array of string): AsyncTask function SetEvTask(ev: ManualResetEvent) := ProcTask(()->ev.Set()); function EventTask(ev: ManualResetEvent) := ProcTask(()->ev.WaitOne()); -function CombineAsyncTask(self: sequence of AsyncTask; max_cores: integer := System.Environment.ProcessorCount+1): AsyncTask; extensionmethod; -begin - Result := EmptyTask; - - var evs := new List; - foreach var t in self do - begin - var ev := new ManualResetEvent(false); - evs += ev; - - var T_Wait := EmptyTask; - foreach var pev in evs.SkipLast(max_cores).TakeLast(max_cores) do T_Wait += EventTask(pev); - - var T_ver := - T_Wait + t + - SetEvTask(ev) - ; - - Result := Result * T_ver; - end; - -end; +function CombineAsyncTask(self: sequence of AsyncTask; max_cores: integer? := nil): AsyncTask; extensionmethod := + new AsyncTaskMlt(self.ToArray, max_cores); function TaskForEach(self: sequence of T; p: T->(); max_cores: integer := System.Environment.ProcessorCount+1); extensionmethod := self.Select(o->ProcTask(()->p(o))).CombineAsyncTask(max_cores);