Implemented audio extraction, adding audio streams, displaying audio stream #4

Open · wants to merge 10 commits into main
74 changes: 42 additions & 32 deletions src/ilabs_streamsync/example_script.py
@@ -1,34 +1,44 @@
from __future__ import annotations

import mne
from streamsync import StreamSync, extract_audio_from_video

if __name__ == "__main__":
    # load an MNE raw file
    raw = "/Users/user/VideoSync_NonSubject/sinclair_alexis_audiosync_240110_raw.fif"
    channel = "STI011"
    cams = ["/Users/user/VideoSync_NonSubject/sinclair_alexis_audiosync_240110_CAM3.mp4"]
    output_dir = "/Users/user/VideoSync_NonSubject/output"
    flux1 = None
    my_events = []

    for cam in cams:
        extract_audio_from_video(cam, output_dir, overwrite=False)  # This could potentially return filenames to avoid the hardcoding seen below.
    ss = StreamSync(raw, channel)

    ss.add_stream("/Users/user/VideoSync_NonSubject/output/sinclair_alexis_audiosync_240110_CAM3_16bit.wav", channel=1)
    ss.plot_sync_pulses(tmin=0.5, tmax=50)

# subjects = ["146a", "222b"]

# for subj in subjects:
# construct the filename/path
# load the Raw
# figure out where video files are & load them
# extract_audio_from_video(cam1)

# ss = StreamSync(raw, "STIM001")
# ss.add_stream(audio1)
# ss.add_camera_events(my_events)
# ss.add_stream(flux1)
# result = ss.do_syncing()
# fig = ss.plot_sync()
# annot = ss.add_camera_events(my_events)
# raw.set_annotations(annot)
# fig.savefig(...)
# if result < 0.7:
# write_log_msg(f"subj {subj} had bad pulse syncing, aborting")
# continue

from ilabs_streamsync import StreamSync, extract_audio_from_video

# load an MNE raw file
raw = None
cam1 = None
flux1 = None
my_events = []


subjects = ["146a", "222b"]

for subj in subjects:
    # construct the filename/path
    # load the Raw
    # figure out where video files are & load them
    audio1 = extract_audio_from_video(cam1)

    ss = StreamSync(raw, "STIM001")
    ss.add_stream(audio1)
    ss.add_camera_events(my_events)
    ss.add_stream(flux1)
    result = ss.do_syncing()
    fig = ss.plot_sync()
    annot = ss.add_camera_events(my_events)
    raw.set_annotations(annot)
    fig.savefig(...)
    if result < 0.7:
        write_log_msg(f"subj {subj} had bad pulse syncing, aborting")
        continue

# apply maxfilter
# do ICA
23 changes: 23 additions & 0 deletions src/ilabs_streamsync/streamdata.py
@@ -0,0 +1,23 @@
from __future__ import annotations

class StreamData:
    """Store information about a stream of data."""

    def __init__(self, filename, sample_rate, pulses, data):
        """Initialize the object with its associated properties.

        filename: str
            Path to the file containing the stream data.
        sample_rate: int
            Sampling rate of the data.
        pulses: np.ndarray
            NumPy array representing the pulse channel.
        data: np.ndarray
            NumPy array representing the remaining data channels.
        """
        self.filename = filename
        self.sample_rate = sample_rate
        self.pulses = pulses
        self.data = data
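A minimal usage sketch for this class (the file name and channel layout are illustrative, mirroring how _extract_data_from_wav constructs the object later in this PR):

from scipy.io.wavfile import read as wavread
from streamdata import StreamData

# hypothetical stereo file: channel 1 carries the sync pulses, channel 0 the audio
srate, signal = wavread("CAM3_16bit.wav")
stream = StreamData(filename="CAM3_16bit.wav", sample_rate=srate,
                    pulses=signal[:, 1], data=signal[:, 0])
print(stream.sample_rate, stream.pulses.shape)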
154 changes: 138 additions & 16 deletions src/ilabs_streamsync/streamsync.py
@@ -1,3 +1,19 @@

from __future__ import annotations

import logging
import os
import pathlib
import subprocess

import matplotlib.pyplot as plt
import mne
import numpy as np
from scipy.io.wavfile import read as wavread
from streamdata import StreamData

FFMPEG_TIMEOUT_SEC = 50

class StreamSync:
    """Synchronize two data streams.

@@ -9,39 +25,145 @@ class StreamSync:
    """

    def __init__(self, reference_object, pulse_channel):
        self.ref_stream = reference_object.get_chan(pulse_channel)
        self.sfreq = reference_object.info["sfreq"]  # Hz
        self.streams = []
        """Initialize StreamSync object with the 'Raw' MEG associated with it.

        reference_object: str TODO: is str the best method for this, or should this be a pathlib obj?
Member:
Initially I thought reference_object should be an object in memory, not a path to a file (whether str or Path). I still lean that way, on the assumption that the user is very likely to also be at least a bit familiar with MNE-Python (and thus know how to load a Raw file).

But I guess there's a case to be made that if the add_stream method takes a file path, then maybe the StreamSync constructor should also take in a file path. After reflection I'd say let's support both. The way to annotate that is mne.io.Raw | path-like, and we write the code so that Raw and str and Path will all work:

if isinstance(reference_obj, str):
    reference_obj = Path(reference_obj)
if isinstance(reference_obj, Path):
    reference_obj = mne.io.read_raw(reference_obj, ...)
# from now on we can safely assume it's a Raw object

            File path to an MEG raw file in .fif format. TODO: Verify fif only?
        pulse_channel: str
            A string associated with the stim channel name.
        """
        # Check provided reference_object for type and existence.
        if not reference_object:
            raise TypeError("reference_object is None. Please provide a path.")
        if type(reference_object) is not str:
            raise TypeError("reference_object must be a file path of type str.")
        ref_path_obj = pathlib.Path(reference_object)
        if not ref_path_obj.exists():
            raise OSError("reference_object file path does not exist.")
        if not ref_path_obj.suffix == ".fif":
            raise ValueError("Provided reference object does not point to a .fif file.")
Member, on lines +35 to +44:
I think this is probably overkill. MNE-Python will already provide informative error messages if the path doesn't point to a valid Raw file, or if the thing that is passed as a filename isn't in fact path-like.


        # Load in the raw file if valid
        raw = mne.io.read_raw_fif(reference_object, preload=False, allow_maxshield=True)

        # Check type and value of pulse_channel, and ensure the reference object has such a channel.
        if not pulse_channel:
            raise TypeError("pulse_channel is None. Please provide a channel name of type str.")
        if type(pulse_channel) is not str:
            raise TypeError("pulse_channel parameter must be of type str.")
        if raw[pulse_channel] is None:
            raise ValueError("pulse_channel does not exist in reference_object.")
Member, on lines +49 to +55:
Several points of feedback here:

  1. MNE-Python lets you pick channels by index (integer) or by name (string); in principle we don't need to restrict users to just strings.
  2. Even if we did want to restrict to strings, the preferred way to type check would be if not isinstance(pulse_channel, str) rather than if type(pulse_channel) is not str
  3. all of these failure modes will be handled by MNE-Python already if the user passes an invalid channel selector. It should be enough to just do self.ref_stream = reference_object.get_data(picks=[pulse_channel]). If you want you could try/except that line, and provide a nicer error message than what MNE-Python provides when the channel is not found (if you think it would help the user substantially)
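A minimal sketch of that suggestion (assuming reference_object is already a loaded Raw; the error wording is illustrative):

try:
    self.ref_stream = reference_object.get_data(picks=[pulse_channel])
except ValueError as err:
    raise ValueError(
        f"Channel {pulse_channel!r} not found in reference_object; "
        "check raw.ch_names for the available channel names."
    ) from err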



        self.raw = mne.io.read_raw_fif(reference_object, preload=False, allow_maxshield=True)
Member:
here you are loading in (a second time) the same file you already loaded into the variable raw. Assuming that's a mistake?

setting that aside: what is the motivation for keeping a reference to the Raw object as part of the StreamSync object? I think all we need is sfreq and a numpy array of the pulse channel data (or am I forgetting something)?

        self.ref_stream = raw[pulse_channel]
Member:
raw[pulse_channel] returns a tuple of two arrays: (data, times). do we need the times? If not we can do raw.get_data(picks=[pulse_channel])


        self.sfreq = self.raw.info["sfreq"]  # Hz

        self.streams = []  # list of StreamData objects

    def add_stream(self, stream, channel=None, events=None):
        """Add a new ``Raw`` or video stream, optionally with events.

        stream : Raw | wav
            An audio or FIF stream.
        stream : str
            File path to an audio or FIF stream.
        channel : str | int | None
            Which channel of `stream` contains the sync pulse sequence.
        events : array-like | None
            Events associated with the stream. TODO: should they be integer sample
            numbers? Timestamps? Do we support both?
Member, on lines 73 to 74:
As far as the input format for events, I don't really know what researchers/annotators are going to want to do. We have seen cases where the data was in the form of timestamps, probably something like HH:MM:SS.123456. So unless @NeuroLaunch has opinions about what (other) format(s) we should target, I'd say start with parsing HH:MM:SS.123456-formatted data, and we can expand to other formats later.

As far as the output format of events: MNE-Python has 2 ways of representing events (event arrays, and Annotations objects). We should decide which one (or both?) we want to use when converting/syncing camera timestamps to the Raw file's time domain. @NeuroLaunch do you have an opinion here? @ashtondoane are you familiar with the two kinds of MNE event representations?

If I had to put a stake in the ground I'd probably say "use Annotations" but I haven't thought very hard about it yet... maybe implement that first, and if we find that we need to also implement event array support, we can add that later.
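As a rough illustration of the Annotations route (the helper name and the HH:MM:SS.ffffff input format are assumptions taken from this thread, not settled API):

import mne

def camera_events_to_annotations(timestamps, labels):  # hypothetical helper
    # Convert "HH:MM:SS.ffffff" strings into onsets in seconds.
    onsets = []
    for ts in timestamps:
        h, m, s = ts.split(":")
        onsets.append(int(h) * 3600 + int(m) * 60 + float(s))
    return mne.Annotations(onset=onsets,
                           duration=[0.0] * len(onsets),
                           description=labels)

annot = camera_events_to_annotations(["00:01:23.456789"], ["camera_event"])
# raw.set_annotations(annot)  # as in the example script

Note these onsets would still be in the camera's time base; the syncing step would have to shift them into the Raw's time domain.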

Collaborator (author):
I am not familiar with the MNE representations, I will have to read the documentation. I'll begin with annotations as @NeuroLaunch also mentioned this as a possibility and we can adjust later if necessary.

Member:
Not clear to me that this has actually been addressed, as nothing is done with events in the code; unresolving.

"""
pulses = self._extract_pulse_sequence_from_stream(stream, channel=channel)
self.streams.append(pulses)
self.streams.append(self._extract_data_from_stream(stream, channel=channel))

    def _extract_pulse_sequence_from_stream(self, stream, channel):
        # TODO triage based on input type (e.g., if it's a Raw, pull out a stim chan,
        # if it's audio, just add it as-is)
    def _extract_data_from_stream(self, stream, channel):
        """Extract pulses and raw data from the provided stream. TODO: Implement adding an annotation stream."""
        ext = pathlib.Path(stream).suffix
        if ext == ".wav":
            return self._extract_data_from_wav(stream, channel)
        raise TypeError("Stream provided was of an unsupported format. Please provide a wav file.")

    def _extract_data_from_wav(self, stream, channel):
        """Return a tuple of (pulse channel, audio channel) from a stereo file."""
        srate, wav_signal = wavread(stream)
        return StreamData(filename=stream, sample_rate=srate, pulses=wav_signal[:, channel], data=wav_signal[:, 1 - channel])
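Note that wav_signal[:, 1 - channel] presumes a two-channel file with channel equal to 0 or 1; a mono or multichannel WAV would need explicit handling before this indexing is safe.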

    def remove_stream(self, stream):
        pass

    def do_syncing(self):
        """Synchronize all streams with the reference stream."""
        # TODO (waves hands) do the hard part.
        # TODO spit out a report of correlation/association between all pairs of streams
        pass

    def plot_sync(self):
        pass
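For a sense of what "the hard part" in do_syncing might eventually look like, here is a purely illustrative cross-correlation sketch (not part of this PR; it assumes both pulse streams have already been resampled to a common rate):

import numpy as np

def estimate_lag(ref_pulses, stream_pulses, sfreq):
    # The peak of the full cross-correlation gives the relative offset.
    corr = np.correlate(ref_pulses, stream_pulses, mode="full")
    lag_samples = np.argmax(corr) - (len(stream_pulses) - 1)
    return lag_samples / sfreq  # offset in seconds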
    def plot_sync_pulses(self, tmin=0, tmax=None):
        """Plot each stream in the class.

        tmin: int
            Minimum timestamp to be graphed.
        tmax: int
            Maximum timestamp to be graphed.
        """
        fig, axset = plt.subplots(len(self.streams) + 1, 1, figsize=[8, 6])  # show individual channels separately; the 0th plot is the combination of these.
        # Plot reference_object
        trig, tt_trig = self.ref_stream
        trig = trig.reshape(tt_trig.shape)
        idx = np.where((tt_trig >= tmin) & (tt_trig < tmax))
        axset[0].plot(tt_trig[idx], trig[idx] * 100, c='r')
        axset[0].set_title("Reference MEG")
        # Plot all other streams
        for i, stream in enumerate(self.streams):
            npts = len(stream.pulses)
            tt = np.arange(npts) / stream.sample_rate
            idx = np.where((tt >= tmin) & (tt < tmax))
            axset[i + 1].plot(tt[idx], stream.pulses[idx].T)
            axset[i + 1].set_title(pathlib.Path(stream.filename).name)
            # Make the label simply the cam number
        plt.show()

def extract_audio_from_video(path_to_video, output_dir, overwrite=False):
    """Extract audio from the video at the provided path.

    path_to_video: str
        Path to the video file.
        TODO allow path_to_video to take regex?
    output_dir: str
        Path to the directory where extracted audio should be written.

    Effects:
        Creates the output directory if non-existent. For each video found, creates
        a file with the associated audio labeled the same way.

    Raises:
        ValueError if the video path does not exist,
        Exception if the filename is taken in output_dir.
    """
    p = pathlib.Path(path_to_video)
    audio_file = p.with_stem(f"{p.stem}_16_bit").with_suffix(".wav").name
    if not p.exists():
        raise ValueError('Path provided cannot be found.')
    if not overwrite and pathlib.PurePath.joinpath(pathlib.Path(output_dir), pathlib.Path(audio_file)).exists():
Member:
Suggested change:
-    if not overwrite and pathlib.PurePath.joinpath(pathlib.Path(output_dir), pathlib.Path(audio_file)).exists():
+    if not overwrite and (pathlib.Path(output_dir) / audio_file).exists():
        raise Exception(f"Audio already exists for {path_to_video} in output directory.")

    # Create the output directory if non-existent.
    od = pathlib.Path(output_dir)
    od.mkdir(exist_ok=True, parents=True)
    output_path = output_dir + "/" + audio_file
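Following the same pathlib idiom suggested above, this concatenation could also be written as output_path = str(od / audio_file), avoiding manual separator handling.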

    command = ['ffmpeg',
               '-acodec', 'pcm_s24le', # force little-endian format (req'd for Linux)
               '-i', path_to_video,
               '-map', '0:a', # audio only (per DM)
               # '-af', 'highpass=f=0.1',
               '-acodec', 'pcm_s16le',
Member:
-acodec is being passed twice, with different values. Is that intended?

               '-ac', '2', # no longer mono output, so setting to "2"
               '-y', '-vn', # overwrite output file without asking; no video
               '-loglevel', 'error',
               output_path]
    pipe = subprocess.run(command, timeout=FFMPEG_TIMEOUT_SEC, check=False)
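On the duplicate -acodec flags queried above: ffmpeg applies options written before -i to the input (decoding) and options written after it to the output (encoding), so the first flag forces 24-bit PCM decoding and the second selects 16-bit PCM output; whether forcing the input decoder is actually needed here is worth confirming.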

def extract_audio_from_video(path_to_video, channel):
    """Path can be a regex or glob to allow batch processing."""
    pass
    logger = logging.getLogger(__name__)
    if pipe.returncode == 0:
        logger.info(f'Audio extraction was successful for {path_to_video}')
    else:
        logger.error(f"Audio extraction unsuccessful for {path_to_video}")