diff --git a/src/lava/magma/compiler/builders/py_builder.py b/src/lava/magma/compiler/builders/py_builder.py index f15d5311c..28902d8a7 100644 --- a/src/lava/magma/compiler/builders/py_builder.py +++ b/src/lava/magma/compiler/builders/py_builder.py @@ -9,13 +9,21 @@ from lava.magma.compiler.channels.interfaces import AbstractCspPort from lava.magma.compiler.channels.pypychannel import CspRecvPort, CspSendPort -from lava.magma.compiler.utils import (PortInitializer, VarInitializer, - VarPortInitializer) +from lava.magma.compiler.utils import ( + PortInitializer, + VarInitializer, + VarPortInitializer, +) from lava.magma.core.model.py.model import AbstractPyProcessModel -from lava.magma.core.model.py.ports import (AbstractPyIOPort, - IdentityTransformer, PyInPort, - PyOutPort, PyRefPort, PyVarPort, - VirtualPortTransformer) +from lava.magma.core.model.py.ports import ( + AbstractPyIOPort, + IdentityTransformer, + PyInPort, + PyOutPort, + PyRefPort, + PyVarPort, + VirtualPortTransformer, +) from lava.magma.core.model.py.type import LavaPyType @@ -44,14 +52,12 @@ class variables of a PyProcessModel, creates the corresponding data type """ def __init__( - self, - proc_model: ty.Type[AbstractPyProcessModel], - model_id: int, - proc_params: ty.Dict[str, ty.Any] = None): - super().__init__( - proc_model=proc_model, - model_id=model_id - ) + self, + proc_model: ty.Type[AbstractPyProcessModel], + model_id: int, + proc_params: ty.Dict[str, ty.Any] = None, + ): + super().__init__(proc_model=proc_model, model_id=model_id) if not issubclass(proc_model, AbstractPyProcessModel): raise AssertionError("Is not a subclass of AbstractPyProcessModel") self.vars: ty.Dict[str, VarInitializer] = {} @@ -77,10 +83,10 @@ def check_all_vars_and_ports_set(self): attr = getattr(self.proc_model, attr_name) if isinstance(attr, LavaPyType): if ( - attr_name not in self.vars - and attr_name not in self.py_ports - and attr_name not in self.ref_ports - and attr_name not in self.var_ports + attr_name not in self.vars + and attr_name not in self.py_ports + and attr_name not in self.ref_ports + and attr_name not in self.var_ports ): raise AssertionError( f"No LavaPyType '{attr_name}' found in ProcModel " @@ -187,8 +193,12 @@ def set_csp_ports(self, csp_ports: ty.List[AbstractCspPort]): proc_name = self.proc_model.implements_process.__name__ for port_name in new_ports: if not hasattr(self.proc_model, port_name): - raise AssertionError("PyProcessModel '{}' has \ - no port named '{}'.".format(proc_name, port_name)) + raise AssertionError( + "PyProcessModel '{}' has \ + no port named '{}'.".format( + proc_name, port_name + ) + ) if port_name in self.csp_ports: self.csp_ports[port_name].extend(new_ports[port_name]) @@ -209,9 +219,9 @@ def add_csp_port_mapping(self, py_port_id: str, csp_port: AbstractCspPort): a CSP port """ # Add or update the mapping - self._csp_port_map.setdefault( - csp_port.name, {} - ).update({py_port_id: csp_port}) + self._csp_port_map.setdefault(csp_port.name, {}).update( + {py_port_id: csp_port} + ) def set_rs_csp_ports(self, csp_ports: ty.List[AbstractCspPort]): """Set RS CSP Ports @@ -274,17 +284,21 @@ def build(self): csp_ports = [csp_ports] if issubclass(port_cls, PyInPort): - transformer = VirtualPortTransformer( - self._csp_port_map[name], - p.transform_funcs - ) if p.transform_funcs else IdentityTransformer() + transformer = ( + VirtualPortTransformer( + self._csp_port_map[name], p.transform_funcs + ) + if p.transform_funcs + else IdentityTransformer() + ) port_cls = ty.cast(ty.Type[PyInPort], lt.cls) 
port = port_cls(csp_ports, pm, p.shape, lt.d_type, transformer) elif issubclass(port_cls, PyOutPort): port = port_cls(csp_ports, pm, p.shape, lt.d_type) else: - raise AssertionError("port_cls must be of type PyInPort or " - "PyOutPort") + raise AssertionError( + "port_cls must be of type PyInPort or " "PyOutPort" + ) # Create dynamic PyPort attribute on ProcModel setattr(pm, name, port) @@ -300,18 +314,28 @@ def build(self): csp_send = None if name in self.csp_ports: csp_ports = self.csp_ports[name] - csp_recv = csp_ports[0] if isinstance( - csp_ports[0], CspRecvPort) else csp_ports[1] - csp_send = csp_ports[0] if isinstance( - csp_ports[0], CspSendPort) else csp_ports[1] + csp_recv = ( + csp_ports[0] + if isinstance(csp_ports[0], CspRecvPort) + else csp_ports[1] + ) + csp_send = ( + csp_ports[0] + if isinstance(csp_ports[0], CspSendPort) + else csp_ports[1] + ) - transformer = VirtualPortTransformer( - self._csp_port_map[name], - p.transform_funcs - ) if p.transform_funcs else IdentityTransformer() + transformer = ( + VirtualPortTransformer( + self._csp_port_map[name], p.transform_funcs + ) + if p.transform_funcs + else IdentityTransformer() + ) - port = port_cls(csp_send, csp_recv, pm, p.shape, lt.d_type, - transformer) + port = port_cls( + csp_send, csp_recv, pm, p.shape, lt.d_type, transformer + ) # Create dynamic RefPort attribute on ProcModel setattr(pm, name, port) @@ -327,19 +351,34 @@ def build(self): csp_send = None if name in self.csp_ports: csp_ports = self.csp_ports[name] - csp_recv = csp_ports[0] if isinstance( - csp_ports[0], CspRecvPort) else csp_ports[1] - csp_send = csp_ports[0] if isinstance( - csp_ports[0], CspSendPort) else csp_ports[1] + csp_recv = ( + csp_ports[0] + if isinstance(csp_ports[0], CspRecvPort) + else csp_ports[1] + ) + csp_send = ( + csp_ports[0] + if isinstance(csp_ports[0], CspSendPort) + else csp_ports[1] + ) - transformer = VirtualPortTransformer( - self._csp_port_map[name], - p.transform_funcs - ) if p.transform_funcs else IdentityTransformer() + transformer = ( + VirtualPortTransformer( + self._csp_port_map[name], p.transform_funcs + ) + if p.transform_funcs + else IdentityTransformer() + ) port = port_cls( - p.var_name, csp_send, csp_recv, pm, p.shape, p.d_type, - transformer) + p.var_name, + csp_send, + csp_recv, + pm, + p.shape, + p.d_type, + transformer, + ) # Create dynamic VarPort attribute on ProcModel setattr(pm, name, port) @@ -361,13 +400,15 @@ def build(self): if issubclass(lt.cls, np.ndarray): var = lt.cls(v.shape, lt.d_type) var[:] = v.value - elif issubclass(lt.cls, (int, float)): + elif issubclass(lt.cls, (int, float, str)): var = v.value else: - raise NotImplementedError("Cannot initiliaze variable " - "datatype, \ - only subclasses of int and float are \ - supported") + raise NotImplementedError( + "Cannot initiliaze variable " + "datatype, \ + only subclasses of int, float and str are \ + supported" + ) # Create dynamic variable attribute on ProcModel setattr(pm, name, var) diff --git a/src/lava/magma/core/learning/constants.py b/src/lava/magma/core/learning/constants.py index 81a6b31c3..5bc9dd746 100644 --- a/src/lava/magma/core/learning/constants.py +++ b/src/lava/magma/core/learning/constants.py @@ -16,7 +16,7 @@ W_WEIGHTS_S = W_WEIGHTS_U + 1 # Unsigned width of tag 2 -W_TAG_2_U = 7 +W_TAG_2_U = 8 # Signed width of tag 2 W_TAG_2_S = W_TAG_2_U + 1 diff --git a/src/lava/magma/core/learning/learning_rule.py b/src/lava/magma/core/learning/learning_rule.py index 337162719..801906b50 100644 --- 
a/src/lava/magma/core/learning/learning_rule.py +++ b/src/lava/magma/core/learning/learning_rule.py @@ -96,29 +96,11 @@ def __init__( rng_seed: ty.Optional[int] = None, ) -> None: - self._dw_str = None if dw is None else "dw = " + str(dw) - self._dd_str = None if dd is None else "dd = " + str(dd) - self._dt_str = None if dt is None else "dt = " + str(dt) + self._dw_str = dw + self._dd_str = dd + self._dt_str = dt - # dict of string learning rules - str_learning_rules = { - str_symbols.DW: dw, - str_symbols.DD: dd, - str_symbols.DT: dt, - } - - # dict of string learning rules that were provided only - active_str_learning_rules = { - key: str_learning_rule - for key, str_learning_rule in str_learning_rules.items() - if str_learning_rule is not None - } - - # validate that at least one learning rule was provided - self._validate_at_least_one_learning_rule(active_str_learning_rules) - # validate that same k was used throughout all learning rules in case - # a uk dependency is used - self._decimate_exponent = self._validate_uk(active_str_learning_rules) + self._validate_learning_rule_strings() # extract and validate x trace impulses and taus self._x1_impulse = self._validate_impulse(x1_impulse) @@ -144,13 +126,42 @@ def __init__( else np.random.randint(1, np.iinfo(np.int32).max) ) + self._create_product_series() + + def _validate_learning_rule_strings(self): + + # dict of string learning rules + str_learning_rules = { + str_symbols.DW: self._dw_str, + str_symbols.DD: self._dd_str, + str_symbols.DT: self._dt_str, + } + + # dict of string learning rules that were provided only + self._active_str_learning_rules = { + key: str_learning_rule + for key, str_learning_rule in str_learning_rules.items() + if str_learning_rule is not None + } + + # validate that at least one learning rule was provided + self._validate_at_least_one_learning_rule( + self._active_str_learning_rules + ) + # validate that same k was used throughout all learning rules in case + # a uk dependency is used + self._decimate_exponent = self._validate_uk( + self._active_str_learning_rules + ) + + def _create_product_series(self): # generate ProductSeries for all learning rules that were provided in # string format self._active_product_series = { key: self._generate_product_series_from_string( - key, str_learning_rule + key, str_lr ) - for key, str_learning_rule in active_str_learning_rules.items() + for key, str_lr in self._active_str_learning_rules.items() } # set attribute for each of the active ProductSeries @@ -168,10 +179,44 @@ def __init__( self._active_traces_per_dependency, ) = self._get_active_traces_from_active_product_series() + @property + def dw_str(self): + return self._dw_str + + @dw_str.setter + def dw_str(self, dw_str): + self._dw_str = dw_str + self._validate_learning_rule_strings() + self._create_product_series() + + @property + def dd_str(self): + return self._dd_str + + @dd_str.setter + def dd_str(self, dd_str): + self._dd_str = dd_str + self._validate_learning_rule_strings() + self._create_product_series() + + @property + def dt_str(self): + return self._dt_str + + @dt_str.setter + def dt_str(self, dt_str): + self._dt_str = dt_str + self._validate_learning_rule_strings() + self._create_product_series() + @property def rng_seed(self) -> int: return self._rng_seed + @rng_seed.setter + def rng_seed(self, value): + self._rng_seed = value + @property def x1_impulse(self) -> float: """Get the impulse value for x1 trace. 
@@ -183,6 +228,10 @@ def x1_impulse(self) -> float: """ return self._x1_impulse + @x1_impulse.setter + def x1_impulse(self, value): + self._x1_impulse = self._validate_impulse(value) + @property def x1_tau(self) -> float: """Get the tau value for x1 trace. @@ -194,6 +243,10 @@ def x1_tau(self) -> float: """ return self._x1_tau + @x1_tau.setter + def x1_tau(self, value): + self._x1_tau = self._validate_tau(value) + @property def x2_impulse(self) -> float: """Get the impulse value for x2 trace. @@ -205,6 +258,10 @@ def x2_impulse(self) -> float: """ return self._x2_impulse + @x2_impulse.setter + def x2_impulse(self, value): + self._x2_impulse = self._validate_impulse(value) + @property def x2_tau(self) -> float: """Get the tau value for x2 trace. @@ -216,6 +273,10 @@ def x2_tau(self) -> float: """ return self._x2_tau + @x2_tau.setter + def x2_tau(self, value): + self._x2_tau = self._validate_tau(value) + @property def y1_impulse(self) -> float: """Get the impulse value for y1 trace. @@ -227,6 +288,10 @@ def y1_impulse(self) -> float: """ return self._y1_impulse + @y1_impulse.setter + def y1_impulse(self, value): + self._y1_impulse = self._validate_impulse(value) + @property def y1_tau(self) -> float: """Get the tau value for y1 trace. @@ -238,6 +303,10 @@ def y1_tau(self) -> float: """ return self._y1_tau + @y1_tau.setter + def y1_tau(self, value): + self._y1_tau = self._validate_tau(value) + @property def y2_impulse(self) -> float: """Get the impulse value for y2 trace. @@ -249,6 +318,10 @@ def y2_impulse(self) -> float: """ return self._y2_impulse + @y2_impulse.setter + def y2_impulse(self, value): + self._y2_impulse = self._validate_impulse(value) + @property def y2_tau(self) -> float: """Get the tau value for y2 trace. @@ -260,6 +333,10 @@ def y2_tau(self) -> float: """ return self._y2_tau + @y2_tau.setter + def y2_tau(self, value): + self._y2_tau = self._validate_tau(value) + @property def y3_impulse(self) -> float: """Get the impulse value for y3 trace. @@ -271,6 +348,10 @@ def y3_impulse(self) -> float: """ return self._y3_impulse + @y3_impulse.setter + def y3_impulse(self, value): + self._y3_impulse = self._validate_impulse(value) + @property def y3_tau(self) -> float: """Get the tau value for y3 trace. @@ -282,6 +363,10 @@ def y3_tau(self) -> float: """ return self._y3_tau + @y3_tau.setter + def y3_tau(self, value): + self._y3_tau = self._validate_tau(value) + @property def t_epoch(self) -> int: """Get the epoch length. @@ -293,6 +378,10 @@ def t_epoch(self) -> int: """ return self._t_epoch + @t_epoch.setter + def t_epoch(self, value): + self._t_epoch = self._validate_t_epoch(value) + @property def dw(self) -> ty.Optional[ProductSeries]: """Get the ProductSeries associated with the "dw" target. @@ -643,7 +732,6 @@ def __init__( t_epoch: ty.Optional[int] = 1, rng_seed: ty.Optional[int] = None, ) -> None: - super().__init__( dw=dw, dd=dd, @@ -653,11 +741,11 @@ def __init__( x2_impulse=x2_impulse, x2_tau=x2_tau, y1_impulse=0, - y1_tau=2**32 - 1, + y1_tau=np.iinfo(np.uint16).max, y2_impulse=0, - y2_tau=2**32 - 1, + y2_tau=np.iinfo(np.uint16).max, y3_impulse=0, - y3_tau=2**32 - 1, + y3_tau=np.iinfo(np.uint16).max, t_epoch=t_epoch, rng_seed=rng_seed, ) diff --git a/src/lava/magma/core/model/model.py b/src/lava/magma/core/model/model.py index 38809e3dd..4d3cf36cc 100644 --- a/src/lava/magma/core/model/model.py +++ b/src/lava/magma/core/model/model.py @@ -78,9 +78,11 @@ class level variables. 
This should not cause problems as class level and required_resources: ty.List[ty.Type[AbstractResource]] = [] tags: ty.List[str] = [] - def __init__(self, - proc_params: ty.Type["ProcessParameters"], - loglevel: ty.Optional[int] = logging.WARNING) -> None: + def __init__( + self, + proc_params: ty.Type["ProcessParameters"], + loglevel: ty.Optional[int] = logging.WARNING, + ) -> None: self.log = logging.getLogger(__name__) self.log.setLevel(loglevel) self.proc_params: ty.Type["ProcessParameters"] = proc_params diff --git a/src/lava/magma/core/model/py/connection.py b/src/lava/magma/core/model/py/connection.py index 2688972bf..6084858fc 100644 --- a/src/lava/magma/core/model/py/connection.py +++ b/src/lava/magma/core/model/py/connection.py @@ -32,8 +32,7 @@ class AbstractLearningConnection: - """Base class for learning connection ProcessModels. - """ + """Base class for learning connection ProcessModels.""" # Learning Ports s_in_bap = None @@ -56,6 +55,22 @@ class AbstractLearningConnection: tag_2 = None tag_1 = None + dw = None + dt = None + dd = None + + x1_tau = None + x1_impulse = None + x2_tau = None + x2_impulse = None + + y1_tau = None + y1_impulse = None + y2_tau = None + y2_impulse = None + y3_tau = None + y3_impulse = None + class PyLearningConnection(AbstractLearningConnection): """Base class for learning connection ProcessModels in Python / CPU. @@ -111,23 +126,39 @@ def __init__(self, proc_params: dict) -> None: self.sign_mode = proc_params.get("sign_mode", SignMode.MIXED) - # store shapes that useful throughout the lifetime of this PM self._store_shapes() - # store impulses and taus in ndarrays with the right shapes self._store_impulses_and_taus() - # store active traces per dependency from learning_rule in ndarrays - # with the right shapes self._build_active_traces_per_dependency() - # store active traces from learning_rule in ndarrays - # with the right shapes self._build_active_traces() - # generate LearningRuleApplierBitApprox from ProductSeries self._build_learning_rule_appliers() - - # initialize TraceRandoms and ConnVarRandom self._init_randoms() + def on_var_update(self): + """ Update the learning rule parameters when on single Var is + updated. """ + + self._learning_rule.x1_tau = self.x1_tau[0] + self._learning_rule.x1_impulse = self.x1_impulse[0] + self._learning_rule.x2_tau = self.x2_tau[0] + self._learning_rule.x2_impulse = self.x2_impulse[0] + + self._learning_rule.y1_tau = self.y1_tau[0] + self._learning_rule.y1_impulse = self.y1_impulse[0] + self._learning_rule.y2_tau = self.y2_tau[0] + self._learning_rule.y2_impulse = self.y2_impulse[0] + self._learning_rule.y3_tau = self.y3_tau[0] + self._learning_rule.y3_impulse = self.y3_impulse[0] + + self._learning_rule.dw_str = self.dw + self._learning_rule.dd_str = self.dd + self._learning_rule.dt_str = self.dt + + self._store_impulses_and_taus() + self._build_active_traces_per_dependency() + self._build_active_traces() + self._build_learning_rule_appliers() + def _store_shapes(self) -> None: """Build and store several shapes that are needed in several computation stages of this ProcessModel.""" @@ -356,9 +387,6 @@ def recv_traces(self, s_in) -> None: self._record_post_spike_times(s_in_bap) elif isinstance(self._learning_rule, Loihi3FLearningRule): s_in_bap = self.s_in_bap.recv().astype(bool) - - # s_in_bap is being connected to the y1 port to receive - # post-synaptic spikes. 
y1 = self.s_in_y1.recv() y2 = self.s_in_y2.recv() y3 = self.s_in_y3.recv() @@ -403,9 +431,9 @@ def _update_synaptic_variable_random(self) -> None: def _compute_trace_histories(self) -> typing.Tuple[np.ndarray, np.ndarray]: pass - def _update_traces(self, - x_traces_history: np.ndarray, - y_traces_history: np.ndarray) -> None: + def _update_traces( + self, x_traces_history: np.ndarray, y_traces_history: np.ndarray + ) -> None: """Update x and y traces to last values in the epoch history. Parameters @@ -421,15 +449,14 @@ def _update_traces(self, self._set_y_traces(y_traces_history[-1]) @abstractmethod - def _apply_learning_rules(self, - x_traces_history: np.ndarray, - y_traces_history: np.ndarray) -> None: + def _apply_learning_rules( + self, x_traces_history: np.ndarray, y_traces_history: np.ndarray + ) -> None: pass - def _extract_applier_evaluated_traces(self, - x_traces_history: np.ndarray, - y_traces_history: np.ndarray) \ - -> typing.Dict[str, np.ndarray]: + def _extract_applier_evaluated_traces( + self, x_traces_history: np.ndarray, y_traces_history: np.ndarray + ) -> typing.Dict[str, np.ndarray]: """Extract x and y trace values on time steps derived from each of allowed dependencies. @@ -555,6 +582,22 @@ class LearningConnectionModelBitApproximate(PyLearningConnection): tag_2: np.ndarray = LavaPyType(np.ndarray, int, precision=6) tag_1: np.ndarray = LavaPyType(np.ndarray, int, precision=8) + dw: str = LavaPyType(str, str) + dd: str = LavaPyType(str, str) + dt: str = LavaPyType(str, str) + + x1_tau: np.ndarray = LavaPyType(np.ndarray, int, precision=8) + x1_impulse: np.ndarray = LavaPyType(np.ndarray, int, precision=8) + x2_tau: np.ndarray = LavaPyType(np.ndarray, int, precision=8) + x2_impulse: np.ndarray = LavaPyType(np.ndarray, int, precision=8) + + y1_tau: np.ndarray = LavaPyType(np.ndarray, int, precision=8) + y1_impulse: np.ndarray = LavaPyType(np.ndarray, int, precision=8) + y2_tau: np.ndarray = LavaPyType(np.ndarray, int, precision=8) + y2_impulse: np.ndarray = LavaPyType(np.ndarray, int, precision=8) + y3_tau: np.ndarray = LavaPyType(np.ndarray, int, precision=8) + y3_impulse: np.ndarray = LavaPyType(np.ndarray, int, precision=8) + def _store_impulses_and_taus(self) -> None: """Build and store integer ndarrays representing x and y impulses and taus.""" @@ -726,35 +769,45 @@ def _compute_trace_histories(self) -> typing.Tuple[np.ndarray, np.ndarray]: # most naive algorithm to decay traces # TODO decay only for important time-steps - x_traces_history = np.full((t_epoch + 1, ) + x_traces.shape, np.nan, - dtype=int) + x_traces_history = np.full( + (t_epoch + 1,) + x_traces.shape, np.nan, dtype=int + ) x_traces_history[0] = x_traces - y_traces_history = np.full((t_epoch + 1,) + y_traces.shape, np.nan, - dtype=int) + y_traces_history = np.full( + (t_epoch + 1,) + y_traces.shape, np.nan, dtype=int + ) y_traces_history[0] = y_traces for t in range(1, t_epoch + 1): - x_traces_history[t][x_taus != 0] = \ - self._decay_trace(x_traces_history[t - 1][x_taus != 0], 1, - x_taus[x_taus != 0][:, np.newaxis], - x_random.random_trace_decay) - y_traces_history[t][y_taus != 0] = \ - self._decay_trace(y_traces_history[t - 1][y_taus != 0], 1, - y_taus[y_taus != 0][:, np.newaxis], - y_random.random_trace_decay) + x_traces_history[t][x_taus != 0] = self._decay_trace( + x_traces_history[t - 1][x_taus != 0], + 1, + x_taus[x_taus != 0][:, np.newaxis], + x_random.random_trace_decay, + ) + y_traces_history[t][y_taus != 0] = self._decay_trace( + y_traces_history[t - 1][y_taus != 0], + 1, + y_taus[y_taus != 
0][:, np.newaxis], + y_random.random_trace_decay, + ) # add impulses if spike happens in this timestep x_spike_ids = np.where(t_spike_x == t)[0] - x_traces_history[t][:, x_spike_ids] = \ - self._add_impulse(x_traces_history[t][:, x_spike_ids], - x_random.random_impulse_addition, - x_impulses_int, x_impulses_frac) + x_traces_history[t][:, x_spike_ids] = self._add_impulse( + x_traces_history[t][:, x_spike_ids], + x_random.random_impulse_addition, + x_impulses_int, + x_impulses_frac, + ) y_spike_ids = np.where(t_spike_y == t)[0] - y_traces_history[t][:, y_spike_ids] = \ - self._add_impulse(y_traces_history[t][:, y_spike_ids], - y_random.random_impulse_addition, - y_impulses_int, y_impulses_frac) + y_traces_history[t][:, y_spike_ids] = self._add_impulse( + y_traces_history[t][:, y_spike_ids], + y_random.random_impulse_addition, + y_impulses_int, + y_impulses_frac, + ) return x_traces_history, y_traces_history @@ -790,10 +843,10 @@ def _decay_trace( @staticmethod def _add_impulse( - trace_values: np.ndarray, - random: int, - impulses_int: np.ndarray, - impulses_frac: np.ndarray, + trace_values: np.ndarray, + random: int, + impulses_int: np.ndarray, + impulses_frac: np.ndarray, ) -> np.ndarray: """Add trace impulse impulse value and stochastically round the result. @@ -816,18 +869,19 @@ def _add_impulse( """ trace_new = trace_values + impulses_int trace_new = stochastic_round(trace_new, random, impulses_frac) - trace_new = np.clip(trace_new, a_min=0, a_max=2 ** W_TRACE - 1) + trace_new = np.clip(trace_new, a_min=0, a_max=2**W_TRACE - 1) return trace_new - def _apply_learning_rules(self, - x_traces_history: np.ndarray, - y_traces_history: np.ndarray) -> None: + def _apply_learning_rules( + self, x_traces_history: np.ndarray, y_traces_history: np.ndarray + ) -> None: """Update all synaptic variables according to the LearningRuleApplier representation of their corresponding learning rule.""" - applier_args = self._extract_applier_args(x_traces_history, - y_traces_history) + applier_args = self._extract_applier_args( + x_traces_history, y_traces_history + ) for syn_var_name, lr_applier in self._learning_rule_appliers.items(): syn_var = getattr(self, syn_var_name).copy() @@ -850,9 +904,9 @@ def _apply_learning_rules(self, syn_var = self._saturate_synaptic_variable(syn_var_name, syn_var) setattr(self, syn_var_name, syn_var) - def _extract_applier_args(self, - x_traces_history: np.ndarray, - y_traces_history: np.ndarray) -> dict: + def _extract_applier_args( + self, x_traces_history: np.ndarray, y_traces_history: np.ndarray + ) -> dict: """Extracts arguments for the LearningRuleApplierFloat. "u" is a scalar. 
@@ -881,7 +935,7 @@ def _extract_applier_args(self, "weights": self.weights, "tag_2": self.tag_2, "tag_1": self.tag_1, - "u": 0 + "u": 0, } if self._learning_rule.decimate_exponent is not None: @@ -896,9 +950,9 @@ def _extract_applier_args(self, # Shape: (0, ) applier_args["u"] = u - evaluated_traces = \ - self._extract_applier_evaluated_traces(x_traces_history, - y_traces_history) + evaluated_traces = self._extract_applier_evaluated_traces( + x_traces_history, y_traces_history + ) applier_args.update(evaluated_traces) return applier_args @@ -945,9 +999,9 @@ def _saturate_synaptic_variable_accumulator( @staticmethod def _stochastic_round_synaptic_variable( - synaptic_variable_name: str, - synaptic_variable_values: np.ndarray, - random: float, + synaptic_variable_name: str, + synaptic_variable_values: np.ndarray, + random: float, ) -> np.ndarray: """Stochastically round synaptic variable after learning rule application. @@ -970,8 +1024,7 @@ def _stochastic_round_synaptic_variable( fractional_part = integer_part % 1 integer_part = np.floor(integer_part) - integer_part = stochastic_round(integer_part, random, - fractional_part) + integer_part = stochastic_round(integer_part, random, fractional_part) result = (integer_part * exp_mant).astype( synaptic_variable_values.dtype ) @@ -1088,6 +1141,22 @@ class LearningConnectionModelFloat(PyLearningConnection): tag_2: np.ndarray = LavaPyType(np.ndarray, float) tag_1: np.ndarray = LavaPyType(np.ndarray, float) + dw: str = LavaPyType(str, str) + dd: str = LavaPyType(str, str) + dt: str = LavaPyType(str, str) + + x1_tau: np.ndarray = LavaPyType(np.ndarray, float) + x1_impulse: np.ndarray = LavaPyType(np.ndarray, float) + x2_tau: np.ndarray = LavaPyType(np.ndarray, float) + x2_impulse: np.ndarray = LavaPyType(np.ndarray, float) + + y1_tau: np.ndarray = LavaPyType(np.ndarray, float) + y1_impulse: np.ndarray = LavaPyType(np.ndarray, float) + y2_tau: np.ndarray = LavaPyType(np.ndarray, float) + y2_impulse: np.ndarray = LavaPyType(np.ndarray, float) + y3_tau: np.ndarray = LavaPyType(np.ndarray, float) + y3_impulse: np.ndarray = LavaPyType(np.ndarray, float) + def _store_impulses_and_taus(self) -> None: """Build and store integer ndarrays representing x and y impulses and taus.""" @@ -1191,20 +1260,26 @@ def _compute_trace_histories(self) -> typing.Tuple[np.ndarray, np.ndarray]: # most naive algorithm to decay traces # TODO decay only for important time-steps - x_traces_history = np.full((t_epoch + 1, ) + x_traces.shape, np.nan, - dtype=float) + x_traces_history = np.full( + (t_epoch + 1,) + x_traces.shape, np.nan, dtype=float + ) x_traces_history[0] = x_traces - y_traces_history = np.full((t_epoch + 1,) + y_traces.shape, np.nan, - dtype=float) + y_traces_history = np.full( + (t_epoch + 1,) + y_traces.shape, np.nan, dtype=float + ) y_traces_history[0] = y_traces for t in range(1, t_epoch + 1): - x_traces_history[t][x_taus != 0] = \ - self._decay_trace(x_traces_history[t - 1][x_taus != 0], 1, - x_taus[x_taus != 0][:, np.newaxis]) - y_traces_history[t][y_taus != 0] = \ - self._decay_trace(y_traces_history[t - 1][y_taus != 0], 1, - y_taus[y_taus != 0][:, np.newaxis]) + x_traces_history[t][x_taus != 0] = self._decay_trace( + x_traces_history[t - 1][x_taus != 0], + 1, + x_taus[x_taus != 0][:, np.newaxis], + ) + y_traces_history[t][y_taus != 0] = self._decay_trace( + y_traces_history[t - 1][y_taus != 0], + 1, + y_taus[y_taus != 0][:, np.newaxis], + ) # add impulses if spike happens in this timestep x_spike_ids = np.where(t_spike_x == t)[0] @@ -1217,7 +1292,7 @@ def 
_compute_trace_histories(self) -> typing.Tuple[np.ndarray, np.ndarray]: @staticmethod def _decay_trace( - trace_values: np.ndarray, t: np.ndarray, taus: np.ndarray + trace_values: np.ndarray, t: np.ndarray, taus: np.ndarray ) -> np.ndarray: """Decay trace to a given within-epoch time step. @@ -1238,14 +1313,15 @@ def _decay_trace( """ return np.exp(-t / taus) * trace_values - def _apply_learning_rules(self, - x_traces_history: np.ndarray, - y_traces_history: np.ndarray) -> None: + def _apply_learning_rules( + self, x_traces_history: np.ndarray, y_traces_history: np.ndarray + ) -> None: """Update all synaptic variables according to the LearningRuleApplier representation of their corresponding learning rule.""" - applier_args = self._extract_applier_args(x_traces_history, - y_traces_history) + applier_args = self._extract_applier_args( + x_traces_history, y_traces_history + ) for syn_var_name, lr_applier in self._learning_rule_appliers.items(): syn_var = getattr(self, syn_var_name).copy() @@ -1253,9 +1329,9 @@ def _apply_learning_rules(self, syn_var = self._saturate_synaptic_variable(syn_var_name, syn_var) setattr(self, syn_var_name, syn_var) - def _extract_applier_args(self, - x_traces_history: np.ndarray, - y_traces_history: np.ndarray) -> dict: + def _extract_applier_args( + self, x_traces_history: np.ndarray, y_traces_history: np.ndarray + ) -> dict: """Extracts arguments for the LearningRuleApplierFloat. "u" is a scalar. @@ -1300,9 +1376,9 @@ def _extract_applier_args(self, # Shape: (0, ) applier_args["u"] = u - evaluated_traces = \ - self._extract_applier_evaluated_traces(x_traces_history, - y_traces_history) + evaluated_traces = self._extract_applier_evaluated_traces( + x_traces_history, y_traces_history + ) applier_args.update(evaluated_traces) return applier_args diff --git a/src/lava/magma/core/model/py/model.py b/src/lava/magma/core/model/py/model.py index 3ddce22ac..c8f79dd0b 100644 --- a/src/lava/magma/core/model/py/model.py +++ b/src/lava/magma/core/model/py/model.py @@ -8,15 +8,19 @@ import numpy as np import platform -from lava.magma.compiler.channels.pypychannel import CspSendPort, CspRecvPort, \ - CspSelector +from lava.magma.compiler.channels.pypychannel import ( + CspSendPort, + CspRecvPort, + CspSelector, +) from lava.magma.core.model.model import AbstractProcessModel from lava.magma.core.model.py.ports import AbstractPyPort, PyVarPort from lava.magma.runtime.mgmt_token_enums import ( enum_to_np, enum_equal, MGMT_COMMAND, - MGMT_RESPONSE, ) + MGMT_RESPONSE, +) from lava.magma.core.sync.protocols.async_protocol import AsyncProtocol @@ -32,9 +36,11 @@ class AbstractPyProcessModel(AbstractProcessModel, ABC): du: int = LavaPyType(int, np.uint16, precision=12) """ - def __init__(self, - proc_params: ty.Type["ProcessParameters"], - loglevel: ty.Optional[int] = logging.WARNING) -> None: + def __init__( + self, + proc_params: ty.Type["ProcessParameters"], + loglevel: ty.Optional[int] = logging.WARNING, + ) -> None: super().__init__(proc_params=proc_params, loglevel=loglevel) self.model_id: ty.Optional[int] = None self.service_to_process: ty.Optional[CspRecvPort] = None @@ -43,16 +49,16 @@ def __init__(self, self.var_ports: ty.List[PyVarPort] = [] self.var_id_to_var_map: ty.Dict[int, ty.Any] = {} self._selector: CspSelector = CspSelector() - self._action: str = 'cmd' + self._action: str = "cmd" self._stopped: bool = False - self._channel_actions: ty.List[ty.Tuple[ty.Union[CspSendPort, - CspRecvPort], - ty.Callable]] = [] + self._channel_actions: ty.List[ + 
ty.Tuple[ty.Union[CspSendPort, CspRecvPort], ty.Callable] + ] = [] self._cmd_handlers: ty.Dict[MGMT_COMMAND, ty.Callable] = { MGMT_COMMAND.STOP[0]: self._stop, MGMT_COMMAND.PAUSE[0]: self._pause, MGMT_COMMAND.GET_DATA[0]: self._get_var, - MGMT_COMMAND.SET_DATA[0]: self._set_var + MGMT_COMMAND.SET_DATA[0]: self._set_var, } def __setattr__(self, key: str, value: ty.Any): @@ -110,16 +116,21 @@ def _get_var(self): data_port = self.process_to_service # Header corresponds to number of values # Data is either send once (for int) or one by one (array) - if isinstance(var, int) or isinstance(var, np.integer): + if isinstance(var, int) or isinstance(var, np.int32): data_port.send(enum_to_np(1)) data_port.send(enum_to_np(var)) elif isinstance(var, np.ndarray): # FIXME: send a whole vector (also runtime_service.py) - var_iter = np.nditer(var, order='C') - num_items: np.integer = np.prod(var.shape) + var_iter = np.nditer(var, order="C") + num_items: np.int32 = np.prod(var.shape) data_port.send(enum_to_np(num_items)) for value in var_iter: data_port.send(enum_to_np(value, np.float64)) + elif isinstance(var, str): + encoded_str = list(var.encode("ascii")) + data_port.send(enum_to_np(len(encoded_str))) + for ch in encoded_str: + data_port.send(enum_to_np(ch, d_type=np.int32)) def _set_var(self): """Handles the set Var command from runtime service.""" @@ -130,7 +141,7 @@ def _set_var(self): # 2. Receive Var data data_port = self.service_to_process - if isinstance(var, int) or isinstance(var, np.integer): + if isinstance(var, int) or isinstance(var, np.int32): # First item is number of items (1) - not needed data_port.recv() # Data to set @@ -143,7 +154,7 @@ def _set_var(self): elif isinstance(var, np.ndarray): # First item is number of items num_items = data_port.recv()[0] - var_iter = np.nditer(var, op_flags=['readwrite']) + var_iter = np.nditer(var, op_flags=["readwrite"]) # Set data one by one for i in var_iter: if num_items == 0: @@ -151,10 +162,25 @@ def _set_var(self): num_items -= 1 i[...] = data_port.recv()[0] self.process_to_service.send(MGMT_RESPONSE.SET_COMPLETE) + elif isinstance(var, str): + # First item is number of items + num_items = int(data_port.recv()[0]) + + s = [] + for i in range(num_items): + s.append(int(data_port.recv()[0])) # decode string from ascii + + s = bytes(s).decode("ascii") + setattr(self, var_name, s) + self.process_to_service.send(MGMT_RESPONSE.SET_COMPLETE) + else: self.process_to_service.send(MGMT_RESPONSE.ERROR) raise RuntimeError("Unsupported type") + # notify PM that Vars have been changed + self.on_var_update() + def _handle_var_port(self, var_port): """Handles read/write requests on the given VarPort.""" var_port.service() @@ -166,7 +192,7 @@ def run(self): is informed about completion. The loop ends when the STOP command is received.""" while True: - if self._action == 'cmd': + if self._action == "cmd": cmd = self.service_to_process.recv()[0] try: if cmd in self._cmd_handlers: @@ -178,7 +204,8 @@ def run(self): f"Illegal RuntimeService command! 
ProcessModels of " f"type {self.__class__.__qualname__} " f"{self.model_id} cannot handle " - f"command: {cmd} ") + f"command: {cmd} " + ) except Exception as inst: # Inform runtime service about termination self.process_to_service.send(MGMT_RESPONSE.ERROR) @@ -187,7 +214,7 @@ def run(self): else: # Handle VarPort requests from RefPorts self._handle_var_port(self._action) - self._channel_actions = [(self.service_to_process, lambda: 'cmd')] + self._channel_actions = [(self.service_to_process, lambda: "cmd")] self.add_ports_for_polling() self._action = self._selector.select(*self._channel_actions) @@ -207,6 +234,12 @@ def join(self): for p in self.py_ports: p.join() + def on_var_update(self): + """This method is called if a Var is updated. It + can be used as callback function to calculate dependent + changes.""" + pass + class PyLoihiProcessModel(AbstractPyProcessModel): """ @@ -236,13 +269,15 @@ def __init__(self, proc_params: ty.Optional["ProcessParameters"] = None): super().__init__(proc_params=proc_params) self.time_step = 0 self.phase = PyLoihiProcessModel.Phase.SPK - self._cmd_handlers.update({ - PyLoihiProcessModel.Phase.SPK[0]: self._spike, - PyLoihiProcessModel.Phase.PRE_MGMT[0]: self._pre_mgmt, - PyLoihiProcessModel.Phase.LRN[0]: self._lrn, - PyLoihiProcessModel.Phase.POST_MGMT[0]: self._post_mgmt, - PyLoihiProcessModel.Phase.HOST[0]: self._host - }) + self._cmd_handlers.update( + { + PyLoihiProcessModel.Phase.SPK[0]: self._spike, + PyLoihiProcessModel.Phase.PRE_MGMT[0]: self._pre_mgmt, + PyLoihiProcessModel.Phase.LRN[0]: self._lrn, + PyLoihiProcessModel.Phase.POST_MGMT[0]: self._post_mgmt, + PyLoihiProcessModel.Phase.HOST[0]: self._host, + } + ) self._req_pause: bool = False self._req_stop: bool = False @@ -250,6 +285,7 @@ class Phase: """ Different States of the State Machine of a Loihi Process """ + SPK = enum_to_np(1) PRE_MGMT = enum_to_np(2) LRN = enum_to_np(3) @@ -260,6 +296,7 @@ class Response: """ Different types of response for a RuntimeService Request """ + STATUS_DONE = enum_to_np(0) """Signfies Ack or Finished with the Command""" STATUS_TERMINATED = enum_to_np(-1) @@ -336,16 +373,20 @@ def _spike(self): return if self.lrn_guard() and self.pre_guard(): self.process_to_service.send( - PyLoihiProcessModel.Response.REQ_PRE_LRN_MGMT) + PyLoihiProcessModel.Response.REQ_PRE_LRN_MGMT + ) elif self.lrn_guard(): self.process_to_service.send( - PyLoihiProcessModel.Response.REQ_LEARNING) + PyLoihiProcessModel.Response.REQ_LEARNING + ) elif self.post_guard(): self.process_to_service.send( - PyLoihiProcessModel.Response.REQ_POST_LRN_MGMT) + PyLoihiProcessModel.Response.REQ_POST_LRN_MGMT + ) else: self.process_to_service.send( - PyLoihiProcessModel.Response.STATUS_DONE) + PyLoihiProcessModel.Response.STATUS_DONE + ) def _pre_mgmt(self): """ @@ -357,8 +398,7 @@ def _pre_mgmt(self): if self._req_pause or self._req_stop: self._handle_pause_or_stop_req() return - self.process_to_service.send( - PyLoihiProcessModel.Response.REQ_LEARNING) + self.process_to_service.send(PyLoihiProcessModel.Response.REQ_LEARNING) def _post_mgmt(self): """ @@ -384,7 +424,8 @@ def _lrn(self): return if self.post_guard(): self.process_to_service.send( - PyLoihiProcessModel.Response.REQ_POST_LRN_MGMT) + PyLoihiProcessModel.Response.REQ_POST_LRN_MGMT + ) return self.process_to_service.send(PyLoihiProcessModel.Response.STATUS_DONE) @@ -399,15 +440,15 @@ def _stop(self): Command handler for Stop Command. 
""" self.process_to_service.send( - PyLoihiProcessModel.Response.STATUS_TERMINATED) + PyLoihiProcessModel.Response.STATUS_TERMINATED + ) self.join() def _pause(self): """ Command handler for Pause Command. """ - self.process_to_service.send( - PyLoihiProcessModel.Response.STATUS_PAUSED) + self.process_to_service.send(PyLoihiProcessModel.Response.STATUS_PAUSED) def _handle_pause_or_stop_req(self): """ @@ -437,17 +478,21 @@ def add_ports_for_polling(self): """ Add various ports to poll for communication on ports """ - if enum_equal(self.phase, PyLoihiProcessModel.Phase.PRE_MGMT) or \ - enum_equal(self.phase, PyLoihiProcessModel.Phase.POST_MGMT) \ - or enum_equal(self.phase, PyLoihiProcessModel.Phase.HOST): + if ( + enum_equal(self.phase, PyLoihiProcessModel.Phase.PRE_MGMT) + or enum_equal(self.phase, PyLoihiProcessModel.Phase.POST_MGMT) + or enum_equal(self.phase, PyLoihiProcessModel.Phase.HOST) + ): for var_port in self.var_ports: for csp_port in var_port.csp_ports: if isinstance(csp_port, CspRecvPort): + def func(fvar_port=var_port): return lambda: fvar_port - self._channel_actions.insert(0, - (csp_port, func(var_port))) + self._channel_actions.insert( + 0, (csp_port, func(var_port)) + ) class PyAsyncProcessModel(AbstractPyProcessModel): @@ -481,14 +526,13 @@ class PyAsyncProcessModel(AbstractPyProcessModel): def __init__(self, proc_params: ty.Optional["ProcessParameters"] = None): super().__init__(proc_params=proc_params) self.num_steps = 0 - self._cmd_handlers.update({ - MGMT_COMMAND.RUN[0]: self._run_async - }) + self._cmd_handlers.update({MGMT_COMMAND.RUN[0]: self._run_async}) class Response: """ Different types of response for a RuntimeService Request """ + STATUS_DONE = enum_to_np(0) """Signifies Ack or Finished with the Command""" STATUS_TERMINATED = enum_to_np(-1) @@ -543,7 +587,7 @@ def add_ports_for_polling(self): def _get_attr_dict( - model_class: ty.Type[PyLoihiProcessModel] + model_class: ty.Type[PyLoihiProcessModel], ) -> ty.Dict[str, ty.Any]: """Get a dictionary of non-callable public attributes of a class. @@ -557,10 +601,14 @@ def _get_attr_dict( ty.Dict[str, ty.Any] Dictionary of attribute name and it's value. """ - var_names = [v for v, m in vars(model_class).items() - if not (v.startswith('_') or callable(m))] - var_dict = {var_name: getattr(model_class, var_name) - for var_name in var_names} + var_names = [ + v + for v, m in vars(model_class).items() + if not (v.startswith("_") or callable(m)) + ] + var_dict = { + var_name: getattr(model_class, var_name) for var_name in var_names + } if model_class == PyLoihiProcessModel: return {} for base in model_class.__bases__: @@ -569,7 +617,7 @@ def _get_attr_dict( def _get_callable_dict( - model_class: ty.Type[PyLoihiProcessModel] + model_class: ty.Type[PyLoihiProcessModel], ) -> ty.Dict[str, ty.Callable]: """Get a dictionary of callable public members of a class. @@ -583,10 +631,15 @@ def _get_callable_dict( ty.Dict[str, ty.Callable] Dictionary of callable name and it's pointer. 
""" - callable_names = [v for v, m in vars(model_class).items() - if callable(m) and not v.startswith('_')] - callable_dict = {callable_name: getattr(model_class, callable_name) - for callable_name in callable_names} + callable_names = [ + v + for v, m in vars(model_class).items() + if callable(m) and not v.startswith("_") + ] + callable_dict = { + callable_name: getattr(model_class, callable_name) + for callable_name in callable_names + } if model_class == PyLoihiProcessModel: return {} for base in model_class.__bases__: @@ -595,7 +648,7 @@ def _get_callable_dict( def PyLoihiModelToPyAsyncModel( - py_loihi_model: ty.Type[PyLoihiProcessModel] + py_loihi_model: ty.Type[PyLoihiProcessModel], ) -> ty.Type[PyAsyncProcessModel]: """Factory function that converts Py-Loihi process models to equivalent Py-Async definition. @@ -615,34 +668,47 @@ class name is the original loihi process model class name with Async """ # The exclude_vars and exclude_callables are # based on the constructor of PyLoihiProcessModel and PyAsyncProcModel - if platform.system() == 'Windows': - raise OSError('Conversion of process models on the fly is not ' - 'supported on Windows system. It will result in ' - 'pickling error when lava threads for model execution ' - 'are spawned. The fundamental reason is Windows OS ' - 'does not support forking and needs to use pickle.') - - exclude_vars = ['time_step', 'phase'] - exclude_callables = ['run_spk', - 'pre_guard', 'run_pre_mgmt', - 'post_guard', 'run_post_mgmt', - 'implements_process', 'implements_protocol'] - name = py_loihi_model.__name__ + 'Async' + if platform.system() == "Windows": + raise OSError( + "Conversion of process models on the fly is not " + "supported on Windows system. It will result in " + "pickling error when lava threads for model execution " + "are spawned. The fundamental reason is Windows OS " + "does not support forking and needs to use pickle." + ) + + exclude_vars = ["time_step", "phase"] + exclude_callables = [ + "run_spk", + "pre_guard", + "run_pre_mgmt", + "post_guard", + "run_post_mgmt", + "implements_process", + "implements_protocol", + ] + name = py_loihi_model.__name__ + "Async" var_dict = _get_attr_dict(py_loihi_model) - var_dict['implements_process'] = py_loihi_model.implements_process - var_dict['implements_protocol'] = AsyncProtocol - callable_dict = {k: v for k, v in _get_callable_dict(py_loihi_model).items() - if k not in exclude_callables} + var_dict["implements_process"] = py_loihi_model.implements_process + var_dict["implements_protocol"] = AsyncProtocol + callable_dict = { + k: v + for k, v in _get_callable_dict(py_loihi_model).items() + if k not in exclude_callables + } def __init__(self, proc_params: dict): # New constructor of the PyAsyncModel implementation. 
PyAsyncProcessModel.__init__(self, proc_params) ref_model = py_loihi_model(proc_params) - attributes = [v for v, m in vars(ref_model).items() - if not (v.startswith('_') or callable(m)) - and v not in var_dict.keys() - and v not in vars(self) - and v not in exclude_vars] + attributes = [ + v + for v, m in vars(ref_model).items() + if not (v.startswith("_") or callable(m)) + and v not in var_dict.keys() + and v not in vars(self) + and v not in exclude_vars + ] for attr in attributes: setattr(self, attr, getattr(ref_model, attr)) self.time_step = 1 @@ -657,9 +723,14 @@ def run_async(self) -> None: py_loihi_model.run_post_mgmt(self) self.time_step += 1 - py_async_model = type(name, (PyAsyncProcessModel,), - {'__init__': __init__, - 'run_async': run_async, - **var_dict, - **callable_dict}) + py_async_model = type( + name, + (PyAsyncProcessModel,), + { + "__init__": __init__, + "run_async": run_async, + **var_dict, + **callable_dict, + }, + ) return py_async_model diff --git a/src/lava/magma/core/process/connection.py b/src/lava/magma/core/process/connection.py index e52ff5330..934f1fddf 100644 --- a/src/lava/magma/core/process/connection.py +++ b/src/lava/magma/core/process/connection.py @@ -53,18 +53,19 @@ class LearningConnectionProcess(AbstractProcess): learning_rule: LoihiLearningRule Learning rule which determines the parameters for online learning. """ + def __init__( self, shape: tuple, - learning_rule: ty.Optional[LoihiLearningRule] = None, + learning_rule: ty.Optional[LoihiLearningRule], **kwargs, ): kwargs["learning_rule"] = learning_rule kwargs["shape"] = shape - tag_1 = kwargs.get('tag_1', 0) - tag_2 = kwargs.get('tag_2', 0) + tag_1 = kwargs.get("tag_1", 0) + tag_2 = kwargs.get("tag_2", 0) - self.learning_rule = learning_rule + self._learning_rule = learning_rule # Learning Ports self.s_in_bap = InPort(shape=(shape[0],)) @@ -87,4 +88,20 @@ def __init__( self.tag_1 = Var(shape=shape, init=tag_1) self.tag_2 = Var(shape=shape, init=tag_2) + self.dw = Var(shape=(256,), init=learning_rule.dw_str) + self.dd = Var(shape=(256,), init=learning_rule.dd_str) + self.dt = Var(shape=(256,), init=learning_rule.dt_str) + + self.x1_tau = Var(shape=(1,), init=learning_rule.x1_tau) + self.x1_impulse = Var(shape=(1,), init=learning_rule.x1_impulse) + self.x2_tau = Var(shape=(1,), init=learning_rule.x2_tau) + self.x2_impulse = Var(shape=(1,), init=learning_rule.x2_impulse) + + self.y1_tau = Var(shape=(1,), init=learning_rule.y1_tau) + self.y1_impulse = Var(shape=(1,), init=learning_rule.y1_impulse) + self.y2_tau = Var(shape=(1,), init=learning_rule.y2_tau) + self.y2_impulse = Var(shape=(1,), init=learning_rule.y2_impulse) + self.y3_tau = Var(shape=(1,), init=learning_rule.y3_tau) + self.y3_impulse = Var(shape=(1,), init=learning_rule.y3_impulse) + super().__init__(**kwargs) diff --git a/src/lava/magma/core/process/neuron.py b/src/lava/magma/core/process/neuron.py index 4da34a912..3a99008a0 100644 --- a/src/lava/magma/core/process/neuron.py +++ b/src/lava/magma/core/process/neuron.py @@ -21,14 +21,16 @@ class LearningNeuronProcess: Learning rule which determines the parameters for online learning. 
""" - def __init__(self, - shape: ty.Tuple[int, ...], - learning_rule: LoihiLearningRule, - *args, - **kwargs): - - kwargs['shape'] = shape - kwargs['learning_rule'] = learning_rule + + def __init__( + self, + shape: ty.Tuple[int, ...], + learning_rule: LoihiLearningRule, + *args, + **kwargs, + ): + kwargs["shape"] = shape + kwargs["learning_rule"] = learning_rule # Learning Ports self.a_third_factor_in = InPort(shape=(shape[0],)) diff --git a/src/lava/magma/core/process/variable.py b/src/lava/magma/core/process/variable.py index a2646eb1b..54ffaa58f 100644 --- a/src/lava/magma/core/process/variable.py +++ b/src/lava/magma/core/process/variable.py @@ -4,8 +4,10 @@ import typing as ty import numpy as np -from lava.magma.core.process.interfaces import \ - AbstractProcessMember, IdGeneratorSingleton +from lava.magma.core.process.interfaces import ( + AbstractProcessMember, + IdGeneratorSingleton, +) class Var(AbstractProcessMember): @@ -40,10 +42,11 @@ class Var(AbstractProcessMember): """ def __init__( - self, - shape: ty.Tuple[int, ...], - init: ty.Union[bool, float, list, tuple, np.ndarray] = 0, - shareable: bool = True): + self, + shape: ty.Tuple[int, ...], + init: ty.Union[bool, float, list, tuple, np.ndarray] = 0, + shareable: bool = True, + ): """Initializes a new Lava variable. Parameters: @@ -68,10 +71,10 @@ def model(self): return self._model @model.setter - def model(self, val: 'AbstractVarModel'): + def model(self, val: "AbstractVarModel"): self._model = val - def alias(self, other_var: 'Var'): + def alias(self, other_var: "Var"): """Establishes an 'alias' relationship between this and 'other_var'. The other Var must be a member of a strict sub processes of this Var's parent process which might be instantiated within a @@ -88,11 +91,14 @@ def alias(self, other_var: 'Var'): if not isinstance(other_var, Var): raise AssertionError("'other_var' must be a Var instance.") if self.shape != other_var.shape: - raise AssertionError("Shapes of this and 'other_var' must " - "be the same.") + raise AssertionError( + "Shapes of this and 'other_var' must " "be the same." + ) if self.shareable != other_var.shareable: - raise AssertionError("'shareable' attribute of this and " - "'other_var' must be the same.") + raise AssertionError( + "'shareable' attribute of this and " + "'other_var' must be the same." + ) # Establish 'alias' relationship self.aliased_var = other_var @@ -116,20 +122,27 @@ def validate_alias(self): f"must be a member of a process that is a strict sub " f"process of the aliasing Var's '{self.name}' in process " f"'{self.process.name}::{self.process.__class__.__name__}'" - f".") + f"." + ) - def set(self, value: np.ndarray, idx: np.ndarray = None): + def set(self, value: ty.Union[np.ndarray, str], idx: np.ndarray = None): """Sets value of Var. If this Var aliases another Var, then set(..) is delegated to aliased Var.""" if self.aliased_var is not None: self.aliased_var.set(value, idx) else: if self.process.runtime: + # encode if var is str + if isinstance(value, str): + value = np.array( + list(value.encode("ascii")), dtype=np.int32 + ) self.process.runtime.set_var(self.id, value, idx) else: raise ValueError( "No Runtime available yet. Cannot set new 'Var' without " - "Runtime.") + "Runtime." + ) def get(self, idx: np.ndarray = None) -> np.ndarray: """Gets and returns value of Var. 
If this Var aliases another Var, @@ -138,7 +151,12 @@ def get(self, idx: np.ndarray = None) -> np.ndarray: return self.aliased_var.get(idx) else: if self.process.runtime: - return self.process.runtime.get_var(self.id, idx) + buffer = self.process.runtime.get_var(self.id, idx) + if isinstance(self.init, str): + # decode if var is string + return bytes(buffer.astype(int).tolist()).decode("ascii") + else: + return buffer else: return self.init diff --git a/src/lava/magma/runtime/runtime.py b/src/lava/magma/runtime/runtime.py index bbf19a631..903daa572 100644 --- a/src/lava/magma/runtime/runtime.py +++ b/src/lava/magma/runtime/runtime.py @@ -499,7 +499,7 @@ def get_var(self, var_id: int, idx: np.ndarray = None) -> np.ndarray: # 2. Receive Data [NUM_ITEMS, DATA1, DATA2, ...] data_port: CspRecvPort = self.service_to_runtime[runtime_srv_id] num_items: int = int(data_port.recv()[0].item()) - buffer: np.ndarray = np.empty((1, num_items)) + buffer: np.ndarray = np.zeros((1, np.prod(ev.shape))) for i in range(num_items): buffer[0, i] = data_port.recv()[0] diff --git a/tests/lava/magma/core/learning/test_learning_rule.py b/tests/lava/magma/core/learning/test_learning_rule.py index 5c4b9c9de..7b1384f00 100644 --- a/tests/lava/magma/core/learning/test_learning_rule.py +++ b/tests/lava/magma/core/learning/test_learning_rule.py @@ -3,9 +3,85 @@ # See: https://spdx.org/licenses/ import unittest +import numpy as np -from lava.magma.core.learning.learning_rule import LoihiLearningRule +from lava.magma.core.learning.learning_rule import ( + LoihiLearningRule, + Loihi2FLearningRule, + Loihi3FLearningRule, +) from lava.magma.core.learning.product_series import ProductSeries +from lava.magma.core.run_conditions import RunSteps +from lava.magma.core.run_configs import Loihi2SimCfg +from lava.proc.lif.process import LIF, LearningLIF +from lava.proc.dense.process import LearningDense, Dense +from lava.proc.monitor.process import Monitor +from lava.proc.io.source import RingBuffer as SpikeIn +from lava.magma.core.model.py.neuron import ( + LearningNeuronModelFloat, + LearningNeuronModelFixed, +) +from lava.magma.core.sync.protocols.loihi_protocol import LoihiProtocol +from lava.magma.core.model.py.ports import PyInPort, PyOutPort +from lava.magma.core.model.py.type import LavaPyType +from lava.magma.core.resources import CPU +from lava.magma.core.decorator import implements, requires, tag +from lava.proc.lif.models import ( + AbstractPyLifModelFloat, + AbstractPyLifModelFixed, +) +from lava.proc.io.source import RingBuffer as SpikeIn + + +def create_network( + size, + num_steps, + t_spike, + learning_rule, + weights_init, + target="pre", + precision="floating_pt", +): + if precision == "floating_pt": + du = 1 + dv = 1 + vth = 1 + elif precision == "fixed_pt": + du = 4095 + dv = 4095 + vth = 1 + + pre_spikes = np.zeros((size, num_steps * 2)) + pre_spikes[size - 1, t_spike] = 1 + pre_spikes[size - 1, t_spike + num_steps] = 1 + + spike_gen = SpikeIn(data=pre_spikes) + + dense_inp = Dense(weights=np.eye(size, size) * 2.0) + + lif_0 = LIF( + shape=(size,), du=du, dv=dv, vth=vth, bias_mant=0, name="lif_pre" + ) + + dense = LearningDense( + weights=weights_init, learning_rule=learning_rule, name="plastic_dense" + ) + + lif_1 = LIF( + shape=(size,), du=du, dv=dv, vth=vth, bias_mant=0, name="lif_post" + ) + + spike_gen.s_out.connect(dense_inp.s_in) + if target == "pre": + dense_inp.a_out.connect(lif_0.a_in) + elif target == "post": + dense_inp.a_out.connect(lif_1.a_in) + + lif_0.s_out.connect(dense.s_in) + 
+        dense.a_out.connect(lif_1.a_in)
+        lif_1.s_out.connect(dense.s_in_bap)
+
+        return spike_gen, dense_inp, lif_0, dense, lif_1


 class TestLoihiLearningRule(unittest.TestCase):
@@ -13,15 +89,19 @@ def test_learning_rule_dw(self) -> None:
         """Tests that a LoihiLearningRule is instantiable with a string
         learning rule for dw, impulse and tau values for x1 and y1,
         and t_epoch."""
-        dw = 'x0*(-1)*2^-1*y1 + y0*1*2^1*x1'
+        dw = "x0*(-1)*2^-1*y1 + y0*1*2^1*x1"
         impulse = 16
         tau = 10
         t_epoch = 1

-        learning_rule = LoihiLearningRule(dw=dw,
-                                          x1_impulse=impulse, x1_tau=tau,
-                                          y1_impulse=impulse, y1_tau=tau,
-                                          t_epoch=t_epoch)
+        learning_rule = LoihiLearningRule(
+            dw=dw,
+            x1_impulse=impulse,
+            x1_tau=tau,
+            y1_impulse=impulse,
+            y1_tau=tau,
+            t_epoch=t_epoch,
+        )

         self.assertIsInstance(learning_rule, LoihiLearningRule)
         self.assertIsInstance(learning_rule.dw, ProductSeries)
@@ -35,25 +115,30 @@ def test_learning_rule_dw(self) -> None:
         self.assertEqual(learning_rule.decimate_exponent, None)
         self.assertEqual(len(learning_rule.active_product_series), 1)
         self.assertSetEqual(learning_rule.active_traces, {"x1", "y1"})
-        self.assertDictEqual(learning_rule.active_traces_per_dependency, {
-            "x0": {"y1"},
-            "y0": {"x1"}
-        })
+        self.assertDictEqual(
+            learning_rule.active_traces_per_dependency,
+            {"x0": {"y1"}, "y0": {"x1"}},
+        )

     def test_learning_rule_dw_dd(self) -> None:
         """Tests that a LoihiLearningRule is instantiable with a string
         learning rule for dw and dd, impulse and tau values for x1 and y1,
         and t_epoch."""
-        dw = 'x0*(-1)*2^-1*y1 + y0*1*2^1*x1'
-        dd = 'x0*y2*w'
+        dw = "x0*(-1)*2^-1*y1 + y0*1*2^1*x1"
+        dd = "x0*y2*w"
         impulse = 16
         tau = 10
         t_epoch = 1

-        learning_rule = LoihiLearningRule(dw=dw, dd=dd,
-                                          x1_impulse=impulse, x1_tau=tau,
-                                          y1_impulse=impulse, y1_tau=tau,
-                                          t_epoch=t_epoch)
+        learning_rule = LoihiLearningRule(
+            dw=dw,
+            dd=dd,
+            x1_impulse=impulse,
+            x1_tau=tau,
+            y1_impulse=impulse,
+            y1_tau=tau,
+            t_epoch=t_epoch,
+        )

         self.assertIsInstance(learning_rule, LoihiLearningRule)
         self.assertIsInstance(learning_rule.dw, ProductSeries)
@@ -67,26 +152,32 @@ def test_learning_rule_dw_dd(self) -> None:
         self.assertEqual(learning_rule.decimate_exponent, None)
         self.assertEqual(len(learning_rule.active_product_series), 2)
         self.assertSetEqual(learning_rule.active_traces, {"x1", "y1", "y2"})
-        self.assertDictEqual(learning_rule.active_traces_per_dependency, {
-            "x0": {"y1", "y2"},
-            "y0": {"x1"}
-        })
+        self.assertDictEqual(
+            learning_rule.active_traces_per_dependency,
+            {"x0": {"y1", "y2"}, "y0": {"x1"}},
+        )

     def test_learning_rule_dw_dd_dt(self) -> None:
         """Tests that a LoihiLearningRule is instantiable with a string
         learning rule for dw, dd and dt, impulse and tau values for x1 and y1,
         and t_epoch."""
-        dw = 'x0*(-1)*2^-1*y1 + y0*1*2^1*x1'
-        dd = 'x0*y2*w'
-        dt = 'x0*y3*sgn(d) + y0*x2'
+        dw = "x0*(-1)*2^-1*y1 + y0*1*2^1*x1"
+        dd = "x0*y2*w"
+        dt = "x0*y3*sgn(d) + y0*x2"
         impulse = 16
         tau = 10
         t_epoch = 1

-        learning_rule = LoihiLearningRule(dw=dw, dd=dd, dt=dt,
-                                          x1_impulse=impulse, x1_tau=tau,
-                                          y1_impulse=impulse, y1_tau=tau,
-                                          t_epoch=t_epoch)
+        learning_rule = LoihiLearningRule(
+            dw=dw,
+            dd=dd,
+            dt=dt,
+            x1_impulse=impulse,
+            x1_tau=tau,
+            y1_impulse=impulse,
+            y1_tau=tau,
+            t_epoch=t_epoch,
+        )

         self.assertIsInstance(learning_rule, LoihiLearningRule)
         self.assertIsInstance(learning_rule.dw, ProductSeries)
@@ -99,26 +190,32 @@ def test_learning_rule_dw_dd_dt(self) -> None:
         self.assertEqual(learning_rule.t_epoch, t_epoch)
         self.assertEqual(learning_rule.decimate_exponent, None)
         self.assertEqual(len(learning_rule.active_product_series), 3)
-        self.assertSetEqual(learning_rule.active_traces,
-                            {"x1", "x2", "y1", "y2", "y3"})
-        self.assertDictEqual(learning_rule.active_traces_per_dependency, {
-            "x0": {"y1", "y2", "y3"},
-            "y0": {"x1", "x2"}
-        })
+        self.assertSetEqual(
+            learning_rule.active_traces, {"x1", "x2", "y1", "y2", "y3"}
+        )
+        self.assertDictEqual(
+            learning_rule.active_traces_per_dependency,
+            {"x0": {"y1", "y2", "y3"}, "y0": {"x1", "x2"}},
+        )

     def test_learning_rule_uk_dependency(self) -> None:
         """Tests that a LoihiLearningRule is instantiable with a string
         learning rule containing a uk dependency."""
-        dw = 'x0*(-1)*2^-1*y1 + y0*1*2^1*x1'
-        dd = 'u0*x2*y2'
+        dw = "x0*(-1)*2^-1*y1 + y0*1*2^1*x1"
+        dd = "u0*x2*y2"
         impulse = 16
         tau = 10
         t_epoch = 1

-        learning_rule = LoihiLearningRule(dw=dw, dd=dd,
-                                          x1_impulse=impulse, x1_tau=tau,
-                                          y1_impulse=impulse, y1_tau=tau,
-                                          t_epoch=t_epoch)
+        learning_rule = LoihiLearningRule(
+            dw=dw,
+            dd=dd,
+            x1_impulse=impulse,
+            x1_tau=tau,
+            y1_impulse=impulse,
+            y1_tau=tau,
+            t_epoch=t_epoch,
+        )

         self.assertIsInstance(learning_rule, LoihiLearningRule)
         self.assertIsInstance(learning_rule.dw, ProductSeries)
@@ -131,86 +228,1498 @@ def test_learning_rule_uk_dependency(self) -> None:
         self.assertEqual(learning_rule.t_epoch, t_epoch)
         self.assertEqual(learning_rule.decimate_exponent, 0)
         self.assertEqual(len(learning_rule.active_product_series), 2)
-        self.assertSetEqual(learning_rule.active_traces,
-                            {"x1", "x2", "y1", "y2"})
-        self.assertDictEqual(learning_rule.active_traces_per_dependency, {
-            "x0": {"y1"},
-            "y0": {"x1"},
-            "u": {"x2", "y2"}
-        })
+        self.assertSetEqual(
+            learning_rule.active_traces, {"x1", "x2", "y1", "y2"}
+        )
+        self.assertDictEqual(
+            learning_rule.active_traces_per_dependency,
+            {"x0": {"y1"}, "y0": {"x1"}, "u": {"x2", "y2"}},
+        )

     def test_invalid_impulse(self) -> None:
         """Tests that instantiating a LoihiLearningRule throws an error
         when impulse is negative."""
-        dw = 'x0*(-1)*2^-1*y1 + y0*1*2^1*x1'
+        dw = "x0*(-1)*2^-1*y1 + y0*1*2^1*x1"
         impulse = -16
         tau = 10
         t_epoch = 1

         with self.assertRaises(ValueError):
-            LoihiLearningRule(dw=dw,
-                              x1_impulse=impulse, x1_tau=tau,
-                              y1_impulse=impulse, y1_tau=tau,
-                              t_epoch=t_epoch)
+            LoihiLearningRule(
+                dw=dw,
+                x1_impulse=impulse,
+                x1_tau=tau,
+                y1_impulse=impulse,
+                y1_tau=tau,
+                t_epoch=t_epoch,
+            )

     def test_invalid_tau(self) -> None:
         """Tests that instantiating a LoihiLearningRule throws an error
         when tau is negative."""
-        dw = 'x0*(-1)*2^-1*y1 + y0*1*2^1*x1'
+        dw = "x0*(-1)*2^-1*y1 + y0*1*2^1*x1"
         impulse = 16
         tau = -10
         t_epoch = 1

         with self.assertRaises(ValueError):
-            LoihiLearningRule(dw=dw,
-                              x1_impulse=impulse, x1_tau=tau,
-                              y1_impulse=impulse, y1_tau=tau,
-                              t_epoch=t_epoch)
+            LoihiLearningRule(
+                dw=dw,
+                x1_impulse=impulse,
+                x1_tau=tau,
+                y1_impulse=impulse,
+                y1_tau=tau,
+                t_epoch=t_epoch,
+            )

     def test_invalid_t_epoch(self) -> None:
         """Tests that instantiating a LoihiLearningRule throws an error
         when t_epoch is negative."""
-        dw = 'x0*(-1)*2^-1*y1 + y0*1*2^1*x1'
+        dw = "x0*(-1)*2^-1*y1 + y0*1*2^1*x1"
         impulse = 16
         tau = 10
         t_epoch = -1

         with self.assertRaises(ValueError):
-            LoihiLearningRule(dw=dw,
-                              x1_impulse=impulse, x1_tau=tau,
-                              y1_impulse=impulse, y1_tau=tau,
-                              t_epoch=t_epoch)
+            LoihiLearningRule(
+                dw=dw,
+                x1_impulse=impulse,
+                x1_tau=tau,
+                y1_impulse=impulse,
+                y1_tau=tau,
+                t_epoch=t_epoch,
+            )

     def test_different_decimate_exponent_same_learning_rule(self) -> None:
         """Tests that instantiating a LoihiLearningRule throws an error
         when providing a learning rule with uk dependencies that have
         different decimate exponents."""
-        dw = 'x0*(-1)*2^-1*y1 + y0*1*2^1*x1 + u0*x2*y2 + u1*y3'
+        dw = "x0*(-1)*2^-1*y1 + y0*1*2^1*x1 + u0*x2*y2 + u1*y3"
         impulse = 16
         tau = 10
         t_epoch = 1

         with self.assertRaises(ValueError):
-            LoihiLearningRule(dw=dw,
-                              x1_impulse=impulse, x1_tau=tau,
-                              y1_impulse=impulse, y1_tau=tau,
-                              t_epoch=t_epoch)
+            LoihiLearningRule(
+                dw=dw,
+                x1_impulse=impulse,
+                x1_tau=tau,
+                y1_impulse=impulse,
+                y1_tau=tau,
+                t_epoch=t_epoch,
+            )

     def test_different_decimate_exponent_different_learning_rule(self) -> None:
         """Tests that instantiating a LoihiLearningRule throws an error
         when providing different learning rules with uk dependencies that
         have different decimate exponents."""
-        dw = 'x0*(-1)*2^-1*y1 + y0*1*2^1*x1 + u1*y3'
-        dd = 'u0*x2*y2'
+        dw = "x0*(-1)*2^-1*y1 + y0*1*2^1*x1 + u1*y3"
+        dd = "u0*x2*y2"
         impulse = 16
         tau = 10
         t_epoch = 1

         with self.assertRaises(ValueError):
-            LoihiLearningRule(dw=dw, dd=dd,
-                              x1_impulse=impulse, x1_tau=tau,
-                              y1_impulse=impulse, y1_tau=tau,
-                              t_epoch=t_epoch)
+            LoihiLearningRule(
+                dw=dw,
+                dd=dd,
+                x1_impulse=impulse,
+                x1_tau=tau,
+                y1_impulse=impulse,
+                y1_tau=tau,
+                t_epoch=t_epoch,
+            )
+
+    def test_get_set_x1_tau_float(self) -> None:
+        """Tests changing x1_tau during runtime in a floating point
+        simulation."""
+
+        dw = "x0 * x1 * 0"
+        t_epoch = 1
+        x1_tau_init = 2
+        x1_tau_new = 10
+        x1_impulse = 16
+
+        learning_rule = Loihi2FLearningRule(
+            dw=dw, t_epoch=t_epoch, x1_tau=x1_tau_init, x1_impulse=x1_impulse
+        )
+
+        size = 1
+        weights_init = np.eye(size) * 0
+        num_steps = 10
+        t_spike = 3
+
+        _, _, lif_0, dense, _ = create_network(
+            size, num_steps, t_spike, learning_rule, weights_init
+        )
+
+        mon_x1 = Monitor()
+        mon_x1.probe(dense.x1, 2 * num_steps)
+
+        run_cfg = Loihi2SimCfg(select_tag="floating_pt")
+        run_cnd = RunSteps(num_steps=num_steps)
+
+        lif_0.run(condition=run_cnd, run_cfg=run_cfg)
+
+        # reset x1 and set new tau
+        dense.x1.set(np.zeros(size))
+        x1_tau_init_got = dense.x1_tau.get()
+        dense.x1_tau.set(np.ones(size) * x1_tau_new)
+        x1_tau_new_got = dense.x1_tau.get()
+
+        lif_0.run(condition=run_cnd, run_cfg=run_cfg)
+
+        x1 = mon_x1.get_data()["plastic_dense"]["x1"].flatten()
+
+        lif_0.stop()
+
+        assert x1_tau_init == x1_tau_init_got
+        assert x1_tau_new == x1_tau_new_got
+
+        assert x1[t_spike + 1] == x1_impulse
+        assert x1[t_spike + 2] == x1_impulse * np.exp(-1 / x1_tau_init)
+
+        assert x1[t_spike + num_steps + 1] == x1_impulse
+        assert x1[t_spike + num_steps + 2] == x1_impulse * np.exp(
+            -1 / x1_tau_new
+        )
+
+    def test_get_set_x1_impulse_float(self) -> None:
+        """Tests changing x1_impulse during runtime in a floating point
+        simulation."""
+
+        dw = "x0 * x1 * 0"
+        t_epoch = 1
+        x1_impulse_init = 16
+        x1_impulse_new = 32
+        x1_tau = 10
+
+        learning_rule = Loihi2FLearningRule(
+            dw=dw, t_epoch=t_epoch, x1_tau=x1_tau, x1_impulse=x1_impulse_init
+        )
+
+        size = 1
+        weights_init = np.eye(size) * 0
+        num_steps = 10
+        t_spike = 3
+
+        _, _, lif_0, dense, _ = create_network(
+            size, num_steps, t_spike, learning_rule, weights_init
+        )
+
+        mon_x1 = Monitor()
+        mon_x1.probe(dense.x1, 2 * num_steps)
+
+        run_cfg = Loihi2SimCfg(select_tag="floating_pt")
+        run_cnd = RunSteps(num_steps=num_steps)
+
+        lif_0.run(condition=run_cnd, run_cfg=run_cfg)
+
+        # reset x1 and set new impulse
+        dense.x1.set(np.zeros(size))
+        x1_impulse_init_got = dense.x1_impulse.get()
+        dense.x1_impulse.set(np.ones(size) * x1_impulse_new)
+        x1_impulse_new_got = dense.x1_impulse.get()
+
+        lif_0.run(condition=run_cnd, run_cfg=run_cfg)
+
+        x1 = mon_x1.get_data()["plastic_dense"]["x1"].flatten()
+
+        lif_0.stop()
+
+        assert x1_impulse_init == x1_impulse_init_got
+        assert x1_impulse_new == x1_impulse_new_got
+
+        assert x1[t_spike + 1] == x1_impulse_init
+        assert x1[t_spike + num_steps + 1] == x1_impulse_new
+
+    def test_get_set_x2_tau_float(self) -> None:
+        """Tests changing x2_tau during runtime in a floating point
+        simulation."""
+
+        dw = "x0 * x2 * 0"
+        t_epoch = 1
+        x2_tau_init = 2
+        x2_tau_new = 10
+        x2_impulse = 16
+
+        learning_rule = Loihi2FLearningRule(
+            dw=dw, t_epoch=t_epoch, x2_tau=x2_tau_init, x2_impulse=x2_impulse
+        )
+
+        size = 1
+        weights_init = np.eye(size) * 0
+        num_steps = 10
+        t_spike = 3
+
+        _, _, lif_0, dense, _ = create_network(
+            size, num_steps, t_spike, learning_rule, weights_init
+        )
+
+        mon_x2 = Monitor()
+        mon_x2.probe(dense.x2, 2 * num_steps)
+
+        run_cfg = Loihi2SimCfg(select_tag="floating_pt")
+        run_cnd = RunSteps(num_steps=num_steps)
+
+        lif_0.run(condition=run_cnd, run_cfg=run_cfg)
+
+        # reset x2 and set new tau
+        dense.x2.set(np.zeros(size))
+        x2_tau_init_got = dense.x2_tau.get()
+        dense.x2_tau.set(np.ones(size) * x2_tau_new)
+        x2_tau_new_got = dense.x2_tau.get()
+
+        lif_0.run(condition=run_cnd, run_cfg=run_cfg)
+
+        x2 = mon_x2.get_data()["plastic_dense"]["x2"].flatten()
+
+        lif_0.stop()
+
+        assert x2_tau_init == x2_tau_init_got
+        assert x2_tau_new == x2_tau_new_got
+
+        assert x2[t_spike + 1] == x2_impulse
+        assert x2[t_spike + 2] == x2_impulse * np.exp(-1 / x2_tau_init)
+
+        assert x2[t_spike + num_steps + 1] == x2_impulse
+        assert x2[t_spike + num_steps + 2] == x2_impulse * np.exp(
+            -1 / x2_tau_new
+        )
+
+    def test_get_set_x2_impulse_float(self) -> None:
+        """Tests changing x2_impulse during runtime in a floating point
+        simulation."""
+
+        dw = "x0 * x2 * 0"
+        t_epoch = 1
+        x2_impulse_init = 16
+        x2_impulse_new = 32
+        x2_tau = 10
+
+        learning_rule = Loihi2FLearningRule(
+            dw=dw, t_epoch=t_epoch, x2_tau=x2_tau, x2_impulse=x2_impulse_init
+        )
+
+        size = 1
+        weights_init = np.eye(size) * 0
+        num_steps = 10
+        t_spike = 3
+
+        _, _, lif_0, dense, _ = create_network(
+            size, num_steps, t_spike, learning_rule, weights_init
+        )
+
+        mon_x2 = Monitor()
+        mon_x2.probe(dense.x2, 2 * num_steps)
+
+        run_cfg = Loihi2SimCfg(select_tag="floating_pt")
+        run_cnd = RunSteps(num_steps=num_steps)
+
+        lif_0.run(condition=run_cnd, run_cfg=run_cfg)
+
+        # reset x2 and set new impulse
+        dense.x2.set(np.zeros(size))
+        x2_impulse_init_got = dense.x2_impulse.get()
+        dense.x2_impulse.set(np.ones(size) * x2_impulse_new)
+        x2_impulse_new_got = dense.x2_impulse.get()
+
+        lif_0.run(condition=run_cnd, run_cfg=run_cfg)
+
+        x2 = mon_x2.get_data()["plastic_dense"]["x2"].flatten()
+
+        lif_0.stop()
+
+        assert x2_impulse_init == x2_impulse_init_got
+        assert x2_impulse_new == x2_impulse_new_got
+
+        assert x2[t_spike + 1] == x2_impulse_init
+        assert x2[t_spike + num_steps + 1] == x2_impulse_new
+
+    def test_get_set_y1_tau_float(self) -> None:
+        """Tests changing y1_tau during runtime in a floating point
+        simulation."""
+
+        dw = "y0 * y1 * 0"
+        t_epoch = 1
+        y1_tau_init = 2
+        y1_tau_new = 10
+        y1_impulse = 16
+
+        learning_rule = Loihi2FLearningRule(
+            dw=dw, t_epoch=t_epoch, y1_tau=y1_tau_init, y1_impulse=y1_impulse
+        )
+
+        size = 1
+        weights_init = np.eye(size) * 0
+        num_steps = 10
+        t_spike = 3
+
+        _, _, _, dense, lif_1 = create_network(
+            size, num_steps, t_spike, learning_rule, weights_init, target="post"
+        )
+
+        mon_y1 = Monitor()
+        mon_y1.probe(dense.y1, 2 * num_steps)
+
+        run_cfg = Loihi2SimCfg(select_tag="floating_pt")
+        run_cnd = RunSteps(num_steps=num_steps)
+
+        lif_1.run(condition=run_cnd, run_cfg=run_cfg)
+
+        # reset y1 and set new tau
+        dense.y1.set(np.zeros(size))
+        y1_tau_init_got = dense.y1_tau.get()
+        dense.y1_tau.set(np.ones(size) * y1_tau_new)
+        y1_tau_new_got = dense.y1_tau.get()
+
+        lif_1.run(condition=run_cnd, run_cfg=run_cfg)
+
+        y1 = mon_y1.get_data()["plastic_dense"]["y1"].flatten()
+
+        lif_1.stop()
+
+        assert y1_tau_init == y1_tau_init_got
+        assert y1_tau_new == y1_tau_new_got
+
+        assert y1[t_spike + 1] == y1_impulse
+        assert y1[t_spike + 2] == y1_impulse * np.exp(-1 / y1_tau_init)
+
+        assert y1[t_spike + num_steps + 1] == y1_impulse
+        assert y1[t_spike + num_steps + 2] == y1_impulse * np.exp(
+            -1 / y1_tau_new
+        )
+
+    def test_get_set_y1_impulse_float(self) -> None:
+        """Tests changing y1_impulse during runtime in a floating point
+        simulation."""
+
+        dw = "y0 * y1 * 0"
+        t_epoch = 1
+        y1_impulse_init = 16
+        y1_impulse_new = 32
+        y1_tau = 10
+
+        learning_rule = Loihi2FLearningRule(
+            dw=dw, t_epoch=t_epoch, y1_tau=y1_tau, y1_impulse=y1_impulse_init
+        )
+
+        size = 1
+        weights_init = np.eye(size) * 0
+        num_steps = 10
+        t_spike = 3
+
+        _, _, _, dense, lif_1 = create_network(
+            size, num_steps, t_spike, learning_rule, weights_init, target="post"
+        )
+
+        mon_y1 = Monitor()
+        mon_y1.probe(dense.y1, 2 * num_steps)
+
+        run_cfg = Loihi2SimCfg(select_tag="floating_pt")
+        run_cnd = RunSteps(num_steps=num_steps)
+
+        lif_1.run(condition=run_cnd, run_cfg=run_cfg)
+
+        # reset y1 and set new impulse
+        dense.y1.set(np.zeros(size))
+        y1_impulse_init_got = dense.y1_impulse.get()
+        dense.y1_impulse.set(np.ones(size) * y1_impulse_new)
+        y1_impulse_new_got = dense.y1_impulse.get()
+
+        lif_1.run(condition=run_cnd, run_cfg=run_cfg)
+
+        y1 = mon_y1.get_data()["plastic_dense"]["y1"].flatten()
+
+        lif_1.stop()
+
+        assert y1_impulse_init == y1_impulse_init_got
+        assert y1_impulse_new == y1_impulse_new_got
+
+        assert y1[t_spike + 1] == y1_impulse_init
+        assert y1[t_spike + num_steps + 1] == y1_impulse_new
+
+    def test_get_set_y2_tau_float(self) -> None:
+        """Tests changing y2_tau during runtime in a floating point
+        simulation."""
+
+        dw = "y0 * y2 * 0"
+        t_epoch = 1
+        y2_tau_init = 2
+        y2_tau_new = 10
+        y2_impulse = 16
+
+        learning_rule = Loihi2FLearningRule(
+            dw=dw, t_epoch=t_epoch, y2_tau=y2_tau_init, y2_impulse=y2_impulse
+        )
+
+        size = 1
+        weights_init = np.eye(size) * 0
+        num_steps = 10
+        t_spike = 3
+
+        _, _, _, dense, lif_1 = create_network(
+            size, num_steps, t_spike, learning_rule, weights_init, target="post"
+        )
+
+        mon_y2 = Monitor()
+        mon_y2.probe(dense.y2, 2 * num_steps)
+
+        run_cfg = Loihi2SimCfg(select_tag="floating_pt")
+        run_cnd = RunSteps(num_steps=num_steps)
+
+        lif_1.run(condition=run_cnd, run_cfg=run_cfg)
+
+        # reset y2 and set new tau
+        dense.y2.set(np.zeros(size))
+        y2_tau_init_got = dense.y2_tau.get()
+        dense.y2_tau.set(np.ones(size) * y2_tau_new)
+        y2_tau_new_got = dense.y2_tau.get()
+
+        lif_1.run(condition=run_cnd, run_cfg=run_cfg)
+
+        y2 = mon_y2.get_data()["plastic_dense"]["y2"].flatten()
+
+        lif_1.stop()
+
+        assert y2_tau_init == y2_tau_init_got
+        assert y2_tau_new == y2_tau_new_got
+
+        assert y2[t_spike + 1] == y2_impulse
+        assert y2[t_spike + 2] == y2_impulse * np.exp(-1 / y2_tau_init)
+
+        assert y2[t_spike + num_steps + 1] == y2_impulse
+        assert y2[t_spike + num_steps + 2] == y2_impulse * np.exp(
+            -1 / y2_tau_new
+        )
+
+    def test_get_set_y2_impulse_float(self) -> None:
+        """Tests changing y2_impulse during runtime in a floating point
+        simulation."""
+
+        dw = "y0 * y2 * 0"
+        t_epoch = 1
+        y2_impulse_init = 16
+        y2_impulse_new = 32
+        y2_tau = 10
+
+        learning_rule = Loihi2FLearningRule(
+            dw=dw, t_epoch=t_epoch, y2_tau=y2_tau, y2_impulse=y2_impulse_init
+        )
+
+        size = 1
+        weights_init = np.eye(size) * 0
+        num_steps = 10
+        t_spike = 3
+
+        _, _, _, dense, lif_1 = create_network(
+            size, num_steps, t_spike, learning_rule, weights_init, target="post"
+        )
+
+        mon_y2 = Monitor()
+        mon_y2.probe(dense.y2, 2 * num_steps)
+
+        run_cfg = Loihi2SimCfg(select_tag="floating_pt")
+        run_cnd = RunSteps(num_steps=num_steps)
+
+        lif_1.run(condition=run_cnd, run_cfg=run_cfg)
+
+        # reset y2 and set new impulse
+        dense.y2.set(np.zeros(size))
+        y2_impulse_init_got = dense.y2_impulse.get()
+        dense.y2_impulse.set(np.ones(size) * y2_impulse_new)
+        y2_impulse_new_got = dense.y2_impulse.get()
+
+        lif_1.run(condition=run_cnd, run_cfg=run_cfg)
+
+        y2 = mon_y2.get_data()["plastic_dense"]["y2"].flatten()
+
+        lif_1.stop()
+
+        assert y2_impulse_init == y2_impulse_init_got
+        assert y2_impulse_new == y2_impulse_new_got
+
+        assert y2[t_spike + 1] == y2_impulse_init
+        assert y2[t_spike + num_steps + 1] == y2_impulse_new
+
+    def test_get_set_y3_tau_float(self) -> None:
+        """Tests changing y3_tau during runtime in a floating point
+        simulation."""
+
+        dw = "y0 * y3 * 0"
+        t_epoch = 1
+        y3_tau_init = 2
+        y3_tau_new = 10
+        y3_impulse = 16
+
+        learning_rule = Loihi2FLearningRule(
+            dw=dw, t_epoch=t_epoch, y3_tau=y3_tau_init, y3_impulse=y3_impulse
+        )
+
+        size = 1
+        weights_init = np.eye(size) * 0
+        num_steps = 10
+        t_spike = 3
+
+        _, _, _, dense, lif_1 = create_network(
+            size, num_steps, t_spike, learning_rule, weights_init, target="post"
+        )
+
+        mon_y3 = Monitor()
+        mon_y3.probe(dense.y3, 2 * num_steps)
+
+        run_cfg = Loihi2SimCfg(select_tag="floating_pt")
+        run_cnd = RunSteps(num_steps=num_steps)
+
+        lif_1.run(condition=run_cnd, run_cfg=run_cfg)
+
+        # reset y3 and set new tau
+        dense.y3.set(np.zeros(size))
+        y3_tau_init_got = dense.y3_tau.get()
+        dense.y3_tau.set(np.ones(size) * y3_tau_new)
+        y3_tau_new_got = dense.y3_tau.get()
+
+        lif_1.run(condition=run_cnd, run_cfg=run_cfg)
+
+        y3 = mon_y3.get_data()["plastic_dense"]["y3"].flatten()
+
+        lif_1.stop()
+
+        assert y3_tau_init == y3_tau_init_got
+        assert y3_tau_new == y3_tau_new_got
+
+        assert y3[t_spike + 1] == y3_impulse
+        assert y3[t_spike + 2] == y3_impulse * np.exp(-1 / y3_tau_init)
+
+        assert y3[t_spike + num_steps + 1] == y3_impulse
+        assert y3[t_spike + num_steps + 2] == y3_impulse * np.exp(
+            -1 / y3_tau_new
+        )
+
+    def test_get_set_y3_impulse_float(self) -> None:
+        """Tests changing y3_impulse during runtime in a floating point
+        simulation."""
+
+        dw = "y0 * y3 * 0"
+        t_epoch = 1
+        y3_impulse_init = 16
+        y3_impulse_new = 32
+        y3_tau = 10
+
+        learning_rule = Loihi2FLearningRule(
+            dw=dw, t_epoch=t_epoch, y3_tau=y3_tau, y3_impulse=y3_impulse_init
+        )
+
+        size = 1
+        weights_init = np.eye(size) * 0
+        num_steps = 10
+        t_spike = 3
+
+        _, _, _, dense, lif_1 = create_network(
+            size, num_steps, t_spike, learning_rule, weights_init, target="post"
+        )
+
+        mon_y3 = Monitor()
+        mon_y3.probe(dense.y3, 2 * num_steps)
+
+        run_cfg = Loihi2SimCfg(select_tag="floating_pt")
+        run_cnd = RunSteps(num_steps=num_steps)
+
+        lif_1.run(condition=run_cnd, run_cfg=run_cfg)
+
+        # reset y3 and set new impulse
+        dense.y3.set(np.zeros(size))
+        y3_impulse_init_got = dense.y3_impulse.get()
+        dense.y3_impulse.set(np.ones(size) * y3_impulse_new)
+        y3_impulse_new_got = dense.y3_impulse.get()
+
+        lif_1.run(condition=run_cnd, run_cfg=run_cfg)
+
+        y3 = mon_y3.get_data()["plastic_dense"]["y3"].flatten()
+
+        lif_1.stop()
+
+        assert y3_impulse_init == y3_impulse_init_got
+        assert y3_impulse_new == y3_impulse_new_got
+
+        assert y3[t_spike + 1] == y3_impulse_init
+        assert y3[t_spike + num_steps + 1] == y3_impulse_new
+
+    def test_get_set_dw_float(self) -> None:
+        """Tests changing dw during runtime in a floating point
+        simulation."""
+
+        dw_init = "u0 * 1"
+        dw_new = "u0 * 2 "
+        t_epoch = 1
+
+        learning_rule = Loihi2FLearningRule(dw=dw_init, t_epoch=t_epoch)
+
+        size = 1
+        weights_init = np.eye(size) * 0
+        num_steps = 10
+        t_spike = 3
+
+        _, _, lif_0, dense, _ = create_network(
+            size, num_steps, t_spike, learning_rule, weights_init
+        )
+
+        mon_weights = Monitor()
+        mon_weights.probe(dense.weights, 2 * num_steps)
+
+        run_cfg = Loihi2SimCfg(select_tag="floating_pt")
+        run_cnd = RunSteps(num_steps=num_steps)
+
+        lif_0.run(condition=run_cnd, run_cfg=run_cfg)
+
+        dw_init_got = dense.dw.get()
+
+        dense.dw.set(dw_new)
+        dw_new_got = dense.dw.get()
+
+        lif_0.run(condition=run_cnd, run_cfg=run_cfg)
+
+        weights = mon_weights.get_data()["plastic_dense"]["weights"].flatten()
+
+        lif_0.stop()
+
+        assert dw_init == dw_init_got[: len(dw_init)]
+        assert dw_new == dw_new_got[: len(dw_new)]
+
+        assert weights[-1] == 30
+
+    def test_get_set_dt_float(self) -> None:
+        """Tests changing dt during runtime in a floating point
+        simulation."""
+
+        dt_init = "u0 * 1"
+        dt_new = "u0 * 2 "
+        t_epoch = 1
+
+        learning_rule = Loihi2FLearningRule(dt=dt_init, t_epoch=t_epoch)
+
+        size = 1
+        weights_init = np.eye(size) * 0
+        num_steps = 10
+        t_spike = 3
+
+        _, _, lif_0, dense, _ = create_network(
+            size, num_steps, t_spike, learning_rule, weights_init
+        )
+
+        mon_tag1 = Monitor()
+        mon_tag1.probe(dense.tag_1, 2 * num_steps)
+
+        run_cfg = Loihi2SimCfg(select_tag="floating_pt")
+        run_cnd = RunSteps(num_steps=num_steps)
+
+        lif_0.run(condition=run_cnd, run_cfg=run_cfg)
+
+        dt_init_got = dense.dt.get()
+
+        dense.dt.set(dt_new)
+        dt_new_got = dense.dt.get()
+
+        lif_0.run(condition=run_cnd, run_cfg=run_cfg)
+
+        tags = mon_tag1.get_data()["plastic_dense"]["tag_1"].flatten()
+
+        lif_0.stop()
+
+        assert dt_init == dt_init_got[: len(dt_init)]
+        assert dt_new == dt_new_got[: len(dt_new)]
+
+        assert tags[-1] == 30
+
+    def test_get_set_dd_float(self) -> None:
+        """Tests changing dd during runtime in a floating point
+        simulation."""
+
+        dd_init = "u0 * 1"
+        dd_new = "u0 * 2 "
+        t_epoch = 1
+
+        learning_rule = Loihi2FLearningRule(dd=dd_init, t_epoch=t_epoch)
+
+        size = 1
+        weights_init = np.eye(size) * 0
+        num_steps = 10
+        t_spike = 3
+
+        _, _, lif_0, dense, _ = create_network(
+            size, num_steps, t_spike, learning_rule, weights_init
+        )
+
+        mon_tag1 = Monitor()
+        mon_tag1.probe(dense.tag_2, 2 * num_steps)
+
+        run_cfg = Loihi2SimCfg(select_tag="floating_pt")
+        run_cnd = RunSteps(num_steps=num_steps)
+
+        lif_0.run(condition=run_cnd, run_cfg=run_cfg)
+
+        dd_init_got = dense.dd.get()
+
+        dense.dd.set(dd_new)
+        dd_new_got = dense.dd.get()
+
+        lif_0.run(condition=run_cnd, run_cfg=run_cfg)
+
+        delays = mon_tag1.get_data()["plastic_dense"]["tag_2"].flatten()
+
+        lif_0.stop()
+
+        assert dd_init == dd_init_got[: len(dd_init)]
+        assert dd_new == dd_new_got[: len(dd_new)]
+
+        assert delays[-1] == 30
+
+    # fixed point
+
+    def test_get_set_x1_tau_fixed(self) -> None:
+        """Tests changing x1_tau during runtime in a fixed point
+        simulation."""
+
+        dw = "x0 * x1 * 0"
+        t_epoch = 1
+        x1_tau_init = 2
+        x1_tau_new = 10
+        x1_impulse = 16
+
+        learning_rule = Loihi2FLearningRule(
+            dw=dw, t_epoch=t_epoch, x1_tau=x1_tau_init, x1_impulse=x1_impulse
+        )
+
+        size = 1
+        weights_init = np.eye(size) * 0
+        num_steps = 10
+        t_spike = 3
+
+        _, _, lif_0, dense, _ = create_network(
+            size,
+            num_steps,
+            t_spike,
+            learning_rule,
+            weights_init,
+            precision="fixed_pt",
+        )
+
+        mon_x1 = Monitor()
+        mon_x1.probe(dense.x1, 2 * num_steps)
+
+        run_cfg = Loihi2SimCfg(select_tag="fixed_pt")
+        run_cnd = RunSteps(num_steps=num_steps)
+
+        lif_0.run(condition=run_cnd, run_cfg=run_cfg)
+
+        # reset x1 and set new tau
+        dense.x1.set(np.zeros(size))
+        x1_tau_init_got = dense.x1_tau.get()
+        dense.x1_tau.set(np.ones(size) * x1_tau_new)
+        x1_tau_new_got = dense.x1_tau.get()
+
+        lif_0.run(condition=run_cnd, run_cfg=run_cfg)
+
+        x1 = mon_x1.get_data()["plastic_dense"]["x1"].flatten()
+
+        lif_0.stop()
+
+        assert x1_tau_init == x1_tau_init_got
+        assert x1_tau_new == x1_tau_new_got
+
+        assert x1[t_spike + 1] == x1_impulse
+        assert (
+            np.abs(x1[t_spike + 2] - x1_impulse * np.exp(-1 / x1_tau_init)) < 1
+        )
+
+        assert x1[t_spike + num_steps + 1] == x1_impulse
+        assert (
+            np.abs(
+                x1[t_spike + num_steps + 2]
+                - x1_impulse * np.exp(-1 / x1_tau_new)
+            )
+            < 1
+        )
+
+    def test_get_set_x1_impulse_fixed(self) -> None:
+        """Tests changing x1_impulse during runtime in a fixed point
+        simulation."""
+
+        dw = "x0 * x1 * 0"
+        t_epoch = 1
+        x1_impulse_init = 16
+        x1_impulse_new = 32
+        x1_tau = 10
+
+        learning_rule = Loihi2FLearningRule(
+            dw=dw, t_epoch=t_epoch, x1_tau=x1_tau, x1_impulse=x1_impulse_init
+        )
+
+        size = 1
+        weights_init = np.eye(size) * 0
+        num_steps = 10
+        t_spike = 3
+
+        _, _, lif_0, dense, _ = create_network(
+            size,
+            num_steps,
+            t_spike,
+            learning_rule,
+            weights_init,
+            precision="fixed_pt",
+        )
+
+        mon_x1 = Monitor()
+        mon_x1.probe(dense.x1, 2 * num_steps)
+
+        run_cfg = Loihi2SimCfg(select_tag="fixed_pt")
+        run_cnd = RunSteps(num_steps=num_steps)
+
+        lif_0.run(condition=run_cnd, run_cfg=run_cfg)
+
+        # reset x1 and set new impulse
+        dense.x1.set(np.zeros(size))
+        x1_impulse_init_got = dense.x1_impulse.get()
+        dense.x1_impulse.set(np.ones(size) * x1_impulse_new)
+        x1_impulse_new_got = dense.x1_impulse.get()
+
+        lif_0.run(condition=run_cnd, run_cfg=run_cfg)
+
+        x1 = mon_x1.get_data()["plastic_dense"]["x1"].flatten()
+
+        lif_0.stop()
+
+        assert x1_impulse_init == x1_impulse_init_got
+        assert x1_impulse_new == x1_impulse_new_got
+
+        assert x1[t_spike + 1] == x1_impulse_init
+        assert x1[t_spike + num_steps + 1] == x1_impulse_new
+
+    def test_get_set_x2_tau_fixed(self) -> None:
+        """Tests changing x2_tau during runtime in a fixed point
+        simulation."""
+
+        dw = "x0 * x2 * 0"
+        t_epoch = 1
+        x2_tau_init = 2
+        x2_tau_new = 10
+        x2_impulse = 16
+
+        learning_rule = Loihi2FLearningRule(
+            dw=dw, t_epoch=t_epoch, x2_tau=x2_tau_init, x2_impulse=x2_impulse
+        )
+
+        size = 1
+        weights_init = np.eye(size) * 0
+        num_steps = 10
+        t_spike = 3
+
+        _, _, lif_0, dense, _ = create_network(
+            size,
+            num_steps,
+            t_spike,
+            learning_rule,
+            weights_init,
+            precision="fixed_pt",
+        )
+
+        mon_x2 = Monitor()
+        mon_x2.probe(dense.x2, 2 * num_steps)
+
+        run_cfg = Loihi2SimCfg(select_tag="fixed_pt")
+        run_cnd = RunSteps(num_steps=num_steps)
+
+        lif_0.run(condition=run_cnd, run_cfg=run_cfg)
+
+        # reset x2 and set new tau
+        dense.x2.set(np.zeros(size))
+        x2_tau_init_got = dense.x2_tau.get()
+        dense.x2_tau.set(np.ones(size) * x2_tau_new)
+        x2_tau_new_got = dense.x2_tau.get()
+
+        lif_0.run(condition=run_cnd, run_cfg=run_cfg)
+
+        x2 = mon_x2.get_data()["plastic_dense"]["x2"].flatten()
+
+        lif_0.stop()
+
+        assert x2_tau_init == x2_tau_init_got
+        assert x2_tau_new == x2_tau_new_got
+
+        assert x2[t_spike + 1] == x2_impulse
+        assert (
+            np.abs(x2[t_spike + 2] - x2_impulse * np.exp(-1 / x2_tau_init)) < 1
+        )
+
+        assert x2[t_spike + num_steps + 1] == x2_impulse
+        assert (
+            np.abs(
+                x2[t_spike + num_steps + 2]
+                - x2_impulse * np.exp(-1 / x2_tau_new)
+            )
+            < 1
+        )
+
+    def test_get_set_x2_impulse_fixed(self) -> None:
+        """Tests changing x2_impulse during runtime in a fixed point
+        simulation."""
+
+        dw = "x0 * x2 * 0"
+        t_epoch = 1
+        x2_impulse_init = 16
+        x2_impulse_new = 32
+        x2_tau = 10
+
+        learning_rule = Loihi2FLearningRule(
+            dw=dw, t_epoch=t_epoch, x2_tau=x2_tau, x2_impulse=x2_impulse_init
+        )
+
+        size = 1
+        weights_init = np.eye(size) * 0
+        num_steps = 10
+        t_spike = 3
+
+        _, _, lif_0, dense, _ = create_network(
+            size,
+            num_steps,
+            t_spike,
+            learning_rule,
+            weights_init,
+            precision="fixed_pt",
+        )
+
+        mon_x2 = Monitor()
+        mon_x2.probe(dense.x2, 2 * num_steps)
+
+        run_cfg = Loihi2SimCfg(select_tag="fixed_pt")
+        run_cnd = RunSteps(num_steps=num_steps)
+
+        lif_0.run(condition=run_cnd, run_cfg=run_cfg)
+
+        # reset x2 and set new impulse
+        dense.x2.set(np.zeros(size))
+        x2_impulse_init_got = dense.x2_impulse.get()
+        dense.x2_impulse.set(np.ones(size) * x2_impulse_new)
+        x2_impulse_new_got = dense.x2_impulse.get()
+
+        lif_0.run(condition=run_cnd, run_cfg=run_cfg)
+
+        x2 = mon_x2.get_data()["plastic_dense"]["x2"].flatten()
+
+        lif_0.stop()
+
+        assert x2_impulse_init == x2_impulse_init_got
+        assert x2_impulse_new == x2_impulse_new_got
+
+        assert x2[t_spike + 1] == x2_impulse_init
+        assert x2[t_spike + num_steps + 1] == x2_impulse_new
+
+    def test_get_set_y1_tau_fixed(self) -> None:
+        """Tests changing y1_tau during runtime in a fixed point
+        simulation."""
+
+        dw = "y0 * y1 * 0"
+        t_epoch = 1
+        y1_tau_init = 2
+        y1_tau_new = 10
+        y1_impulse = 16
+
+        learning_rule = Loihi2FLearningRule(
+            dw=dw, t_epoch=t_epoch, y1_tau=y1_tau_init, y1_impulse=y1_impulse
+        )
+
+        size = 1
+        weights_init = np.eye(size) * 0
+        num_steps = 10
+        t_spike = 3
+
+        _, _, _, dense, lif_1 = create_network(
+            size,
+            num_steps,
+            t_spike,
+            learning_rule,
+            weights_init,
+            target="post",
+            precision="fixed_pt",
+        )
+
+        mon_y1 = Monitor()
+        mon_y1.probe(dense.y1, 2 * num_steps)
+
+        run_cfg = Loihi2SimCfg(select_tag="fixed_pt")
+        run_cnd = RunSteps(num_steps=num_steps)
+
+        lif_1.run(condition=run_cnd, run_cfg=run_cfg)
+
+        # reset y1 and set new tau
+        dense.y1.set(np.zeros(size))
+        y1_tau_init_got = dense.y1_tau.get()
+        dense.y1_tau.set(np.ones(size) * y1_tau_new)
+        y1_tau_new_got = dense.y1_tau.get()
+
+        lif_1.run(condition=run_cnd, run_cfg=run_cfg)
+
+        y1 = mon_y1.get_data()["plastic_dense"]["y1"].flatten()
+
+        lif_1.stop()
+
+        assert y1_tau_init == y1_tau_init_got
+        assert y1_tau_new == y1_tau_new_got
+
+        assert y1[t_spike + 1] == y1_impulse
+        assert (
+            np.abs(y1[t_spike + 2] - y1_impulse * np.exp(-1 / y1_tau_init)) < 1
+        )
+
+        assert y1[t_spike + num_steps + 1] == y1_impulse
+        assert (
+            np.abs(
+                y1[t_spike + num_steps + 2]
+                - y1_impulse * np.exp(-1 / y1_tau_new)
+            )
+            < 1
+        )
+
+    def test_get_set_y1_impulse_fixed(self) -> None:
+        """Tests changing y1_impulse during runtime in a fixed point
+        simulation."""
+
+        dw = "y0 * y1 * 0"
+        t_epoch = 1
+        y1_impulse_init = 16
+        y1_impulse_new = 32
+        y1_tau = 10
+
+        learning_rule = Loihi2FLearningRule(
+            dw=dw, t_epoch=t_epoch, y1_tau=y1_tau, y1_impulse=y1_impulse_init
+        )
+
+        size = 1
+        weights_init = np.eye(size) * 0
+        num_steps = 10
+        t_spike = 3
+
+        _, _, _, dense, lif_1 = create_network(
+            size,
+            num_steps,
+            t_spike,
+            learning_rule,
+            weights_init,
+            target="post",
+            precision="fixed_pt",
+        )
+
+        mon_y1 = Monitor()
+        mon_y1.probe(dense.y1, 2 * num_steps)
+
+        run_cfg = Loihi2SimCfg(select_tag="fixed_pt")
+        run_cnd = RunSteps(num_steps=num_steps)
+
+        lif_1.run(condition=run_cnd, run_cfg=run_cfg)
+
+        # reset y1 and set new impulse
+        dense.y1.set(np.zeros(size))
+        y1_impulse_init_got = dense.y1_impulse.get()
+        dense.y1_impulse.set(np.ones(size) * y1_impulse_new)
+        y1_impulse_new_got = dense.y1_impulse.get()
+
+        lif_1.run(condition=run_cnd, run_cfg=run_cfg)
+
+        y1 = mon_y1.get_data()["plastic_dense"]["y1"].flatten()
+
+        lif_1.stop()
+
+        assert y1_impulse_init == y1_impulse_init_got
+        assert y1_impulse_new == y1_impulse_new_got
+
+        assert y1[t_spike + 1] == y1_impulse_init
+        assert y1[t_spike + num_steps + 1] == y1_impulse_new
+
+    def test_get_set_y2_tau_fixed(self) -> None:
+        """Tests changing y2_tau during runtime in a fixed point
+        simulation."""
+
+        dw = "y0 * y2 * 0"
+        t_epoch = 1
+        y2_tau_init = 2
+        y2_tau_new = 10
+        y2_impulse = 16
+
+        learning_rule = Loihi2FLearningRule(
+            dw=dw, t_epoch=t_epoch, y2_tau=y2_tau_init, y2_impulse=y2_impulse
+        )
+
+        size = 1
+        weights_init = np.eye(size) * 0
+        num_steps = 10
+        t_spike = 3
+
+        _, _, _, dense, lif_1 = create_network(
+            size,
+            num_steps,
+            t_spike,
+            learning_rule,
+            weights_init,
+            target="post",
+            precision="fixed_pt",
+        )
+
+        mon_y2 = Monitor()
+        mon_y2.probe(dense.y2, 2 * num_steps)
+
+        run_cfg = Loihi2SimCfg(select_tag="fixed_pt")
+        run_cnd = RunSteps(num_steps=num_steps)
+
+        lif_1.run(condition=run_cnd, run_cfg=run_cfg)
+
+        # reset y2 and set new tau
+        dense.y2.set(np.zeros(size))
+        y2_tau_init_got = dense.y2_tau.get()
+        dense.y2_tau.set(np.ones(size) * y2_tau_new)
+        y2_tau_new_got = dense.y2_tau.get()
+
+        lif_1.run(condition=run_cnd, run_cfg=run_cfg)
+
+        y2 = mon_y2.get_data()["plastic_dense"]["y2"].flatten()
+
+        lif_1.stop()
+
+        assert y2_tau_init == y2_tau_init_got
+        assert y2_tau_new == y2_tau_new_got
+
+        assert y2[t_spike + 1] == y2_impulse
+        assert (
+            np.abs(y2[t_spike + 2] - y2_impulse * np.exp(-1 / y2_tau_init)) < 1
+        )
+
+        assert y2[t_spike + num_steps + 1] == y2_impulse
+        assert (
+            np.abs(
+                y2[t_spike + num_steps + 2]
+                - y2_impulse * np.exp(-1 / y2_tau_new)
+            )
+            < 1
+        )
+
+    def test_get_set_y2_impulse_fixed(self) -> None:
+        """Tests changing y2_impulse during runtime in a fixed point
+        simulation."""
+
+        dw = "y0 * y2 * 0"
+        t_epoch = 1
+        y2_impulse_init = 16
+        y2_impulse_new = 32
+        y2_tau = 10
+
+        learning_rule = Loihi2FLearningRule(
+            dw=dw, t_epoch=t_epoch, y2_tau=y2_tau, y2_impulse=y2_impulse_init
+        )
+
+        size = 1
+        weights_init = np.eye(size) * 0
+        num_steps = 10
+        t_spike = 3
+
+        _, _, _, dense, lif_1 = create_network(
+            size,
+            num_steps,
+            t_spike,
+            learning_rule,
+            weights_init,
+            target="post",
+            precision="fixed_pt",
+        )
+
+        mon_y2 = Monitor()
+        mon_y2.probe(dense.y2, 2 * num_steps)
+
+        run_cfg = Loihi2SimCfg(select_tag="fixed_pt")
+        run_cnd = RunSteps(num_steps=num_steps)
+
+        lif_1.run(condition=run_cnd, run_cfg=run_cfg)
+
+        # reset y2 and set new impulse
+        dense.y2.set(np.zeros(size))
+        y2_impulse_init_got = dense.y2_impulse.get()
+        dense.y2_impulse.set(np.ones(size) * y2_impulse_new)
+        y2_impulse_new_got = dense.y2_impulse.get()
+
+        lif_1.run(condition=run_cnd, run_cfg=run_cfg)
+
+        y2 = mon_y2.get_data()["plastic_dense"]["y2"].flatten()
+
+        lif_1.stop()
+
+        assert y2_impulse_init == y2_impulse_init_got
+        assert y2_impulse_new == y2_impulse_new_got
+
+        assert y2[t_spike + 1] == y2_impulse_init
+        assert y2[t_spike + num_steps + 1] == y2_impulse_new
+
+    def test_get_set_y3_tau_fixed(self) -> None:
+        """Tests changing y3_tau during runtime in a fixed point
+        simulation."""
+
+        dw = "y0 * y3 * 0"
+        t_epoch = 1
+        y3_tau_init = 2
+        y3_tau_new = 10
+        y3_impulse = 16
+
+        learning_rule = Loihi2FLearningRule(
+            dw=dw, t_epoch=t_epoch, y3_tau=y3_tau_init, y3_impulse=y3_impulse
+        )
+
+        size = 1
+        weights_init = np.eye(size) * 0
+        num_steps = 10
+        t_spike = 3
+
+        _, _, _, dense, lif_1 = create_network(
+            size,
+            num_steps,
+            t_spike,
+            learning_rule,
+            weights_init,
+            target="post",
+            precision="fixed_pt",
+        )
+
+        mon_y3 = Monitor()
+        mon_y3.probe(dense.y3, 2 * num_steps)
+
+        run_cfg = Loihi2SimCfg(select_tag="fixed_pt")
+        run_cnd = RunSteps(num_steps=num_steps)
+
+        lif_1.run(condition=run_cnd, run_cfg=run_cfg)
+
+        # reset y3 and set new tau
+        dense.y3.set(np.zeros(size))
+        y3_tau_init_got = dense.y3_tau.get()
+        dense.y3_tau.set(np.ones(size) * y3_tau_new)
+        y3_tau_new_got = dense.y3_tau.get()
+
+        lif_1.run(condition=run_cnd, run_cfg=run_cfg)
+
+        y3 = mon_y3.get_data()["plastic_dense"]["y3"].flatten()
+
+        lif_1.stop()
+
+        assert y3_tau_init == y3_tau_init_got
+        assert y3_tau_new == y3_tau_new_got
+
+        assert y3[t_spike + 1] == y3_impulse
+        assert (
+            np.abs(y3[t_spike + 2] - y3_impulse * np.exp(-1 / y3_tau_init)) < 1
+        )
+
+        assert y3[t_spike + num_steps + 1] == y3_impulse
+        assert (
+            np.abs(
+                y3[t_spike + num_steps + 2]
+                - y3_impulse * np.exp(-1 / y3_tau_new)
+            )
+            < 1
+        )
+
+    def test_get_set_y3_impulse_fixed(self) -> None:
+        """Tests changing y3_impulse during runtime in a fixed point
+        simulation."""
+
+        dw = "y0 * y3 * 0"
+        t_epoch = 1
+        y3_impulse_init = 16
+        y3_impulse_new = 32
+        y3_tau = 10
+
+        learning_rule = Loihi2FLearningRule(
+            dw=dw, t_epoch=t_epoch, y3_tau=y3_tau, y3_impulse=y3_impulse_init
+        )
+
+        size = 1
+        weights_init = np.eye(size) * 0
+        num_steps = 10
+        t_spike = 3
+
+        _, _, _, dense, lif_1 = create_network(
+            size,
+            num_steps,
+            t_spike,
+            learning_rule,
+            weights_init,
+            target="post",
+            precision="fixed_pt",
+        )
+
+        mon_y3 = Monitor()
+        mon_y3.probe(dense.y3, 2 * num_steps)
+
+        run_cfg = Loihi2SimCfg(select_tag="fixed_pt")
+        run_cnd = RunSteps(num_steps=num_steps)
+
+        lif_1.run(condition=run_cnd, run_cfg=run_cfg)
+
+        # reset y3 and set new impulse
+        dense.y3.set(np.zeros(size))
+        y3_impulse_init_got = dense.y3_impulse.get()
+        dense.y3_impulse.set(np.ones(size) * y3_impulse_new)
+        y3_impulse_new_got = dense.y3_impulse.get()
+
+        lif_1.run(condition=run_cnd, run_cfg=run_cfg)
+
+        y3 = mon_y3.get_data()["plastic_dense"]["y3"].flatten()
+
+        lif_1.stop()
+
+        assert y3_impulse_init == y3_impulse_init_got
+        assert y3_impulse_new == y3_impulse_new_got
+
+        assert y3[t_spike + 1] == y3_impulse_init
+        assert y3[t_spike + num_steps + 1] == y3_impulse_new
+
+    def test_get_set_dw_fixed(self) -> None:
+        """Tests changing dw during runtime in a fixed point
+        simulation."""
+
+        dw_init = "u0 * 1"
+        dw_new = "u0 * 2 "
+        t_epoch = 1
+
+        learning_rule = Loihi2FLearningRule(dw=dw_init, t_epoch=t_epoch)
+
+        size = 1
+        weights_init = np.eye(size) * 0
+        num_steps = 10
+        t_spike = 3
+
+        _, _, lif_0, dense, _ = create_network(
+            size,
+            num_steps,
+            t_spike,
+            learning_rule,
+            weights_init,
+            precision="fixed_pt",
+        )
+
+        mon_weights = Monitor()
+        mon_weights.probe(dense.weights, 2 * num_steps)
+
+        run_cfg = Loihi2SimCfg(select_tag="fixed_pt")
+        run_cnd = RunSteps(num_steps=num_steps)
+
+        lif_0.run(condition=run_cnd, run_cfg=run_cfg)
+
+        dw_init_got = dense.dw.get()
+
+        dense.dw.set(dw_new)
+        dw_new_got = dense.dw.get()
+
+        lif_0.run(condition=run_cnd, run_cfg=run_cfg)
+
+        weights = mon_weights.get_data()["plastic_dense"]["weights"].flatten()
+
+        lif_0.stop()
+
+        assert dw_init == dw_init_got[: len(dw_init)]
+        assert dw_new == dw_new_got[: len(dw_new)]
+
+        assert weights[-1] == 30
+
+    def test_get_set_dt_fixed(self) -> None:
+        """Tests changing dt during runtime in a fixed point
+        simulation."""
+
+        dt_init = "u0 * 1"
+        dt_new = "u0 * 2"
+        t_epoch = 1
+
+        learning_rule = Loihi2FLearningRule(dt=dt_init, t_epoch=t_epoch)
+
+        size = 1
+        weights_init = np.eye(size) * 0
+        num_steps = 10
+        t_spike = 3
+
+        _, _, lif_0, dense, _ = create_network(
+            size,
+            num_steps,
+            t_spike,
+            learning_rule,
+            weights_init,
+            precision="fixed_pt",
+        )
+
+        mon_tag1 = Monitor()
+        mon_tag1.probe(dense.tag_1, 2 * num_steps)
+
+        run_cfg = Loihi2SimCfg(select_tag="fixed_pt")
+        run_cnd = RunSteps(num_steps=num_steps)
+
+        lif_0.run(condition=run_cnd, run_cfg=run_cfg)
+
+        dt_init_got = dense.dt.get()
+
+        dense.dt.set(dt_new)
+        dt_new_got = dense.dt.get()
+
+        lif_0.run(condition=run_cnd, run_cfg=run_cfg)
+
+        tags = mon_tag1.get_data()["plastic_dense"]["tag_1"].flatten()
+
+        lif_0.stop()
+
+        assert dt_init == dt_init_got[: len(dt_init)]
+        assert dt_new == dt_new_got[: len(dt_new)]
+
+        assert tags[-1] == 30
+
+    def test_get_set_dd_fixed(self) -> None:
+        """Tests changing dd during runtime in a fixed point
+        simulation."""
+
+        dd_init = "u0 * 1"
+        dd_new = "u0 * 2"
+        t_epoch = 1
+
+        learning_rule = Loihi2FLearningRule(dd=dd_init, t_epoch=t_epoch)
+
+        size = 1
+        weights_init = np.eye(size) * 0
+        num_steps = 10
+        t_spike = 3
+
+        _, _, lif_0, dense, _ = create_network(
+            size,
+            num_steps,
+            t_spike,
+            learning_rule,
+            weights_init,
+            precision="fixed_pt",
+        )
+
+        mon_tag1 = Monitor()
+        mon_tag1.probe(dense.tag_2, 2 * num_steps)
+
+        run_cfg = Loihi2SimCfg(select_tag="fixed_pt")
+        run_cnd = RunSteps(num_steps=num_steps)
+
+        lif_0.run(condition=run_cnd, run_cfg=run_cfg)
+
+        dd_init_got = dense.dd.get()
+
+        dense.dd.set(dd_new)
+        dd_new_got = dense.dd.get()
+
+        lif_0.run(condition=run_cnd, run_cfg=run_cfg)
+
+        delays = mon_tag1.get_data()["plastic_dense"]["tag_2"].flatten()
+
+        lif_0.stop()
+
+        assert dd_init == dd_init_got[: len(dd_init)]
+        assert dd_new == dd_new_got[: len(dd_new)]
+
+        assert delays[-1] == 30


 if __name__ == "__main__":
diff --git a/tests/lava/proc/dense/test_process.py b/tests/lava/proc/dense/test_process.py
index 0656376c4..ba0772d90 100644
--- a/tests/lava/proc/dense/test_process.py
+++ b/tests/lava/proc/dense/test_process.py
@@ -6,6 +6,7 @@
 import numpy as np

 from lava.proc.dense.process import Dense, LearningDense
+from lava.proc.learning_rules.stdp_learning_rule import STDPLoihi


 class TestDenseProcess(unittest.TestCase):
@@ -37,7 +38,14 @@ def test_init(self):
         shape = (100, 200)
         weights = np.random.randint(100, size=shape)

-        conn = LearningDense(weights=weights)
+        lr = STDPLoihi(learning_rate=0.1,
+                       A_plus=1.,
+                       A_minus=1.,
+                       tau_plus=20.,
+                       tau_minus=20.)
+
+        conn = LearningDense(weights=weights,
+                             learning_rule=lr)

         self.assertEqual(np.shape(conn.weights.init), shape)
         np.testing.assert_array_equal(conn.weights.init, weights)
diff --git a/tests/lava/proc/dense/test_stdp_sim.py b/tests/lava/proc/dense/test_stdp_sim.py
index f0c808daf..d4b931800 100644
--- a/tests/lava/proc/dense/test_stdp_sim.py
+++ b/tests/lava/proc/dense/test_stdp_sim.py
@@ -534,7 +534,6 @@ def test_rstdp_floating_point(self):
         # y1: post-synaptic trace
         # y2: reward
         lif_1.s_out_bap.connect(dense.s_in_bap)
-        lif_1.s_out_y1.connect(dense.s_in_y1)
         lif_1.s_out_y2.connect(dense.s_in_y2)
         lif_1.s_out_y3.connect(dense.s_in_y3)

@@ -550,7 +549,7 @@ def test_rstdp_floating_point(self):
         np.testing.assert_almost_equal(weight_before_run, weights_init)
         np.testing.assert_almost_equal(
-            weight_after_run, np.array([[33.4178762]])
+            weight_after_run, np.array([[33.4210359]])
         )

     def test_rstdp_floating_point_multi_synapse(self):
@@ -614,6 +613,7 @@ def test_rstdp_floating_point_multi_synapse(self):
         lif_1.s_out_y1.connect(dense.s_in_y1)
         lif_1.s_out_y2.connect(dense.s_in_y2)
+        lif_1.s_out_y3.connect(dense.s_in_y3)

         run_cfg = Loihi2SimCfg(select_tag="floating_pt")
         run_cnd = RunSteps(num_steps=num_steps)
@@ -629,8 +629,8 @@ def test_rstdp_floating_point_multi_synapse(self):
             weight_after_run,
             np.array(
                 [
-                    [191.7346893, 31.3543832, 255.5798239],
-                    [187.6966191, 17.4426083, 250.7489829],
+                    [191.7300724, 31.3616088, 255.5749675],
+                    [187.6922553, 17.4506295, 250.7446092]
                 ]
             ),
         )
@@ -689,6 +689,7 @@ def test_rstdp_fixed_point(self):
         lif_1.s_out_y1.connect(dense.s_in_y1)
         lif_1.s_out_y2.connect(dense.s_in_y2)
+        lif_1.s_out_y3.connect(dense.s_in_y3)

         run_cfg = Loihi2SimCfg(select_tag="fixed_pt")
         run_cnd = RunSteps(num_steps=num_steps)
@@ -766,6 +767,7 @@ def test_rstdp_fixed_point_multi_synapse(self):
         lif_1.s_out_y1.connect(dense.s_in_y1)
         lif_1.s_out_y2.connect(dense.s_in_y2)
+        lif_1.s_out_y3.connect(dense.s_in_y3)

         run_cfg = Loihi2SimCfg(select_tag="fixed_pt")
         run_cnd = RunSteps(num_steps=num_steps)