Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 122 additions & 15 deletions logics/recommend.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (r *Recommender) IsColdStart() bool {

func (r *Recommender) Recommend(ctx context.Context, limit int) (result []cache.Score, err error) {
if !strings.EqualFold(r.config.Ranker.Type, "none") {
scores, err := r.cacheClient.SearchScores(ctx, cache.Recommend, r.userId, r.categories, 0, r.config.CacheSize)
scores, err := r.searchAvailableScores(ctx, cache.Recommend, r.userId, r.categories)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -146,6 +146,112 @@ func (r *Recommender) RecommendSequential(ctx context.Context, result []cache.Sc
return result, util.MD5(digests...), nil
}

func (r *Recommender) expireBefore() time.Time {
if r.config.DataSource.ItemTTL == 0 {
return time.Time{}
}
return time.Now().AddDate(0, 0, -int(r.config.DataSource.ItemTTL))
}

func availableItem(item data.Item, expireBefore time.Time) bool {
if item.IsHidden {
return false
}
return expireBefore.IsZero() || item.Timestamp.IsZero() || !item.Timestamp.Before(expireBefore)
}

func (r *Recommender) filterAvailableItems(items []data.Item) []data.Item {
expireBefore := r.expireBefore()
return lo.Filter(items, func(item data.Item, _ int) bool {
return availableItem(item, expireBefore)
})
}

func (r *Recommender) batchGetItemsMap(ctx context.Context, ids []string) (map[string]data.Item, error) {
if len(ids) == 0 {
return map[string]data.Item{}, nil
}
items, err := r.dataClient.BatchGetItems(ctx, ids)
if err != nil {
return nil, errors.Trace(err)
}
itemsMap := make(map[string]data.Item, len(items))
for _, item := range items {
itemsMap[item.ItemId] = item
}
return itemsMap, nil
}

func (r *Recommender) filterAvailableScores(ctx context.Context, scores []cache.Score) ([]cache.Score, error) {
itemsMap, err := r.batchGetItemsMap(ctx, lo.Map(scores, func(score cache.Score, _ int) string {
return score.Id
}))
if err != nil {
return nil, errors.Trace(err)
}
expireBefore := r.expireBefore()
return lo.Filter(scores, func(score cache.Score, _ int) bool {
item, ok := itemsMap[score.Id]
return !ok || availableItem(item, expireBefore)
}), nil
}

func (r *Recommender) searchAvailableScores(ctx context.Context, collection, subset string, categories []string) ([]cache.Score, error) {
if r.config.CacheSize <= 0 {
scores, err := r.cacheClient.SearchScores(ctx, collection, subset, categories, 0, -1)
if err != nil {
return nil, errors.Trace(err)
}
return r.filterAvailableScores(ctx, scores)
}
results := make([]cache.Score, 0, r.config.CacheSize)
for begin := 0; ; begin += r.config.CacheSize {
scores, err := r.cacheClient.SearchScores(ctx, collection, subset, categories, begin, begin+r.config.CacheSize)
if err != nil {
return nil, errors.Trace(err)
}
pageSize := len(scores)
if pageSize == 0 {
return results, nil
}
scores, err = r.filterAvailableScores(ctx, scores)
if err != nil {
return nil, errors.Trace(err)
}
results = append(results, scores...)
if len(results) >= r.config.CacheSize {
return results[:r.config.CacheSize], nil
}
if pageSize < r.config.CacheSize {
return results, nil
}
}
}

func (r *Recommender) getLatestAvailableItems(ctx context.Context) ([]data.Item, error) {
if r.config.CacheSize <= 0 {
items, err := r.dataClient.GetLatestItems(ctx, 0, r.categories)
if err != nil {
return nil, errors.Trace(err)
}
return r.filterAvailableItems(items), nil
}
for n := r.config.CacheSize; ; n += r.config.CacheSize {
items, err := r.dataClient.GetLatestItems(ctx, n, r.categories)
if err != nil {
return nil, errors.Trace(err)
}
m := len(items)
items = r.filterAvailableItems(items)
if len(items) >= r.config.CacheSize {
return items[:r.config.CacheSize], nil
}
if m < n {
return items, nil
}
}
}

func (r *Recommender) parse(fullname string) (RecommenderFunc, error) {
if fullname == CollaborativeRecommender {
return r.recommendCollaborative, nil
Expand All @@ -169,7 +275,7 @@ func (r *Recommender) parse(fullname string) (RecommenderFunc, error) {
}

func (r *Recommender) recommendLatest(ctx context.Context) ([]cache.Score, string, error) {
items, err := r.dataClient.GetLatestItems(ctx, r.config.CacheSize, r.categories)
items, err := r.getLatestAvailableItems(ctx)
if err != nil {
return nil, "", errors.Trace(err)
}
Expand All @@ -194,8 +300,7 @@ func (r *Recommender) recommendNonPersonalized(name string) RecommenderFunc {
} else {
categories = r.categories
}
// fetch items from cache
items, err := r.cacheClient.SearchScores(ctx, cache.NonPersonalized, name, categories, 0, r.config.CacheSize)
items, err := r.searchAvailableScores(ctx, cache.NonPersonalized, name, categories)
if err != nil {
return nil, "", errors.Trace(err)
}
Expand All @@ -205,15 +310,14 @@ func (r *Recommender) recommendNonPersonalized(name string) RecommenderFunc {
return nil, "", errors.Trace(err)
}
// remove excluded items
return lo.Filter(items, func(item cache.Score, index int) bool {
return lo.Filter(items, func(item cache.Score, _ int) bool {
return !r.excludeSet.Contains(item.Id)
}), digest, nil
}
}

func (r *Recommender) recommendCollaborative(ctx context.Context) ([]cache.Score, string, error) {
// fetch items from cache
items, err := r.cacheClient.SearchScores(ctx, cache.CollaborativeFiltering, r.userId, r.categories, 0, r.config.CacheSize)
items, err := r.searchAvailableScores(ctx, cache.CollaborativeFiltering, r.userId, r.categories)
if err != nil {
return nil, "", errors.Trace(err)
}
Expand All @@ -223,7 +327,7 @@ func (r *Recommender) recommendCollaborative(ctx context.Context) ([]cache.Score
return nil, "", errors.Trace(err)
}
// remove excluded items
return lo.Filter(items, func(item cache.Score, index int) bool {
return lo.Filter(items, func(item cache.Score, _ int) bool {
return !r.excludeSet.Contains(item.Id)
}), digest, nil
}
Expand All @@ -246,7 +350,7 @@ func (r *Recommender) recommendItemToItem(name string) RecommenderFunc {
categories := make(map[string][]string)
digests := mapset.NewSet[string]()
for _, feedback := range userFeedback {
similarItems, err := r.cacheClient.SearchScores(ctx, cache.ItemToItem, cache.Key(name, feedback.ItemId), r.categories, 0, r.config.CacheSize)
similarItems, err := r.searchAvailableScores(ctx, cache.ItemToItem, cache.Key(name, feedback.ItemId), r.categories)
if err != nil {
return nil, "", errors.Trace(err)
}
Expand Down Expand Up @@ -316,16 +420,15 @@ func (r *Recommender) recommendUserToUser(name string) RecommenderFunc {
ids := lo.Map(elems, func(elem heap.Elem[string, float64], _ int) string {
return elem.Value
})
items, err := r.dataClient.BatchGetItems(ctx, ids)
itemsMap, err := r.batchGetItemsMap(ctx, ids)
if err != nil {
return nil, "", errors.Trace(err)
}
itemsMap := make(map[string]data.Item)
for _, item := range items {
itemsMap[item.ItemId] = item
}
expireBefore := r.expireBefore()
for _, elem := range elems {
if item, ok := itemsMap[elem.Value]; ok && lo.Every(item.Categories, r.categories) {
if item, ok := itemsMap[elem.Value]; ok &&
lo.Every(item.Categories, r.categories) &&
availableItem(item, expireBefore) {
results = append(results, cache.Score{
Id: item.ItemId,
Score: elem.Weight,
Expand Down Expand Up @@ -369,6 +472,10 @@ func (r *Recommender) recommendExternal(name string) RecommenderFunc {
})
}
}
scores, err = r.filterAvailableScores(ctx, scores)
if err != nil {
return nil, "", errors.Trace(err)
}
return scores, externalConfig.Hash(), nil
}
}
Loading
Loading