Merge pull request #1 from DCMLab/development
RC
Johannes Hentschel authored Jun 20, 2022
2 parents 1f8592f + 0cc5156 commit b488f87
Showing 15 changed files with 1,135 additions and 201 deletions.
17 changes: 15 additions & 2 deletions CHANGELOG.rst
@@ -2,7 +2,20 @@
Changelog
=========

Version 0.1
===========
Version 0.2.0
=============

* new slicers: NoteSlicer, LocalKeySlicer
* new groupers: PieceGrouper, CorpusGrouper, YearGrouper, ModeGrouper
* new filter: IsAnnotatedFilter
* new writer: TSVWriter
* installs system-wide command ``dimcat pcvs``
* consistent naming of DataFrame MultiIndex levels
* more consistent interface, more abstractions


Version 0.1.0
=============

* new analyzers: TPCrange, PitchClassVectors, ChordSymbolUnigrams, ChordSymbolBigrams
* installs system-wide commands ``dimcat unigrams`` and ``dimcat bigrams`` for creating TSV files
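
The 0.2.0 entries above describe composable pipeline steps rather than isolated commands. As a rough orientation, here is a minimal sketch of how the new steps might be chained; the ``Corpus`` construction and the ``TSVWriter`` arguments are assumptions, not taken from this commit, and only the ``process_data()`` interface is documented in the diffs below.

import dimcat as dc

corpus = dc.Corpus()  # assumption: corpus data has been loaded into this object

# Every PipelineStep consumes and returns a Data object via process_data().
data = corpus
for step in [
    dc.LocalKeySlicer(),  # slice each facet into local-key segments
    dc.ModeGrouper(),  # group the slices by mode
    dc.PitchClassVectors(pitch_class_format="pc", normalize=True),
    dc.TSVWriter(),  # assumption: output-path arguments omitted
]:
    data = step.process_data(data)

The new ``Pipeline`` class presumably wraps exactly this kind of step sequence; since its constructor is not shown in this commit, the loop above sticks to the documented ``process_data()`` call.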
2 changes: 1 addition & 1 deletion setup.cfg
@@ -52,7 +52,7 @@ install_requires =
python_version>="3.8"
seaborn
matplotlib
ms3>=0.5.2
ms3>=0.5.3
pandas>=1.3.0
numpy
pre-commit
6 changes: 5 additions & 1 deletion src/dimcat/__init__.py
@@ -22,4 +22,8 @@
TPCrange,
)
from .data import Corpus
from .writer import TSVwriter
from .filter import IsAnnotatedFilter
from .grouper import CorpusGrouper, ModeGrouper, PieceGrouper, YearGrouper
from .pipeline import Pipeline
from .slicer import LocalKeySlicer, NoteSlicer
from .writer import TSVWriter
150 changes: 87 additions & 63 deletions src/dimcat/analyzer.py
@@ -9,97 +9,102 @@
from .utils import grams


def dict_of_series_result_to_dataframe(result, short_ids=False):
key = list(result.keys())[0]
if len(result) == 1 and not isinstance(key[0], str):
df = pd.DataFrame(result[key]).T
df.index = [key]
name = "fnames" if short_ids else "IDs"
df.index.rename(name, inplace=True)
else:
df = pd.concat(result.values(), keys=result.keys()).unstack()
nlevels = df.index.nlevels
level_names = (
["fname", "interval"] if short_ids else ["corpus", "fname", "interval"]
)
if nlevels == 1:
df.index.rename(level_names[0], inplace=True)
else:
df.index.rename(level_names[:nlevels], inplace=True)
return df


def dict_of_series_result_to_series(result, short_ids=False):
df = dict_of_series_result_to_dataframe(result, short_ids=short_ids)
return df.stack()
class Analyzer(PipelineStep, ABC):
"""Analyzers are PipelineSteps that process data and store the results in Data.processed."""


class FacetAnalyzer(PipelineStep, ABC):
class FacetAnalyzer(Analyzer):
"""Analyzers that work on one particular type of DataFrames."""

def __init__(self, concat_groups=False):
def __init__(self, once_per_group=False):
"""
Parameters
----------
concat_groups : :obj:`bool`
once_per_group : :obj:`bool`
By default (False), computes one result per group item.
Pass True to instead compute a single result per group.
"""
self.required_facets = []
self.concat_groups = concat_groups
self.once_per_group = once_per_group
self.config = {}
""":obj:`dict`
This dictionary stores the parameters to be passed to the compute() method."""
self.group2pandas = None
""":obj:`str`
The name of the function that allows displaying one group's results as a single
pandas object. See data.Corpus.convert_group2pandas()"""
self.level_names = {"indices": "IDs"} if once_per_group else {}
""":obj:`dict`
Define {"indices": "index_level_name"} if the analysis is applied once per group,
because the index of the DataFrame holding the processed data won't be showing the
individual indices anymore.
"""

@abstractmethod
def compute(self, df):
"""Where the actual computation takes place."""

def process_data(self, data: Data) -> Data:
"""Returns a copy of the Data object containing processed data."""
processed = {}
for group, dfs in data.iter_facet(
self.required_facets[0], concatenate=self.concat_groups
self.required_facets[0], concatenate=self.once_per_group
):
processed[group] = {
ID: self.compute(df, **self.config) for ID, df in dfs.items()
}
processed_group = {}
for ID, df in dfs.items():
key = "group_ids" if self.once_per_group else ID
eligible, message = self.check(df)
if not eligible:
print(f"{ID}: {message}")
continue
processed_group[key] = self.compute(df, **self.config)
if len(processed_group) == 0:
print(f"Group '{group}' will be missing from the processed data.")
continue
processed[group] = processed_group
result = data.copy()
result.load_processed(processed)
result.track_pipeline(self, group2pandas=self.group2pandas, **self.level_names)
result.processed = processed
return result
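
The reworked ``process_data()`` above fixes a uniform contract for all facet analyzers: ``check()`` may veto an individual DataFrame with a message, and ``compute()`` turns each DataFrame into one result per ID (or one per group when ``once_per_group`` is set). As a minimal sketch of that contract, here is a hypothetical subclass; the class name and the ``note_count`` level name are invented for illustration, mirroring how TPCrange below configures itself.

class NoteCounter(FacetAnalyzer):
    """Hypothetical analyzer: counts the rows of each notes table."""

    def __init__(self, once_per_group=False):
        super().__init__(once_per_group=once_per_group)
        self.required_facets = ["notes"]
        self.level_names["processed"] = "note_count"
        self.group2pandas = "group_of_values2series"

    def check(self, df):
        if len(df) == 0:
            return False, "Empty DataFrame, nothing to count."
        return True, ""

    @staticmethod
    def compute(df):
        # One scalar per notes table (or per concatenated group).
        return len(df)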


class NotesAnalyzer(FacetAnalyzer):
def __init__(self, concat_groups=False):
def __init__(self, once_per_group=False):
"""Analyzers that work on notes tables.
Parameters
----------
concat_groups : :obj:`bool`
once_per_group : :obj:`bool`
By default (False), computes one result per group item.
Pass True to instead compute a single result per group.
"""
super().__init__(once_per_group=once_per_group)
self.required_facets = ["notes"]
self.concat_groups = concat_groups
self.config = {}


class ChordSymbolAnalyzer(FacetAnalyzer):
def __init__(self, concat_groups=False):
def __init__(self, once_per_group=False):
"""Analyzers that work on expanded annotation tables.
Parameters
----------
concat_groups : :obj:`bool`
once_per_group : :obj:`bool`
By default (False), computes one result per group item.
Pass True to instead compute a single result per group.
"""
super().__init__(once_per_group=once_per_group)
self.required_facets = ["expanded"]
self.concat_groups = concat_groups
self.config = {}


class TPCrange(NotesAnalyzer):
"""Computes the range from the minimum to the maximum Tonal Pitch Class (TPC)."""

def __init__(self, once_per_group=False):
super().__init__(once_per_group=once_per_group)
self.level_names["processed"] = "tpc_range"
self.group2pandas = "group_of_values2series"

@staticmethod
def compute(df):
"""Computes the range from the minimum to the maximum Tonal Pitch Class (TPC).
@@ -119,12 +124,18 @@ def compute(df):
class PitchClassVectors(NotesAnalyzer):
"""Analyzer that groups notes by their pitch class and aggregates their durations."""

def __init__(self, concat_groups=False, pitch_class_format="tpc", normalize=False):
def __init__(
self,
once_per_group=False,
pitch_class_format="tpc",
normalize=False,
ensure_pitch_classes=None,
):
"""Analyzer that groups notes by their pitch class and aggregates their durations.
Parameters
----------
concat_groups : :obj:`bool`
once_per_group : :obj:`bool`
By default (False), computes one result per group item.
Pass True to instead compute a single result per group.
pitch_class_format : :obj:`str`, optional
Expand All @@ -136,17 +147,25 @@ def __init__(self, concat_groups=False, pitch_class_format="tpc", normalize=Fals
normalize : :obj:`bool`, optional
By default, the PCVs contain absolute durations in quarter notes. Pass True to normalize
the PCV for each slice.
ensure_pitch_classes : :obj:`Iterable`, optional
By default, pitch classes that do not appear in the data are omitted. Pass a collection
of pitch classes if you want to ensure their presence even if empty. For example, if
``pitch_class_format='pc'`` you could pass ``ensure_pitch_classes=range(12)``.
"""
super().__init__(concat_groups=concat_groups)
self.config = dict(pitch_class_format=pitch_class_format, normalize=normalize)
super().__init__(once_per_group=once_per_group)
self.config = dict(
pitch_class_format=pitch_class_format,
normalize=normalize,
ensure_pitch_classes=ensure_pitch_classes,
)
self.level_names["processed"] = pitch_class_format
self.group2pandas = "group2dataframe_unstacked"

@staticmethod
def compute(
notes,
index_levels=None,
pitch_class_format="tpc",
normalize=False,
fillna=True,
ensure_pitch_classes=None,
):
"""Group notes by their pitch class and aggregate their durations.
@@ -200,36 +219,41 @@ def compute(
pcvs = pd.concat([pcvs, new_values]).sort_index()
return pcvs

def process_data(self, data: Data) -> Data:
result = super().process_data(data)
result._result_to_pandas = dict_of_series_result_to_dataframe
return result
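
Stripped of the class machinery, the heart of the PCV computation is a groupby-sum of note durations per pitch class. A toy sketch of the three configuration options; the column names ``tpc`` and ``duration_qb`` are assumptions about the notes facet, not taken from this diff.

import pandas as pd

notes = pd.DataFrame({
    "tpc": [0, 1, 1, -1],  # tonal pitch classes on the line of fifths (0 = C)
    "duration_qb": [1.0, 0.5, 0.5, 2.0],  # durations in quarter notes
})
pcv = notes.groupby("tpc")["duration_qb"].sum()  # -1: 2.0, 0: 1.0, 1: 1.0
pcv = pcv / pcv.sum()  # what normalize=True amounts to
pcv = pcv.reindex(range(-1, 3), fill_value=0.0)  # what ensure_pitch_classes could do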


class ChordSymbolUnigrams(ChordSymbolAnalyzer):
def __init__(self, once_per_group=False):
super().__init__(once_per_group=once_per_group)
self.level_names["processed"] = "chord"

@staticmethod
def compute(df):
if len(df) == 0:
return pd.Series()
return df.chord.value_counts()

def process_data(self, data: Data) -> Data:
result = super().process_data(data)
result._result_to_pandas = dict_of_series_result_to_series
return result
return df.chord.value_counts().rename("count")


class ChordSymbolBigrams(ChordSymbolAnalyzer):
def __init__(self, once_per_group=False):
super().__init__(once_per_group=once_per_group)
self.level_names["processed"] = ["from", "to"]
self.group2pandas = "group_of_series2series"

def check(self, df):
if df.shape[0] < 2:
return False, "DataFrame has only one row, cannot compute bigram."
return True, ""

@staticmethod
def compute(df):
if len(df) == 0:
return pd.Series()
bigrams = grams(df.chord.values, n=2)
df = pd.DataFrame(bigrams)
counts = df.groupby([0, 1]).size().sort_values(ascending=False)
try:
counts = (
df.groupby([0, 1]).size().sort_values(ascending=False).rename("count")
)
except KeyError:
print(df)
raise
return counts

def process_data(self, data: Data) -> Data:
result = super().process_data(data)
result._result_to_pandas = dict_of_series_result_to_series
return result
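
To make the bigram counting concrete: assuming ``grams(values, n=2)`` from ``.utils`` yields adjacent pairs (its definition is not part of this diff), the computation reduces to a groupby-size over the two bigram columns.

import pandas as pd

chords = ["I", "V", "I", "V"]
bigrams = list(zip(chords[:-1], chords[1:]))  # stand-in for grams(chords, n=2)
counts = (
    pd.DataFrame(bigrams).groupby([0, 1]).size().sort_values(ascending=False).rename("count")
)
# yields ('I', 'V') -> 2 and ('V', 'I') -> 1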