From 1ad1957cb4e06ac58bbe9f7b86bfa2e0701e1eb1 Mon Sep 17 00:00:00 2001 From: gtx20060124-bot Date: Wed, 27 May 2026 16:08:15 +0800 Subject: [PATCH] fix: resolve 500 Server Error on generate-proxy endpoint (#7) --- requirements.txt | 3 +- setup.py | 29 +-- udio_wrapper/__init__.py | 397 ++++++++++++++++++++++++++------------- 3 files changed, 290 insertions(+), 139 deletions(-) diff --git a/requirements.txt b/requirements.txt index 077c95d..cf195bf 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,2 @@ -requests==2.31.0 \ No newline at end of file +requests>=2.31.0 +urllib3>=1.26.0 \ No newline at end of file diff --git a/setup.py b/setup.py index 1b460e4..e548acb 100644 --- a/setup.py +++ b/setup.py @@ -1,13 +1,18 @@ -from setuptools import setup, find_packages - -with open("requirements.txt", "r") as f: - requirements = f.read().splitlines() - -setup( - name='udio_wrapper', - version='0.0.1', - description='Generates songs using the Udio API using textual prompts.', - author='Flowese', - packages=find_packages(), - install_requires=requirements, +from setuptools import setup, find_packages + +with open("requirements.txt", "r") as f: + requirements = [l.strip() for l in f if l.strip() and not l.startswith("#")] + +setup( + name="udio_wrapper", + version="0.0.4", + description="Generates songs using the Udio API using textual prompts.", + author="Flowese", + packages=find_packages(), + install_requires=requirements, + extras_require={ + "cloudscraper": ["cloudscraper>=1.2.0"], + "curl_cffi": ["curl_cffi>=0.7.0"], + }, + python_requires=">=3.8", ) \ No newline at end of file diff --git a/udio_wrapper/__init__.py b/udio_wrapper/__init__.py index d4ed8fe..59c9504 100644 --- a/udio_wrapper/__init__.py +++ b/udio_wrapper/__init__.py @@ -1,169 +1,312 @@ """ Udio Wrapper -Author: Flowese -Version: 0.0.3 -Date: 2024-04-15 +Version: 0.0.4 Description: Generates songs using the Udio API using textual prompts. """ -import requests +import logging import os import time +try: + from urllib3.util.retry import Retry +except ImportError: + Retry = None + +try: + import requests + from requests.adapters import HTTPAdapter + from requests.exceptions import RequestException +except ImportError: + raise ImportError("requests >= 2.26.0 required") + +logger = logging.getLogger(__name__) + +HAS_CLOUDSCRAPER = False +try: + import cloudscraper + HAS_CLOUDSCRAPER = True +except ImportError: + pass + +HAS_CURL_CFFI = False +try: + from curl_cffi import requests as curl_requests + HAS_CURL_CFFI = True +except ImportError: + pass + + class UdioWrapper: API_BASE_URL = "https://www.udio.com/api" - def __init__(self, auth_token): - self.auth_token = auth_token + def __init__( + self, + auth_token, + timeout=60, + max_retries=3, + max_poll_time=300, + use_cloudscraper=False, + use_curl_cffi=False, + proxies=None, + ): + self.auth_token = auth_token.strip() self.all_track_ids = [] + self.timeout = timeout + self.max_poll_time = max_poll_time + self.proxies = proxies or self._resolve_proxies() + self.session = self._build_session( + use_cloudscraper, use_curl_cffi, max_retries + ) + + @staticmethod + def _resolve_proxies(): + proxies = {} + for scheme in ("http", "https"): + val = os.environ.get(f"{scheme}_proxy") or os.environ.get( + f"{scheme.upper()}_PROXY" + ) + if val: + proxies[scheme] = val + return proxies or None + + def _build_session(self, use_cloudscraper, use_curl_cffi, max_retries): + session = None + + if use_curl_cffi and HAS_CURL_CFFI: + logger.info("using curl_cffi session (TLS fingerprint mimic)") + session = curl_requests.Session() + if self.proxies: + session.proxies.update(self.proxies) + return session + + if use_cloudscraper and HAS_CLOUDSCRAPER: + logger.info("using cloudscraper session (Cloudflare JS bypass)") + session = cloudscraper.create_scraper() + else: + if use_cloudscraper and not HAS_CLOUDSCRAPER: + logger.warning( + "cloudscraper not installed, fallback to requests. " + "Install: pip install cloudscraper" + ) + session = requests.Session() + + if Retry is not None: + retry = Retry( + total=max_retries, + read=max_retries, + connect=max_retries, + backoff_factor=1.0, + allowed_methods={"GET", "POST"}, + status_forcelist={429, 500, 502, 503, 504}, + ) + adapter = HTTPAdapter(max_retries=retry) + session.mount("https://", adapter) + session.mount("http://", adapter) + + if self.proxies: + session.proxies.update(self.proxies) + return session def make_request(self, url, method, data=None, headers=None): + kwargs = {"headers": headers, "timeout": self.timeout} try: - if method == 'POST': - response = requests.post(url, headers=headers, json=data) + if method == "POST": + kwargs["json"] = data + response = self.session.post(url, **kwargs) else: - response = requests.get(url, headers=headers) + response = self.session.get(url, **kwargs) response.raise_for_status() return response - except requests.exceptions.RequestException as e: - print(f"Error making {method} request to {url}: {e}") + except RequestException as e: + status = getattr(e.response, "status_code", None) + snippet = "" + if e.response is not None: + try: + snippet = e.response.text[:200] + except Exception: + pass + logger.error( + "Udio API %s %s -> %s | %s", + method, url, status or "N/A", snippet or "", + ) + return None + + @staticmethod + def _parse_json(response): + if response is None: + return None + try: + return response.json() + except Exception as e: + logger.error("JSON parse error: %s", e) return None def get_headers(self, get_request=False): headers = { - "Accept": "application/json, text/plain, */*" if get_request else "application/json", + "Accept": ( + "application/json, text/plain, */*" + if get_request + else "application/json" + ), "Content-Type": "application/json", - "Cookie": f"; sb-api-auth-token={self.auth_token}", + "Cookie": f"sb-api-auth-token={self.auth_token}", "Origin": "https://www.udio.com", "Referer": "https://www.udio.com/my-creations", - "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36", + "User-Agent": ( + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " + "AppleWebKit/537.36 (KHTML, like Gecko) " + "Chrome/126.0.0.0 Safari/537.36" + ), "Sec-Fetch-Site": "same-origin", "Sec-Fetch-Mode": "cors", - "Sec-Fetch-Dest": "empty" + "Sec-Fetch-Dest": "empty", } if not get_request: headers.update({ - "sec-ch-ua": '"Google Chrome";v="123", "Not:A-Brand";v="8", "Chromium";v="123"', + "sec-ch-ua": ( + '"Google Chrome";v="126", ' + '"Not.A/Brand";v="24", ' + '"Chromium";v="126"' + ), "sec-ch-ua-mobile": "?0", - "sec-ch-ua-platform": '"macOS"', - "sec-fetch-dest": "empty" + "sec-ch-ua-platform": '"Windows"', }) return headers - def create_complete_song(self, short_prompt, extend_prompts, outro_prompt, seed=-1, custom_lyrics_short=None, custom_lyrics_extend=None, custom_lyrics_outro=None, num_extensions=1): - print("Starting the generation of the complete song sequence...") + @staticmethod + def _safe_filename(title): + safe = title.replace("/", "_").replace("\\", "_") + return "".join(c for c in safe if c not in "\0:") + + def create_complete_song( + self, + short_prompt, + extend_prompts, + outro_prompt, + seed=-1, + custom_lyrics_short=None, + custom_lyrics_extend=None, + custom_lyrics_outro=None, + num_extensions=1, + ): + logger.info("generating complete song sequence (seed=%s, %d extends, 1 outro)", seed, num_extensions) + + if not extend_prompts: + logger.error("extend_prompts is empty") + return None - # Generate the short song - print("Generating the short song...") - short_song_result = self.create_song(short_prompt, seed, custom_lyrics_short) - if not short_song_result: - print("Error generating the short song.") + short_song = self.create_song(short_prompt, seed, custom_lyrics_short) + if not short_song: return None - last_song_result = short_song_result - extend_song_results = [] + last = short_song + extends = [] - # Generate the extend songs for i in range(num_extensions): - if i < len(extend_prompts): - prompt = extend_prompts[i] - lyrics = custom_lyrics_extend[i] if custom_lyrics_extend and i < len(custom_lyrics_extend) else None - else: - prompt = extend_prompts[-1] # Reuse the last prompt if not enough are provided - lyrics = custom_lyrics_extend[-1] if custom_lyrics_extend else None - - print(f"Generating extend song {i + 1}...") - extend_song_result = self.extend( - prompt, - seed, - audio_conditioning_path=last_song_result[0]['song_path'], - audio_conditioning_song_id=last_song_result[0]['id'], - custom_lyrics=lyrics + prompt = ( + extend_prompts[i] if i < len(extend_prompts) else extend_prompts[-1] + ) + lyrics = ( + custom_lyrics_extend[i] + if custom_lyrics_extend and i < len(custom_lyrics_extend) + else None ) - if not extend_song_result: - print(f"Error generating extend song {i + 1}.") + logger.info("extend %d/%d", i + 1, num_extensions) + result = self.extend( + prompt, seed, + audio_conditioning_path=last[0]["song_path"], + audio_conditioning_song_id=last[0]["id"], + custom_lyrics=lyrics, + ) + if not result: return None + extends.append(result) + last = result - extend_song_results.append(extend_song_result) - last_song_result = extend_song_result - - # Generate the outro - print("Generating the outro...") - outro_song_result = self.add_outro( - outro_prompt, - seed, - audio_conditioning_path=last_song_result[0]['song_path'], - audio_conditioning_song_id=last_song_result[0]['id'], - custom_lyrics=custom_lyrics_outro + logger.info("generating outro") + outro = self.add_outro( + outro_prompt, seed, + audio_conditioning_path=last[0]["song_path"], + audio_conditioning_song_id=last[0]["id"], + custom_lyrics=custom_lyrics_outro, ) - if not outro_song_result: - print("Error generating the outro.") + if not outro: return None - print("Complete song sequence generated and processed successfully.") - return { - "short_song": short_song_result, - "extend_songs": extend_song_results, - "outro_song": outro_song_result - } + return {"short_song": short_song, "extend_songs": extends, "outro_song": outro} def create_song(self, prompt, seed=-1, custom_lyrics=None): - song_result = self.generate_song(prompt, seed, custom_lyrics) - if not song_result: + result = self.generate_song(prompt, seed, custom_lyrics) + if not result: return None - track_ids = song_result.get('track_ids', []) - self.all_track_ids.extend(track_ids) - return self.process_songs(track_ids, "short_songs") + tids = result.get("track_ids", []) + self.all_track_ids.extend(tids) + return self.process_songs(tids, "short_songs") - def extend(self, prompt, seed=-1, audio_conditioning_path=None, audio_conditioning_song_id=None, custom_lyrics=None): - extend_song_result = self.generate_extend_song( + def extend( + self, + prompt, + seed=-1, + audio_conditioning_path=None, + audio_conditioning_song_id=None, + custom_lyrics=None, + ): + result = self.generate_extend_song( prompt, seed, audio_conditioning_path, audio_conditioning_song_id, custom_lyrics ) - if not extend_song_result: + if not result: return None - extend_track_ids = extend_song_result.get('track_ids', []) - self.all_track_ids.extend(extend_track_ids) - return self.process_songs(extend_track_ids, "extend_songs") + tids = result.get("track_ids", []) + self.all_track_ids.extend(tids) + return self.process_songs(tids, "extend_songs") - def add_outro(self, prompt, seed=-1, audio_conditioning_path=None, audio_conditioning_song_id=None, custom_lyrics=None): - outro_result = self.generate_outro( + def add_outro( + self, + prompt, + seed=-1, + audio_conditioning_path=None, + audio_conditioning_song_id=None, + custom_lyrics=None, + ): + result = self.generate_outro( prompt, seed, audio_conditioning_path, audio_conditioning_song_id, custom_lyrics ) - if not outro_result: + if not result: return None - outro_track_ids = outro_result.get('track_ids', []) - self.all_track_ids.extend(outro_track_ids) - return self.process_songs(outro_track_ids, "outro_songs") + tids = result.get("track_ids", []) + self.all_track_ids.extend(tids) + return self.process_songs(tids, "outro_songs") def generate_song(self, prompt, seed, custom_lyrics=None): - url = f"{self.API_BASE_URL}/generate-proxy" - headers = self.get_headers() data = {"prompt": prompt, "samplerOptions": {"seed": seed}} if custom_lyrics: data["lyricInput"] = custom_lyrics - response = self.make_request(url, 'POST', data, headers) - return response.json() if response else None + return self._parse_json( + self.make_request(f"{self.API_BASE_URL}/generate-proxy", "POST", data, self.get_headers()) + ) - def generate_extend_song(self, prompt, seed, audio_conditioning_path, audio_conditioning_song_id, custom_lyrics=None): - url = f"{self.API_BASE_URL}/generate-proxy" - headers = self.get_headers() + def generate_extend_song( + self, prompt, seed, audio_conditioning_path, audio_conditioning_song_id, custom_lyrics=None + ): data = { "prompt": prompt, "samplerOptions": { "seed": seed, "audio_conditioning_path": audio_conditioning_path, "audio_conditioning_song_id": audio_conditioning_song_id, - "audio_conditioning_type": "continuation" - } + "audio_conditioning_type": "continuation", + }, } if custom_lyrics: data["lyricInput"] = custom_lyrics - response = self.make_request(url, 'POST', data, headers) - return response.json() if response else None + return self._parse_json( + self.make_request(f"{self.API_BASE_URL}/generate-proxy", "POST", data, self.get_headers()) + ) - def generate_outro(self, prompt, seed, audio_conditioning_path, audio_conditioning_song_id, custom_lyrics=None): - url = f"{self.API_BASE_URL}/generate-proxy" - headers = self.get_headers() + def generate_outro( + self, prompt, seed, audio_conditioning_path, audio_conditioning_song_id, custom_lyrics=None + ): data = { "prompt": prompt, "samplerOptions": { @@ -171,52 +314,54 @@ def generate_outro(self, prompt, seed, audio_conditioning_path, audio_conditioni "audio_conditioning_path": audio_conditioning_path, "audio_conditioning_song_id": audio_conditioning_song_id, "audio_conditioning_type": "continuation", - "crop_start_time": 0.9 - } + "crop_start_time": 0.9, + }, } if custom_lyrics: data["lyricInput"] = custom_lyrics - response = self.make_request(url, 'POST', data, headers) - return response.json() if response else None + return self._parse_json( + self.make_request(f"{self.API_BASE_URL}/generate-proxy", "POST", data, self.get_headers()) + ) def process_songs(self, track_ids, folder): - """Function to process generated songs, wait until they are ready, and download them.""" - print(f"Processing songs in {folder} with track_ids {track_ids}...") - while True: - status_result = self.check_song_status(track_ids) - if status_result is None: - print(f"Error checking song status for {folder}.") + logger.info("processing %s %s", folder, track_ids) + deadline = time.time() + self.max_poll_time + while time.time() < deadline: + status = self.check_song_status(track_ids) + if status is None: return None - elif status_result.get('all_finished', False): + if status.get("all_finished"): songs = [] - for song in status_result['data']['songs']: - self.download_song(song['song_path'], song['title'], folder=folder) + for song in status["data"]["songs"]: + self.download_song(song["song_path"], song["title"], folder=folder) songs.append(song) - print(f"All songs in {folder} are ready and downloaded.") + logger.info("all %s done (%d songs)", folder, len(songs)) return songs - else: - time.sleep(5) + time.sleep(5) + logger.error("timeout waiting for %s", folder) + return None def check_song_status(self, song_ids): url = f"{self.API_BASE_URL}/songs?songIds={','.join(song_ids)}" - headers = self.get_headers(True) - response = self.make_request(url, 'GET', None, headers) - if response: - data = response.json() - all_finished = all(song['finished'] for song in data['songs']) - return {'all_finished': all_finished, 'data': data} - else: + response = self.make_request(url, "GET", None, self.get_headers(get_request=True)) + if not response: + return None + data = self._parse_json(response) + if not data or not data.get("songs"): return None + all_finished = all(s["finished"] for s in data["songs"]) + return {"all_finished": all_finished, "data": data} def download_song(self, song_url, song_title, folder="downloaded_songs"): os.makedirs(folder, exist_ok=True) - file_path = os.path.join(folder, f"{song_title}.mp3") + safe = self._safe_filename(song_title) + path = os.path.join(folder, f"{safe}.mp3") try: - response = requests.get(song_url) - response.raise_for_status() - with open(file_path, 'wb') as file: - file.write(response.content) - print(f"Downloaded {song_title} with url {song_url} to {file_path}") - except requests.exceptions.RequestException as e: - print(f"Failed to download the song. Error: {e}") - + resp = self.session.get(song_url, stream=True, timeout=self.timeout) + resp.raise_for_status() + with open(path, "wb") as f: + for chunk in resp.iter_content(chunk_size=8192): + f.write(chunk) + logger.info("downloaded %s -> %s", safe, path) + except RequestException as e: + logger.error("download failed %s: %s", safe, e)