Date: 2025-10-15 Status: Implemented Priority: CRITICAL
This document outlines the data security architecture for Pulsefeed, a multi-user system where user data isolation is paramount. One missed user_id filter = data leak where User A can see/edit User B's private data.
Rule: ALL user-scoped data MUST filter by user_id by default.
Implementation:
UserScopedMixinadded to all user-owned models (Task, Checklist, RecurringSeries, CalendarSource)- Provides
.get_for_user(id)and.all_for_user(**filters)methods - Automatic
user_idfiltering without manual checks
# ✅ CORRECT - Uses mixin for automatic filtering
task = Task.get_for_user(task_id) # Auto-filters by current_user.id
tasks = Task.all_for_user(checklist_id=5) # Auto-injects user_id
# ❌ WRONG - Manual filtering (error-prone, deprecated)
task = Task.query.filter_by(id=task_id, user_id=current_user.id).first()Why this matters:
- 92+ manual
filter_by(user_id=...)checks = 92 opportunities to forget one - One missed check = security vulnerability
- Centralized approach makes auditing easier
Rule: All new objects with user_id should auto-set to current_user.id.
Implementation:
- SQLAlchemy
before_flushevent listener inutils/query_scoping.py - Automatically injects
user_idwhen creating new objects - Only affects objects where
user_idis currentlyNone
# ✅ CORRECT - user_id auto-injected
task = Task(title="Test task", checklist_id=5)
db.session.add(task)
# task.user_id automatically set to current_user.id before flush
# ❌ OLD WAY - Manual assignment (still works but unnecessary)
task = Task(title="Test task", user_id=current_user.id, checklist_id=5)Why this matters:
- Prevents forgetting to set
user_idon create - Eliminates bugs where objects have
user_id=NULL - Makes secure behavior the default
Rule: Shared calendars intentionally break user isolation (by design).
Implementation:
get_accessible_shared_calendar_ids()returns list of calendar IDs user can access- Includes calendars user owns OR is a member of
- Request-scoped caching prevents repeated queries
# ✅ CORRECT - Include shared calendar content
accessible_ids = get_accessible_shared_calendar_ids()
tasks = Task.query.filter(
db.or_(
Task.user_id == current_user.id,
Task.shared_calendar_id.in_(accessible_ids)
)
).all()
# ❌ WRONG - Only shows user's private data, missing shared content
tasks = Task.all_for_user() # Misses shared calendar tasks!Why this matters:
- Shared calendars are collaborative workspaces
- User must see content from calendars they're a member of
- Permissions still enforced (owner/editor/viewer roles)
Rule: Some content is intentionally public (is_public=True or is_system=True).
Models with public content:
SeriesTemplate- System templates (holidays) and user-shared templatesCategory- System categories (user_id=NULL) visible to all
Implementation:
- Explicit OR queries to include public content
- NOT handled by UserScopedMixin (public by design)
# ✅ CORRECT - Include both user's and public templates
templates = SeriesTemplate.query.filter(
db.or_(
SeriesTemplate.user_id == current_user.id,
SeriesTemplate.is_public == True,
SeriesTemplate.is_system == True
)
).all()
# ❌ WRONG - Only shows user's private templates
templates = SeriesTemplate.all_for_user() # Misses public templates!Rule: AJAX/API endpoints are high-risk - easy to manipulate requests.
Critical API endpoints:
/api/events- Returns JSON events/api/checklists- Returns HTML fragment with checklist data/toggle_task- Modifies task completion status/delete_entry/<id>- Deletes entries
Security requirements:
- ALWAYS validate
user_idon ALL API endpoints - Check ownership via
.get_for_user()orcheck_user_can_access() - For shared content, verify calendar permissions
- Return 403 Forbidden if access denied (NOT 404 - prevents info disclosure)
# ✅ CORRECT - API endpoint with proper validation
@app.route("/api/delete_task/<int:task_id>", methods=["POST"])
@login_required
def delete_task_api(task_id):
task = Task.get_for_user(task_id)
if not task:
return jsonify({"error": "Not found"}), 404
# Check if task is in shared calendar
if not check_user_can_access(task):
return jsonify({"error": "Forbidden"}), 403
db.session.delete(task)
db.session.commit()
return jsonify({"success": True})
# ❌ WRONG - No user validation (data leak!)
@app.route("/api/delete_task/<int:task_id>", methods=["POST"])
@login_required
def delete_task_api(task_id):
task = Task.query.get_or_404(task_id) # ❌ No user_id filter!
db.session.delete(task) # Can delete other users' tasks!
db.session.commit()
return jsonify({"success": True})Rule: Don't reveal existence of IDs user doesn't own.
Good practices:
- Return same 404 for "doesn't exist" and "exists but not yours"
- Don't leak info in error messages ("This task belongs to user@example.com")
- Use
.first()instead of.first_or_404()when checking ownership
# ✅ CORRECT - Same error for both cases
task = Task.get_for_user(task_id)
if not task:
flash("Task not found", "error") # Could be: doesn't exist OR not yours
return redirect(url_for("index"))
# ❌ WRONG - Reveals existence to unauthorized users
task = Task.query.get_or_404(task_id) # Returns 404 if doesn't exist
if task.user_id != current_user.id:
flash("You don't own this task", "error") # ❌ Leaks existence!
return redirect(url_for("index"))Rule: NEVER use string formatting or concatenation for SQL queries.
Implementation:
- Use SQLAlchemy ORM (parameterized queries by default)
- For raw SQL, use
text()with bound parameters
# ✅ CORRECT - Parameterized query
from sqlalchemy import text
result = db.session.execute(
text("SELECT * FROM tasks WHERE user_id = :user_id AND title LIKE :search"),
{"user_id": current_user.id, "search": f"%{search_term}%"}
)
# ❌ WRONG - SQL injection vulnerable
query = f"SELECT * FROM tasks WHERE title LIKE '%{search_term}%'"
result = db.session.execute(text(query)) # ❌ Can inject malicious SQL!Rule: All state-changing operations require CSRF tokens.
Implementation:
- Flask-WTF provides automatic CSRF protection
- All forms MUST include
{{ form.hidden_tag() }}for WTForms validation - Manual forms must include CSRF token via
csrf_token()
<!-- ✅ CORRECT - WTForm with hidden_tag (includes CSRF + hidden fields) -->
<form method="POST" action="{{ url_for('auth.login') }}">
{{ form.hidden_tag() }}
{{ form.username.label }} {{ form.username() }}
{{ form.password.label }} {{ form.password() }}
<button type="submit">Login</button>
</form>
<!-- ✅ CORRECT - Manual form with explicit CSRF token -->
<form method="POST" action="/delete_task/{{ task.id }}">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
<button type="submit">Delete</button>
</form>
<!-- ❌ WRONG - No CSRF protection (vulnerable) -->
<form method="POST" action="/delete_task/{{ task.id }}">
<button type="submit">Delete</button>
</form>Common CSRF Issues:
-
Missing
form.hidden_tag()in WTForms templates- Symptom: "The CSRF token is missing" error on submit
- Fix: Add
{{ form.hidden_tag() }}immediately after<form>tag - Affected: login.html, register forms, settings forms
-
Cache headers preventing CSRF token generation
- Symptom: CSRF error after logout → login (back button)
- Fix: Exclude auth pages from aggressive cache prevention
- See: app.py
add_security_headers()function
Rule: Authenticated pages must not be cached to prevent data exposure after logout.
Problem: When users log out and press the browser's back button, cached pages can show sensitive user data even though the session is terminated. This is a privacy/security issue.
Implementation (app.py):
@app.after_requesthandler adds no-cache headers for authenticated users- Headers:
Cache-Control: no-store, no-cache, must-revalidate,Pragma: no-cache,Expires: 0 - Static files excluded (CSS/JS should be cached for performance)
- Auth pages excluded (login/register need normal caching for CSRF tokens)
# app.py - after_request handler
@app.after_request
def add_security_headers(response):
"""Add security headers to prevent caching of authenticated pages."""
# Skip static files - they should be cached
if request.path.startswith('/static/'):
return response
# Skip auth pages (login, register) - CSRF tokens need to work
auth_pages = ['/login', '/register', '/logout']
if any(request.path.startswith(page) for page in auth_pages):
return response
# For authenticated users, prevent browser caching
if current_user.is_authenticated:
response.headers['Cache-Control'] = 'no-store, no-cache, must-revalidate, post-check=0, pre-check=0, max-age=0'
response.headers['Pragma'] = 'no-cache'
response.headers['Expires'] = '0'
return responseWhy this matters:
- Security: Prevents unauthorized viewing of sensitive data via browser history
- Privacy: Other users on shared computers can't see previous user's data
- UX: Users expect that after logout, their data is not accessible
Testing:
- Log in to the app
- Navigate to homepage, calendar, settings (authenticated pages)
- Log out
- Press browser's back button
- Expected: Browser re-requests page from server, redirects to login
- Without fix: Browser shows cached homepage with user's data
Exclusions:
- Static files (
/static/*): CSS, JS, images should be cached for performance - Auth pages (
/login,/register,/logout): Must allow normal caching for CSRF token functionality - Guest mode: No caching restrictions (not authenticated, no sensitive data)
- User Isolation: All user-scoped queries use
.get_for_user()or.all_for_user() - Auto-Injection: New objects don't manually set
user_id(let event listener handle it) - Shared Content: Queries include
get_accessible_shared_calendar_ids()where appropriate - Public Content: Explicit OR clauses for
is_public=Truetemplates/categories - API Security: All API endpoints validate ownership before modifying/returning data
- 403 vs 404: Use 403 for "exists but not yours", 404 only for "doesn't exist at all"
- No SQL Injection: All queries use ORM or
text()with bound parameters - CSRF Tokens: All POST/PUT/DELETE requests include CSRF protection
- Browser Cache: Authenticated pages set no-cache headers (verified in app.py)
-
Search for dangerous patterns:
# Find queries that might be missing user_id filters grep -r "\.query\.get\(" app.py grep -r "\.query\.all\(\)" app.py grep -r "\.query\.filter\(" app.py | grep -v "user_id"
-
Test data isolation:
- Create two user accounts
- Try to access other user's task IDs via URL manipulation
- Verify 404/403 returned appropriately
-
Check API endpoints:
- Test all
/api/*endpoints with invalid/other user IDs - Verify proper error responses
- Test all
Problem: No automatic user_id filtering
# ❌ WRONG
task = Task.query.get_or_404(task_id)
# ✅ CORRECT
task = Task.get_for_user(task_id)
if not task:
abort(404)Problem: User can't see tasks in calendars they're a member of
# ❌ WRONG - Only private tasks
tasks = Task.all_for_user()
# ✅ CORRECT - Include shared calendar tasks
accessible_ids = get_accessible_shared_calendar_ids()
tasks = Task.query.filter(
db.or_(
Task.user_id == current_user.id,
Task.shared_calendar_id.in_(accessible_ids)
)
).all()Problem: Different errors leak information
# ❌ WRONG - Leaks existence
task = Task.query.get(task_id)
if not task:
flash("Task not found", "error")
elif task.user_id != current_user.id:
flash("You don't have permission", "error") # ❌ Leaks existence!
# ✅ CORRECT - Same error for both
task = Task.get_for_user(task_id)
if not task:
flash("Task not found", "error") # Could be either!Problem: AJAX endpoints easier to exploit than form submissions
# ❌ WRONG - No validation
@app.route("/api/toggle_task/<int:task_id>", methods=["POST"])
@login_required
def toggle_task_api(task_id):
task = Task.query.get_or_404(task_id) # ❌ No user check!
task.completed = not task.completed
db.session.commit()
return jsonify({"success": True})
# ✅ CORRECT - Proper validation
@app.route("/api/toggle_task/<int:task_id>", methods=["POST"])
@login_required
def toggle_task_api(task_id):
task = Task.get_for_user(task_id)
if not task:
return jsonify({"error": "Not found"}), 404
if not check_user_can_access(task):
return jsonify({"error": "Forbidden"}), 403
task.completed = not task.completed
db.session.commit()
return jsonify({"success": True})-
Audit Logging:
- Track all data access (who viewed what, when)
- Log failed authorization attempts
- Alert on suspicious patterns
-
Rate Limiting:
- Prevent brute-force ID enumeration
- Limit API request frequency per user
-
Encrypted Fields:
- Encrypt OAuth tokens at rest
- Consider encryption for sensitive user data
-
Two-Factor Authentication:
- Add 2FA option for user accounts
- Require for admin accounts
-
Security Headers:
- Add Content-Security-Policy
- Implement Strict-Transport-Security
- Add X-Frame-Options
- OWASP Top 10
- Flask Security Best Practices
- SQLAlchemy Security Considerations
- Architecture Refactoring Plan:
docs/systems/architecture-refactoring-plan.md - Query Scoping Implementation:
utils/query_scoping.py