Skip to content

Commit

Permalink
Log tracked jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
gerardsegarra committed Apr 24, 2024
1 parent 7cd92ea commit 6cf98dd
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 28 deletions.
41 changes: 18 additions & 23 deletions src/app.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from datetime import datetime
import logging
from logging.config import dictConfig
import os
Expand All @@ -10,7 +9,7 @@
from const import GithubHeaders, LOGGING_CONFIG
from github import GithubJob
from utils import dict_to_logfmt
from queryql import query_node
from queryql import query_nodes

dictConfig(LOGGING_CONFIG)

Expand All @@ -27,6 +26,7 @@
logging.getLogger('apscheduler.executors.default').setLevel(logging.WARNING)

jobs = dict()
node_ids = set()


# check all calls are valid
Expand Down Expand Up @@ -68,9 +68,10 @@ def process_workflow_job():
if job.action == "queued":
# add to memory
jobs[job.id] = job
query_node(job.node_id)
node_ids.add(job.node_id)

elif job.action == "in_progress":
node_ids.remove(job.node_id)
job_requested = jobs.get(job.id)
time_to_start = None
if not job_requested:
Expand Down Expand Up @@ -98,6 +99,7 @@ def process_workflow_job():
jobs[job.id] = job

elif job.action == "completed":
node_ids.remove(job.node_id)
job_requested = jobs.get(job.id)
if not job_requested:
app.logger.warning(f"Job {job.id} is {job.action} but not stored!")
Expand Down Expand Up @@ -132,29 +134,22 @@ def monitor_queued_jobs():
"""Return the job that has been queued and not starting for long time."""
app.logger.debug("Starting monitor_queued_jobs")

if not jobs:
if not node_ids:
return

queued_jobs = [job for job in jobs.values() if job.action == "queued"]
if not queued_jobs:
return

job = min(queued_jobs, key=lambda x: x.time_start)
delay = (datetime.now() - job.time_start).seconds

if delay <= int(os.getenv("QUEUED_JOBS_DELAY_THRESHOLD", 150)):
return

context_details = {
"action": "monitor_queued",
"job_id": job.id,
"job_name": job.name,
"repository": job.repository,
"started_at": job.time_start,
"delay": delay,
}
jobs_data = query_nodes(node_ids)
for job in jobs_data["data"]["nodes"]:
context_details = {
"action": "monitor_queued",
"job_id": job["id"],
"job_name": job["name"],
"status": job["status"],
"started_at": job["startedAt"],
"completed_at": job["completedAt"],
}
app.logger.info(dict_to_logfmt(context_details))

app.logger.info(dict_to_logfmt(context_details))
# app.logger.info(dict_to_logfmt(context_details))


allowed_events = {"workflow_job": process_workflow_job}
Expand Down
12 changes: 7 additions & 5 deletions src/queryql.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import os

from types import List

from gql import gql, Client
from gql.transport.aiohttp import AIOHTTPTransport

Expand All @@ -16,11 +18,11 @@


# Provide a GraphQL query
def query_node(node_id):
def query_nodes(node_id_list: List[str]):
query = gql(
"""
query getCheckRun($node_id: ID!) {
node(id: $node_id) {
query getCheckRuns($node_id_list: [ID!]!) {
nodes(ids: $node_id_list) {
... on CheckRun {
id
name
Expand All @@ -29,10 +31,10 @@ def query_node(node_id):
completedAt
}
}
}
}
"""
)
params = {"node_id": node_id}
params = {"node_id_list": node_id_list}

result = client.execute(query, variable_values=params)
app.logger.info(f"Node type {result}")

0 comments on commit 6cf98dd

Please sign in to comment.