From 0c5f959c3df1e7824a35b88e73890c786a145ba3 Mon Sep 17 00:00:00 2001 From: Vysakh Menon Date: Fri, 20 Mar 2026 16:37:24 -0700 Subject: [PATCH 1/3] 32879 prevent duplicate processing of same filing --- .../business_filer/common/services/naics.py | 22 ++-- .../src/business_filer/resources/worker.py | 2 +- .../src/business_filer/services/filer.py | 6 +- .../test_filer/test_process_filing_core.py | 111 ++++++++++++++++++ 4 files changed, 127 insertions(+), 14 deletions(-) create mode 100644 queue_services/business-filer/tests/unit/test_filer/test_process_filing_core.py diff --git a/queue_services/business-filer/src/business_filer/common/services/naics.py b/queue_services/business-filer/src/business_filer/common/services/naics.py index 427abbf598..62b7bde1bb 100644 --- a/queue_services/business-filer/src/business_filer/common/services/naics.py +++ b/queue_services/business-filer/src/business_filer/common/services/naics.py @@ -27,15 +27,13 @@ class NaicsService: @staticmethod def find_by_code(naics_code: str): """Return NAICS Structure matching code.""" - try: - naics_url = current_app.config.get("NAICS_API_URL") - token = AccountService.get_bearer_token() - response = requests.get(naics_url + "/" + naics_code, headers={ - "Content-Type": "application/json", - "Authorization": "Bearer " + token - }) - response.raise_for_status() - return response.json() - except Exception as err: - current_app.logger.error(err) - return None + naics_url = current_app.config.get("NAICS_API_URL") + token = AccountService.get_bearer_token() + if not token: + raise Exception("Failed to get token for naics call") + response = requests.get(naics_url + "/" + naics_code, headers={ + "Content-Type": "application/json", + "Authorization": "Bearer " + token + }) + response.raise_for_status() + return response.json() diff --git a/queue_services/business-filer/src/business_filer/resources/worker.py b/queue_services/business-filer/src/business_filer/resources/worker.py index 56291e2639..969a69eca4 100644 --- a/queue_services/business-filer/src/business_filer/resources/worker.py +++ b/queue_services/business-filer/src/business_filer/resources/worker.py @@ -95,7 +95,7 @@ def worker(): process_filing(filing_message) except Exception as err: # pylint: disable=broad-exception-caught current_app.logger.error(f"Error processing filing {filing_message}: {err}") - current_app.logger.debug(traceback.format_exc()) + current_app.logger.debug(f"{filing_message}: {traceback.format_exc()}") return {"error": f"Unable to process filing: {filing_message}"}, HTTPStatus.INTERNAL_SERVER_ERROR # Completed diff --git a/queue_services/business-filer/src/business_filer/services/filer.py b/queue_services/business-filer/src/business_filer/services/filer.py index 6a3a3d4b5d..f40ae0c5c4 100644 --- a/queue_services/business-filer/src/business_filer/services/filer.py +++ b/queue_services/business-filer/src/business_filer/services/filer.py @@ -99,7 +99,11 @@ def get_filing_types(legal_filings: dict): def process_filing(filing_message: FilingMessage): # noqa: PLR0915, PLR0912 """Render the filings contained in the submission.""" - if not (filing_submission := Filing.find_by_id(filing_message.filing_identifier)): + if not (filing_submission := db.session.query(Filing) + .with_for_update() + .filter_by(id=filing_message.filing_identifier) + .first() + ): current_app.logger.error(f"No filing found for: {filing_message}") raise DefaultError(error_text=f"filing not found for {filing_message.filing_identifier}") diff --git a/queue_services/business-filer/tests/unit/test_filer/test_process_filing_core.py b/queue_services/business-filer/tests/unit/test_filer/test_process_filing_core.py new file mode 100644 index 0000000000..4cceda2b9c --- /dev/null +++ b/queue_services/business-filer/tests/unit/test_filer/test_process_filing_core.py @@ -0,0 +1,111 @@ +# Copyright © 2026 Province of British Columbia +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""The Test Suites to ensure that the core process_filing wrapper is operating correctly.""" +import pytest +import copy +from registry_schemas.example_data import ANNUAL_REPORT, REGISTRATION, FILING_HEADER + +from business_filer.common.filing_message import FilingMessage +from business_filer.exceptions import DefaultError +from business_filer.services.filer import process_filing +from business_model.models import Filing +from tests.unit import create_business, create_filing + + +def test_process_filing_not_found(app, session): + """Assert that a DefaultError is raised when filing is not found.""" + filing_msg = FilingMessage(filing_identifier=999999) + with pytest.raises(DefaultError) as excinfo: + process_filing(filing_msg) + assert "filing not found for 999999" in excinfo.value.error_text + + +def test_process_filing_duplicate(app, session, mocker): + """Assert that process_filing handles duplicate messages and the DB query succeeds without mocking.""" + # mock out the event publishing so we don't try to send emails + mocker.patch('business_filer.services.publish_event.PublishEvent.publish_email_message', return_value=None) + mocker.patch('business_filer.services.publish_event.PublishEvent.publish_event', return_value=None) + + business = create_business(identifier="CP1234567") + filing = create_filing("payment123", ANNUAL_REPORT, business.id) + filing_msg = FilingMessage(filing_identifier=filing.id) + + # First attempt: Processes successfully and commits (triggering DB updates) + process_filing(filing_msg) + + # Verify status changed to completed in DB + completed_filing = Filing.find_by_id(filing.id) + assert completed_filing.status == Filing.Status.COMPLETED.value + + # Second attempt (Duplicate queue message): + # This verifies that db.session.query(Filing).with_for_update() executes successfully against Postgres + # and properly halts execution by returning None, None when status is COMPLETED. + res1, res2 = process_filing(filing_msg) + + assert res1 is None + assert res2 is None + + +def test_process_filing_locked(app, session, mocker): + """Assert that process_filing propagates OperationalError when the row is locked by another transaction.""" + import time + import concurrent + from sqlalchemy.exc import OperationalError + + filing_json = copy.deepcopy(FILING_HEADER) + filing_json['filing']['header']['name'] = 'registration' + filing_json['filing']['registration'] = copy.deepcopy(REGISTRATION) + filing_json['filing']['registration']['startDate'] = '2026-03-19' + + filing = create_filing('123', filing_json) + filing_msg = FilingMessage(filing_identifier=filing.id) + + def mock_get_next_corp_num(legal_type): + time.sleep(3) # simulate delay + return "FM1234567" + + mocker.patch( + 'business_filer.filing_processors.filing_components.business_info.get_next_corp_num', + side_effect=mock_get_next_corp_num + ) + + from business_filer.common.services import NaicsService + naics_response = { + 'code': REGISTRATION['business']['naics']['naicsCode'], + 'naicsKey': 'a4667c26-d639-42fa-8af3-7ec73e392569' + } + mocker.patch.object(NaicsService, 'find_by_code', return_value=naics_response) + + mocker.patch('business_filer.filing_processors.filing_components.business_profile.update_business_profile', return_value=None) + mocker.patch('business_filer.filing_processors.filing_components.business_profile.update_affiliation', return_value=None) + + + def run_process(): + with app.app_context(): + try: + process_filing(filing_msg) + return "SUCCESS" + except Exception as e: + import traceback + return traceback.format_exc() + + with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: + f1 = executor.submit(run_process) + f2 = executor.submit(run_process) + + res1 = f1.result() + res2 = f2.result() + + assert res1 == "SUCCESS", f"Thread 1 failed with: {res1}" + assert res2 == "SUCCESS", f"Thread 2 failed with: {res2}" From 00fc910a59fbffe26d47e1080e36359b07c32d0c Mon Sep 17 00:00:00 2001 From: Vysakh Menon Date: Fri, 20 Mar 2026 16:49:03 -0700 Subject: [PATCH 2/3] no message --- .../src/business_filer/common/services/naics.py | 4 +++- .../tests/unit/test_filer/test_process_filing_core.py | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/queue_services/business-filer/src/business_filer/common/services/naics.py b/queue_services/business-filer/src/business_filer/common/services/naics.py index 62b7bde1bb..1053e1c327 100644 --- a/queue_services/business-filer/src/business_filer/common/services/naics.py +++ b/queue_services/business-filer/src/business_filer/common/services/naics.py @@ -18,6 +18,8 @@ import requests from flask import current_app +from business_filer.exceptions import QueueException + from .account_service import AccountService @@ -30,7 +32,7 @@ def find_by_code(naics_code: str): naics_url = current_app.config.get("NAICS_API_URL") token = AccountService.get_bearer_token() if not token: - raise Exception("Failed to get token for naics call") + raise QueueException("Failed to get token for naics call") response = requests.get(naics_url + "/" + naics_code, headers={ "Content-Type": "application/json", "Authorization": "Bearer " + token diff --git a/queue_services/business-filer/tests/unit/test_filer/test_process_filing_core.py b/queue_services/business-filer/tests/unit/test_filer/test_process_filing_core.py index 4cceda2b9c..f6b1963ffc 100644 --- a/queue_services/business-filer/tests/unit/test_filer/test_process_filing_core.py +++ b/queue_services/business-filer/tests/unit/test_filer/test_process_filing_core.py @@ -96,7 +96,7 @@ def run_process(): try: process_filing(filing_msg) return "SUCCESS" - except Exception as e: + except Exception: import traceback return traceback.format_exc() From 1365705daaee799684a4ae35a7a7044f231fd908 Mon Sep 17 00:00:00 2001 From: Vysakh Menon Date: Mon, 23 Mar 2026 15:00:23 -0700 Subject: [PATCH 3/3] no message --- .../business-filer/src/business_filer/services/filer.py | 2 +- .../tests/unit/test_filer/test_process_filing_core.py | 9 +++++++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/queue_services/business-filer/src/business_filer/services/filer.py b/queue_services/business-filer/src/business_filer/services/filer.py index f40ae0c5c4..3a16572a08 100644 --- a/queue_services/business-filer/src/business_filer/services/filer.py +++ b/queue_services/business-filer/src/business_filer/services/filer.py @@ -100,7 +100,7 @@ def get_filing_types(legal_filings: dict): def process_filing(filing_message: FilingMessage): # noqa: PLR0915, PLR0912 """Render the filings contained in the submission.""" if not (filing_submission := db.session.query(Filing) - .with_for_update() + .with_for_update(nowait=True) .filter_by(id=filing_message.filing_identifier) .first() ): diff --git a/queue_services/business-filer/tests/unit/test_filer/test_process_filing_core.py b/queue_services/business-filer/tests/unit/test_filer/test_process_filing_core.py index f6b1963ffc..7b918a33e6 100644 --- a/queue_services/business-filer/tests/unit/test_filer/test_process_filing_core.py +++ b/queue_services/business-filer/tests/unit/test_filer/test_process_filing_core.py @@ -107,5 +107,10 @@ def run_process(): res1 = f1.result() res2 = f2.result() - assert res1 == "SUCCESS", f"Thread 1 failed with: {res1}" - assert res2 == "SUCCESS", f"Thread 2 failed with: {res2}" + results = [res1, res2] + + success_count = sum(1 for r in results if r == "SUCCESS") + error_count = sum(1 for r in results if "OperationalError" in r) + + assert success_count == 1, f"Expected exactly one thread to succeed, got: {results}" + assert error_count == 1, f"Expected exactly one thread to hit lock timeout (OperationalError), got: {results}"