Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add check threads util #1063

Merged
merged 5 commits into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,12 @@ PositionGroup.alter()
- Allow `ModuleNotFoundError` or `ImportError` for optional dependencies #1023
- Ensure integrity of group tables #1026
- Convert list of LFP artifact removed interval list to array #1046
- Merge duplicate functions in decoding and spikesorting #1050, #1053, #1058
- Merge duplicate functions in decoding and spikesorting #1050, #1053, #1062
- Revise docs organization.
- Misc -> Features/ForDevelopers. #1029
- Installation instructions -> Setup notebook. #1029
- Migrate SQL export tools to `utils` to support exporting `DandiPath` #1048
- Add tool for checking threads for metadata locks on a table #1063

### Pipelines

Expand Down
140 changes: 122 additions & 18 deletions src/spyglass/utils/dj_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from datajoint.utils import get_master, to_camel_case, user_choice
from networkx import NetworkXError
from packaging.version import parse as version_parse
from pandas import DataFrame
from pymysql.err import DataError

from spyglass.utils.database_settings import SHARED_MODULES
Expand Down Expand Up @@ -158,10 +159,10 @@ def _nwb_table_tuple(self) -> tuple:

Used to determine fetch_nwb behavior. Also used in Merge.fetch_nwb.
Implemented as a cached_property to avoid circular imports."""
from spyglass.common.common_nwbfile import (
from spyglass.common.common_nwbfile import ( # noqa F401
AnalysisNwbfile,
Nwbfile,
) # noqa F401
)

table_dict = {
AnalysisNwbfile: "analysis_file_abs_path",
Expand Down Expand Up @@ -263,12 +264,12 @@ def fetch_pynapple(self, *attrs, **kwargs):
def _import_part_masters(self):
"""Import tables that may constrain a RestrGraph. See #1002"""
from spyglass.decoding.decoding_merge import DecodingOutput # noqa F401
from spyglass.decoding.v0.clusterless import ( # noqa F401
from spyglass.decoding.v0.clusterless import (
UnitMarksIndicatorSelection,
)
from spyglass.decoding.v0.sorted_spikes import ( # noqa F401
) # noqa F401
from spyglass.decoding.v0.sorted_spikes import (
SortedSpikesIndicatorSelection,
)
) # noqa F401
from spyglass.decoding.v1.core import PositionGroup # noqa F401
from spyglass.lfp.analysis.v1 import LFPBandSelection # noqa F401
from spyglass.lfp.lfp_merge import LFPOutput # noqa F401
Expand All @@ -279,15 +280,15 @@ def _import_part_masters(self):
from spyglass.mua.v1.mua import MuaEventsV1 # noqa F401
from spyglass.position.position_merge import PositionOutput # noqa F401
from spyglass.ripple.v1.ripple import RippleTimesV1 # noqa F401
from spyglass.spikesorting.analysis.v1.group import ( # noqa F401
from spyglass.spikesorting.analysis.v1.group import (
SortedSpikesGroup,
)
from spyglass.spikesorting.spikesorting_merge import ( # noqa F401
) # noqa F401
from spyglass.spikesorting.spikesorting_merge import (
SpikeSortingOutput,
)
from spyglass.spikesorting.v0.figurl_views import ( # noqa F401
) # noqa F401
from spyglass.spikesorting.v0.figurl_views import (
SpikeSortingRecordingView,
)
) # noqa F401

_ = (
DecodingOutput(),
Expand Down Expand Up @@ -475,12 +476,8 @@ def _delete_deps(self) -> List[Table]:
Used to delay import of tables until needed, avoiding circular imports.
Each of these tables inheits SpyglassMixin.
"""
from spyglass.common import ( # noqa F401
IntervalList,
LabMember,
LabTeam,
Session,
)
from spyglass.common import LabMember # noqa F401
from spyglass.common import IntervalList, LabTeam, Session
from spyglass.common.common_nwbfile import schema # noqa F401

self._session_pk = Session.primary_key[0]
Expand Down Expand Up @@ -1018,6 +1015,113 @@ def restrict_by(

return ret

# ------------------------------ Check locks ------------------------------

def exec_sql_fetchall(self, query):
    """Execute a SQL query via ``dj.conn()`` and return every fetched row.

    Parameters
    ----------
    query : str
        The SQL query to execute.

    Returns
    -------
    list of tuples
        The results of the query.
    """
    # `self` is not consulted; the query runs on whatever `dj.conn()` returns.
    executed = dj.conn().query(query)
    return executed.fetchall()

@classmethod
def check_threads(cls, detailed=False, all_threads=False) -> DataFrame:
    """Check for locked threads in the database.

    Set as classmethod to avoid calling table data when checking status.
    Use MyTable.check_threads() to check the status of MyTable.

    Parameters
    ----------
    detailed : bool, optional
        Show all columns in the metadata_locks table. Default False, show
        summary.
    all_threads : bool, optional
        Show all threads, not just those related to this table.
        Default False.

    Returns
    -------
    DataFrame
        A DataFrame containing the metadata locks joined with the owning
        thread's process information. With ``detailed=False`` only the
        summary columns are kept (plus "Table" when ``all_threads=True``).

    Raises
    ------
    RuntimeError
        If the server reports ``performance_schema`` as OFF, or does not
        report the variable at all.
    """
    # Query through the connection directly: because this is a classmethod,
    # `cls.exec_sql_fetchall(query)` would bind the query string to that
    # method's `self` parameter and fail with a missing-argument TypeError.
    connection = dj.conn()

    status = connection.query(
        "SHOW VARIABLES LIKE 'performance_schema';"
    ).fetchall()
    # An empty result means the variable is unknown to the server; treat it
    # the same as OFF rather than failing with an IndexError.
    if not status or status[0][1] == "OFF":
        raise RuntimeError(
            "Database does not monitor threads. "
            + "Please ask you administrator to enable performance schema."
        )

    # Single source of truth for the selected SQL columns and the DataFrame
    # labels. Keeping them paired guarantees the labels cannot drift out of
    # order relative to the SELECT list.
    lock_columns = (
        ("ml.OBJECT_SCHEMA", "Schema"),  # Table schema
        ("ml.OBJECT_NAME", "Table Name"),  # Table name
        ("ml.OBJECT_TYPE", "Locked"),  # What is locked
        ("ml.LOCK_TYPE", "Lock Type"),  # Type of lock
        ("ml.LOCK_STATUS", "Lock Status"),  # Lock status
        ("ml.OWNER_THREAD_ID", "Thread ID"),  # Thread ID of the lock owner
        ("t.PROCESSLIST_ID", "Connection ID"),  # User connection ID
        ("t.PROCESSLIST_USER", "User"),  # User
        ("t.PROCESSLIST_HOST", "Host"),  # User machine
        ("t.PROCESSLIST_DB", "Process Database"),  # Thread database
        ("t.PROCESSLIST_TIME", "Time (s)"),  # Time in seconds
        ("t.PROCESSLIST_COMMAND", "Process"),  # Likely Query
        ("t.PROCESSLIST_STATE", "State"),  # Waiting for lock, etc.
        ("t.PROCESSLIST_INFO", "Query"),  # Actual query
    )
    select_list = ",\n    ".join(sql_name for sql_name, _ in lock_columns)
    metadata_locks_query = (
        f"SELECT\n    {select_list}\n"
        "FROM performance_schema.metadata_locks AS ml\n"
        "JOIN performance_schema.threads AS t\n"
        "ON ml.OWNER_THREAD_ID = t.THREAD_ID\n"
    )

    if all_threads:
        metadata_locks_query += ";"
    else:
        # Restrict to locks held on this table only.
        metadata_locks_query += (
            f"WHERE ml.OBJECT_SCHEMA = '{cls.database}' "
            + f"AND ml.OBJECT_NAME = '{cls.table_name}'"
        )

    rows = connection.query(metadata_locks_query).fetchall()
    df = DataFrame(
        list(rows),
        columns=[label for _, label in lock_columns],
    )

    # NOTE(review): `_delete_deps` is subscripted without being called, so it
    # is presumably a (cached) property. Properties only resolve on
    # instances, hence `cls()` here -- confirm against its definition.
    name_lookup = cls()._delete_deps[0]().get_djuser_name
    df["Name"] = df["User"].apply(name_lookup)

    keep_cols = []
    if all_threads:
        # With several tables in the result, collapse schema + table name
        # into a single qualified "Table" column.
        keep_cols.append("Table")
        df["Table"] = df["Schema"] + "." + df["Table Name"]
        df = df.drop(columns=["Schema", "Table Name"])

    if not detailed:
        keep_cols.extend(["Locked", "Name", "Time (s)", "Process", "State"])
        df = df[keep_cols]

    return df


class SpyglassMixinPart(SpyglassMixin, dj.Part):
"""
Expand Down
Loading