You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
classURLMapping:
def__init__(self, short_code: str, long_url: str, creator_user: str=None, created_at=None):
self.short_code=short_codeself.long_url=long_urlself.creator_user=creator_userself.created_at=created_atorcurrent_timestamp()
classClickEvent:
""" Represents a click event handler (No-SQL). This class is responsible for managing and processing click events triggered by user interactions. It provides methods to handle the event logic and execute associated actions. """def__init__(self, short_code: str, timestamp: str, user_ip: str, device_type: str, country: str):
self.short_code=short_codeself.timestamp=timestampself.user_ip=user_ipself.device_type=device_typeself.country=countryclassAggregatedMetrics:
""" (No-SQL) Documentation for AggregatedMetrics: AggregatedMetrics is a component responsible for collecting, aggregating, and providing statistical data related to the URL shortener system. It tracks metrics such as the number of URLs shortened, the number of redirects, and other performance indicators. """def__init__(self, short_code: str, date: str, hour: int):
self.short_code=short_codeself.date=dateself.hour=hourself.clicks=0self.unique_visitors=set()
self.device_stats=defaultdict(int)
self.country_stats=defaultdict(int)
2. Services
classURLShortenerService:
def__init__(self, db, cache):
self.db=db# DB interface (SQL/NoSQL)self.cache=cache# Redis or in-memorydefcreate_short_url(self, long_url: str, custom_alias: str=None, user_id: str=None) ->str:
ifcustom_alias:
ifself.db.exists(custom_alias):
raiseValueError("Alias already exists")
short_code=custom_aliaselse:
short_code=self._generate_unique_code()
mapping=URLMapping(short_code, long_url, user_id)
self.db.insert(mapping)
self.cache.set(short_code, long_url)
returnshort_codedef_generate_unique_code(self) ->str:
whileTrue:
code=base62_encode(random.randint(1, 1e12))[:8] # Limit to 8 charsifnotself.db.exists(code):
returncodeclassRedirectionService:
def__init__(self, db, cache, analytics_logger):
self.db=dbself.cache=cacheself.analytics_logger=analytics_loggerdefredirect(self, short_code: str, request) ->str:
long_url=self.cache.get(short_code)
ifnotlong_url:
record=self.db.get(short_code)
ifnotrecord:
raiseException("Short code not found")
long_url=record.long_urlself.cache.set(short_code, long_url)
# Fire-and-forget logging (Async)self.analytics_logger.log_event(short_code, request)
returnlong_urlclassAnalyticsLogger:
def__init__(self, queue):
self.queue=queue# Kafka, RabbitMQ, or just append to DBdeflog_event(self, short_code: str, request):
ip=request.ip_addressua=request.user_agentevent=ClickEvent(short_code, ip, ua)
self.queue.send(event) # Or save to NoSQL DB directlyclassMetricsAggregator:
def__init__(self, db):
self.db=dbdefaggregate_clicks(self, date: str):
""" The `aggregate_clicks` function is designed to process a list of click events and compute the total number of clicks for each unique item. """pass