diff --git a/README.md b/README.md
index 674e1cb..25effa0 100644
--- a/README.md
+++ b/README.md
@@ -32,6 +32,8 @@ limitations under the License.
 Update to the latest version by running `npm run update-app` after pulling the latest changes from the repository via `git pull --rebase --autostash`; you would need to redeploy the *UI* for features marked as `frontend`, and *GCP components* for features marked as `backend`.
 
+* [September 2024]
+  * `backend`: You can now process videos of any length or size - even beyond the Google Cloud Video AI API [limits](https://cloud.google.com/video-intelligence/quotas) of 50 GB in size and up to 3h in length.
 * [August 2024]
   * Updated the [pricing](#pricing-and-quotas) section and Cloud calculator example to use the new (cheaper) pricing for `Gemini 1.5 Flash`.
   * `frontend`: You can now manually move the Smart Framing crop area to better capture the point of interest. Read more [here](#3-object-tracking-and-smart-framing).
diff --git a/service/.env.yaml b/service/.env.yaml
index e6ea8bb..870a671 100644
--- a/service/.env.yaml
+++ b/service/.env.yaml
@@ -19,3 +19,5 @@ CONFIG_VISION_MODEL: gemini-1.5-flash
 CONFIG_WHISPER_MODEL: small
 CONFIG_ANNOTATIONS_CONFIDENCE_THRESHOLD: '0.7'
 CONFIG_MULTIMODAL_ASSET_GENERATION: 'true'
+CONFIG_MAX_VIDEO_CHUNK_SIZE: '1000000000' # 1 GB
+CONFIG_MAX_AUDIO_CHUNK_SIZE: '480' # 8 minutes
diff --git a/service/audio/audio.py b/service/audio/audio.py
index 632d80b..43ce53f 100644
--- a/service/audio/audio.py
+++ b/service/audio/audio.py
@@ -17,11 +17,12 @@ This module contains functions to extract, split and transcribe audio files.
 """
 
+import datetime
 import logging
 import os
 import pathlib
 import shutil
-from typing import Optional, Tuple
+from typing import Optional, Sequence, Tuple
 
 import config as ConfigService
 import pandas as pd
@@ -31,6 +32,103 @@ from iso639 import languages
+
+def combine_audio_files(output_path: str, audio_files: Sequence[str]):
+  """Combines audio analysis files into a single file."""
+  ffmpeg_cmds = ['ffmpeg']
+  for audio_file in audio_files:
+    ffmpeg_cmds.extend(['-i', audio_file])
+
+  ffmpeg_cmds += ['-filter_complex'] + [
+      ''.join([f'[{index}:0]' for index, _ in enumerate(audio_files)])
+      + f'concat=n={len(audio_files)}:v=0:a=1[outa]'
+  ] + ['-map', '[outa]', output_path]
+
+  Utils.execute_subprocess_commands(
+      cmds=ffmpeg_cmds,
+      description=(
+          f'Merge {len(audio_files)} audio files and output to {output_path}.'
+      ),
+  )
+  os.chmod(output_path, 0o777)
+
+
+def combine_analysis_chunks(
+    analysis_chunks: Sequence[pd.DataFrame]
+) -> pd.DataFrame:
+  """Combines audio analysis chunks into a single dataframe."""
+  combined_df = pd.DataFrame()
+  max_audio_segment_id = 0
+  max_end_s = 0
+
+  for df in analysis_chunks:
+    df['audio_segment_id'] += max_audio_segment_id
+    df['start_s'] += max_end_s
+    df['end_s'] += max_end_s
+
+    max_audio_segment_id = df['audio_segment_id'].max()
+    max_end_s = df['end_s'].max()
+
+    combined_df = pd.concat([combined_df, df], ignore_index=True)
+
+  return combined_df
+
+
+def combine_subtitle_files(
+    audio_output_dir: str,
+    subtitles_output_path: str,
+):
+  """Combines the contents of audio analysis subtitle files into one file."""
+  subtitles_files = [
+      str(file_path) for file_path in pathlib.Path(audio_output_dir).
+      glob(f'*.{ConfigService.OUTPUT_SUBTITLES_TYPE}')
+  ]
+  logging.info(
+      'THREADING - Combining %d subtitle files found in %s...',
+      len(subtitles_files),
+      audio_output_dir,
+  )
+  combined_content = ''
+  last_timestamp = datetime.datetime.strptime('00:00:00.000', '%H:%M:%S.%f')
+
+  for index, subtitles_file in enumerate(subtitles_files):
+    with open(subtitles_file, 'r', encoding='utf-8') as f:
+      lines = f.readlines()
+
+    if index:
+      lines = lines[2:]
+
+    for line in lines:
+      if '-->' in line:
+        start, end = line.strip().split(' --> ')
+        start_time = last_timestamp + datetime.timedelta(
+            minutes=int(start[:2]),
+            seconds=int(start[3:5]),
+            milliseconds=int(start[6:]),
+        )
+        end_time = last_timestamp + datetime.timedelta(
+            minutes=int(end[:2]),
+            seconds=int(end[3:5]),
+            milliseconds=int(end[6:]),
+        )
+
+        start = start_time.strftime('%H:%M:%S.%f')[:-3]
+        end = end_time.strftime('%H:%M:%S.%f')[:-3]
+
+        combined_content += f'{start} --> {end}\n'
+      else:
+        combined_content += line
+
+    _, end = lines[-3].strip().split(' --> ')
+    last_timestamp += datetime.timedelta(
+        minutes=int(end[:2]),
+        seconds=int(end[3:5]),
+        milliseconds=int(end[6:]),
+    )
+
+  with open(subtitles_output_path, 'w', encoding='utf-8') as f:
+    f.write(combined_content)
+
+
 def extract_audio(video_file_path: str) -> Optional[str]:
   """Extracts the audio track from a video file, if it exists.
 
@@ -82,6 +180,7 @@ def extract_audio(video_file_path: str) -> Optional[str]:
 def split_audio(
     output_dir: str,
     audio_file_path: str,
+    prefix: str = '',
 ) -> Tuple[str, str]:
   """Splits the audio into vocals and music tracks and returns their paths.
 
@@ -108,21 +207,34 @@
   os.rmdir(base_path)
 
   vocals_file_path = str(
-      pathlib.Path(output_dir, ConfigService.OUTPUT_SPEECH_FILE)
+      pathlib.Path(output_dir, f'{prefix}{ConfigService.OUTPUT_SPEECH_FILE}')
   )
   music_file_path = str(
-      pathlib.Path(output_dir, ConfigService.OUTPUT_MUSIC_FILE)
+      pathlib.Path(output_dir, f'{prefix}{ConfigService.OUTPUT_MUSIC_FILE}')
   )
+  if prefix:
+    os.rename(
+        f'{output_dir}/{ConfigService.OUTPUT_SPEECH_FILE}',
+        vocals_file_path,
+    )
+    os.rename(
+        f'{output_dir}/{ConfigService.OUTPUT_MUSIC_FILE}',
+        music_file_path,
+    )
+
   return vocals_file_path, music_file_path
 
 
-def transcribe_audio(output_dir: str, audio_file_path: str) -> pd.DataFrame:
+def transcribe_audio(
+    output_dir: str,
+    audio_file_path: str,
+) -> Tuple[pd.DataFrame, str, float]:
   """Transcribes an audio file and returns the transcription.
 
   Args:
-    output_dir: directory where the transcription will be saved.
-    audio_file_path: path to the audio file that will be transcribed.
+    output_dir: Directory where the transcription will be saved.
+    audio_file_path: Path to the audio file that will be transcribed.
 
   Returns:
-    A pandas dataframe with the transcription data.
+    A tuple of the transcription dataframe, the detected video language and
+    the language detection probability.
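# A minimal, self-contained sketch of the chunk re-basing idea behind the
# combine_analysis_chunks() helper added above: each chunk's transcription restarts
# audio_segment_id/start_s/end_s at zero, so every subsequent chunk is shifted by the
# running maxima of the chunks before it. The helper name and sample values below are
# illustrative only and are not part of the change itself.
from typing import Sequence

import pandas as pd


def rebase_transcription_chunks(chunks: Sequence[pd.DataFrame]) -> pd.DataFrame:
  """Shifts each chunk's ids and timestamps so it continues where the previous one ended."""
  combined = pd.DataFrame()
  max_segment_id = 0
  max_end_s = 0.0
  for df in chunks:
    df = df.copy()  # keep the caller's per-chunk dataframes untouched
    df['audio_segment_id'] += max_segment_id
    df['start_s'] += max_end_s
    df['end_s'] += max_end_s
    max_segment_id = df['audio_segment_id'].max()
    max_end_s = df['end_s'].max()
    combined = pd.concat([combined, df], ignore_index=True)
  return combined


# The single segment of chunk_2 becomes segment 3, spanning 7.5s-11.5s overall.
chunk_1 = pd.DataFrame({
    'audio_segment_id': [1, 2], 'start_s': [0.0, 3.0], 'end_s': [3.0, 7.5]
})
chunk_2 = pd.DataFrame({'audio_segment_id': [1], 'start_s': [0.0], 'end_s': [4.0]})
print(rebase_transcription_chunks([chunk_1, chunk_2]))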
@@ -139,14 +251,7 @@ def transcribe_audio(output_dir: str, audio_file_path: str) -> pd.DataFrame: ) video_language = languages.get(alpha2=info.language).name - with open( - f'{output_dir}/{ConfigService.OUTPUT_LANGUAGE_FILE}', 'w', encoding='utf8' - ) as f: - f.write(video_language) - logging.info( - 'LANGUAGE - %s written successfully!', - ConfigService.OUTPUT_LANGUAGE_FILE, - ) + language_probability = info.language_probability results = list(segments) results_dict = [] @@ -162,8 +267,8 @@ def transcribe_audio(output_dir: str, audio_file_path: str) -> pd.DataFrame: ) writer({'segments': results_dict}, audio_file_path, {'highlight_words': True}) logging.info( - 'TRANSCRIPTION - %s written successfully!', - ConfigService.OUTPUT_SUBTITLES_FILE, + 'TRANSCRIPTION - transcript for %s written successfully!', + audio_file_path, ) transcription_data = [] @@ -185,4 +290,4 @@ def transcribe_audio(output_dir: str, audio_file_path: str) -> pd.DataFrame: 'transcript', ], ) - return transcription_dataframe + return transcription_dataframe, video_language, language_probability diff --git a/service/config/config.py b/service/config/config.py index 92d5db2..76e18cd 100644 --- a/service/config/config.py +++ b/service/config/config.py @@ -34,6 +34,18 @@ CONFIG_MULTIMODAL_ASSET_GENERATION = os.environ.get( 'CONFIG_MULTIMODAL_ASSET_GENERATION', 'false' ) == 'true' +CONFIG_MAX_VIDEO_CHUNK_SIZE = float( + os.environ.get( + 'CONFIG_MAX_VIDEO_CHUNK_SIZE', + f'{1 * 1e9}' # GB + ) +) +CONFIG_MAX_AUDIO_CHUNK_SIZE = float( + os.environ.get( + 'CONFIG_MAX_AUDIO_CHUNK_SIZE', + '480' # seconds + ) +) CONFIG_DEFAULT_SAFETY_CONFIG = { generative_models.HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: ( @@ -64,6 +76,7 @@ OUTPUT_DATA_FILE = 'data.json' OUTPUT_COMBINATIONS_FILE = 'combos.json' OUTPUT_AV_SEGMENTS_DIR = 'av_segments_cuts' +OUTPUT_ANALYSIS_CHUNKS_DIR = 'analysis_chunks' OUTPUT_COMBINATION_ASSETS_DIR = 'assets' GCS_BASE_URL = 'https://storage.mtls.cloud.google.com' diff --git a/service/extractor/extractor.py b/service/extractor/extractor.py index 1c08cb8..1681828 100644 --- a/service/extractor/extractor.py +++ b/service/extractor/extractor.py @@ -19,6 +19,7 @@ """ import concurrent.futures +import json import logging import os import pathlib @@ -37,6 +38,452 @@ from vertexai.preview.generative_models import GenerativeModel, Part +def _process_video( + output_dir: str, + input_video_file_path: str, + gcs_folder: str, + gcs_bucket_name: str, +): + video_chunks = _get_video_chunks( + output_dir=output_dir, + video_file_path=input_video_file_path, + gcs_folder=gcs_folder, + gcs_bucket_name=gcs_bucket_name, + ) + size = len(video_chunks) + logging.info('EXTRACTOR - processing video with %d chunks...', size) + annotation_results = [None] * size + with concurrent.futures.ThreadPoolExecutor() as thread_executor: + futures_dict = { + thread_executor.submit( + VideoService.analyse_video, + video_file_path=video_file_path, + bucket_name=gcs_bucket_name, + gcs_folder=gcs_folder, + ): index + for index, video_file_path in enumerate(video_chunks) + } + + for response in concurrent.futures.as_completed(futures_dict): + index = futures_dict[response] + annotation_results[index] = response.result() + logging.info( + 'THREADING - analyse_video finished for chunk#%d!', + index + 1, + ) + + result = annotation_results[0] + if len(annotation_results) > 1: + logging.info( + 'THREADING - Combining %d analyse_video outputs...', + len(annotation_results), + ) + ( + result_json, + result, + ) = 
VideoService.combine_analysis_chunks(annotation_results) + analysis_filepath = str( + pathlib.Path(output_dir, ConfigService.OUTPUT_ANALYSIS_FILE) + ) + with open(analysis_filepath, 'w', encoding='utf-8') as f: + f.write(json.dumps(result_json, indent=2)) + StorageService.upload_gcs_file( + file_path=analysis_filepath, + bucket_name=gcs_bucket_name, + destination_file_name=str( + pathlib.Path(gcs_folder, ConfigService.OUTPUT_ANALYSIS_FILE) + ), + ) + return result + + +def _get_video_chunks( + output_dir: str, + video_file_path: str, + gcs_folder: str, + gcs_bucket_name: str, + size_limit: int = ConfigService.CONFIG_MAX_VIDEO_CHUNK_SIZE, +) -> Sequence[str]: + """Cuts the input video into smaller chunks by size.""" + _, file_ext = os.path.splitext(video_file_path) + output_folder = str( + pathlib.Path(output_dir, ConfigService.OUTPUT_ANALYSIS_CHUNKS_DIR) + ) + os.makedirs(output_folder, exist_ok=True) + + file_size = os.stat(video_file_path).st_size + duration = _get_media_duration(video_file_path) + current_duration = 0 + file_count = 0 + gcs_path = str( + pathlib.Path(gcs_folder, ConfigService.OUTPUT_ANALYSIS_CHUNKS_DIR) + ) + result = [] + + if file_size > size_limit: + while current_duration < duration: + file_count += 1 + output_file_path = str( + pathlib.Path(output_folder, f'{file_count}{file_ext}') + ) + Utils.execute_subprocess_commands( + cmds=[ + 'ffmpeg', + '-ss', + str(current_duration), + '-i', + video_file_path, + '-fs', + str(size_limit), + '-c', + 'copy', + output_file_path, + ], + description=( + f'Cut input video into {size_limit/1e9}GB chunks. ' + f'Chunk #{file_count}.' + ), + ) + os.chmod(output_file_path, 777) + new_duration = _get_media_duration(output_file_path) + if new_duration == 0.0: + logging.warning('Skipping processing 0 length chunk#%d...', file_count) + break + gcs_file_path = str(pathlib.Path(gcs_path, f'{file_count}{file_ext}')) + StorageService.upload_gcs_file( + file_path=output_file_path, + bucket_name=gcs_bucket_name, + destination_file_name=gcs_file_path, + ) + current_duration += new_duration + result.append(output_file_path) + else: + result.append(video_file_path) + + return result + + +def _process_audio( + output_dir: str, + input_audio_file_path: str, + gcs_folder: str, + gcs_bucket_name: str, + analyse_audio: bool, +) -> pd.DataFrame: + transcription_dataframe = pd.DataFrame() + + if input_audio_file_path and analyse_audio: + transcription_dataframe = _process_video_with_audio( + output_dir, + input_audio_file_path, + gcs_folder, + gcs_bucket_name, + ) + else: + _process_video_without_audio(output_dir, gcs_folder, gcs_bucket_name) + + return transcription_dataframe + + +def _process_video_with_audio( + output_dir: str, + input_audio_file_path: str, + gcs_folder: str, + gcs_bucket_name: str, +) -> pd.DataFrame: + audio_chunks = _get_audio_chunks( + output_dir=output_dir, + audio_file_path=input_audio_file_path, + gcs_folder=gcs_folder, + gcs_bucket_name=gcs_bucket_name, + ) + size = len(audio_chunks) + logging.info('EXTRACTOR - processing audio with %d chunks...', size) + vocals_files = [None] * size + music_files = [None] * size + transcription_dataframes = [None] * size + language_probability_dict = {} + audio_output_dir = str( + pathlib.Path(output_dir, ConfigService.OUTPUT_ANALYSIS_CHUNKS_DIR) + ) + + with concurrent.futures.ThreadPoolExecutor(max_workers=1) as thread_executor: + futures_dict = { + thread_executor.submit( + _analyse_audio, + root_dir=output_dir, + output_dir=(audio_output_dir if size > 1 else output_dir), + index=index + 
1, + audio_file_path=audio_file_path, + ): index + for index, audio_file_path in enumerate(audio_chunks) + } + + for response in concurrent.futures.as_completed(futures_dict): + index = futures_dict[response] + ( + vocals_file_path, + music_file_path, + audio_transcription_dataframe, + video_language, + language_probability, + ) = response.result() + vocals_files[index] = vocals_file_path + music_files[index] = music_file_path + transcription_dataframes[index] = audio_transcription_dataframe + if video_language in language_probability_dict: + language_probability_dict[video_language] = max( + language_probability_dict[video_language], + language_probability, + ) + else: + language_probability_dict[video_language] = language_probability + logging.info( + 'THREADING - analyse_audio finished for chunk#%d!', + index + 1, + ) + if size > 1: + StorageService.upload_gcs_dir( + source_directory=audio_output_dir, + bucket_name=gcs_bucket_name, + target_dir=str( + pathlib.Path( + gcs_folder, ConfigService.OUTPUT_ANALYSIS_CHUNKS_DIR + ) + ), + ) + + subtitles_output_path = str( + pathlib.Path( + output_dir, + ConfigService.OUTPUT_SUBTITLES_FILE, + ) + ) + if size > 1: + AudioService.combine_subtitle_files( + audio_output_dir, + subtitles_output_path, + ) + StorageService.upload_gcs_file( + file_path=subtitles_output_path, + bucket_name=gcs_bucket_name, + destination_file_name=str( + pathlib.Path(gcs_folder, ConfigService.OUTPUT_SUBTITLES_FILE) + ), + ) + logging.info( + 'TRANSCRIPTION - %s written successfully!', + ConfigService.OUTPUT_SUBTITLES_FILE, + ) + video_language = max( + language_probability_dict, key=language_probability_dict.get + ) + with open( + f'{output_dir}/{ConfigService.OUTPUT_LANGUAGE_FILE}', 'w', encoding='utf8' + ) as f: + f.write(video_language) + logging.info( + 'LANGUAGE - %s written successfully with language: %s!', + ConfigService.OUTPUT_LANGUAGE_FILE, + video_language, + ) + + transcription_dataframe = transcription_dataframes[0] + if len(transcription_dataframes) > 1: + logging.info( + 'THREADING - Combining %d transcribe_audio outputs...', + len(transcription_dataframes), + ) + transcription_dataframe = AudioService.combine_analysis_chunks( + transcription_dataframes + ) + logging.info( + 'TRANSCRIPTION - Full transcription dataframe: %r', + transcription_dataframe.to_json(orient='records') + ) + + if len(vocals_files) > 1: + logging.info( + 'THREADING - Combining %d split_audio vocals files...', + len(vocals_files), + ) + vocals_file_path = str( + pathlib.Path(output_dir, ConfigService.OUTPUT_SPEECH_FILE) + ) + AudioService.combine_audio_files(vocals_file_path, vocals_files) + logging.info('AUDIO - vocals_file_path: %s', vocals_file_path) + if len(music_files) > 1: + logging.info( + 'THREADING - Combining %d split_audio music files...', + len(music_files), + ) + music_file_path = str( + pathlib.Path(output_dir, ConfigService.OUTPUT_MUSIC_FILE) + ) + AudioService.combine_audio_files(music_file_path, music_files) + logging.info('AUDIO - music_file_path: %s', music_file_path) + + return transcription_dataframe + + +def _process_video_without_audio( + output_dir: str, + gcs_folder: str, + gcs_bucket_name: str, +): + """Skips audio analysis.""" + subtitles_filepath = str( + pathlib.Path(output_dir, ConfigService.OUTPUT_SUBTITLES_FILE) + ) + with open(subtitles_filepath, 'w', encoding='utf8'): + pass + StorageService.upload_gcs_file( + file_path=subtitles_filepath, + bucket_name=gcs_bucket_name, + destination_file_name=str( + pathlib.Path(gcs_folder, 
ConfigService.OUTPUT_SUBTITLES_FILE) + ), + ) + logging.info( + 'TRANSCRIPTION - Empty %s written successfully!', + ConfigService.OUTPUT_SUBTITLES_FILE, + ) + + +def _analyse_audio( + root_dir: str, + output_dir: str, + index: int, + audio_file_path: str, +) -> Tuple[str, str, str, str, float]: + """Runs audio analysis in parallel.""" + vocals_file_path = None + music_file_path = None + transcription_dataframe = None + + with concurrent.futures.ProcessPoolExecutor(max_workers=2) as process_executor: + futures_dict = { + process_executor.submit( + AudioService.transcribe_audio, + output_dir=output_dir, + audio_file_path=audio_file_path, + ): 'transcribe_audio', + process_executor.submit( + AudioService.split_audio, + output_dir=output_dir, + audio_file_path=audio_file_path, + prefix='' if root_dir == output_dir else f'{index}_', + ): 'split_audio', + } + + for future in concurrent.futures.as_completed(futures_dict): + source = futures_dict[future] + match source: + case 'transcribe_audio': + transcription_dataframe, language, probability = future.result() + logging.info( + 'THREADING - transcribe_audio finished for chunk#%d!', + index, + ) + logging.info( + 'TRANSCRIPTION - Transcription dataframe for chunk#%d: %r', + index, + transcription_dataframe.to_json(orient='records'), + ) + case 'split_audio': + vocals_file_path, music_file_path = future.result() + logging.info('THREADING - split_audio finished for chunk#%d!', index) + + return ( + vocals_file_path, + music_file_path, + transcription_dataframe, + language, + probability, + ) + + +def _get_audio_chunks( + output_dir: str, + audio_file_path: str, + gcs_folder: str, + gcs_bucket_name: str, + duration_limit: int = ConfigService.CONFIG_MAX_AUDIO_CHUNK_SIZE, +) -> Sequence[str]: + """Cuts the input audio into smaller chunks by duration.""" + _, file_ext = os.path.splitext(audio_file_path) + output_folder = str( + pathlib.Path(output_dir, ConfigService.OUTPUT_ANALYSIS_CHUNKS_DIR) + ) + os.makedirs(output_folder, exist_ok=True) + + duration = _get_media_duration(audio_file_path) + current_duration = 0 + file_count = 0 + gcs_path = str( + pathlib.Path(gcs_folder, ConfigService.OUTPUT_ANALYSIS_CHUNKS_DIR) + ) + result = [] + + if duration > duration_limit: + while current_duration < duration: + file_count += 1 + output_file_path = str( + pathlib.Path(output_folder, f'{file_count}{file_ext}') + ) + Utils.execute_subprocess_commands( + cmds=[ + 'ffmpeg', + '-ss', + str(current_duration), + '-i', + audio_file_path, + '-to', + str(duration_limit), + '-c', + 'copy', + output_file_path, + ], + description=( + f'Cut input audio into {duration_limit/60}min chunks. ' + f'Chunk #{file_count}.' 
+ ), + ) + os.chmod(output_file_path, 777) + gcs_file_path = str(pathlib.Path(gcs_path, f'{file_count}{file_ext}')) + StorageService.upload_gcs_file( + file_path=output_file_path, + bucket_name=gcs_bucket_name, + destination_file_name=gcs_file_path, + ) + new_duration = _get_media_duration(output_file_path) + current_duration += new_duration + result.append(output_file_path) + else: + result.append(audio_file_path) + + return result + + +def _get_media_duration(input_file_path: str) -> float: + """Retrieves the duration of the input media file.""" + output = Utils.execute_subprocess_commands( + cmds=[ + 'ffprobe', + '-i', + input_file_path, + '-show_entries', + 'format=duration', + '-v', + 'quiet', + '-of', + 'default=noprint_wrappers=1:nokey=1', + ], + description=f'get duration of [{input_file_path}] with ffprobe', + ) + return float(output) + + class Extractor: """Encapsulates all the extraction logic.""" @@ -60,29 +507,43 @@ def extract(self): """Extracts all the available data from the input video.""" logging.info('EXTRACTOR - Starting extraction...') tmp_dir = tempfile.mkdtemp() - video_file_path = StorageService.download_gcs_file( + input_video_file_path = StorageService.download_gcs_file( file_path=self.video_file, output_dir=tmp_dir, bucket_name=self.gcs_bucket_name, ) - audio_file_path = AudioService.extract_audio(video_file_path) - transcription_dataframe = pd.DataFrame() + input_audio_file_path = AudioService.extract_audio(input_video_file_path) annotation_results = None + transcription_dataframe = pd.DataFrame() - if audio_file_path and self.video_file.video_metadata.analyse_audio: - ( - transcription_dataframe, - annotation_results, - vocals_file_path, - music_file_path, - ) = self.process_video_with_audio( - tmp_dir, - audio_file_path, - ) - logging.info('AUDIO - vocals_file_path: %s', vocals_file_path) - logging.info('AUDIO - music_file_path: %s', music_file_path) - else: - annotation_results = self.process_video_without_audio(tmp_dir) + with concurrent.futures.ProcessPoolExecutor() as process_executor: + futures_dict = { + process_executor.submit( + _process_audio, + output_dir=tmp_dir, + input_audio_file_path=input_audio_file_path, + gcs_folder=self.video_file.gcs_folder, + gcs_bucket_name=self.gcs_bucket_name, + analyse_audio=self.video_file.video_metadata.analyse_audio, + ): 'process_audio', + process_executor.submit( + _process_video, + output_dir=tmp_dir, + input_video_file_path=input_video_file_path, + gcs_folder=self.video_file.gcs_folder, + gcs_bucket_name=self.gcs_bucket_name, + ): 'process_video', + } + + for future in concurrent.futures.as_completed(futures_dict): + source = futures_dict[future] + match source: + case 'process_audio': + logging.info('THREADING - process_audio finished!') + transcription_dataframe = future.result() + case 'process_video': + logging.info('THREADING - process_video finished!') + annotation_results = future.result() optimised_av_segments = _create_optimised_segments( annotation_results, @@ -95,7 +556,7 @@ def extract(self): optimised_av_segments = self.cut_and_annotate_av_segments( tmp_dir, - video_file_path, + input_video_file_path, optimised_av_segments, ) logging.info( @@ -113,87 +574,6 @@ def extract(self): ) logging.info('EXTRACTOR - Extraction completed successfully!') - def process_video_without_audio(self, tmp_dir): - """Runs video analysis only.""" - subtitles_filepath = str( - pathlib.Path(tmp_dir, ConfigService.OUTPUT_SUBTITLES_FILE) - ) - with open(subtitles_filepath, 'w', encoding='utf8'): - pass - 
StorageService.upload_gcs_file( - file_path=subtitles_filepath, - bucket_name=self.gcs_bucket_name, - destination_file_name=str( - pathlib.Path( - self.video_file.gcs_folder, ConfigService.OUTPUT_SUBTITLES_FILE - ) - ), - ) - logging.info( - 'TRANSCRIPTION - Empty %s written successfully!', - ConfigService.OUTPUT_SUBTITLES_FILE, - ) - annotation_results = VideoService.analyse_video( - video_file=self.video_file, - bucket_name=self.gcs_bucket_name, - ) - logging.info('VIDEO_ANALYSIS - Completed successfully!') - return annotation_results - - def process_video_with_audio(self, tmp_dir: str, audio_file_path: str): - """Runs video and audio analyses in parallel.""" - transcription_dataframe = None - annotation_results = None - vocals_file_path = None - music_file_path = None - with concurrent.futures.ProcessPoolExecutor() as process_executor: - futures_dict = { - process_executor.submit( - AudioService.transcribe_audio, - output_dir=tmp_dir, - audio_file_path=audio_file_path, - ): 'transcribe_audio', - process_executor.submit( - VideoService.analyse_video, - video_file=self.video_file, - bucket_name=self.gcs_bucket_name, - ): 'analyse_video', - process_executor.submit( - AudioService.split_audio, - output_dir=tmp_dir, - audio_file_path=audio_file_path, - ): 'split_audio', - } - - for future in concurrent.futures.as_completed(futures_dict): - source = futures_dict[future] - match source: - case 'transcribe_audio': - transcription_dataframe = future.result() - logging.info('THREADING - transcribe_audio finished!') - logging.info( - 'TRANSCRIPTION - Transcription dataframe: %r', - transcription_dataframe.to_json(orient='records') - ) - StorageService.upload_gcs_dir( - source_directory=tmp_dir, - bucket_name=self.gcs_bucket_name, - target_dir=self.video_file.gcs_folder, - ) - case 'analyse_video': - annotation_results = future.result() - logging.info('THREADING - analyse_video finished!') - case 'split_audio': - vocals_file_path, music_file_path = future.result() - logging.info('THREADING - split_audio finished!') - - return ( - transcription_dataframe, - annotation_results, - vocals_file_path, - music_file_path, - ) - def cut_and_annotate_av_segments( self, tmp_dir: str, @@ -244,10 +624,14 @@ def cut_and_annotate_av_segments( description, keyword = response.result() descriptions[index] = description keywords[index] = keyword - resources_base_path = ( - f'{ConfigService.GCS_BASE_URL}/{self.gcs_bucket_name}/' - f'{parse.quote(self.video_file.gcs_folder)}/' - f'{ConfigService.OUTPUT_AV_SEGMENTS_DIR}/{index+1}' + resources_base_path = str( + pathlib.Path( + ConfigService.GCS_BASE_URL, + self.gcs_bucket_name, + parse.quote(self.video_file.gcs_folder), + ConfigService.OUTPUT_AV_SEGMENTS_DIR, + str(index + 1), + ) ) cut_paths[index] = f'{resources_base_path}.{self.video_file.file_ext}' screenshot_paths[index] = ( @@ -352,8 +736,7 @@ def _cut_and_annotate_av_segment( safety_settings=ConfigService.CONFIG_DEFAULT_SAFETY_CONFIG, ) if ( - response.candidates - and response.candidates[0].content.parts + response.candidates and response.candidates[0].content.parts and response.candidates[0].content.parts[0].text ): text = response.candidates[0].content.parts[0].text @@ -457,16 +840,12 @@ def _create_optimised_av_segments( silent_short_shot = ( not audio_segment_ids and visual_segment['duration_s'] <= 1 ) - continued_shot = set(audio_segment_ids).intersection( - current_audio_segment_ids - ) + continued_shot = set(audio_segment_ids + ).intersection(current_audio_segment_ids) if ( - continued_shot - or not 
current_visual_segments - or ( - silent_short_shot - and not current_audio_segment_ids + continued_shot or not current_visual_segments or ( + silent_short_shot and not current_audio_segment_ids and is_last_shot_short ) ): @@ -564,7 +943,12 @@ def _annotate_segments( text.append(_get_entities(text_dataframe, av_segment_id, return_key='text')) optimised_av_segments = optimised_av_segments.assign( - **{'labels': labels, 'objects': objects, 'logos': logo, 'text': text} + **{ + 'labels': labels, + 'objects': objects, + 'logos': logo, + 'text': text + } ) return optimised_av_segments @@ -610,12 +994,9 @@ def _get_entities( """ temp = data.loc[[(search_value in labels) for labels in data[search_key]]] entities = ( - temp[ - temp[confidence_key] - > ConfigService.CONFIG_ANNOTATIONS_CONFIDENCE_THRESHOLD - ] - .sort_values(by=confidence_key, ascending=False)[return_key] - .to_list() + temp[temp[confidence_key] > + ConfigService.CONFIG_ANNOTATIONS_CONFIDENCE_THRESHOLD + ].sort_values(by=confidence_key, ascending=False)[return_key].to_list() ) return list(set(entities)) diff --git a/service/video/video.py b/service/video/video.py index 0c388ee..667bc11 100644 --- a/service/video/video.py +++ b/service/video/video.py @@ -17,23 +17,258 @@ This module contains functions to interact with the Video AI API. """ -from typing import Sequence +import json +import os +import pathlib +import re +from typing import Any, Dict, Sequence, Tuple import config as ConfigService import pandas as pd -import utils as Utils from google.cloud import videointelligence +def combine_analysis_chunks( + analysis_chunks: Sequence[videointelligence.VideoAnnotationResults] +) -> Tuple[Dict[str, Any], videointelligence.VideoAnnotationResults]: + """Combines video analysis chunks into a single response.""" + concatenated_shots = [] + concatenated_texts = [] + concatenated_objects = [] + concatenated_faces = [] + concatenated_logos = [] + concatenated_segment_labels = [] + concatenated_shot_labels = [] + concatenated_frame_labels = [] + output = videointelligence.AnnotateVideoResponse() + output_result = videointelligence.VideoAnnotationResults() + cumulative_seconds = 0 + + for index, analysis_result in enumerate(analysis_chunks): + if not index: + segment_end = analysis_result.segment.end_time_offset + output_result.input_uri = re.sub( + fr'{ConfigService.OUTPUT_ANALYSIS_CHUNKS_DIR}/\d+', + ConfigService.INPUT_FILENAME, + analysis_result.input_uri, + ) + + shots = analysis_result.shot_annotations + texts = analysis_result.text_annotations + objects = analysis_result.object_annotations + faces = analysis_result.face_detection_annotations + logos = analysis_result.logo_recognition_annotations + segment_labels = analysis_result.segment_label_annotations + shot_labels = analysis_result.shot_label_annotations + frame_labels = analysis_result.frame_label_annotations + + if index: + for shot in shots: + set_offset('start_time_offset', shot, segment_end, cumulative_seconds) + set_offset('end_time_offset', shot, segment_end, cumulative_seconds) + for segment in [s for t in texts for s in t.segments]: + element = segment.segment + set_offset( + 'start_time_offset', + element, + segment_end, + cumulative_seconds, + ) + set_offset('end_time_offset', element, segment_end, cumulative_seconds) + for obj in objects: + element = obj.segment + set_offset( + 'start_time_offset', + element, + segment_end, + cumulative_seconds, + ) + set_offset('end_time_offset', element, segment_end, cumulative_seconds) + for frame in obj.frames: + set_offset( + 
'time_offset', + frame, + segment_end, + cumulative_seconds, + ) + for face in [track for f in faces for track in f.tracks]: + element = face.segment + set_offset( + 'start_time_offset', + element, + segment_end, + cumulative_seconds, + ) + set_offset('end_time_offset', element, segment_end, cumulative_seconds) + for timestamped_object in face.timestamped_objects: + set_offset( + 'time_offset', + timestamped_object, + segment_end, + cumulative_seconds, + ) + for logo in [track for l in logos for track in l.tracks]: + element = logo.segment + set_offset( + 'start_time_offset', + element, + segment_end, + cumulative_seconds, + ) + set_offset('end_time_offset', element, segment_end, cumulative_seconds) + for timestamped_object in logo.timestamped_objects: + set_offset( + 'time_offset', + timestamped_object, + segment_end, + cumulative_seconds, + ) + for segment in [s for l in segment_labels for s in l.segments]: + element = segment.segment + set_offset( + 'start_time_offset', + element, + segment_end, + cumulative_seconds, + ) + set_offset('end_time_offset', element, segment_end, cumulative_seconds) + for segment in [s for l in shot_labels for s in l.segments]: + element = segment.segment + set_offset( + 'start_time_offset', + element, + segment_end, + cumulative_seconds, + ) + set_offset('end_time_offset', element, segment_end, cumulative_seconds) + for frame in [f for l in frame_labels for f in l.frames]: + set_offset( + 'time_offset', + frame, + segment_end, + cumulative_seconds, + ) + + segment_end = shots[-1].end_time_offset + + cumulative_seconds = segment_end.seconds or 0 + concatenated_shots.extend(shots) + concatenated_texts.extend(texts) + concatenated_objects.extend(objects) + concatenated_faces.extend(faces) + concatenated_logos.extend(logos) + concatenated_segment_labels.extend(segment_labels) + concatenated_shot_labels.extend(shot_labels) + concatenated_frame_labels.extend(frame_labels) + + output_result.shot_annotations = concatenated_shots + output_result.text_annotations = concatenated_texts + output_result.object_annotations = concatenated_objects + output_result.face_detection_annotations = concatenated_faces + output_result.logo_recognition_annotations = concatenated_logos + output_result.segment_label_annotations = concatenated_segment_labels + output_result.shot_label_annotations = concatenated_shot_labels + output_result.frame_label_annotations = concatenated_frame_labels + output_result.segment.end_time_offset = concatenated_shots[-1].end_time_offset + + output.annotation_results.append(output_result) + result = videointelligence.AnnotateVideoResponse(output) + result_json_camelcase = type(output).to_json(output) + result_json = convert_keys(json.loads(result_json_camelcase)) + return result_json, result.annotation_results[0] + + +def set_offset( + key: str, + element: Dict[str, Dict[str, int]], + segment_end: Dict[str, int], + cumulative_seconds: int, +): + """Adjusts the time offset for a video analysis shot.""" + time_element = getattr(element, key) + element_nanos = getattr(time_element, 'nanos', 0) + element_seconds = getattr(time_element, 'seconds', 0) + + segment_nanos = getattr(segment_end, 'nanos', 0) + segment_seconds = getattr(segment_end, 'seconds', cumulative_seconds) + + nanos = (element_nanos+segment_nanos) / 1e9 + additional_offset_seconds = 1 if nanos > 1 else 0 + nanos = int((nanos-additional_offset_seconds) * 1e9) + new_time_element = { + 'seconds': element_seconds + additional_offset_seconds + segment_seconds, + 'nanos': nanos, + } + + 
setattr(element, key, new_time_element) + + +def convert_keys(d): + """Recursively converts dict keys from camelCase to snake_case.""" + new_d = {} + for k, v in d.items(): + if isinstance(v, list): + v = [convert_keys(inner_v) for inner_v in v] + elif isinstance(v, dict): + v = convert_keys(v) + + snake_k = camel_to_snake(k) + if snake_k.endswith('time_offset'): + new_d[snake_k] = {} + seconds = int(v[:v.index('.')]) if '.' in v else int(v[:-1]) + nanos = int(round((float(v[:-1]) - seconds) * 1e9)) if '.' in v else 0 + if seconds: + new_d[snake_k]['seconds'] = seconds + if nanos: + new_d[snake_k]['nanos'] = nanos + else: + new_d[snake_k] = v + return new_d + + +def camel_to_snake(s): + """Converts a string from camelCase to snake_case.""" + return ''.join(['_' + c.lower() if c.isupper() else c for c in s]).lstrip('_') + + def analyse_video( - video_file: Utils.TriggerFile, + video_file_path: str, + bucket_name: str, + gcs_folder: str, +) -> videointelligence.VideoAnnotationResults: + """Runs video analysis via the Video AI API and returns the results.""" + file_path, file_ext = os.path.splitext(video_file_path) + file_name = pathlib.Path(file_path).name + is_chunk = ConfigService.OUTPUT_ANALYSIS_CHUNKS_DIR in file_path + gcs_path = str( + pathlib.Path(gcs_folder, ConfigService.OUTPUT_ANALYSIS_CHUNKS_DIR) + ) if is_chunk else gcs_folder + gcs_file_path = str(pathlib.Path(gcs_path, f'{file_name}{file_ext}')) + + return _run_video_intelligence( + bucket_name=bucket_name, + gcs_input_path=gcs_file_path, + gcs_output_path=str( + pathlib.Path( + gcs_path, + f'{file_name}.json' + if is_chunk else ConfigService.OUTPUT_ANALYSIS_FILE, + ) + ), + ) + + +def _run_video_intelligence( bucket_name: str, -) -> videointelligence.AnnotateVideoResponse: + gcs_input_path: str, + gcs_output_path: str, +) -> videointelligence.VideoAnnotationResults: """Runs video analysis via the Video AI API and returns the results. Args: - video_file: The video file to be analysed. bucket_name: The GCS bucket name where the video is stored. + gcs_input_path: The path to the input video file in GCS. + gcs_output_path: The path to the output analysis file in GCS. Returns: The annotation results from the Video AI API. 
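# A small standalone illustration of the conversion that camel_to_snake() and
# convert_keys() above perform on the Video AI JSON output: keys arrive in camelCase and
# time offsets as duration strings such as '12.345s', while the videointelligence protos
# expect snake_case keys with separate seconds/nanos parts. The helper names and sample
# values below are illustrative only and mirror the behaviour, not the implementation.
import re


def to_snake(name: str) -> str:
  """'startTimeOffset' -> 'start_time_offset'."""
  return re.sub(r'(?<!^)(?=[A-Z])', '_', name).lower()


def parse_duration(value: str) -> dict:
  """'12.345s' -> {'seconds': 12, 'nanos': 345000000}; zero-valued parts are omitted."""
  seconds_str, _, fraction = value.rstrip('s').partition('.')
  result = {}
  if int(seconds_str):
    result['seconds'] = int(seconds_str)
  if fraction and int(fraction):
    result['nanos'] = int(round(float(f'0.{fraction}') * 1e9))
  return result


assert to_snake('shotAnnotations') == 'shot_annotations'
assert parse_duration('12.345s') == {'seconds': 12, 'nanos': 345000000}
assert parse_duration('0s') == {}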
@@ -44,7 +279,6 @@ def analyse_video( videointelligence.Feature.OBJECT_TRACKING, videointelligence.Feature.SHOT_CHANGE_DETECTION, videointelligence.Feature.FACE_DETECTION, - videointelligence.Feature.PERSON_DETECTION, videointelligence.Feature.LOGO_RECOGNITION, videointelligence.Feature.TEXT_DETECTION, ] @@ -55,26 +289,17 @@ def analyse_video( face_config = videointelligence.FaceDetectionConfig( include_bounding_boxes=True, include_attributes=True ) - person_config = videointelligence.PersonDetectionConfig( - include_bounding_boxes=True, - include_attributes=True, - include_pose_landmarks=True, - ) context = videointelligence.VideoContext( label_detection_config=label_config, face_detection_config=face_config, - person_detection_config=person_config, ) operation = video_client.annotate_video( request={ 'features': features, - 'input_uri': f'gs://{bucket_name}/{video_file.full_gcs_path}', - 'output_uri': ( - f'gs://{bucket_name}/{video_file.gcs_folder}/' - f'{ConfigService.OUTPUT_ANALYSIS_FILE}' - ), + 'input_uri': f'gs://{bucket_name}/{gcs_input_path}', + 'output_uri': f'gs://{bucket_name}/{gcs_output_path}', 'video_context': context, } ) @@ -84,7 +309,7 @@ def analyse_video( def get_visual_shots_data( - annotation_results: videointelligence.AnnotateVideoResponse, + annotation_results: videointelligence.VideoAnnotationResults, transcription_dataframe: pd.DataFrame, audio_segment_id_key: str = 'audio_segment_id', ) -> pd.DataFrame: @@ -114,7 +339,10 @@ def get_visual_shots_data( shots_data.append(( i + 1, _identify_segments( - start_time, end_time, transcription_dataframe, audio_segment_id_key + start_time, + end_time, + transcription_dataframe, + audio_segment_id_key, ), start_time, end_time, @@ -137,7 +365,7 @@ def get_visual_shots_data( def get_shot_labels_data( - annotation_results: videointelligence.AnnotateVideoResponse, + annotation_results: videointelligence.VideoAnnotationResults, optimised_av_segments: pd.DataFrame, av_segment_id_key: str = 'av_segment_id', ) -> pd.DataFrame: @@ -167,7 +395,10 @@ def get_shot_labels_data( labels_data.append(( shot_label.entity.description, _identify_segments( - start_time, end_time, optimised_av_segments, av_segment_id_key + start_time, + end_time, + optimised_av_segments, + av_segment_id_key, ), start_time, end_time, @@ -192,7 +423,7 @@ def get_shot_labels_data( def get_object_tracking_data( - annotation_results: videointelligence.AnnotateVideoResponse, + annotation_results: videointelligence.VideoAnnotationResults, optimised_av_segments: pd.DataFrame, av_segment_id_key: str = 'av_segment_id', ) -> pd.DataFrame: @@ -256,7 +487,7 @@ def get_object_tracking_data( def get_logo_detection_data( - annotation_results: videointelligence.AnnotateVideoResponse, + annotation_results: videointelligence.VideoAnnotationResults, optimised_av_segments: pd.DataFrame, av_segment_id_key: str = 'av_segment_id', ) -> pd.DataFrame: @@ -351,7 +582,7 @@ def get_logo_detection_data( def get_text_detection_data( - annotation_results: videointelligence.AnnotateVideoResponse, + annotation_results: videointelligence.VideoAnnotationResults, optimised_av_segments: pd.DataFrame, av_segment_id_key: str = 'av_segment_id', ) -> pd.DataFrame: @@ -408,7 +639,9 @@ def get_text_detection_data( 'box_vertices', ], ) - text_detection_dataframe = text_detection_dataframe.sort_values(by=['start_s']) + text_detection_dataframe = ( + text_detection_dataframe.sort_values(by=['start_s']) + ) return text_detection_dataframe
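# A local-only sketch of the size-based chunking strategy used by _get_video_chunks()
# above: seek with ffmpeg's -ss, cap each chunk with -fs, then ask ffprobe how long the
# chunk actually is so the next iteration can resume from there. The GCS upload,
# chmod and ConfigService pieces are deliberately omitted, and the real helper also
# skips chunking entirely when the file is already under the limit; function names,
# paths and the 1 GB limit below are illustrative assumptions.
import pathlib
import subprocess


def media_duration(path: str) -> float:
  """Returns the media duration in seconds via ffprobe."""
  result = subprocess.run(
      [
          'ffprobe', '-i', path, '-show_entries', 'format=duration',
          '-v', 'quiet', '-of', 'default=noprint_wrappers=1:nokey=1',
      ],
      capture_output=True, text=True, check=True,
  )
  return float(result.stdout)


def split_by_size(path: str, out_dir: str, size_limit: int = 1_000_000_000) -> list:
  """Cuts the input into consecutive chunks of at most size_limit bytes."""
  pathlib.Path(out_dir).mkdir(parents=True, exist_ok=True)
  total = media_duration(path)
  position = 0.0
  chunks = []
  ext = pathlib.Path(path).suffix
  while position < total:
    chunk = str(pathlib.Path(out_dir, f'{len(chunks) + 1}{ext}'))
    subprocess.run(
        ['ffmpeg', '-ss', str(position), '-i', path,
         '-fs', str(size_limit), '-c', 'copy', chunk],
        check=True,
    )
    chunk_duration = media_duration(chunk)
    if chunk_duration == 0.0:  # nothing left beyond the previous chunk
      break
    position += chunk_duration
    chunks.append(chunk)
  return chunks


# Example: split_by_size('input.mp4', 'analysis_chunks')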