DTT1 - PoC - Test module #4666

Closed
7 tasks done
fcaffieri opened this issue Oct 31, 2023 · 11 comments · Fixed by #4709
@fcaffieri
Member

fcaffieri commented Oct 31, 2023

Description

This issue aims to design and create a PoC of the Test module.

This module is responsible for launching the tests on the allocated and provisioned infrastructure.
It will work as a black box with a clear input and output.

  • Input: A YAML file with all the resources allocated and provisioned.
  • Output: The result of the test.

For this PoC, this module will test the following use cases:

  • Install
  • Registration
  • Connection
  • Basic info
  • Restart
  • Stop
  • Uninstall
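
The black-box contract described above can be sketched roughly as follows. This is only an illustration, not the module's actual design: `ModuleReport`, `run_test_module`, and the `run_use_case` callable are hypothetical names, and the inventory is assumed to be the YAML input already parsed into a dict.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict

# Use cases from the list above, in the order they would run.
USE_CASES = ("install", "registration", "connection", "basic_info",
             "restart", "stop", "uninstall")


@dataclass
class ModuleReport:
    """Output of the module: one pass/fail flag per use case (hypothetical)."""
    results: Dict[str, bool] = field(default_factory=dict)

    @property
    def passed(self) -> bool:
        # The whole run passes only if every use case passed.
        return all(self.results.values())


def run_test_module(inventory: dict,
                    run_use_case: Callable[[str, dict], bool]) -> ModuleReport:
    """Run every use case against the inventory and collect the results.

    `inventory` stands for the already-parsed YAML input; `run_use_case` is a
    hypothetical callable that launches one use case and reports success.
    """
    report = ModuleReport()
    for use_case in USE_CASES:
        report.results[use_case] = run_use_case(use_case, inventory)
    return report
```

Keeping the runner as a parameter leaves the sketch independent of how the tests are actually launched (Ansible, SSH, or anything else).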

Parent issue #4524.

@QU3B1M
Member

QU3B1M commented Nov 8, 2023

Update report

  • Cloned the repo and set up the environment
  • Installed dependencies and configured two VMs to use in the inventory
  • Facing some problems with the connection to the VMs. Current error:
    fatal: [Agent1]: UNREACHABLE! => {"changed": false, "msg": "Failed to connect to the host via ssh: Load key \"/wazuh-qa/poc-tests/utils/keys\": error in libcrypto\r\[email protected]: Permission denied (publickey,password).", "unreachable": true}
    

@QU3B1M
Member

QU3B1M commented Nov 9, 2023

  • Completed the configuration of the environment with all the playbooks running correctly

  • Add new test_install.py test file with some basic install validations:

    import pwd
    import grp
    
    from pathlib import Path
    
    
    ## --- Constants ---
    
    # Paths
    WAZUH_PATH = Path("/var", "ossec")
    CONFIGURATION_PATH = Path(WAZUH_PATH, "etc")
    BINARIES_PATHS = Path(WAZUH_PATH, "bin")
    WAZUH_CONF = Path(CONFIGURATION_PATH, "ossec.conf")
    WAZUH_CONTROL = Path(BINARIES_PATHS, "wazuh-control")
    
    # Unix users and groups
    WAZUH_USER = "wazuh"
    WAZUH_GROUP = "wazuh"
    
    
    ## --- Tests ---
    
    def test_wazuh_user():
        all_users = [x[0] for x in pwd.getpwall()]
        assert WAZUH_USER in all_users, "Wazuh user not found."
    
    
    def test_wazuh_group():
        all_groups = [x[0] for x in grp.getgrall()]
        assert WAZUH_GROUP in all_groups, "Wazuh group not found."
    
    
    def test_wazuh_configuration():
        assert CONFIGURATION_PATH.exists(), "Configuration directory not found."
        assert WAZUH_CONF.exists(), "Configuration file not found."
    
    
    def test_wazuh_control():
        assert BINARIES_PATHS.exists(), "Binaries directory not found."
        assert WAZUH_CONTROL.exists(), "Wazuh control binary not found."

    The script was run on the agent, and all scenarios passed.

     ============================= test session starts ==============================
    platform linux -- Python 3.8.10, pytest-7.4.3, pluggy-1.3.0 -- /usr/bin/python3
    cachedir: .pytest_cache
    rootdir: /tmp/tests
    collecting ... collected 4 items
    
    test_install.py::test_wazuh_user PASSED                                  [ 25%]
    test_install.py::test_wazuh_group PASSED                                 [ 50%]
    test_install.py::test_wazuh_configuration PASSED                         [ 75%]
    test_install.py::test_wazuh_control PASSED                               [100%]
    
    ============================== 4 passed in 0.02s ===============================
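
    The user and group checks above list every account on the host. As a possible alternative, here is a minimal sketch that looks up a single name with the standard library's pwd.getpwnam and grp.getgrnam, which raise KeyError when the entry does not exist. The helper names user_exists and group_exists are hypothetical and not part of the branch.

```python
import grp
import pwd


def user_exists(name: str) -> bool:
    """Return True if a Unix user with this name exists."""
    try:
        pwd.getpwnam(name)
    except KeyError:
        # getpwnam raises KeyError when the user is not in the password database.
        return False
    return True


def group_exists(name: str) -> bool:
    """Return True if a Unix group with this name exists."""
    try:
        grp.getgrnam(name)
    except KeyError:
        # getgrnam raises KeyError when the group is not in the group database.
        return False
    return True
```

    With these helpers a test could read: assert user_exists("wazuh"), "Wazuh user not found."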

@QU3B1M
Member

QU3B1M commented Nov 10, 2023

Update report - Install & registration tests completed

  • Pushed changes on branch enhacenment/4666-dtt1-poc-test-module

  • Replace test_manager and test_agent with test_install.py

  • Add more constants and move them into an independent file inside a new helpers directory

  • Create utils.py file to store the utility functions used in the tests

    import subprocess
    
    from .constants import WAZUH_CONTROL, AGENT_CONTROL
    
    
    def get_service() -> str:
        """
        Retrieves the name of the Wazuh service running on the current platform.
    
        Returns:
            str: The name of the Wazuh service.
    
        """
        control_output = subprocess.check_output(
            [WAZUH_CONTROL, "info", "-t"], stderr=subprocess.PIPE)
        return control_output.decode('utf-8').strip()
    
    
    def get_daemons_status() -> dict:
        """
        Get the status of the Wazuh daemons.
    
        Returns:
            dict: The daemons (keys) and their status (values).
        """
        daemons_status = {}
    
        control_output = subprocess.run(
            [WAZUH_CONTROL, "status"], stdout=subprocess.PIPE)
        control_output_decoded = control_output.stdout.decode('utf-8')
    
        for line in control_output_decoded.split('\n'):
            if "running" in line:
                daemon_name = line.split(' ')[0]
                status = line.replace(daemon_name, '').replace('.', '').lstrip()
                daemons_status[daemon_name] = status
    
        return daemons_status
    
    
    def get_registered_agents() -> list:
        """
        Get the registered agents on the manager.
    
        Returns:
            list: The registered agents.
        """
        registered_agents = []
    
        control_output = subprocess.run(
            [AGENT_CONTROL, "-l"], stdout=subprocess.PIPE)
        control_output_decoded = control_output.stdout.decode('utf-8')
    
        for line in control_output_decoded.split('\n'):
            if "ID:" in line:
                agent_info = line.split(',')
                agent_dict = {
                    'ID': agent_info[0].split(':')[1].strip(),
                    'Name': agent_info[1].split(':')[1].strip(),
                    'IP': agent_info[2].split(':')[1].strip(),
                    'Status': agent_info[3].strip()
                }
                registered_agents.append(agent_dict)
    
        return registered_agents
    
    
    def find_string_in_file(file_path: str, target: str) -> bool:
        """
        Reads a file and checks if the expected line is there.
    
        Args:
            file_path (str): The path of the file to read.
            target (str): The expected string.
    
        Returns:
            bool: True if the line is found, False otherwise.
        """
        with open(file_path, 'r') as file:
            for line in file:
                if target in line:
                    return True
        return False
  • Add test_wazuh_daemons case to test_install.py & update the file to use the helpers

    import pwd
    import grp
    
    from helpers import constants, utils
    
    
    def test_wazuh_user():
        all_users = [x[0] for x in pwd.getpwall()]
        assert constants.WAZUH_USER in all_users, "Wazuh user not found."
    
    
    def test_wazuh_group():
        all_groups = [x[0] for x in grp.getgrall()]
        assert constants.WAZUH_GROUP in all_groups, "Wazuh group not found."
    
    
    def test_wazuh_configuration():
        assert constants.CONFIGURATIONS_DIR.exists(), "Configuration directory not found."
        assert constants.WAZUH_CONF.exists(), "Configuration file not found."
    
    
    def test_wazuh_control():
        assert constants.BINARIES_DIR.exists(), "Binaries directory not found."
        assert constants.WAZUH_CONTROL.exists(), "Wazuh control binary not found."
    
    
    def test_wazuh_daemons():
        actual_daemons = utils.get_daemons_status()
    
        if utils.get_service() == "agent":
            expected_daemons = constants.AGENT_DAEMONS
        else:
            expected_daemons = constants.MANAGER_DAEMONS
    
        for daemon in expected_daemons:
            assert daemon in actual_daemons.keys(), f"{daemon} not found."
  • Create test_registration python and yaml test files

    test_registration.py

    import pytest
    
    from helpers import constants, utils
    
    
    def test_agent_is_registered_in_server():
        if utils.get_service() == "agent":
            pytest.skip("Skipping test for agent installations.")
        registered_agents = utils.get_registered_agents()
        assert [a for a in registered_agents if a.get('ID') == '001'], "Agent is not registered."
    
    
    def test_register_logs_were_generated():
        if utils.get_service() == "agent":
            expected_log = "Requesting a key from server"
        else:
            expected_log = "Received request for a new agent"
    
        log_found = utils.find_string_in_file(constants.WAZUH_LOG, expected_log)
        assert log_found, "Register logs were not generated."
    
    
    def test_client_keys_file_exists():
        assert constants.CLIENT_KEYS.exists(), 'client.keys file not found.'
    
    
    def test_agent_key_is_in_client_keys():
        assert '001' in constants.CLIENT_KEYS.read_text(
        ), 'Agent key not found in client.keys file.'

    test_registration.yml

    - hosts: all
      become: true
      vars:
        path: "/tmp"
        test_directory: "tests"
        test_file: "test_registration.py"
        curl_path: /usr/bin/curl
    
      tasks:
        - name: Get Wazuh target
          shell: "/var/ossec/bin/wazuh-control info | grep -E -o 'agent|server'"
          register: target
    
        - name: Run tests on Wazuh agent
          command: "pytest {{ test_file }} --target={{ target.stdout|quote }} -v"
          args:
            chdir: "{{ path }}/{{ test_directory }}"
          when: target.stdout == 'agent'
    
        - name: Run tests on Wazuh manager
          command: "pytest {{ test_file }} --target={{ target.stdout|quote }} -v"
          args:
            chdir: "{{ path }}/{{ test_directory }}"
          when: target.stdout == 'server'
    

    Output of the tests executed locally

    ============================= test session starts ==============================
    platform linux -- Python 3.10.12, pytest-7.4.3, pluggy-1.3.0 -- /usr/bin/python3
    cachedir: .pytest_cache
    rootdir: /tmp/tests
    collecting ... collected 4 items
    
    test_registration.py::test_agent_is_registered_in_server PASSED          [ 25%]
    test_registration.py::test_register_logs_were_generated PASSED           [ 50%]
    test_registration.py::test_client_keys_file_exists PASSED                [ 75%]
    test_registration.py::test_agent_key_is_in_client_keys PASSED            [100%]
    
    ============================== 4 passed in 0.06s ===============================
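
    To make the parsing in get_registered_agents easier to follow, here is the same line-splitting logic applied to a fixed string instead of a live agent_control call. The sample output is an assumption about the shape of the agent_control -l listing and was not captured from the test environment. parse_agent_list is a hypothetical name.

```python
from typing import Dict, List

# Assumed shape of the listing; the values are illustrative only.
SAMPLE_OUTPUT = """
Wazuh agent_control. List of available agents:
   ID: 000, Name: Manager1 (server), IP: 127.0.0.1, Active/Local
   ID: 001, Name: Agent1, IP: any, Active
"""


def parse_agent_list(output: str) -> List[Dict[str, str]]:
    """Turn each 'ID: ..., Name: ..., IP: ..., <status>' line into a dict."""
    agents = []
    for line in output.split("\n"):
        if "ID:" not in line:
            continue
        fields = line.split(",")
        agents.append({
            # split(":", 1) keeps any later colons inside the value.
            "ID": fields[0].split(":", 1)[1].strip(),
            "Name": fields[1].split(":", 1)[1].strip(),
            "IP": fields[2].split(":", 1)[1].strip(),
            "Status": fields[3].strip(),
        })
    return agents
```

    Parsing a fixed string like this also allows the helper to be unit tested without a running manager.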

@QU3B1M
Member

QU3B1M commented Nov 13, 2023

Update report

  • Add a file_monitor function to utils. It helps us find a given string in a file, with a timeout (a simplified version of the integration framework's FileMonitor)

    import time
    from typing import Optional


    def file_monitor(monitored_file: str, target_string: str, timeout: int = 30) -> Optional[str]:
        """Return the first line containing target_string, or None if the timeout expires."""
        # get_file_encoding is assumed to be defined elsewhere in utils.
        encoding = get_file_encoding(monitored_file)

        # Check the current file content for the string.
        with open(monitored_file, encoding=encoding) as _file:
            for line in _file:
                if target_string in line:
                    return line

        # Start the count used for the timeout.
        start_time = time.time()

        # Monitor the file for future lines.
        with open(monitored_file, encoding=encoding) as _file:
            # Go to the end of the file.
            _file.seek(0, 2)
            while time.time() - start_time < timeout:
                current_position = _file.tell()
                line = _file.readline()

                if not line:
                    # No new line, wait for the next try.
                    _file.seek(current_position)
                    time.sleep(0.1)
                elif target_string in line:
                    # New line that contains the string.
                    return line

        # Timeout reached without finding the string.
        return None
  • Replace the usage of find_string_in_file with this new function.

  • Add test_connection python test file and test playbook

    from helpers import utils
    from helpers.constants import CONNECTION_SERVER, CONNECTION_AGENT, WAZUH_LOG, ALERTS_JSON
    
    
    # Actual running service.
    service = utils.get_service()
    
    
    def test_agent_connects_to_manager():
        expected_log = CONNECTION_AGENT if service == "agent" else CONNECTION_SERVER
        log_file = WAZUH_LOG if service == "agent" else ALERTS_JSON
    
        assert utils.file_monitor(log_file, expected_log)
    
    
    def test_agent_connection_status():
        expected_status = "connected" if service == "agent" else "Active"
    
        assert utils.get_agent_connection_status("001") == expected_status

    ============================= test session starts ==============================
    platform linux -- Python 3.10.12, pytest-7.4.3, pluggy-1.3.0 -- /usr/bin/python3
    cachedir: .pytest_cache
    rootdir: /tmp/tests
    collecting ... collected 2 items
    
    test_connection.py::test_agent_connects_to_manager PASSED
    test_connection.py::test_agent_connection_status PASSED
     
    ============================== 2 passed in 0.12s ===============================
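
    The get_agent_connection_status helper used above is not shown in this comment. As a rough sketch of the agent side only, assuming the status is read from a state file made of key='value' lines (the file name, its location, and the sample content below are assumptions, not taken from the branch), the parsing could look like this. parse_agent_state is a hypothetical name.

```python
from typing import Dict

# Illustrative content only; the real state file may contain other keys.
SAMPLE_STATE = """# State file (illustrative sample)
status='connected'
last_keepalive='2023-11-13 12:00:00'
"""


def parse_agent_state(state_text: str) -> Dict[str, str]:
    """Parse key='value' lines, skipping blanks and '#' comments."""
    state = {}
    for raw_line in state_text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        # Drop the surrounding single quotes from the value.
        state[key.strip()] = value.strip().strip("'")
    return state
```

    A test could then compare parse_agent_state(text).get("status") against "connected".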

@QU3B1M
Member

QU3B1M commented Nov 14, 2023

  • Add util to execute Wazuh binaries with args

  • Update the execution conditional in test yamls from when: target.stdout == 'server' to when: "'Manager' in inventory_hostname"

  • Add test_basic_info python testfile and playbook (a very simple test, could be updated)

    import os
    
    from helpers import utils
    
    
    def test_wazuh_version():
        assert os.environ['to_version'] in utils.get_version(), "Wazuh version is not the expected."
    
    
    def test_wazuh_revision():
        assert utils.get_revision() == os.environ['revision'], "Wazuh revision is not the expected."
  • Add restart provision playbook with test_restart playbook and python test file

    restart.yml

    - hosts: all
      become: true
      tasks:
        - name: Restart Wazuh component
          command: "/var/ossec/bin/wazuh-control restart"
    

    test_restart.yml

    - hosts: all
      become: true
      vars:
        path: "/tmp"
        test_directory: "tests"
        test_file: "test_restart.py"
        curl_path: /usr/bin/curl
    
      tasks:
        - name: Get Wazuh target
          shell: "/var/ossec/bin/wazuh-control info | grep -E -o 'agent|server'"
          register: target
    
        - name: Run tests on Wazuh agent
          command: "pytest {{ test_file }} --target={{ target.stdout|quote }} -v"
          args:
            chdir: "{{ path }}/{{ test_directory }}"
          when: "'Agent' in inventory_hostname"
    
        - name: Run tests on Wazuh manager
          command: "pytest {{ test_file }} --target={{ target.stdout|quote }} -v"
          args:
            chdir: "{{ path }}/{{ test_directory }}"
          when: "'Manager' in inventory_hostname"
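
    The util to execute binaries with arguments is not shown in this comment. A minimal sketch of what such a helper might look like, built on subprocess.run, is below. The name run_command and its (binary, args) signature are assumptions here, and the real helper may differ.

```python
import subprocess
from typing import List, Optional


def run_command(binary: str, args: Optional[List[str]] = None) -> str:
    """Run a binary with optional arguments and return its decoded stdout.

    check=False is deliberate: some commands signal a normal state through a
    non-zero exit code, and the caller may still want to read the output.
    """
    result = subprocess.run(
        [str(binary), *(args or [])],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        check=False,
    )
    return result.stdout.decode("utf-8")
```

    Passing the arguments as a list avoids shell quoting issues, since no shell is involved.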
    

@QU3B1M
Member

QU3B1M commented Nov 15, 2023

  • Add util to check the service status

    def get_service_status() -> str:
        """
        Get the status of the Wazuh service.
    
        Returns:
            str: The status of the Wazuh service.
        """
        if get_service() == "agent":
            service_name = "wazuh-agent"
        else:
            service_name = "wazuh-manager"
        return run_command("systemctl", ["is-active", service_name]).strip()
  • Complete test_restart python test function

    from helpers import utils
    from helpers.constants import DELETING_RESPONSES, RELEASING_RESOURCES, STARTED, WAZUH_LOG
    
    
    # Actual running service.
    service = utils.get_service()
    
    
    def test_release_resources_shutdown_log_raised():
        assert utils.file_monitor(WAZUH_LOG, RELEASING_RESOURCES), "Release resources log not found."
    
    
    def test_deleting_responses_shutdown_log_raised():
        assert utils.file_monitor(WAZUH_LOG, DELETING_RESPONSES), "Deleting responses log not found."
    
    
    def test_start_log_raised():
        assert utils.file_monitor(WAZUH_LOG, STARTED), "Start log not found."
    
    
    def test_service_started():
        assert utils.get_service_status() == "active", "Service is not active after restart."
  • Add test_stop playbook and python testfile

    from helpers import utils
    from helpers.constants import DELETING_RESPONSES, RELEASING_RESOURCES, WAZUH_LOG
    
    
    # Actual running service.
    service = utils.get_service()
    
    
    def test_release_resources_shutdown_log_raised():
        assert utils.file_monitor(WAZUH_LOG, RELEASING_RESOURCES), "Release resources log not found."
    
    
    def test_deleting_responses_shutdown_log_raised():
        assert utils.file_monitor(WAZUH_LOG, DELETING_RESPONSES), "Deleting responses log not found."
    
    
    def test_service_stopped():
        assert utils.get_service_status() == "inactive", "Service is active."
  • Add test_uninstall python testfile & ansible playbooks

    import grp
    import os
    import pwd
    
    from helpers import constants, utils
    
    
    def test_wazuh_user():
        all_users = [x[0] for x in pwd.getpwall()]
        assert constants.WAZUH_USER not in all_users, "Wazuh user found."
    
    
    def test_wazuh_group():
        all_groups = [x[0] for x in grp.getgrall()]
        assert constants.WAZUH_GROUP not in all_groups, "Wazuh group found."
    
    
    def test_wazuh_configuration_dir():
        assert not constants.CONFIGURATIONS_DIR.exists(), "Configuration directory found."
    
    
    def test_wazuh_configuration():
        assert not constants.WAZUH_CONF.exists(), "Configuration file found."
    
    
    def test_wazuh_binaries_dir():
        assert not constants.BINARIES_DIR.exists(), "Binaries directory found."
    
    
    def test_wazuh_control():
        assert not constants.WAZUH_CONTROL.exists(), "Wazuh control binary found."
  • Research improvements for ansible playbooks
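
    The uninstall checks above assert one path per test. As an optional variation, a sketch that collects every path still present, so a single failure message lists all leftovers at once, could look like this. leftover_paths is a hypothetical helper, not part of the branch.

```python
from pathlib import Path
from typing import Iterable, List


def leftover_paths(paths: Iterable[Path]) -> List[Path]:
    """Return the paths that still exist on disk, in the order given."""
    return [Path(path) for path in paths if Path(path).exists()]
```

    A test could then use: assert not leftover_paths([constants.WAZUH_CONF, constants.WAZUH_CONTROL]), "Files found after uninstall."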

@QU3B1M
Member

QU3B1M commented Nov 16, 2023

Update report

  • Improve the test Ansible playbooks by applying the following template:
    - hosts: all
      become: true
      vars:
        path: "/tmp"
        test_directory: "tests"
        test_file: "TEST_FILE.py"
        curl_path: /usr/bin/curl
    
      tasks:
        - name: Test TEST_NAME
          block:
            - name: Execute tests
              command: "pytest {{ test_file }} --target={{ target }} -vl"
              args:
                chdir: "{{ path }}/{{ test_directory }}"
              vars:
                target: "{% if 'Agent' in inventory_hostname %}agent{% elif 'Manager' in inventory_hostname %}server{% endif %}"
              register: pytest_log
    
          always:
            - name: Save pytest output to a file
              copy:
                remote_src: yes
                content: "{{ pytest_log.stdout }}"
                dest: "{{ path }}/{{ test_directory }}/{{ test_file }}.log"
    
  • Research test log output methods to improve the integration with the observation module.
    Currently the test output is saved under /tmp/tests/TEST_NAME.log as raw pytest output, with the -l argument to get some extra info in the error tracebacks. This method is viable, but there are other options, such as:

    In my opinion, it may be better to just take the results from the raw pytest output. Still, the InfluxDB solution seems interesting.
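
    To illustrate the raw-output option, here is a small sketch that extracts per-test results from verbose pytest output such as the logs saved by the playbook. It assumes the -v line format seen in the outputs posted earlier in this issue; parse_pytest_output is a hypothetical name.

```python
import re
from typing import Dict

# Matches verbose result lines such as
# "test_install.py::test_wazuh_user PASSED        [ 25%]".
RESULT_LINE = re.compile(
    r"^\s*(\S+::\S+)\s+(PASSED|FAILED|ERROR|SKIPPED|XFAIL|XPASS)\b"
)


def parse_pytest_output(raw_output: str) -> Dict[str, str]:
    """Map each test node id to its outcome, ignoring every other line."""
    results = {}
    for line in raw_output.splitlines():
        match = RESULT_LINE.match(line)
        if match:
            results[match.group(1)] = match.group(2)
    return results
```

    The resulting dict could be handed to the observation module without adding any extra pytest plugin.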

@QU3B1M
Member

QU3B1M commented Nov 21, 2023

Update report

  • Resumed the InfluxDB implementation but faced some problems in the setup. It is not easy to make it work correctly because the pytest-influxdb plugin's documentation is outdated, so this implementation was set aside.
  • Implemented pytest-tinybird. This solution seems more viable, and the implementation is easy and straightforward:
    • Create a free account in tinybird.co

    • Generate an API token

    • Update the ansible playbook with the required env variables

        environment:
          TINYBIRD_URL: https://api.us-east.tinybird.co     # Depends on the region.
          TINYBIRD_DATASOURCE: <DATASOURCE_NAME>            # Will be created if not existent.
          TINYBIRD_TOKEN: <TOKEN_API>                       # Token with write permissions.
    • Add the option --report-to-tinybird to the test execution

        block:
          - name: Execute tests
            command: "pytest {{ test_file }}.py --target={{ target }} -vl --report-to-tinybird"
    • Execute the test playbooks

    • Check the results on TinyBird dashboard
      (image: test results on the TinyBird dashboard)
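
    Since the report depends on the three environment variables listed in the steps above, a small pre-flight check could catch a missing one before the tests run. This is only a sketch; missing_tinybird_vars is a hypothetical helper and is not part of pytest-tinybird.

```python
import os
from typing import List, Mapping

# Variable names taken from the playbook environment block above.
REQUIRED_VARS = ("TINYBIRD_URL", "TINYBIRD_DATASOURCE", "TINYBIRD_TOKEN")


def missing_tinybird_vars(env: Mapping[str, str] = os.environ) -> List[str]:
    """Return the required variables that are unset or empty in env."""
    return [name for name in REQUIRED_VARS if not env.get(name)]
```

    An empty list means the three variables are present; anything else names what still has to be set.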

QU3B1M linked a pull request on Nov 23, 2023 that will close this issue
@QU3B1M
Member

QU3B1M commented Nov 23, 2023

Update report

Finished the modifications to the test module and created a pull request to merge it with the other modules.

@rauldpm
Member

rauldpm commented Dec 11, 2023

Review: #4524 (comment)

@fcaffieri
Member Author

Reviews answered at #4524 (comment)
