Source code for beta_rec.utils.evaluation

from functools import lru_cache, wraps

import numpy as np
import pandas as pd
from sklearn.metrics import (
    explained_variance_score,
    log_loss,
    mean_absolute_error,
    mean_squared_error,
    r2_score,
    roc_auc_score,
)

from ..utils.constants import (
    DEFAULT_ITEM_COL,
    DEFAULT_K,
    DEFAULT_PREDICTION_COL,
    DEFAULT_RATING_COL,
    DEFAULT_THRESHOLD,
    DEFAULT_USER_COL,
)


class PandasHash:
    """Wrapper class to allow pandas objects (DataFrames or Series) to be hashable."""

    # reserve space just for a single pandas object
    __slots__ = "pandas_object"

    def __init__(self, pandas_object):
        """Initialize PandasHash class.

        Args:
            pandas_object (pd.DataFrame|pd.Series): pandas object.
        """
        if not isinstance(pandas_object, (pd.DataFrame, pd.Series)):
            raise TypeError("Can only wrap pandas DataFrame or Series objects")
        self.pandas_object = pandas_object

    def __eq__(self, other):
        """Overwrite equality comparison.

        Args:
            other (pd.DataFrame|pd.Series): pandas object to compare.

        Returns:
            bool: whether other object is the same as this one.
        """
        return hash(self) == hash(other)

    def __hash__(self):
        """Overwrite hash operator for use with pandas objects.

        Returns:
            int: hashed value of object.
        """
        hashable = tuple(self.pandas_object.values.tobytes())
        if isinstance(self.pandas_object, pd.DataFrame):
            hashable += tuple(self.pandas_object.columns)
        else:
            hashable += tuple(self.pandas_object.name)
        return hash(hashable)
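
# Illustrative sketch (not part of the original module): PandasHash makes a
# DataFrame usable where hashability is required, e.g. as a key for
# functools.lru_cache. The small frame below is invented for demonstration.
#
#   df = pd.DataFrame({"user": [1, 2], "rating": [5.0, 3.0]})
#   PandasHash(df) == PandasHash(df.copy())  # True: same values and same columns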
def has_columns(df, columns):
    """Check if DataFrame has necessary columns.

    Args:
        df (pd.DataFrame): DataFrame.
        columns (list(str)): columns to check for.

    Returns:
        bool: True if DataFrame has specified columns.
    """
    result = True
    for column in columns:
        if column not in df.columns:
            print("Missing column: {} in DataFrame".format(column))
            result = False
    return result
def has_same_base_dtype(df_1, df_2, columns=None):
    """Check if specified columns have the same base dtypes across both DataFrames.

    Args:
        df_1 (pd.DataFrame): first DataFrame.
        df_2 (pd.DataFrame): second DataFrame.
        columns (list(str)): columns to check, None checks all columns.

    Returns:
        bool: True if the DataFrames' columns have the same base dtypes.
    """
    if columns is None:
        if any(set(df_1.columns).symmetric_difference(set(df_2.columns))):
            print(
                "Cannot test all columns because they are not all shared across DataFrames"
            )
            return False
        columns = df_1.columns

    if not (
        has_columns(df=df_1, columns=columns)
        and has_columns(df=df_2, columns=columns)
    ):
        return False

    result = True
    for column in columns:
        if df_1[column].dtype.type.__base__ != df_2[column].dtype.type.__base__:
            print("Column {} does not have the same base datatype".format(column))
            result = False
    return result
def check_column_dtypes(func):
    """Check columns of DataFrame inputs.

    This includes the checks on:

    1. whether the input columns exist in the input DataFrames.
    2. whether the data types of col_user as well as col_item are matched in the two input DataFrames.

    Args:
        func (function): function that will be wrapped.
    """

    @wraps(func)
    def check_column_dtypes_wrapper(
        rating_true,
        rating_pred,
        col_user=DEFAULT_USER_COL,
        col_item=DEFAULT_ITEM_COL,
        col_rating=DEFAULT_RATING_COL,
        col_prediction=DEFAULT_PREDICTION_COL,
        *args,
        **kwargs
    ):
        """Check columns of DataFrame inputs.

        Args:
            rating_true (pd.DataFrame): True data.
            rating_pred (pd.DataFrame): Predicted data.
            col_user (str): column name for user.
            col_item (str): column name for item.
            col_rating (str): column name for rating.
            col_prediction (str): column name for prediction.
        """
        if not has_columns(rating_true, [col_user, col_item, col_rating]):
            raise ValueError("Missing columns in true rating DataFrame")
        if not has_columns(rating_pred, [col_user, col_item, col_prediction]):
            raise ValueError("Missing columns in predicted rating DataFrame")
        if not has_same_base_dtype(
            rating_true, rating_pred, columns=[col_user, col_item]
        ):
            raise ValueError("Columns in provided DataFrames are not the same datatype")

        return func(
            rating_true=rating_true,
            rating_pred=rating_pred,
            col_user=col_user,
            col_item=col_item,
            col_rating=col_rating,
            col_prediction=col_prediction,
            *args,
            **kwargs
        )

    return check_column_dtypes_wrapper
def lru_cache_df(maxsize, typed=False):
    """Least-recently-used cache decorator.

    Args:
        maxsize (int|None): max size of cache, if set to None cache is boundless.
        typed (bool): arguments of different types are cached separately.
    """

    def to_pandas_hash(val):
        """Return a PandasHash object if input is a DataFrame, otherwise return input unchanged."""
        return PandasHash(val) if isinstance(val, pd.DataFrame) else val

    def from_pandas_hash(val):
        """Extract the DataFrame if input is a PandasHash object, otherwise return input unchanged."""
        return val.pandas_object if isinstance(val, PandasHash) else val

    def decorating_function(user_function):
        @wraps(user_function)
        def wrapper(*args, **kwargs):
            # convert DataFrames in args and kwargs to PandasHash objects
            args = tuple([to_pandas_hash(a) for a in args])
            kwargs = {k: to_pandas_hash(v) for k, v in kwargs.items()}
            return cached_wrapper(*args, **kwargs)

        @lru_cache(maxsize=maxsize, typed=typed)
        def cached_wrapper(*args, **kwargs):
            # get DataFrames from PandasHash objects in args and kwargs
            args = tuple([from_pandas_hash(a) for a in args])
            kwargs = {k: from_pandas_hash(v) for k, v in kwargs.items()}
            return user_function(*args, **kwargs)

        # retain lru_cache attributes
        wrapper.cache_info = cached_wrapper.cache_info
        wrapper.cache_clear = cached_wrapper.cache_clear

        return wrapper

    return decorating_function
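
# Illustrative sketch (not part of the original module): caching a function that
# takes a DataFrame argument. The function name and frame below are invented.
#
#   @lru_cache_df(maxsize=2)
#   def total_rating(df):
#       return df["rating"].sum()
#
#   frame = pd.DataFrame({"rating": [1.0, 2.0, 3.0]})
#   total_rating(frame)             # 6.0, computed
#   total_rating(frame)             # 6.0, served from the cache
#   total_rating.cache_info().hits  # 1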
@check_column_dtypes
@lru_cache_df(maxsize=1)
def merge_rating_true_pred(
    rating_true,
    rating_pred,
    col_user=DEFAULT_USER_COL,
    col_item=DEFAULT_ITEM_COL,
    col_rating=DEFAULT_RATING_COL,
    col_prediction=DEFAULT_PREDICTION_COL,
):
    """Join truth and prediction data frames on userID and itemID.

    Join the truth and prediction DataFrames on userID and itemID and return the
    true and predicted ratings with the correct index.

    Args:
        rating_true (pd.DataFrame): True data.
        rating_pred (pd.DataFrame): Predicted data.
        col_user (str): column name for user.
        col_item (str): column name for item.
        col_rating (str): column name for rating.
        col_prediction (str): column name for prediction.

    Returns:
        np.array: Array with the true ratings.
        np.array: Array with the predicted ratings.
    """
    # pd.merge will apply suffixes to columns which have the same name across both dataframes
    suffixes = ["_true", "_pred"]
    rating_true_pred = pd.merge(
        rating_true, rating_pred, on=[col_user, col_item], suffixes=suffixes
    )
    if col_rating in rating_pred.columns:
        col_rating = col_rating + suffixes[0]
    if col_prediction in rating_true.columns:
        col_prediction = col_prediction + suffixes[1]

    return rating_true_pred[col_rating], rating_true_pred[col_prediction]
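
# Illustrative sketch (not part of the original module): the merge keeps only
# (user, item) pairs present in both frames and returns aligned rating/prediction
# columns. The data are invented; column names are passed explicitly so the
# example does not depend on the DEFAULT_* constants.
#
#   truth = pd.DataFrame({"user": [1, 1], "item": [1, 2], "rating": [5.0, 3.0]})
#   preds = pd.DataFrame({"user": [1, 1], "item": [2, 9], "prediction": [2.5, 4.0]})
#   y_true, y_pred = merge_rating_true_pred(
#       truth, preds, col_user="user", col_item="item",
#       col_rating="rating", col_prediction="prediction",
#   )
#   # y_true -> [3.0], y_pred -> [2.5]; only the shared pair (user=1, item=2) survives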
def rmse(
    rating_true,
    rating_pred,
    col_user=DEFAULT_USER_COL,
    col_item=DEFAULT_ITEM_COL,
    col_rating=DEFAULT_RATING_COL,
    col_prediction=DEFAULT_PREDICTION_COL,
):
    """Calculate Root Mean Squared Error.

    Args:
        rating_true (pd.DataFrame): True data. There should be no duplicate (userID, itemID) pairs.
        rating_pred (pd.DataFrame): Predicted data. There should be no duplicate (userID, itemID) pairs.
        col_user (str): column name for user.
        col_item (str): column name for item.
        col_rating (str): column name for rating.
        col_prediction (str): column name for prediction.

    Returns:
        float: Root mean squared error.
    """
    y_true, y_pred = merge_rating_true_pred(
        rating_true=rating_true,
        rating_pred=rating_pred,
        col_user=col_user,
        col_item=col_item,
        col_rating=col_rating,
        col_prediction=col_prediction,
    )
    return np.sqrt(mean_squared_error(y_true, y_pred))
def mae(
    rating_true,
    rating_pred,
    col_user=DEFAULT_USER_COL,
    col_item=DEFAULT_ITEM_COL,
    col_rating=DEFAULT_RATING_COL,
    col_prediction=DEFAULT_PREDICTION_COL,
):
    """Calculate Mean Absolute Error.

    Args:
        rating_true (pd.DataFrame): True data. There should be no duplicate (userID, itemID) pairs.
        rating_pred (pd.DataFrame): Predicted data. There should be no duplicate (userID, itemID) pairs.
        col_user (str): column name for user.
        col_item (str): column name for item.
        col_rating (str): column name for rating.
        col_prediction (str): column name for prediction.

    Returns:
        float: Mean Absolute Error.
    """
    y_true, y_pred = merge_rating_true_pred(
        rating_true=rating_true,
        rating_pred=rating_pred,
        col_user=col_user,
        col_item=col_item,
        col_rating=col_rating,
        col_prediction=col_prediction,
    )
    return mean_absolute_error(y_true, y_pred)
def rsquared(
    rating_true,
    rating_pred,
    col_user=DEFAULT_USER_COL,
    col_item=DEFAULT_ITEM_COL,
    col_rating=DEFAULT_RATING_COL,
    col_prediction=DEFAULT_PREDICTION_COL,
):
    """Calculate R squared.

    Args:
        rating_true (pd.DataFrame): True data. There should be no duplicate (userID, itemID) pairs.
        rating_pred (pd.DataFrame): Predicted data. There should be no duplicate (userID, itemID) pairs.
        col_user (str): column name for user.
        col_item (str): column name for item.
        col_rating (str): column name for rating.
        col_prediction (str): column name for prediction.

    Returns:
        float: R squared (min=0, max=1).
    """
    y_true, y_pred = merge_rating_true_pred(
        rating_true=rating_true,
        rating_pred=rating_pred,
        col_user=col_user,
        col_item=col_item,
        col_rating=col_rating,
        col_prediction=col_prediction,
    )
    return r2_score(y_true, y_pred)
def exp_var(
    rating_true,
    rating_pred,
    col_user=DEFAULT_USER_COL,
    col_item=DEFAULT_ITEM_COL,
    col_rating=DEFAULT_RATING_COL,
    col_prediction=DEFAULT_PREDICTION_COL,
):
    """Calculate explained variance.

    Args:
        rating_true (pd.DataFrame): True data. There should be no duplicate (userID, itemID) pairs.
        rating_pred (pd.DataFrame): Predicted data. There should be no duplicate (userID, itemID) pairs.
        col_user (str): column name for user.
        col_item (str): column name for item.
        col_rating (str): column name for rating.
        col_prediction (str): column name for prediction.

    Returns:
        float: Explained variance (min=0, max=1).
    """
    y_true, y_pred = merge_rating_true_pred(
        rating_true=rating_true,
        rating_pred=rating_pred,
        col_user=col_user,
        col_item=col_item,
        col_rating=col_rating,
        col_prediction=col_prediction,
    )
    return explained_variance_score(y_true, y_pred)
def auc(
    rating_true,
    rating_pred,
    col_user=DEFAULT_USER_COL,
    col_item=DEFAULT_ITEM_COL,
    col_rating=DEFAULT_RATING_COL,
    col_prediction=DEFAULT_PREDICTION_COL,
):
    """Calculate the Area-Under-Curve metric.

    Calculate the Area-Under-Curve metric for implicit feedback recommenders, where the
    rating is binary and the prediction is a float number ranging from 0 to 1.

    https://en.wikipedia.org/wiki/Receiver_operating_characteristic#Area_under_the_curve

    Note:
        The evaluation does not require a leave-one-out scenario.
        This metric does not calculate group-based AUC, which considers the AUC scores
        averaged across users. It is also not limited to k. Instead, it calculates the
        scores on the entire prediction results regardless of the users.

    Args:
        rating_true (pd.DataFrame): True data.
        rating_pred (pd.DataFrame): Predicted data.
        col_user (str): column name for user.
        col_item (str): column name for item.
        col_rating (str): column name for rating.
        col_prediction (str): column name for prediction.

    Returns:
        float: auc_score (min=0, max=1).
    """
    y_true, y_pred = merge_rating_true_pred(
        rating_true=rating_true,
        rating_pred=rating_pred,
        col_user=col_user,
        col_item=col_item,
        col_rating=col_rating,
        col_prediction=col_prediction,
    )
    return roc_auc_score(y_true, y_pred)
def logloss(
    rating_true,
    rating_pred,
    col_user=DEFAULT_USER_COL,
    col_item=DEFAULT_ITEM_COL,
    col_rating=DEFAULT_RATING_COL,
    col_prediction=DEFAULT_PREDICTION_COL,
):
    """Calculate the logloss metric.

    Calculate the logloss metric for implicit feedback recommenders, where the rating is
    binary and the prediction is a float number ranging from 0 to 1.

    https://en.wikipedia.org/wiki/Loss_functions_for_classification#Cross_entropy_loss_(Log_Loss)

    Args:
        rating_true (pd.DataFrame): True data.
        rating_pred (pd.DataFrame): Predicted data.
        col_user (str): column name for user.
        col_item (str): column name for item.
        col_rating (str): column name for rating.
        col_prediction (str): column name for prediction.

    Returns:
        float: log_loss_score (min=0, max=inf).
    """
    y_true, y_pred = merge_rating_true_pred(
        rating_true=rating_true,
        rating_pred=rating_pred,
        col_user=col_user,
        col_item=col_item,
        col_rating=col_rating,
        col_prediction=col_prediction,
    )
    return log_loss(y_true, y_pred)
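
# Illustrative sketch (not part of the original module): the rating metrics above
# (rmse, mae, rsquared, exp_var, auc, logloss) share the same calling convention.
# Column names are passed explicitly so the example does not depend on the
# DEFAULT_* constants; all values are invented.
#
#   truth = pd.DataFrame({"user": [1, 1, 2], "item": [1, 2, 1], "rating": [5.0, 3.0, 4.0]})
#   preds = pd.DataFrame({"user": [1, 1, 2], "item": [1, 2, 1], "prediction": [4.5, 3.5, 4.0]})
#   cols = dict(col_user="user", col_item="item", col_rating="rating", col_prediction="prediction")
#   rmse(truth, preds, **cols)  # ~0.408
#   mae(truth, preds, **cols)   # ~0.333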
@check_column_dtypes
@lru_cache_df(maxsize=1)
def merge_ranking_true_pred(
    rating_true,
    rating_pred,
    col_user,
    col_item,
    col_rating,
    col_prediction,
    relevancy_method,
    k=DEFAULT_K,
    threshold=DEFAULT_THRESHOLD,
):
    """Filter truth and prediction data frames on common users.

    Args:
        rating_true (pd.DataFrame): True DataFrame.
        rating_pred (pd.DataFrame): Predicted DataFrame.
        col_user (str): column name for user.
        col_item (str): column name for item.
        col_rating (str): column name for rating.
        col_prediction (str): column name for prediction.
        relevancy_method (str): method for determining relevancy ['top_k', 'by_threshold'].
        k (int): number of top k items per user (optional).
        threshold (float): threshold of top items per user (optional).

    Returns:
        pd.DataFrame, pd.DataFrame, int:
        DataFrame of recommendation hits,
        DataFrame of hit counts vs actual relevant items per user,
        number of unique user ids.
    """
    # keep only relevant items (rating >= 1) in the true data
    rating_true = rating_true[rating_true[col_rating] >= 1]

    # Make sure the prediction and true data frames have the same set of users
    common_users = set(rating_true[col_user]).intersection(set(rating_pred[col_user]))
    rating_true_common = rating_true[rating_true[col_user].isin(common_users)]
    rating_pred_common = rating_pred[rating_pred[col_user].isin(common_users)]
    n_users = len(common_users)

    # Return hit items in the prediction data frame with ranking information. This is used for calculating NDCG and MAP.
    # Use "first" to generate unique ranking values for each item. This is to align with the implementation in
    # Spark evaluation metrics, where the index of each recommended item (the indices are unique to items) is used
    # to calculate penalized precision of the ordered items.
    if relevancy_method == "top_k":
        top_k = k
    elif relevancy_method == "by_threshold":
        top_k = threshold
    else:
        raise NotImplementedError("Invalid relevancy_method")
    df_hit = get_top_k_items(
        dataframe=rating_pred_common,
        col_user=col_user,
        col_rating=col_prediction,
        k=top_k,
    )
    df_hit["rank"] = df_hit.groupby(col_user)[col_prediction].rank(
        method="first", ascending=False
    )
    df_hit = pd.merge(df_hit, rating_true_common, on=[col_user, col_item])[
        [col_user, col_item, "rank"]
    ]

    # count the number of hits vs actual relevant items per user
    df_hit_count = pd.merge(
        df_hit.groupby(col_user, as_index=False)[col_user].agg({"hit": "count"}),
        rating_true_common.groupby(col_user, as_index=False)[col_user].agg(
            {"actual": "count"}
        ),
        on=col_user,
    )

    return df_hit, df_hit_count, n_users
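
# Illustrative sketch (not part of the original module): with relevancy_method="top_k",
# the prediction frame is cut to each user's k highest-scored items before being matched
# against the relevant (rating >= 1) true items. The data below are invented.
#
#   truth = pd.DataFrame({"user": [1, 1, 1], "item": [1, 2, 3], "rating": [5, 4, 3]})
#   preds = pd.DataFrame({"user": [1, 1, 1], "item": [3, 4, 5], "prediction": [3.0, 2.0, 1.0]})
#   df_hit, df_hit_count, n_users = merge_ranking_true_pred(
#       truth, preds, col_user="user", col_item="item", col_rating="rating",
#       col_prediction="prediction", relevancy_method="top_k", k=2,
#   )
#   # df_hit: one row (user=1, item=3, rank=1); df_hit_count: hit=1, actual=3; n_users=1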
def precision_at_k(
    rating_true,
    rating_pred,
    col_user=DEFAULT_USER_COL,
    col_item=DEFAULT_ITEM_COL,
    col_rating=DEFAULT_RATING_COL,
    col_prediction=DEFAULT_PREDICTION_COL,
    relevancy_method="top_k",
    k=DEFAULT_K,
    threshold=DEFAULT_THRESHOLD,
):
    """Precision at K.

    Note:
        We use the same formula to calculate precision@k as that in Spark. More details can be found at
        http://spark.apache.org/docs/2.1.1/api/python/pyspark.mllib.html#pyspark.mllib.evaluation.RankingMetrics.precisionAt
        In particular, the maximum achievable precision may be < 1, if the number of items for a
        user in rating_pred is less than k.

    Args:
        rating_true (pd.DataFrame): True DataFrame.
        rating_pred (pd.DataFrame): Predicted DataFrame.
        col_user (str): column name for user.
        col_item (str): column name for item.
        col_rating (str): column name for rating.
        col_prediction (str): column name for prediction.
        relevancy_method (str): method for determining relevancy ['top_k', 'by_threshold'].
        k (int): number of top k items per user.
        threshold (float): threshold of top items per user (optional).

    Returns:
        float: precision at k (min=0, max=1).
    """
    df_hit, df_hit_count, n_users = merge_ranking_true_pred(
        rating_true=rating_true,
        rating_pred=rating_pred,
        col_user=col_user,
        col_item=col_item,
        col_rating=col_rating,
        col_prediction=col_prediction,
        relevancy_method=relevancy_method,
        k=k,
        threshold=threshold,
    )

    if df_hit.shape[0] == 0:
        return 0.0

    return (df_hit_count["hit"] / k).sum() / n_users
def recall_at_k(
    rating_true,
    rating_pred,
    col_user=DEFAULT_USER_COL,
    col_item=DEFAULT_ITEM_COL,
    col_rating=DEFAULT_RATING_COL,
    col_prediction=DEFAULT_PREDICTION_COL,
    relevancy_method="top_k",
    k=DEFAULT_K,
    threshold=DEFAULT_THRESHOLD,
):
    """Recall at K.

    Args:
        rating_true (pd.DataFrame): True DataFrame.
        rating_pred (pd.DataFrame): Predicted DataFrame.
        col_user (str): column name for user.
        col_item (str): column name for item.
        col_rating (str): column name for rating.
        col_prediction (str): column name for prediction.
        relevancy_method (str): method for determining relevancy ['top_k', 'by_threshold'].
        k (int): number of top k items per user.
        threshold (float): threshold of top items per user (optional).

    Returns:
        float: recall at k (min=0, max=1). The maximum value is 1 even when fewer than
        k items exist for a user in rating_true.
    """
    df_hit, df_hit_count, n_users = merge_ranking_true_pred(
        rating_true=rating_true,
        rating_pred=rating_pred,
        col_user=col_user,
        col_item=col_item,
        col_rating=col_rating,
        col_prediction=col_prediction,
        relevancy_method=relevancy_method,
        k=k,
        threshold=threshold,
    )

    if df_hit.shape[0] == 0:
        return 0.0

    return (df_hit_count["hit"] / df_hit_count["actual"]).sum() / n_users
def ndcg_at_k(
    rating_true,
    rating_pred,
    col_user=DEFAULT_USER_COL,
    col_item=DEFAULT_ITEM_COL,
    col_rating=DEFAULT_RATING_COL,
    col_prediction=DEFAULT_PREDICTION_COL,
    relevancy_method="top_k",
    k=DEFAULT_K,
    threshold=DEFAULT_THRESHOLD,
):
    """Compute Normalized Discounted Cumulative Gain (nDCG).

    Info: https://en.wikipedia.org/wiki/Discounted_cumulative_gain

    Args:
        rating_true (pd.DataFrame): True DataFrame.
        rating_pred (pd.DataFrame): Predicted DataFrame.
        col_user (str): column name for user.
        col_item (str): column name for item.
        col_rating (str): column name for rating.
        col_prediction (str): column name for prediction.
        relevancy_method (str): method for determining relevancy ['top_k', 'by_threshold'].
        k (int): number of top k items per user.
        threshold (float): threshold of top items per user (optional).

    Returns:
        float: nDCG at k (min=0, max=1).
    """
    df_hit, df_hit_count, n_users = merge_ranking_true_pred(
        rating_true=rating_true,
        rating_pred=rating_pred,
        col_user=col_user,
        col_item=col_item,
        col_rating=col_rating,
        col_prediction=col_prediction,
        relevancy_method=relevancy_method,
        k=k,
        threshold=threshold,
    )

    if df_hit.shape[0] == 0:
        return 0.0

    # calculate discounted gain for hit items
    df_dcg = df_hit.copy()
    # relevance in this case is always 1
    df_dcg["dcg"] = 1 / np.log1p(df_dcg["rank"])
    # sum up discounted gain to get discounted cumulative gain
    df_dcg = df_dcg.groupby(col_user, as_index=False, sort=False).agg({"dcg": "sum"})
    # calculate ideal discounted cumulative gain
    df_ndcg = pd.merge(df_dcg, df_hit_count, on=[col_user])
    df_ndcg["idcg"] = df_ndcg["actual"].apply(
        lambda x: sum(1 / np.log1p(range(1, min(x, k) + 1)))
    )

    # DCG over IDCG is the normalized DCG
    return (df_ndcg["dcg"] / df_ndcg["idcg"]).sum() / n_users
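
# Worked example (illustrative, not part of the original module): a user with 3
# relevant items and hits at ranks 1 and 3 within the top k=3 gets, using the
# natural logarithm (np.log1p) as in the implementation above:
#   DCG  = 1/log(1+1) + 1/log(1+3)              ~= 2.164
#   IDCG = 1/log(1+1) + 1/log(1+2) + 1/log(1+3) ~= 3.074
#   nDCG = DCG / IDCG                           ~= 0.704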
def map_at_k(
    rating_true,
    rating_pred,
    col_user=DEFAULT_USER_COL,
    col_item=DEFAULT_ITEM_COL,
    col_rating=DEFAULT_RATING_COL,
    col_prediction=DEFAULT_PREDICTION_COL,
    relevancy_method="top_k",
    k=DEFAULT_K,
    threshold=DEFAULT_THRESHOLD,
):
    """Mean Average Precision at k.

    The implementation of MAP is referenced from Spark MLlib evaluation metrics.
    https://spark.apache.org/docs/2.3.0/mllib-evaluation-metrics.html#ranking-systems

    A good reference can be found at:
    http://web.stanford.edu/class/cs276/handouts/EvaluationNew-handout-6-per.pdf

    Note:
        1. The evaluation function is named 'MAP at k' because it takes the top k items
           from the prediction. The naming is different from Spark.
        2. MAP is meant to calculate the average precision over the relevant items, so it
           is normalized by the number of relevant items in the ground truth data,
           instead of k.

    Args:
        rating_true (pd.DataFrame): True DataFrame.
        rating_pred (pd.DataFrame): Predicted DataFrame.
        col_user (str): column name for user.
        col_item (str): column name for item.
        col_rating (str): column name for rating.
        col_prediction (str): column name for prediction.
        relevancy_method (str): method for determining relevancy ['top_k', 'by_threshold'].
        k (int): number of top k items per user.
        threshold (float): threshold of top items per user (optional).

    Returns:
        float: MAP at k (min=0, max=1).
    """
    df_hit, df_hit_count, n_users = merge_ranking_true_pred(
        rating_true=rating_true,
        rating_pred=rating_pred,
        col_user=col_user,
        col_item=col_item,
        col_rating=col_rating,
        col_prediction=col_prediction,
        relevancy_method=relevancy_method,
        k=k,
        threshold=threshold,
    )

    if df_hit.shape[0] == 0:
        return 0.0

    # calculate reciprocal rank of items for each user and sum them up
    df_hit_sorted = df_hit.sort_values([col_user, "rank"])
    df_hit_sorted["rr"] = (df_hit.groupby(col_user).cumcount() + 1) / df_hit["rank"]
    df_hit_sorted = df_hit_sorted.groupby(col_user).agg({"rr": "sum"}).reset_index()

    df_merge = pd.merge(df_hit_sorted, df_hit_count, on=col_user)
    return (df_merge["rr"] / df_merge["actual"]).sum() / n_users
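
# Illustrative sketch (not part of the original module): the ranking metrics share the
# same calling convention. Reusing the invented data from the merge_ranking_true_pred
# sketch above (one user, three relevant items, one hit at rank 1 within the top k=2):
#
#   cols = dict(col_user="user", col_item="item", col_rating="rating",
#               col_prediction="prediction", relevancy_method="top_k", k=2)
#   precision_at_k(truth, preds, **cols)  # 0.5    (1 hit / k=2)
#   recall_at_k(truth, preds, **cols)     # ~0.333 (1 hit / 3 relevant items)
#   ndcg_at_k(truth, preds, **cols)       # ~0.613
#   map_at_k(truth, preds, **cols)        # ~0.333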
def get_top_k_items(
    dataframe, col_user=DEFAULT_USER_COL, col_rating=DEFAULT_RATING_COL, k=DEFAULT_K
):
    """Get the top k items for each user.

    Take the input customer-item-rating tuples as a pandas DataFrame and output a pandas
    DataFrame in the dense format of the top k items for each user.

    Note:
        If it is an implicit rating, just append a column of constants to be the ratings.

    Args:
        dataframe (pandas.DataFrame): DataFrame of rating data (in the format customerID-itemID-rating).
        col_user (str): column name for user.
        col_rating (str): column name for rating.
        k (int): number of items for each user.

    Returns:
        pd.DataFrame: DataFrame of top k items for each user.
    """
    # Sort dataframe by col_user and (top k) col_rating
    top_k_items = (
        dataframe.groupby(col_user, as_index=False)
        .apply(lambda x: x.nlargest(k, col_rating))
        .reset_index(drop=True)
    )
    # Add ranks
    top_k_items["rank"] = top_k_items.groupby(col_user, sort=False).cumcount() + 1
    return top_k_items
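
# Illustrative sketch (not part of the original module): extracting each user's two
# highest-rated items. The frame is invented.
#
#   scores = pd.DataFrame(
#       {"user": [1, 1, 1, 2, 2], "item": [10, 11, 12, 10, 13],
#        "rating": [3.0, 5.0, 4.0, 2.0, 4.0]}
#   )
#   top2 = get_top_k_items(scores, col_user="user", col_rating="rating", k=2)
#   # user 1 -> items 11 (rank 1) and 12 (rank 2); user 2 -> items 13 and 10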