Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -44,5 +44,6 @@ COPY healthcheck.sh /code/healthcheck.sh
RUN chmod +x /code/healthcheck.sh

RUN chmod +x /code/start.sh
RUN sed -i 's/\r$//' /code/start.sh /code/healthcheck.sh /usr/bin/pasarguard-cli /usr/bin/pasarguard-tui

ENTRYPOINT ["/code/start.sh"]
1 change: 1 addition & 0 deletions app/db/crud/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ async def get_cores_simple(
sort_list.extend(s.value)
else:
sort_list.append(s.value)
# Deterministic tie-breaker for stable pagination (MySQL especially).
sort_list.append(CoreConfig.id.asc())
stmt = stmt.order_by(*sort_list)
else:
Expand Down
308 changes: 308 additions & 0 deletions app/db/crud/hwid.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,308 @@
from __future__ import annotations

import hashlib
import hmac
import threading
from contextlib import nullcontext
from dataclasses import dataclass
from datetime import datetime, timezone

from sqlalchemy import delete, desc, func, select, text, update
from sqlalchemy.ext.asyncio import AsyncSession

from app.db.models import HWIDUserDevice, User
from config import HWID_HASH_SALT

_HWID_MAX_LEN = 256
_DEVICE_OS_MAX_LEN = 64
_OS_VERSION_MAX_LEN = 64
_DEVICE_MODEL_MAX_LEN = 128
_USER_AGENT_MAX_LEN = 512
_REQUEST_IP_MAX_LEN = 64
_sqlite_user_hwid_locks: dict[int, threading.Lock] = {}
_sqlite_user_hwid_locks_guard = threading.Lock()


def _clamp(value: str | None, max_len: int) -> str | None:
if not value:
return None
return value.strip()[:max_len] or None


def normalize_hwid(value: str) -> str:
return value.strip()


def hash_hwid(hwid: str) -> str:
# Expects normalized HWID input.
digest = hmac.new(HWID_HASH_SALT.encode("utf-8"), hwid.encode("utf-8"), hashlib.sha256).hexdigest()
return digest


def _get_sqlite_user_lock(user_id: int) -> threading.Lock:
with _sqlite_user_hwid_locks_guard:
lock = _sqlite_user_hwid_locks.get(user_id)
if lock is None:
lock = threading.Lock()
_sqlite_user_hwid_locks[user_id] = lock
return lock


@dataclass
class HWIDDecision:
allowed: bool
max_devices_reached: bool = False
missing_hwid: bool = False


async def _acquire_user_hwid_lock(db: AsyncSession, user_id: int) -> None:
dialect = db.get_bind().dialect.name
if dialect == "sqlite":
# SQLite uses database-level write locks; avoid extra dummy writes that
# can trigger "database is locked" under concurrent test cleanup paths.
return

if dialect == "postgresql":
# Transaction-scoped lock that is auto-released on commit/rollback.
await db.execute(text("SELECT pg_advisory_xact_lock(:lock_key)"), {"lock_key": int(user_id)})
return

# Use transaction-scoped row lock for MySQL/MariaDB and other dialects.
await db.execute(select(User.id).where(User.id == user_id).with_for_update())


async def _release_user_hwid_lock(db: AsyncSession, user_id: int) -> None:
return


async def enforce_hwid_device_limit(
db: AsyncSession,
*,
user: User,
hwid: str | None,
device_os: str | None,
os_version: str | None,
device_model: str | None,
user_agent: str | None,
request_ip: str | None,
limit: int,
) -> HWIDDecision:
if not hwid:
return HWIDDecision(allowed=False, missing_hwid=True)

normalized_hwid = normalize_hwid(hwid)
if not normalized_hwid or len(normalized_hwid) > _HWID_MAX_LEN:
return HWIDDecision(allowed=False, missing_hwid=True)

hwid_hash = hash_hwid(normalized_hwid)
now = datetime.now(timezone.utc)
had_transaction = db.in_transaction()
tx_ctx = nullcontext() if had_transaction else db.begin()
sqlite_lock_ctx = (
_get_sqlite_user_lock(user.id) if db.get_bind().dialect.name == "sqlite" else nullcontext()
)
decision = HWIDDecision(allowed=True)
try:
async with tx_ctx:
with sqlite_lock_ctx:
await _acquire_user_hwid_lock(db, user.id)
existing = (
await db.execute(
select(HWIDUserDevice).where(HWIDUserDevice.user_id == user.id, HWIDUserDevice.hwid_hash == hwid_hash)
)
).scalar_one_or_none()

if existing:
existing.last_seen_at = now
existing.device_os = _clamp(device_os, _DEVICE_OS_MAX_LEN)
existing.os_version = _clamp(os_version, _OS_VERSION_MAX_LEN)
existing.device_model = _clamp(device_model, _DEVICE_MODEL_MAX_LEN)
existing.user_agent = _clamp(user_agent, _USER_AGENT_MAX_LEN)
existing.request_ip = _clamp(request_ip, _REQUEST_IP_MAX_LEN)
existing.updated_at = now
decision = HWIDDecision(allowed=True)
else:
lock_count_stmt = select(HWIDUserDevice.id).where(HWIDUserDevice.user_id == user.id)
if db.get_bind().dialect.name != "sqlite":
# Use a locking read so concurrent transactions observe current rows,
# not a stale repeatable-read snapshot.
lock_count_stmt = lock_count_stmt.with_for_update()
existing_count = len((await db.execute(lock_count_stmt)).scalars().all())
if existing_count >= limit:
decision = HWIDDecision(allowed=False, max_devices_reached=True)
else:
db.add(
HWIDUserDevice(
user_id=user.id,
hwid_hash=hwid_hash,
device_os=_clamp(device_os, _DEVICE_OS_MAX_LEN),
os_version=_clamp(os_version, _OS_VERSION_MAX_LEN),
device_model=_clamp(device_model, _DEVICE_MODEL_MAX_LEN),
user_agent=_clamp(user_agent, _USER_AGENT_MAX_LEN),
request_ip=_clamp(request_ip, _REQUEST_IP_MAX_LEN),
updated_at=now,
)
)
decision = HWIDDecision(allowed=True)
finally:
await _release_user_hwid_lock(db, user.id)
return decision


async def list_hwid_devices(
db: AsyncSession, *, offset: int = 0, limit: int = 50, user_id: int | None = None
) -> tuple[list[dict], int]:
stmt = select(HWIDUserDevice, User.username).join(User, User.id == HWIDUserDevice.user_id, isouter=True)
if user_id is not None:
stmt = stmt.where(HWIDUserDevice.user_id == user_id)
count_stmt = select(func.count(HWIDUserDevice.id))
if user_id is not None:
count_stmt = count_stmt.where(HWIDUserDevice.user_id == user_id)
count = int((await db.execute(count_stmt)).scalar() or 0)
stmt = stmt.order_by(desc(HWIDUserDevice.last_seen_at)).offset(offset).limit(limit)
rows = (await db.execute(stmt)).all()
items: list[dict] = []
for device, username in rows:
items.append(
{
"id": device.id,
"user_id": device.user_id,
"username": username,
"hwid_hash": device.hwid_hash,
"device_os": device.device_os,
"os_version": device.os_version,
"device_model": device.device_model,
"user_agent": device.user_agent,
"request_ip": device.request_ip,
"first_seen_at": device.first_seen_at,
"last_seen_at": device.last_seen_at,
"created_at": device.created_at,
"updated_at": device.updated_at,
}
)
return items, count


async def add_hwid_device(
db: AsyncSession,
*,
user_id: int,
hwid: str,
device_os: str | None = None,
os_version: str | None = None,
device_model: str | None = None,
user_agent: str | None = None,
request_ip: str | None = None,
) -> tuple[dict, bool]:
normalized_hwid = normalize_hwid(hwid)
if not normalized_hwid or len(normalized_hwid) > _HWID_MAX_LEN:
raise ValueError("Invalid HWID")

hwid_hash = hash_hwid(normalized_hwid)
now = datetime.now(timezone.utc)
had_transaction = db.in_transaction()
tx_ctx = nullcontext() if had_transaction else db.begin()
sqlite_lock_ctx = _get_sqlite_user_lock(user_id) if db.get_bind().dialect.name == "sqlite" else nullcontext()
try:
async with tx_ctx:
with sqlite_lock_ctx:
await _acquire_user_hwid_lock(db, user_id)
existing = (
await db.execute(
select(HWIDUserDevice).where(HWIDUserDevice.user_id == user_id, HWIDUserDevice.hwid_hash == hwid_hash)
)
).scalar_one_or_none()

if existing:
existing.last_seen_at = now
existing.device_os = _clamp(device_os, _DEVICE_OS_MAX_LEN)
existing.os_version = _clamp(os_version, _OS_VERSION_MAX_LEN)
existing.device_model = _clamp(device_model, _DEVICE_MODEL_MAX_LEN)
existing.user_agent = _clamp(user_agent, _USER_AGENT_MAX_LEN)
existing.request_ip = _clamp(request_ip, _REQUEST_IP_MAX_LEN)
existing.updated_at = now
device = existing
created = False
else:
device = HWIDUserDevice(
user_id=user_id,
hwid_hash=hwid_hash,
device_os=_clamp(device_os, _DEVICE_OS_MAX_LEN),
os_version=_clamp(os_version, _OS_VERSION_MAX_LEN),
device_model=_clamp(device_model, _DEVICE_MODEL_MAX_LEN),
user_agent=_clamp(user_agent, _USER_AGENT_MAX_LEN),
request_ip=_clamp(request_ip, _REQUEST_IP_MAX_LEN),
updated_at=now,
)
db.add(device)
await db.flush()
created = True

username = (
await db.execute(select(User.username).where(User.id == user_id))
).scalar_one_or_none()
finally:
await _release_user_hwid_lock(db, user_id)
await db.commit()
return (
{
"id": device.id,
"user_id": device.user_id,
"username": username,
"hwid_hash": device.hwid_hash,
"device_os": device.device_os,
"os_version": device.os_version,
"device_model": device.device_model,
"user_agent": device.user_agent,
"request_ip": device.request_ip,
"first_seen_at": device.first_seen_at,
"last_seen_at": device.last_seen_at,
"created_at": device.created_at,
"updated_at": device.updated_at,
},
created,
)


async def delete_hwid_device(db: AsyncSession, *, user_id: int, hwid_hash: str) -> int:
tx_ctx = nullcontext() if db.in_transaction() else db.begin()
async with tx_ctx:
result = await db.execute(
delete(HWIDUserDevice).where(HWIDUserDevice.user_id == user_id, HWIDUserDevice.hwid_hash == hwid_hash)
)
await db.commit()
return int(result.rowcount or 0)


async def delete_all_hwid_devices(db: AsyncSession, *, user_id: int) -> int:
tx_ctx = nullcontext() if db.in_transaction() else db.begin()
async with tx_ctx:
result = await db.execute(delete(HWIDUserDevice).where(HWIDUserDevice.user_id == user_id))
await db.commit()
return int(result.rowcount or 0)


async def get_hwid_stats(db: AsyncSession) -> dict[str, int]:
total_devices = int((await db.execute(select(func.count(HWIDUserDevice.id)))).scalar() or 0)
users_with_devices = int(
(
await db.execute(
select(func.count(func.distinct(HWIDUserDevice.user_id)))
)
).scalar()
or 0
)
return {"total_devices": total_devices, "users_with_devices": users_with_devices}


async def get_hwid_top_users(db: AsyncSession, *, limit: int = 20) -> list[dict]:
stmt = (
select(HWIDUserDevice.user_id, User.username, func.count(HWIDUserDevice.id).label("devices_count"))
.join(User, User.id == HWIDUserDevice.user_id, isouter=True)
.group_by(HWIDUserDevice.user_id, User.username)
.order_by(desc("devices_count"))
.limit(max(1, limit))
)
rows = (await db.execute(stmt)).all()
return [{"user_id": user_id, "username": username or "", "devices_count": int(devices_count or 0)} for user_id, username, devices_count in rows]

8 changes: 8 additions & 0 deletions app/db/crud/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
Admin,
DataLimitResetStrategy,
Group,
HWIDUserDevice,
NextPlan,
NodeUserUsage,
NotificationReminder,
Expand Down Expand Up @@ -821,6 +822,7 @@ async def _delete_user_dependencies(db: AsyncSession, user_ids: list[int]):
if not user_ids:
return

await db.execute(delete(HWIDUserDevice).where(HWIDUserDevice.user_id.in_(user_ids)))
await db.execute(users_groups_association.delete().where(users_groups_association.c.user_id.in_(user_ids)))


Expand Down Expand Up @@ -933,6 +935,12 @@ async def modify_user(
if modify.on_hold_expire_duration is not None:
db_user.on_hold_expire_duration = modify.on_hold_expire_duration

if modify.hwid_device_limit is not None:
db_user.hwid_device_limit = modify.hwid_device_limit

if modify.hwid_limit_disabled is not None:
db_user.hwid_limit_disabled = modify.hwid_limit_disabled

if modify.next_plan is not None:
db_user.next_plan = NextPlan(
user_id=db_user.id,
Expand Down
25 changes: 24 additions & 1 deletion app/db/migrations/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,21 @@
# ... etc.


def _compare_type(_context, _inspected_column, metadata_column, _inspected_type, _metadata_type):
"""Skip INTEGER vs BIGINT drift for HWID PK/FK.

Migration b2c3d4e5f6a7 mirrors ``users.id`` SQL type per dialect (INTEGER on SQLite,
BIGINT on PostgreSQL/MySQL). Models use ``BigInteger`` so runtime values fit all DBs;
SQLite still reflects INTEGER, which would otherwise fail ``alembic check``.
"""
if metadata_column.table.name == "hwid_user_devices" and metadata_column.name in (
"id",
"user_id",
):
return False
return None


def run_migrations_offline() -> None:
"""Run migrations in 'offline' mode.

Expand All @@ -50,12 +65,20 @@ def run_migrations_offline() -> None:
literal_binds=True,
render_as_batch=True,
dialect_opts={"paramstyle": "named"},
compare_type=_compare_type,
)

with context.begin_transaction():
context.run_migrations()


def do_run_migrations(connection: Connection) -> None:
context.configure(connection=connection, target_metadata=target_metadata, render_as_batch=True)
context.configure(
connection=connection,
target_metadata=target_metadata,
render_as_batch=True,
compare_type=_compare_type,
)

with context.begin_transaction():
context.run_migrations()
Expand Down
Loading
Loading