import os
import torch
import torch.nn as nn
from beta_rec.models.gmf import GMF
from beta_rec.models.mlp import MLP
from beta_rec.models.torch_engine import ModelEngine
from beta_rec.utils.common_util import timeit
[docs]class NeuMF(torch.nn.Module):
"""NeuMF Class."""
def __init__(self, config):
"""Initialize NeuMF Class."""
super(NeuMF, self).__init__()
self.config = config
self.n_users = config["n_users"]
self.n_items = config["n_items"]
self.emb_dim = config["emb_dim"]
self.n_layers = config["mlp_config"]["n_layers"]
self.dropout = config["dropout"]
self.latent_dim_mlp = self.emb_dim * (2 ** (self.n_layers)) // 2
self.latent_dim_gmf = self.emb_dim
self.embedding_user_mlp = torch.nn.Embedding(
num_embeddings=self.n_users, embedding_dim=self.latent_dim_mlp
)
self.embedding_item_mlp = torch.nn.Embedding(
num_embeddings=self.n_items, embedding_dim=self.latent_dim_mlp
)
self.embedding_user_mf = torch.nn.Embedding(
num_embeddings=self.n_users, embedding_dim=self.latent_dim_gmf
)
self.embedding_item_mf = torch.nn.Embedding(
num_embeddings=self.n_items, embedding_dim=self.latent_dim_gmf
)
MLP_modules = []
for i in range(self.n_layers):
input_size = self.emb_dim * (2 ** (self.n_layers - i))
MLP_modules.append(nn.Dropout(p=self.dropout))
MLP_modules.append(nn.Linear(input_size, input_size // 2))
MLP_modules.append(nn.ReLU())
self.fc_layers = nn.Sequential(*MLP_modules)
self.affine_output = torch.nn.Linear(
in_features=self.emb_dim * 2, out_features=1
)
self.logistic = torch.nn.Sigmoid()
[docs] def forward(self, user_indices, item_indices):
"""Train the model."""
user_embedding_mlp = self.embedding_user_mlp(user_indices)
item_embedding_mlp = self.embedding_item_mlp(item_indices)
user_embedding_mf = self.embedding_user_mf(user_indices)
item_embedding_mf = self.embedding_item_mf(item_indices)
mlp_vector = torch.cat(
[user_embedding_mlp, item_embedding_mlp], dim=-1
) # the concat latent vector
mf_vector = torch.mul(user_embedding_mf, item_embedding_mf)
for idx, _ in enumerate(range(len(self.fc_layers))):
mlp_vector = self.fc_layers[idx](mlp_vector)
mlp_vector = torch.nn.ReLU()(mlp_vector)
vector = torch.cat([mlp_vector, mf_vector], dim=-1)
logits = self.affine_output(vector)
rating = self.logistic(logits)
return rating
[docs] def predict(self, user_indices, item_indices):
"""Predict the result with the model."""
user_indices = torch.LongTensor(user_indices).to(self.device)
item_indices = torch.LongTensor(item_indices).to(self.device)
with torch.no_grad():
return self.forward(user_indices, item_indices)
[docs] def init_weight(self):
"""Initialize weight in the model."""
pass
[docs]class NeuMFEngine(ModelEngine):
"""Engine for training & evaluating GMF model."""
def __init__(self, config):
"""Initialize NeuMFEngine Class."""
self.config = config
self.model = NeuMF(config["model"])
self.loss = torch.nn.BCELoss()
super(NeuMFEngine, self).__init__(config)
print(self.model)
if self.config["model"]["model"] == "ncf_pre":
self.load_pretrain_weights()
else:
self.init_weights()
[docs] def train_single_batch(self, users, items, ratings):
"""Train the model in a single batch.
Args:
batch_data (list): batch users, positive items and negative items.
Return:
loss (float): batch loss.
"""
assert hasattr(self, "model"), "Please specify the exact model !"
users, items, ratings = (
users.to(self.device),
items.to(self.device),
ratings.to(self.device),
)
self.optimizer.zero_grad()
ratings_pred = self.model(users, items)
loss = self.loss(ratings_pred.view(-1), ratings)
loss.backward()
self.optimizer.step()
loss = loss.item()
return loss
[docs] @timeit
def train_an_epoch(self, train_loader, epoch_id):
"""Train the model in one epoch.
Args:
epoch_id (int): the number of epoch.
train_loader (function): user, pos_items and neg_items generator.
"""
assert hasattr(self, "model"), "Please specify the exact model !"
self.model.train()
total_loss = 0
for batch_id, batch in enumerate(train_loader):
# assert isinstance(batch[0], torch.LongTensor)
user, item, rating = batch[0], batch[1], batch[2]
rating = rating.float()
loss = self.train_single_batch(user, item, rating)
total_loss += loss
print("[Training Epoch {}], Loss {}".format(epoch_id, loss))
self.writer.add_scalar("model/loss", total_loss, epoch_id)
[docs] def init_weights(self):
"""Initialize weights in the model."""
nn.init.normal_(self.model.embedding_user_mf.weight, std=0.01)
nn.init.normal_(self.model.embedding_item_mf.weight, std=0.01)
nn.init.normal_(self.model.embedding_user_mlp.weight, std=0.01)
nn.init.normal_(self.model.embedding_user_mlp.weight, std=0.01)
for m1 in self.model.fc_layers:
if isinstance(m1, nn.Linear):
nn.init.xavier_uniform_(m1.weight)
nn.init.kaiming_uniform_(
self.model.affine_output.weight, a=1, nonlinearity="sigmoid"
)
[docs] def load_pretrain_weights(self):
"""Load weights from trained MLP model & GMF model."""
# load GMF model
gmf_model = GMF(self.config["model"])
gmf_save_dir = os.path.join(
self.config["system"]["model_save_dir"],
self.config["model"]["gmf_config"]["save_name"],
)
self.resume_checkpoint(
gmf_save_dir,
gmf_model,
)
self.model.embedding_user_mf.weight.data = gmf_model.embedding_user.weight.data
self.model.embedding_item_mf.weight.data = gmf_model.embedding_item.weight.data
# load MLP model
mlp_model = MLP(self.config)
mlp_save_dir = os.path.join(
self.config["system"]["model_save_dir"],
self.config["model"]["mlp_config"]["save_name"],
)
self.resume_checkpoint(
mlp_save_dir,
mlp_model,
)
self.model.embedding_user_mlp.weight.data = mlp_model.embedding_user.weight.data
self.model.embedding_item_mlp.weight.data = mlp_model.embedding_item.weight.data
for (m1, m2) in zip(self.model.fc_layers, mlp_model.fc_layers):
if isinstance(m1, nn.Linear) and isinstance(m2, nn.Linear):
m1.weight.data.copy_(m2.weight)
m1.bias.data.copy_(m2.bias)
self.model.affine_output.weight.data = 0.5 * torch.cat(
[mlp_model.affine_output.weight.data, gmf_model.affine_output.weight.data],
dim=-1,
)
self.model.affine_output.bias.data = 0.5 * (
mlp_model.affine_output.bias.data + gmf_model.affine_output.bias.data
)