Skip to content

Commit

Permalink
Type hints
Browse files Browse the repository at this point in the history
  • Loading branch information
AT0myks committed Jun 28, 2023
1 parent 6658fb6 commit f25239e
Show file tree
Hide file tree
Showing 10 changed files with 208 additions and 111 deletions.
44 changes: 28 additions & 16 deletions pycramfs/__init__.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,31 @@
from __future__ import annotations

import io
from functools import partial
from pathlib import PurePosixPath
from typing import TYPE_CHECKING, Any, BinaryIO, Iterator, Optional
from zlib import crc32

from pycramfs.const import CRC_OFFSET, CRC_SIZE
from pycramfs.file import Directory, File
from pycramfs.structure import Super
from pycramfs.util import BoundedSubStream, test_super

if TYPE_CHECKING:
from pycramfs.types import ByteStream, FileDescriptorOrPath, ReadableBuffer, StrPath

__version__ = "1.0.0"


class Cramfs:

def __init__(self, fd: BoundedSubStream, super: Super, rootdir: Directory = None, closefd: bool = True):
def __init__(
self,
fd: ByteStream,
super: Super,
rootdir: Directory,
closefd: bool = True
) -> None:
self._fd = fd
self._super = super
self._rootdir = rootdir
Expand All @@ -22,17 +34,17 @@ def __init__(self, fd: BoundedSubStream, super: Super, rootdir: Directory = None
def __enter__(self):
return self

def __exit__(self, exc_type, exc_value, traceback):
def __exit__(self, *_: Any) -> None:
if self._closefd:
self.close()

def __len__(self):
def __len__(self) -> int:
return self._super.fsid.files

def __iter__(self):
def __iter__(self) -> Iterator[File]:
yield from self._rootdir.riter()

def __contains__(self, item):
def __contains__(self, item: Any):
if isinstance(item, (str, PurePosixPath)):
return self.select(item) is not None
elif isinstance(item, File):
Expand All @@ -58,16 +70,16 @@ def size(self) -> int:
def close(self) -> None:
self._fd.close()

def find(self, filename):
def find(self, filename: StrPath) -> Optional[File]:
return self._rootdir.find(filename)

def select(self, path):
def select(self, path: StrPath) -> Optional[File]:
return self._rootdir.select(path)

def itermatch(self, pattern):
def itermatch(self, pattern: str) -> Iterator[File]:
yield from self._rootdir.itermatch(pattern)

def calculate_crc(self, size=1024**2):
def calculate_crc(self, size: int = 1024**2) -> int:
self._fd.seek(0)
crc = crc32(self._fd.read(CRC_OFFSET)) # Read until CRC
self._fd.read(CRC_SIZE) # Read the CRC but ignore it
Expand All @@ -77,23 +89,23 @@ def calculate_crc(self, size=1024**2):
return crc

@classmethod
def from_fd(cls, fd, offset=0, closefd=True):
def from_fd(cls, fd: BinaryIO, offset: int = 0, closefd: bool = True):
"""Create a Cramfs object from a file descriptor.
`offset` must be the absolute position at which the superblock starts.
"""
fd.seek(offset)
super = Super.from_fd(fd)
test_super(super)
fd = BoundedSubStream(fd, offset, offset + super.size)
self = cls(fd, super, closefd=closefd)
self._rootdir = Directory.from_fd(fd, self, self._super.root)
fd_ = BoundedSubStream(fd, offset, offset + super.size)
self = cls(fd_, super, None, closefd) # type: ignore
self._rootdir = Directory.from_fd(fd_, self, self._super.root)
return self

@classmethod
def from_bytes(cls, bytes_, offset=0):
def from_bytes(cls, bytes_: ReadableBuffer, offset: int = 0):
return cls.from_fd(io.BytesIO(bytes_), offset)

@classmethod
def from_file(cls, path, offset=0):
return cls.from_fd(open(path, "rb"), offset)
def from_file(cls, file: FileDescriptorOrPath, offset: int = 0):
return cls.from_fd(open(file, "rb"), offset)
28 changes: 14 additions & 14 deletions pycramfs/__main__.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,23 @@
import argparse
from argparse import ArgumentParser, Namespace
from pathlib import Path, PurePosixPath

from pycramfs import Cramfs, __version__
from pycramfs.const import PAGE_SIZE
from pycramfs.exception import CramfsError
from pycramfs.extract import extract_dir, extract_file
from pycramfs.file import filetype
from pycramfs.file import Directory, File, Symlink, filetype
from pycramfs.util import _print, find_superblocks


def print_file(file):
def print_file(file: File) -> None:
# Max file size is 2**24-1 or 16777215 -> 8 characters.
# Max UID is 2**16-1 or 65535 -> 5 characters.
# Max GID is 2**8-1 or 255 -> 3 characters.
link = f"-> {file.readlink()}" if file.is_symlink else ''
link = f"-> {file.readlink()}" if isinstance(file, Symlink) else ''
print(file.filemode, f"{file.size:8} {file.uid:5}:{file.gid:<3}", file.path, link)


def list_(args):
def list_(args: Namespace) -> None:
if (types := args.type) is not None:
types = set(''.join(types).replace('f', '-'))
count = 0
Expand All @@ -38,15 +38,15 @@ def list_(args):
print(f"{count} file(s) found")


def info(args):
def info(args: Namespace) -> None:
width = 10
superblocks = find_superblocks(args.file)
if not superblocks:
print("No superblock found")
return
for idx, superblock in enumerate(superblocks):
super = argparse.Namespace(**superblock)
fsid = argparse.Namespace(**super.fsid)
super = Namespace(**superblock)
fsid = Namespace(**super.fsid)
print(f"Superblock #{idx + 1}")
print(f"{'Magic:':{width}} 0x{super.magic:X}")
print(f"{'Size:':{width}} {super.size:,}")
Expand All @@ -63,13 +63,13 @@ def info(args):
print()


def extract(args):
def extract(args: Namespace) -> None:
dest = args.dest
with Cramfs.from_file(args.file, args.offset) as cramfs:
file = cramfs.select(args.path)
if file is None:
raise CramfsError(f"{args.path} not found")
elif file.is_dir:
elif isinstance(file, Directory):
if dest is None:
dest = args.file.with_name(args.file.stem)
amount = extract_dir(file, dest, args.force, args.quiet)
Expand All @@ -80,7 +80,7 @@ def extract(args):
_print(f"{int(amount)} file(s) extracted to {dest.resolve()}", quiet=args.quiet)


def check(args):
def check(args: Namespace) -> None:
with Cramfs.from_file(args.file, args.offset) as cramfs:
for file in cramfs:
if file.inode.namelen == 0 and str(file.path) != '/':
Expand Down Expand Up @@ -115,13 +115,13 @@ def check(args):
def main():
filetypes = list(''.join(filetype).replace('-', 'f'))

pfile = argparse.ArgumentParser(add_help=False)
pfile = ArgumentParser(add_help=False)
pfile.add_argument("file", type=Path)

poffset = argparse.ArgumentParser(add_help=False)
poffset = ArgumentParser(add_help=False)
poffset.add_argument("-o", "--offset", type=int, default=0, help="absolute position of file system's start. Default: %(default)s")

parser = argparse.ArgumentParser()
parser = ArgumentParser()
parser.add_argument("-V", "--version", action="version", version=f"%(prog)s {__version__}")
subparsers = parser.add_subparsers(required=True, dest="command") # dest is only here to avoid a TypeError in Python 3.8 (see bpo-29298)

Expand Down
21 changes: 11 additions & 10 deletions pycramfs/const.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from enum import IntEnum, IntFlag
from typing import Final

MAGIC = 0x28CD3D45
SIGNATURE = "Compressed ROMFS"
MAGIC: Final = 0x28CD3D45
SIGNATURE: Final = "Compressed ROMFS"


class Width(IntEnum):
Expand All @@ -13,9 +14,9 @@ class Width(IntEnum):
OFFSET = 26


MAXPATHLEN = ((1 << Width.NAMELEN) - 1) << 2
MAXPATHLEN: Final = ((1 << Width.NAMELEN) - 1) << 2

PAGE_SIZE = 4096
PAGE_SIZE: Final = 4096


class Flag(IntFlag):
Expand All @@ -41,12 +42,12 @@ class BlockFlag(IntFlag):
DIRECT_PTR = 1 << 30


BLK_FLAGS = BlockFlag.UNCOMPRESSED | BlockFlag.DIRECT_PTR
BLK_FLAGS: Final = BlockFlag.UNCOMPRESSED | BlockFlag.DIRECT_PTR

CRC_OFFSET = 32 # Bytes
CRC_SIZE = 4 # Bytes
CRC_OFFSET: Final = 32 # Bytes
CRC_SIZE: Final = 4 # Bytes

BLK_PTR_FMT = "<I"
BLK_PTR_FMT: Final = "<I"

MAGIC_BYTES = MAGIC.to_bytes(4, "little")
SIGNATURE_BYTES = b"Compressed ROMFS"
MAGIC_BYTES: Final = MAGIC.to_bytes(4, "little")
SIGNATURE_BYTES: Final = b"Compressed ROMFS"
39 changes: 27 additions & 12 deletions pycramfs/extract.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,32 @@
from __future__ import annotations

import stat
from os import chmod, utime
from pathlib import Path
from typing import Optional
from typing import TYPE_CHECKING, Optional

from pycramfs.const import Width
from pycramfs.file import DataFile, Directory
from pycramfs.file import (
FIFO,
BlockDevice,
CharacterDevice,
DataFile,
Directory,
File,
RegularFile,
Socket,
Symlink
)
from pycramfs.structure import Inode
from pycramfs.util import _print

if TYPE_CHECKING:
from pycramfs.types import StrOrBytesPath

try:
from os import lchown
from os import lchown # type: ignore
except ImportError:
def lchown(path, uid, gid):
def lchown(path: StrOrBytesPath, uid: int, gid: int) -> None:
pass


Expand All @@ -28,9 +43,9 @@ def write_file(path: Path, file: Optional[DataFile] = None, force: bool = False)


try:
from os import mknod
from os import mknod # type: ignore
except ImportError:
def mknod(path, mode=0o600, device=0):
def mknod(path: Path, mode: int = 0o600, device: int = 0) -> None:
write_file(path)


Expand All @@ -49,15 +64,15 @@ def change_file_status(path: Path, inode: Inode) -> None:
utime(path, (0, 0))


def extract_file(file, dest: Path, force: bool = False, quiet: bool = True) -> bool:
def extract_file(file: File, dest: Path, force: bool = False, quiet: bool = True) -> bool:
"""Extract a file that is not a directory.
Return whether the file was created.
"""
if file.is_file:
if isinstance(file, RegularFile):
write_file(dest, file, force)
chmod(dest, file.mode)
elif file.is_symlink:
elif isinstance(file, Symlink):
try:
dest.symlink_to(file.read_bytes())
except FileExistsError:
Expand All @@ -70,9 +85,9 @@ def extract_file(file, dest: Path, force: bool = False, quiet: bool = True) -> b
# Either the user is unprivileged or Developer Mode is not enabled.
write_file(dest, file, force)
else:
if file.is_char_device or file.is_block_device:
if isinstance(file, (CharacterDevice, BlockDevice)):
devtype = file.size
elif file.is_fifo or file.is_socket:
elif isinstance(file, (FIFO, Socket)):
devtype = 0
else:
_print(f"bogus mode: {file.path} ({file.mode:o})", quiet=quiet)
Expand All @@ -82,7 +97,7 @@ def extract_file(file, dest: Path, force: bool = False, quiet: bool = True) -> b
return True


def extract_dir(directory: Directory, dest: Path, force: bool = False, quiet: bool = True):
def extract_dir(directory: Directory, dest: Path, force: bool = False, quiet: bool = True) -> int:
"""Extract a directory tree. Return the amount of files created."""
total = directory.total
width = 2**Width.NAMELEN
Expand Down
Loading

0 comments on commit f25239e

Please sign in to comment.