# standard library imports
import random as rd
from collections import defaultdict
from pprint import pprint as pp

# third-party imports
import numpy as np
import pandas as pd
from sklearn.base import clone
from sklearn.ensemble import AdaBoostRegressor, GradientBoostingRegressor, RandomForestRegressor
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import ElasticNet, Lasso, LinearRegression, Ridge
from sklearn.metrics import mean_squared_error
from sklearn.neighbors import KNeighborsRegressor
from sklearn.svm import SVR
from sklearn.tree import DecisionTreeRegressor
from surprise import AlgoBase, PredictionImpossible, SVD
from xgboost import XGBRegressor
from lightgbm import LGBMRegressor

# local imports
from constants import Constant as C
from loaders import load_items, load_ratings
# All the dataframes
df_items = load_items()
df_ratings = load_ratings()
# Example 1: create a title_length feature
df_features = df_items[C.LABEL_COL].apply(lambda x: len(x)).to_frame('n_character_title')
df_genome_score = pd.read_csv("data/hackathon/content/genome-scores.csv")
df_genome_tag = pd.read_csv("data/hackathon/content/genome-tags.csv")
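# Hedged sketch (not used in the rest of this file): the two genome files loaded
# above can be pivoted into one tag-relevance feature matrix per movie. The column
# names (movieId, tagId, relevance, tag) are assumed to follow the standard
# MovieLens genome layout.
def build_genome_features(df_scores, df_tags):
    df = df_scores.merge(df_tags, on='tagId')
    return df.pivot(index='movieId', columns='tag', values='relevance').fillna(0)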
def get_top_n(predictions, n=10):
    """Return the top-N recommendations for each user from a set of predictions.

    Source: inspired by https://github.com/NicolasHug/Surprise/blob/master/examples/top_n_recommendations.py
    and modified by cvandekerckh for random tie breaking

    Args:
        predictions(list of Prediction objects): The list of predictions, as
            returned by the test method of an algorithm.
        n(int): The number of recommendations to output for each user. Default
            is 10.

    Returns:
        A dict where keys are user (raw) ids and values are lists of tuples:
        [(raw item id, rating estimation), ...] of size n.
    """
    rd.seed(0)

    # First map the predictions to each user.
    top_n = defaultdict(list)
    for uid, iid, true_r, est, _ in predictions:
        top_n[uid].append((iid, est))

    # Then sort the predictions for each user and retrieve the n highest ones.
    for uid, user_ratings in top_n.items():
        rd.shuffle(user_ratings)
        user_ratings.sort(key=lambda x: x[1], reverse=True)
        top_n[uid] = user_ratings[:n]

    return top_n
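# Quick illustrative check of get_top_n (hedged example, not part of the original
# script): the function only needs iterables shaped like (uid, iid, true_r, est, details),
# so hand-made tuples are enough to see the tie breaking and truncation at work.
_demo_predictions = [
    ('user_1', 'item_a', 4.0, 3.8, {}),
    ('user_1', 'item_b', 3.0, 4.6, {}),
    ('user_1', 'item_c', 5.0, 4.6, {}),
    ('user_2', 'item_a', 2.0, 2.9, {}),
]
pp(get_top_n(_demo_predictions, n=2))  # top-2 items per user, ties broken at random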
# First algorithm
class ModelBaseline1(AlgoBase):
    def __init__(self):
        AlgoBase.__init__(self)

    def estimate(self, u, i):
        return 2


# Second algorithm
class ModelBaseline2(AlgoBase):
    def __init__(self):
        AlgoBase.__init__(self)

    def fit(self, trainset):
        AlgoBase.fit(self, trainset)
        rd.seed(0)

    def estimate(self, u, i):
        return rd.uniform(self.trainset.rating_scale[0], self.trainset.rating_scale[1])


# Third algorithm
class ModelBaseline3(AlgoBase):
    def __init__(self):
        AlgoBase.__init__(self)

    def fit(self, trainset):
        AlgoBase.fit(self, trainset)
        self.the_mean = np.mean([r for (_, _, r) in self.trainset.all_ratings()])
        return self

    def estimate(self, u, i):
        return self.the_mean


# Fourth Model
class ModelBaseline4(SVD):
    def __init__(self):
        SVD.__init__(self, n_factors=100)
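# Hedged sketch: compare the four baselines with Surprise's cross_validate.
# It assumes load_ratings(surprise_format=True) returns a surprise Dataset,
# as in the example usage further below.
from surprise.model_selection import cross_validate

def compare_baselines(cv=3):
    data = load_ratings(surprise_format=True)
    for algo in (ModelBaseline1(), ModelBaseline2(), ModelBaseline3(), ModelBaseline4()):
        results = cross_validate(algo, data, measures=['RMSE', 'MAE'], cv=cv, verbose=False)
        print(f"{algo.__class__.__name__}: RMSE = {np.mean(results['test_rmse']):.4f}")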
# Content-based model
class ContentBased(AlgoBase):
    def __init__(self, features_method, regressor_method, is_hackathon=False):
        AlgoBase.__init__(self)
        self.regressor_method = regressor_method
        self.is_hackathon = is_hackathon
        self.content_features = self.create_content_features(features_method)

    def create_content_features(self, features_methods):
        """Content Analyzer"""
        df_items = load_items()
        df_genome_score = pd.read_csv("data/hackathon/content/genome-scores.csv")
        df_genome_tag = pd.read_csv("data/hackathon/content/genome-tags.csv")
        df_features = pd.DataFrame(index=df_items.index)
        for method in features_methods:
            if method == "title_length":
                df_title_length = df_items[C.LABEL_COL].apply(lambda x: len(x)).to_frame('title_length')
                df_features = pd.concat([df_features, df_title_length], axis=1)
            elif method == "movie_year":
                df_movie_year = df_items['title'].str.extract(r'\((\d{4})\)', expand=False).to_frame('movie_year')
                df_features = pd.concat([df_features, df_movie_year.astype(float).fillna(0)], axis=1)
            elif method == "genre":
                tfidf_vectorizer = TfidfVectorizer(tokenizer=lambda x: x.split('|'), token_pattern=None)
                tfidf_matrix = tfidf_vectorizer.fit_transform(df_items['genres'])
                df_tfidf_genres = pd.DataFrame(tfidf_matrix.toarray(), index=df_items.index, columns=tfidf_vectorizer.get_feature_names_out())
                df_features = pd.concat([df_features, df_tfidf_genres], axis=1)
            elif method == "avg_rating":
                df_avg_rating = df_ratings.groupby('movieId')['rating'].mean().to_frame('avg_rating')
                df_features = df_features.join(df_avg_rating, on='movieId')
            else:
                raise NotImplementedError(f'Feature method {method} not yet implemented')
        # Handle missing values in df_features
        df_features.fillna(0, inplace=True)
        return df_features
    def fit(self, trainset):
        """Profile Learner"""
        AlgoBase.fit(self, trainset)

        # Preallocate user profiles
        self.user_profile = {u: None for u in trainset.all_users()}
        # Explainability: per-user feature importance, weighted by the user's ratings
        self.user_profile_explain = {}
        epsilon = 1e-10  # small value to prevent division by zero
        for u in trainset.all_users():
            raw_user_id = trainset.to_raw_uid(u)
            user_ratings = np.array([rating for (_, rating) in trainset.ur[u]])
            item_ids = [iid for (iid, _) in trainset.ur[u]]
            raw_item_ids = [trainset.to_raw_iid(iid) for iid in item_ids]
            # Features of the items rated by this user, normalized column by column
            feature_values = self.content_features.loc[raw_item_ids].values
            norms = np.linalg.norm(feature_values, axis=0) + epsilon
            weighted_features = feature_values / norms
            # Rating-weighted importance of each feature, normalized by the sum of ratings
            feature_importance = user_ratings @ weighted_features
            feature_importance /= np.sum(user_ratings)
            self.user_profile_explain[raw_user_id] = dict(zip(self.content_features.columns, feature_importance))
        if self.regressor_method in ('random_score', 'random_sample'):
            # No regressor to train: keep each user's observed ratings so that
            # 'random_sample' can draw from them in estimate()
            for u in self.user_profile:
                self.user_profile[u] = [rating for (_, rating) in trainset.ur[u]]
        else:
            regressor_models = {
                'linear_regression': LinearRegression(fit_intercept=False),
                'svr_regression': SVR(kernel='rbf', C=10, epsilon=0.2),
                'gradient_boosting': GradientBoostingRegressor(n_estimators=100, learning_rate=0.1, max_depth=3),
                'random_forest': RandomForestRegressor(n_estimators=100),
                'lasso_regression': Lasso(alpha=0.1),
                'ridge_regression': Ridge(alpha=1.0),
                'elastic_net': ElasticNet(alpha=1.0, l1_ratio=0.5),
                'knn_regression': KNeighborsRegressor(n_neighbors=1),
                'decision_tree': DecisionTreeRegressor(max_depth=5),
                'adaboost': AdaBoostRegressor(n_estimators=50),
                'xgboost': XGBRegressor(n_estimators=100, learning_rate=0.1, max_depth=3),
                'lightgbm': LGBMRegressor(n_estimators=100, learning_rate=0.1, max_depth=3)
            }
            if self.regressor_method not in regressor_models:
                raise NotImplementedError(f'Regressor method {self.regressor_method} not yet implemented')

            # Fit one regressor per user on the content features of the items they rated
            for u in self.user_profile:
                user_ratings = [rating for (_, rating) in trainset.ur[u]]
                item_ids = [iid for (iid, _) in trainset.ur[u]]
                raw_item_ids = [trainset.to_raw_iid(iid) for iid in item_ids]
                df_user = pd.DataFrame({'item_id': raw_item_ids, 'user_ratings': user_ratings})
                df_user = df_user.merge(self.content_features, left_on="item_id", right_index=True, how='left')
                X = df_user.drop(columns=['item_id', 'user_ratings'])
                y = df_user['user_ratings']
                # clone() so each user gets an independent copy of the chosen regressor
                regressor = clone(regressor_models[self.regressor_method])
                regressor.fit(X, y)
                self.user_profile[u] = regressor
    def estimate(self, u, i):
        """Scoring component used for item filtering"""
        # Handle unknown users and items
        if not (self.trainset.knows_user(u) and self.trainset.knows_item(i)):
            raise PredictionImpossible('User and/or item is unknown.')

        raw_item_id = self.trainset.to_raw_iid(i)

        if self.regressor_method == 'random_score':
            # Uniform random score over the rating scale
            return rd.uniform(self.trainset.rating_scale[0], self.trainset.rating_scale[1])
        elif self.regressor_method == 'random_sample':
            # Random draw among the ratings the user has already given
            return rd.choice(self.user_profile[u])
        else:
            # Score with the per-user regressor learned in fit()
            item_features = self.content_features.loc[raw_item_id, :].values.reshape(1, -1)
            regressor = self.user_profile[u]
            item_features_df = pd.DataFrame(item_features, columns=self.content_features.columns)
            return regressor.predict(item_features_df)[0]
    def explain(self, u):
        if u in self.user_profile_explain:
            return self.user_profile_explain[u]
        else:
            return None
    def rmse(self, testset):
        """Compute RMSE on the testset"""
        predictions = []
        true_ratings = []
        for (uid, iid, true_r) in testset:
            try:
                pred_r = self.estimate(self.trainset.to_inner_uid(uid), self.trainset.to_inner_iid(iid))
                predictions.append(pred_r)
                true_ratings.append(true_r)
            except (PredictionImpossible, ValueError):
                # ValueError covers raw ids unknown to the trainset
                continue
        mse = mean_squared_error(true_ratings, predictions)
        rmse_value = np.sqrt(mse)
        return rmse_value
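# Hedged sketch: evaluate ContentBased.rmse on a held-out split instead of the full
# trainset. train_test_split comes from surprise.model_selection; the feature and
# regressor names are the ones already used in this file.
from surprise.model_selection import train_test_split

def evaluate_content_based(features, regressor, test_size=0.25):
    data = load_ratings(surprise_format=True)
    train_set, test_set = train_test_split(data, test_size=test_size, random_state=0)
    model = ContentBased(features, regressor)
    model.fit(train_set)
    return model.rmse(test_set)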
# Example usage:
cb = ContentBased(["title_length", "movie_year", "genre", "avg_rating"], "ridge_regression")
surprise_data = load_ratings(surprise_format=True)
trainset = surprise_data.build_full_trainset()
testset = trainset.build_anti_testset()

# print(cb.fit(trainset))

# # Example explanations for users:
# # print(cb.explain(11))

# # Get the top recommendations for each user
# top_n_recommendations = get_top_n(predictions, n=10)

# # Display the recommendations for a few specific users
# for user_id, user_recommendations in top_n_recommendations.items():
#     print(f"User {user_id}:")
#     for item_id, rating in user_recommendations:
#         print(f"  - Item {item_id}, estimated rating: {rating}")
def test_contentbased_class(feature_method, regressor_method):
    """Test the ContentBased class.
    Tries to make a prediction on the first (user, item) tuple of the anti_test_set.
    """
    sp_ratings = load_ratings(surprise_format=True)
    train_set = sp_ratings.build_full_trainset()
    content_algo = ContentBased(feature_method, regressor_method)
    content_algo.fit(train_set)
    anti_test_set_first = train_set.build_anti_testset()[0]
    prediction = content_algo.predict(anti_test_set_first[0], anti_test_set_first[1])
    print(prediction)
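if __name__ == '__main__':
    # Hedged example invocation (assumption: running this file directly should
    # exercise the content-based model once, with the same features as the
    # example usage above).
    test_contentbased_class(["title_length", "movie_year", "genre", "avg_rating"], "ridge_regression")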