Skip to content

Commit

Permalink
Lint with mypy
Browse files Browse the repository at this point in the history
  • Loading branch information
pedropombeiro committed Oct 1, 2024
1 parent 52e868e commit 9ee9978
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 61 deletions.
6 changes: 3 additions & 3 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#FROM ubuntu:16.04
FROM ubuntu:22.04
FROM python:slim-bullseye

RUN DEBIAN_FRONTEND=noninteractive apt-get update && apt-get -y install tzdata && apt-get -y clean

Expand All @@ -24,7 +23,8 @@ RUN apt-get -y install \
iproute2 \
pdftk \
poppler-utils \
&& apt-get -y clean
&& apt-get -y clean && \
pip install requests

RUN cd /tmp && \
wget https://download.brother.com/welcome/dlf105200/brscan4-0.4.11-1.amd64.deb && \
Expand Down
110 changes: 69 additions & 41 deletions script/scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import tempfile
import time
from datetime import datetime
from typing import List, TextIO
from typing import List, Optional, TextIO

from sendtoftps import sendtoftps
from trigger_inotify import trigger_inotify
Expand Down Expand Up @@ -59,7 +59,7 @@ def execute_command_pid(log: TextIO, command: List[str], **kwargs) -> int:


def scan_cmd(
log: TextIO, device: str, output_batch: str, scanimage_args: List[str]
log: TextIO, device: Optional[str], output_batch: str, scanimage_args: List[str]
) -> None:
log.flush() # Required, otherwise scanimage output will appear before the already printed output

Expand Down Expand Up @@ -90,13 +90,13 @@ def notify(log: TextIO, file_path: str, message: str) -> None:
)
trigger_telegram(
log,
f"Scanner: {message}",
os.getenv("TELEGRAM_TOKEN"),
os.getenv("TELEGRAM_CHATID"),
f"Scanner: {message}",
)


def latest_batch_dir() -> str:
def latest_batch_dir() -> Optional[str]:
prefix = datetime.today().strftime("%Y-%m-%d")
dir_entries = glob.glob(os.path.join(tempfile.gettempdir(), f"{prefix}*"))
dirs = filter(os.path.isdir, dir_entries)
Expand Down Expand Up @@ -148,18 +148,23 @@ def remove_blank_pages(
)
output, _ = process.communicate()
if process.returncode != 0:
print(f" ERROR: getting number of pages from {input_file}: {output}")
print(f" ERROR: getting number of pages from {input_file}")
return
info = output.decode()
pages_line = re.search(r"^Pages:\s*(\d+)", info, re.MULTILINE)
if pages_line is None:
print(f" ERROR: finding number of pages in {info}")
return
pages_line = re.search(r"^Pages:\s*(\d+)", output.decode(), re.MULTILINE)
page_count = int(pages_line.group(1))

print(
f" Analyzing {page_count} pages in {input_file} with threshold {remove_blank_threshold}%"
f" Analyzing {page_count} pages in {
input_file} with threshold {remove_blank_threshold}%"
)
os.chdir(dirname)

def non_blank_pages() -> List[str]:
picked_pages = []
picked_pages: List[str] = []
for page in range(1, page_count + 1):
# Use subprocess to run gs and get ink coverage
process = subprocess.Popen(
Expand All @@ -181,13 +186,22 @@ def non_blank_pages() -> List[str]:
output.decode(),
re.MULTILINE,
)
ink_coverage = sum(map(float, ink_coverage_line.groups()))
if ink_coverage_line is None:
ink_coverage = None
else:
ink_coverage = sum(map(float, ink_coverage_line.groups()))

if ink_coverage < remove_blank_threshold:
print(f" Page {page}: delete (ink coverage: {ink_coverage:.2f}%)")
if ink_coverage is not None and ink_coverage < remove_blank_threshold:
print(
f" Page {
page}: delete (ink coverage: {ink_coverage:.2f}%)"
)
else:
picked_pages += str(page)
print(f" Page {page}: keep (ink coverage: {ink_coverage:.2f}%)")
print(
f" Page {
page}: keep (ink coverage: {ink_coverage:.2f}%)"
)

return picked_pages

Expand All @@ -210,20 +224,27 @@ def non_blank_pages() -> List[str]:
print(f" No blank pages detected in {input_file}")
else:
os.replace(output_file, input_file)
print(f" Removed {removed_pages} blank pages and saved as {input_file}")
print(
f" Removed {
removed_pages} blank pages and saved as {input_file}"
)
except FileNotFoundError:
print(
f" WARNING: '{command[0]}' executable not found. Skipping PDF manipulation."
f" WARNING: '{
command[0]}' executable not found. Skipping PDF manipulation."
)
except subprocess.CalledProcessError:
print(f" ERROR: manipulating {input_file}. Skipping PDF manipulation.")
print(
f" ERROR: manipulating {
input_file}. Skipping PDF manipulation."
)


#
# Async job methods
#
def convert_and_post_process(
job_name: str, side: str, remove_blank_threshold: float
job_name: str, side: str, remove_blank_threshold: Optional[float]
) -> None:
log = sys.stdout
log.flush()
Expand Down Expand Up @@ -295,19 +316,18 @@ def convert_and_post_process(
)

notify(log, ocr_pdf_name, f"{ocr_pdf_name} ({side}) OCR finished")
sendtoftps(
log,
os.getenv("FTP_USER"),
os.getenv("FTP_PASSWORD"),
os.getenv("FTP_HOST"),
os.getenv("FTP_PATH"),
ocr_pdf_path,
)

if os.getenv("REMOVE_ORIGINAL_AFTER_OCR") == "true" and os.path.isfile(
ocf_pdf_path
):
os.remove(output_pdf_file)
ftp_user = os.getenv("FTP_USER")
ftp_password = os.getenv("FTP_PASSWORD")
ftp_host = os.getenv("FTP_HOST")
ftp_path = os.getenv("FTP_PATH")
sendtoftps(log, ftp_user, ftp_password,
ftp_host, ftp_path, ocr_pdf_path)

if os.getenv("REMOVE_ORIGINAL_AFTER_OCR") == "true" and os.path.isfile(
ocr_pdf_path
):
os.remove(output_pdf_file)

print(f" {side} side: Conversion and post-processing for finished.")
print("-----------------------------------")
Expand All @@ -316,7 +336,8 @@ def convert_and_post_process(
def wait_for_rear_pages_or_convert(job_name: str) -> None:
# Wait for 2 minutes in case there is a rear side scan
print(
f" front side: Waiting for 2 minutes before starting file conversion for {job_name}"
f" front side: Waiting for 2 minutes before starting file conversion for {
job_name}"
)
time.sleep(120)

Expand Down Expand Up @@ -365,14 +386,15 @@ def save_front_processing_pid(job_dir: str, pid: int) -> None:
pid_file.write(str(pid))


def kill_front_processing_from_pid(job_dir: str) -> int:
def kill_front_processing_from_pid(job_dir: str) -> Optional[int]:
path = scan_pid_path(job_dir)
pid = None
try:
with open(path, "r") as scan_pid_file:
pid = int(scan_pid_file.read().strip())
print(
f" rear side: Read pid from {path}, killing front processing job {pid}"
f" rear side: Read pid from {
path}, killing front processing job {pid}"
)
os.kill(pid, signal.SIGKILL)
except FileNotFoundError:
Expand All @@ -389,8 +411,9 @@ def kill_front_processing_from_pid(job_dir: str) -> int:
#
# Scan entry points
#
def scan_front(log: TextIO, device: str, scanimage_args=[]) -> None:
job_name = datetime.now().strftime("%Y-%m-%d-%H-%M-%S") # Generate unique timestamp
def scan_front(log: TextIO, device: Optional[str], scanimage_args=[]) -> None:
# Generate unique timestamp
job_name = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
job_dir = os.path.join(tempfile.gettempdir(), job_name)
filepath_base = os.path.join(job_dir, f"{job_name}-front-page")
tmp_output_batch = f"{filepath_base}%04d.pnm"
Expand Down Expand Up @@ -418,13 +441,14 @@ def scan_front(log: TextIO, device: str, scanimage_args=[]) -> None:
elif pid > 0:
save_front_processing_pid(job_dir, pid)
print(
f" front side: INFO: Waiting to start conversion process for {job_name} in process with PID {pid}"
f" front side: INFO: Waiting to start conversion process for {
job_name} in process with PID {pid}"
)
else:
print(f" front side: ERROR: Fork failed ({pid}).")


def scan_rear(log: TextIO, device: str, scanimage_args=None) -> None:
def scan_rear(log: TextIO, device: Optional[str], scanimage_args=None) -> None:
# Find latest directory in temp directory
job_name = latest_batch_dir()
print(f"- Scanning rear to latest batch {job_name}")
Expand Down Expand Up @@ -456,7 +480,8 @@ def scan_rear(log: TextIO, device: str, scanimage_args=None) -> None:

# Rename pages
number_of_pages = len(
[f for f in os.listdir(".") if (os.path.isfile(f) and "front-page" in f)]
[f for f in os.listdir(".") if (
os.path.isfile(f) and "front-page" in f)]
)
print(f" rear side: INFO: number of pages scanned: {number_of_pages}")

Expand All @@ -466,7 +491,8 @@ def scan_rear(log: TextIO, device: str, scanimage_args=None) -> None:
cnt_formatted = f"{cnt:03d}"
os.rename(filename, f"index{cnt_formatted}-1-{filename}")
print(
f" rear side: DEBUG: renamed {filename} to index{cnt_formatted}-1-{filename}"
f" rear side: DEBUG: renamed {filename} to index{
cnt_formatted}-1-{filename}"
)

cnt = 0
Expand All @@ -476,13 +502,15 @@ def scan_rear(log: TextIO, device: str, scanimage_args=None) -> None:
rear_index_formatted = f"{rear_index:03d}"
os.rename(filename, f"index{rear_index_formatted}-2-{filename}")
print(
f" rear side: DEBUG: renamed {filename} to index{rear_index_formatted}-2-{filename}"
f" rear side: DEBUG: renamed {filename} to index{
rear_index_formatted}-2-{filename}"
)

# Convert to PDF
remove_blank_threshold = os.getenv("REMOVE_BLANK_THRESHOLD")
if remove_blank_threshold:
remove_blank_threshold = float(remove_blank_threshold)
remove_blank_threshold_str = os.getenv("REMOVE_BLANK_THRESHOLD")
remove_blank_threshold = None
if remove_blank_threshold_str is not None:
remove_blank_threshold = float(remove_blank_threshold_str)

pid = os.fork()
if pid == 0: # Child process
Expand Down
19 changes: 16 additions & 3 deletions script/sendtoftps.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,17 @@
#!/usr/bin/python3

import subprocess
from typing import List, Optional, TextIO

def sendtoftps(log, user, password, address, filepath, file):

def sendtoftps(
log: TextIO,
user: Optional[str],
password: Optional[str],
address: Optional[str],
filepath: Optional[str],
file: Optional[str],
) -> None:
"""Uploads a file to an FTP server.
Args:
Expand All @@ -12,15 +22,18 @@ def sendtoftps(log, user, password, address, filepath, file):
file (str): The file to upload.
"""

command = [
if not any([user, password, address, filepath, file]):
return

command: List[str] = [
"curl",
"--silent",
"--show-error",
"--ssl-reqd",
"--user",
f"{user}:{password}",
"--upload-file",
file,
str(file),
f"ftp://{address}{filepath}",
]

Expand Down
9 changes: 7 additions & 2 deletions script/trigger_inotify.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
#!/usr/bin/python3

import subprocess
from typing import TextIO
from typing import Optional, TextIO


def trigger_inotify(
log: TextIO, user: str, password: str, address: str, filepath: str, file: str
log: TextIO,
user: Optional[str],
password: Optional[str],
address: Optional[str],
filepath: Optional[str],
file: Optional[str],
) -> None:
"""Triggers inotify for a file.
Expand Down
18 changes: 6 additions & 12 deletions script/trigger_telegram.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
#!/usr/bin/python3

import os
import urllib.parse
from typing import TextIO
from typing import Optional, TextIO

import requests

def trigger_telegram(log: TextIO, token: str, chat_id: str, message: str) -> None:

def trigger_telegram(
log: TextIO, message: str, token: Optional[str], chat_id: Optional[str]
) -> None:
"""Sends a Telegram message using the provided token and chat ID."""

if not token or not chat_id:
Expand All @@ -23,18 +26,9 @@ def trigger_telegram(log: TextIO, token: str, chat_id: str, message: str) -> Non
# Prepare data payload
payload = {"chat_id": chat_id, "text": encoded_message}

# Use requests library for a more robust solution (install with 'pip install requests')
try:
import requests

response = requests.post(url, json=payload)
response.raise_for_status() # Raise an exception for non-200 response
print(" Telegram message sent successfully.")
except ModuleNotFoundError:
print(" WARNING: 'requests' library not found. Using wget fallback.")
# Fallback using wget (not recommended for production due to limited feedback)
os.system(
f"wget -qO- --post-data='chat_id={chat_id}&text={encoded_message}' '{url}' >/dev/null"
)
except requests.exceptions.RequestException as e:
print(f" ERROR: sending Telegram message: {e}")

0 comments on commit 9ee9978

Please sign in to comment.