Move Job objects to postgres #368

Merged: 15 commits, Feb 5, 2024
10 changes: 6 additions & 4 deletions docs/redis.md
@@ -3,6 +3,8 @@
Please note that all this is 100% internal, and shouldn't be relied on.
Data format in redis can and does change between mquery releases.

Right now mquery is in the process of migrating internal storage to Postgres.

### Why redis?

Because very early daemon was a trivial piece of code, and Redis as a job
@@ -22,9 +24,9 @@ To connect to redis use `redis-cli`. For docker-compose use

Redis command documentation is pretty good and available at https://redis.io/commands/.

### Job objects (`job:*`)
### Job table (`job`)

Job object is a Hash represented by schema.JobSchema class.
Jobs are stored in the `job` table.

Every job has an ID, which is a random 12-character string like 2OV8UP4DUOWK (the
same string that is visible in urls like http://mquery.net/query/2OV8UP4DUOWK).
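The ID format can be sketched as follows. This mirrors the generation expression visible in `create_search_task` in this diff; the helper name itself is illustrative, not part of mquery:

```python
import random
import string

def random_job_id() -> str:
    # 12 characters drawn from A-Z and 0-9, e.g. "2OV8UP4DUOWK".
    # Illustrative wrapper; mquery builds the ID inline in
    # create_search_task using exactly this expression.
    return "".join(
        random.choice(string.ascii_uppercase + string.digits)
        for _ in range(12)
    )

job_id = random_job_id()
```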
@@ -75,9 +77,9 @@ Cache objects are arbitrary strings.
Right now they are only optionally used by metadata plugins, to
cache expensive computation in redis.

### Plugin objects (`plugin:*`)
### Configuration table (`configentry`)

Represented by schema.ConfigSchema class, they store plugin configuration.
Represented by models.configentry.ConfigEntry class.

For example, `plugin:TestPlugin` will store configuration for `TestPlugin` as a
dictionary. All plugins can expose their own arbitrary config options.
6 changes: 3 additions & 3 deletions src/app.py
@@ -27,9 +27,9 @@
from .lib.yaraparse import parse_yara
from .plugins import PluginManager
from .lib.ursadb import UrsaDb
from .models.job import Job, JobView
from .schema import (
JobsSchema,
JobSchema,
RequestConfigEdit,
RequestQueryMethod,
QueryRequestSchema,
@@ -492,11 +492,11 @@ def matches(

@app.get(
"/api/job/{job_id}",
response_model=JobSchema,
response_model=JobView,
tags=["stable"],
dependencies=[Depends(can_view_queries)],
)
def job_info(job_id: str) -> JobSchema:
def job_info(job_id: str) -> Job:
"""
Returns a metadata for a single job. May be useful for monitoring
a job progress.
188 changes: 110 additions & 78 deletions src/db.py
@@ -7,10 +7,11 @@
from redis import StrictRedis
from enum import Enum
from rq import Queue # type: ignore
from sqlmodel import Session, SQLModel, create_engine, select, and_
from sqlmodel import Session, SQLModel, create_engine, select, and_, update

from .models.configentry import ConfigEntry
from .schema import JobSchema, MatchesSchema, AgentSpecSchema, ConfigSchema
from .models.job import Job
from .schema import MatchesSchema, AgentSpecSchema, ConfigSchema
from .config import app_config


@@ -67,55 +68,36 @@ def __schedule(self, agent: str, task: Any, *args: Any) -> None:

def get_job_ids(self) -> List[JobId]:
"""Gets IDs of all jobs in the database"""
return [key[4:] for key in self.redis.keys("job:*")]
with Session(self.engine) as session:
> **Member:** We're probably gonna be writing this a lot. Should we maybe create a helper decorator, like `with self.get_session()`?
>
> **Member:** I actually think it'll be useful to even have something like `self.execute(update(Job).where(...))` to handle commits etc. automatically (this is a very common pattern). I'll do a separate refactor PR in a while.

jobs = session.exec(select(Job)).all()
> **Member:** We probably could optimize this by selecting just the required row. But if there aren't that many jobs and no automatic table joins, it's probably not worth the effort at this moment?
>
> **Member:** Do you mean just the required column? Yeah, I've thought about this, but when I wrote it I only wanted to have it working and go back to it later.
>
> It's not worth refactoring IMO, because this function is a wrong abstraction. It's used in some places, for example like this: `for job in db.get_job_ids(): job.get_job()`. I didn't change it right away to avoid changes outside of db.py, but the database abstraction needs to be changed significantly later.

return [j.id for j in jobs]
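The `self.get_session()` / `self.execute()` helper floated in the review comments above could look roughly like this. It is a sketch only, with stand-in session and engine classes instead of real sqlmodel objects, and all names are hypothetical:

```python
from contextlib import contextmanager

# Stand-ins for sqlmodel's Session/engine, just to make the sketch runnable.
class FakeSession:
    def __init__(self, engine):
        self.engine = engine
    def execute(self, stmt):
        self.engine.log.append(stmt)
    def commit(self):
        self.engine.log.append("COMMIT")
    def close(self):
        pass

class FakeEngine:
    def __init__(self):
        self.log = []

class Database:
    def __init__(self, engine):
        self.engine = engine

    @contextmanager
    def get_session(self):
        # Open a session, commit on success, always close.
        session = FakeSession(self.engine)
        try:
            yield session
            session.commit()
        finally:
            session.close()

    def execute(self, stmt):
        # One-shot statement with automatic commit, as proposed above.
        with self.get_session() as session:
            session.execute(stmt)

db = Database(FakeEngine())
db.execute("UPDATE job SET status = 'removed' WHERE id = 'X'")
```

With a helper like this, each `with Session(self.engine) as session: ... session.commit()` block in db.py collapses to a single call.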

def cancel_job(self, job: JobId) -> None:
"""Sets the job status to cancelled"""
self.redis.hmset(
f"job:{job}",
{"status": "cancelled", "finished": int(time())},
)
def cancel_job(self, job: JobId, error: Optional[str] = None) -> None:
"""Sets the job status to cancelled, with optional error message"""
> **Member:** What happens to the error message though?
>
> **Member:** nice (refactoring gone wrong)

with Session(self.engine) as session:
session.execute(
update(Job)
.where(Job.id == job)
.values(status="cancelled", finished=int(time()), error=error)
)
session.commit()

def fail_job(self, job: JobId, message: str) -> None:
"""Sets the job status to cancelled with provided error message."""
self.redis.hmset(
f"job:{job}",
{"status": "cancelled", "error": message, "finished": int(time())},
)
self.cancel_job(job, message)

def get_job(self, job: JobId) -> JobSchema:
"""Retrieves a job from the database. Tries to fix corrupted objects"""
data = self.redis.hgetall(f"job:{job}")
if data.get("status") in ["expired", "failed"]:
# There is no support for migrations in Redis "databases".
# These are old statuses, not used in the new versions anymore.
data["status"] = "cancelled"

return JobSchema(
id=job,
status=data.get("status", "ERROR"),
error=data.get("error", None),
rule_name=data.get("rule_name", "ERROR"),
rule_author=data.get("rule_author", "unknown"),
raw_yara=data.get("raw_yara", "ERROR"),
submitted=data.get("submitted", 0),
finished=data.get("finished", None),
files_limit=data.get("files_limit", 0),
files_processed=int(data.get("files_processed", 0)),
files_matched=int(data.get("files_matched", 0)),
files_in_progress=int(data.get("files_in_progress", 0)),
total_files=int(data.get("total_files", 0)),
files_errored=int(data.get("files_errored", 0)),
reference=data.get("reference", ""),
taints=json.loads(data.get("taints", "[]")),
total_datasets=data.get("total_datasets", 0),
datasets_left=data.get("datasets_left", 0),
agents_left=data.get("agents_left", 0),
)
def get_job(self, job: JobId) -> Job:
> **Member:** Seems like the correct typing should be `Optional[Job]` most likely? And then we'd want to use `one_or_none()`. Unless `sqlalchemy.orm.exc.NoResultFound` is handled somewhere down the line.
>
> **Member:** It's not handled, but this should raise an exception if the job with a specified ID doesn't exist. Does it make sense? 🤔 It's used mostly in contexts where the UID is known to be good, or when it's user input, and if it's bad it should fail.

"""Retrieves a job from the database"""
with Session(self.engine) as session:
return session.exec(select(Job).where(Job.id == job)).one()

def remove_query(self, job: JobId) -> None:
"""Sets the job status to removed"""
self.redis.hmset(f"job:{job}", {"status": "removed"})
with Session(self.engine) as session:
session.execute(
update(Job).where(Job.id == job).values(status="removed")
> **Member:** Some kind of enum/consts for the query statuses at some point, probably?
>
> **Member:** Yeah, I have it in the TODO, saving it for a refactor 🙏 (I'll need to check how to do this in sqlalchemy, but I'll check other projects I guess)
>
> **Member:** Created an issue out of it: #370

)
session.commit()
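The status enum suggested in the review discussion could be sketched like this; the class and member names are guesses based on the statuses visible in this diff, not mquery's actual code:

```python
from enum import Enum

class JobStatus(str, Enum):
    # Hypothetical consts for the job statuses seen in this diff.
    # Inheriting from str keeps the values usable directly in
    # .values(status=...) updates and comparable with raw strings.
    new = "new"
    processing = "processing"
    done = "done"
    cancelled = "cancelled"
    removed = "removed"

status = JobStatus.cancelled
```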

def add_match(self, job: JobId, match: MatchInfo) -> None:
self.redis.rpush(f"meta:{job}", match.to_json())
@@ -130,16 +112,31 @@ def job_start_work(self, job: JobId, in_progress: int) -> None:
:param job: ID of the job being updated.
:param in_progress: Number of files in the current work unit.
"""
self.redis.hincrby(f"job:{job}", "files_in_progress", in_progress)
with Session(self.engine) as session:
session.execute(
update(Job)
.where(Job.id == job)
.values(files_in_progress=Job.files_in_progress + in_progress)
)
session.commit()

def agent_finish_job(self, job: JobId) -> None:
"""Decrements the number of active agents in the given job. If there
are no more agents, job status is changed to done."""
new_agents = self.redis.hincrby(f"job:{job}", "agents_left", -1)
if new_agents <= 0:
self.redis.hmset(
f"job:{job}", {"status": "done", "finished": int(time())}
)
with Session(self.engine) as session:
(agents_left,) = session.execute(
update(Job)
.where(Job.id == job)
.values(agents_left=Job.agents_left - 1)
.returning(Job.agents_left)
).one()
if agents_left == 0:
session.execute(
update(Job)
.where(Job.id == job)
.values(finished=int(time()), status="done")
)
session.commit()
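The `.returning()` pattern used here makes the decrement and the read of the new value a single atomic statement, replacing the read-modify-write that `hincrby` provided on the Redis side. The same idea in plain SQL, sketched with stdlib `sqlite3` (which also supports `RETURNING` in recent versions); the table is a toy, not mquery's schema:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE job (id TEXT PRIMARY KEY, agents_left INTEGER)")
conn.execute("INSERT INTO job VALUES ('2OV8UP4DUOWK', 2)")

# Decrement and read back the new value in one atomic statement,
# so two agents finishing concurrently can't both observe 0.
(agents_left,) = conn.execute(
    "UPDATE job SET agents_left = agents_left - 1 "
    "WHERE id = ? RETURNING agents_left",
    ("2OV8UP4DUOWK",),
).fetchone()
```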

def agent_add_tasks_in_progress(
self, job: JobId, agent: str, tasks: int
@@ -159,21 +156,44 @@ def job_update_work(
"""Updates progress for the job. This will increment numbers processed,
inprogress, errored and matched files.
Returns the number of processed files after the operation."""
files = self.redis.hincrby(f"job:{job}", "files_processed", processed)
self.redis.hincrby(f"job:{job}", "files_in_progress", -processed)
self.redis.hincrby(f"job:{job}", "files_matched", matched)
self.redis.hincrby(f"job:{job}", "files_errored", errored)
return files
with Session(self.engine) as session:
(files_processed,) = session.execute(
update(Job)
.where(Job.id == job)
.values(
files_processed=Job.files_processed + processed,
files_in_progress=Job.files_in_progress - processed,
files_matched=Job.files_matched + matched,
files_errored=Job.files_errored + errored,
)
.returning(Job.files_processed)
).one()
session.commit()
return files_processed

def init_job_datasets(self, job: JobId, num_datasets: int) -> None:
"""Sets total_datasets and datasets_left, and status to processing"""
self.redis.hincrby(f"job:{job}", "total_datasets", num_datasets)
self.redis.hincrby(f"job:{job}", "datasets_left", num_datasets)
self.redis.hset(f"job:{job}", "status", "processing")
with Session(self.engine) as session:
session.execute(
update(Job)
.where(Job.id == job)
.values(
total_datasets=num_datasets,
datasets_left=num_datasets,
status="processing",
)
)
session.commit()

def dataset_query_done(self, job: JobId) -> None:
"""Decrements the number of datasets left by one."""
self.redis.hincrby(f"job:{job}", "datasets_left", -1)
with Session(self.engine) as session:
session.execute(
update(Job)
.where(Job.id == job)
.values(datasets_left=Job.datasets_left - 1)
)
session.commit()

def create_search_task(
self,
@@ -190,26 +210,29 @@ def create_search_task(
random.choice(string.ascii_uppercase + string.digits)
for _ in range(12)
)
job_obj = {
"status": "new",
"rule_name": rule_name,
"rule_author": rule_author,
"raw_yara": raw_yara,
"submitted": int(time()),
"files_limit": files_limit,
"reference": reference,
"files_in_progress": 0,
"files_processed": 0,
"files_matched": 0,
"files_errored": 0,
"total_files": 0,
"agents_left": len(agents),
"datasets_left": 0,
"total_datasets": 0,
"taints": json.dumps(taints),
}
with Session(self.engine) as session:
obj = Job(
> **Member:** We could theoretically provide the defaults in the model declaration, but since this is the only initialization it's probably not worth it at the moment?
>
> **Member:** Good question 🤔 depends on which one is clearer. Having defaults in the model for things like `files_in_progress` makes sense. Once we have a proper database I'll consider splitting this into several smaller objects (to avoid one huge job object that counts everything).

id=job,
status="new",
rule_name=rule_name,
rule_author=rule_author,
raw_yara=raw_yara,
submitted=int(time()),
files_limit=files_limit,
reference=reference,
files_in_progress=0,
files_processed=0,
files_matched=0,
files_errored=0,
total_files=0,
agents_left=len(agents),
datasets_left=0,
total_datasets=0,
taints=taints,
)
session.add(obj)
session.commit()

self.redis.hmset(f"job:{job}", job_obj)
from . import tasks

for agent in agents:
@@ -235,7 +258,16 @@ def get_job_matches(
return MatchesSchema(job=self.get_job(job), matches=matches)

def update_job_files(self, job: JobId, total_files: int) -> int:
return self.redis.hincrby(f"job:{job}", "total_files", total_files)
"""Add total_files to the specified job, and return a new total."""
with Session(self.engine) as session:
(total_files,) = session.execute(
update(Job)
.where(Job.id == job)
.values(total_files=Job.total_files + total_files)
.returning(Job.total_files)
).one()
session.commit()
return total_files

def register_active_agent(
self,
38 changes: 38 additions & 0 deletions src/models/job.py
@@ -0,0 +1,38 @@
from sqlmodel import SQLModel, Field, ARRAY, String, Column
from typing import Optional, List, Union


class JobBase(SQLModel):
"""Base class for entities related to mquery jobs"""

id: str
status: str
error: Optional[str]
rule_name: str
rule_author: str
raw_yara: str
submitted: int
finished: Optional[int]
files_limit: int
reference: str
files_processed: int
files_matched: int
files_in_progress: int
total_files: int
files_errored: int
taints: List[str] = Field(sa_column=Column(ARRAY(String)))
datasets_left: int
total_datasets: int
agents_left: int


class Job(JobBase, table=True):
"""Job object in the database. Internal ID is an implementation detail"""

internal_id: Union[int, None] = Field(default=None, primary_key=True)


class JobView(JobBase):
"""Pydantic model used in the public API"""

pass
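The base/table/view split above keeps a single field list shared between the database model and the public API model, so `internal_id` never leaks into API responses. The same layering can be illustrated with plain dataclasses (a stdlib sketch of the pattern, not the actual sqlmodel classes):

```python
from dataclasses import dataclass, fields
from typing import Optional

@dataclass
class JobBase:
    # Shared fields, mirroring a subset of JobBase above.
    id: str
    status: str
    error: Optional[str] = None

@dataclass
class Job(JobBase):
    # Table model: adds the internal primary key, hidden from the API.
    internal_id: Optional[int] = None

@dataclass
class JobView(JobBase):
    # API model: exactly the shared fields, so internal_id never leaks.
    pass

job = Job(id="2OV8UP4DUOWK", status="done")
view = JobView(id=job.id, status=job.status, error=job.error)
```

In the real code, FastAPI's `response_model=JobView` performs the `Job` to `JobView` conversion that the last line sketches by hand.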
27 changes: 3 additions & 24 deletions src/schema.py
@@ -1,32 +1,11 @@
from enum import Enum
from typing import List, Dict, Optional
from pydantic import BaseModel, Field # type: ignore


class JobSchema(BaseModel):
id: str
status: str
error: Optional[str]
rule_name: str
rule_author: str
raw_yara: str
submitted: int
finished: Optional[int]
files_limit: int
reference: str
files_processed: int
files_matched: int
files_in_progress: int
total_files: int
files_errored: int
taints: List[str]
datasets_left: int
total_datasets: int
agents_left: int
from .models.job import JobView


class JobsSchema(BaseModel):
jobs: List[JobSchema]
jobs: List[JobView]


class ConfigSchema(BaseModel):
@@ -80,7 +59,7 @@ class ParseResponseSchema(BaseModel):


class MatchesSchema(BaseModel):
job: Dict
job: JobView
matches: List[Dict]

