Skip to content

Commit

Permalink
Merge pull request #3 from jomorais/finite_state_machine
Browse files Browse the repository at this point in the history
Implementation of a simple Finite-State-Machine inside your service
  • Loading branch information
jomorais authored Jun 21, 2021
2 parents 0b3fea3 + 507084b commit cb874ac
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 4 deletions.
52 changes: 52 additions & 0 deletions examples/fsm/fsm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#!/usr/bin/python
__author__ = 'joaci'

try:
import os
import sys
sys.path.append(os.path.join(os.path.dirname(__file__), "..", ".."))
import time
from pyfase import MicroService
except Exception as e:
print('require module exception: %s' % e)
exit(0)


class Fsm(MicroService):
def __init__(self):
super(Fsm, self).__init__(self,
sender_endpoint='ipc:///tmp/sender',
receiver_endpoint='ipc:///tmp/receiver')

def on_connect(self):
print('### on_connect ###')

def on_idle(self):
print('### state: on_idle')

@MicroService.state
def downloading(self, data):
print('### state: downloading')
downloaded_data = {'data': '123456'}
time.sleep(0.1)
self.request_state('processing', downloaded_data)

@MicroService.state
def processing(self, data):
print('### state: processing data: %s' % data)
time.sleep(0.1)

@MicroService.task
def taskt(self):
while True:
time.sleep(8)
self.request_state('downloading')


Fsm().execute(enable_tasks=True, enable_fsm=True)






47 changes: 43 additions & 4 deletions pyfase.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
import signal
import inspect
import zmq
from threading import Thread
import time
from threading import Thread, Event
from json import loads, dumps
except Exception as requirement_exception:
print('requirements exception: %s' % requirement_exception)
Expand Down Expand Up @@ -46,6 +47,11 @@ def __init__(self, service, sender_endpoint, receiver_endpoint):
self.name = service.__class__.__name__
self.actions = {}
self.tasks = {}
self.fsm_states = {}
self.fsm_current_state = 'idle'
self.fsm_data = None
self.fsm_event = Event()
self.fsm_timeout_event = 9999
self.ctx = zmq.Context()
self.sender = self.ctx.socket(zmq.PUSH)
self.sender.connect(receiver_endpoint)
Expand All @@ -67,6 +73,8 @@ def __init__(self, service, sender_endpoint, receiver_endpoint):
self.receiver.setsockopt_string(zmq.SUBSCRIBE, u'%s:' % name)
elif '_task_wrapper_' in func.__name__: # IS A TASK?
self.tasks[name] = func
elif '_state_wrapper_' in func.__name__: # IS A STATE?
self.fsm_states[name] = func
else:
raise Exception('MicroService %s must be a class' % service)

Expand All @@ -82,6 +90,12 @@ def _task_wrapper_(*args, **kwargs):
return function(*args, **kwargs)
return _task_wrapper_

@staticmethod
def state(function):
def _state_wrapper_(*args, **kwargs):
return function(*args, **kwargs)
return _state_wrapper_

@staticmethod
def exit():
os.kill(os.getpid(), signal.SIGKILL)
Expand All @@ -98,6 +112,14 @@ def on_new_service(self, service, actions):
def on_response(self, service, data):
pass

def on_idle(self):
pass

def request_state(self, next_state, data=None):
self.fsm_data = data
self.fsm_current_state = next_state
self.fsm_event.set()

def start_task(self, task_name, data):
if task_name in self.tasks:
Thread(target=self.tasks[task_name], name=task_name, args=data).start()
Expand All @@ -114,11 +136,27 @@ def response(self, data):
if self.action_context:
self.sender.send_string('%s:%s' % (self.o_pkg['s'], dumps({'s': self.name, 'd': data})), zmq.NOBLOCK)

def execute(self, enable_tasks=None):
def fsm(self):
try:
self.fsm_event.clear()
while True:
if self.fsm_event.wait(self.fsm_timeout_event):
if self.fsm_current_state in self.fsm_states:
self.fsm_event.clear()
self.fsm_states[self.fsm_current_state](self, self.fsm_data)
if self.fsm_event.is_set() is False:
self.on_idle()
except Exception as fsm_exception:
print('fsm exception: %s' % fsm_exception)
os.kill(os.getpid(), signal.SIGKILL)

def execute(self, enable_tasks=None, enable_fsm=None):
try:
if enable_tasks:
for name, task in self.tasks.items():
Thread(target=task, name=name, args=(self,)).start()
if enable_fsm:
Thread(target=self.fsm, name='fsm').start()
self.sender.send_string('<r>:%s' % dumps({'s': self.name,
'a': [action for action in self.actions]}), zmq.NOBLOCK)
while True:
Expand All @@ -137,14 +175,15 @@ def execute(self, enable_tasks=None):
self.on_broadcast(service, self.o_pkg['d'])
elif '%s:' % self.name in pkg: # IS A RESPONSE PACKAGE!
pos = pkg.find(':')
self.o_pkg = loads(pkg[pos+1:])
self.o_pkg = loads(pkg[pos + 1:])
self.on_response(self.o_pkg['s'], self.o_pkg['d'])
else: # IS AN ACTION PACKAGE!
pos = pkg.find(':')
self.o_pkg = loads(pkg[pos+1:])
self.o_pkg = loads(pkg[pos + 1:])
self.action_context = True
self.actions[pkg[:pos]](self, self.o_pkg['s'], self.o_pkg['d'])
self.action_context = False
except Exception and KeyboardInterrupt as execute_exception:
print('execute exception: %s' % execute_exception)
os.kill(os.getpid(), signal.SIGKILL)

0 comments on commit cb874ac

Please sign in to comment.