Skip to content

Commit

Permalink
Rip out master handler mechanism
Browse files Browse the repository at this point in the history
All events are now handled by addons, and we no longer support any events on
master.
  • Loading branch information
cortesi committed Mar 16, 2017
1 parent 6646a30 commit 3d2c289
Show file tree
Hide file tree
Showing 17 changed files with 112 additions and 246 deletions.
27 changes: 27 additions & 0 deletions mitmproxy/addonmanager.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from mitmproxy import exceptions
from mitmproxy import eventsequence
from mitmproxy import controller
from . import ctx
import pprint

Expand Down Expand Up @@ -60,6 +61,32 @@ def __len__(self):
def __str__(self):
return pprint.pformat([str(i) for i in self.chain])

def handle_lifecycle(self, name, message):
"""
Handle a lifecycle event.
"""
if not hasattr(message, "reply"): # pragma: no cover
raise exceptions.ControlException(
"Message %s has no reply attribute" % message
)

# We can use DummyReply objects multiple times. We only clear them up on
# the next handler so that we can access value and state in the
# meantime.
if isinstance(message.reply, controller.DummyReply):
message.reply.reset()

self.trigger(name, message)

if message.reply.state != "taken":
message.reply.take()
if not message.reply.has_message:
message.reply.ack()
message.reply.commit()

if isinstance(message.reply, controller.DummyReply):
message.reply.mark_reset()

def invoke_addon(self, addon, name, *args, **kwargs):
"""
Invoke an event on an addon. This method must run within an
Expand Down
112 changes: 33 additions & 79 deletions mitmproxy/controller.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import functools
import queue
from mitmproxy import exceptions

Expand All @@ -14,8 +13,8 @@ def __init__(self, q, should_exit):

def ask(self, mtype, m):
"""
Decorate a message with a reply attribute, and send it to the
master. Then wait for a response.
Decorate a message with a reply attribute, and send it to the master.
Then wait for a response.
Raises:
exceptions.Kill: All connections should be closed immediately.
Expand All @@ -36,131 +35,86 @@ def ask(self, mtype, m):

def tell(self, mtype, m):
"""
Decorate a message with a dummy reply attribute, send it to the
master, then return immediately.
Decorate a message with a dummy reply attribute, send it to the master,
then return immediately.
"""
m.reply = DummyReply()
self.q.put((mtype, m))


def handler(f):
@functools.wraps(f)
def wrapper(master, message):
if not hasattr(message, "reply"):
raise exceptions.ControlException("Message %s has no reply attribute" % message)

# DummyReplys may be reused multiple times.
# We only clear them up on the next handler so that we can access value and
# state in the meantime.
if isinstance(message.reply, DummyReply):
message.reply.reset()

# The following ensures that inheritance with wrapped handlers in the
# base class works. If we're the first handler, then responsibility for
# acking is ours. If not, it's someone else's and we ignore it.
handling = False
# We're the first handler - ack responsibility is ours
if message.reply.state == "unhandled":
handling = True
message.reply.handle()

with master.handlecontext():
ret = f(master, message)
master.addons.trigger(f.__name__, message)

# Reset the handled flag - it's common for us to feed the same object
# through handlers repeatedly, so we don't want this to persist across
# calls.
if handling and message.reply.state == "handled":
message.reply.take()
if not message.reply.has_message:
message.reply.ack()
message.reply.commit()

# DummyReplys may be reused multiple times.
if isinstance(message.reply, DummyReply):
message.reply.mark_reset()
return ret
# Mark this function as a handler wrapper
wrapper.__dict__["__handler"] = True
return wrapper


NO_REPLY = object() # special object we can distinguish from a valid "None" reply.


class Reply:
"""
Messages sent through a channel are decorated with a "reply" attribute.
This object is used to respond to the message through the return
channel.
Messages sent through a channel are decorated with a "reply" attribute. This
object is used to respond to the message through the return channel.
"""
def __init__(self, obj):
self.obj = obj
self.q = queue.Queue() # type: queue.Queue

self._state = "unhandled" # "unhandled" -> "handled" -> "taken" -> "committed"
self.value = NO_REPLY # holds the reply value. May change before things are actually commited.
self._state = "start" # "start" -> "taken" -> "committed"

# Holds the reply value. May change before things are actually commited.
self.value = NO_REPLY

@property
def state(self):
"""
The state the reply is currently in. A normal reply object goes sequentially through the following lifecycle:
The state the reply is currently in. A normal reply object goes
sequentially through the following lifecycle:
1. unhandled: Initial State.
2. handled: The reply object has been handled by the topmost handler function.
3. taken: The reply object has been taken to be commited.
4. committed: The reply has been sent back to the requesting party.
1. start: Initial State.
2. taken: The reply object has been taken to be commited.
3. committed: The reply has been sent back to the requesting party.
This attribute is read-only and can only be modified by calling one of state transition functions.
This attribute is read-only and can only be modified by calling one of
state transition functions.
"""
return self._state

@property
def has_message(self):
return self.value != NO_REPLY

def handle(self):
"""
Reply are handled by controller.handlers, which may be nested. The first handler takes
responsibility and handles the reply.
"""
if self.state != "unhandled":
raise exceptions.ControlException("Reply is {}, but expected it to be unhandled.".format(self.state))
self._state = "handled"

def take(self):
"""
Scripts or other parties make "take" a reply out of a normal flow.
For example, intercepted flows are taken out so that the connection thread does not proceed.
"""
if self.state != "handled":
raise exceptions.ControlException("Reply is {}, but expected it to be handled.".format(self.state))
if self.state != "start":
raise exceptions.ControlException(
"Reply is {}, but expected it to be start.".format(self.state)
)
self._state = "taken"

def commit(self):
"""
Ultimately, messages are commited. This is done either automatically by the handler
if the message is not taken or manually by the entity which called .take().
Ultimately, messages are commited. This is done either automatically by
if the message is not taken or manually by the entity which called
.take().
"""
if self.state != "taken":
raise exceptions.ControlException("Reply is {}, but expected it to be taken.".format(self.state))
raise exceptions.ControlException(
"Reply is {}, but expected it to be taken.".format(self.state)
)
if not self.has_message:
raise exceptions.ControlException("There is no reply message.")
self._state = "committed"
self.q.put(self.value)

def ack(self, force=False):
if self.state not in {"start", "taken"}:
raise exceptions.ControlException(
"Reply is {}, but expected it to be start or taken.".format(self.state)
)
self.send(self.obj, force)

def kill(self, force=False):
self.send(exceptions.Kill, force)

def send(self, msg, force=False):
if self.state not in ("handled", "taken"):
raise exceptions.ControlException(
"Reply is {}, did not expect a call to .send().".format(self.state)
)
if self.has_message and not force:
raise exceptions.ControlException("There is already a reply message.")
self.value = msg
Expand All @@ -174,7 +128,7 @@ def __del__(self):
class DummyReply(Reply):
"""
A reply object that is not connected to anything. In contrast to regular
Reply objects, DummyReply objects are reset to "unhandled" at the end of an
Reply objects, DummyReply objects are reset to "start" at the end of an
handler so that they can be used multiple times. Useful when we need an
object to seem like it has a channel, and during testing.
"""
Expand All @@ -189,7 +143,7 @@ def mark_reset(self):

def reset(self):
if self._should_reset:
self._state = "unhandled"
self._state = "start"
self.value = NO_REPLY

def __del__(self):
Expand Down
7 changes: 4 additions & 3 deletions mitmproxy/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,16 +144,17 @@ def revert(self):

@property
def killable(self):
return self.reply and self.reply.state in {"handled", "taken"}
return self.reply and self.reply.state == "taken"

def kill(self):
"""
Kill this request.
"""
self.error = Error("Connection killed")
self.intercepted = False
# reply.state should only be "handled" or "taken" here.
# if none of this is the case, .take() will raise an exception.

# reply.state should be "taken" here, or .take() will raise an
# exception.
if self.reply.state != "taken":
self.reply.take()
self.reply.kill(force=True)
Expand Down
99 changes: 2 additions & 97 deletions mitmproxy/master.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,18 +94,7 @@ def tick(self, timeout):
raise exceptions.ControlException(
"Unknown event %s" % repr(mtype)
)
handle_func = getattr(self, mtype)
if not callable(handle_func):
raise exceptions.ControlException(
"Handler %s not callable" % mtype
)
if not handle_func.__dict__.get("__handler"):
raise exceptions.ControlException(
"Handler function %s is not decorated with controller.handler" % (
handle_func
)
)
handle_func(obj)
self.addons.handle_lifecycle(mtype, obj)
self.event_queue.task_done()
changed = True
except queue.Empty:
Expand Down Expand Up @@ -153,7 +142,7 @@ def load_flow(self, f):
f.request.scheme = self.server.config.upstream_server.scheme
f.reply = controller.DummyReply()
for e, o in eventsequence.iterate(f):
getattr(self, e)(o)
self.addons.handle_lifecycle(e, o)

def replay_request(
self,
Expand Down Expand Up @@ -209,87 +198,3 @@ def replay_request(
if block:
rt.join()
return rt

@controller.handler
def log(self, l):
pass

@controller.handler
def clientconnect(self, root_layer):
pass

@controller.handler
def clientdisconnect(self, root_layer):
pass

@controller.handler
def serverconnect(self, server_conn):
pass

@controller.handler
def serverdisconnect(self, server_conn):
pass

@controller.handler
def next_layer(self, top_layer):
pass

@controller.handler
def http_connect(self, f):
pass

@controller.handler
def error(self, f):
pass

@controller.handler
def requestheaders(self, f):
pass

@controller.handler
def request(self, f):
pass

@controller.handler
def responseheaders(self, f):
pass

@controller.handler
def response(self, f):
pass

@controller.handler
def websocket_handshake(self, f):
pass

@controller.handler
def websocket_start(self, flow):
pass

@controller.handler
def websocket_message(self, flow):
pass

@controller.handler
def websocket_error(self, flow):
pass

@controller.handler
def websocket_end(self, flow):
pass

@controller.handler
def tcp_start(self, flow):
pass

@controller.handler
def tcp_message(self, flow):
pass

@controller.handler
def tcp_error(self, flow):
pass

@controller.handler
def tcp_end(self, flow):
pass
Loading

0 comments on commit 3d2c289

Please sign in to comment.