Skip to content

Commit edf2448

Browse files
committed
Improve test robustness
- Use shared test helpers more consistently - Centralize port assignment to avoid conflicts - Improve retry behaviour in get_connection
1 parent 4fd500a commit edf2448

15 files changed

Lines changed: 459 additions & 427 deletions

conftest.py

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
"""Repo-wide pytest configuration.
2+
3+
This file applies to all tests in the repository (including package-level
4+
`*/tests` folders). It is intentionally minimal and focuses on integration-test
5+
hygiene.
6+
"""
7+
8+
from __future__ import annotations
9+
10+
import os
11+
12+
import pytest
13+
14+
15+
# Make shared fixtures/helpers from tests.integration_utils available everywhere.
16+
pytest_plugins = ["tests.integration_utils"]
17+
18+
19+
def _truthy_env(name: str) -> bool:
20+
return os.getenv(name, "").strip().lower() in {"1", "true", "yes", "on"}
21+
22+
23+
@pytest.fixture(scope="session", autouse=True)
24+
def _cleanup_test_db_containers_session():
25+
"""Clean up ggmpilot test DB containers before and after the session.
26+
27+
CI failures due to "port is already allocated" or inability to start new
28+
containers are often caused by leftover `*-docker-db-*` containers/volumes
29+
from previous runs. This cleanup is best-effort and only runs when:
30+
31+
- Docker is reachable, and
32+
- either `RUN_SLOW_TESTS` is enabled or we are in CI.
33+
34+
To opt out locally, set `GGMPILOT_KEEP_TEST_CONTAINERS=1`.
35+
"""
36+
37+
from tests.integration_utils import (
38+
cleanup_all_test_db_containers,
39+
docker_running,
40+
slow_tests_enabled,
41+
)
42+
43+
# In pytest-xdist, this session fixture would run in every worker process.
44+
# Only run the global cleanup in the master/controller process to avoid
45+
# workers deleting containers used by other workers.
46+
if os.getenv("PYTEST_XDIST_WORKER"):
47+
yield
48+
return
49+
50+
if _truthy_env("GGMPILOT_KEEP_TEST_CONTAINERS"):
51+
yield
52+
return
53+
54+
should_cleanup = docker_running() and (slow_tests_enabled() or _truthy_env("CI"))
55+
if should_cleanup:
56+
cleanup_all_test_db_containers()
57+
58+
yield
59+
60+
if should_cleanup:
61+
cleanup_all_test_db_containers()

dev_sql_server/get_connection.py

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,13 @@
2929
# Driver‑specific helpers
3030
# ────────────────────────────────────────────────────────────────────────────────
3131
def _connect_postgres(cfg: Dict[str, Any]):
32-
return psycopg2.connect(**cfg)
32+
# psycopg2 can block for a while on connect() if no connect_timeout is set.
33+
# In CI (and especially under parallel starts) we want fast retries.
34+
cfg2 = dict(cfg)
35+
cfg2.setdefault(
36+
"connect_timeout", int(os.getenv("GGMPILOT_DB_CONNECT_TIMEOUT", "5"))
37+
)
38+
return psycopg2.connect(**cfg2)
3339

3440

3541
def _connect_oracle(cfg: Dict[str, Any]):
@@ -44,11 +50,17 @@ def _connect_oracle(cfg: Dict[str, Any]):
4450

4551

4652
def _connect_mssql(cfg: Dict[str, Any]):
53+
# Keep individual connection attempts short so _wait_for_db_ready can retry.
54+
# ODBC driver 18 defaults to Encrypt=yes; TrustServerCertificate is required
55+
# for local/dev containers with self-signed certs.
56+
connect_timeout = int(os.getenv("GGMPILOT_DB_CONNECT_TIMEOUT", "5"))
4757
conn_str = (
4858
f"DRIVER={{{SQL_SERVER_DRIVER}}};"
4959
f"SERVER={cfg['host']},{cfg['port']};"
5060
f"DATABASE={cfg['dbname']};"
5161
f"UID={cfg['user']};PWD={cfg['password']};"
62+
f"Connection Timeout={connect_timeout};"
63+
"Encrypt=yes;"
5264
"TrustServerCertificate=yes;"
5365
)
5466
return pyodbc.connect(conn_str)
@@ -500,11 +512,19 @@ def get_connection(
500512
container_name = container_name or f"{db_type}-docker-db-{port_effective}"
501513
volume_name = volume_name or f"{container_name}_data"
502514

515+
user_supplied_max_wait = max_wait_seconds is not None
516+
503517
if max_wait_seconds is None:
518+
# Defaults are intentionally conservative for CI stability.
519+
# Callers can always override via max_wait_seconds.
504520
if db_type == "oracle":
505521
max_wait_seconds = 600
506522
elif db_type in ("mysql", "mariadb"):
507523
max_wait_seconds = 180
524+
elif db_type == "postgres":
525+
max_wait_seconds = 180
526+
elif db_type == "mssql":
527+
max_wait_seconds = 360
508528
else:
509529
max_wait_seconds = 120
510530

@@ -527,6 +547,24 @@ def get_connection(
527547
container_force_refresh,
528548
)
529549

550+
# If the container was newly created, DB initialization can take
551+
# substantially longer on cold CI runners (esp. MSSQL). Only adjust
552+
# when the caller didn't explicitly choose a timeout.
553+
if (not user_supplied_max_wait) and was_created:
554+
if db_type == "postgres":
555+
max_wait_seconds = max(max_wait_seconds, 240)
556+
elif db_type == "mssql":
557+
max_wait_seconds = max(max_wait_seconds, 480)
558+
559+
# CI runners can be slower (image pulls, constrained IO). Allow opt-in
560+
# override while keeping local runs reasonable.
561+
if not user_supplied_max_wait:
562+
if os.getenv("CI", "").strip().lower() in {"1", "true", "yes", "on"}:
563+
if db_type == "postgres":
564+
max_wait_seconds = max(max_wait_seconds, 240)
565+
elif db_type == "mssql":
566+
max_wait_seconds = max(max_wait_seconds, 480)
567+
530568
# Prepare configs for master and target DB
531569
# When running inside Docker, localhost refers to the container itself.
532570
# Prefer host.docker.internal; if not resolvable on Linux, fall back to Docker bridge gateway.
@@ -537,7 +575,9 @@ def get_connection(
537575
except Exception:
538576
host_addr = os.getenv("HOST_GATEWAY_IP", "172.17.0.1")
539577
else:
540-
host_addr = "localhost"
578+
# Prefer IPv4 loopback to avoid occasional localhost/IPv6 resolution
579+
# differences across CI runners.
580+
host_addr = "127.0.0.1"
541581

542582
# Choose appropriate admin DB/user per backend
543583
if db_type == "postgres":

odata_to_staging/tests/test_integration_odata_to_staging_postgres.py

Lines changed: 7 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -3,32 +3,18 @@
33
import runpy
44
import io
55
import contextlib
6-
import subprocess
76

87
import pytest
98
import requests
109
from sqlalchemy import text
1110

1211
from dev_sql_server.get_connection import get_connection
12+
from tests.integration_utils import docker_running, slow_tests_enabled, ports_dest
1313

1414

1515
NORTHWIND_V2 = "https://services.odata.org/V2/Northwind/Northwind.svc/"
1616

1717

18-
def _docker_running() -> bool:
19-
try:
20-
res = subprocess.run(
21-
["docker", "info"], capture_output=True, text=True, timeout=5
22-
)
23-
return res.returncode == 0
24-
except Exception:
25-
return False
26-
27-
28-
def _slow_tests_enabled() -> bool:
29-
return os.getenv("RUN_SLOW_TESTS", "0").lower() in {"1", "true", "yes", "on"}
30-
31-
3218
def _northwind_available() -> bool:
3319
"""Best-effort sanity check that the Northwind service is reachable and returns JSON for an entity set.
3420
@@ -57,25 +43,27 @@ def _northwind_available() -> bool:
5743
@pytest.mark.slow
5844
@pytest.mark.postgres
5945
@pytest.mark.skipif(
60-
not _slow_tests_enabled(),
46+
not slow_tests_enabled(),
6147
reason="RUN_SLOW_TESTS not enabled; set to 1 to run slow integration tests.",
6248
)
6349
@pytest.mark.skipif(
64-
not _docker_running(),
50+
not docker_running(),
6551
reason="Docker is not available/running; required for this integration test.",
6652
)
6753
@pytest.mark.skipif(
6854
not _northwind_available(),
6955
reason="Northwind OData service not reachable; skipping to avoid flakey failures.",
7056
)
7157
def test_main_odata_to_staging_postgres(tmp_path):
58+
dst_port = ports_dest["postgres"]
59+
7260
# Start a fresh Postgres destination
7361
engine = get_connection(
7462
db_type="postgres",
7563
db_name="ggm_odata_to_staging",
7664
user="sa",
7765
password="S3cureP@ssw0rd!23243",
78-
port=5434,
66+
port=dst_port,
7967
force_refresh=True,
8068
print_tables=False,
8169
)
@@ -95,7 +83,7 @@ def test_main_odata_to_staging_postgres(tmp_path):
9583
DST_USERNAME=sa
9684
DST_PASSWORD=S3cureP@ssw0rd!23243
9785
DST_HOST=localhost
98-
DST_PORT=5434
86+
DST_PORT={dst_port}
9987
DST_DB=ggm_odata_to_staging
10088
DST_SCHEMA=staging
10189

sql_to_staging/tests/test_integration_direct_transfer_streaming.py

Lines changed: 15 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -4,74 +4,37 @@
44

55
import logging
66
import os
7-
import shutil
8-
import subprocess
97
import re
108

119
import pytest
12-
import docker
1310
from sqlalchemy import text
1411
from dotenv import load_dotenv
1512

1613
from dev_sql_server.get_connection import get_connection
1714
from sql_to_staging.functions.direct_transfer import direct_transfer
15+
from tests.integration_utils import (
16+
cleanup_db_container_by_port,
17+
docker_running,
18+
ports,
19+
ports_dest,
20+
slow_tests_enabled,
21+
)
1822

1923

2024
load_dotenv("tests/.env")
2125

2226

23-
def _docker_running() -> bool:
24-
if not shutil.which("docker"):
25-
return False
26-
try:
27-
res = subprocess.run(
28-
["docker", "info"], capture_output=True, text=True, timeout=5
29-
)
30-
return res.returncode == 0
31-
except Exception:
32-
return False
33-
34-
35-
def _slow_tests_enabled() -> bool:
36-
return os.getenv("RUN_SLOW_TESTS", "0").lower() in {"1", "true", "yes", "on"}
37-
38-
3927
# Use explicit ports consistent with other integration tests
40-
SRC_PORT = 5433
41-
DST_PORT = 5434
42-
43-
44-
def _cleanup_db_containers(name: str, port: int):
45-
"""Stop and remove the container/volume our get_connection uses for a db/port pair."""
46-
client = docker.from_env()
47-
cname = f"{name}-docker-db-{port}"
48-
try:
49-
c = client.containers.get(cname)
50-
try:
51-
c.stop()
52-
except Exception:
53-
pass
54-
try:
55-
c.remove()
56-
except Exception:
57-
pass
58-
except Exception:
59-
pass
60-
# remove associated volume
61-
vname = f"{cname}_data"
62-
try:
63-
v = client.volumes.get(vname)
64-
v.remove(force=True)
65-
except Exception:
66-
pass
28+
SRC_PORT = ports["postgres"]
29+
DST_PORT = ports_dest["postgres"]
6730

6831

6932
@pytest.mark.skipif(
70-
not _slow_tests_enabled(),
33+
not slow_tests_enabled(),
7134
reason="RUN_SLOW_TESTS not enabled; set to 1 to run slow integration tests.",
7235
)
7336
@pytest.mark.skipif(
74-
not _docker_running(),
37+
not docker_running(),
7538
reason="Docker is not available/running; required for this integration test.",
7639
)
7740
@pytest.mark.parametrize(
@@ -93,8 +56,8 @@ def test_direct_transfer_streams_in_chunks_postgres(
9356
table = "stream_check"
9457

9558
# Ensure a clean slate before starting
96-
_cleanup_db_containers("postgres", SRC_PORT)
97-
_cleanup_db_containers("postgres", DST_PORT)
59+
cleanup_db_container_by_port("postgres", SRC_PORT)
60+
cleanup_db_container_by_port("postgres", DST_PORT)
9861

9962
try:
10063
# Start Postgres source and create table with many rows
@@ -164,5 +127,5 @@ def test_direct_transfer_streams_in_chunks_postgres(
164127
inserts.append((int(m.group(1)), int(m.group(2))))
165128
assert inserts == expected_batches
166129
finally:
167-
_cleanup_db_containers("postgres", SRC_PORT)
168-
_cleanup_db_containers("postgres", DST_PORT)
130+
cleanup_db_container_by_port("postgres", SRC_PORT)
131+
cleanup_db_container_by_port("postgres", DST_PORT)

sql_to_staging/tests/test_upload_parquet_mssql_datetime2.py

Lines changed: 12 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -13,47 +13,26 @@
1313

1414
from dev_sql_server.get_connection import get_connection
1515
from utils.parquet.upload_parquet import upload_parquet
16-
17-
18-
def _docker_running() -> bool:
19-
import subprocess
20-
21-
try:
22-
res = subprocess.run(
23-
["docker", "info"], capture_output=True, text=True, timeout=5
24-
)
25-
return res.returncode == 0
26-
except Exception:
27-
return False
28-
29-
30-
def _slow_tests_enabled() -> bool:
31-
import os
32-
33-
return os.getenv("RUN_SLOW_TESTS", "0").lower() in {"1", "true", "yes", "on"}
34-
35-
36-
def _mssql_driver_available() -> bool:
37-
try:
38-
import pyodbc # noqa: F401
39-
40-
return any("ODBC Driver 18 for SQL Server" in d for d in pyodbc.drivers())
41-
except Exception:
42-
return False
16+
from tests.integration_utils import (
17+
docker_running,
18+
mssql_driver_available,
19+
port_for_worker,
20+
slow_tests_enabled,
21+
)
4322

4423

4524
@pytest.mark.slow
4625
@pytest.mark.mssql
4726
@pytest.mark.skipif(
48-
not _slow_tests_enabled(),
27+
not slow_tests_enabled(),
4928
reason="RUN_SLOW_TESTS not enabled; set to 1 to run slow integration tests.",
5029
)
5130
@pytest.mark.skipif(
52-
not _docker_running(),
31+
not docker_running(),
5332
reason="Docker is not available/running; required for this integration test.",
5433
)
5534
@pytest.mark.skipif(
56-
not _mssql_driver_available(),
35+
not mssql_driver_available(),
5736
reason="ODBC Driver 18 for SQL Server not installed; required for MSSQL test.",
5837
)
5938
def test_upload_parquet_mssql_uses_datetime2(tmp_path: Path):
@@ -63,7 +42,9 @@ def test_upload_parquet_mssql_uses_datetime2(tmp_path: Path):
6342
db_name="ggm_upload_parquet_dt2",
6443
user="sa",
6544
password="S3cureP@ssw0rd!23243",
66-
port=1436, # use a distinct port to avoid clashes with other tests
45+
port=port_for_worker(
46+
1436
47+
), # use a distinct port to avoid clashes with other tests
6748
force_refresh=True,
6849
sql_folder=None,
6950
sql_suffix_filter=True,

0 commit comments

Comments
 (0)