Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test no backlog #79

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
266 changes: 150 additions & 116 deletions src/shotgunEventDaemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,9 +254,19 @@ class Engine(object):
The engine holds the main loop of event processing.
"""

ele_fields = [
"id",
"event_type",
"attribute_name",
"meta",
"entity",
"user",
"project",
"session_uuid",
"created_at",
]

def __init__(self, configPath):
"""
"""
self._continue = True
self._eventIdData = {}

Expand Down Expand Up @@ -456,10 +466,9 @@ def _loadEventIdData(self):
for collection in self._pluginCollections:
collection.setState(lastEventId)

self._saveEventIdData()
self._saveEventData()

def _getLastEventIdFromDatabase(self):

conn_attempts = 0
lastEventId = None
while lastEventId is None:
Expand Down Expand Up @@ -507,10 +516,76 @@ def _mainLoop(self):
while self._continue:
# Process events
events = self._getNewEvents()
skipPlugins = []

for event in events:
for collection in self._pluginCollections:
collection.process(event)
self._saveEventIdData()
for plugin in collection:
pluginName = plugin.getName()

if pluginName in skipPlugins:
self.log.debug(
"Not processing event #%d. Plugin %s is waiting for a gap to fill.",
event["id"],
pluginName,
)
continue

if not plugin.isActive():
self.log.debug(
"Not processing event #%d. Plugin %s is inactive.",
event["id"],
pluginName,
)
continue

nextEventId = plugin.getNextUnprocessedEventId()

if nextEventId is None:
msg = (
"Plugin %s has no last event marker. Sending event #%d."
pboucher marked this conversation as resolved.
Show resolved Hide resolved
)
self.log.debug(msg, pluginName, event["id"])
plugin.process(event)
elif event["id"] < nextEventId:
msg = "Last processed event for plugin %s is #%d, got #%d. Skipping."
pboucher marked this conversation as resolved.
Show resolved Hide resolved
self.log.debug(
msg, pluginName, plugin.lastEvent["id"], event["id"]
)
elif event["id"] == nextEventId:
self.log.debug(
"Sending expected event #%d to plugin %s.",
event["id"],
pluginName,
)
plugin.process(event)
elif event["id"] > nextEventId:
msg = "There is an event log entry gap between #%d (%s) and #%d (%s) for plugin %s."
self.log.debug(
msg,
plugin.lastEvent["id"],
plugin.lastEvent["created_at"],
event["id"],
event["created_at"],
pluginName,
)

delta = event["created_at"] - plugin.lastEvent["created_at"]
if delta >= datetime.timedelta(seconds=5 * 60):
msg = "Gap delta is %s. Assuming events in the gap won't show up, sending event to plugin %s."
pboucher marked this conversation as resolved.
Show resolved Hide resolved
self.log.debug(msg, pluginName)
plugin.process(event)
else:
msg = "Gap delta is %s. Will wait until the gap fills up or the timeout is hit."
pboucher marked this conversation as resolved.
Show resolved Hide resolved
self.log.debug(msg, delta)

# Skip this plugin for the rest of the event loop.
# We do this to make sure no events in the same batch are processed leading
# to them being available _and_ over the timeout causing events to be dropped
skipPlugins.append(plugin.getName())

# After all collections have been processed, save waypoint to disk
self._saveEventData()

# if we're lagging behind Shotgun, we received a full batch of events
# skip the sleep() call in this case
Expand All @@ -537,25 +612,20 @@ def _getNewEvents(self):
@rtype: I{list} of Shotgun event dictionaries.
"""
nextEventId = None
for newId in [
coll.getNextUnprocessedEventId() for coll in self._pluginCollections
]:
if newId is not None and (nextEventId is None or newId < nextEventId):
nextEventId = newId
nextIds = [
i
for i in [
coll.getNextUnprocessedEventId() for coll in self._pluginCollections
]
if i is not None
]
if nextIds:
nextEventId = min(nextIds)

# self.log.debug("Lowest id found in all collections: %s", nextEventId)

if nextEventId is not None:
filters = [["id", "greater_than", nextEventId - 1]]
fields = [
"id",
"event_type",
"attribute_name",
"meta",
"entity",
"user",
"project",
"session_uuid",
"created_at",
]
order = [{"column": "id", "direction": "asc"}]

conn_attempts = 0
Expand All @@ -564,7 +634,7 @@ def _getNewEvents(self):
events = self._sg.find(
"EventLogEntry",
filters,
fields,
self.ele_fields,
order,
limit=self.config.getMaxEventBatchSize(),
)
Expand All @@ -586,7 +656,7 @@ def _getNewEvents(self):

return []

def _saveEventIdData(self):
def _saveEventData(self):
"""
Save an event Id to persistant storage.

Expand Down Expand Up @@ -648,42 +718,39 @@ def __init__(self, engine, path):
self._engine = engine
self.path = path
self._plugins = {}
self._stateData = {}

def setState(self, state):
if isinstance(state, int):
for plugin in self:
plugin.setState(state)
self._stateData[plugin.getName()] = plugin.getState()
else:
self._stateData = state
for plugin in self:
pluginState = self._stateData.get(plugin.getName())
pluginState = state.get(plugin.getName())
if pluginState:
plugin.setState(pluginState)

def getState(self):
state = {}
for plugin in self:
self._stateData[plugin.getName()] = plugin.getState()
return self._stateData
state[plugin.getName()] = plugin.getState()
return state

def getNextUnprocessedEventId(self):
eId = None
for plugin in self:
if not plugin.isActive():
continue

newId = plugin.getNextUnprocessedEventId()
if newId is not None and (eId is None or newId < eId):
eId = newId
return eId
"""
Get the next unprocessed event id across this entire plugin collection.

def process(self, event):
for plugin in self:
if plugin.isActive():
plugin.process(event)
else:
plugin.logger.debug("Skipping: inactive.")
@return: The lowest unprocessed id of all plugins or None if all plugins
have not processed any events.
@rtype: Int or None
"""
ids = [
i
for i in [p.getNextUnprocessedEventId() for p in self if p.isActive()]
if i is not None
]
if ids:
return min(ids)
return None

def load(self):
"""
Expand Down Expand Up @@ -742,8 +809,7 @@ def __init__(self, engine, path):
self._active = True
self._callbacks = []
self._mtime = None
self._lastEventId = None
self._backlog = {}
self.lastEvent = None

# Setup the plugin's logger
self.logger = logging.getLogger("plugin." + self.getName())
Expand All @@ -759,32 +825,47 @@ def getName(self):
return self._pluginName

def setState(self, state):
if isinstance(state, int):
self._lastEventId = state
# self.logger.debug("Setting state with %s.", state)

if isinstance(state, dict):
# The is an event log entry object
pboucher marked this conversation as resolved.
Show resolved Hide resolved
self.lastEvent = state
elif isinstance(state, int):
# The state is the id of an event log entry
self.lastEvent = self._eventFromId(state)
elif isinstance(state, tuple):
self._lastEventId, self._backlog = state
# The state is a tuple where the first element is the last id processed
# and the second element is a backlog. Scour through the backlog to find
# the oldest value and save the state as the oldest backlog value or
# the last processed id, whichever is oldest
lastEventId, backlog = state
backlogIds = list(backlog) + [lastEventId]
self.lastEvent = self._eventFromId(min(backlogIds))
else:
raise ValueError("Unknown state type: %s." % type(state))

def getState(self):
return (self._lastEventId, self._backlog)
def _eventFromId(self, entity_id):
filters = [["id", "is", entity_id]]
order = [{"field_name": "id", "direction": "desc"}]
event = self._engine._sg.find_one(
"EventLogEntry", filters, self._engine.ele_fields, order=order
)
if not event:
filters = [["id", "less_than", entity_id + 1]]
event = self._engine._sg.find_one(
"EventLogEntry", filters, self._engine.ele_fields, order=order
)

def getNextUnprocessedEventId(self):
if self._lastEventId:
nextId = self._lastEventId + 1
else:
nextId = None
# self.logger.debug("Set plugin state from int to %s", event)
return event

now = datetime.datetime.now()
for k in list(self._backlog):
v = self._backlog[k]
if v < now:
self.logger.warning("Timeout elapsed on backlog event id %d.", k)
del self._backlog[k]
elif nextId is None or k < nextId:
nextId = k
def getState(self):
return self.lastEvent

return nextId
def getNextUnprocessedEventId(self):
if not self.lastEvent:
return None
return self.lastEvent["id"] + 1

def isActive(self):
"""
Expand Down Expand Up @@ -896,21 +977,6 @@ def registerCallback(
)

def process(self, event):
if event["id"] in self._backlog:
if self._process(event):
self.logger.info("Processed id %d from backlog." % event["id"])
del self._backlog[event["id"]]
self._updateLastEventId(event)
elif self._lastEventId is not None and event["id"] <= self._lastEventId:
msg = "Event %d is too old. Last event processed was (%d)."
self.logger.debug(msg, event["id"], self._lastEventId)
else:
if self._process(event):
self._updateLastEventId(event)

return self._active

def _process(self, event):
for callback in self:
if callback.isActive():
if callback.canProcess(event):
Expand All @@ -925,40 +991,10 @@ def _process(self, event):
msg = "Skipping inactive callback %s in plugin."
self.logger.debug(msg, str(callback))

return self._active
if self._active:
self.lastEvent = event

def _updateLastEventId(self, event):
BACKLOG_TIMEOUT = (
5 # time in minutes after which we consider a pending event won't happen
)
if self._lastEventId is not None and event["id"] > self._lastEventId + 1:
event_date = event["created_at"].replace(tzinfo=None)
if datetime.datetime.now() > (
event_date + datetime.timedelta(minutes=BACKLOG_TIMEOUT)
):
# the event we've just processed happened more than BACKLOG_TIMEOUT minutes ago so any event
# with a lower id should have shown up in the EventLog by now if it actually happened
if event["id"] == self._lastEventId + 2:
self.logger.info(
"Event %d never happened - ignoring.", self._lastEventId + 1
)
else:
self.logger.info(
"Events %d-%d never happened - ignoring.",
self._lastEventId + 1,
event["id"] - 1,
)
else:
# in this case, we want to add the missing events to the backlog as they could show up in the
# EventLog within BACKLOG_TIMEOUT minutes, during which we'll keep asking for the same range
# them to show up until they expire
expiration = datetime.datetime.now() + datetime.timedelta(
minutes=BACKLOG_TIMEOUT
)
for skippedId in range(self._lastEventId + 1, event["id"]):
self.logger.info("Adding event id %d to backlog.", skippedId)
self._backlog[skippedId] = expiration
self._lastEventId = event["id"]
return self._active

def __iter__(self):
"""
Expand Down Expand Up @@ -1334,8 +1370,6 @@ def _cleanup(self):


def main():
"""
"""
if CURRENT_PYTHON_VERSION <= PYTHON_26:
print(
"Python 2.5 and older is not supported anymore. Please use Python 2.6 or newer."
Expand Down