Skip to content

Commit

Permalink
feat: Enabling remote pilot logging to Tornado.
Browse files Browse the repository at this point in the history
  • Loading branch information
martynia committed Jun 27, 2022
1 parent f70b1d7 commit 5a6f587
Show file tree
Hide file tree
Showing 8 changed files with 506 additions and 7 deletions.
89 changes: 89 additions & 0 deletions src/DIRAC/WorkloadManagementSystem/Agent/PilotLoggingAgent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
""" :mod: PilotLoggingAgent
PilotLoggingAgent sends Pilot log files to an SE
"""

# # imports
import os, requests
from DIRAC import S_OK, S_ERROR
from DIRAC.Core.Base.AgentModule import AgentModule
from DIRAC.Core.Security.Locations import getHostCertificateAndKeyLocation, getCAsLocation
from DIRAC.DataManagementSystem.Client.DataManager import DataManager


class PilotLoggingAgent(AgentModule):
"""
.. class:: PilotLoggingAgent
The agent sends completed pilot log files to permanent storage for analysis.
"""

def initialize(self):
"""
agent's initalisation. Use this agent's CS information to:
Determine what Defaults/Shifter shifter proxy to use.,
get the target SE name from the CS.
Obtain log file location from Tornado.
:param self: self reference
"""

# get shifter proxy for uploads (VO-specific shifter from the Defaults CS section)
self.shifterName = self.am_getOption("ShifterName", "GridPPLogManager")
self.am_setOption("shifterProxy", self.shifterName)
self.uploadSE = self.am_getOption("UploadSE", "UKI-LT2-IC-HEP-disk")

certAndKeyLocation = getHostCertificateAndKeyLocation()
casLocation = getCAsLocation()

data = {"method": "getMetadata"}
self.server = self.am_getOption("DownloadLocation", None)

if not self.server:
return S_ERROR("No DownloadLocation set in the CS !")
try:
with requests.post(self.server, data=data, verify=casLocation, cert=certAndKeyLocation) as res:
if res.status_code not in (200, 202):
message = "Could not get metadata from %s: status %s" % (self.server, res.status_code)
self.log.error(message)
return S_ERROR(message)
resDict = res.json()
except Exception as exc:
message = "Call to server %s failed" % (self.server,)
self.log.exception(message, lException=exc)
return S_ERROR(message)
if resDict["OK"]:
meta = resDict["Value"]
self.pilotLogPath = meta["LogPath"]
else:
return S_ERROR(resDict["Message"])
self.log.info("Pilot log files location = %s " % self.pilotLogPath)
return S_OK()

def execute(self):
"""
Execute one agent cycle. Upload log files to the SE and register them in the DFC.
:param self: self reference
"""

self.log.info("Pilot files upload cycle started.")
files = [
f
for f in os.listdir(self.pilotLogPath)
if os.path.isfile(os.path.join(self.pilotLogPath, f)) and f.endswith("log")
]
for elem in files:
lfn = os.path.join("/gridpp/pilotlogs/", elem)
name = os.path.join(self.pilotLogPath, elem)
res = DataManager().putAndRegister(lfn=lfn, fileName=name, diracSE=self.uploadSE, overwrite=True)
if not res["OK"]:
self.log.error("Could not upload", "to %s: %s" % (self.uploadSE, res["Message"]))
else:
self.log.info("File uploaded: ", "LFN = %s" % res["Value"])
try:
pass
# os.remove(name)
except Exception as excp:
self.log.exception("Cannot remove a local file after uploading", lException=excp)
return S_OK()
7 changes: 5 additions & 2 deletions src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py
Original file line number Diff line number Diff line change
Expand Up @@ -980,14 +980,17 @@ def _getPilotOptions(self, queue, **kwargs):
else:
self.log.info("DIRAC project will be installed by pilots")

# Pilot Logging defined?
# Pilot Logging defined? This enables the extended (possibly remote) logger
pilotLogging = opsHelper.getValue("/Services/JobMonitoring/usePilotsLoggingFlag", False)
if pilotLogging:
pilotOptions.append("-z ")
# remote logger URL.
remoteLoggerURL = opsHelper.getValue("/Services/JobMonitoring/remoteLoggerURL", "localhost")
pilotOptions.append("-g %s" % remoteLoggerURL)

pilotOptions.append("--pythonVersion=3")

# Debug
# Debug. Both for the standard and (if enabled) extended logger.
if self.pilotLogLevel.lower() == "debug":
pilotOptions.append("-ddd")

Expand Down
15 changes: 15 additions & 0 deletions src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@ Services
}
}
##END
TornadoPilotLogging
{
Protocol = https
Authorization
{
Default = authenticated
}
}
JobMonitoring
{
Port = 9130
Expand Down Expand Up @@ -190,6 +198,13 @@ Agents
PilotAccountingEnabled = yes
}
##END
##BEGIN PilotLoggingAgent
PilotLoggingAgent
{
PollingTime = 600
LogLevel = DEBUG
}
##END
JobAgent
{
FillingModeFlag = true
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
"""
Basic Pilot logging plugin. Just log messages.
"""
from DIRAC import S_OK, S_ERROR, gLogger

sLog = gLogger.getSubLogger(__name__)


class BasicPilotLoggingPlugin(object):
"""
This is a no-op fallback solution class, to be used when no plugin is defined for remote logging.
Any pilot logger plugin could inherit from this class to receive a set of no-op methods required by
:class:`TornadoPilotLoggingHandler` and only overwrite needed methods.
"""

def __init__(self):

sLog.warning("BasicPilotLoggingPlugin is being used. It only logs locally at a debug level.")

def sendMessage(self, message, UUID):
"""
Dummy sendMessage method.
:param message: text to log
:type message: str
:return: None
:rtype: None
"""
sLog.debug(message)
return S_OK("Message sent")

def finaliseLogs(self, payload, UUID):
"""
Dummy finaliseLogs method.
:param payload:
:type payload:
:return: S_OK or S_ERROR
:rtype: dict
"""

return S_OK("Finaliser!")

def getMeta(self):
"""
Get metadata dummy method.
:return: S_OK with an empty dict
:rtype: dict
"""
return S_OK({})
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
"""
Basic Pilot logging plugin. Just log messages.
"""
import os, json, re
from DIRAC import S_OK, S_ERROR, gLogger

sLog = gLogger.getSubLogger(__name__)


class FileCacheLoggingPlugin(object):
"""
File cache logging. Log records are appended to a file, one for each pilot.
It is assumed that an agent will be installed together with this plugin, which will copy
the files to a safe place and clear the cache.
"""

def __init__(self):
"""
Sets the pilot log files location for a WebServer.
"""
# UUID pattern
self.pattern = re.compile(r"^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$")
self.meta = {}
logPath = os.path.join(os.getcwd(), "pilotlogs")
self.meta["LogPath"] = logPath
if not os.path.exists(logPath):
os.makedirs(logPath)
sLog.info("Pilot logging directory:", logPath)

def sendMessage(self, message, pilotUUID):
"""
File cache sendMessage method. Write the log message to a file line by line.
:param message: text to log in json format
:type message: str
:return: S_OK or S_ERROR
:rtype: dict
"""

res = self.pattern.match(pilotUUID)
if not res:
sLog.error("Pilot UUID does not match the UUID pattern: ", "%s" % (pilotUUID,))
return S_ERROR("Pilot UUID is invalid")

with open(os.path.join(self.meta["LogPath"], pilotUUID), "a") as pilotLog:
try:
messageContent = json.loads(message)
if isinstance(messageContent, list):
for elem in messageContent:
pilotLog.write(elem + "\n")
else:
# it could be a string, if emitted by pilot logger StringIO handler
pilotLog.write(messageContent)
except IOError as ioerr:
sLog.error("Error writing to log file:", str(ioerr))
return S_ERROR(str(ioerr))
return S_OK("Message logged successfully for pilot: %s" % (pilotUUID,))

def finaliseLogs(self, payload, logfile):
"""
Finalise a log file. Finalised logfile can be copied to a secure location.
:param payload: additional info, a plugin might want to use (i.e. the system return code of a pilot script)
:type payload: dict
:param logfile: log filename (pilotUUID).
:type logfile: json representation of dict
:return: S_OK or S_ERROR
:rtype: dict
"""

returnCode = json.loads(payload).get("retCode", 0)
res = self.pattern.match(logfile)
if not res:
sLog.error("Pilot UUID does not match the UUID pattern: ", "%s" % (logfile,))
return S_ERROR("Pilot UUID is invalid")

try:
filepath = self.meta["LogPath"]
os.rename(os.path.join(filepath, logfile), os.path.join(filepath, logfile + ".log"))
sLog.info("Log file finalised for pilot: %s (return code: %s)" % (logfile, returnCode))
return S_OK()
except Exception as err:
sLog.exception("Exception when finalising log: ", err)
return S_ERROR(str(err))

def getMeta(self):
"""
Return any metadata related to this plugin. The "LogPath" is the minimum requirement for the dict to contain.
:return: Dirac S_OK containing the metadata or S_ERROR if the LogPath is not defined.
:rtype: dict
"""
if "LogPath" in self.meta:
return S_OK(self.meta)
return S_ERROR("No Pilot logging directory defined")
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
""" Tornado-based HTTPs JobMonitoring service.
"""


from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

__RCSID__ = "$Id$"

import os, json
from DIRAC import gLogger, S_OK, S_ERROR
from DIRAC.Core.Tornado.Server.TornadoService import TornadoService
from DIRAC.Core.DISET.RequestHandler import RequestHandler, getServiceOption
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader

sLog = gLogger.getSubLogger(__name__)


class TornadoPilotLoggingHandler(TornadoService):
log = sLog

@classmethod
def initializeHandler(cls, infoDict):
"""
Called once, at the first request. Create a directory where pilot logs will be stored.
:param infoDict:
:return: None
"""

cls.log.info("Handler initialised ...")
cls.log.debug("with a dict: ", str(infoDict))
defaultOption, defaultClass = "LoggingPlugin", "BasicPilotLoggingPlugin"
configValue = getServiceOption(infoDict, defaultOption, defaultClass)

result = ObjectLoader().loadObject("WorkloadManagementSystem.Service.%s" % (configValue,), configValue)
if not result["OK"]:
cls.log.error("Failed to load LoggingPlugin", "%s: %s" % (configValue, result["Message"]))
return result

componentClass = result["Value"]
cls.loggingPlugin = componentClass()
cls.log.info("Loaded: PilotLoggingPlugin class", configValue)

cls.meta = {}
logPath = os.path.join(os.getcwd(), "pilotlogs")
cls.meta["LogPath"] = logPath
if not os.path.exists(logPath):
os.makedirs(logPath)
cls.log.info("Pilot logging directory:", logPath)

def initializeRequest(self):
"""
Called for each request.
:return: None
"""

self.log.info("Request initialised.. ")

auth_sendMessage = ["Operator", "Pilot", "GenericPilot"]

def export_sendMessage(self, message, pilotUUID):
# def export_sendMessage(self, message, pilotUUID):
"""
The method logs messages to Tornado and forwards pilot log files, one per pilot, to a relevant plugin.
The pilot is identified by its UUID.
:param message: message sent by a client, a list of strings in JSON format
:param pilotUUID: pilot UUID
:return: S_OK or S_ERROR if a plugin cannot process the message.
:rtype: dict
"""
## Insert your method here, don't forget the return should be serializable
## Returned value may be an S_OK/S_ERROR
## You don't need to serialize in JSON, Tornado will do it
# self.log.info("Message: ", message)
# the plugin returns S_OK or S_ERROR
# leave JSON decoding to the selected plugin:
result = self.loggingPlugin.sendMessage(message, pilotUUID)
return result

auth_getMetadata = ["Operator", "TrustedHost"]

def export_getMetadata(self):
"""
Get PilotLoggingHandler metadata. Intended to be used by a client or an agent.
:return: S_OK containing a metadata dictionary
"""
return self.loggingPlugin.getMeta()

auth_finaliseLogs = ["Operator", "Pilot", "GenericPilot"]

def export_finaliseLogs(self, payload, pilotUUID):
"""
Finalise a log file. Finalised logfile can be copied to a secure location, if a file cache is used.
:param payload: data passed to the plugin finaliser.
:type payload: dict
:param pilotUUID: pilot UUID
:return: S_OK or S_ERROR (via the plugin involved)
:rtype: dict
"""

# The plugin returns the Dirac S_OK or S_ERROR object

return self.loggingPlugin.finaliseLogs(payload, pilotUUID)
Loading

0 comments on commit 5a6f587

Please sign in to comment.