#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
:Description: List of the clustering algorithms to be executed by the benchmark and accessory routines.
Execution function for each algorithm must be named "exec<Algname>" and have the following signature:
def execAlgorithm(execpool, netfile, asym, odir, timeout, memlim=0., seed=None, task=None, pathidsuf='', workdir=ALGSDIR):
Execute the algorithm (stub)
execpool: ExecPool - execution pool of worker processes
netfile: str - the input network to be clustered
asym: bool - whether the input network is asymmetric (directed, specified by arcs)
odir: bool - whether to output results to the dedicated dir named by the network instance name,
which is relevant for shuffles with a non-flat directory structure
timeout: ufloat32 - processing (clustering) timeout of the input file, 0 means infinity
memlim: ufloat32 - max amount of memory in GB allowed for the algorithm execution, 0 - unlimited
seed: uint64 or None - random seed, uint64_t
task: Task - owner task
pathidsuf: str - network path id prepended with the path separator
workdir: str - relative working directory of the app, actual when the app contains libs
return njobs: uint - number of performed executions (started jobs)
:Authors: (c) Artem Lutov <[email protected]>
:Organizations: eXascale Infolab <http://exascale.info/>, Lumais <http://www.lumais.com/>,
ScienceWise <http://sciencewise.info/>
:Date: 2015-07
"""
from __future__ import print_function, division # Required for stderr output, must be the first import
# Required to efficiently traverse items of dictionaries in both Python 2 and 3
try:
from future.builtins import range
except ImportError:
# Replace range() implementation for Python2
try:
range = xrange
except NameError:
pass # xrange is not defined in Python3, which is fine
import os
import shutil
import glob
import sys
import inspect # To automatically fetch algorithm name
import traceback # Stacktrace
import subprocess
from numbers import Number # To verify that a variable is a number (int or float)
from sys import executable as PYEXEC #pylint: disable=C0412; # Full path to the current Python interpreter
from benchutils import viewitems, delPathSuffix, ItemsStatistic, parseName, dirempty, funcToAppName \
, tobackup, escapePathWildcards, UTILDIR, ALGSDIR, ORIGDIR, TIMESTAMP_START_HEADER \
, SEPPARS, SEPSUBTASK, SEPPATHID, ALEVSMAX, ALGLEVS
from benchevals import SEPNAMEPART, RESDIR, CLSDIR, EXTRESCONS, EXTLOG, EXTERR, EXTAGGRES, EXTAGGRESEXT
from utils.mpepool import Job, Task
from algorithms.utils.parser_nsl import parseHeaderNslFile #, asymnet
# Note: currently the number of output levels is limited only for the algorithms that may produce more than 10 levels
assert ALEVSMAX >= 10, 'The number of levels limitation should be added to GANXiS and some others'
EXTCLSNDS = '.cnl' # Clusters (Communities) Nodes Lists
# reFirstDigits = re.compile(r'\d+') # First digit regex
_DEBUG_TRACE = False # Trace start / stop and other events to stderr
def aggexec(apps):
"""Aggregate execution statistics
Aggregate execution results of all input datasets (considering instances and shuffles)
and output the total and the per-item avg, min, max values for each input dataset per app.
Expected format of the aggregating files:
# ExecTime(sec) CPU_time(sec) CPU_usr(sec) CPU_kern(sec) RSS_RAM_peak(Mb) TaskName
0.550262 0.526599 0.513438 0.013161 2.086 syntmix/1K10/1K10^1=k7.1#1
...
apps - the executed apps whose resource consumption should be aggregated
#>>> aggexec(['scp', 'ganxis']) is None
#True
"""
#exectime = {} # inpname: [app1_stat, app2_stat, ...]
# ATTENTION: for the correct output, the memory measure must be the last one
mnames = ('exectime', 'cputime', 'rssmem') # Measures names
measures = [{}, {}, {}] # exectime, cputime, rssmem
mapps = [] # Measured apps
iapp = 0 # Algorithm index
for app in apps:
appesfile = ''.join((RESDIR, app, '/', app, EXTRESCONS))
try:
with open(appesfile, 'r') as aest:
mapps.append(app)
for ln in aest:
# Strip leading spaces
ln = ln.lstrip()
# Skip comments
if not ln or ln[0] == '#':
continue
# Parse the content
fields = ln.split(None, 6)
# Note: empty and whitespace-only strings were already excluded
# 6 fields in the old format without the rcode
assert 6 <= len(fields) <= 7, (
'Invalid format of the resource consumption file "{}": {}'.format(appesfile, ln))
# Fetch and accumulate measures
# Note: rstrip() is required because fields[-1] can end with '\n'; os.path.split(...)[1]
dataset = delPathSuffix(fields[-1].rstrip(), True) # Note: name can't be a path here
#print('> dataset: >>>{}<<< from >{}<'.format(dataset, fields[5]), file=sys.stderr)
assert dataset, 'Dataset name must exist'
etime = float(fields[0])
ctime = float(fields[1])
rmem = float(fields[4])
#rcode = float(fields[5]) # Note: in the old format 5-th field is the last and is the app name
for imsr, val in enumerate((etime, ctime, rmem)):
dstats = measures[imsr].setdefault(dataset, [])
if len(dstats) <= iapp:
assert len(dstats) == iapp, ('Network statistics are not synced with apps:'
' iapp={}, dataset: {}, dstats: {}'.format(iapp, dataset, dstats))
dstats.append(ItemsStatistic('_'.join((app, dataset)), val, val))
dstats[-1].add(val)
except IOError:
print('WARNING, resource consumption results for "{}" do not exist, the aggregation is discarded.'.format(app), file=sys.stderr)
else:
iapp += 1
# Check number of the apps to be outputted
if not mapps:
print('WARNING, there are no resource consumption results to be aggregated.', file=sys.stderr)
return
# Output results
for imsr, measure in enumerate(mnames):
resfile = ''.join((RESDIR, measure, EXTAGGRES))
resxfile = ''.join((RESDIR, measure, EXTAGGRESEXT))
try:
with open(resfile, 'a') as outres, open(resxfile, 'a') as outresx:
# The header is unified for multiple outputs only for the outresx
if not os.fstat(outresx.fileno()).st_size:
# ExecTime(sec), ExecTime_avg(sec), ExecTime_min ExecTime_max
outresx.write('# <dataset>\n#\t<app1_outp>\n#\t<app2_outp>\n#\t...\n')
# Output timestamp
# Note: print() unlike .write() outputs also ending '\n'
print(TIMESTAMP_START_HEADER, file=outres)
print(TIMESTAMP_START_HEADER, file=outresx)
# Output header, which might differ for distinct runs by number of apps
outres.write('# <dataset>')
for app in mapps:
outres.write('\t{}'.format(app))
outres.write('\n')
# Output results for each dataset
for dname, dstats in viewitems(measures[imsr]):
outres.write(dname)
outresx.write(dname)
for iapp, stat in enumerate(dstats):
if not stat.fixed:
stat.fix()
# Output sum for time, but avg for mem
val = stat.sum if imsr < len(mnames) - 1 else stat.avg
outres.write('\t{:.3f}'.format(val))
outresx.write('\n\t{}>\ttotal: {:.3f}, per_item: {:.6f} ({:.6f} .. {:.6f})'
.format(mapps[iapp], val, stat.avg, stat.min, stat.max))
outres.write('\n')
outresx.write('\n')
except IOError as err:
print('ERROR, "{}" resources consumption output is failed: {}. {}'
.format(measure, err, traceback.format_exc(5)), file=sys.stderr)
def preparePath(taskpath): # , netshf=False
"""Create the path if required, otherwise move existent data to backup.
All instances and shuffles of each network are handled all together and only once,
even on calling this function for each shuffle.
taskpath - the path to be prepared
"""
# netshf - whether the task is a shuffle processing in the non-flat dir structure
#
# Backup existent files & dirs with such base only if this path exists and is not empty
# ATTENTION: do not use only basePathExists(taskpath) here to avoid movement to the backup
# processing paths when xxx.mod.net is processed before the xxx.net (has the same base)
# Create target path if not exists
# print('> preparePath(), for: {}'.format(taskpath))
if not os.path.exists(taskpath):
os.makedirs(taskpath)
elif not dirempty(taskpath): # Back up all instances and shuffles once per execution in a single archive
# print('> preparePath(), backing up: {}, content: {}'.format(taskpath, os.listdir(taskpath)))
mainpath = delPathSuffix(taskpath)
tobackup(mainpath, True, move=True) # Move to the backup (old results can't be reused in the forming results)
os.mkdir(taskpath)
def prepareResDir(appname, taskname, odir, pathidsuf):
"""Prepare output directory for the app results and back up the previous results
appname - application (algorithm) name
taskname - task name
odir - whether to output results to the dedicated dir named by the instance name,
which is typically used for shuffles with the non-flat structure
pathidsuf: str - network path id prepended with the path separator
return resulting directory without the ending '/' terminator
"""
# Prepare the resulting directory
taskdir = taskname # Relative task directory without the ending '/'
if odir:
nameparts = parseName(taskname, True)
taskdir = ''.join((nameparts.basepath, nameparts.insid, '/', taskname)) # Use base name and instance id
assert not pathidsuf or pathidsuf.startswith(SEPPATHID), 'Invalid pathidsuf: ' + pathidsuf
taskpath = ''.join((RESDIR, appname, '/', CLSDIR, taskdir, pathidsuf))
preparePath(taskpath)
return taskpath
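# Illustrative call of prepareResDir() (a sketch; <RESDIR>/<CLSDIR> stand for the configured
# result and clusters subdirectories, the dataset name is taken from the aggexec() example above):
#   prepareResDir('LouvainIg', '1K10^1=k7.1#1', odir=False, pathidsuf='')
#   # -> '<RESDIR>LouvainIg/<CLSDIR>1K10^1=k7.1#1' (created, or backed up and recreated if non-empty)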
class PyBin(object):
"""Automatically identify the most appropriate Python interpreter among the available"""
#_pybin = PYEXEC
_pypy3 = None
_pypy = None
_python3 = None
# Identify the existing Python interpreters once
try:
with open(os.devnull, 'wb') as fdevnull:
# Note: a more accurate solution would be to check the 'python -V' output, but it fails on Python2
# for 'python -V' (while 'python -h' works)
# pyverstr = subprocess.check_output([PYEXEC, '-V']).decode() # Note: Xcoding is required for Python3
##pyverstr = subprocess.Popen((PYEXEC, '-V'), stdout=subprocess.PIPE).communicate()[0].decode()
# pyver = int(reFirstDigits.search(pyverstr).group()) # Take the first digits, i.e. the major version
# pybin = 'python' if pyver >= 3 else PYEXEC
#
# Check for the pypy interpreter/JIT in the system if required
# ATTENTION: due to some bug 'python -V' does not output results
# to the specified pipe and .check_output() also fails to deliver results,
# always outputting to the stdout (which is not desirable in our case);
# 'python -V' works fine only for the Python3 that is why it is not used here.
try:
if not subprocess.call(('pypy3', '-h'), stdout=fdevnull):
_pypy3 = 'pypy3'
except OSError:
pass
try:
if not subprocess.call(('pypy', '-h'), stdout=fdevnull):
_pypy = 'pypy'
except OSError:
pass
try:
if not subprocess.call(('python3', '-h'), stdout=fdevnull):
_python3 = 'python3'
except OSError:
pass
except IOError:
# Note: the existence of the required interpreter in the system can't be checked here,
# only 'python' is assumed to be present by default.
pass
@staticmethod
def bestof(pypy, v3):
"""Select the best suitable Python interpreter
pypy - whether to consider PyPy versions, giving priority to pypy over CPython (the standard interpreter)
v3 - whether to consider interpreters of v3.x, giving priority to the highest version
"""
pybin = PYEXEC
pyname = os.path.split(pybin)[1]
if pypy and v3 and PyBin._pypy3:
if pyname.find('pypy3') == -1: # Otherwise retain PYEXEC
pybin = PyBin._pypy3
elif pypy and PyBin._pypy:
if pyname.find('pypy') in (-1, pyname.find('pypy3')): # Otherwise retain PYEXEC
pybin = PyBin._pypy
elif v3 and PyBin._python3:
if pyname.find('python3') == -1: # Otherwise retain PYEXEC
pybin = PyBin._python3
elif pyname.find('python') in (-1, pyname.find('python3')): # Otherwise retain PYEXEC
pybin = 'python'
return pybin
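# Usage sketch of PyBin.bestof() (the actually selected interpreter depends on what is installed
# on the host; these two combinations are the ones used further in this module):
#   PyBin.bestof(pypy=True, v3=False)   # prefers 'pypy' when available, otherwise falls back to PYEXEC
#   PyBin.bestof(pypy=False, v3=True)   # prefers 'python3' / the current CPython 3 interpreter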
def iround(val, lower):
"""Round value to lower or upper integer in case of the equally good fit.
Equas to math.round for lower = False.
val: float - the value to be rounded
lower: bool - direction of the rounding resolution in case of the equally good fit
return v: int - rounded value
>>> iround(2.5, True)
2
>>> iround(2.5, False)
3
>>> iround(2.2, True)
2
>>> iround(2.2, False)
2
>>> iround(2.7, True)
3
>>> iround(2.7, False)
3
"""
q, r = divmod(val, 1)
res = int(q if lower and r <= 0.5 or not lower and r < 0.5 else q + 1)
# print('>> val: {:.3f}, q: {:.0f}, r: {:.3f}, res: {:.0f}'.format(val, q, r-0.5, res), file=sys.stderr)
return res
def reduceLevels(levs, num, root0):
"""Uniformly fetch required number of levels from the levs giving priority
to the coarse-grained (top hierarchy levels) in case of the equal fit
levs: list - ORDERED levels to be processed, where the list starts from the bottom
level of the hierarchy having the highest (most fine-grained) resolution and the
last level in the list is the root level having the most coarse-grained resolution
num: uint >= 1 - target number of levels to be fetched uniformly
root0: bool - whether the root (most coarse-grained) level has zero or maximal index
return rlevs: list, tuple - list of the reduced levels
>>> list(reduceLevels([1, 2], 1, True))
[1]
>>> list(reduceLevels([1, 2], 1, False))
[2]
>>> list(reduceLevels([1, 2, 3], 1, True))
[2]
>>> list(reduceLevels([1, 2, 3], 1, False))
[2]
>>> reduceLevels(range(0, 10), 9, True)
[0, 1, 2, 3, 4, 6, 7, 8, 9]
>>> reduceLevels(range(0, 10), 9, False)
[0, 1, 2, 3, 5, 6, 7, 8, 9]
>>> reduceLevels(range(0, 10), 8, True)
[0, 1, 3, 4, 5, 6, 8, 9]
>>> reduceLevels(range(0, 10), 8, False)
[0, 1, 3, 4, 5, 6, 8, 9]
>>> reduceLevels(range(0, 10), 7, True)
[0, 1, 3, 4, 6, 7, 9]
>>> reduceLevels(range(0, 10), 7, False)
[0, 2, 3, 5, 6, 8, 9]
>>> reduceLevels(range(0, 10), 6, True)
[0, 2, 4, 5, 7, 9]
>>> reduceLevels(range(0, 10), 6, False)
[0, 2, 4, 5, 7, 9]
>>> reduceLevels(range(0, 10), 5, True)
[0, 2, 4, 7, 9]
>>> reduceLevels(range(0, 10), 5, False)
[0, 2, 5, 7, 9]
>>> reduceLevels(range(0, 10), 4, True)
[0, 3, 6, 9]
>>> reduceLevels(range(0, 10), 4, False)
[0, 3, 6, 9]
>>> reduceLevels(range(0, 10), 3, True)
[0, 4, 9]
>>> reduceLevels(range(0, 10), 3, False)
[0, 5, 9]
>>> list(reduceLevels(range(0, 10), 2, True))
[0, 9]
>>> list(reduceLevels(range(0, 10), 2, False))
[0, 9]
>>> list(reduceLevels(range(0, 10), 1, True))
[4]
>>> list(reduceLevels(range(0, 10), 1, False))
[5]
>>> list(reduceLevels(range(0, 9), 1, True))
[4]
>>> list(reduceLevels(range(0, 9), 1, False))
[4]
"""
nlevs = len(levs)
if num >= nlevs:
return levs
elif num >= 2:
# Multiplication ratio >= 1
# The last source index is nlevs - 1, the number of dest indexes besides the zero is num - 1
mrt = (nlevs - 1) / float(num - 1)
# print('> num: {}, lower: {}, mrt: {:.3f}'.format(num, root0, mrt), file=sys.stderr)
res = []
i = 0
while i < num:
res.append(levs[iround(i * mrt, root0)])
i += 1
assert len(res) == num, ('Unexpected number of resulting levels:'
' {} of {}: {}'.format(len(res), num, res))
return res
elif num == 1:
# 1 element tuple
return (levs[(nlevs - root0) // 2],) # Note: subtracting root0 gives priority to the beginning
elif num <= 0:
raise ValueError('The required number of levels should be positive: ' + str(num))
else:
raise AssertionError('The value of num has not been handled: ' + str(num))
def limlevs(job):
"""Limit the number of output level to fit ALEVSMAX (unified for all algorithms).
Limit the number of hierarchy levels in the output by moving the original output
to the dedicated directory and uniformly linking the required number of levels it
to the expected output path.
Job params:
taskpath: str - task path, base directory of the resulting clusters output
fetchLevId: callable - algorithm-specific callback to fetch level ids
levfmt (optional): str - level format WILDCARD (only ? and * are supported
as in the shell) to fetch levels among other files, for example: 'tp*'.
Required at least for Oslom.
"""
lmax = ALEVSMAX # Max number of the output levels for the network
# Check the number of output levels and restructure the output if required saving the original one
taskpath = job.params['taskpath']
fetchLevId = job.params['fetchLevId']
assert os.path.isdir(taskpath) and callable(fetchLevId), (
'Invalid job parameters: taskpath: {}, fetchLevId callable: {}'.format(
taskpath, callable(fetchLevId)))
# Filter files from other items (accessory dirs)
levfmt = job.params.get('levfmt')
if levfmt:
levnames = [os.path.split(lev)[1] for lev in glob.iglob('/'.join((taskpath, levfmt)))]
else:
levnames = os.listdir(taskpath) # Note: only file names without the path are returned
# print('> limlevs() called from {}, levnames ({} / {}): {}'.format(
# job.name, len(levnames), lmax, levnames), file=sys.stderr)
if len(levnames) <= lmax:
return
# Move the initial output to the ORIGDIR
origdir, oname = os.path.split(taskpath)
if not origdir:
origdir = '.'
origdir = '/'.join((origdir, ORIGDIR))
# Check existence of the destination dir
newdir = origdir + oname + '/'
if not os.path.exists(origdir):
os.mkdir(origdir)
elif os.path.exists(newdir):
# Note: this notification is not significant enough to be logged to stderr
print('WARNING {}.limlevs(), removing the former ORIGDIR clusters: {}'.format(job.name, newdir))
# New destination of the original task output
shutil.rmtree(newdir)
shutil.move(taskpath, origdir)
# Uniformly link the required number of levels to the expected output dir
os.mkdir(taskpath)
levnames.sort(key=fetchLevId)
# Note: all callers have end indexing of the root level: Louvain, Oslom, Daoc
levnames = reduceLevels(levnames, lmax, False)
# print('> Creating symlinks for ', levnames, file=sys.stderr)
for lev in levnames:
os.symlink(os.path.relpath(newdir + lev, taskpath), '/'.join((taskpath, lev)))
def subuniflevs(job):
"""Subtask of the levels output unification.
Aggregates output levels from the parameterized job and reports them to the
task to unify results for all parameterized jobs of the algorithm on the
current network (input dataset).
Required at least for Scp.
Job params are propagated to the super-task params
taskpath: str - task path, base directory of the resulting clusters output
"""
# fetchLevId: callable - algorithm-specific callback to fetch level ids
# aparams: str - algorithm parameters
task = job.task
assert task, 'A task should exist in the job: ' + job.name
if task.params is None:
task.params = {'subtasks': {job.name: job.params}}
else:
sbtasks = task.params.setdefault('subtasks', {})
sbtasks[job.name] = job.params
# print('> subuniflevs() from job {}, {} sbtasks'.format(job.name, len(task.params['subtasks'])), file=sys.stderr)
def uniflevs(task):
"""Unify representation of the output levels.
Aggregates levels from each parameter in a uniform way limiting their number
to the required amount.
At least one level is taken from the levels output corresponding to each parameter.
The output levels of each parameter should be bound to a subtask, which should pass
the aggregated values to this task on successful completion. Note that some
subtasks might fail, but this task should still perform the final aggregation
as long as at least one subtask has completed and provided the required data.
Task params:
params: dict, str
outpname: str, str - target output name without the path
fetchLevId: callable - algorithm-specific callback to fetch level ids
subtasks: dict
<subtask_name>: str, <subtask_params>: dict - processing outputs of the subtasks
"""
# root0: bool - whether the hierarchy root (the most coarse-grained) level
# has index 0 or the maximal index
if not task.params:
# Note: this is not an error to be reported to the stderr
print('WARNING, no output levels are reported for the unification in the super task: ', task.name)
return
# print('> uniflevs() of {} started'.format(task.name), file=sys.stderr)
lmax = ALEVSMAX # Max number of the output levels for the network
# Check the number of output levels and restructure the output if required saving the original one
levsnum = 0 # Total number of the (valid) output levels for all alg. parameters
bpath = None # Base path
pouts = [] # Parameterized outputs of levels to be processed: [(outname, levnames), ...]
origdir = None
root0 = True # Scp enumerates root on the zero level
fetchLevId = task.params['fetchLevId'] # Callback to fetch level ids
subtasks = task.params.get('subtasks')
if subtasks:
for sbt, tpars in viewitems(subtasks):
try:
taskpath = tpars if isinstance(tpars, str) else tpars['taskpath']
# assert os.path.isdir(taskpath) and callable(fetchLevId), (
# 'Invalid job parameters: taskpath: {}, fetchLevId callable: {}'.format(
# taskpath, callable(fetchLevId)))
# Define base path
if bpath is not None:
outbase, outname = os.path.split(taskpath)
if outbase != bpath:
print('ERROR, levels unification called for distinct networks. Omitted for', taskpath, file=sys.stderr)
continue
else:
bpath, outname = os.path.split(taskpath)
# Move parameterized levels to the orig dir
if origdir is None:
origdir = '/'.join((bpath if bpath else '.', ORIGDIR))
if not os.path.exists(origdir):
os.mkdir(origdir)
# newdir = origdir + oname + '/'
levnames = os.listdir(taskpath) # Note: only file names without the path are returned
# Check existence of the dest path, which causes exception in shutil.move()
dstpath = origdir + os.path.split(taskpath)[1]
if os.path.exists(dstpath):
try:
os.rmdir(dstpath)
except OSError as err:
print('WARNING uniflevs(), orig dest dir is dirty. Replaced with the latest version.'
, err, file=sys.stderr)
shutil.rmtree(dstpath)
# # Note: os.listdir would throw OSError if taskpath would not be a dir
# assert os.path.isdir(taskpath), 'A directory is expected: ' + taskpath
shutil.move(taskpath, origdir)
if levnames:
levsnum += len(levnames)
# Sort levnames in a way to start from the root (the most coarse-grained) level
levnames.sort(key=fetchLevId, reverse=not root0)
pouts.append((outname, levnames)) # Output dir name without the path and corresponding levels
# print('> pout added: {} {} levs ({} .. {})'.format(outname, len(levnames), levnames[0], levnames[-1]), file=sys.stderr)
except Exception as err: #pylint: disable=W0703
print('ERROR, {} subtask output levels aggregating unification failed'
', {} params ({}): {}. Discarded. {}'.format(sbt
, None if tpars is None else len(tpars), type(tpars).__name__
, err, traceback.format_exc(3)), file=sys.stderr)
if not pouts:
print('WARNING uniflevs(), nothing to process because the output levels are empty for the task'
', which may happen if there are no completed subtasks/jobs', task.name, file=sys.stderr)
return
# Sort pouts by the decreasing number of levels, i.e. from the fine to coarse grained outputs
pouts.sort(key=lambda outp: len(outp[1]), reverse=True)
# Create the unifying output dir
uniout = task.params.get('outpname')
if not uniout:
#uniout, _apars, insid, shfid, pathid
sname = parseName(pouts[0][0], True) # Parse the output dir name only, without the path
uniout = ''.join((sname.basepath, sname.insid, sname.shfid, sname.pathid)) # , sname.lnkrd; Note: alg params marker is intentionally omitted
assert uniout, 'Output directory name should be defined'
unidir = '/'.join((bpath if bpath else '.', uniout, '')) # Note: ending '' to have the ending '/'
if os.path.exists(unidir):
if not (os.path.isdir(unidir) and dirempty(unidir)):
tobackup(unidir, False, move=True) # Move to the backup (old results can't be reused)
os.mkdir(unidir)
else:
os.mkdir(unidir)
# Take lmax output levels from pnets parameterized outputs proportionally to the number of
# levels in each output but with at least one output per each network
# NOTE: Take the most coarse-grained level when only a single level from the parameterized
# output is taken.
# Remaining number of output clusterings after the reservation of a single level in each output
# print('> unidir: {}, {} pouts, {} levsnum'.format(unidir, len(pouts), levsnum), file=sys.stderr)
numouts = len(pouts)
iroot = 0 if root0 else -1 # Index of the root level
if numouts < lmax:
# rlevs = levsnum - numouts
lmax -= numouts # Remaining limit considering the reserved levels from each output
levsnum -= numouts # The number of levels besides the reserved
for i in range(0, numouts):
outname, levs = pouts[i]
# Evaluate current number of the processing levels, take at least one
# in addition to the already reserved ones because the number of levels at
# the beginning of pouts is maximal
levnames = None
if levsnum:
numcur = iround(len(levs) * lmax / float(levsnum), False)
if lmax and not numcur:
numcur = 1
if numcur:
# Take 2+ levels
levnames = reduceLevels(levs, 1 + numcur, root0)
lmax -= numcur # Note: the reserved 1 level was already considered
# Note: even when lmax becomes zero, the reserved levels should be linked below
if not levnames:
# Take only the root level
levnames = (levs[iroot],)
# Link the required levels
for lname in levnames:
os.symlink(os.path.relpath(''.join((origdir, outname, '/', lname)), unidir)
, '/'.join((unidir, lname)))
assert lmax >= 0, 'lmax levels at most should be outputted'
else:
# Link a single level (the root) from as many leading pouts as possible
for i in range(0, lmax):
outname, levs = pouts[i]
os.symlink(os.path.relpath(''.join((origdir, outname, '/', levs[iroot])), unidir)
, '/'.join((unidir, levs[iroot])))
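# Illustrative distribution for uniflevs() (a sketch assuming ALEVSMAX == 10 and three
# parameterized outputs with 12, 6 and 2 levels): one root level is reserved per output and
# the remaining slots are shared roughly proportionally to the outputs' level counts,
# yielding about 6 + 2 + 2 linked levels in the unified output dir.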
def fetchLevIdCnl(name):
"""Fetch level id of the hierarchy/scale from the output Cnl file name.
The format of the output file name: <outpfile_name>_<lev_num><EXTCLSNDS>
name: str - level name
return id: uint - hierarchy/scale level id
"""
iid = name.rfind('_') # Index of the id
if iid == -1:
raise ValueError('The file name does not contain lev_num: ' + name)
iid += 1
iide = name.rfind('.', iid) # Extension index
if iide == -1:
print('WARNING, Cnl files should be named with the', EXTCLSNDS, 'extension:', name, file=sys.stderr)
iide = len(name)
return int(name[iid:iide])
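# Illustrative example (based on the <outpfile_name>_<lev_num>.cnl convention described above):
#   fetchLevIdCnl('1K10_3.cnl')  # -> 3, the trailing '_<lev_num>' before the '.cnl' extension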
def metainfo(levsmax=ALEVSMAX):
"""Set some meta information for the executing algorithms
levsmax: uint16 - expected max number of resolution level files (excluding the aggregated/multires output)
"""
def decor(func):
"""Decorator returning the original function"""
assert levsmax >= 1 and isinstance(levsmax, int), ('Invalid arguments, levsmax: {}'.format(levsmax))
# QMSRAFN[funcToAppName(func)] = afnmask
if levsmax != ALEVSMAX: # Save only the algorithms having a non-default number of levels
ALGLEVS[funcToAppName(func.__name__)] = levsmax
return func
return decor
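# Usage sketch of the metainfo decorator (the algorithm function name below is hypothetical):
#   @metainfo(levsmax=5)
#   def execSomeAlg(execpool, netfile, asym, odir, ...): ...
# which registers a non-default max number of output levels for the respective app in ALGLEVS.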
# Louvain
## Original Louvain
#def execLouvain(execpool, netfile, asym, odir, timeout=0, memlim=0., pathidsuf='', tasknum=0, task=None):
# """Execute Louvain
# Results are not stable => multiple execution is desirable.
#
# tasknum - index of the execution on the same dataset
# """
#
# # Evaluate relative network size considering whether the network is directed (asymmetric)
# netsize = os.path.getsize(netfile)
# if not asym:
# netsize *= 2
# # Fetch the task name and chose correct network filename
# netfile = os.path.splitext(netfile)[0] # Remove the extension
# taskname = os.path.split(netfile)[1] # Base name of the network
# assert taskname, 'The network name should exists'
# if tasknum:
# taskname = '-'.join((taskname, str(tasknum)))
# netfile = '../' + netfile # Use network in the required format
#
# algname = funcToAppName(inspect.currentframe().f_code.co_name) # 'louvain'
# # ./community graph.bin -l -1 -w graph.weights > graph.tree
# args = ('../exectime', ''.join(('-o=../', RESDIR, algname, EXTRESCONS)), ''.join(('-n=', taskname, pathidsuf)), '-s=/etime_' + algname
# , './community', netfile + '.lig', '-l', '-1', '-v', '-w', netfile + '.liw')
# execpool.execute(Job(name=SEPNAMEPART.join((algname, taskname)), workdir=ALGSDIR, args=args
# , timeout=timeout, memlim=memlim, stdout=''.join((RESDIR, algname, '/', taskname, '.loc'))
# , task=task, category=algname, size=netsize, stderr=''.join((RESDIR, algname, '/', taskname, EXTLOG))))
# return 1
#
#
#def evalLouvain(execpool, basefile, measure, timeout):
# return
def execLouvainIg(execpool, netfile, asym, odir, timeout=0, memlim=0., seed=None, task=None, pathidsuf='', workdir=ALGSDIR): # , selfexec=False - whether to call self recursively
"""Execute Louvain using the igraph library
Note: Louvain does not produce stable results => multiple executions are desirable.
Mandatory arguments:
execpool: ExecPool - execution pool of worker processes
netfile: str - the input network to be clustered
asym: bool - whether the input network is asymmetric (directed, specified by arcs)
odir: bool - whether to output results to the dedicated dir named by the instance name,
which is relevant for shuffles with a non-flat structure
Optional arguments:
timeout: ufloat32 - processing (clustering) timeout of the input file, 0 means infinity
memlim: ufloat32 - max amount of memory in GB allowed for the algorithm execution, 0 - unlimited
seed: uint64 - random seed, uint64_t
task: Task - owner task
pathidsuf: str - network path id prepended with the path separator
workdir: str - relative working directory of the app, actual when the app contains libs
returns - the number of executions or None
"""
# Note: '.. + 0 >= 0' ensures the type is arithmetic, otherwise the comparison is always true for a str
assert execpool and netfile and (asym is None or isinstance(asym, bool)) and timeout + 0 >= 0 and (
memlim + 0 >= 0 and task is None or isinstance(task, Task)) and (seed is None or isinstance(seed, int)), (
'Invalid input parameters:\n\texecpool: {},\n\tnet: {},\n\tasym: {},\n\ttimeout: {},\n\tmemlim: {}'
.format(execpool, netfile, asym, timeout, memlim))
# Evaluate relative network size considering whether the network is directed (asymmetric)
netsize = os.path.getsize(netfile)
if not asym:
netsize *= 2
# Fetch the task name and choose the correct network filename
taskname = os.path.splitext(os.path.split(netfile)[1])[0] # Base name of the network; , netext
assert taskname, 'The network name should exist'
#if tasknum:
# taskname = '_'.join((taskname, str(tasknum)))
# ATTENTION: for the correct execution, algname must always be the same as the function name without the "exec" prefix
algname = funcToAppName(inspect.currentframe().f_code.co_name) # 'LouvainIg'
# Prepare the resulting dir and back up the previous results if they exist
taskpath = prepareResDir(algname, taskname, odir, pathidsuf)
# print('> execLouvainIg(), taskpath exists:', os.path.exists(taskpath))
# Note: igraph-python is a Cython wrapper around C igraph lib. Calls are much faster on CPython than on PyPy
pybin = PyBin.bestof(pypy=False, v3=True)
# Note: Louvain_igraph creates the output dir if it does not exist, but the exectime app does not
errfile = taskpath + EXTERR
logfile = taskpath + EXTLOG
# def relpath(path, basedir=workdir):
# """Relative path to the specified basedir"""
# return os.path.relpath(path, basedir)
# Note: without './' relpath args do not work properly for the binaries located in the current dir
relpath = lambda path: './' + os.path.relpath(path, workdir) # Relative path to the specified basedir
# Evaluate relative paths
xtimebin = relpath(UTILDIR + 'exectime')
xtimeres = relpath(''.join((RESDIR, algname, '/', algname, EXTRESCONS)))
netfile = relpath(netfile)
# taskpath = relpath(taskpath)
# ./louvain_igraph.py -i=../syntnets/1K5.nsa -o=louvain_igoutp/1K5/1K5.cnl -l
args = (xtimebin, '-o=' + xtimeres, ''.join(('-n=', taskname, pathidsuf)), '-s=/etime_' + algname
# Note: igraph-python is a Cython wrapper around C igraph lib. Calls are much faster on CPython than on PyPy
, pybin, './louvain_igraph.py', '-i' + ('nsa' if asym else 'nse')
, '-lo', ''.join((relpath(taskpath), '/', taskname, EXTCLSNDS)), netfile)
execpool.execute(Job(name=SEPNAMEPART.join((algname, taskname)), workdir=workdir, args=args, timeout=timeout
#, stdout=os.devnull
, ondone=limlevs, params={'taskpath': taskpath, 'fetchLevId': fetchLevIdCnl}
, task=task, category=algname, size=netsize, memlim=memlim, stdout=logfile, stderr=errfile))
execnum = 1
# Note: execution on shuffled network instances is now generalized for all algorithms
## Run again for all shuffled nets
#if not selfexec:
# selfexec = True
# netdir = os.path.split(netfile)[0]
# if not netdir:
# netdir = .
# netdir += '/'
# #print('Netdir: ', netdir)
# for netfile in glob.iglob(''.join((escapePathWildcards(netdir), escapePathWildcards(taskname), '/*', netext))):
# execLouvain_ig(execpool, netfile, asym, odir, timeout, memlim, selfexec)
# execnum += 1
return execnum
def fastConsBase(algname, execpool, netfile, asym, odir, timeout=0, memlim=0., seed=None, task=None, pathidsuf='', workdir=ALGSDIR): # , selfexec=False - whether to call self recursively
"""Execute Fast Consensus using the networkx and igraph libraries
Note: Louvain does not produce stable results => multiple executions are desirable.
Mandatory arguments:
algname: str - underlying clustering algorithm
execpool: ExecPool - execution pool of worker processes
netfile: str - the input network to be clustered
asym: bool - whether the input network is asymmetric (directed, specified by arcs)
odir: bool - whether to output results to the dedicated dir named by the instance name,
which is relevant for shuffles with a non-flat structure
Optional arguments:
timeout: ufloat32 - processing (clustering) timeout of the input file, 0 means infinity
memlim: ufloat32 - max amount of memory in GB allowed for the algorithm execution, 0 - unlimited
seed: uint64 - random seed, uint64_t
task: Task - owner task
pathidsuf: str - network path id prepended with the path separator
workdir: str - relative working directory of the app, actual when the app contains libs
returns - the number of executions or None
"""
# Note: '.. + 0 >= 0' ensures the type is arithmetic, otherwise the comparison is always true for a str
assert algname in ('FcLouv', 'FcLpm', 'FcCnm', 'FcImap') and execpool and netfile and (
asym is None or isinstance(asym, bool)) and timeout + 0 >= 0 and (
memlim + 0 >= 0 and task is None or isinstance(task, Task)) and (seed is None or isinstance(seed, int)), (
'Invalid input parameters:\n\texecpool: {},\n\tnet: {},\n\tasym: {},\n\ttimeout: {},\n\tmemlim: {}'
.format(execpool, netfile, asym, timeout, memlim))
# Note: igraph-python is a Cython wrapper around C igraph lib. Calls are much faster on CPython than on PyPy
pybin = PyBin.bestof(pypy=False, v3=True)
# Evaluate relative network size considering whether the network is directed (asymmetric)
netsize = os.path.getsize(netfile)
if not asym:
netsize *= 2
# Fetch the task name and choose the correct network filename
taskname = os.path.splitext(os.path.split(netfile)[1])[0] # Remove the base path and separate extension; , netext
assert taskname, 'The network name should exist'
# ATTENTION: for the correct execution, algname must always be the same as the function name without the "exec" prefix
alg = None # Algorithm name parameter: louvain, lpm, cnm, infomap
if algname == 'FcCnm':
alg = 'cnm'
elif algname == 'FcImap':
alg = 'infomap'
elif algname == 'FcLouv':
alg = 'louvain'
elif algname == 'FcLpm':
alg = 'lpm'
else:
raise ValueError('Algorithm name mapping is not defined: {} <- {}'.format(alg, algname))
# Prepare the resulting dir and back up the previous results if they exist
taskpath = prepareResDir(algname, taskname, odir, pathidsuf)
errfile = taskpath + EXTERR
# logfile = taskpath + EXTLOG
relpath = lambda path: './' + os.path.relpath(path, workdir) # Relative path to the specified basedir
# Evaluate relative paths
xtimebin = relpath(UTILDIR + 'exectime')
xtimeres = relpath(''.join((RESDIR, algname, '/', algname, EXTRESCONS)))
netfile = relpath(netfile)
taskpath = relpath(taskpath)
# Run for range of delta
delta = 0.02 # dmin
dmax = 0.1
steps = ALEVSMAX # The number of steps (similarity thresholds). Use 10 scale levels as in Ganxis.
if steps >= 2:
dd = (dmax - delta) / (steps - 1)
# print('>> steps: {}, da: {:.2f}'.format(steps, da))
else:
delta = (delta + dmax) / 2.
dd = dmax - delta
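# Worked example of the delta grid (assuming ALEVSMAX == 10, i.e. steps == 10):
# dd = (0.1 - 0.02) / 9 ~ 0.0089, yielding thresholds 0.020, 0.029, ..., ~0.100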
while delta <= dmax:
# for delta in (dmin + (0 if steps <= 1 else (dmax - dmin) / (steps - 1) * i) for i in range(steps)):
# Note: the number of digits should be at least one larger than in the margin values to not overwrite the file on rounding
dstr = '{:.3f}'.format(delta) # Alg params as string
# Embed params into the task name
taskparname = delPathSuffix(taskname, True)
tasksuf = taskname[len(taskparname):]
taskparname = ''.join((taskparname, SEPPARS, 'a', dstr, tasksuf)) # Current task
# ./fast_consensus.py -f <inpnet> -p 5 --outp-parts 1 -a louvain -o <outpdir>
args = (xtimebin, '-o=' + xtimeres, ''.join(('-n=', taskparname, pathidsuf)), '-s=/etime_' + algname
, pybin, './fast_consensus.py', '-f', netfile, '-a', alg
# p: 5-20
, '-p', '5', '--outp-parts', '1', '-d', dstr, '-w', '1', '-o', taskpath)
execpool.execute(Job(name=SEPNAMEPART.join((algname, taskparname)), workdir=workdir, args=args, timeout=timeout
#, ondone=postexec, stdout=os.devnull, stdout=logfile
, task=task, category=algname, size=netsize, memlim=memlim, stdout=os.devnull, stderr=errfile))
delta += dd
return steps
#
# # Note: without './' relpath args do not work properly for the binaries located in the current dir
# relpath = lambda path: './' + os.path.relpath(path, workdir) # Relative path to the specified basedir
# # Evaluate relative paths
# xtimebin = relpath(UTILDIR + 'exectime')
# xtimeres = relpath(''.join((RESDIR, algname, '/', algname, EXTRESCONS)))
# netfile = relpath(netfile)
#
# # Create subtask to monitor execution for each clique size
# taskbasex = delPathSuffix(taskname, True)
# tasksuf = taskname[len(taskbasex):]
# aggtname = taskname + pathidsuf
# task = Task(aggtname if task is None else SEPSUBTASK.join((task.name, tasksuf))
# , task=task, onfinish=uniflevs, params={'outpname': aggtname, 'fetchLevId': fetchLevIdCnl})
#
# # Run for range of delta
# dmin = 0.02
# dmax = 0.1
# dnum = 10
# for delta in (dmin + (dmax - dmin) / (dnum - 1) * i for i in range(dnum)):
# # A single argument is k-clique size
# dstr = str(delta)
# dstrex = 'd{:.2}'.format(delta)
# # Embed params into the task name
# dtaskname = ''.join((taskbasex, SEPPARS, dstrex, tasksuf))
# # Backup prepared the resulting dir and back up the previous results if exist
# taskpath = prepareResDir(algname, dtaskname, odir, pathidsuf)
# errfile = taskpath + EXTERR
# logfile = taskpath + EXTLOG
# # Evaluate relative paths dependent of the alg params
# reltaskpath = relpath(taskpath)
#
# # scp.py netname k [start_linksnum end_linksnum number_of_evaluations] [weight]
# args = (xtimebin, '-o=' + xtimeres, ''.join(('-n=', dtaskname, pathidsuf)), '-s=/etime_' + algname
# # p: 5-20
# , pybin, './fast_consensus.py', '-f', netfile, '-a', alg, '-p', '5', '-d', dstr, '-w', '1', '-o', reltaskpath)
#
# #print('> Starting job {} with args: {}'.format('_'.join((ktaskname, algname, kstrex)), args + [kstr]))
# execpool.execute(Job(name=SEPNAMEPART.join((algname, dtaskname)), workdir=workdir, args=args, timeout=timeout
# # , ondone=tidy, params=taskpath # Do not delete dirs with empty results to explicitly see what networks are clustered having empty results
# # Note: increasing clique size k causes ~(k ** pratio) increased consumption of both memory and time (up to k ^ 2),
# # so it is better to use the same category with boosted size for the much more efficient filtering comparing to the distinct categories
# , task=task, category='_'.join((algname, dstrex))
# , size=netsize, ondone=subuniflevs, params=taskpath # {'taskpath': taskpath} # , 'aparams': kstrex
# #, memlim=64 # Limit max memory consumption to 64 GB
# , memlim=memlim, stdout=logfile, stderr=errfile))
#
# return dnum
# # Extremely slow
# def execFcCnm(execpool, netfile, asym, odir, timeout=0, memlim=0., seed=None, task=None, pathidsuf='', workdir=ALGSDIR): # , selfexec=False - whether to call self recursively
# algname = funcToAppName(inspect.currentframe().f_code.co_name) # 'FcImap'
# return fastConsBase(algname, execpool, netfile, asym, odir, timeout, memlim, seed, task, pathidsuf, workdir)
# Even slower
def execFcImap(execpool, netfile, asym, odir, timeout=0, memlim=0., seed=None, task=None, pathidsuf='', workdir=ALGSDIR): # , selfexec=False - whether to call self recursively
algname = funcToAppName(inspect.currentframe().f_code.co_name) # 'FcImap'
return fastConsBase(algname, execpool, netfile, asym, odir, timeout, memlim, seed, task, pathidsuf, workdir)
# Not too fast
def execFcLouv(execpool, netfile, asym, odir, timeout=0, memlim=0., seed=None, task=None, pathidsuf='', workdir=ALGSDIR): # , selfexec=False - whether to call self recursively
algname = funcToAppName(inspect.currentframe().f_code.co_name) # 'FcLouv'
return fastConsBase(algname, execpool, netfile, asym, odir, timeout, memlim, seed, task, pathidsuf, workdir)
# Slower
def execFcLpm(execpool, netfile, asym, odir, timeout=0, memlim=0., seed=None, task=None, pathidsuf='', workdir=ALGSDIR): # , selfexec=False - whether to call self recursively
algname = funcToAppName(inspect.currentframe().f_code.co_name) # 'FcLpm'
return fastConsBase(algname, execpool, netfile, asym, odir, timeout, memlim, seed, task, pathidsuf, workdir)
# SCP (Sequential algorithm for fast clique percolation)
# Note: it is desirable to have a dedicated task for each type of network or even for each network for this algorithm
def execScp(execpool, netfile, asym, odir, timeout=0, memlim=0., seed=None, task=None, pathidsuf='', workdir=ALGSDIR): #pylint: disable=W0613
"""SCP algorithm
return uint: the number of scheduled jobs
"""
assert execpool and netfile and (asym is None or isinstance(asym, bool)) and timeout + 0 >= 0 and memlim + 0 >= 0, (
'Invalid input parameters:\n\texecpool: {},\n\tnet: {},\n\tasym: {},\n\ttimeout: {},\n\tmemlim: {}'
.format(execpool, netfile, asym, timeout, memlim))
# Fetch the task name (includes networks instance and shuffle if any)
taskname = os.path.splitext(os.path.split(netfile)[1])[0] # Base name of the network; , netext
assert taskname, 'The network name should exist'
algname = funcToAppName(inspect.currentframe().f_code.co_name) # 'scp'
# Evaluate relative network size considering whether the network is directed (asymmetric)
with open(netfile) as finp:
netinfo = parseHeaderNslFile(finp, asym)
asym = netinfo.directed
if not netinfo.lnsnum:
# Use network size if the number of links is not available
size = os.fstat(finp.fileno()).st_size * (1 + (not asym)) # Multiply by 2 for the symmetric (undirected) network
avgnls = None
else:
# The number of arcs in the network
# ATTENTION: / 2. is important since the resulting value affects avgnls, which affects the k powering
size = netinfo.lnsnum * (1 + (not netinfo.directed)) / 2. # arcs = edges * 2
avgnls = size / float(netinfo.ndsnum) # Average number of arcs per node
# size *= avgnls # To partially consider complexity increase with the density
relpath = lambda path: './' + os.path.relpath(path, workdir) # Relative path to the specified basedir
# Evaluate relative paths
xtimebin = relpath(UTILDIR + 'exectime')
xtimeres = relpath(''.join((RESDIR, algname, '/', algname, EXTRESCONS)))
netfile = relpath(netfile)
# Set the best possible interpreter, run under pypy if possible
# ATTENTION: Scp doesn't work correctly under Python 3
pybin = PyBin.bestof(pypy=True, v3=False)
if _DEBUG_TRACE:
print(' Selected Python interpreter:', pybin)
# def tidy(job):
# # The network might lack large cliques, so for some parameters the resulting
# # directories might be empty and should be cleared
# if os.path.isdir(job.params) and dirempty(job.params):
# os.rmdir(job.params)
# Create subtask to monitor execution for each clique size
taskbasex = delPathSuffix(taskname, True)
tasksuf = taskname[len(taskbasex):]
aggtname = taskname + pathidsuf
task = Task(aggtname if task is None else SEPSUBTASK.join((task.name, tasksuf))
, task=task, onfinish=uniflevs, params={'outpname': aggtname, 'fetchLevId': fetchLevIdCnl})
kmin = 3 # Min clique size to be used for the communities identification
kmax = 7 # Max clique size (~ min node degree to be considered)
steps = str(ALEVSMAX) # Use 10 scale levels as in Ganxis
# Power ratio to consider non-linear memory complexity increase depending on k
pratio = (1 + 5 ** 0.5) * 0.5 # Golden section const: 1.618 # 2.718 # exp(1)
# Run for range of clique sizes
for k in range(kmin, kmax + 1):
# A single argument is k-clique size
kstr = str(k)
kstrex = 'k' + kstr
# Embed params into the task name
ktaskname = ''.join((taskbasex, SEPPARS, kstrex, tasksuf))