update IStarlakeOrchestration
fupelaqu committed Nov 22, 2024
1 parent cf40b5b commit 3222f46
Showing 2 changed files with 43 additions and 12 deletions.
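In short, the commit exports a new StarlakeAirflowOrchestration entry point from the airflow package and reworks IStarlakeOrchestration so that, instead of receiving a job and a StarlakeSparkConfig, it initializes itself from a pre-load strategy and an options dict, then inspects the call stack to pick up configuration from the module that instantiated it.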
File 1 of 2 — the airflow package __init__ (identified by its relative imports):

@@ -1,4 +1,5 @@
-__all__ = ['starlake_airflow_job', 'starlake_airflow_options']
+__all__ = ['starlake_airflow_job', 'starlake_airflow_options', 'starlake_airflow_orchestration']
 
 from .starlake_airflow_job import StarlakeAirflowJob, DEFAULT_DAG_ARGS, DEFAULT_POOL
 from .starlake_airflow_options import StarlakeAirflowOptions
+from .starlake_airflow_orchestration import StarlakeAirflowOrchestration
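With the export in place, downstream code can import the orchestration class straight from the package. A minimal usage sketch (the ai.starlake.airflow package path is inferred from the relative imports above, not stated in the diff):

    # Hypothetical usage; the package path is inferred, not shown in the diff.
    from ai.starlake.airflow import StarlakeAirflowJob, StarlakeAirflowOrchestration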
File 2 of 2 — the module defining IStarlakeOrchestration (per the commit message; path not shown in the extract):

@@ -1,25 +1,55 @@
 from __future__ import annotations
 
-from ai.starlake.job import IStarlakeJob, StarlakeSparkConfig, StarlakeOptions
+from ai.starlake.job import IStarlakeJob, StarlakeSparkConfig, StarlakePreLoadStrategy
 
 from ai.starlake.orchestration.starlake_schedules import StarlakeSchedules
 from ai.starlake.orchestration.starlake_dependencies import StarlakeDependencies
 
+import inspect
+
+import sys
+
 from typing import Generic, List, TypeVar, Union
 
 T = TypeVar("T")
 
 U = TypeVar("U")
 
-class IStarlakeOrchestration(Generic[U], IStarlakeJob[T], StarlakeOptions):
-    def __init__(self, job: IStarlakeJob[T], sparkConfig: StarlakeSparkConfig, **kwargs) -> None:
-        """Initializes a new IStarlakeOrchestration instance.
-        args:
-            job (IStarlakeJob[T]): The required starlake job.
-            sparkConfig (StarlakeSparkConfig): The required spark config.
-        """
-        self.job = job
-        self.sparkConfig = sparkConfig
-        self.options = job.options
+class IStarlakeOrchestration(Generic[U], IStarlakeJob[T]):
+    def __init__(self, pre_load_strategy: Union[StarlakePreLoadStrategy, str, None], options: dict=None, **kwargs) -> None:
+        """Overrides IStarlakeJob.__init__()
+        Args:
+            pre_load_strategy (Union[StarlakePreLoadStrategy, str, None]): The pre-load strategy to use.
+            options (dict): The options to use.
+        """
+        super().__init__(pre_load_strategy=pre_load_strategy, options=options, **kwargs)
+
+        # Get the current call stack
+        stack = inspect.stack()
+
+        # Get the caller's stack frame
+        caller_frame = stack[1]
+
+        # Retrieve the filename of the caller (the one who instantiated this class)
+        self.caller_filename = caller_frame.filename
+
+        # Get the name of the caller module
+        self.caller_module_name = caller_frame.frame.f_globals["__name__"]
+
+        # Access the caller's global variables
+        self.caller_globals = sys.modules[self.caller_module_name].__dict__
+
+        def default_spark_config(*args, **kwargs) -> StarlakeSparkConfig:
+            return StarlakeSparkConfig(
+                memory=self.caller_globals.get('spark_executor_memory', None),
+                cores=self.caller_globals.get('spark_executor_cores', None),
+                instances=self.caller_globals.get('spark_executor_instances', None),
+                cls_options=self,
+                options=self.options,
+                **kwargs
+            )
+
+        self.spark_config: StarlakeSparkConfig = getattr(self.caller_module_name, "get_spark_config", default_spark_config)
+
     def sl_generate_scheduled_tables(self, schedules: StarlakeSchedules, **kwargs) -> List[U]:
        """Generate the Starlake dags that will orchestrate the load of the specified domains.
(remaining lines unchanged, collapsed behind "Expand Down" in the original view)
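The interesting move in this diff is the caller introspection: the constructor records which module instantiated it and treats that module's globals as a configuration source (spark_executor_memory, spark_executor_cores, spark_executor_instances, and an optional get_spark_config factory). One detail worth flagging: getattr(self.caller_module_name, "get_spark_config", default_spark_config) queries the module-name string rather than the module object, so a caller-defined get_spark_config would never be found that way; a lookup against the caller's globals, as sketched below, is presumably what is intended. The sketch is standalone and uses illustrative names, not the Starlake API:

    import inspect
    import sys

    class CallerAware:
        """Minimal sketch of the pattern: remember which module instantiated us."""
        def __init__(self) -> None:
            # stack()[1] is the frame that called this constructor
            caller_frame = inspect.stack()[1]
            self.caller_filename = caller_frame.filename
            self.caller_module_name = caller_frame.frame.f_globals["__name__"]
            # The caller module's globals double as a configuration namespace
            self.caller_globals = sys.modules[self.caller_module_name].__dict__

        def setting(self, name: str, default=None):
            # Fall back to `default` when the caller module does not define `name`
            return self.caller_globals.get(name, default)

    # In the calling module (e.g. a DAG script):
    spark_executor_memory = "4g"   # picked up through caller_globals

    def get_spark_config():
        # Caller-side override that the orchestration layer can discover
        return {"memory": spark_executor_memory}

    obj = CallerAware()
    assert obj.setting("spark_executor_memory") == "4g"
    assert obj.setting("get_spark_config")() == {"memory": "4g"}

Reading configuration out of the caller's module keeps DAG files declarative: a generated script only sets module-level variables, and the orchestration layer discovers them without any explicit plumbing.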
