[[chapter_10_commands]]
== Commands and Command Handler

((("commands", id="ix_cmnd")))
In the previous chapter, we talked about using events as a way of representing
the inputs to our system, and we turned our application into a message-processing
machine.

To achieve that, we converted all our use-case functions to event handlers.
When the API receives a POST to create a new batch, it builds a new `BatchCreated`
event and handles it as if it were an internal event.
This might feel counterintuitive. After all, the batch _hasn't_ been
created yet; that's why we called the API. We're going to fix that conceptual
wart by introducing commands and showing how they can be handled by the same
message bus but with slightly different rules.

[TIP]
====
The code for this chapter is in the
chapter_10_commands branch https://oreil.ly/U_VGa[on GitHub]:

----
git clone https://github.com/cosmicpython/code.git
cd code
git checkout chapter_10_commands
# or to code along, checkout the previous chapter:
git checkout chapter_09_all_messagebus
----
====

=== Commands and Events

((("commands", "events versus", id="ix_cmdevnt")))
((("events", "commands versus", id="ix_evntcmd")))
Like events, _commands_ are a type of message--instructions sent by one part of
a system to another. We usually represent commands with dumb data
structures and can handle them in much the same way as events.

The differences between commands and events, though, are important.

Commands are sent by one actor to another specific actor with the expectation that
a particular thing will happen as a result. When we post a form to an API handler,
we are sending a command. We name commands with imperative mood verb phrases like
"allocate stock" or "delay shipment."

Commands capture _intent_. They express our wish for the system to do something.
As a result, when they fail, the sender needs to receive error information.

_Events_ are broadcast by an actor to all interested listeners. When we publish
`BatchQuantityChanged`, we don't know who's going to pick it up. We name events
with past-tense verb phrases like "order allocated to stock" or "shipment delayed."

We often use events to spread the knowledge about successful commands.

Events capture _facts_ about things that happened in the past. Since we don't
know who's handling an event, senders should not care whether the receivers
succeeded or failed. <<events_vs_commands_table>> recaps the differences.

[[events_vs_commands_table]]
[options="header"]
.Events versus commands
|===
e|      e| Event e| Command
| Named | Past tense | Imperative mood
| Error handling | Fail independently | Fail noisily
| Sent to | All listeners | One recipient
|===


// IDEA: Diagram of user "buy stock" -> "stock purchased"
//                       "create batch" -> "batch created"
// (EJ3) "ChangeBatchQuantity" -> "AllocationRequired" will be a less trivial example

((("commands", "in our system now")))
((("commands", "events versus", startref="ix_cmdevnt")))
What kinds of commands do we have in our system right now?

[[commands_dot_py]]
.Pulling out some commands (src/allocation/domain/commands.py)
====
[source,python]
----
from dataclasses import dataclass
from datetime import date
from typing import Optional


class Command:
    pass


@dataclass
class Allocate(Command):  #<1>
    orderid: str
    sku: str
    qty: int


@dataclass
class CreateBatch(Command):  #<2>
    ref: str
    sku: str
    qty: int
    eta: Optional[date] = None


@dataclass
class ChangeBatchQuantity(Command):  #<3>
    ref: str
    qty: int
----
====

<1> `commands.Allocate` will replace `events.AllocationRequired`.
<2> `commands.CreateBatch` will replace `events.BatchCreated`.
<3> `commands.ChangeBatchQuantity` will replace `events.BatchQuantityChanged`.
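
A minimal sketch of the knock-on change in the handlers: the bodies stay as
they were in the previous chapter, and only the type of the incoming message
differs (the exact signature here is illustrative rather than copied from
_handlers.py_):

[source,python]
[role="skip"]
----
def allocate(
    cmd: commands.Allocate,  # was: event: events.AllocationRequired
    uow: unit_of_work.AbstractUnitOfWork,
):
    line = OrderLine(cmd.orderid, cmd.sku, cmd.qty)
    with uow:
        product = uow.products.get(sku=line.sku)
        if product is None:
            raise InvalidSku(f"Invalid sku {line.sku}")
        batchref = product.allocate(line)
        uow.commit()
    return batchref
----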


=== Differences in Exception Handling


((("message bus", "dispatching events and commands differently")))
((("exception handling, differences for events and commands")))
((("events", "commands versus", startref="ix_evntcmd")))
Just changing the names and verbs is all very well, but that won't
change the behavior of our system.  We want to treat events and commands similarly,
but not exactly the same.  Let's see how our message bus changes:

[[messagebus_dispatches_differently]]
.Dispatch events and commands differently (src/allocation/service_layer/messagebus.py)
====
[source,python]
----
Message = Union[commands.Command, events.Event]


def handle(  #<1>
    message: Message,
    uow: unit_of_work.AbstractUnitOfWork,
):
    results = []
    queue = [message]
    while queue:
        message = queue.pop(0)
        if isinstance(message, events.Event):
            handle_event(message, queue, uow)  #<2>
        elif isinstance(message, commands.Command):
            cmd_result = handle_command(message, queue, uow)  #<2>
            results.append(cmd_result)
        else:
            raise Exception(f"{message} was not an Event or Command")
    return results
----
====

<1> It still has a main `handle()` entrypoint that takes a `message`, which may
    be a command or an event.

<2> We dispatch events and commands to two different helper functions, shown next.
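
To give a feel for the calling side, here is a rough, hypothetical sketch of an
API endpoint posting a command onto the bus (the real Flask endpoint differs in
its details):

[source,python]
[role="skip"]
----
from flask import Flask, request

app = Flask(__name__)


@app.route("/allocate", methods=["POST"])
def allocate_endpoint():
    try:
        cmd = commands.Allocate(
            request.json["orderid"], request.json["sku"], request.json["qty"]
        )
        uow = unit_of_work.SqlAlchemyUnitOfWork()
        results = messagebus.handle(cmd, uow)
        batchref = results.pop(0)  # temporary hack: first result is the batchref
    except handlers.InvalidSku as e:
        return {"message": str(e)}, 400
    return {"batchref": batchref}, 201
----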


Here's how we handle events:

[[handle_event]]
.Events cannot interrupt the flow (src/allocation/service_layer/messagebus.py)
====
[source,python]
----
def handle_event(
    event: events.Event,
    queue: List[Message],
    uow: unit_of_work.AbstractUnitOfWork,
):
    for handler in EVENT_HANDLERS[type(event)]:  #<1>
        try:
            logger.debug("handling event %s with handler %s", event, handler)
            handler(event, uow=uow)
            queue.extend(uow.collect_new_events())
        except Exception:
            logger.exception("Exception handling event %s", event)
            continue  #<2>
----
====

<1> Events go to a dispatcher that can delegate to multiple handlers per
    event.

<2> It catches and logs errors but doesn't let them interrupt
    message processing.

((("commands", "exception handling")))
And here's how we do commands:

[[handle_command]]
.Commands reraise exceptions (src/allocation/service_layer/messagebus.py)
====
[source,python]
----
def handle_command(
    command: commands.Command,
    queue: List[Message],
    uow: unit_of_work.AbstractUnitOfWork,
):
    logger.debug("handling command %s", command)
    try:
        handler = COMMAND_HANDLERS[type(command)]  #<1>
        result = handler(command, uow=uow)
        queue.extend(uow.collect_new_events())
        return result  #<3>
    except Exception:
        logger.exception("Exception handling command %s", command)
        raise  #<2>
----
====


<1> The command dispatcher expects just one handler per command.

<2> If any errors are raised, they fail fast and will bubble up.

<3> `return result` is only temporary; as mentioned in <<temporary_ugly_hack>>,
    it's a hack to allow the message bus to return the batch
    reference for the API to use.  We'll fix this in <<chapter_12_cqrs>>.


((("commands", "handlers for")))
((("handlers", "new HANDLERS dicts for commands and events")))
((("dictionaries", "HANDLERS dicts for commands and events")))
We also change the single `HANDLERS` dict into different ones for
commands and events. Commands can have only one handler, according
to our convention:

[[new_handlers_dicts]]
.New handlers dicts (src/allocation/service_layer/messagebus.py)
====
[source,python]
----
EVENT_HANDLERS = {
    events.OutOfStock: [handlers.send_out_of_stock_notification],
}  # type: Dict[Type[events.Event], List[Callable]]

COMMAND_HANDLERS = {
    commands.Allocate: handlers.allocate,
    commands.CreateBatch: handlers.add_batch,
    commands.ChangeBatchQuantity: handlers.change_batch_quantity,
}  # type: Dict[Type[commands.Command], Callable]
----
====
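
The practical consequence of the two dispatch paths is easiest to see with a
tiny experiment. The failing handler, the `Pinged` event, and the `Poke`
command below are all made up for illustration, and `FakeUnitOfWork` stands in
for the fake from the previous chapter's tests:

[source,python]
[role="skip"]
----
class Pinged(events.Event):  # hypothetical event, not part of our domain
    pass


class Poke(commands.Command):  # hypothetical command, not part of our domain
    pass


def failing_handler(message, uow):
    raise RuntimeError("boom")


messagebus.EVENT_HANDLERS[Pinged] = [failing_handler]
messagebus.COMMAND_HANDLERS[Poke] = failing_handler

uow = FakeUnitOfWork()
messagebus.handle(Pinged(), uow)  # the error is logged; handle() returns normally
messagebus.handle(Poke(), uow)  # the error is logged, then re-raised to the caller
----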



=== Discussion: Events, Commands, and Error Handling

((("commands", "events, commands, and error handling", id="ix_cmndeverr")))
((("error handling", "events, commands, and", id="ix_errhnd")))
((("events", "events, commands, and error handling", id="ix_evntcmderr")))
Many developers get uncomfortable at this point and ask, "What happens when an
event fails to process? How am I supposed to make sure the system is in a
consistent state?" If we manage to process half of the events during `messagebus.handle` before an
out-of-memory error kills our process, how do we mitigate problems caused by the
lost messages?

Let's start with the worst case: we fail to handle an event, and the system is
left in an inconsistent state. What kind of error would cause this? Often in our
systems we can end up in an inconsistent state when only half an operation is
completed.

For example, we could allocate three units of `DESIRABLE_BEANBAG` to a customer's
order but somehow fail to reduce the amount of remaining stock. This would
cause an inconsistent state: the three units of stock are both allocated _and_
available, depending on how you look at it. Later, we might allocate those
same beanbags to another customer, causing a headache for customer support.

((("Unit of Work pattern", "UoW managing success or failure of aggregate update")))
((("consistency boundaries", "aggregates acting as")))
((("aggregates", "acting as consistency boundaries")))
In our allocation service, though, we've already taken steps to prevent that
happening. We've carefully identified _aggregates_ that act as consistency
boundaries, and we've introduced a _UoW_ that manages the atomic
success or failure of an update to an aggregate.

((("Product object", "acting as consistency boundary")))
For example, when we allocate stock to an order, our consistency boundary is the
`Product` aggregate. This means that we can't accidentally overallocate: either
a particular order line is allocated to the product, or it is not--there's no
room for inconsistent states.

By definition, we don't require two aggregates to be immediately consistent, so
if we fail to process an event and update only a single aggregate, our system
can still be made eventually consistent. We shouldn't violate any constraints of
the system.

With this example in mind, we can better understand the reason for splitting
messages into commands and events. When a user wants to make the system do
something, we represent their request as a _command_. That command should modify
a single _aggregate_ and either succeed or fail in totality. Any other bookkeeping, cleanup, and notification we need to do can happen via an _event_. We
don't require the event handlers to succeed in order for the command to be
successful.

Let's look at another example (from a different, imaginary project) to see why not.

Imagine we are building an ecommerce website that sells expensive luxury goods.
Our marketing department wants to reward customers for repeat visits. We will
flag customers as VIPs after they make their third purchase, and this will
entitle them to priority treatment and special offers. Our acceptance criteria
for this story reads as follows:


[source,gherkin]
[role="skip"]
----
Given a customer with two orders in their history,
When the customer places a third order,
Then they should be flagged as a VIP.

When a customer first becomes a VIP
Then we should send them an email to congratulate them
----

((("aggregates", "History aggregate recording orders and raising domain events")))
Using the techniques we've already discussed in this book, we decide that we
want to build a new `History` aggregate that records orders and can raise domain
events when rules are met. We will structure the code like this:


[[vip_customer_listing]]
.VIP customer (example code for a different project)
====
[source,python]
[role="skip"]
----
class History:  # Aggregate

    def __init__(self, customer_id: int):
        self.orders = set()  # Set[HistoryEntry]
        self.customer_id = customer_id
        self.events = []  # List[Event]

    def record_order(self, order_id: str, order_amount: int): #<1>
        entry = HistoryEntry(order_id, order_amount)

        if entry in self.orders:
            return

        self.orders.add(entry)

        if len(self.orders) == 3:
            self.events.append(
                CustomerBecameVIP(self.customer_id)
            )


def create_order_from_basket(uow, cmd: CreateOrder): #<2>
    with uow:
        order = Order.from_basket(cmd.customer_id, cmd.basket_items)
        uow.orders.add(order)
        uow.commit()  # raises OrderCreated


def update_customer_history(uow, event: OrderCreated): #<3>
    with uow:
        history = uow.order_history.get(event.customer_id)
        history.record_order(event.order_id, event.order_amount)
        uow.commit()  # raises CustomerBecameVIP


def congratulate_vip_customer(uow, event: CustomerBecameVIP): #<4>
    with uow:
        customer = uow.customers.get(event.customer_id)
        email.send(
            customer.email_address,
            f'Congratulations {customer.first_name}!'
        )

----
====

<1> The `History` aggregate captures the rules indicating when a customer becomes a VIP.
    This puts us in a good place to handle changes when the rules become more
    complex in the future.

<2> Our first handler creates an order for the customer and raises a domain
    event `OrderCreated`.

<3> Our second handler updates the `History` object to record that an order was
    [.keep-together]#created#.

<4> Finally, we send an email to the customer when they become a VIP.

//IDEA: Sequence diagram here?
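
For the chain above to run, the handlers would be wired up in this imaginary
project's own handler mappings, something like this (names invented for the
example):

[source,python]
[role="skip"]
----
EVENT_HANDLERS = {
    OrderCreated: [update_customer_history],
    CustomerBecameVIP: [congratulate_vip_customer],
}

COMMAND_HANDLERS = {
    CreateOrder: create_order_from_basket,
}
----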

Using this code, we can gain some intuition about error handling in an
event-driven system.

((("aggregates", "raising events about")))
In our current implementation, we raise events about an aggregate _after_ we
persist our state to the database. What if we raised those events _before_ we
persisted, and committed all our changes at the same time? That way, we could be
sure that all the work was complete. Wouldn't that be safer?

What happens, though, if the email server is slightly overloaded? If all the work
has to complete at the same time, a busy email server can stop us from taking money
for orders.

What happens if there is a bug in the implementation of the `History` aggregate?
Should we fail to take your money just because we can't recognize you as a VIP?

By separating out these concerns, we have made it possible for things to fail
in isolation, which improves the overall reliability of the system. The only
part of this code that _has_ to complete is the command handler that creates an
order. This is the only part that a customer cares about, and it's the part that
our business stakeholders should prioritize.

((("commands", "events, commands, and error handling", startref="ix_cmndeverr")))
((("error handling", "events, commands, and", startref="ix_errhnd")))
((("events", "events, commands, and error handling", startref="ix_evntcmderr")))
Notice how we've deliberately aligned our transactional boundaries to the start
and end of the business processes. The names that we use in the code match the
jargon used by our business stakeholders, and the handlers we've written match
the steps of our natural language acceptance criteria. This concordance of names
and structure helps us to reason about our systems as they grow larger and more
complex.


[[recovering_from_errors]]
=== Recovering from Errors Synchronously

((("commands", "events, commands, and error handling", "recovering from errors synchronously")))
((("errors, recovering from synchronously")))
Hopefully we've convinced you that it's OK for events to fail independently
from the commands that raised them. What should we do, then, to make sure we
can recover from errors when they inevitably occur?

The first thing we need is to know _when_ an error has occurred, and for that we
usually rely on logs.

((("message bus", "handle_event method")))
Let's look again at the `handle_event` function from our message bus:

[[messagebus_logging]]
.Current handle_event function (src/allocation/service_layer/messagebus.py)
====
[source,python,highlight=8;12]
----
def handle_event(
    event: events.Event,
    queue: List[Message],
    uow: unit_of_work.AbstractUnitOfWork,
):
    for handler in EVENT_HANDLERS[type(event)]:
        try:
            logger.debug("handling event %s with handler %s", event, handler)
            handler(event, uow=uow)
            queue.extend(uow.collect_new_events())
        except Exception:
            logger.exception("Exception handling event %s", event)
            continue
----
====

When we handle a message in our system, the first thing we do is write a log
line to record what we're about to do. For our `CustomerBecameVIP` use case, the
logs might read as follows:

----
Handling event CustomerBecameVIP(customer_id=12345)
with handler <function congratulate_vip_customer at 0x10ebc9a60>
----

((("dataclasses", "use for message types")))
Because we've chosen to use dataclasses for our message types, we get a neatly
printed summary of the incoming data that we can copy and paste into a Python
shell to re-create the object.

When an error occurs, we can use the logged data to either reproduce the problem
in a unit test or replay the message into the system.
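
For example, the logged repr can be pasted straight back into a Python shell or
a throwaway test and pushed through the bus again once the underlying problem
is fixed (a hypothetical snippet, reusing the names from the log line above):

[source,python]
[role="skip"]
----
# Re-create the message from the logged data and replay it
event = CustomerBecameVIP(customer_id=12345)
messagebus.handle(event, unit_of_work.SqlAlchemyUnitOfWork())
----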

Manual replay works well for cases where we need to fix a bug before we can
re-process an event, but our systems will _always_ experience some background
level of transient failure. This includes things like network hiccups, table
deadlocks, and brief downtime caused by deployments.

((("retries", "message bus handle_event with")))
((("message bus", "handle_event with retries")))
For most of those cases, we can recover elegantly by trying again. As the
proverb says, "If at first you don't succeed, retry the operation with an
exponentially increasing back-off period."

[[messagebus_handle_event_with_retry]]
.Handle with retry (src/allocation/service_layer/messagebus.py)
====
[source,python]
[role="skip"]
----
from tenacity import Retrying, RetryError, stop_after_attempt, wait_exponential #<1>

...

def handle_event(
    event: events.Event,
    queue: List[Message],
    uow: unit_of_work.AbstractUnitOfWork,
):
    for handler in EVENT_HANDLERS[type(event)]:
        try:
            for attempt in Retrying(  #<2>
                stop=stop_after_attempt(3),
                wait=wait_exponential()
            ):

                with attempt:
                    logger.debug("handling event %s with handler %s", event, handler)
                    handler(event, uow=uow)
                    queue.extend(uow.collect_new_events())
        except RetryError as retry_failure:
            logger.error(
                "Failed to handle event %s times, giving up!",
                retry_failure.last_attempt.attempt_number
            )
            continue

----
====

<1> Tenacity is a Python library that implements common patterns for retrying.
    ((("Tenacity library")))
    ((("retries", "Tenacity library for")))

<2> Here we configure our message bus to retry operations up to three times,
    with an exponentially increasing wait between attempts.

Retrying operations that might fail is probably the single best way to improve
the resilience of our software. Again, the Unit of Work and Command Handler
patterns mean that each attempt starts from a consistent state and won't leave
things half-finished.

WARNING: At some point, regardless of `tenacity`, we'll have to give up trying to
    process the message. Building reliable systems with distributed messages is
    hard, and we have to skim over some tricky bits. There are pointers to more
    reference materials in the <<epilogue_1_how_to_get_there_from_here, epilogue>>.

[role="pagebreak-before less_space"]
=== Wrap-Up

((("Command Handler pattern")))
((("events", "splitting command and events, trade-offs")))
((("commands", "splitting commands and events, trade-offs")))
In this book we decided to introduce the concept of events before the concept
of commands, but other guides often do it the other way around.  Making
explicit the requests that our system can respond to by giving them a name
and their own data structure is quite a fundamental thing to do.  You'll
sometimes see people use the name _Command Handler_ pattern to describe what
we're doing with Events, Commands, and Message Bus.

<<chapter_10_commands_and_events_tradeoffs>> discusses some of the things you
should think about before you jump on board.

[[chapter_10_commands_and_events_tradeoffs]]
[options="header"]
.Splitting commands and events: the trade-offs
|===
|Pros|Cons
a|
* Treating commands and events differently helps us understand which things
  have to succeed and which things we can tidy up later.

* `CreateBatch` is definitely a less confusing name than `BatchCreated`. We are
  being explicit about the intent of our users, and explicit is better than
  implicit, right?

a|
* The semantic differences between commands and events can be subtle. Expect
  bikeshedding arguments over the differences.

* We're expressly inviting failure. We know that sometimes things will break, and
  we're choosing to handle that by making the failures smaller and more isolated.
  This can make the system harder to reason about and requires better monitoring.
  ((("commands", startref="ix_cmnd")))

|===

In <<chapter_11_external_events>> we'll talk about using events as an integration pattern.
// IDEA: discussion, can events raise commands?