Source code for beta_rec.datasets.data_split

import math
import os
import time

import numpy as np
import pandas as pd
import sklearn.utils
from tabulate import tabulate
from tqdm import tqdm

from ..utils.alias_table import AliasTable
from ..utils.common_util import get_dataframe_from_npz, save_dataframe_as_npz
from ..utils.constants import (
    DEFAULT_FLAG_COL,
    DEFAULT_ITEM_COL,
    DEFAULT_ORDER_COL,
    DEFAULT_RATING_COL,
    DEFAULT_TIMESTAMP_COL,
    DEFAULT_USER_COL,
)


def filter_by_count(df, group_col, filter_col, num):
    """Filter out the group_col values associated with fewer than num unique filter_col values.

    Args:
        df (DataFrame): interaction DataFrame to be processed.
        group_col (string): column name to be filtered.
        filter_col (string): column with the filter condition.
        num (int): minimum count; groups below this threshold are filtered out.

    Returns:
        DataFrame: The filtered interactions.
    """
    ordercount = (
        df.groupby([group_col])[filter_col].nunique().rename("count").reset_index()
    )
    filter_df = df[
        df[group_col].isin(ordercount[ordercount["count"] >= num][group_col])
    ]
    return filter_df
def check_data_available(data):
    """Check if a dataset is available after filtering.

    Check whether a given dataset is available for later use.

    Args:
        data (DataFrame): interaction DataFrame to be processed.

    Raises:
        RuntimeError: An error is raised if there are no interactions left.
    """
    if len(data.index) < 1:
        raise RuntimeError(
            "This dataset contains no interaction after filtering. "
            "Please check the default filter setup of this split!"
        )
def filter_user_item(df, min_u_c=5, min_i_c=5):
    """Filter data by the minimum number of items per user and users per item.

    Args:
        df (DataFrame): interaction DataFrame to be processed.
        min_u_c (int): filter out items that were purchased by fewer than min_u_c users.
            (default: :obj:`5`)
        min_i_c (int): filter out users that purchased fewer than min_i_c items.
            (default: :obj:`5`)

    Returns:
        DataFrame: The filtered interactions.
    """
    print(f"filter_user_item under condition min_u_c={min_u_c}, min_i_c={min_i_c}")
    print("-" * 80)
    print("Dataset statistics before filter")
    print(
        tabulate(
            df.agg(["count", "nunique"]),
            headers=df.columns,
            tablefmt="psql",
            disable_numparse=True,
        )
    )
    n_interact = len(df.index)

    while True:
        # Filter out users that have fewer than min_i_c interactions (items).
        if min_i_c > 0:
            df = filter_by_count(df, DEFAULT_USER_COL, DEFAULT_ITEM_COL, min_i_c)

        # Filter out items that have fewer than min_u_c users.
        if min_u_c > 0:
            df = filter_by_count(df, DEFAULT_ITEM_COL, DEFAULT_USER_COL, min_u_c)

        new_n_interact = len(df.index)
        if n_interact != new_n_interact:
            n_interact = new_n_interact
        else:
            break  # filtering has converged; no change in this pass

    check_data_available(df)
    print("Dataset statistics after filter")
    print(
        tabulate(
            df.agg(["count", "nunique"]),
            headers=df.columns,
            tablefmt="psql",
            disable_numparse=True,
        )
    )
    print("-" * 80)
    return df
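# Usage sketch (illustrative only; the toy DataFrame below is hypothetical):
#
#   toy = pd.DataFrame(
#       {
#           DEFAULT_USER_COL: [0, 0, 0, 1, 1, 2],
#           DEFAULT_ITEM_COL: [10, 11, 12, 10, 11, 10],
#       }
#   )
#   # With min_u_c=2 and min_i_c=2, user 2 (a single item) and item 12
#   # (a single user) are removed in alternating passes until the loop
#   # converges, leaving users 0 and 1 with items 10 and 11.
#   filtered = filter_user_item(toy, min_u_c=2, min_i_c=2)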
def filter_user_item_order(df, min_u_c=5, min_i_c=5, min_o_c=5):
    """Filter data by the minimum number of orders, items and users.

    Args:
        df (DataFrame): interaction DataFrame to be processed.
        min_u_c (int): filter out items that were purchased by fewer than min_u_c users.
            (default: :obj:`5`)
        min_i_c (int): filter out users that purchased fewer than min_i_c items.
            (default: :obj:`5`)
        min_o_c (int): filter out users that placed fewer than min_o_c orders.
            (default: :obj:`5`)

    Returns:
        DataFrame: The filtered interactions.
    """
    print(
        f"filter_user_item_order under condition min_u_c={min_u_c}, min_i_c={min_i_c}, min_o_c={min_o_c}"
    )
    print("-" * 80)
    print("Dataset statistics before filter")
    print(
        tabulate(
            df.agg(["count", "nunique"]),
            headers=df.columns,
            tablefmt="psql",
            disable_numparse=True,
        )
    )
    n_interact = len(df.index)

    while True:
        # Filter out users that have fewer than min_o_c purchased orders.
        if min_o_c > 0:
            df = filter_by_count(df, DEFAULT_USER_COL, DEFAULT_ORDER_COL, min_o_c)

        # Filter out users that have fewer than min_i_c interactions (items).
        if min_i_c > 0:
            df = filter_by_count(df, DEFAULT_USER_COL, DEFAULT_ITEM_COL, min_i_c)

        # Filter out items that have fewer than min_u_c users.
        if min_u_c > 0:
            df = filter_by_count(df, DEFAULT_ITEM_COL, DEFAULT_USER_COL, min_u_c)

        new_n_interact = len(df.index)
        if n_interact != new_n_interact:
            n_interact = new_n_interact
        else:
            break  # filtering has converged; no change in this pass

    check_data_available(df)
    print("Dataset statistics after filter")
    print(
        tabulate(
            df.agg(["count", "nunique"]),
            headers=df.columns,
            tablefmt="psql",
            disable_numparse=True,
        )
    )
    print("-" * 80)
    return df
def feed_neg_sample(data, negative_num, item_sampler):
    """Sample negative items for an interaction DataFrame.

    Args:
        data (DataFrame): interaction DataFrame to be processed.
        negative_num (int): number of negative items per user.
            If negative_num < 0, all the negative items are kept for each user.
        item_sampler (AliasTable): an AliasTable sampler that contains the items.

    Returns:
        DataFrame: shuffled interaction DataFrame in which each user's positive
        items (rating > 0) are joined by the sampled negative items (rating 0).
    """
    unique_item_set = set(data[DEFAULT_ITEM_COL].unique())
    unique_rating_num = data[DEFAULT_RATING_COL].nunique()
    interact_status = (
        data.groupby([DEFAULT_USER_COL])[DEFAULT_ITEM_COL].apply(set).reset_index()
    )
    total_interact = pd.DataFrame(
        {DEFAULT_USER_COL: [], DEFAULT_ITEM_COL: [], DEFAULT_RATING_COL: []},
        dtype=np.int64,  # np.long was removed in NumPy 1.24
    )
    for index, user_items in interact_status.iterrows():
        u = user_items[DEFAULT_USER_COL]
        pos_items = set(user_items[DEFAULT_ITEM_COL])  # item set for user u
        pos_items_li = list(pos_items)  # the positive items should be unique
        n_pos_items = len(pos_items_li)  # number of positive items for user u
        if negative_num < 0:
            # Keep all the negative items.
            neg_items_li = list(unique_item_set - pos_items)
        else:
            # Keep at most negative_num negative items: oversample, filter out
            # the positive items and truncate to the first negative_num.
            neg_items = set(item_sampler.sample(negative_num + n_pos_items, 1, True))
            neg_items_li = list(neg_items - pos_items)[:negative_num]
        # Use the actual number of sampled negatives so that the user, item
        # and rating columns all have consistent lengths.
        n_neg_items = len(neg_items_li)
        df_items = np.append(pos_items_li, neg_items_li)
        df_users = np.array([u] * (n_pos_items + n_neg_items), dtype=type(u))
        pos_rating = []
        if unique_rating_num != 1:
            # Look up the rating score of each positive item.
            for item in pos_items_li:
                pos_rating.append(
                    data.loc[
                        (data[DEFAULT_USER_COL] == u)
                        & (data[DEFAULT_ITEM_COL] == item),
                        DEFAULT_RATING_COL,
                    ].to_numpy()[0]
                )
        else:
            pos_rating = np.full(n_pos_items, 1)
        neg_rating = np.zeros(n_neg_items, dtype=np.int64)
        df_ratings = np.append(pos_rating, neg_rating)

        df = pd.DataFrame(
            {
                DEFAULT_USER_COL: df_users,
                DEFAULT_ITEM_COL: df_items,
                DEFAULT_RATING_COL: df_ratings,
            }
        )
        total_interact = pd.concat(
            [total_interact, df], ignore_index=True
        )  # DataFrame.append was removed in pandas 2.0

    # Shuffle interactions to avoid all the negative samples being together.
    total_interact = sklearn.utils.shuffle(total_interact)
    return total_interact
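# Usage sketch (illustrative only; df and test_df are placeholder DataFrames).
# The sampler is built from item popularity, mirroring how split_data
# constructs it further below.
#
#   item_sampler = AliasTable(df[DEFAULT_ITEM_COL].value_counts().to_dict())
#   # For every user in test_df, keep the positives (rating > 0) and draw
#   # up to 100 popularity-weighted negatives (rating 0).
#   test_with_negs = feed_neg_sample(test_df, negative_num=100, item_sampler=item_sampler)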
def load_split_data(path, n_test=10):
    """Load split DataFrames from a specified path.

    Args:
        path (string): split data path.
        n_test (int): number of testing and validation copies.
            If n_test == 0, the original (no negative items) validation and
            test sets are loaded instead.

    Returns:
        (DataFrame, list(DataFrame), list(DataFrame)): the training
        interactions, a list of validation interaction DataFrames, and a list
        of testing interaction DataFrames.
    """
    train_file = os.path.join(path, "train.npz")
    train_data = get_dataframe_from_npz(train_file)
    print("-" * 80)
    print("Loaded training set statistics")
    print(
        tabulate(
            train_data.agg(["count", "nunique"]),
            headers=train_data.columns,
            tablefmt="psql",
            disable_numparse=True,
        )
    )
    if not n_test:
        valid_df = get_dataframe_from_npz(os.path.join(path, "valid.npz"))
        test_df = get_dataframe_from_npz(os.path.join(path, "test.npz"))
        print("Loaded validation set statistics")
        print(
            tabulate(
                valid_df.agg(["count", "nunique"]),
                headers=valid_df.columns,
                tablefmt="psql",
                disable_numparse=True,
            )
        )
        print("Loaded testing set statistics")
        print(
            tabulate(
                test_df.agg(["count", "nunique"]),
                headers=test_df.columns,
                tablefmt="psql",
                disable_numparse=True,
            )
        )
        print("-" * 80)
        return train_data, valid_df, test_df

    valid_data_li = []
    test_data_li = []
    for i in range(n_test):
        valid_df = get_dataframe_from_npz(os.path.join(path, f"valid_{i}.npz"))
        valid_data_li.append(valid_df)
        if i == 0:
            print(f"valid_data_{i} statistics")
            print(
                tabulate(
                    valid_df.agg(["count", "nunique"]),
                    headers=valid_df.columns,
                    tablefmt="psql",
                    disable_numparse=True,
                )
            )
        test_df = get_dataframe_from_npz(os.path.join(path, f"test_{i}.npz"))
        test_data_li.append(test_df)
        if i == 0:
            print(f"test_data_{i} statistics")
            print(
                tabulate(
                    test_df.agg(["count", "nunique"]),
                    headers=test_df.columns,
                    tablefmt="psql",
                    disable_numparse=True,
                )
            )
    print("-" * 80)
    return train_data, valid_data_li, test_data_li
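# Usage sketch (illustrative only; the path is a placeholder of the kind
# produced by split_data and generate_parameterized_path below):
#
#   train, valid_li, test_li = load_split_data(
#       "datasets/processed/temporal/full_test_rate_10_n_neg_100", n_test=10
#   )
#   valid_0 = valid_li[0]  # first of the 10 negatively-sampled validation copies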
def save_split_data(
    data,
    base_dir,
    data_split="leave_one_basket",
    parameterized_dir=None,
    suffix="train.npz",
):
    """Save DataFrame to compressed npz.

    Args:
        data (DataFrame): interaction DataFrame to be saved.
        base_dir (string): directory to save.
        data_split (string): sub-folder name for saving the data.
        parameterized_dir (string): data_split parameter string.
        suffix (string): suffix of the data to be saved.
    """
    data_file = os.path.join(base_dir, data_split)
    if not os.path.exists(data_file):
        os.makedirs(data_file)

    data_file = os.path.join(data_file, parameterized_dir)
    if not os.path.exists(data_file):
        os.makedirs(data_file)

    data_file = os.path.join(data_file, suffix)
    save_dataframe_as_npz(data, data_file)
    print("Data is dumped in:", data_file)
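# Usage sketch (illustrative only; base_dir and the parameterized directory
# are placeholders):
#
#   save_split_data(
#       tp_train,
#       "datasets/processed",
#       "temporal",
#       "full_test_rate_10_n_neg_100",
#       "train.npz",
#   )
#   # writes datasets/processed/temporal/full_test_rate_10_n_neg_100/train.npz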
def random_split(data, test_rate=0.1, by_user=False):
    """random_split.

    Args:
        data (DataFrame): interaction DataFrame to be split.
        test_rate (float): percentage of the test data. Note that the percentage
            of the validation data will be the same as that of the test data.
        by_user (bool): Default False.
            - True: user-based split,
            - False: global split.

    Returns:
        DataFrame: DataFrame labeled by a flag column with "train", "validate" or "test".
    """
    print("random_split")
    data[DEFAULT_FLAG_COL] = "train"
    if by_user:
        users = data[DEFAULT_USER_COL].unique()
        for u in tqdm(users):
            interactions = data[data[DEFAULT_USER_COL] == u].index.values  # numpy array
            interactions = sklearn.utils.shuffle(interactions)
            total_size = len(interactions)
            validate_size = math.ceil(total_size * test_rate)
            test_size = math.ceil(total_size * test_rate)
            train_size = total_size - test_size
            # The last test_size shuffled interactions become the test set.
            data.loc[interactions[train_size:], DEFAULT_FLAG_COL] = "test"
            data.loc[
                interactions[train_size - validate_size : train_size],
                DEFAULT_FLAG_COL,
            ] = "validate"
    else:
        interactions = data.index.values  # numpy array
        interactions = sklearn.utils.shuffle(interactions)
        total_size = len(interactions)
        validate_size = math.ceil(total_size * test_rate)
        test_size = math.ceil(total_size * test_rate)
        train_size = total_size - test_size
        # The last test_size shuffled interactions become the test set.
        data.loc[interactions[train_size:], DEFAULT_FLAG_COL] = "test"
        data.loc[
            interactions[train_size - validate_size : train_size],
            DEFAULT_FLAG_COL,
        ] = "validate"
    return data
def random_basket_split(data, test_rate=0.1, by_user=False):
    """random_basket_split.

    Args:
        data (DataFrame): interaction DataFrame to be split.
        test_rate (float): percentage of the test data. Note that the percentage
            of the validation data will be the same as that of the test data.
        by_user (bool): Default False.
            - True: user-based split,
            - False: global split.

    Returns:
        DataFrame: DataFrame labeled by a flag column with "train", "validate" or "test".
    """
    print("random_basket_split")
    data[DEFAULT_FLAG_COL] = "train"
    if by_user:
        users = data[DEFAULT_USER_COL].unique()
        for u in tqdm(users):
            orders = data[data[DEFAULT_USER_COL] == u][DEFAULT_ORDER_COL].unique()
            orders = sklearn.utils.shuffle(orders)
            total_size = len(orders)
            validate_size = math.ceil(total_size * test_rate)
            test_size = math.ceil(total_size * test_rate)
            train_size = total_size - test_size
            # The last test_size shuffled orders become the test set.
            data.loc[
                data[DEFAULT_ORDER_COL].isin(orders[train_size:]),
                DEFAULT_FLAG_COL,
            ] = "test"
            data.loc[
                data[DEFAULT_ORDER_COL].isin(
                    orders[train_size - validate_size : train_size]
                ),
                DEFAULT_FLAG_COL,
            ] = "validate"
    else:
        orders = data[DEFAULT_ORDER_COL].unique()
        orders = sklearn.utils.shuffle(orders)
        total_size = len(orders)
        validate_size = math.ceil(total_size * test_rate)
        test_size = math.ceil(total_size * test_rate)
        train_size = total_size - test_size
        # The last test_size shuffled orders become the test set.
        data.loc[
            data[DEFAULT_ORDER_COL].isin(orders[train_size:]),
            DEFAULT_FLAG_COL,
        ] = "test"
        data.loc[
            data[DEFAULT_ORDER_COL].isin(
                orders[train_size - validate_size : train_size]
            ),
            DEFAULT_FLAG_COL,
        ] = "validate"
    return data
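# Usage sketch (illustrative only; df is any interaction DataFrame with the
# default user/item columns). With test_rate=0.1, the flag column ends up
# with roughly 80% "train", 10% "validate" and 10% "test" rows, per user when
# by_user=True and globally otherwise:
#
#   labeled = random_split(df, test_rate=0.1, by_user=True)
#   print(labeled[DEFAULT_FLAG_COL].value_counts())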
def leave_one_out(data, random=False):
    """leave_one_out split.

    Args:
        data (DataFrame): interaction DataFrame to be split.
        random (bool): whether to randomly leave one item as testing.
            Only for leave_one_out and leave_one_basket.

    Returns:
        DataFrame: DataFrame labeled by a flag column with "train", "validate" or "test".
    """
    start_time = time.time()
    print("leave_one_out")
    data[DEFAULT_FLAG_COL] = "train"
    if random:
        data = sklearn.utils.shuffle(data)
    else:
        data.sort_values(by=[DEFAULT_TIMESTAMP_COL], ascending=False, inplace=True)

    # Flag each user's first two rows (the two most recent interactions, or
    # two random ones if shuffled) as validation, then overwrite the first
    # row as test.
    data.loc[
        data.groupby([DEFAULT_USER_COL]).head(2).index, DEFAULT_FLAG_COL
    ] = "validate"
    data.loc[data.groupby([DEFAULT_USER_COL]).head(1).index, DEFAULT_FLAG_COL] = "test"
    end_time = time.time()
    print(f"leave_one_out time cost: {end_time - start_time}")
    return data
def leave_one_basket(data, random=False):
    """leave_one_basket split.

    Args:
        data (DataFrame): interaction DataFrame to be split.
        random (bool): whether to randomly leave one basket as testing.
            Only for leave_one_out and leave_one_basket.

    Returns:
        DataFrame: DataFrame labeled by a flag column with "train", "validate" or "test".
    """
    print("leave_one_basket")
    data[DEFAULT_FLAG_COL] = "train"
    if random:
        data = sklearn.utils.shuffle(data)
    else:
        data.sort_values(by=[DEFAULT_TIMESTAMP_COL], inplace=True)

    users = data[DEFAULT_USER_COL].unique()
    for u in tqdm(users):
        user_orders = data[data[DEFAULT_USER_COL] == u][DEFAULT_ORDER_COL].unique()
        # The last basket becomes the test set and the second-to-last the
        # validation set (chronologically last when sorted by timestamp).
        data.loc[data[DEFAULT_ORDER_COL] == user_orders[-1], DEFAULT_FLAG_COL] = "test"
        data.loc[
            data[DEFAULT_ORDER_COL] == user_orders[-2], DEFAULT_FLAG_COL
        ] = "validate"
    return data
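# Usage sketch (illustrative only; df is any interaction DataFrame with
# timestamp and order columns). For each user the chronologically last basket
# is flagged "test" and the second-to-last "validate"; a user therefore needs
# at least three distinct baskets to keep a non-empty "train" part, which is
# what the min_o_c condition in filter_user_item_order guards against.
#
#   labeled = leave_one_basket(df, random=False)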
def temporal_split(data, test_rate=0.1, by_user=False):
    """temporal_split.

    Args:
        data (DataFrame): interaction DataFrame to be split.
        test_rate (float): percentage of the test data. Note that the percentage
            of the validation data will be the same as that of the test data.
        by_user (bool): Default False.
            - True: user-based split,
            - False: global split.

    Returns:
        DataFrame: DataFrame labeled by a flag column with "train", "validate" or "test".
    """
    print("temporal_split")
    data[DEFAULT_FLAG_COL] = "train"
    data.sort_values(by=[DEFAULT_TIMESTAMP_COL], inplace=True)
    if by_user:
        users = data[DEFAULT_USER_COL].unique()
        for u in tqdm(users):
            interactions = data[data[DEFAULT_USER_COL] == u].index.values
            total_size = len(interactions)
            validate_size = math.ceil(total_size * test_rate)
            test_size = math.ceil(total_size * test_rate)
            train_size = total_size - test_size
            # The last (most recent) test_size interactions become the test set.
            data.loc[interactions[train_size:], DEFAULT_FLAG_COL] = "test"
            data.loc[
                interactions[train_size - validate_size : train_size],
                DEFAULT_FLAG_COL,
            ] = "validate"
    else:
        interactions = data.index.values
        total_size = len(interactions)
        validate_size = math.ceil(total_size * test_rate)
        test_size = math.ceil(total_size * test_rate)
        train_size = total_size - test_size
        # The last (most recent) test_size interactions become the test set.
        data.loc[interactions[train_size:], DEFAULT_FLAG_COL] = "test"
        data.loc[
            interactions[train_size - validate_size : train_size],
            DEFAULT_FLAG_COL,
        ] = "validate"
    return data
def temporal_basket_split(data, test_rate=0.1, by_user=False):
    """temporal_basket_split.

    Args:
        data (DataFrame): interaction DataFrame to be split.
            It must have a DEFAULT_ORDER_COL column.
        test_rate (float): percentage of the test data. Note that the percentage
            of the validation data will be the same as that of the test data.
        by_user (bool): Default False.
            - True: user-based split,
            - False: global split.

    Returns:
        DataFrame: DataFrame labeled by a flag column with "train", "validate" or "test".
    """
    print("temporal_basket_split")
    data[DEFAULT_FLAG_COL] = "train"
    data.sort_values(by=[DEFAULT_TIMESTAMP_COL], inplace=True)
    if by_user:
        users = data[DEFAULT_USER_COL].unique()
        for u in tqdm(users):
            orders = data[data[DEFAULT_USER_COL] == u][DEFAULT_ORDER_COL].unique()
            total_size = len(orders)
            validate_size = math.ceil(total_size * test_rate)
            test_size = math.ceil(total_size * test_rate)
            train_size = total_size - test_size
            # The last (most recent) test_size orders become the test set.
            data.loc[
                data[DEFAULT_ORDER_COL].isin(orders[train_size:]),
                DEFAULT_FLAG_COL,
            ] = "test"
            data.loc[
                data[DEFAULT_ORDER_COL].isin(
                    orders[train_size - validate_size : train_size]
                ),
                DEFAULT_FLAG_COL,
            ] = "validate"
    else:
        orders = data[DEFAULT_ORDER_COL].unique()
        total_size = len(orders)
        validate_size = math.ceil(total_size * test_rate)
        test_size = math.ceil(total_size * test_rate)
        train_size = total_size - test_size
        # The last (most recent) test_size orders become the test set.
        data.loc[
            data[DEFAULT_ORDER_COL].isin(orders[train_size:]),
            DEFAULT_FLAG_COL,
        ] = "test"
        data.loc[
            data[DEFAULT_ORDER_COL].isin(
                orders[train_size - validate_size : train_size]
            ),
            DEFAULT_FLAG_COL,
        ] = "validate"
    return data
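# Usage sketch (illustrative only; df is a placeholder). A global temporal
# split sorts all interactions by timestamp, so with test_rate=0.2 the
# earliest ~60% of rows stay "train", the next ~20% become "validate" and
# the latest ~20% "test":
#
#   labeled = temporal_split(df, test_rate=0.2, by_user=False)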
def split_data(
    data,
    split_type,
    test_rate,
    random=False,
    n_negative=100,
    save_dir=None,
    by_user=False,
    n_test=10,
):
    """Split data by split_type and other parameters.

    Args:
        data (DataFrame): interaction DataFrame to be split.
        split_type (string): options can be:
            - random
            - random_basket
            - leave_one_out
            - leave_one_basket
            - temporal
            - temporal_basket
        test_rate (float): percentage of the test data. Note that the percentage
            of the validation data will be the same as that of the test data.
        random (bool): whether to randomly leave one item/basket as testing.
            Only for leave_one_out and leave_one_basket.
        n_negative (int): number of negative samples for testing and validation data.
        save_dir (string or Path): Default None. If specified, the split data will
            be saved to this directory.
        by_user (bool): Default False.
            - True: user-based split,
            - False: global split.
        n_test (int): Default 10. The number of testing and validation copies.

    Returns:
        DataFrame: The split data. Note that the returned data will not have
        negative samples.
    """
    print(f"Splitting data by {split_type} ...")
    if n_negative < 0 and n_test > 1:
        # If n_negative < 0, the validation and test splits contain all the
        # negative items, so only one validation and one test set are needed.
        n_test = 1
    if split_type == "random":
        data = random_split(data, test_rate, by_user)
    elif split_type == "random_basket":
        data = random_basket_split(data, test_rate, by_user)
    elif split_type == "leave_one_out":
        data = leave_one_out(data, random)
    elif split_type == "leave_one_basket":
        data = leave_one_basket(data, random)
    elif split_type == "temporal":
        data = temporal_split(data, test_rate, by_user)
    elif split_type == "temporal_basket":
        data = temporal_basket_split(data, test_rate, by_user)
    else:
        print("[ERROR] wrong split_type.")
        return None

    tp_train = data[data[DEFAULT_FLAG_COL] == "train"]
    tp_validate = data[data[DEFAULT_FLAG_COL] == "validate"]
    tp_test = data[data[DEFAULT_FLAG_COL] == "test"]
    if save_dir is None:
        return data

    parameterized_path = generate_parameterized_path(
        test_rate=test_rate, random=random, n_negative=n_negative, by_user=by_user
    )
    save_split_data(tp_train, save_dir, split_type, parameterized_path, "train.npz")
    # Keep the original validation and test sets (without negative samples).
    save_split_data(tp_validate, save_dir, split_type, parameterized_path, "valid.npz")
    save_split_data(tp_test, save_dir, split_type, parameterized_path, "test.npz")

    item_sampler = AliasTable(data[DEFAULT_ITEM_COL].value_counts().to_dict())
    n_items = tp_train[DEFAULT_ITEM_COL].nunique()
    valid_neg_max = (
        tp_validate.groupby([DEFAULT_USER_COL])[DEFAULT_ITEM_COL].count().max()
    )
    test_neg_max = tp_test.groupby([DEFAULT_USER_COL])[DEFAULT_ITEM_COL].count().max()
    if n_items - valid_neg_max < n_negative or n_items - test_neg_max < n_negative:
        raise RuntimeError(
            "This dataset does not have sufficient negative items for sampling!\n"
            + f"valid_neg_max: {n_items - valid_neg_max}, "
            + f"test_neg_max: {n_items - test_neg_max}, "
            + f"n_negative: {n_negative}\nPlease directly use valid.npz and test.npz."
        )
    for i in range(n_test):
        tp_validate_new = feed_neg_sample(tp_validate, n_negative, item_sampler)
        tp_test_new = feed_neg_sample(tp_test, n_negative, item_sampler)
        save_split_data(
            tp_validate_new,
            save_dir,
            split_type,
            parameterized_path,
            "valid_" + str(i) + ".npz",
        )
        save_split_data(
            tp_test_new,
            save_dir,
            split_type,
            parameterized_path,
            "test_" + str(i) + ".npz",
        )
    return data
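# End-to-end sketch (illustrative only; sizes and save_dir are placeholders),
# combining generate_random_data below with split_data:
#
#   df = generate_random_data(n_interaction=1000, user_id=20, item_id=100)
#   split = split_data(
#       df,
#       split_type="temporal",
#       test_rate=0.2,
#       n_negative=10,
#       save_dir="datasets/processed",
#       n_test=5,
#   )
#   # datasets/processed/temporal/full_test_rate_20_n_neg_10/ now holds
#   # train.npz, valid.npz and test.npz, plus valid_0..4.npz and test_0..4.npz.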
def generate_random_data(n_interaction, user_id, item_id):
    """Generate random interaction data for unit tests.

    Args:
        n_interaction (int): number of interactions to generate.
        user_id (int): exclusive upper bound for the random user ids.
        item_id (int): exclusive upper bound for the random item ids.

    Returns:
        DataFrame: a random interaction DataFrame.
    """
    order_id = 10
    users = np.random.randint(user_id, size=n_interaction)
    orders = np.random.randint(order_id, size=n_interaction) * 100 + users
    timestamps = orders
    items = np.random.randint(item_id, size=n_interaction)
    ratings = np.array([1] * n_interaction)
    data = {
        DEFAULT_USER_COL: users,
        DEFAULT_ORDER_COL: orders,
        DEFAULT_TIMESTAMP_COL: timestamps,
        DEFAULT_ITEM_COL: items,
        DEFAULT_RATING_COL: ratings,
    }
    data = pd.DataFrame(data)
    return data
def generate_parameterized_path(
    test_rate=0, random=False, n_negative=100, by_user=False
):
    """Generate a parameterized path.

    Encode the split parameters into the path so that splits produced with
    different parameters can be told apart.

    Args:
        test_rate (float): percentage of the test data. Note that the percentage
            of the validation data will be the same as that of the test data.
        random (bool): whether to randomly leave one item/basket as testing.
            Only for leave_one_out and leave_one_basket.
        n_negative (int): number of negative samples for testing and validation data.
        by_user (bool): split by user.

    Returns:
        string: A string that encodes the parameters.
    """
    path_str = ""
    if by_user:
        path_str = "user_based" + path_str
    else:
        path_str = "full" + path_str
    test_rate *= 100
    test_rate = round(test_rate)
    path_str += "_test_rate_" + str(test_rate) if test_rate != 0 else ""
    path_str += "_random" if random is True else ""
    path_str += "_n_neg_" + str(n_negative)
    return path_str
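# For example (the strings follow directly from the encoding above):
#
#   generate_parameterized_path(test_rate=0.1, random=False, n_negative=100, by_user=False)
#   # -> "full_test_rate_10_n_neg_100"
#   generate_parameterized_path(test_rate=0, random=True, n_negative=100, by_user=True)
#   # -> "user_based_random_n_neg_100"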