From 2a4c190db239b4c5061ca60362900f477a48d53a Mon Sep 17 00:00:00 2001 From: Michael Ekstrand Date: Sat, 11 Nov 2023 15:23:37 -0500 Subject: [PATCH] reformat --- lenskit/__init__.py | 4 +- lenskit/algorithms/__init__.py | 22 +-- lenskit/algorithms/als.py | 144 ++++++++++------- lenskit/algorithms/basic.py | 56 +++---- lenskit/algorithms/bias.py | 61 ++++---- lenskit/algorithms/funksvd.py | 103 +++++++------ lenskit/algorithms/item_knn.py | 245 +++++++++++++++++------------ lenskit/algorithms/mf_common.py | 4 +- lenskit/algorithms/ranking.py | 20 +-- lenskit/algorithms/svd.py | 21 +-- lenskit/algorithms/user_knn.py | 74 ++++----- lenskit/batch/_predict.py | 36 +++-- lenskit/batch/_recommend.py | 26 ++-- lenskit/batch/_train.py | 4 +- lenskit/crossfold.py | 78 ++++++---- lenskit/data/matrix.py | 26 ++-- lenskit/datasets/__init__.py | 2 +- lenskit/datasets/fetch.py | 48 +++--- lenskit/datasets/movielens.py | 264 +++++++++++++++++++------------- lenskit/math/solve.py | 44 ++++-- lenskit/metrics/predict.py | 22 +-- lenskit/metrics/topn.py | 173 ++++++++++----------- lenskit/sharing/__init__.py | 22 +-- lenskit/sharing/binpickle.py | 28 ++-- lenskit/sharing/shm.py | 32 ++-- lenskit/topn.py | 80 +++++----- lenskit/util/__init__.py | 35 +++-- lenskit/util/accum.py | 8 +- lenskit/util/debug.py | 62 ++++---- lenskit/util/log.py | 14 +- lenskit/util/parallel.py | 50 +++--- lenskit/util/random.py | 18 ++- lenskit/util/test.py | 13 +- lenskit/util/timing.py | 3 +- 34 files changed, 1026 insertions(+), 816 deletions(-) diff --git a/lenskit/__init__.py b/lenskit/__init__.py index 9eedb9d0d..19c34f19c 100644 --- a/lenskit/__init__.py +++ b/lenskit/__init__.py @@ -5,13 +5,14 @@ from lenskit.algorithms import * # noqa: F401,F403 -__version__ = '0.15.0' +__version__ = "0.15.0" class DataWarning(UserWarning): """ Warning raised for detectable problems with input data. """ + pass @@ -19,4 +20,5 @@ class ConfigWarning(UserWarning): """ Warning raised for detectable problems with algorithm configurations. """ + pass diff --git a/lenskit/algorithms/__init__.py b/lenskit/algorithms/__init__.py index dbec96cff..5c57cbb66 100644 --- a/lenskit/algorithms/__init__.py +++ b/lenskit/algorithms/__init__.py @@ -10,7 +10,7 @@ from abc import ABCMeta, abstractmethod import inspect -__all__ = ['Algorithm', 'Recommender', 'Predictor', 'CandidateSelector'] +__all__ = ["Algorithm", "Recommender", "Predictor", "CandidateSelector"] class Algorithm(metaclass=ABCMeta): @@ -68,10 +68,10 @@ def get_params(self, deep=True): if hasattr(self, name) and name not in self.IGNORED_PARAMS: value = getattr(self, name) params[name] = value - if deep and hasattr(value, 'get_params'): + if deep and hasattr(value, "get_params"): sps = value.get_params(deep) for k, sv in sps.items(): - params[name + '__' + k] = sv + params[name + "__" + k] = sv return params @@ -101,16 +101,16 @@ def predict(self, pairs, ratings=None): raise NotImplementedError() def upred(df): - user, = df['user'].unique() - items = df['item'] + (user,) = df["user"].unique() + items = df["item"] preds = self.predict_for_user(user, items) - preds.name = 'prediction' - res = df.join(preds, on='item', how='left') + preds.name = "prediction" + res = df.join(preds, on="item", how="left") return res.prediction - res = pairs.loc[:, ['user', 'item']].groupby('user', sort=False).apply(upred) - res.reset_index(level='user', inplace=True, drop=True) - res.name = 'prediction' + res = pairs.loc[:, ["user", "item"]].groupby("user", sort=False).apply(upred) + res.reset_index(level="user", inplace=True, drop=True) + res.name = "prediction" return res.loc[pairs.index.values] @abstractmethod @@ -173,6 +173,7 @@ def adapt(cls, algo): algo(Predictor): the underlying rating predictor. """ from .basic import TopN + if isinstance(algo, Recommender): return algo else: @@ -212,6 +213,7 @@ def rated_items(ratings): """ import pandas as pd import numpy as np + if isinstance(ratings, pd.Series): return ratings.index.values elif isinstance(ratings, np.ndarray): diff --git a/lenskit/algorithms/als.py b/lenskit/algorithms/als.py index c418ab0f7..156dc41a3 100644 --- a/lenskit/algorithms/als.py +++ b/lenskit/algorithms/als.py @@ -15,10 +15,7 @@ _logger = logging.getLogger(__name__) -PartialModel = namedtuple('PartialModel', [ - 'users', 'items', - 'user_matrix', 'item_matrix' -]) +PartialModel = namedtuple("PartialModel", ["users", "items", "user_matrix", "item_matrix"]) @njit @@ -343,10 +340,22 @@ class BiasedMF(MFPredictor): Random number generator or state (see :func:`lenskit.util.random.rng`). progress: a :func:`tqdm.tqdm`-compatible progress bar function """ + timer = None - def __init__(self, features, *, iterations=20, reg=0.1, damping=5, bias=True, method='cd', - rng_spec=None, progress=None, save_user_features=True): + def __init__( + self, + features, + *, + iterations=20, + reg=0.1, + damping=5, + bias=True, + method="cd", + rng_spec=None, + progress=None, + save_user_features=True, + ): self.features = features self.iterations = iterations self.regularization = reg @@ -377,12 +386,18 @@ def fit(self, ratings, **kwargs): pass # we just need to do the iterations if self.user_features_ is not None: - _logger.info('trained model in %s (|P|=%f, |Q|=%f)', self.timer, - np.linalg.norm(self.user_features_, 'fro'), - np.linalg.norm(self.item_features_, 'fro')) + _logger.info( + "trained model in %s (|P|=%f, |Q|=%f)", + self.timer, + np.linalg.norm(self.user_features_, "fro"), + np.linalg.norm(self.item_features_, "fro"), + ) else: - _logger.info('trained model in %s (|Q|=%f)', self.timer, - np.linalg.norm(self.item_features_, 'fro')) + _logger.info( + "trained model in %s (|Q|=%f)", + self.timer, + np.linalg.norm(self.item_features_, "fro"), + ) del self.timer return self @@ -399,13 +414,14 @@ def fit_iters(self, ratings, **kwargs): """ if self.bias: - _logger.info('[%s] fitting bias model', self.timer) + _logger.info("[%s] fitting bias model", self.timer) self.bias.fit(ratings) current, uctx, ictx = self._initial_model(ratings) - _logger.info('[%s] training biased MF model with ALS for %d features', - self.timer, self.features) + _logger.info( + "[%s] training biased MF model with ALS for %d features", self.timer, self.features + ) for epoch, model in enumerate(self._train_iters(current, uctx, ictx)): self._save_params(model) yield self @@ -423,7 +439,7 @@ def _save_params(self, model): def _initial_model(self, ratings): # transform ratings using offsets if self.bias: - _logger.info('[%s] normalizing ratings', self.timer) + _logger.info("[%s] normalizing ratings", self.timer) ratings = self.bias.transform(ratings) "Initialize a model and build contexts." @@ -431,17 +447,17 @@ def _initial_model(self, ratings): n_users = len(users) n_items = len(items) - _logger.debug('setting up contexts') + _logger.debug("setting up contexts") trmat = rmat.transpose() - _logger.debug('initializing item matrix') + _logger.debug("initializing item matrix") imat = self.rng.standard_normal((n_items, self.features)) imat /= np.linalg.norm(imat, axis=1).reshape((n_items, 1)) - _logger.debug('|Q|: %f', np.linalg.norm(imat, 'fro')) - _logger.debug('initializing user matrix') + _logger.debug("|Q|: %f", np.linalg.norm(imat, "fro")) + _logger.debug("initializing user matrix") umat = self.rng.standard_normal((n_users, self.features)) umat /= np.linalg.norm(umat, axis=1).reshape((n_users, 1)) - _logger.debug('|P|: %f', np.linalg.norm(umat, 'fro')) + _logger.debug("|P|: %f", np.linalg.norm(umat, "fro")) return PartialModel(users, items, umat, imat), rmat, trmat @@ -461,24 +477,24 @@ def _train_iters(self, current, uctx, ictx): assert ictx.nrows == n_items assert ictx.ncols == n_users - if self.method == 'cd': + if self.method == "cd": train = _train_matrix_cd - elif self.method == 'lu': + elif self.method == "lu": train = _train_matrix_lu else: - raise ValueError('invalid training method ' + self.method) + raise ValueError("invalid training method " + self.method) if isinstance(self.regularization, tuple): ureg, ireg = self.regularization else: ureg = ireg = self.regularization - for epoch in self.progress(range(self.iterations), desc='BiasedMF', leave=False): + for epoch in self.progress(range(self.iterations), desc="BiasedMF", leave=False): du = train(uctx, current.user_matrix, current.item_matrix, ureg) - _logger.debug('[%s] finished user epoch %d', self.timer, epoch) + _logger.debug("[%s] finished user epoch %d", self.timer, epoch) di = train(ictx, current.item_matrix, current.user_matrix, ireg) - _logger.debug('[%s] finished item epoch %d', self.timer, epoch) - _logger.info('[%s] finished epoch %d (|ΔP|=%.3f, |ΔQ|=%.3f)', self.timer, epoch, du, di) + _logger.debug("[%s] finished item epoch %d", self.timer, epoch) + _logger.info("[%s] finished epoch %d (|ΔP|=%.3f, |ΔQ|=%.3f)", self.timer, epoch, du, di) yield current def predict_for_user(self, user, items, ratings=None): @@ -513,8 +529,9 @@ def predict_for_user(self, user, items, ratings=None): return scores def __str__(self): - return 'als.BiasedMF(features={}, regularization={})'.\ - format(self.features, self.regularization) + return "als.BiasedMF(features={}, regularization={})".format( + self.features, self.regularization + ) class ImplicitMF(MFPredictor): @@ -561,10 +578,22 @@ class ImplicitMF(MFPredictor): Random number generator or state (see :func:`lenskit.util.random.rng`). progress: a :func:`tqdm.tqdm`-compatible progress bar function """ + timer = None - def __init__(self, features, *, iterations=20, reg=0.1, weight=40, use_ratings=False, - method='cg', rng_spec=None, progress=None, save_user_features=True): + def __init__( + self, + features, + *, + iterations=20, + reg=0.1, + weight=40, + use_ratings=False, + method="cg", + rng_spec=None, + progress=None, + save_user_features=True, + ): self.features = features self.iterations = iterations self.reg = reg @@ -582,14 +611,20 @@ def fit(self, ratings, **kwargs): pass if self.user_features_ is not None: - _logger.info('[%s] finished training model with %d features (|P|=%f, |Q|=%f)', - self.timer, self.features, - np.linalg.norm(self.user_features_, 'fro'), - np.linalg.norm(self.item_features_, 'fro')) + _logger.info( + "[%s] finished training model with %d features (|P|=%f, |Q|=%f)", + self.timer, + self.features, + np.linalg.norm(self.user_features_, "fro"), + np.linalg.norm(self.item_features_, "fro"), + ) else: - _logger.info('[%s] finished training model with %d features (|Q|=%f)', - self.timer, self.features, - np.linalg.norm(self.item_features_, 'fro')) + _logger.info( + "[%s] finished training model with %d features (|Q|=%f)", + self.timer, + self.features, + np.linalg.norm(self.item_features_, "fro"), + ) # unpack the regularization if isinstance(self.reg, tuple): @@ -605,10 +640,12 @@ def fit(self, ratings, **kwargs): def fit_iters(self, ratings, **kwargs): current, uctx, ictx = self._initial_model(ratings) - _logger.info('[%s] training implicit MF model with ALS for %d features', - self.timer, self.features) - _logger.info('have %d observations for %d users and %d items', - uctx.nnz, uctx.nrows, ictx.nrows) + _logger.info( + "[%s] training implicit MF model with ALS for %d features", self.timer, self.features + ) + _logger.info( + "have %d observations for %d users and %d items", uctx.nnz, uctx.nrows, ictx.nrows + ) for model in self._train_iters(current, uctx, ictx): self._save_model(model) yield self @@ -624,37 +661,37 @@ def _save_model(self, model): def _train_iters(self, current, uctx, ictx): "Generator of training iterations." - if self.method == 'lu': + if self.method == "lu": train = _train_implicit_lu - elif self.method == 'cg': + elif self.method == "cg": train = _train_implicit_cg else: - raise ValueError('unknown solver ' + self.method) + raise ValueError("unknown solver " + self.method) if isinstance(self.reg, tuple): ureg, ireg = self.reg else: ureg = ireg = self.reg - for epoch in self.progress(range(self.iterations), desc='ImplicitMF', leave=False): + for epoch in self.progress(range(self.iterations), desc="ImplicitMF", leave=False): du = train(uctx, current.user_matrix, current.item_matrix, ureg) - _logger.debug('[%s] finished user epoch %d', self.timer, epoch) + _logger.debug("[%s] finished user epoch %d", self.timer, epoch) di = train(ictx, current.item_matrix, current.user_matrix, ireg) - _logger.debug('[%s] finished item epoch %d', self.timer, epoch) - _logger.info('[%s] finished epoch %d (|ΔP|=%.3f, |ΔQ|=%.3f)', self.timer, epoch, du, di) + _logger.debug("[%s] finished item epoch %d", self.timer, epoch) + _logger.info("[%s] finished epoch %d (|ΔP|=%.3f, |ΔQ|=%.3f)", self.timer, epoch, du, di) yield current def _initial_model(self, ratings): "Initialize a model and build contexts." if not self.use_ratings: - ratings = ratings[['user', 'item']] + ratings = ratings[["user", "item"]] rmat, users, items = sparse_ratings(ratings) n_users = len(users) n_items = len(items) - _logger.debug('setting up contexts') + _logger.debug("setting up contexts") # force values to exist if rmat.values is None: rmat.values = np.ones(rmat.nnz) @@ -685,5 +722,6 @@ def predict_for_user(self, user, items, ratings=None): return self.score_by_ids(user, items) def __str__(self): - return 'als.ImplicitMF(features={}, reg={}, w={})'.\ - format(self.features, self.reg, self.weight) + return "als.ImplicitMF(features={}, reg={}, w={})".format( + self.features, self.reg, self.weight + ) diff --git a/lenskit/algorithms/basic.py b/lenskit/algorithms/basic.py index d76ceeec0..0e7147e4a 100644 --- a/lenskit/algorithms/basic.py +++ b/lenskit/algorithms/basic.py @@ -38,9 +38,9 @@ def __init__(self, selector=None): self.selector = selector def fit(self, ratings, **kwargs): - pop = ratings.groupby('item').user.count() - pop.name = 'score' - self.item_pop_ = pop.astype('float64') + pop = ratings.groupby("item").user.count() + pop.name = "score" + self.item_pop_ = pop.astype("float64") if self.selector is None: self.selector = UnratedItemCandidateSelector() @@ -63,7 +63,7 @@ def recommend(self, user, n=None, candidates=None, ratings=None): return scores.nlargest(n).reset_index() def __str__(self): - return 'Popular' + return "Popular" class PopScore(Predictor): @@ -84,26 +84,26 @@ class PopScore(Predictor): Item popularity scores. """ - def __init__(self, score_method='quantile'): + def __init__(self, score_method="quantile"): self.score_method = score_method def fit(self, ratings, **kwargs): - _logger.info('counting item popularity') - scores = ratings['item'].value_counts() - if self.score_method == 'rank': - _logger.info('ranking %d items', len(scores)) + _logger.info("counting item popularity") + scores = ratings["item"].value_counts() + if self.score_method == "rank": + _logger.info("ranking %d items", len(scores)) scores = scores.rank().sort_index() - elif self.score_method == 'quantile': - _logger.info('computing quantiles for %d items', len(scores)) + elif self.score_method == "quantile": + _logger.info("computing quantiles for %d items", len(scores)) cmass = scores.sort_values() cmass = cmass.cumsum() cdens = cmass / scores.sum() scores = cdens.sort_index() - elif self.score_method == 'count': - _logger.info('scoring items with their rating counts') + elif self.score_method == "count": + _logger.info("scoring items with their rating counts") scores = scores.sort_index() else: - raise ValueError('invalid scoring method ' + repr(self.score_method)) + raise ValueError("invalid scoring method " + repr(self.score_method)) self.item_scores_ = scores @@ -113,7 +113,7 @@ def predict_for_user(self, user, items, ratings=None): return self.item_scores_.reindex(items) def __str__(self): - return 'PopScore({})'.format(self.score_method) + return "PopScore({})".format(self.score_method) class Memorized(Predictor): @@ -134,7 +134,7 @@ def fit(self, *args, **kwargs): def predict_for_user(self, user, items, ratings=None): uscores = self.scores[self.scores.user == user] - urates = uscores.set_index('item').rating + urates = uscores.set_index("item").rating return urates.reindex(items) @@ -170,7 +170,7 @@ def predict_for_user(self, user, items, ratings=None): preds = None for algo in self.algorithms: - _logger.debug('predicting for %d items for user %s', len(remaining), user) + _logger.debug("predicting for %d items for user %s", len(remaining), user) aps = algo.predict_for_user(user, remaining, ratings=ratings) aps = aps[aps.notna()] if preds is None: @@ -185,7 +185,7 @@ def predict_for_user(self, user, items, ratings=None): def __str__(self): str_algos = [str(algo) for algo in self.algorithms] - return 'Fallback([{}])'.format(', '.join(str_algos)) + return "Fallback([{}])".format(", ".join(str_algos)) class EmptyCandidateSelector(CandidateSelector): @@ -196,7 +196,7 @@ class EmptyCandidateSelector(CandidateSelector): dtype_ = np.int64 def fit(self, ratings, **kwarsg): - self.dtype_ = ratings['item'].dtype + self.dtype_ = ratings["item"].dtype def candidates(self, user, ratings=None): return np.array([], dtype=self.dtype_) @@ -213,14 +213,15 @@ class UnratedItemCandidateSelector(CandidateSelector): user_items_(CSR): Items rated by each known user, as positions in the ``items`` index. """ + items_ = None users_ = None user_items_ = None def fit(self, ratings, **kwargs): - r2 = ratings[['user', 'item']] + r2 = ratings[["user", "item"]] sparse = sparse_ratings(r2) - _logger.info('trained unrated candidate selector for %d ratings', sparse.matrix.nnz) + _logger.info("trained unrated candidate selector for %d ratings", sparse.matrix.nnz) self.items_ = sparse.items self.users_ = sparse.users self.user_items_ = sparse.matrix @@ -255,10 +256,11 @@ class AllItemsCandidateSelector(CandidateSelector): Attributes: items_(numpy.ndarray): All known items. """ + items_ = None def fit(self, ratings, **kwargs): - self.items_ = ratings['item'].unique() + self.items_ = ratings["item"].unique() return self def candidates(self, user, ratings=None): @@ -290,7 +292,7 @@ def __init__(self, selector=None, rng_spec=None): def fit(self, ratings, **kwargs): self.selector.fit(ratings, **kwargs) - items = pd.DataFrame(ratings['item'].unique(), columns=['item']) + items = pd.DataFrame(ratings["item"].unique(), columns=["item"]) self.items = items return self @@ -301,12 +303,12 @@ def recommend(self, user, n=None, candidates=None, ratings=None): n = len(candidates) rng = self.rng_source(user) - c_df = pd.DataFrame(candidates, columns=['item']) + c_df = pd.DataFrame(candidates, columns=["item"]) recs = c_df.sample(n, random_state=rng) return recs.reset_index(drop=True) def __str__(self): - return 'Random' + return "Random" class KnownRating(Predictor): @@ -315,9 +317,9 @@ class KnownRating(Predictor): """ def fit(self, ratings, **kwargs): - self.ratings = ratings.set_index(['user', 'item']).sort_index() + self.ratings = ratings.set_index(["user", "item"]).sort_index() return self def predict_for_user(self, user, items, ratings=None): - uscores = self.ratings.xs(user, level='user', drop_level=True) + uscores = self.ratings.xs(user, level="user", drop_level=True) return uscores.rating.reindex(items) diff --git a/lenskit/algorithms/bias.py b/lenskit/algorithms/bias.py index 2fa7e7b5f..09233a6a3 100644 --- a/lenskit/algorithms/bias.py +++ b/lenskit/algorithms/bias.py @@ -72,27 +72,27 @@ def fit(self, ratings, **kwargs): Returns: Bias: the fit bias object. """ - _logger.info('building bias model for %d ratings', len(ratings)) + _logger.info("building bias model for %d ratings", len(ratings)) self.mean_ = ratings.rating.mean() - _logger.info('global mean: %.3f', self.mean_) + _logger.info("global mean: %.3f", self.mean_) nrates = ratings.assign(rating=lambda df: df.rating - self.mean_) if self.items: - group = nrates.groupby('item').rating + group = nrates.groupby("item").rating self.item_offsets_ = self._mean(group, self.item_damping) - self.item_offsets_.name = 'i_off' - _logger.info('computed means for %d items', len(self.item_offsets_)) + self.item_offsets_.name = "i_off" + _logger.info("computed means for %d items", len(self.item_offsets_)) else: self.item_offsets_ = None if self.users: if self.item_offsets_ is not None: - nrates = nrates.join(pd.DataFrame(self.item_offsets_), on='item', how='inner') + nrates = nrates.join(pd.DataFrame(self.item_offsets_), on="item", how="inner") nrates = nrates.assign(rating=lambda df: df.rating - df.i_off) - self.user_offsets_ = self._mean(nrates.groupby('user').rating, self.user_damping) - self.user_offsets_.name = 'u_off' - _logger.info('computed means for %d users', len(self.user_offsets_)) + self.user_offsets_ = self._mean(nrates.groupby("user").rating, self.user_damping) + self.user_offsets_.name = "u_off" + _logger.info("computed means for %d users", len(self.user_offsets_)) else: self.user_offsets_ = None @@ -117,38 +117,35 @@ def transform(self, ratings, *, indexes=False): A data frame with ``rating`` transformed by subtracting user-item bias prediction. """ - rvps = ratings[['user', 'item']].copy() - rvps['rating'] = ratings['rating'] - self.mean_ + rvps = ratings[["user", "item"]].copy() + rvps["rating"] = ratings["rating"] - self.mean_ if self.item_offsets_ is not None: - rvps = rvps.join(self.item_offsets_, on='item', how='left') - rvps['rating'] -= rvps['i_off'].fillna(0) - rvps = rvps.drop(columns='i_off') + rvps = rvps.join(self.item_offsets_, on="item", how="left") + rvps["rating"] -= rvps["i_off"].fillna(0) + rvps = rvps.drop(columns="i_off") if self.user_offsets_ is not None: - rvps = rvps.join(self.user_offsets_, on='user', how='left') - rvps['rating'] -= rvps['u_off'].fillna(0) - rvps = rvps.drop(columns='u_off') + rvps = rvps.join(self.user_offsets_, on="user", how="left") + rvps["rating"] -= rvps["u_off"].fillna(0) + rvps = rvps.drop(columns="u_off") if indexes: - rvps['uidx'] = self.user_offsets_.index.get_indexer(rvps['user']) - rvps['iidx'] = self.item_offsets_.index.get_indexer(rvps['item']) + rvps["uidx"] = self.user_offsets_.index.get_indexer(rvps["user"]) + rvps["iidx"] = self.item_offsets_.index.get_indexer(rvps["item"]) return rvps def inverse_transform(self, ratings): """ Transform ratings by removing the bias term. """ - rvps = pd.DataFrame({ - 'user': ratings['user'], - 'item': ratings['item'] - }) - rvps['rating'] = ratings['rating'] + self.mean_ + rvps = pd.DataFrame({"user": ratings["user"], "item": ratings["item"]}) + rvps["rating"] = ratings["rating"] + self.mean_ if self.item_offsets_ is not None: - rvps = rvps.join(self.item_offsets_, on='item', how='left') - rvps['rating'] += rvps['i_off'].fillna(0) - del rvps['i_off'] + rvps = rvps.join(self.item_offsets_, on="item", how="left") + rvps["rating"] += rvps["i_off"].fillna(0) + del rvps["i_off"] if self.user_offsets_ is not None: - rvps = rvps.join(self.user_offsets_, on='user', how='left') - rvps['rating'] += rvps['u_off'].fillna(0) - del rvps['u_off'] + rvps = rvps.join(self.user_offsets_, on="user", how="left") + rvps["rating"] += rvps["u_off"].fillna(0) + del rvps["u_off"] return rvps def transform_user(self, ratings): @@ -236,7 +233,7 @@ def predict_for_user(self, user, items, ratings=None): preds = preds + umean elif self.user_offsets_ is not None: umean = self.user_offsets_.get(user, 0.0) - _logger.debug('using mean(user %s) = %.3f', user, umean) + _logger.debug("using mean(user %s) = %.3f", user, umean) preds = preds + umean return preds @@ -258,4 +255,4 @@ def _mean(self, series, damping): return series.mean() def __str__(self): - return 'Bias(ud={}, id={})'.format(self.user_damping, self.item_damping) + return "Bias(ud={}, id={})".format(self.user_damping, self.item_damping) diff --git a/lenskit/algorithms/funksvd.py b/lenskit/algorithms/funksvd.py index 990453480..0ffd3a493 100644 --- a/lenskit/algorithms/funksvd.py +++ b/lenskit/algorithms/funksvd.py @@ -9,6 +9,7 @@ import numpy as np import numba as n from pandas.core.series import Series + try: from numba.experimental import jitclass except ImportError: @@ -21,16 +22,19 @@ _logger = logging.getLogger(__name__) -@jitclass([ - ('user_features', n.double[:, :]), - ('item_features', n.double[:, :]), - ('feature_count', n.int32), - ('user_count', n.int32), - ('item_count', n.int32), - ('initial_value', n.double) -]) +@jitclass( + [ + ("user_features", n.double[:, :]), + ("item_features", n.double[:, :]), + ("feature_count", n.int32), + ("user_count", n.int32), + ("item_count", n.int32), + ("initial_value", n.double), + ] +) class Model: "Internal model class for training SGD MF." + def __init__(self, umat, imat): self.user_features = umat self.item_features = imat @@ -52,13 +56,15 @@ def _fresh_model(nfeatures, nusers, nitems, init=0.1): return model -@jitclass([ - ('iter_count', n.int32), - ('lrate', n.double), - ('reg_term', n.double), - ('rmin', n.double), - ('rmax', n.double) -]) +@jitclass( + [ + ("iter_count", n.int32), + ("lrate", n.double), + ("reg_term", n.double), + ("rmin", n.double), + ("rmax", n.double), + ] +) class _Params: def __init__(self, niters, lrate, reg, rmin, rmax): self.iter_count = niters @@ -78,11 +84,7 @@ def make_params(niters, lrate, reg, range): return _Params(niters, lrate, reg, rmin, rmax) -@jitclass([ - ('est', n.double[:]), - ('feature', n.int32), - ('trail', n.double) -]) +@jitclass([("est", n.double[:]), ("feature", n.int32), ("trail", n.double)]) class _FeatContext: def __init__(self, est, feature, trail): self.est = est @@ -90,13 +92,15 @@ def __init__(self, est, feature, trail): self.trail = trail -@jitclass([ - ('users', n.int32[:]), - ('items', n.int32[:]), - ('ratings', n.double[:]), - ('bias', n.double[:]), - ('n_samples', n.uint64) -]) +@jitclass( + [ + ("users", n.int32[:]), + ("items", n.int32[:]), + ("ratings", n.double[:]), + ("bias", n.double[:]), + ("n_samples", n.uint64), + ] +) class Context: def __init__(self, users, items, ratings, bias): self.users = users @@ -169,8 +173,7 @@ def train(ctx: Context, params: _Params, model: Model, timer): fc = _FeatContext(est, f, trail) rmse = _train_feature(ctx, params, model, fc) end = time.perf_counter() - _logger.info('[%s] finished feature %d (RMSE=%f) in %.2fs', - timer, f, rmse, end - start) + _logger.info("[%s] finished feature %d (RMSE=%f) in %.2fs", timer, f, rmse, end - start) est = est + model.user_features[ctx.users, f] * model.item_features[ctx.items, f] est = np.maximum(est, params.rmin) @@ -215,8 +218,18 @@ class FunkSVD(MFPredictor): The random state for shuffling the data prior to training. """ - def __init__(self, features, iterations=100, *, lrate=0.001, reg=0.015, - damping=5, range=None, bias=True, random_state=None): + def __init__( + self, + features, + iterations=100, + *, + lrate=0.001, + reg=0.015, + damping=5, + range=None, + bias=True, + random_state=None, + ): self.features = features self.iterations = iterations self.lrate = lrate @@ -240,21 +253,21 @@ def fit(self, ratings, **kwargs): """ util.check_env() timer = util.Stopwatch() - if 'rating' not in ratings: - _logger.warning('no rating column found, assuming rating values of 1.0') + if "rating" not in ratings: + _logger.warning("no rating column found, assuming rating values of 1.0") ratings = ratings.assign(rating=1.0) if self.bias: - _logger.info('[%s] fitting bias model', timer) + _logger.info("[%s] fitting bias model", timer) self.bias.fit(ratings) - _logger.info('[%s] preparing rating data for %d samples', timer, len(ratings)) - _logger.debug('shuffling rating data') + _logger.info("[%s] preparing rating data for %d samples", timer, len(ratings)) + _logger.debug("shuffling rating data") shuf = np.arange(len(ratings), dtype=np.int_) self.random.shuffle(shuf) ratings = ratings.iloc[shuf, :] - _logger.debug('[%s] indexing users and items', timer) + _logger.debug("[%s] indexing users and items", timer) uidx = pd.Index(ratings.user.unique()) iidx = pd.Index(ratings.item.unique()) @@ -263,7 +276,7 @@ def fit(self, ratings, **kwargs): items = iidx.get_indexer(ratings.item).astype(np.int32) assert np.all(items >= 0) - _logger.debug('[%s] computing initial estimates', timer) + _logger.debug("[%s] computing initial estimates", timer) if self.bias: initial = pd.Series(self.bias.mean_, index=ratings.index, dtype=np.float_) ibias, initial = _align_add_bias(self.bias.item_offsets_, iidx, ratings.item, initial) @@ -271,19 +284,18 @@ def fit(self, ratings, **kwargs): else: initial = pd.Series(0.0, index=ratings.index) - _logger.debug('have %d estimates for %d ratings', len(initial), len(ratings)) + _logger.debug("have %d estimates for %d ratings", len(initial), len(ratings)) assert len(initial) == len(ratings) - _logger.debug('[%s] initializing data structures', timer) - context = Context(users, items, ratings.rating.astype(np.float_).values, - initial.values) + _logger.debug("[%s] initializing data structures", timer) + context = Context(users, items, ratings.rating.astype(np.float_).values, initial.values) params = make_params(self.iterations, self.lrate, self.reg, self.range) model = _fresh_model(self.features, len(uidx), len(iidx)) - _logger.info('[%s] training biased MF model with %d features', timer, self.features) + _logger.info("[%s] training biased MF model with %d features", timer, self.features) train(context, params, model, timer) - _logger.info('finished model training in %s', timer) + _logger.info("finished model training in %s", timer) self.user_index_ = uidx self.item_index_ = iidx @@ -307,5 +319,4 @@ def predict_for_user(self, user, items, ratings=None): return preds def __str__(self): - return 'FunkSVD(features={}, reg={})'.\ - format(self.features, self.reg) + return "FunkSVD(features={}, reg={})".format(self.features, self.reg) diff --git a/lenskit/algorithms/item_knn.py b/lenskit/algorithms/item_knn.py index 23bb309ca..bca7616bb 100644 --- a/lenskit/algorithms/item_knn.py +++ b/lenskit/algorithms/item_knn.py @@ -71,8 +71,9 @@ def _trim_sim_block(nitems, bsp, bitems, block, min_sim, max_nbrs): sp, lep = block_csr.row_extent(r) lim = lep - sp if c != bsp + r and v >= min_sim: - eps[r] = kvp_minheap_insert(sp, eps[r], lim, c, v, - block_csr.colinds, block_csr.values) + eps[r] = kvp_minheap_insert( + sp, eps[r], lim, c, v, block_csr.colinds, block_csr.values + ) # we're done! return block_csr @@ -195,10 +196,7 @@ def _predict_sum(model, nitems, nrange, ratings, rated, targets): return scores -_predictors = { - 'weighted-average': _predict_weighted_average, - 'sum': _predict_sum -} +_predictors = {"weighted-average": _predict_weighted_average, "sum": _predict_sum} class ItemItem(Predictor): @@ -256,15 +254,17 @@ class ItemItem(Predictor): user_index_(pandas.Index): the index of known user IDs for the rating matrix. rating_matrix_(matrix.CSR): the user-item rating matrix for looking up users' ratings. """ - IGNORED_PARAMS = ['feedback'] - EXTRA_PARAMS = ['center', 'aggregate', 'use_ratings'] - AGG_SUM = intern('sum') - AGG_WA = intern('weighted-average') + IGNORED_PARAMS = ["feedback"] + EXTRA_PARAMS = ["center", "aggregate", "use_ratings"] + + AGG_SUM = intern("sum") + AGG_WA = intern("weighted-average") RATING_AGGS = [AGG_WA] # the aggregates that use rating values - def __init__(self, nnbrs, min_nbrs=1, min_sim=1.0e-6, save_nbrs=None, feedback='explicit', - **kwargs): + def __init__( + self, nnbrs, min_nbrs=1, min_sim=1.0e-6, save_nbrs=None, feedback="explicit", **kwargs + ): self.nnbrs = nnbrs if self.nnbrs is not None and self.nnbrs < 1: self.nnbrs = -1 @@ -274,42 +274,48 @@ def __init__(self, nnbrs, min_nbrs=1, min_sim=1.0e-6, save_nbrs=None, feedback=' self.min_sim = min_sim self.save_nbrs = save_nbrs - if feedback == 'explicit': - defaults = { - 'center': True, - 'aggregate': self.AGG_WA, - 'use_ratings': True - } - elif feedback == 'implicit': - defaults = { - 'center': False, - 'aggregate': self.AGG_SUM, - 'use_ratings': False - } + if feedback == "explicit": + defaults = {"center": True, "aggregate": self.AGG_WA, "use_ratings": True} + elif feedback == "implicit": + defaults = {"center": False, "aggregate": self.AGG_SUM, "use_ratings": False} else: - raise ValueError(f'invalid feedback mode: {feedback}') + raise ValueError(f"invalid feedback mode: {feedback}") defaults.update(kwargs) - self.center = defaults['center'] - self.aggregate = intern(defaults['aggregate']) - self.use_ratings = defaults['use_ratings'] + self.center = defaults["center"] + self.aggregate = intern(defaults["aggregate"]) + self.use_ratings = defaults["use_ratings"] self._check_setup() def _check_setup(self): if not self.use_ratings: if self.center: - _logger.warning('item-item configured to ignore ratings, but ``center=True`` - likely bug') - warnings.warn(util.clean_str(''' + _logger.warning( + "item-item configured to ignore ratings, but ``center=True`` - likely bug" + ) + warnings.warn( + util.clean_str( + """ item-item configured to ignore ratings, but ``center=True``. This configuration is unlikely to work well. - '''), ConfigWarning) - if self.aggregate == 'weighted-average': - _logger.warning('item-item configured to ignore ratings, but using weighted averages - likely bug') - warnings.warn(util.clean_str(''' + """ + ), + ConfigWarning, + ) + if self.aggregate == "weighted-average": + _logger.warning( + "item-item configured to ignore ratings, but using weighted averages - likely bug" + ) + warnings.warn( + util.clean_str( + """ item-item configured to ignore ratings, but use weighted averages. This configuration is unlikely to work well. - '''), ConfigWarning) + """ + ), + ConfigWarning, + ) def fit(self, ratings, **kwargs): """ @@ -328,29 +334,38 @@ def fit(self, ratings, **kwargs): # 2. Compute similarities with pairwise dot products self._timer = util.Stopwatch() - _logger.debug('[%s] beginning fit, memory use %s', self._timer, util.max_memory()) - _logger.debug('[%s] using CSR kernel %s', self._timer, csrk.name) + _logger.debug("[%s] beginning fit, memory use %s", self._timer, util.max_memory()) + _logger.debug("[%s] using CSR kernel %s", self._timer, csrk.name) init_rmat, users, items = sparse_ratings(ratings) n_items = len(items) - _logger.info('[%s] made sparse matrix for %d items (%d ratings from %d users)', - self._timer, len(items), init_rmat.nnz, len(users)) - _logger.debug('[%s] made matrix, memory use %s', self._timer, util.max_memory()) + _logger.info( + "[%s] made sparse matrix for %d items (%d ratings from %d users)", + self._timer, + len(items), + init_rmat.nnz, + len(users), + ) + _logger.debug("[%s] made matrix, memory use %s", self._timer, util.max_memory()) rmat, item_means = self._mean_center(ratings, init_rmat, items) - _logger.debug('[%s] centered, memory use %s', self._timer, util.max_memory()) + _logger.debug("[%s] centered, memory use %s", self._timer, util.max_memory()) rmat = self._normalize(rmat) - _logger.debug('[%s] normalized, memory use %s', self._timer, util.max_memory()) + _logger.debug("[%s] normalized, memory use %s", self._timer, util.max_memory()) - _logger.info('[%s] computing similarity matrix', self._timer) + _logger.info("[%s] computing similarity matrix", self._timer) smat = self._compute_similarities(rmat) - _logger.debug('[%s] computed, memory use %s', self._timer, util.max_memory()) + _logger.debug("[%s] computed, memory use %s", self._timer, util.max_memory()) - _logger.info('[%s] got neighborhoods for %d of %d items', - self._timer, np.sum(np.diff(smat.rowptrs) > 0), n_items) + _logger.info( + "[%s] got neighborhoods for %d of %d items", + self._timer, + np.sum(np.diff(smat.rowptrs) > 0), + n_items, + ) - _logger.info('[%s] computed %d neighbor pairs', self._timer, smat.nnz) + _logger.info("[%s] computed %d neighbor pairs", self._timer, smat.nnz) self.item_index_ = items self.item_means_ = item_means @@ -360,8 +375,8 @@ def fit(self, ratings, **kwargs): self.rating_matrix_ = init_rmat # create an inverted similarity matrix for efficient scanning self._sim_inv_ = smat.transpose() - _logger.info('[%s] transposed matrix for optimization', self._timer) - _logger.debug('[%s] done, memory use %s', self._timer, util.max_memory()) + _logger.info("[%s] transposed matrix for optimization", self._timer) + _logger.debug("[%s] done, memory use %s", self._timer, util.max_memory()) return self @@ -369,16 +384,17 @@ def _mean_center(self, ratings, rmat, items): if not self.center: return rmat, None - item_means = ratings.groupby('item').rating.mean() + item_means = ratings.groupby("item").rating.mean() item_means = item_means.reindex(items).values mcvals = rmat.values - item_means[rmat.colinds] nmat = rmat.copy(False) nmat.values = mcvals if np.allclose(nmat.values, 0): - _logger.warn('normalized ratings are zero, centering is not recommended') - warnings.warn("Ratings seem to have the same value, centering is not recommended.", - DataWarning) - _logger.info('[%s] computed means for %d items', self._timer, len(item_means)) + _logger.warn("normalized ratings are zero, centering is not recommended") + warnings.warn( + "Ratings seem to have the same value, centering is not recommended.", DataWarning + ) + _logger.info("[%s] computed means for %d items", self._timer, len(item_means)) return nmat, item_means def _normalize(self, rmat): @@ -393,7 +409,7 @@ def _normalize(self, rmat): assert norm_mat.shape[1] == rmat.shape[1] # and reset NaN norm_mat.data[np.isnan(norm_mat.data)] = 0 - _logger.info('[%s] normalized rating matrix columns', self._timer) + _logger.info("[%s] normalized rating matrix columns", self._timer) return CSR.from_scipy(norm_mat, False) def _compute_similarities(self, rmat): @@ -404,11 +420,16 @@ def _compute_similarities(self, rmat): m_nbrs = 0 bounds = _make_blocks(nitems, 1000) - _logger.info('[%s] splitting %d items (%d ratings) into %d blocks', - self._timer, nitems, trmat.nnz, len(bounds)) + _logger.info( + "[%s] splitting %d items (%d ratings) into %d blocks", + self._timer, + nitems, + trmat.nnz, + len(bounds), + ) blocks = [trmat.subset_rows(sp, ep) for (sp, ep) in bounds] - _logger.info('[%s] computing similarities', self._timer) + _logger.info("[%s] computing similarities", self._timer) ptrs = List(bounds) nbs = List(blocks) if not nbs: @@ -420,11 +441,17 @@ def _compute_similarities(self, rmat): nnz = sum(b.nnz for b in s_blocks) tot_rows = sum(b.nrows for b in s_blocks) - _logger.info('[%s] computed %d similarities for %d items in %d blocks', - self._timer, nnz, tot_rows, len(s_blocks)) + _logger.info( + "[%s] computed %d similarities for %d items in %d blocks", + self._timer, + nnz, + tot_rows, + len(s_blocks), + ) row_nnzs = np.concatenate([b.row_nnzs() for b in s_blocks]) - assert len(row_nnzs) == nitems, \ - 'only have {} rows for {} items'.format(len(row_nnzs), nitems) + assert len(row_nnzs) == nitems, "only have {} rows for {} items".format( + len(row_nnzs), nitems + ) smat = CSR.empty(nitems, nitems, row_nnzs) start = 0 @@ -433,29 +460,38 @@ def _compute_similarities(self, rmat): end = start + bnr v_sp = smat.rowptrs[start] v_ep = smat.rowptrs[end] - _logger.debug('block %d (%d:%d) has %d entries, storing in %d:%d', - bi, start, end, b.nnz, v_sp, v_ep) + _logger.debug( + "block %d (%d:%d) has %d entries, storing in %d:%d", + bi, + start, + end, + b.nnz, + v_sp, + v_ep, + ) smat.colinds[v_sp:v_ep] = b.colinds smat.values[v_sp:v_ep] = b.values start = end - _logger.info('[%s] sorting similarity matrix with %d entries', self._timer, smat.nnz) + _logger.info("[%s] sorting similarity matrix with %d entries", self._timer, smat.nnz) _sort_nbrs(smat) return smat def predict_for_user(self, user, items, ratings=None): - _logger.debug('predicting %d items for user %s', len(items), user) + _logger.debug("predicting %d items for user %s", len(items), user) if ratings is None: if user not in self.user_index_: - _logger.debug('user %s missing, returning empty predictions', user) + _logger.debug("user %s missing, returning empty predictions", user) return pd.Series(np.nan, index=items) upos = self.user_index_.get_loc(user) - ratings = pd.Series(self.rating_matrix_.row_vs(upos), - index=pd.Index(self.item_index_[self.rating_matrix_.row_cs(upos)])) + ratings = pd.Series( + self.rating_matrix_.row_vs(upos), + index=pd.Index(self.item_index_[self.rating_matrix_.row_cs(upos)]), + ) if not ratings.index.is_unique: - wmsg = 'user {} has duplicate ratings, this is likely to cause problems'.format(user) + wmsg = "user {} has duplicate ratings, this is likely to cause problems".format(user) warnings.warn(wmsg, DataWarning) # set up rating array @@ -465,7 +501,7 @@ def predict_for_user(self, user, items, ratings=None): m_rates = ratings[ri_pos >= 0] ri_pos = ri_pos[ri_pos >= 0] rate_v = np.full(n_items, np.nan, dtype=np.float_) - rated = np.zeros(n_items, dtype='bool') + rated = np.zeros(n_items, dtype="bool") # mean-center the rating array if self.center: rate_v[ri_pos] = m_rates.values - self.item_means_[ri_pos] @@ -473,7 +509,7 @@ def predict_for_user(self, user, items, ratings=None): rate_v[ri_pos] = m_rates.values rated[ri_pos] = True - _logger.debug('user %s: %d of %d rated items in model', user, len(ri_pos), len(ratings)) + _logger.debug("user %s: %d of %d rated items in model", user, len(ri_pos), len(ratings)) assert np.sum(np.logical_not(np.isnan(rate_v))) == len(ri_pos) assert np.all(np.isnan(rate_v) == np.logical_not(rated)) @@ -481,7 +517,7 @@ def predict_for_user(self, user, items, ratings=None): # ipos will be an array of item indices i_pos = self.item_index_.get_indexer(items) i_pos = i_pos[i_pos >= 0] - _logger.debug('user %s: %d of %d requested items in model', user, len(i_pos), len(items)) + _logger.debug("user %s: %d of %d requested items in model", user, len(i_pos), len(items)) # now we take a first pass through the data to count _viable_ targets # This computes the number of neighbors (and their weight sum) for @@ -493,8 +529,9 @@ def predict_for_user(self, user, items, ratings=None): i_cts = i_cts[viable] i_sums = i_sums[viable] i_nbrs = i_nbrs[viable] - _logger.debug('user %s: %d of %d requested items possibly reachable', - user, len(i_pos), len(items)) + _logger.debug( + "user %s: %d of %d requested items possibly reachable", user, len(i_pos), len(items) + ) # look for some fast paths if self.aggregate == self.AGG_SUM and self.min_sim >= 0: @@ -507,15 +544,21 @@ def predict_for_user(self, user, items, ratings=None): else: fast_items = i_pos fast_scores = i_sums - slow_items = np.array([], dtype='i4') + slow_items = np.array([], dtype="i4") - _logger.debug('user %s: using fast-path similarity sum for %d items', - user, len(fast_items)) + _logger.debug( + "user %s: using fast-path similarity sum for %d items", user, len(fast_items) + ) if len(slow_items): - iscores = _predict_sum(self.sim_matrix_, len(self.item_index_), - (self.min_nbrs, self.nnbrs), - rate_v, rated, slow_items) + iscores = _predict_sum( + self.sim_matrix_, + len(self.item_index_), + (self.min_nbrs, self.nnbrs), + rate_v, + rated, + slow_items, + ) else: iscores = np.full(len(self.item_index_), np.nan) iscores[fast_items] = fast_scores @@ -527,19 +570,30 @@ def predict_for_user(self, user, items, ratings=None): fast_scores = rate_v[i_nbrs[fast_mask]] if self.min_sim < 0: fast_scores *= np.sign(i_sums[fast_mask]) - _logger.debug('user %s: fast-pathed %d scores', user, len(fast_scores)) + _logger.debug("user %s: fast-pathed %d scores", user, len(fast_scores)) slow_items = i_pos[i_cts > 1] - iscores = _predict_weighted_average(self.sim_matrix_, len(self.item_index_), - (self.min_nbrs, self.nnbrs), - rate_v, rated, slow_items) + iscores = _predict_weighted_average( + self.sim_matrix_, + len(self.item_index_), + (self.min_nbrs, self.nnbrs), + rate_v, + rated, + slow_items, + ) iscores[fast_items] = fast_scores else: # now compute the predictions - _logger.debug('user %s: taking the slow path', user) + _logger.debug("user %s: taking the slow path", user) agg = _predictors[self.aggregate] - iscores = agg(self.sim_matrix_, len(self.item_index_), (self.min_nbrs, self.nnbrs), - rate_v, rated, i_pos) + iscores = agg( + self.sim_matrix_, + len(self.item_index_), + (self.min_nbrs, self.nnbrs), + rate_v, + rated, + i_pos, + ) if self.center and self.aggregate in self.RATING_AGGS: iscores += self.item_means_ @@ -547,8 +601,9 @@ def predict_for_user(self, user, items, ratings=None): results = pd.Series(iscores, index=self.item_index_) results = results.reindex(items, fill_value=np.nan) - _logger.debug('user %s: predicted for %d of %d items', - user, results.notna().sum(), len(items)) + _logger.debug( + "user %s: predicted for %d of %d items", user, results.notna().sum(), len(items) + ) return results @@ -557,7 +612,7 @@ def _count_viable_targets(self, targets, rated): # initialize counts to zero counts = np.zeros(len(self.item_index_), dtype=np.int32) sums = np.zeros(len(self.item_index_)) - last_nbrs = np.full(len(self.item_index_), -1, 'i4') + last_nbrs = np.full(len(self.item_index_), -1, "i4") # count the number of times each item is reachable from the neighborhood for ri in rated: nbrs = self._sim_inv_.row_cs(ri) @@ -570,14 +625,14 @@ def _count_viable_targets(self, targets, rated): def __getstate__(self): state = dict(self.__dict__) - if '_sim_inv_' in state and not in_share_context(): - del state['_sim_inv_'] + if "_sim_inv_" in state and not in_share_context(): + del state["_sim_inv_"] return state def __setstate__(self, state): self.__dict__.update(state) - if hasattr(self, 'sim_matrix_') and not hasattr(self, '_sim_inv_'): + if hasattr(self, "sim_matrix_") and not hasattr(self, "_sim_inv_"): self._sim_inv_ = self.sim_matrix_.transpose() def __str__(self): - return 'ItemItem(nnbrs={}, msize={})'.format(self.nnbrs, self.save_nbrs) + return "ItemItem(nnbrs={}, msize={})".format(self.nnbrs, self.save_nbrs) diff --git a/lenskit/algorithms/mf_common.py b/lenskit/algorithms/mf_common.py index 59addf891..d7dcec646 100644 --- a/lenskit/algorithms/mf_common.py +++ b/lenskit/algorithms/mf_common.py @@ -93,7 +93,7 @@ def score_by_ids(self, user, items, u_features=None): if u_features is None: uidx = self.lookup_user(user) if uidx < 0: - _logger.debug('user %s not in model', user) + _logger.debug("user %s not in model", user) return pd.Series(np.nan, index=items) else: uidx = None @@ -106,7 +106,7 @@ def score_by_ids(self, user, items, u_features=None): good_iidx = iidx[good] # multiply - _logger.debug('scoring %d items for user %s', len(good_items), user) + _logger.debug("scoring %d items for user %s", len(good_items), user) rv = self.score(uidx, good_iidx, u_features) res = pd.Series(rv, index=good_items) diff --git a/lenskit/algorithms/ranking.py b/lenskit/algorithms/ranking.py index 91a75b614..7d8b3e515 100644 --- a/lenskit/algorithms/ranking.py +++ b/lenskit/algorithms/ranking.py @@ -40,6 +40,7 @@ class TopN(Recommender, Predictor): def __init__(self, predictor, selector=None): from .basic import UnratedItemCandidateSelector + self.predictor = predictor self.selector = selector if selector is not None else UnratedItemCandidateSelector() @@ -59,8 +60,8 @@ def fit(self, ratings, **kwargs): return self def fit_iters(self, ratings, **kwargs): - if not hasattr(self.predictor, 'fit_iters'): - raise AttributeError('predictor has no method fit_iters') + if not hasattr(self.predictor, "fit_iters"): + raise AttributeError("predictor has no method fit_iters") self.selector.fit(ratings, **kwargs) pred = self.predictor @@ -80,8 +81,8 @@ def recommend(self, user, n=None, candidates=None, ratings=None): scores = scores.nlargest(n) else: scores = scores.sort_values(ascending=False) - scores.name = 'score' - scores.index.name = 'item' + scores.name = "score" + scores.index.name = "item" return scores.reset_index() def predict(self, pairs, ratings=None): @@ -91,7 +92,7 @@ def predict_for_user(self, user, items, ratings=None): return self.predictor.predict_for_user(user, items, ratings) def __str__(self): - return 'TopN/' + str(self.predictor) + return "TopN/" + str(self.predictor) class PlackettLuce(Recommender): @@ -112,10 +113,11 @@ class PlackettLuce(Recommender): def __init__(self, predictor, selector=None, *, rng_spec=None): from .basic import UnratedItemCandidateSelector, Popular + if isinstance(predictor, TopN): - _log.warn('wrapping Top-N in PlackettLuce, candidate selector probably redundant') + _log.warn("wrapping Top-N in PlackettLuce, candidate selector probably redundant") elif isinstance(predictor, Popular): - _log.warn('wrapping Popular in Plackett-Luce, consider PopScore') + _log.warn("wrapping Popular in Plackett-Luce, consider PopScore") self.predictor = predictor self.selector = selector if selector is not None else UnratedItemCandidateSelector() @@ -142,6 +144,6 @@ def recommend(self, user, n=None, candidates=None, ratings=None): scores = scores.nlargest(n) else: scores = scores.sort_values(ascending=False) - scores.name = 'score' - scores.index.name = 'item' + scores.name = "score" + scores.index.name = "item" return scores.reset_index() diff --git a/lenskit/algorithms/svd.py b/lenskit/algorithms/svd.py index 7253ab1a0..26ca725d7 100644 --- a/lenskit/algorithms/svd.py +++ b/lenskit/algorithms/svd.py @@ -5,6 +5,7 @@ try: from sklearn.decomposition import TruncatedSVD + SKL_AVAILABLE = True except ImportError: TruncatedSVD = None @@ -29,9 +30,9 @@ class BiasedSVD(Predictor): example and for cases where you want to evaluate a pure SVD implementation. """ - def __init__(self, features, *, damping=5, bias=True, algorithm='randomized'): + def __init__(self, features, *, damping=5, bias=True, algorithm="randomized"): if TruncatedSVD is None: - raise ImportError('sklearn.decomposition') + raise ImportError("sklearn.decomposition") if bias is True: self.bias = Bias(damping=damping) else: @@ -40,14 +41,14 @@ def __init__(self, features, *, damping=5, bias=True, algorithm='randomized'): def fit(self, ratings, **kwargs): timer = Stopwatch() - _log.info('[%s] computing bias', timer) + _log.info("[%s] computing bias", timer) self.bias.fit(ratings) g_bias = self.bias.mean_ u_bias = self.bias.user_offsets_ i_bias = self.bias.item_offsets_ - _log.info('[%s] sparsifying and normalizing matrix', timer) + _log.info("[%s] sparsifying and normalizing matrix", timer) r_mat, users, items = sparse_ratings(ratings, users=u_bias.index, items=i_bias.index) # global r_mat.values -= g_bias @@ -56,7 +57,7 @@ def fit(self, ratings, **kwargs): r_mat = r_mat.to_scipy() assert r_mat.shape == (len(u_bias), len(i_bias)) - _log.info('[%s] training SVD (k=%d)', timer, self.factorization.n_components) + _log.info("[%s] training SVD (k=%d)", timer, self.factorization.n_components) Xt = self.factorization.fit_transform(r_mat) self.user_components_ = Xt _log.info("finished model training in %s", timer) @@ -83,15 +84,15 @@ def predict_for_user(self, user, items, ratings=None): def get_params(self, deep=True): params = { - 'features': self.factorization.n_components, - 'algorithm': self.factorization.algorithm + "features": self.factorization.n_components, + "algorithm": self.factorization.algorithm, } if deep and self.bias: for k, v in self.bias.get_params(True).items(): - params['bias__' + k] = v + params["bias__" + k] = v else: - params['bias'] = self.bias + params["bias"] = self.bias return params def __str__(self): - return f'BiasedSVD({self.factorization})' + return f"BiasedSVD({self.factorization})" diff --git a/lenskit/algorithms/user_knn.py b/lenskit/algorithms/user_knn.py index 17c66b8a7..2793ab06f 100644 --- a/lenskit/algorithms/user_knn.py +++ b/lenskit/algorithms/user_knn.py @@ -130,36 +130,29 @@ class UserUser(Predictor): rating_matrix_(matrix.CSR): Normalized user-item rating matrix. transpose_matrix_(matrix.CSR): Transposed un-normalized rating matrix. """ - IGNORED_PARAMS = ['feedback'] - EXTRA_PARAMS = ['center', 'aggregate', 'use_ratings'] - AGG_SUM = intern('sum') - AGG_WA = intern('weighted-average') + + IGNORED_PARAMS = ["feedback"] + EXTRA_PARAMS = ["center", "aggregate", "use_ratings"] + AGG_SUM = intern("sum") + AGG_WA = intern("weighted-average") RATING_AGGS = [AGG_WA] - def __init__(self, nnbrs, min_nbrs=1, min_sim=0, feedback='explicit', **kwargs): + def __init__(self, nnbrs, min_nbrs=1, min_sim=0, feedback="explicit", **kwargs): self.nnbrs = nnbrs self.min_nbrs = min_nbrs self.min_sim = min_sim - if feedback == 'explicit': - defaults = { - 'center': True, - 'aggregate': self.AGG_WA, - 'use_ratings': True - } - elif feedback == 'implicit': - defaults = { - 'center': False, - 'aggregate': self.AGG_SUM, - 'use_ratings': False - } + if feedback == "explicit": + defaults = {"center": True, "aggregate": self.AGG_WA, "use_ratings": True} + elif feedback == "implicit": + defaults = {"center": False, "aggregate": self.AGG_SUM, "use_ratings": False} else: - raise ValueError(f'invalid feedback mode: {feedback}') + raise ValueError(f"invalid feedback mode: {feedback}") defaults.update(kwargs) - self.center = defaults['center'] - self.aggregate = intern(defaults['aggregate']) - self.use_ratings = defaults['use_ratings'] + self.center = defaults["center"] + self.aggregate = intern(defaults["aggregate"]) + self.use_ratings = defaults["use_ratings"] def fit(self, ratings, **kwargs): """ @@ -174,7 +167,7 @@ def fit(self, ratings, **kwargs): # mean-center ratings if self.center: - umeans = uir.normalize_rows('center') + umeans = uir.normalize_rows("center") else: umeans = None @@ -184,7 +177,7 @@ def fit(self, ratings, **kwargs): # L2-normalize ratings so dot product is cosine if uir.values is None or not self.use_ratings: uir.values = np.full(uir.nnz, 1.0) - uir.normalize_rows('unit') + uir.normalize_rows("unit") self.rating_matrix_ = uir self.user_index_ = users @@ -210,11 +203,11 @@ def predict_for_user(self, user, items, ratings=None): """ watch = util.Stopwatch() - items = pd.Index(items, name='item') + items = pd.Index(items, name="item") ratings, umean = self._get_user_data(user, ratings) if ratings is None: - return pd.Series(index=items, dtype='float64') + return pd.Series(index=items, dtype="float64") assert len(ratings) == len(self.item_index_) # ratings is a dense vector # now ratings is normalized to be a mean-centered unit vector @@ -225,7 +218,7 @@ def predict_for_user(self, user, items, ratings=None): if user in self.user_index_: nsims[self.user_index_.get_loc(user)] = 0 - _logger.debug('computed user similarities') + _logger.debug("computed user similarities") results = np.full(len(items), np.nan, dtype=np.float_) ri_pos = self.item_index_.get_indexer(items.values) @@ -234,17 +227,26 @@ def predict_for_user(self, user, items, ratings=None): elif self.aggregate == self.AGG_SUM: agg = _agg_sum else: - raise ValueError('invalid aggregate ' + self.aggregate) - - _score(ri_pos, results, self.transpose_matrix_, nsims, - self.nnbrs, self.min_sim, self.min_nbrs, agg) + raise ValueError("invalid aggregate " + self.aggregate) + + _score( + ri_pos, + results, + self.transpose_matrix_, + nsims, + self.nnbrs, + self.min_sim, + self.min_nbrs, + agg, + ) if self.aggregate in self.RATING_AGGS: results += umean - results = pd.Series(results, index=items, name='prediction') + results = pd.Series(results, index=items, name="prediction") - _logger.debug('scored %d of %d items for %s in %s', - results.notna().sum(), len(items), user, watch) + _logger.debug( + "scored %d of %d items for %s in %s", results.notna().sum(), len(items), user, watch + ) return results def _get_user_data(self, user, ratings): @@ -257,10 +259,10 @@ def _get_user_data(self, user, ratings): ratings = rmat.row(upos) umean = self.user_means_[upos] if self.user_means_ is not None else 0 except KeyError: - _logger.warning('user %d has no ratings and none provided', user) + _logger.warning("user %d has no ratings and none provided", user) return None, 0 else: - _logger.debug('using provided ratings for user %d', user) + _logger.debug("using provided ratings for user %d", user) if self.center: umean = ratings.mean() ratings = ratings - umean @@ -281,4 +283,4 @@ def __setstate__(self, state): self.aggregate = intern(self.aggregate) def __str__(self): - return 'UserUser(nnbrs={}, min_sim={})'.format(self.nnbrs, self.min_sim) + return "UserUser(nnbrs={}, min_sim={})".format(self.nnbrs, self.min_sim) diff --git a/lenskit/batch/_predict.py b/lenskit/batch/_predict.py index 405a0dbd7..e0e93cae6 100644 --- a/lenskit/batch/_predict.py +++ b/lenskit/batch/_predict.py @@ -12,10 +12,16 @@ def _predict_user(model, req): user, udf = req watch = util.Stopwatch() - res = model.predict_for_user(user, udf['item']) - res = pd.DataFrame({'user': user, 'item': res.index, 'prediction': res.values}) - _logger.debug('%s produced %f/%d predictions for %s in %s', - model, res.prediction.notna().sum(), len(udf), user, watch) + res = model.predict_for_user(user, udf["item"]) + res = pd.DataFrame({"user": user, "item": res.index, "prediction": res.values}) + _logger.debug( + "%s produced %f/%d predictions for %s in %s", + model, + res.prediction.notna().sum(), + len(udf), + user, + watch, + ) return res @@ -62,24 +68,24 @@ def predict(algo, pairs, *, n_jobs=None, **kwargs): the prediction results. If ``pairs`` contains a `rating` column, this result will also contain a `rating` column. """ - if n_jobs is None and 'nprocs' in kwargs: - n_jobs = kwargs['nprocs'] - warnings.warn('nprocs is deprecated, use n_jobs', DeprecationWarning) + if n_jobs is None and "nprocs" in kwargs: + n_jobs = kwargs["nprocs"] + warnings.warn("nprocs is deprecated, use n_jobs", DeprecationWarning) - nusers = pairs['user'].nunique() + nusers = pairs["user"].nunique() timer = util.Stopwatch() with util.parallel.invoker(algo, _predict_user, n_jobs=n_jobs) as worker: del algo # maybe free some memory - _logger.info('generating %d predictions for %d users (setup took %s)', - len(pairs), nusers, timer) + _logger.info( + "generating %d predictions for %d users (setup took %s)", len(pairs), nusers, timer + ) timer = util.Stopwatch() - results = worker.map((user, udf.copy()) for (user, udf) in pairs.groupby('user')) + results = worker.map((user, udf.copy()) for (user, udf) in pairs.groupby("user")) results = pd.concat(results) - _logger.info('generated %d predictions for %d users in %s', - len(pairs), nusers, timer) + _logger.info("generated %d predictions for %d users in %s", len(pairs), nusers, timer) - if 'rating' in pairs: - return pairs.join(results.set_index(['user', 'item']), on=('user', 'item')) + if "rating" in pairs: + return pairs.join(results.set_index(["user", "item"]), on=("user", "item")) return results diff --git a/lenskit/batch/_recommend.py b/lenskit/batch/_recommend.py index 5de6089c5..4d2667346 100644 --- a/lenskit/batch/_recommend.py +++ b/lenskit/batch/_recommend.py @@ -14,14 +14,13 @@ def _recommend_user(algo, req): user, n, candidates = req - _logger.debug('generating recommendations for %s', user) + _logger.debug("generating recommendations for %s", user) watch = util.Stopwatch() res = algo.recommend(user, n, candidates) - _logger.debug('%s recommended %d/%s items for %s in %s', - str(algo), len(res), n, user, watch) + _logger.debug("%s recommended %d/%s items for %s in %s", str(algo), len(res), n, user, watch) - res['user'] = user - res['rank'] = np.arange(1, len(res) + 1) + res["user"] = user + res["rank"] = np.arange(1, len(res) + 1) return res.reset_index(drop=True) @@ -63,29 +62,28 @@ def recommend(algo, users, n, candidates=None, *, n_jobs=None, **kwargs): ``score``, and any other columns returned by the recommender. """ - if n_jobs is None and 'nprocs' in kwargs: - n_jobs = kwargs['nprocs'] - warnings.warn('nprocs is deprecated, use n_jobs', DeprecationWarning) + if n_jobs is None and "nprocs" in kwargs: + n_jobs = kwargs["nprocs"] + warnings.warn("nprocs is deprecated, use n_jobs", DeprecationWarning) if not isinstance(algo, PersistedModel): rec_algo = Recommender.adapt(algo) if candidates is None and rec_algo is not algo: - warnings.warn('no candidates provided and algo is not a recommender, unlikely to work') + warnings.warn("no candidates provided and algo is not a recommender, unlikely to work") algo = rec_algo del rec_algo - if 'ratings' in kwargs: - warnings.warn('Providing ratings to recommend is not supported', DeprecationWarning) + if "ratings" in kwargs: + warnings.warn("Providing ratings to recommend is not supported", DeprecationWarning) candidates = __standard_cand_fun(candidates) with util.parallel.invoker(algo, _recommend_user, n_jobs=n_jobs) as worker: - _logger.info('recommending with %s for %d users (n_jobs=%s)', - str(algo), len(users), n_jobs) + _logger.info("recommending with %s for %d users (n_jobs=%s)", str(algo), len(users), n_jobs) del algo timer = util.Stopwatch() results = worker.map((user, n, candidates(user)) for user in users) results = pd.concat(results, ignore_index=True, copy=False) - _logger.info('recommended for %d users in %s', len(users), timer) + _logger.info("recommended for %d users in %s", len(users), timer) return results diff --git a/lenskit/batch/_train.py b/lenskit/batch/_train.py index 89dd8fcf5..df6c78b40 100644 --- a/lenskit/batch/_train.py +++ b/lenskit/batch/_train.py @@ -9,10 +9,10 @@ def _train_and_save(algo, file, ratings, kwargs): "Worker for subprocess model training" - _log.info('training %s on %d ratings', algo, len(ratings)) + _log.info("training %s on %d ratings", algo, len(ratings)) timer = Stopwatch() algo.fit(ratings, **kwargs) - _log.info('trained %s in %s', algo, timer) + _log.info("trained %s in %s", algo, timer) if file is None: return persist_binpickle(algo).transfer() else: diff --git a/lenskit/crossfold.py b/lenskit/crossfold.py index 3945731dc..8e9ece320 100644 --- a/lenskit/crossfold.py +++ b/lenskit/crossfold.py @@ -10,10 +10,10 @@ import pandas as pd from . import util -TTPair = namedtuple('TTPair', ['train', 'test']) -TTPair.__doc__ = 'Train-test pair (named tuple).' -TTPair.train.__doc__ = 'Train data for this pair.' -TTPair.test.__doc__ = 'Test data for this pair.' +TTPair = namedtuple("TTPair", ["train", "test"]) +TTPair.__doc__ = "Train-test pair (named tuple)." +TTPair.train.__doc__ = "Train data for this pair." +TTPair.test.__doc__ = "Test data for this pair." _logger = logging.getLogger(__name__) @@ -36,7 +36,7 @@ def partition_rows(data, partitions, *, rng_spec=None): """ confirm_unique_index(data) - _logger.info('partitioning %d ratings into %d partitions', len(data), partitions) + _logger.info("partitioning %d ratings into %d partitions", len(data), partitions) # create an array of indexes rows = np.arange(len(data)) @@ -48,7 +48,7 @@ def partition_rows(data, partitions, *, rng_spec=None): # convert each partition into a split for i, ts in enumerate(test_sets): test = data.iloc[ts, :] - trains = test_sets[:i] + test_sets[(i + 1):] + trains = test_sets[:i] + test_sets[(i + 1) :] train_idx = np.concatenate(trains) train = data.iloc[train_idx, :] yield TTPair(train, test) @@ -105,8 +105,12 @@ def sample_rows(data, partitions, size, disjoint=True, *, rng_spec=None): return TTPair(train, test) if disjoint and partitions * size >= len(data): - _logger.warning('wanted %d disjoint splits of %d each, but only have %d rows; partitioning', - partitions, size, len(data)) + _logger.warning( + "wanted %d disjoint splits of %d each, but only have %d rows; partitioning", + partitions, + size, + len(data), + ) return partition_rows(data, partitions) # create an array of indexes @@ -115,11 +119,11 @@ def sample_rows(data, partitions, size, disjoint=True, *, rng_spec=None): rng = util.rng(rng_spec) if disjoint: - _logger.info('creating %d disjoint samples of size %d', partitions, size) + _logger.info("creating %d disjoint samples of size %d", partitions, size) ips = _disjoint_sample(rows, partitions, size, rng) else: - _logger.info('taking %d samples of size %d', partitions, size) + _logger.info("taking %d samples of size %d", partitions, size) ips = _n_samples(rows, partitions, size, rng) return (TTPair(data.iloc[ip.train, :], data.iloc[ip.test, :]) for ip in ips) @@ -132,8 +136,8 @@ def _disjoint_sample(xs, n, size, rng): # convert each partition into a split for i in range(n): start = i * size - test = xs[start:start + size] - train = np.concatenate((xs[:start], xs[start + size:])) + test = xs[start : start + size] + train = np.concatenate((xs[:start], xs[start + size :])) yield TTPair(train, test) @@ -190,6 +194,7 @@ class SampleFrac(PartitionMethod): Args: frac(float): the fraction items to select for testing. """ + def __init__(self, frac, rng_spec=None): self.fraction = frac self.rng = util.rng(rng_spec, legacy=True) @@ -207,12 +212,12 @@ class LastN(PartitionMethod): n(int): The number of test items to select. """ - def __init__(self, n, col='timestamp'): + def __init__(self, n, col="timestamp"): self.n = n self.column = col def __call__(self, udf): - return udf.sort_values(self.column).iloc[-self.n:] + return udf.sort_values(self.column).iloc[-self.n :] class LastFrac(PartitionMethod): @@ -222,7 +227,8 @@ class LastFrac(PartitionMethod): Args: frac(double): the fraction of items to select for testing. """ - def __init__(self, frac, col='timestamp'): + + def __init__(self, frac, col="timestamp"): self.fraction = frac self.column = col @@ -248,10 +254,11 @@ def partition_users(data, partitions: int, method: PartitionMethod, *, rng_spec= """ confirm_unique_index(data) - user_col = data['user'] + user_col = data["user"] users = user_col.unique() - _logger.info('partitioning %d rows for %d users into %d partitions', - len(data), len(users), partitions) + _logger.info( + "partitioning %d rows for %d users into %d partitions", len(data), len(users), partitions + ) # create an array of indexes into user row rows = np.arange(len(users)) @@ -265,21 +272,22 @@ def partition_users(data, partitions: int, method: PartitionMethod, *, rng_spec= # get our users! test_us = users[ts] # sample the data frame - _logger.info('fold %d: selecting test ratings', i) - ugf = data[data.user.isin(test_us)].groupby('user') + _logger.info("fold %d: selecting test ratings", i) + ugf = data[data.user.isin(test_us)].groupby("user") test = ugf.apply(method) # get rid of the group index test = test.reset_index(0, drop=True) # now test is indexed on the data frame! so we can get the rest - _logger.info('fold %d: partitioning training data', i) + _logger.info("fold %d: partitioning training data", i) mask = pd.Series(True, index=data.index) mask[test.index] = False train = data[mask] yield TTPair(train, test) -def sample_users(data, partitions: int, size: int, method: PartitionMethod, disjoint=True, *, - rng_spec=None): +def sample_users( + data, partitions: int, size: int, method: PartitionMethod, disjoint=True, *, rng_spec=None +): """ Create train-test partitions by sampling users. This function does not care what kind of data is in `data`, so long as it is @@ -304,16 +312,16 @@ def sample_users(data, partitions: int, size: int, method: PartitionMethod, disj confirm_unique_index(data) rng = util.rng(rng_spec, legacy=True) - user_col = data['user'] + user_col = data["user"] users = user_col.unique() if disjoint and partitions * size >= len(users): - _logger.warning('cannot take %d disjoint samples of size %d from %d users', - partitions, size, len(users)) + _logger.warning( + "cannot take %d disjoint samples of size %d from %d users", partitions, size, len(users) + ) yield from partition_users(data, partitions, method) return - _logger.info('sampling %d users into %d partitions (n=%d)', - len(users), partitions, size) + _logger.info("sampling %d users into %d partitions (n=%d)", len(users), partitions, size) if disjoint: rng.shuffle(users) @@ -322,12 +330,12 @@ def sample_users(data, partitions: int, size: int, method: PartitionMethod, disj for i in range(partitions): # get our test users! if disjoint: - test_us = users[i*size:(i+1)*size] + test_us = users[i * size : (i + 1) * size] else: test_us = rng.choice(users, size, False) # sample the data frame - test = data[data.user.isin(test_us)].groupby('user').apply(method) + test = data[data.user.isin(test_us)].groupby("user").apply(method) # get rid of the group index test = test.reset_index(0, drop=True) # now test is indexed on the data frame! so we can get the rest @@ -354,10 +362,12 @@ def simple_test_pair(ratings, n_users=1000, n_rates=5, f_rates=None): def confirm_unique_index(data): """Confirms dataframe has unique index values, and if not, - throws ValueError with helpful log message""" + throws ValueError with helpful log message""" if not data.index.is_unique: _logger.error("Index has duplicate values") - _logger.info("If index values do not matter, consider running " + - ".reset_index() on the dataframe before partitioning") - raise ValueError('Index is not uniquely valued') + _logger.info( + "If index values do not matter, consider running " + + ".reset_index() on the dataframe before partitioning" + ) + raise ValueError("Index is not uniquely valued") diff --git a/lenskit/data/matrix.py b/lenskit/data/matrix.py index 21b26de6d..04567ffed 100644 --- a/lenskit/data/matrix.py +++ b/lenskit/data/matrix.py @@ -12,7 +12,7 @@ _log = logging.getLogger(__name__) -RatingMatrix = namedtuple('RatingMatrix', ['matrix', 'users', 'items']) +RatingMatrix = namedtuple("RatingMatrix", ["matrix", "users", "items"]) RatingMatrix.__doc__ = """ A rating matrix with associated indices. @@ -41,30 +41,32 @@ def sparse_ratings(ratings, scipy=False, *, users=None, items=None): a named tuple containing the sparse matrix, user index, and item index. """ if users is None: - users = pd.Index(np.unique(ratings.user), name='user') + users = pd.Index(np.unique(ratings.user), name="user") if items is None: - items = pd.Index(np.unique(ratings.item), name='item') + items = pd.Index(np.unique(ratings.item), name="item") - _log.debug('creating matrix with %d ratings for %d items by %d users', - len(ratings), len(items), len(users)) + _log.debug( + "creating matrix with %d ratings for %d items by %d users", + len(ratings), + len(items), + len(users), + ) row_ind = users.get_indexer(ratings.user).astype(np.intc) if np.any(row_ind < 0): - raise ValueError('provided user index does not cover all users') + raise ValueError("provided user index does not cover all users") col_ind = items.get_indexer(ratings.item).astype(np.intc) if np.any(col_ind < 0): - raise ValueError('provided item index does not cover all users') + raise ValueError("provided item index does not cover all users") - if 'rating' in ratings.columns: + if "rating" in ratings.columns: vals = np.require(ratings.rating.values, np.float64) else: vals = None - if scipy == 'coo': - matrix = sps.coo_matrix( - (vals, (row_ind, col_ind)), shape=(len(users), len(items)) - ) + if scipy == "coo": + matrix = sps.coo_matrix((vals, (row_ind, col_ind)), shape=(len(users), len(items))) else: matrix = CSR.from_coo(row_ind, col_ind, vals, (len(users), len(items))) if scipy: diff --git a/lenskit/datasets/__init__.py b/lenskit/datasets/__init__.py index 262a14652..f2fb2a9dc 100644 --- a/lenskit/datasets/__init__.py +++ b/lenskit/datasets/__init__.py @@ -1 +1 @@ -from .movielens import * # noqa: F403 +from .movielens import * # noqa: F403 diff --git a/lenskit/datasets/fetch.py b/lenskit/datasets/fetch.py index 0be178a2e..ed8299bc8 100644 --- a/lenskit/datasets/fetch.py +++ b/lenskit/datasets/fetch.py @@ -5,17 +5,17 @@ from pathlib import Path import logging -_log = logging.getLogger('lenskit.datasets.fetch') +_log = logging.getLogger("lenskit.datasets.fetch") ML_LOC = "http://files.grouplens.org/datasets/movielens/" ML_DATASETS = { - 'ml-100k': 'ml-100k/u.data', - 'ml-1m': 'ml-1m/ratings.dat', - 'ml-10m': 'ml-10M100K/ratings.dat', - 'ml-20m': 'ml-20m/ratings.csv', - 'ml-25m': 'ml-25m/ratings.csv', - 'ml-latest': 'ml-latest/ratings.csv', - 'ml-latest-small': 'ml-latest-small/ratings.csv', + "ml-100k": "ml-100k/u.data", + "ml-1m": "ml-1m/ratings.dat", + "ml-10m": "ml-10M100K/ratings.dat", + "ml-20m": "ml-20m/ratings.csv", + "ml-25m": "ml-25m/ratings.csv", + "ml-latest": "ml-latest/ratings.csv", + "ml-latest-small": "ml-latest-small/ratings.csv", } @@ -37,46 +37,48 @@ def fetch_ml(name: str, base_dir: Path): base_dir: The base directory into which data should be extracted. """ - zipname = f'{name}.zip' + zipname = f"{name}.zip" zipfile = base_dir / zipname zipurl = ML_LOC + zipname test_file = base_dir / ML_DATASETS[name] if test_file.exists(): - _log.info(test_file, 'already exists') + _log.info(test_file, "already exists") return - _log.info('downloading data set %s', name) - with zipfile.open('wb') as zf: + _log.info("downloading data set %s", name) + with zipfile.open("wb") as zf: res = urlopen(zipurl) block = res.read(8 * 1024 * 1024) while len(block): - _log.debug('received %d bytes', len(block)) + _log.debug("received %d bytes", len(block)) zf.write(block) block = res.read(8 * 1024 * 1024) - _log.info('unpacking data set') - with ZipFile(zipfile, 'r') as zf: + _log.info("unpacking data set") + with ZipFile(zipfile, "r") as zf: zf.extractall(base_dir) def _fetch_main(): logging.basicConfig(stream=sys.stderr, level=logging.INFO) parser = argparse.ArgumentParser() - parser.add_argument('name', help='the name of the dataset to fetch') - parser.add_argument('--data-dir', metavar='DIR', help='save extracted data to DIR', default='data') + parser.add_argument("name", help="the name of the dataset to fetch") + parser.add_argument( + "--data-dir", metavar="DIR", help="save extracted data to DIR", default="data" + ) args = parser.parse_args() name = args.name - _log.info('fetching data set %s', name) + _log.info("fetching data set %s", name) dir = Path(args.data_dir) - _log.info('extracting data to %s', dir) - if name.startswith('ml-'): + _log.info("extracting data to %s", dir) + if name.startswith("ml-"): fetch_ml(name, dir) else: - _log.error('unknown data set %s', name) - raise ValueError('invalid data set') + _log.error("unknown data set %s", name) + raise ValueError("invalid data set") -if __name__ == '__main__': +if __name__ == "__main__": _fetch_main() diff --git a/lenskit/datasets/movielens.py b/lenskit/datasets/movielens.py index 01bca6ba0..5ba34556e 100644 --- a/lenskit/datasets/movielens.py +++ b/lenskit/datasets/movielens.py @@ -13,17 +13,17 @@ _log = logging.getLogger(__name__) __doctest_skip__ = [] -if not os.path.exists('data/ml-100k'): - __doctest_skip__.append('ML100K.*') -if not os.path.exists('data/ml-20m'): - __doctest_skip__.append('MovieLens.tag_genome') -if not os.path.exists('data/ml-1m.*'): - __doctest_skip__.append('ML1M.*') -if not os.path.exists('data/ml-10M100K'): - __doctest_skip__.append('ML10M.*') - __doctest_skip__.append('MLM.*') +if not os.path.exists("data/ml-100k"): + __doctest_skip__.append("ML100K.*") +if not os.path.exists("data/ml-20m"): + __doctest_skip__.append("MovieLens.tag_genome") +if not os.path.exists("data/ml-1m.*"): + __doctest_skip__.append("ML1M.*") +if not os.path.exists("data/ml-10M100K"): + __doctest_skip__.append("ML10M.*") + __doctest_skip__.append("MLM.*") -__all__ = ['MovieLens', 'ML100K', 'ML1M', 'ML10M'] +__all__ = ["MovieLens", "ML100K", "ML1M", "ML10M"] class MovieLens: @@ -35,7 +35,7 @@ class MovieLens: path(str or pathlib.Path): Path to the directory containing the data set. """ - def __init__(self, path='data/ml-20m'): + def __init__(self, path="data/ml-20m"): self.path = Path(path) @cached @@ -55,15 +55,18 @@ def ratings(self): [100004 rows x 4 columns] """ - fn = self.path / 'ratings.csv' - ratings = pd.read_csv(fn, dtype={ - 'movieId': np.int32, - 'userId': np.int32, - 'rating': np.float64, - 'timestamp': np.int32 - }) - ratings.rename(columns={'userId': 'user', 'movieId': 'item'}, inplace=True) - _log.debug('loaded %s, takes %d bytes', fn, ratings.memory_usage().sum()) + fn = self.path / "ratings.csv" + ratings = pd.read_csv( + fn, + dtype={ + "movieId": np.int32, + "userId": np.int32, + "rating": np.float64, + "timestamp": np.int32, + }, + ) + ratings.rename(columns={"userId": "user", "movieId": "item"}, inplace=True) + _log.debug("loaded %s, takes %d bytes", fn, ratings.memory_usage().sum()) return ratings @cached @@ -84,15 +87,18 @@ def movies(self): [9125 rows x 2 columns] """ - fn = self.path / 'movies.csv' - movies = pd.read_csv(fn, dtype={ - 'movieId': np.int32, - 'title': object, - 'genres': object, - }) - movies.rename(columns={'movieId': 'item'}, inplace=True) - movies.set_index('item', inplace=True) - _log.debug('loaded %s, takes %d bytes', fn, movies.memory_usage().sum()) + fn = self.path / "movies.csv" + movies = pd.read_csv( + fn, + dtype={ + "movieId": np.int32, + "title": object, + "genres": object, + }, + ) + movies.rename(columns={"movieId": "item"}, inplace=True) + movies.set_index("item", inplace=True) + _log.debug("loaded %s, takes %d bytes", fn, movies.memory_usage().sum()) return movies @cached @@ -114,15 +120,13 @@ def links(self): [9125 rows x 2 columns] """ - fn = self.path / 'links.csv' - links = pd.read_csv(fn, dtype={ - 'movieId': np.int32, - 'imdbId': np.int64, - 'tmdbId': pd.Int64Dtype() - }) - links.rename(columns={'movieId': 'item'}, inplace=True) - links.set_index('item', inplace=True) - _log.debug('loaded %s, takes %d bytes', fn, links.memory_usage().sum()) + fn = self.path / "links.csv" + links = pd.read_csv( + fn, dtype={"movieId": np.int32, "imdbId": np.int64, "tmdbId": pd.Int64Dtype()} + ) + links.rename(columns={"movieId": "item"}, inplace=True) + links.set_index("item", inplace=True) + _log.debug("loaded %s, takes %d bytes", fn, links.memory_usage().sum()) return links @cached @@ -143,15 +147,18 @@ def tags(self): [1296 rows x 4 columns] """ - fn = self.path / 'tags.csv' - tags = pd.read_csv(fn, dtype={ - 'movieId': np.int32, - 'userId': np.int32, - 'tag': object, - 'timestamp': np.int32, - }) - tags.rename(columns={'userId': 'user', 'movieId': 'item'}, inplace=True) - _log.debug('loaded %s, takes %d bytes', fn, tags.memory_usage().sum()) + fn = self.path / "tags.csv" + tags = pd.read_csv( + fn, + dtype={ + "movieId": np.int32, + "userId": np.int32, + "tag": object, + "timestamp": np.int32, + }, + ) + tags.rename(columns={"userId": "user", "movieId": "item"}, inplace=True) + _log.debug("loaded %s, takes %d bytes", fn, tags.memory_usage().sum()) return tags @cached @@ -173,19 +180,22 @@ def tag_genome(self): [10381 rows x 1128 columns] """ - fn = self.path / 'genome-scores.csv' - tags = pd.read_csv(self.path / 'genome-tags.csv') - tags = tags.set_index('tagId') - tags = tags['tag'].astype('category') - genome = pd.read_csv(fn, dtype={ - 'movieId': np.int32, - 'tagId': np.int32, - 'relevance': np.float64, - }) - genome.rename(columns={'userId': 'user', 'movieId': 'item'}, inplace=True) - genome = genome.join(tags, on='tagId') - genome = genome.pivot(index='item', columns='tag', values='relevance') - _log.debug('loaded %s, takes %d bytes', fn, genome.memory_usage().sum()) + fn = self.path / "genome-scores.csv" + tags = pd.read_csv(self.path / "genome-tags.csv") + tags = tags.set_index("tagId") + tags = tags["tag"].astype("category") + genome = pd.read_csv( + fn, + dtype={ + "movieId": np.int32, + "tagId": np.int32, + "relevance": np.float64, + }, + ) + genome.rename(columns={"userId": "user", "movieId": "item"}, inplace=True) + genome = genome.join(tags, on="tagId") + genome = genome.pivot(index="item", columns="tag", values="relevance") + _log.debug("loaded %s, takes %d bytes", fn, genome.memory_usage().sum()) return genome @@ -195,13 +205,13 @@ class ML100K: the more current data sets loaded by :class:`MovieLens`. """ - def __init__(self, path='data/ml-100k'): + def __init__(self, path="data/ml-100k"): self.path = Path(path) @property def available(self): "Query whether the data set exists." - return (self.path / 'u.data').exists() + return (self.path / "u.data").exists() @cached def ratings(self): @@ -219,12 +229,15 @@ def ratings(self): ... [100000 rows x 4 columns] """ - fn = self.path / 'u.data' - ratings = pd.read_csv(fn, sep='\t', header=None, - names=['user', 'item', 'rating', 'timestamp'], - dtype={'user': np.int32, 'item': np.int32, - 'rating': np.float32, 'timestamp': np.int32}) - _log.debug('loaded %s', fn) + fn = self.path / "u.data" + ratings = pd.read_csv( + fn, + sep="\t", + header=None, + names=["user", "item", "rating", "timestamp"], + dtype={"user": np.int32, "item": np.int32, "rating": np.float32, "timestamp": np.int32}, + ) + _log.debug("loaded %s", fn) return ratings @cached @@ -244,13 +257,16 @@ def users(self): ... [943 rows x 4 columns] """ - fn = self.path / 'u.user' - users = pd.read_csv(fn, sep='|', header=None, - names=['user', 'age', 'gender', 'occupation', 'zip'], - dtype={'user': np.int32, 'age': np.int8, - 'occupation': 'category'}) - _log.debug('loaded %s', fn) - return users.set_index('user') + fn = self.path / "u.user" + users = pd.read_csv( + fn, + sep="|", + header=None, + names=["user", "age", "gender", "occupation", "zip"], + dtype={"user": np.int32, "age": np.int8, "occupation": "category"}, + ) + _log.debug("loaded %s", fn) + return users.set_index("user") @cached def movies(self): @@ -269,17 +285,37 @@ def movies(self): ... [1682 rows x 23 columns] """ - fn = self.path / 'u.item' + fn = self.path / "u.item" genres = [ - 'unknown', 'Action', 'Adventure', 'Animation', - "Children's", 'Comedy', 'Crime', 'Documentary', 'Drama', 'Fantasy', - 'Film-Noir', 'Horror', 'Musical', 'Mystery', 'Romance', 'Sci-Fi', - 'Thriller', 'War', 'Western' + "unknown", + "Action", + "Adventure", + "Animation", + "Children's", + "Comedy", + "Crime", + "Documentary", + "Drama", + "Fantasy", + "Film-Noir", + "Horror", + "Musical", + "Mystery", + "Romance", + "Sci-Fi", + "Thriller", + "War", + "Western", ] - items = pd.read_csv(fn, sep='|', header=None, encoding='latin1', - names=['item', 'title', 'release', 'vidrelease', 'imdb'] + genres) - _log.debug('loaded %s', fn) - return items.set_index('item') + items = pd.read_csv( + fn, + sep="|", + header=None, + encoding="latin1", + names=["item", "title", "release", "vidrelease", "imdb"] + genres, + ) + _log.debug("loaded %s", fn) + return items.set_index("item") class MLM: @@ -306,13 +342,16 @@ def ratings(self): ... [10000054 rows x 4 columns] """ - fn = self.path / 'ratings.dat' - ratings = pd.read_csv(fn, sep=':', header=None, - names=['user', '_ui', 'item', '_ir', 'rating', '_rt', 'timestamp'], - usecols=[0, 2, 4, 6], - dtype={'user': np.int32, 'item': np.int32, - 'rating': np.float32, 'timestamp': np.int32}) - _log.debug('loaded %s', fn) + fn = self.path / "ratings.dat" + ratings = pd.read_csv( + fn, + sep=":", + header=None, + names=["user", "_ui", "item", "_ir", "rating", "_rt", "timestamp"], + usecols=[0, 2, 4, 6], + dtype={"user": np.int32, "item": np.int32, "rating": np.float32, "timestamp": np.int32}, + ) + _log.debug("loaded %s", fn) return ratings @cached @@ -332,13 +371,17 @@ def movies(self): ... [10681 rows x 2 columns] """ - fn = self.path / 'movies.dat' - movies = pd.read_csv(fn, sep=':', header=None, - names=['item', '_ir', 'title', '_tg', 'genres'], - usecols=[0, 2, 4], - dtype={'item': np.int32}) - movies.set_index('item', inplace=True) - _log.debug('loaded %s', fn) + fn = self.path / "movies.dat" + movies = pd.read_csv( + fn, + sep=":", + header=None, + names=["item", "_ir", "title", "_tg", "genres"], + usecols=[0, 2, 4], + dtype={"item": np.int32}, + ) + movies.set_index("item", inplace=True) + _log.debug("loaded %s", fn) return movies @@ -346,7 +389,8 @@ class ML10M(MLM): """ MovieLens 10M100K data set. """ - def __init__(self, path='data/ml-10M100K'): + + def __init__(self, path="data/ml-10M100K"): super().__init__(path) @@ -359,7 +403,7 @@ class ML1M(MLM): with the 10M data set. """ - def __init__(self, path='data/ml-1m'): + def __init__(self, path="data/ml-1m"): super().__init__(path) @cached @@ -379,13 +423,15 @@ def users(self): ... [6040 rows x 3 columns] """ - fn = self.path / 'users.dat' - users = pd.read_csv(fn, sep=':', header=None, - names=['user', '_ug', 'gender', '_ga', 'age', - '_ao', 'occupation', '_oz', 'zip'], - usecols=[0, 2, 4, 8], - dtype={'user': np.int32, 'gender': 'category', 'age': np.int8, - 'timestamp': np.int32}) - users.set_index('user', inplace=True) - _log.debug('loaded %s', fn) + fn = self.path / "users.dat" + users = pd.read_csv( + fn, + sep=":", + header=None, + names=["user", "_ug", "gender", "_ga", "age", "_ao", "occupation", "_oz", "zip"], + usecols=[0, 2, 4, 8], + dtype={"user": np.int32, "gender": "category", "age": np.int8, "timestamp": np.int32}, + ) + users.set_index("user", inplace=True) + _log.debug("loaded %s", fn) return users diff --git a/lenskit/math/solve.py b/lenskit/math/solve.py index dfbf966bd..8f12ec364 100644 --- a/lenskit/math/solve.py +++ b/lenskit/math/solve.py @@ -11,19 +11,23 @@ __ffi = cffi.FFI() -__uplo_U = np.array([ord('U')], dtype=np.int8) -__uplo_L = np.array([ord('L')], dtype=np.int8) -__trans_N = np.array([ord('N')], dtype=np.int8) -__trans_T = np.array([ord('T')], dtype=np.int8) -__trans_C = np.array([ord('C')], dtype=np.int8) -__diag_U = np.array([ord('U')], dtype=np.int8) -__diag_N = np.array([ord('N')], dtype=np.int8) +__uplo_U = np.array([ord("U")], dtype=np.int8) +__uplo_L = np.array([ord("L")], dtype=np.int8) +__trans_N = np.array([ord("N")], dtype=np.int8) +__trans_T = np.array([ord("T")], dtype=np.int8) +__trans_C = np.array([ord("C")], dtype=np.int8) +__diag_U = np.array([ord("U")], dtype=np.int8) +__diag_N = np.array([ord("N")], dtype=np.int8) __inc_1 = np.ones(1, dtype=np.int32) -__dtrsv = __ffi.cast("void (*) (char*, char*, char*, int*, double*, int*, double*, int*)", - get_cython_function_address("scipy.linalg.cython_blas", "dtrsv")) -__dposv = __ffi.cast("void (*) (char*, int*, int*, double*, int*, double*, int*, int*)", - get_cython_function_address("scipy.linalg.cython_lapack", "dposv")) +__dtrsv = __ffi.cast( + "void (*) (char*, char*, char*, int*, double*, int*, double*, int*)", + get_cython_function_address("scipy.linalg.cython_blas", "dtrsv"), +) +__dposv = __ffi.cast( + "void (*) (char*, int*, int*, double*, int*, double*, int*, int*)", + get_cython_function_address("scipy.linalg.cython_lapack", "dposv"), +) @n.njit @@ -49,10 +53,16 @@ def _dposv(A, b, lower): info = np.zeros(1, dtype=np.intc) info_p = __ffi.from_buffer(info) - __dposv(__ffi.from_buffer(uplo), n_p, nrhs_p, - __ffi.from_buffer(A), n_p, - __ffi.from_buffer(b), n_p, - info_p) + __dposv( + __ffi.from_buffer(uplo), + n_p, + nrhs_p, + __ffi.from_buffer(A), + n_p, + __ffi.from_buffer(b), + n_p, + info_p, + ) _ref_sink(narr, n_p, nrhs, nrhs_p, info, info_p) @@ -66,6 +76,6 @@ def dposv(A, b, lower=False): """ info = _dposv(A, b, lower) if info < 0: - raise ValueError('invalid args to dposv, code ' + str(info)) + raise ValueError("invalid args to dposv, code " + str(info)) elif info > 0: - raise RuntimeError('error in dposv, code ' + str(info)) + raise RuntimeError("error in dposv, code " + str(info)) diff --git a/lenskit/metrics/predict.py b/lenskit/metrics/predict.py index a6e662f69..9f245b6dc 100644 --- a/lenskit/metrics/predict.py +++ b/lenskit/metrics/predict.py @@ -14,12 +14,12 @@ def _check_missing(truth, missing): truth: the series of truth values missing: what to do with missing values """ - if missing == 'error' and truth.isna().any(): + if missing == "error" and truth.isna().any(): nmissing = truth.isna().sum() - raise ValueError('missing truth for {} predictions'.format(nmissing)) + raise ValueError("missing truth for {} predictions".format(nmissing)) -def rmse(predictions, truth, missing='error'): +def rmse(predictions, truth, missing="error"): """ Compute RMSE (root mean squared error). This is computed as: @@ -48,7 +48,7 @@ def rmse(predictions, truth, missing='error'): truth = pd.Series(truth) # realign - predictions, truth = predictions.align(truth, join='left') + predictions, truth = predictions.align(truth, join="left") _check_missing(truth, missing) diff = predictions - truth @@ -58,7 +58,7 @@ def rmse(predictions, truth, missing='error'): return np.sqrt(msq) -def mae(predictions, truth, missing='error'): +def mae(predictions, truth, missing="error"): """ Compute MAE (mean absolute error). This is computed as: @@ -86,7 +86,7 @@ def mae(predictions, truth, missing='error'): predictions = pd.Series(predictions) truth = pd.Series(truth) - predictions, truth = predictions.align(truth, join='left') + predictions, truth = predictions.align(truth, join="left") _check_missing(truth, missing) diff = predictions - truth @@ -95,7 +95,7 @@ def mae(predictions, truth, missing='error'): return adiff.mean() -def global_metric(predictions, *, score_column='prediction', metric=rmse, **kwargs): +def global_metric(predictions, *, score_column="prediction", metric=rmse, **kwargs): """ Compute a global prediction accuracy metric for a set of predictions. @@ -114,11 +114,11 @@ def global_metric(predictions, *, score_column='prediction', metric=rmse, **kwar """ scores = predictions[score_column] - truth = predictions['rating'] + truth = predictions["rating"] return metric(scores, truth, **kwargs) -def user_metric(predictions, *, score_column='prediction', metric=rmse, **kwargs): +def user_metric(predictions, *, score_column="prediction", metric=rmse, **kwargs): """ Compute a mean per-user prediction accuracy metric for a set of predictions. @@ -138,7 +138,7 @@ def user_metric(predictions, *, score_column='prediction', metric=rmse, **kwargs """ def score(df): - return metric(df[score_column], df['rating']) + return metric(df[score_column], df["rating"]) - u_scores = predictions.groupby('user').apply(score) + u_scores = predictions.groupby("user").apply(score) return u_scores.mean() diff --git a/lenskit/metrics/topn.py b/lenskit/metrics/topn.py index d89d1351a..5f66380de 100644 --- a/lenskit/metrics/topn.py +++ b/lenskit/metrics/topn.py @@ -14,6 +14,7 @@ def bulk_impl(metric): """ Decorator to register a bulk implementation for a metric. """ + def wrap(impl): metric.bulk_score = impl return impl @@ -40,23 +41,23 @@ def precision(recs, truth, k=None): if nrecs == 0: return None - ngood = recs['item'].isin(truth.index).sum() + ngood = recs["item"].isin(truth.index).sum() return ngood / nrecs @bulk_impl(precision) def _bulk_precision(recs, truth, k=None): if k is not None: - recs = recs[recs['rank'] <= k] - lcounts = pd.Series(k, index=recs['LKRecID'].unique()) - lcounts.index.name = 'LKRecID' + recs = recs[recs["rank"] <= k] + lcounts = pd.Series(k, index=recs["LKRecID"].unique()) + lcounts.index.name = "LKRecID" else: - lcounts = recs.groupby(['LKRecID'])['item'].count() + lcounts = recs.groupby(["LKRecID"])["item"].count() - good = recs.join(truth, on=['LKTruthID', 'item'], how='inner') - gcounts = good.groupby(['LKRecID'])['item'].count() + good = recs.join(truth, on=["LKTruthID", "item"], how="inner") + gcounts = good.groupby(["LKRecID"])["item"].count() - lcounts, gcounts = lcounts.align(gcounts, join='left', fill_value=0) + lcounts, gcounts = lcounts.align(gcounts, join="left", fill_value=0) return gcounts / lcounts @@ -78,30 +79,30 @@ def recall(recs, truth, k=None): nrel = min(nrel, k) recs = recs.iloc[:k] - ngood = recs['item'].isin(truth.index).sum() + ngood = recs["item"].isin(truth.index).sum() return ngood / nrel @bulk_impl(recall) def _bulk_recall(recs, truth, k=None): - tcounts = truth.reset_index().groupby('LKTruthID')['item'].count() + tcounts = truth.reset_index().groupby("LKTruthID")["item"].count() if k is not None: - _log.debug('truncating to k for recall') + _log.debug("truncating to k for recall") tcounts = np.minimum(tcounts, k) - recs = recs[recs['rank'] <= k] + recs = recs[recs["rank"] <= k] - good = recs.join(truth, on=['LKTruthID', 'item'], how='inner') - gcounts = good.groupby('LKRecID')['item'].count() + good = recs.join(truth, on=["LKTruthID", "item"], how="inner") + gcounts = good.groupby("LKRecID")["item"].count() # we need all lists, because some might have no truth (oops), some no recs (also oops) - lists = recs[['LKRecID', 'LKTruthID']].drop_duplicates() + lists = recs[["LKRecID", "LKTruthID"]].drop_duplicates() - scores = lists.join(gcounts.to_frame('ngood'), on='LKRecID', how='left') - scores['ngood'].fillna(0, inplace=True) - scores = scores.join(tcounts.to_frame('nrel'), on='LKTruthID', how='left') - scores = scores.set_index('LKRecID') - return scores['ngood'] / scores['nrel'] + scores = lists.join(gcounts.to_frame("ngood"), on="LKRecID", how="left") + scores["ngood"].fillna(0, inplace=True) + scores = scores.join(tcounts.to_frame("nrel"), on="LKTruthID", how="left") + scores = scores.set_index("LKRecID") + return scores["ngood"] / scores["nrel"] def hit(recs, truth, k=None): @@ -124,7 +125,7 @@ def hit(recs, truth, k=None): nrel = min(nrel, k) recs = recs.iloc[:k] - good = recs['item'].isin(truth.index) + good = recs["item"].isin(truth.index) if np.any(good): return 1 else: @@ -133,28 +134,28 @@ def hit(recs, truth, k=None): @bulk_impl(hit) def _bulk_hit(recs, truth, k=None): - tcounts = truth.reset_index().groupby('LKTruthID')['item'].count() + tcounts = truth.reset_index().groupby("LKTruthID")["item"].count() if k is not None: - _log.debug('truncating to k for recall') + _log.debug("truncating to k for recall") tcounts = np.minimum(tcounts, k) - recs = recs[recs['rank'] <= k] + recs = recs[recs["rank"] <= k] - good = recs.join(truth, on=['LKTruthID', 'item'], how='inner') - gcounts = good.groupby('LKRecID')['item'].count() + good = recs.join(truth, on=["LKTruthID", "item"], how="inner") + gcounts = good.groupby("LKRecID")["item"].count() # we need all lists, because some might have no truth (oops), some no recs (also oops) - lists = recs[['LKRecID', 'LKTruthID']].drop_duplicates() + lists = recs[["LKRecID", "LKTruthID"]].drop_duplicates() - scores = lists.join(gcounts.to_frame('ngood'), on='LKRecID', how='left') - scores['ngood'].fillna(0, inplace=True) + scores = lists.join(gcounts.to_frame("ngood"), on="LKRecID", how="left") + scores["ngood"].fillna(0, inplace=True) - scores = scores.join(tcounts.to_frame('nrel'), on='LKTruthID', how='left') - scores = scores.set_index('LKRecID') + scores = scores.join(tcounts.to_frame("nrel"), on="LKTruthID", how="left") + scores = scores.set_index("LKRecID") - good = scores['ngood'] > 0 - good = good.astype('f4') - good[scores['nrel'] == 0] = np.nan + good = scores["ngood"] > 0 + good = good.astype("f4") + good[scores["nrel"] == 0] = np.nan return good @@ -173,8 +174,8 @@ def recip_rank(recs, truth, k=None): if k is not None: recs = recs.iloc[:k] - good = recs['item'].isin(truth.index) - npz, = np.nonzero(good.to_numpy()) + good = recs["item"].isin(truth.index) + (npz,) = np.nonzero(good.to_numpy()) if len(npz): return 1.0 / (npz[0] + 1.0) else: @@ -185,17 +186,17 @@ def recip_rank(recs, truth, k=None): def _bulk_rr(recs, truth, k=None): # find everything with truth if k is not None: - recs = recs[recs['rank'] <= k] - joined = recs.join(truth, on=['LKTruthID', 'item'], how='inner') + recs = recs[recs["rank"] <= k] + joined = recs.join(truth, on=["LKTruthID", "item"], how="inner") # compute min ranks - ranks = joined.groupby('LKRecID')['rank'].agg('min') + ranks = joined.groupby("LKRecID")["rank"].agg("min") # reciprocal ranks scores = 1.0 / ranks - _log.debug('have %d scores with MRR %.3f', len(scores), scores.mean()) + _log.debug("have %d scores with MRR %.3f", len(scores), scores.mean()) # fill with zeros - rec_ids = recs['LKRecID'].unique() + rec_ids = recs["LKRecID"].unique() scores = scores.reindex(rec_ids, fill_value=0.0) - _log.debug('filled to get %s scores w/ MRR %.3f', len(scores), scores.mean()) + _log.debug("filled to get %s scores w/ MRR %.3f", len(scores), scores.mean()) # and we're done return scores @@ -226,7 +227,7 @@ def _dcg(scores, discount=np.log2): def _fixed_dcg(n, discount=np.log2): - ranks = np.arange(1, n+1) + ranks = np.arange(1, n + 1) disc = discount(ranks) disc = np.maximum(disc, 1) disc = np.reciprocal(disc) @@ -255,11 +256,11 @@ def dcg(recs, truth, discount=np.log2): if the discount is greater than 1. """ - tpos = truth.index.get_indexer(recs['item']) + tpos = truth.index.get_indexer(recs["item"]) tgood = tpos >= 0 - if 'rating' in truth.columns: + if "rating" in truth.columns: # make an array of ratings for this rec list - r_rates = truth['rating'].values[tpos] + r_rates = truth["rating"].values[tpos] r_rates[tpos < 0] = 0 achieved = _dcg(r_rates, discount) else: @@ -304,15 +305,15 @@ def ndcg(recs, truth, discount=np.log2, k=None): if k is not None: recs = recs.iloc[:k] - tpos = truth.index.get_indexer(recs['item']) + tpos = truth.index.get_indexer(recs["item"]) - if 'rating' in truth.columns: + if "rating" in truth.columns: i_rates = np.sort(truth.rating.values)[::-1] if k is not None: i_rates = i_rates[:k] ideal = _dcg(i_rates, discount) # make an array of ratings for this rec list - r_rates = truth['rating'].values[tpos] + r_rates = truth["rating"].values[tpos] r_rates[tpos < 0] = 0 achieved = _dcg(r_rates, discount) else: @@ -328,36 +329,36 @@ def ndcg(recs, truth, discount=np.log2, k=None): @bulk_impl(ndcg) def _bulk_ndcg(recs, truth, discount=np.log2, k=None): - if 'rating' not in truth.columns: + if "rating" not in truth.columns: truth = truth.assign(rating=np.ones(len(truth), dtype=np.float32)) - ideal = truth.groupby(level='LKTruthID')['rating'].rank(method='first', ascending=False) + ideal = truth.groupby(level="LKTruthID")["rating"].rank(method="first", ascending=False) if k is not None: ideal = ideal[ideal <= k] ideal = discount(ideal) ideal = np.maximum(ideal, 1) - ideal = truth['rating'] / ideal - ideal = ideal.groupby(level='LKTruthID').sum() - ideal.name = 'ideal' + ideal = truth["rating"] / ideal + ideal = ideal.groupby(level="LKTruthID").sum() + ideal.name = "ideal" - list_ideal = recs[['LKRecID', 'LKTruthID']].drop_duplicates() - list_ideal = list_ideal.join(ideal, on='LKTruthID', how='left') - list_ideal = list_ideal.set_index('LKRecID') + list_ideal = recs[["LKRecID", "LKTruthID"]].drop_duplicates() + list_ideal = list_ideal.join(ideal, on="LKTruthID", how="left") + list_ideal = list_ideal.set_index("LKRecID") if k is not None: - recs = recs[recs['rank'] <= k] - rated = recs.join(truth, on=['LKTruthID', 'item'], how='inner') - rd = discount(rated['rank']) + recs = recs[recs["rank"] <= k] + rated = recs.join(truth, on=["LKTruthID", "item"], how="inner") + rd = discount(rated["rank"]) rd = np.maximum(rd, 1) - rd = rated['rating'] / rd - rd = rated[['LKRecID']].assign(util=rd) - dcg = rd.groupby(['LKRecID'])['util'].sum().reset_index(name='dcg') - dcg = dcg.set_index('LKRecID') + rd = rated["rating"] / rd + rd = rated[["LKRecID"]].assign(util=rd) + dcg = rd.groupby(["LKRecID"])["util"].sum().reset_index(name="dcg") + dcg = dcg.set_index("LKRecID") - dcg = dcg.join(list_ideal, how='outer') - dcg['ndcg'] = dcg['dcg'].fillna(0) / dcg['ideal'] + dcg = dcg.join(list_ideal, how="outer") + dcg["ndcg"] = dcg["dcg"].fillna(0) / dcg["ideal"] - return dcg['ndcg'] + return dcg["ndcg"] def rbp(recs, truth, k=None, patience=0.5, normalize=False): @@ -396,18 +397,18 @@ def rbp(recs, truth, k=None, patience=0.5, normalize=False): else: k = len(recs) - if 'rank' not in recs.columns: - recs = recs.assign(rank=np.arange(1, len(recs)+1)) + if "rank" not in recs.columns: + recs = recs.assign(rank=np.arange(1, len(recs) + 1)) - if np.min(recs['rank']) != 1: - warnings.warn('rank should start with 1') + if np.min(recs["rank"]) != 1: + warnings.warn("rank should start with 1") nrel = len(truth) if nrel == 0: return None - good = recs['item'].isin(truth.index) - ranks = recs['rank'][good] + good = recs["item"].isin(truth.index) + ranks = recs["rank"][good] disc = patience ** (ranks - 1) rbp = np.sum(disc) if normalize: @@ -423,36 +424,36 @@ def rbp(recs, truth, k=None, patience=0.5, normalize=False): @bulk_impl(rbp) def _bulk_rbp(recs, truth, k=None, patience=0.5, normalize=False): if k is not None: - recs = recs[recs['rank'] <= k] + recs = recs[recs["rank"] <= k] - good = recs.join(truth, on=['LKTruthID', 'item'], how='inner') - good['rbp_disc'] = patience ** (good['rank'] - 1) - scores = good.groupby('LKRecID')['rbp_disc'].sum() + good = recs.join(truth, on=["LKTruthID", "item"], how="inner") + good["rbp_disc"] = patience ** (good["rank"] - 1) + scores = good.groupby("LKRecID")["rbp_disc"].sum() if normalize: - tns = truth.reset_index().groupby('LKTruthID')['item'].count() + tns = truth.reset_index().groupby("LKTruthID")["item"].count() if k is not None: tns[tns > k] = k max_nrel = np.max(tns) # compute 0...k-1 (the powers of k-1 for 1..k) kseq = np.arange(max_nrel) # compute the discounts at each k-1 - nd = patience ** kseq + nd = patience**kseq # convert to a series of the sums, up through each k max_rbps = pd.Series(np.cumsum(nd), index=kseq + 1) # get a rec/truth mapping - map = recs[['LKRecID', 'LKTruthID']].drop_duplicates() - map.set_index('LKRecID', inplace=True) + map = recs[["LKRecID", "LKTruthID"]].drop_duplicates() + map.set_index("LKRecID", inplace=True) map = map.reindex(scores.index) # map to nrel, and then to the max RBPs - map = map.join(tns.to_frame('nrel'), on='LKTruthID', how='left') - map = map.join(max_rbps.to_frame('rbp_max'), on='nrel', how='left') + map = map.join(tns.to_frame("nrel"), on="LKTruthID", how="left") + map = map.join(max_rbps.to_frame("rbp_max"), on="nrel", how="left") # divide each score by max RBP - scores /= map['rbp_max'] + scores /= map["rbp_max"] else: - scores *= (1 - patience) + scores *= 1 - patience - scores = scores.reindex(recs['LKRecID'].unique(), fill_value=0) + scores = scores.reindex(recs["LKRecID"].unique(), fill_value=0) return scores diff --git a/lenskit/sharing/__init__.py b/lenskit/sharing/__init__.py index 7ee142c43..eb17ac946 100644 --- a/lenskit/sharing/__init__.py +++ b/lenskit/sharing/__init__.py @@ -15,7 +15,7 @@ def _save_mode(): - return getattr(_store_state, 'mode', 'save') + return getattr(_store_state, "mode", "save") @contextmanager @@ -25,7 +25,7 @@ def sharing_mode(): sharing, not model persistence. """ old = _save_mode() - _store_state.mode = 'share' + _store_state.mode = "share" try: yield finally: @@ -38,7 +38,7 @@ def in_share_context(): :func:`sharing_mode` context, which means model pickling will be used for cross-process sharing. """ - return _save_mode() == 'share' + return _save_mode() == "share" class PersistedModel(ABC): @@ -80,9 +80,9 @@ def transfer(self): ``self`` (for convenience) """ if not self.is_owner: - warnings.warning('non-owning objects should not be transferred', stacklevel=1) + warnings.warning("non-owning objects should not be transferred", stacklevel=1) else: - self.is_owner = 'transfer' + self.is_owner = "transfer" return self @@ -113,15 +113,15 @@ def persist(model, *, method=None): PersistedModel: The persisted object. """ if method is not None: - if method == 'binpickle': + if method == "binpickle": method = persist_binpickle - elif method == 'shm': + elif method == "shm": method = persist_shm - elif not hasattr(method, '__call__'): - raise ValueError('invalid method %s: must be one of binpickle, shm, or a funciton') + elif not hasattr(method, "__call__"): + raise ValueError("invalid method %s: must be one of binpickle, shm, or a funciton") if method is None: - if SHM_AVAILABLE and 'LK_TEMP_DIR' not in os.environ: + if SHM_AVAILABLE and "LK_TEMP_DIR" not in os.environ: method = persist_shm else: method = persist_binpickle @@ -129,5 +129,5 @@ def persist(model, *, method=None): return method(model) -from .binpickle import persist_binpickle, BPKPersisted # noqa: E402,F401 +from .binpickle import persist_binpickle, BPKPersisted # noqa: E402,F401 from .shm import persist_shm, SHMPersisted, SHM_AVAILABLE # noqa: E402,F401 diff --git a/lenskit/sharing/binpickle.py b/lenskit/sharing/binpickle.py index be9240936..c3d850761 100644 --- a/lenskit/sharing/binpickle.py +++ b/lenskit/sharing/binpickle.py @@ -27,11 +27,11 @@ def persist_binpickle(model, dir=None, file=None): path = pathlib.Path(file) else: if dir is None: - dir = os.environ.get('LK_TEMP_DIR', None) - fd, path = tempfile.mkstemp(suffix='.bpk', prefix='lkpy-', dir=dir) + dir = os.environ.get("LK_TEMP_DIR", None) + fd, path = tempfile.mkstemp(suffix=".bpk", prefix="lkpy-", dir=dir) os.close(fd) path = pathlib.Path(path) - _log.debug('persisting %s to %s', model, path) + _log.debug("persisting %s to %s", model, path) with binpickle.BinPickler.mappable(path) as bp, sharing_mode(): bp.dump(model) return BPKPersisted(path) @@ -46,7 +46,7 @@ def __init__(self, path): def get(self): if self._bpk_file is None: - _log.debug('loading %s', self.path) + _log.debug("loading %s", self.path) self._bpk_file = binpickle.BinPickleFile(self.path, direct=True) self._model = self._bpk_file.load() return self._model @@ -55,35 +55,35 @@ def close(self, unlink=True): if self._bpk_file is not None: self._model = None try: - _log.debug('closing BPK file') + _log.debug("closing BPK file") try: self._bpk_file.close() except BufferError: - _log.debug('could not close %s, collecting garbage and retrying', self.path) + _log.debug("could not close %s, collecting garbage and retrying", self.path) gc.collect() self._bpk_file.close() except (BufferError, IOError) as e: - _log.warn('error closing %s: %s', self.path, e) + _log.warn("error closing %s: %s", self.path, e) self._bpk_file = None if self.is_owner and unlink: assert self._model is None if unlink: - _log.debug('deleting %s', self.path) + _log.debug("deleting %s", self.path) try: self.path.unlink() except IOError as e: - _log.warn('could not remove %s: %s', self.path, e) + _log.warn("could not remove %s: %s", self.path, e) self.is_owner = False def __getstate__(self): d = dict(self.__dict__) - d['_bpk_file'] = None - d['_model'] = None - if self.is_owner == 'transfer': - d['is_owner'] = True + d["_bpk_file"] = None + d["_model"] = None + if self.is_owner == "transfer": + d["is_owner"] = True else: - d['is_owner'] = False + d["is_owner"] = False return d def __del___(self): diff --git a/lenskit/sharing/shm.py b/lenskit/sharing/shm.py index 22729cd1e..1409049b2 100644 --- a/lenskit/sharing/shm.py +++ b/lenskit/sharing/shm.py @@ -4,7 +4,8 @@ from . import sharing_mode, PersistedModel import multiprocessing.shared_memory as shm -SHM_AVAILABLE = sys.platform != 'win32' + +SHM_AVAILABLE = sys.platform != "win32" _log = logging.getLogger(__name__) @@ -27,12 +28,17 @@ def persist_shm(model, dir=None): data = pickle.dumps(model, protocol=5, buffer_callback=buffers.append) total_size = sum(memoryview(b).nbytes for b in buffers) - _log.info('serialized %s to %d pickle bytes with %d buffers of %d bytes', - model, len(data), len(buffers), total_size) + _log.info( + "serialized %s to %d pickle bytes with %d buffers of %d bytes", + model, + len(data), + len(buffers), + total_size, + ) if buffers: # blit the buffers to the SHM block - _log.debug('preparing to share %d buffers', len(buffers)) + _log.debug("preparing to share %d buffers", len(buffers)) memory = shm.SharedMemory(create=True, size=total_size) cur_offset = 0 blocks = [] @@ -40,7 +46,7 @@ def persist_shm(model, dir=None): ba = buf.raw() blen = ba.nbytes bend = cur_offset + blen - _log.debug('saving %d bytes in buffer %d/%d', blen, i+1, len(buffers)) + _log.debug("saving %d bytes in buffer %d/%d", blen, i + 1, len(buffers)) memory.buf[cur_offset:bend] = ba blocks.append((cur_offset, bend)) cur_offset = bend @@ -65,7 +71,7 @@ def __init__(self, data, memory, blocks): def get(self): if self._model is None: - _log.debug('loading model from shared memory') + _log.debug("loading model from shared memory") shm = self._open() buffers = [] for bs, be in self.blocks: @@ -78,11 +84,11 @@ def get(self): def close(self, unlink=True): self._model = None - _log.debug('releasing SHM buffers') + _log.debug("releasing SHM buffers") self.buffers = None if self.memory is not None: self.memory.close() - if unlink and self.is_owner and self.is_owner != 'transfer': + if unlink and self.is_owner and self.is_owner != "transfer": self.memory.unlink() self.is_owner = False self.memory = None @@ -94,16 +100,16 @@ def _open(self): def __getstate__(self): return { - 'pickle_data': self.pickle_data, - 'blocks': self.blocks, - 'shm_name': self.shm_name, - 'is_owner': True if self.is_owner == 'transfer' else False + "pickle_data": self.pickle_data, + "blocks": self.blocks, + "shm_name": self.shm_name, + "is_owner": True if self.is_owner == "transfer" else False, } def __setstate__(self, state): self.__dict__.update(state) if self.is_owner: - _log.debug('opening shared buffers after ownership transfer') + _log.debug("opening shared buffers after ownership transfer") self._open() def __del__(self): diff --git a/lenskit/topn.py b/lenskit/topn.py index 9bb19c3b4..0ca22072b 100644 --- a/lenskit/topn.py +++ b/lenskit/topn.py @@ -16,7 +16,7 @@ def _length(df, *args, **kwargs): @bulk_impl(_length) def _bulk_length(df, *args): - return df.groupby('LKRecID')['item'].count() + return df.groupby("LKRecID")["item"].count() class RecListAnalysis: @@ -42,11 +42,11 @@ class RecListAnalysis: The columns to group by, or ``None`` to use the default. """ - DEFAULT_SKIP_COLS = ['item', 'rank', 'score', 'rating'] + DEFAULT_SKIP_COLS = ["item", "rank", "score", "rating"] def __init__(self, group_cols=None, n_jobs=None): self.group_cols = group_cols - self.metrics = [(_length, 'nrecs', {})] + self.metrics = [(_length, "nrecs", {})] self.n_jobs = n_jobs def add_metric(self, metric, *, name=None, **kwargs): @@ -86,7 +86,7 @@ def compute(self, recs, truth, *, include_missing=False): Returns: pandas.DataFrame: The results of the analysis. """ - _log.info('analyzing %d recommendations (%d truth rows)', len(recs), len(truth)) + _log.info("analyzing %d recommendations (%d truth rows)", len(recs), len(truth)) rec_key, truth_key = _df_keys(recs.columns, truth.columns, self.group_cols) @@ -95,19 +95,19 @@ def compute(self, recs, truth, *, include_missing=False): timer = Stopwatch() - _log.info('collecting metric results') + _log.info("collecting metric results") bulk_res = [] ind_metrics = [] for mf, mn, margs in self.metrics: - if hasattr(mf, 'bulk_score') and 'rank' in r_data.columns: - _log.debug('bulk-scoring %s', mn) + if hasattr(mf, "bulk_score") and "rank" in r_data.columns: + _log.debug("bulk-scoring %s", mn) mbs = mf.bulk_score(r_data, t_data, **margs).to_frame(name=mn) - assert mbs.index.name == 'LKRecID' + assert mbs.index.name == "LKRecID" bulk_res.append(mbs) else: ind_metrics.append((mf, mn, margs)) if bulk_res: - bulk_res = ft.reduce(lambda df1, df2: df1.join(df2, how='outer'), bulk_res) + bulk_res = ft.reduce(lambda df1, df2: df1.join(df2, how="outer"), bulk_res) else: bulk_res = None @@ -118,13 +118,13 @@ def worker(rdf): return res if ind_metrics: - _log.debug('applying individual metrics') - groups = r_data.groupby(['LKRecID', 'LKTruthID'], sort=False) - if hasattr(groups, 'progress_apply'): + _log.debug("applying individual metrics") + groups = r_data.groupby(["LKRecID", "LKTruthID"], sort=False) + if hasattr(groups, "progress_apply"): ind_res = groups.progress_apply(worker) else: ind_res = groups.apply(worker) - ind_res = ind_res.reset_index('LKTruthID', drop=True) + ind_res = ind_res.reset_index("LKTruthID", drop=True) if bulk_res is not None: res = bulk_res.join(ind_res) @@ -133,60 +133,60 @@ def worker(rdf): else: res = bulk_res - _log.debug('transforming results') - res = r_ident.join(res, on='LKRecID').drop(columns=['LKRecID', 'LKTruthID']) + _log.debug("transforming results") + res = r_ident.join(res, on="LKRecID").drop(columns=["LKRecID", "LKTruthID"]) - _log.info('measured %d lists in %s', len(res), timer) + _log.info("measured %d lists in %s", len(res), timer) if include_missing: - _log.info('filling in missing user info (%d initial rows)', len(res)) + _log.info("filling in missing user info (%d initial rows)", len(res)) ug_cols = [c for c in rec_key if c not in truth_key] - tcount = truth.groupby(truth_key)['item'].count().to_frame('ntruth') - _log.debug('truth counts:\n%s', tcount) + tcount = truth.groupby(truth_key)["item"].count().to_frame("ntruth") + _log.debug("truth counts:\n%s", tcount) if ug_cols: - _log.debug('regrouping by %s to fill', ug_cols) - _log.debug('pre-group series:\n%s', res) + _log.debug("regrouping by %s to fill", ug_cols) + _log.debug("pre-group series:\n%s", res) rdict = {} for key, df in res.groupby(ug_cols): - df2 = df.drop(columns=ug_cols).join(tcount, how='outer', on=truth_key) + df2 = df.drop(columns=ug_cols).join(tcount, how="outer", on=truth_key) rdict[key] = df2 res = pd.concat(rdict, names=ug_cols) - _log.debug('joined result:\n%s', res) + _log.debug("joined result:\n%s", res) res = res.reset_index(ug_cols) res.reset_index(inplace=True, drop=True) - _log.debug('final joined result:\n%s', res) + _log.debug("final joined result:\n%s", res) else: - _log.debug('no ungroup cols, directly merging to fill') - res = res.join(tcount, how='outer', on=truth_key) - _log.debug('final columns: %s', res.columns) - _log.debug('index levels: %s', res.index.names) - _log.debug('expanded to %d rows', len(res)) - res['ntruth'] = res['ntruth'].fillna(0) - res['nrecs'] = res['nrecs'].fillna(0) + _log.debug("no ungroup cols, directly merging to fill") + res = res.join(tcount, how="outer", on=truth_key) + _log.debug("final columns: %s", res.columns) + _log.debug("index levels: %s", res.index.names) + _log.debug("expanded to %d rows", len(res)) + res["ntruth"] = res["ntruth"].fillna(0) + res["nrecs"] = res["nrecs"].fillna(0) return res.set_index(rec_key) def _number_truth(self, truth, truth_key): - _log.info('numbering truth lists') + _log.info("numbering truth lists") truth_df = truth[truth_key].drop_duplicates() - truth_df['LKTruthID'] = np.arange(len(truth_df)) + truth_df["LKTruthID"] = np.arange(len(truth_df)) truth = pd.merge(truth_df, truth, on=truth_key).drop(columns=truth_key) - truth.set_index(['LKTruthID', 'item'], inplace=True) + truth.set_index(["LKTruthID", "item"], inplace=True) if not truth.index.is_unique: - _log.warn('truth index not unique: may have duplicate items\n%s', truth) + _log.warn("truth index not unique: may have duplicate items\n%s", truth) return truth_df, truth def _number_recs(self, recs, truth_key, rec_key, t_ident): - _log.info('numbering rec lists') + _log.info("numbering rec lists") rec_df = recs[rec_key].drop_duplicates() - rec_df['LKRecID'] = np.arange(len(rec_df)) - rec_df = pd.merge(rec_df, t_ident, on=truth_key, how='left') + rec_df["LKRecID"] = np.arange(len(rec_df)) + rec_df = pd.merge(rec_df, t_ident, on=truth_key, how="left") recs = pd.merge(rec_df, recs, on=rec_key).drop(columns=rec_key) return rec_df, recs @@ -199,6 +199,6 @@ def _df_keys(r_cols, t_cols, g_cols=None, skip_cols=RecListAnalysis.DEFAULT_SKIP truth_key = [c for c in g_cols if c in t_cols] rec_key = [c for c in g_cols if c not in t_cols] + truth_key - _log.info('using rec key columns %s', rec_key) - _log.info('using truth key columns %s', truth_key) + _log.info("using rec key columns %s", rec_key) + _log.info("using truth key columns %s", truth_key) return rec_key, truth_key diff --git a/lenskit/util/__init__.py b/lenskit/util/__init__.py index 8740287f6..2267e6835 100644 --- a/lenskit/util/__init__.py +++ b/lenskit/util/__init__.py @@ -20,12 +20,16 @@ _log = logging.getLogger(__name__) __all__ = [ - 'log_to_stderr', 'log_to_notebook', - 'Stopwatch', - 'read_df_detect', - 'rng', 'init_rng', 'derivable_rng', - 'proc_count', - 'clone', 'clean_str' + "log_to_stderr", + "log_to_notebook", + "Stopwatch", + "read_df_detect", + "rng", + "init_rng", + "derivable_rng", + "proc_count", + "clone", + "clean_str", ] @@ -46,8 +50,8 @@ def clone(algo): >>> copy.damping == orig.damping True """ - _log.debug('cloning %s', algo) - if isinstance(algo, Algorithm) or hasattr(algo, 'get_params'): + _log.debug("cloning %s", algo) + if isinstance(algo, Algorithm) or hasattr(algo, "get_params"): params = algo.get_params(deep=False) sps = dict([(k, clone(v)) for (k, v) in params.items()]) @@ -59,7 +63,7 @@ def clone(algo): class LastMemo: - def __init__(self, func, check_type='identity'): + def __init__(self, func, check_type="identity"): self.function = func self.check = check_type self.memory = None @@ -73,13 +77,13 @@ def __call__(self, arg): return self.result def _arg_is_last(self, arg): - if self.check == 'identity': + if self.check == "identity": return arg is self.memory - elif self.check == 'equality': + elif self.check == "equality": return arg == self.memory -def last_memo(func=None, check_type='identity'): +def last_memo(func=None, check_type="identity"): if func is None: return lambda f: LastMemo(f, check_type) else: @@ -90,7 +94,7 @@ def cached(prop): """ Decorator for property getters to cache the property value. """ - cache = '_cached_' + prop.__name__ + cache = "_cached_" + prop.__name__ def getter(self): val = getattr(self, cache, None) @@ -114,7 +118,7 @@ def max_memory(): res = resource.getrusage(resource.RUSAGE_SELF) return "%.1f MiB" % (res.ru_maxrss / 1024,) else: - return 'unknown' + return "unknown" def cur_memory(): @@ -123,7 +127,7 @@ def cur_memory(): res = resource.getrusage(resource.RUSAGE_SELF) return "%.1f MiB" % (res.ru_idrss,) else: - return 'unknown' + return "unknown" def clean_str(s): @@ -135,4 +139,5 @@ def check_env(): Check the runtime environment for potential performance or stability problems. """ from .debug import check_env + return check_env() diff --git a/lenskit/util/accum.py b/lenskit/util/accum.py index ed725c838..060da81ae 100644 --- a/lenskit/util/accum.py +++ b/lenskit/util/accum.py @@ -11,8 +11,8 @@ def _pair_downheap(pos: int, sp, limit, ks, vs): finished = False while not finished: min = pos - left = 2*pos + 1 - right = 2*pos + 2 + left = 2 * pos + 1 + right = 2 * pos + 2 if left < limit and vs[sp + left] < vs[sp + min]: min = left if right < limit and vs[sp + right] < vs[sp + min]: @@ -87,7 +87,7 @@ def kvp_minheap_sort(sp, ep, keys, vals): vals: the value array """ - for i in range(ep-1, sp, -1): + for i in range(ep - 1, sp, -1): swap(keys, i, sp) swap(vals, i, sp) - _pair_downheap(0, sp, i-sp, keys, vals) + _pair_downheap(0, sp, i - sp, keys, vals) diff --git a/lenskit/util/debug.py b/lenskit/util/debug.py index cb7d5c82d..a34a83d2f 100644 --- a/lenskit/util/debug.py +++ b/lenskit/util/debug.py @@ -52,7 +52,7 @@ def blas_info(): pools = threadpoolctl.threadpool_info() blas = None for pool in pools: - if pool['user_api'] != 'blas': + if pool["user_api"] != "blas": continue if blas is not None: @@ -60,20 +60,26 @@ def blas_info(): _log.info("later layer is: %s", pool) continue - blas = BlasInfo(pool['internal_api'], pool.get('threading_layer', None), pool.get('num_threads', None), pool['version']) + blas = BlasInfo( + pool["internal_api"], + pool.get("threading_layer", None), + pool.get("num_threads", None), + pool["version"], + ) return blas + def numba_info(): x = _par_test(100) - _log.debug('sum: %d', x) + _log.debug("sum: %d", x) try: layer = numba.threading_layer() except ValueError: - _log.info('Numba threading not initialized') + _log.info("Numba threading not initialized") return None - _log.info('numba threading layer: %s', layer) + _log.info("numba threading layer: %s", layer) nth = numba.get_num_threads() return NumbaInfo(layer, nth) @@ -91,42 +97,41 @@ def check_env(): blas = blas_info() numba = numba_info() except Exception as e: - _log.error('error inspecting runtime environment: %s', e) + _log.error("error inspecting runtime environment: %s", e) _already_checked = True return if numba is None: - _log.warning('Numba JIT seems to be disabled - this will hurt performance') + _log.warning("Numba JIT seems to be disabled - this will hurt performance") _already_checked = True return if blas is None: - _log.warning('threadpoolctl could not find your BLAS') + _log.warning("threadpoolctl could not find your BLAS") _already_checked = True return - _log.info('Using BLAS %s', blas.impl) + _log.info("Using BLAS %s", blas.impl) - if numba.threading != 'tbb': - _log.info('Numba is using threading layer %s - consider TBB', numba.threading) + if numba.threading != "tbb": + _log.info("Numba is using threading layer %s - consider TBB", numba.threading) - if numba.threading == 'tbb' and blas.threading == 'tbb': - _log.info('Numba and BLAS both using TBB - good') + if numba.threading == "tbb" and blas.threading == "tbb": + _log.info("Numba and BLAS both using TBB - good") - if numba.threading == 'tbb' and blas.impl == 'mkl' and blas.threading != 'tbb': - _log.warning('Numba using TBB but MKL is using %s', blas.threading) - _log.info('Set MKL_THREADING_LAYER=tbb for improved performance') + if numba.threading == "tbb" and blas.impl == "mkl" and blas.threading != "tbb": + _log.warning("Numba using TBB but MKL is using %s", blas.threading) + _log.info("Set MKL_THREADING_LAYER=tbb for improved performance") problems += 1 if blas.threads and blas.threads > 1 and numba.threads > 1: # TODO make this be fine in OpenMP configurations - _log.warning('BLAS using multiple threads - can cause oversubscription') - _log.info('See https://mde.one/lkpy-blas for information on tuning BLAS for LensKit') + _log.warning("BLAS using multiple threads - can cause oversubscription") + _log.info("See https://mde.one/lkpy-blas for information on tuning BLAS for LensKit") problems += 1 if problems: - _log.warning('found %d potential runtime problems - see https://boi.st/lkpy-perf', - problems) + _log.warning("found %d potential runtime problems - see https://boi.st/lkpy-perf", problems) _already_checked = True return problems @@ -144,19 +149,20 @@ def print_numba_info(): def main(): from docopt import docopt + opts = docopt(__doc__) - level = logging.DEBUG if opts['--verbose'] else logging.INFO - logging.basicConfig(level=level, stream=sys.stderr, format='%(levelname)s %(name)s %(message)s') - logging.getLogger('numba').setLevel(logging.INFO) + level = logging.DEBUG if opts["--verbose"] else logging.INFO + logging.basicConfig(level=level, stream=sys.stderr, format="%(levelname)s %(name)s %(message)s") + logging.getLogger("numba").setLevel(logging.INFO) - if opts['--blas-info']: + if opts["--blas-info"]: print_blas_info() - if opts['--numba-info']: + if opts["--numba-info"]: print_numba_info() - if opts['--check-env']: + if opts["--check-env"]: check_env() -if __name__ == '__main__': - _log = logging.getLogger('lenskit.util.debug') +if __name__ == "__main__": + _log = logging.getLogger("lenskit.util.debug") main() diff --git a/lenskit/util/log.py b/lenskit/util/log.py index 91c939518..05b05672a 100644 --- a/lenskit/util/log.py +++ b/lenskit/util/log.py @@ -16,6 +16,7 @@ class InjectHandler: "Handler that re-injects a message into parent process logging" + level = logging.DEBUG def handle(self, record): @@ -36,16 +37,16 @@ def log_to_stderr(level=logging.INFO): """ global _lts_initialized if _lts_initialized: - _log.info('log already initialized') + _log.info("log already initialized") h = logging.StreamHandler(sys.stderr) - f = logging.Formatter('[%(levelname)7s] %(name)s %(message)s') + f = logging.Formatter("[%(levelname)7s] %(name)s %(message)s") h.setFormatter(f) root = logging.getLogger() root.addHandler(h) root.setLevel(level) - _log.info('stderr logging configured') + _log.info("stderr logging configured") _lts_initialized = True @@ -55,10 +56,10 @@ def log_to_notebook(level=logging.INFO): """ global _ltn_initialized if _ltn_initialized: - _log.info('log already initialized') + _log.info("log already initialized") h = logging.StreamHandler(sys.stderr) - f = logging.Formatter('[%(levelname)7s] %(name)s %(message)s') + f = logging.Formatter("[%(levelname)7s] %(name)s %(message)s") h.setFormatter(f) h.setLevel(logging.WARNING) @@ -71,7 +72,7 @@ def log_to_notebook(level=logging.INFO): root.addHandler(oh) root.setLevel(level) - _log.info('notebook logging configured') + _log.info("notebook logging configured") _ltn_initialized = True @@ -81,6 +82,7 @@ def log_queue(): """ global _log_queue, _log_listener from lenskit.util.parallel import LKContext + ctx = LKContext.INSTANCE if _log_queue is None: _log_queue = ctx.Queue() diff --git a/lenskit/util/parallel.py b/lenskit/util/parallel.py index 25cc7fcd4..37fcff7e2 100644 --- a/lenskit/util/parallel.py +++ b/lenskit/util/parallel.py @@ -31,7 +31,7 @@ def is_worker(): def is_mp_worker(): "Query whether the current process is a multiprocessing worker." - return os.environ.get('_LK_IN_MP', 'no') == 'yes' + return os.environ.get("_LK_IN_MP", "no") == "yes" def _p5_recv(self): @@ -48,6 +48,7 @@ class FastQ(SimpleQueue): """ SimpleQueue subclass that uses Pickle5 instead of default pickling. """ + def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.__patch() @@ -111,7 +112,7 @@ def _initialize_mp_worker(mkey, func, threads, log_queue, seed): # deferred function unpickling to minimize imports before initialization __work_func = pickle.loads(func) - _log.debug('worker %d ready (process %s)', os.getpid(), mp.current_process()) + _log.debug("worker %d ready (process %s)", os.getpid(), mp.current_process()) def _mp_invoke_worker(*args): @@ -121,13 +122,13 @@ def _mp_invoke_worker(*args): def _sp_worker(log_queue, seed, res_queue, func, args, kwargs): _initialize_worker(log_queue, seed) - _log.debug('running %s in worker', func) + _log.debug("running %s in worker", func) try: res = func(*args, **kwargs) - _log.debug('completed successfully') + _log.debug("completed successfully") res_queue.put((True, res)) except Exception as e: - _log.error('failed, transmitting error %r', e) + _log.error("failed, transmitting error %r", e) res_queue.put((False, e)) @@ -159,9 +160,9 @@ def proc_count(core_div=2, max_default=None, level=0): int: The number of jobs desired. """ - nprocs = os.environ.get('LK_NUM_PROCS', None) + nprocs = os.environ.get("LK_NUM_PROCS", None) if nprocs is not None: - nprocs = [int(s) for s in nprocs.split(',')] + nprocs = [int(s) for s in nprocs.split(",")] elif core_div is not None: nprocs = max(mp.cpu_count() // core_div, 1) if max_default is not None: @@ -184,22 +185,22 @@ def run_sp(func, *args, **kwargs): rq = ctx.SimpleQueue() seed = derive_seed() worker_args = (log_queue(), seed, rq, func, args, kwargs) - _log.debug('spawning subprocess to run %s', func) + _log.debug("spawning subprocess to run %s", func) proc = ctx.Process(target=_sp_worker, args=worker_args) proc.start() - _log.debug('waiting for process %s to return', proc) + _log.debug("waiting for process %s to return", proc) success, payload = rq.get() - _log.debug('received success=%s', success) - _log.debug('waiting for process %s to exit', proc) + _log.debug("received success=%s", success) + _log.debug("waiting for process %s to exit", proc) proc.join() if proc.exitcode: - _log.error('subprocess failed with code %d', proc.exitcode) - raise RuntimeError('subprocess failed with code ' + str(proc.exitcode)) + _log.error("subprocess failed with code %d", proc.exitcode) + raise RuntimeError("subprocess failed with code " + str(proc.exitcode)) if success: return payload else: - _log.error('subprocess raised exception: %s', payload) - raise ChildProcessError('error in child process', payload) + _log.error("subprocess raised exception: %s", payload) + raise ChildProcessError("error in child process", payload) def invoker(model, func, n_jobs=None, *, persist_method=None): @@ -265,7 +266,7 @@ def __exit__(self, *args): class InProcessOpInvoker(ModelOpInvoker): def __init__(self, model, func): - _log.info('setting up in-process worker') + _log.info("setting up in-process worker") if isinstance(model, PersistedModel): self.model = model.get() else: @@ -285,28 +286,29 @@ class ProcessPoolOpInvoker(ModelOpInvoker): def __init__(self, model, func, n_jobs, persist_method): if isinstance(model, PersistedModel): - _log.debug('model already persisted') + _log.debug("model already persisted") key = model else: - _log.debug('persisting model with method %s', persist_method) + _log.debug("persisting model with method %s", persist_method) key = persist(model, method=persist_method) self._close_key = key - _log.debug('persisting function') + _log.debug("persisting function") func = pickle.dumps(func) ctx = LKContext.INSTANCE - _log.info('setting up ProcessPoolExecutor w/ %d workers', n_jobs) - os.environ['_LK_IN_MP'] = 'yes' + _log.info("setting up ProcessPoolExecutor w/ %d workers", n_jobs) + os.environ["_LK_IN_MP"] = "yes" kid_tc = proc_count(level=1) - self.executor = ProcessPoolExecutor(n_jobs, ctx, _initialize_mp_worker, - (key, func, kid_tc, log_queue(), get_root_seed())) + self.executor = ProcessPoolExecutor( + n_jobs, ctx, _initialize_mp_worker, (key, func, kid_tc, log_queue(), get_root_seed()) + ) def map(self, *iterables): return self.executor.map(_mp_invoke_worker, *iterables) def shutdown(self): self.executor.shutdown() - os.environ.pop('_LK_IN_MP', 'yes') + os.environ.pop("_LK_IN_MP", "yes") if self._close_key is not None: self._close_key.close() del self._close_key diff --git a/lenskit/util/random.py b/lenskit/util/random.py index 482696d69..a8e6af820 100644 --- a/lenskit/util/random.py +++ b/lenskit/util/random.py @@ -21,7 +21,7 @@ def get_root_seed(): Returns: numpy.random.SeedSequence: The LensKit root seed. """ - warnings.warn('get_root_seed is deprecated, use seedbank.root_seed', DeprecationWarning) + warnings.warn("get_root_seed is deprecated, use seedbank.root_seed", DeprecationWarning) return seedbank.root_seed() @@ -54,7 +54,7 @@ def init_rng(seed, *keys, propagate=True): Returns: The random seed. """ - warnings.warn('init_rng is deprecated, use seedbank.initialize', DeprecationWarning) + warnings.warn("init_rng is deprecated, use seedbank.initialize", DeprecationWarning) seedbank.initialize(seed, *keys) @@ -84,7 +84,7 @@ def rng(spec=None, *, legacy=False): Returns: numpy.random.Generator: A random number generator. """ - warnings.warn('rng is deprecated, use seedbank.numpy_rng', DeprecationWarning) + warnings.warn("rng is deprecated, use seedbank.numpy_rng", DeprecationWarning) if legacy: return seedbank.numpy_random_state(spec) @@ -94,6 +94,7 @@ def rng(spec=None, *, legacy=False): class FixedRNG: "RNG provider that always provides the same RNG" + def __init__(self, rng): self.rng = rng @@ -101,11 +102,12 @@ def __call__(self, *keys): return self.rng def __str__(self): - return 'Fixed({})'.format(self.rng) + return "Fixed({})".format(self.rng) class DerivingRNG: "RNG provider that derives new RNGs from the key" + def __init__(self, seed, legacy): self.seed = seed self.legacy = legacy @@ -119,7 +121,7 @@ def __call__(self, *keys): return np.random.default_rng(seed) def __str__(self): - return 'Derive({})'.format(self.seed) + return "Derive({})".format(self.seed) def derivable_rng(spec, *, legacy=False): @@ -144,12 +146,12 @@ def derivable_rng(spec, *, legacy=False): the ``legacy`` parameter). """ - if spec == 'user': + if spec == "user": return DerivingRNG(derive_seed(), legacy) elif isinstance(spec, tuple): seed, key = spec - if key != 'user': - raise ValueError('unrecognized key %s', key) + if key != "user": + raise ValueError("unrecognized key %s", key) return DerivingRNG(seed, legacy) else: return FixedRNG(rng(spec, legacy=legacy)) diff --git a/lenskit/util/test.py b/lenskit/util/test.py index a6fb2d277..e008c0dae 100644 --- a/lenskit/util/test.py +++ b/lenskit/util/test.py @@ -14,20 +14,20 @@ from lenskit.algorithms.ranking import PlackettLuce from lenskit.batch import recommend -ml_test = MovieLens('data/ml-latest-small') -ml100k = ML100K('data/ml-100k') +ml_test = MovieLens("data/ml-latest-small") +ml100k = ML100K("data/ml-100k") -@pytest.fixture(scope='session') +@pytest.fixture(scope="session") def demo_recs(): """ A demo set of train, test, and recommendation data. """ train, test = simple_test_pair(ml_test.ratings, f_rates=0.5) - users = test['user'].unique() + users = test["user"].unique() algo = PopScore() - algo = PlackettLuce(algo, rng_spec='user') + algo = PlackettLuce(algo, rng_spec="user") algo.fit(train) recs = recommend(algo, users, 500) @@ -55,5 +55,4 @@ def set_env_var(var, val): del os.environ[var] -wantjit = pytest.mark.skipif('NUMBA_DISABLE_JIT' in os.environ, - reason='JIT required') +wantjit = pytest.mark.skipif("NUMBA_DISABLE_JIT" in os.environ, reason="JIT required") diff --git a/lenskit/util/timing.py b/lenskit/util/timing.py index 1eb2188e2..760dbd610 100644 --- a/lenskit/util/timing.py +++ b/lenskit/util/timing.py @@ -5,10 +5,11 @@ import time -class Stopwatch(): +class Stopwatch: """ Timer class for recording elapsed wall time in operations. """ + start_time = None stop_time = None