-
Notifications
You must be signed in to change notification settings - Fork 66
/
Copy pathmachine.rs
253 lines (233 loc) · 9.67 KB
/
machine.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
//! The [`StateMachine`] trait, [`Behavior`] trait, and the [`Engine`] that runs
//! [`Behavior`]s.
use std::pin::Pin;
use anyhow::Result;
use arbiter_core::middleware::ArbiterMiddleware;
use futures_util::{Stream, StreamExt};
use tokio::task::JoinHandle;
use super::*;
/// A type alias for a pinned, boxed stream of events.
///
/// This stream is capable of handling items of any type that implements the
/// `Stream` trait, and it is both sendable across threads and synchronizable
/// between threads.
///
/// # Type Parameters
///
/// * `E`: The type of the items in the stream.
pub type EventStream<E> = Pin<Box<dyn Stream<Item = E> + Send + Sync>>;
/// The instructions that can be sent to a [`StateMachine`].
#[derive(Clone, Debug)]
pub enum MachineInstruction {
/// Used to make a [`StateMachine`] start up.
Start(Arc<ArbiterMiddleware>, Messager),
/// Used to make a [`StateMachine`] process events.
/// This will offload the process into a task that can be halted by sending
/// a [`ControlFlow::Halt`] message from the [`Messager`]. For our purposes,
/// the [`crate::world::World`] will handle this.
Process,
}
/// The message that is used in a [`StateMachine`] to continue or halt its
/// processing.
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
pub enum ControlFlow {
/// Used to halt the processing of a [`StateMachine`].
Halt,
/// Used to continue on the processing of a [`StateMachine`].
Continue,
}
/// The state used by any entity implementing [`StateMachine`].
#[derive(Clone, Copy, Debug)]
pub enum State {
/// The entity is not yet running any process.
/// This is the state adopted by the entity when it is first created.
Uninitialized,
/// The entity is starting up.
/// This is where the entity can engage in its specific start up activities
/// that it can do given the current state of the world.
/// These are usually quick one-shot activities that are not repeated.
Starting,
/// The entity is processing.
/// This is where the entity can engage in its specific processing
/// of events that can lead to actions being taken.
Processing,
}
// NOTE: `async_trait::async_trait` is used throughout to make the trait object
// safe even though rust >=1.75 has async trait stabilized
/// The [`Behavior`] trait is the lowest level functionality that will be used
/// by a [`StateMachine`]. This constitutes what each state transition will do.
#[async_trait::async_trait]
pub trait Behavior<E>: Serialize + DeserializeOwned + Send + Sync + Debug + 'static {
/// Used to start the agent.
/// This is where the agent can engage in its specific start up activities
/// that it can do given the current state of the world.
async fn startup(
&mut self,
client: Arc<ArbiterMiddleware>,
messager: Messager,
) -> Result<EventStream<E>>;
/// Used to process events.
/// This is where the agent can engage in its specific processing
/// of events that can lead to actions being taken.
async fn process(&mut self, event: E) -> Result<ControlFlow>;
}
/// A trait for creating a state machine.
///
/// This trait is intended to be implemented by types that can be converted into
/// a state machine. A state machine, in this context, is an entity capable of
/// executing a set of instructions or operations based on its current state and
/// inputs it receives.
///
/// Implementers of this trait should provide the logic to initialize and return
/// a new instance of a state machine, encapsulated within a `Box<dyn
/// StateMachine>`. This allows for dynamic dispatch to the state machine's
/// methods, enabling polymorphism where different types of state machines can
/// be used interchangeably at runtime.
///
/// # Returns
///
/// - `Box<dyn StateMachine>`: A boxed state machine object that can be
/// dynamically dispatched.
pub trait CreateStateMachine {
/// Creates and returns a new state machine instance.
///
/// This method consumes the implementer and returns a new instance of a
/// state machine encapsulated within a `Box<dyn StateMachine>`. The
/// specific type of the state machine returned can vary, allowing for
/// flexibility and reuse of the state machine logic across
/// different contexts.
fn create_state_machine(self) -> Box<dyn StateMachine>;
}
#[async_trait::async_trait]
/// A trait defining the capabilities of a state machine within the system.
///
/// This trait is designed to be implemented by entities that can execute
/// instructions based on their current state and inputs they receive. The
/// execution of these instructions is asynchronous, allowing for non-blocking
/// operations within the state machine's logic.
///
/// Implementers of this trait must be able to be sent across threads and shared
/// among threads safely, hence the `Send`, `Sync`, and `'static` bounds. They
/// should also support debugging through the `Debug` trait.
pub trait StateMachine: Send + Sync + Debug + 'static {
/// Executes a given instruction asynchronously.
///
/// This method takes a mutable reference to self, allowing the state
/// machine to modify its state in response to the instruction. The
/// instruction to be executed is passed as an argument, encapsulating the
/// action to be performed by the state machine.
///
/// # Parameters
///
/// - `instruction`: The instruction that the state machine is to execute.
///
/// # Returns
///
/// This method does not return a value, but it may result in state changes
/// within the implementing type or the generation of further instructions
/// or events.
async fn execute(&mut self, instruction: MachineInstruction) -> Result<()>;
}
/// The `Engine` struct represents the core logic unit of a state machine-based
/// entity, such as an agent. It encapsulates a behavior and manages the flow
/// of events to and from this behavior, effectively driving the entity's
/// response to external stimuli.
///
/// The `Engine` is generic over a behavior type `B` and an event type `E`,
/// allowing it to be used with a wide variety of behaviors and event sources.
/// It is itself a state machine, capable of executing instructions that
/// manipulate its behavior or react to events.
///
/// # Fields
///
/// - `behavior`: An optional behavior that the engine is currently managing.
/// This is where the engine's logic is primarily executed in response to
/// events.
pub struct Engine<B, E>
where
B: Behavior<E>,
{
/// The behavior the `Engine` runs.
behavior: Option<B>,
/// The current state of the [`Engine`].
state: State,
/// The receiver of events that the [`Engine`] will process.
/// The [`State::Processing`] stage will attempt a decode of the [`String`]s
/// into the event type `<E>`.
event_stream: Option<EventStream<E>>,
}
impl<B, E> Debug for Engine<B, E>
where
B: Behavior<E>,
E: DeserializeOwned + Send + Sync + 'static,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Engine")
.field("behavior", &self.behavior)
.field("state", &self.state)
.finish()
}
}
impl<B, E> Engine<B, E>
where
B: Behavior<E> + Debug,
E: DeserializeOwned + Send + Sync + 'static,
{
/// Creates a new [`Engine`] with the given [`Behavior`] and [`Receiver`].
pub fn new(behavior: B) -> Self {
Self {
behavior: Some(behavior),
state: State::Uninitialized,
event_stream: None,
}
}
}
#[async_trait::async_trait]
impl<B, E> StateMachine for Engine<B, E>
where
B: Behavior<E> + Debug + Serialize + DeserializeOwned,
E: DeserializeOwned + Serialize + Send + Sync + Debug + 'static,
{
async fn execute(&mut self, instruction: MachineInstruction) -> Result<()> {
// NOTE: The unwraps here are safe because the `Behavior` in an engine is only
// accessed here and it is private.
match instruction {
MachineInstruction::Start(client, messager) => {
self.state = State::Starting;
let mut behavior = self.behavior.take().unwrap();
let behavior_task: JoinHandle<Result<(EventStream<E>, B)>> =
tokio::spawn(async move {
let id = messager.id.clone();
let stream = behavior.startup(client, messager).await?;
debug!("startup complete for behavior {:?}", id);
Ok((stream, behavior))
});
let (stream, behavior) = behavior_task.await??;
self.event_stream = Some(stream);
self.behavior = Some(behavior);
self.execute(MachineInstruction::Process).await?;
Ok(())
}
MachineInstruction::Process => {
trace!("Behavior is starting up.");
let mut behavior = self.behavior.take().unwrap();
let mut stream = self.event_stream.take().unwrap();
let behavior_task: JoinHandle<Result<B>> = tokio::spawn(async move {
while let Some(event) = stream.next().await {
match behavior.process(event).await? {
ControlFlow::Halt => {
break;
}
ControlFlow::Continue => {}
}
}
Ok(behavior)
});
// TODO: We don't have to store the behavior again here, we could just discard
// it.
self.behavior = Some(behavior_task.await??);
Ok(())
}
}
}
}