Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions database_api/is_machai_user.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
"""Check whether a user_id belongs to a MachAI iframe user."""
import logging
import os
import uuid

from database_api.planexe_db_singleton import db
from database_api.model_user_account import UserAccount

logger = logging.getLogger(__name__)


def is_machai_user(user_id: str) -> bool:
"""Return True if *user_id* belongs to a MachAI iframe user.

Registered users (home.planexe.org sign-ups, docker admin) use UUID
identifiers and exist in the UserAccount table. MachAI iframe users
use opaque, non-UUID strings and are *not* in the table.

Must be called inside a Flask app context.
"""
# Registered users and admins use UUIDs as their user_id.
try:
user_uuid = uuid.UUID(str(user_id))
user = db.session.get(UserAccount, user_uuid)
if user is not None:
logger.debug("is_machai_user: user_id %r found in database — not a MachAI user.", user_id)
return False
except (ValueError, AttributeError):
pass

# Fallback admin username (non-UUID string like "admin").
admin_username = os.environ.get("PLANEXE_FRONTEND_MULTIUSER_ADMIN_USERNAME", "")
if admin_username and user_id == admin_username:
logger.debug("is_machai_user: user_id %r matches admin username — not a MachAI user.", user_id)
return False

# Unknown user — likely a MachAI iframe user.
logger.debug("is_machai_user: user_id %r is unknown — treating as MachAI user.", user_id)
return True
42 changes: 21 additions & 21 deletions frontend_multi_user/src/downloads.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,43 +62,43 @@ def _sanitize_legacy_run_zip_for_download(run_zip_snapshot: bytes) -> Optional[i
@downloads_bp.route("/plan/download/report")
@login_required
def plan_download_report():
run_id = request.args.get("id", "")
task = db.session.get(PlanItem, run_id)
if task is None:
return jsonify({"error": "Task not found"}), 400
if not current_user.is_admin and str(task.user_id) != str(current_user.id):
plan_id = request.args.get("id", "")
plan = db.session.get(PlanItem, plan_id)
if plan is None:
return jsonify({"error": "Plan not found"}), 400
if not current_user.is_admin and str(plan.user_id) != str(current_user.id):
return jsonify({"error": "Forbidden"}), 403
if not task.generated_report_html:
if not plan.generated_report_html:
return jsonify({"error": "Report not available"}), 404
buffer = io.BytesIO(task.generated_report_html.encode("utf-8"))
buffer = io.BytesIO(plan.generated_report_html.encode("utf-8"))
buffer.seek(0)
download_name = f"{task.id}-report.html"
download_name = f"{plan.id}-report.html"
return send_file(buffer, mimetype="text/html", as_attachment=True, download_name=download_name)


@downloads_bp.route("/plan/download/zip")
@login_required
def plan_download_zip():
run_id = request.args.get("id", "")
task = db.session.get(PlanItem, run_id)
if task is None:
return jsonify({"error": "Task not found"}), 400
if not current_user.is_admin and str(task.user_id) != str(current_user.id):
plan_id = request.args.get("id", "")
plan = db.session.get(PlanItem, plan_id)
if plan is None:
return jsonify({"error": "Plan not found"}), 400
if not current_user.is_admin and str(plan.user_id) != str(current_user.id):
return jsonify({"error": "Forbidden"}), 403
if not task.run_zip_snapshot:
return jsonify({"error": "Run zip not available"}), 404
if not plan.run_zip_snapshot:
return jsonify({"error": "Plan zip not available"}), 404

layout_version = safe_int(getattr(task, "run_artifact_layout_version", None)) or 0
layout_version = safe_int(getattr(plan, "run_artifact_layout_version", None)) or 0
if layout_version >= 2:
buffer = io.BytesIO(task.run_zip_snapshot)
buffer = io.BytesIO(plan.run_zip_snapshot)
buffer.seek(0)
else:
buffer = _sanitize_legacy_run_zip_for_download(task.run_zip_snapshot)
buffer = _sanitize_legacy_run_zip_for_download(plan.run_zip_snapshot)
if buffer is None:
logger.error("Invalid legacy run zip snapshot for run_id=%s", run_id)
return jsonify({"error": "Run zip is invalid"}), 500
logger.error("Invalid legacy run zip snapshot for plan_id=%s", plan_id)
return jsonify({"error": "Plan zip is invalid"}), 500

download_name = f"{task.id}.zip"
download_name = f"{plan.id}.zip"
return send_file(buffer, mimetype="application/zip", as_attachment=True, download_name=download_name)


Expand Down
Loading