Revamped ASM sync code and minor fixes for c6g alerts/topcve scans
aloftus23 committed Sep 13, 2024
1 parent 1e91bea commit 20f09d8
Showing 15 changed files with 720 additions and 222 deletions.
7 changes: 6 additions & 1 deletion adhoc_investigations/intelx_adhoc_scan.py
@@ -4,6 +4,7 @@
import time

# Third-Party Libraries
from dateutil.parser import parse
import pandas as pd
import requests

@@ -16,6 +17,9 @@
# IntelX API Info
api_key = adhoc_config.get_ini_data().get("intelx")

def parse_datetime(date):
"""Parse datetime string for multiple formats."""
return parse(date)

def query_identity_api(domain, start_date, end_date):
"""Create an initial search and return the search id."""
@@ -118,7 +122,8 @@ def get_intelx_data(org_abbrv, start_date, end_date, domains, save_file):
all_df["date"] = all_df["date"].str.strip() # remove leading/trailing spaces
# all_df['datetime'] = pd.to_datetime(all_df['date'], format='mixed')
# all_df['datetime'] = pd.to_datetime(all_df['date'])
all_df['datetime'] = pd.to_datetime(all_df['date'], format='ISO8601')
# all_df['datetime'] = pd.to_datetime(all_df['date'], format='ISO8601')
all_df['datetime'] = all_df.date.apply(parse_datetime)
all_df['date'] = all_df['datetime'].dt.strftime('%m/%d/%Y')
all_df.reset_index(drop=True, inplace=True)

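For context on why the commit swaps `pd.to_datetime(..., format='ISO8601')` for a row-wise `dateutil` parse: the strict ISO8601 path raises on mixed date formats, while `dateutil.parser.parse` infers the format of each value. A minimal sketch with made-up values illustrating the pattern used above:

```python
import pandas as pd
from dateutil.parser import parse

# Hypothetical mix of formats like those returned in the IntelX results
df = pd.DataFrame({"date": ["2024-09-01T12:30:00", "09/02/2024", "Sep 3 2024 08:15"]})

# pd.to_datetime(df["date"], format="ISO8601") would raise on the non-ISO rows;
# dateutil infers each row's format individually
df["datetime"] = df["date"].apply(parse)
df["date"] = df["datetime"].dt.strftime("%m/%d/%Y")
print(df["date"].tolist())  # ['09/01/2024', '09/02/2024', '09/03/2024']
```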
120 changes: 64 additions & 56 deletions src/pe_asm/asm_sync.py
@@ -14,8 +14,10 @@
"""

# Standard Python Libraries
from datetime import timedelta
import logging
import sys
import time
from typing import Any, Dict

# Third-Party Libraries
@@ -61,68 +63,70 @@
def run_asm_sync(staging, method):
"""Collect and sync ASM data."""
if method == "asm":
# Run function to fetch and store all CyHy assets in the P&E database
LOGGER.info("Collecting CyHy assets")
get_cyhy_assets(staging)
LOGGER.info("Finished.")
# --- Local Portion of ASM Sync ---
# *** This portion of the ASM Sync process needs to be run locally
# on a Macbook because the Accessor is not allowed to directly
# connect to the CyHy environment. A dedicated python script is
# available for this "local step" of the ASM Sync

# Fill the P&E CIDRs table from CyHy assets
LOGGER.info("Filling CIDRs.")
fill_cidrs("all_orgs", staging)
LOGGER.info("Finished.")
# Fetch assets from the CyHy database and store them in the PE database
# LOGGER.info("Retrieving assets from the CyHy database...")
# get_cyhy_assets(staging) # <- needs to happen locally
# LOGGER.info("Finished retrieving assets from the CyHy database")


# --- Non-Local Portion of ASM Sync ---
# *** This portion of the ASM Sync process can run remotely on the
# Accessor because it does not require connecting to the CyHy environment

print("*** Running ATC-Framework version of ASM Sync ***")
# Fill the PE CIDRs table using the CyHy assets
LOGGER.info("Filling the CIDRs table using the retrieved CyHy assets...")
fill_cidrs("all_orgs", staging)
LOGGER.info("Finished filling the CIDRs table using the retrieved CyHy assets")
# Identify which CIDRs are current
LOGGER.info("Identify CIDR changes")
if staging:
conn = pe_db_staging_connect()
else:
conn = pe_db_connect()
identify_cidr_changes(conn)
conn.close()

# Fill root domains from dot gov table
LOGGER.info("Identifying CIDR changes...")
identify_cidr_changes(staging)
LOGGER.info("Finished identifying CIDR changes")

# Fill root domains using the retrieved dot gov data
LOGGER.info("Filling the root domains table using the retrieved dot gov data...")
# TODO
LOGGER.info("Finished filling the root domains table using the retrieved dot gov data")

# Enumerate sub domains from roots
LOGGER.info("Enumerating roots and saving sub-domains.")
# Enumerate subdomains from roots
LOGGER.info("Enumerating sub-domains from root domains...")
get_subdomains(staging)
LOGGER.info("Finished.")

# Connect subs from ips
LOGGER.info("Linking subs from ips.")
connect_subs_from_ips(staging)
LOGGER.info("Finished.")

# Connect ips from subs
LOGGER.info("Linking ips from subs.")
connect_ips_from_subs(staging)
LOGGER.info("Finished.")

# Identify the current IPs, sub-domains, and connections
if staging:
conn = pe_db_staging_connect()
else:
conn = pe_db_connect()
LOGGER.info("Identify IP changes.")
identify_ip_changes(conn)
LOGGER.info("Identify Sub changes.")
identify_sub_changes(conn)
LOGGER.info("Identify IP SUB changes.")
identify_ip_sub_changes(conn)
conn.close()
LOGGER.info("Finished")

# Update Identified sub-domains
if staging:
conn = pe_db_staging_connect()
else:
conn = pe_db_connect()
identified_sub_domains(conn)
LOGGER.info("Finished enumerating sub-domains from root domains")

# Link subdomains and ips using ips
LOGGER.info("Linking sub-domains and ips using ips...")
connect_subs_from_ips(staging) # *** Takes a really long time ~16 days
LOGGER.info("Finished linking sub-domains and ips using ips")

# Link subdomains and ips using subdomains
LOGGER.info("Linking sub-domains and ips using sub-domains...")
connect_ips_from_subs(staging) # Takes a little more than a day
LOGGER.info("Finished linking sub-domains and ips using sub-domains")

# Identify which IPs, sub-domains, and connections are current
LOGGER.info("Identify IP changes...")
identify_ip_changes(staging)
LOGGER.info("Finished identifying IP changes")
LOGGER.info("Identifying sub-domain changes...")
identify_sub_changes(staging)
LOGGER.info("Finished identifying sub-domain changes")
LOGGER.info("Identifying IP sub-domain link changes...")
identify_ip_sub_changes(staging)
LOGGER.info("Finished identifying IP sub-domain link changes")
LOGGER.info("Updating identified sub-domains...")
identified_sub_domains(staging)
LOGGER.info("Finished updating identified sub-domains")

# Run shodan dedupe
LOGGER.info("Running Shodan dedupe.")
dedupe(staging)
LOGGER.info("Finished.")
LOGGER.info("Running Shodan dedupe...")
dedupe(staging) # Takes about ~12hrs
LOGGER.info("Finished running Shodan dedupe")

elif method == "scorecard":
LOGGER.info("STARTING")
@@ -177,7 +181,6 @@ def main():
datefmt="%m/%d/%Y %I:%M:%S",
level=log_level.upper(),
)
LOGGER.info("Starting ASM sync scripts")

# Check for the staging option
try:
@@ -186,8 +189,13 @@
print(e)
staging = False

# Run ASM finder
# Run ASM Sync
LOGGER.info("--- ASM Sync Process Starting ---")
start_time = time.time()
run_asm_sync(staging, validated_args["METHOD"])
end_time = time.time()
LOGGER.info(f"Execution time for ASM Sync: {str(timedelta(seconds=(end_time - start_time)))} (H:M:S)")
LOGGER.info("--- ASM Sync Process Complete ---")

# Stop logging and clean up
logging.shutdown()
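The new timing wrapper renders the elapsed wall-clock time as H:M:S by passing the second count to `datetime.timedelta`; the same pattern in isolation (the sleep stands in for the real sync call):

```python
import time
from datetime import timedelta

start_time = time.time()
time.sleep(2)  # stand-in for run_asm_sync(staging, method)
end_time = time.time()

# str(timedelta(...)) formats the float seconds as H:MM:SS.ffffff
print(f"Execution time for ASM Sync: {timedelta(seconds=(end_time - start_time))} (H:M:S)")
```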
86 changes: 61 additions & 25 deletions src/pe_asm/data/cyhy_db_query.py
@@ -505,6 +505,7 @@ def insert_sub_domains(conn, df):
cursor = conn.cursor()
extras.execute_values(cursor, sql.format(table, cols), tpls)
conn.commit()
cursor.close()
except (Exception, psycopg2.DatabaseError) as err:
# Show error and close connection if failed
LOGGER.error("There was a problem with your database query %s", err)
@@ -612,104 +613,132 @@ def identify_org_asset_changes(conn):
conn.commit()


def identify_cidr_changes(conn):
def identify_cidr_changes(staging):
"""Identify CIDR changes."""
# Connect to database
if staging:
conn = pe_db_staging_connect()
else:
conn = pe_db_connect()
# Execute queries
cursor = conn.cursor()
LOGGER.info("Marking CIDRs that are in the db.")
LOGGER.info("Marking CIDRs as current if seen within the last 3 days")
cursor.execute(
"""
UPDATE cidrs
set current = True
where last_seen > (CURRENT_DATE - INTERVAL '3 days')
where last_seen > (CURRENT_DATE - INTERVAL '20 days')
"""
)
conn.commit()

LOGGER.info("Marking CIDRs that are no longer seen.")
cursor = conn.cursor()
LOGGER.info("Marking CIDRs as not current if not seen within the last 3 days")
cursor.execute(
"""
UPDATE cidrs
set current = False
where last_seen < (CURRENT_DATE - INTERVAL '3 days')
where last_seen < (CURRENT_DATE - INTERVAL '20 days')
"""
)
conn.commit()
cursor.close()
# Close database connection
conn.close()


def identify_ip_changes(conn):
def identify_ip_changes(staging):
"""Identify IP changes."""
# Connect to database
if staging:
conn = pe_db_staging_connect()
else:
conn = pe_db_connect()
# Execute queries
cursor = conn.cursor()
LOGGER.info("Marking IPs that are in the db.")
cursor.execute(
"""
UPDATE ips
set current = True
where last_seen > (CURRENT_DATE - INTERVAL '15 days')
where last_seen > (CURRENT_DATE - INTERVAL '20 days')
"""
)
conn.commit()

LOGGER.info("Marking IPs that are no longer seen.")
cursor = conn.cursor()
cursor.execute(
"""
UPDATE ips
set current = False
where last_seen < (CURRENT_DATE - INTERVAL '15 days') or last_seen isnull;
where last_seen < (CURRENT_DATE - INTERVAL '20 days') or last_seen isnull;
"""
)
conn.commit()
cursor.close()
# Close database connection
conn.close()


def identify_sub_changes(conn):
def identify_sub_changes(staging):
"""Identify IP changes."""
# Connect to database
if staging:
conn = pe_db_staging_connect()
else:
conn = pe_db_connect()
# Execute queries
cursor = conn.cursor()
LOGGER.info("Marking Subs that are in the db.")
cursor.execute(
"""
UPDATE sub_domains
set current = True
where last_seen > (CURRENT_DATE - INTERVAL '15 days')
where last_seen > (CURRENT_DATE - INTERVAL '20 days')
"""
)
conn.commit()

LOGGER.info("Marking IPs that are no longer seen.")
cursor = conn.cursor()
cursor.execute(
"""
UPDATE sub_domains
set current = False
where last_seen < (CURRENT_DATE - INTERVAL '15 days') or last_seen isnull;
where last_seen < (CURRENT_DATE - INTERVAL '20 days') or last_seen isnull;
"""
)
conn.commit()
cursor.close()
# Close database connection
conn.close()


def identify_ip_sub_changes(conn):
def identify_ip_sub_changes(staging):
"""Identify IP/Subs changes."""
# Connect to database
if staging:
conn = pe_db_staging_connect()
else:
conn = pe_db_connect()
# Execute queries
cursor = conn.cursor()
LOGGER.info("Marking Subs that are in the db.")
LOGGER.info("Marking IP-subs that are in the db.")
cursor.execute(
"""
UPDATE ips_subs
set current = True
where last_seen > (CURRENT_DATE - INTERVAL '15 days')
where last_seen > (CURRENT_DATE - INTERVAL '20 days')
"""
)
conn.commit()

LOGGER.info("Marking IPs that are no longer seen.")
cursor = conn.cursor()
LOGGER.info("Marking IP-subs that are no longer seen.")
cursor.execute(
"""
UPDATE ips_subs
set current = False
where last_seen < (CURRENT_DATE - INTERVAL '15 days') or last_seen isnull;
where last_seen < (CURRENT_DATE - INTERVAL '20 days') or last_seen isnull;
"""
)
conn.commit()
cursor.close()
# Close database connection
conn.close()


def insert_cyhy_scorecard_data(conn, df, table_name, on_conflict):
@@ -734,8 +763,13 @@ def insert_cyhy_scorecard_data(conn, df, table_name, on_conflict):
cursor.close()


def identified_sub_domains(conn):
def identified_sub_domains(staging):
"""Set sub-domains to identified."""
# Connect to database
if staging:
conn = pe_db_staging_connect()
else:
conn = pe_db_connect()
# If the sub's root-domain has enumerate=False, then "identified" is True
cursor = conn.cursor()
LOGGER.info("Marking identified sub-domains.")
Expand All @@ -749,6 +783,8 @@ def identified_sub_domains(conn):
)
conn.commit()
cursor.close()
# Close database connection
conn.close()


def get_fceb_orgs(conn):
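The recurring change across these `identify_*` helpers is the same: instead of receiving an open `conn`, each function now takes the `staging` flag, opens its own connection, runs the paired current/not-current updates with the `last_seen` window widened to 20 days, and closes the connection itself. A condensed sketch of that shared shape (the generic `table` parameter is illustrative; the real functions target `cidrs`, `ips`, `sub_domains`, and `ips_subs` individually):

```python
def identify_changes(staging, table):
    """Mark rows in `table` as current or stale based on a 20-day last_seen window."""
    # Each helper now manages its own connection instead of receiving one
    conn = pe_db_staging_connect() if staging else pe_db_connect()
    cursor = conn.cursor()

    # Rows seen within the window are current
    cursor.execute(
        f"""
        UPDATE {table}
        SET current = True
        WHERE last_seen > (CURRENT_DATE - INTERVAL '20 days')
        """
    )

    # Rows outside the window, or never seen, are not current
    cursor.execute(
        f"""
        UPDATE {table}
        SET current = False
        WHERE last_seen < (CURRENT_DATE - INTERVAL '20 days') OR last_seen ISNULL
        """
    )

    conn.commit()
    cursor.close()
    conn.close()
```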
4 changes: 2 additions & 2 deletions src/pe_asm/helpers/enumerate_subs_from_root.py
@@ -77,7 +77,7 @@ def get_subdomains(staging=False, roots_df=None):
else:
conn = pe_db_connect()

# Query root domains
# Query root domains if none provided
if not isinstance(roots_df, pd.DataFrame):
roots_df = query_roots(conn)
total_roots = len(roots_df.index)
@@ -89,7 +89,7 @@
# Enumerate for sub-domains
LOGGER.info("Enumerating this root: %s", root_row["root_domain"])
subs = enumerate_roots(root_row["root_domain"], root_row["root_domain_uid"])
LOGGER.info(subs)
# LOGGER.info(subs) # Too much log output
# Create DataFrame
subs_df = pd.DataFrame(subs)

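The tweaked comment and guard in `get_subdomains` make the optional `roots_df` parameter explicit: when no DataFrame is passed, all root domains are pulled from the database; otherwise only the supplied roots are enumerated. A hypothetical usage sketch (the uid value and domain are made up; the column names follow the loop above):

```python
import pandas as pd

# Enumerate sub-domains for every root domain in the database
get_subdomains(staging=False)

# Or enumerate only a hand-picked set of roots
roots_df = pd.DataFrame(
    [{"root_domain_uid": "00000000-0000-0000-0000-000000000000", "root_domain": "example.gov"}]
)
get_subdomains(staging=False, roots_df=roots_df)
```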