Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import requests
from flask import current_app

from business_filer.exceptions import QueueException

from .account_service import AccountService


Expand All @@ -27,15 +29,13 @@ class NaicsService:
@staticmethod
def find_by_code(naics_code: str):
"""Return NAICS Structure matching code."""
try:
naics_url = current_app.config.get("NAICS_API_URL")
token = AccountService.get_bearer_token()
response = requests.get(naics_url + "/" + naics_code, headers={
"Content-Type": "application/json",
"Authorization": "Bearer " + token
})
response.raise_for_status()
return response.json()
except Exception as err:
current_app.logger.error(err)
return None
naics_url = current_app.config.get("NAICS_API_URL")
token = AccountService.get_bearer_token()
if not token:
raise QueueException("Failed to get token for naics call")
response = requests.get(naics_url + "/" + naics_code, headers={
"Content-Type": "application/json",
"Authorization": "Bearer " + token
})
response.raise_for_status()
return response.json()
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def worker():
process_filing(filing_message)
except Exception as err: # pylint: disable=broad-exception-caught
current_app.logger.error(f"Error processing filing {filing_message}: {err}")
current_app.logger.debug(traceback.format_exc())
current_app.logger.debug(f"{filing_message}: {traceback.format_exc()}")
return {"error": f"Unable to process filing: {filing_message}"}, HTTPStatus.INTERNAL_SERVER_ERROR

# Completed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,11 @@ def get_filing_types(legal_filings: dict):

def process_filing(filing_message: FilingMessage): # noqa: PLR0915, PLR0912
"""Render the filings contained in the submission."""
if not (filing_submission := Filing.find_by_id(filing_message.filing_identifier)):
if not (filing_submission := db.session.query(Filing)
.with_for_update(nowait=True)
.filter_by(id=filing_message.filing_identifier)
.first()
):
current_app.logger.error(f"No filing found for: {filing_message}")
raise DefaultError(error_text=f"filing not found for {filing_message.filing_identifier}")

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
# Copyright © 2026 Province of British Columbia
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""The Test Suites to ensure that the core process_filing wrapper is operating correctly."""
import pytest
import copy
from registry_schemas.example_data import ANNUAL_REPORT, REGISTRATION, FILING_HEADER

from business_filer.common.filing_message import FilingMessage
from business_filer.exceptions import DefaultError
from business_filer.services.filer import process_filing
from business_model.models import Filing
from tests.unit import create_business, create_filing


def test_process_filing_not_found(app, session):
"""Assert that a DefaultError is raised when filing is not found."""
filing_msg = FilingMessage(filing_identifier=999999)
with pytest.raises(DefaultError) as excinfo:
process_filing(filing_msg)
assert "filing not found for 999999" in excinfo.value.error_text


def test_process_filing_duplicate(app, session, mocker):
"""Assert that process_filing handles duplicate messages and the DB query succeeds without mocking."""
# mock out the event publishing so we don't try to send emails
mocker.patch('business_filer.services.publish_event.PublishEvent.publish_email_message', return_value=None)
mocker.patch('business_filer.services.publish_event.PublishEvent.publish_event', return_value=None)

business = create_business(identifier="CP1234567")
filing = create_filing("payment123", ANNUAL_REPORT, business.id)
filing_msg = FilingMessage(filing_identifier=filing.id)

# First attempt: Processes successfully and commits (triggering DB updates)
process_filing(filing_msg)

# Verify status changed to completed in DB
completed_filing = Filing.find_by_id(filing.id)
assert completed_filing.status == Filing.Status.COMPLETED.value

# Second attempt (Duplicate queue message):
# This verifies that db.session.query(Filing).with_for_update() executes successfully against Postgres
# and properly halts execution by returning None, None when status is COMPLETED.
res1, res2 = process_filing(filing_msg)

assert res1 is None
assert res2 is None


def test_process_filing_locked(app, session, mocker):
"""Assert that process_filing propagates OperationalError when the row is locked by another transaction."""
import time
import concurrent
from sqlalchemy.exc import OperationalError

filing_json = copy.deepcopy(FILING_HEADER)
filing_json['filing']['header']['name'] = 'registration'
filing_json['filing']['registration'] = copy.deepcopy(REGISTRATION)
filing_json['filing']['registration']['startDate'] = '2026-03-19'

filing = create_filing('123', filing_json)
filing_msg = FilingMessage(filing_identifier=filing.id)

def mock_get_next_corp_num(legal_type):
time.sleep(3) # simulate delay
return "FM1234567"

mocker.patch(
'business_filer.filing_processors.filing_components.business_info.get_next_corp_num',
side_effect=mock_get_next_corp_num
)

from business_filer.common.services import NaicsService
naics_response = {
'code': REGISTRATION['business']['naics']['naicsCode'],
'naicsKey': 'a4667c26-d639-42fa-8af3-7ec73e392569'
}
mocker.patch.object(NaicsService, 'find_by_code', return_value=naics_response)

mocker.patch('business_filer.filing_processors.filing_components.business_profile.update_business_profile', return_value=None)
mocker.patch('business_filer.filing_processors.filing_components.business_profile.update_affiliation', return_value=None)


def run_process():
with app.app_context():
try:
process_filing(filing_msg)
return "SUCCESS"
except Exception:
import traceback
return traceback.format_exc()

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
f1 = executor.submit(run_process)
f2 = executor.submit(run_process)

res1 = f1.result()
res2 = f2.result()

results = [res1, res2]

success_count = sum(1 for r in results if r == "SUCCESS")
error_count = sum(1 for r in results if "OperationalError" in r)

assert success_count == 1, f"Expected exactly one thread to succeed, got: {results}"
assert error_count == 1, f"Expected exactly one thread to hit lock timeout (OperationalError), got: {results}"