diff --git a/examples/streamlit/movielens/movielens_dashboard.py b/examples/streamlit/movielens/movielens_dashboard.py index 12498fc..74dd420 100644 --- a/examples/streamlit/movielens/movielens_dashboard.py +++ b/examples/streamlit/movielens/movielens_dashboard.py @@ -1,13 +1,13 @@ import datetime import os +import re import time +import urllib.parse from typing import Dict, Optional, Tuple -import streamlit as st import pandas as pd import requests -import re -import urllib.parse +import streamlit as st from rtrec.models import SLIM from rtrec.recommender import Recommender @@ -28,10 +28,10 @@ def load_movielens_100k() -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]: ratings_df = pd.read_csv(RATINGS_FILE, sep='\t', names=col_rat) col_items = [ - 'movie_id', 'movie_title', 'release_date', 'video_release_date', - 'IMDb_URL', 'unknown', 'Action', 'Adventure', 'Animation', - 'Childrens', 'Comedy', 'Crime', 'Documentary', 'Drama', 'Fantasy', - 'Film-Noir', 'Horror', 'Musical', 'Mystery', 'Romance', 'Sci_Fi', + 'movie_id', 'movie_title', 'release_date', 'video_release_date', + 'IMDb_URL', 'unknown', 'Action', 'Adventure', 'Animation', + 'Childrens', 'Comedy', 'Crime', 'Documentary', 'Drama', 'Fantasy', + 'Film-Noir', 'Horror', 'Musical', 'Mystery', 'Romance', 'Sci_Fi', 'Thriller', 'War', 'Western' ] items_df = pd.read_csv(ITEMS_FILE, sep='|', names=col_items, encoding='latin-1') diff --git a/rtrec/experiments/datasets.py b/rtrec/experiments/datasets.py index b3bb195..7cd3f14 100644 --- a/rtrec/experiments/datasets.py +++ b/rtrec/experiments/datasets.py @@ -1,13 +1,14 @@ +import os import tarfile -import pandas as pd -import zipfile import urllib.request -import os - +import zipfile from datetime import datetime +import pandas as pd + from .kaggle_datasets import load_retailrocket + def load_movielens(dataset_scale: str, sort_by_tstamp: bool = False, load_user_attributes: bool = False, load_item_attributes: bool = False) -> pd.DataFrame: """ Downloads and loads the specified MovieLens dataset version into a DataFrame with columns @@ -276,7 +277,7 @@ def load_amazon_review_v2(category_name: str = "Music", sort_by_tstamp (bool): Whether to sort the DataFrame by timestamp (default is False). Returns: - pd.DataFrame: A DataFrame containing user, item, rating, timestamp, image_url, and + pd.DataFrame: A DataFrame containing user, item, rating, timestamp, image_url, and (if category is "all") category. """ # Base URL and directory setup diff --git a/rtrec/experiments/experiments.py b/rtrec/experiments/experiments.py index 0705d8f..d9b7789 100644 --- a/rtrec/experiments/experiments.py +++ b/rtrec/experiments/experiments.py @@ -1,10 +1,17 @@ -from typing import Dict import argparse +from typing import Dict -from .datasets import load_dataset -from .split import leave_one_last_item, random_split, temporal_split, temporal_user_split +from ..models import SLIM, HybridSlimFM, LightFM +from ..models.base import BaseModel from ..recommender import Recommender -from ..models import SLIM, LightFM, HybridSlimFM +from .datasets import load_dataset +from .split import ( + leave_one_last_item, + random_split, + temporal_split, + temporal_user_split, +) + def run_experiment( dataset_name: str,model_name: str, split_method: str = "temporal", epochs: int = 10, batch_size: int = 1000 @@ -40,6 +47,7 @@ def run_experiment( raise ValueError(f"Unsupported split method: {split_method}") # Initialize and train the recommender + model: BaseModel if model_name == "slim": model = SLIM(epochs=epochs, random_seed=43) elif model_name == "lightfm": diff --git a/rtrec/experiments/kaggle_datasets.py b/rtrec/experiments/kaggle_datasets.py index 0855dc1..f53db5c 100644 --- a/rtrec/experiments/kaggle_datasets.py +++ b/rtrec/experiments/kaggle_datasets.py @@ -1,13 +1,14 @@ -from collections import defaultdict +import os +import zipfile from typing import Dict + import pandas as pd -import os import requests -import zipfile from tqdm import tqdm from .utils import map_hour_to_period + def load_retailrocket(standardize_schema: bool=True) -> pd.DataFrame: """ Load the Retail Rocket dataset from a CSV file. diff --git a/rtrec/experiments/split.py b/rtrec/experiments/split.py index 6f008bc..304d573 100644 --- a/rtrec/experiments/split.py +++ b/rtrec/experiments/split.py @@ -1,5 +1,7 @@ +from typing import Optional, Tuple + import pandas as pd -from typing import Tuple, Optional + def leave_one_last_item( df: pd.DataFrame, @@ -46,7 +48,7 @@ def temporal_split( Parameters: df (pd.DataFrame): A pandas DataFrame with columns ['user', 'item', 'tstamp', 'rating']. - test_frac (float): The proportion of data to include in the test set, between 0 and 1. + test_frac (float): The proportion of data to include in the test set, between 0 and 1. Used if timestamp is not provided. Default is 0.2. timestamp (Optional[float]): The POSIX timestamp threshold for the split. Interactions before this timestamp will be in the training set, and interactions on or after diff --git a/rtrec/experiments/utils.py b/rtrec/experiments/utils.py index 26f15dc..8d5e1c9 100644 --- a/rtrec/experiments/utils.py +++ b/rtrec/experiments/utils.py @@ -1,6 +1,8 @@ -import pandas as pd from typing import List +import pandas as pd + + def n_core_filter(df: pd.DataFrame, columns: List[str], min_count: int = 10) -> pd.DataFrame: """ Filters the DataFrame to only include rows where any specified column's values @@ -9,11 +11,11 @@ def n_core_filter(df: pd.DataFrame, columns: List[str], min_count: int = 10) -> Args: df (pd.DataFrame): The input DataFrame. columns (List[str]): The list of columns to apply the n-core filter on. - min_count (int): Minimum occurrences required for each value in any specified column + min_count (int): Minimum occurrences required for each value in any specified column to be retained. Defaults to 10. Returns: - pd.DataFrame: A filtered DataFrame containing only rows where values in any specified + pd.DataFrame: A filtered DataFrame containing only rows where values in any specified column appear at least `min_count` times. """ # Initialize a mask to keep track of rows that meet the criteria for any column diff --git a/rtrec/models/__init__.py b/rtrec/models/__init__.py index 2258be7..657986d 100644 --- a/rtrec/models/__init__.py +++ b/rtrec/models/__init__.py @@ -1,5 +1,5 @@ -from .slim import SLIM -from .lightfm import LightFM from .hybrid import HybridSlimFM +from .lightfm import LightFM +from .slim import SLIM __all__ = ["SLIM", "LightFM", "HybridSlimFM"] diff --git a/rtrec/models/base.py b/rtrec/models/base.py index d6bed95..2bf2100 100644 --- a/rtrec/models/base.py +++ b/rtrec/models/base.py @@ -4,8 +4,6 @@ from io import BytesIO from typing import Any, Iterable, List, Optional, Self, Tuple, Union -from numpy import ndarray - from rtrec.utils.features import FeatureStore from rtrec.utils.identifiers import Identifier from rtrec.utils.interactions import UserItemInteractions @@ -217,11 +215,11 @@ def recommend_batch(self, users: List[Any], candidate_items: Optional[List[Any]] if len(cold_user_ids) == 0: # If there are no cold-start users, proceed with batch recommendation - results = self._recommend_hot_batch(hot_user_ids, candidate_item_ids=candidate_item_ids, users_tags=users_tags, top_k=top_k, filter_interacted=filter_interacted) - return [[self.item_ids.get(item_id) for item_id in internal_ids] for internal_ids in results] + batch_results = self._recommend_hot_batch(hot_user_ids, candidate_item_ids=candidate_item_ids, users_tags=users_tags, top_k=top_k, filter_interacted=filter_interacted) + return [[self.item_ids.get(item_id) for item_id in internal_ids] for internal_ids in batch_results] # Initialize results list - results = [None] * len(users) + results: list[list[int]] = [[] for _ in range(len(users))] # Handle cold-start users if cold_indices: @@ -334,7 +332,7 @@ def similar_items(self, query_item: Any, query_item_tags: Optional[List[str]] = return [self.item_ids.get(item_id) for item_id, _ in similar_item_ids] @abstractmethod - def _similar_items(self, query_item_id: int, query_item_tags: Optional[List[str]] = None, top_k: int = 10) -> List[Tuple[int, float]] | Tuple[ndarray, ndarray]: + def _similar_items(self, query_item_id: int, query_item_tags: Optional[List[str]] = None, top_k: int = 10) -> List[Tuple[int, float]]: """ Find similar items for a list of query items. :param query_item_id: item id to find similar items for diff --git a/rtrec/models/hybrid.py b/rtrec/models/hybrid.py index e250352..3216199 100644 --- a/rtrec/models/hybrid.py +++ b/rtrec/models/hybrid.py @@ -45,7 +45,7 @@ def comb_sum(fm_ids: np.ndarray, fm_scores: np.ndarray, Returns: Dict[int, float]: Dictionary containing the aggregated scores for each item. """ - summed_scores = defaultdict(float) + summed_scores: Dict[int, float] = defaultdict(float) # Process FM scores for item_id, score in zip(fm_ids, fm_scores): @@ -73,8 +73,8 @@ def comb_mnz(fm_ids: np.ndarray, fm_scores: np.ndarray, Returns: Dict[int, float]: Dictionary containing the aggregated scores for each item. """ - summed_scores = defaultdict(float) - item_counts = defaultdict(int) + summed_scores: Dict[int, float] = defaultdict(float) + item_counts: Dict[int, int] = defaultdict(int) # Process FM scores for item_id, score in zip(fm_ids, fm_scores): @@ -198,6 +198,9 @@ def bulk_fit(self, parallel: bool=False, progress_bar: bool=True) -> Self: self.model.fit_partial(ui_coo, user_features, item_features, sample_weight=sample_weights, epochs=self.epochs, num_threads=self.n_threads, verbose=progress_bar) # Fit SLIM model ui_csc = ui_coo.tocsc(copy=False) + # Ensure we have a csc_matrix, not csc_array + if not isinstance(ui_csc, sparse.csc_matrix): + ui_csc = sparse.csc_matrix(ui_csc) self.slim_model.fit(ui_csc, parallel=parallel, progress_bar=progress_bar) return self @@ -207,7 +210,7 @@ def _recommend(self, user_id: int, candidate_item_ids: Optional[List[int]] = Non interaction_matrix = self.interactions.to_csr(select_users=[user_id]) dense_output = not self.item_ids.pass_through result = self.slim_model.recommend(user_id, interaction_matrix, candidate_item_ids=candidate_item_ids, top_k=top_k, filter_interacted=filter_interacted, dense_output=dense_output) - return result + return result # type: ignore users_tags = [user_tags] if user_tags is not None else None user_features = self._create_user_features(user_ids=[user_id], users_tags=users_tags, slice=True) @@ -250,7 +253,7 @@ def _recommend(self, user_id: int, candidate_item_ids: Optional[List[int]] = Non dense_output = not self.item_ids.pass_through slim_ids, slim_scores = self.slim_model.recommend(user_id, ui_csr, candidate_item_ids=candidate_item_ids, top_k=top_k, filter_interacted=filter_interacted, dense_output=dense_output, ret_scores=True) - return self._ensemble_by_scores(user_id, fm_ids, fm_scores, slim_ids, slim_scores, top_k) + return self._ensemble_by_scores(user_id, fm_ids, fm_scores, slim_ids, slim_scores, top_k) # type: ignore @override def handle_unknown_user(self, user: Any) -> Optional[int]: @@ -325,6 +328,8 @@ def _cold_user_features(self, users_tags: List[List[str]]) -> csr_matrix: # Create zero matrix for identity since cold users have no history users = sparse.csr_matrix((num_rows, num_hot_users), dtype="float32") + if self.model.user_embeddings is None: + raise ValueError("Model user_embeddings is None") num_user_features = self.model.user_embeddings.shape[0] - num_hot_users assert num_user_features > 0, f"num_user_features should be greater than 0, but got {num_user_features}" @@ -361,10 +366,10 @@ def _recommend_hot_batch(self, interaction_matrix = self.interactions.to_csr(select_users=user_ids) dense_output = not self.item_ids.pass_through result = [ - self.slim_model.recommend(user_id, interaction_matrix, candidate_item_ids=candidate_item_ids, top_k=top_k, filter_interacted=filter_interacted, dense_output=dense_output) + self.slim_model.recommend(user_id, interaction_matrix, candidate_item_ids=candidate_item_ids, top_k=top_k, filter_interacted=filter_interacted, dense_output=dense_output, ret_scores=False) for user_id in user_ids ] - return result + return result # type: ignore user_features = self._create_user_features(user_ids, users_tags=users_tags, slice=True) item_features = self._create_item_features(item_ids=candidate_item_ids, slice=False) @@ -408,7 +413,7 @@ def _recommend_hot_batch(self, slim_ids, slim_scores = self.slim_model.recommend(user_id, ui_csr, candidate_item_ids=candidate_item_ids, top_k=top_k, filter_interacted=filter_interacted, dense_output=dense_output, ret_scores=True) # Combine scores from both models and get top-k item ids - top_items = self._ensemble_by_scores(user_id, fm_ids, fm_scores, slim_ids, slim_scores, top_k) + top_items = self._ensemble_by_scores(user_id, fm_ids, fm_scores, slim_ids, slim_scores, top_k) # type: ignore results.append(top_items) return results @@ -429,9 +434,9 @@ def _similar_items(self, query_item_id: int, query_item_tags: Optional[List[str] assert target_vector is not None, "target_vector should not be None" target_norm = calc_norm(target_vector) - fm_ids, fm_scores = implicit.topk(items=target_vector, query=query_vector, k=top_k, item_norms=target_norm, filter_items=np.array([query_item_id], dtype="int32"), num_threads=self.n_threads) - fm_ids: np.ndarray = fm_ids.ravel() - fm_scores: np.ndarray = fm_scores.ravel() + fm_ids_raw, fm_scores_raw = implicit.topk(items=target_vector, query=query_vector, k=top_k, item_norms=target_norm, filter_items=np.array([query_item_id], dtype="int32"), num_threads=self.n_threads) + fm_ids: np.ndarray = fm_ids_raw.ravel() + fm_scores: np.ndarray = fm_scores_raw.ravel() # implicit assigns negative infinity to the scores to be fitered out # see https://github.com/benfred/implicit/blob/v0.7.2/implicit/cpu/topk.pyx#L54 @@ -456,11 +461,14 @@ def _similar_items(self, query_item_id: int, query_item_tags: Optional[List[str] # Get SLIM similar items slim_ids, slim_scores = self.slim_model.similar_items(query_item_id, top_k=top_k, ret_ndarrays=True) + assert isinstance(slim_ids, np.ndarray), "slim_ids should be a numpy array" + assert isinstance(slim_scores, np.ndarray), "slim_scores should be a numpy array" # Combine scores from both models fm_scores = minmax_normalize(fm_scores) slim_scores = minmax_normalize(slim_scores) - comb_scores = comb_sum(fm_ids, fm_scores, slim_ids, slim_scores) + slim_ids_list = slim_ids.tolist() if hasattr(slim_ids, 'tolist') else list(slim_ids) + comb_scores = comb_sum(fm_ids, fm_scores, slim_ids_list, slim_scores) # type: ignore # Get top-k item ids sorted_items = sorted(comb_scores.items(), key=lambda x: x[1], reverse=True)[:top_k] return sorted_items diff --git a/rtrec/models/internal/lightfm_wrapper.py b/rtrec/models/internal/lightfm_wrapper.py index 3bc9f22..462ca7e 100644 --- a/rtrec/models/internal/lightfm_wrapper.py +++ b/rtrec/models/internal/lightfm_wrapper.py @@ -1,7 +1,9 @@ +from typing import Optional, override + from lightfm import LightFM +from scipy.sparse import coo_matrix, csr_matrix from sklearn.base import clone -from typing import Optional, override -from scipy.sparse import csr_matrix, coo_matrix + class LightFMWrapper(LightFM): """ diff --git a/rtrec/models/internal/slim_elastic.py b/rtrec/models/internal/slim_elastic.py index ea49e15..9ef93ed 100644 --- a/rtrec/models/internal/slim_elastic.py +++ b/rtrec/models/internal/slim_elastic.py @@ -1,20 +1,22 @@ import logging import os +import warnings +from functools import partial +from multiprocessing import Pool, shared_memory from typing import Any, Dict, List, Optional, Self, Tuple + import numpy as np +import scipy.sparse as sp from numpy import ndarray from numpy.typing import ArrayLike -import scipy.sparse as sp -from sklearn.linear_model import SGDRegressor, ElasticNet -import warnings from sklearn.exceptions import ConvergenceWarning -from tqdm import tqdm +from sklearn.linear_model import ElasticNet, SGDRegressor from sklearn.utils.extmath import safe_sparse_dot -from multiprocessing import Pool, shared_memory -from functools import partial +from tqdm import tqdm from rtrec.utils.multiprocessing import create_shared_array + class CSRMatrixWrapper: """ CSRMatrixWrapper is a wrapper class for a CSR matrix that provides efficient access to columns. @@ -36,7 +38,7 @@ def matrix(self) -> sp.csr_matrix: @property def shape(self) -> Tuple[int, int]: return self.csr_matrix.shape # type: ignore - + def get_col(self, j: int) -> sp.csc_matrix: """ Return the j-th column of the matrix. @@ -55,7 +57,7 @@ def get_col(self, j: int) -> sp.csc_matrix: col = self.col_view[:, j].copy() col.data = self.csr_matrix.data[col.data] return col - + def set_col(self, j: int, values: ArrayLike) -> None: """ Set the j-th column of the matrix to the given values. @@ -83,7 +85,7 @@ def matrix(self) -> sp.csc_matrix: @property def shape(self) -> Tuple[int, int]: return self.csc_matrix.shape # type: ignore - + def get_col(self, j: int, copy: bool = True) -> sp.spmatrix: """ Return the j-th column of the matrix. @@ -102,7 +104,7 @@ def get_col(self, j: int, copy: bool = True) -> sp.spmatrix: if copy: return col.copy() return col - + def set_col(self, j: int, values: ArrayLike) -> None: """ Set the j-th column of the matrix to the given values. @@ -128,14 +130,14 @@ def __init__(self, model: ElasticNet | SGDRegressor, n_neighbors: int = 30): def fit(self, X: sp.spmatrix, y: np.ndarray): # Compute dot products between items and the target - feature_scores = X.T.dot(y).flatten() + feature_scores = X.T.dot(y).flatten() # type: ignore # Select the top-k similar items to the target item by sorting the dot products selected_features = np.argsort(feature_scores)[-1:-1-self.n_neighbors:-1] # Only fit the model with the selected features # TODO: Implement a more efficient way to select the features for csr_matrix - self.model.fit(X[:, selected_features], y) - + self.model.fit(X[:, selected_features], y) # type: ignore + # Store the coefficients of the fitted model coeff = self.model.coef_ # of shape (n_neighbors,) @@ -153,7 +155,7 @@ class SLIMElastic: def __init__(self, config: dict={}): """ Initialize the SLIMElastic model. - + Args: config (dict): Configuration parameters for the model @@ -225,6 +227,7 @@ def fit(self, interaction_matrix: sp.csc_matrix | sp.csr_matrix, parallel: bool= parallel (bool): Whether to use parallel processing for fitting. progress_bar (bool): Whether to show a progress bar during training. """ + X: CSCMatrixWrapper | CSRMatrixWrapper if isinstance(interaction_matrix, sp.csc_matrix): if parallel: return self.fit_in_parallel(interaction_matrix, progress_bar=progress_bar) @@ -263,7 +266,7 @@ def fit(self, interaction_matrix: sp.csc_matrix | sp.csr_matrix, parallel: bool= item_similarity[i, j] = value # Reattach the item column after training - X.set_col(j, y.data) + X.set_col(j, y.data) # type: ignore # Convert item_similarity to CSC format for efficient access self.item_similarity = item_similarity.tocsc(copy=False) @@ -443,7 +446,7 @@ def _fit_items( return results @staticmethod - def _get_model(config: dict[str, Any]) -> ElasticNet | SGDRegressor | FeatureSelectionWrapper: + def _get_model(config: dict[str, Any]) -> ElasticNet | SGDRegressor | FeatureSelectionWrapper: optim_name = config.get("optim", "cd") if optim_name == "cd": model = ElasticNet( @@ -490,7 +493,7 @@ def partial_fit(self, interaction_matrix: sp.csr_matrix, user_ids: List[int], pa user_ids (list): List of user indices that were updated. parallel (bool): Whether to use parallel processing for fitting. progress_bar (bool): Whether to show a progress bar during training. - """ + """ user_items = set() for user_id in user_ids: user_items.update(interaction_matrix[user_id, :].indices.tolist()) # type: ignore @@ -506,6 +509,7 @@ def partial_fit_items(self, interaction_matrix: sp.csc_matrix | sp.csr_matrix, u parallel (bool): Whether to use parallel processing for fitting. progress_bar (bool): Whether to show a progress bar during training. """ + X: CSCMatrixWrapper | CSRMatrixWrapper if isinstance(interaction_matrix, sp.csc_matrix): if parallel: return self.fit_in_parallel(interaction_matrix, item_ids=np.array(updated_items), progress_bar=progress_bar) @@ -534,10 +538,10 @@ def partial_fit_items(self, interaction_matrix: sp.csc_matrix | sp.csr_matrix, u y = X.get_col(j) # Set the target item column to 0 - X.set_col(j, np.zeros_like(y.data)) + X.set_col(j, np.zeros_like(y.data)) # type: ignore # Fit the model for the updated item - model.fit(X.matrix, y.toarray().ravel()) + model.fit(X.matrix, y.toarray().ravel()) # type: ignore # Update the item similarity matrix with new coefficients (weights for each user-item interaction) # self.item_similarity[:, j] = model.coef_ @@ -545,7 +549,7 @@ def partial_fit_items(self, interaction_matrix: sp.csc_matrix | sp.csr_matrix, u item_similarity[i, j] = value # Reattach the item column after training - X.set_col(j, y.data) + X.set_col(j, y.data) # type: ignore # Convert item_similarity to CSC format for efficient access self.item_similarity = item_similarity.tocsc(copy=False) diff --git a/rtrec/models/lightfm.py b/rtrec/models/lightfm.py index a9ae715..76bc65e 100644 --- a/rtrec/models/lightfm.py +++ b/rtrec/models/lightfm.py @@ -261,9 +261,9 @@ def _similar_items(self, query_item_id: int, query_item_tags: Optional[List[str] raise ValueError("target_vector is None, cannot compute norm.") target_norm = calc_norm(target_vector) - ids, scores = implicit.topk(items=target_vector, query=query_vector, k=top_k, item_norms=target_norm, filter_items=np.array([query_item_id], dtype="int32"), num_threads=self.n_threads) - ids: np.ndarray = ids.ravel() - scores: np.ndarray = scores.ravel() + ids_raw, scores_raw = implicit.topk(items=target_vector, query=query_vector, k=top_k, item_norms=target_norm, filter_items=np.array([query_item_id], dtype="int32"), num_threads=self.n_threads) + ids: np.ndarray = ids_raw.ravel() + scores: np.ndarray = scores_raw.ravel() # implicit assigns negative infinity to the scores to be fitered out # see https://github.com/benfred/implicit/blob/v0.7.2/implicit/cpu/topk.pyx#L54 diff --git a/rtrec/models/slim.py b/rtrec/models/slim.py index 4013adc..56b5ad7 100644 --- a/rtrec/models/slim.py +++ b/rtrec/models/slim.py @@ -1,8 +1,6 @@ import logging from typing import Any, Iterable, List, Optional, Self, Tuple, override -from numpy import ndarray - from ..models.internal.slim_elastic import SLIMElastic from .base import BaseModel @@ -65,9 +63,9 @@ def _recommend(self, user_id: int, candidate_item_ids: Optional[List[int]] = Non """ interaction_matrix = self.interactions.to_csr(select_users=[user_id]) dense_output = not self.item_ids.pass_through - return self.model.recommend(user_id, interaction_matrix, candidate_item_ids=candidate_item_ids, top_k=top_k, filter_interacted=filter_interacted, dense_output=dense_output) + return self.model.recommend(user_id, interaction_matrix, candidate_item_ids=candidate_item_ids, top_k=top_k, filter_interacted=filter_interacted, dense_output=dense_output, ret_scores=False) # type: ignore - def _similar_items(self, query_item_id: int, query_item_tags: Optional[List[str]] = None, top_k: int = 10) -> List[Tuple[int, float]] | Tuple[ndarray, ndarray]: + def _similar_items(self, query_item_id: int, query_item_tags: Optional[List[str]] = None, top_k: int = 10) -> List[Tuple[int, float]]: """ Find similar items for a list of query items. :param query_item_ids: List of query item indices @@ -76,7 +74,7 @@ def _similar_items(self, query_item_id: int, query_item_tags: Optional[List[str] :param filter_query_items: Whether to filter out items in the query_items list :return: List of top-K similar items for each query item with similarity scores """ - return self.model.similar_items(query_item_id, top_k=top_k) + return self.model.similar_items(query_item_id, top_k=top_k, ret_ndarrays=False) # type: ignore def _serialize(self) -> dict: """ diff --git a/rtrec/recommender.py b/rtrec/recommender.py index daf1598..1f824c4 100644 --- a/rtrec/recommender.py +++ b/rtrec/recommender.py @@ -1,13 +1,14 @@ import math -import pandas as pd import time +from typing import Any, Dict, Iterable, Iterator, List, Optional, Self, Tuple +import pandas as pd from tqdm import tqdm -from typing import Dict, Iterator, Optional, Tuple, Iterable, Any, List, Self from .models.base import BaseModel from .utils.metrics import compute_scores + class Recommender: def __init__(self, model: BaseModel, use_generator: bool = True): @@ -63,7 +64,7 @@ def fit( # sort interactions by timestamp ascending order interaction_df.sort_values("tstamp", ascending=True, inplace=True) total = math.ceil(len(interaction_df) / batch_size) - for batch in tqdm(generate_batches(interaction_df, batch_size, as_generator=self.use_generator), total=total, desc="Add interactions"): + for batch in tqdm(Recommender.generate_batches(interaction_df, batch_size, as_generator=self.use_generator), total=total, desc="Add interactions"): self.model.add_interactions(batch, update_interaction=update_interaction, record_interactions=True) # Fit the model self.model._fit_recorded(parallel=parallel, progress_bar=True) @@ -108,7 +109,7 @@ def bulk_fit( # sort interactions by timestamp ascending order inplace interaction_df.sort_values("tstamp", ascending=True, inplace=True) total = math.ceil(len(interaction_df) / batch_size) - for batch in tqdm(generate_batches(interaction_df, batch_size, as_generator=self.use_generator), total=total, desc="Add interactions"): + for batch in tqdm(Recommender.generate_batches(interaction_df, batch_size, as_generator=self.use_generator), total=total, desc="Add interactions"): self.model.add_interactions(batch, update_interaction=update_interaction) # Fit the model in bulk self.model.bulk_fit(parallel=parallel, progress_bar=True) @@ -190,25 +191,25 @@ def generate_evaluation_pairs() -> Iterable[Tuple[List[Any], List[Any]]]: # Compute and return the evaluation metrics using the generator return compute_scores(generate_evaluation_pairs(), recommend_size) -@staticmethod -def generate_batches(df: pd.DataFrame, batch_size: int = 1_000, as_generator: bool = False) -> Iterator[Iterable[Tuple[int, int, int, float]]]: - """ - Converts a DataFrame to an iterable of mini-batches. - - Parameters: - df (pd.DataFrame): The DataFrame to convert to mini-batches. - batch_size (int): The number of rows per mini-batch. - as_generator (bool): Whether to return a generator or a list of mini-batches. - - Returns: - Iterator[Iterable[Tuple[int, int, int, float]]]: An iterator of mini-batches. - """ - num_rows = len(df) - if as_generator: - for start in range(0, num_rows, batch_size): - batch = df.iloc[start:start + batch_size] - yield batch.itertuples(index=False, name=None) - else: - for start in range(0, num_rows, batch_size): - batch = df.iloc[start:start + batch_size] - yield list(batch.itertuples(index=False, name=None)) + @staticmethod + def generate_batches(df: pd.DataFrame, batch_size: int = 1_000, as_generator: bool = False) -> Iterator[Iterable[Tuple[int, int, int, float]]]: + """ + Converts a DataFrame to an iterable of mini-batches. + + Parameters: + df (pd.DataFrame): The DataFrame to convert to mini-batches. + batch_size (int): The number of rows per mini-batch. + as_generator (bool): Whether to return a generator or a list of mini-batches. + + Returns: + Iterator[Iterable[Tuple[int, int, int, float]]]: An iterator of mini-batches. + """ + num_rows = len(df) + if as_generator: + for start in range(0, num_rows, batch_size): + batch = df.iloc[start:start + batch_size] + yield batch.itertuples(index=False, name=None) + else: + for start in range(0, num_rows, batch_size): + batch = df.iloc[start:start + batch_size] + yield list(batch.itertuples(index=False, name=None)) diff --git a/rtrec/serving/app.py b/rtrec/serving/app.py index e43deab..64ea423 100644 --- a/rtrec/serving/app.py +++ b/rtrec/serving/app.py @@ -1,9 +1,10 @@ +import logging +import os +from typing import Any, List + from fastapi import FastAPI, Header, HTTPException from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel -from typing import List, Any -import logging -import os from rtrec.models import SLIM diff --git a/rtrec/tools/kinesis_consumer.py b/rtrec/tools/kinesis_consumer.py index de09acc..0387229 100644 --- a/rtrec/tools/kinesis_consumer.py +++ b/rtrec/tools/kinesis_consumer.py @@ -1,9 +1,11 @@ -import os import asyncio +import os +import signal +from typing import Dict, List + import boto3 from botocore.exceptions import ClientError -import signal -from typing import List, Dict, Set + from rtrec.models import SLIM kinesis_client = boto3.client('kinesis') diff --git a/rtrec/utils/collections.py b/rtrec/utils/collections.py index 5fb6348..abebe0f 100644 --- a/rtrec/utils/collections.py +++ b/rtrec/utils/collections.py @@ -8,8 +8,8 @@ class IndexedSet(Generic[T]): allowing fast addition of unique keys and retrieval of their indices. """ def __init__(self, iterable: Optional[Iterable[T]] = None) -> None: - self._key_to_index = {} # Maps keys to their indices - self._index_to_key = [] # Maintains the order of insertion + self._key_to_index: dict[T, int] = {} # Maps keys to their indices + self._index_to_key: list[T] = [] # Maintains the order of insertion if iterable is not None: for item in iterable: self.add(item) diff --git a/rtrec/utils/diskcache.py b/rtrec/utils/diskcache.py index 238d569..0fa2a10 100644 --- a/rtrec/utils/diskcache.py +++ b/rtrec/utils/diskcache.py @@ -2,6 +2,7 @@ from collections import OrderedDict from typing import Any + class PersistentCache: def __init__(self, filename: str, cache_size: int = 10_000, pickle_protocol: int = 5): """ @@ -14,7 +15,7 @@ def __init__(self, filename: str, cache_size: int = 10_000, pickle_protocol: int self.filename = filename self.cache_size = cache_size self.pickle_protocol = pickle_protocol - self.lru_cache = OrderedDict() # In-memory cache + self.lru_cache: OrderedDict[str, Any] = OrderedDict() # In-memory cache # flag='n' always create a new, empty database, open for reading and writing. self.store = shelve.open(self.filename, flag='n', protocol=pickle_protocol, writeback=False) self.size = 0 # Total size, counting both LRU cache and disk storage diff --git a/rtrec/utils/features.py b/rtrec/utils/features.py index 263d740..6003d89 100644 --- a/rtrec/utils/features.py +++ b/rtrec/utils/features.py @@ -1,8 +1,11 @@ from typing import List, Optional -from scipy.sparse import csr_matrix + import numpy as np +from scipy.sparse import csr_matrix + from .collections import IndexedSet + class FeatureStore: def __init__(self): diff --git a/rtrec/utils/identifiers.py b/rtrec/utils/identifiers.py index 9da5357..9b4eaa1 100644 --- a/rtrec/utils/identifiers.py +++ b/rtrec/utils/identifiers.py @@ -2,6 +2,7 @@ import numpy as np + class IdentifierError(Exception): """Custom exception for Identifier class key errors.""" def __init__(self, id_name: str, obj_id: int): @@ -19,10 +20,7 @@ def __init__(self, name: str="ID", force_identify: bool=False, **kwargs: Any) -> self.force_identify = force_identify # Flag to force identification self.obj_to_id: dict[Any, int] = {} # Store object-to-ID mapping self.id_to_obj: list[Any] = [] # Store ID-to-object mapping - if force_identify: - self.pass_through = False - else: - self.pass_through : Optional[bool] = None # If True, return the object as-is if it is an integer + self.pass_through: Optional[bool] = False if force_identify else None def identify(self, obj: Any) -> int: # If the object is an integer, return it as-is diff --git a/rtrec/utils/interactions.py b/rtrec/utils/interactions.py index 91b732f..5b7e074 100644 --- a/rtrec/utils/interactions.py +++ b/rtrec/utils/interactions.py @@ -1,13 +1,16 @@ -from collections import defaultdict -from typing import List, Optional, Any -import time, math import logging +import math +import time +from collections import defaultdict from datetime import datetime, timezone +from typing import Any, List, Optional import numpy as np -from scipy.sparse import csr_matrix, csc_matrix, coo_matrix +from scipy.sparse import coo_matrix, csc_matrix, csr_matrix + from .lru import LRUFreqSet + class UserItemInteractions: def __init__(self, min_value: int = -5, max_value: int = 10, decay_in_days: Optional[int] = None, **kwargs: Any) -> None: """ @@ -21,7 +24,7 @@ def __init__(self, min_value: int = -5, max_value: int = 10, decay_in_days: Opti """ # Store interactions as a dictionary of dictionaries in shape {user_id: {item_id: (value, timestamp)}} self.interactions: defaultdict[int, dict[int, tuple[float, float]]] = defaultdict(dict) - self.all_item_ids = set() + self.all_item_ids: set[int] = set() n_recent_hot = kwargs.get("n_recent_hot", 100_000) self.hot_items = LRUFreqSet(capacity=n_recent_hot) assert max_value > min_value, f"max_value should be greater than min_value {max_value} > {min_value}" diff --git a/rtrec/utils/lang.py b/rtrec/utils/lang.py index e5fc71e..4f24422 100644 --- a/rtrec/utils/lang.py +++ b/rtrec/utils/lang.py @@ -1,4 +1,5 @@ -from typing import Callable, Dict, Any +from typing import Any, Callable, Dict + def extract_func_args(func: Callable, kwargs: Dict[str, Any]) -> Dict[str, Any]: """ diff --git a/rtrec/utils/lru.py b/rtrec/utils/lru.py index 27181e4..c68f856 100644 --- a/rtrec/utils/lru.py +++ b/rtrec/utils/lru.py @@ -2,6 +2,7 @@ from collections.abc import MutableSet from typing import Any, Iterator, List, Optional, override + class LRUFreqSet(MutableSet): def __init__(self, capacity: int): """ @@ -90,7 +91,7 @@ def __repr__(self) -> str: def get_freq_items(self, n: Optional[int] = None, exclude_items: List[Any] = []) -> Iterator[Any]: """ - Retrieve the top `n` most frequently used keys in the set, excluding specified items. + Retrieve the top `n` most frequently used keys in the set, excluding specified items. If `n` is None, return all keys sorted by frequency. Parameters: diff --git a/rtrec/utils/math.py b/rtrec/utils/math.py index a02f014..5867b7b 100644 --- a/rtrec/utils/math.py +++ b/rtrec/utils/math.py @@ -1,5 +1,6 @@ import numpy as np + def sigmoid(x): """ sigmoid function diff --git a/rtrec/utils/metrics.py b/rtrec/utils/metrics.py index 99928d3..d109a7b 100644 --- a/rtrec/utils/metrics.py +++ b/rtrec/utils/metrics.py @@ -1,6 +1,7 @@ -from typing import List, Any, Dict, Iterable, Tuple -from math import log2 from collections import defaultdict +from math import log2 +from typing import Any, Dict, Iterable, List, Tuple + def ndcg(ranked_list: List[Any], ground_truth: List[Any], recommend_size: int) -> float: """ @@ -163,7 +164,8 @@ def mrr(ranked_lists: Iterable[List[Any]], ground_truths: Iterable[List[Any]], r MRR score as a float. """ rr_sum = sum(reciprocal_rank(r, g, recommend_size) for r, g in zip(ranked_lists, ground_truths)) - return rr_sum / len(ranked_lists) if ranked_lists else 0.0 + ranked_lists_list = list(ranked_lists) if not isinstance(ranked_lists, list) else ranked_lists + return rr_sum / len(ranked_lists_list) if ranked_lists_list else 0.0 def auc(ranked_list: List[Any], ground_truth: List[Any], recommend_size: int) -> float: """ @@ -259,7 +261,8 @@ def map_score(ranked_lists: Iterable[List[Any]], ground_truths: Iterable[List[An MAP score as a float. """ ap_sum = sum(average_precision(r, g, recommend_size) for r, g in zip(ranked_lists, ground_truths)) - return ap_sum / len(ranked_lists) if ranked_lists else 0.0 + ranked_lists_list = list(ranked_lists) if not isinstance(ranked_lists, list) else ranked_lists + return ap_sum / len(ranked_lists_list) if ranked_lists_list else 0.0 def compute_scores( evaluation_pairs: Iterable[Tuple[List[Any], List[Any]]], recommend_size: int @@ -274,8 +277,8 @@ def compute_scores( Returns: Dictionary with averaged scores across all queries. """ - precision_sum = recall_sum = f1_sum = ndcg_sum = rr_sum = ap_sum = auc_sum = 0.0 - hit_sum = tp_sum = 0 + precision_sum = recall_sum = f1_sum = ndcg_sum = rr_sum = ap_sum = auc_sum = hit_sum = 0.0 + tp_sum = 0 num_queries = 0 # Total number of queries processed for ranked_list, ground_truth in evaluation_pairs: diff --git a/rtrec/utils/multiprocessing.py b/rtrec/utils/multiprocessing.py index a3b9b94..3c680f0 100644 --- a/rtrec/utils/multiprocessing.py +++ b/rtrec/utils/multiprocessing.py @@ -1,6 +1,8 @@ from multiprocessing import shared_memory + import numpy as np + def create_shared_array(array: np.ndarray) -> shared_memory.SharedMemory: """ Create a shared memory segment for a NumPy array and copy the array's data into it. diff --git a/rtrec/utils/pandas.py b/rtrec/utils/pandas.py index 00b75df..236db48 100644 --- a/rtrec/utils/pandas.py +++ b/rtrec/utils/pandas.py @@ -1,6 +1,7 @@ import numpy as np import pandas as pd + def convert_dtypes(df): def parse_numeric(x, downcast): try: diff --git a/rtrec/utils/scoring.py b/rtrec/utils/scoring.py index b3d63fc..9d6abd7 100644 --- a/rtrec/utils/scoring.py +++ b/rtrec/utils/scoring.py @@ -3,6 +3,7 @@ import numpy as np + def minmax_normalize(scores: np.ndarray) -> np.ndarray: """ Normalize scores using min-max scaling along a specific axis. @@ -41,7 +42,7 @@ def weighted_borda(rankings: List[List[int]], weights: List[float]) -> Dict[int, raise ValueError("Number of rankings must match number of weights") # Initialize scores dictionary - borda_scores = defaultdict(float) + borda_scores: Dict[int, float] = defaultdict(float) # Process each ranking list for ranking, weight in zip(rankings, weights): diff --git a/tests/models/test_lightfm.py b/tests/models/test_lightfm.py index 6bd7af8..b51d6b9 100644 --- a/tests/models/test_lightfm.py +++ b/tests/models/test_lightfm.py @@ -1,8 +1,10 @@ +import random +import time + import pytest from rtrec.models.lightfm import LightFM -import time -import random + @pytest.fixture def model(): @@ -243,7 +245,7 @@ def test_recommend_and_similar_items(model): interactions = [ ('user_1', 'item_1', current_unixtime + rng.uniform(0, jitter_range), 5.0), ('user_2', 'item_2', current_unixtime + 1 + rng.uniform(0, jitter_range), 3.0), - ('user_1', 'item_3', current_unixtime + 2 + rng.uniform(0, jitter_range), 4.0), + ('user_1', 'item_3', current_unixtime + 2 + rng.uniform(0, jitter_range), 4.0), ('user_5', 'item_10', current_unixtime + 9 + rng.uniform(0, jitter_range), 4.0), ('user_5', 'item_1', current_unixtime + 9 + rng.uniform(0, jitter_range), 4.0), ('user_3', 'item_5', current_unixtime + 3 + rng.uniform(0, jitter_range), 5.0), diff --git a/tests/models/test_slim.py b/tests/models/test_slim.py index edecdce..2e0e81b 100644 --- a/tests/models/test_slim.py +++ b/tests/models/test_slim.py @@ -4,6 +4,7 @@ from rtrec.models.slim import SLIM + @pytest.fixture def model(): model = SLIM() diff --git a/tests/serving/test_app.py b/tests/serving/test_app.py index ef0113a..9e5dc74 100644 --- a/tests/serving/test_app.py +++ b/tests/serving/test_app.py @@ -1,6 +1,6 @@ -from fastapi import FastAPI import pytest from fastapi.testclient import TestClient + from rtrec.serving.app import create_app # Mock secret token for tests diff --git a/tests/utils/test_collections.py b/tests/utils/test_collections.py index 74c12d9..e39023f 100644 --- a/tests/utils/test_collections.py +++ b/tests/utils/test_collections.py @@ -2,6 +2,7 @@ from rtrec.utils.collections import IndexedSet + def test_add(): indexed_set = IndexedSet() assert indexed_set.add("apple") == 0 # First entry should have index 0 diff --git a/tests/utils/test_diskcache.py b/tests/utils/test_diskcache.py index b0f6c6d..fe5e5b5 100644 --- a/tests/utils/test_diskcache.py +++ b/tests/utils/test_diskcache.py @@ -1,9 +1,11 @@ import os -import pytest from tempfile import NamedTemporaryFile +import pytest + from rtrec.utils.diskcache import PersistentCache + # Test class for PersistentCache @pytest.fixture def cache(): diff --git a/tests/utils/test_features.py b/tests/utils/test_features.py index ccf04b1..1d95992 100644 --- a/tests/utils/test_features.py +++ b/tests/utils/test_features.py @@ -1,8 +1,10 @@ import numpy as np import pytest from scipy.sparse import csr_matrix + from rtrec.utils.features import FeatureStore + # Test adding user features def test_put_user_feature(): features = FeatureStore() @@ -108,7 +110,7 @@ def test_build_user_features_matrix_with_user_id(): # Test with two valid user IDs user_matrix = features.build_user_features_matrix([0, 1]) - expected_matrix = csr_matrix(np.array([[1, 1, 0], [0, 1, 1]]), shape=(2, 3)) # 2 users, 3 features + expected_matrix = csr_matrix(np.array([[1, 1, 0], [0, 1, 1]]), shape=(2, 3)) # 2 users, 3 features assert (user_matrix != expected_matrix).nnz == 0 def test_build_item_features_matrix_with_item_id(): diff --git a/tests/utils/test_interactions.py b/tests/utils/test_interactions.py index a13ef8e..4c1175c 100644 --- a/tests/utils/test_interactions.py +++ b/tests/utils/test_interactions.py @@ -1,9 +1,11 @@ -import pytest -from collections import Counter import time from time import sleep + +import pytest +from scipy.sparse import coo_matrix, csc_matrix + from rtrec.utils.interactions import UserItemInteractions -from scipy.sparse import csc_matrix, coo_matrix + @pytest.fixture def interactions(): diff --git a/tests/utils/test_lang.py b/tests/utils/test_lang.py index 752299b..ec79d3f 100644 --- a/tests/utils/test_lang.py +++ b/tests/utils/test_lang.py @@ -1,8 +1,10 @@ -from typing import Dict, Any +from typing import Any, Dict import pytest + from rtrec.utils.lang import extract_func_args + def test_extract_func_args(): def sample_function(a, b, c): return a + b + c diff --git a/tests/utils/test_lru.py b/tests/utils/test_lru.py index 4168796..e6d83ce 100644 --- a/tests/utils/test_lru.py +++ b/tests/utils/test_lru.py @@ -1,7 +1,10 @@ from typing import List + import pytest + from rtrec.utils.lru import LRUFreqSet + @pytest.fixture def lru_set(): """Fixture to create a new LRUFreqSet instance for each test.""" diff --git a/tests/utils/test_metrics.py b/tests/utils/test_metrics.py index d4ca574..21a91f7 100644 --- a/tests/utils/test_metrics.py +++ b/tests/utils/test_metrics.py @@ -1,17 +1,20 @@ -import pytest from math import log2 + +import pytest + from rtrec.utils.metrics import ( - precision, - recall, + auc, + average_precision, f1_score, - ndcg, hit, + ndcg, + precision, + recall, reciprocal_rank, - average_precision, - auc, true_positives, ) + @pytest.mark.parametrize("ranked_list, ground_truth, k, expected", [ ([1, 3, 2, 6], [1, 2, 4], 4, 0.7039), ([1, 3, 2, 6], [1, 2, 4], 2, 0.6131),