-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathspoManager.py
More file actions
178 lines (144 loc) · 8.33 KB
/
Copy pathspoManager.py
File metadata and controls
178 lines (144 loc) · 8.33 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
import os
import spotipy
from datetime import datetime as dt
from spotipy.oauth2 import SpotifyOAuth
from utilities import Utilities as utils
from dotenv import load_dotenv
# Template for the per-artist data folder; "{}" is filled with the artist ID
# via str.format().
ARTIST_PATH = os.path.join(".", "data", "{}")

# Prefix of an artist page URL; the artist ID follows it directly.
ARTIST_URL_PREFIX = "https://open.spotify.com/artist/"

# NOTE(review): not referenced in the visible part of this file; presumably a
# page size / offset step for paginated responses -- confirm against callers.
RESPONSE_OFFSET = 20
# Original (misspelled) name, kept as an alias so existing importers keep working.
RESPONESE_OFFSET = RESPONSE_OFFSET
class SpotifyManager:
    """Collects collaboration statistics for a Spotify artist.

    Wraps an authenticated ``spotipy.Spotify`` client (``self.sp``) and stores
    the results of :meth:`getArtistCollabs` as JSON files in a per-artist
    folder so later calls can be answered without contacting the API.
    """

    # File stems of the stored results, in the order getArtistCollabs returns them.
    _CACHE_FILES = ("totalArtists", "registeredSongs", "lastCollab", "artistData", "artistInfo")

    # strptime format for each value of an album's "release_date_precision".
    _RELEASE_DATE_FORMATS = {"year": "%Y", "month": "%Y-%m", "day": "%Y-%m-%d"}

    # Spotify IDs are 22 base-62 characters.
    _ID_LENGTH = 22
    _BASE62 = frozenset("0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")

    # Page size for album listings and batch size for artist lookups.
    _PAGE_SIZE = 50

    def __init__(self, debug=False, country="ES"):
        """Authenticate against Spotify using credentials from the environment.

        Args:
            debug: When True, progress information is printed while fetching.
            country: Country code passed to the album listing request.

        Raises:
            EnvironmentError: If 'clientID' or 'clientSecret' is not set
                (a ``.env`` file is loaded first).
            RuntimeError: If building the authenticated client fails.
        """
        load_dotenv()
        client_id = os.getenv('clientID')
        client_secret = os.getenv('clientSecret')
        if not client_id or not client_secret:
            raise EnvironmentError("Environment variables 'clientID' or 'clientSecret' not found.")

        try:
            self.sp = spotipy.Spotify(auth_manager=SpotifyOAuth(
                client_id=client_id,
                client_secret=client_secret,
                redirect_uri="http://localhost:8888/callback",
                scope="user-library-read,user-follow-read,playlist-modify-public,playlist-modify-private"))
        except Exception as e:
            # Chain the original exception so the root cause stays in the traceback.
            raise RuntimeError("Error authenticating with Spotify: " + str(e)) from e

        self.debug = debug
        self.country = country
        print("Spotify working")

    def _get_artist_id_from_url(self, artist_id):
        """Return the bare artist ID for an artist URL; other input is returned unchanged."""
        if artist_id.startswith(ARTIST_URL_PREFIX):
            start = len(ARTIST_URL_PREFIX)
            # Slice a fixed length so trailing query strings ("?si=...") are dropped.
            return artist_id[start:start + self._ID_LENGTH]
        return artist_id

    def _save_response(self, path, data):
        """Persist ``data`` at ``path`` through the shared utilities helper."""
        utils.saveResponse(data, path)

    def search_artist(self, query, limit=1):
        """Search for an artist by name.

        Args:
            query: The search string for the artist name.
            limit: The number of results to request. Defaults to 1.

        Returns:
            The ID of the first artist in the results, or None if there are
            no results (including a missing or null ``artists``/``items``
            section in the response).
        """
        results = self.sp.search(q=query, limit=limit, type='artist') or {}
        items = (results.get('artists') or {}).get('items') or []
        return next((item['id'] for item in items if item), None)

    def _looks_like_artist_id(self, value):
        """Return True if ``value`` has the shape of a Spotify ID (22 base-62 chars)."""
        return len(value) == self._ID_LENGTH and all(ch in self._BASE62 for ch in value)

    def _resolve_artist_id(self, artist_id):
        """Turn an artist URL, ID or name into a bare artist ID.

        Anything that is neither an artist URL nor ID-shaped is treated as a
        name and looked up through :meth:`search_artist`. Checking the
        alphabet as well as the length keeps 22-character names such as
        "Florence + The Machine" from being mistaken for an ID.

        Raises:
            ValueError: If a name search yields no artist.
        """
        if not (artist_id.startswith(ARTIST_URL_PREFIX) or self._looks_like_artist_id(artist_id)):
            searched_artist_id = self.search_artist(artist_id)
            if not searched_artist_id:
                raise ValueError(f"No artist found for the query: {artist_id}")
            artist_id = searched_artist_id
        return self._get_artist_id_from_url(artist_id)

    def _load_cached(self, artist_folder):
        """Return the stored result tuple, or None if any result file is missing.

        Requiring every file (not just the folder) means a run that was
        interrupted before saving is re-fetched instead of failing on load.
        """
        paths = [os.path.join(artist_folder, f"{name}.json") for name in self._CACHE_FILES]
        if not all(os.path.isfile(path) for path in paths):
            return None
        if self.debug:
            print("Already existed")
        return tuple(utils.loadJson(path) for path in paths)

    def _iter_pages(self, page):
        """Yield ``page`` and every following page reachable through its 'next' link."""
        while page:
            yield page
            page = self.sp.next(page) if page.get('next') else None

    @staticmethod
    def _thumbnail_url(images):
        """Pick the second image URL if present, else the first, else None."""
        if not images:
            return None
        chosen = images[1] if len(images) > 1 else images[0]
        return chosen["url"]

    def _collect_collaborations(self, artist_id):
        """Walk the artist's albums and tally every track the artist appears on.

        Returns:
            tuple: ``(total_artists, registered_songs, last_collab_artist)``
            where ``last_collab_artist`` still holds ``datetime`` values.
        """
        total_artists, registered_songs, last_collab_artist = {}, {}, {}
        seen_tracks = set()
        total_retrieved = 0  # running album count, only used for debug output

        first_page = self.sp.artist_albums(
            artist_id, limit=self._PAGE_SIZE, country=self.country,
            album_type="album,single,appears_on")

        for page in self._iter_pages(first_page):
            if self.debug:
                total_retrieved += len(page['items'])
                print(f"Obtained {page['offset']} / {page['total']} -> items: {len(page['items'])}, total: {total_retrieved}")

            for album in page['items']:
                # Skip compilation albums
                if album['album_type'] == 'compilation':
                    continue

                # Dates may be "YYYY", "YYYY-MM" or "YYYY-MM-DD" depending on precision.
                date_format = self._RELEASE_DATE_FORMATS.get(
                    album.get("release_date_precision"), "%Y-%m-%d")
                release_date = dt.strptime(album['release_date'], date_format)
                thumbnail = self._thumbnail_url(album.get("images"))

                # Follow 'next' links so albums longer than one page are not truncated.
                for track_page in self._iter_pages(self.sp.album_tracks(album["uri"])):
                    for track in track_page['items']:
                        # The preview URL identifies the same recording across
                        # releases; it can be null, so fall back to the track ID
                        # rather than collapsing every preview-less track into one.
                        dedupe_key = track.get('preview_url') or track['id']
                        if dedupe_key in seen_tracks:
                            continue
                        seen_tracks.add(dedupe_key)

                        artist_ids = [a["id"] for a in track['artists']]
                        if artist_id not in artist_ids:
                            continue

                        track_data = registered_songs.setdefault(track["id"], {
                            "name": track["name"],
                            "url": album["external_urls"]["spotify"],
                            "artists": artist_ids,
                            "collaborations": [],
                            "thumbnail": thumbnail,
                            "preview": track.get("preview_url"),
                        })
                        if artist_ids not in track_data["collaborations"]:
                            track_data["collaborations"].append(artist_ids)

                        for a_id in artist_ids:
                            total_artists[a_id] = total_artists.get(a_id, 0) + 1
                            if artist_id != a_id:
                                last_collab_artist[a_id] = max(
                                    last_collab_artist.get(a_id, release_date), release_date)

        return total_artists, registered_songs, last_collab_artist

    def _fetch_artists_info(self, artist_ids):
        """Fetch name, first image URL and genres for ``artist_ids`` in batches."""
        artists_info = {}
        for start in range(0, len(artist_ids), self._PAGE_SIZE):
            chunk = artist_ids[start:start + self._PAGE_SIZE]
            for artist in self.sp.artists(chunk)['artists']:
                if not artist:
                    # Defensive: skip null entries instead of failing the whole run.
                    continue
                artists_info[artist['id']] = {
                    "name": artist['name'],
                    "url": artist['images'][0]['url'] if artist['images'] else None,
                    "genres": artist.get('genres', []),
                }
        return artists_info

    def getArtistCollabs(self, artist_id, force=False):
        """Retrieve the collaborations of an artist given by ID, URL or name.

        Args:
            artist_id (str): The artist's Spotify ID, artist page URL, or name.
            force (bool, optional): If True, fetch from the API even when
                stored results exist. Defaults to False.

        Returns:
            tuple: A tuple containing, in order:
                - total_artists (dict): artist ID -> number of counted tracks
                  the artist appears on (the requested artist included).
                - registered_songs (dict): track ID -> track information.
                - last_collab_artist (dict): collaborator ID -> date
                  ('YYYY-MM-DD') of the most recent shared release.
                - artist_response (dict): the API response for the artist.
                - artists_info (dict): artist ID -> name, image URL, genres.
            When stored results are used, the tuple holds whatever the five
            JSON files contain.

        Raises:
            ValueError: If ``artist_id`` is a name and no artist matches it.
        """
        artist_id = self._resolve_artist_id(artist_id)
        artist_folder = ARTIST_PATH.format(artist_id)

        if not force:
            cached = self._load_cached(artist_folder)
            if cached is not None:
                return cached

        os.makedirs(artist_folder, exist_ok=True)

        artist_response = self.sp.artist(artist_id)
        total_artists, registered_songs, last_collab_artist = self._collect_collaborations(artist_id)

        # The keys of last_collab_artist are exactly the collaborators, in
        # first-seen order, so no separate list (with O(n) lookups) is needed.
        artists_info = self._fetch_artists_info(list(last_collab_artist))
        artists_info[artist_id] = {
            'name': artist_response['name'],
            'url': artist_response['images'][0]['url'] if artist_response['images'] else None,
            'genres': artist_response['genres'],
        }

        # datetime objects are not JSON-serialisable; store plain date strings.
        last_collab_artist = {key: elem.strftime('%Y-%m-%d') for key, elem in last_collab_artist.items()}

        results = (total_artists, registered_songs, last_collab_artist, artist_response, artists_info)
        for name, data in zip(self._CACHE_FILES, results):
            self._save_response(os.path.join(artist_folder, f"{name}.json"), data)

        return results