-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathMedia.py
More file actions
277 lines (231 loc) · 8.95 KB
/
Media.py
File metadata and controls
277 lines (231 loc) · 8.95 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
from os import system, remove
from os.path import join
import sys, getopt
from enum import Enum
import re
import subprocess
from pytubefix import YouTube, Stream
FFMPEG_PATH = join('ffmpeg', 'bin')
def get_number_of_items(listItems: list, numElements: int) -> list:
if len(listItems) <= numElements:
return listItems
return [listItems[item] for item in range(numElements)]
class FileType(Enum):
VIDEO = "VIDEO"
AUDIO = "AUDIO"
class Media:
"""
A class for managing media from YouTube, including validation, information retrieval,
and downloading of video and audio streams.
"""
YOUTUBE_REGEX = (
r"^(https?://)?(www\.)?(youtube|youtu|youtube-nocookie)\.(com|be)/"
r"(watch\?v=|embed/|v/|.+\?v=)?([^&=%\?]{11})"
)
def __init__(self, url: str, file_name: str = None, file_type: FileType = FileType.VIDEO):
if not self.is_url(url):
raise ValueError(f"Invalid Youtube url provided: {url}")
self.url = url
self.file_name = file_name
self.file_type = file_type
self.video_name = None
self.audio_name = None
self.yt = YouTube(url)
@staticmethod
def is_url(url: str) -> bool:
"""Validates if the given url matches the YouTube URL pattern."""
if not re.match(Media.YOUTUBE_REGEX, url):
print("Invalid format: Not a recognizable Youtube URL")
return False
return True
def get_media_info(self) -> dict:
"""
Retrieves and returns basic media information.
Returns:
dict: A dictionary containing media thumbnail URL, title, channel name, and duration.
"""
return {
"thumbnail":self.yt.thumbnail_url,
"title":self.yt.title,
"channel":self.yt.author,
"duration":self.yt.length
}
def get_highest_resolution(self, streams: list[Stream]) -> Stream:
"""
Finds the highest resolution video stream from the provided list, prioritizing specific resolutions.
Returns:
Stream: The stream object of the highest resolution video, or None if unavailable.
"""
resolution_priority = ['1080p', '720p', '480p', '360p', '240p', '144p']
filtered_streams = [
stream for stream in streams
if stream.type.lower() == "video"
and stream.codecs[0].split('.')[0].lower() == "avc1"
and stream.is_progressive is False
]
sorted_streams = sorted(
filtered_streams, key=lambda stream: stream.fps, reverse=True
)
return next((stream for stream in sorted_streams if stream.resolution in resolution_priority), None)
def get_highest_bitrate(self, streams: list[Stream]) -> Stream:
"""
Finds the highest bitrate audio stream from the provided list, prioritizing specific bitrates.
Returns:
Stream: The stream object of the highest bitrate audio, or None if unavailable.
"""
audio_bitrates_priority = ['128kbps', '48kbps']
filtered_streams = [
stream for stream in streams
if stream.type.lower() == "audio"
and stream.codecs[0].split('.')[0].lower() == "mp4a"
]
sorted_streams = sorted(filtered_streams, key=lambda stream: stream.abr)
return next((stream for stream in sorted_streams if stream.abr in audio_bitrates_priority), None)
def download_video(self) -> None:
"""
Downloads the highest resolution video stream and saves it as a temporary file.
Sets:
video_name (str): The name of the temporary video file downloaded.
"""
stream = self.get_highest_resolution(self.yt.streams)
if stream is None:
print("Video stream is empty!")
return
self.video_name = stream.download(filename=f"TEMPvideo.mp4")
def download_audio(self) -> None:
"""
Downloads the highest bitrate audio stream and saves it as a temporary file.
Sets:
audio_name (str): The name of the temporary audio file downloaded.
"""
stream = self.get_highest_bitrate(self.yt.streams)
if stream is None:
print("Audio stream is empty!")
return
self.audio_name = stream.download(filename=f"TEMPaudio.mp3")
class Downloader:
"""
A class to manage media downloads and conversions,
supporting combined media downloads (video and audio)
and audio-only conversions using FFMPEG.
"""
def __init__(self, media: Media, file_dir: str = ''):
self.media = media
self.file_dir = file_dir
def __combine_media(self):
"""
Downloads video and audio streams from the media and combines them into a single file.
The output file is saved to the specified file directory.
Raises:
Exception: If an error occurs during the merging process.
"""
try:
self.media.download_video()
self.media.download_audio()
system(f'{join(FFMPEG_PATH, 'ffmpeg.exe')} -i {self.media.video_name} -i {self.media.audio_name} -c:v copy -c:a aac "{join(self.file_dir, self.media.file_name)}"')
except Exception as e:
print("merging error: ",e)
def __audio_converter(self):
"""
Downloads the audio stream from the media and converts it to MP3 format using FFMPEG.
The output file is saved to the specified file directory.
Raises:
Exception: If an error occurs during the audio conversion process.
"""
try:
self.media.download_audio()
system(f'{join(FFMPEG_PATH, 'ffmpeg.exe')} -i {self.media.audio_name} -acodec libmp3lame "{join(self.file_dir, self.media.file_name)}"')
except Exception as e:
print("audio error: ",e)
def download(self) -> None:
"""
Initiates the download process based on the media file type.
- Combines video and audio for video files.
- Converts audio to MP3 for audio files.
- Cleans up temporary files after processing.
Raises:
ValueError: If the media file type is unsupported.
"""
match(self.media.file_type):
case FileType.VIDEO:
self.__combine_media()
case FileType.AUDIO:
self.__audio_converter()
case _:
raise ValueError(f"FileType '{self.media.file_type}' does not exist!")
self.__clean_up()
def __clean_up(self):
"""
Removes temporary video and audio files created during the download and conversion processes,
setting the respective file names in the media instance to None.
"""
if self.media.video_name:
remove(self.media.video_name)
self.media.video_name = None
if self.media.audio_name:
remove(self.media.audio_name)
self.media.audio_name = None
# Still not used
class DownloadQueue:
def __init__(self):
self.queue = []
def get_queue(self):
return self.queue
def add_to_queue(self, media: Media):
self.queue.append(media)
def process_queue(self):
while self.queue:
media = self.queue.pop(0)
downloader = Downloader(media)
downloader.download()
def cli(argv: list):
try:
options, args = getopt.getopt(argv, "van:p:l:", ["video", "audio", "name=", "path=", "url="])
except Exception as e:
print("Argument error:", e)
url = file_name = file_type = file_dir = ""
for opt, arg in options:
if opt in ['-v', "--video"]:
file_type = FileType.VIDEO
elif opt in ['-a', "--audio"]:
file_type = FileType.AUDIO
elif opt in ['-n', "--name"]:
file_name = arg
elif opt in ['-p', "--path"]:
file_dir = arg
elif opt in ['-l', "--url"]:
url = arg
if file_type == FileType.VIDEO:
file_name += '.mp4'
elif file_type == FileType.AUDIO:
file_name += '.mp3'
try:
media = Media(url, file_name, file_type)
Downloader(media, file_dir).download()
except Exception as e:
print("Download error:", e)
def tui():
print("""__ _____ _ _ ___
\\ V /_ _|| \\_/ || o \\
\\ / | /o\\ \\_/ || _/
|_| |_\\_/_| |_||_|
""")
url = input("Enter URL: ")
file_type = input("Type: ")
file_dir = input("Folder Path: ")
file_name = input("File Name: ")
print()
file_type = FileType.VIDEO
if file_type == FileType.VIDEO:
file_name += '.mp4'
elif file_type == FileType.AUDIO:
file_name += '.mp3'
try:
media = Media(url, file_name, file_type)
Downloader(media, file_dir).download()
except Exception as e:
print("Download error:", e)
if __name__ == "__main__":
if sys.argv[1:]:
cli(sys.argv[1:])
tui()