From a565901819693e1049bf5b05b7a697394009e5ef Mon Sep 17 00:00:00 2001 From: lil-aditya Date: Mon, 13 Apr 2026 06:22:06 +0530 Subject: [PATCH] Fix case-run path traversal --- API/Routes/DataFile/DataFileRoute.py | 39 ++++++++---- tests/test_datafile_guards.py | 95 ++++++++++++++++++++++++++++ 2 files changed, 120 insertions(+), 14 deletions(-) create mode 100644 tests/test_datafile_guards.py diff --git a/API/Routes/DataFile/DataFileRoute.py b/API/Routes/DataFile/DataFileRoute.py index 56b04efdc..feeac5207 100644 --- a/API/Routes/DataFile/DataFileRoute.py +++ b/API/Routes/DataFile/DataFileRoute.py @@ -9,6 +9,11 @@ datafile_api = Blueprint('DataFileRoute', __name__) + +def _safe_child_path(base_dir, *parts): + relative_path = os.path.join(*[part for part in parts if part not in (None, "")]) + return Path(Config.validate_path(base_dir, relative_path)) + @datafile_api.route("/generateDataFile", methods=['POST']) def generateDataFile(): try: @@ -77,8 +82,8 @@ def deleteCaseRun(): if not casename: return jsonify({'message': 'No model selected.', 'status_code': 'error'}), 400 - Config.validate_path(Config.DATA_STORAGE, os.path.join(casename, 'res', caserunname or '')) - casePath = Path(Config.DATA_STORAGE, casename, 'res', caserunname) + case_results_dir = _safe_child_path(Config.DATA_STORAGE, casename, 'res') + casePath = _safe_child_path(case_results_dir, caserunname) if not resultsOnly: shutil.rmtree(casePath) else: @@ -92,6 +97,8 @@ def deleteCaseRun(): caserun = DataFile(casename) response = caserun.deleteCaseRun(caserunname, resultsOnly) return jsonify(response), 200 + except PermissionError: + return jsonify({'message': 'Invalid path.', 'status_code': 'error'}), 400 except FileNotFoundError: return jsonify('No existing cases!'), 404 except OSError: @@ -215,9 +222,10 @@ def downloadDataFile(): caserunname = request.args.get('caserunname') if not caserunname: return jsonify({'message': 'Missing required parameter: caserunname.', 'status_code': 'error'}), 400 - Config.validate_path(Config.DATA_STORAGE, os.path.join(case or '', 'res', caserunname or '')) - dataFile = Path(Config.DATA_STORAGE,case, 'res',caserunname, 'data.txt') - return send_file(dataFile.resolve(), as_attachment=True, max_age=0) + case_results_dir = _safe_child_path(Config.DATA_STORAGE, case, 'res') + caserun_dir = _safe_child_path(case_results_dir, caserunname) + dataFile = _safe_child_path(caserun_dir, 'data.txt') + return send_file(dataFile, as_attachment=True, max_age=0) except PermissionError: return jsonify({'message': 'Invalid path.', 'status_code': 'error'}), 400 @@ -233,9 +241,9 @@ def downloadFile(): file = request.args.get('file') if not file: return jsonify({'message': 'Missing required parameter: file.', 'status_code': 'error'}), 400 - Config.validate_path(Config.DATA_STORAGE, os.path.join(case or '', 'res', 'csv', file or '')) - dataFile = Path(Config.DATA_STORAGE,case,'res','csv',file) - return send_file(dataFile.resolve(), as_attachment=True, max_age=0) + csv_dir = _safe_child_path(Config.DATA_STORAGE, case, 'res', 'csv') + dataFile = _safe_child_path(csv_dir, file) + return send_file(dataFile, as_attachment=True, max_age=0) except PermissionError: return jsonify({'message': 'Invalid path.', 'status_code': 'error'}), 400 @@ -254,9 +262,11 @@ def downloadCSVFile(): return jsonify({'message': 'Missing required parameter: file.', 'status_code': 'error'}), 400 if not caserunname: return jsonify({'message': 'Missing required parameter: caserunname.', 'status_code': 'error'}), 400 - Config.validate_path(Config.DATA_STORAGE, os.path.join(case or '', 'res', caserunname or '', 'csv', file or '')) - dataFile = Path(Config.DATA_STORAGE,case,'res',caserunname,'csv',file) - return send_file(dataFile.resolve(), as_attachment=True, max_age=0) + case_results_dir = _safe_child_path(Config.DATA_STORAGE, case, 'res') + caserun_dir = _safe_child_path(case_results_dir, caserunname) + csv_dir = _safe_child_path(caserun_dir, 'csv') + dataFile = _safe_child_path(csv_dir, file) + return send_file(dataFile, as_attachment=True, max_age=0) except PermissionError: return jsonify({'message': 'Invalid path.', 'status_code': 'error'}), 400 @@ -272,9 +282,10 @@ def downloadResultsFile(): caserunname = request.args.get('caserunname') if not caserunname: return jsonify({'message': 'Missing required parameter: caserunname.', 'status_code': 'error'}), 400 - Config.validate_path(Config.DATA_STORAGE, os.path.join(case or '', 'res', caserunname or '')) - dataFile = Path(Config.DATA_STORAGE,case, 'res', caserunname,'results.txt') - return send_file(dataFile.resolve(), as_attachment=True, max_age=0) + case_results_dir = _safe_child_path(Config.DATA_STORAGE, case, 'res') + caserun_dir = _safe_child_path(case_results_dir, caserunname) + dataFile = _safe_child_path(caserun_dir, 'results.txt') + return send_file(dataFile, as_attachment=True, max_age=0) except PermissionError: return jsonify({'message': 'Invalid path.', 'status_code': 'error'}), 400 diff --git a/tests/test_datafile_guards.py b/tests/test_datafile_guards.py new file mode 100644 index 000000000..0aa58ef27 --- /dev/null +++ b/tests/test_datafile_guards.py @@ -0,0 +1,95 @@ +from pathlib import Path +import shutil +import uuid + +from Classes.Base import Config +from Classes.Base.FileClass import File + + +def _create_case(case_name, case_runs=None): + case_dir = Path(Config.DATA_STORAGE, case_name) + (case_dir / "view").mkdir(parents=True) + (case_dir / "res").mkdir(parents=True) + + File.writeFile({"osy-casename": case_name}, case_dir / "genData.json") + File.writeFile({"osy-cases": case_runs or []}, case_dir / "view" / "resData.json") + + for run in case_runs or []: + (case_dir / "res" / run["Case"]).mkdir(parents=True) + + return case_dir + + +def test_delete_case_run_deletes_requested_case_run(client): + case_name = f"delete_run_case_{uuid.uuid4().hex}" + case_dir = _create_case(case_name, case_runs=[{"Case": "run1"}]) + + try: + response = client.post( + "/deleteCaseRun", + json={"casename": case_name, "caserunname": "run1", "resultsOnly": False}, + ) + + assert response.status_code == 200 + assert response.get_json() == { + "message": "You have deleted a case run!", + "status_code": "success", + } + assert not (case_dir / "res" / "run1").exists() + assert File.readFile(case_dir / "view" / "resData.json") == {"osy-cases": []} + finally: + shutil.rmtree(case_dir, ignore_errors=True) + + +def test_delete_case_run_blocks_path_traversal_to_sibling_case(client): + source_case = f"source_case_{uuid.uuid4().hex}" + victim_case = f"victim_case_{uuid.uuid4().hex}" + source_dir = _create_case(source_case) + victim_dir = _create_case(victim_case) + + try: + response = client.post( + "/deleteCaseRun", + json={ + "casename": source_case, + "caserunname": f"../../{victim_case}", + "resultsOnly": False, + }, + ) + + assert response.status_code == 400 + assert response.get_json() == { + "message": "Invalid path.", + "status_code": "error", + } + assert victim_dir.exists() + finally: + shutil.rmtree(source_dir, ignore_errors=True) + shutil.rmtree(victim_dir, ignore_errors=True) + + +def test_download_file_blocks_path_traversal_to_sibling_case(client): + source_case = f"download_source_{uuid.uuid4().hex}" + victim_case = f"download_victim_{uuid.uuid4().hex}" + source_dir = _create_case(source_case) + victim_dir = _create_case(victim_case) + (source_dir / "res" / "csv").mkdir(parents=True, exist_ok=True) + + try: + with client.session_transaction() as session_data: + session_data["osycase"] = source_case + + response = client.get( + "/downloadFile", + query_string={"file": f"../../../{victim_case}/genData.json"}, + ) + + assert response.status_code == 400 + assert response.get_json() == { + "message": "Invalid path.", + "status_code": "error", + } + assert victim_dir.exists() + finally: + shutil.rmtree(source_dir, ignore_errors=True) + shutil.rmtree(victim_dir, ignore_errors=True)