Skip to content

Commit

Permalink
ATask: Improve CombineAsyncTask and AsyncTaskMlt
Browse files Browse the repository at this point in the history
  • Loading branch information
SunSerega committed May 10, 2024
1 parent 25d9d7c commit f05eeb4
Showing 1 changed file with 97 additions and 57 deletions.
154 changes: 97 additions & 57 deletions Utils/ATask.pas
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
AsyncTask = abstract class
public own_otp: AsyncProcOtp;

private procedure Prepare(evs: Dictionary<string, ManualResetEvent>); abstract;
private procedure SyncExecImpl; abstract;
private procedure Prepare(evs: Dictionary<string, ManualResetEvent>); abstract;

private function CreateThread := new Thread(()->
try
Expand Down Expand Up @@ -53,52 +53,92 @@
end;

AsyncTaskSum = sealed class(AsyncTask)
private p1, p2: AsyncTask;
private ps: array of AsyncTask;

protected constructor(p1, p2: AsyncTask);
protected constructor(ps: array of AsyncTask);
begin
self.p1 := p1;
self.p2 := p2;
self.ps := ps;
foreach var p in ps do
if p=nil then raise nil;
end;

private procedure Prepare(evs: Dictionary<string, ManualResetEvent>); override;
begin
p1.Prepare(evs);
p2.Prepare(evs);
end;
private procedure Prepare(evs: Dictionary<string, ManualResetEvent>); override :=
foreach var p in ps do
p.Prepare(evs);

private procedure SyncExecImpl; override;
begin
p1.SyncExecImpl;
p2.SyncExecImpl;
end;
private procedure SyncExecImpl; override :=
foreach var p in ps do
p.SyncExecImpl;

end;

AsyncTaskMlt = sealed class(AsyncTask)
private p1,p2: AsyncTask;
AsyncTaskMltExecuter = sealed class(AsyncTask)
private ps: array of AsyncTask;
private otps: array of AsyncProcOtp;
private next_ind: ()->integer;

protected constructor(p1,p2: AsyncTask);
protected constructor(ps: array of AsyncTask; otps: array of AsyncProcOtp; next_ind: ()->integer);
begin
self.p1 := p1;
self.p2 := p2;
self.ps := ps;
self.otps := otps;
self.next_ind := next_ind;
end;

private procedure Prepare(evs: Dictionary<string, ManualResetEvent>); override;
begin
p1.Prepare(evs);
p2.Prepare(evs);
end;
private procedure Prepare(evs: Dictionary<string, ManualResetEvent>); override :=
raise new System.InvalidOperationException;

private procedure SyncExecImpl; override;
private procedure SyncExecImpl; override :=
while true do
begin
var i := next_ind();
if i>=ps.Length then break;
AsyncProcOtp.curr := otps[i];
ps[i].SyncExecImpl();
otps[i].Finish;
end;

end;
AsyncTaskMlt = sealed class(AsyncTask)
private ps: array of AsyncTask;
private max_cores: integer;

protected constructor(ps: array of AsyncTask; max_cores: integer?);
begin
p1.StartExecImpl;
p2.StartExecImpl;

foreach var l in p1.own_otp do Otp(l);
foreach var l in p2.own_otp do Otp(l);
self.ps := ps;
self.max_cores := (max_cores ?? System.Environment.ProcessorCount+1).Value.ClampTop(ps.Length);
foreach var p in ps do
if p=nil then raise nil;
end;

private procedure Prepare(evs: Dictionary<string, ManualResetEvent>); override :=
foreach var p in ps do
p.Prepare(evs);

private procedure SyncExecImpl; override :=
if ps.Length = max_cores then
begin
foreach var p in ps do
p.StartExecImpl;

foreach var p in ps do
foreach var l in p.own_otp do
Otp(l);
end else
begin
var otps := ArrGen(ps.Length, i->new AsyncProcOtp(AsyncProcOtp.curr));

var next_p_ind := 0;
var next_ind := function: integer ->
Interlocked.Increment(next_p_ind) - 1;

loop max_cores do
AsyncTaskMltExecuter.Create(ps, otps, next_ind).StartExecImpl;

foreach var p_otp in otps do
foreach var l in p_otp do
AOtp.Otp(l);
end;

end;

AsyncTaskProcessExec = sealed class(AsyncTask)
Expand Down Expand Up @@ -180,12 +220,32 @@ function ProcTask(p: Action0): AsyncTask :=
new AsyncProc(p);
function EmptyTask := ProcTask(()->exit());

function operator+(p1,p2: AsyncTask): AsyncTask; extensionmethod :=
new AsyncTaskSum(p1??EmptyTask, p2??EmptyTask);
function operator+(p1,p2: AsyncTask): AsyncTask; extensionmethod;
begin
var p1_arr := (p1 as AsyncTaskSum)?.ps;
var p2_arr := (p2 as AsyncTaskSum)?.ps;
var res := new List<AsyncTask>(
(p1_arr?.Length??1).Value +
(p2_arr?.Length??1).Value
);
if p1_arr<>nil then res.AddRange(p1_arr) else if p1<>nil then res += p1;
if p2_arr<>nil then res.AddRange(p2_arr) else if p2<>nil then res += p2;
Result := new AsyncTaskSum(res.ToArray);
end;
procedure operator+=(var p1: AsyncTask; p2: AsyncTask); extensionmethod := p1 := p1+p2;

function operator*(p1,p2: AsyncTask): AsyncTask; extensionmethod :=
new AsyncTaskMlt(p1??EmptyTask, p2??EmptyTask);
function operator*(p1,p2: AsyncTask): AsyncTask; extensionmethod;
begin
var p1_arr := (p1 as AsyncTaskMlt)?.ps;
var p2_arr := (p2 as AsyncTaskMlt)?.ps;
var res := new List<AsyncTask>(
(p1_arr?.Length??1).Value +
(p2_arr?.Length??1).Value
);
if p1_arr<>nil then res.AddRange(p1_arr) else if p1<>nil then res += p1;
if p2_arr<>nil then res.AddRange(p2_arr) else if p2<>nil then res += p2;
Result := new AsyncTaskMlt(res.ToArray, nil);
end;
procedure operator*=(var p1: AsyncTask; p2: AsyncTask); extensionmethod := p1 := p1*p2;

function CompTask(fname: string; kind: OtpKind? := nil; args: string := nil) :=
Expand All @@ -197,28 +257,8 @@ function ExecTask(fname, nick: string; params pars: array of string): AsyncTask
function SetEvTask(ev: ManualResetEvent) := ProcTask(()->ev.Set());
function EventTask(ev: ManualResetEvent) := ProcTask(()->ev.WaitOne());

function CombineAsyncTask(self: sequence of AsyncTask; max_cores: integer := System.Environment.ProcessorCount+1): AsyncTask; extensionmethod;
begin
Result := EmptyTask;

var evs := new List<ManualResetEvent>;
foreach var t in self do
begin
var ev := new ManualResetEvent(false);
evs += ev;

var T_Wait := EmptyTask;
foreach var pev in evs.SkipLast(max_cores).TakeLast(max_cores) do T_Wait += EventTask(pev);

var T_ver :=
T_Wait + t +
SetEvTask(ev)
;

Result := Result * T_ver;
end;

end;
function CombineAsyncTask(self: sequence of AsyncTask; max_cores: integer? := nil): AsyncTask; extensionmethod :=
new AsyncTaskMlt(self.ToArray, max_cores);

function TaskForEach<T>(self: sequence of T; p: T->(); max_cores: integer := System.Environment.ProcessorCount+1); extensionmethod :=
self.Select(o->ProcTask(()->p(o))).CombineAsyncTask(max_cores);
Expand Down

0 comments on commit f05eeb4

Please sign in to comment.