Skip to content

Commit

Permalink
Module implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
vJan00 authored and vJan00 committed Oct 29, 2024
1 parent 3751e14 commit 29e17ec
Show file tree
Hide file tree
Showing 8 changed files with 183 additions and 155 deletions.
13 changes: 7 additions & 6 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
werkzeug
flask
python-dotenv
psutil
pytest
openai-whisper
werkzeug~=3.0.4
flask~=3.0.3
python-dotenv~=1.0.1
psutil~=6.1.0
pytest~=8.3.3
openai-whisper~=20240930
requests~=2.32.3
3 changes: 2 additions & 1 deletion src/.env
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
whisper_model = "small"
# small, medium, large, large-v2, large-v3, large-v3-turbo
whisper_model = "large-v3-turbo"
parallel_workers = 2
login_username = "username"
login_password = "password"
184 changes: 97 additions & 87 deletions src/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from flask import Flask, request, Response
from werkzeug.datastructures import FileStorage, Authorization

from packages.Opencast import Opencast
from utils import database, util
from core.TsApi import TsApi
from dotenv import load_dotenv
Expand Down Expand Up @@ -37,108 +38,47 @@ def authorisation():
})


# Transcribe Routes
@app.route("/transcribe", methods=['POST'])
def transcribe_post():
"""
Endpoint to accept videos and links to whisper
:return: HttpResponse
"""
username: str = request.form.get("username")
password: str = request.form.get("password")
uid: str = str(uuid.uuid4())
priority: str = request.form.get("priority")

module: str = request.form.get("module")
module_id: str = request.form.get("module_id")
link: str = request.form.get("link")
if ('file' not in request.files) and (not link):
return {"error": "No file or link"}, 415

if ('file' not in request.files) and (not (module and module_id and link)):
return {"error": "No file or link with module and module id"}, 415

if not priority or not priority.isnumeric():
return {"error": "Priority nan"}, 400

if (100 / round(psutil.disk_usage('./').total / 1000000000, 1)
* round(psutil.disk_usage('./').used / 1000000000, 1) > 90):
return {"error": "Not enough storage"}, 507

if 'file' in request.files:
try:
file: FileStorage = request.files['file']
if file.filename == '':
return {"error": "No file"}, 415
if file:
uid = str(uuid.uuid4())
ts_api.add_file_to_queue(uid, file, int(priority))
return {"jobId": uid}, 201
except:
return {"Error": "Failed to add file"}, 500
elif link:
if len(ts_api.runningDownloads) >= 3:
return {"Error": "Too many downloads already running"}, 503

try:
uid = str(uuid.uuid4())
if username and password:
Thread(target=ts_api.add_link_to_queue,
args=(uid, link, int(priority),
username, password)).start()
file: FileStorage = request.files['file']
util.save_file(file, uid)
database.add_job(uid, None)
ts_api.add_to_queue(uid, None, int(priority))
return {"jobId": uid}, 201
# Insert modules here
elif module and module_id and link:
if module == "opencast":
if module_id in ts_api.opencastModules:
# Module specific
opencast_module: Opencast = ts_api.opencastModules[module_id]
opencast_module.link_list[uid] = link
database.add_job(uid, module_id)

ts_api.add_to_queue(uid, module_id, int(priority))
return {"jobId": uid}, 201
else:
Thread(target=ts_api.add_link_to_queue,
args=(uid, link, int(priority))).start()
return {"jobId": uid}, 201
except:
return {"Error": "Failed to add link"}, 500


@app.route("/status", methods=['GET'])
def status():
    """
    Endpoint to report the status of a single job.
    The job is selected through the "id" query parameter.
    :return: HttpResponse (200 with the job status, 404 if unknown)
    """
    job_id = request.args.get("id")
    # Guard clause: unknown ids are answered right away
    if not database.exists_job(job_id):
        return {"error": "Job not found"}, 404
    job = database.load_job(job_id)
    readable_status = util.get_status(job["status"])
    return {"jobId": job_id, "status": readable_status}, 200


@app.route("/status/system", methods=['GET'])
def system_status():
    """
    Endpoint to return status of system.
    Reports CPU, RAM, storage and swap figures read from psutil together
    with the queue length and the number of running jobs and downloads.
    Storage values are divided by 1000000000 (decimal gigabytes).
    :return: HttpResponse
    """
    gigabyte = 1000000000
    cpu_cores = psutil.cpu_count()
    # NOTE(review): psutil.cpu_percent() is documented as a system-wide
    # percentage already; the extra "* 100 / cpu_count" scaling is kept
    # unchanged for API compatibility - confirm it is intended.
    cpu_usage = round(psutil.cpu_percent(interval=0.5)
                      * 100 / cpu_cores, 1)
    # Take one snapshot per source so related values are consistent
    memory = psutil.virtual_memory()
    disk = psutil.disk_usage('./')
    swap = psutil.swap_memory()
    # Hosts without swap report a total of 0; avoid a ZeroDivisionError
    # (HTTP 500) and report 0.0 percent free swap instead.
    if swap.total:
        swap_free = round(swap.free * 100 / swap.total, 1)
    else:
        swap_free = 0.0
    return {
        "cpu_usage": cpu_usage,
        "cpu_cores": cpu_cores,
        "ram_usage": round(memory.percent, 1),
        "ram_free": round(memory.available * 100 / memory.total, 1),
        "storage_total": round(disk.total / gigabyte, 1),
        "storage_usage": round(disk.used / gigabyte, 1),
        "storage_free": round(disk.free / gigabyte, 1),
        "swap_usage": round(swap.percent, 1),
        "swap_free": swap_free,
        "queue_length": ts_api.queue.qsize(),
        "running_jobs": len(ts_api.runningJobs),
        "parallel_jobs": int(os.environ.get("parallel_workers")),
        "running_downloads": len(ts_api.runningDownloads)
    }, 200


@app.route("/", methods=['GET'])
def main():
    """
    Root endpoint confirming that the API is up.
    :return: HttpResponse
    """
    payload = {"message": "Listening to API calls", "status": 200}
    return payload, 200

return {"Error": "Module ID not found"}, 400
else:
return {"Error": "Module not found"}, 400

@app.route("/transcribe", methods=['GET'])
def transcribe_get():
Expand Down Expand Up @@ -199,6 +139,76 @@ def transcribe_delete():
else:
return {"error": "Job not found"}, 404

# Add Module Routes here
@app.route("/module/opencast", methods=['POST'])
def module_opencast_post():
    """
    Endpoint to register a new Opencast module.
    Reads "username", "password" and "max_queue_length" from the form
    data and stores an Opencast instance under a freshly generated id.
    :return: HttpResponse (201 with the module id, 400 on missing fields)
    """
    form = request.form
    username: str = form.get("username")
    password: str = form.get("password")
    max_queue_length: str = form.get("max_queue_length")
    # Both credentials are required
    if not username or not password:
        return {"error": "No username or password specified"}, 400
    if not max_queue_length:
        return {"error": "No max queue length specified"}, 400
    module_id = str(uuid.uuid4())
    # The form value is handed to Opencast as received (a string)
    ts_api.opencastModules[module_id] = Opencast(username, password,
                                                 max_queue_length)
    return {"moduleId": module_id}, 201

# Status Routes
@app.route("/status", methods=['GET'])
def status():
    """
    Endpoint to report the status of a single job.
    The job is selected through the "id" query parameter.
    :return: HttpResponse (200 with the job status, 404 if unknown)
    """
    job_id = request.args.get("id")
    # Guard clause: unknown ids are answered right away
    if not database.exists_job(job_id):
        return {"error": "Job not found"}, 404
    job = database.load_job(job_id)
    readable_status = util.get_status(job["status"])
    return {"jobId": job_id, "status": readable_status}, 200


@app.route("/status/system", methods=['GET'])
def system_status():
    """
    Endpoint to return status of system.
    Reports CPU, RAM, storage and swap figures read from psutil together
    with the queue length and the number of running jobs and downloads.
    Storage values are divided by 1000000000 (decimal gigabytes).
    :return: HttpResponse
    """
    gigabyte = 1000000000
    cpu_cores = psutil.cpu_count()
    # NOTE(review): psutil.cpu_percent() is documented as a system-wide
    # percentage already; the extra "* 100 / cpu_count" scaling is kept
    # unchanged for API compatibility - confirm it is intended.
    cpu_usage = round(psutil.cpu_percent(interval=0.5)
                      * 100 / cpu_cores, 1)
    # Take one snapshot per source so related values are consistent
    memory = psutil.virtual_memory()
    disk = psutil.disk_usage('./')
    swap = psutil.swap_memory()
    # Hosts without swap report a total of 0; avoid a ZeroDivisionError
    # (HTTP 500) and report 0.0 percent free swap instead.
    if swap.total:
        swap_free = round(swap.free * 100 / swap.total, 1)
    else:
        swap_free = 0.0
    return {
        "cpu_usage": cpu_usage,
        "cpu_cores": cpu_cores,
        "ram_usage": round(memory.percent, 1),
        "ram_free": round(memory.available * 100 / memory.total, 1),
        "storage_total": round(disk.total / gigabyte, 1),
        "storage_usage": round(disk.used / gigabyte, 1),
        "storage_free": round(disk.free / gigabyte, 1),
        "swap_usage": round(swap.percent, 1),
        "swap_free": swap_free,
        "queue_length": ts_api.queue.qsize(),
        "running_jobs": len(ts_api.runningJobs),
        "parallel_jobs": int(os.environ.get("parallel_workers")),
        "running_downloads": len(ts_api.runningDownloads)
    }, 200


@app.route("/", methods=['GET'])
def main():
    """
    Root endpoint confirming that the API is up.
    :return: HttpResponse
    """
    payload = {"message": "Listening to API calls", "status": 200}
    return payload, 200

@app.route("/language", methods=['GET'])
def language_get():
Expand Down
3 changes: 2 additions & 1 deletion src/core/Transcriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def __init__(self, ts_api, uid: str):
job_data = database.load_job(uid)
self.whisper_result = None
self.whisper_language = None
self.file_path = "./data/audioInput/" + job_data["filename"]
self.file_path = "./data/audioInput/" + uid
self.uid = uid
self.ts_api = ts_api
database.change_job_status(self.uid, 0) # Prepared
Expand Down Expand Up @@ -50,6 +50,7 @@ def transcriber_thread(self):
database.set_whisper_result(self.uid, result)
self.whisper_language = result['language']
database.set_whisper_language(self.uid, result['language'])
database.set_whisper_model(self.uid, model_size)
database.change_job_status(self.uid, 2) # Whispered
os.remove(self.file_path)
logging.debug("Finished Whisper for job with id " + self.uid + "!")
Expand Down
76 changes: 21 additions & 55 deletions src/core/TsApi.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@
import threading
import time
from queue import PriorityQueue
from typing import List
from typing import List, Dict

import requests
import whisper
from requests import request
from werkzeug.datastructures import FileStorage

from core.Transcriber import Transcriber
from packages.Opencast import Opencast
from utils import util, database


Expand All @@ -28,8 +30,15 @@ def __init__(self):
self.queue: PriorityQueue = database.load_queue()
self.runningJobs: List[str] = []
self.runningDownloads: List[str] = []
self.running: bool = True
self.opencastModules: Dict[str, Opencast] = {}
model_size = os.environ.get("whisper_model")
if not os.path.exists("./data/models/" + model_size +".pt"):
logging.info("Downloading Whisper model...")
model = whisper.load_model(model_size,
download_root="./data/models")
logging.info("Whisper model \"" + model_size + "\" loaded!")
logging.info("TsAPI started!")
self.running: bool = True

def exit(self, sig, frame):
"""
Expand All @@ -44,73 +53,30 @@ def exit(self, sig, frame):
logging.info("TsAPI stopped!")
sys.exit(1)

def add_file_to_queue(self, uid: str, file: FileStorage, priority: int):
def add_to_queue(self, uid: str, module_id: str | None, priority: int):
"""
Adds job to queue
:param module_id: The corresponding module id (if available)
:param uid: The uid of the job
:param file: The file the job should process
:param priority: The priority of the job itself
:return: The uid the job got assigned
"""
logging.info("Adding job with id " + uid + " to queue.")
# Save file to input folder
util.save_file(file, uid)
# Add job to database
database.add_job(file.filename, uid)
# Add job to queue
self.queue.put((priority, uid))

def add_link_to_queue(self, uid: str, link: str, priority: int,
                      username: str = None, password: str = None):
    """
    Downloads the file behind a link and adds the job to the queue
    :param uid: The uid of the job
    :param link: The link the job should process
    :param priority: The priority of the job itself
    :param username: The username to log in to the website
    :param password: The password to log in to the website
    :return: None; on any failure the job status is set to 3 (Failed)
    """
    try:
        logging.info("Downloading file for job id " + uid + "...")
        self.runningDownloads.append(uid)
        try:
            # Add job to database
            database.add_job(uid, uid)
            # The session is closed again once the download is done
            with requests.Session() as session:
                # Check if username and password is present
                if username and password:
                    session.auth = (username, password)
                # Get Response
                response = session.get(link, allow_redirects=False)
                # Check Response
                if response.status_code != 200:
                    raise RuntimeError(
                        "Unexpected status code "
                        + str(response.status_code))
                file = FileStorage(
                    stream=io.BytesIO(response.content),
                    filename=uid,
                    content_length=response.headers.get("Content-Length"),
                    content_type=response.headers.get("Content-Type")
                )
            logging.info("Downloaded file for job id " + uid + ".")
            # Save file to input folder
            util.save_file(file, uid)
        finally:
            # Always release the download slot; a failed download must
            # not stay listed in runningDownloads forever.
            self.runningDownloads.remove(uid)
        logging.info("Adding job with id " + uid + " to queue.")
        # Add job to queue
        self.queue.put((priority, uid))
    except Exception:
        database.change_job_status(uid, 3)  # Failed
        # logging.exception keeps the traceback for later diagnosis
        logging.exception("Error downloading or adding job "
                          + uid + " to queue.")
self.queue.put((priority, (uid, module_id)))

# Track running jobs
def register_job(self, uid):
def register_job(self, uid: str, module_id: str | None):
"""
Register a running job
:param module_id: The corresponding module id (if available)
:param uid: The uid of the job
:return: The transcriber model the registered job prepared
"""
logging.info("Starting job with id " + uid + ".")
# Checking Opencast module
if module_id and module_id in self.opencastModules:
self.opencastModules[module_id].download_file(uid)
# Create transcriber
trans = Transcriber(self, uid)
# Add running job
Expand Down Expand Up @@ -146,9 +112,9 @@ def ts_api_thread(self):
parallel_worker = int(os.environ.get("parallel_workers"))
if len(self.runningJobs) < parallel_worker:
if not self.queue.empty():
uid = self.queue.get()[1]
uid, module_id = self.queue.get()[1]
# Register running job
trans: Transcriber = self.register_job(uid)
trans: Transcriber = self.register_job(uid, module_id)
# Start running job
trans.start_thread()
time.sleep(5)
Loading

0 comments on commit 29e17ec

Please sign in to comment.