From 9a26bd36bdacbe7cda74eb2d7d349f858d974bb7 Mon Sep 17 00:00:00 2001 From: Rajandeep Date: Wed, 18 Mar 2026 10:16:45 -0700 Subject: [PATCH 01/12] 32822 - Extract Refresh Flow --- .../flows/refresh_extract_subset_flow.py | 114 ++++++++++++++++++ 1 file changed, 114 insertions(+) create mode 100644 data-tool/flows/refresh_extract_subset_flow.py diff --git a/data-tool/flows/refresh_extract_subset_flow.py b/data-tool/flows/refresh_extract_subset_flow.py new file mode 100644 index 0000000000..c8a1945320 --- /dev/null +++ b/data-tool/flows/refresh_extract_subset_flow.py @@ -0,0 +1,114 @@ +import argparse +from pathlib import Path +import subprocess +import sys +from prefect import flow, task +from prefect.cache_policies import NO_CACHE +from prefect.states import Failed + +_REPO_ROOT = Path(__file__).resolve().parents[2] +_SCRIPT_PATH = _REPO_ROOT / 'data-tool' / 'scripts' / 'generate_cprd_subset_extract.py' + +def _script_exists() -> None: + if not _SCRIPT_PATH.exists(): + raise FileNotFoundError( + f'CPRD subset extract script not found: {_SCRIPT_PATH}' + ) + +@task(name='Run-CPRD-Subset-Generator', cache_policy=NO_CACHE) +def run_cprd_subset_extract_generator( + corp_file: str, + mode: str = 'load', + chunk_size: int = 500, + threads: int = 4, + pg_fastload: bool = False, + pg_disabled_method: str = 'replica_role', + out: str | None = None +) -> subprocess.CompletedProcess: + """ + Generate Commands + """ + _script_exists() + corp_path = Path(corp_file).expanduser().resolve() + if not corp_path.exists(): + raise FileNotFoundError(f'Corp file not found: {corp_path}') + + argv = [ + sys.executable, + str(_SCRIPT_PATH), + '--corp-file', + str(corp_path), + '--mode', + mode, + '--chunk-size', + str(chunk_size), + '--threads', + str(threads), + '--pg-disable-method', + pg_disabled_method, + ] + if pg_fastload: + argv.append('--pg-fastload') + if out is not None: + argv.extend(['--out', str(Path(out).expanduser().resolve())]) + + result = subprocess.run( + argv, + cwd=str(_REPO_ROOT), + capture_output=False, + text=True + ) + return result + + +@flow(name='Extract-Subset-Flow', log_prints=True, persist_result=False) +def extract_pull_flow( + corp_file : str, + mode: str = 'load', + chunk_size: int = 900, + threads: int = 4, + pg_fastload: bool = False, + pg_disable_method: str = 'replica_role', + out: str | None=None, +): + """ + Generate files + """ + try: + print(f'Running CPRD subset extract generator {corp_file}') + result = run_cprd_subset_extract_generator( + corp_file=corp_file, + mode=mode, + chunk_size=chunk_size, + threads=threads, + pg_fastload=pg_fastload, + pg_disabled_method=pg_disable_method, + out=out, + ) + if result.returncode != 0: + print(f'generator exited with code {result.returncode}') + return Failed(message=f'CPRD subset extract generator exited with code {result.returncode}.') + print(f'generator completed successfully') + except Exception as e: + raise e + + + if __name__ == '__main__': + p = argparse.ArgumentParser(description='Run Extract-Pull flow....') + p.add_argument('corp_file', help='Path to newline-delimited corp identifiers') + p.add_argument('--mode', default='load', choices=('refresh', 'load')) + p.add_argument('--chunk-size', type=int, default=900, help='Max items per IN list.') + p.add_argument('--threads', type=int, default=4, help='DBSchemaCLI transfer threads') + p.add_argument('--pg-fastload', action='store_true', help='Enable Postgres fast-load') + p.add_argument('--pg-disable-method', default='reploca_role', choices=('table_triggers', 'replica_role')) + p.add_argument('--out', default=None, help='Output path for generated master script.') + args = p.parse_args() + sys.exit(extract_pull_flow( + corp_file=args.corp_file, + mode=args.mode, + chunk_size=args.chunk_size, + threads=args.threads, + pg_fastload=args.pg_fastload, + pg_disable_method=args.pg_disable_method, + out=args.out, + )) \ No newline at end of file From 182143087307fc395ff5c923721f57cb995f204c Mon Sep 17 00:00:00 2001 From: Rajandeep Date: Thu, 19 Mar 2026 13:34:05 -0700 Subject: [PATCH 02/12] updated --- data-tool/Makefile | 3 + .../flows/refresh_extract_subset_flow.py | 103 ++++++++---------- 2 files changed, 51 insertions(+), 55 deletions(-) diff --git a/data-tool/Makefile b/data-tool/Makefile index e809d7835d..9d09acb74d 100644 --- a/data-tool/Makefile +++ b/data-tool/Makefile @@ -113,6 +113,9 @@ run-colin-freeze: ## Run colin freeze flow . $(VENV_DIR)/bin/activate && \ python flows/colin_freeze_flow.py +run-extract-pull: + . $(VENV_DIR)/bin/activate && \ + python flows/refresh_extract_subset_flow.py "$(CORP_FILE)" ################################################################################# # Self Documenting Commands # diff --git a/data-tool/flows/refresh_extract_subset_flow.py b/data-tool/flows/refresh_extract_subset_flow.py index c8a1945320..453aa9b048 100644 --- a/data-tool/flows/refresh_extract_subset_flow.py +++ b/data-tool/flows/refresh_extract_subset_flow.py @@ -9,26 +9,23 @@ _REPO_ROOT = Path(__file__).resolve().parents[2] _SCRIPT_PATH = _REPO_ROOT / 'data-tool' / 'scripts' / 'generate_cprd_subset_extract.py' -def _script_exists() -> None: - if not _SCRIPT_PATH.exists(): - raise FileNotFoundError( - f'CPRD subset extract script not found: {_SCRIPT_PATH}' - ) - @task(name='Run-CPRD-Subset-Generator', cache_policy=NO_CACHE) def run_cprd_subset_extract_generator( - corp_file: str, - mode: str = 'load', - chunk_size: int = 500, - threads: int = 4, - pg_fastload: bool = False, - pg_disabled_method: str = 'replica_role', - out: str | None = None + corp_file: str, + mode: str = 'load', + chunk_size: int = 500, + threads: int = 4, + pg_fastload: bool = False, + pg_disabled_method: str = 'replica_role', + out: str | None = None ) -> subprocess.CompletedProcess: """ Generate Commands """ - _script_exists() + if not _SCRIPT_PATH.exists(): + raise FileNotFoundError( + f'CPRD subset extract script not found: {_SCRIPT_PATH}' + ) corp_path = Path(corp_file).expanduser().resolve() if not corp_path.exists(): raise FileNotFoundError(f'Corp file not found: {corp_path}') @@ -52,18 +49,16 @@ def run_cprd_subset_extract_generator( if out is not None: argv.extend(['--out', str(Path(out).expanduser().resolve())]) - result = subprocess.run( + return subprocess.run( argv, cwd=str(_REPO_ROOT), capture_output=False, - text=True + text=True, ) - return result - @flow(name='Extract-Subset-Flow', log_prints=True, persist_result=False) def extract_pull_flow( - corp_file : str, + corp_file: str, mode: str = 'load', chunk_size: int = 900, threads: int = 4, @@ -74,41 +69,39 @@ def extract_pull_flow( """ Generate files """ - try: - print(f'Running CPRD subset extract generator {corp_file}') - result = run_cprd_subset_extract_generator( - corp_file=corp_file, - mode=mode, - chunk_size=chunk_size, - threads=threads, - pg_fastload=pg_fastload, - pg_disabled_method=pg_disable_method, - out=out, - ) - if result.returncode != 0: - print(f'generator exited with code {result.returncode}') - return Failed(message=f'CPRD subset extract generator exited with code {result.returncode}.') - print(f'generator completed successfully') - except Exception as e: - raise e + print(f'Running CPRD subset extract generator {corp_file}') + result = run_cprd_subset_extract_generator( + corp_file=corp_file, + mode=mode, + chunk_size=chunk_size, + threads=threads, + pg_fastload=pg_fastload, + pg_disabled_method=pg_disable_method, + out=out, + ) + if result.returncode != 0: + print(f'generator exited with code {result.returncode}') + return Failed(message=f'CPRD subset extract generator exited with code {result.returncode}.') + print(f'generator completed successfully') + - if __name__ == '__main__': - p = argparse.ArgumentParser(description='Run Extract-Pull flow....') - p.add_argument('corp_file', help='Path to newline-delimited corp identifiers') - p.add_argument('--mode', default='load', choices=('refresh', 'load')) - p.add_argument('--chunk-size', type=int, default=900, help='Max items per IN list.') - p.add_argument('--threads', type=int, default=4, help='DBSchemaCLI transfer threads') - p.add_argument('--pg-fastload', action='store_true', help='Enable Postgres fast-load') - p.add_argument('--pg-disable-method', default='reploca_role', choices=('table_triggers', 'replica_role')) - p.add_argument('--out', default=None, help='Output path for generated master script.') - args = p.parse_args() - sys.exit(extract_pull_flow( - corp_file=args.corp_file, - mode=args.mode, - chunk_size=args.chunk_size, - threads=args.threads, - pg_fastload=args.pg_fastload, - pg_disable_method=args.pg_disable_method, - out=args.out, - )) \ No newline at end of file +if __name__ == '__main__': + p = argparse.ArgumentParser(description='Run Extract-Pull flow....') + p.add_argument('corp_file', help='Path to newline-delimited corp identifiers') + p.add_argument('--mode', default='load', choices=('refresh', 'load')) + p.add_argument('--chunk-size', type=int, default=900, help='Max items per IN list.') + p.add_argument('--threads', type=int, default=4, help='DBSchemaCLI transfer threads') + p.add_argument('--pg-fastload', action='store_true', help='Enable Postgres fast-load') + p.add_argument('--pg-disable-method', default='reploca_role', choices=('table_triggers', 'replica_role')) + p.add_argument('--out', default=None, help='Output path for generated master script.') + args = p.parse_args() + extract_pull_flow( + corp_file=args.corp_file, + mode=args.mode, + chunk_size=args.chunk_size, + threads=args.threads, + pg_fastload=args.pg_fastload, + pg_disable_method=args.pg_disable_method, + out=args.out, + ) \ No newline at end of file From d531fb9980105750de59fde100077adb4cfd50c6 Mon Sep 17 00:00:00 2001 From: Rajandeep Date: Thu, 19 Mar 2026 13:41:14 -0700 Subject: [PATCH 03/12] updated --- data-tool/flows/refresh_extract_subset_flow.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/data-tool/flows/refresh_extract_subset_flow.py b/data-tool/flows/refresh_extract_subset_flow.py index 453aa9b048..69daaf6c5f 100644 --- a/data-tool/flows/refresh_extract_subset_flow.py +++ b/data-tool/flows/refresh_extract_subset_flow.py @@ -23,9 +23,9 @@ def run_cprd_subset_extract_generator( Generate Commands """ if not _SCRIPT_PATH.exists(): - raise FileNotFoundError( - f'CPRD subset extract script not found: {_SCRIPT_PATH}' - ) + raise FileNotFoundError( + f'CPRD subset extract script not found: {_SCRIPT_PATH}' + ) corp_path = Path(corp_file).expanduser().resolve() if not corp_path.exists(): raise FileNotFoundError(f'Corp file not found: {corp_path}') @@ -65,7 +65,7 @@ def extract_pull_flow( pg_fastload: bool = False, pg_disable_method: str = 'replica_role', out: str | None=None, -): +) -> None: """ Generate files """ @@ -80,8 +80,7 @@ def extract_pull_flow( out=out, ) if result.returncode != 0: - print(f'generator exited with code {result.returncode}') - return Failed(message=f'CPRD subset extract generator exited with code {result.returncode}.') + raise RuntimeError(f'Generator exited with code {result.returncode}') print(f'generator completed successfully') @@ -93,7 +92,7 @@ def extract_pull_flow( p.add_argument('--chunk-size', type=int, default=900, help='Max items per IN list.') p.add_argument('--threads', type=int, default=4, help='DBSchemaCLI transfer threads') p.add_argument('--pg-fastload', action='store_true', help='Enable Postgres fast-load') - p.add_argument('--pg-disable-method', default='reploca_role', choices=('table_triggers', 'replica_role')) + p.add_argument('--pg-disable-method', default='replica_role', choices=('table_triggers', 'replica_role')) p.add_argument('--out', default=None, help='Output path for generated master script.') args = p.parse_args() extract_pull_flow( From 22af33aff3441a268fa6a12bb0db80d7b5f106a6 Mon Sep 17 00:00:00 2001 From: Rajandeep Date: Thu, 19 Mar 2026 13:54:45 -0700 Subject: [PATCH 04/12] updated for refresh command --- data-tool/Makefile | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/data-tool/Makefile b/data-tool/Makefile index 9d09acb74d..81555399ba 100644 --- a/data-tool/Makefile +++ b/data-tool/Makefile @@ -113,10 +113,14 @@ run-colin-freeze: ## Run colin freeze flow . $(VENV_DIR)/bin/activate && \ python flows/colin_freeze_flow.py -run-extract-pull: +run-extract-load: . $(VENV_DIR)/bin/activate && \ python flows/refresh_extract_subset_flow.py "$(CORP_FILE)" +run-extract-refresh: + . $(VENV_DIR)/bin/activate && \ + python flows/refresh_extract_subset_flow.py "$(CORP_FILE)" --mode refresh + ################################################################################# # Self Documenting Commands # ################################################################################# From 311c865905c7315a2a68109b09a06d02099ad87b Mon Sep 17 00:00:00 2001 From: Rajandeep Date: Thu, 19 Mar 2026 14:49:16 -0700 Subject: [PATCH 05/12] dbschemacli subprocess on refresh --- data-tool/Makefile | 2 +- .../flows/refresh_extract_subset_flow.py | 34 +++++++++++++++++++ 2 files changed, 35 insertions(+), 1 deletion(-) diff --git a/data-tool/Makefile b/data-tool/Makefile index 81555399ba..b305718d37 100644 --- a/data-tool/Makefile +++ b/data-tool/Makefile @@ -119,7 +119,7 @@ run-extract-load: run-extract-refresh: . $(VENV_DIR)/bin/activate && \ - python flows/refresh_extract_subset_flow.py "$(CORP_FILE)" --mode refresh + python flows/refresh_extract_subset_flow.py "$(CORP_FILE)" --mode refresh --run-dbschemacli --dbschemacli-cmd "$${DBSCHEMACLI_CMD:-dbschemacli}" ################################################################################# # Self Documenting Commands # diff --git a/data-tool/flows/refresh_extract_subset_flow.py b/data-tool/flows/refresh_extract_subset_flow.py index 69daaf6c5f..100a633fd4 100644 --- a/data-tool/flows/refresh_extract_subset_flow.py +++ b/data-tool/flows/refresh_extract_subset_flow.py @@ -8,6 +8,12 @@ _REPO_ROOT = Path(__file__).resolve().parents[2] _SCRIPT_PATH = _REPO_ROOT / 'data-tool' / 'scripts' / 'generate_cprd_subset_extract.py' +_GENERATED_DIR = _REPO_ROOT / 'data-tool' / 'scripts' / 'generated' + +def _resolve_master_script_path(mode: str, out: str | None) -> Path: + if out: + return Path(out).expanduser().resolve + return (_GENERATED_DIR / f'subset_{mode}.sql').resolve() @task(name='Run-CPRD-Subset-Generator', cache_policy=NO_CACHE) def run_cprd_subset_extract_generator( @@ -56,6 +62,19 @@ def run_cprd_subset_extract_generator( text=True, ) +@task(name='DBSchemaCLI', cache_policy=NO_CACHE) +def run_dbschemacli_task(master_script: str, dbschemacli_cmd: str = 'dbschemacli') -> subprocess.CompletedProcess: + master_script_path = Path(master_script) + if not master_script_path.exists(): + raise FileNotFoundError(f'Generated script not found: {master_script_path}') + print(f'Running: {dbschemacli_cmd} {master_script_path}') + return subprocess.run( + [dbschemacli_cmd, str(master_script_path)], + cwd=str(_REPO_ROOT), + capture_output=False, + text=True + ) + @flow(name='Extract-Subset-Flow', log_prints=True, persist_result=False) def extract_pull_flow( corp_file: str, @@ -65,6 +84,8 @@ def extract_pull_flow( pg_fastload: bool = False, pg_disable_method: str = 'replica_role', out: str | None=None, + run_dbschemacli: bool = False, + dbschemacli_cmd: str = 'dbschemacli', ) -> None: """ Generate files @@ -83,6 +104,15 @@ def extract_pull_flow( raise RuntimeError(f'Generator exited with code {result.returncode}') print(f'generator completed successfully') + if run_dbschemacli: + master_script = _resolve_master_script_path(mode=mode, out=out) + run_result = run_dbschemacli_task( + master_script=str(master_script), + dbschemacli_cmd=dbschemacli_cmd, + ) + if run_result.returncode != 0: + raise RuntimeError(f'DbSchemaCLI exited with code {run_result.returncode}') + if __name__ == '__main__': @@ -94,6 +124,8 @@ def extract_pull_flow( p.add_argument('--pg-fastload', action='store_true', help='Enable Postgres fast-load') p.add_argument('--pg-disable-method', default='replica_role', choices=('table_triggers', 'replica_role')) p.add_argument('--out', default=None, help='Output path for generated master script.') + p.add_argument('--run-dbschemacli', action='store_true') + p.add_argument('--dbschemacli-cmd', default='dbschemacli') args = p.parse_args() extract_pull_flow( corp_file=args.corp_file, @@ -103,4 +135,6 @@ def extract_pull_flow( pg_fastload=args.pg_fastload, pg_disable_method=args.pg_disable_method, out=args.out, + run_dbschemacli=args.run_dbschemacli, + dbschemacli_cmd=args.dbschemacli_cmd, ) \ No newline at end of file From 210dea4efa80cf0b9a0b310eead95df2b3542763 Mon Sep 17 00:00:00 2001 From: Rajandeep Date: Tue, 24 Mar 2026 14:19:16 -0700 Subject: [PATCH 06/12] added prev subset cleanup --- .../flows/refresh_extract_subset_flow.py | 73 +++++++++++++++---- 1 file changed, 58 insertions(+), 15 deletions(-) diff --git a/data-tool/flows/refresh_extract_subset_flow.py b/data-tool/flows/refresh_extract_subset_flow.py index 100a633fd4..771ee318ce 100644 --- a/data-tool/flows/refresh_extract_subset_flow.py +++ b/data-tool/flows/refresh_extract_subset_flow.py @@ -1,40 +1,83 @@ import argparse +import os from pathlib import Path +import re import subprocess import sys from prefect import flow, task from prefect.cache_policies import NO_CACHE from prefect.states import Failed - +from flask import current_app _REPO_ROOT = Path(__file__).resolve().parents[2] _SCRIPT_PATH = _REPO_ROOT / 'data-tool' / 'scripts' / 'generate_cprd_subset_extract.py' _GENERATED_DIR = _REPO_ROOT / 'data-tool' / 'scripts' / 'generated' +_DEFAULT_DDL = _REPO_ROOT / 'data-tool' / 'scripts' / 'colin_corps_extract_postgres_ddl' def _resolve_master_script_path(mode: str, out: str | None) -> Path: if out: - return Path(out).expanduser().resolve + return Path(out).expanduser().resolve() return (_GENERATED_DIR / f'subset_{mode}.sql').resolve() +def _run_cmd(argv: list[str], env: dict[str, str] | None = None) -> None: + r = subprocess.run(argv, cwd=str(_REPO_ROOT), capture_output=False, text=True, env=env) + if r.returncode != 0: + raise RuntimeError(f'command failed ({r.returncode}): {" ".join(argv)}') + +def require_file(path: str | Path, description: str) -> Path: + """File Not Found Error""" + resolved = Path(path).expanduser().resolve() + if not resolved.is_file(): + raise FileNotFoundError(f'{description} not found (expected a file): {resolved}') + return resolved + + +def _reset_extract_postgres_db() -> None: + dbname = current_app.config.DATABASE_NAME_COLIN_MIGR + host = current_app.config.DATABASE_NAME_COLIN_MIGR + port = current_app.config.DATABASE_PORT_COLIN_MIGR + user = current_app.config.DATABASE_USERNAME_COLIN_MIGR + password = current_app.config.DATABASE_PASSWORD_COLIN_MIGR + + require_file(_DEFAULT_DDL, 'Extract DDL File') + + pg_flags = ['-h', host, '-p', str(port), '-U', user] + run_env = dict(os.environ) + if password and 'PGPASSWORD' not in run_env: + run_env['PGPASSWORD'] = password + terminate_sql = { + "SELECT pg_terminate_backend(pg_stat_activity.pid) FROM pg_stat_activity" + f"WHERE datname = {dbname} AND pid <> pg_backend_pid();" + } + _run_cmd(['psql', *pg_flags, '-d', 'postgres', '-c', terminate_sql ], env=run_env) + _run_cmd(['dropdb', *pg_flags, '-d', '--if-exists', '-c', dbname ], env=run_env) + _run_cmd(['createdb', *pg_flags, '-d', '-T', 'template0', dbname ], env=run_env) + _run_cmd(['psql', *pg_flags, '-d', dbname, '-v', 'ON_ERROR_STOP=1', '-f', str[_DEFAULT_DDL] ], env=run_env) + +@task(name='Cleanup-Extract-Postgres', cache_policy=NO_CACHE) +def cleanup_extract_postgres_db() -> None: + _reset_extract_postgres_db() + @task(name='Run-CPRD-Subset-Generator', cache_policy=NO_CACHE) def run_cprd_subset_extract_generator( corp_file: str, - mode: str = 'load', - chunk_size: int = 500, - threads: int = 4, - pg_fastload: bool = False, - pg_disabled_method: str = 'replica_role', - out: str | None = None + mode: str , + chunk_size: int, + threads: int, + pg_fastload: bool, + pg_disabled_method: str, + out: str | None, + reset_extract_postgres: bool, + run_dbschemacli: bool, + dbschemacli_cmd: str, ) -> subprocess.CompletedProcess: """ Generate Commands """ - if not _SCRIPT_PATH.exists(): - raise FileNotFoundError( - f'CPRD subset extract script not found: {_SCRIPT_PATH}' - ) - corp_path = Path(corp_file).expanduser().resolve() - if not corp_path.exists(): - raise FileNotFoundError(f'Corp file not found: {corp_path}') + if reset_extract_postgres: + reset_extract_postgres() + + require_file(_SCRIPT_PATH, 'Generated script') + corp_path =require_file(corp_file, 'Corp list file') argv = [ sys.executable, From 36e54afe23ec685019b460a2f1ee71cee882f8b8 Mon Sep 17 00:00:00 2001 From: Rajandeep Date: Tue, 24 Mar 2026 14:31:15 -0700 Subject: [PATCH 07/12] updated --- data-tool/flows/refresh_extract_subset_flow.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/data-tool/flows/refresh_extract_subset_flow.py b/data-tool/flows/refresh_extract_subset_flow.py index 771ee318ce..93f0e97b44 100644 --- a/data-tool/flows/refresh_extract_subset_flow.py +++ b/data-tool/flows/refresh_extract_subset_flow.py @@ -8,6 +8,7 @@ from prefect.cache_policies import NO_CACHE from prefect.states import Failed from flask import current_app +from config import get_named_config _REPO_ROOT = Path(__file__).resolve().parents[2] _SCRIPT_PATH = _REPO_ROOT / 'data-tool' / 'scripts' / 'generate_cprd_subset_extract.py' _GENERATED_DIR = _REPO_ROOT / 'data-tool' / 'scripts' / 'generated' @@ -32,11 +33,12 @@ def require_file(path: str | Path, description: str) -> Path: def _reset_extract_postgres_db() -> None: - dbname = current_app.config.DATABASE_NAME_COLIN_MIGR - host = current_app.config.DATABASE_NAME_COLIN_MIGR - port = current_app.config.DATABASE_PORT_COLIN_MIGR - user = current_app.config.DATABASE_USERNAME_COLIN_MIGR - password = current_app.config.DATABASE_PASSWORD_COLIN_MIGR + cfg = get_named_config() + dbname = cfg.DATABASE_NAME_COLIN_MIGR + host = cfg.DATABASE_HOST_COLIN_MIGR + port = str(cfg.DATABASE_PORT_COLIN_MIGR) + user = cfg.DATABASE_USERNAME_COLIN_MIGR + password = cfg.DATABASE_PASSWORD_COLIN_MIGR require_file(_DEFAULT_DDL, 'Extract DDL File') @@ -44,10 +46,10 @@ def _reset_extract_postgres_db() -> None: run_env = dict(os.environ) if password and 'PGPASSWORD' not in run_env: run_env['PGPASSWORD'] = password - terminate_sql = { + terminate_sql = ( "SELECT pg_terminate_backend(pg_stat_activity.pid) FROM pg_stat_activity" f"WHERE datname = {dbname} AND pid <> pg_backend_pid();" - } + ) _run_cmd(['psql', *pg_flags, '-d', 'postgres', '-c', terminate_sql ], env=run_env) _run_cmd(['dropdb', *pg_flags, '-d', '--if-exists', '-c', dbname ], env=run_env) _run_cmd(['createdb', *pg_flags, '-d', '-T', 'template0', dbname ], env=run_env) From b61faf5f4d195f2f22a5365a0ac09ea2177ffff5 Mon Sep 17 00:00:00 2001 From: Rajandeep Date: Tue, 24 Mar 2026 14:37:06 -0700 Subject: [PATCH 08/12] updated --- .../flows/refresh_extract_subset_flow.py | 36 +++++++------------ 1 file changed, 12 insertions(+), 24 deletions(-) diff --git a/data-tool/flows/refresh_extract_subset_flow.py b/data-tool/flows/refresh_extract_subset_flow.py index 93f0e97b44..ce53f52f33 100644 --- a/data-tool/flows/refresh_extract_subset_flow.py +++ b/data-tool/flows/refresh_extract_subset_flow.py @@ -62,22 +62,16 @@ def cleanup_extract_postgres_db() -> None: @task(name='Run-CPRD-Subset-Generator', cache_policy=NO_CACHE) def run_cprd_subset_extract_generator( corp_file: str, - mode: str , + mode: str, chunk_size: int, threads: int, pg_fastload: bool, - pg_disabled_method: str, - out: str | None, - reset_extract_postgres: bool, - run_dbschemacli: bool, - dbschemacli_cmd: str, + pg_disable_method: str, + out: str | None ) -> subprocess.CompletedProcess: """ Generate Commands """ - if reset_extract_postgres: - reset_extract_postgres() - require_file(_SCRIPT_PATH, 'Generated script') corp_path =require_file(corp_file, 'Corp list file') @@ -93,7 +87,7 @@ def run_cprd_subset_extract_generator( '--threads', str(threads), '--pg-disable-method', - pg_disabled_method, + pg_disable_method, ] if pg_fastload: argv.append('--pg-fastload') @@ -117,7 +111,7 @@ def run_dbschemacli_task(master_script: str, dbschemacli_cmd: str = 'dbschemacli [dbschemacli_cmd, str(master_script_path)], cwd=str(_REPO_ROOT), capture_output=False, - text=True + text=True, ) @flow(name='Extract-Subset-Flow', log_prints=True, persist_result=False) @@ -131,10 +125,14 @@ def extract_pull_flow( out: str | None=None, run_dbschemacli: bool = False, dbschemacli_cmd: str = 'dbschemacli', + reset_extract_postgres: bool = False, ) -> None: """ Generate files """ + if reset_extract_postgres: + cleanup_extract_postgres_db() + print(f'Running CPRD subset extract generator {corp_file}') result = run_cprd_subset_extract_generator( corp_file=corp_file, @@ -142,7 +140,7 @@ def extract_pull_flow( chunk_size=chunk_size, threads=threads, pg_fastload=pg_fastload, - pg_disabled_method=pg_disable_method, + pg_disable_method=pg_disable_method, out=out, ) if result.returncode != 0: @@ -171,15 +169,5 @@ def extract_pull_flow( p.add_argument('--out', default=None, help='Output path for generated master script.') p.add_argument('--run-dbschemacli', action='store_true') p.add_argument('--dbschemacli-cmd', default='dbschemacli') - args = p.parse_args() - extract_pull_flow( - corp_file=args.corp_file, - mode=args.mode, - chunk_size=args.chunk_size, - threads=args.threads, - pg_fastload=args.pg_fastload, - pg_disable_method=args.pg_disable_method, - out=args.out, - run_dbschemacli=args.run_dbschemacli, - dbschemacli_cmd=args.dbschemacli_cmd, - ) \ No newline at end of file + p.add_argument('--reset-extract-postgres', action='store_true') + extract_pull_flow(**vars(p.parse_args())) \ No newline at end of file From 95bb4a3afb92f68a0f424eed26e9cb0987f025d2 Mon Sep 17 00:00:00 2001 From: Rajandeep Date: Tue, 24 Mar 2026 14:41:48 -0700 Subject: [PATCH 09/12] updated --- data-tool/Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/data-tool/Makefile b/data-tool/Makefile index b305718d37..55a8ee67be 100644 --- a/data-tool/Makefile +++ b/data-tool/Makefile @@ -115,7 +115,7 @@ run-colin-freeze: ## Run colin freeze flow run-extract-load: . $(VENV_DIR)/bin/activate && \ - python flows/refresh_extract_subset_flow.py "$(CORP_FILE)" + python flows/refresh_extract_subset_flow.py "$(CORP_FILE)" --run-dbschemacli --dbschemacli-cmd "$${DBSCHEMACLI_CMD:-dbschemacli}" run-extract-refresh: . $(VENV_DIR)/bin/activate && \ From 12da8df6d4be59482928d7dfc72b2cb039be888d Mon Sep 17 00:00:00 2001 From: Rajandeep Date: Tue, 24 Mar 2026 14:48:42 -0700 Subject: [PATCH 10/12] updated --- data-tool/flows/refresh_extract_subset_flow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/data-tool/flows/refresh_extract_subset_flow.py b/data-tool/flows/refresh_extract_subset_flow.py index ce53f52f33..241737cd26 100644 --- a/data-tool/flows/refresh_extract_subset_flow.py +++ b/data-tool/flows/refresh_extract_subset_flow.py @@ -17,7 +17,7 @@ def _resolve_master_script_path(mode: str, out: str | None) -> Path: if out: return Path(out).expanduser().resolve() - return (_GENERATED_DIR / f'subset_{mode}.sql').resolve() + return (_GENERATED_DIR / f'subset_refresh.sql').resolve() def _run_cmd(argv: list[str], env: dict[str, str] | None = None) -> None: r = subprocess.run(argv, cwd=str(_REPO_ROOT), capture_output=False, text=True, env=env) From fe8a420e654ff16b785cd1768e9306df5b67dbf1 Mon Sep 17 00:00:00 2001 From: Rajandeep Date: Tue, 24 Mar 2026 15:40:32 -0700 Subject: [PATCH 11/12] fix geerating sql --- data-tool/flows/refresh_extract_subset_flow.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/data-tool/flows/refresh_extract_subset_flow.py b/data-tool/flows/refresh_extract_subset_flow.py index 241737cd26..752e6114e2 100644 --- a/data-tool/flows/refresh_extract_subset_flow.py +++ b/data-tool/flows/refresh_extract_subset_flow.py @@ -13,11 +13,12 @@ _SCRIPT_PATH = _REPO_ROOT / 'data-tool' / 'scripts' / 'generate_cprd_subset_extract.py' _GENERATED_DIR = _REPO_ROOT / 'data-tool' / 'scripts' / 'generated' _DEFAULT_DDL = _REPO_ROOT / 'data-tool' / 'scripts' / 'colin_corps_extract_postgres_ddl' +_SUBSET = _GENERATED_DIR / 'subset_refresh.sql' def _resolve_master_script_path(mode: str, out: str | None) -> Path: if out: return Path(out).expanduser().resolve() - return (_GENERATED_DIR / f'subset_refresh.sql').resolve() + return _SUBSET.resolve() def _run_cmd(argv: list[str], env: dict[str, str] | None = None) -> None: r = subprocess.run(argv, cwd=str(_REPO_ROOT), capture_output=False, text=True, env=env) @@ -91,8 +92,9 @@ def run_cprd_subset_extract_generator( ] if pg_fastload: argv.append('--pg-fastload') - if out is not None: - argv.extend(['--out', str(Path(out).expanduser().resolve())]) + out_path = Path(out).expanduser().resolve() if out is not None else _SUBSET.resolve() + out_path.parent.mkdir(parents=True, exist_ok=True) + argv.extend(['--out', str(out)]) return subprocess.run( argv, @@ -148,7 +150,7 @@ def extract_pull_flow( print(f'generator completed successfully') if run_dbschemacli: - master_script = _resolve_master_script_path(mode=mode, out=out) + master_script = _resolve_master_script_path(out=out) run_result = run_dbschemacli_task( master_script=str(master_script), dbschemacli_cmd=dbschemacli_cmd, From 0186f8e154d77c9de46eb7c3f7a63a546175dd88 Mon Sep 17 00:00:00 2001 From: Rajandeep Date: Thu, 26 Mar 2026 08:02:49 -0700 Subject: [PATCH 12/12] updated --- data-tool/Makefile | 5 ++-- .../flows/refresh_extract_subset_flow.py | 28 ++++++++++--------- 2 files changed, 18 insertions(+), 15 deletions(-) diff --git a/data-tool/Makefile b/data-tool/Makefile index 55a8ee67be..1e8754878c 100644 --- a/data-tool/Makefile +++ b/data-tool/Makefile @@ -112,10 +112,11 @@ run-tombstone-verify: ## Run corp tombstone verify flow run-colin-freeze: ## Run colin freeze flow . $(VENV_DIR)/bin/activate && \ python flows/colin_freeze_flow.py - +OUT ?= +RESET_EXTRACT_POSTGRES ?= run-extract-load: . $(VENV_DIR)/bin/activate && \ - python flows/refresh_extract_subset_flow.py "$(CORP_FILE)" --run-dbschemacli --dbschemacli-cmd "$${DBSCHEMACLI_CMD:-dbschemacli}" + python flows/refresh_extract_subset_flow.py "$(CORP_FILE)" $(if $(strip $(OUT)), --out "$(OUT)",) --run-dbschemacli --dbschemacli-cmd "$${DBSCHEMACLI_CMD:-dbschemacli}" run-extract-refresh: . $(VENV_DIR)/bin/activate && \ diff --git a/data-tool/flows/refresh_extract_subset_flow.py b/data-tool/flows/refresh_extract_subset_flow.py index 752e6114e2..737ccf3dec 100644 --- a/data-tool/flows/refresh_extract_subset_flow.py +++ b/data-tool/flows/refresh_extract_subset_flow.py @@ -15,7 +15,7 @@ _DEFAULT_DDL = _REPO_ROOT / 'data-tool' / 'scripts' / 'colin_corps_extract_postgres_ddl' _SUBSET = _GENERATED_DIR / 'subset_refresh.sql' -def _resolve_master_script_path(mode: str, out: str | None) -> Path: +def _resolve_master_script_path(out: str | None) -> Path: if out: return Path(out).expanduser().resolve() return _SUBSET.resolve() @@ -35,11 +35,11 @@ def require_file(path: str | Path, description: str) -> Path: def _reset_extract_postgres_db() -> None: cfg = get_named_config() - dbname = cfg.DATABASE_NAME_COLIN_MIGR - host = cfg.DATABASE_HOST_COLIN_MIGR - port = str(cfg.DATABASE_PORT_COLIN_MIGR) - user = cfg.DATABASE_USERNAME_COLIN_MIGR - password = cfg.DATABASE_PASSWORD_COLIN_MIGR + dbname = cfg.DB_NAME_COLIN_MIGR + host = cfg.DB_HOST_COLIN_MIGR + port = str(cfg.DB_PORT_COLIN_MIGR) + user = cfg.DB_USER_COLIN_MIGR + password = cfg.DB_PASSWORD_COLIN_MIGR require_file(_DEFAULT_DDL, 'Extract DDL File') @@ -47,14 +47,16 @@ def _reset_extract_postgres_db() -> None: run_env = dict(os.environ) if password and 'PGPASSWORD' not in run_env: run_env['PGPASSWORD'] = password + safe_db = str(dbname).replace("'", "''") terminate_sql = ( - "SELECT pg_terminate_backend(pg_stat_activity.pid) FROM pg_stat_activity" - f"WHERE datname = {dbname} AND pid <> pg_backend_pid();" + "SELECT pg_terminate_backend(pg_stat_activity.pid) " + "FROM pg_stat_activity " + f"WHERE datname = '{safe_db}' AND pid <> pg_backend_pid();" ) _run_cmd(['psql', *pg_flags, '-d', 'postgres', '-c', terminate_sql ], env=run_env) - _run_cmd(['dropdb', *pg_flags, '-d', '--if-exists', '-c', dbname ], env=run_env) - _run_cmd(['createdb', *pg_flags, '-d', '-T', 'template0', dbname ], env=run_env) - _run_cmd(['psql', *pg_flags, '-d', dbname, '-v', 'ON_ERROR_STOP=1', '-f', str[_DEFAULT_DDL] ], env=run_env) + _run_cmd(['dropdb', *pg_flags, '--maintenance-db=postgres', '--if-exists', dbname ], env=run_env) + _run_cmd(['createdb', *pg_flags, '--maintenance-db=postgres', '-T', 'template0', dbname ], env=run_env) + _run_cmd(['psql', *pg_flags, '-d', dbname, '-v', 'ON_ERROR_STOP=1', '-f', str(_DEFAULT_DDL) ], env=run_env) @task(name='Cleanup-Extract-Postgres', cache_policy=NO_CACHE) def cleanup_extract_postgres_db() -> None: @@ -127,7 +129,7 @@ def extract_pull_flow( out: str | None=None, run_dbschemacli: bool = False, dbschemacli_cmd: str = 'dbschemacli', - reset_extract_postgres: bool = False, + reset_extract_postgres: bool = True, ) -> None: """ Generate files @@ -171,5 +173,5 @@ def extract_pull_flow( p.add_argument('--out', default=None, help='Output path for generated master script.') p.add_argument('--run-dbschemacli', action='store_true') p.add_argument('--dbschemacli-cmd', default='dbschemacli') - p.add_argument('--reset-extract-postgres', action='store_true') + p.add_argument('--reset-extract-postgres', action='store_false') extract_pull_flow(**vars(p.parse_args())) \ No newline at end of file