-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathnew.py
More file actions
244 lines (195 loc) · 8.81 KB
/
new.py
File metadata and controls
244 lines (195 loc) · 8.81 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
import cv2
import numpy as np
from tensorflow import keras
import asyncio
import websockets
import json
from collections import deque
import os
import google.generativeai as genai
# Configure the Gemini SDK from the environment; GOOGLE_API_KEY must be set
# before this module is imported or API calls below will fail at request time.
genai.configure(api_key=os.environ.get('GOOGLE_API_KEY')) # type: ignore
# Module-level Gemini client used to decode buffered letter tokens into words.
model = genai.GenerativeModel("gemini-2.5-flash-lite") # type: ignore
class SignLanguagePredictor:
    """Classify sign-language letters from 21 MediaPipe hand landmarks.

    Wraps a trained Keras model plus the label classes it was trained with,
    and optionally smooths per-frame probabilities over a short window.
    """

    def __init__(self, model_path='sign_language_model.h5',
                 label_encoder_path='label_encoder_classes.npy'):
        """Load the trained classifier and its label-class array from disk."""
        print("Loading trained model...")
        self.model = keras.models.load_model(model_path)
        self.label_classes = np.load(label_encoder_path, allow_pickle=True)
        print(f"Model loaded! Can recognize: {', '.join(self.label_classes)}")
        # Rolling window of raw probability vectors used for temporal smoothing.
        self.prediction_buffer = deque(maxlen=5)

    def mediapipe_to_features(self, hand_landmarks):
        """Flatten landmark dicts into a (42,) array [x0, y0, ..., x20, y20].

        Args:
            hand_landmarks: sequence of dicts with 'x' and 'y' keys.

        Returns:
            numpy array of interleaved coordinates, or None for empty input.
        """
        if hand_landmarks is None or len(hand_landmarks) == 0:
            return None
        flat = [value
                for point in hand_landmarks
                for value in (point['x'], point['y'])]
        return np.array(flat)

    def normalize_landmarks(self, features):
        """Wrist-center and scale landmarks, matching the training pipeline.

        Must stay in sync with the _normalize_landmarks method used during
        training. Returns None unless *features* holds exactly 42 values.
        """
        if features is None or len(features) != 42:
            return None
        # View the flat [x0, y0, ...] vector as 21 (x, y) rows.
        points = np.asarray(features, dtype=float).reshape(-1, 2)
        # Translate so the wrist (landmark 0) sits at the origin.
        points = points - points[0]
        # Scale by hand size: wrist -> middle-finger-tip (landmark 12) distance.
        span = np.sqrt(points[12, 0] ** 2 + points[12, 1] ** 2)
        if span > 0:
            points = points / span
        # Flatten back to the interleaved (42,) layout the model expects.
        return points.reshape(-1)

    def predict(self, hand_landmarks, use_smoothing=True):
        """Predict a sign-language letter from one frame of landmarks.

        Args:
            hand_landmarks: sequence of landmark dicts with x, y coordinates.
            use_smoothing: average probabilities over recent frames when True.

        Returns:
            (predicted_letter, confidence), or (None, 0.0) on invalid input.
        """
        raw = self.mediapipe_to_features(hand_landmarks)
        normed = self.normalize_landmarks(raw) if raw is not None else None
        if normed is None:
            return None, 0.0
        # Single-sample batch for the Keras model.
        probs = self.model.predict(normed.reshape(1, -1), verbose=0)[0]
        if use_smoothing:
            self.prediction_buffer.append(probs)
            probs = np.mean(self.prediction_buffer, axis=0)
        best = int(np.argmax(probs))
        return self.label_classes[best], float(probs[best])
# Initialize predictor
# Module-level singleton shared by every WebSocket connection; the model and
# label files must exist in the working directory or this import-time call
# raises and the server never starts.
predictor = SignLanguagePredictor(
    model_path='sign_language_model.h5',
    label_encoder_path='label_encoder_classes.npy'
)
# Tunables for letter acceptance and word generation.
CONFIDENCE_THRESHOLD = 0.7   # minimum model confidence to accept a letter
DEBOUNCE_SECONDS = 8.0       # minimum spacing between accepted letters
TOKEN_BUFFER_SIZE = 5        # letters collected before asking Gemini


async def _predict_word(tokens_str):
    """Ask Gemini to decode buffered letter tokens into one English word.

    Runs the blocking SDK call in a worker thread so the event loop (and
    other connected clients) is not stalled. Falls back to the raw token
    string on any API failure.
    """
    prompt = f"""
You are decoding partial character tokens into the most likely English word.
Rules:
- Output ONLY the final predicted word
- No punctuation, no explanation
- Lowercase only
- If multiple words are possible, choose the most common one
Tokens:
"{tokens_str}"
""".strip()
    print("Sending to Gemini with prompt:", prompt)
    try:
        # generate_content is synchronous; off-load it from the event loop.
        gemini_response = await asyncio.to_thread(model.generate_content, prompt)
        predicted_word = gemini_response.text.strip().lower()
        print("Gemini response:", predicted_word)
        return predicted_word
    except Exception as e:
        print(f"Gemini API error: {e}")
        return tokens_str


async def handle_landmarks(websocket):
    """Handle incoming MediaPipe landmarks from the frontend.

    Protocol (JSON messages):
      {"reset": ...}         -> clear the token buffer
      {"generate_word": ...} -> force a Gemini word prediction from the buffer
      {"landmarks": [...21]} -> classify a frame; letters accumulate in the
                                buffer (debounced) and a word prediction is
                                attached once the buffer fills.
    """
    print(f"Client connected from {websocket.remote_address}")
    try:
        token_buffer = deque(maxlen=TOKEN_BUFFER_SIZE)
        loop = asyncio.get_running_loop()
        # Start the letter cooldown from connection time.
        last_letter_added_time = loop.time()
        async for message in websocket:
            # A single malformed message should not kill the whole session.
            try:
                data = json.loads(message)
            except json.JSONDecodeError:
                await websocket.send(json.dumps({'error': 'Invalid JSON'}))
                continue

            if 'reset' in data:
                token_buffer.clear()
                await websocket.send(json.dumps({'status': 'buffer_cleared'}))
                continue

            if 'generate_word' in data:
                # Force send current buffer to Gemini, then reset the buffer.
                predicted_word = await _predict_word(''.join(token_buffer))
                await websocket.send(json.dumps({'word_prediction': predicted_word}))
                token_buffer.clear()
                continue

            landmarks = data.get('landmarks')
            if landmarks is None or len(landmarks) != 21:
                await websocket.send(json.dumps({'error': 'Invalid landmarks'}))
                continue

            predicted_letter, confidence = predictor.predict(landmarks, use_smoothing=True)

            # Accept the letter only when confident AND the debounce expired.
            current_time = loop.time()
            time_since_last = current_time - last_letter_added_time
            if predicted_letter and confidence > CONFIDENCE_THRESHOLD:
                if time_since_last >= DEBOUNCE_SECONDS:
                    token_buffer.append(predicted_letter)
                    last_letter_added_time = current_time
                    time_since_last = 0.0

            response = {
                'predicted_letter': predicted_letter,
                'confidence': confidence,
                'tokens': list(token_buffer),
                'time_since_last': time_since_last,
                'debounce_time': DEBOUNCE_SECONDS,
            }

            # Buffer full: decode the letters into a word and start over.
            if len(token_buffer) == TOKEN_BUFFER_SIZE:
                response['word_prediction'] = await _predict_word(''.join(token_buffer))
                token_buffer.clear()

            await websocket.send(json.dumps(response))
    except websockets.exceptions.ConnectionClosed:
        print(f"Client {websocket.remote_address} disconnected")
    except Exception as e:
        print(f"Error: {e}")
async def main():
    """Run the landmark WebSocket server until the process is stopped."""
    print("Starting WebSocket server on ws://localhost:8765")
    async with websockets.serve(handle_landmarks, "localhost", 8765):
        # Park on a future that never resolves so the server runs forever.
        await asyncio.get_running_loop().create_future()
# Script entry point: start the asyncio event loop and serve until killed.
if __name__ == "__main__":
    asyncio.run(main())