From ffdb3f6e444249756879718fa30b6b77be2cd520 Mon Sep 17 00:00:00 2001 From: lil-aditya Date: Thu, 16 Apr 2026 06:13:00 +0530 Subject: [PATCH] fix: replace list.remove() during iteration with comprehension filtering Both deleteScenarioCaseRuns and deleteCaseRun used the anti-pattern of calling list.remove(item) inside a for-loop over the same list. When an element is removed, the iterator skips the next element because the internal index advances past the shifted items. Impact: - deleteScenarioCaseRuns: if a scenario ID appeared in consecutive positions within a case run's Scenarios list, the second occurrence was silently retained, leaving orphaned scenario data in resData.json that could not be deleted through the UI. - deleteCaseRun: same pattern, though less likely to trigger in practice since case names are typically unique. Fix: replace the mutation-during-iteration pattern with list-comprehension filtering, which builds a new list and is both safe and idiomatic Python. Includes 6 new unit tests covering: - Single scenario removal - Consecutive duplicate removal (the regression test) - Cross-case-run removal - No-match noop - resultsOnly flag behavior --- API/Classes/Case/DataFileClass.py | 21 ++- tests/test_list_mutation_fix.py | 224 ++++++++++++++++++++++++++++++ 2 files changed, 232 insertions(+), 13 deletions(-) create mode 100644 tests/test_list_mutation_fix.py diff --git a/API/Classes/Case/DataFileClass.py b/API/Classes/Case/DataFileClass.py index b3fc93190..6081bfd8e 100644 --- a/API/Classes/Case/DataFileClass.py +++ b/API/Classes/Case/DataFileClass.py @@ -824,10 +824,10 @@ def deleteScenarioCaseRuns(self, scenarioId): cases = self.resData['osy-cases'] for cs in cases: - for sc in cs['Scenarios']: - if sc['ScenarioId'] == scenarioId: - cs['Scenarios'].remove(sc) - + cs['Scenarios'] = [ + sc for sc in cs['Scenarios'] + if sc['ScenarioId'] != scenarioId + ] File.writeFile(self.resData, self.resDataPath) response = { @@ -835,9 +835,7 @@ def deleteScenarioCaseRuns(self, scenarioId): "status_code": "success" } - return response - # urllib.request.urlretrieve(self.dataFile, dataFile) except(IOError, IndexError): raise IndexError except OSError: @@ -916,14 +914,11 @@ def deleteCaseResultsJSON(self, caserunname): def deleteCaseRun(self, caserunname, resultsOnly): try: - #caseRunPath = Path(Config.DATA_STORAGE,self.case,'res', caserunname) - #self.resData = Path(Config.DATA_STORAGE,self.case,'view', 'resData.json') - - if not resultsOnly: - for obj in self.resData['osy-cases']: - if obj['Case'] == caserunname: - self.resData['osy-cases'].remove(obj) + self.resData['osy-cases'] = [ + obj for obj in self.resData['osy-cases'] + if obj['Case'] != caserunname + ] File.writeFile(self.resData, self.resDataPath) diff --git a/tests/test_list_mutation_fix.py b/tests/test_list_mutation_fix.py new file mode 100644 index 000000000..a8369b463 --- /dev/null +++ b/tests/test_list_mutation_fix.py @@ -0,0 +1,224 @@ +""" +Tests for the list-mutation-during-iteration fixes in DataFileClass. + +Validates that deleteScenarioCaseRuns and deleteCaseRun correctly +remove *all* matching entries, not just the first one — the old code +called list.remove() inside a for-loop, which skips elements when +the list shrinks mid-iteration. + +These tests are self-contained and do not require Flask or the app. +They test the pure-Python logic by constructing the DataFile object +with mocked filesystem paths. +""" + +import json +import sys +from pathlib import Path +from unittest.mock import patch, MagicMock + +import pytest + +# Ensure the API directory is on sys.path so we can import the classes +API_DIR = str(Path(__file__).resolve().parent.parent / "API") +if API_DIR not in sys.path: + sys.path.insert(0, API_DIR) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _make_case_tree(tmp: Path, case: str, res_data: dict, gen_data: dict): + """Create the minimal directory/file tree that DataFile.__init__ expects.""" + case_dir = tmp / case + view_dir = case_dir / "view" + res_dir = case_dir / "res" + view_dir.mkdir(parents=True, exist_ok=True) + res_dir.mkdir(parents=True, exist_ok=True) + + # genData.json + (case_dir / "genData.json").write_text(json.dumps(gen_data), encoding="utf-8") + + # resData.json (lives inside view/) + (view_dir / "resData.json").write_text(json.dumps(res_data), encoding="utf-8") + + # Parameters.json & Variables.json at DataStorage root + if not (tmp / "Parameters.json").exists(): + (tmp / "Parameters.json").write_text(json.dumps({}), encoding="utf-8") + if not (tmp / "Variables.json").exists(): + (tmp / "Variables.json").write_text(json.dumps({}), encoding="utf-8") + + +# Minimal genData that the Osemosys constructor won't choke on +MINIMAL_GEN_DATA = { + "osy-years": ["2020", "2025"], + "osy-tech": [{"TechId": "T1", "Tech": "Tech1"}], + "osy-comm": [{"CommId": "C1", "Comm": "Comm1"}], + "osy-emis": [{"EmisId": "E1", "Emis": "Emis1"}], + "osy-stg": [], + "osy-ts": [{"TsId": "TS1", "Ts": "Ts1"}], + "osy-se": [{"SeId": "SE1", "Se": "Se1"}], + "osy-dt": [{"DtId": "DT1", "Dt": "Dt1"}], + "osy-dtb": [{"DtbId": "DTB1", "Dtb": "Dtb1"}], + "osy-constraints": [], + "osy-scenarios": [], + "osy-mo": "1", +} + + +def _build_datafile(tmp_path, case, res_data): + """Build a DataFile instance pointed at a tmp_path case directory.""" + _make_case_tree(tmp_path, case, res_data, MINIMAL_GEN_DATA) + + with patch("Classes.Base.Config.DATA_STORAGE", str(tmp_path)): + from Classes.Case.DataFileClass import DataFile + df = DataFile(case) + return df + + +# --------------------------------------------------------------------------- +# deleteScenarioCaseRuns +# --------------------------------------------------------------------------- + + +class TestDeleteScenarioCaseRuns: + """Verify that deleteScenarioCaseRuns removes *all* matching scenarios.""" + + def test_removes_single_scenario(self, tmp_path): + """Basic case: one matching scenario across one case run.""" + res_data = { + "osy-cases": [ + { + "Case": "run1", + "Scenarios": [ + {"ScenarioId": "SC_A", "Scenario": "A", "Active": True}, + {"ScenarioId": "SC_B", "Scenario": "B", "Active": True}, + ], + } + ] + } + + df = _build_datafile(tmp_path, "model", res_data) + resp = df.deleteScenarioCaseRuns("SC_A") + + assert resp["status_code"] == "success" + remaining_ids = [ + sc["ScenarioId"] for sc in df.resData["osy-cases"][0]["Scenarios"] + ] + assert "SC_A" not in remaining_ids + assert "SC_B" in remaining_ids + + def test_removes_consecutive_duplicates(self, tmp_path): + """ + Regression: the old code skipped a scenario when two consecutive + entries matched because list.remove() shifted elements mid-iteration. + """ + res_data = { + "osy-cases": [ + { + "Case": "run1", + "Scenarios": [ + {"ScenarioId": "SC_X", "Scenario": "X1", "Active": True}, + {"ScenarioId": "SC_X", "Scenario": "X2", "Active": False}, + {"ScenarioId": "SC_Y", "Scenario": "Y", "Active": True}, + ], + } + ] + } + + df = _build_datafile(tmp_path, "model", res_data) + df.deleteScenarioCaseRuns("SC_X") + + remaining_ids = [ + sc["ScenarioId"] for sc in df.resData["osy-cases"][0]["Scenarios"] + ] + # Both SC_X entries must be gone + assert remaining_ids == ["SC_Y"] + + def test_removes_across_multiple_case_runs(self, tmp_path): + """Scenario should be removed from every case run, not just the first.""" + res_data = { + "osy-cases": [ + { + "Case": "run1", + "Scenarios": [ + {"ScenarioId": "SC_A", "Scenario": "A", "Active": True}, + ], + }, + { + "Case": "run2", + "Scenarios": [ + {"ScenarioId": "SC_A", "Scenario": "A", "Active": True}, + {"ScenarioId": "SC_B", "Scenario": "B", "Active": True}, + ], + }, + ] + } + + df = _build_datafile(tmp_path, "model", res_data) + df.deleteScenarioCaseRuns("SC_A") + + for case in df.resData["osy-cases"]: + ids = [sc["ScenarioId"] for sc in case["Scenarios"]] + assert "SC_A" not in ids + + def test_no_match_is_noop(self, tmp_path): + """Deleting a non-existent scenario should succeed without changing data.""" + res_data = { + "osy-cases": [ + { + "Case": "run1", + "Scenarios": [ + {"ScenarioId": "SC_A", "Scenario": "A", "Active": True}, + ], + } + ] + } + + df = _build_datafile(tmp_path, "model", res_data) + df.deleteScenarioCaseRuns("SC_NONEXISTENT") + + remaining_ids = [ + sc["ScenarioId"] for sc in df.resData["osy-cases"][0]["Scenarios"] + ] + assert remaining_ids == ["SC_A"] + + +# --------------------------------------------------------------------------- +# deleteCaseRun +# --------------------------------------------------------------------------- + + +class TestDeleteCaseRun: + """Verify that deleteCaseRun removes the matching case run entry.""" + + def test_results_only_does_not_touch_osy_cases(self, tmp_path): + res_data = { + "osy-cases": [ + {"Case": "run1", "Scenarios": []}, + {"Case": "run2", "Scenarios": []}, + ] + } + + df = _build_datafile(tmp_path, "model", res_data) + resp = df.deleteCaseRun("run1", resultsOnly=True) + + # resultsOnly=True means the osy-cases list should NOT be modified + case_names = [c["Case"] for c in df.resData["osy-cases"]] + assert "run1" in case_names + + def test_removes_case_from_resdata_when_not_results_only(self, tmp_path): + res_data = { + "osy-cases": [ + {"Case": "run1", "Scenarios": []}, + {"Case": "run2", "Scenarios": []}, + ] + } + + df = _build_datafile(tmp_path, "model", res_data) + df.deleteCaseRun("run1", resultsOnly=False) + + case_names = [c["Case"] for c in df.resData["osy-cases"]] + assert "run1" not in case_names + assert "run2" in case_names