Skip to content

Commit

Permalink
update ssh.py unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
AlyaGomaa committed Oct 1, 2024
1 parent d3881b3 commit 73cf821
Showing 1 changed file with 7 additions and 151 deletions.
158 changes: 7 additions & 151 deletions tests/test_ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,13 @@ def test_detect_successful_ssh_by_slips():
host_key_alg="",
host_key="",
)
result = ssh.detect_successful_ssh_by_slips("profileid", "twid", flow)
expected_result = True
assert result == expected_result
conn_log_flow = {
"saddr": flow.saddr,
"daddr": flow.daddr,
"sbytes": 2000,
"dbytes": 2000,
}
assert ssh.detect_successful_ssh_by_slips("twid", conn_log_flow, flow)
ssh.set_evidence.ssh_successful.assert_called_once_with(
"twid",
"192.168.1.1",
Expand All @@ -174,154 +178,6 @@ def test_detect_successful_ssh_by_slips():
flow.starttime,
by="Slips",
)
assert "1234" not in ssh.connections_checked_in_ssh_timer_thread


def test_detect_successful_ssh_by_zeek():
ssh = ModuleFactory().create_ssh_analyzer_obj()
profileid = "profile_192.168.1.1"
twid = "timewindow1"
flow = SSH(
starttime="1726655400.0",
uid="1234",
daddr="192.168.1.2",
saddr="192.168.1.1",
version="",
auth_success="true",
auth_attempts="",
client="",
server="",
cipher_alg="",
mac_alg="",
compression_alg="",
kex_alg="",
host_key_alg="",
host_key="",
)
flow_data = {
"daddr": "192.168.1.2",
"saddr": "192.168.1.1",
"sbytes": 1000,
"dbytes": 1000,
}
mock_flow = {"1234": json.dumps(flow_data)}
ssh.db.search_tws_for_flow = MagicMock(return_value=mock_flow)
ssh.set_evidence = MagicMock()
ssh.connections_checked_in_ssh_timer_thread = []
assert ssh.set_evidence_ssh_successful_by_zeek(twid, flow)
ssh.set_evidence.ssh_successful.assert_called_once_with(
twid,
flow_data["saddr"],
flow_data["daddr"],
flow_data["sbytes"] + flow_data["dbytes"],
flow.uid,
flow.starttime,
by="Zeek",
)
assert flow.uid not in ssh.connections_checked_in_ssh_timer_thread
ssh.db.search_tws_for_flow.assert_called_once_with(
profileid, twid, flow.uid
)


def test_detect_successful_ssh_by_zeek_flow_exists_auth_success():
ssh = ModuleFactory().create_ssh_analyzer_obj()

mock_flow = {
"test_uid": json.dumps(
{
"daddr": "192.168.1.2",
"saddr": "192.168.1.1",
"sbytes": 1000,
"dbytes": 1000,
"auth_success": True,
}
)
}

ssh.db.search_tws_for_flow = MagicMock(return_value=mock_flow)
ssh.set_evidence = MagicMock()
flow = SSH(
starttime="1726655400.0",
uid="1234",
saddr="192.168.1.1",
daddr="192.168.1.2",
version="",
auth_success="true",
auth_attempts="",
client="",
server="",
cipher_alg="",
mac_alg="",
compression_alg="",
kex_alg="",
host_key_alg="",
host_key="",
)
result = ssh.set_evidence_ssh_successful_by_zeek("twid", flow)

expected_result = True
assert result == expected_result
ssh.set_evidence.ssh_successful.assert_called_once_with(
"twid",
"192.168.1.1",
"192.168.1.2",
2000,
flow.uid,
flow.starttime,
by="Zeek",
)
assert flow.uid not in ssh.connections_checked_in_ssh_timer_thread


def test_detect_successful_ssh_by_zeek_flow_exists_auth_fail():
ssh = ModuleFactory().create_ssh_analyzer_obj()

mock_flow = {
"test_uid": json.dumps(
{
"daddr": "192.168.1.2",
"saddr": "192.168.1.1",
"sbytes": 1000,
"dbytes": 1000,
"auth_success": False,
}
)
}

ssh.db.search_tws_for_flow = MagicMock(return_value=mock_flow)
ssh.set_evidence = MagicMock()
flow = SSH(
starttime="1726655400.0",
uid="1234",
saddr="192.168.1.1",
daddr="192.168.1.2",
version="",
auth_success="true",
auth_attempts="",
client="",
server="",
cipher_alg="",
mac_alg="",
compression_alg="",
kex_alg="",
host_key_alg="",
host_key="",
)
result = ssh.set_evidence_ssh_successful_by_zeek("twid", flow)

expected_result = True
assert result == expected_result
ssh.set_evidence.ssh_successful.assert_called_once_with(
"twid",
"192.168.1.1",
"192.168.1.2",
2000,
flow.uid,
flow.starttime,
by="Zeek",
)
assert flow.uid not in ssh.connections_checked_in_ssh_timer_thread


async def test_analyze_no_message():
Expand Down

0 comments on commit 73cf821

Please sign in to comment.