Skip to content

Commit

Permalink
Merge pull request #8 from uni-halle/modulesAndQueueRework
Browse files Browse the repository at this point in the history
Modules and queue rework
  • Loading branch information
vJan00 authored Nov 5, 2024
2 parents 3751e14 + 6df8e08 commit 781d275
Show file tree
Hide file tree
Showing 12 changed files with 330 additions and 244 deletions.
13 changes: 7 additions & 6 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
werkzeug
flask
python-dotenv
psutil
pytest
openai-whisper
werkzeug~=3.0.4
flask~=3.0.3
python-dotenv~=1.0.1
psutil~=6.1.0
pytest~=8.3.3
openai-whisper==20240930
requests~=2.32.3
3 changes: 2 additions & 1 deletion src/.env
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
whisper_model = "small"
# small, medium, large, large-v2, large-v3, large-v3-turbo
whisper_model = "large-v3-turbo"
parallel_workers = 2
login_username = "username"
login_password = "password"
230 changes: 143 additions & 87 deletions src/app.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import logging
import os
import uuid
from threading import Thread

import io
import psutil
Expand All @@ -10,6 +9,7 @@
from flask import Flask, request, Response
from werkzeug.datastructures import FileStorage, Authorization

from packages.Opencast import Opencast
from utils import database, util
from core.TsApi import TsApi
from dotenv import load_dotenv
Expand Down Expand Up @@ -37,107 +37,72 @@ def authorisation():
})


# Transcribe Routes
@app.route("/transcribe", methods=['POST'])
def transcribe_post():
"""
Endpoint to accept videos and links to whisper
:return: HttpResponse
"""
username: str = request.form.get("username")
password: str = request.form.get("password")
uid: str = str(uuid.uuid4())
priority: str = request.form.get("priority")

module: str = request.form.get("module")
module_id: str = request.form.get("module_id")
link: str = request.form.get("link")
if ('file' not in request.files) and (not link):
return {"error": "No file or link"}, 415

if ('file' not in request.files) and (not (module and module_id and link)):
return {"error": "No file or link with module and module id"}, 415

if not priority or not priority.isnumeric():
return {"error": "Priority nan"}, 400

if (100 / round(psutil.disk_usage('./').total / 1000000000, 1)
* round(psutil.disk_usage('./').used / 1000000000, 1) > 90):
# Self-care system
ram_usage = round(psutil.virtual_memory().percent, 1)
cpu_usage = round(psutil.cpu_percent(interval=0.5))
storage_total = round(psutil.disk_usage('./').total / 1000000000, 1)
storage_usage = round(psutil.disk_usage('./').used / 1000000000, 1)

if 100 / storage_total * storage_usage > 90:
return {"error": "Not enough storage"}, 507

if 'file' in request.files:
try:
file: FileStorage = request.files['file']
if file.filename == '':
return {"error": "No file"}, 415
if file:
uid = str(uuid.uuid4())
ts_api.add_file_to_queue(uid, file, int(priority))
return {"jobId": uid}, 201
except:
return {"Error": "Failed to add file"}, 500
elif link:
if len(ts_api.runningDownloads) >= 3:
return {"Error": "Too many downloads already running"}, 503

try:
uid = str(uuid.uuid4())
if username and password:
Thread(target=ts_api.add_link_to_queue,
args=(uid, link, int(priority),
username, password)).start()
return {"jobId": uid}, 201
else:
Thread(target=ts_api.add_link_to_queue,
args=(uid, link, int(priority))).start()
return {"jobId": uid}, 201
except:
return {"Error": "Failed to add link"}, 500
if ram_usage > 90:
return {"error": "Not enough ram"}, 507

if cpu_usage > 400:
return {"error": "Not enough cpu"}, 507

@app.route("/status", methods=['GET'])
def status():
"""
Endpoint to return status of video
:return: HttpResponse
"""
req_id = request.args.get("id")
if database.exists_job(req_id):
job_data = database.load_job(req_id)
return {"jobId": req_id,
"status": util.get_status(job_data["status"])}, 200
else:
return {"error": "Job not found"}, 404


@app.route("/status/system", methods=['GET'])
def system_status():
"""
Endpoint to return status of system
:return: HttpResponse
"""
return {
"cpu_usage": round(psutil.cpu_percent(interval=0.5)
* 100 / psutil.cpu_count(), 1),
"cpu_cores": psutil.cpu_count(),
"ram_usage": round(psutil.virtual_memory().percent, 1),
"ram_free": round(psutil.virtual_memory().available
* 100 / psutil.virtual_memory().total, 1),
"storage_total": round(psutil.disk_usage('./').total / 1000000000, 1),
"storage_usage": round(psutil.disk_usage('./').used / 1000000000, 1),
"storage_free": round(psutil.disk_usage('./').free / 1000000000, 1),
"swap_usage": round(psutil.swap_memory().percent, 1),
"swap_free": round(psutil.swap_memory().free
* 100 / psutil.swap_memory().total, 1),
"queue_length": ts_api.queue.qsize(),
"running_jobs": len(ts_api.runningJobs),
"parallel_jobs": int(os.environ.get("parallel_workers")),
"running_downloads": len(ts_api.runningDownloads)
}, 200
if ts_api.queue.qsize() > 50:
return {"error": "The queue is full"}, 507

# Old File upload
if 'file' in request.files:
file: FileStorage = request.files['file']
util.save_file(file, uid)
database.add_job(uid, None)
ts_api.add_to_queue(uid, None, int(priority))
return {"jobId": uid}, 201
# Insert modules here
elif module and module_id and link:
if module == "opencast":
if module_id in ts_api.opencastModules:
# Module specific
opencast_module: Opencast = ts_api.opencastModules[module_id]
if (opencast_module.queue_entry
< opencast_module.max_queue_entry):
opencast_module.queue_entry = (
opencast_module.queue_entry + 1)
opencast_module.link_list[uid] = link
database.add_job(uid, module_id)

@app.route("/", methods=['GET'])
def main():
"""
Endpoint to return main of system
:return: HttpResponse
"""
return {
"message": "Listening to API calls",
"status": 200
}, 200
ts_api.add_to_queue(uid, module_id, int(priority))
return {"jobId": uid}, 201
else:
return {"error": "Max Opencast Queue length reached"}, 429
else:
return {"error": "Module ID not found"}, 400
else:
return {"error": "Module not found"}, 400


@app.route("/transcribe", methods=['GET'])
Expand Down Expand Up @@ -200,6 +165,79 @@ def transcribe_delete():
return {"error": "Job not found"}, 404


# Add Module Routes here
@app.route("/module/opencast", methods=['POST'])
def module_opencast_post():
"""
Endpoint to accept videos and links to whisper
:return: HttpResponse
"""
username: str = request.form.get("username")
password: str = request.form.get("password")
max_queue_length: int = int(request.form.get("max_queue_length"))
if not (username and password):
return {"error": "No username or password specified"}, 400
if not max_queue_length:
return {"error": "No max queue length specified"}, 400
uid = str(uuid.uuid4())
ts_api.opencastModules[uid] = Opencast(username, password,
max_queue_length, uid)
return {"moduleId": uid}, 201


# Status Routes
@app.route("/status", methods=['GET'])
def status():
    """
    Return the processing status of a single job.
    :return: HttpResponse
    """
    req_id = request.args.get("id")
    # Guard clause: unknown job ids short-circuit with a 404.
    if not database.exists_job(req_id):
        return {"error": "Job not found"}, 404
    job_data = database.load_job(req_id)
    return {
        "jobId": req_id,
        "status": util.get_status(job_data["status"]),
    }, 200


@app.route("/status/system", methods=['GET'])
def system_status():
    """
    Report resource usage and queue statistics of the host system.
    :return: HttpResponse
    """
    # Take each psutil snapshot once instead of re-querying per field.
    memory = psutil.virtual_memory()
    swap = psutil.swap_memory()
    disk = psutil.disk_usage('./')
    cores = psutil.cpu_count()
    gigabyte = 1000000000  # decimal GB, matching the original reporting
    return {
        "cpu_usage": round(psutil.cpu_percent(interval=0.5)
                           * 100 / cores, 1),
        "cpu_cores": cores,
        "ram_usage": round(memory.percent, 1),
        "ram_free": round(memory.available * 100 / memory.total, 1),
        "storage_total": round(disk.total / gigabyte, 1),
        "storage_usage": round(disk.used / gigabyte, 1),
        "storage_free": round(disk.free / gigabyte, 1),
        "swap_usage": round(swap.percent, 1),
        "swap_free": round(swap.free * 100 / swap.total, 1),
        "queue_length": ts_api.queue.qsize(),
        "running_jobs": len(ts_api.runningJobs),
        "parallel_jobs": int(os.environ.get("parallel_workers"))
    }, 200


@app.route("/", methods=['GET'])
def main():
    """
    Root endpoint; confirms the API is reachable.
    :return: HttpResponse
    """
    body = {"message": "Listening to API calls", "status": 200}
    return body, 200


@app.route("/language", methods=['GET'])
def language_get():
"""
Expand All @@ -208,11 +246,29 @@ def language_get():
"""
req_id = request.args.get("id")
if database.exists_job(req_id):
if database.load_job(req_id)["status"] >= 2: # Whispered
job_data = database.load_job(req_id)
job_data = database.load_job(req_id)
if "whisper_language" in job_data:
return {"jobId": req_id,
"language": job_data["whisper_language"]}, 200
else:
return {"error": "Job not whispered"}, 200
return {"error": "Job not preprocessed"}, 200
else:
return {"error": "Job not found"}, 404


@app.route("/model", methods=['GET'])
def model_get():
    """
    Return the Whisper model that was used for a job.
    :return: HttpResponse
    """
    req_id = request.args.get("id")
    # Guard clauses: unknown job, then job not yet preprocessed.
    if not database.exists_job(req_id):
        return {"error": "Job not found"}, 404
    job_data = database.load_job(req_id)
    if "whisper_model" not in job_data:
        return {"error": "Job not preprocessed"}, 200
    return {"jobId": req_id,
            "model": job_data["whisper_model"]}, 200
28 changes: 22 additions & 6 deletions src/core/Transcriber.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import logging
import threading
import os

import torch

from utils import database
import whisper

Expand All @@ -14,10 +17,9 @@ def __init__(self, ts_api, uid: str):
"""
logging.debug("Preparing Transcriber for job with id " + uid + "...")
# Prepare
job_data = database.load_job(uid)
self.whisper_result = None
self.whisper_language = None
self.file_path = "./data/audioInput/" + job_data["filename"]
self.file_path = "./data/audioInput/" + uid
self.uid = uid
self.ts_api = ts_api
database.change_job_status(self.uid, 0) # Prepared
Expand All @@ -40,16 +42,30 @@ def transcriber_thread(self):
try:
logging.debug("Starting Whisper for job with id "
+ self.uid + "...")
# Whisper
# Whisper model
model_size = os.environ.get("whisper_model")
model = whisper.load_model(model_size,
download_root="./data/models")
database.set_whisper_model(self.uid, model_size)
# Load audio
audio = whisper.load_audio(self.file_path)
result = whisper.transcribe(model, audio)
short_audio = whisper.pad_or_trim(audio)
# Detect language
mel = (whisper.log_mel_spectrogram(short_audio, model.dims.n_mels)
.to(model.device).to(torch.float32))
_, probs = model.detect_language(mel)
self.whisper_language = str(max(probs, key=probs.get))
database.set_whisper_language(self.uid,
str(max(probs, key=probs.get)))
database.change_job_status(self.uid, 4) # Preprocessed
# Translate audio
result = whisper.transcribe(model=model,
audio=audio,
language=self.whisper_language,
fp16=False)
# Store results
self.whisper_result = result
database.set_whisper_result(self.uid, result)
self.whisper_language = result['language']
database.set_whisper_language(self.uid, result['language'])
database.change_job_status(self.uid, 2) # Whispered
os.remove(self.file_path)
logging.debug("Finished Whisper for job with id " + self.uid + "!")
Expand Down
Loading

0 comments on commit 781d275

Please sign in to comment.