diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py b/src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py index 053846adc6e..18077b741a3 100644 --- a/src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py @@ -984,9 +984,9 @@ def _getPilotOptions(self, queue, **kwargs): pilotLogging = opsHelper.getValue("/Services/JobMonitoring/usePilotsLoggingFlag", False) if pilotLogging: pilotOptions.append("-z ") - # internal extended logger logging to debug the logger itself. - extLoggingLevel = opsHelper.getValue("/Services/JobMonitoring/extLoggerLoggingLevel", "WARNING") - pilotOptions.append("-g %s" % extLoggingLevel) + # remote logger URL. + remoteLoggerURL = opsHelper.getValue("/Services/JobMonitoring/remoteLoggerURL", "localhost") + pilotOptions.append("-g %s" % remoteLoggerURL) pilotOptions.append("--pythonVersion=3") diff --git a/src/DIRAC/WorkloadManagementSystem/Service/BasicPilotLoggingPlugin.py b/src/DIRAC/WorkloadManagementSystem/Service/BasicPilotLoggingPlugin.py index 8452e0a0851..25b4774cc71 100644 --- a/src/DIRAC/WorkloadManagementSystem/Service/BasicPilotLoggingPlugin.py +++ b/src/DIRAC/WorkloadManagementSystem/Service/BasicPilotLoggingPlugin.py @@ -17,7 +17,7 @@ def __init__(self): sLog.warning("BasicPilotLoggingPlugin is being used. It only logs locally at a debug level.") - def sendMessage(self, message): + def sendMessage(self, message, UUID): """ Dummy sendMessage method. @@ -29,7 +29,7 @@ def sendMessage(self, message): sLog.debug(message) return S_OK("Message sent") - def finaliseLogs(self, payload): + def finaliseLogs(self, payload, UUID): """ Dummy finaliseLogs method. diff --git a/src/DIRAC/WorkloadManagementSystem/Service/FileCacheLoggingPlugin.py b/src/DIRAC/WorkloadManagementSystem/Service/FileCacheLoggingPlugin.py index ea19624b20a..b3aac55d2a0 100644 --- a/src/DIRAC/WorkloadManagementSystem/Service/FileCacheLoggingPlugin.py +++ b/src/DIRAC/WorkloadManagementSystem/Service/FileCacheLoggingPlugin.py @@ -28,18 +28,15 @@ def __init__(self): os.makedirs(logPath) sLog.info("Pilot logging directory:", logPath) - def sendMessage(self, message): + def sendMessage(self, message, pilotUUID): """ - File cache sendMessage method. + File cache sendMessage method. Write the log message to a file line by line. - :param message: text to log + :param message: text to log in json format :type message: str - :return: None - :rtype: None + :return: S_OK or S_ERROR + :rtype: dict """ - sLog.info(message) - messageDict = json.loads(message) - pilotUUID = messageDict.get("pilotUUID", "Unspecified_ID") res = self.pattern.match(pilotUUID) if not res: @@ -48,23 +45,31 @@ def sendMessage(self, message): with open(os.path.join(self.meta["LogPath"], pilotUUID), "a") as pilotLog: try: - pilotLog.write(message + "\n") + messageContent = json.loads(message) + if isinstance(messageContent, list): + for elem in messageContent: + pilotLog.write(elem + "\n") + else: + # it could be a string, if emitted by pilot logger StringIO handler + pilotLog.write(messageContent) except IOError as ioerr: sLog.error("Error writing to log file:", str(ioerr)) return S_ERROR(str(ioerr)) return S_OK("Message logged successfully for pilot: %s" % (pilotUUID,)) - def finaliseLogs(self, payload): + def finaliseLogs(self, payload, logfile): """ Finalise a log file. Finalised logfile can be copied to a secure location. - :param logfile: payload containing log filename. + :param payload: additional info, a plugin might want to use (i.e. the system return code of a pilot script) + :type payload: dict + :param logfile: log filename (pilotUUID). :type logfile: json representation of dict :return: S_OK or S_ERROR :rtype: dict """ - logfile = json.loads(payload).get("pilotUUID", "Unspecified_ID") + returnCode = json.loads(payload).get("retCode", 0) res = self.pattern.match(logfile) if not res: sLog.error("Pilot UUID does not match the UUID pattern: ", "%s" % (logfile,)) @@ -73,8 +78,10 @@ def finaliseLogs(self, payload): try: filepath = self.meta["LogPath"] os.rename(os.path.join(filepath, logfile), os.path.join(filepath, logfile + ".log")) - return S_OK("Log file finalised for pilot: %s" % (logfile,)) + sLog.info("Log file finalised for pilot: %s (return code: %s)" % (logfile, returnCode)) + return S_OK() except Exception as err: + sLog.exception("Exception when finalising log: ", err) return S_ERROR(str(err)) def getMeta(self): diff --git a/src/DIRAC/WorkloadManagementSystem/Service/TornadoPilotLoggingHandler.py b/src/DIRAC/WorkloadManagementSystem/Service/TornadoPilotLoggingHandler.py index b115144b274..87af9f1a186 100644 --- a/src/DIRAC/WorkloadManagementSystem/Service/TornadoPilotLoggingHandler.py +++ b/src/DIRAC/WorkloadManagementSystem/Service/TornadoPilotLoggingHandler.py @@ -61,21 +61,24 @@ def initializeRequest(self): auth_sendMessage = ["Operator", "Pilot", "GenericPilot"] - def export_sendMessage(self, message): + def export_sendMessage(self, message, pilotUUID): # def export_sendMessage(self, message, pilotUUID): """ - The method logs messages to Tornado and writes pilot log files, one per pilot. + The method logs messages to Tornado and forwards pilot log files, one per pilot, to a relevant plugin. + The pilot is identified by its UUID. - :param message: message sent by a client, in JSON format - :param pilotUUID: pilot UUID - used to create a log file - :return: S_OK or S_ERROR if a file cannot be created or written to. + :param message: message sent by a client, a list of strings in JSON format + :param pilotUUID: pilot UUID + :return: S_OK or S_ERROR if a plugin cannot process the message. + :rtype: dict """ ## Insert your method here, don't forget the return should be serializable ## Returned value may be an S_OK/S_ERROR ## You don't need to serialize in JSON, Tornado will do it - self.log.info("Message: ", message) + # self.log.info("Message: ", message) # the plugin returns S_OK or S_ERROR - result = self.loggingPlugin.sendMessage(message) + # leave JSON decoding to the selected plugin: + result = self.loggingPlugin.sendMessage(message, pilotUUID) return result auth_getMetadata = ["Operator", "TrustedHost"] @@ -90,16 +93,17 @@ def export_getMetadata(self): auth_finaliseLogs = ["Operator", "Pilot", "GenericPilot"] - def export_finaliseLogs(self, payload): + def export_finaliseLogs(self, payload, pilotUUID): """ - Finalise a log file. Finalised logfile can be copied to a secure location. if a file cache is used. + Finalise a log file. Finalised logfile can be copied to a secure location, if a file cache is used. - :param payload: data passed to the plugin finaliser, a string in the file cache plugin. - :type payload: str or dict + :param payload: data passed to the plugin finaliser. + :type payload: dict + :param pilotUUID: pilot UUID :return: S_OK or S_ERROR (via the plugin involved) :rtype: dict """ # The plugin returns the Dirac S_OK or S_ERROR object - return self.loggingPlugin.finaliseLogs(payload) + return self.loggingPlugin.finaliseLogs(payload, pilotUUID) diff --git a/src/DIRAC/WorkloadManagementSystem/Utilities/PilotWrapper.py b/src/DIRAC/WorkloadManagementSystem/Utilities/PilotWrapper.py index fbb597584a7..97116ad3e69 100644 --- a/src/DIRAC/WorkloadManagementSystem/Utilities/PilotWrapper.py +++ b/src/DIRAC/WorkloadManagementSystem/Utilities/PilotWrapper.py @@ -46,18 +46,81 @@ import time import tarfile import hashlib +# for remote logging +import subprocess +import json +import os +import urllib +import ssl +import argparse +import shlex +from uuid import uuid1 + +try: + # For Python 3.0 and later + from urllib.request import urlopen, HTTPError, URLError + from urllib.parse import urlencode +except ImportError: + # Fall back to Python 2's urllib2 + from urllib2 import urlopen, HTTPError, URLError + from urllib import urlencode + +try: + from cStringIO import StringIO +except ImportError: + from io import StringIO + +# formatting with microsecond accuracy, (ISO-8601) + +class MicrosecondFormatter(logging.Formatter): + def formatTime(self, record, datefmt=None): + ct = self.converter(record.created) + if datefmt: + s = time.strftime(datefmt, ct) + else: + t = time.strftime("%%Y-%%m-%%dT%%H:%%M:%%S", ct) + s = "%%s,%%06dZ" %% (t, (record.created - int(record.created)) * 1e6) + return s + +def sendMessage(url, method, rawMessage, pilotUUID): + + message = json.dumps((json.dumps(rawMessage), pilotUUID)) + if major >= 3: + data = urlencode({'method': method, 'args': message}).encode('utf-8') # encode to bytes ! for python3 + else: + data = urlencode({'method': method, 'args': message}) + caPath = os.getenv('X509_CERT_DIR') + cert = os.getenv('X509_USER_PROXY') + + context = ssl.create_default_context() + context.load_verify_locations(capath=caPath) + context.load_cert_chain(cert) + try: + res = urlopen(url, data, context=context) + # logger.info(res.read().strip()) + res.close() + except URLError as err: + logger.error(err) # setting up the logging -formatter = logging.Formatter(fmt='%%(asctime)s UTC %%(levelname)-8s %%(message)s', datefmt='%%Y-%%m-%%d %%H:%%M:%%S') +# formatter = logging.Formatter(fmt='%%(asctime)s UTC %%(levelname)-8s %%(message)s', datefmt='%%Y-%%m-%%d %%H:%%M:%%S') +formatter = MicrosecondFormatter('%%(asctime)s %%(levelname)-8s [%%(name)s] %%(message)s') logging.Formatter.converter = time.gmtime try: screen_handler = logging.StreamHandler(stream=sys.stdout) except TypeError: # python2.6 screen_handler = logging.StreamHandler(strm=sys.stdout) screen_handler.setFormatter(formatter) + +# add a string buffer handler +sio = StringIO() +buffer = logging.StreamHandler(sio) +buffer.setFormatter(formatter) + logger = logging.getLogger('pilotLogger') logger.setLevel(logging.DEBUG) logger.addHandler(screen_handler) +logger.addHandler(buffer) # just logging the environment as first thing logger.debug('===========================================================') @@ -66,6 +129,8 @@ logger.debug(key + '=' + val) logger.debug('===========================================================\\n') +logger.debug(sys.version) + # putting ourselves in the right directory pilotExecDir = '%(pilotExecDir)s' if not pilotExecDir: @@ -268,12 +333,60 @@ def pilotWrapperScript( localPilot += ( """ # now finally launching the pilot script (which should be called dirac-pilot.py) -cmd = "$py dirac-pilot.py %s" -logger.info('Executing: %%s' %% cmd) -sys.stdout.flush() -ret = os.system(cmd) +# get the setup name an -z, if present to get remote logging in place +opt = "%s" +# generate pilot UUID +UUID = str(uuid1()) +opt = opt + " --pilotUUID " + UUID + +args = opt.split() + +# let's see if we have remote logging enabled (-z), if not run the pilot script with os.system, as before + +logger.info("dirac-pilot.py will be called: with %%s " %% args) +optParser = argparse.ArgumentParser() +optParser.add_argument('-z', '--pilotLogging', action='store_true') +optParser.add_argument('-g', '--loggerURL', default="") +optParser.add_argument('-S', '--setup', default="") +optParser.add_argument('-F', '--pilotCFGFile', default="pilot.json") + +res, unknown = optParser.parse_known_args(args) + +setup = res.setup + +major, minor, micro, _, _ = sys.version_info + +if res.pilotLogging: + loggerURL = res.loggerURL + + if loggerURL: + logger.info("Remote logging activated.") + # send what we have so far. + sendMessage(loggerURL, 'sendMessage', buffer.stream.getvalue(), UUID) + + #opt = opt + " --pilotUUID " + UUID + proc = subprocess.Popen(shlex.split("$py dirac-pilot.py " + opt), bufsize = 1, + stdout=sys.stdout, stderr=sys.stderr, universal_newlines = True) + proc.wait() + ret = proc.returncode + + else: + # classic logger + logger.error("No Logging URL - cannot activate remote logger ") + cmd = "$py dirac-pilot.py %%s" %% opt + logger.info('Executing: %%s' %% cmd) + sys.stdout.flush() + ret = os.system(cmd) +else: + cmd = "$py dirac-pilot.py %%s" %% opt + logger.info('Executing: %%s' %% cmd) + sys.stdout.flush() + ret = os.system(cmd) # and cleaning up +if res.pilotLogging and loggerURL: + sendMessage(loggerURL, 'finaliseLogs', {'retCode': ret}, UUID) +buffer.stream.close() shutil.rmtree(pilotWorkingDirectory) # did it fail?