Skip to content

Commit

Permalink
Refactor task
Browse files Browse the repository at this point in the history
  • Loading branch information
ablakley-r7 committed Jan 28, 2025
1 parent 30f954c commit bb5a7f4
Showing 1 changed file with 65 additions and 22 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import insightconnect_plugin_runtime
from insightconnect_plugin_runtime.exceptions import APIException, PluginException
from .schema import MonitorSiemLogsInput, MonitorSiemLogsOutput, MonitorSiemLogsState, Input, Output, Component, State
from typing import Dict, Tuple
from typing import Dict, List, Tuple
from datetime import datetime, timezone, timedelta


Expand Down Expand Up @@ -41,7 +41,7 @@ def run(self, params={}, state={}, custom_config={}):
)
if not state:
state = INITIAL_STATE
self.apply_custom_config(state, custom_config, max_api_lookback_date)
self.apply_custom_config(state, custom_config)
query_config = self.prepare_query_params(state, max_run_lookback_date, now_date)
logs, query_config = self.get_all_logs(run_condition, query_config)
exit_state, has_more_pages = self.prepare_exit_state(query_config, now_date)
Expand All @@ -62,14 +62,28 @@ def run(self, params={}, state={}, custom_config={}):
return [], existing_state, False, 500, PluginException(preset=PluginException.Preset.UNKNOWN, data=error)

def detect_run_condition(self, query_config: Dict) -> str:
"""
Return runtype based on query configuration
:param query_config:
:return: runtype string
"""
if not query_config:
return INITIAL_RUN
for log_type_config in query_config.values():
if not log_type_config.get("caught_up"):
return PAGINATION_RUN
return SUBSEQUENT_RUN

def get_max_lookback_date(self, now: datetime, run_condition: str, custom_config: bool):
def get_max_lookback_date(
self, now: datetime, run_condition: str, custom_config: bool
) -> Tuple[datetime, datetime]:
"""
Get max API lookback date and max lookback date for run condition
:param now:
:param run_condition:
:param custom_config:
:return: max_api_lookback_date, max_run_lookback_date
"""
max_api_lookback_days = MAX_LOOKBACK_DAYS
max_run_lookback_days = max_api_lookback_days
if run_condition in [INITIAL_RUN] and not custom_config:
Expand All @@ -79,20 +93,25 @@ def get_max_lookback_date(self, now: datetime, run_condition: str, custom_config
max_run_lookback_date = (now - timedelta(days=max_run_lookback_days)).date()
return max_api_lookback_date, max_run_lookback_date

def apply_custom_config(
self, current_query_config: Dict, custom_config: Dict, max_lookback_date: datetime.date
) -> Dict:
def apply_custom_config(self, current_query_config: Dict, custom_config: Dict) -> None:
"""
Apply custom configuration for lookback, query date applies to start and end time of query
:param current_query_config:
:param custom_config:
:return: N/A
"""
for log_type, lookback_date_string in custom_config.items():
lookback_date = datetime.strptime(lookback_date_string, "%Y-%m-%d").date()
if lookback_date > max_lookback_date:
self.logger.info(f"TASK: Setting query date to {lookback_date} for {log_type} log type")
current_query_config[log_type] = {"query_date": lookback_date_string}
else:
self.logger.info(
f"TASK: Supplied lookback date of {lookback_date} is beyond max lookback of {max_lookback_date} for {log_type} log type"
)
self.logger.info(f"TASK: Supplied lookback date of {lookback_date_string} for {log_type} log type")
current_query_config[log_type] = {"query_date": lookback_date_string}

def prepare_query_params(self, query_config, max_lookback_date, now_date):
def prepare_query_params(self, query_config: Dict, max_lookback_date: Dict, now_date: datetime) -> Dict:
"""
Prepare query for request. Validate query dates, move forward when caught up
:param query_config:
:param max_lookback_date:
:param now_date:
:return:
"""
for log_type, log_type_config in query_config.items():
query_date_str = log_type_config.get("query_date")
if query_date_str:
Expand All @@ -104,17 +123,33 @@ def prepare_query_params(self, query_config, max_lookback_date, now_date):
log_type_config["query_date"] = query_date + timedelta(days=1)
log_type_config["caught_up"] = False
log_type_config.pop("next_page")
query_config[log_type] = self.validate_config_lookback(log_type_config, max_lookback_date)
query_config[log_type] = self.validate_config_lookback(log_type_config, max_lookback_date, now_date)
return query_config

def validate_config_lookback(self, log_type_config, max_lookback_date):
if isinstance(log_type_config.get("query_date"), str):
log_type_config["query_date"] = datetime.strptime(log_type_config["query_date"], "%Y-%m-%d").date()
if log_type_config.get("query_date") < max_lookback_date:
def validate_config_lookback(self, log_type_config: Dict, max_lookback_date: datetime, now_date: datetime) -> Dict:
"""
Ensures provided query date in scope of request time window
:param log_type_config:
:param max_lookback_date:
:param now_date:
:return: log_type_config
"""
query_date = log_type_config.get("query_date")
if isinstance(query_date, str):
query_date = datetime.strptime(query_date, "%Y-%m-%d").date()
if query_date < max_lookback_date:
return {"query_date": max_lookback_date}
if query_date > now_date:
log_type_config["query_date"] = now_date
return log_type_config

def get_all_logs(self, run_condition: str, query_config: Dict):
def get_all_logs(self, run_condition: str, query_config: Dict) -> Tuple[List, Dict]:
"""
Gets all logs of provided log type. First retrieves batch URLs. Then downloads and reads batches, pooling logs.
:param run_condition:
:param query_config:
:return: Logs, updated query configuration (state)
"""
complete_logs = []
for log_type, log_type_config in query_config.items():
if (not log_type_config.get("caught_up")) or (run_condition != PAGINATION_RUN):
Expand All @@ -129,13 +164,21 @@ def get_all_logs(self, run_condition: str, query_config: Dict):
self.logger.info(f"TASK: Query for {log_type} is caught up. Skipping as we are currently paginating")
return complete_logs, query_config

def prepare_exit_state(self, state, now_date) -> Tuple[Dict, bool]:
def prepare_exit_state(self, state: dict, now_date: datetime) -> Tuple[Dict, bool]:
"""
Prepare state and pagination for task completion. Format date time.
:param state:
:param now_date:
:return: state, has_more_pages
"""
has_more_pages = False
for log_type, log_type_config in state.items():
query_date = log_type_config.get("query_date")
if log_type_config.get("caught_up") is True:
has_more_pages = True
if query_date:
if isinstance(query_date, str):
query_date = datetime.strptime(query_date, "%Y-%m-%d").date()
if query_date < now_date:
has_more_pages = True
log_type_config["query_date"] = query_date.strftime("%Y-%m-%d")
Expand Down

0 comments on commit bb5a7f4

Please sign in to comment.