-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutils.py
More file actions
153 lines (129 loc) · 4.66 KB
/
Copy pathutils.py
File metadata and controls
153 lines (129 loc) · 4.66 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
import os
import psutil
import subprocess
from google.oauth2.credentials import Credentials
from google.oauth2 import service_account
from googleapiclient.discovery import build
from googleapiclient.http import MediaIoBaseDownload
import io
import logging
from datetime import datetime
# Konfigurasi logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Variabel global untuk menyimpan proses ffmpeg
ffmpeg_process = None
def get_system_metrics():
"""
Mengambil metrik penggunaan sistem (CPU, Memory, Disk)
"""
try:
cpu_usage = psutil.cpu_percent(interval=1)
memory = psutil.virtual_memory()
disk = psutil.disk_usage('/')
return {
'cpu': cpu_usage,
'memory': {
'total': memory.total,
'used': memory.used,
'percent': memory.percent
},
'disk': {
'total': disk.total,
'used': disk.used,
'percent': disk.percent
}
}
except Exception as e:
logger.error(f"Error mengambil metrik sistem: {str(e)}")
return None
def download_video_from_drive(file_id, destination_path):
"""
Download video dari Google Drive menggunakan API key
"""
try:
# Inisialisasi layanan Drive dengan API key
api_key = os.getenv('GOOGLE_DRIVE_API_KEY')
if not api_key:
raise ValueError("API key Google Drive tidak ditemukan")
service = build('drive', 'v3', developerKey=api_key)
# Membuat request untuk mengambil file
request = service.files().get_media(fileId=file_id)
# Download file
fh = io.FileIO(destination_path, 'wb')
downloader = MediaIoBaseDownload(fh, request)
done = False
while not done:
status, done = downloader.next_chunk()
if status:
logger.info(f"Download {int(status.progress() * 100)}%")
return True, "Video berhasil diunduh"
except Exception as e:
logger.error(f"Error mengunduh video: {str(e)}")
return False, str(e)
def start_stream(video_path, stream_url):
"""
Memulai streaming menggunakan ffmpeg tanpa encoding dan tanpa looping
"""
global ffmpeg_process
try:
if ffmpeg_process is not None:
logger.warning("Streaming sudah berjalan")
return False, "Streaming sudah berjalan"
# Perintah ffmpeg untuk streaming tanpa encoding dan tanpa looping
command = [
'ffmpeg',
'-re', # Membaca input pada kecepatan native
'-i', video_path, # File input
'-c', 'copy', # Tanpa encoding ulang (copy codec)
'-movflags', 'faststart', # Optimasi untuk streaming
'-f', 'flv', # Format output untuk streaming
stream_url # URL streaming tujuan (Facebook/YouTube)
]
# Memulai proses ffmpeg
ffmpeg_process = subprocess.Popen(
command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
logger.info(f"Streaming dimulai: {video_path} -> {stream_url}")
return True, "Streaming berhasil dimulai"
except Exception as e:
logger.error(f"Error memulai streaming: {str(e)}")
return False, str(e)
def stop_stream():
"""
Menghentikan proses streaming ffmpeg
"""
global ffmpeg_process
try:
if ffmpeg_process is None:
logger.warning("Tidak ada streaming yang berjalan")
return False, "Tidak ada streaming yang berjalan"
# Menghentikan proses ffmpeg
ffmpeg_process.terminate()
ffmpeg_process.wait(timeout=5)
ffmpeg_process = None
logger.info("Streaming dihentikan")
return True, "Streaming berhasil dihentikan"
except Exception as e:
logger.error(f"Error menghentikan streaming: {str(e)}")
if ffmpeg_process:
ffmpeg_process.kill() # Force kill jika gagal terminate
ffmpeg_process = None
return False, str(e)
def format_bytes(size):
"""
Format ukuran file ke format yang mudah dibaca
"""
for unit in ['B', 'KB', 'MB', 'GB']:
if size < 1024:
return f"{size:.1f} {unit}"
size /= 1024
return f"{size:.1f} TB"
def is_video_file(filename):
"""
Cek apakah file adalah video berdasarkan ekstensi
"""
video_extensions = {'.mp4', '.avi', '.mkv', '.mov', '.flv', '.wmv'}
return os.path.splitext(filename)[1].lower() in video_extensions