-
-
Notifications
You must be signed in to change notification settings - Fork 88
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Send a state machine over MQTT / serializing-deserializing a SM #421
Comments
Hi @scls19fr , how are you? You only need to serialize the value of the current state, right? Maybe you can use the current state value: https://python-statemachine.readthedocs.io/en/latest/api.html#statemachine.statemachine.StateMachine.current_state_value Let me know if this helps, if not, please give an example of how you want to get the desired outcome. Best! |
Hi @fgmacedo , I'm fine and hope you are well also. |
What I mean is that sending only current active state through MQTT is not enough to draw the sm on telemetry app side. You need to have a whole "copy" of the sm which is currently running on target (bot) side. |
Can you provide me with an example of a state machine and the expected serialization output? The other site is also Python? I ask to know if pickle is a possible solution, as pickle can serialize/deserialize Python objects, but in binary format. I'm asking about the format, because maybe this can speak with this other thread related to "creating state machines dynamically". The format can be something related to a JSON version of SCXML, as I'm playing here. |
The current situation is Python on both side but it may change (different Python version ...) so I'd prefered to avoid Pickle. My SM is currently quite simple and looks like from statemachine import StateMachine, State
import datetime
def now():
return datetime.datetime.utcnow().timestamp()
class DrivingState(StateMachine):
"Finite state machine for driving state"
init = State(initial=True)
lidar_init = State()
waiting_start = State()
manual_mode = State()
ok = State()
warning = State()
front_problem_situation = State()
#rear_problem_situation = State()
telemetry_ready = init.to(lidar_init)
lidar_ready = lidar_init.to(waiting_start)
caution = (ok.to(warning) | warning.to(warning))
letsgo = (warning.to(ok))
front_problem_occurs = (ok.to(front_problem_situation) | warning.to(front_problem_situation))
front_problem_disappears = (front_problem_situation.to(warning))
stop = (waiting_start.to(waiting_start) | ok.to(waiting_start) | warning.to(waiting_start) | front_problem_situation.to(waiting_start) | manual_mode.to(waiting_start))
auto = (waiting_start.to(ok) | ok.to(ok) | warning.to(warning) | front_problem_situation.to(front_problem_situation))
manual = ( waiting_start.to(manual_mode) )
timestamp_state_changed = now()
def __init__(self):
super().__init__()
self.telemetry = None # will be set after attach
def _before(self, event: str, source: State, target: State, message: str = ""):
message = ". " + message if message else ""
return f"{datetime.datetime.utcnow()} Running {event} from {source.id} to {target.id}{message}"
def before_caution(self, event: str, source: State, target: State, message: str = ""):
self._before(str, source, target, message)
def before_letsgo(self, event: str, source: State, target: State, message: str = ""):
self._before(str, source, target, message)
def before_front_problem_occurs(self, event: str, source: State, target: State, message: str = ""):
self._before(str, source, target, message)
def before_front_problem_disappears(self, event: str, source: State, target: State, message: str = ""):
self._before(str, source, target, message)
def _on_enter(self):
self.timestamp_state_changed = now()
print(f"{datetime.datetime.utcnow()} Entering '{self.current_state.value}'")
self.telemetry.publish("autodriver/state", self.current_state.value) # ToFix
def on_enter_init(self):
print(f"{datetime.datetime.utcnow()} Entering '{self.current_state.value}'")
# telemetry is not yet configured
def on_enter_waiting_start(self):
self._on_enter()
def on_enter_manual_mode(self):
self._on_enter()
def on_enter_ok(self):
self._on_enter()
def on_enter_warning(self):
self._on_enter()
def on_enter_front_problem_situation(self):
self._on_enter()
def _on_exit(self):
duration = now() - self.timestamp_state_changed
msg = f"Exiting '{self.current_state.value}' after {duration:.1f}s"
print(f"{datetime.datetime.utcnow()} {msg}")
self.telemetry.publish("logs", msg)
def on_exit_front_problem_situation(self):
self._on_exit()
# print(f"{datetime.datetime.utcnow()} No more front problem") |
Hi @scls19fr, following our discussion at this link, I believe we can close the issue for now. Please feel free to reopen it or create a new one if needed. |
Hello,
I'm currently using python-statemachine with Webots (a robotic simulator). In my application I send sensors data over MQTT to an home made telemetry app (Python PyQtGraph). I wonder if (and probably more exactly how) send over MQTT state machine.
I don't want to copy paste code of bot controller to my telemetry app but'd like to use a serializing-deserializing protocol (JSON, MessagePack) to send state machine. On telemetry side statemachine will only be used to display what state is active.
Any help to achieve that will be great.
Kind regards
The text was updated successfully, but these errors were encountered: