diff --git a/docs/redis.md b/docs/redis.md
index d0de8e3e..2a5fd69a 100644
--- a/docs/redis.md
+++ b/docs/redis.md
@@ -3,6 +3,8 @@
 Please note that all this is 100% internal, and shouldn't be relied on.
 Data format in redis can and does change between mquery releases.
 
+Right now mquery is in the process of migrating its internal storage to Postgres.
+
 ### Why redis?
 
 Because very early daemon was a trivial piece of code, and Redis as a job
@@ -22,9 +24,9 @@ To connect to redis use `redis-cli`. For docker-compose use
 Redis command documentation is pretty good and available at
 https://redis.io/commands/.
 
-### Job objects (`job:*`)
+### Job table (`job`)
 
-Job object is a Hash represented by schema.JobSchema class.
+Jobs are stored in the `job` table.
 
 Every job has ID, which is a random 12 character string like 2OV8UP4DUOWK
 (the same string that is visible in urls like
 http://mquery.net/query/2OV8UP4DUOWK).
@@ -75,9 +77,9 @@
 Cache objects are arbitrary strings. Right now they are only optionally
 used by metadata plugins, to cache expensive computation in redis.
 
-### Plugin objects (`plugin:*`)
+### Configuration table (`configentry`)
 
-Represented by schema.ConfigSchema class, they store plugin configuration.
+Represented by the models.configentry.ConfigEntry class, this table stores
+plugin configuration.
 
 For example, `plugin:TestPlugin` will store configuration for `TestPlugin`
 as a dictionary. All plugins can expose their own arbitrary config options.
diff --git a/src/app.py b/src/app.py
index 064d592e..968139dd 100644
--- a/src/app.py
+++ b/src/app.py
@@ -27,9 +27,9 @@
 from .lib.yaraparse import parse_yara
 from .plugins import PluginManager
 from .lib.ursadb import UrsaDb
+from .models.job import Job, JobView
 from .schema import (
     JobsSchema,
-    JobSchema,
     RequestConfigEdit,
     RequestQueryMethod,
     QueryRequestSchema,
@@ -492,11 +492,11 @@ def matches(
 @app.get(
     "/api/job/{job_id}",
-    response_model=JobSchema,
+    response_model=JobView,
     tags=["stable"],
     dependencies=[Depends(can_view_queries)],
 )
-def job_info(job_id: str) -> JobSchema:
+def job_info(job_id: str) -> Job:
     """
     Returns a metadata for a single job. May be useful for monitoring
     a job progress.
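
Worth spelling out why `job_info` may return `Job` while the route declares `response_model=JobView`: FastAPI serializes the returned object through the response model, so only `JobView` fields ever reach the client and `Job.internal_id` stays private. A minimal self-contained sketch of that mechanism (toy field set, not the real mquery schema):

```python
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()


class JobView(BaseModel):
    id: str
    status: str


class Job(JobView):
    internal_id: int  # implementation detail, never exposed


@app.get("/api/job/{job_id}", response_model=JobView)
def job_info(job_id: str) -> Job:
    # FastAPI filters the returned Job down to JobView's fields,
    # so internal_id is stripped from the JSON response.
    return Job(id=job_id, status="done", internal_id=42)
```
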
diff --git a/src/db.py b/src/db.py
index fc3b24df..8bacd1bf 100644
--- a/src/db.py
+++ b/src/db.py
@@ -7,10 +7,11 @@
 from redis import StrictRedis
 from enum import Enum
 from rq import Queue  # type: ignore
-from sqlmodel import Session, SQLModel, create_engine, select, and_
+from sqlmodel import Session, SQLModel, create_engine, select, and_, update
 
 from .models.configentry import ConfigEntry
-from .schema import JobSchema, MatchesSchema, AgentSpecSchema, ConfigSchema
+from .models.job import Job
+from .schema import MatchesSchema, AgentSpecSchema, ConfigSchema
 from .config import app_config
 
 
@@ -67,55 +68,36 @@ def __schedule(self, agent: str, task: Any, *args: Any) -> None:
     def get_job_ids(self) -> List[JobId]:
         """Gets IDs of all jobs in the database"""
-        return [key[4:] for key in self.redis.keys("job:*")]
+        with Session(self.engine) as session:
+            jobs = session.exec(select(Job)).all()
+            return [j.id for j in jobs]
 
-    def cancel_job(self, job: JobId) -> None:
-        """Sets the job status to cancelled"""
-        self.redis.hmset(
-            f"job:{job}",
-            {"status": "cancelled", "finished": int(time())},
-        )
+    def cancel_job(self, job: JobId, error: Optional[str] = None) -> None:
+        """Sets the job status to cancelled, with an optional error message."""
+        with Session(self.engine) as session:
+            session.execute(
+                update(Job)
+                .where(Job.id == job)
+                .values(status="cancelled", finished=int(time()), error=error)
+            )
+            session.commit()
 
     def fail_job(self, job: JobId, message: str) -> None:
         """Sets the job status to cancelled with provided error message."""
-        self.redis.hmset(
-            f"job:{job}",
-            {"status": "cancelled", "error": message, "finished": int(time())},
-        )
+        self.cancel_job(job, message)
 
-    def get_job(self, job: JobId) -> JobSchema:
-        """Retrieves a job from the database. Tries to fix corrupted objects"""
-        data = self.redis.hgetall(f"job:{job}")
-        if data.get("status") in ["expired", "failed"]:
-            # There is no support for migrations in Redis "databases".
-            # These are old statuses, not used in the new versions anymore.
-            data["status"] = "cancelled"
-
-        return JobSchema(
-            id=job,
-            status=data.get("status", "ERROR"),
-            error=data.get("error", None),
-            rule_name=data.get("rule_name", "ERROR"),
-            rule_author=data.get("rule_author", "unknown"),
-            raw_yara=data.get("raw_yara", "ERROR"),
-            submitted=data.get("submitted", 0),
-            finished=data.get("finished", None),
-            files_limit=data.get("files_limit", 0),
-            files_processed=int(data.get("files_processed", 0)),
-            files_matched=int(data.get("files_matched", 0)),
-            files_in_progress=int(data.get("files_in_progress", 0)),
-            total_files=int(data.get("total_files", 0)),
-            files_errored=int(data.get("files_errored", 0)),
-            reference=data.get("reference", ""),
-            taints=json.loads(data.get("taints", "[]")),
-            total_datasets=data.get("total_datasets", 0),
-            datasets_left=data.get("datasets_left", 0),
-            agents_left=data.get("agents_left", 0),
-        )
+    def get_job(self, job: JobId) -> Job:
+        """Retrieves a job from the database."""
+        with Session(self.engine) as session:
+            return session.exec(select(Job).where(Job.id == job)).one()
 
     def remove_query(self, job: JobId) -> None:
         """Sets the job status to removed"""
-        self.redis.hmset(f"job:{job}", {"status": "removed"})
+        with Session(self.engine) as session:
+            session.execute(
+                update(Job).where(Job.id == job).values(status="removed")
+            )
+            session.commit()
 
     def add_match(self, job: JobId, match: MatchInfo) -> None:
         self.redis.rpush(f"meta:{job}", match.to_json())
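
A behaviour note on the new `get_job`: the old Redis code papered over corrupt hashes with `"ERROR"` defaults, while `session.exec(...).one()` raises `sqlalchemy.exc.NoResultFound` for a missing ID, so callers now get a hard error instead of a fabricated object. A runnable sketch of the lookup pattern, using a toy model and throwaway SQLite engine (production targets Postgres):

```python
from sqlmodel import Field, Session, SQLModel, create_engine, select


class ToyJob(SQLModel, table=True):
    id: str = Field(primary_key=True)
    status: str


engine = create_engine("sqlite://")  # in-memory stand-in for Postgres
SQLModel.metadata.create_all(engine)

with Session(engine) as session:
    session.add(ToyJob(id="2OV8UP4DUOWK", status="new"))
    session.commit()

with Session(engine) as session:
    # .one() raises NoResultFound if the ID is absent.
    job = session.exec(select(ToyJob).where(ToyJob.id == "2OV8UP4DUOWK")).one()
    assert job.status == "new"
```
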
- data["status"] = "cancelled" - - return JobSchema( - id=job, - status=data.get("status", "ERROR"), - error=data.get("error", None), - rule_name=data.get("rule_name", "ERROR"), - rule_author=data.get("rule_author", "unknown"), - raw_yara=data.get("raw_yara", "ERROR"), - submitted=data.get("submitted", 0), - finished=data.get("finished", None), - files_limit=data.get("files_limit", 0), - files_processed=int(data.get("files_processed", 0)), - files_matched=int(data.get("files_matched", 0)), - files_in_progress=int(data.get("files_in_progress", 0)), - total_files=int(data.get("total_files", 0)), - files_errored=int(data.get("files_errored", 0)), - reference=data.get("reference", ""), - taints=json.loads(data.get("taints", "[]")), - total_datasets=data.get("total_datasets", 0), - datasets_left=data.get("datasets_left", 0), - agents_left=data.get("agents_left", 0), - ) + def get_job(self, job: JobId) -> Job: + """Retrieves a job from the database""" + with Session(self.engine) as session: + return session.exec(select(Job).where(Job.id == job)).one() def remove_query(self, job: JobId) -> None: """Sets the job status to removed""" - self.redis.hmset(f"job:{job}", {"status": "removed"}) + with Session(self.engine) as session: + session.execute( + update(Job).where(Job.id == job).values(status="removed") + ) + session.commit() def add_match(self, job: JobId, match: MatchInfo) -> None: self.redis.rpush(f"meta:{job}", match.to_json()) @@ -130,16 +112,31 @@ def job_start_work(self, job: JobId, in_progress: int) -> None: :param job: ID of the job being updated. :param in_progress: Number of files in the current work unit. """ - self.redis.hincrby(f"job:{job}", "files_in_progress", in_progress) + with Session(self.engine) as session: + session.execute( + update(Job) + .where(Job.id == job) + .values(files_in_progress=Job.files_in_progress + in_progress) + ) + session.commit() def agent_finish_job(self, job: JobId) -> None: """Decrements the number of active agents in the given job. If there are no more agents, job status is changed to done.""" - new_agents = self.redis.hincrby(f"job:{job}", "agents_left", -1) - if new_agents <= 0: - self.redis.hmset( - f"job:{job}", {"status": "done", "finished": int(time())} - ) + with Session(self.engine) as session: + (agents_left,) = session.execute( + update(Job) + .where(Job.id == job) + .values(agents_left=Job.agents_left - 1) + .returning(Job.agents_left) + ).one() + if agents_left == 0: + session.execute( + update(Job) + .where(Job.id == job) + .values(finished=int(time()), status="done") + ) + session.commit() def agent_add_tasks_in_progress( self, job: JobId, agent: str, tasks: int @@ -159,21 +156,44 @@ def job_update_work( """Updates progress for the job. This will increment numbers processed, inprogress, errored and matched files. 
@@ -159,21 +156,44 @@ def job_update_work(
         """Updates progress for the job. This will increment numbers
         processed, inprogress, errored and matched files.
         Returns the number of processed files after the operation."""
-        files = self.redis.hincrby(f"job:{job}", "files_processed", processed)
-        self.redis.hincrby(f"job:{job}", "files_in_progress", -processed)
-        self.redis.hincrby(f"job:{job}", "files_matched", matched)
-        self.redis.hincrby(f"job:{job}", "files_errored", errored)
-        return files
+        with Session(self.engine) as session:
+            (files_processed,) = session.execute(
+                update(Job)
+                .where(Job.id == job)
+                .values(
+                    files_processed=Job.files_processed + processed,
+                    files_in_progress=Job.files_in_progress - processed,
+                    files_matched=Job.files_matched + matched,
+                    files_errored=Job.files_errored + errored,
+                )
+                .returning(Job.files_processed)
+            ).one()
+            session.commit()
+            return files_processed
 
     def init_job_datasets(self, job: JobId, num_datasets: int) -> None:
         """Sets total_datasets and datasets_left, and status to processing"""
-        self.redis.hincrby(f"job:{job}", "total_datasets", num_datasets)
-        self.redis.hincrby(f"job:{job}", "datasets_left", num_datasets)
-        self.redis.hset(f"job:{job}", "status", "processing")
+        with Session(self.engine) as session:
+            session.execute(
+                update(Job)
+                .where(Job.id == job)
+                .values(
+                    total_datasets=num_datasets,
+                    datasets_left=num_datasets,
+                    status="processing",
+                )
+            )
+            session.commit()
 
     def dataset_query_done(self, job: JobId):
         """Decrements the number of datasets left by one."""
-        self.redis.hincrby(f"job:{job}", "datasets_left", -1)
+        with Session(self.engine) as session:
+            session.execute(
+                update(Job)
+                .where(Job.id == job)
+                .values(datasets_left=Job.datasets_left - 1)
+            )
+            session.commit()
 
     def create_search_task(
         self,
@@ -190,26 +210,29 @@ def create_search_task(
             random.choice(string.ascii_uppercase + string.digits)
             for _ in range(12)
         )
-        job_obj = {
-            "status": "new",
-            "rule_name": rule_name,
-            "rule_author": rule_author,
-            "raw_yara": raw_yara,
-            "submitted": int(time()),
-            "files_limit": files_limit,
-            "reference": reference,
-            "files_in_progress": 0,
-            "files_processed": 0,
-            "files_matched": 0,
-            "files_errored": 0,
-            "total_files": 0,
-            "agents_left": len(agents),
-            "datasets_left": 0,
-            "total_datasets": 0,
-            "taints": json.dumps(taints),
-        }
+        with Session(self.engine) as session:
+            obj = Job(
+                id=job,
+                status="new",
+                rule_name=rule_name,
+                rule_author=rule_author,
+                raw_yara=raw_yara,
+                submitted=int(time()),
+                files_limit=files_limit,
+                reference=reference,
+                files_in_progress=0,
+                files_processed=0,
+                files_matched=0,
+                files_errored=0,
+                total_files=0,
+                agents_left=len(agents),
+                datasets_left=0,
+                total_datasets=0,
+                taints=taints,
+            )
+            session.add(obj)
+            session.commit()
 
-        self.redis.hmset(f"job:{job}", job_obj)
         from . import tasks
 
         for agent in agents:
@@ -235,7 +258,16 @@ def get_job_matches(
         return MatchesSchema(job=self.get_job(job), matches=matches)
 
     def update_job_files(self, job: JobId, total_files: int) -> int:
-        return self.redis.hincrby(f"job:{job}", "total_files", total_files)
+        """Add total_files to the specified job, and return the new total."""
+        with Session(self.engine) as session:
+            (total_files,) = session.execute(
+                update(Job)
+                .where(Job.id == job)
+                .values(total_files=Job.total_files + total_files)
+                .returning(Job.total_files)
+            ).one()
+            session.commit()
+            return total_files
 
     def register_active_agent(
         self,
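
For reference, this is where job IDs like 2OV8UP4DUOWK come from — the `create_search_task` hunk above draws 12 characters from A-Z and 0-9:

```python
import random
import string

# Same construction as in create_search_task.
job_id = "".join(
    random.choice(string.ascii_uppercase + string.digits) for _ in range(12)
)
print(job_id)  # e.g. 2OV8UP4DUOWK
```
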
diff --git a/src/models/job.py b/src/models/job.py
new file mode 100644
index 00000000..90e5cca6
--- /dev/null
+++ b/src/models/job.py
@@ -0,0 +1,36 @@
+from sqlmodel import SQLModel, Field, ARRAY, String, Column
+from typing import Optional, List
+
+
+class JobBase(SQLModel):
+    """Base class for entities related to mquery jobs"""
+
+    id: str
+    status: str
+    error: Optional[str]
+    rule_name: str
+    rule_author: str
+    raw_yara: str
+    submitted: int
+    finished: Optional[int]
+    files_limit: int
+    reference: str
+    files_processed: int
+    files_matched: int
+    files_in_progress: int
+    total_files: int
+    files_errored: int
+    taints: List[str] = Field(sa_column=Column(ARRAY(String)))
+    datasets_left: int
+    total_datasets: int
+    agents_left: int
+
+
+class Job(JobBase, table=True):
+    """Job object in the database. Internal ID is an implementation detail."""
+
+    internal_id: Optional[int] = Field(default=None, primary_key=True)
+
+
+class JobView(JobBase):
+    """Pydantic model used in the public API."""
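
The `taints` declaration is why `json.dumps(taints)` disappears from `create_search_task`: the column is a native Postgres ARRAY, so SQLModel/SQLAlchemy converts Python lists directly. A trimmed-down sketch of the declaration (hypothetical model name, illustrative fields):

```python
from typing import List, Optional

from sqlmodel import ARRAY, Column, Field, SQLModel, String


class TaggedJob(SQLModel, table=True):
    internal_id: Optional[int] = Field(default=None, primary_key=True)
    # Stored as a native Postgres array; no manual JSON round-trip needed.
    taints: List[str] = Field(sa_column=Column(ARRAY(String)))
```

Inserting is then just `TaggedJob(taints=["malware", "apt"])`. Note that `ARRAY` is Postgres-only, so this model will not work on SQLite.
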
diff --git a/src/schema.py b/src/schema.py
index fdc1a6b9..9e34b141 100644
--- a/src/schema.py
+++ b/src/schema.py
@@ -1,32 +1,11 @@
 from enum import Enum
 from typing import List, Dict, Optional
 from pydantic import BaseModel, Field  # type: ignore
-
-
-class JobSchema(BaseModel):
-    id: str
-    status: str
-    error: Optional[str]
-    rule_name: str
-    rule_author: str
-    raw_yara: str
-    submitted: int
-    finished: Optional[int]
-    files_limit: int
-    reference: str
-    files_processed: int
-    files_matched: int
-    files_in_progress: int
-    total_files: int
-    files_errored: int
-    taints: List[str]
-    datasets_left: int
-    total_datasets: int
-    agents_left: int
+from .models.job import JobView
 
 
 class JobsSchema(BaseModel):
-    jobs: List[JobSchema]
+    jobs: List[JobView]
 
 
 class ConfigSchema(BaseModel):
@@ -80,7 +59,7 @@ class ParseResponseSchema(BaseModel):
 
 
 class MatchesSchema(BaseModel):
-    job: Dict
+    job: JobView
    matches: List[Dict]
diff --git a/src/tasks.py b/src/tasks.py
index 9ec90d3a..0a92e88e 100644
--- a/src/tasks.py
+++ b/src/tasks.py
@@ -9,7 +9,7 @@
 from .util import make_sha256_tag
 from .config import app_config
 from .plugins import PluginManager
-from .schema import JobSchema
+from .models.job import Job
 from .lib.yaraparse import parse_yara, combine_rules
 from .lib.ursadb import Json, UrsaDb
 from .metadata import Metadata
@@ -85,7 +85,7 @@ def update_metadata(
         match = MatchInfo(orig_name, metadata, matches)
         self.db.add_match(job, match)
 
-    def execute_yara(self, job: JobSchema, files: List[str]) -> None:
+    def execute_yara(self, job: Job, files: List[str]) -> None:
         rule = yara.compile(source=job.raw_yara)
         num_matches = 0
         num_errors = 0
@@ -129,7 +129,7 @@ def execute_yara(self, job: JobSchema, files: List[str]) -> None:
             f"in {scanned_datasets}/{job.total_datasets} ({dataset_percent:.0%}) of datasets.",
         )
 
-    def add_tasks_in_progress(self, job: JobSchema, tasks: int) -> None:
+    def add_tasks_in_progress(self, job: Job, tasks: int) -> None:
         """See documentation of db.agent_add_tasks_in_progress"""
         self.db.agent_add_tasks_in_progress(job.id, self.group_id, tasks)
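
The tasks.py changes are almost purely mechanical: `execute_yara` and `add_tasks_in_progress` only read attributes such as `job.raw_yara` and `job.total_datasets`, and SQLModel classes are pydantic models underneath, so swapping `JobSchema` for `Job` changes nothing at the call sites. A toy demonstration (illustrative field subset, not the full schema):

```python
from sqlmodel import SQLModel


class MiniJob(SQLModel):
    raw_yara: str
    total_datasets: int


# Non-table SQLModel classes validate and behave like pydantic models,
# so attribute access works exactly as it did with the old JobSchema.
job = MiniJob(raw_yara="rule x { condition: true }", total_datasets=3)
assert job.raw_yara.startswith("rule")
assert job.total_datasets == 3
```
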