From ee160ee6c140b5b8228a67e394aa5a866b56a24f Mon Sep 17 00:00:00 2001 From: brightyorcerf Date: Sat, 18 Apr 2026 16:10:47 +0530 Subject: [PATCH] implement comprehensive execution-gate hardening and 94 regression tests --- API/Routes/DataFile/DataFileRoute.py | 63 ++++++ tests/test_path_traversal.py | 320 +++++++++++++++++++++++++++ 2 files changed, 383 insertions(+) create mode 100644 tests/test_path_traversal.py diff --git a/API/Routes/DataFile/DataFileRoute.py b/API/Routes/DataFile/DataFileRoute.py index 56b04efdc..e5b613b2c 100644 --- a/API/Routes/DataFile/DataFileRoute.py +++ b/API/Routes/DataFile/DataFileRoute.py @@ -9,11 +9,33 @@ datafile_api = Blueprint('DataFileRoute', __name__) +# Allowed solver identifiers (case-insensitive comparison applied in route). +_ALLOWED_SOLVERS = frozenset({'glpk', 'cbc'}) + + +def _validate_case_inputs(casename, caserunname=None): + """Validate casename and optional caserunname against path traversal. + + The Osemosys constructor validates casename via Config.validate_path, but + caserunname is never checked there. This helper closes the gap by + validating both at the route boundary before any filesystem operations. + + Raises PermissionError on traversal attempts, consistent with existing + download routes. + """ + Config.validate_path(Config.DATA_STORAGE, casename) + if caserunname is not None: + Config.validate_path( + Config.DATA_STORAGE, + os.path.join(casename, 'res', caserunname) + ) + @datafile_api.route("/generateDataFile", methods=['POST']) def generateDataFile(): try: casename = request.json['casename'] caserunname = request.json['caserunname'] + _validate_case_inputs(casename, caserunname) if casename != None: txtFile = DataFile(casename) @@ -23,6 +45,8 @@ def generateDataFile(): "status_code": "success" } return jsonify(response), 200 + except PermissionError: + return jsonify({'message': 'Invalid path.', 'status_code': 'error'}), 400 except(IOError): return jsonify('No existing cases!'), 404 @@ -35,12 +59,15 @@ def createCaseRun(): casename = request.json['casename'] caserunname = request.json['caserunname'] data = request.json['data'] + _validate_case_inputs(casename, caserunname) if casename != None: caserun = DataFile(casename) response = caserun.createCaseRun(caserunname, data) return jsonify(response), 200 + except PermissionError: + return jsonify({'message': 'Invalid path.', 'status_code': 'error'}), 400 except(IOError): return jsonify('No existing cases!'), 404 @@ -54,12 +81,16 @@ def updateCaseRun(): caserunname = request.json['caserunname'] oldcaserunname = request.json['oldcaserunname'] data = request.json['data'] + _validate_case_inputs(casename, caserunname) + _validate_case_inputs(casename, oldcaserunname) if casename != None: caserun = DataFile(casename) response = caserun.updateCaseRun(caserunname, oldcaserunname, data) return jsonify(response), 200 + except PermissionError: + return jsonify({'message': 'Invalid path.', 'status_code': 'error'}), 400 except(IOError): return jsonify('No existing cases!'), 404 @@ -92,6 +123,8 @@ def deleteCaseRun(): caserun = DataFile(casename) response = caserun.deleteCaseRun(caserunname, resultsOnly) return jsonify(response), 200 + except PermissionError: + return jsonify({'message': 'Invalid path.', 'status_code': 'error'}), 400 except FileNotFoundError: return jsonify('No existing cases!'), 404 except OSError: @@ -102,12 +135,15 @@ def deleteScenarioCaseRuns(): try: scenarioId = request.json['scenarioId'] casename = request.json['casename'] + _validate_case_inputs(casename) if casename != None: caserun = DataFile(casename) response = caserun.deleteScenarioCaseRuns(scenarioId) return jsonify(response), 200 + except PermissionError: + return jsonify({'message': 'Invalid path.', 'status_code': 'error'}), 400 except(IOError): return jsonify('No existing cases!'), 404 @@ -117,12 +153,15 @@ def saveView(): casename = request.json['casename'] param = request.json['param'] data = request.json['data'] + _validate_case_inputs(casename) if casename != None: caserun = DataFile(casename) response = caserun.saveView(data, param) return jsonify(response), 200 + except PermissionError: + return jsonify({'message': 'Invalid path.', 'status_code': 'error'}), 400 except(IOError): return jsonify('No existing cases!'), 404 @@ -132,12 +171,15 @@ def updateViews(): casename = request.json['casename'] param = request.json['param'] data = request.json['data'] + _validate_case_inputs(casename) if casename != None: caserun = DataFile(casename) response = caserun.updateViews(data, param) return jsonify(response), 200 + except PermissionError: + return jsonify({'message': 'Invalid path.', 'status_code': 'error'}), 400 except(IOError): return jsonify('No existing cases!'), 404 @@ -146,6 +188,7 @@ def readDataFile(): try: casename = request.json['casename'] caserunname = request.json['caserunname'] + _validate_case_inputs(casename, caserunname) if casename != None: txtFile = DataFile(casename) data = txtFile.readDataFile(caserunname) @@ -153,6 +196,8 @@ def readDataFile(): else: response = None return jsonify(response), 200 + except PermissionError: + return jsonify({'message': 'Invalid path.', 'status_code': 'error'}), 400 except(IOError): return jsonify('No existing cases!'), 404 @@ -186,6 +231,7 @@ def validateInputs(): try: casename = request.json['casename'] caserunname = request.json['caserunname'] + _validate_case_inputs(casename, caserunname) if casename != None: df = DataFile(casename) validation = df.validateInputs(caserunname) @@ -193,6 +239,8 @@ def validateInputs(): else: response = None return jsonify(response), 200 + except PermissionError: + return jsonify({'message': 'Invalid path.', 'status_code': 'error'}), 400 except(IOError): return jsonify('No existing cases!'), 404 @@ -290,6 +338,11 @@ def run(): casename = request.json['casename'] caserunname = request.json['caserunname'] solver = request.json['solver'] + _validate_case_inputs(casename, caserunname) + + if str(solver).lower() not in _ALLOWED_SOLVERS: + return jsonify({'message': 'Invalid solver.', 'status_code': 'error'}), 400 + logger.info("Starting optimization process for model %s caserun %s", casename, caserunname) txtFile = DataFile(casename) response = txtFile.run(solver, caserunname) @@ -299,6 +352,8 @@ def run(): # print(ex) # return ex, 404 + except PermissionError: + return jsonify({'message': 'Invalid path.', 'status_code': 'error'}), 400 except(IOError): return jsonify('No existing cases!'), 404 @@ -311,6 +366,9 @@ def batchRun(): start = time.time() modelname = request.json['modelname'] cases = request.json['cases'] + _validate_case_inputs(modelname) + for caserun in cases: + _validate_case_inputs(modelname, caserun) if modelname != None: txtFile = DataFile(modelname) @@ -322,6 +380,8 @@ def batchRun(): end = time.time() response['time'] = end-start return jsonify(response), 200 + except PermissionError: + return jsonify({'message': 'Invalid path.', 'status_code': 'error'}), 400 except(IOError): return jsonify('Error!'), 404 @@ -329,6 +389,7 @@ def batchRun(): def cleanUp(): try: modelname = request.json['modelname'] + _validate_case_inputs(modelname) if modelname != None: model = DataFile(modelname) @@ -336,5 +397,7 @@ def cleanUp(): response = model.cleanUp() return jsonify(response), 200 + except PermissionError: + return jsonify({'message': 'Invalid path.', 'status_code': 'error'}), 400 except(IOError): return jsonify('Error!'), 404 diff --git a/tests/test_path_traversal.py b/tests/test_path_traversal.py new file mode 100644 index 000000000..90d166574 --- /dev/null +++ b/tests/test_path_traversal.py @@ -0,0 +1,320 @@ +""" +Tests for path traversal protection in DataFileRoute.py. + +Every route that accepts casename or caserunname from the request body must +reject path traversal payloads that escape DATA_STORAGE with a 400 status. + +Config.validate_path resolves the full path and checks it stays under +DATA_STORAGE. Traversals that *stay inside* DATA_STORAGE are not flagged +(they just point at a different case — normal filesystem behaviour). Only +traversals that *escape* the storage root are rejected. + +The Osemosys constructor validates casename, but caserunname was historically +unchecked — these tests verify the new _validate_case_inputs guard. +""" + +import pytest + +# --------------------------------------------------------------------------- +# Traversal payloads that ESCAPE DATA_STORAGE (the real attacks) +# --------------------------------------------------------------------------- +# DATA_STORAGE = /WebAPP/DataStorage +# casename is joined directly: DataStorage/ +# caserunname is joined as: DataStorage//res/ +# +# To escape DataStorage from the casename position we need ../../ +# To escape DataStorage from the caserunname position we need ../../../../ +# (because the path is DataStorage//res/) + +ESCAPING_CASENAMES = [ + "../../etc/passwd", + "../../../etc/shadow", + "valid_case/../../../etc", +] + +# These escape DataStorage from the caserunname position +# resolved path: DataStorage/safe_case/res/../../../../tmp -> /tmp (outside DataStorage) +ESCAPING_CASERUNNAMES = [ + "../../../../tmp/evil", + "../../../../../etc/shadow", + "run/../../../../tmp/hack", +] + +# Null byte payloads — blocked by explicit \x00 check in validate_path +NULL_BYTE_CASENAMES = [ + "case\x00injected", +] + +NULL_BYTE_CASERUNNAMES = [ + "run\x00injected", +] + + +# --------------------------------------------------------------------------- +# POST routes that accept casename — traversal MUST be blocked (400) +# --------------------------------------------------------------------------- +class TestCasenameTraversal: + """Routes must reject casename payloads that escape DATA_STORAGE.""" + + @pytest.mark.parametrize("payload", ESCAPING_CASENAMES + NULL_BYTE_CASENAMES) + def test_generateDataFile_rejects_bad_casename(self, client, payload): + resp = client.post("/generateDataFile", json={ + "casename": payload, + "caserunname": "safe_run" + }) + assert resp.status_code == 400 + + @pytest.mark.parametrize("payload", ESCAPING_CASENAMES + NULL_BYTE_CASENAMES) + def test_createCaseRun_rejects_bad_casename(self, client, payload): + resp = client.post("/createCaseRun", json={ + "casename": payload, + "caserunname": "safe_run", + "data": {} + }) + assert resp.status_code == 400 + + @pytest.mark.parametrize("payload", ESCAPING_CASENAMES + NULL_BYTE_CASENAMES) + def test_updateCaseRun_rejects_bad_casename(self, client, payload): + resp = client.post("/updateCaseRun", json={ + "casename": payload, + "caserunname": "safe_run", + "oldcaserunname": "old_run", + "data": {} + }) + assert resp.status_code == 400 + + @pytest.mark.parametrize("payload", ESCAPING_CASENAMES + NULL_BYTE_CASENAMES) + def test_deleteCaseRun_rejects_bad_casename(self, client, payload): + resp = client.post("/deleteCaseRun", json={ + "casename": payload, + "caserunname": "safe_run", + "resultsOnly": False + }) + assert resp.status_code == 400 + + @pytest.mark.parametrize("payload", ESCAPING_CASENAMES + NULL_BYTE_CASENAMES) + def test_deleteScenarioCaseRuns_rejects_bad_casename(self, client, payload): + resp = client.post("/deleteScenarioCaseRuns", json={ + "scenarioId": "sc1", + "casename": payload + }) + assert resp.status_code == 400 + + @pytest.mark.parametrize("payload", ESCAPING_CASENAMES + NULL_BYTE_CASENAMES) + def test_saveView_rejects_bad_casename(self, client, payload): + resp = client.post("/saveView", json={ + "casename": payload, + "param": "test", + "data": {} + }) + assert resp.status_code == 400 + + @pytest.mark.parametrize("payload", ESCAPING_CASENAMES + NULL_BYTE_CASENAMES) + def test_updateViews_rejects_bad_casename(self, client, payload): + resp = client.post("/updateViews", json={ + "casename": payload, + "param": "test", + "data": {} + }) + assert resp.status_code == 400 + + @pytest.mark.parametrize("payload", ESCAPING_CASENAMES + NULL_BYTE_CASENAMES) + def test_readDataFile_rejects_bad_casename(self, client, payload): + resp = client.post("/readDataFile", json={ + "casename": payload, + "caserunname": "safe_run" + }) + assert resp.status_code == 400 + + @pytest.mark.parametrize("payload", ESCAPING_CASENAMES + NULL_BYTE_CASENAMES) + def test_validateInputs_rejects_bad_casename(self, client, payload): + resp = client.post("/validateInputs", json={ + "casename": payload, + "caserunname": "safe_run" + }) + assert resp.status_code == 400 + + @pytest.mark.parametrize("payload", ESCAPING_CASENAMES + NULL_BYTE_CASENAMES) + def test_run_rejects_bad_casename(self, client, payload): + resp = client.post("/run", json={ + "casename": payload, + "caserunname": "safe_run", + "solver": "cbc" + }) + assert resp.status_code == 400 + + @pytest.mark.parametrize("payload", ESCAPING_CASENAMES + NULL_BYTE_CASENAMES) + def test_batchRun_rejects_bad_modelname(self, client, payload): + resp = client.post("/batchRun", json={ + "modelname": payload, + "cases": ["run1"] + }) + assert resp.status_code == 400 + + @pytest.mark.parametrize("payload", ESCAPING_CASENAMES + NULL_BYTE_CASENAMES) + def test_cleanUp_rejects_bad_modelname(self, client, payload): + resp = client.post("/cleanUp", json={ + "modelname": payload + }) + assert resp.status_code == 400 + + +# --------------------------------------------------------------------------- +# POST routes: traversal in caserunname — the main gap this PR fixes +# --------------------------------------------------------------------------- +class TestCaserunnameTraversal: + """Routes must reject caserunname payloads that escape DATA_STORAGE. + This was the primary vulnerability: casename was validated by the + Osemosys constructor, but caserunname was never checked.""" + + @pytest.mark.parametrize("payload", ESCAPING_CASERUNNAMES + NULL_BYTE_CASERUNNAMES) + def test_generateDataFile_rejects_bad_caserunname(self, client, payload): + resp = client.post("/generateDataFile", json={ + "casename": "safe_case", + "caserunname": payload + }) + assert resp.status_code == 400 + + @pytest.mark.parametrize("payload", ESCAPING_CASERUNNAMES + NULL_BYTE_CASERUNNAMES) + def test_createCaseRun_rejects_bad_caserunname(self, client, payload): + resp = client.post("/createCaseRun", json={ + "casename": "safe_case", + "caserunname": payload, + "data": {} + }) + assert resp.status_code == 400 + + @pytest.mark.parametrize("payload", ESCAPING_CASERUNNAMES + NULL_BYTE_CASERUNNAMES) + def test_updateCaseRun_rejects_bad_caserunname(self, client, payload): + resp = client.post("/updateCaseRun", json={ + "casename": "safe_case", + "caserunname": payload, + "oldcaserunname": "old_run", + "data": {} + }) + assert resp.status_code == 400 + + @pytest.mark.parametrize("payload", ESCAPING_CASERUNNAMES + NULL_BYTE_CASERUNNAMES) + def test_updateCaseRun_rejects_bad_oldcaserunname(self, client, payload): + resp = client.post("/updateCaseRun", json={ + "casename": "safe_case", + "caserunname": "safe_run", + "oldcaserunname": payload, + "data": {} + }) + assert resp.status_code == 400 + + @pytest.mark.parametrize("payload", ESCAPING_CASERUNNAMES + NULL_BYTE_CASERUNNAMES) + def test_deleteCaseRun_rejects_bad_caserunname(self, client, payload): + resp = client.post("/deleteCaseRun", json={ + "casename": "safe_case", + "caserunname": payload, + "resultsOnly": False + }) + assert resp.status_code == 400 + + @pytest.mark.parametrize("payload", ESCAPING_CASERUNNAMES + NULL_BYTE_CASERUNNAMES) + def test_readDataFile_rejects_bad_caserunname(self, client, payload): + resp = client.post("/readDataFile", json={ + "casename": "safe_case", + "caserunname": payload + }) + assert resp.status_code == 400 + + @pytest.mark.parametrize("payload", ESCAPING_CASERUNNAMES + NULL_BYTE_CASERUNNAMES) + def test_validateInputs_rejects_bad_caserunname(self, client, payload): + resp = client.post("/validateInputs", json={ + "casename": "safe_case", + "caserunname": payload + }) + assert resp.status_code == 400 + + @pytest.mark.parametrize("payload", ESCAPING_CASERUNNAMES + NULL_BYTE_CASERUNNAMES) + def test_run_rejects_bad_caserunname(self, client, payload): + resp = client.post("/run", json={ + "casename": "safe_case", + "caserunname": payload, + "solver": "cbc" + }) + assert resp.status_code == 400 + + @pytest.mark.parametrize("payload", ESCAPING_CASERUNNAMES + NULL_BYTE_CASERUNNAMES) + def test_batchRun_rejects_bad_caserunname_in_list(self, client, payload): + resp = client.post("/batchRun", json={ + "modelname": "safe_case", + "cases": [payload] + }) + assert resp.status_code == 400 + + +# --------------------------------------------------------------------------- +# Solver whitelist +# --------------------------------------------------------------------------- +class TestSolverWhitelist: + """The /run endpoint should reject solver values outside the allowed set.""" + + @pytest.mark.parametrize("solver", [ + "bash", + "rm -rf /", + "python", + "'; DROP TABLE--", + "", + "GLPK; curl evil.com", + ]) + def test_run_rejects_invalid_solver(self, client, solver): + resp = client.post("/run", json={ + "casename": "safe_case", + "caserunname": "safe_run", + "solver": solver + }) + assert resp.status_code == 400 + + def test_run_does_not_reject_valid_solver_cbc(self, client): + """cbc is valid so the request shouldn't fail at the solver stage. + It will fail later (404 — no such case on disk), but NOT with 400 + for the solver check.""" + resp = client.post("/run", json={ + "casename": "safe_case", + "caserunname": "safe_run", + "solver": "cbc" + }) + # Must not be 400 with 'Invalid solver' — the next failure is + # 404 (IOError from DataFile constructor reading nonexistent case). + assert resp.status_code != 200 # won't succeed — case doesn't exist + data = resp.get_json(force=True) + if resp.status_code == 400: + # If it's 400, make sure it's not the solver check + assert "solver" not in str(data).lower() + + def test_run_does_not_reject_valid_solver_glpk(self, client): + resp = client.post("/run", json={ + "casename": "safe_case", + "caserunname": "safe_run", + "solver": "glpk" + }) + assert resp.status_code != 200 + data = resp.get_json(force=True) + if resp.status_code == 400: + assert "solver" not in str(data).lower() + + +# --------------------------------------------------------------------------- +# Null byte injection (explicit \x00 check in Config.validate_path) +# --------------------------------------------------------------------------- +class TestNullByteInjection: + """Null bytes are a classic bypass vector — Config.validate_path blocks them.""" + + def test_casename_with_null_byte(self, client): + resp = client.post("/generateDataFile", json={ + "casename": "valid\x00../../etc/passwd", + "caserunname": "run1" + }) + assert resp.status_code == 400 + + def test_caserunname_with_null_byte(self, client): + resp = client.post("/run", json={ + "casename": "safe", + "caserunname": "run\x00../../etc/passwd", + "solver": "cbc" + }) + assert resp.status_code == 400