Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 25 additions & 14 deletions API/Routes/DataFile/DataFileRoute.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@

datafile_api = Blueprint('DataFileRoute', __name__)


def _safe_child_path(base_dir, *parts):
relative_path = os.path.join(*[part for part in parts if part not in (None, "")])

Copilot AI Apr 13, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_safe_child_path can raise a TypeError when all provided parts are None/"" because os.path.join() is called with an empty argument list. This can be triggered by an empty string input (e.g., caserunname == ""), resulting in a 500 instead of a controlled 400. Consider explicitly handling the “no effective parts” case (e.g., raise PermissionError / return a 400) or ensure callers validate non-empty segments before calling this helper.

Suggested change
relative_path = os.path.join(*[part for part in parts if part not in (None, "")])
effective_parts = [part for part in parts if part not in (None, "")]
if not effective_parts:
raise PermissionError("At least one non-empty path segment is required")
relative_path = os.path.join(*effective_parts)

Copilot uses AI. Check for mistakes.
return Path(Config.validate_path(base_dir, relative_path))

@datafile_api.route("/generateDataFile", methods=['POST'])
def generateDataFile():
try:
Expand Down Expand Up @@ -77,8 +82,8 @@ def deleteCaseRun():
if not casename:
return jsonify({'message': 'No model selected.', 'status_code': 'error'}), 400

Copilot AI Apr 13, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

deleteCaseRun does not validate that caserunname is non-empty. Because validate_json_fields only checks key presence, a request with caserunname: "" will reach _safe_child_path(case_results_dir, caserunname) and currently produce a server error (TypeError from os.path.join) instead of returning a 400. Add an explicit check for an empty caserunname (consistent with the download routes) or make _safe_child_path robust to empty segments.

Suggested change
return jsonify({'message': 'No model selected.', 'status_code': 'error'}), 400
return jsonify({'message': 'No model selected.', 'status_code': 'error'}), 400
if not caserunname:
return jsonify({'message': 'No case run selected.', 'status_code': 'error'}), 400

Copilot uses AI. Check for mistakes.

Config.validate_path(Config.DATA_STORAGE, os.path.join(casename, 'res', caserunname or ''))
casePath = Path(Config.DATA_STORAGE, casename, 'res', caserunname)
case_results_dir = _safe_child_path(Config.DATA_STORAGE, casename, 'res')
casePath = _safe_child_path(case_results_dir, caserunname)
if not resultsOnly:
shutil.rmtree(casePath)
else:
Expand All @@ -92,6 +97,8 @@ def deleteCaseRun():
caserun = DataFile(casename)
response = caserun.deleteCaseRun(caserunname, resultsOnly)
return jsonify(response), 200
except PermissionError:
return jsonify({'message': 'Invalid path.', 'status_code': 'error'}), 400
Comment on lines 99 to +101

Copilot AI Apr 13, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Catching PermissionError and returning “Invalid path.” will also convert real filesystem permission failures (e.g., inability to delete due to OS ACLs/locks) into a 400 that looks like a traversal attempt. If you want to preserve debuggability, consider only mapping PermissionError originating from Config.validate_path/_safe_child_path to 400 and letting deletion-time permission errors surface as 5xx/appropriate error codes.

Copilot uses AI. Check for mistakes.
except FileNotFoundError:
return jsonify('No existing cases!'), 404
except OSError:
Expand Down Expand Up @@ -215,9 +222,10 @@ def downloadDataFile():
caserunname = request.args.get('caserunname')
if not caserunname:
return jsonify({'message': 'Missing required parameter: caserunname.', 'status_code': 'error'}), 400
Config.validate_path(Config.DATA_STORAGE, os.path.join(case or '', 'res', caserunname or ''))
dataFile = Path(Config.DATA_STORAGE,case, 'res',caserunname, 'data.txt')
return send_file(dataFile.resolve(), as_attachment=True, max_age=0)
case_results_dir = _safe_child_path(Config.DATA_STORAGE, case, 'res')
caserun_dir = _safe_child_path(case_results_dir, caserunname)
dataFile = _safe_child_path(caserun_dir, 'data.txt')
return send_file(dataFile, as_attachment=True, max_age=0)

except PermissionError:
return jsonify({'message': 'Invalid path.', 'status_code': 'error'}), 400
Expand All @@ -233,9 +241,9 @@ def downloadFile():
file = request.args.get('file')
if not file:
return jsonify({'message': 'Missing required parameter: file.', 'status_code': 'error'}), 400
Config.validate_path(Config.DATA_STORAGE, os.path.join(case or '', 'res', 'csv', file or ''))
dataFile = Path(Config.DATA_STORAGE,case,'res','csv',file)
return send_file(dataFile.resolve(), as_attachment=True, max_age=0)
csv_dir = _safe_child_path(Config.DATA_STORAGE, case, 'res', 'csv')
dataFile = _safe_child_path(csv_dir, file)
return send_file(dataFile, as_attachment=True, max_age=0)

except PermissionError:
return jsonify({'message': 'Invalid path.', 'status_code': 'error'}), 400
Expand All @@ -254,9 +262,11 @@ def downloadCSVFile():
return jsonify({'message': 'Missing required parameter: file.', 'status_code': 'error'}), 400
if not caserunname:
return jsonify({'message': 'Missing required parameter: caserunname.', 'status_code': 'error'}), 400
Config.validate_path(Config.DATA_STORAGE, os.path.join(case or '', 'res', caserunname or '', 'csv', file or ''))
dataFile = Path(Config.DATA_STORAGE,case,'res',caserunname,'csv',file)
return send_file(dataFile.resolve(), as_attachment=True, max_age=0)
case_results_dir = _safe_child_path(Config.DATA_STORAGE, case, 'res')
caserun_dir = _safe_child_path(case_results_dir, caserunname)
csv_dir = _safe_child_path(caserun_dir, 'csv')
dataFile = _safe_child_path(csv_dir, file)
return send_file(dataFile, as_attachment=True, max_age=0)

except PermissionError:
return jsonify({'message': 'Invalid path.', 'status_code': 'error'}), 400
Expand All @@ -272,9 +282,10 @@ def downloadResultsFile():
caserunname = request.args.get('caserunname')
if not caserunname:
return jsonify({'message': 'Missing required parameter: caserunname.', 'status_code': 'error'}), 400
Config.validate_path(Config.DATA_STORAGE, os.path.join(case or '', 'res', caserunname or ''))
dataFile = Path(Config.DATA_STORAGE,case, 'res', caserunname,'results.txt')
return send_file(dataFile.resolve(), as_attachment=True, max_age=0)
case_results_dir = _safe_child_path(Config.DATA_STORAGE, case, 'res')
caserun_dir = _safe_child_path(case_results_dir, caserunname)
dataFile = _safe_child_path(caserun_dir, 'results.txt')
return send_file(dataFile, as_attachment=True, max_age=0)

except PermissionError:
return jsonify({'message': 'Invalid path.', 'status_code': 'error'}), 400
Expand Down
95 changes: 95 additions & 0 deletions tests/test_datafile_guards.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
from pathlib import Path
import shutil
import uuid

from Classes.Base import Config
from Classes.Base.FileClass import File


def _create_case(case_name, case_runs=None):
case_dir = Path(Config.DATA_STORAGE, case_name)
(case_dir / "view").mkdir(parents=True)
(case_dir / "res").mkdir(parents=True)

File.writeFile({"osy-casename": case_name}, case_dir / "genData.json")
File.writeFile({"osy-cases": case_runs or []}, case_dir / "view" / "resData.json")

for run in case_runs or []:
(case_dir / "res" / run["Case"]).mkdir(parents=True)

return case_dir


def test_delete_case_run_deletes_requested_case_run(client):
case_name = f"delete_run_case_{uuid.uuid4().hex}"
case_dir = _create_case(case_name, case_runs=[{"Case": "run1"}])

try:
response = client.post(
"/deleteCaseRun",
json={"casename": case_name, "caserunname": "run1", "resultsOnly": False},
)

assert response.status_code == 200
assert response.get_json() == {
"message": "You have deleted a case run!",
"status_code": "success",
}
assert not (case_dir / "res" / "run1").exists()
assert File.readFile(case_dir / "view" / "resData.json") == {"osy-cases": []}
finally:
shutil.rmtree(case_dir, ignore_errors=True)


def test_delete_case_run_blocks_path_traversal_to_sibling_case(client):
source_case = f"source_case_{uuid.uuid4().hex}"
victim_case = f"victim_case_{uuid.uuid4().hex}"
source_dir = _create_case(source_case)
victim_dir = _create_case(victim_case)

try:
response = client.post(
"/deleteCaseRun",
json={
"casename": source_case,
"caserunname": f"../../{victim_case}",
"resultsOnly": False,
},
)

assert response.status_code == 400
assert response.get_json() == {
"message": "Invalid path.",
"status_code": "error",
}
assert victim_dir.exists()
finally:
shutil.rmtree(source_dir, ignore_errors=True)
shutil.rmtree(victim_dir, ignore_errors=True)


def test_download_file_blocks_path_traversal_to_sibling_case(client):
source_case = f"download_source_{uuid.uuid4().hex}"
victim_case = f"download_victim_{uuid.uuid4().hex}"
source_dir = _create_case(source_case)
victim_dir = _create_case(victim_case)
(source_dir / "res" / "csv").mkdir(parents=True, exist_ok=True)

try:
with client.session_transaction() as session_data:
session_data["osycase"] = source_case

response = client.get(
"/downloadFile",
query_string={"file": f"../../../{victim_case}/genData.json"},
)

assert response.status_code == 400
assert response.get_json() == {
"message": "Invalid path.",
"status_code": "error",
}
assert victim_dir.exists()
finally:
shutil.rmtree(source_dir, ignore_errors=True)
shutil.rmtree(victim_dir, ignore_errors=True)
Loading