-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmigrate_indexes.py
More file actions
141 lines (117 loc) · 4.81 KB
/
migrate_indexes.py
File metadata and controls
141 lines (117 loc) · 4.81 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
#!/usr/bin/env python3
"""
Database index migration for Commerce System Demo.
Creates performance indexes not managed by SQLAlchemy's create_all (GIN/trigram)
and ensures all model-declared B-tree indexes exist in the running database.
Safe to run multiple times — all statements use IF NOT EXISTS.
Usage (from project root):
python scripts/migrate_indexes.py [--database-url URL]
DATABASE_URL from the environment / .env is used when --database-url is omitted.
"""
from __future__ import annotations
import argparse
import asyncio
import sys
import asyncpg
from sqlalchemy.ext.asyncio import create_async_engine
from app.db.base import Base
# ---------------------------------------------------------------------------
# Migration steps, applied in list order.
# Every entry is a (label printed while running, DDL statement) pair.
# The statements are executed without a surrounding transaction, which is
# what CREATE INDEX CONCURRENTLY requires.
# ---------------------------------------------------------------------------
_MIGRATIONS: list[tuple[str, str]] = [
    # The trigram extension comes first: the GIN index at the end of this
    # list uses its gin_trgm_ops operator class.
    ("extension pg_trgm", "CREATE EXTENSION IF NOT EXISTS pg_trgm"),
    # B-tree indexes that the SQLAlchemy models already declare. create_all
    # builds them on a fresh database; these statements backfill databases
    # that were created before the indexes were declared.
    (
        "ix_product_price (B-tree, price range queries / COUNT skip)",
        "CREATE INDEX CONCURRENTLY IF NOT EXISTS "
        "ix_product_price ON product(price)",
    ),
    (
        "ix_product_category_id (B-tree, category filter join)",
        "CREATE INDEX CONCURRENTLY IF NOT EXISTS "
        "ix_product_category_id ON product(category_id)",
    ),
    (
        "ix_category_parent_id (B-tree, recursive CTE join)",
        "CREATE INDEX CONCURRENTLY IF NOT EXISTS "
        "ix_category_parent_id ON category(parent_id)",
    ),
    # GIN trigram index so that ILIKE '%text%' title searches can use an index.
    (
        "ix_product_title_trgm (GIN trigram, ILIKE title search)",
        "CREATE INDEX CONCURRENTLY IF NOT EXISTS "
        "ix_product_title_trgm ON product USING GIN (title gin_trgm_ops)",
    ),
]
def _asyncpg_url(database_url: str) -> str:
    """Turn a SQLAlchemy-style asyncpg URL into a DSN asyncpg accepts.

    The first occurrence of ``postgresql+asyncpg://`` is rewritten to
    ``postgresql://``. A string that does not contain the SQLAlchemy prefix
    is returned unchanged.
    """
    before, dialect_prefix, after = database_url.partition("postgresql+asyncpg://")
    if not dialect_prefix:
        # Prefix not present: nothing to rewrite.
        return database_url
    return before + "postgresql://" + after
def _sqlalchemy_async_url(database_url: str) -> str:
    """Return *database_url* with the asyncpg driver selected for SQLAlchemy.

    ``create_async_engine`` needs an async driver named in the URL; a bare
    ``postgresql://`` (or ``postgres://``) DSN does not name one. URLs that
    already carry a ``+driver`` suffix are returned unchanged.
    """
    for scheme in ("postgresql://", "postgres://"):
        if database_url.startswith(scheme):
            return "postgresql+asyncpg://" + database_url[len(scheme):]
    return database_url


async def _run(database_url: str) -> None:
    """Create missing tables, then apply every statement in ``_MIGRATIONS``.

    Args:
        database_url: ``postgresql+asyncpg://`` or ``postgresql://`` DSN of
            the target database.

    Raises:
        SystemExit: With status 1 when at least one migration statement
            failed. Errors raised while connecting or while running
            ``create_all`` are not caught here and propagate to the caller.
    """
    url = _asyncpg_url(database_url)
    # Print only what follows the first "@" so the credentials stay out of logs.
    safe_url = url.split("@", 1)[-1] if "@" in url else url
    print(f"Connecting to {safe_url} …")

    # Ensure base tables exist on first boot. The app service also runs
    # create_all at startup, but it depends on this migration service.
    # Imported for its side effect only (presumably registers the model
    # tables on Base.metadata; confirm against app.models).
    import app.models  # noqa: F401

    # The engine gets the URL with the async driver spelled out; passing a
    # plain postgresql:// DSN straight through would make create_async_engine
    # fail, even though --database-url documents that form as accepted.
    engine = create_async_engine(_sqlalchemy_async_url(database_url), future=True)
    try:
        async with engine.begin() as schema_conn:
            await schema_conn.run_sync(Base.metadata.create_all)
    finally:
        await engine.dispose()

    # asyncpg operates in autocommit mode by default (no BEGIN/COMMIT wrapper),
    # which is required for CREATE INDEX CONCURRENTLY.
    conn = await asyncpg.connect(url)
    try:
        errors: list[str] = []
        for description, sql in _MIGRATIONS:
            print(f" {description} … ", end="", flush=True)
            try:
                await conn.execute(sql)
                print("ok")
            except asyncpg.exceptions.UniqueViolationError:
                print("already exists")
            except Exception as exc:
                # Keep going so one failing statement does not hide the
                # outcome of the remaining ones; failures are summarised below.
                print(f"FAILED\n {exc}")
                errors.append(description)
        print()
        if errors:
            print(f"Completed with {len(errors)} error(s):", file=sys.stderr)
            for name in errors:
                print(f" - {name}", file=sys.stderr)
            sys.exit(1)
        else:
            print("All migrations applied successfully.")
    finally:
        await conn.close()
def main() -> None:
    """Command-line entry point: pick the database URL and run the migration.

    The ``--database-url`` option wins when given. Otherwise the URL is read
    from the application settings; if loading them fails, an explanation is
    written to stderr and the process exits with status 1.
    """
    cli = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    cli.add_argument(
        "--database-url",
        metavar="URL",
        help="postgresql+asyncpg:// or postgresql:// DSN (overrides DATABASE_URL / .env)",
    )
    database_url = cli.parse_args().database_url

    if not database_url:
        # No explicit URL on the command line: fall back to the app settings.
        try:
            from app.core.config import get_settings  # noqa: PLC0415

            database_url = get_settings().database_url
        except Exception as exc:
            print(f"Could not load app settings: {exc}", file=sys.stderr)
            print("Pass --database-url or set DATABASE_URL.", file=sys.stderr)
            sys.exit(1)

    asyncio.run(_run(database_url))


if __name__ == "__main__":
    main()