Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
16d166f
update ignore files
austensen May 27, 2026
f2495a0
update docker and python deps for security
austensen May 27, 2026
b61884d
Runtime controls + schema plumbing + reprocess selectors
austensen May 27, 2026
f60d2c5
Run manifest + stage checkpointing + single-run locking
austensen May 27, 2026
a10632e
ETL module structure (flat split, move-only)
austensen May 27, 2026
dcc3c32
Parse/load memory reduction and CSV preprocessing reduction
austensen May 27, 2026
596e208
PR #19 appearanceid + S3 publish fixes
austensen May 27, 2026
00840e6
Incremental geocoding with delta extraction and DB upsert
austensen May 27, 2026
57de240
Multi-address geocode row keys
austensen May 27, 2026
ee8d3d7
Atomic staging->main promotion hardening
austensen May 27, 2026
db12b02
Publish optimization + operational hardening
austensen May 28, 2026
570ed86
Schema-safe table bootstrap before staging import
austensen May 28, 2026
285a4c1
remove unused prep_db
austensen May 28, 2026
884a632
Batch parse & staging export (#20)
austensen May 28, 2026
5909343
update readmes
austensen May 29, 2026
35c7eda
prevent dropped remote db connections for long processes
austensen May 29, 2026
92a5dc2
add DB connection issue protections, suppress geosupport logs
austensen May 29, 2026
0674af2
adjust geocode candidate selection
austensen May 29, 2026
ae51651
reorder steps for better grouping (publish all files together) and av…
austensen May 29, 2026
bb68234
Geom SQL (schema, promotion, views, upsert)
austensen May 29, 2026
1a30fff
CSV geocode + weekly pipeline rewire
austensen May 29, 2026
e45d3c1
RDS backfill CLI
austensen May 29, 2026
1abb194
update Documentation for geocoding changes
austensen May 29, 2026
e80073e
fix bug uploading tempfiles to s3
austensen May 30, 2026
e6a2d6f
update ignore files
austensen Jun 3, 2026
d9e53e6
simplify parser progress logging for non-interactive
austensen Jun 3, 2026
a3e6ef4
handle cases marked for deletion (#22)
austensen Jun 4, 2026
e3cc8a1
Handle failed parse (#23)
austensen Jun 4, 2026
51c5dd8
add env flag to skip csv s3 publish when reprocessing (#24)
austensen Jun 5, 2026
1ab7c41
fix minor bug in threading for parsers that raised exception but not …
austensen Jun 5, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 52 additions & 2 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -1,23 +1,73 @@
# configs and git
.git/
.gitattributes
.gitignore
.github/
.idea/
.cursor/

# python and docker
.venv/
.env
.env.*
.env.example
Dockerfile
docker-compose.yml
dockerhub-publish.sh
__pycache__/
*.py[cod]
*$py.class
.pytest_cache/
.mypy_cache/
.ruff_cache/
.coverage
htmlcov/
*.egg-info/
dist/
build/
.ipynb_checkpoints

# data folders
# credentials and keys (never ship in image layers)
*.pem
*.key
*.p12
*.pfx
**/id_rsa
**/id_rsa.pub
**/.ssh/
*kubeconfig*
secrets/
credentials/

# ops / deploy (runtime uses env vars, not these files)
k8s/
notebooks/
docs/
tests/
hooks/
README.md
LICENSE
run.sh
geocoder_test.py
example.output.txt
requirements.txt
.python-version

# data folders and local databases
staging.db
staging.duckdb*
data/
data-raw/
data-clean/
data-private/
data-public/
lib/data-private/
lib/data-public/

# logs and temp
*.log
*.tmp

# macos
.DS_Store
.Trash-0/
.Trash-0/
58 changes: 48 additions & 10 deletions .env.example
Original file line number Diff line number Diff line change
@@ -1,35 +1,73 @@
# Mode (level 1 or 2)
# 2 = full S3 publish (production default)
MODE=2

# PostgreSQL target schema (optional)
# Sets session search_path for the ETL run. Leave empty for public.
DB_SCHEMA=

# Optional S3 key prefix for private/ and public/ paths (e.g. refactor/ for isolated runs)
S3_PREFIX=

# Reprocess zip files from S3 private/ backups by filename glob
# Example: LandlordTenant.Incr.2024-*.zip
REPROCESS_GLOB=

# When true, replay REPROCESS_GLOB matches even if already completed in etl_files manifest
FORCE_REPROCESS=false

# When true, fail parse_xml and abort before export/promote if any case-level parse errors occur
PARSE_FAIL_FAST=false

# When true, skip post-promote RDS public CSV export and S3 encryption normalization (reprocess throughput only)
SKIP_PUBLIC_PUBLISH=false

# Geocoding and CSV tuning (optional; safe defaults preserve current behavior)
GEOCODE_WORKERS=
CENSUS_BATCH_CHUNK_SIZE=2500
CSV_ROW_CHECK_CHUNK_SIZE=1000

# Parser → DuckDB write batching (Option A; enabled by default)
PARSE_WRITE_BATCH_ENABLED=1
PARSE_WRITE_BATCH_SIZE=128
PARSE_WRITE_FLUSH_EVERY_N_CASES=16

# PostgreSQL TCP keepalives (optional; reduce idle disconnects on long ETL runs)
# DB_KEEPALIVES=1
# DB_KEEPALIVES_IDLE=60
# DB_KEEPALIVES_INTERVAL=10
# DB_KEEPALIVES_COUNT=5

# Long-running SQL timeout in milliseconds (default 3600000 = 1 hour)
# DB_STATEMENT_TIMEOUT_MS=3600000

# The database URL
# ----------------
#
# This is the postgres instance the parsed cases will
# load data into.
#
# If you use the Dockerfile you don't need to change this. Otherwise make this the remote
# PostgreSQL instance where parsed cases are loaded and promoted.
# With Docker Compose, point this at your RDS instance (or local db service).

DATABASE_URL=

# Optional clone/sync target (legacy maintenance path)
# CLONED_DATABASE_URL=

# Amazon Web Services (AWS) configuration
# ---------------------------------------
#
# These are used to move data to/from S3 bucket
# If you are using AWS Lambda you do not need to include AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
# please configure https://docs.aws.amazon.com/lambda/latest/dg/lambda-intro-execution-role.html
# Used to move data to/from the S3 bucket and for RDS aws_s3 import/export.
# On ECS/Lambda you can omit keys and use an IAM role instead.

AWS_ACCESS_KEY_ID=
AWS_SECRET_ACCESS_KEY=
AWS_S3_BUCKET_NAME=


# OCA SFTP credentials
# ---------------------------------------
#
# These are used to download raw XML and CSV files
# Used to download new raw XML zip files from OCA

SFTP_HOST=
SFTP_USER=
SFTP_PSWD=
SFTP_DIR=
SFTP_DIR=
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,7 @@ data-public/
.DS_Store
.Trash-0/

staging.duckdb*
staging.duckdb*

k8s/*-kubeconfig.yaml
k8s/oca-etl-secret.yaml
5 changes: 3 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
FROM --platform=linux/amd64 python:3.12-slim-bookworm
FROM --platform=linux/amd64 python:3.12.13-slim-trixie
ENV TZ=America/New_York

# Update package lists and setup Python with uv
RUN apt-get update && \
apt-get upgrade -y && \
apt-get install -y --no-install-recommends \
openssh-client curl ca-certificates python3 unzip && \
openssh-client curl ca-certificates unzip && \
rm -rf /var/lib/apt/lists/*
ADD https://astral.sh/uv/install.sh /uv-installer.sh
RUN sh /uv-installer.sh && rm /uv-installer.sh
Expand Down
92 changes: 77 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ This work is licensed under a [Creative Commons Attribution-NonCommercial-ShareA

## CSV Files

[![Date Last Updated](https://oca-2-dev.s3.amazonaws.com/public/last-updated-shield.png)](https://oca-2-dev.s3.amazonaws.com/public/last-updated-date.txt)
[![Date Last Updated](https://oca-2-dev.s3.amazonaws.com/public/last-updated-shield.svg)](https://oca-2-dev.s3.amazonaws.com/public/last-updated-date.txt)

* [`oca_index`](https://oca-2-dev.s3.amazonaws.com/public/oca_index.csv)
* [`oca_causes`](https://oca-2-dev.s3.amazonaws.com/public/oca_causes.csv)
Expand All @@ -43,36 +43,98 @@ The data we receive from OCA is an extract of all landlord and tenant cases in N

## About the code

For information about the details of various components, see [`/lib`](/lib)
The ETL pipeline lives under [`lib/`](lib/). See [`lib/README.md`](lib/README.md) for stage-by-stage architecture, module map, and SQL script roles.

### Local Setup
### Local setup

First, you will only be able to run this yourself if you have HDC's credentials to access to the SFTP to get the raw data transfered from OCA and access to the private AWS S3 where those files are stored.
You need credentials for OCA SFTP and the BetaNYC AWS for S3 (file storage) and RDS (PostgreSQL database), plus Docker and Docker Compose.

You will need Docker and Docker Compose.
Copy the example env file and fill in credentials:

First, you'll want to create an `.env` file by copying the example one:

```
```bash
cp .env.example .env # Or 'copy .env.example .env' on Windows
```

Take a look at the `.env` file and fill in the AWS S3 credentials.
Required variables: `DATABASE_URL`, `AWS_*`, `SFTP_*`, and `MODE=2` for full publish. Optional runtime controls are documented in [`.env.example`](.env.example).

**Typical weekly run** (process new SFTP files only; geocodes addresses in the staging CSV before S3 upload, then promotes and publishes):

```bash
docker compose run --rm app python oca_update.py
```

To run the whole process in the docker container run:
**RDS geocode backfill** (on-demand; rows in `oca_addresses` where `lat IS NULL` only; does not publish public CSVs):

```bash
docker compose run --rm app python oca_geocode_backfill.py
```
docker-compose up

Use the same `DATABASE_URL` and `DB_SCHEMA` as weekly ETL. Optional flags: `--geocode-workers`, `--census-batch-chunk-size` (or env `GEOCODE_WORKERS`, `CENSUS_BATCH_CHUNK_SIZE`). After backfill, run view rebuild + publish separately if S3 public files must reflect new coordinates.

**Refactor / replay run** (isolated schema and S3 prefix, force replay from S3 private backups):

```bash
docker compose run --rm app env \
DB_SCHEMA=refactor \
S3_PREFIX=refactor/ \
REPROCESS_GLOB='LandlordTenant.Incr.2025-*.zip' \
FORCE_REPROCESS=true \
SKIP_PUBLIC_PUBLISH=true \
GEOCODE_WORKERS=2 \
python oca_update.py
```

### Jupyter notebook for maintenance
Bulk reprocess with `SKIP_PUBLIC_PUBLISH=true` updates RDS and private backups only; public S3 CSVs stay stale until you run once with `SKIP_PUBLIC_PUBLISH=false` (or unset).

Compose reads `.env` from the repo root for `DATABASE_URL`, AWS, and SFTP. Override any variable inline with `env VAR=value ...` as above.

Comment out `CMD ["python", "oca_update.py"]` in the Dockerfile
Run the test suite in Docker:

```bash
docker compose run --rm app python -m unittest discover -s tests -p "test_*.py"
```
docker-compose up -d
docker-compose exec app /bin/bash

### Weekly scheduling and Kubernetes

See [`docs/operations/weekly-etl-scheduling.md`](docs/operations/weekly-etl-scheduling.md) for:

- local Docker + **cron** (weekly example),
- **Kubernetes CronJob** (`k8s/k8s-cron-job.yaml`, 2Gi memory limit, secrets via `oca-etl-secrets`),
- **AWS EventBridge + ECS Fargate** (weekly task schedule).

Create cluster secrets from [`k8s/oca-etl-secret.example.yaml`](k8s/oca-etl-secret.example.yaml); do not commit real credentials.

### Runtime controls

Optional env vars (and matching `oca_update.py` CLI flags) tune isolation, replay, memory, and parse throughput. When unset, defaults preserve standard weekly behavior: new SFTP files only, `public` schema, CPU-count geocode workers.

| Variable | Purpose | Default |
|----------|---------|---------|
| `DB_SCHEMA` | PostgreSQL `search_path` target | `public` |
| `S3_PREFIX` | Prefix for `private/` and `public/` S3 keys | none |
| `REPROCESS_GLOB` | Filename glob for S3 private zip replay | none |
| `FORCE_REPROCESS` | Replay manifest-completed glob matches | `false` |
| `SKIP_PUBLIC_PUBLISH` | Skip post-promote RDS→S3 public CSV export and SSE normalize | `false` |
| `PARSE_FAIL_FAST` | Abort before export/promote on any case-level parse failure | `false` |
| `GEOCODE_WORKERS` | Geosupport multiprocessing pool size | CPU count |
| `CENSUS_BATCH_CHUNK_SIZE` | Census batch geocoder chunk | `2500` |
| `CSV_ROW_CHECK_CHUNK_SIZE` | Staging CSV preprocess / row-check chunk | `1000` |
| `PARSE_WRITE_BATCH_ENABLED` | Buffer parser DuckDB writes in txn windows | `1` (on) |
| `PARSE_WRITE_BATCH_SIZE` | Max buffered INSERTs before flush | `128` |
| `PARSE_WRITE_FLUSH_EVERY_N_CASES` | Flush cadence per parse worker | `16` |
| `DB_KEEPALIVES_*` | PostgreSQL TCP keepalive tuning (see `.env.example`) | RDS-friendly defaults |

Long runs (multi-hour XML parse, S3 upload, geocoding) may idle the RDS connection; the pipeline uses TCP keepalives and automatic reconnect (`ensure_connection`) before RDS-heavy stages. Optional `DB_KEEPALIVES_IDLE` / `DB_KEEPALIVES_INTERVAL` / `DB_KEEPALIVES_COUNT` override libpq defaults.

Use an isolated `S3_PREFIX` (e.g. `refactor/`) for refactor and end-to-end test runs so reads and writes stay out of production public paths. Memory target per job is **≤ 2 GiB**; lower `GEOCODE_WORKERS` if geocoding approaches the limit.

### Jupyter notebook for maintenance

Comment out `CMD ["python", "oca_update.py"]` in the Dockerfile, then:

```bash
docker compose up -d
docker compose exec app /bin/bash
jupyter notebook --allow-root --ip 0.0.0.0 --no-browser
```

Expand Down
16 changes: 15 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,25 @@ services:
user: 'root'
volumes:
- .:/app
# Keep Linux .venv out of the host bind mount (macOS .venv breaks imports in-container).
- app_venv:/app/.venv
environment:
DATABASE_URL: ${DATABASE_URL}
AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID}
AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY}
AWS_S3_BUCKET_NAME: ${AWS_S3_BUCKET_NAME}
SFTP_HOST: ${SFTP_HOST}
SFTP_USER: ${SFTP_USER}
SFTP_PSWD: ${SFTP_PSWD}
SFTP_DIR: ${SFTP_DIR}
# to debug in the container uncomment below
# then use use > docker compose up -d
# > docker compose exec app /bin/bash
# lastly run jupyter using > uv pip install notebook and
# > jupyter notebook --allow-root --ip 0.0.0.0 --no-browser
# ports:
# - 8888:8888
tty: true
tty: true

volumes:
app_venv:
51 changes: 51 additions & 0 deletions dockerhub-publish.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#!/bin/bash
set -eo pipefail

# --- Configuration ---
DOCKER_USER="justfix"
DOCKER_TEAM="justfixnyc"
REPO_NAME="oca"
IMAGE_TAG="latest" # Or use a dynamic tag like $1 or a version number

FULL_IMAGE_NAME="${DOCKER_TEAM}/${REPO_NAME}:${IMAGE_TAG}"
DOCKERFILE_PATH="./Dockerfile" # Path to your Dockerfile

# Ensure credentials are set as environment variables for security
if [ -z "$DOCKER_PASSWORD" ]; then
echo "Error: DOCKER_PASSWORD environment variable not set."
exit 1
fi
# ---------------------

echo "Starting Docker image build and push process..."

# 1. Log in to Docker Hub using standard input for the password for security
echo "Logging in to Docker Hub..."
echo "$DOCKER_PASSWORD" | docker login --username "$DOCKER_USER" --password-stdin
if [ $? -ne 0 ]; then
echo "Error: Docker login failed."
exit 1
fi
echo "Successfully logged in."

# 2. Build the Docker image
echo "Building image: ${FULL_IMAGE_NAME} from ${DOCKERFILE_PATH}..."
docker build -f "${DOCKERFILE_PATH}" -t "${FULL_IMAGE_NAME}" .
if [ $? -ne 0 ]; then
echo "Error: Docker build failed."
exit 1
fi
echo "Successfully built image."

# 3. Push the image to Docker Hub
echo "Pushing image: ${FULL_IMAGE_NAME} to Docker Hub..."
docker push "${FULL_IMAGE_NAME}"
if [ $? -ne 0 ]; then
echo "Error: Docker push failed."
exit 1
fi
echo "Successfully pushed image to Docker Hub."

# Optional: Log out of Docker Hub after pushing
docker logout
echo "Logged out of Docker Hub."
Loading