Skip to content

Commit

Permalink
Add get_remote_fixture and get_remote_fixture_archive for tests
Browse files Browse the repository at this point in the history
  • Loading branch information
ryanohoro committed Sep 21, 2023
1 parent 0ca89f8 commit 4fabf99
Showing 1 changed file with 102 additions and 1 deletion.
103 changes: 102 additions & 1 deletion src/python/strelka/tests/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,26 @@
import datetime
import gzip
import io
import os
import tarfile
import typing
from pathlib import Path
from zipfile import ZipFile

import magic
import py7zr
import requests

from strelka.strelka import File


def run_test_scan(
mocker, scan_class, fixture_path=None, options=None, backend_cfg=None
mocker,
scan_class,
fixture_fileobj: typing.IO = None,
fixture_path: str = None,
options=None,
backend_cfg=None,
):
if options is None:
options = {}
Expand All @@ -20,6 +35,8 @@ def run_test_scan(

if fixture_path:
data = Path(fixture_path).read_bytes()
elif fixture_fileobj:
data = fixture_fileobj.read()
else:
data = None

Expand All @@ -31,3 +48,87 @@ def run_test_scan(
)

return scanner.event


def get_remote_fixture(url: str, session: requests.Session = None) -> io.BytesIO:
"""Download a fixture from a URL"""

# Get a streamed version of the downloaded file
if session:
response = session.get(url, stream=True)
else:
response = requests.get(url, stream=True)

response.raw.decode_content = True

# Convert the raw file-like object to a real BytesIO object
bytesfile = io.BytesIO()
bytesfile.write(response.raw.read())
bytesfile.seek(0)

return bytesfile


def get_remote_fixture_archive(
url: str, session: requests.Session = None, password: str = None
) -> [dict[str, io.BytesIO]]:
"""Decompress zip, 7zip, gzip, tar+gzip remote fixtures with an optional password"""
bytesfile: io.BytesIO = get_remote_fixture(url, session)

mime: magic.Magic = magic.Magic(mime=True)
mime_type: str = mime.from_buffer(bytesfile.read())
bytesfile.seek(0)

allfiles: dict[str, typing.IO] = {}

if mime_type == "application/zip":
try:
with ZipFile(bytesfile) as archive:
for fileentry in archive.filelist:
if not fileentry.is_dir():
allfiles.update(
{
fileentry.filename: io.BytesIO(
archive.read(
fileentry.filename,
pwd=password.encode("utf-8")
if password
else None,
)
)
}
)
except Exception as e:
raise

elif mime_type == "application/x-7z-compressed":
try:
with py7zr.SevenZipFile(bytesfile, password=password) as archive:
allfiles = archive.readall()
except Exception as e:
raise

elif mime_type == "application/gzip":
try:
with gzip.open(bytesfile) as archive:
allfiles.update(
{os.path.basename(url).rstrip(".gz"): io.BytesIO(archive.read())}
)

except Exception as e:
raise

elif mime_type == "application/x-tar":
try:
with tarfile.open(fileobj=bytesfile) as archive:
for member in archive.getmembers():
if member.isfile():
allfiles.update({member.name: archive.extractfile(member)})

except Exception as e:
raise

else:
raise ValueError(f"Archive type {mime_type} not supported")

return allfiles

0 comments on commit 4fabf99

Please sign in to comment.