Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[9.0] remove JobDB's site mask #7762

Merged
merged 7 commits into from
Oct 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,6 @@ If is there any SE to be modified, you can do it as follows::

.. _activateRSS:
fstagni marked this conversation as resolved.
Show resolved Hide resolved

------------
Activate RSS
------------

If you did not see any problem, activate RSS by setting the CS option::

/Operations/[Defaults|SetupName]/ResourceStatus/Config/State = Active

------
Agents
Expand Down
35 changes: 20 additions & 15 deletions src/DIRAC/DataManagementSystem/Client/test/Test_FTS3Objects.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,15 @@
import os
import pytest
import tempfile
import errno
import DIRAC
from unittest import mock

from DIRAC.tests.Utilities.utils import generateDIRACConfig
import pytest

from DIRAC.ConfigurationSystem.private.ConfigurationClient import ConfigurationClient
from DIRAC.ConfigurationSystem.Client.ConfigurationData import gConfigurationData
from diraccfg import CFG
from DIRAC.DataManagementSystem.private.FTS3Plugins.DefaultFTS3Plugin import DefaultFTS3Plugin
import DIRAC
from DIRAC import S_OK
from DIRAC.Core.Utilities.DErrno import cmpError

from DIRAC.Resources.Storage.StorageBase import StorageBase

from DIRAC.DataManagementSystem.Client.FTS3Job import FTS3Job
from DIRAC.DataManagementSystem.Client.FTS3File import FTS3File
from DIRAC.DataManagementSystem.Client.FTS3Operation import FTS3Operation

from DIRAC.DataManagementSystem.Client.FTS3Job import FTS3Job
from DIRAC.Resources.Storage.StorageBase import StorageBase
from DIRAC.tests.Utilities.utils import generateDIRACConfig

DIRAC.gLogger.setLevel("DEBUG")
# pylint: disable=redefined-outer-name
Expand Down Expand Up @@ -181,6 +172,20 @@ def monkeypatchForAllTest(monkeypatch):
lambda _self, _seName, _vo: S_OK(),
)

def mock_init(self, useProxy=False, vo=None):
self.proxy = False
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is that necessary here and below now ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before tests were never going through RSS for finding the status of a SE, now they do. So, some mocking of StorageFactory had to be added.

self.proxy = useProxy
self.resourceStatus = mock.MagicMock()
self.vo = vo
self.remoteProtocolSections = []
self.localProtocolSections = []
self.name = ""
self.options = {}
self.protocols = {}
self.storages = {}

monkeypatch.setattr(DIRAC.Resources.Storage.StorageFactory.StorageFactory, "__init__", mock_init)


def generateFTS3Job(sourceSE, targetSE, lfns, multiHopSE=None):
"""Utility to create a new FTS3Job object with some FTS3Files
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,17 @@

import os
import tempfile
import pytest
from unittest import mock

import pytest
from diraccfg import CFG

import DIRAC

from DIRAC.ConfigurationSystem.private.ConfigurationClient import ConfigurationClient
from DIRAC import S_OK
from DIRAC.ConfigurationSystem.Client.ConfigurationData import gConfigurationData

from DIRAC.Resources.Storage.StorageBase import StorageBase
from DIRAC.ConfigurationSystem.private.ConfigurationClient import ConfigurationClient
from DIRAC.DataManagementSystem.private.FTS3Plugins.DefaultFTS3Plugin import DefaultFTS3Plugin
from DIRAC import S_OK
from DIRAC.Resources.Storage.StorageBase import StorageBase

# pylint: disable=redefined-outer-name

Expand Down Expand Up @@ -318,6 +317,21 @@ def fts3Plugin(monkeypatch):
monkeypatch.setattr(
DIRAC.Resources.Storage.StorageElement.StorageElementItem, "addAccountingOperation", lambda: None
)

def mock_init(self, useProxy=False, vo=None):
self.proxy = False
self.proxy = useProxy
self.resourceStatus = mock.MagicMock()
self.vo = vo
self.remoteProtocolSections = []
self.localProtocolSections = []
self.name = ""
self.options = {}
self.protocols = {}
self.storages = {}

monkeypatch.setattr(DIRAC.Resources.Storage.StorageFactory.StorageFactory, "__init__", mock_init)

fts3Plugin = DefaultFTS3Plugin()

return fts3Plugin
Expand Down
49 changes: 16 additions & 33 deletions src/DIRAC/Interfaces/API/DiracAdmin.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,19 @@
"""
import os

from DIRAC import gLogger, gConfig, S_OK, S_ERROR
from DIRAC.Core.Utilities.PromptUser import promptUser
from DIRAC.Core.Base.API import API
from DIRAC import S_ERROR, S_OK, gConfig, gLogger
from DIRAC.ConfigurationSystem.Client.CSAPI import CSAPI
from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getVOForGroup
from DIRAC.Core.Base.API import API
from DIRAC.Core.Security.ProxyInfo import getProxyInfo
from DIRAC.FrameworkSystem.Client.ProxyManagerClient import gProxyManager
from DIRAC.Core.Utilities.PromptUser import promptUser
from DIRAC.FrameworkSystem.Client.NotificationClient import NotificationClient
from DIRAC.FrameworkSystem.Client.ProxyManagerClient import gProxyManager
from DIRAC.ResourceStatusSystem.Client.ResourceStatusClient import ResourceStatusClient
from DIRAC.ResourceStatusSystem.Client.ResourceStatus import ResourceStatus
from DIRAC.ResourceStatusSystem.Client.SiteStatus import SiteStatus
from DIRAC.WorkloadManagementSystem.Client.JobManagerClient import JobManagerClient
from DIRAC.WorkloadManagementSystem.Client.WMSAdministratorClient import WMSAdministratorClient
from DIRAC.WorkloadManagementSystem.Client.PilotManagerClient import PilotManagerClient
from DIRAC.WorkloadManagementSystem.Client.WMSAdministratorClient import WMSAdministratorClient

voName = ""
ret = getProxyInfo(disableVOMS=True)
Expand All @@ -45,7 +44,6 @@ def __init__(self):

self.scratchDir = gConfig.getValue(self.section + "/ScratchDir", "/tmp")
self.currentDir = os.getcwd()
self.rssFlag = ResourceStatus().rssFlag
self.sitestatus = SiteStatus()

#############################################################################
Expand Down Expand Up @@ -157,34 +155,30 @@ def getSiteSection(self, site, printOutput=False):

#############################################################################
def allowSite(self, site, comment, printOutput=False):
"""Adds the site to the site mask.
"""Adds the site to the site mask. The site must be a valid DIRAC site name

Example usage:

>>> gLogger.notice(diracAdmin.allowSite())
>>> gLogger.notice(diracAdmin.allowSite('LCG.CERN.ch'))
{'OK': True, 'Value': }

:param str site: DIRAC site name
:param str comment: comment for the site status update
:return: S_OK,S_ERROR

"""
result = self._checkSiteIsValid(site)
if not result["OK"]:
if not (result := self._checkSiteIsValid(site))["OK"]:
return result

result = self.getSiteMask(status="Active")
if not result["OK"]:
if not (result := self.getSiteMask(status="Active"))["OK"]:
return result
siteMask = result["Value"]
fstagni marked this conversation as resolved.
Show resolved Hide resolved
if site in siteMask:
if printOutput:
gLogger.notice(f"Site {site} is already Active")
return S_OK(f"Site {site} is already Active")

if self.rssFlag:
result = self.sitestatus.setSiteStatus(site, "Active", comment)
else:
result = WMSAdministratorClient().allowSite(site, comment)
if not result["OK"]:
if not (result := self.sitestatus.setSiteStatus(site, "Active", comment))["OK"]:
return result

if printOutput:
Expand All @@ -203,16 +197,10 @@ def getSiteMaskLogging(self, site=None, printOutput=False):

:return: S_OK,S_ERROR
"""
result = self._checkSiteIsValid(site)
if not result["OK"]:
if not (result := self._checkSiteIsValid(site))["OK"]:
return result

if self.rssFlag:
result = ResourceStatusClient().selectStatusElement("Site", "History", name=site)
else:
result = WMSAdministratorClient().getSiteMaskLogging(site)

if not result["OK"]:
if not (result := ResourceStatusClient().selectStatusElement("Site", "History", name=site))["OK"]:
return result

if printOutput:
Expand Down Expand Up @@ -250,8 +238,7 @@ def banSite(self, site, comment, printOutput=False):
:return: S_OK,S_ERROR

"""
result = self._checkSiteIsValid(site)
if not result["OK"]:
if not (result := self._checkSiteIsValid(site))["OK"]:
return result

mask = self.getSiteMask(status="Banned")
Expand All @@ -263,11 +250,7 @@ def banSite(self, site, comment, printOutput=False):
gLogger.notice(f"Site {site} is already Banned")
return S_OK(f"Site {site} is already Banned")

if self.rssFlag:
result = self.sitestatus.setSiteStatus(site, "Banned", comment)
else:
result = WMSAdministratorClient().banSite(site, comment)
if not result["OK"]:
if not (result := self.sitestatus.setSiteStatus(site, "Banned", comment))["OK"]:
return result

if printOutput:
Expand Down
44 changes: 8 additions & 36 deletions src/DIRAC/ResourceStatusSystem/Client/ResourceStatus.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,38 +7,32 @@
"""

import math
from time import sleep
from datetime import datetime, timedelta
from time import sleep

from DIRAC import gConfig, gLogger, S_OK, S_ERROR
from DIRAC import S_ERROR, S_OK, gConfig, gLogger
from DIRAC.ConfigurationSystem.Client.CSAPI import CSAPI
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
from DIRAC.Core.Utilities import DErrno
from DIRAC.Core.Utilities.DIRACSingleton import DIRACSingleton
from DIRAC.ResourceStatusSystem.Client.ResourceStatusClient import ResourceStatusClient
from DIRAC.ResourceStatusSystem.Utilities.InfoGetter import getPoliciesThatApply
from DIRAC.ResourceStatusSystem.Utilities.RSSCacheNoThread import RSSCache
from DIRAC.ResourceStatusSystem.Utilities.RssConfiguration import RssConfiguration
from DIRAC.ResourceStatusSystem.Utilities.InfoGetter import getPoliciesThatApply


class ResourceStatus(metaclass=DIRACSingleton):
"""
ResourceStatus helper that connects to CS if RSS flag is not Active.
It keeps the connection to the db / server as an object member, to avoid creating a new
one massively.
ResourceStatus helper keeps the connection to the db / server as an object member,
to avoid creating a new connection every time we need to do one.
"""

def __init__(self, rssFlag=None):
def __init__(self):
"""
Constructor, initializes the rssClient.
"""
self.log = gLogger.getSubLogger(self.__class__.__name__)
self.rssConfig = RssConfiguration()
self.__opHelper = Operations()
self.rssClient = ResourceStatusClient()
self.rssFlag = rssFlag
if rssFlag is None:
self.rssFlag = self.__getMode()

cacheLifeTime = int(self.rssConfig.getConfigCache())

Expand Down Expand Up @@ -101,10 +95,7 @@ def getElementStatus(self, elementName, elementType, statusType=None, default=No
elif elementType == "Catalog":
statusType = ["all"]

if self.rssFlag:
return self.__getRSSElementStatus(elementName, elementType, statusType, vO)
else:
return self.__getCSElementStatus(elementName, elementType, statusType, default)
return self.__getRSSElementStatus(elementName, elementType, statusType, vO)

def setElementStatus(self, elementName, elementType, statusType, status, reason=None, tokenOwner=None):
"""Tries set information in RSS and in CS.
Expand All @@ -130,10 +121,7 @@ def setElementStatus(self, elementName, elementType, statusType, status, reason=
S_OK( xyz.. )
"""

if self.rssFlag:
return self.__setRSSElementStatus(elementName, elementType, statusType, status, reason, tokenOwner)
else:
return self.__setCSElementStatus(elementName, elementType, statusType, status)
return self.__setRSSElementStatus(elementName, elementType, statusType, status, reason, tokenOwner)

################################################################################

Expand Down Expand Up @@ -298,22 +286,6 @@ def __setCSElementStatus(self, elementName, elementType, statusType, status):

return res

def __getMode(self):
"""
Gets flag defined (or not) on the RSSConfiguration.
If defined as 'Active', we use RSS, if not, we use the CS when possible (and WMS for Sites).
"""

res = self.rssConfig.getConfigState()

if res == "Active":
if self.rssClient is None:
self.rssClient = ResourceStatusClient()
return True

self.rssClient = None
return False

def isStorageElementAlwaysBanned(self, seName, statusType):
"""Checks if the AlwaysBanned policy is applied to the SE given as parameter

Expand Down
Loading
Loading