# '''
# Created in December 2021
# @author: Alessandro Lovo
# '''
'''
Module for training a Convolutional Neural Network on climate data.
Usage
-----
First you need to move the code to a desired folder by running
python Learn2_new.py <folder>
This will copy this code and its dependencies to your desired location and will create a config file from the default values in the functions specified in this module.
`cd` into your folder and have a look at the config file: modify all the parameters you want BEFORE the first run, because AFTER the first successful run the config file becomes read-only. There is a reason for this, so don't try to modify it anyway!
The config file will store the default values for the arguments of the functions.
If you want to import config parameters from another config file (for example another folder with a previous version of this code), you can do it by running
python import_config.py <path_to_config_from_which_to_import>
When running the code you can specify some parameters to deviate from their default value, for example running
python Learn2_new.py tau=5
will run the code with all parameters at their default values except `tau`, which will now be 5
Beware that arguments are parsed with spaces, so valid syntaxes are
python Learn2_new.py tau=5 lr=1e-4
python Learn2_new.py tau 5 lr 1e-4
Invalid syntaxes are:
python Learn2_new.py tau=5 lr = 1e-4
python Learn2_new.py tau=5, lr=1e-4
You can also provide arguments as lists, in that case the program will iterate over them. For example:
python Learn2_new.py tau='[0,1,2]'
will perform three runs with tau=0, tau=1, tau=2.
Beware that you need to enclose the list inside a string or the terminal will complain. If you are passing a list of strings, use double quotes for the outer string, i.e.
python Learn2_new.py area="['France', 'Scandinavia']"
If by default an argument is already a list, the provided list is not interpreted as something to be iterated over, for example the argument `fields` has default value ['t2m','zg500','mrso_filtered']. So running
python Learn2_new.py fields="['t2m', 'zg500']"
will result in a single run performed with fields=['t2m', 'zg500']
If you provide more than one argument to iterate over, all combinations will be performed, e.g.:
python Learn2_new.py fields="[['t2m'], ['t2m', 'zg500']]" tau='[1,2]'
will result in 4 runs:
fields=['t2m'], tau=1
fields=['t2m'], tau=2
fields=['t2m', 'zg500'], tau=1
fields=['t2m', 'zg500'], tau=2
You can also import parameters from a previous run by using the argument 'import_params_from'.
Let's say you have performed run 0 with
percent=5
tau=0
T=12
Now if you run
python Learn2_new.py import_params_from=0 percent=1 fields="['t2m']"
You will perform a run with the same parameters as run 0, except for those explicitly provided. In particular for this case
percent=1
tau=0
T=12
fields="['t2m']"
To facilitate debugging and development it is highly advised to work with a small dataset. We have organized our files so that
if you choose datafolder=Data_CESM_short the code will load from a different source (with a smaller number of years).
Somewhat less obvious is the treatment of the skip connections that are used in the `create_model` method. The point is that we convert the input
to a dictionary inside the function, but we couldn't include it as a kwarg for Learn2_new.py because of conflicts with file names when
saving model checkpoints. Thus we provide an input which is subsequently processed, e.g.:
python Learn2_new.py conv_skip="[[[0,2]],[[0,2],[1,2]]]"
will result in two runs: one with a skip connection between layers 0 and 2, and a second with two skip connections, one between
layers 0 and 2 and one between layers 1 and 2.
While originally we developed the code to work with a single time, we have subsequently modified it to allow inputs which
use sliding windows in time. To engage such a sliding window it is important to specify the input parameters precisely:
if `label_period_start` is None -> a single time will be used as usual
if `label_period_start` is specified and it is larger than `time_start`, the code expects that the user wishes
to use temporal information for the inference, and thus expands X with an extra dimension which carries windows slid
in time. The difference `leftmargin` = `label_period_start` - `time_start` tells us how far back in time the extra
information goes. The window is slid on a daily basis, so the length of the extra dimension will be `leftmargin` + 1
(for example, `time_start` = 30 and `label_period_start` = 45 give `leftmargin` = 15, i.e. windows of length 16).
It should be noted that currently we do not support tensorflow Datasets, so one should be careful not to use big time
windows, at the risk of overrunning RAM. A nonzero `leftmargin` is safer if dimensionality reduction was performed beforehand.
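For intuition, the expansion is analogous to numpy's sliding_window_view. A minimal sketch (not part of the pipeline):

    import numpy as np
    from numpy.lib.stride_tricks import sliding_window_view
    x = np.arange(10)                 # 10 daily snapshots
    sliding_window_view(x, 4).shape   # leftmargin = 3 -> windows of length 4
    # (7, 4)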
FAQ
---
Q: what is tau?
It is the delay between the prediction and the observation, and is usually supposed to be negative (prediction before observation)
Q: What if I want to work with 1000 years that are a subset of 8000 years of Plasim_LONG?
You can do it by running with `dataset_years = 8000` and `year_list = 'range(1000)'`, which will use the first 1000 years of the 8000 year dataset
you can also provide `year_list = 'range(1000,3000,2)'` which will take the even years between 1000 and 3000
Q: what if I want to have a smaller training set and a bigger validation one
If you want to have kfold validation where the validation dataset is bigger than the training one you can do it as well by providing the argument val_folds.
For example `nfolds = 10, val_folds = 9` will use 90% of the data for validation and 10% for training
Logging levels:
level   name                events
0       logging.NOTSET
10      logging.DEBUG
20      logging.INFO
25                          History of training
30      logging.WARNING
35                          Which fold is running
                            No runs from which to perform transfer learning
                            Recomputing scores with optimal checkpoint
40      logging.ERROR
41                          From where the models are loaded or created
                            Final score of k_fold_cross_val
                            Run pruned
42                          Folder name of the run
                            Single run completes
44                          Non default arguments of the run
45                          Added and removed telegram logger
                            Tell number of scheduled runs
                            Skipping/rerunning already performed run
                            Average score of the run
48                          Progressive number among the scheduled runs
49                          All runs completed
50      logging.CRITICAL    The program stops due to an error
'''
### IMPORT LIBRARIES #####
## general purpose
from copy import deepcopy
import os
from pathlib import Path
from stat import S_IREAD, S_IROTH, S_IRGRP
import sys
import traceback
import time
import shutil
import gc
import psutil
import numpy as np
import pandas as pd
import inspect
import ast
import pickle # in case we need to open labels from outside
import logging
from uncertainties import ufloat
from functools import wraps
import socket
from functools import partial # this one we use for the scheduler
from sklearn.decomposition import PCA
import signal
if __name__ == '__main__':
logger = logging.getLogger()
logger.handlers = [logging.StreamHandler(sys.stdout)]
else:
logger = logging.getLogger(__name__)
logger.level = logging.INFO
HOSTNAME = socket.gethostname()
mods = {}
## machine learning
import tensorflow as tf
from tensorflow import keras
layers = keras.layers
models = keras.models
## user defined modules
this_module = sys.modules[__name__]
path_to_here = Path(__file__).resolve().parent
path_to_ERA = path_to_here / 'ERA' # absolute path, so you can run the script from another folder (outside plasim)
if not os.path.exists(path_to_ERA):
path_to_ERA = path_to_here.parent / 'ERA'
if not os.path.exists(path_to_ERA):
raise FileNotFoundError('Could not find ERA folder')
# go to the parent so vscode is happy with code completion :)
path_to_ERA = path_to_ERA.parent
path_to_ERA = str(path_to_ERA)
logger.info(f'{path_to_ERA = }/ERA/')
if path_to_ERA not in sys.path:
sys.path.insert(1, path_to_ERA)
import ERA.ERA_Fields_New as ef # general routines
import ERA.TF_Fields as tff # tensorflow routines
try:
import general_purpose.utilities as ut
except ImportError:
import ERA.utilities as ut
try:
    from numpy.lib.stride_tricks import sliding_window_view
except ImportError: # custom copy for numpy<1.20
    logger.warning('Could not import sliding_window_view from numpy.lib.stride_tricks. Using custom copy for numpy<1.20')
try:
sliding_window_view = ef.sliding_window_view
except AttributeError:
logger.warning('This version of ERA_Fields_New does not have the sliding_window_view function. Using custom copy for numpy<1.20')
# define a function which will raise error when called
def sliding_window_view(*args, **kwargs):
raise NotImplementedError("you need to update ERA_Fields_New.py to have the sliding_window_view function, or update numpy to 1.20 or higher")
# separators to create the run name from the run arguments
arg_sep = '--' # separator between arguments
value_sep = '__' # separator between an argument and its value
# dependencies that need to be copied to the run folder: keys are source files, values are the subfolders in the run folder
dependencies = {
'import_config.py': '',
'cleanup.py': '',
'../ERA/ERA_Fields_New.py': 'ERA',
'../ERA/TF_Fields.py': 'ERA',
'../general_purpose/utilities.py': 'ERA',
'../general_purpose/cartopy_plots.py': 'ERA',
}
def get_dependencies():
all_dependencies = dependencies.copy()
for mod in mods.values():
all_dependencies.update(mod['dependencies'])
return all_dependencies
def add_mod(file, description=None, dependencies=None, destination=''):
mods[Path(file).stem] = { # add this module to the dict of mods of Learn2_new.
'description': description,
'dependencies': {
Path(file).name : destination, # add this module to the list of dependencies, so it will be automatically copied by ln.move_to_folder
**(dependencies or {}) # add the dependencies of the mod
}
}
def remove_mod(file):
mods.pop(Path(file).stem)
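# Example (sketch): a mod file would register itself at import time with something like
#   add_mod(__file__, description='my custom mod', dependencies={'extra_helper.py': ''})
# so that `extra_helper.py` is automatically copied alongside the mod by `move_to_folder`.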
########## USAGE ###############################
def usage():
'''
Returns the documentation of this module that explains how to use it.
'''
return this_module.__doc__
#######################################
#### CONFIG FILE RELATED ROUTINES #####
#######################################
def get_default_params(func, recursive=False):
'''
Given a function returns a dictionary with the default values of its parameters
Parameters
----------
func : callable
to be able to use the recursive feature, the function must be defined inside this module
recursive : bool, optional
if True arguments of the type '*_kwargs' will be interpreted as kwargs to pass to function * and `get_default_params` will be applied to * as well to retrieve the default values
Returns
-------
default_params : dict
dictionary of the default parameter values
Examples
--------
>>> get_default_params(balance_folds) # see function balance_folds
{'nfolds': 10, 'verbose': False}
>>> def pnroll(X, roll_X_kwargs, greeting='Hello there!'):
... print(greeting)
... return roll_X(X, **roll_X_kwargs)
>>> get_default_params(pnroll, recursive=True)
{'greeting': 'Hello there!', 'roll_X_kwargs': {'roll_axis': 'lon', 'roll_steps': 0}}
'''
s = inspect.signature(func)
default_params = {
k:v.default for k,v in s.parameters.items()
if (v.default is not inspect.Parameter.empty and not k.endswith('_kwargs'))
}
if recursive: # look for parameters ending in '_kwargs' and extract further default arguments
possible_other_params = [k for k,v in s.parameters.items() if k.endswith('_kwargs')]
for k in possible_other_params:
func_name = k.rsplit('_',1)[0] # remove '_kwargs'
try:
default_params[k] = get_default_params(getattr(this_module, func_name), recursive=True)
except:
logger.warning(f'From get_default_params: Could not find function {func_name}')
return default_params
def build_config_dict(functions):
'''
Creates a config dictionary with the default arguments of the functions in the list `functions`. See also function `get_default_params`
Parameters:
-----------
functions : list
list of functions or string with the function name
Returns
-------
d : dict
Examples
--------
If calling on one function only it is the same as to use get_default_params recursively
>>> d1 = build_config_dict([k_fold_cross_val])
>>> d2 = get_default_params(k_fold_cross_val, recursive=True)
>>> d1 == {'k_fold_cross_val_kwargs': d2}
True
'''
d = {}
for f in functions:
if isinstance(f, str):
f_name = f
f = getattr(this_module, f_name)
else:
f_name = f.__name__
d[f'{f_name}_kwargs'] = get_default_params(f, recursive=True)
return d
def check_config_dict(config_dict, correct_mistakes=True):
'''
    Checks that the config dictionary is consistent.
Returns
-------
config_dict_flat : dict
flattened config_dict
Raises
------
KeyError
if the config dictionary is inconsistent.
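    Examples
    --------
    A minimal sketch (assuming a config.json produced by this module is present):

        config_dict = ut.json2dict('config.json')
        config_dict_flat = check_config_dict(config_dict, correct_mistakes=True)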
'''
try:
config_dict_flat = ut.collapse_dict(config_dict) # in itself this checks for multiple values for the same key
found = False
label_field = config_dict_flat['label_field']
for field_name in config_dict_flat['fields']:
if field_name.startswith(label_field):
found = True
break
if not found:
if correct_mistakes:
logger.warning(f"field {label_field} is not a loaded field: adding ghost field")
config_dict_flat['fields'].append(f'{label_field}_ghost')
ut.set_values_recursive(config_dict, {'fields': config_dict_flat['fields']}, inplace=True)
else:
raise ValueError(f"{label_field = } is not one of the loaded fields: please add a ghost field as {label_field+'_ghost'}")
unusual_settings = 0
logger.warning('Checking config dictionary for unusual settings\n\n')
# early stopping related checks
if 'enable_early_stopping' in config_dict_flat:
if config_dict_flat['enable_early_stopping']:
if config_dict_flat['patience'] == 0:
logger.warning('-- Setting `patience` to 0 disables early stopping\n')
unusual_settings += 1
elif config_dict_flat['collective']:
                    raise ValueError('Using a collective checkpoint together with early stopping is strongly discouraged')
else:
logger.warning('-- Early stopping is disabled: training might take unnecessarily long time\n')
unusual_settings += 1
# collective checkpoint related checks
if config_dict_flat['collective']:
logger.warning('-- Collective checkpoint is enabled, this may be (very) suboptimal as the different folds may reach their best at different epochs\n')
unusual_settings += 1
# mask area related checks
if config_dict_flat['area'] != config_dict_flat['filter_area']:
logger.warning('-- `area` and `filter_area` are not the same!\n')
unusual_settings += 1
if config_dict_flat['Model'] in ['ERA5', 'CESM']:
            if not (config_dict_flat['area'].endswith('-xarray') or config_dict_flat['filter_area'].endswith('-xarray')):
                logger.warning(f"-- For high resolution models like {config_dict_flat['Model']} you may want to use a better mask by exploiting xarray features. To do so, `area` should end with `-xarray`\n")
unusual_settings += 1
if unusual_settings > 0:
t = 5*unusual_settings + 10
logger.warning(f'\nSCRIPT PAUSED: Found {unusual_settings} unusual settings in the config dictionary.\nYou may want to interrupt and check manually your config file.\n\nThe script will continue in {t} seconds')
time.sleep(t) # pause
else:
logger.warning('Found no unusual settings in the config dictionary.\n\n')
except Exception as e:
raise KeyError('Invalid config dictionary') from e
return config_dict_flat
def remove_default_kwargs(kwargs:dict, config_dict_flat:dict):
'''
Removes from a dictionary of parameters the values that are at their default one.
Parameters
----------
kwargs : dict
dictionary with the arguments of the run
config_dict_flat : dict
flattened config dictionary with the default values
Returns
-------
dict
        the input kwargs without the entries that are at their default values
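    Examples
    --------
    >>> remove_default_kwargs({'tau': 5, 'percent': 5}, {'tau': 0, 'percent': 5})
    {'tau': 5}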
'''
new_kwargs = {}
for arg,value in kwargs.items():
if value != config_dict_flat[arg]:
new_kwargs[arg] = value
else:
logger.debug(f'removing {arg} from run_args because it is at its default value {value}')
return new_kwargs
####################################
### OPERATIONS WITH RUN METADATA ###
####################################
def make_run_name(run_id, **kwargs):
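    '''
    Builds the name of the run folder from the run id and the sorted non-default arguments of the run
    ('load_from' is skipped as it would mess up the name). For instance, up to the sanitization
    performed by `ut.make_safe`, make_run_name('0', percent=1, tau=5) gives '0--percent__1--tau__5'.
    '''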
folder = f'{run_id}{arg_sep}'
for k in sorted(kwargs):
if k == 'load_from': # skip putting load_from in the name as it messes it
continue
folder += f'{k}{value_sep}{kwargs[k]}{arg_sep}'
folder = folder[:-len(arg_sep)] # remove the last arg_sep
folder = ut.make_safe(folder)
return folder
def parse_run_name(run_name, evaluate=False):
'''
Parses a string into a dictionary
Parameters
----------
run_name: str
        run name formatted as *<param_name>__<param_value>--*
evaluate : bool, optional
whether to try to evaluate the string expressions (True), or leave them as strings (False).
If unable to evalate an expression, it is left as is
Returns
-------
dict
Examples
--------
>>> parse_run_name('a__5--b__7')
{'a': '5', 'b': '7'}
>>> parse_run_name('test_arg__bla--b__7')
{'test_arg': 'bla', 'b': '7'}
>>> parse_run_name('test_arg__bla--b__7', evaluate=True)
{'test_arg': 'bla', 'b': 7}
'''
d = {}
args = run_name.split(arg_sep)
for arg in args:
if value_sep not in arg:
continue
key, value = arg.rsplit(value_sep,1)
if evaluate:
try:
value = ast.literal_eval(value)
except:
pass
d[key] = value
return d
def select_compatible(run_args, conditions, require_unique=True, config=None):
'''
Selects which runs are compatible with given conditions
Parameters
----------
run_args : dict
dictionary where each item is a dictionary of the arguments of the run
conditions : dict
dictionary of run arguments that has to be contained in the arguments of a compatible run
require_unique : bool, optional
whether you want a single run or a subset of compatible runs, by default True
config : dict or str, optional
if dict: config file
if str: path to the config file
        If provided, allows a better check of whether a condition is at its default value, since default values don't appear in the list of arguments of the run
Returns
-------
if require_unique:
the key in run_args corresponding to the only run satisfying the compatibility requirements
else:
the list of keys of compatible runs
Raises
------
KeyError
If require_unique and either none or more than one run are found compatible.
Examples
--------
>>> run_args = {'1': {'tau': -5}, '2': {'percent': 1, 'tau': -10}, '3': {'percent': 1, 'tau': -5}}
>>> select_compatible(run_args, {'tau': -10})
'2'
>>> select_compatible(run_args, {'tau': -10}, require_unique=False)
['2']
>>> select_compatible(run_args, {'percent': 1}, require_unique=False)
['2', '3']
>>> select_compatible(run_args, {'percent': 10}, require_unique=False)
[]
'''
_run_args = deepcopy(run_args)
if config is not None:
if isinstance(config, dict):
config_dict = config
elif isinstance(config, str):
config_dict = ut.json2dict(config)
else:
raise TypeError(f'Invalid type {type(config)} for config')
config_dict_flat = ut.collapse_dict(config_dict)
conditions_at_default = {k:v for k,v in conditions.items() if v == config_dict_flat[k]}
for args in _run_args.values():
for k,v in conditions_at_default.items():
if k not in args:
args[k] = v
compatible_keys = [k for k,v in _run_args.items() if conditions.items() <= v.items()]
if not require_unique:
return compatible_keys
if len(compatible_keys) == 0:
raise KeyError(f'No previous compatible run satisfies {conditions = }')
elif len(compatible_keys) > 1:
raise KeyError(f"Multiple runs contain satisfy {conditions = } ({compatible_keys}) and your are requiring just one")
return compatible_keys[0]
def remove_args_at_default(run_args:dict, config_dict_flat:dict):
'''
Removes from a dictionary of parameters the values that are at their default one.
Parameters
----------
run_args : dict
dictionary where each item is a dictionary of the arguments of the run
config_dict_flat : dict
flattened config dictionary with the default values
Returns
-------
dict
        run_args without the entries that are at their default values
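    Examples
    --------
    >>> remove_args_at_default({'1': {'tau': 5, 'percent': 5}, '2': {'percent': 1}}, {'tau': 0, 'percent': 5})
    {'1': {'tau': 5}, '2': {'percent': 1}}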
'''
return {k: remove_default_kwargs(v, config_dict_flat) for k,v in run_args.items()}
def group_by_varying(run_args, variable='tau', config_dict_flat=None, sort=False, ignore=None):
'''
Groups a series of runs into sets where only `variable` varies and other parameters are shared inside the set
Parameters
----------
run_args : dict
dictionary where each item is a dictionary of the arguments of the run
variable : str, optional
argument that varies inside each group, by default 'tau'
config_dict_flat : dict, optional
flattened config dictionary with the default values, by default None
    sort : True, False or 'descending', optional
        whether and how to sort the runs according to the variable, by default False, i.e. no sorting, which means the runs will be in chronological order
    ignore : str or list[str], optional
        kwarg or list of kwargs to ignore when grouping runs together
Returns
-------
list
list of dictionaries, one for each group. See examples for its structure
Examples
--------
>>> run_args = {'1': {'tau': 0, 'percent':5}, '2': {'percent': 1, 'tau': 0}, '3': {'percent': 1, 'tau': -5}}
>>> group_by_varying(run_args)
[{'args': {'percent': 5}, 'runs': ['1'], 'tau': [0]}, {'args': {'percent': 1}, 'runs': ['2', '3'], 'tau': [0, -5]}]
>>> group_by_varying(run_args, 'percent')
[{'args': {'tau': 0}, 'runs': ['1', '2'], 'percent': [5, 1]}, {'args': {'tau': -5}, 'runs': ['3'], 'percent': [1]}]
>>> group_by_varying(run_args, 'percent', sort=True)
[{'args': {'tau': 0}, 'runs': ['2', '1'], 'percent': [1, 5]}, {'args': {'tau': -5}, 'runs': ['3'], 'percent': [1]}]
>>> group_by_varying(run_args, 'percent', ignore='tau')
[{'args': {}, 'runs': ['1', '2', '3'], 'percent': [5, 1, 1]}]
'''
_run_args = deepcopy(run_args)
# remove arguments to ignore
if ignore is not None:
if isinstance(ignore, str):
ignore = [ignore]
if variable in ignore:
raise ValueError('Cannot ignore the variable!')
for args in _run_args.values():
for ign in ignore:
args.pop(ign, None)
# find the groups
try:
# move the variable to a separate dictionary removing it from the arguments in _run_args
variable_dict = {k:v.pop(variable) if variable in v else config_dict_flat[variable] for k,v in _run_args.items()}
except TypeError as e:
raise TypeError(f'{variable} is at default value in some runs, please provide config_dict_flat') from e
group_args = []
group_runs = []
for k,v in _run_args.items():
try:
i = group_args.index(v)
group_runs[i].append(k)
except ValueError:
group_args.append(v)
group_runs.append([k])
groups = []
for i in range(len(group_args)):
groups.append({'args': group_args[i], 'runs': group_runs[i], variable:[variable_dict[k] for k in group_runs[i]]})
if sort:
for g in groups:
isort = np.argsort(g[variable])
if sort == 'descending':
isort = isort[::-1]
g['runs'] = [g['runs'][i] for i in isort]
g[variable] = [g[variable][i] for i in isort]
return groups
def make_groups(runs, variable='tau', config_dict_flat=None, sort=False, ignore=None):
'''
    A wrapper of `group_by_varying` that allows using the runs dictionary directly, rather than needing to extract the run arguments first
Parameters
----------
runs : dict
dictionary with the runs
variable : str, optional
argument that varies inside each group, by default 'tau'
config_dict_flat : dict, optional
flattened config dictionary with the default values, by default None
sort : True, False or 'descending', optional
        whether and how to sort the runs according to the variable, by default False, i.e. no sorting, which means the runs will be in chronological order
ignore : list[str], optional
list of kwargs to ignore when grouping runs together
Returns
-------
list
list of groups.
Each group is a dictionary with the same structure as the output of `group_by_varying` but the argument 'runs' contains the full run dictionary instead of just the run numbers
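    Examples
    --------
    >>> runs = {'1': {'args': {'tau': 0, 'percent': 5}}, '2': {'args': {'tau': -5, 'percent': 5}}}
    >>> make_groups(runs)
    [{'args': {'percent': 5}, 'runs': [{'args': {'tau': 0, 'percent': 5}}, {'args': {'tau': -5, 'percent': 5}}], 'tau': [0, -5]}]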
'''
run_args = {k:v['args'] for k,v in runs.items()}
groups = group_by_varying(run_args, variable=variable, config_dict_flat=config_dict_flat, sort=sort, ignore=ignore)
for g in groups:
g['runs'] = [runs[k] for k in g['runs']]
return groups
def get_subset(runs:dict, conditions:dict, config_dict=None) -> dict:
'''
Wrapper of `select_compatible` that allows to extract a subset of runs that satisfy certain conditions
Parameters
----------
runs : dict
dictionary with the runs
conditions : dict
dictionary of run arguments that has to be contained in the arguments of a compatible run
    config_dict : dict or str, optional
        if dict: config file
        if str: path to the config file
        If provided, allows a better check of whether a condition is at its default value, since default values don't appear in the list of arguments of the run
Returns
-------
dict
subset of `runs`
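    Examples
    --------
    >>> runs = {'1': {'args': {'tau': 0, 'percent': 5}}, '2': {'args': {'tau': -5, 'percent': 1}}}
    >>> get_subset(runs, {'percent': 1})
    {'2': {'args': {'tau': -5, 'percent': 1}}}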
'''
run_args = {k:v['args'] for k,v in runs.items()}
subset = select_compatible(run_args, conditions, require_unique=False, config=config_dict)
subset = {k:runs[k] for k in subset}
return subset
def get_run_kwargs(run_id:str, root_path:str='./') -> dict:
"""
Retrieves the keyword arguments for a specific run based on the run_id and root_path.
Parameters
----------
run_id : str
The identifier of the run to retrieve the keyword arguments for.
It can either be a simple integer or a path (absolute or relative to the root_path) to an external folder. See examples
root_path : str, optional
The root path to search for the run. This is where this script is running. Defaults to './'.
Returns
-------
dict: The keyword arguments for the specified run (as a flat dictionary).
Examples
--------
get_run_kwargs('42') # returns the keyword arguments for the run with id 42 in the current folder
get_run_kwargs('42', '/path/to/root/folder') # returns the keyword arguments for the run with id 42 in the root folder
get_run_kwargs('/path/to/external/folder/42') # returns the keyword arguments for the run with id 42 in the external folder, including those specified in the config file in the external folder
"""
external_folder = False
run_id = str(run_id)
if '/' not in run_id:
path = root_path
else:
external_folder = True
path, run_id = run_id.rsplit('/',1)
logger.info(f'Looking for run {run_id} in external folder {path}')
try:
run_id = str(int(run_id))
except ValueError:
raise ValueError('run_id must be the string representation of an integer')
runs = ut.json2dict(f'{path}/runs.json')
try:
rargs = runs[run_id]['args']
except KeyError:
raise KeyError(f'{run_id} is not a valid run')
if not external_folder:
return rargs
# deal with external folder
    logger.warning(f'Loading external config file from {path}: if there are incompatibilities with the current folder, they may cause issues')
config_dict = ut.json2dict(f'{path}/config.json')
run_kwargs = ut.collapse_dict(ut.set_values_recursive(config_dict['run_kwargs'], rargs))
return run_kwargs
def get_run(load_from, current_run_args:dict=None, ignorable_keys=None, runs_path='./runs.json'):
'''
Parameters
----------
load_from : dict, int, str, or None
If dict:
it is a dictionary with arguments of the run, plus the optional argument 'if_ambiguous_choose'.
The latter can have value either 'last' or 'first', and tells the function which run to choose if multiple satisfy the compatibility conditions.
If it is not provided, the function will require to have only one compatible run and will raise an error if the choice is ambiguous.
The other items in the dictionary can be set to the special value 'same', which will set them to the value assumed by that argument in the current run.
If str:
            it is parsed into a dictionary using the `parse_run_name` function. `if_ambiguous_choose` is inferred from the beginning of `load_from`.
For example providing 'last', will look for the most recent run without further conditions than normal compatibility
Providing 'first--percent__1' will return the first compatible performed run with `percent` = 1
Providing 'last--percent__1--tau__same' will return the last compatible run with `percent` = 1 and `tau` at the same value of the current run
            To use the 'same' special value you must provide `current_run_args`
`load_from` can also be the full name of a run, in which case compatibility checks are skipped.
If int:
            it is the number of the run (>= 0) or, if negative, the n-th last run
If None:
the function returns None
current_run_args : dict, optional
Flattened dictionary with all the arguments of the current run, used to check for compatibility issues when loading a model
ignorable_keys : list[str], optional
list of arguments to ignore when checking for compatibility
Returns
-------
run_name : str
name of the run from which to load
If there are no runs to load from or `load_from` = None, None is returned instead of raising an error
Raises
------
KeyError
if run_name is not found or the choice is ambiguous
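    Examples
    --------
    Illustrative only (these assume a folder with completed runs listed in runs.json):
        get_run(-1, current_run_args) # name of the most recent compatible run
        get_run('last--percent__1', current_run_args) # most recent compatible run with percent=1
        get_run({'percent': 1, 'tau': 'same'}, current_run_args) # the only compatible run with percent=1 and the same tau as the current run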
'''
if load_from is None:
return None
if ignorable_keys is None:
ignorable_keys = []
runs = ut.json2dict(runs_path)
spl = runs_path.rsplit('/',1)
if len(spl) == 2:
root_folder = spl[0]
else:
root_folder = './'
config = ut.json2dict(f'{root_folder}/config.json')
default_run_args = ut.collapse_dict(config['run_kwargs'])
# select only completed runs
runs = {k: v for k,v in runs.items() if v['status'] == 'COMPLETED'}
if isinstance(load_from, str) and load_from in [r['name'] for r in runs.values()]: # if load_from is precisely the name of one of the runs, we don't need to do anything more
return load_from
if_ambiguous_choose = None # either None, 'last' or 'first'
# get if_ambiguous_choose and deal with the string type
if isinstance(load_from, str):
if load_from.startswith('last'):
if_ambiguous_choose = 'last'
elif load_from.startswith('first'):
if_ambiguous_choose = 'first'
try:
load_from = int(load_from)
except ValueError: # cannot convert load_from to int, so it must be a string that doesn't contain only numbers:
load_from = parse_run_name(load_from, evaluate=True)
elif isinstance(load_from, dict):
if_ambiguous_choose = load_from.pop('if_ambiguous_choose', None)
# now load_from is either int or dict
# handle 'same' options
additional_relevant_keys = []
if isinstance(load_from, dict):
for k,v in load_from.items():
if v == 'same':
if current_run_args is None:
raise ValueError("Cannot use 'same' special value without providing current_run_name")
additional_relevant_keys.append(k)
for k in additional_relevant_keys:
load_from.pop(k)
# arguments relevant for model architecture
relevant_keys = list(get_default_params(create_model).keys()) + list(get_default_params(load_data).keys()) + ['nfolds'] + additional_relevant_keys
relevant_keys = list(set(relevant_keys).difference(ignorable_keys))
relevant_current_run_args = {k:v for k,v in current_run_args.items() if k in relevant_keys}
# select only compatible runs
compatible_runs = {}
for run_id,run in runs.items():
v_args = ut.set_values_recursive(default_run_args, run['args'])
relevant_v_args = {k:v for k,v in v_args.items() if k in relevant_keys}
diff = ut.compare_nested(relevant_v_args, relevant_current_run_args)
if diff:
logger.info(f"run {run['name']} is not compatible with the current run: {diff = }")
else:
run['args'] = v_args # save all the arguments of the run as we need them for select_compatible
compatible_runs[run_id] = run
runs = compatible_runs
if len(runs) == 0:
logger.warning('None of the previous runs are compatible with this one for performing transfer learning')
# GM: It would be nice if the warning specifies the function that reports them.
# AL: This can be achieved in formatting the logger
return None
logger.info(f'Found {len(runs)} compatible runs with the current one.')
if isinstance(load_from, int):
l = load_from
elif isinstance(load_from, dict):
require_unique = if_ambiguous_choose is None
logger.info(f'Requiring extra conditions: {load_from}')
l = select_compatible({k:v['args'] for k,v in runs.items()}, load_from, require_unique=require_unique)
if require_unique: # l is a string
l = int(l)
else: # l is a list of str
if len(l) == 0:
logger.log(35, f'None of the previous runs satisfy the conditions {load_from = }')
return None
if if_ambiguous_choose == 'first':
l = int(l[0])
elif if_ambiguous_choose == 'last':
l = int(l[-1])
else:
raise NotImplementedError(f'Unrecognized option {if_ambiguous_choose = }')
else:
raise TypeError(f'Unsupported type {type(load_from)} for load_from')
# now l is an int
if l < 0:
r = list(runs.values())[l]
else:
r = runs[str(l)]
run_name = r['name']
return run_name
######################################
########## COPY SOURCE FILES #########
######################################
def move_to_folder(folder, additional_files=None):
'''
Copies this file and its dependencies to a given folder.
Parameters
----------
folder : str or Path
destination folder
additional_files : list[Path], optional
list of additional files to copy in `folder`, by default None
Raises
------
FileExistsError
If there is already code in `folder`
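    Examples
    --------
    Illustrative: move_to_folder('./my_experiment') copies this script and all the dependencies
    returned by `get_dependencies` into ./my_experiment, creating the folder in the process.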
'''
folder = Path(folder).resolve()
if os.path.exists(folder):
raise FileExistsError(f'Cannot copy scripts to {folder}: you already have some there')
# get all dependencies
all_dependencies = get_dependencies()
# create subdirectories for the dependencies
for subf in set(all_dependencies.values()):
(folder / subf).mkdir(parents=True,exist_ok=True)
# copy this file
path_to_here = Path(__file__).resolve() # path to this file
shutil.copy(path_to_here, folder)
path_to_here = path_to_here.parent # now it is the path to the folder where this script is
# copy dependencies
for src,dest in all_dependencies.items():
shutil.copy(path_to_here / src, folder / dest)
# copy additional files
if additional_files:
for file in additional_files:
shutil.copy(file, folder)
print(f'Now you can go to {folder} and run the learning from there:\n')
print(f'\n\ncd \"{folder}\"\n')
# print(f'cd \"{folder}\"\n has been copied to your clipboard :)')
# pyperclip.copy(f'cd \"{folder}\"')
############################################
########## DATA PREPROCESSING ##############
############################################
try:
fields_infos = ut.json2dict('fields_infos.json')
    logger.info('Loaded fields_infos from file')
except FileNotFoundError:
    logger.warning("Could not load fields_infos: using the hardcoded version")
fields_infos_Plasim = {
't2m': { # how we label the field
'name': 'tas', # how the variable is called in the *.nc files
'filename_suffix': 'tas', # the ending of the filename
'label': 'Temperature',
},
'mrso': { # how we label the field
'name': 'mrso', # how the variable is called in the *.nc files
'filename_suffix': 'mrso', # the ending of the filename
'label': 'Soil Moisture',
},
't2m_inter': { # how we label the field
'name': 'tas', # how the variable is called in the *.nc files
'filename_suffix': 'tas_inter', # the ending of the filename
'label': '3 day Temperature', # interpolated data
},
'mrso_inter': { # how we label the field
'name': 'mrso', # how the variable is called in the *.nc files
'filename_suffix': 'mrso_inter', # the ending of the filename
'label': '3 day Soil Moisture', # interpolated data
},
}
for h in [200,300,500,850]: # geopotential heights
fields_infos_Plasim[f'zg{h}'] = {
'name': 'zg',
'filename_suffix': f'zg{h}',
'label': f'{h} hPa Geopotential Height',
}
for h in [200,300,500,850]: # geopotential heights