Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

User-defined cron jobs #62

Merged
merged 8 commits into from
Jun 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 77 additions & 42 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ aiosignalrcore = "^0.9.2"
fcache = "^0.4.7"
click = "^8.0.1"
pyee = "^8.1.0"
APScheduler = "^3.7.0"
sentry-sdk = "^1.1.0"

[tool.poetry.dev-dependencies]
Expand Down
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file added src/demo_tzbtc/jobs/__init__.py
Empty file.
Empty file.
24 changes: 24 additions & 0 deletions src/dipdup/codegen.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,13 @@ async def create_package(self) -> None:
with open(join(handlers_path, '__init__.py'), 'w'):
pass

self._logger.info('Creating `%s.jobs` package', self._config.package)
jobs_path = join(self._config.package_path, 'jobs')
with suppress(FileExistsError):
mkdir(jobs_path)
with open(join(jobs_path, '__init__.py'), 'w'):
pass

self._logger.info('Creating `%s/sql` directory', self._config.package)
sql_path = join(self._config.package_path, 'sql')
with suppress(FileExistsError):
Expand Down Expand Up @@ -315,6 +322,23 @@ async def generate_user_handlers(self) -> None:
else:
raise NotImplementedError(f'Index kind `{index_config.kind}` is not supported')

async def generate_jobs(self) -> None:
if not self._config.jobs:
return

jobs_path = join(self._config.package_path, 'jobs')
with open(join(dirname(__file__), 'templates', 'job.py.j2')) as file:
job_template = Template(file.read())

job_callbacks = set(job_config.callback for job_config in self._config.jobs.values())
for job_callback in job_callbacks:
self._logger.info('Generating job `%s`', job_callback)
job_code = job_template.render(job=job_callback)
job_path = join(jobs_path, f'{job_callback}.py')
if not exists(job_path):
with open(job_path, 'w') as file:
file.write(job_code)

async def cleanup(self) -> None:
"""Remove fetched JSONSchemas"""
self._logger.info('Cleaning up')
Expand Down
26 changes: 26 additions & 0 deletions src/dipdup/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,10 @@ class HandlerConfig:

def __post_init_post_parse__(self):
self._callback_fn = None
if self.callback in (ROLLBACK_HANDLER, CONFIGURE_HANDLER):
raise ConfigurationError(f'`{self.callback}` callback name is reserved')
if self.callback and self.callback != pascal_to_snake(self.callback):
raise ConfigurationError('`callback` field must conform to snake_case naming style')

@property
def callback_fn(self) -> Callable:
Expand Down Expand Up @@ -610,6 +614,13 @@ def valid_url(cls, v):
return v


@dataclass
class JobConfig(HandlerConfig):
crontab: str
args: Optional[Dict[str, Any]] = None
atomic: bool = False


@dataclass
class SentryConfig:
dsn: str
Expand All @@ -627,6 +638,7 @@ class DipDupConfig:
:param templates: Mapping of template aliases and index templates
:param database: Database config
:param hasura: Hasura config
:param jobs: Mapping of job aliases and job configs
:param sentry: Sentry integration config
"""

Expand All @@ -638,6 +650,7 @@ class DipDupConfig:
templates: Optional[Dict[str, IndexConfigTemplateT]] = None
database: Union[SqliteDatabaseConfig, PostgresDatabaseConfig] = SqliteDatabaseConfig(kind='sqlite')
hasura: Optional[HasuraConfig] = None
jobs: Optional[Dict[str, JobConfig]] = None
sentry: Optional[SentryConfig] = None

def __post_init_post_parse__(self):
Expand Down Expand Up @@ -833,6 +846,12 @@ def _initialize_handler_callback(self, handler_config: HandlerConfig) -> None:
callback_fn = getattr(handler_module, handler_config.callback)
handler_config.callback_fn = callback_fn

def _initialize_job_callback(self, job_config: JobConfig) -> None:
_logger.info('Registering job callback `%s`', job_config.callback)
job_module = importlib.import_module(f'{self.package}.jobs.{job_config.callback}')
callback_fn = getattr(job_module, job_config.callback)
job_config.callback_fn = callback_fn

def _initialize_index(self, index_name: str, index_config: IndexConfigT) -> None:
if index_name in self._initialized:
return
Expand Down Expand Up @@ -890,12 +909,19 @@ def _initialize_index(self, index_name: str, index_config: IndexConfigT) -> None

self._initialized.append(index_name)

def _initialize_jobs(self) -> None:
if not self.jobs:
return
for job_config in self.jobs.values():
self._initialize_job_callback(job_config)

def initialize(self) -> None:
_logger.info('Setting up handlers and types for package `%s`', self.package)

self.pre_initialize()
for index_name, index_config in self.indexes.items():
self._initialize_index(index_name, index_config)
self._initialize_jobs()


@dataclass
Expand Down
Loading