Move Job objects to postgres (#368)
Move jobs to postgres

Co-authored-by: Michał Praszmo <[email protected]>
msm-code and nazywam authored Feb 5, 2024
1 parent 4eca6fd commit 9d06fe5
Showing 6 changed files with 163 additions and 112 deletions.
10 changes: 6 additions & 4 deletions docs/redis.md
@@ -3,6 +3,8 @@
Please note that all of this is 100% internal and shouldn't be relied on.
Data format in redis can and does change between mquery releases.

+Right now mquery is in the process of migrating its internal storage to Postgres.
+
### Why redis?

Because the very early daemon was a trivial piece of code, and Redis as a job
@@ -22,9 +24,9 @@ To connect to redis use `redis-cli`. For docker-compose use

Redis command documentation is pretty good and available at https://redis.io/commands/.

-### Job objects (`job:*`)
+### Job table (`job`)

-Job object is a Hash represented by schema.JobSchema class.
+Jobs are stored in the `job` table.

Every job has an ID, which is a random 12-character string like 2OV8UP4DUOWK (the
same string that is visible in urls like http://mquery.net/query/2OV8UP4DUOWK).
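The ID is drawn from uppercase letters and digits; the snippet below mirrors the generation code added to `create_search_task` later in this commit (the surrounding `"".join(...)` is assumed, since that part is collapsed in the diff):

```python
import random
import string

# 12 random characters drawn from A-Z and 0-9, e.g. "2OV8UP4DUOWK".
job_id = "".join(
    random.choice(string.ascii_uppercase + string.digits) for _ in range(12)
)
```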
@@ -75,9 +77,9 @@ Cache objects are arbitrary strings.
Right now they are only optionally used by metadata plugins, to
cache expensive computation in redis.

-### Plugin objects (`plugin:*`)
+### Configuration table (`configentry`)

-Represented by schema.ConfigSchema class, they store plugin configuration.
+Represented by models.configentry.ConfigEntry class.

For example, `plugin:TestPlugin` will store configuration for `TestPlugin` as a
dictionary. All plugins can expose their own arbitrary config options.
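A minimal sketch of reading a plugin's configuration after the migration. The exact ConfigEntry columns are not shown in this commit, so the `plugin`, `key` and `value` field names below are assumptions, as is the connection string:

```python
from sqlmodel import Session, create_engine, select

from .models.configentry import ConfigEntry

# Assumed DSN; mquery derives its real engine from app_config.
engine = create_engine("postgresql://mquery:mquery@localhost/mquery")

# Hypothetical field names -- check models/configentry.py for the real columns.
with Session(engine) as session:
    entries = session.exec(
        select(ConfigEntry).where(ConfigEntry.plugin == "TestPlugin")
    ).all()
    config = {e.key: e.value for e in entries}
```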
6 changes: 3 additions & 3 deletions src/app.py
@@ -27,9 +27,9 @@
from .lib.yaraparse import parse_yara
from .plugins import PluginManager
from .lib.ursadb import UrsaDb
+from .models.job import Job, JobView
from .schema import (
    JobsSchema,
-    JobSchema,
    RequestConfigEdit,
    RequestQueryMethod,
    QueryRequestSchema,
@@ -492,11 +492,11 @@ def matches(

@app.get(
    "/api/job/{job_id}",
-    response_model=JobSchema,
+    response_model=JobView,
    tags=["stable"],
    dependencies=[Depends(can_view_queries)],
)
-def job_info(job_id: str) -> JobSchema:
+def job_info(job_id: str) -> Job:
"""
Returns a metadata for a single job. May be useful for monitoring
a job progress.
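A quick client-side sketch of calling this endpoint. The base URL is an assumption; the response fields come from the JobView model added in this commit:

```python
import requests  # hypothetical consumer, not part of this commit

# The job ID is the same 12-character string shown in the web UI URLs.
resp = requests.get("http://localhost/api/job/2OV8UP4DUOWK")
resp.raise_for_status()
job = resp.json()
print(job["status"], job["files_processed"], job["files_matched"])
```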
188 changes: 110 additions & 78 deletions src/db.py
@@ -7,10 +7,11 @@
from redis import StrictRedis
from enum import Enum
from rq import Queue  # type: ignore
-from sqlmodel import Session, SQLModel, create_engine, select, and_
+from sqlmodel import Session, SQLModel, create_engine, select, and_, update

from .models.configentry import ConfigEntry
-from .schema import JobSchema, MatchesSchema, AgentSpecSchema, ConfigSchema
+from .models.job import Job
+from .schema import MatchesSchema, AgentSpecSchema, ConfigSchema
from .config import app_config


@@ -67,55 +68,36 @@ def __schedule(self, agent: str, task: Any, *args: Any) -> None:

    def get_job_ids(self) -> List[JobId]:
        """Gets IDs of all jobs in the database"""
-        return [key[4:] for key in self.redis.keys("job:*")]
+        with Session(self.engine) as session:
+            jobs = session.exec(select(Job)).all()
+            return [j.id for j in jobs]

-    def cancel_job(self, job: JobId) -> None:
-        """Sets the job status to cancelled"""
-        self.redis.hmset(
-            f"job:{job}",
-            {"status": "cancelled", "finished": int(time())},
-        )
+    def cancel_job(self, job: JobId, error=None) -> None:
+        """Sets the job status to cancelled, with optional error message"""
+        with Session(self.engine) as session:
+            session.execute(
+                update(Job)
+                .where(Job.id == job)
+                .values(status="cancelled", finished=int(time()), error=error)
+            )
+            session.commit()

    def fail_job(self, job: JobId, message: str) -> None:
        """Sets the job status to cancelled with provided error message."""
-        self.redis.hmset(
-            f"job:{job}",
-            {"status": "cancelled", "error": message, "finished": int(time())},
-        )
+        self.cancel_job(job, message)

-    def get_job(self, job: JobId) -> JobSchema:
-        """Retrieves a job from the database. Tries to fix corrupted objects"""
-        data = self.redis.hgetall(f"job:{job}")
-        if data.get("status") in ["expired", "failed"]:
-            # There is no support for migrations in Redis "databases".
-            # These are old statuses, not used in the new versions anymore.
-            data["status"] = "cancelled"
-
-        return JobSchema(
-            id=job,
-            status=data.get("status", "ERROR"),
-            error=data.get("error", None),
-            rule_name=data.get("rule_name", "ERROR"),
-            rule_author=data.get("rule_author", "unknown"),
-            raw_yara=data.get("raw_yara", "ERROR"),
-            submitted=data.get("submitted", 0),
-            finished=data.get("finished", None),
-            files_limit=data.get("files_limit", 0),
-            files_processed=int(data.get("files_processed", 0)),
-            files_matched=int(data.get("files_matched", 0)),
-            files_in_progress=int(data.get("files_in_progress", 0)),
-            total_files=int(data.get("total_files", 0)),
-            files_errored=int(data.get("files_errored", 0)),
-            reference=data.get("reference", ""),
-            taints=json.loads(data.get("taints", "[]")),
-            total_datasets=data.get("total_datasets", 0),
-            datasets_left=data.get("datasets_left", 0),
-            agents_left=data.get("agents_left", 0),
-        )
+    def get_job(self, job: JobId) -> Job:
+        """Retrieves a job from the database"""
+        with Session(self.engine) as session:
+            return session.exec(select(Job).where(Job.id == job)).one()

    def remove_query(self, job: JobId) -> None:
        """Sets the job status to removed"""
-        self.redis.hmset(f"job:{job}", {"status": "removed"})
+        with Session(self.engine) as session:
+            session.execute(
+                update(Job).where(Job.id == job).values(status="removed")
+            )
+            session.commit()

    def add_match(self, job: JobId, match: MatchInfo) -> None:
        self.redis.rpush(f"meta:{job}", match.to_json())
@@ -130,16 +112,31 @@ def job_start_work(self, job: JobId, in_progress: int) -> None:
        :param job: ID of the job being updated.
        :param in_progress: Number of files in the current work unit.
        """
-        self.redis.hincrby(f"job:{job}", "files_in_progress", in_progress)
+        with Session(self.engine) as session:
+            session.execute(
+                update(Job)
+                .where(Job.id == job)
+                .values(files_in_progress=Job.files_in_progress + in_progress)
+            )
+            session.commit()

    def agent_finish_job(self, job: JobId) -> None:
        """Decrements the number of active agents in the given job. If there
        are no more agents, job status is changed to done."""
-        new_agents = self.redis.hincrby(f"job:{job}", "agents_left", -1)
-        if new_agents <= 0:
-            self.redis.hmset(
-                f"job:{job}", {"status": "done", "finished": int(time())}
-            )
+        with Session(self.engine) as session:
+            (agents_left,) = session.execute(
+                update(Job)
+                .where(Job.id == job)
+                .values(agents_left=Job.agents_left - 1)
+                .returning(Job.agents_left)
+            ).one()
+            if agents_left == 0:
+                session.execute(
+                    update(Job)
+                    .where(Job.id == job)
+                    .values(finished=int(time()), status="done")
+                )
+            session.commit()
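The `.returning()` call is what keeps the decrement race-free: the new `agents_left` value comes back from the same atomic UPDATE, so two agents finishing simultaneously cannot both read a stale count, as a separate SELECT followed by an UPDATE could. A standalone sketch of the same pattern, assuming a Postgres engine and an illustrative table:

```python
from sqlmodel import Field, Session, SQLModel, create_engine, update


class Counter(SQLModel, table=True):  # illustrative table, not part of mquery
    id: int = Field(primary_key=True)
    value: int


# Assumed DSN for demonstration purposes only.
engine = create_engine("postgresql://user:pass@localhost/demo")

with Session(engine) as session:
    # Decrement and read back in one statement: no read-modify-write race.
    (value,) = session.execute(
        update(Counter)
        .where(Counter.id == 1)
        .values(value=Counter.value - 1)
        .returning(Counter.value)
    ).one()
    session.commit()
```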

    def agent_add_tasks_in_progress(
        self, job: JobId, agent: str, tasks: int
@@ -159,21 +156,44 @@ def job_update_work(
"""Updates progress for the job. This will increment numbers processed,
inprogress, errored and matched files.
Returns the number of processed files after the operation."""
files = self.redis.hincrby(f"job:{job}", "files_processed", processed)
self.redis.hincrby(f"job:{job}", "files_in_progress", -processed)
self.redis.hincrby(f"job:{job}", "files_matched", matched)
self.redis.hincrby(f"job:{job}", "files_errored", errored)
return files
with Session(self.engine) as session:
(files_processed,) = session.execute(
update(Job)
.where(Job.id == job)
.values(
files_processed=Job.files_processed + processed,
files_in_progress=Job.files_in_progress - processed,
files_matched=Job.files_matched + matched,
files_errored=Job.files_errored + errored,
)
.returning(Job.files_processed)
).one()
session.commit()
return files_processed

    def init_job_datasets(self, job: JobId, num_datasets: int) -> None:
        """Sets total_datasets and datasets_left, and status to processing"""
-        self.redis.hincrby(f"job:{job}", "total_datasets", num_datasets)
-        self.redis.hincrby(f"job:{job}", "datasets_left", num_datasets)
-        self.redis.hset(f"job:{job}", "status", "processing")
+        with Session(self.engine) as session:
+            session.execute(
+                update(Job)
+                .where(Job.id == job)
+                .values(
+                    total_datasets=num_datasets,
+                    datasets_left=num_datasets,
+                    status="processing",
+                )
+            )
+            session.commit()

    def dataset_query_done(self, job: JobId):
        """Decrements the number of datasets left by one."""
-        self.redis.hincrby(f"job:{job}", "datasets_left", -1)
+        with Session(self.engine) as session:
+            session.execute(
+                update(Job)
+                .where(Job.id == job)
+                .values(datasets_left=Job.datasets_left - 1)
+            )
+            session.commit()

    def create_search_task(
        self,
@@ -190,26 +210,29 @@ def create_search_task(
            random.choice(string.ascii_uppercase + string.digits)
            for _ in range(12)
        )
-        job_obj = {
-            "status": "new",
-            "rule_name": rule_name,
-            "rule_author": rule_author,
-            "raw_yara": raw_yara,
-            "submitted": int(time()),
-            "files_limit": files_limit,
-            "reference": reference,
-            "files_in_progress": 0,
-            "files_processed": 0,
-            "files_matched": 0,
-            "files_errored": 0,
-            "total_files": 0,
-            "agents_left": len(agents),
-            "datasets_left": 0,
-            "total_datasets": 0,
-            "taints": json.dumps(taints),
-        }
+        with Session(self.engine) as session:
+            obj = Job(
+                id=job,
+                status="new",
+                rule_name=rule_name,
+                rule_author=rule_author,
+                raw_yara=raw_yara,
+                submitted=int(time()),
+                files_limit=files_limit,
+                reference=reference,
+                files_in_progress=0,
+                files_processed=0,
+                files_matched=0,
+                files_errored=0,
+                total_files=0,
+                agents_left=len(agents),
+                datasets_left=0,
+                total_datasets=0,
+                taints=taints,
+            )
+            session.add(obj)
+            session.commit()

-        self.redis.hmset(f"job:{job}", job_obj)

        from . import tasks

        for agent in agents:
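A hypothetical call for orientation. The full signature is collapsed in this diff, so the parameter names below are inferred from the function body, and `db` stands for an instance of this module's database class (its name is also collapsed here):

```python
# Parameter names inferred from the body above; treat as illustrative.
db.create_search_task(
    rule_name="demo",
    rule_author="anonymous",
    raw_yara="rule demo { condition: false }",
    files_limit=0,
    reference="",
    taints=[],
    agents=["default"],
)
```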
@@ -235,7 +258,16 @@ def get_job_matches(
        return MatchesSchema(job=self.get_job(job), matches=matches)

    def update_job_files(self, job: JobId, total_files: int) -> int:
-        return self.redis.hincrby(f"job:{job}", "total_files", total_files)
+        """Add total_files to the specified job, and return the new total."""
+        with Session(self.engine) as session:
+            (total_files,) = session.execute(
+                update(Job)
+                .where(Job.id == job)
+                .values(total_files=Job.total_files + total_files)
+                .returning(Job.total_files)
+            ).one()
+            session.commit()
+            return total_files

    def register_active_agent(
        self,
38 changes: 38 additions & 0 deletions src/models/job.py
@@ -0,0 +1,38 @@
from sqlmodel import SQLModel, Field, ARRAY, String, Column
from typing import Optional, List, Union


class JobBase(SQLModel):
    """Base class for entities related to mquery jobs"""

    id: str
    status: str
    error: Optional[str]
    rule_name: str
    rule_author: str
    raw_yara: str
    submitted: int
    finished: Optional[int]
    files_limit: int
    reference: str
    files_processed: int
    files_matched: int
    files_in_progress: int
    total_files: int
    files_errored: int
    taints: List[str] = Field(sa_column=Column(ARRAY(String)))
    datasets_left: int
    total_datasets: int
    agents_left: int


class Job(JobBase, table=True):
    """Job object in the database. Internal ID is an implementation detail"""

    internal_id: Union[int, None] = Field(default=None, primary_key=True)


class JobView(JobBase):
    """Pydantic model used in the public API"""

    pass
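A short sketch of using the new model end to end. The connection string is an assumption (mquery builds its real engine from app_config) and the field values are illustrative:

```python
from sqlmodel import Session, SQLModel, create_engine, select

# Assumed DSN; the ARRAY(String) column requires a Postgres backend.
engine = create_engine("postgresql://mquery:mquery@localhost/mquery")
SQLModel.metadata.create_all(engine)  # creates the "job" table if missing

with Session(engine) as session:
    session.add(
        Job(
            id="2OV8UP4DUOWK",
            status="new",
            rule_name="demo",
            rule_author="anonymous",
            raw_yara="rule demo { condition: false }",
            submitted=0,
            files_limit=0,
            reference="",
            files_processed=0,
            files_matched=0,
            files_in_progress=0,
            total_files=0,
            files_errored=0,
            taints=[],
            datasets_left=0,
            total_datasets=0,
            agents_left=1,
        )
    )
    session.commit()
    job = session.exec(select(Job).where(Job.id == "2OV8UP4DUOWK")).one()
```

Note that `internal_id` stays server-side (it defaults to None and is filled in by the database), so callers never supply it.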
27 changes: 3 additions & 24 deletions src/schema.py
@@ -1,32 +1,11 @@
from enum import Enum
from typing import List, Dict, Optional
from pydantic import BaseModel, Field  # type: ignore


-class JobSchema(BaseModel):
-    id: str
-    status: str
-    error: Optional[str]
-    rule_name: str
-    rule_author: str
-    raw_yara: str
-    submitted: int
-    finished: Optional[int]
-    files_limit: int
-    reference: str
-    files_processed: int
-    files_matched: int
-    files_in_progress: int
-    total_files: int
-    files_errored: int
-    taints: List[str]
-    datasets_left: int
-    total_datasets: int
-    agents_left: int
+from .models.job import JobView


class JobsSchema(BaseModel):
-    jobs: List[JobSchema]
+    jobs: List[JobView]


class ConfigSchema(BaseModel):
Expand Down Expand Up @@ -80,7 +59,7 @@ class ParseResponseSchema(BaseModel):


class MatchesSchema(BaseModel):
-    job: Dict
+    job: JobView
    matches: List[Dict]
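With `job` typed as JobView rather than a plain Dict, match responses now carry the full, validated job metadata. A rough sketch of building and serializing the typed response; in practice db.get_job_matches() constructs it, and the field values here are illustrative:

```python
# Illustrative values only.
schema = MatchesSchema(
    job=JobView(
        id="2OV8UP4DUOWK",
        status="done",
        rule_name="demo",
        rule_author="anonymous",
        raw_yara="rule demo { condition: false }",
        submitted=0,
        files_limit=0,
        reference="",
        files_processed=2,
        files_matched=1,
        files_in_progress=0,
        total_files=2,
        files_errored=0,
        taints=[],
        datasets_left=0,
        total_datasets=1,
        agents_left=0,
    ),
    matches=[{"file": "sample.exe", "meta": {}}],
)
print(schema.json())
```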

