From b44ccaadd53cff91b9ad98f2028b55a1d42c17b7 Mon Sep 17 00:00:00 2001 From: Joshua Hiller Date: Tue, 14 Nov 2023 15:32:35 -0500 Subject: [PATCH] Add get user grants (MSSP aware) sample --- samples/README.md | 17 +- samples/user_management/README.md | 104 +++++++++++ samples/user_management/get_user_grants.py | 207 +++++++++++++++++++++ 3 files changed, 327 insertions(+), 1 deletion(-) create mode 100644 samples/user_management/get_user_grants.py diff --git a/samples/README.md b/samples/README.md index 55b38087d..b80bcd186 100644 --- a/samples/README.md +++ b/samples/README.md @@ -54,7 +54,7 @@ The following samples are categorized by CrowdStrike Falcon API service collecti | [Sensor Download](#sensor-download) | [Download the CrowdStrike sensor](#download-the-crowdstrike-sensor) | | [Sensor Update Policies](#sensor-update-policies) | [Policy Wonk](#policy-wonk) | | [Spotlight](#spotlight) | [Find vulnerable hosts by CVE ID](#find-vulnerable-hosts-by-cve-id)
[CISA DHS Known Exploited Vulnerabilities](#cisa-dhs-known-exploited-vulnerabilities)
[Spotlight Quick Report](#spotlight-quick-report) | -| [User Management](#user-management) | [Bulk user administration](#bulk-user-administration) | +| [User Management](#user-management) | [Bulk user administration](#bulk-user-administration)
[Get user grants](#get-user-grants) | ##### Class type legend @@ -1104,6 +1104,21 @@ This sample demonstrates the following CrowdStrike User Management API operation --- +### Get user grants +This [sample](user_management#get-user-grants) demonstrates retrieving a list of all user grants asynchronously using the User Management Service Class. + +[![User Management](https://img.shields.io/badge/Service%20Class-Get_User_Grants-silver?style=for-the-badge&labelColor=red&logo=)](user_management#get-user-grants) [![MSSP Use supported](https://img.shields.io/badge/-Supports%20MSSP-darkblue?logo=&style=for-the-badge)](user_management#get-user-grants) + +#### User Management API operations discussed +This sample demonstrates the following CrowdStrike User Management API operations: + +| Operation | Description | +| :--- | :--- | +| [queryUserV1](https://falconpy.io/Service-Collections/User-Management.html#queryuserv1) | List user IDs for all users in your customer account. | +| [combinedUserRolesV1](https://falconpy.io/Service-Collections/User-Management.html#combineduserrolesv1) | Get User Grant(s). This operation lists both direct as well as flight control grants between a user and a customer. | +| [retrieveUsersGETV1](https://falconpy.io/Service-Collections/User-Management.html#retrieveusersgetv1) | Get information about users including their name, UID, and CID by providing user UUIDs. | + +--- ## Suggestions Do you have a suggestion for an example you'd like to see? Are one of the examples not working as expected? Let us know by posting a message to our [discussion board](https://github.com/CrowdStrike/falconpy/discussions). diff --git a/samples/user_management/README.md b/samples/user_management/README.md index 82737f6e4..677a74e24 100644 --- a/samples/user_management/README.md +++ b/samples/user_management/README.md @@ -5,6 +5,7 @@ # User Management examples The examples in this folder focus on leveraging CrowdStrike's User Management API to perform administrative operations. - [Bulk import, update and remove users](#bulk-import-update-and-remove-users) +- [Get user grants](#get-user-grants) ## Bulk import, update, and remove users Consumes a provided user list (JSON format) and creates the user accounts as specified in your Falcon tenant. @@ -176,3 +177,106 @@ optional arguments: ### Example source code The source code for this example can be found [here](bulk_user.py). + +## Get user grants +Asynchronously retrieve a list of all users within the tenant, along with their grants and then +write the results to a comma-delimited text file. This solution is automatically Flight Control +aware and supports API debugging. + +### Running the program +In order to run this demonstration, you will need access to CrowdStrike API keys with the following scope: +| Service Collection | Scope | +| :---- | :---- | +| User Management | __READ__ | + +### Execution syntax +The following arguments are accepted at run time. + +| Argument | Long Argument | Description | +| :-- | :-- | :-- | +| `-h` | `--help` | show this help message and exit | +| `-d` | `--debug` | Enable API debugging | +| `-o` OUTPUT | `--output` OUTPUT | CSV output file name | +| `-k` FALCON_CLIENT_ID | `--falcon_client_id` FALCON_CLIENT_ID | Falcon Client ID | +| `-s` FALCON_CLIENT_SECRET | `--falcon_client_secret` FALCON_CLIENT_SECRET | Falcon Client Secret | + +```shell +python3 get_user_grants.py [-h] [-d] [-o OUTPUT] [-k CLIENT_ID] [-s CLIENT_SECRET] +``` + +#### Authentication +For users that have the environment variables `FALCON_CLIENT_ID` and `FALCON_CLIENT_SECRET` defined, you +do not need to provide authentication detail on the command line. + +```shell +python3 get_user_grants.py +``` + +If you do not have these values defined, you may provide them at runtime via the command line using the `-k` and `-s` arguments. + +```shell +python3 get_user_grants.py -k CLIENT_ID -s CLIENT_SECRET +``` + +#### Outputting results to a different location +You may define the name and location of the resulting output CSV file using the `-o` command line argument. +> Please note: You must provide the trailing slash to specify a directory. (`/` = Mac / Linux, `\` = Windows) + +##### Output to a file +```shell +python3 get_user_grants.py -o /path/to/output/file.csv +``` + +##### Output to a directory +```shell +python3 get_user_grants.py -o /path/to/directory/ +``` + +#### Enabling API debugging. +API debugging may be enabled with the `-d` command line argument. + +```shell +python3 get_user_grants.py -d +``` + +#### Command-line help +Command-line help is available via the `-h` argument. + +```shell +usage: get_user_grants.py [-h] [-d] [-o OUTPUT] [-k CLIENT_ID] [-s CLIENT_SECRET] + +Threaded user grant lookup sample. + + ______ __ _______ __ __ __ +| |.----.-----.--.--.--.--| | __| |_.----.|__| |--.-----. +| ---|| _| _ | | | | _ |__ | _| _|| | <| -__| +|______||__| |_____|________|_____|_______|____|__| |__|__|__|_____| + ___ ___ ___ ___ __ + | Y .-----.-----.----.| Y .---.-.-----.---.-.-----.-----.--------.-----.-----| |_ + |. | |__ --| -__| _||. | _ | | _ | _ | -__| | -__| | _| + |. | |_____|_____|__| |. \_/ |___._|__|__|___._|___ |_____|__|__|__|_____|__|__|____| + |: 1 | |: | | |_____| + |::.. . | |::.|:. | with Flight Control! + `-------' `--- ---' (FalconPy v1.3.0+) + +Asynchronously retrieve all user grants for every user defined within the tenant and output +the results to a comma-delimited text file. When not specified, this file is named user_grants.csv. + +Creation date: 11.13.2023 - jshcodes@CrowdStrike + +optional arguments: + -h, --help show this help message and exit + -d, --debug Enable debug. + -o OUTPUT, --output OUTPUT + CSV output filename. + +authentication arguments (not required if using environment authentication): + -k CLIENT_ID, --client_id CLIENT_ID + Falcon API client ID + -s CLIENT_SECRET, --client_secret CLIENT_SECRET + Falcon API client secret +``` + + +### Example source code +The source code for this example can be found [here](get_user_grants.py). diff --git a/samples/user_management/get_user_grants.py b/samples/user_management/get_user_grants.py new file mode 100644 index 000000000..7b9c8fbf9 --- /dev/null +++ b/samples/user_management/get_user_grants.py @@ -0,0 +1,207 @@ +r"""Threaded user grant lookup sample. + + ______ __ _______ __ __ __ +| |.----.-----.--.--.--.--| | __| |_.----.|__| |--.-----. +| ---|| _| _ | | | | _ |__ | _| _|| | <| -__| +|______||__| |_____|________|_____|_______|____|__| |__|__|__|_____| + ___ ___ ___ ___ __ + | Y .-----.-----.----.| Y .---.-.-----.---.-.-----.-----.--------.-----.-----| |_ + |. | |__ --| -__| _||. | _ | | _ | _ | -__| | -__| | _| + |. | |_____|_____|__| |. \_/ |___._|__|__|___._|___ |_____|__|__|__|_____|__|__|____| + |: 1 | |: | | |_____| + |::.. . | |::.|:. | with Flight Control! + `-------' `--- ---' (FalconPy v1.3.0+) + +Asynchronously retrieve all user grants for every user defined within the tenant and output +the results to a comma-delimited text file. When not specified, this file is named user_grants.csv. + +Creation date: 11.13.2023 - jshcodes@CrowdStrike +""" +from argparse import ArgumentParser, RawTextHelpFormatter, Namespace +from concurrent.futures import ThreadPoolExecutor +from csv import writer +from datetime import datetime +from logging import basicConfig, DEBUG +from os import getenv, path, mkdir +from typing import Tuple, List, Dict +try: + from falconpy import APIError, FlightControl, UserManagement, version +except ImportError as no_falconpy: + raise ImportError("In order to use this sample application, the CrowdStrike FalconPy " + "library (version 1.3.0 or greater) must be installed." + ) from no_falconpy + + +def consume_arguments() -> Tuple[Namespace, ArgumentParser]: + """Retrieve any provided command line arguments.""" + parser = ArgumentParser(description=__doc__, formatter_class=RawTextHelpFormatter) + parser.add_argument("-d", "--debug", help="Enable debug.", default=False, action="store_true") + parser.add_argument("-o", "--output", help="CSV output filename.", default="user_grants.csv") + auth = parser.add_argument_group("authentication arguments " + "(not required if using environment authentication)") + auth.add_argument("-k", "--client_id", + help="Falcon API client ID", + default=getenv("FALCON_CLIENT_ID") + ) + auth.add_argument("-s", "--client_secret", + help="Falcon API client secret", + default=getenv("FALCON_CLIENT_SECRET") + ) + + return parser.parse_args(), parser + + +def get_grants(interface: UserManagement, uuid: str) -> List[Dict[str, str]]: + """Retrieve all grants for the user UUID in question.""" + print(f" Retrieving grant detail for {uuid}{' ' * 30}", end="\r") + grant_detail = interface.get_user_grants(uuid).data + + return grant_detail + + +def get_grant_data(sdk: UserManagement) -> Tuple[List[str], List[Dict[str, str]]]: + """Retrieve all user UUIDs within the tenant and then retrieve the grants for each.""" + running = True + user_ids = [] # List of User IDs identified + user_grants = [] # List of grants retrieved + offset = None + # Query users endpoint has a limit of 500 results, so we will paginate through all + # results returned and append each iteration's results to our user_grants list + while running: + user_lookup = sdk.query_users(limit=500, offset=offset) + user_ids.extend(user_lookup.data) + total = user_lookup.total + offset = len(user_ids) + with ThreadPoolExecutor() as executor: + futures = { + executor.submit(get_grants, sdk, user_id) for user_id in user_lookup.data + } + for fut in futures: + user_grants.extend(fut.result()) + if len(user_ids) >= total: + running = False + + return user_ids, user_grants + + +def get_extended_user_data(sdk: UserManagement, + user_uuids: list, + grants: list + ) -> List[Dict[str, str]]: + """Retrieve extended user information and merge the results with the existing grants data.""" + user_info = {} # Temporary dictionary to populate with user information detail + # Retrieve users endpoint can only handle 500 IDs at a time, request results in batches + batches = [user_uuids[i:i+500] for i in range(0, len(user_uuids), 500)] + for batch in batches: + lookup_result = sdk.retrieve_users(batch) + for detail in lookup_result.data: + # Create a "clean" dictionary that leverages UUID as the key, + # we can use this to lookup the details when we merge dictioaries + cleaned = { + detail["uuid"]: { + "uid": detail["uid"], + "first_name": detail["first_name"], + "last_name": detail["last_name"], + "user_created_at": detail["created_at"] + } + } + # Add our cleaned dictionary to our master user information dictionary + user_info.update(cleaned) + # Loop through each grant and add in our extended user detail + for grant in grants: + grant["uid"] = user_info[grant["uuid"]]["uid"] + grant["first_name"] = user_info[grant["uuid"]]["first_name"] + grant["last_name"] = user_info[grant["uuid"]]["last_name"] + grant["user_created_at"] = user_info[grant["uuid"]]["user_created_at"] + + return grants + + +def write_grant_results(user_grants: list, output_file: str): + """Write grant details to the specified CSV file.""" + with open(output_file, "w", newline="", encoding="utf-8") as csv_file: + csv_writer = writer(csv_file) + if user_grants: + csv_writer.writerow(user_grants[0].keys()) # Header row + for grants in user_grants: + csv_writer.writerow(grants.values()) # Data rows + + +def process_tenant(cmdline: Namespace, child: str = None): + """Process the users identified within the current tenant.""" + # Create an instance of the UserManagement Service Class + # and authenticate to the child tenant if necessary. + users = UserManagement(client_id=cmdline.client_id, + client_secret=cmdline.client_secret, + member_cid=child, + debug=cmdline.debug, + pythonic=True + ) + # Retrieve a list of user UUIDs and grants + id_list, grant_list = get_grant_data(users) + # Enrich the grant list with extended user detail + grant_list = get_extended_user_data(users, id_list, grant_list) + print(" " * 80) # Clear the last status update line + # Calculate our destination CSV file name + write_to = cmdline.output + dir_name = path.dirname(cmdline.output) + if child: + print(f"Child tenant: {child}") + write_to = path.join(dir_name, f"{child}.csv") + # Create our destination path if it is missing + if dir_name and not path.exists(dir_name): + mkdir(dir_name) + # They only provided us an output directory + if path.isdir(write_to): + write_to = path.join(write_to, "user_grants.csv") + # Write the results to our output CSV file + write_grant_results(grant_list, f"{write_to}") + # Inform the user of the overall execution results + print(f"{len(id_list):,} total users identified.") + print(f"{len(grant_list):,} total grants retrieved.") + print(f"Results saved to: {write_to}") + + +# _ _ ____ _ _ _ ____ ____ _ _ ___ _ _ _ ____ +# |\/| |__| | |\ | |__/ | | | | | | |\ | |___ +# | | | | | | \| | \ |__| |__| | | | \| |___ +if __name__ == "__main__": + # Start the timer + begin = datetime.now().timestamp() + # Retrieve command line arguments and the argument parser + parsed, handler = consume_arguments() + # There are no credentials in the environment or command line, show help and quit + if not parsed.client_id or not parsed.client_secret: + handler.print_help() + raise SystemExit( + "\nYou must provide API credentials via the environment variables\n" + "FALCON_CLIENT_ID and FALCON_CLIENT_SECRET or you must provide\n" + "these values using the '-k' and '-s' command line arguments." + ) + # Credentials are present, inform the user we are starting + print(f"Process start ({datetime.utcfromtimestamp(begin)}, " + f"FalconPy v{version(agent_string=False)})" + ) + # Enable debug logging to the console if requested + if parsed.debug: + basicConfig(level=DEBUG) + # Construct an instance of the FlightControl Service Class + mssp = FlightControl(client_id=parsed.client_id, + client_secret=parsed.client_secret, + debug=parsed.debug, + pythonic=True + ) + try: + # Attempt to query for any available children + children = mssp.query_children().data + except APIError: + # No Flight Control scope + children = [] + # Process the current tenant + process_tenant(parsed) + # For each child identified + for child in children: + # Process the child tenant + process_tenant(parsed, child) + # This party is over folks + print(f"\nTotal processing time: {datetime.now().timestamp() - begin:.2f} seconds.")