Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions examples/streamlit/movielens/movielens_dashboard.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import datetime
import os
import re
import time
import urllib.parse
from typing import Dict, Optional, Tuple

import streamlit as st
import pandas as pd
import requests
import re
import urllib.parse
import streamlit as st

from rtrec.models import SLIM
from rtrec.recommender import Recommender
Expand All @@ -28,10 +28,10 @@ def load_movielens_100k() -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
ratings_df = pd.read_csv(RATINGS_FILE, sep='\t', names=col_rat)

col_items = [
'movie_id', 'movie_title', 'release_date', 'video_release_date',
'IMDb_URL', 'unknown', 'Action', 'Adventure', 'Animation',
'Childrens', 'Comedy', 'Crime', 'Documentary', 'Drama', 'Fantasy',
'Film-Noir', 'Horror', 'Musical', 'Mystery', 'Romance', 'Sci_Fi',
'movie_id', 'movie_title', 'release_date', 'video_release_date',
'IMDb_URL', 'unknown', 'Action', 'Adventure', 'Animation',
'Childrens', 'Comedy', 'Crime', 'Documentary', 'Drama', 'Fantasy',
'Film-Noir', 'Horror', 'Musical', 'Mystery', 'Romance', 'Sci_Fi',
'Thriller', 'War', 'Western'
]
items_df = pd.read_csv(ITEMS_FILE, sep='|', names=col_items, encoding='latin-1')
Expand Down
11 changes: 6 additions & 5 deletions rtrec/experiments/datasets.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import os
import tarfile
import pandas as pd
import zipfile
import urllib.request
import os

import zipfile
from datetime import datetime

import pandas as pd

from .kaggle_datasets import load_retailrocket


def load_movielens(dataset_scale: str, sort_by_tstamp: bool = False, load_user_attributes: bool = False, load_item_attributes: bool = False) -> pd.DataFrame:
"""
Downloads and loads the specified MovieLens dataset version into a DataFrame with columns
Expand Down Expand Up @@ -276,7 +277,7 @@ def load_amazon_review_v2(category_name: str = "Music",
sort_by_tstamp (bool): Whether to sort the DataFrame by timestamp (default is False).

Returns:
pd.DataFrame: A DataFrame containing user, item, rating, timestamp, image_url, and
pd.DataFrame: A DataFrame containing user, item, rating, timestamp, image_url, and
(if category is "all") category.
"""
# Base URL and directory setup
Expand Down
16 changes: 12 additions & 4 deletions rtrec/experiments/experiments.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
from typing import Dict
import argparse
from typing import Dict

from .datasets import load_dataset
from .split import leave_one_last_item, random_split, temporal_split, temporal_user_split
from ..models import SLIM, HybridSlimFM, LightFM
from ..models.base import BaseModel
from ..recommender import Recommender
from ..models import SLIM, LightFM, HybridSlimFM
from .datasets import load_dataset
from .split import (
leave_one_last_item,
random_split,
temporal_split,
temporal_user_split,
)


def run_experiment(
dataset_name: str,model_name: str, split_method: str = "temporal", epochs: int = 10, batch_size: int = 1000
Expand Down Expand Up @@ -40,6 +47,7 @@ def run_experiment(
raise ValueError(f"Unsupported split method: {split_method}")

# Initialize and train the recommender
model: BaseModel
if model_name == "slim":
model = SLIM(epochs=epochs, random_seed=43)
elif model_name == "lightfm":
Expand Down
7 changes: 4 additions & 3 deletions rtrec/experiments/kaggle_datasets.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
from collections import defaultdict
import os
import zipfile
from typing import Dict

import pandas as pd
import os
import requests
import zipfile
from tqdm import tqdm

from .utils import map_hour_to_period


def load_retailrocket(standardize_schema: bool=True) -> pd.DataFrame:
"""
Load the Retail Rocket dataset from a CSV file.
Expand Down
6 changes: 4 additions & 2 deletions rtrec/experiments/split.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from typing import Optional, Tuple

import pandas as pd
from typing import Tuple, Optional


def leave_one_last_item(
df: pd.DataFrame,
Expand Down Expand Up @@ -46,7 +48,7 @@ def temporal_split(

Parameters:
df (pd.DataFrame): A pandas DataFrame with columns ['user', 'item', 'tstamp', 'rating'].
test_frac (float): The proportion of data to include in the test set, between 0 and 1.
test_frac (float): The proportion of data to include in the test set, between 0 and 1.
Used if timestamp is not provided. Default is 0.2.
timestamp (Optional[float]): The POSIX timestamp threshold for the split. Interactions before this
timestamp will be in the training set, and interactions on or after
Expand Down
8 changes: 5 additions & 3 deletions rtrec/experiments/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import pandas as pd
from typing import List

import pandas as pd


def n_core_filter(df: pd.DataFrame, columns: List[str], min_count: int = 10) -> pd.DataFrame:
"""
Filters the DataFrame to only include rows where any specified column's values
Expand All @@ -9,11 +11,11 @@ def n_core_filter(df: pd.DataFrame, columns: List[str], min_count: int = 10) ->
Args:
df (pd.DataFrame): The input DataFrame.
columns (List[str]): The list of columns to apply the n-core filter on.
min_count (int): Minimum occurrences required for each value in any specified column
min_count (int): Minimum occurrences required for each value in any specified column
to be retained. Defaults to 10.

Returns:
pd.DataFrame: A filtered DataFrame containing only rows where values in any specified
pd.DataFrame: A filtered DataFrame containing only rows where values in any specified
column appear at least `min_count` times.
"""
# Initialize a mask to keep track of rows that meet the criteria for any column
Expand Down
4 changes: 2 additions & 2 deletions rtrec/models/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from .slim import SLIM
from .lightfm import LightFM
from .hybrid import HybridSlimFM
from .lightfm import LightFM
from .slim import SLIM

__all__ = ["SLIM", "LightFM", "HybridSlimFM"]
10 changes: 4 additions & 6 deletions rtrec/models/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
from io import BytesIO
from typing import Any, Iterable, List, Optional, Self, Tuple, Union

from numpy import ndarray

from rtrec.utils.features import FeatureStore
from rtrec.utils.identifiers import Identifier
from rtrec.utils.interactions import UserItemInteractions
Expand Down Expand Up @@ -217,11 +215,11 @@ def recommend_batch(self, users: List[Any], candidate_items: Optional[List[Any]]

if len(cold_user_ids) == 0:
# If there are no cold-start users, proceed with batch recommendation
results = self._recommend_hot_batch(hot_user_ids, candidate_item_ids=candidate_item_ids, users_tags=users_tags, top_k=top_k, filter_interacted=filter_interacted)
return [[self.item_ids.get(item_id) for item_id in internal_ids] for internal_ids in results]
batch_results = self._recommend_hot_batch(hot_user_ids, candidate_item_ids=candidate_item_ids, users_tags=users_tags, top_k=top_k, filter_interacted=filter_interacted)
return [[self.item_ids.get(item_id) for item_id in internal_ids] for internal_ids in batch_results]

# Initialize results list
results = [None] * len(users)
results: list[list[int]] = [[] for _ in range(len(users))]

# Handle cold-start users
if cold_indices:
Expand Down Expand Up @@ -334,7 +332,7 @@ def similar_items(self, query_item: Any, query_item_tags: Optional[List[str]] =
return [self.item_ids.get(item_id) for item_id, _ in similar_item_ids]

@abstractmethod
def _similar_items(self, query_item_id: int, query_item_tags: Optional[List[str]] = None, top_k: int = 10) -> List[Tuple[int, float]] | Tuple[ndarray, ndarray]:
def _similar_items(self, query_item_id: int, query_item_tags: Optional[List[str]] = None, top_k: int = 10) -> List[Tuple[int, float]]:
"""
Find similar items for a list of query items.
:param query_item_id: item id to find similar items for
Expand Down
32 changes: 20 additions & 12 deletions rtrec/models/hybrid.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def comb_sum(fm_ids: np.ndarray, fm_scores: np.ndarray,
Returns:
Dict[int, float]: Dictionary containing the aggregated scores for each item.
"""
summed_scores = defaultdict(float)
summed_scores: Dict[int, float] = defaultdict(float)

# Process FM scores
for item_id, score in zip(fm_ids, fm_scores):
Expand Down Expand Up @@ -73,8 +73,8 @@ def comb_mnz(fm_ids: np.ndarray, fm_scores: np.ndarray,
Returns:
Dict[int, float]: Dictionary containing the aggregated scores for each item.
"""
summed_scores = defaultdict(float)
item_counts = defaultdict(int)
summed_scores: Dict[int, float] = defaultdict(float)
item_counts: Dict[int, int] = defaultdict(int)

# Process FM scores
for item_id, score in zip(fm_ids, fm_scores):
Expand Down Expand Up @@ -198,6 +198,9 @@ def bulk_fit(self, parallel: bool=False, progress_bar: bool=True) -> Self:
self.model.fit_partial(ui_coo, user_features, item_features, sample_weight=sample_weights, epochs=self.epochs, num_threads=self.n_threads, verbose=progress_bar)
# Fit SLIM model
ui_csc = ui_coo.tocsc(copy=False)
# Ensure we have a csc_matrix, not csc_array
if not isinstance(ui_csc, sparse.csc_matrix):
ui_csc = sparse.csc_matrix(ui_csc)
self.slim_model.fit(ui_csc, parallel=parallel, progress_bar=progress_bar)
return self

Expand All @@ -207,7 +210,7 @@ def _recommend(self, user_id: int, candidate_item_ids: Optional[List[int]] = Non
interaction_matrix = self.interactions.to_csr(select_users=[user_id])
dense_output = not self.item_ids.pass_through
result = self.slim_model.recommend(user_id, interaction_matrix, candidate_item_ids=candidate_item_ids, top_k=top_k, filter_interacted=filter_interacted, dense_output=dense_output)
return result
return result # type: ignore

users_tags = [user_tags] if user_tags is not None else None
user_features = self._create_user_features(user_ids=[user_id], users_tags=users_tags, slice=True)
Expand Down Expand Up @@ -250,7 +253,7 @@ def _recommend(self, user_id: int, candidate_item_ids: Optional[List[int]] = Non
dense_output = not self.item_ids.pass_through
slim_ids, slim_scores = self.slim_model.recommend(user_id, ui_csr, candidate_item_ids=candidate_item_ids, top_k=top_k, filter_interacted=filter_interacted, dense_output=dense_output, ret_scores=True)

return self._ensemble_by_scores(user_id, fm_ids, fm_scores, slim_ids, slim_scores, top_k)
return self._ensemble_by_scores(user_id, fm_ids, fm_scores, slim_ids, slim_scores, top_k) # type: ignore

@override
def handle_unknown_user(self, user: Any) -> Optional[int]:
Expand Down Expand Up @@ -325,6 +328,8 @@ def _cold_user_features(self, users_tags: List[List[str]]) -> csr_matrix:
# Create zero matrix for identity since cold users have no history
users = sparse.csr_matrix((num_rows, num_hot_users), dtype="float32")

if self.model.user_embeddings is None:
raise ValueError("Model user_embeddings is None")
num_user_features = self.model.user_embeddings.shape[0] - num_hot_users
assert num_user_features > 0, f"num_user_features should be greater than 0, but got {num_user_features}"

Expand Down Expand Up @@ -361,10 +366,10 @@ def _recommend_hot_batch(self,
interaction_matrix = self.interactions.to_csr(select_users=user_ids)
dense_output = not self.item_ids.pass_through
result = [
self.slim_model.recommend(user_id, interaction_matrix, candidate_item_ids=candidate_item_ids, top_k=top_k, filter_interacted=filter_interacted, dense_output=dense_output)
self.slim_model.recommend(user_id, interaction_matrix, candidate_item_ids=candidate_item_ids, top_k=top_k, filter_interacted=filter_interacted, dense_output=dense_output, ret_scores=False)
for user_id in user_ids
]
return result
return result # type: ignore

user_features = self._create_user_features(user_ids, users_tags=users_tags, slice=True)
item_features = self._create_item_features(item_ids=candidate_item_ids, slice=False)
Expand Down Expand Up @@ -408,7 +413,7 @@ def _recommend_hot_batch(self,

slim_ids, slim_scores = self.slim_model.recommend(user_id, ui_csr, candidate_item_ids=candidate_item_ids, top_k=top_k, filter_interacted=filter_interacted, dense_output=dense_output, ret_scores=True)
# Combine scores from both models and get top-k item ids
top_items = self._ensemble_by_scores(user_id, fm_ids, fm_scores, slim_ids, slim_scores, top_k)
top_items = self._ensemble_by_scores(user_id, fm_ids, fm_scores, slim_ids, slim_scores, top_k) # type: ignore
results.append(top_items)
return results

Expand All @@ -429,9 +434,9 @@ def _similar_items(self, query_item_id: int, query_item_tags: Optional[List[str]
assert target_vector is not None, "target_vector should not be None"
target_norm = calc_norm(target_vector)

fm_ids, fm_scores = implicit.topk(items=target_vector, query=query_vector, k=top_k, item_norms=target_norm, filter_items=np.array([query_item_id], dtype="int32"), num_threads=self.n_threads)
fm_ids: np.ndarray = fm_ids.ravel()
fm_scores: np.ndarray = fm_scores.ravel()
fm_ids_raw, fm_scores_raw = implicit.topk(items=target_vector, query=query_vector, k=top_k, item_norms=target_norm, filter_items=np.array([query_item_id], dtype="int32"), num_threads=self.n_threads)
fm_ids: np.ndarray = fm_ids_raw.ravel()
fm_scores: np.ndarray = fm_scores_raw.ravel()

# implicit assigns negative infinity to the scores to be fitered out
# see https://github.com/benfred/implicit/blob/v0.7.2/implicit/cpu/topk.pyx#L54
Expand All @@ -456,11 +461,14 @@ def _similar_items(self, query_item_id: int, query_item_tags: Optional[List[str]

# Get SLIM similar items
slim_ids, slim_scores = self.slim_model.similar_items(query_item_id, top_k=top_k, ret_ndarrays=True)
assert isinstance(slim_ids, np.ndarray), "slim_ids should be a numpy array"
assert isinstance(slim_scores, np.ndarray), "slim_scores should be a numpy array"

# Combine scores from both models
fm_scores = minmax_normalize(fm_scores)
slim_scores = minmax_normalize(slim_scores)
comb_scores = comb_sum(fm_ids, fm_scores, slim_ids, slim_scores)
slim_ids_list = slim_ids.tolist() if hasattr(slim_ids, 'tolist') else list(slim_ids)
comb_scores = comb_sum(fm_ids, fm_scores, slim_ids_list, slim_scores) # type: ignore
# Get top-k item ids
sorted_items = sorted(comb_scores.items(), key=lambda x: x[1], reverse=True)[:top_k]
return sorted_items
Expand Down
6 changes: 4 additions & 2 deletions rtrec/models/internal/lightfm_wrapper.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from typing import Optional, override

from lightfm import LightFM
from scipy.sparse import coo_matrix, csr_matrix
from sklearn.base import clone
from typing import Optional, override
from scipy.sparse import csr_matrix, coo_matrix


class LightFMWrapper(LightFM):
"""
Expand Down
Loading
Loading