Source code for beta_rec.core.train_engine

import json
import os
import random
import string
import sys
from datetime import datetime

import GPUtil
import ray
import torch
from ray import tune
from tabulate import tabulate
from tqdm import tqdm

from ..core.config import find_config
from ..core.eval_engine import EvalEngine
from ..data.base_data import BaseData
from ..datasets.data_load import load_split_dataset
from ..utils import logger
from ..utils.common_util import ensureDir, print_dict_as_table, set_seed, update_args


class TrainEngine(object):
    """Training engine for all the models."""

    def __init__(self, args):
        """Init TrainEngine Class."""
        self.data = None
        self.train_loader = None
        self.monitor = None
        self.engine = None
        self.args = args
        self.config = self.prepare_env()
        self.gpu_id, self.config["model"]["device_str"] = self.get_device()
        self.eval_engine = EvalEngine(self.config)

    def get_device(self):
        """Get one gpu id that has the most available memory.

        Returns:
            (int, str): The gpu id (None if no gpu is available) and the device string (pytorch style).
        """
        if "device" in self.config["system"]:
            if self.config["system"]["device"] == "cpu":
                return (None, "cpu")
            elif (
                "cuda" in self.config["system"]["device"]
            ):  # receive a string of the form "cuda:#"
                return (
                    int(self.config["system"]["device"].replace("cuda:", "")),
                    self.config["system"]["device"],
                )
            elif len(self.config["system"]["device"]) == 1:  # receive a gpu id
                return (
                    int(self.config["system"]["device"]),
                    "cuda:" + self.config["system"]["device"],
                )
        device_str = "cpu"
        gpu_id_list = GPUtil.getAvailable(
            order="memory", limit=3
        )  # get the first gpu with the lowest load
        if len(gpu_id_list) < 1:
            gpu_id = None
            device_str = "cpu"
        else:
            gpu_id = gpu_id_list[0]
            # need to set 0 if ray only specifies 1 gpu
            if "CUDA_VISIBLE_DEVICES" in os.environ:
                if len(os.environ["CUDA_VISIBLE_DEVICES"].split()) == 1:
                    # gpu_id = int(os.environ["CUDA_VISIBLE_DEVICES"])
                    gpu_id = 0
                    print("Find only one gpu with id: ", gpu_id)
                    device_str = "cuda:" + str(gpu_id)
                # print(os.system("nvidia-smi"))
            else:
                print("Get a gpu with the most available memory :", gpu_id)
                device_str = "cuda:" + str(gpu_id)
        return gpu_id, device_str
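
    # Note: a sketch of the "system.device" values that the branches above accept,
    # based only on the checks in get_device; the concrete values are illustrative
    # assumptions, not required settings:
    #
    #   "device": "cpu"     -> (None, "cpu")
    #   "device": "cuda:1"  -> (1, "cuda:1")
    #   "device": "1"       -> (1, "cuda:1")
    #
    # If "device" is absent, GPUtil picks the gpu with the most free memory, and the
    # engine falls back to "cpu" when no gpu is available.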

    def prepare_env(self):
        """Prepare the running environment.

        * Load parameters from json files.
        * Initialize system folders, model name and the paths to be saved.
        * Initialize resource monitor.
        * Initialize random seed.
        * Initialize logging.
        """
        # Load config file from json
        config_file = find_config(self.args.config_file)
        with open(config_file) as config_params:
            print(f"loading config file {config_file}")
            config = json.load(config_params)

        # Update configs based on the args received from the command line.
        update_args(config, self.args)

        # Obtain the absolute path of the project.
        config["system"]["root_dir"] = os.path.abspath(config["system"]["root_dir"])

        # Construct a unique model run id, which consists of the model name, config id and a timestamp.
        timestamp_str = datetime.now().strftime("%Y%m%d_%H%M%S")
        random_str = "".join([random.choice(string.ascii_lowercase) for n in range(6)])
        config["system"]["model_run_id"] = (
            config["model"]["model"]
            + "_"
            + config["model"]["config_id"]
            + "_"
            + timestamp_str
            + "_"
            + random_str
        )

        # Initialize random seeds.
        set_seed(config["system"]["seed"] if "seed" in config["system"] else 2020)

        # Initialize working folders.
        self.initialize_folders(config)
        config["system"]["process_dir"] = os.path.join(
            config["system"]["root_dir"], config["system"]["process_dir"]
        )

        # Initialize the log file.
        config["system"]["log_file"] = os.path.join(
            config["system"]["root_dir"],
            config["system"]["log_dir"],
            config["system"]["model_run_id"],
        )
        logger.init_std_logger(config["system"]["log_file"])

        print("Python version:", sys.version)
        print("Pytorch version:", torch.__version__)

        # File paths to be saved.
        config["model"]["run_dir"] = os.path.join(
            config["system"]["root_dir"],
            config["system"]["run_dir"],
            config["system"]["model_run_id"],
        )
        config["system"]["run_dir"] = config["model"]["run_dir"]
        print(
            "The intermediate running statuses will be reported in folder:",
            config["system"]["run_dir"],
        )

        config["system"]["tune_dir"] = os.path.join(
            config["system"]["root_dir"], config["system"]["tune_dir"]
        )

        def get_user_temp_dir():
            tempdir = os.path.join(config["system"]["root_dir"], "tmp")
            print(f"ray temp dir {tempdir}")
            return tempdir

        ray.utils.get_user_temp_dir = get_user_temp_dir

        # Model checkpoint paths to be saved.
        config["system"]["model_save_dir"] = os.path.join(
            config["system"]["root_dir"],
            config["system"]["checkpoint_dir"],
            config["system"]["model_run_id"],
        )
        ensureDir(config["system"]["model_save_dir"])
        print("Model checkpoint will save in file:", config["system"]["model_save_dir"])

        config["system"]["result_file"] = os.path.join(
            config["system"]["root_dir"],
            config["system"]["result_dir"],
            config["system"]["result_file"],
        )
        print("Performance result will save in file:", config["system"]["result_file"])

        print_dict_as_table(config["system"], "System configs")
        return config

    def initialize_folders(self, config):
        """Initialize the whole directory structure of the project."""
        dirs = [
            "log_dir",
            "result_dir",
            "process_dir",
            "checkpoint_dir",
            "run_dir",
            "tune_dir",
            "dataset_dir",
        ]
        base_dir = config["system"]["root_dir"]
        for directory in dirs:
            path = os.path.join(base_dir, config["system"][directory])
            if not os.path.exists(path):
                os.makedirs(path)

    def load_dataset(self):
        """Load dataset."""
        self.data = BaseData(load_split_dataset(self.config))
        self.config["model"]["n_users"] = self.data.n_users
        self.config["model"]["n_items"] = self.data.n_items

    def check_early_stop(self, engine, model_dir, epoch):
        """Check if the early stop criterion is triggered.

        Save the model if the previous epoch has already obtained a better result.

        Args:
            epoch (int): Epoch number.

        Returns:
            bool: True if the early stop criterion is triggered, False otherwise.
        """
        max_n_update = self.config["model"]["max_n_update"]
        if epoch > 0 and self.eval_engine.n_no_update == 0:
            # save model if previous epoch has already obtained a better result
            engine.save_checkpoint(model_dir=model_dir)

        if self.eval_engine.n_no_update >= max_n_update:
            # stop training if early stop criterion is triggered
            print(
                "Early stop criterion triggered, no performance update for"
                f" {max_n_update} times"
            )
            return True
        return False

    def _train(self, engine, train_loader, save_dir, valid_df=None, test_df=None):
        """Train a model with the plain (non-sequential) evaluation protocol."""
        self.eval_engine.flush()
        epoch_bar = tqdm(range(self.config["model"]["max_epoch"]), file=sys.stdout)
        for epoch in epoch_bar:
            print("Epoch {} starts !".format(epoch))
            print("-" * 80)
            if self.check_early_stop(engine, save_dir, epoch):
                break
            engine.train_an_epoch(train_loader, epoch_id=epoch)
            # evaluate model on validation and test sets
            if (valid_df is None) and (test_df is None):
                self.eval_engine.train_eval(
                    self.data.valid[0], self.data.test[0], engine.model, epoch
                )
            else:
                self.eval_engine.train_eval(valid_df, test_df, engine.model, epoch)

    def _seq_train(
        self, engine, train_loader, save_dir, train_seq, valid_df=None, test_df=None
    ):
        """Train a sequential model with the sequence-based evaluation protocol."""
        self.eval_engine.flush()
        epoch_bar = tqdm(range(self.config["model"]["max_epoch"]), file=sys.stdout)
        for epoch in epoch_bar:
            print("Epoch {} starts !".format(epoch))
            print("-" * 80)
            if self.check_early_stop(engine, save_dir, epoch):
                break
            engine.train_an_epoch(train_loader, epoch_id=epoch)
            # evaluate model on validation and test sets
            if (valid_df is None) and (test_df is None):
                self.eval_engine.seq_train_eval(
                    train_seq,
                    self.data.valid[0],
                    self.data.test[0],
                    engine.model,
                    self.config["model"]["maxlen"],
                    epoch,
                )
            else:
                self.eval_engine.seq_train_eval(
                    train_seq,
                    valid_df,
                    test_df,
                    engine.model,
                    self.config["model"]["maxlen"],
                    epoch,
                )

    def _seq_train_time(
        self, engine, train_loader, save_dir, train_seq, valid_df=None, test_df=None
    ):
        """Train a sequential model with the time-aware evaluation protocol."""
        self.eval_engine.flush()
        epoch_bar = tqdm(range(self.config["model"]["max_epoch"]), file=sys.stdout)
        for epoch in epoch_bar:
            print("Epoch {} starts !".format(epoch))
            print("-" * 80)
            if self.check_early_stop(engine, save_dir, epoch):
                break
            engine.train_an_epoch(train_loader, epoch_id=epoch)
            # evaluate model on validation and test sets
            if (valid_df is None) and (test_df is None):
                self.eval_engine.seq_train_eval_time(
                    train_seq,
                    self.data.valid[0],
                    self.data.test[0],
                    engine.model,
                    self.config["model"]["maxlen"],
                    self.config["model"]["time_span"],
                    epoch,
                )
            else:
                self.eval_engine.seq_train_eval_time(
                    train_seq,
                    valid_df,
                    test_df,
                    engine.model,
                    self.config["model"]["maxlen"],
                    self.config["model"]["time_span"],
                    epoch,
                )

    def tune(self, runable):
        """Tune parameters using ray.tune."""
        config = vars(self.args)
        if "tune" in config:
            config["tune"] = False
        if "root_dir" in config and config["root_dir"]:
            config["root_dir"] = os.path.abspath(config["root_dir"])
        else:
            config["root_dir"] = os.path.abspath("..")
        config["config_file"] = os.path.abspath(config["config_file"])
        print(config)
        tunable = self.config["tunable"]
        for para in tunable:
            if para["type"] == "choice":
                config[para["name"]] = tune.grid_search(para["values"])
            if para["type"] == "range":
                values = []
                for val in range(para["bounds"][0], para["bounds"][1] + 1):
                    values.append(val)
                config[para["name"]] = tune.grid_search(values)

        analysis = tune.run(
            runable,
            config=config,
            local_dir=self.config["system"]["tune_dir"],
            # temp_dir=self.config["system"]["tune_dir"] + "/temp",
        )
        df = analysis.dataframe()
        tune_result_dir = os.path.join(
            self.config["system"]["tune_dir"],
            f"{self.config['system']['model_run_id']}_tune_result.csv",
        )
        print(f"Tuning results are saved in {tune_result_dir}")
        df.to_csv(tune_result_dir)
        print(tabulate(df, headers=df.columns, tablefmt="psql"))
        return df
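
    # Note: a minimal sketch of the "tunable" section that tune() consumes, assuming
    # it lives in the same config JSON loaded by prepare_env; the parameter names
    # ("emb_dim", "num_negative") are illustrative assumptions, not required keys:
    #
    #   "tunable": [
    #       {"name": "emb_dim", "type": "choice", "values": [32, 64, 128]},
    #       {"name": "num_negative", "type": "range", "bounds": [1, 5]}
    #   ]
    #
    # A "choice" entry is passed to tune.grid_search(values) as-is, while a "range"
    # entry is expanded into the integer grid tune.grid_search([1, 2, 3, 4, 5]).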

    def test(self):
        """Evaluate the performance for the testing sets based on the best performing model."""
        model_save_dir = os.path.join(
            self.config["system"]["model_save_dir"], self.config["model"]["save_name"]
        )
        model = self.engine.resume_checkpoint(model_save_dir)
        self.eval_engine.test_eval(self.data.test, model)
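

# Note: a minimal usage sketch, assuming a model-specific engine that subclasses
# TrainEngine and an argparse namespace `args` holding at least `config_file`;
# the subclass name and its train() method are illustrative assumptions:
#
#   train_engine = MyModelTrainEngine(args)  # hypothetical subclass of TrainEngine
#   train_engine.load_dataset()              # fills self.data, n_users, n_items
#   train_engine.train()                     # subclass builds self.engine and calls _train()
#   train_engine.test()                      # reloads the best checkpoint and runs test_eval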