Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,105 changes: 56 additions & 1,049 deletions amnex-live-data-server.py

Large diffs are not rendered by default.

Empty file added src/__init__.py
Empty file.
116 changes: 116 additions & 0 deletions src/cache_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
"""
Cache utilities and Redis operations for the GPS tracking system.
Contains the SimpleCache class and vehicle location history management.
"""

import json
import logging
import time
from typing import Optional, List, Dict, Any

logger = logging.getLogger('amnex-data-server')


class SimpleCache:
    """Simple two-level cache: a process-local dict backed by Redis.

    In-memory entries are stored as ``(value, expiry_timestamp)`` tuples and
    mirrored to Redis under the ``simpleCache:`` key prefix so other
    processes (and restarts) see the same values.
    """

    def __init__(self, redis_client):
        # key -> (value, expiry_timestamp or None for "never expires")
        self.cache = {}
        self.redis_client = redis_client

    def get(self, key: str) -> Optional[Any]:
        """Return the cached value for *key*, or ``None`` on a miss.

        Checks the in-memory cache first; an expired entry is evicted and
        treated as a miss. On a miss, falls back to Redis and re-populates
        the in-memory level, mirroring the remaining Redis TTL.
        """
        entry = self.cache.get(key)
        if entry is not None:
            value, expiry_timestamp = entry
            if expiry_timestamp is None or expiry_timestamp >= time.time():
                return value
            del self.cache[key]  # expired - evict, then fall through to Redis

        raw = self.redis_client.get(f"simpleCache:{key}")
        if not raw:
            return None

        value = json.loads(raw)
        # Mirror the remaining Redis TTL onto the in-memory entry so both
        # levels expire together (ttl -1 means no expiry, -2 means missing).
        redis_ttl = self.redis_client.ttl(f"simpleCache:{key}")
        expiry_timestamp = None
        if redis_ttl is not None and redis_ttl > -1:
            expiry_timestamp = time.time() + redis_ttl
        self.cache[key] = (value, expiry_timestamp)
        return value

    def set(self, key: str, value: Any, ttl: Optional[int] = None) -> None:
        """Store *value* under *key* in both levels, optionally with *ttl* seconds."""
        expiry_timestamp = None
        if ttl is not None:
            expiry_timestamp = time.time() + ttl
        self.cache[key] = (value, expiry_timestamp)

        if ttl is None:
            self.redis_client.set(f"simpleCache:{key}", json.dumps(value))
        else:
            self.redis_client.setex(f"simpleCache:{key}", ttl, json.dumps(value))


def store_vehicle_location_history(redis_client, device_id: str, lat: float, lon: float,
                                   timestamp: int, max_points: int = 25) -> None:
    """Append a GPS point to a vehicle's Redis-backed location history.

    Points that moved less than ~0.002 (per calculate_distance's units) from
    the previous point are dropped. At most *max_points* newest points are
    kept, sorted by timestamp, and persisted with a 1 hour TTL. Failures are
    logged, never raised.
    """
    from .geometry_utils import calculate_distance

    history = None
    try:
        key = f"vehicle_history:{device_id}"
        new_point = {
            "lat": lat,
            "lon": lon,
            # fall back to "now" when no device timestamp was supplied
            "timestamp": int(timestamp if timestamp else time.time())
        }

        # Load whatever history already exists for this vehicle.
        history = redis_client.get(key)
        points = (json.loads(history) or []) if history else []

        # Ignore points that barely moved from the last recorded one.
        if points:
            prev = points[-1]
            if calculate_distance(prev['lat'], prev['lon'], new_point['lat'], new_point['lon']) < 0.002:
                return

        points.append(new_point)

        # Retain only the newest max_points entries, ordered by time.
        if len(points) > max_points:
            points = points[-max_points:]
        points.sort(key=lambda p: p['timestamp'])

        # Persist the updated history with a 1 hour TTL.
        redis_client.setex(key, 3600, json.dumps(points))

    except Exception as e:
        import traceback
        error_details = traceback.format_exc()
        logger.error(f"Error storing vehicle history for {device_id}: {e}\nHistory value: {history}\nTraceback: {error_details}")


def get_vehicle_location_history(redis_client, device_id: str) -> List[Dict[str, Any]]:
    """Return the stored location history for *device_id*.

    Reads the ``vehicle_history:<device_id>`` key; returns an empty list
    when the key is missing, empty, or any error occurs (errors are logged).
    """
    try:
        raw = redis_client.get(f"vehicle_history:{device_id}")
        parsed = json.loads(raw) if raw else None
        return parsed or []
    except Exception as e:
        logger.error(f"Error getting vehicle history for {device_id}: {e}")
        return []
235 changes: 235 additions & 0 deletions src/fleet_management.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,235 @@
"""
Fleet management and device mapping functionality.
Contains fleet information retrieval and device-vehicle mapping logic.
"""

import json
import logging
import time
import threading
from typing import List, Optional

from .cache_utils import store_vehicle_location_history
from .models import FleetInfo
from .route_matching import get_route_ids_from_waybills

logger = logging.getLogger('amnex-data-server')


def get_fleet_info(redis_client, device_vehicle_map: dict, waybills_session_local,
                   device_id: str, current_lat: float = None, current_lon: float = None,
                   timestamp: int = None, provider: str = None, stop_tracker=None,
                   bus_location_max_age: int = 120, bus_cleanup_interval: int = 180,
                   prod_redis_client=None) -> List[FleetInfo]:
    """Resolve the vehicle number and route ID(s) for a device.

    Results are cached in Redis under ``fleetInfo:<device_id>`` for
    *bus_cleanup_interval* seconds, with a longer-lived ``:saved`` copy used
    to detect (and clean up after) route changes. On a cache hit the
    current location is still recorded in the vehicle's history.

    *prod_redis_client* is optional; when omitted, *redis_client* is used
    for the prod-side cleanup as well (keeps existing callers working).
    """
    cache_key = f"fleetInfo:{device_id}"
    cache_key_saved = cache_key + ":saved"

    fleet_mapping_values = []  # response values

    # Cache hit: reuse cached FleetInfo list but keep location history fresh.
    fleet_info_str = redis_client.get(cache_key)
    if fleet_info_str is not None:
        fleet_infos = [FleetInfo(**d) for d in json.loads(fleet_info_str)]
        for fleet_info in fleet_infos:
            if current_lat is not None and current_lon is not None:
                store_vehicle_location_history(redis_client, fleet_info.vehicle_no, current_lat, current_lon, timestamp)
        return fleet_infos

    try:
        vehicle_no = device_vehicle_map.get(device_id)
        if not vehicle_no:
            return []

        # Get route(s) for the fleet from the active waybills.
        route_ids = get_route_ids_from_waybills(waybills_session_local, vehicle_no, current_lat, current_lon, timestamp, provider, stop_tracker)
        for route_id in route_ids:
            fleet_info = FleetInfo(
                vehicle_no=vehicle_no,
                device_id=device_id,
                route_id=route_id
            )
            try:
                # If the device switched routes since the saved snapshot,
                # purge its stale entries from the previous route's hash.
                fleet_info_saved = redis_client.get(cache_key_saved)
                if fleet_info_saved is not None:
                    fleet_info_saved = json.loads(fleet_info_saved)
                    logger.debug("going to delete route info")
                    if (fleet_info_saved.get('route_id') is not None and
                            route_id != fleet_info_saved['route_id']):
                        route_key = "route:" + fleet_info_saved['route_id']
                        # BUG FIX: clean_redis_key_for_route_info takes both
                        # redis clients; the previous 4-arg call always raised
                        # TypeError, so stale routes were never cleaned.
                        clean_redis_key_for_route_info(
                            redis_client,
                            prod_redis_client if prod_redis_client is not None else redis_client,
                            fleet_info_saved['route_id'], route_key, bus_location_max_age)
            except Exception as e:
                logger.error(f"Error cleaning redis key for route info: {e}")
            fleet_mapping_values.append(fleet_info)

        if route_ids:
            # Convert FleetInfo objects to dicts for JSON serialization to Redis.
            fleet_mapping_dicts = [fi.model_dump() for fi in fleet_mapping_values]
            # Saved copy outlives the main cache entry so a later route change
            # can still find (and clean) the previous route.
            redis_client.setex(cache_key_saved, bus_location_max_age + bus_cleanup_interval, json.dumps(fleet_mapping_dicts))
            redis_client.setex(cache_key, bus_cleanup_interval, json.dumps(fleet_mapping_dicts))
        return fleet_mapping_values
    except Exception as e:
        logger.error(f"Error querying fleet info for device {device_id}: {e}")
        return fleet_mapping_values


def clean_redis_key_for_route_info(redis_client, prod_redis_client, route_id: str, redis_key: str,
                                   bus_location_max_age: int) -> int:
    """Remove outdated vehicle entries from a single route hash.

    Merges the route hash from both Redis instances (prod data wins on
    conflicts), marks every vehicle whose last report ('serverTime' if
    present, else 'timestamp') is older than *bus_location_max_age* seconds
    — or whose payload is unparseable — and deletes those fields from both
    instances. Entries with no usable timestamp are left alone.

    Returns the number of vehicles removed.
    """
    current_time = int(time.time())

    # Merge both views of the route hash; prefer prod data on key clashes.
    vehicle_data = dict(redis_client.hgetall(redis_key) or {})
    vehicle_data.update(prod_redis_client.hgetall(redis_key) or {})
    if not vehicle_data:
        return 0

    vehicles_to_remove = []
    for vehicle_id, data_json in vehicle_data.items():
        try:
            data = json.loads(data_json)
            # Prefer serverTime; fall back to the device-reported timestamp.
            timestamp = data.get('serverTime') if 'serverTime' in data else data.get('timestamp')
            if not timestamp:
                continue  # no valid timestamp - skip

            age = current_time - int(timestamp)
            # Older than the threshold: mark for removal.
            if age > bus_location_max_age:
                vehicles_to_remove.append(vehicle_id)
                logger.debug(f"Vehicle {vehicle_id} on route {route_id} outdated by {age}s, marking for removal")
        except (json.JSONDecodeError, KeyError, TypeError, ValueError) as e:
            logger.error(f"Error parsing data for vehicle {vehicle_id}: {e}")
            # Invalid entries are purged as well.
            vehicles_to_remove.append(vehicle_id)

    if not vehicles_to_remove:
        return 0

    # Remove the outdated vehicles from both Redis instances.
    redis_client.hdel(redis_key, *vehicles_to_remove)
    prod_redis_client.hdel(redis_key, *vehicles_to_remove)
    removed_count = len(vehicles_to_remove)
    logger.info(f"Removed {removed_count} outdated vehicles from route {route_id}")
    return removed_count


def load_device_vehicle_mappings(session_local) -> dict:
    """Load the device_id -> vehicle_no mapping table from the database.

    Returns an empty dict (and logs the error) if the query fails.
    """
    from .models import DeviceVehicleMapping

    device_vehicle_map = {}
    try:
        with session_local() as db:
            for row in db.query(DeviceVehicleMapping).all():
                device_vehicle_map[row.device_id] = row.vehicle_no
            logger.info(f"Loaded {len(device_vehicle_map)} device-vehicle mappings at startup.")
    except Exception as e:
        logger.error(f"Error loading device-vehicle mappings at startup: {e}")

    return device_vehicle_map

def clean_outdated_vehicle_mappings(redis_client, prod_redis_client, CLEANUP_LOCK_TTL,
                                    bus_location_max_age: int = 120):
    """
    Remove outdated vehicle mappings from Redis for all routes.

    Uses a Redis NX lock so only one pod/process runs the cleanup at a time.
    *bus_location_max_age* is the age in seconds beyond which a vehicle's
    last report is considered stale (default 120, matching get_fleet_info).
    """
    # Try to acquire the lock; it auto-expires after CLEANUP_LOCK_TTL.
    lock_key = "vehicle_mappings_cleanup_lock"
    lock_acquired = redis_client.set(lock_key, "locked", nx=True, ex=CLEANUP_LOCK_TTL)

    if not lock_acquired:
        logger.debug("Vehicle mappings cleanup already running in another pod/process")
        return

    try:
        logger.info("Starting vehicle mappings cleanup")

        def _scan_route_keys(client, max_iterations=100):
            # Cursor-based SCAN (never KEYS) with an iteration cap as a
            # safety net against a pathological cursor loop.
            found = []
            cursor = 0
            for _ in range(max_iterations):
                cursor, keys = client.scan(cursor, match="route:*", count=1000)
                found.extend(keys)
                if cursor == 0:
                    break
            return found

        # BUG FIX: the prod scan previously advanced with the non-prod
        # cursor, which could repeat or skip pages. Scan each instance
        # independently and union the results.
        route_keys = list(set(_scan_route_keys(redis_client) + _scan_route_keys(prod_redis_client)))
        logger.debug(f"Found {len(route_keys)} route keys for cleanup")
        if not route_keys:
            logger.debug("No route data found for cleanup")
            return

        total_routes = len(route_keys)
        total_vehicles_removed = 0

        for redis_key in route_keys:
            try:
                # Extract route_id from "route:<id>".
                route_id = redis_key.split(":", 1)[1] if ":" in redis_key else "unknown"
                # BUG FIX: pass the full argument list; the previous 2-arg
                # call raised TypeError for every route, so nothing was
                # ever actually cleaned.
                removed = clean_redis_key_for_route_info(
                    redis_client, prod_redis_client, route_id, redis_key, bus_location_max_age)
                if removed:
                    total_vehicles_removed += removed

            except Exception as e:
                logger.error(f"Error cleaning route {redis_key}: {e}")

        logger.info(f"Completed vehicle mappings cleanup: processed {total_routes} routes, removed {total_vehicles_removed} vehicles")

    except Exception as e:
        logger.error(f"Error during vehicle mappings cleanup: {e}")
    finally:
        # Release the lock; best-effort (it would expire on its own anyway).
        try:
            redis_client.delete(lock_key)
        except Exception:
            pass

def start_vehicle_cleanup_thread(redis_client, prod_redis_client, CLEANUP_LOCK_TTL, BUS_CLEANUP_INTERVAL, BUS_LOCATION_MAX_AGE):
    """Spawn a daemon thread that runs the vehicle-mapping cleanup forever.

    The worker calls clean_outdated_vehicle_mappings every
    BUS_CLEANUP_INTERVAL seconds; errors are logged and the loop continues.
    Returns the started Thread object.
    """

    def _worker():
        # NOTE(review): BUS_LOCATION_MAX_AGE only appears in this log line;
        # the cleanup call itself does not receive it - confirm intended.
        logger.info(f"Vehicle mappings cleanup thread started (interval: {BUS_CLEANUP_INTERVAL}s, max age: {BUS_LOCATION_MAX_AGE}s)")

        while True:
            try:
                clean_outdated_vehicle_mappings(redis_client, prod_redis_client, CLEANUP_LOCK_TTL)
            except Exception as e:
                logger.error(f"Error in vehicle cleanup worker: {e}")

            time.sleep(BUS_CLEANUP_INTERVAL)

    thread = threading.Thread(target=_worker, daemon=True)
    thread.start()

    return thread
Loading