import os
import random
import numpy as np
import pandas as pd
import scipy.sparse as sp
import torch
from scipy.sparse import csr_matrix
from tabulate import tabulate
from torch.utils.data import DataLoader
from ..data.data_loaders import PairwiseNegativeDataset, RatingDataset
from ..utils.alias_table import AliasTable
from ..utils.common_util import ensureDir, normalized_adj_single
from ..utils.constants import DEFAULT_ITEM_COL, DEFAULT_RATING_COL, DEFAULT_USER_COL
class BaseData(object):
r"""A plain DataBase object modeling general recommendation data. Re_index all the users and items from raw dataset.
Args:
        split_dataset (train,valid,test): the split dataset, a tuple consisting of a training set (DataFrame),
            a validation set (DataFrame or list of DataFrames), and a testing set (DataFrame or list of DataFrames).
intersect (bool, optional): remove users and items of test/valid sets that do not exist in the train set. If the
model is able to predict for new users and new items, this can be :obj:`False`. (default: :obj:`True`).
        binarize (bool, optional): binarize the rating column of the train set into 0 or 1, i.e. implicit feedback.
            (default: :obj:`True`)
        bin_thld (float, optional): the threshold of binarization. (default: :obj:`0.0`)
        normalize (bool, optional): normalize the rating column of the train set into [0, 1], i.e. explicit feedback.
            (default: :obj:`False`)
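
    Example:
        A minimal usage sketch (assuming ``train``, ``valid`` and ``test`` are
        DataFrames with the default user, item and rating columns)::

            data = BaseData((train, valid, test))
            bpr_loader = data.instance_bpr_loader(batch_size=256, device="cpu")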
"""
def __init__(
self,
split_dataset,
intersect=True,
binarize=True,
bin_thld=0.0,
normalize=False,
):
"""Initialize BaseData Class."""
self.train, self.valid, self.test = split_dataset
self.user_pool = list(self.train[DEFAULT_USER_COL].unique())
self.item_pool = list(self.train[DEFAULT_ITEM_COL].unique())
self.n_users = len(self.user_pool)
self.n_items = len(self.item_pool)
        self.user_id_pool = list(range(self.n_users))
        self.item_id_pool = list(range(self.n_items))
if intersect:
self._intersect()
if binarize:
self._binarize(bin_thld)
if normalize:
self._normalize()
self._re_index()
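        # Popularity-proportional samplers: AliasTable (alias-method sampling)
        # draws users/items in O(1), weighted by their train-set interaction counts.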
self.item_sampler = AliasTable(
self.train[DEFAULT_ITEM_COL].value_counts().to_dict()
)
self.user_sampler = AliasTable(
self.train[DEFAULT_USER_COL].value_counts().to_dict()
)
def _binarize(self, bin_thld):
"""Binarize ratings into 0 or 1, i.e. implicit feedback."""
for data in [self.train, self.valid, self.test]:
if isinstance(data, list):
for sub_data in data:
sub_data.loc[
sub_data[DEFAULT_RATING_COL] > bin_thld, DEFAULT_RATING_COL
] = 1.0
else:
data.loc[data[DEFAULT_RATING_COL] > bin_thld, DEFAULT_RATING_COL] = 1.0
def _normalize(self):
"""Normalize ratings into [0, 1] from [0, max_rating], explicit feedback."""
max_rating = self.train[DEFAULT_RATING_COL].max()
        assert max_rating > 0, "All ratings may be no greater than 0 (or not a number)."
for data in [self.train, self.valid, self.test]:
if isinstance(data, list):
for sub_data in data:
                    sub_data.loc[:, DEFAULT_RATING_COL] = sub_data[
                        DEFAULT_RATING_COL
                    ].apply(lambda x: x * 1.0 / max_rating)
            else:
                data.loc[:, DEFAULT_RATING_COL] = data[DEFAULT_RATING_COL].apply(
                    lambda x: x * 1.0 / max_rating
                )
def _re_index(self):
"""Reindex for list of dataset.
For example, validate and test can be a list for evaluation.
"""
# Reindex user and item index
self.user2id = dict(zip(np.array(self.user_pool), np.arange(self.n_users)))
self.id2user = {self.user2id[k]: k for k in self.user2id}
self.item2id = dict(zip(np.array(self.item_pool), np.arange(self.n_items)))
self.id2item = {self.item2id[k]: k for k in self.item2id}
for data in [self.train, self.valid, self.test]:
if isinstance(data, list):
for sub_data in data:
# Map user_idx and item_idx
sub_data.loc[:, DEFAULT_USER_COL] = sub_data[
DEFAULT_USER_COL
].apply(lambda x: self.user2id[x])
sub_data.loc[:, DEFAULT_ITEM_COL] = sub_data[
DEFAULT_ITEM_COL
].apply(lambda x: self.item2id[x])
else:
# Map user_idx and item_idx
data.loc[:, DEFAULT_USER_COL] = data[DEFAULT_USER_COL].apply(
lambda x: self.user2id[x]
)
data.loc[:, DEFAULT_ITEM_COL] = data[DEFAULT_ITEM_COL].apply(
lambda x: self.item2id[x]
)
def _intersect(self):
"""Intersect validation and test datasets with train datasets and then reindex userID and itemID."""
for data in [self.valid, self.test]:
if isinstance(data, list):
for sub_data in data:
sub_data.drop(
sub_data[
~sub_data[DEFAULT_USER_COL].isin(self.user_pool)
].index,
inplace=True,
)
sub_data.drop(
sub_data[
~sub_data[DEFAULT_ITEM_COL].isin(self.item_pool)
].index,
inplace=True,
)
else:
data.drop(
data[~data[DEFAULT_USER_COL].isin(self.user_pool)].index,
inplace=True,
)
data.drop(
data[~data[DEFAULT_ITEM_COL].isin(self.item_pool)].index,
inplace=True,
)
print("After intersection, testing set [0] statistics")
print(
tabulate(
self.test[0].agg(["count", "nunique"])
if isinstance(self.test, list)
else self.test.agg(["count", "nunique"]),
headers=self.test[0].columns
if isinstance(self.test, list)
else self.test.columns,
tablefmt="psql",
disable_numparse=True,
)
)
print("After intersection, validation set [0] statistics")
print(
tabulate(
                self.valid[0].agg(["count", "nunique"])
                if isinstance(self.valid, list)
                else self.valid.agg(["count", "nunique"]),
                headers=self.valid[0].columns
                if isinstance(self.valid, list)
                else self.valid.columns,
tablefmt="psql",
disable_numparse=True,
)
)
    def instance_bce_loader(self, batch_size, device, num_negative):
        """Instantiate a training DataLoader with ratings; each positive interaction is paired with ``num_negative`` sampled negatives rated 0."""
users, items, ratings = [], [], []
interact_status = (
self.train.groupby(DEFAULT_USER_COL)[DEFAULT_ITEM_COL]
.apply(set)
.reset_index()
.rename(columns={DEFAULT_ITEM_COL: "positive_items"})
)
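        # Candidate negatives per user: all items the user has not interacted with.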
interact_status["negative_items"] = interact_status["positive_items"].apply(
lambda x: set(self.item_id_pool) - x
)
train_ratings = pd.merge(
self.train,
interact_status[[DEFAULT_USER_COL, "negative_items"]],
on=DEFAULT_USER_COL,
)
train_ratings["negative_samples"] = train_ratings["negative_items"].apply(
lambda x: random.sample(list(x), num_negative)
)
for _, row in train_ratings.iterrows():
users.append(int(row[DEFAULT_USER_COL]))
items.append(int(row[DEFAULT_ITEM_COL]))
ratings.append(float(row[DEFAULT_RATING_COL]))
for i in range(num_negative):
users.append(int(row[DEFAULT_USER_COL]))
items.append(int(row["negative_samples"][i]))
ratings.append(float(0)) # negative samples get 0 rating
dataset = RatingDataset(
user_tensor=torch.LongTensor(users).to(device),
item_tensor=torch.LongTensor(items).to(device),
target_tensor=torch.FloatTensor(ratings).to(device),
)
print(f"Making RatingDataset of length {len(dataset)}")
return DataLoader(dataset, batch_size=batch_size, shuffle=True)
    def instance_bpr_loader(self, batch_size, device):
"""Instance a pairwise Data_loader for training.
Sample ONE negative items for each user-item pare, and shuffle them with positive items.
A batch of data in this DataLoader is suitable for a binary cross-entropy loss.
# todo implement the item popularity-biased sampling
"""
users, pos_items, neg_items = [], [], []
interact_status = (
self.train.groupby(DEFAULT_USER_COL)[DEFAULT_ITEM_COL]
.apply(set)
.reset_index()
.rename(columns={DEFAULT_ITEM_COL: "positive_items"})
)
interact_status["negative_items"] = interact_status["positive_items"].apply(
lambda x: set(self.item_id_pool) - x
)
train_ratings = pd.merge(
self.train,
interact_status[[DEFAULT_USER_COL, "negative_items"]],
on=DEFAULT_USER_COL,
)
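        # Draw ONE negative uniformly at random from each user's non-interacted items.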
train_ratings["negative_sample"] = train_ratings["negative_items"].apply(
lambda x: random.sample(list(x), 1)[0]
)
for _, row in train_ratings.iterrows():
users.append(row[DEFAULT_USER_COL])
pos_items.append(row[DEFAULT_ITEM_COL])
neg_items.append(row["negative_sample"])
dataset = PairwiseNegativeDataset(
user_tensor=torch.LongTensor(users).to(device),
pos_item_tensor=torch.LongTensor(pos_items).to(device),
neg_item_tensor=torch.LongTensor(neg_items).to(device),
)
print(f"Making PairwiseNegativeDataset of length {len(dataset)}")
return DataLoader(dataset, batch_size=batch_size, shuffle=True)
    def instance_mul_neg_loader(self, batch_size, device, num_negative):
"""Instance a pairwise Data_loader for training.
Sample multiples negative items for each user-item pare, and shuffle them with positive items.
A batch of data in this DataLoader is suitable for a binary cross-entropy loss.
"""
users, pos_items, neg_items = [], [], []
interact_status = (
self.train.groupby(DEFAULT_USER_COL)[DEFAULT_ITEM_COL]
.apply(set)
.reset_index()
.rename(columns={DEFAULT_ITEM_COL: "positive_items"})
)
interact_status["negative_items"] = interact_status["positive_items"].apply(
lambda x: set(self.item_id_pool) - x
)
train_ratings = pd.merge(
self.train,
interact_status[[DEFAULT_USER_COL, "negative_items"]],
on=DEFAULT_USER_COL,
)
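        # random.sample draws num_negative distinct negatives (without replacement)
        # for each positive interaction.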
train_ratings["negative_sample"] = train_ratings["negative_items"].apply(
lambda x: random.sample(list(x), num_negative)
)
for _, row in train_ratings.iterrows():
users.append(row[DEFAULT_USER_COL])
pos_items.append(row[DEFAULT_ITEM_COL])
neg_items.append(row["negative_sample"])
dataset = PairwiseNegativeDataset(
user_tensor=torch.LongTensor(users).to(device),
pos_item_tensor=torch.LongTensor(pos_items).to(device),
neg_item_tensor=torch.LongTensor(neg_items).to(device),
)
print(f"Making PairwiseNegativeDataset of length {len(dataset)}")
return DataLoader(dataset, batch_size=batch_size, shuffle=True)
    def get_adj_mat(self, config):
"""Get the adjacent matrix, if not previously stored then call the function to create.
This method is for NGCF model.
Returns:
Different types of adjacment matrix.
"""
process_file_name = (
"ngcf_"
+ config["dataset"]["dataset"]
+ "_"
+ config["dataset"]["data_split"]
+ (
("_" + str(config["dataset"]["percent"] * 100))
if "percent" in config
else ""
)
)
process_path = os.path.join(
config["system"]["process_dir"],
config["dataset"]["dataset"] + "/",
)
process_file_name = os.path.join(process_path, process_file_name)
ensureDir(process_file_name)
print(process_file_name)
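        # Load the cached matrices if present; otherwise rebuild and persist them.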
try:
adj_mat = sp.load_npz(os.path.join(process_file_name, "s_adj_mat.npz"))
norm_adj_mat = sp.load_npz(
os.path.join(process_file_name, "s_norm_adj_mat.npz")
)
mean_adj_mat = sp.load_npz(
os.path.join(process_file_name, "s_mean_adj_mat.npz")
)
print("already load adj matrix", adj_mat.shape)
except Exception:
adj_mat, norm_adj_mat, mean_adj_mat = self.create_adj_mat()
sp.save_npz(os.path.join(process_file_name, "s_adj_mat.npz"), adj_mat)
sp.save_npz(
os.path.join(process_file_name, "s_norm_adj_mat.npz"), norm_adj_mat
)
sp.save_npz(
os.path.join(process_file_name, "s_mean_adj_mat.npz"), mean_adj_mat
)
return adj_mat, norm_adj_mat, mean_adj_mat
    def create_adj_mat(self):
        """Create the adjacency matrix from the user-item interaction matrix."""
adj_mat = sp.dok_matrix(
(self.n_users + self.n_items, self.n_users + self.n_items), dtype=np.float32
)
adj_mat = adj_mat.tolil()
R = sp.dok_matrix((self.n_users, self.n_items), dtype=np.float32)
user_np = np.array(self.train[DEFAULT_USER_COL])
item_np = np.array(self.train[DEFAULT_ITEM_COL])
        # Fill R in one pass over the interactions instead of a per-user scan.
        for u, i in zip(user_np, item_np):
            R[u, i] = 1.0
R = R.tolil()
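        # Bipartite graph adjacency: A = [[0, R], [R^T, 0]], with users as the
        # first n_users nodes and items as the remaining n_items nodes.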
adj_mat[: self.n_users, self.n_users :] = R
adj_mat[self.n_users :, : self.n_users] = R.T
adj_mat = adj_mat.todok()
print("already create adjacency matrix", adj_mat.shape)
norm_adj_mat = normalized_adj_single(adj_mat + sp.eye(adj_mat.shape[0]))
mean_adj_mat = normalized_adj_single(adj_mat)
print("already normalize adjacency matrix")
return adj_mat.tocsr(), norm_adj_mat.tocsr(), mean_adj_mat.tocsr()
    def get_constraint_mat(self, config):
"""Get the adjacent matrix, if not previously stored then call the function to create.
This method is for NGCF model.
Returns:
Different types of adjacment matrix.
"""
process_file_name = (
"ultragcn_"
+ config["dataset"]["dataset"]
+ "_"
+ config["dataset"]["data_split"]
+ (
("_" + str(config["dataset"]["percent"] * 100))
if "percent" in config
else ""
)
)
process_path = os.path.join(
config["system"]["process_dir"],
config["dataset"]["dataset"] + "/",
)
process_file_name = os.path.join(process_path, process_file_name)
ensureDir(process_file_name)
print(process_file_name)
try:
train_mat = sp.load_npz(os.path.join(process_file_name, "train_mat.npz"))
constraint_mat = np.load(
os.path.join(process_file_name, "constraint_mat.npz")
)
beta_uD = constraint_mat["beta_uD"]
beta_iD = constraint_mat["beta_iD"]
print("already load train mat and constraint mat")
except Exception:
train_mat, beta_uD, beta_iD = self.create_constraint_mat()
sp.save_npz(os.path.join(process_file_name, "train_mat.npz"), train_mat)
np.savez_compressed(
os.path.join(process_file_name, "constraint_mat.npz"),
beta_uD=beta_uD,
beta_iD=beta_iD,
)
constraint_mat = {"beta_uD": beta_uD, "beta_iD": beta_iD}
return train_mat, constraint_mat
    def create_constraint_mat(self):
        """Create the train matrix and the UltraGCN constraint weights from the user-item interactions."""
train_mat = sp.dok_matrix((self.n_users, self.n_items), dtype=np.float32)
user_np = np.array(self.train[DEFAULT_USER_COL])
item_np = np.array(self.train[DEFAULT_ITEM_COL])
        # Fill the interaction matrix in one pass instead of a per-user scan.
        for u, i in zip(user_np, item_np):
            train_mat[u, i] = 1.0
items_D = np.sum(train_mat, axis=0).reshape(-1)
users_D = np.sum(train_mat, axis=1).reshape(-1)
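        # UltraGCN constraint weights beta_{u,i} = (sqrt(d_u + 1) / d_u) * (1 / sqrt(d_i + 1)),
        # split into a user factor (beta_uD) and an item factor (beta_iD).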
beta_uD = (np.sqrt(users_D + 1) / users_D).reshape(-1, 1)
beta_iD = (1 / np.sqrt(items_D + 1)).reshape(1, -1)
return train_mat.tocsr(), beta_uD.reshape(-1), beta_iD.reshape(-1)
    def create_sgl_mat(self, config):
        """Create the (optionally augmented) normalized adjacency matrix for the SGL model."""
n_nodes = self.n_users + self.n_items
is_subgraph = config["model"]["is_subgraph"]
aug_type = config["model"]["aug_type"]
ssl_ratio = config["model"]["ssl_ratio"]
user_np = np.array(self.train[DEFAULT_USER_COL])
item_np = np.array(self.train[DEFAULT_ITEM_COL])
if is_subgraph and aug_type in [0, 1, 2] and ssl_ratio > 0:
# data augmentation type --- 0: Node Dropout; 1: Edge Dropout; 2: Random Walk
if aug_type == 0:
                drop_user_idx = self.randint_choice(
                    self.n_users, size=int(self.n_users * ssl_ratio), replace=False
                )
                drop_item_idx = self.randint_choice(
                    self.n_items, size=int(self.n_items * ssl_ratio), replace=False
                )
indicator_user = np.ones(self.n_users, dtype=np.float32)
indicator_item = np.ones(self.n_items, dtype=np.float32)
indicator_user[drop_user_idx] = 0.0
indicator_item[drop_item_idx] = 0.0
diag_indicator_user = sp.diags(indicator_user)
diag_indicator_item = sp.diags(indicator_item)
R = sp.csr_matrix(
(np.ones_like(user_np, dtype=np.float32), (user_np, item_np)),
shape=(self.n_users, self.n_items),
)
R_prime = diag_indicator_user.dot(R).dot(diag_indicator_item)
(user_np_keep, item_np_keep) = R_prime.nonzero()
ratings_keep = R_prime.data
tmp_adj = sp.csr_matrix(
(ratings_keep, (user_np_keep, item_np_keep + self.n_users)),
shape=(n_nodes, n_nodes),
)
if aug_type in [1, 2]:
keep_idx = self.randint_choice(
len(user_np),
size=int(len(user_np) * (1 - ssl_ratio)),
replace=False,
)
user_keep_np = np.array(user_np)[keep_idx]
item_keep_np = np.array(item_np)[keep_idx]
ratings = np.ones_like(user_keep_np, dtype=np.float32)
tmp_adj = sp.csr_matrix(
(ratings, (user_keep_np, item_keep_np + self.n_users)),
shape=(n_nodes, n_nodes),
)
else:
ratings = np.ones_like(user_np, dtype=np.float32)
tmp_adj = sp.csr_matrix(
(ratings, (user_np, item_np + self.n_users)), shape=(n_nodes, n_nodes)
)
adj_mat = tmp_adj + tmp_adj.T
        # Symmetrically normalize the adjacency: D^{-1/2} A D^{-1/2}.
rowsum = np.array(adj_mat.sum(1))
d_inv = np.power(rowsum, -0.5).flatten()
d_inv[np.isinf(d_inv)] = 0.0
d_mat_inv = sp.diags(d_inv)
norm_adj_tmp = d_mat_inv.dot(adj_mat)
adj_matrix = norm_adj_tmp.dot(d_mat_inv)
return adj_matrix
    def randint_choice(self, high, size=None, replace=True, p=None, exclusion=None):
"""Return random integers from `0` (inclusive) to `high` (exclusive)."""
a = np.arange(high)
if exclusion is not None:
if p is None:
p = np.ones_like(a)
else:
p = np.array(p, copy=True)
p = p.flatten()
p[exclusion] = 0
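            # Excluded indices get zero probability, so they can never be drawn.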
if p is not None:
p = p / np.sum(p)
sample = np.random.choice(a, size=size, replace=replace, p=p)
return sample
    def instance_vae_loader(self, device):
        """Build VAE training data: an array of user indices and the sparse user-item rating matrix."""
users = list(self.train[DEFAULT_USER_COL])
items = list(self.train[DEFAULT_ITEM_COL])
ratings = list(self.train[DEFAULT_RATING_COL])
dataset = RatingDataset(
user_tensor=torch.LongTensor(users).to(device),
item_tensor=torch.LongTensor(items).to(device),
target_tensor=torch.FloatTensor(ratings).to(device),
)
uids = self.user_id_pool
        user_indices = np.fromiter(uids, dtype=np.int64)
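        # Sparse user-item rating matrix; VAE-style models typically consume whole user rows.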
matrix = csr_matrix(
(ratings, (users, items)),
shape=(self.n_users, self.n_items),
)
print(f"Making RatingDataset of length {len(dataset)}")
return user_indices, matrix