Skip to content

Commit

Permalink
Handle security audit findings (#23)
Browse files Browse the repository at this point in the history
* Fixed DP-001

* Fixed DP-002

* Fixed DP-003

* Fixed DP-012

* Skip permission tests on Windows
  • Loading branch information
roll authored Sep 26, 2024
1 parent 2d8e3b8 commit 196461f
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 18 deletions.
15 changes: 15 additions & 0 deletions dplib/actions/__spec__/test_metadata_convert.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import pytest

from dplib.actions.metadata.convert import convert_metadata


@pytest.mark.parametrize(
"source,target",
(
("bad", "ckan"),
("ckan", "bad"),
),
)
def test_convert_metadata_bad_source_or_target_dp_012(source: str, target: str):
with pytest.raises(ValueError):
convert_metadata("resource.json", type="resource", source=source, target=target) # type: ignore
16 changes: 13 additions & 3 deletions dplib/actions/metadata/convert.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import annotations

from importlib import import_module
from typing import Literal, Optional, Type, Union, cast
from typing import List, Literal, Optional, Type, cast, get_args

from ...models import Package, Resource
from ...system import Model
Expand All @@ -17,9 +17,17 @@ def convert_metadata(
) -> Model:
"""Convert metadata from one notation to another."""

# Validate source/target
if source and source not in NOTATIONS:
raise ValueError(f"Unknown source notation: {source}")
if target and target not in NOTATIONS:
raise ValueError(f"Unknown target notation: {target}")

# Source model
Source = Resource if type == "resource" else Package
if source:
if source not in NOTATIONS:
raise ValueError(f"Unknown source notation: {source}")
module = import_module(f"dplib.plugins.{source}.models")
Source: Type[Model] = getattr(module, f"{source.capitalize()}{type.capitalize()}")
model = Source.from_path(path, format=format)
Expand All @@ -35,5 +43,7 @@ def convert_metadata(
return model


IType = Union[Literal["package"], Literal["resource"]]
INotation = Union[Literal["ckan"], Literal["dcat"], Literal["github"], Literal["zenodo"]]
IType = Literal["package", "resource"]
INotation = Literal["ckan", "dcat", "github", "zenodo"]

NOTATIONS: List[INotation] = list(get_args(INotation))
24 changes: 24 additions & 0 deletions dplib/helpers/__spec__/test_file.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import os
import platform
import stat
from pathlib import Path

import pytest

from dplib.helpers.file import write_file


@pytest.mark.skipif(platform.system() == "Windows", reason="Unix only")
def test_write_file_implicit_permissions_dp_002(tmpdir: Path):
path = str(tmpdir / "test.txt")
write_file(path, "Hello, World!")
permissions = oct(stat.S_IMODE(os.stat(path).st_mode))
assert permissions == "0o600"


@pytest.mark.skipif(platform.system() == "Windows", reason="Unix only")
def test_write_file_explicit_permissions_dp_003(tmpdir: Path):
path = str(tmpdir / "test.txt")
write_file(path, "Hello, World!", permissions=0o644)
permissions = oct(stat.S_IMODE(os.stat(path).st_mode))
assert permissions == "0o644"
16 changes: 16 additions & 0 deletions dplib/helpers/__spec__/test_path.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import pytest

from dplib.helpers.path import assert_safe_path


@pytest.mark.parametrize(
"path",
(
"../data.csv",
"/etc/home/secret.json",
"http:test/../../secret.json",
),
)
def test_assert_safe_path_raises_dp_001(path: str):
with pytest.raises(Exception):
assert_safe_path(path)
16 changes: 11 additions & 5 deletions dplib/helpers/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,28 @@ def read_file(
raise Error(f'Cannot read file "{path}"')


def write_file(path: str, body: Any, *, mode: str = "wt", encoding: str = "utf-8"):
def write_file(
path: str,
body: Any,
*,
mode: str = "wt",
encoding: str = "utf-8",
permissions: int = 0o600,
):
try:
eff_enc = encoding if mode == "wt" else None
with tempfile.NamedTemporaryFile(mode, delete=False, encoding=eff_enc) as file:
file.write(body)
file.flush()
move_file(file.name, path, mode=0o644)
move_file(file.name, path, permissions=permissions)
except Exception:
raise Error(f'Cannot write file "{path}"')


def move_file(source: str, target: str, *, mode: Optional[int] = None):
def move_file(source: str, target: str, *, permissions: int = 0o600):
try:
Path(target).parent.mkdir(parents=True, exist_ok=True)
shutil.move(source, target)
if mode:
os.chmod(target, 0o644)
os.chmod(target, permissions)
except Exception:
raise Error(f'Cannot move file "{source}:{target}"')
22 changes: 14 additions & 8 deletions dplib/helpers/path.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
import os
from pathlib import Path
from typing import Optional, Tuple
from urllib.parse import urlparse

import fsspec # type: ignore

from ..error import Error

Expand All @@ -21,7 +22,7 @@ def infer_format(path: str, *, raise_missing: bool = False):

def infer_basepath(path: str):
basepath = os.path.dirname(path)
if basepath and not is_url_path(basepath):
if basepath and is_file_protocol_path(basepath):
if not os.path.abspath(basepath):
basepath = os.path.relpath(basepath, start=os.getcwd())
return basepath
Expand All @@ -38,21 +39,26 @@ def ensure_basepath(path: str, basepath: Optional[str] = None) -> Tuple[str, str
def join_basepath(path: str, basepath: Optional[str] = None) -> str:
if not basepath:
return path
if is_url_path(path):
if not is_file_protocol_path(path):
return path
if is_url_path(basepath):
if not is_file_protocol_path(basepath):
return f"{basepath}/{path}"
return os.path.join(basepath, path)


def is_url_path(path: str) -> bool:
scheme = urlparse(path).scheme
return scheme in ["http", "https"]
def is_file_protocol_path(path: str) -> bool:
info = fsspec.utils.infer_storage_options(path) # type: ignore
return info.get("protocol") == "file" # type: ignore


def is_http_or_ftp_protocol_path(path: str) -> bool:
info = fsspec.utils.infer_storage_options(path) # type: ignore
return info.get("protocol") in ["http", "https", "ftp", "ftps"] # type: ignore


def assert_safe_path(path: str, *, basepath: Optional[str] = None):
"""Assert that the path (untrusted) is not outside the basepath (trusted)"""
if not is_url_path(path):
if is_file_protocol_path(path):
try:
root = Path(basepath or os.getcwd()).resolve()
item = root.joinpath(path).resolve()
Expand Down
4 changes: 2 additions & 2 deletions dplib/helpers/profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from ..errors.metadata import MetadataError
from .dict import load_dict
from .file import read_file
from .path import is_url_path
from .path import is_http_or_ftp_protocol_path

# TODO: implement additional user-side profile caching

Expand Down Expand Up @@ -40,7 +40,7 @@ def read_profile(*, profile: str) -> types.IDict:
profile = os.path.join(settings.PROFILE_BASEDIR, version, filename)

# Ensure profile is URL
if not is_url_path(profile):
if not is_http_or_ftp_protocol_path(profile):
raise Error(f'Profile MUST be a URL: "{profile}"')

# Read jsonSchema
Expand Down

0 comments on commit 196461f

Please sign in to comment.