Skip to content

Commit

Permalink
More log cleanup (#982)
Browse files Browse the repository at this point in the history
* More log cleanup
* Add resource ids per log line
  • Loading branch information
bartv authored Mar 5, 2019
1 parent 6101cd9 commit ade36bc
Show file tree
Hide file tree
Showing 5 changed files with 161 additions and 58 deletions.
29 changes: 16 additions & 13 deletions src/inmanta/agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,11 +185,13 @@ def execute(self, dummy, generation, cache):
start = datetime.datetime.now()
ctx = handler.HandlerContext(self.resource, logger=self.logger)

ctx.debug("Start run for resource %(resource)s because %(reason)s",
resource=str(self.resource.id),
deploy_id=self.gid,
agent=self.scheduler.agent.name,
reason=self.reason)
ctx.debug(
"Start run for resource %(resource)s because %(reason)s",
resource=str(self.resource.id),
deploy_id=self.gid,
agent=self.scheduler.agent.name,
reason=self.reason
)

self.running = True
if self.is_done():
Expand Down Expand Up @@ -228,19 +230,20 @@ def execute(self, dummy, generation, cache):
send_event = False
elif not result.success:
ctx.set_status(const.ResourceState.skipped)
ctx.info("Resource %(resource)s skipped due to failed dependency %(failed)s",
resource=str(self.resource.id),
failed=self.skipped_because(results))
ctx.info(
"Resource %(resource)s skipped due to failed dependency %(failed)s",
resource=str(self.resource.id),
failed=self.skipped_because(results)
)
success = False
send_event = False
yield self._execute(ctx=ctx, events=received_events, cache=cache, event_only=True, start=start)
else:
success, send_event = yield self._execute(ctx=ctx, events=received_events, cache=cache, start=start)

LOGGER.info("end run %s", self.resource)
ctx.debug("end run for resource %(resource)s with id %(deploy_id)s",
resource=str(self.resource.id),
deploy_id=self.gid)
ctx.debug(
"End run for resource %(resource)s in deploy %(deploy_id)s", resource=str(self.resource.id), deploy_id=self.gid
)

end = datetime.datetime.now()
changes = {str(self.resource.id): ctx.changes}
Expand Down Expand Up @@ -461,7 +464,7 @@ def mark_deployment_as_finished(self, resource_actions, reason, gid):

def notify_ready(self, resourceid, send_events, state, change, changes):
if resourceid not in self.cad:
self.logger.warning("received CAD notification that was not required, %s", resourceid)
# received CAD notification for which no resource are waiting, so return
return
self.cad[resourceid].notify(send_events, state, change, changes)

Expand Down
2 changes: 1 addition & 1 deletion src/inmanta/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,13 @@ def deploy_version(self, key, mod, persist=False):
:param version The version of the deployed modules
:modules modules A list of module names and the hashes of the code files
"""
LOGGER.info("Deploying code (key=%s)" % key)
# deploy the new code
name = mod[1]
source_code = mod[2]

# if the module is new, or update
if name not in self.__modules or key != self.__modules[name][0]:
LOGGER.info("Deploying code (key=%s, module=%s)", key, mod[1])
# write the new source
source_file = os.path.join(self.__code_dir, MODULE_DIR, name + ".py")

Expand Down
9 changes: 7 additions & 2 deletions src/inmanta/server/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,13 @@ def validate_fact_renew(value):
server_purge_resource_action_logs_interval = Option("server", "purge-resource-action-logs-interval", 3600,
"The number of seconds between resource-action log purging", is_time)

server_resource_action_log = Option("server", "resource_action_log", "resource-actions.log",
"File in log-dir, containing the resource-action logs", is_str_opt)
server_resource_action_log_prefix = Option(
"server",
"resource_action_log_prefix",
"resource-actions-",
"File prefix in log-dir, containing the resource-action logs. The after the prefix the environment uuid and .log is added",
is_str_opt
)


#############################
Expand Down
175 changes: 135 additions & 40 deletions src/inmanta/server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from uuid import UUID
import uuid
import shutil
import json

import dateutil
import pymongo
Expand All @@ -42,17 +43,40 @@
from inmanta.ast import type
from inmanta.resources import Id
from inmanta.server import config as opt
import json
from inmanta.util import hash_file
from inmanta.const import UNDEPLOYABLE_STATES
from inmanta.protocol import encode_token, methods

from typing import List

LOGGER = logging.getLogger(__name__)
agent_lock = locks.Lock()

DBLIMIT = 100000


class ResourceActionLogLine(logging.LogRecord):
""" A special log record that is used to report log lines that come from the agent
"""
def __init__(self, logger_name: str, level: str, msg: str, created: datetime.datetime) -> None:
super().__init__(
name=logger_name,
level=level,
pathname="(unknown file)",
lineno=0,
msg=msg,
args=[],
exc_info=None,
func=None,
sinfo=None
)

self.created = created.timestamp()
self.created = self.created
self.msecs = (self.created - int(self.created)) * 1000
self.relativeCreated = (self.created - logging._startTime) * 1000


class Server(protocol.ServerSlice):
"""
The central Inmanta server that communicates with clients and agents and persists configuration
Expand All @@ -75,17 +99,15 @@ def __init__(self, database_host=None, database_port=None, agent_no_log=False):
self._database_host = database_host
self._database_port = database_port

self._resource_action_logger = None
self._resource_action_file_handler = None
self._resource_action_loggers: Dict[uuid.UUID, logging.Logger] = {}
self._resource_action_handlers: Dict[uuid.UUID, logging.Handler] = {}

@gen.coroutine
def prestart(self, server):
self.agentmanager = server.get_slice("agentmanager")

@gen.coroutine
def start(self):
yield self._create_resource_action_logger()

if self._database_host is None:
self._database_host = opt.db_host.get()

Expand All @@ -108,32 +130,81 @@ def start(self):
@gen.coroutine
def stop(self):
yield super().stop()
yield self._close_resource_action_logger()
self._close_resource_action_loggers()

@gen.coroutine
def _create_resource_action_logger(self):
resource_action_log = os.path.join(opt.log_dir.get(), opt.server_resource_action_log.get())
self._file_handler = logging.handlers.WatchedFileHandler(filename=resource_action_log, mode='a+')
formatter = logging.Formatter(fmt="%(asctime)s %(levelname)-8s %(message)s")
self._file_handler.setFormatter(formatter)
self._file_handler.setLevel(logging.DEBUG)
self._resource_action_logger = logging.getLogger(const.NAME_RESOURCE_ACTION_LOGGER)
self._resource_action_logger.setLevel(logging.DEBUG)
self._resource_action_logger.addHandler(self._file_handler)
@staticmethod
def get_resource_action_log_file(environment: uuid.UUID) -> str:
"""Get the correct filename for the given environment
:param environment: The environment id to get the file for
:return: The path to the logfile
"""
return os.path.join(
opt.log_dir.get(),
opt.server_resource_action_log_prefix.get() + str(environment) + ".log"
)

def get_resource_action_logger(self, environment: uuid.UUID) -> logging.Logger:
"""Get the resource action logger for the given environment. If the logger was not created, create it.
:param environment: The environment to get a logger for
:return: The logger for the given environment.
"""
if environment in self._resource_action_loggers:
return self._resource_action_loggers[environment]

@gen.coroutine
def _close_resource_action_logger(self):
if self._resource_action_logger:
logger_copy = self._resource_action_logger
self._resource_action_logger = None
logger_copy.removeHandler(self._file_handler)
self._file_handler.flush()
self._file_handler.close()
self._file_handler = None
resource_action_log = self.get_resource_action_log_file(environment)

file_handler = logging.handlers.WatchedFileHandler(filename=resource_action_log, mode='a+')
# Most logs will come from agents. We need to use their level and timestamp and their formatted message
file_handler.setFormatter(logging.Formatter(fmt="%(message)s"))
file_handler.setLevel(logging.DEBUG)

resource_action_logger = logging.getLogger(const.NAME_RESOURCE_ACTION_LOGGER).getChild(str(environment))
resource_action_logger.setLevel(logging.DEBUG)
resource_action_logger.addHandler(file_handler)

self._resource_action_loggers[environment] = resource_action_logger
self._resource_action_handlers[environment] = file_handler

def _write_to_resource_action_log(self, log_line):
if self._resource_action_logger:
log_line.write_to_logger(self._resource_action_logger)
return resource_action_logger

def _close_resource_action_loggers(self) -> None:
"""Close all resource action loggers and their associated handlers"""
try:
while True:
env, logger = self._resource_action_loggers.popitem()
self._close_resource_action_logger(env, logger)
except KeyError:
pass

def _close_resource_action_logger(self, env: uuid.UUID, logger: logging.Logger = None) -> None:
"""Close the given logger for the given env.
:param env: The environment to close the logger for
:param logger: The logger to close, if the logger is none it is retrieved
"""
if logger is None:
if env in self._resource_action_loggers:
logger = self._resource_action_loggers.pop(env)
else:
return

handler = self._resource_action_handlers.pop(env)
logger.removeHandler(handler)
handler.flush()
handler.close()

def log_resource_action(
self, env: uuid.UUID, resource_ids: List[str], log_level: int, ts: datetime.datetime, message: str
) -> None:
"""Write the given log to the correct resource action logger"""
logger = self.get_resource_action_logger(env)
if len(resource_ids) == 0:
message = "no resources: " + message
elif len(resource_ids) > 1:
message = "multiple resources: " + message
else:
message = resource_ids[0] + ": " + message
log_record = ResourceActionLogLine(logger.name, log_level, message, ts)
logger.handle(log_record)

def get_agent_client(self, tid: UUID, endpoint):
return self.agentmanager.get_agent_client(tid, endpoint)
Expand Down Expand Up @@ -709,7 +780,7 @@ def get_all_resources_for_agent(self, env: Environment, agent: str, version: str
now = datetime.datetime.now()

log_line = data.LogLine.log(logging.INFO, "Resource version pulled by client for agent %(agent)s state", agent=agent)
self._write_to_resource_action_log(log_line)
self.log_resource_action(env.id, resource_ids, logging.INFO, now, log_line.msg)
ra = data.ResourceAction(environment=env.id, resource_version_ids=resource_ids, action=const.ResourceAction.pull,
action_id=uuid.uuid4(), started=started, finished=now, messages=[log_line])
yield ra.insert()
Expand Down Expand Up @@ -942,11 +1013,18 @@ def safe_get(input, key, default):
for agent in agents:
yield self.agentmanager.ensure_agent_registered(env, agent)

now = datetime.datetime.now()
log_line = data.LogLine.log(logging.INFO, "Successfully stored version %(version)d", version=version)
self._write_to_resource_action_log(log_line)
ra = data.ResourceAction(environment=env.id, resource_version_ids=resource_version_ids, action_id=uuid.uuid4(),
action=const.ResourceAction.store, started=started, finished=datetime.datetime.now(),
messages=[log_line])
self.log_resource_action(env.id, resource_version_ids, logging.INFO, now, log_line.msg)
ra = data.ResourceAction(
environment=env.id,
resource_version_ids=resource_version_ids,
action_id=uuid.uuid4(),
action=const.ResourceAction.store,
started=started,
finished=now,
messages=[log_line]
)
yield ra.insert()
LOGGER.debug("Successfully stored version %d", version)

Expand Down Expand Up @@ -995,7 +1073,12 @@ def release_version(self, env, version_id, push, agent_trigger_method=None):

# all resources already deployed
deployed = yield model.get_increment(negative=True)
logline = {"level": "INFO", "msg": "Setting deployed due to known good status", "timestamp": now, "args": []}
logline = {
"level": "INFO",
"msg": "Setting deployed due to known good status",
"timestamp": now.isoformat(),
"args": []
}
yield self.resource_action_update(env, deployed, action_id=uuid.uuid4(),
started=now, finished=now, status=const.ResourceState.deployed,
# does this require a different ResourceAction?
Expand Down Expand Up @@ -1233,11 +1316,15 @@ def resource_action_update(self, env, resource_ids, action_id, action, started,

if len(messages) > 0:
resource_action.add_logs(messages)
if self._resource_action_logger:
for msg in messages:
# All other data is stored in the database. The msg was already formatted at the client side.
# The only way to disable formatting is, by not passing any args
self._resource_action_logger.log(level=const.LogLevel[msg["level"]].value, msg=msg["msg"])
for msg in messages:
# All other data is stored in the database. The msg was already formatted at the client side.
self.log_resource_action(
env.id,
resource_ids,
const.LogLevel[msg["level"]].value,
datetime.datetime.strptime(msg["timestamp"], const.TIME_ISOFMT),
msg["msg"]
)

if len(changes) > 0:
resource_action.add_changes(changes)
Expand Down Expand Up @@ -1310,7 +1397,13 @@ def delete_project(self, project_id):
if project is None:
return 404, {"message": "The project with given id does not exist."}

yield project.delete_cascade()
environments = yield Environment.get_list(project=project.id)
for env in environments:
yield env.delete_cascade()
self._close_resource_action_logger(env)

yield project.delete()

return 200, {}

@protocol.handle(methods.modify_project, project_id="id")
Expand Down Expand Up @@ -1438,6 +1531,8 @@ def delete_environment(self, environment_id):

yield env.delete_cascade()

self._close_resource_action_logger(environment_id)

return 200

@protocol.handle(methods.list_settings, env="tid")
Expand Down
4 changes: 2 additions & 2 deletions tests/test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import pytest
from inmanta.agent.agent import Agent
from inmanta import data, const, config
from inmanta.server import config as opt, SLICE_AGENT_MANAGER, SLICE_SESSION_MANAGER
from inmanta.server import config as opt, SLICE_AGENT_MANAGER, SLICE_SESSION_MANAGER, server
from datetime import datetime
from uuid import UUID
from inmanta.util import hash_file
Expand Down Expand Up @@ -1054,7 +1054,7 @@ async def test_resource_action_log(motor, server_multi, client_multi, environmen
res = await client_multi.put_version(tid=environment, version=version, resources=resources, unknowns=[], version_info={})
assert res.code == 200

resource_action_log = os.path.join(opt.log_dir.get(), opt.server_resource_action_log.get())
resource_action_log = server.Server.get_resource_action_log_file(environment)
assert os.path.isfile(resource_action_log)
assert os.stat(resource_action_log).st_size != 0

Expand Down

0 comments on commit ade36bc

Please sign in to comment.