Skip to content

Commit

Permalink
Fetch results of evening voting sessions
Browse files Browse the repository at this point in the history
While on most plenary days, there is only one voting session around midday, on some days there is another sesssion in the evening, usually around 17:00. The vote results of the evening sessions are appended to the same source document that also contains the results of the midday votes.

Currently, we only run the `RCVListPipeline` between 12:00 and 15:00 until we’ve been able to fetch vote results successfully. Once we’ve fetched the results, we do not attempt to fetch them again. That means that so far we did not (automatically) fetch results of the evening voting session.

In addition to the current behavior, this change tries to fetch vote results between 17:00 and 20:00, until we’ve been able to fetch them successfully a second time. This is only the first part of the solution, as we also need to check that we only stop scraping vote results once we’ve been able to fetch updated results (e.g. by storing a hash of the source data for every successful pipeline run).
  • Loading branch information
tillprochaska committed Dec 7, 2024
1 parent 84519dc commit 076f0d4
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 19 deletions.
51 changes: 33 additions & 18 deletions backend/howtheyvote/worker/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import time

from prometheus_client import Gauge
from sqlalchemy import exists, func, select
from sqlalchemy import select
from structlog import get_logger

from .. import config
Expand All @@ -12,20 +12,38 @@
from ..models import PipelineRun, PipelineRunResult, PlenarySession
from ..pipelines import MembersPipeline, PressPipeline, RCVListPipeline, SessionsPipeline
from ..query import session_is_current_at
from .worker import SkipPipelineError, Weekday, Worker
from .worker import SkipPipelineError, Weekday, Worker, pipeline_ran_successfully

log = get_logger(__name__)


def op_rcv() -> None:
def op_rcv_midday() -> None:
"""Checks if there is a current plenary session and, if yes, fetches the latest roll-call
vote results."""
today = datetime.date.today()

if not _is_session_day(today):
raise SkipPipelineError()

if _ran_successfully(RCVListPipeline, today):
if pipeline_ran_successfully(RCVListPipeline, today):
raise SkipPipelineError()

pipeline = RCVListPipeline(term=config.CURRENT_TERM, date=today)
pipeline.run()


def op_rcv_evening() -> None:
"""While on most plenary days, there’s only one voting session around midday, on some days
there is another sesssion in the evening, usually around 17:00. The vote results of the
evening sessions are appended to the same source document that also contains the results
of the midday votes. This method fetches the latest roll-call vote results, even if they
have been fetched successfully earlier on the same day."""
today = datetime.date.today()

if not _is_session_day(today):
raise SkipPipelineError()

if pipeline_ran_successfully(RCVListPipeline, today, count=2):
raise SkipPipelineError()

pipeline = RCVListPipeline(term=config.CURRENT_TERM, date=today)
Expand Down Expand Up @@ -75,19 +93,6 @@ def _is_session_day(date: datetime.date) -> bool:
return session is not None


def _ran_successfully(pipeline: type[object], date: datetime.date) -> bool:
"""Check if a given pipeline has been run successfully on a given day."""
query = (
exists()
.where(PipelineRun.pipeline == pipeline.__name__)
.where(func.date(PipelineRun.started_at) == func.date(date))
.where(PipelineRun.result == PipelineRunResult.SUCCESS)
.select()
)

return bool(Session.execute(query).scalar())


worker = Worker()

# Mon at 04:00
Expand All @@ -110,14 +115,24 @@ def _ran_successfully(pipeline: type[object], date: datetime.date) -> bool:

# Mon-Thu between 12:00 and 15:00, every 10 mins
worker.schedule_pipeline(
op_rcv,
op_rcv_midday,
name=RCVListPipeline.__name__,
weekdays={Weekday.MON, Weekday.TUE, Weekday.WED, Weekday.THU},
hours=range(12, 15),
minutes=range(0, 60, 10),
tz=config.TIMEZONE,
)

# Mon-Thu between 17:00 and 20:00, every 10 mins
worker.schedule_pipeline(
op_rcv_evening,
name=RCVListPipeline.__name__,
weekdays={Weekday.MON, Weekday.TUE, Weekday.WED, Weekday.THU},
hours=range(17, 20),
minutes=range(0, 60, 10),
tz=config.TIMEZONE,
)

# Mon-Thu, between 13:00 and 20:00, every 30 mins
worker.schedule_pipeline(
op_press,
Expand Down
19 changes: 19 additions & 0 deletions backend/howtheyvote/worker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from prometheus_client import Counter, Gauge, Histogram
from prometheus_client import start_http_server as start_metrics_server
from schedule import Scheduler
from sqlalchemy import func, select
from structlog import get_logger

from .. import config
Expand Down Expand Up @@ -58,6 +59,24 @@ class Weekday(enum.Enum):
Handler = Callable[..., Any]


def pipeline_ran_successfully(
pipeline: type[object],
date: datetime.date,
count: int = 1,
) -> bool:
"""Check if a given pipeline has been run successfully on a given day."""
query = (
select(func.count())
.select_from(PipelineRun)
.where(PipelineRun.pipeline == pipeline.__name__)
.where(func.date(PipelineRun.started_at) == func.date(date))
.where(PipelineRun.result == PipelineRunResult.SUCCESS)
)
result = Session.execute(query).scalar() or 0

return result >= count


class Worker:
"""Running a worker starts a long-running process that executes data pipelines in regular
intervals and stores the result of the pipeline runs in the database."""
Expand Down
44 changes: 43 additions & 1 deletion backend/tests/worker/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from howtheyvote.models import PipelineRun, PipelineRunResult
from howtheyvote.pipelines import DataUnavailableError, PipelineError
from howtheyvote.worker.worker import Weekday, Worker
from howtheyvote.worker.worker import Weekday, Worker, pipeline_ran_successfully


def get_handler():
Expand Down Expand Up @@ -170,3 +170,45 @@ def pipeline_error():

assert runs[1].pipeline == "pipeline_error"
assert runs[1].result == PipelineRunResult.FAILURE


def test_pipeline_ran_successfully(db_session):
class TestPipeline:
pass

now = datetime.datetime.now()
today = now.date()

run = PipelineRun(
started_at=now,
finished_at=now,
pipeline=TestPipeline.__name__,
result=PipelineRunResult.FAILURE,
)
db_session.add(run)
db_session.commit()

assert pipeline_ran_successfully(TestPipeline, today) is False

run = PipelineRun(
started_at=now,
finished_at=now,
pipeline=TestPipeline.__name__,
result=PipelineRunResult.SUCCESS,
)
db_session.add(run)
db_session.commit()

assert pipeline_ran_successfully(TestPipeline, today) is True
assert pipeline_ran_successfully(TestPipeline, today, count=2) is False

run = PipelineRun(
started_at=now,
finished_at=now,
pipeline=TestPipeline.__name__,
result=PipelineRunResult.SUCCESS,
)
db_session.add(run)
db_session.commit()

assert pipeline_ran_successfully(TestPipeline, today, count=2) is True

0 comments on commit 076f0d4

Please sign in to comment.